##// END OF EJS Templates
New plotting architecture with buffering/throttle capabilities
New plotting architecture with buffering/throttle capabilities

File last commit:

r1187:66a3db7e736d
r1187:66a3db7e736d
Show More
jroutils_publish.py
308 lines | 8.6 KiB | text/x-python | PythonLexer
Juan C. Valdez
Add Operation publish using MQTT
r859 '''
@author: Juan C. Espinoza
'''
New Operation SendToFTP
r1135 import os
import glob
Juan C. Valdez
Add Operation publish using MQTT
r859 import time
import json
import numpy
Juan C. Valdez
zmq support in PublishData
r883 import zmq
import datetime
New Operation SendToFTP
r1135 import ftplib
Juan C. Valdez
zmq support in PublishData
r883 from functools import wraps
from threading import Thread
from multiprocessing import Process
Juan C. Valdez
Add Operation publish using MQTT
r859
Juan C. Valdez
zmq support in PublishData
r883 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
Juan C. Espinoza
Now there are two receiver units one for data and one for plots
r957 from schainpy.model.data.jrodata import JROData
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 from schainpy.utils import log
Juan C. Valdez
zmq support in PublishData
r883
New Operation SendToFTP
# Module-level size limits (500 each). Presumably the maximum number of points
# kept per axis when decimating data for plots -- TODO confirm against callers.
MAXNUMX = 500
MAXNUMY = 500

# Numeric code of every known plot type. get_plot_code() maps a file-name
# label to one of these values.
PLOT_CODES = {
    'rti': 0,            # Range time intensity (RTI).
    'spc': 1,            # Spectra (and Cross-spectra) information.
    'cspc': 2,           # Cross-Correlation information.
    'coh': 3,            # Coherence map.
    'base': 4,           # Base lines graphic.
    'row': 5,            # Row Spectra.
    'total': 6,          # Total Power.
    'drift': 7,          # Drifts graphics.
    'height': 8,         # Height profile.
    'phase': 9,          # Signal Phase.
    'power': 16,
    'noise': 17,
    'beacon': 18,
    'wind': 22,
    'skymap': 23,
    'Unknown': 24,
    'V-E': 25,           # PIP Velocity.
    'Z-E': 26,           # PIP Reflectivity.
    'V-A': 27,           # RHI Velocity.
    'Z-A': 28,           # RHI Reflectivity.
}


def get_plot_code(s):
    '''
    Return the numeric plot code for the file name (or label) `s`.

    Only the text before the first underscore is inspected. A key of
    PLOT_CODES matches when it is contained in that label (case-sensitive).
    When several keys match, the longest one wins so that a more specific
    key is not shadowed by a shorter key it contains (e.g. 'cspc' must not
    be reported as 'spc'). Ties keep the declaration order of PLOT_CODES.

    Returns PLOT_CODES['Unknown'] (24) when no key matches.
    '''
    label = s.split('_')[0]
    matches = [key for key in PLOT_CODES if key in label]
    if not matches:
        return PLOT_CODES['Unknown']
    # max() returns the first of equally long keys, preserving the previous
    # "first declared wins" behaviour for non-nested matches.
    return PLOT_CODES[max(matches, key=len)]
Juan C. Valdez
Add Operation publish using MQTT
r859
José Chávez
finishing day, need testing
def decimate(z, MAXNUMY):
    '''
    Thin out the second axis of `z` by keeping every dy-th column.

    Parameters:
        z       : two-dimensional array supporting `z[:, ::step]` slicing
                  (e.g. a numpy array); `len(z[0])` is taken as the number
                  of columns.
        MAXNUMY : positive number used to derive the step. A value of 0
                  raises ZeroDivisionError.

    Returns the sliced view `z[:, ::dy]` with dy = int(columns / MAXNUMY) + 1.

    NOTE: because of the unconditional "+ 1", an input whose column count is
    an exact multiple of MAXNUMY is decimated one step further than strictly
    required (10 columns with MAXNUMY=5 give 4 columns, not 5). This is kept
    as-is for compatibility with existing consumers.
    '''
    dy = int(len(z[0]) / MAXNUMY) + 1
    return z[:, ::dy]
Juan C. Valdez
Add Operation publish using MQTT
r859
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
Add Operation publish using MQTT
class PublishData(Operation):
    '''
    Operation to send data over zmq.

    Every call to run() pushes the received dataOut object through a zmq
    PUSH socket using send_pyobj(). close() pushes the last object once more
    with `finished = True` before closing the socket.
    '''

    # NOTE(review): 'host' and 'port' are listed here but setup() takes
    # 'server'; confirm which names the CLI finder should expose.
    __attrs__ = ['host', 'port', 'delay', 'verbose']

    def __init__(self, **kwargs):
        """Initialize the operation; the socket is created later by setup()."""
        Operation.__init__(self, **kwargs)
        self.isConfig = False

    def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs):
        '''
        Create the PUSH socket and connect it.

        Parameters:
            server  : full 'tcp://...' address, or a name that is mapped to
                      the ipc endpoint 'ipc:///tmp/<name>'.
            delay   : seconds to sleep after each published object.
            verbose : log every object that is sent.
            kwargs  : accepted and ignored.
        '''
        self.counter = 0
        # `server` and `delay` are named parameters, so they never show up in
        # **kwargs; read them directly, otherwise the caller's values are
        # silently replaced by the defaults.
        self.delay = delay
        self.cnt = 0
        self.verbose = verbose
        context = zmq.Context()
        self.zmq_socket = context.socket(zmq.PUSH)
        if 'tcp://' in server:
            address = server
        else:
            address = 'ipc:///tmp/%s' % server
        self.zmq_socket.connect(address)
        # Pause after connect() before the first send.
        time.sleep(1)

    def publish_data(self):
        '''Send the current dataOut, flagged as not finished.'''
        self.dataOut.finished = False
        if self.verbose:
            log.log(
                'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
                self.name
            )
        # send_pyobj() pickles the object; the peer must unpickle it.
        self.zmq_socket.send_pyobj(self.dataOut)

    def run(self, dataOut, **kwargs):
        '''
        Publish `dataOut`; the first call also configures the socket with
        the given keyword arguments (see setup()).
        '''
        self.dataOut = dataOut
        if not self.isConfig:
            self.setup(**kwargs)
            self.isConfig = True

        self.publish_data()
        time.sleep(self.delay)

    def close(self):
        '''Send a final object flagged as finished and close the socket.'''
        if not self.isConfig:
            # run() was never called: no socket and no data object exist.
            return
        self.dataOut.finished = True
        self.zmq_socket.send_pyobj(self.dataOut)
        time.sleep(0.1)
        self.zmq_socket.close()
Juan C. Espinoza
Now there are two receiver units one for data and one for plots
r957
class ReceiverData(ProcessingUnit):
    '''
    Processing unit that receives data objects from a zmq PULL socket.

    The socket is bound to the configured address on the first call to
    run(); every call then blocks until one object is received and stores it
    in self.dataOut.
    '''

    __attrs__ = ['server']

    def __init__(self, **kwargs):
        '''
        Keyword arguments:
            server : full 'tcp://...' address, or a name that is mapped to
                     the ipc endpoint 'ipc:///tmp/<name>' (default
                     'zmq.pipe').
        '''
        ProcessingUnit.__init__(self, **kwargs)
        self.isConfig = False
        server = kwargs.get('server', 'zmq.pipe')
        if 'tcp://' in server:
            address = server
        else:
            address = 'ipc:///tmp/%s' % server
        self.address = address
        self.dataOut = JROData()

    def setup(self):
        '''Create the PULL socket and bind it to self.address.'''
        self.context = zmq.Context()
        self.receiver = self.context.socket(zmq.PULL)
        self.receiver.bind(self.address)
        time.sleep(0.5)
        log.success('ReceiverData from {}'.format(self.address))

    def run(self):
        '''Receive the next object (blocking) into self.dataOut and log it.'''
        if not self.isConfig:
            self.setup()
            self.isConfig = True
        # SECURITY: recv_pyobj() unpickles whatever arrives on the socket.
        # With a 'tcp://' address any peer that can reach the port can make
        # this process execute arbitrary code; only bind to trusted networks.
        self.dataOut = self.receiver.recv_pyobj()
        log.log('{} - {}'.format(self.dataOut.type,
                                 self.dataOut.datatime.ctime(),),
                'Receiving')
Juan C. Espinoza
Now there are two receiver units one for data and one for plots
r957
New Operation SendToFTP
class SendToFTP(Operation, Process):
    '''
    Operation to send data over FTP.

    run() loops forever: it (re)connects when needed and, for every
    configured pattern, uploads the newest matching local file once the
    pattern's delay has elapsed.
    '''

    __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']

    def __init__(self, **kwargs):
        '''
        Keyword arguments:
            server, username, password : FTP connection data.
            timeout  : connection timeout in seconds (default 30).
            patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
        '''
        Operation.__init__(self, **kwargs)
        Process.__init__(self)
        self.server = kwargs.get('server')
        self.username = kwargs.get('username')
        self.password = kwargs.get('password')
        # A missing 'patterns' argument means "nothing to upload" instead of
        # a TypeError while building the per-pattern lists below.
        self.patterns = kwargs.get('patterns') or []
        self.timeout = kwargs.get('timeout', 30)
        # Per pattern: time of the last successful upload / name last sent.
        self.times = [time.time() for _ in self.patterns]
        self.latest = ['' for _ in self.patterns]
        self.mp = False
        self.ftp = None
        # run() reads this flag before setup() has had a chance to set it.
        self.isConfig = False

    def _disconnect(self):
        '''Close the current FTP connection, if any, and forget it.'''
        if self.ftp is not None:
            self.ftp.close()
            self.ftp = None

    def setup(self):
        '''
        Connect and log in to the FTP server.

        On success self.isConfig is True; on any ftplib error the connection
        is dropped, self.ftp is None and self.isConfig is False.
        '''
        log.log('Connecting to ftp://{}'.format(self.server), self.name)
        try:
            self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
        except ftplib.all_errors:
            log.error('Server connection fail: {}'.format(self.server), self.name)
            self._disconnect()
            self.isConfig = False
            return

        try:
            self.ftp.login(self.username, self.password)
        except ftplib.all_errors:
            log.error('The given username y/o password are incorrect', self.name)
            self._disconnect()
            self.isConfig = False
            return

        log.success('Connection success', self.name)
        self.isConfig = True
        return

    def check(self):
        '''Probe the connection with NOOP and reconnect when it fails.'''
        try:
            self.ftp.voidcmd("NOOP")
        except ftplib.all_errors:
            # ftplib.all_errors covers FTP protocol errors, OSError (socket
            # failures, timeouts) and EOFError, without swallowing
            # KeyboardInterrupt/SystemExit as a bare except would.
            log.warning('Connection lost... trying to reconnect', self.name)
            self._disconnect()
            self.setup()

    def find_files(self, path, ext):
        '''
        Return the name (without directory) of the last file, in sorted
        order, inside `path` whose name matches '*<ext>', or None.
        '''
        # glob.escape() keeps special characters in the directory literal;
        # only the file-name part is a pattern. This replaces the
        # undocumented, deprecated glob.glob1().
        pattern = os.path.join(glob.escape(path), '*{}'.format(ext))
        names = [os.path.basename(match) for match in glob.glob(pattern)]
        if names:
            return max(names)
        return None

    def getftpname(self, filename, exp_code, sub_exp_code):
        '''
        Build the remote name for a plot file.

        `filename` must look like '<label>_<YYYYMMDD>...'; the result is
        YYYY + DDD (day of year) + '00' + exp_code (3 digits) +
        sub_exp_code (2 digits) + plot code (2 digits) + '00.png'.

        Raises IndexError/ValueError when the date part is missing or is not
        a valid '%Y%m%d' date.
        '''
        this_datetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
        fields = this_datetime.timetuple()
        year_str = '%4.4d' % fields.tm_year
        doy_str = '%3.3d' % fields.tm_yday
        exp_str = '%3.3d' % exp_code
        sub_exp_str = '%2.2d' % sub_exp_code
        plot_str = '%2.2d' % get_plot_code(filename)
        name = year_str + doy_str + '00' + exp_str + sub_exp_str + plot_str + '00.png'
        return name

    def upload(self, src, dst):
        '''
        Upload local file `src` as `dst` and make it world readable.

        Returns 1 on success. On any error during the transfer the error is
        logged, the connection is dropped and 0 is returned. Failure to open
        `src` propagates to the caller.
        '''
        log.log('Uploading {} '.format(src), self.name, nl=False)
        # The context manager closes the file on every path, including the
        # early returns taken on failure.
        with open(src, 'rb') as fp:
            try:
                self.ftp.storbinary('STOR {}'.format(dst), fp, blocksize=1024)
                self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
            except Exception as e:
                log.error('{}'.format(e), self.name)
                self._disconnect()
                return 0

        log.success('OK', tag='')
        return 1

    def send_files(self):
        '''
        Upload, for every pattern whose delay has elapsed, the newest local
        file unless it was already sent or is older than 30 minutes.

        A failed upload marks the operation as not configured and stops the
        pass, so that run() reconnects first.
        '''
        for x, pattern in enumerate(self.patterns):
            local, remote, ext, delay, exp_code, sub_exp_code = pattern
            if time.time() - self.times[x] < delay:
                continue

            srcname = self.find_files(local, ext)
            # Must be tested before building the path: os.path.join() fails
            # on None when the directory has no matching file.
            if srcname is None or srcname == self.latest[x]:
                continue

            src = os.path.join(local, srcname)
            if os.path.getmtime(src) < time.time() - 30*60:
                continue

            if 'png' in ext:
                dstname = self.getftpname(srcname, exp_code, sub_exp_code)
            else:
                dstname = srcname

            dst = os.path.join(remote, dstname)

            if self.upload(src, dst):
                self.times[x] = time.time()
                self.latest[x] = srcname
            else:
                self.isConfig = False
                break

    def run(self):
        '''Process main loop: keep the connection alive and upload files.'''
        while True:
            if not self.isConfig:
                self.setup()
            if self.ftp is not None:
                self.check()
            # check() may have lost the connection; only upload when one is
            # actually available.
            if self.ftp is not None:
                self.send_files()
            time.sleep(10)

    def close(self):
        '''Close the FTP connection and terminate the process.'''
        self._disconnect()
        self.terminate()