jroutils_publish.py
445 lines
| 15.7 KiB
| text/x-python
|
PythonLexer
|
r859 | ''' | ||
@author: Juan C. Espinoza | ||||
''' | ||||
import time | ||||
import json | ||||
import numpy | ||||
import paho.mqtt.client as mqtt | ||||
|
r883 | import zmq | ||
import cPickle as pickle | ||||
import datetime | ||||
from zmq.utils.monitor import recv_monitor_message | ||||
from functools import wraps | ||||
from threading import Thread | ||||
from multiprocessing import Process | ||||
|
r859 | |||
|
r883 | from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit | ||
# Per-axis sample budget used when thinning arrays before they are sent
# to remote plots (see the stride computations that divide by these).
MAXNUMX = MAXNUMY = 100
|
r859 | |||
class PrettyFloat(float):
    """Float subclass whose ``repr`` is fixed-point text with two decimals."""

    def __repr__(self):
        # Convert to a plain float first so formatting never re-enters
        # this overridden __repr__.
        plain = float(self)
        return '%.2f' % (plain,)
|
def roundFloats(obj):
    """Round every float found in a (possibly nested) list to two decimals.

    Args:
        obj: a float, a list (which may contain further lists), or any
            other value.

    Returns:
        A new list with the same nesting when ``obj`` is a list, the
        rounded value when ``obj`` is a float, and ``obj`` itself for any
        other type (ints, strings, ``None`` ...), so non-float entries are
        preserved instead of being replaced by ``None``.
    """
    if isinstance(obj, list):
        # Build a real list (not a lazy ``map`` object) so the result is
        # always JSON-serialisable.
        return [roundFloats(item) for item in obj]
    if isinstance(obj, float):
        return round(obj, 2)
    return obj
|
r860 | |||
|
def decimate(z):
    """Thin out the second axis of ``z`` with a regular stride.

    The stride is ``len(z[0]) // MAXNUMY + 1``, so no more than
    ``MAXNUMY`` positions are kept along that axis. The first axis is
    left untouched.

    Args:
        z: object supporting two-axis slicing (``z[:, ::step]``).
            Presumably a numpy array; confirm against callers.

    Returns:
        The strided selection ``z[:, ::step]``.
    """
    step = len(z[0]) // MAXNUMY + 1
    return z[:, ::step]
|
r859 | |||
|
class throttle(object):
    """Decorator that lets a function run at most once per time period.

    A call arriving before the period has elapsed since the last call
    that actually ran is dropped: the wrapped function is not invoked
    and the wrapper returns ``None``. The wrapper never sleeps.

    Example::

        @throttle(minutes=1)
        def foo():
            pass

        for i in range(10):
            foo()   # only the first call inside the minute runs
    """

    def __init__(self, seconds=0, minutes=0, hours=0):
        # Starting from the earliest representable datetime means the
        # first call is not throttled.
        self.time_of_last_call = datetime.datetime.min
        self.throttle_period = datetime.timedelta(
            hours=hours, minutes=minutes, seconds=seconds
        )

    def __call__(self, fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            elapsed = datetime.datetime.now() - self.time_of_last_call
            if elapsed < self.throttle_period:
                # Still inside the quiet period: drop this call.
                return None
            self.time_of_last_call = datetime.datetime.now()
            return fn(*args, **kwargs)

        return wrapper
|
r860 | |||
|
class PublishData(Operation):
    """Operation that publishes each data object over MQTT and/or ZeroMQ.

    With ``mqtt == 1`` a decimated, JSON-encoded payload is published to
    an MQTT broker on topic ``<topic><plottype>``. With ``zeromq == 1``
    the whole data object is sent through a ZeroMQ PUSH socket with
    ``send_pyobj``.
    """

    def __init__(self, **kwargs):
        """Create an unconfigured publisher; ``setup`` runs on first ``run``."""
        Operation.__init__(self, **kwargs)
        self.isConfig = False
        self.client = None
        self.zeromq = None
        self.mqtt = None

    def on_disconnect(self, client, userdata, rc):
        """MQTT disconnect callback: reconnect when ``rc`` is non-zero."""
        if rc != 0:
            print("Unexpected disconnection.")
            self.connect()

    def connect(self):
        """Connect the MQTT client and start its network loop.

        On failure a message is printed and ``self.client`` is set to
        ``None``, which disables MQTT publishing for later calls.
        """
        print('trying to connect')
        try:
            self.client.connect(
                host=self.host,
                port=self.port,
                keepalive=60*10,
                bind_address='')
            self.client.loop_start()
        except Exception:
            # Best effort: a broker that cannot be reached must not stop
            # the processing chain.
            print("MQTT Conection error.")
            self.client = None

    def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
        """Read the configuration and open the requested connections.

        Args:
            port: MQTT broker port.
            username: optional MQTT user name; applied when not ``None``.
            password: optional MQTT password, used with ``username``.
            clientId: prefix of the MQTT client id.
            zeromq: ``1`` to enable the ZeroMQ PUSH socket.
            **kwargs: optional ``topic``, ``delay``, ``plottype``,
                ``host``, ``mqtt`` (``1`` enables MQTT) and ``server``
                (a ``tcp://`` address, or a name mapped to
                ``ipc:///tmp/<name>``).
        """
        self.counter = 0
        self.topic = kwargs.get('topic', 'schain')
        self.delay = kwargs.get('delay', 0)
        self.plottype = kwargs.get('plottype', 'spectra')
        self.host = kwargs.get('host', "10.10.10.82")
        # ``port`` is a named parameter, so it can never show up in kwargs.
        self.port = port
        self.clientId = clientId
        self.cnt = 0
        self.zeromq = zeromq
        self.mqtt = kwargs.get('mqtt', 0)
        self.client = None

        if self.mqtt == 1:
            self.client = mqtt.Client(
                client_id=self.clientId + self.topic + 'SCHAIN',
                clean_session=True)
            if username is not None:
                self.client.username_pw_set(username, password)
            self.client.on_disconnect = self.on_disconnect
            self.connect()

        if self.zeromq == 1:
            context = zmq.Context()
            self.zmq_socket = context.socket(zmq.PUSH)
            server = kwargs.get('server', 'zmq.pipe')
            if 'tcp://' in server:
                address = server
            else:
                address = 'ipc:///tmp/%s' % server
            self.zmq_socket.connect(address)
            # Pause after connecting, before the first send.
            time.sleep(1)

    def _build_payload(self):
        """Return the JSON-serialisable MQTT payload for ``self.plottype``."""
        dataOut = self.dataOut
        yData = dataOut.heightList[:2].tolist()
        plottype = self.plottype

        if plottype == 'spectra':
            zdB = 10*numpy.log10(dataOut.data_spc/dataOut.normFactor)
            xlen, ylen = zdB[0].shape
            dx = xlen // MAXNUMX + 1
            dy = ylen // MAXNUMY + 1
            data = [zdB[i][::dx, ::dy].tolist() for i in dataOut.channelList]
        elif plottype in ('rti', 'power'):
            z = dataOut.data_spc/dataOut.normFactor
            avgdB = 10*numpy.log10(numpy.average(z, axis=1))
            _, ylen = z[0].shape
            # Integer stride: a float step is not a valid slice step.
            dy = ylen // MAXNUMY + 1
            data = [avgdB[i][::dy].tolist() for i in dataOut.channelList]
        elif plottype == 'noise':
            noise = dataOut.getNoise()/dataOut.normFactor
            data = (10*numpy.log10(noise)).reshape(-1, 1).tolist()
        elif plottype == 'snr':
            snr = dataOut.data_SNR
            avgdB = 10*numpy.log10(snr)
            dy = snr[0].size // MAXNUMY + 1
            data = [avgdB[i][::dy].tolist() for i in dataOut.channelList]
        else:
            print("Tipo de grafico invalido")
            return {
                'data': 'None',
                'timestamp': 'None',
                'type': None
            }

        payload = {
            'timestamp': dataOut.utctime,
            'data': roundFloats(data),
            'channels': ['Ch %s' % ch for ch in dataOut.channelList],
            'type': plottype,
            'yData': yData
        }
        # The 'snr' payload carries no 'interval' entry.
        if plottype != 'snr':
            payload['interval'] = dataOut.getTimeInterval()
        return payload

    def publish_data(self):
        """Publish ``self.dataOut`` through every enabled transport."""
        self.dataOut.finished = False
        # ``self.client`` is None when the broker connection failed.
        if self.mqtt == 1 and self.client:
            payload = self._build_payload()
            self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
        if self.zeromq == 1:
            print('[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime))
            self.zmq_socket.send_pyobj(self.dataOut)

    def run(self, dataOut, **kwargs):
        """Configure on the first call, publish ``dataOut``, then sleep ``delay``."""
        self.dataOut = dataOut
        if not self.isConfig:
            self.setup(**kwargs)
            self.isConfig = True

        self.publish_data()
        time.sleep(self.delay)

    def close(self):
        """Flag the last data object as finished and stop the MQTT client."""
        if self.zeromq == 1:
            # ``dataOut`` only exists once ``run`` has been called.
            dataOut = getattr(self, 'dataOut', None)
            if dataOut is not None:
                dataOut.finished = True
            # TODO: the flagged object is not sent; the original call was
            # left disabled with the note "CHECK IT!!!":
            # self.zmq_socket.send_pyobj(self.dataOut)

        if self.client:
            self.client.loop_stop()
            self.client.disconnect()
r889 | ||||
class ReceiverData(ProcessingUnit, Process):
    """Process that receives data objects over ZeroMQ and republishes them.

    ``run`` pulls pickled data objects from ``self.address``, accumulates
    per-plot data in ``self.data`` and publishes that dictionary on
    ``ipc:///tmp/zmq.plots``. In realtime mode a decimated JSON summary
    (``self.data_web``) is also sent to ``self.plot_address``.
    """

    throttle_value = 5

    def __init__(self, **kwargs):
        """Read the configuration from ``kwargs`` and reset the buffers.

        Recognised keys: ``server``, ``plot_server`` (``tcp://`` address
        or a name mapped to ``ipc:///tmp/<name>``), ``plottypes``
        (comma-separated string), ``realtime`` and ``throttle`` (seconds).
        """
        ProcessingUnit.__init__(self, **kwargs)
        Process.__init__(self)
        self.mp = False
        self.isConfig = False
        self.isWebConfig = False
        self.connections = 0
        # run() and event_monitor() read these flags before any data
        # message has had a chance to set them.
        self.started = False
        self.ended = False
        self.address = self._resolve_address(kwargs.get('server', 'zmq.pipe'))
        self.plot_address = self._resolve_address(
            kwargs.get('plot_server', 'zmq.web'))
        self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
        self.realtime = kwargs.get('realtime', False)
        self.throttle_value = kwargs.get('throttle', 5)
        self.sendData = self.initThrottle(self.throttle_value)
        self.setup()

    @staticmethod
    def _resolve_address(server):
        """Return ``server`` if it is a tcp address, else an ipc path under /tmp."""
        if 'tcp://' in server:
            return server
        return 'ipc:///tmp/%s' % server

    def setup(self):
        """Reset the accumulated data dictionaries."""
        self.data = {}
        self.data['times'] = []
        for plottype in self.plottypes:
            self.data[plottype] = {}
        self.data['noise'] = {}
        self.data['throttle'] = self.throttle_value
        self.data['ENDED'] = False
        self.isConfig = True
        self.data_web = {}

    def event_monitor(self, monitor):
        """Track sender connections using the socket's monitor events.

        Runs in a helper thread until the monitor reports that it has
        stopped.
        """
        # Map event codes to their zmq constant names.
        events = {}
        for name in dir(zmq):
            if name.startswith('EVENT_'):
                events[getattr(zmq, name)] = name

        while monitor.poll():
            evt = recv_monitor_message(monitor)
            if evt['event'] == zmq.EVENT_ACCEPTED:
                self.connections += 1
            if evt['event'] == zmq.EVENT_DISCONNECTED:
                if self.connections == 0 and self.started is True:
                    self.ended = True

            # An event code without a matching constant must not kill
            # the monitor thread.
            evt.update({'description': events.get(evt['event'], 'UNKNOWN')})
            if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
                break
        monitor.close()
        print("event monitor thread done!")

    def initThrottle(self, throttle_value):
        """Return a sender wrapper limited to one call per ``throttle_value`` seconds."""
        @throttle(seconds=throttle_value)
        def sendDataThrottled(fn_sender, data):
            fn_sender(data)
        return sendDataThrottled

    def send(self, data):
        """Publish ``data`` on the plots socket as a pickled object."""
        # print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
        self.sender.send_pyobj(data)

    def update(self):
        """Fold ``self.dataOut`` into ``self.data`` (and ``self.data_web``).

        A data object whose ``utctime`` was already stored is ignored.
        """
        t = self.dataOut.utctime

        if t in self.data['times']:
            return
        self.data['times'].append(t)
        self.data['dataOut'] = self.dataOut

        for plottype in self.plottypes:
            if plottype == 'spc':
                z = self.dataOut.data_spc/self.dataOut.normFactor
                self.data[plottype] = 10*numpy.log10(z)
                self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
            if plottype == 'cspc':
                # NOTE(review): the denominator multiplies data_spc by
                # itself rather than combining the two channels of each
                # pair -- confirm this is the intended normalisation.
                jcoherence = self.dataOut.data_cspc/numpy.sqrt(self.dataOut.data_spc*self.dataOut.data_spc)
                self.data['cspc_coh'] = numpy.abs(jcoherence)
                self.data['cspc_phase'] = numpy.arctan2(jcoherence.imag, jcoherence.real)*180/numpy.pi
            if plottype == 'rti':
                self.data[plottype][t] = self.dataOut.getPower()
            if plottype == 'snr':
                self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
            if plottype == 'dop':
                self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
            if plottype == 'mean':
                self.data[plottype][t] = self.dataOut.data_MEAN
            if plottype == 'std':
                self.data[plottype][t] = self.dataOut.data_STD
            if plottype == 'coh':
                self.data[plottype][t] = self.dataOut.getCoherence()
            if plottype == 'phase':
                self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
            if plottype == 'wind':
                self.data[plottype][t] = self.dataOut.data_output

            if self.realtime:
                self.data_web['timestamp'] = t
                if plottype == 'spc':
                    self.data_web[plottype] = roundFloats(decimate(self.data[plottype]).tolist())
                elif plottype == 'cspc':
                    self.data_web['cspc_coh'] = roundFloats(decimate(self.data['cspc_coh']).tolist())
                    self.data_web['cspc_phase'] = roundFloats(decimate(self.data['cspc_phase']).tolist())
                elif plottype == 'noise':
                    # Noise values are only filled in by the 'spc' branch.
                    self.data_web['noise'] = roundFloats(self.data['noise'][t].tolist())
                else:
                    self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
                self.data_web['interval'] = self.dataOut.getTimeInterval()
                self.data_web['type'] = plottype

    def run(self):
        """Process entry point: receive, accumulate and republish forever."""
        print('[Starting] {} from {}'.format(self.name, self.address))
        self.context = zmq.Context()
        self.receiver = self.context.socket(zmq.PULL)
        self.receiver.bind(self.address)
        monitor = self.receiver.get_monitor_socket()
        self.sender = self.context.socket(zmq.PUB)
        if self.realtime:
            self.sender_web = self.context.socket(zmq.PUB)
            self.sender_web.connect(self.plot_address)
            time.sleep(1)
        self.sender.bind("ipc:///tmp/zmq.plots")

        t = Thread(target=self.event_monitor, args=(monitor,))
        t.start()

        while True:
            # SECURITY: recv_pyobj unpickles whatever arrives on the
            # socket; only bind ``address`` where every sender is trusted.
            self.dataOut = self.receiver.recv_pyobj()
            # print '[Receiving] {} - {}'.format(self.dataOut.type,
            #                                    self.dataOut.datatime.ctime())

            self.update()

            if self.dataOut.finished is True:
                self.send(self.data)
                self.connections -= 1
                if self.connections == 0 and self.started:
                    self.ended = True
                    self.data['ENDED'] = True
                    self.send(self.data)
                    self.setup()
            else:
                if self.realtime:
                    self.send(self.data)
                    self.sender_web.send_string(json.dumps(self.data_web))
                else:
                    self.sendData(self.send, self.data)
                self.started = True

    def sendToWeb(self):
        """Send each plot operation's kwargs to the web config socket, once.

        The config address is the plot address with the port incremented
        by one (tcp) or with ``.config`` appended (ipc).
        """
        if not self.isWebConfig:
            context = zmq.Context()
            sender_web_config = context.socket(zmq.PUB)
            if 'tcp://' in self.plot_address:
                dum, address, port = self.plot_address.split(':')
                conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
            else:
                conf_address = self.plot_address + '.config'
            sender_web_config.bind(conf_address)
            time.sleep(1)
            for kwargs in self.operationKwargs.values():
                if 'plot' in kwargs:
                    print('[Sending] Config data to web for {}'.format(kwargs['code'].upper()))
                    sender_web_config.send_string(json.dumps(kwargs))
            self.isWebConfig = True