##// END OF EJS Templates
Add WindProfiler support, multiSchain by_day
Add WindProfiler support, multiSchain by_day

File last commit:

r923:872a2100b6b2
r923:872a2100b6b2
Show More
jroutils_publish.py
445 lines | 15.7 KiB | text/x-python | PythonLexer
Juan C. Valdez
Add Operation publish using MQTT
r859 '''
@author: Juan C. Espinoza
'''
import time
import json
import numpy
import paho.mqtt.client as mqtt
Juan C. Valdez
zmq support in PublishData
r883 import zmq
import cPickle as pickle
import datetime
from zmq.utils.monitor import recv_monitor_message
from functools import wraps
from threading import Thread
from multiprocessing import Process
Juan C. Valdez
Add Operation publish using MQTT
r859
Juan C. Valdez
zmq support in PublishData
r883 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
ReceiverData Operation, test PlotData
r889 MAXNUMX = 100
MAXNUMY = 100
Juan C. Valdez
Add Operation publish using MQTT
r859
class PrettyFloat(float):
def __repr__(self):
return '%.2f' % self
Juan C. Valdez
merge from graphics branch
r863 def roundFloats(obj):
if isinstance(obj, list):
return map(roundFloats, obj)
elif isinstance(obj, float):
return round(obj, 2)
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
José Chávez
receiver data modificado para web
r904 def decimate(z):
# dx = int(len(self.x)/self.__MAXNUMX) + 1
Fix SendToWeb operation
r911
José Chávez
receiver data modificado para web
r904 dy = int(len(z[0])/MAXNUMY) + 1
Fix SendToWeb operation
r911
José Chávez
receiver data modificado para web
r904 return z[::, ::dy]
Juan C. Valdez
Add Operation publish using MQTT
r859
Juan C. Valdez
zmq support in PublishData
r883 class throttle(object):
"""Decorator that prevents a function from being called more than once every
time period.
To create a function that cannot be called more than once a minute, but
will sleep until it can be called:
@throttle(minutes=1)
def foo():
pass
for i in range(10):
foo()
print "This function has run %s times." % i
"""
def __init__(self, seconds=0, minutes=0, hours=0):
self.throttle_period = datetime.timedelta(
seconds=seconds, minutes=minutes, hours=hours
)
José Chávez
funcionando todo
r898
Juan C. Valdez
zmq support in PublishData
r883 self.time_of_last_call = datetime.datetime.min
def __call__(self, fn):
@wraps(fn)
def wrapper(*args, **kwargs):
now = datetime.datetime.now()
time_since_last_call = now - self.time_of_last_call
time_left = self.throttle_period - time_since_last_call
if time_left > datetime.timedelta(seconds=0):
return
self.time_of_last_call = datetime.datetime.now()
return fn(*args, **kwargs)
return wrapper
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
Add Operation publish using MQTT
r859 class PublishData(Operation):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 """Clase publish."""
Juan C. Valdez
zmq support in PublishData
r883 def __init__(self, **kwargs):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 """Inicio."""
Juan C. Valdez
zmq support in PublishData
r883 Operation.__init__(self, **kwargs)
Juan C. Valdez
Add Operation publish using MQTT
r859 self.isConfig = False
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 self.client = None
Juan C. Valdez
zmq support in PublishData
r883 self.zeromq = None
self.mqtt = None
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
def on_disconnect(self, client, userdata, rc):
Juan C. Valdez
Add Operation publish using MQTT
r859 if rc != 0:
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 print("Unexpected disconnection.")
self.connect()
def connect(self):
print 'trying to connect'
Juan C. Valdez
Add Operation publish using MQTT
r859 try:
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 self.client.connect(
host=self.host,
port=self.port,
keepalive=60*10,
bind_address='')
self.client.loop_start()
# self.client.publish(
# self.topic + 'SETUP',
# json.dumps(setup),
# retain=True
# )
Juan C. Valdez
Add Operation publish using MQTT
r859 except:
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 print "MQTT Conection error."
Juan C. Valdez
Add Operation publish using MQTT
r859 self.client = False
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
zmq support in PublishData
r883 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
self.counter = 0
Juan C. Valdez
Add Operation publish using MQTT
r859 self.topic = kwargs.get('topic', 'schain')
self.delay = kwargs.get('delay', 0)
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 self.plottype = kwargs.get('plottype', 'spectra')
Juan C. Valdez
zmq support in PublishData
r883 self.host = kwargs.get('host', "10.10.10.82")
self.port = kwargs.get('port', 3000)
Juan C. Valdez
merge from graphics branch
r863 self.clientId = clientId
Juan C. Valdez
Add Operation publish using MQTT
r859 self.cnt = 0
Juan C. Valdez
zmq support in PublishData
r883 self.zeromq = zeromq
self.mqtt = kwargs.get('plottype', 0)
self.client = None
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 setup = []
Juan C. Valdez
zmq support in PublishData
r883 if mqtt is 1:
self.client = mqtt.Client(
client_id=self.clientId + self.topic + 'SCHAIN',
clean_session=True)
self.client.on_disconnect = self.on_disconnect
self.connect()
for plot in self.plottype:
setup.append({
'plot': plot,
'topic': self.topic + plot,
'title': getattr(self, plot + '_' + 'title', False),
'xlabel': getattr(self, plot + '_' + 'xlabel', False),
'ylabel': getattr(self, plot + '_' + 'ylabel', False),
'xrange': getattr(self, plot + '_' + 'xrange', False),
'yrange': getattr(self, plot + '_' + 'yrange', False),
'zrange': getattr(self, plot + '_' + 'zrange', False),
})
if zeromq is 1:
context = zmq.Context()
self.zmq_socket = context.socket(zmq.PUSH)
server = kwargs.get('server', 'zmq.pipe')
ReceiverData Operation, test PlotData
r889
Juan C. Valdez
fix zmq protocol
r886 if 'tcp://' in server:
Juan C. Valdez
zmq support in PublishData
r883 address = server
else:
address = 'ipc:///tmp/%s' % server
ReceiverData Operation, test PlotData
r889
Juan C. Valdez
zmq support in PublishData
r883 self.zmq_socket.connect(address)
time.sleep(1)
def publish_data(self):
self.dataOut.finished = False
if self.mqtt is 1:
yData = self.dataOut.heightList[:2].tolist()
if self.plottype == 'spectra':
data = getattr(self.dataOut, 'data_spc')
z = data/self.dataOut.normFactor
zdB = 10*numpy.log10(z)
xlen, ylen = zdB[0].shape
ReceiverData Operation, test PlotData
r889 dx = int(xlen/MAXNUMX) + 1
dy = int(ylen/MAXNUMY) + 1
Juan C. Valdez
zmq support in PublishData
r883 Z = [0 for i in self.dataOut.channelList]
for i in self.dataOut.channelList:
Z[i] = zdB[i][::dx, ::dy].tolist()
payload = {
'timestamp': self.dataOut.utctime,
'data': roundFloats(Z),
'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
'interval': self.dataOut.getTimeInterval(),
'type': self.plottype,
'yData': yData
}
# print payload
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
zmq support in PublishData
r883 elif self.plottype in ('rti', 'power'):
data = getattr(self.dataOut, 'data_spc')
z = data/self.dataOut.normFactor
avg = numpy.average(z, axis=1)
avgdB = 10*numpy.log10(avg)
xlen, ylen = z[0].shape
dy = numpy.floor(ylen/self.__MAXNUMY) + 1
AVG = [0 for i in self.dataOut.channelList]
for i in self.dataOut.channelList:
AVG[i] = avgdB[i][::dy].tolist()
payload = {
'timestamp': self.dataOut.utctime,
'data': roundFloats(AVG),
'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
'interval': self.dataOut.getTimeInterval(),
'type': self.plottype,
'yData': yData
}
elif self.plottype == 'noise':
noise = self.dataOut.getNoise()/self.dataOut.normFactor
noisedB = 10*numpy.log10(noise)
payload = {
'timestamp': self.dataOut.utctime,
'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
'interval': self.dataOut.getTimeInterval(),
'type': self.plottype,
'yData': yData
}
elif self.plottype == 'snr':
data = getattr(self.dataOut, 'data_SNR')
avgdB = 10*numpy.log10(data)
ylen = data[0].size
dy = numpy.floor(ylen/self.__MAXNUMY) + 1
AVG = [0 for i in self.dataOut.channelList]
for i in self.dataOut.channelList:
AVG[i] = avgdB[i][::dy].tolist()
payload = {
'timestamp': self.dataOut.utctime,
'data': roundFloats(AVG),
'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
'type': self.plottype,
'yData': yData
}
else:
print "Tipo de grafico invalido"
payload = {
'data': 'None',
'timestamp': 'None',
'type': None
}
# print 'Publishing data to {}'.format(self.host)
self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
if self.zeromq is 1:
ReceiverData Operation, test PlotData
r889 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
Juan C. Valdez
zmq support in PublishData
r883 self.zmq_socket.send_pyobj(self.dataOut)
def run(self, dataOut, **kwargs):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 self.dataOut = dataOut
if not self.isConfig:
Juan C. Valdez
zmq support in PublishData
r883 self.setup(**kwargs)
Juan C. Valdez
Add Operation publish using MQTT
r859 self.isConfig = True
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
zmq support in PublishData
r883 self.publish_data()
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 time.sleep(self.delay)
Juan C. Valdez
Add Operation publish using MQTT
r859 def close(self):
Juan C. Valdez
zmq support in PublishData
r883 if self.zeromq is 1:
self.dataOut.finished = True
Add WindProfiler support, multiSchain by_day
r923 # self.zmq_socket.send_pyobj(self.dataOut) CHECK IT!!!
Juan C. Valdez
zmq support in PublishData
r883
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 if self.client:
self.client.loop_stop()
self.client.disconnect()
ReceiverData Operation, test PlotData
r889
class ReceiverData(ProcessingUnit, Process):
José Chávez
funcionando todo
r898 throttle_value = 5
ReceiverData Operation, test PlotData
r889 def __init__(self, **kwargs):
ProcessingUnit.__init__(self, **kwargs)
Process.__init__(self)
self.mp = False
self.isConfig = False
Juan C. Espinoza
Update version, fix kwargs for self operations (methods), Add SendToWeb...
r906 self.isWebConfig = False
ReceiverData Operation, test PlotData
r889 self.plottypes =[]
self.connections = 0
server = kwargs.get('server', 'zmq.pipe')
Juan C. Espinoza
Update version, fix kwargs for self operations (methods), Add SendToWeb...
r906 plot_server = kwargs.get('plot_server', 'zmq.web')
ReceiverData Operation, test PlotData
r889 if 'tcp://' in server:
address = server
else:
address = 'ipc:///tmp/%s' % server
Juan C. Espinoza
Update version, fix kwargs for self operations (methods), Add SendToWeb...
r906 if 'tcp://' in plot_server:
plot_address = plot_server
else:
plot_address = 'ipc:///tmp/%s' % plot_server
ReceiverData Operation, test PlotData
r889 self.address = address
Juan C. Espinoza
Update version, fix kwargs for self operations (methods), Add SendToWeb...
r906 self.plot_address = plot_address
ReceiverData Operation, test PlotData
r889 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
self.realtime = kwargs.get('realtime', False)
Fix all PlotData, add SpectraMean, CrossSpectra plots, now Parameters extends Spectra fix bugs in ParametersProc
r922 self.throttle_value = kwargs.get('throttle', 5)
José Chávez
funcionando todo
r898 self.sendData = self.initThrottle(self.throttle_value)
ReceiverData Operation, test PlotData
r889 self.setup()
def setup(self):
self.data = {}
self.data['times'] = []
for plottype in self.plottypes:
self.data[plottype] = {}
add multiSchain (@jchavez)
r892 self.data['noise'] = {}
José Chávez
funcionando todo
r898 self.data['throttle'] = self.throttle_value
self.data['ENDED'] = False
ReceiverData Operation, test PlotData
r889 self.isConfig = True
José Chávez
agregado para realtime, sin funcionar
r902 self.data_web = {}
ReceiverData Operation, test PlotData
r889
def event_monitor(self, monitor):
events = {}
for name in dir(zmq):
if name.startswith('EVENT_'):
value = getattr(zmq, name)
events[value] = name
while monitor.poll():
evt = recv_monitor_message(monitor)
if evt['event'] == 32:
self.connections += 1
if evt['event'] == 512:
pass
if self.connections == 0 and self.started is True:
self.ended = True
Add WindProfiler support, multiSchain by_day
r923
ReceiverData Operation, test PlotData
r889 evt.update({'description': events[evt['event']]})
if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
break
monitor.close()
José Chávez
funcionando en mi compu
r900 print("event monitor thread done!")
ReceiverData Operation, test PlotData
r889
José Chávez
funcionando todo
r898 def initThrottle(self, throttle_value):
@throttle(seconds=throttle_value)
def sendDataThrottled(fn_sender, data):
fn_sender(data)
return sendDataThrottled
ReceiverData Operation, test PlotData
r889
def send(self, data):
José Chávez
receiver data modificado para web
r904 # print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
ReceiverData Operation, test PlotData
r889 self.sender.send_pyobj(data)
def update(self):
Add WindProfiler support, multiSchain by_day
r923
José Chávez
utctime
r920 t = self.dataOut.utctime
Add WindProfiler support, multiSchain by_day
r923
if t in self.data['times']:
return
ReceiverData Operation, test PlotData
r889 self.data['times'].append(t)
self.data['dataOut'] = self.dataOut
Add WindProfiler support, multiSchain by_day
r923
ReceiverData Operation, test PlotData
r889 for plottype in self.plottypes:
if plottype == 'spc':
z = self.dataOut.data_spc/self.dataOut.normFactor
add multiSchain (@jchavez)
r892 self.data[plottype] = 10*numpy.log10(z)
self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
Fix all PlotData, add SpectraMean, CrossSpectra plots, now Parameters extends Spectra fix bugs in ParametersProc
r922 if plottype == 'cspc':
jcoherence = self.dataOut.data_cspc/numpy.sqrt(self.dataOut.data_spc*self.dataOut.data_spc)
self.data['cspc_coh'] = numpy.abs(jcoherence)
self.data['cspc_phase'] = numpy.arctan2(jcoherence.imag, jcoherence.real)*180/numpy.pi
ReceiverData Operation, test PlotData
r889 if plottype == 'rti':
self.data[plottype][t] = self.dataOut.getPower()
if plottype == 'snr':
self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
if plottype == 'dop':
self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
Fix all PlotData, add SpectraMean, CrossSpectra plots, now Parameters extends Spectra fix bugs in ParametersProc
r922 if plottype == 'mean':
self.data[plottype][t] = self.dataOut.data_MEAN
if plottype == 'std':
self.data[plottype][t] = self.dataOut.data_STD
ReceiverData Operation, test PlotData
r889 if plottype == 'coh':
self.data[plottype][t] = self.dataOut.getCoherence()
if plottype == 'phase':
self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
Add WindProfiler support, multiSchain by_day
r923 if plottype == 'wind':
self.data[plottype][t] = self.dataOut.data_output
José Chávez
agregado para realtime, sin funcionar
r902 if self.realtime:
Add WindProfiler support, multiSchain by_day
r923 self.data_web['timestamp'] = t
Fix SendToWeb operation
r911 if plottype == 'spc':
self.data_web[plottype] = roundFloats(decimate(self.data[plottype]).tolist())
Fix all PlotData, add SpectraMean, CrossSpectra plots, now Parameters extends Spectra fix bugs in ParametersProc
r922 elif plottype == 'cspc':
self.data_web['cspc_coh'] = roundFloats(decimate(self.data['cspc_coh']).tolist())
self.data_web['cspc_phase'] = roundFloats(decimate(self.data['cspc_phase']).tolist())
elif plottype == 'noise':
self.data_web['noise'] = roundFloats(self.data['noise'][t].tolist())
Fix SendToWeb operation
r911 else:
self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
José Chávez
cambios en como se envia la data a la web
r909 self.data_web['interval'] = self.dataOut.getTimeInterval()
self.data_web['type'] = plottype
Merge remote into local...
r912
ReceiverData Operation, test PlotData
r889 def run(self):
print '[Starting] {} from {}'.format(self.name, self.address)
self.context = zmq.Context()
self.receiver = self.context.socket(zmq.PULL)
self.receiver.bind(self.address)
monitor = self.receiver.get_monitor_socket()
self.sender = self.context.socket(zmq.PUB)
Fix SendToWeb operation
r911 if self.realtime:
self.sender_web = self.context.socket(zmq.PUB)
cambio bind x connect in ReceiverData
r916 self.sender_web.connect(self.plot_address)
time.sleep(1)
ReceiverData Operation, test PlotData
r889 self.sender.bind("ipc:///tmp/zmq.plots")
José Chávez
receiver data modificado para web
r904 t = Thread(target=self.event_monitor, args=(monitor,))
ReceiverData Operation, test PlotData
r889 t.start()
while True:
self.dataOut = self.receiver.recv_pyobj()
José Chávez
funcionando todo
r898 # print '[Receiving] {} - {}'.format(self.dataOut.type,
# self.dataOut.datatime.ctime())
ReceiverData Operation, test PlotData
r889
self.update()
if self.dataOut.finished is True:
self.send(self.data)
self.connections -= 1
Merge branch 'schain_mp' of http://10.10.110.76/rhodecode/schain into schain_mp
r893 if self.connections == 0 and self.started:
ReceiverData Operation, test PlotData
r889 self.ended = True
self.data['ENDED'] = True
self.send(self.data)
self.setup()
else:
if self.realtime:
self.send(self.data)
José Chávez
receiver data modificado para web
r904 self.sender_web.send_string(json.dumps(self.data_web))
ReceiverData Operation, test PlotData
r889 else:
José Chávez
funcionando todo
r898 self.sendData(self.send, self.data)
ReceiverData Operation, test PlotData
r889 self.started = True
return
Fix SendToWeb operation
r911
Juan C. Espinoza
Update version, fix kwargs for self operations (methods), Add SendToWeb...
r906 def sendToWeb(self):
Fix SendToWeb operation
r911
Juan C. Espinoza
Update version, fix kwargs for self operations (methods), Add SendToWeb...
r906 if not self.isWebConfig:
context = zmq.Context()
sender_web_config = context.socket(zmq.PUB)
if 'tcp://' in self.plot_address:
dum, address, port = self.plot_address.split(':')
conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
else:
conf_address = self.plot_address + '.config'
Fix SendToWeb operation
r911 sender_web_config.bind(conf_address)
time.sleep(1)
Juan C. Espinoza
Update version, fix kwargs for self operations (methods), Add SendToWeb...
r906 for kwargs in self.operationKwargs.values():
if 'plot' in kwargs:
Fix SendToWeb operation
r911 print '[Sending] Config data to web for {}'.format(kwargs['code'].upper())
Juan C. Espinoza
Update version, fix kwargs for self operations (methods), Add SendToWeb...
r906 sender_web_config.send_string(json.dumps(kwargs))
self.isWebConfig = True