##// END OF EJS Templates
Now there are two receiver units: one for data and one for plots.

File last commit:

r957:d3acc9060c1d
r957:d3acc9060c1d
Show More
jroutils_publish.py
502 lines | 17.3 KiB | text/x-python | PythonLexer
'''
@author: Juan C. Espinoza
'''
import time
import json
import numpy
import paho.mqtt.client as mqtt
import zmq
from profilehooks import profile
import datetime
from zmq.utils.monitor import recv_monitor_message
from functools import wraps
from threading import Thread
from multiprocessing import Process
from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
from schainpy.model.data.jrodata import JROData
# Decimation limits: PublishData.publish_data derives its slice steps from
# these values (step = length / limit + 1) for the first (MAXNUMX) and
# second (MAXNUMY) axis of the arrays it publishes.
MAXNUMX = 100
MAXNUMY = 100
class PrettyFloat(float):
    """Float subclass whose ``repr`` always shows two decimal places."""

    def __repr__(self):
        """Return the value rendered in fixed-point notation with 2 decimals."""
        return format(self, '.2f')
def roundFloats(obj):
    """Recursively round every float found in ``obj`` to two decimals.

    Lists are walked element by element and rebuilt as new lists, floats are
    rounded with ``round(value, 2)``, and any other value (ints, strings,
    ``None``, ...) is returned unchanged.

    Returns:
        A new list when ``obj`` is a list, the rounded float when ``obj`` is
        a float, otherwise ``obj`` itself.
    """
    if isinstance(obj, list):
        # Build a concrete list rather than a lazy ``map`` object so the
        # result can always be handed to ``json.dumps``.
        return [roundFloats(item) for item in obj]
    if isinstance(obj, float):
        return round(obj, 2)
    # Non-float leaves pass through instead of being replaced by None.
    return obj
def decimate(z, MAXNUMY=100):
    """Thin out the second axis of ``z`` so it keeps at most ``MAXNUMY`` points.

    Parameters:
        z: object supporting ``len(z[0])`` and two-axis slicing
            (``z[:, ::step]``), e.g. a numpy array with two or more
            dimensions.
        MAXNUMY: upper bound on the number of samples kept along the second
            axis.  Defaults to 100, the value of the module-level
            ``MAXNUMY`` constant, so callers may omit it.

    Returns:
        ``z`` sliced with a constant step along its second axis; the first
        axis is left untouched.
    """
    # Integer division keeps the step an int on every Python version.
    dy = len(z[0]) // MAXNUMY + 1
    return z[:, ::dy]
class throttle(object):
    """Decorator that limits how often a function is actually executed.

    A call made before the throttle period has elapsed since the last
    executed call is dropped: the wrapped function is not invoked and the
    wrapper returns ``None``.  The call is not delayed or queued.

    To create a function that runs at most once a minute:

        @throttle(minutes=1)
        def foo():
            pass

        for i in range(10):
            foo()  # only the first call inside the minute runs

    Parameters:
        seconds, minutes, hours: components of the minimum interval between
            two executed calls; they are summed into one ``timedelta``.
    """

    def __init__(self, seconds=0, minutes=0, hours=0):
        self.throttle_period = datetime.timedelta(
            seconds=seconds, minutes=minutes, hours=hours
        )
        # ``datetime.min`` guarantees that the very first call goes through.
        self.time_of_last_call = datetime.datetime.min

    def __call__(self, fn):
        """Wrap ``fn`` so that calls inside the throttle period are skipped."""
        @wraps(fn)
        def wrapper(*args, **kwargs):
            now = datetime.datetime.now()
            time_left = self.throttle_period - (now - self.time_of_last_call)
            if time_left > datetime.timedelta(seconds=0):
                # Still inside the period: drop this call.
                return None
            # Reuse the timestamp taken above as the start of the new period.
            self.time_of_last_call = now
            return fn(*args, **kwargs)

        return wrapper
class PublishData(Operation):
    """Operation that publishes ``dataOut`` over MQTT and/or a ZeroMQ PUSH socket.

    MQTT publishing sends a decimated JSON payload for one plot type;
    ZeroMQ publishing sends the whole ``dataOut`` object with ``send_pyobj``.
    Each transport is enabled by its own flag (see ``setup``).
    """

    def __init__(self, **kwargs):
        """Create the operation with no transport configured yet."""
        Operation.__init__(self, **kwargs)
        self.isConfig = False
        self.client = None
        self.zeromq = None
        self.mqtt = None

    def on_disconnect(self, client, userdata, rc):
        """MQTT disconnect callback: reconnect when ``rc`` is non-zero."""
        if rc != 0:
            print("Unexpected disconnection.")
            self.connect()

    def connect(self):
        """Connect the MQTT client to ``self.host:self.port`` and start its loop.

        On failure the error is reported and ``self.client`` is set to
        ``False`` so that later calls skip MQTT publishing.
        """
        print('trying to connect')
        try:
            self.client.connect(
                host=self.host,
                port=self.port,
                keepalive=60*10,
                bind_address='')
            self.client.loop_start()
        except Exception:
            # Best effort: a broker that cannot be reached must not stop the
            # processing chain, it only disables MQTT publishing.
            print("MQTT Conection error.")
            self.client = False

    def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
        """Read the configuration and open the enabled transports.

        Parameters:
            port: MQTT broker port.
            username, password: accepted but not used by this method.
            clientId: prefix of the MQTT client id.
            zeromq: set to 1 to push ``dataOut`` through ZeroMQ.
            verbose: print a line for every object sent through ZeroMQ.
            **kwargs: optional ``topic``, ``delay``, ``plottype``, ``host``,
                ``mqtt`` (set to 1 to enable MQTT) and ``server`` (ZeroMQ
                endpoint name or ``tcp://`` address).
        """
        self.counter = 0
        self.topic = kwargs.get('topic', 'schain')
        self.delay = kwargs.get('delay', 0)
        self.plottype = kwargs.get('plottype', 'spectra')
        self.host = kwargs.get('host', "10.10.10.82")
        # ``port`` is a named parameter, so it never shows up in ``kwargs``.
        self.port = port
        self.clientId = clientId
        self.cnt = 0
        self.zeromq = zeromq
        self.mqtt = kwargs.get('mqtt', 0)
        self.client = None
        self.verbose = verbose
        self.dataOut.firstdata = True

        if self.mqtt == 1:
            # ``mqtt`` below is the imported paho module, ``self.mqtt`` the flag.
            self.client = mqtt.Client(
                client_id=self.clientId + self.topic + 'SCHAIN',
                clean_session=True)
            self.client.on_disconnect = self.on_disconnect
            self.connect()

        if self.zeromq == 1:
            context = zmq.Context()
            self.zmq_socket = context.socket(zmq.PUSH)
            server = kwargs.get('server', 'zmq.pipe')
            if 'tcp://' in server:
                address = server
            else:
                address = 'ipc:///tmp/%s' % server
            self.zmq_socket.connect(address)
            time.sleep(1)

    def _build_payload(self):
        """Return the JSON-serialisable MQTT payload for ``self.plottype``."""
        dataOut = self.dataOut
        channels = dataOut.channelList
        yData = dataOut.heightList[:2].tolist()

        if self.plottype == 'spectra':
            zdB = 10*numpy.log10(dataOut.data_spc/dataOut.normFactor)
            xlen, ylen = zdB[0].shape
            # Integer steps: a float cannot be used as a slice step.
            dx = xlen // MAXNUMX + 1
            dy = ylen // MAXNUMY + 1
            data = [zdB[ch][::dx, ::dy].tolist() for ch in channels]
        elif self.plottype in ('rti', 'power'):
            z = dataOut.data_spc/dataOut.normFactor
            avgdB = 10*numpy.log10(numpy.average(z, axis=1))
            xlen, ylen = z[0].shape
            dy = ylen // MAXNUMY + 1
            data = [avgdB[ch][::dy].tolist() for ch in channels]
        elif self.plottype == 'noise':
            noise = dataOut.getNoise()/dataOut.normFactor
            data = (10*numpy.log10(noise)).reshape(-1, 1).tolist()
        elif self.plottype == 'snr':
            snr = dataOut.data_SNR
            avgdB = 10*numpy.log10(snr)
            dy = snr[0].size // MAXNUMY + 1
            data = [avgdB[ch][::dy].tolist() for ch in channels]
        else:
            print("Tipo de grafico invalido")
            return {
                'data': 'None',
                'timestamp': 'None',
                'type': None
            }

        payload = {
            'timestamp': dataOut.utctime,
            'data': roundFloats(data),
            'channels': ['Ch %s' % ch for ch in channels],
            'type': self.plottype,
            'yData': yData
        }
        # The 'snr' payload carries no 'interval' entry.
        if self.plottype != 'snr':
            payload['interval'] = dataOut.getTimeInterval()
        return payload

    def publish_data(self):
        """Send the current ``dataOut`` through every enabled transport."""
        self.dataOut.finished = False

        # ``self.client`` is False after a failed connection attempt.
        if self.mqtt == 1 and self.client:
            payload = self._build_payload()
            self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)

        if self.zeromq == 1:
            if self.verbose:
                print('[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime))
            self.zmq_socket.send_pyobj(self.dataOut)
            self.dataOut.firstdata = False

    def run(self, dataOut, **kwargs):
        """Publish ``dataOut``, configuring the operation on the first call.

        After publishing, sleeps for ``self.delay`` seconds.
        """
        self.dataOut = dataOut
        if not self.isConfig:
            self.setup(**kwargs)
            self.isConfig = True

        self.publish_data()
        time.sleep(self.delay)

    def close(self):
        """Send a final object flagged as finished and release the transports."""
        if self.zeromq == 1:
            self.dataOut.finished = True
            self.zmq_socket.send_pyobj(self.dataOut)
            self.zmq_socket.close()
        if self.client:
            self.client.loop_stop()
            self.client.disconnect()
class ReceiverData(ProcessingUnit):
    """Processing unit that reads data objects from a ZeroMQ PULL socket."""

    def __init__(self, **kwargs):
        """Resolve the endpoint to bind to from the ``server`` keyword."""
        ProcessingUnit.__init__(self, **kwargs)
        self.isConfig = False

        endpoint = kwargs.get('server', 'zmq.pipe')
        if 'tcp://' not in endpoint:
            # Anything that is not a tcp address is used as an ipc name under /tmp.
            endpoint = 'ipc:///tmp/%s' % endpoint
        self.address = endpoint

        self.dataOut = JROData()

    def setup(self):
        """Create the PULL socket and bind it to ``self.address``."""
        self.context = zmq.Context()
        self.receiver = self.context.socket(zmq.PULL)
        self.receiver.bind(self.address)
        time.sleep(0.5)
        print('[Starting] ReceiverData from {}'.format(self.address))

    def run(self):
        """Receive the next object into ``self.dataOut`` (blocking call)."""
        if not self.isConfig:
            self.setup()
            self.isConfig = True

        received = self.receiver.recv_pyobj()
        self.dataOut = received
        print('[Receiving] {} - {}'.format(received.type,
                                           received.datatime.ctime()))
class PlotterReceiver(ProcessingUnit, Process):
    """Process that collects incoming data objects and republishes plot data.

    Objects are read from a ZeroMQ PULL socket, accumulated per plot type in
    ``self.data`` and sent on through a PUB socket.  In realtime mode a
    decimated JSON version is also sent through a second PUB socket.
    """

    throttle_value = 5

    def __init__(self, **kwargs):
        """Read the configuration and prepare the empty data containers.

        Keyword arguments used here: ``server``, ``plot_server``,
        ``plottypes`` (comma separated string), ``realtime`` and ``throttle``
        (seconds between throttled sends).
        """
        ProcessingUnit.__init__(self, **kwargs)
        Process.__init__(self)
        self.mp = False
        self.isConfig = False
        self.isWebConfig = False
        self.plottypes = []
        self.connections = 0
        # Both flags are read by the monitor thread and by the receive loop
        # before the loop assigns them, so they need an initial value.
        self.started = False
        self.ended = False

        server = kwargs.get('server', 'zmq.pipe')
        plot_server = kwargs.get('plot_server', 'zmq.web')

        if 'tcp://' in server:
            address = server
        else:
            address = 'ipc:///tmp/%s' % server

        if 'tcp://' in plot_server:
            plot_address = plot_server
        else:
            plot_address = 'ipc:///tmp/%s' % plot_server

        self.address = address
        self.plot_address = plot_address
        self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
        self.realtime = kwargs.get('realtime', False)
        self.throttle_value = kwargs.get('throttle', 5)
        self.sendData = self.initThrottle(self.throttle_value)
        self.setup()

    def setup(self):
        """Reset ``self.data`` and ``self.data_web`` to their empty state."""
        self.data = {}
        self.data['times'] = []
        for plottype in self.plottypes:
            self.data[plottype] = {}
        self.data['noise'] = {}
        self.data['throttle'] = self.throttle_value
        self.data['ENDED'] = False
        self.isConfig = True
        self.data_web = {}

    def event_monitor(self, monitor):
        """Track socket events read from ``monitor`` until monitoring stops.

        Every accepted connection increments ``self.connections``; a
        disconnection while no connection is counted and data has started
        marks the stream as ended.
        """
        while monitor.poll():
            evt = recv_monitor_message(monitor)
            if evt['event'] == zmq.EVENT_ACCEPTED:
                self.connections += 1
            if evt['event'] == zmq.EVENT_DISCONNECTED:
                if self.connections == 0 and self.started is True:
                    self.ended = True
            if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
                break
        monitor.close()
        print("event monitor thread done!")

    def initThrottle(self, throttle_value):
        """Return a sender that runs at most once every ``throttle_value`` seconds."""
        @throttle(seconds=throttle_value)
        def sendDataThrottled(fn_sender, data):
            fn_sender(data)

        return sendDataThrottled

    def send(self, data):
        """Publish ``data`` through the plots socket."""
        self.sender.send_pyobj(data)

    def update(self):
        """Store the values of the current ``dataOut`` for every plot type.

        Nothing is done when the timestamp of ``dataOut`` was already stored.
        In realtime mode ``self.data_web`` is refreshed as well.
        """
        t = self.dataOut.utctime
        if t in self.data['times']:
            return

        self.data['times'].append(t)
        self.data['dataOut'] = self.dataOut

        for plottype in self.plottypes:
            if plottype == 'spc':
                z = self.dataOut.data_spc/self.dataOut.normFactor
                self.data[plottype] = 10*numpy.log10(z)
                self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
            elif plottype == 'cspc':
                # NOTE(review): the normalisation uses data_spc*data_spc;
                # confirm whether the product of the two spectra of each
                # pair was intended.
                jcoherence = self.dataOut.data_cspc/numpy.sqrt(self.dataOut.data_spc*self.dataOut.data_spc)
                self.data['cspc_coh'] = numpy.abs(jcoherence)
                self.data['cspc_phase'] = numpy.arctan2(jcoherence.imag, jcoherence.real)*180/numpy.pi
            elif plottype == 'rti':
                self.data[plottype][t] = self.dataOut.getPower()
            elif plottype == 'snr':
                self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
            elif plottype == 'dop':
                self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
            elif plottype == 'mean':
                self.data[plottype][t] = self.dataOut.data_MEAN
            elif plottype == 'std':
                self.data[plottype][t] = self.dataOut.data_STD
            elif plottype == 'coh':
                self.data[plottype][t] = self.dataOut.getCoherence()
            elif plottype == 'phase':
                self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
            elif plottype == 'output':
                self.data[plottype][t] = self.dataOut.data_output
            elif plottype == 'param':
                self.data[plottype][t] = self.dataOut.data_param

            if self.realtime:
                self.data_web['timestamp'] = t
                # The decimation limit is passed explicitly on every call.
                if plottype == 'spc':
                    self.data_web[plottype] = roundFloats(decimate(self.data[plottype], MAXNUMY).tolist())
                elif plottype == 'cspc':
                    self.data_web['cspc_coh'] = roundFloats(decimate(self.data['cspc_coh'], MAXNUMY).tolist())
                    self.data_web['cspc_phase'] = roundFloats(decimate(self.data['cspc_phase'], MAXNUMY).tolist())
                elif plottype == 'noise':
                    self.data_web['noise'] = roundFloats(self.data['noise'][t].tolist())
                else:
                    self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t], MAXNUMY).tolist())
                self.data_web['interval'] = self.dataOut.getTimeInterval()
                self.data_web['type'] = plottype

    def run(self):
        """Process entry point: bind the sockets and forward data forever."""
        print('[Starting] {} from {}'.format(self.name, self.address))

        self.context = zmq.Context()
        self.receiver = self.context.socket(zmq.PULL)
        self.receiver.bind(self.address)
        monitor = self.receiver.get_monitor_socket()
        self.sender = self.context.socket(zmq.PUB)
        if self.realtime:
            self.sender_web = self.context.socket(zmq.PUB)
            self.sender_web.connect(self.plot_address)
            time.sleep(1)

        if 'server' in self.kwargs:
            self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
        else:
            self.sender.bind("ipc:///tmp/zmq.plots")

        time.sleep(3)

        t = Thread(target=self.event_monitor, args=(monitor,))
        t.start()

        while True:
            self.dataOut = self.receiver.recv_pyobj()
            self.update()

            if self.dataOut.firstdata is True:
                self.data['STARTED'] = True

            if self.dataOut.finished is True:
                self.send(self.data)
                self.connections -= 1
                if self.connections == 0 and self.started:
                    # Last sender finished: flag the end and start afresh.
                    self.ended = True
                    self.data['ENDED'] = True
                    self.send(self.data)
                    self.setup()
                    self.started = False
            else:
                if self.realtime:
                    self.send(self.data)
                    self.sender_web.send_string(json.dumps(self.data_web))
                else:
                    self.sendData(self.send, self.data)
                self.started = True

            self.data['STARTED'] = False

    def sendToWeb(self):
        """Publish, once, the kwargs of every operation that defines ``plot``.

        The configuration socket is bound next to the plot address: the
        following port for tcp addresses, a ``.config`` suffix otherwise.
        """
        if not self.isWebConfig:
            context = zmq.Context()
            sender_web_config = context.socket(zmq.PUB)
            if 'tcp://' in self.plot_address:
                dum, address, port = self.plot_address.split(':')
                conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
            else:
                conf_address = self.plot_address + '.config'
            sender_web_config.bind(conf_address)
            time.sleep(1)
            for kwargs in self.operationKwargs.values():
                if 'plot' in kwargs:
                    print('[Sending] Config data to web for {}'.format(kwargs['code'].upper()))
                    sender_web_config.send_string(json.dumps(kwargs))
            self.isWebConfig = True