##// END OF EJS Templates
merge from graphics branch
merge from graphics branch

File last commit:

r863:2444bfa6a546
r863:2444bfa6a546
Show More
jroutils_publish.py
182 lines | 6.1 KiB | text/x-python | PythonLexer
Juan C. Valdez
Add Operation publish using MQTT
r859 '''
@author: Juan C. Espinoza
'''
import time
import json
import numpy
import paho.mqtt.client as mqtt
Juan C. Valdez
merge from graphics branch
r863 from pymongo import MongoClient
Juan C. Valdez
Add Operation publish using MQTT
r859
from schainpy.model.proc.jroproc_base import Operation
class PrettyFloat(float):
def __repr__(self):
return '%.2f' % self
Juan C. Valdez
merge from graphics branch
r863 def roundFloats(obj):
if isinstance(obj, list):
return map(roundFloats, obj)
elif isinstance(obj, float):
return round(obj, 2)
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
Add Operation publish using MQTT
r859 def pretty_floats(obj):
if isinstance(obj, float):
return PrettyFloat(obj)
elif isinstance(obj, dict):
return dict((k, pretty_floats(v)) for k, v in obj.items())
elif isinstance(obj, (list, tuple)):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 return map(pretty_floats, obj)
Juan C. Valdez
Add Operation publish using MQTT
r859 return obj
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
Add Operation publish using MQTT
r859 class PublishData(Operation):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 """Clase publish."""
Juan C. Valdez
Add Operation publish using MQTT
r859 __MAXNUMX = 80
__MAXNUMY = 80
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
Add Operation publish using MQTT
r859 def __init__(self):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 """Inicio."""
Juan C. Valdez
Add Operation publish using MQTT
r859 Operation.__init__(self)
self.isConfig = False
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 self.client = None
def on_disconnect(self, client, userdata, rc):
Juan C. Valdez
Add Operation publish using MQTT
r859 if rc != 0:
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 print("Unexpected disconnection.")
self.connect()
def connect(self):
print 'trying to connect'
Juan C. Valdez
Add Operation publish using MQTT
r859 try:
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 self.client.connect(
host=self.host,
port=self.port,
keepalive=60*10,
bind_address='')
print "connected"
self.client.loop_start()
# self.client.publish(
# self.topic + 'SETUP',
# json.dumps(setup),
# retain=True
# )
Juan C. Valdez
Add Operation publish using MQTT
r859 except:
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 print "MQTT Conection error."
Juan C. Valdez
Add Operation publish using MQTT
r859 self.client = False
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
merge from graphics branch
r863 def setup(self, host, port=1883, username=None, password=None, mongo=0, clientId="user", **kwargs):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
merge from graphics branch
r863 self.mongo = mongo
Juan C. Valdez
Add Operation publish using MQTT
r859 self.topic = kwargs.get('topic', 'schain')
self.delay = kwargs.get('delay', 0)
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 self.plottype = kwargs.get('plottype', 'spectra')
Juan C. Valdez
Add Operation publish using MQTT
r859 self.host = host
self.port = port
Juan C. Valdez
merge from graphics branch
r863 self.clientId = clientId
Juan C. Valdez
Add Operation publish using MQTT
r859 self.cnt = 0
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 setup = []
Juan C. Valdez
merge from graphics branch
r863 if (self.mongo):
self.MongoClient = MongoClient("mongodb://127.0.0.1:3003")
self.MongoDB = self.MongoClient['meteor']
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 for plot in self.plottype:
setup.append({
'plot': plot,
'topic': self.topic + plot,
'title': getattr(self, plot + '_' + 'title', False),
'xlabel': getattr(self, plot + '_' + 'xlabel', False),
'ylabel': getattr(self, plot + '_' + 'ylabel', False),
'xrange': getattr(self, plot + '_' + 'xrange', False),
'yrange': getattr(self, plot + '_' + 'yrange', False),
'zrange': getattr(self, plot + '_' + 'zrange', False),
})
self.client = mqtt.Client(
Juan C. Valdez
merge from graphics branch
r863 client_id=self.clientId + self.topic + 'SCHAIN',
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 clean_session=True)
self.client.on_disconnect = self.on_disconnect
self.connect()
def publish_data(self, plottype):
data = getattr(self.dataOut, 'data_spc')
Juan C. Valdez
merge from graphics branch
r863 yData = self.dataOut.heightList[:2].tolist()
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 if plottype == 'spectra':
z = data/self.dataOut.normFactor
zdB = 10*numpy.log10(z)
xlen, ylen = zdB[0].shape
dx = numpy.floor(xlen/self.__MAXNUMX) + 1
dy = numpy.floor(ylen/self.__MAXNUMY) + 1
Z = [0 for i in self.dataOut.channelList]
for i in self.dataOut.channelList:
Z[i] = zdB[i][::dx, ::dy].tolist()
payload = {
'timestamp': self.dataOut.utctime,
Juan C. Valdez
merge from graphics branch
r863 'data': roundFloats(Z),
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
'interval': self.dataOut.getTimeInterval(),
Juan C. Valdez
merge from graphics branch
r863 'xRange': [0, 80],
'type': plottype,
'yData': yData
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 }
Juan C. Valdez
merge from graphics branch
r863 # print payload
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
elif plottype in ('rti', 'power'):
z = data/self.dataOut.normFactor
avg = numpy.average(z, axis=1)
avgdB = 10*numpy.log10(avg)
xlen, ylen = z[0].shape
dy = numpy.floor(ylen/self.__MAXNUMY) + 1
AVG = [0 for i in self.dataOut.channelList]
for i in self.dataOut.channelList:
AVG[i] = avgdB[i][::dy].tolist()
payload = {
'timestamp': self.dataOut.utctime,
Juan C. Valdez
merge from graphics branch
r863 'data': roundFloats(AVG),
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
'interval': self.dataOut.getTimeInterval(),
Juan C. Valdez
merge from graphics branch
r863 'xRange': [0, 80],
'type': plottype,
'yData': yData
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 }
elif plottype == 'noise':
noise = self.dataOut.getNoise()/self.dataOut.normFactor
noisedB = 10*numpy.log10(noise)
payload = {
'timestamp': self.dataOut.utctime,
Juan C. Valdez
merge from graphics branch
r863 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
'interval': self.dataOut.getTimeInterval(),
Juan C. Valdez
merge from graphics branch
r863 'xRange': [0, 80],
'type': plottype,
'yData': yData
}
else:
print "Tipo de grafico invalido"
payload = {
'data': 'None',
'timestamp': 'None',
'type': None
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 }
print 'Publishing data to {}'.format(self.host)
print '*************************'
Juan C. Valdez
merge from graphics branch
r863 print self.client
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 self.client.publish(self.topic + plottype, json.dumps(payload), qos=0)
Juan C. Valdez
merge from graphics branch
r863 if (self.mongo):
print 'Publishing data to Mongo'
result = self.MongoDB.realtime.insert_one(payload)
print result.inserted_id
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
def run(self, dataOut, host, **kwargs):
self.dataOut = dataOut
if not self.isConfig:
Juan C. Valdez
Add Operation publish using MQTT
r859 self.setup(host, **kwargs)
self.isConfig = True
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
map(self.publish_data, self.plottype)
time.sleep(self.delay)
Juan C. Valdez
Add Operation publish using MQTT
r859 def close(self):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 if self.client:
self.client.loop_stop()
self.client.disconnect()