##// END OF EJS Templates
Merge with px1000 branch
Merge with px1000 branch

File last commit:

r1164:80b8ab8c0c11 merge
r1164:80b8ab8c0c11 merge
Show More
jroutils_publish.py
864 lines | 28.1 KiB | text/x-python | PythonLexer
Juan C. Valdez
Add Operation publish using MQTT
r859 '''
@author: Juan C. Espinoza
'''
New Operation SendToFTP
r1135 import os
import glob
Juan C. Valdez
Add Operation publish using MQTT
r859 import time
import json
import numpy
import paho.mqtt.client as mqtt
Juan C. Valdez
zmq support in PublishData
r883 import zmq
import datetime
New Operation SendToFTP
r1135 import ftplib
Juan C. Valdez
zmq support in PublishData
r883 from zmq.utils.monitor import recv_monitor_message
from functools import wraps
from threading import Thread
from multiprocessing import Process
Juan C. Valdez
Add Operation publish using MQTT
r859
Juan C. Valdez
zmq support in PublishData
r883 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
Juan C. Espinoza
Now there are two receiver units one for data and one for plots
r957 from schainpy.model.data.jrodata import JROData
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 from schainpy.utils import log
Juan C. Valdez
zmq support in PublishData
r883
New Operation SendToFTP
r1135 MAXNUMX = 500
MAXNUMY = 500
PLOT_CODES = {
Update plot codes, add meta attribute to dataOut to send metadata to plots
r1139 'rti': 0, # Range time intensity (RTI).
'spc': 1, # Spectra (and Cross-spectra) information.
'cspc': 2, # Cross-Correlation information.
'coh': 3, # Coherence map.
'base': 4, # Base lines graphic.
'row': 5, # Row Spectra.
Fix SendByFTP and add plot codes
r1142 'total': 6, # Total Power.
'drift': 7, # Drifts graphics.
'height': 8, # Height profile.
'phase': 9, # Signal Phase.
'power': 16,
'noise': 17,
'beacon': 18,
'wind': 22,
'skymap': 23,
'Unknown': 24,
'V-E': 25, # PIP Velocity.
'Z-E': 26, # PIP Reflectivity.
'V-A': 27, # RHI Velocity.
'Z-A': 28, # RHI Reflectivity.
New Operation SendToFTP
r1135 }
Juan C. Valdez
Add Operation publish using MQTT
r859
Fix SendByFTP and add plot codes
r1142 def get_plot_code(s):
label = s.split('_')[0]
codes = [key for key in PLOT_CODES if key in label]
if codes:
return PLOT_CODES[codes[0]]
else:
return 24
Juan C. Valdez
Add Operation publish using MQTT
r859
Juan C. Valdez
merge from graphics branch
r863 def roundFloats(obj):
if isinstance(obj, list):
return map(roundFloats, obj)
elif isinstance(obj, float):
return round(obj, 2)
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
José Chávez
finishing day, need testing
r931 def decimate(z, MAXNUMY):
José Chávez
receiver data modificado para web
r904 dy = int(len(z[0])/MAXNUMY) + 1
Fix SendToWeb operation
r911
José Chávez
receiver data modificado para web
r904 return z[::, ::dy]
Juan C. Valdez
Add Operation publish using MQTT
r859
Juan C. Valdez
zmq support in PublishData
r883 class throttle(object):
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 '''
Decorator that prevents a function from being called more than once every
Juan C. Valdez
zmq support in PublishData
r883 time period.
To create a function that cannot be called more than once a minute, but
will sleep until it can be called:
@throttle(minutes=1)
def foo():
pass
for i in range(10):
foo()
print "This function has run %s times." % i
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 '''
Juan C. Valdez
zmq support in PublishData
r883
def __init__(self, seconds=0, minutes=0, hours=0):
self.throttle_period = datetime.timedelta(
seconds=seconds, minutes=minutes, hours=hours
)
José Chávez
funcionando todo
r898
Juan C. Valdez
zmq support in PublishData
r883 self.time_of_last_call = datetime.datetime.min
def __call__(self, fn):
@wraps(fn)
def wrapper(*args, **kwargs):
Juan C. Espinoza
Fix time when publishing Parameters
r1094 coerce = kwargs.pop('coerce', None)
if coerce:
self.time_of_last_call = datetime.datetime.now()
return fn(*args, **kwargs)
else:
now = datetime.datetime.now()
time_since_last_call = now - self.time_of_last_call
time_left = self.throttle_period - time_since_last_call
Juan C. Valdez
zmq support in PublishData
r883
Juan C. Espinoza
Fix time when publishing Parameters
r1094 if time_left > datetime.timedelta(seconds=0):
return
Juan C. Valdez
zmq support in PublishData
r883
self.time_of_last_call = datetime.datetime.now()
return fn(*args, **kwargs)
return wrapper
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 class Data(object):
'''
Object to hold data to be plotted
'''
New Operation SendToFTP
r1135 def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 self.plottypes = plottypes
self.throttle = throttle_value
Juan C. Espinoza
Fix publish to web_server
r1114 self.exp_code = exp_code
New Operation SendToFTP
r1135 self.buffering = buffering
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 self.ended = False
Juan C. Espinoza
Add localtime anf fix pause figs
r1089 self.localtime = False
Update plot codes, add meta attribute to dataOut to send metadata to plots
r1139 self.meta = {}
Juan C. Espinoza
Add localtime anf fix pause figs
r1089 self.__times = []
self.__heights = []
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062
def __str__(self):
dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
def __len__(self):
return len(self.__times)
def __getitem__(self, key):
if key not in self.data:
raise KeyError(log.error('Missing key: {}'.format(key)))
New Operation SendToFTP
r1135 if 'spc' in key or not self.buffering:
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 ret = self.data[key]
else:
ret = numpy.array([self.data[key][x] for x in self.times])
if ret.ndim > 1:
ret = numpy.swapaxes(ret, 0, 1)
return ret
Juan C. Espinoza
Fix time when publishing Parameters
r1094 def __contains__(self, key):
return key in self.data
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 def setup(self):
'''
Configure object
'''
New Operation SendToFTP
r1135 self.type = ''
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 self.ended = False
self.data = {}
self.__times = []
self.__heights = []
self.__all_heights = set()
for plot in self.plottypes:
MADReader support for HDF5 (mad2 & mad3)
r1065 if 'snr' in plot:
plot = 'snr'
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 self.data[plot] = {}
def shape(self, key):
'''
Get the shape of the one-element data for the given key
'''
if len(self.data[key]):
New Operation SendToFTP
r1135 if 'spc' in key or not self.buffering:
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 return self.data[key].shape
return self.data[key][self.__times[0]].shape
return (0,)
Julia Reader Done!! Task #1085
r1105 def update(self, dataOut, tm):
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 '''
Update data object with new dataOut
'''
Julia Reader Done!! Task #1085
r1105
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if tm in self.__times:
return
New Operation SendToFTP
r1135 self.type = dataOut.type
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 self.parameters = getattr(dataOut, 'parameters', [])
Juan C. Espinoza
Move schaincli to schainpy.cli
r1113 if hasattr(dataOut, 'pairsList'):
self.pairs = dataOut.pairsList
Update plot codes, add meta attribute to dataOut to send metadata to plots
r1139 if hasattr(dataOut, 'meta'):
self.meta = dataOut.meta
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 self.channels = dataOut.channelList
self.interval = dataOut.getTimeInterval()
Juan C. Espinoza
Add localtime anf fix pause figs
r1089 self.localtime = dataOut.useLocalTime
MADReader support for HDF5 (mad2 & mad3)
r1065 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
Juan C. Espinoza
Add new events to PlotData, fix utc times
r1087 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 self.__heights.append(dataOut.heightList)
self.__all_heights.update(dataOut.heightList)
self.__times.append(tm)
for plot in self.plottypes:
if plot == 'spc':
z = dataOut.data_spc/dataOut.normFactor
Juan C. Espinoza
Merge with px1000 branch
r1164 buffer = 10*numpy.log10(z)
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if plot == 'cspc':
Juan C. Espinoza
Merge with px1000 branch
r1164 buffer = dataOut.data_cspc
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if plot == 'noise':
New Operation SendToFTP
r1135 buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if plot == 'rti':
New Operation SendToFTP
r1135 buffer = dataOut.getPower()
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if plot == 'snr_db':
Juan C. Espinoza
Merge with px1000 branch
r1164 buffer = dataOut.data_SNR
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if plot == 'snr':
New Operation SendToFTP
r1135 buffer = 10*numpy.log10(dataOut.data_SNR)
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if plot == 'dop':
New Operation SendToFTP
r1135 buffer = 10*numpy.log10(dataOut.data_DOP)
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if plot == 'mean':
New Operation SendToFTP
r1135 buffer = dataOut.data_MEAN
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if plot == 'std':
New Operation SendToFTP
r1135 buffer = dataOut.data_STD
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if plot == 'coh':
New Operation SendToFTP
r1135 buffer = dataOut.getCoherence()
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if plot == 'phase':
New Operation SendToFTP
r1135 buffer = dataOut.getCoherence(phase=True)
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if plot == 'output':
New Operation SendToFTP
r1135 buffer = dataOut.data_output
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if plot == 'param':
New Operation SendToFTP
r1135 buffer = dataOut.data_param
Juan C. Espinoza
Merge with px1000 branch
r1164 if 'spc' in plot:
New Operation SendToFTP
r1135 self.data[plot] = buffer
Juan C. Espinoza
Merge with px1000 branch
r1164 else:
if self.buffering:
self.data[plot][tm] = buffer
else:
self.data[plot] = buffer
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062
def normalize_heights(self):
'''
Ensure same-dimension of the data for different heighList
'''
H = numpy.array(list(self.__all_heights))
H.sort()
for key in self.data:
shape = self.shape(key)[:-1] + H.shape
for tm, obj in self.data[key].items():
h = self.__heights[self.__times.index(tm)]
if H.size == h.size:
continue
index = numpy.where(numpy.in1d(H, h))[0]
dummy = numpy.zeros(shape) + numpy.nan
if len(shape) == 2:
dummy[:, index] = obj
else:
dummy[index] = obj
self.data[key][tm] = dummy
self.__heights = [H for tm in self.__times]
def jsonify(self, decimate=False):
'''
Convert data to json
'''
Juan C. Espinoza
Fix publish to web_server
r1114 data = {}
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 tm = self.times[-1]
Juan C. Espinoza
Change PUB/SUB to REQ/REP for send data to web server
r1161 dy = int(self.heights.size/MAXNUMY) + 1
Juan C. Espinoza
Fix publish to web_server
r1114 for key in self.data:
New Operation SendToFTP
r1135 if key in ('spc', 'cspc') or not self.buffering:
Add decimation to jsonify data to web & fix typo KM
r1122 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 else:
Juan C. Espinoza
Fix publish to web_server
r1114 data[key] = roundFloats(self.data[key][tm].tolist())
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062
Juan C. Espinoza
Fix publish to web_server
r1114 ret = {'data': data}
ret['exp_code'] = self.exp_code
ret['time'] = tm
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 ret['interval'] = self.interval
Add decimation to jsonify data to web & fix typo KM
r1122 ret['localtime'] = self.localtime
Juan C. Espinoza
Change PUB/SUB to REQ/REP for send data to web server
r1161 ret['yrange'] = roundFloats(self.heights[::dy].tolist())
if 'spc' in self.data or 'cspc' in self.data:
Add decimation to jsonify data to web & fix typo KM
r1122 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
Add default values in jsonify Data
r1127 else:
ret['xrange'] = []
Add decimation to jsonify data to web & fix typo KM
r1122 if hasattr(self, 'pairs'):
ret['pairs'] = self.pairs
Add default values in jsonify Data
r1127 else:
ret['pairs'] = []
Update plot codes, add meta attribute to dataOut to send metadata to plots
r1139
Juan C. Espinoza
Merge with px1000 branch
r1164 for key, value in self.meta.items():
Update plot codes, add meta attribute to dataOut to send metadata to plots
r1139 ret[key] = value
Juan C. Espinoza
Fix publish to web_server
r1114 return json.dumps(ret)
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062
@property
def times(self):
'''
Return the list of times of the current data
'''
ret = numpy.array(self.__times)
ret.sort()
return ret
@property
def heights(self):
'''
Return the list of heights of the current data
'''
return numpy.array(self.__heights[-1])
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
Add Operation publish using MQTT
r859 class PublishData(Operation):
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 '''
Operation to send data over zmq.
'''
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Espinoza
Add __attrs__ attribute to Process clasess to improve CLI finder
r1097 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
Juan C. Valdez
zmq support in PublishData
r883 def __init__(self, **kwargs):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 """Inicio."""
Juan C. Valdez
zmq support in PublishData
r883 Operation.__init__(self, **kwargs)
Juan C. Valdez
Add Operation publish using MQTT
r859 self.isConfig = False
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 self.client = None
Juan C. Valdez
zmq support in PublishData
r883 self.zeromq = None
self.mqtt = None
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
def on_disconnect(self, client, userdata, rc):
Juan C. Valdez
Add Operation publish using MQTT
r859 if rc != 0:
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 log.warning('Unexpected disconnection.')
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 self.connect()
def connect(self):
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 log.warning('trying to connect')
Juan C. Valdez
Add Operation publish using MQTT
r859 try:
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 self.client.connect(
host=self.host,
port=self.port,
keepalive=60*10,
bind_address='')
self.client.loop_start()
# self.client.publish(
# self.topic + 'SETUP',
# json.dumps(setup),
# retain=True
# )
Juan C. Valdez
Add Operation publish using MQTT
r859 except:
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 log.error('MQTT Conection error.')
Juan C. Valdez
Add Operation publish using MQTT
r859 self.client = False
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
José Chávez
finishing day, need testing
r931 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
Juan C. Valdez
zmq support in PublishData
r883 self.counter = 0
Juan C. Valdez
Add Operation publish using MQTT
r859 self.topic = kwargs.get('topic', 'schain')
self.delay = kwargs.get('delay', 0)
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 self.plottype = kwargs.get('plottype', 'spectra')
Juan C. Valdez
zmq support in PublishData
r883 self.host = kwargs.get('host', "10.10.10.82")
self.port = kwargs.get('port', 3000)
Juan C. Valdez
merge from graphics branch
r863 self.clientId = clientId
Juan C. Valdez
Add Operation publish using MQTT
r859 self.cnt = 0
Juan C. Valdez
zmq support in PublishData
r883 self.zeromq = zeromq
self.mqtt = kwargs.get('plottype', 0)
self.client = None
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 self.verbose = verbose
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 setup = []
Juan C. Valdez
zmq support in PublishData
r883 if mqtt is 1:
self.client = mqtt.Client(
client_id=self.clientId + self.topic + 'SCHAIN',
clean_session=True)
self.client.on_disconnect = self.on_disconnect
self.connect()
for plot in self.plottype:
setup.append({
'plot': plot,
'topic': self.topic + plot,
'title': getattr(self, plot + '_' + 'title', False),
'xlabel': getattr(self, plot + '_' + 'xlabel', False),
'ylabel': getattr(self, plot + '_' + 'ylabel', False),
'xrange': getattr(self, plot + '_' + 'xrange', False),
'yrange': getattr(self, plot + '_' + 'yrange', False),
'zrange': getattr(self, plot + '_' + 'zrange', False),
})
if zeromq is 1:
context = zmq.Context()
self.zmq_socket = context.socket(zmq.PUSH)
server = kwargs.get('server', 'zmq.pipe')
ReceiverData Operation, test PlotData
r889
Juan C. Valdez
fix zmq protocol
r886 if 'tcp://' in server:
Juan C. Valdez
zmq support in PublishData
r883 address = server
else:
address = 'ipc:///tmp/%s' % server
ReceiverData Operation, test PlotData
r889
Juan C. Valdez
zmq support in PublishData
r883 self.zmq_socket.connect(address)
time.sleep(1)
José Chávez
finishing day, need testing
r931
Juan C. Valdez
zmq support in PublishData
r883 def publish_data(self):
self.dataOut.finished = False
if self.mqtt is 1:
yData = self.dataOut.heightList[:2].tolist()
if self.plottype == 'spectra':
data = getattr(self.dataOut, 'data_spc')
z = data/self.dataOut.normFactor
zdB = 10*numpy.log10(z)
xlen, ylen = zdB[0].shape
ReceiverData Operation, test PlotData
r889 dx = int(xlen/MAXNUMX) + 1
dy = int(ylen/MAXNUMY) + 1
Juan C. Valdez
zmq support in PublishData
r883 Z = [0 for i in self.dataOut.channelList]
for i in self.dataOut.channelList:
Z[i] = zdB[i][::dx, ::dy].tolist()
payload = {
'timestamp': self.dataOut.utctime,
'data': roundFloats(Z),
'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
'interval': self.dataOut.getTimeInterval(),
'type': self.plottype,
'yData': yData
}
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
zmq support in PublishData
r883 elif self.plottype in ('rti', 'power'):
data = getattr(self.dataOut, 'data_spc')
z = data/self.dataOut.normFactor
avg = numpy.average(z, axis=1)
avgdB = 10*numpy.log10(avg)
xlen, ylen = z[0].shape
dy = numpy.floor(ylen/self.__MAXNUMY) + 1
AVG = [0 for i in self.dataOut.channelList]
for i in self.dataOut.channelList:
AVG[i] = avgdB[i][::dy].tolist()
payload = {
'timestamp': self.dataOut.utctime,
'data': roundFloats(AVG),
'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
'interval': self.dataOut.getTimeInterval(),
'type': self.plottype,
'yData': yData
}
elif self.plottype == 'noise':
noise = self.dataOut.getNoise()/self.dataOut.normFactor
noisedB = 10*numpy.log10(noise)
payload = {
'timestamp': self.dataOut.utctime,
'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
'interval': self.dataOut.getTimeInterval(),
'type': self.plottype,
'yData': yData
}
elif self.plottype == 'snr':
data = getattr(self.dataOut, 'data_SNR')
avgdB = 10*numpy.log10(data)
ylen = data[0].size
dy = numpy.floor(ylen/self.__MAXNUMY) + 1
AVG = [0 for i in self.dataOut.channelList]
for i in self.dataOut.channelList:
AVG[i] = avgdB[i][::dy].tolist()
payload = {
'timestamp': self.dataOut.utctime,
'data': roundFloats(AVG),
'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
'type': self.plottype,
'yData': yData
}
else:
print "Tipo de grafico invalido"
payload = {
'data': 'None',
'timestamp': 'None',
'type': None
}
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062
Juan C. Valdez
zmq support in PublishData
r883 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
if self.zeromq is 1:
José Chávez
finishing day, need testing
r931 if self.verbose:
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 log.log(
Juan C. Espinoza
Add localtime anf fix pause figs
r1089 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
self.name
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 )
Juan C. Valdez
zmq support in PublishData
r883 self.zmq_socket.send_pyobj(self.dataOut)
def run(self, dataOut, **kwargs):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 self.dataOut = dataOut
if not self.isConfig:
Juan C. Valdez
zmq support in PublishData
r883 self.setup(**kwargs)
Juan C. Valdez
Add Operation publish using MQTT
r859 self.isConfig = True
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
zmq support in PublishData
r883 self.publish_data()
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 time.sleep(self.delay)
Juan C. Valdez
Add Operation publish using MQTT
r859 def close(self):
Juan C. Valdez
zmq support in PublishData
r883 if self.zeromq is 1:
self.dataOut.finished = True
José Chávez
ningun cambio
r927 self.zmq_socket.send_pyobj(self.dataOut)
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 time.sleep(0.1)
José Chávez
finishing day, need testing
r931 self.zmq_socket.close()
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860 if self.client:
self.client.loop_stop()
self.client.disconnect()
ReceiverData Operation, test PlotData
r889
Juan C. Espinoza
Now there are two receiver units one for data and one for plots
r957
class ReceiverData(ProcessingUnit):
Juan C. Espinoza
Add __attrs__ attribute to Process clasess to improve CLI finder
r1097 __attrs__ = ['server']
Juan C. Espinoza
Now there are two receiver units one for data and one for plots
r957 def __init__(self, **kwargs):
ProcessingUnit.__init__(self, **kwargs)
self.isConfig = False
server = kwargs.get('server', 'zmq.pipe')
if 'tcp://' in server:
address = server
else:
address = 'ipc:///tmp/%s' % server
self.address = address
self.dataOut = JROData()
def setup(self):
self.context = zmq.Context()
self.receiver = self.context.socket(zmq.PULL)
self.receiver.bind(self.address)
time.sleep(0.5)
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 log.success('ReceiverData from {}'.format(self.address))
Juan C. Espinoza
Now there are two receiver units one for data and one for plots
r957
def run(self):
if not self.isConfig:
self.setup()
self.isConfig = True
self.dataOut = self.receiver.recv_pyobj()
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 log.log('{} - {}'.format(self.dataOut.type,
self.dataOut.datatime.ctime(),),
'Receiving')
Juan C. Espinoza
Now there are two receiver units one for data and one for plots
r957
class PlotterReceiver(ProcessingUnit, Process):
ReceiverData Operation, test PlotData
r889
José Chávez
funcionando todo
r898 throttle_value = 5
Juan C. Espinoza
Fix publish to web_server
r1114 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
New Operation SendToFTP
r1135 'exp_code', 'web_server', 'buffering']
José Chávez
funcionando todo
r898
ReceiverData Operation, test PlotData
r889 def __init__(self, **kwargs):
ProcessingUnit.__init__(self, **kwargs)
Process.__init__(self)
self.mp = False
self.isConfig = False
Juan C. Espinoza
Update version, fix kwargs for self operations (methods), Add SendToWeb...
r906 self.isWebConfig = False
ReceiverData Operation, test PlotData
r889 self.connections = 0
server = kwargs.get('server', 'zmq.pipe')
Juan C. Espinoza
Fix publish to web_server
r1114 web_server = kwargs.get('web_server', None)
ReceiverData Operation, test PlotData
r889 if 'tcp://' in server:
address = server
else:
address = 'ipc:///tmp/%s' % server
self.address = address
Juan C. Espinoza
Fix publish to web_server
r1114 self.web_address = web_server
self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
ReceiverData Operation, test PlotData
r889 self.realtime = kwargs.get('realtime', False)
Juan C. Espinoza
Add localtime anf fix pause figs
r1089 self.localtime = kwargs.get('localtime', True)
New Operation SendToFTP
r1135 self.buffering = kwargs.get('buffering', True)
Fix all PlotData, add SpectraMean, CrossSpectra plots, now Parameters extends Spectra fix bugs in ParametersProc
r922 self.throttle_value = kwargs.get('throttle', 5)
Juan C. Espinoza
Fix publish to web_server
r1114 self.exp_code = kwargs.get('exp_code', None)
José Chávez
funcionando todo
r898 self.sendData = self.initThrottle(self.throttle_value)
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 self.dates = []
ReceiverData Operation, test PlotData
r889 self.setup()
def setup(self):
New Operation SendToFTP
r1135 self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
self.isConfig = True
ReceiverData Operation, test PlotData
r889
def event_monitor(self, monitor):
events = {}
for name in dir(zmq):
if name.startswith('EVENT_'):
value = getattr(zmq, name)
events[value] = name
while monitor.poll():
evt = recv_monitor_message(monitor)
if evt['event'] == 32:
self.connections += 1
if evt['event'] == 512:
pass
Add WindProfiler support, multiSchain by_day
r923
ReceiverData Operation, test PlotData
r889 evt.update({'description': events[evt['event']]})
if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
break
monitor.close()
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 print('event monitor thread done!')
ReceiverData Operation, test PlotData
r889
José Chávez
funcionando todo
r898 def initThrottle(self, throttle_value):
@throttle(seconds=throttle_value)
def sendDataThrottled(fn_sender, data):
fn_sender(data)
return sendDataThrottled
ReceiverData Operation, test PlotData
r889
def send(self, data):
New Operation SendToFTP
r1135 log.log('Sending {}'.format(data), self.name)
ReceiverData Operation, test PlotData
r889 self.sender.send_pyobj(data)
def run(self):
New Operation SendToFTP
r1135 log.log(
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 'Starting from {}'.format(self.address),
self.name
)
ReceiverData Operation, test PlotData
r889
self.context = zmq.Context()
self.receiver = self.context.socket(zmq.PULL)
self.receiver.bind(self.address)
monitor = self.receiver.get_monitor_socket()
self.sender = self.context.socket(zmq.PUB)
Juan C. Espinoza
Fix publish to web_server
r1114 if self.web_address:
log.success(
'Sending to web: {}'.format(self.web_address),
self.name
)
Juan C. Espinoza
Change PUB/SUB to REQ/REP for send data to web server
r1161 self.sender_web = self.context.socket(zmq.REQ)
Juan C. Espinoza
Fix publish to web_server
r1114 self.sender_web.connect(self.web_address)
Juan C. Espinoza
Change PUB/SUB to REQ/REP for send data to web server
r1161 self.poll = zmq.Poller()
self.poll.register(self.sender_web, zmq.POLLIN)
cambio bind x connect in ReceiverData
r916 time.sleep(1)
Juan C. Espinoza
fixing merge
r938
Juan C. Espinoza
Add SkyMapPlotData, operation can access parent kwargs, fix server plot for multiple ReceiverData
r937 if 'server' in self.kwargs:
self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
else:
self.sender.bind("ipc:///tmp/zmq.plots")
ReceiverData Operation, test PlotData
r889
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 time.sleep(2)
Juan C. Espinoza
fixing merge
r938
José Chávez
receiver data modificado para web
r904 t = Thread(target=self.event_monitor, args=(monitor,))
ReceiverData Operation, test PlotData
r889 t.start()
while True:
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 dataOut = self.receiver.recv_pyobj()
Juan C. Espinoza
Fix time when publishing Parameters
r1094 if not dataOut.flagNoData:
Julia Reader Done!! Task #1085
r1105 if dataOut.type == 'Parameters':
Juan C. Espinoza
Fix time when publishing Parameters
r1094 tm = dataOut.utctimeInit
else:
tm = dataOut.utctime
if dataOut.useLocalTime:
if not self.localtime:
tm += time.timezone
dt = datetime.datetime.fromtimestamp(tm).date()
else:
if self.localtime:
tm -= time.timezone
dt = datetime.datetime.utcfromtimestamp(tm).date()
coerce = False
if dt not in self.dates:
if self.data:
self.data.ended = True
self.send(self.data)
coerce = True
self.data.setup()
self.dates.append(dt)
José Chávez
finishing day, need testing
r931
Julia Reader Done!! Task #1085
r1105 self.data.update(dataOut, tm)
Juan C. Espinoza
Fix time when publishing Parameters
r1094
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if dataOut.finished is True:
ReceiverData Operation, test PlotData
r889 self.connections -= 1
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 if self.connections == 0 and dt in self.dates:
self.data.ended = True
ReceiverData Operation, test PlotData
r889 self.send(self.data)
Juan C. Espinoza
Change PUB/SUB to REQ/REP for send data to web server
r1161 # self.data.setup()
time.sleep(1)
break
ReceiverData Operation, test PlotData
r889 else:
if self.realtime:
self.send(self.data)
Juan C. Espinoza
Fix publish to web_server
r1114 if self.web_address:
Juan C. Espinoza
Change PUB/SUB to REQ/REP for send data to web server
r1161 retries = 5
while True:
self.sender_web.send(self.data.jsonify())
socks = dict(self.poll.poll(5000))
if socks.get(self.sender_web) == zmq.POLLIN:
reply = self.sender_web.recv_string()
if reply == 'ok':
break
else:
print("Malformed reply from server: %s" % reply)
else:
print("No response from server, retrying...")
self.sender_web.setsockopt(zmq.LINGER, 0)
self.sender_web.close()
self.poll.unregister(self.sender_web)
retries -= 1
if retries == 0:
print("Server seems to be offline, abandoning")
break
self.sender_web = self.context.socket(zmq.REQ)
self.sender_web.connect(self.web_address)
self.poll.register(self.sender_web, zmq.POLLIN)
time.sleep(1)
Juan C. Espinoza
Fix time when publishing Parameters
r1094 else:
self.sendData(self.send, self.data, coerce=coerce)
coerce = False
ReceiverData Operation, test PlotData
r889
return
New Operation SendToFTP
r1135
class SendToFTP(Operation, Process):
'''
Operation to send data over FTP.
'''
__attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
def __init__(self, **kwargs):
'''
patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
'''
Operation.__init__(self, **kwargs)
Process.__init__(self)
self.server = kwargs.get('server')
self.username = kwargs.get('username')
self.password = kwargs.get('password')
self.patterns = kwargs.get('patterns')
Update plot codes, add meta attribute to dataOut to send metadata to plots
r1139 self.timeout = kwargs.get('timeout', 30)
New Operation SendToFTP
r1135 self.times = [time.time() for p in self.patterns]
self.latest = ['' for p in self.patterns]
self.mp = False
self.ftp = None
def setup(self):
log.log('Connecting to ftp://{}'.format(self.server), self.name)
try:
self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
except ftplib.all_errors:
log.error('Server connection fail: {}'.format(self.server), self.name)
if self.ftp is not None:
self.ftp.close()
self.ftp = None
self.isConfig = False
return
try:
self.ftp.login(self.username, self.password)
except ftplib.all_errors:
log.error('The given username y/o password are incorrect', self.name)
if self.ftp is not None:
self.ftp.close()
self.ftp = None
self.isConfig = False
return
log.success('Connection success', self.name)
self.isConfig = True
return
def check(self):
try:
self.ftp.voidcmd("NOOP")
except:
log.warning('Connection lost... trying to reconnect', self.name)
if self.ftp is not None:
self.ftp.close()
self.ftp = None
self.setup()
def find_files(self, path, ext):
files = glob.glob1(path, '*{}'.format(ext))
files.sort()
if files:
return files[-1]
return None
def getftpname(self, filename, exp_code, sub_exp_code):
thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
exp_code = '%3.3d'%exp_code
sub_exp_code = '%2.2d'%sub_exp_code
Fix SendByFTP and add plot codes
r1142 plot_code = '%2.2d'% get_plot_code(filename)
New Operation SendToFTP
r1135 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
return name
def upload(self, src, dst):
log.log('Uploading {} '.format(src), self.name, nl=False)
fp = open(src, 'rb')
command = 'STOR {}'.format(dst)
try:
self.ftp.storbinary(command, fp, blocksize=1024)
Fix SendByFTP and add plot codes
r1142 except Exception, e:
New Operation SendToFTP
r1135 log.error('{}'.format(e), self.name)
if self.ftp is not None:
self.ftp.close()
self.ftp = None
Fix SendByFTP and add plot codes
r1142 return 0
New Operation SendToFTP
r1135
try:
self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
Fix SendByFTP and add plot codes
r1142 except Exception, e:
New Operation SendToFTP
r1135 log.error('{}'.format(e), self.name)
if self.ftp is not None:
self.ftp.close()
self.ftp = None
Fix SendByFTP and add plot codes
r1142 return 0
New Operation SendToFTP
r1135
fp.close()
log.success('OK', tag='')
Fix SendByFTP and add plot codes
r1142 return 1
New Operation SendToFTP
r1135
def send_files(self):
for x, pattern in enumerate(self.patterns):
local, remote, ext, delay, exp_code, sub_exp_code = pattern
if time.time()-self.times[x] >= delay:
Fix SendByFTP and add plot codes
r1142 srcname = self.find_files(local, ext)
src = os.path.join(local, srcname)
if os.path.getmtime(src) < time.time() - 30*60:
continue
New Operation SendToFTP
r1135 if srcname is None or srcname == self.latest[x]:
continue
if 'png' in ext:
dstname = self.getftpname(srcname, exp_code, sub_exp_code)
else:
Fix SendByFTP and add plot codes
r1142 dstname = srcname
New Operation SendToFTP
r1135
dst = os.path.join(remote, dstname)
Fix SendByFTP and add plot codes
r1142 if self.upload(src, dst):
self.times[x] = time.time()
self.latest[x] = srcname
else:
self.isConfig = False
break
New Operation SendToFTP
r1135
def run(self):
while True:
if not self.isConfig:
self.setup()
if self.ftp is not None:
self.check()
Fix SendByFTP and add plot codes
r1142 self.send_files()
Update plot codes, add meta attribute to dataOut to send metadata to plots
r1139 time.sleep(10)
New Operation SendToFTP
r1135
def close():
if self.ftp is not None:
Update plot codes, add meta attribute to dataOut to send metadata to plots
r1139 self.ftp.close()
New Operation SendToFTP
r1135 self.terminate()