jroutils_publish.py
606 lines
| 19.4 KiB
| text/x-python
|
PythonLexer
|
r859 | ''' | |
@author: Juan C. Espinoza | |||
''' | |||
import time | |||
import json | |||
import numpy | |||
import paho.mqtt.client as mqtt | |||
|
r883 | import zmq | |
import datetime | |||
from zmq.utils.monitor import recv_monitor_message | |||
from functools import wraps | |||
from threading import Thread | |||
from multiprocessing import Process | |||
|
r859 | ||
|
r883 | from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit | |
|
r957 | from schainpy.model.data.jrodata import JROData | |
|
r1062 | from schainpy.utils import log | |
|
r883 | ||
# Upper bounds on the number of samples kept per axis when spectra are
# decimated before being published (x: first data axis, y: second one).
MAXNUMX, MAXNUMY = 100, 100
|
r859 | ||
class PrettyFloat(float):
    '''
    Float whose repr() always shows exactly two decimal places.
    '''

    def __repr__(self):
        return '{:.2f}'.format(self)
|
def roundFloats(obj):
    '''
    Recursively round the floats found in (nested) lists to two decimals.

    Lists are rebuilt element by element, floats are rounded with
    round(x, 2) and every other value (int, str, None, ...) is returned
    unchanged instead of being replaced by None.
    '''
    if isinstance(obj, list):
        # A real list (not a lazy map object) so the result can be handed
        # straight to json.dumps on any Python version.
        return [roundFloats(item) for item in obj]
    if isinstance(obj, float):
        return round(obj, 2)
    return obj
|
r860 | ||
|
def decimate(z, MAXNUMY):
    '''
    Thin out the second axis of z with a regular stride.

    The stride is int(len(z[0]) / MAXNUMY) + 1, so fewer than MAXNUMY + 1
    samples are kept along that axis; the first axis is left untouched.
    '''
    n_samples = len(z[0])
    stride = 1 + int(n_samples / MAXNUMY)
    return z[:, ::stride]
|
r859 | ||
|
class throttle(object):
    '''
    Decorator that limits how often a function is actually executed.

    A call made before the throttle period has elapsed since the last
    executed call is skipped: the wrapper returns None right away, it
    neither sleeps nor queues the call.

    Example, a function that runs at most once per minute:

        @throttle(minutes=1)
        def foo():
            pass

        for i in range(10):
            foo()    # only the first call within the minute is executed
    '''

    def __init__(self, seconds=0, minutes=0, hours=0):
        self.throttle_period = datetime.timedelta(
            hours=hours, minutes=minutes, seconds=seconds
        )
        # Earliest representable date: the very first call is never skipped.
        self.time_of_last_call = datetime.datetime.min

    def __call__(self, fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            elapsed = datetime.datetime.now() - self.time_of_last_call
            if elapsed < self.throttle_period:
                # Still inside the quiet period: drop this call.
                return
            self.time_of_last_call = datetime.datetime.now()
            return fn(*args, **kwargs)
        return wrapper
|
class Data(object):
    '''
    Object to hold data to be plotted

    Values are stored in self.data, a dict keyed by plot type. Spectra
    ('spc', 'cspc') keep only the most recent array; every other plot type
    keeps a dict that maps the utctime of each update to its array.
    '''

    def __init__(self, plottypes, throttle_value):
        self.plottypes = plottypes
        self.throttle = throttle_value
        # Create the (empty) containers right away so that str(), shape()
        # and item access also work before the first explicit setup().
        self.setup()

    def __str__(self):
        dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
        return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))

    def __len__(self):
        # Number of distinct timestamps stored so far.
        return len(self.__times)

    def __getitem__(self, key):
        '''
        Return the data stored for the given plot type.

        Spectra keys return the stored array as is. Any other key returns
        the per-time arrays stacked in ascending time order, with the
        first two axes swapped when the result has more than one dimension.
        Raises KeyError (after logging it) when the key is unknown.
        '''
        if key not in self.data:
            message = 'Missing key: {}'.format(key)
            log.error(message)
            raise KeyError(message)

        if 'spc' in key:
            return self.data[key]

        ret = numpy.array([self.data[key][x] for x in self.times])
        if ret.ndim > 1:
            ret = numpy.swapaxes(ret, 0, 1)
        return ret

    def setup(self):
        '''
        Configure object: drop everything stored and start empty again
        '''
        self.ended = False
        self.data = {}
        self.__times = []
        self.__heights = []
        self.__all_heights = set()
        for plot in self.plottypes:
            # Every snr flavour (e.g. 'snr_db') shares the single 'snr' key.
            if 'snr' in plot:
                plot = 'snr'
            self.data[plot] = {}

    def shape(self, key):
        '''
        Get the shape of the one-element data for the given key
        '''
        if len(self.data[key]):
            if 'spc' in key:
                return self.data[key].shape
            return self.data[key][self.__times[0]].shape
        return (0,)

    def update(self, dataOut):
        '''
        Update data object with new dataOut

        Nothing is done when dataOut.utctime has already been stored.
        '''
        tm = dataOut.utctime
        if tm in self.__times:
            return

        self.parameters = getattr(dataOut, 'parameters', [])
        self.pairs = dataOut.pairsList
        self.channels = dataOut.channelList
        self.interval = dataOut.getTimeInterval()
        if 'spc' in self.plottypes or 'cspc' in self.plottypes:
            self.xrange = (dataOut.getFreqRange(1)/1000.,
                           dataOut.getAcfRange(1),
                           dataOut.getVelRange(1))
        self.__heights.append(dataOut.heightList)
        self.__all_heights.update(dataOut.heightList)
        self.__times.append(tm)

        for plot in self.plottypes:
            if plot == 'spc':
                z = dataOut.data_spc/dataOut.normFactor
                self.data[plot] = 10*numpy.log10(z)
            elif plot == 'cspc':
                self.data[plot] = dataOut.data_cspc
            elif plot == 'noise':
                self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
            elif plot == 'rti':
                self.data[plot][tm] = dataOut.getPower()
            elif plot == 'snr_db':
                self.data['snr'][tm] = dataOut.data_SNR
            elif plot == 'snr':
                self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
            elif plot == 'dop':
                self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
            elif plot == 'mean':
                self.data[plot][tm] = dataOut.data_MEAN
            elif plot == 'std':
                self.data[plot][tm] = dataOut.data_STD
            elif plot == 'coh':
                self.data[plot][tm] = dataOut.getCoherence()
            elif plot == 'phase':
                self.data[plot][tm] = dataOut.getCoherence(phase=True)
            elif plot == 'output':
                self.data[plot][tm] = dataOut.data_output
            elif plot == 'param':
                self.data[plot][tm] = dataOut.data_param

    def normalize_heights(self):
        '''
        Ensure same-dimension of the data for different heighList

        Arrays recorded with fewer heights than the union of all heights
        seen so far are re-created on the full height grid, with NaN where
        no sample exists.
        '''
        # NOTE(review): spectra keys hold a bare ndarray, which has no
        # .items(); this method looks usable only for time-indexed plot
        # types - confirm with the callers.
        H = numpy.array(list(self.__all_heights))
        H.sort()
        for key in self.data:
            shape = self.shape(key)[:-1] + H.shape
            for tm, obj in list(self.data[key].items()):
                h = self.__heights[self.__times.index(tm)]
                if H.size == h.size:
                    continue
                index = numpy.where(numpy.in1d(H, h))[0]
                dummy = numpy.zeros(shape) + numpy.nan
                if len(shape) == 2:
                    dummy[:, index] = obj
                else:
                    dummy[index] = obj
                self.data[key][tm] = dummy

        self.__heights = [H for tm in self.__times]

    def jsonify(self, decimate=False):
        '''
        Convert data to json

        Returns a JSON string holding, for every key, the rounded values of
        the latest timestamp (spectra: the stored array), plus 'timestamp'
        and 'interval'. The decimate argument is accepted but not used.
        '''
        # NOTE(review): 'cspc' values are stored as received from
        # dataOut.data_cspc; if they are complex json.dumps will reject
        # them - confirm before enabling that plot type here.
        ret = {}
        tm = self.times[-1]
        for key in self.data:
            if key in ('spc', 'cspc'):
                ret[key] = roundFloats(self.data[key].tolist())
            else:
                ret[key] = roundFloats(self.data[key][tm].tolist())
        # times is a numpy array; json cannot encode numpy scalars.
        ret['timestamp'] = float(tm)
        ret['interval'] = self.interval
        return json.dumps(ret)

    @property
    def times(self):
        '''
        Return the list of times of the current data
        '''
        ret = numpy.array(self.__times)
        ret.sort()
        return ret

    @property
    def heights(self):
        '''
        Return the list of heights of the current data
        '''
        return numpy.array(self.__heights[-1])
|
r860 | ||
|
class PublishData(Operation):
    '''
    Operation to send data over zmq.

    When the 'mqtt' keyword is 1 a decimated JSON summary of each dataOut
    is additionally published through an MQTT client.
    '''

    def __init__(self, **kwargs):
        '''Create the operation; the sockets are opened later, in setup().'''
        Operation.__init__(self, **kwargs)
        self.isConfig = False
        self.client = None
        self.zeromq = None
        self.mqtt = None

    def on_disconnect(self, client, userdata, rc):
        '''
        MQTT disconnect callback: reconnect after an unexpected disconnection.
        '''
        # The original indentation was lost; reconnecting is kept under the
        # rc != 0 test so that a disconnect requested by close() (rc == 0)
        # does not reopen the connection.
        if rc != 0:
            log.warning('Unexpected disconnection.')
            self.connect()

    def connect(self):
        '''
        Connect the MQTT client and start its network loop.

        On failure the error is logged and self.client is set to False so
        that later code can skip the MQTT path.
        '''
        log.warning('trying to connect')
        try:
            self.client.connect(
                host=self.host,
                port=self.port,
                keepalive=60*10,
                bind_address='')
            self.client.loop_start()
        except Exception:
            log.error('MQTT Conection error.')
            self.client = False

    def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
        '''
        Read the configuration and open the requested transports.

        Keywords read from kwargs: topic, delay, plottype, host, mqtt and
        server (zmq endpoint: a full tcp:// address or the name of an ipc
        socket file under /tmp). username and password are accepted but
        not used.
        '''
        self.counter = 0
        self.topic = kwargs.get('topic', 'schain')
        self.delay = kwargs.get('delay', 0)
        self.plottype = kwargs.get('plottype', 'spectra')
        self.host = kwargs.get('host', "10.10.10.82")
        # NOTE(review): 'port' is a named parameter, so it never shows up
        # in kwargs and this always yields 3000. Kept as is because the
        # intended default (1883 vs 3000) cannot be told from here.
        self.port = kwargs.get('port', 3000)
        self.clientId = clientId
        self.cnt = 0
        self.zeromq = zeromq
        # Was read from the 'plottype' keyword and then compared through
        # the imported mqtt module, so the MQTT path could never be enabled.
        self.mqtt = kwargs.get('mqtt', 0)
        self.client = None
        self.verbose = verbose

        if self.mqtt == 1:
            self.client = mqtt.Client(
                client_id=self.clientId + self.topic + 'SCHAIN',
                clean_session=True)
            self.client.on_disconnect = self.on_disconnect
            self.connect()

        if zeromq == 1:
            context = zmq.Context()
            self.zmq_socket = context.socket(zmq.PUSH)
            server = kwargs.get('server', 'zmq.pipe')
            if 'tcp://' in server:
                address = server
            else:
                address = 'ipc:///tmp/%s' % server
            self.zmq_socket.connect(address)
            # Give the connection a moment before the first send.
            time.sleep(1)

    def _build_payload(self):
        '''Build the JSON-serialisable MQTT payload for self.plottype.'''
        dataOut = self.dataOut
        channels = dataOut.channelList

        if self.plottype == 'spectra':
            zdB = 10*numpy.log10(dataOut.data_spc/dataOut.normFactor)
            xlen, ylen = zdB[0].shape
            dx = int(xlen/MAXNUMX) + 1
            dy = int(ylen/MAXNUMY) + 1
            data = [0 for i in channels]
            for i in channels:
                data[i] = zdB[i][::dx, ::dy].tolist()
        elif self.plottype in ('rti', 'power'):
            z = dataOut.data_spc/dataOut.normFactor
            avgdB = 10*numpy.log10(numpy.average(z, axis=1))
            xlen, ylen = z[0].shape
            # Integer stride: a float step is rejected by slicing.
            dy = int(ylen/MAXNUMY) + 1
            data = [0 for i in channels]
            for i in channels:
                data[i] = avgdB[i][::dy].tolist()
        elif self.plottype == 'noise':
            noisedB = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
            data = noisedB.reshape(-1, 1).tolist()
        elif self.plottype == 'snr':
            snr = dataOut.data_SNR
            avgdB = 10*numpy.log10(snr)
            dy = int(snr[0].size/MAXNUMY) + 1
            data = [0 for i in channels]
            for i in channels:
                data[i] = avgdB[i][::dy].tolist()
        else:
            print("Tipo de grafico invalido")
            return {
                'data': 'None',
                'timestamp': 'None',
                'type': None
            }

        # 'interval' is now included for every valid type ('snr' used to
        # be the only one without it).
        return {
            'timestamp': dataOut.utctime,
            'data': roundFloats(data),
            'channels': ['Ch %s' % ch for ch in channels],
            'interval': dataOut.getTimeInterval(),
            'type': self.plottype,
            'yData': dataOut.heightList[:2].tolist()
        }

    def publish_data(self):
        '''
        Send the current dataOut through every enabled transport.
        '''
        self.dataOut.finished = False

        # self.client is False when connect() failed: skip MQTT then.
        if self.mqtt == 1 and self.client:
            payload = self._build_payload()
            self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)

        if self.zeromq == 1:
            if self.verbose:
                log.log(
                    '{} - {}'.format(self.dataOut.type, self.dataOut.datatime),
                    'Sending'
                )
            self.zmq_socket.send_pyobj(self.dataOut)

    def run(self, dataOut, **kwargs):
        '''
        Publish dataOut, configuring the operation on the first call, then
        sleep for the configured delay.
        '''
        self.dataOut = dataOut
        if not self.isConfig:
            self.setup(**kwargs)
            self.isConfig = True

        self.publish_data()
        time.sleep(self.delay)

    def close(self):
        '''
        Send a last object flagged as finished and release the transports.
        '''
        if self.zeromq == 1:
            self.dataOut.finished = True
            self.zmq_socket.send_pyobj(self.dataOut)
            # Let the final message leave before closing the socket.
            time.sleep(0.1)
            self.zmq_socket.close()
        if self.client:
            self.client.loop_stop()
            self.client.disconnect()
r889 | |||
|
r957 | ||
class ReceiverData(ProcessingUnit):
    '''
    Processing unit that receives data objects through a zmq PULL socket.
    '''

    def __init__(self, **kwargs):
        ProcessingUnit.__init__(self, **kwargs)
        self.isConfig = False
        server = kwargs.get('server', 'zmq.pipe')
        # A full tcp:// endpoint is used verbatim; anything else names an
        # ipc socket file under /tmp.
        self.address = server if 'tcp://' in server else 'ipc:///tmp/%s' % server
        self.dataOut = JROData()

    def setup(self):
        '''
        Create the zmq context and bind the PULL socket to self.address.
        '''
        self.context = zmq.Context()
        self.receiver = self.context.socket(zmq.PULL)
        self.receiver.bind(self.address)
        time.sleep(0.5)
        log.success('ReceiverData from {}'.format(self.address))

    def run(self):
        '''
        Receive the next object into self.dataOut (binding on first use).
        '''
        if not self.isConfig:
            self.setup()
            self.isConfig = True

        # NOTE(review): recv_pyobj() unpickles whatever arrives on the
        # socket; only bind to endpoints whose senders are trusted.
        self.dataOut = self.receiver.recv_pyobj()
        received = '{} - {}'.format(self.dataOut.type,
                                    self.dataOut.datatime.ctime(),)
        log.log(received, 'Receiving')
|
r957 | ||
class PlotterReceiver(ProcessingUnit, Process):
    '''
    Process that pulls data objects from a zmq PULL socket, accumulates
    them in a Data object and republishes that object on a PUB socket.
    '''

    # Default minimum number of seconds between two throttled sends.
    throttle_value = 5

    def __init__(self, **kwargs):
        ProcessingUnit.__init__(self, **kwargs)
        Process.__init__(self)
        self.mp = False
        self.isConfig = False
        self.isWebConfig = False
        self.connections = 0
        self.address = self._resolve_address(kwargs.get('server', 'zmq.pipe'))
        self.plot_address = self._resolve_address(kwargs.get('plot_server', 'zmq.web'))
        self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
        self.realtime = kwargs.get('realtime', False)
        self.throttle_value = kwargs.get('throttle', 5)
        self.sendData = self.initThrottle(self.throttle_value)
        self.dates = []
        self.setup()

    @staticmethod
    def _resolve_address(server):
        '''Return server as is for tcp:// endpoints, else an ipc path under /tmp.'''
        if 'tcp://' in server:
            return server
        return 'ipc:///tmp/%s' % server

    def setup(self):
        '''
        Create the Data container used to accumulate what is received.
        '''
        self.data = Data(self.plottypes, self.throttle_value)
        self.isConfig = True

    def event_monitor(self, monitor):
        '''
        Follow the receiver's monitor socket, counting accepted connections,
        until the monitor reports that it has stopped.
        '''
        events = {}
        for name in dir(zmq):
            if name.startswith('EVENT_'):
                events[getattr(zmq, name)] = name

        while monitor.poll():
            evt = recv_monitor_message(monitor)
            # EVENT_ACCEPTED is the named form of the literal 32 used
            # before. EVENT_DISCONNECTED (512) was tested too but nothing
            # was done for it, so it is not handled here.
            if evt['event'] == zmq.EVENT_ACCEPTED:
                self.connections += 1
            # .get(): an event code without a matching EVENT_ name must
            # not kill the monitor thread with a KeyError.
            evt.update({'description': events.get(evt['event'], 'UNKNOWN')})
            if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
                break
        monitor.close()
        print('event monitor thread done!')

    def initThrottle(self, throttle_value):
        '''
        Return a sender that forwards at most once every throttle_value
        seconds; calls made in between are skipped.
        '''
        @throttle(seconds=throttle_value)
        def sendDataThrottled(fn_sender, data):
            fn_sender(data)

        return sendDataThrottled

    def send(self, data):
        '''
        Publish data on the plots socket.
        '''
        log.success('Sending {}'.format(data), self.name)
        self.sender.send_pyobj(data)

    def run(self):
        '''
        Main loop of the process: receive, accumulate and republish.

        The accumulated data is sent flagged as ended, and then reset,
        when the first object of a not yet seen UTC date arrives and when
        the last connected publisher reports it has finished. Otherwise it
        is sent after every object (realtime) or through the throttled
        sender. The loop never returns.
        '''
        log.success(
            'Starting from {}'.format(self.address),
            self.name
        )

        self.context = zmq.Context()
        self.receiver = self.context.socket(zmq.PULL)
        self.receiver.bind(self.address)
        monitor = self.receiver.get_monitor_socket()
        self.sender = self.context.socket(zmq.PUB)
        if self.realtime:
            self.sender_web = self.context.socket(zmq.PUB)
            self.sender_web.connect(self.plot_address)
            time.sleep(1)

        if 'server' in self.kwargs:
            self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
        else:
            self.sender.bind("ipc:///tmp/zmq.plots")

        time.sleep(2)

        t = Thread(target=self.event_monitor, args=(monitor,))
        t.start()

        while True:
            # NOTE(review): recv_pyobj() unpickles incoming messages; the
            # bound endpoint should only be reachable by trusted senders.
            dataOut = self.receiver.recv_pyobj()
            dt = datetime.datetime.utcfromtimestamp(dataOut.utctime).date()
            sended = False
            if dt not in self.dates:
                # First object of a new date: flush what was accumulated
                # for the previous one and start over.
                if self.data:
                    self.data.ended = True
                    self.send(self.data)
                    sended = True
                self.data.setup()
                self.dates.append(dt)

            self.data.update(dataOut)

            if dataOut.finished is True:
                self.connections -= 1
                if self.connections == 0 and dt in self.dates:
                    self.data.ended = True
                    self.send(self.data)
                    self.data.setup()
            else:
                if self.realtime:
                    self.send(self.data)
                    # self.sender_web.send_string(self.data.jsonify())
                else:
                    if not sended:
                        self.sendData(self.send, self.data)

    def sendToWeb(self):
        '''
        Publish, once, the kwargs of every plot operation as JSON on the
        web configuration socket.
        '''
        if self.isWebConfig:
            return

        context = zmq.Context()
        sender_web_config = context.socket(zmq.PUB)
        if 'tcp://' in self.plot_address:
            # The configuration channel uses the next port number; split
            # on the last ':' only, so the host part is left untouched.
            base, port = self.plot_address.rsplit(':', 1)
            conf_address = '{}:{}'.format(base, int(port)+1)
        else:
            conf_address = self.plot_address + '.config'
        sender_web_config.bind(conf_address)
        time.sleep(1)

        for kwargs in self.operationKwargs.values():
            if 'plot' in kwargs:
                log.success('[Sending] Config data to web for {}'.format(kwargs['code'].upper()))
                sender_web_config.send_string(json.dumps(kwargs))
        self.isWebConfig = True