jroutils_publish.py
868 lines
| 28.5 KiB
| text/x-python
|
PythonLexer
|
r859 | ''' | |
@author: Juan C. Espinoza | |||
''' | |||
r1135 | import os | ||
import glob | |||
|
r859 | import time | |
import json | |||
import numpy | |||
import paho.mqtt.client as mqtt | |||
|
r883 | import zmq | |
import datetime | |||
r1135 | import ftplib | ||
|
r883 | from zmq.utils.monitor import recv_monitor_message | |
from functools import wraps | |||
from threading import Thread | |||
from multiprocessing import Process | |||
|
r859 | ||
|
r883 | from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit | |
|
r957 | from schainpy.model.data.jrodata import JROData | |
|
r1062 | from schainpy.utils import log | |
|
r883 | ||
r1135 | MAXNUMX = 500 | ||
MAXNUMY = 500 | |||
PLOT_CODES = { | |||
r1139 | 'rti': 0, # Range time intensity (RTI). | ||
'spc': 1, # Spectra (and Cross-spectra) information. | |||
'cspc': 2, # Cross-Correlation information. | |||
'coh': 3, # Coherence map. | |||
'base': 4, # Base lines graphic. | |||
'row': 5, # Row Spectra. | |||
r1142 | 'total': 6, # Total Power. | ||
'drift': 7, # Drifts graphics. | |||
'height': 8, # Height profile. | |||
'phase': 9, # Signal Phase. | |||
'power': 16, | |||
'noise': 17, | |||
'beacon': 18, | |||
'wind': 22, | |||
'skymap': 23, | |||
'Unknown': 24, | |||
'V-E': 25, # PIP Velocity. | |||
'Z-E': 26, # PIP Reflectivity. | |||
'V-A': 27, # RHI Velocity. | |||
'Z-A': 28, # RHI Reflectivity. | |||
r1135 | } | ||
|
r859 | ||
r1142 | def get_plot_code(s): | ||
label = s.split('_')[0] | |||
codes = [key for key in PLOT_CODES if key in label] | |||
if codes: | |||
return PLOT_CODES[codes[0]] | |||
else: | |||
return 24 | |||
|
r859 | ||
|
r863 | def roundFloats(obj): | |
if isinstance(obj, list): | |||
|
r1167 | return list(map(roundFloats, obj)) | |
|
r863 | elif isinstance(obj, float): | |
return round(obj, 2) | |||
|
r860 | ||
|
r931 | def decimate(z, MAXNUMY): | |
|
r904 | dy = int(len(z[0])/MAXNUMY) + 1 | |
r911 | |||
|
r904 | return z[::, ::dy] | |
|
r859 | ||
|
r883 | class throttle(object): | |
|
r1062 | ''' | |
Decorator that prevents a function from being called more than once every | |||
|
r883 | time period. | |
To create a function that cannot be called more than once a minute, but | |||
will sleep until it can be called: | |||
@throttle(minutes=1) | |||
def foo(): | |||
pass | |||
for i in range(10): | |||
foo() | |||
print "This function has run %s times." % i | |||
|
r1062 | ''' | |
|
r883 | ||
def __init__(self, seconds=0, minutes=0, hours=0): | |||
self.throttle_period = datetime.timedelta( | |||
seconds=seconds, minutes=minutes, hours=hours | |||
) | |||
|
r898 | ||
|
r883 | self.time_of_last_call = datetime.datetime.min | |
def __call__(self, fn): | |||
@wraps(fn) | |||
def wrapper(*args, **kwargs): | |||
|
r1094 | coerce = kwargs.pop('coerce', None) | |
if coerce: | |||
self.time_of_last_call = datetime.datetime.now() | |||
return fn(*args, **kwargs) | |||
else: | |||
now = datetime.datetime.now() | |||
time_since_last_call = now - self.time_of_last_call | |||
time_left = self.throttle_period - time_since_last_call | |||
|
r883 | ||
|
r1094 | if time_left > datetime.timedelta(seconds=0): | |
return | |||
|
r883 | ||
self.time_of_last_call = datetime.datetime.now() | |||
return fn(*args, **kwargs) | |||
return wrapper | |||
|
r1062 | class Data(object): | |
''' | |||
Object to hold data to be plotted | |||
''' | |||
r1135 | def __init__(self, plottypes, throttle_value, exp_code, buffering=True): | ||
|
r1062 | self.plottypes = plottypes | |
self.throttle = throttle_value | |||
|
r1114 | self.exp_code = exp_code | |
r1135 | self.buffering = buffering | ||
|
r1062 | self.ended = False | |
|
r1089 | self.localtime = False | |
r1139 | self.meta = {} | ||
|
r1089 | self.__times = [] | |
self.__heights = [] | |||
|
r1062 | ||
def __str__(self): | |||
dum = ['{}{}'.format(key, self.shape(key)) for key in self.data] | |||
return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times)) | |||
def __len__(self): | |||
return len(self.__times) | |||
def __getitem__(self, key): | |||
if key not in self.data: | |||
raise KeyError(log.error('Missing key: {}'.format(key))) | |||
r1135 | if 'spc' in key or not self.buffering: | ||
|
r1062 | ret = self.data[key] | |
else: | |||
ret = numpy.array([self.data[key][x] for x in self.times]) | |||
if ret.ndim > 1: | |||
ret = numpy.swapaxes(ret, 0, 1) | |||
return ret | |||
|
r1094 | def __contains__(self, key): | |
return key in self.data | |||
|
r1062 | def setup(self): | |
''' | |||
Configure object | |||
''' | |||
r1135 | self.type = '' | ||
|
r1062 | self.ended = False | |
self.data = {} | |||
self.__times = [] | |||
self.__heights = [] | |||
self.__all_heights = set() | |||
for plot in self.plottypes: | |||
r1065 | if 'snr' in plot: | ||
plot = 'snr' | |||
|
r1062 | self.data[plot] = {} | |
def shape(self, key): | |||
''' | |||
Get the shape of the one-element data for the given key | |||
''' | |||
if len(self.data[key]): | |||
r1135 | if 'spc' in key or not self.buffering: | ||
|
r1062 | return self.data[key].shape | |
return self.data[key][self.__times[0]].shape | |||
return (0,) | |||
r1105 | def update(self, dataOut, tm): | ||
|
r1062 | ''' | |
Update data object with new dataOut | |||
''' | |||
r1105 | |||
|
r1062 | if tm in self.__times: | |
return | |||
r1135 | self.type = dataOut.type | ||
|
r1062 | self.parameters = getattr(dataOut, 'parameters', []) | |
|
r1113 | if hasattr(dataOut, 'pairsList'): | |
self.pairs = dataOut.pairsList | |||
r1139 | if hasattr(dataOut, 'meta'): | ||
self.meta = dataOut.meta | |||
|
r1062 | self.channels = dataOut.channelList | |
self.interval = dataOut.getTimeInterval() | |||
|
r1089 | self.localtime = dataOut.useLocalTime | |
r1065 | if 'spc' in self.plottypes or 'cspc' in self.plottypes: | ||
|
r1087 | self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1)) | |
|
r1062 | self.__heights.append(dataOut.heightList) | |
self.__all_heights.update(dataOut.heightList) | |||
self.__times.append(tm) | |||
for plot in self.plottypes: | |||
if plot == 'spc': | |||
z = dataOut.data_spc/dataOut.normFactor | |||
|
r1164 | buffer = 10*numpy.log10(z) | |
|
r1062 | if plot == 'cspc': | |
|
r1164 | buffer = dataOut.data_cspc | |
|
r1062 | if plot == 'noise': | |
r1135 | buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor) | ||
|
r1062 | if plot == 'rti': | |
r1135 | buffer = dataOut.getPower() | ||
|
r1062 | if plot == 'snr_db': | |
|
r1164 | buffer = dataOut.data_SNR | |
|
r1062 | if plot == 'snr': | |
r1135 | buffer = 10*numpy.log10(dataOut.data_SNR) | ||
|
r1062 | if plot == 'dop': | |
r1135 | buffer = 10*numpy.log10(dataOut.data_DOP) | ||
|
r1062 | if plot == 'mean': | |
r1135 | buffer = dataOut.data_MEAN | ||
|
r1062 | if plot == 'std': | |
r1135 | buffer = dataOut.data_STD | ||
|
r1062 | if plot == 'coh': | |
r1135 | buffer = dataOut.getCoherence() | ||
|
r1062 | if plot == 'phase': | |
r1135 | buffer = dataOut.getCoherence(phase=True) | ||
|
r1062 | if plot == 'output': | |
r1135 | buffer = dataOut.data_output | ||
|
r1062 | if plot == 'param': | |
r1135 | buffer = dataOut.data_param | ||
|
r1164 | if 'spc' in plot: | |
r1135 | self.data[plot] = buffer | ||
|
r1164 | else: | |
if self.buffering: | |||
self.data[plot][tm] = buffer | |||
else: | |||
self.data[plot] = buffer | |||
|
r1062 | ||
def normalize_heights(self): | |||
''' | |||
Ensure same-dimension of the data for different heighList | |||
''' | |||
H = numpy.array(list(self.__all_heights)) | |||
H.sort() | |||
for key in self.data: | |||
shape = self.shape(key)[:-1] + H.shape | |||
|
r1167 | for tm, obj in list(self.data[key].items()): | |
|
r1062 | h = self.__heights[self.__times.index(tm)] | |
if H.size == h.size: | |||
continue | |||
index = numpy.where(numpy.in1d(H, h))[0] | |||
dummy = numpy.zeros(shape) + numpy.nan | |||
if len(shape) == 2: | |||
dummy[:, index] = obj | |||
else: | |||
dummy[index] = obj | |||
self.data[key][tm] = dummy | |||
self.__heights = [H for tm in self.__times] | |||
def jsonify(self, decimate=False): | |||
''' | |||
Convert data to json | |||
''' | |||
|
r1114 | data = {} | |
|
r1062 | tm = self.times[-1] | |
|
r1161 | dy = int(self.heights.size/MAXNUMY) + 1 | |
|
r1114 | for key in self.data: | |
r1135 | if key in ('spc', 'cspc') or not self.buffering: | ||
r1122 | dx = int(self.data[key].shape[1]/MAXNUMX) + 1 | ||
data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist()) | |||
|
r1062 | else: | |
|
r1114 | data[key] = roundFloats(self.data[key][tm].tolist()) | |
|
r1062 | ||
|
r1114 | ret = {'data': data} | |
ret['exp_code'] = self.exp_code | |||
ret['time'] = tm | |||
|
r1062 | ret['interval'] = self.interval | |
r1122 | ret['localtime'] = self.localtime | ||
|
r1161 | ret['yrange'] = roundFloats(self.heights[::dy].tolist()) | |
if 'spc' in self.data or 'cspc' in self.data: | |||
r1122 | ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist()) | ||
r1127 | else: | ||
ret['xrange'] = [] | |||
r1122 | if hasattr(self, 'pairs'): | ||
ret['pairs'] = self.pairs | |||
r1127 | else: | ||
ret['pairs'] = [] | |||
r1139 | |||
|
r1167 | for key, value in list(self.meta.items()): | |
r1139 | ret[key] = value | ||
|
r1114 | return json.dumps(ret) | |
|
r1062 | ||
@property | |||
def times(self): | |||
''' | |||
Return the list of times of the current data | |||
''' | |||
ret = numpy.array(self.__times) | |||
ret.sort() | |||
return ret | |||
@property | |||
def heights(self): | |||
''' | |||
Return the list of heights of the current data | |||
''' | |||
return numpy.array(self.__heights[-1]) | |||
|
r860 | ||
|
r859 | class PublishData(Operation): | |
|
r1062 | ''' | |
Operation to send data over zmq. | |||
''' | |||
|
r860 | ||
|
r1097 | __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose'] | |
|
r883 | def __init__(self, **kwargs): | |
|
r860 | """Inicio.""" | |
|
r883 | Operation.__init__(self, **kwargs) | |
|
r859 | self.isConfig = False | |
|
r860 | self.client = None | |
|
r883 | self.zeromq = None | |
self.mqtt = None | |||
|
r860 | ||
def on_disconnect(self, client, userdata, rc): | |||
|
r859 | if rc != 0: | |
|
r1062 | log.warning('Unexpected disconnection.') | |
|
r860 | self.connect() | |
def connect(self): | |||
|
r1062 | log.warning('trying to connect') | |
|
r859 | try: | |
|
r860 | self.client.connect( | |
host=self.host, | |||
port=self.port, | |||
keepalive=60*10, | |||
bind_address='') | |||
self.client.loop_start() | |||
# self.client.publish( | |||
# self.topic + 'SETUP', | |||
# json.dumps(setup), | |||
# retain=True | |||
# ) | |||
|
r859 | except: | |
|
r1062 | log.error('MQTT Conection error.') | |
|
r859 | self.client = False | |
|
r860 | ||
|
r931 | def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs): | |
|
r883 | self.counter = 0 | |
|
r859 | self.topic = kwargs.get('topic', 'schain') | |
self.delay = kwargs.get('delay', 0) | |||
|
r860 | self.plottype = kwargs.get('plottype', 'spectra') | |
|
r883 | self.host = kwargs.get('host', "10.10.10.82") | |
self.port = kwargs.get('port', 3000) | |||
|
r863 | self.clientId = clientId | |
|
r859 | self.cnt = 0 | |
|
r883 | self.zeromq = zeromq | |
self.mqtt = kwargs.get('plottype', 0) | |||
self.client = None | |||
|
r1062 | self.verbose = verbose | |
|
r860 | setup = [] | |
|
r883 | if mqtt is 1: | |
self.client = mqtt.Client( | |||
client_id=self.clientId + self.topic + 'SCHAIN', | |||
clean_session=True) | |||
self.client.on_disconnect = self.on_disconnect | |||
self.connect() | |||
for plot in self.plottype: | |||
setup.append({ | |||
'plot': plot, | |||
'topic': self.topic + plot, | |||
'title': getattr(self, plot + '_' + 'title', False), | |||
'xlabel': getattr(self, plot + '_' + 'xlabel', False), | |||
'ylabel': getattr(self, plot + '_' + 'ylabel', False), | |||
'xrange': getattr(self, plot + '_' + 'xrange', False), | |||
'yrange': getattr(self, plot + '_' + 'yrange', False), | |||
'zrange': getattr(self, plot + '_' + 'zrange', False), | |||
}) | |||
if zeromq is 1: | |||
context = zmq.Context() | |||
self.zmq_socket = context.socket(zmq.PUSH) | |||
server = kwargs.get('server', 'zmq.pipe') | |||
r889 | |||
|
r886 | if 'tcp://' in server: | |
|
r883 | address = server | |
else: | |||
address = 'ipc:///tmp/%s' % server | |||
r889 | |||
|
r883 | self.zmq_socket.connect(address) | |
time.sleep(1) | |||
|
r931 | ||
|
r883 | def publish_data(self): | |
self.dataOut.finished = False | |||
if self.mqtt is 1: | |||
yData = self.dataOut.heightList[:2].tolist() | |||
if self.plottype == 'spectra': | |||
data = getattr(self.dataOut, 'data_spc') | |||
z = data/self.dataOut.normFactor | |||
zdB = 10*numpy.log10(z) | |||
xlen, ylen = zdB[0].shape | |||
r889 | dx = int(xlen/MAXNUMX) + 1 | ||
dy = int(ylen/MAXNUMY) + 1 | |||
|
r883 | Z = [0 for i in self.dataOut.channelList] | |
for i in self.dataOut.channelList: | |||
Z[i] = zdB[i][::dx, ::dy].tolist() | |||
payload = { | |||
'timestamp': self.dataOut.utctime, | |||
'data': roundFloats(Z), | |||
'channels': ['Ch %s' % ch for ch in self.dataOut.channelList], | |||
'interval': self.dataOut.getTimeInterval(), | |||
'type': self.plottype, | |||
'yData': yData | |||
} | |||
|
r860 | ||
|
r883 | elif self.plottype in ('rti', 'power'): | |
data = getattr(self.dataOut, 'data_spc') | |||
z = data/self.dataOut.normFactor | |||
avg = numpy.average(z, axis=1) | |||
avgdB = 10*numpy.log10(avg) | |||
xlen, ylen = z[0].shape | |||
dy = numpy.floor(ylen/self.__MAXNUMY) + 1 | |||
AVG = [0 for i in self.dataOut.channelList] | |||
for i in self.dataOut.channelList: | |||
AVG[i] = avgdB[i][::dy].tolist() | |||
payload = { | |||
'timestamp': self.dataOut.utctime, | |||
'data': roundFloats(AVG), | |||
'channels': ['Ch %s' % ch for ch in self.dataOut.channelList], | |||
'interval': self.dataOut.getTimeInterval(), | |||
'type': self.plottype, | |||
'yData': yData | |||
} | |||
elif self.plottype == 'noise': | |||
noise = self.dataOut.getNoise()/self.dataOut.normFactor | |||
noisedB = 10*numpy.log10(noise) | |||
payload = { | |||
'timestamp': self.dataOut.utctime, | |||
'data': roundFloats(noisedB.reshape(-1, 1).tolist()), | |||
'channels': ['Ch %s' % ch for ch in self.dataOut.channelList], | |||
'interval': self.dataOut.getTimeInterval(), | |||
'type': self.plottype, | |||
'yData': yData | |||
} | |||
elif self.plottype == 'snr': | |||
data = getattr(self.dataOut, 'data_SNR') | |||
avgdB = 10*numpy.log10(data) | |||
ylen = data[0].size | |||
dy = numpy.floor(ylen/self.__MAXNUMY) + 1 | |||
AVG = [0 for i in self.dataOut.channelList] | |||
for i in self.dataOut.channelList: | |||
AVG[i] = avgdB[i][::dy].tolist() | |||
payload = { | |||
'timestamp': self.dataOut.utctime, | |||
'data': roundFloats(AVG), | |||
'channels': ['Ch %s' % ch for ch in self.dataOut.channelList], | |||
'type': self.plottype, | |||
'yData': yData | |||
} | |||
else: | |||
|
r1167 | print("Tipo de grafico invalido") | |
|
r883 | payload = { | |
'data': 'None', | |||
'timestamp': 'None', | |||
'type': None | |||
} | |||
|
r1062 | ||
|
r883 | self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0) | |
if self.zeromq is 1: | |||
|
r931 | if self.verbose: | |
|
r1062 | log.log( | |
|
r1089 | 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime), | |
self.name | |||
|
r1062 | ) | |
|
r883 | self.zmq_socket.send_pyobj(self.dataOut) | |
def run(self, dataOut, **kwargs): | |||
|
r860 | self.dataOut = dataOut | |
if not self.isConfig: | |||
|
r883 | self.setup(**kwargs) | |
|
r859 | self.isConfig = True | |
|
r860 | ||
|
r883 | self.publish_data() | |
|
r860 | time.sleep(self.delay) | |
|
r859 | def close(self): | |
|
r883 | if self.zeromq is 1: | |
self.dataOut.finished = True | |||
|
r927 | self.zmq_socket.send_pyobj(self.dataOut) | |
|
r1062 | time.sleep(0.1) | |
|
r931 | self.zmq_socket.close() | |
|
r860 | if self.client: | |
self.client.loop_stop() | |||
self.client.disconnect() | |||
r889 | |||
|
r957 | ||
class ReceiverData(ProcessingUnit): | |||
|
r1097 | __attrs__ = ['server'] | |
|
r957 | def __init__(self, **kwargs): | |
ProcessingUnit.__init__(self, **kwargs) | |||
self.isConfig = False | |||
server = kwargs.get('server', 'zmq.pipe') | |||
if 'tcp://' in server: | |||
address = server | |||
else: | |||
address = 'ipc:///tmp/%s' % server | |||
self.address = address | |||
self.dataOut = JROData() | |||
def setup(self): | |||
self.context = zmq.Context() | |||
self.receiver = self.context.socket(zmq.PULL) | |||
self.receiver.bind(self.address) | |||
time.sleep(0.5) | |||
|
r1062 | log.success('ReceiverData from {}'.format(self.address)) | |
|
r957 | ||
def run(self): | |||
if not self.isConfig: | |||
self.setup() | |||
self.isConfig = True | |||
self.dataOut = self.receiver.recv_pyobj() | |||
|
r1062 | log.log('{} - {}'.format(self.dataOut.type, | |
self.dataOut.datatime.ctime(),), | |||
'Receiving') | |||
|
r957 | ||
class PlotterReceiver(ProcessingUnit, Process): | |||
r889 | |||
|
r898 | throttle_value = 5 | |
|
r1114 | __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle', | |
r1135 | 'exp_code', 'web_server', 'buffering'] | ||
|
r898 | ||
r889 | def __init__(self, **kwargs): | ||
ProcessingUnit.__init__(self, **kwargs) | |||
Process.__init__(self) | |||
self.mp = False | |||
self.isConfig = False | |||
|
r906 | self.isWebConfig = False | |
r889 | self.connections = 0 | ||
server = kwargs.get('server', 'zmq.pipe') | |||
|
r1114 | web_server = kwargs.get('web_server', None) | |
r889 | if 'tcp://' in server: | ||
address = server | |||
else: | |||
address = 'ipc:///tmp/%s' % server | |||
self.address = address | |||
|
r1114 | self.web_address = web_server | |
self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')] | |||
r889 | self.realtime = kwargs.get('realtime', False) | ||
|
r1089 | self.localtime = kwargs.get('localtime', True) | |
r1135 | self.buffering = kwargs.get('buffering', True) | ||
r922 | self.throttle_value = kwargs.get('throttle', 5) | ||
|
r1114 | self.exp_code = kwargs.get('exp_code', None) | |
|
r898 | self.sendData = self.initThrottle(self.throttle_value) | |
|
r1062 | self.dates = [] | |
r889 | self.setup() | ||
def setup(self): | |||
r1135 | self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering) | ||
self.isConfig = True | |||
r889 | |||
def event_monitor(self, monitor): | |||
events = {} | |||
for name in dir(zmq): | |||
if name.startswith('EVENT_'): | |||
value = getattr(zmq, name) | |||
events[value] = name | |||
while monitor.poll(): | |||
evt = recv_monitor_message(monitor) | |||
if evt['event'] == 32: | |||
self.connections += 1 | |||
if evt['event'] == 512: | |||
pass | |||
r923 | |||
r889 | evt.update({'description': events[evt['event']]}) | ||
if evt['event'] == zmq.EVENT_MONITOR_STOPPED: | |||
break | |||
monitor.close() | |||
|
r1062 | print('event monitor thread done!') | |
r889 | |||
|
r898 | def initThrottle(self, throttle_value): | |
@throttle(seconds=throttle_value) | |||
def sendDataThrottled(fn_sender, data): | |||
fn_sender(data) | |||
return sendDataThrottled | |||
r889 | |||
def send(self, data): | |||
r1135 | log.log('Sending {}'.format(data), self.name) | ||
r889 | self.sender.send_pyobj(data) | ||
def run(self): | |||
r1135 | log.log( | ||
|
r1062 | 'Starting from {}'.format(self.address), | |
self.name | |||
) | |||
r889 | |||
self.context = zmq.Context() | |||
self.receiver = self.context.socket(zmq.PULL) | |||
self.receiver.bind(self.address) | |||
monitor = self.receiver.get_monitor_socket() | |||
self.sender = self.context.socket(zmq.PUB) | |||
|
r1114 | if self.web_address: | |
log.success( | |||
'Sending to web: {}'.format(self.web_address), | |||
self.name | |||
) | |||
|
r1161 | self.sender_web = self.context.socket(zmq.REQ) | |
|
r1114 | self.sender_web.connect(self.web_address) | |
|
r1161 | self.poll = zmq.Poller() | |
self.poll.register(self.sender_web, zmq.POLLIN) | |||
r916 | time.sleep(1) | ||
|
r938 | ||
|
r937 | if 'server' in self.kwargs: | |
self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server'])) | |||
else: | |||
self.sender.bind("ipc:///tmp/zmq.plots") | |||
r889 | |||
|
r1062 | time.sleep(2) | |
|
r938 | ||
|
r904 | t = Thread(target=self.event_monitor, args=(monitor,)) | |
r889 | t.start() | ||
while True: | |||
|
r1062 | dataOut = self.receiver.recv_pyobj() | |
|
r1094 | if not dataOut.flagNoData: | |
r1105 | if dataOut.type == 'Parameters': | ||
|
r1094 | tm = dataOut.utctimeInit | |
else: | |||
tm = dataOut.utctime | |||
if dataOut.useLocalTime: | |||
if not self.localtime: | |||
tm += time.timezone | |||
dt = datetime.datetime.fromtimestamp(tm).date() | |||
else: | |||
if self.localtime: | |||
tm -= time.timezone | |||
dt = datetime.datetime.utcfromtimestamp(tm).date() | |||
coerce = False | |||
if dt not in self.dates: | |||
if self.data: | |||
self.data.ended = True | |||
self.send(self.data) | |||
coerce = True | |||
self.data.setup() | |||
self.dates.append(dt) | |||
|
r931 | ||
r1105 | self.data.update(dataOut, tm) | ||
|
r1094 | ||
|
r1062 | if dataOut.finished is True: | |
r889 | self.connections -= 1 | ||
|
r1062 | if self.connections == 0 and dt in self.dates: | |
self.data.ended = True | |||
r889 | self.send(self.data) | ||
|
r1161 | # self.data.setup() | |
time.sleep(1) | |||
break | |||
r889 | else: | ||
if self.realtime: | |||
self.send(self.data) | |||
|
r1114 | if self.web_address: | |
|
r1161 | retries = 5 | |
while True: | |||
self.sender_web.send(self.data.jsonify()) | |||
socks = dict(self.poll.poll(5000)) | |||
if socks.get(self.sender_web) == zmq.POLLIN: | |||
reply = self.sender_web.recv_string() | |||
if reply == 'ok': | |||
|
r1166 | log.log("Response from server ok", self.name) | |
|
r1161 | break | |
else: | |||
|
r1166 | log.warning("Malformed reply from server: {}".format(reply), self.name) | |
|
r1161 | ||
else: | |||
|
r1166 | log.warning("No response from server, retrying...", self.name) | |
|
r1161 | self.sender_web.setsockopt(zmq.LINGER, 0) | |
self.sender_web.close() | |||
self.poll.unregister(self.sender_web) | |||
retries -= 1 | |||
if retries == 0: | |||
|
r1166 | log.error("Server seems to be offline, abandoning", self.name) | |
self.sender_web = self.context.socket(zmq.REQ) | |||
self.sender_web.connect(self.web_address) | |||
self.poll.register(self.sender_web, zmq.POLLIN) | |||
time.sleep(1) | |||
|
r1161 | break | |
self.sender_web = self.context.socket(zmq.REQ) | |||
self.sender_web.connect(self.web_address) | |||
self.poll.register(self.sender_web, zmq.POLLIN) | |||
time.sleep(1) | |||
|
r1094 | else: | |
self.sendData(self.send, self.data, coerce=coerce) | |||
coerce = False | |||
r889 | |||
return | |||
r1135 | |||
class SendToFTP(Operation, Process): | |||
''' | |||
Operation to send data over FTP. | |||
''' | |||
__attrs__ = ['server', 'username', 'password', 'patterns', 'timeout'] | |||
def __init__(self, **kwargs): | |||
''' | |||
patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...] | |||
''' | |||
Operation.__init__(self, **kwargs) | |||
Process.__init__(self) | |||
self.server = kwargs.get('server') | |||
self.username = kwargs.get('username') | |||
self.password = kwargs.get('password') | |||
self.patterns = kwargs.get('patterns') | |||
r1139 | self.timeout = kwargs.get('timeout', 30) | ||
r1135 | self.times = [time.time() for p in self.patterns] | ||
self.latest = ['' for p in self.patterns] | |||
self.mp = False | |||
self.ftp = None | |||
def setup(self): | |||
log.log('Connecting to ftp://{}'.format(self.server), self.name) | |||
try: | |||
self.ftp = ftplib.FTP(self.server, timeout=self.timeout) | |||
except ftplib.all_errors: | |||
log.error('Server connection fail: {}'.format(self.server), self.name) | |||
if self.ftp is not None: | |||
self.ftp.close() | |||
self.ftp = None | |||
self.isConfig = False | |||
return | |||
try: | |||
self.ftp.login(self.username, self.password) | |||
except ftplib.all_errors: | |||
log.error('The given username y/o password are incorrect', self.name) | |||
if self.ftp is not None: | |||
self.ftp.close() | |||
self.ftp = None | |||
self.isConfig = False | |||
return | |||
log.success('Connection success', self.name) | |||
self.isConfig = True | |||
return | |||
def check(self): | |||
try: | |||
self.ftp.voidcmd("NOOP") | |||
except: | |||
log.warning('Connection lost... trying to reconnect', self.name) | |||
if self.ftp is not None: | |||
self.ftp.close() | |||
self.ftp = None | |||
self.setup() | |||
def find_files(self, path, ext): | |||
files = glob.glob1(path, '*{}'.format(ext)) | |||
files.sort() | |||
if files: | |||
return files[-1] | |||
return None | |||
def getftpname(self, filename, exp_code, sub_exp_code): | |||
thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d') | |||
YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year | |||
DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday | |||
exp_code = '%3.3d'%exp_code | |||
sub_exp_code = '%2.2d'%sub_exp_code | |||
r1142 | plot_code = '%2.2d'% get_plot_code(filename) | ||
r1135 | name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png' | ||
return name | |||
def upload(self, src, dst): | |||
log.log('Uploading {} '.format(src), self.name, nl=False) | |||
fp = open(src, 'rb') | |||
command = 'STOR {}'.format(dst) | |||
try: | |||
self.ftp.storbinary(command, fp, blocksize=1024) | |||
|
r1167 | except Exception as e: | |
r1135 | log.error('{}'.format(e), self.name) | ||
if self.ftp is not None: | |||
self.ftp.close() | |||
self.ftp = None | |||
r1142 | return 0 | ||
r1135 | |||
try: | |||
self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst)) | |||
|
r1167 | except Exception as e: | |
r1135 | log.error('{}'.format(e), self.name) | ||
if self.ftp is not None: | |||
self.ftp.close() | |||
self.ftp = None | |||
r1142 | return 0 | ||
r1135 | |||
fp.close() | |||
log.success('OK', tag='') | |||
r1142 | return 1 | ||
r1135 | |||
def send_files(self): | |||
for x, pattern in enumerate(self.patterns): | |||
local, remote, ext, delay, exp_code, sub_exp_code = pattern | |||
if time.time()-self.times[x] >= delay: | |||
r1142 | srcname = self.find_files(local, ext) | ||
src = os.path.join(local, srcname) | |||
if os.path.getmtime(src) < time.time() - 30*60: | |||
continue | |||
r1135 | if srcname is None or srcname == self.latest[x]: | ||
continue | |||
if 'png' in ext: | |||
dstname = self.getftpname(srcname, exp_code, sub_exp_code) | |||
else: | |||
r1142 | dstname = srcname | ||
r1135 | |||
dst = os.path.join(remote, dstname) | |||
r1142 | if self.upload(src, dst): | ||
self.times[x] = time.time() | |||
self.latest[x] = srcname | |||
else: | |||
self.isConfig = False | |||
break | |||
r1135 | |||
def run(self): | |||
while True: | |||
if not self.isConfig: | |||
self.setup() | |||
if self.ftp is not None: | |||
self.check() | |||
r1142 | self.send_files() | ||
r1139 | time.sleep(10) | ||
r1135 | |||
def close(): | |||
if self.ftp is not None: | |||
r1139 | self.ftp.close() | ||
|
r1167 | self.terminate() |