##// END OF EJS Templates
merged branches
merged branches

File last commit:

r1370:81f892b894eb merge
r1370:81f892b894eb merge
Show More
jroutils_publish.py
358 lines | 9.8 KiB | text/x-python | PythonLexer
Fix xmin & xmax in plots, fix SendToFTP
r1334 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
# All rights reserved.
#
# Distributed under the terms of the BSD 3-clause license.
"""Utilities for publish/send data, files & plots over different protocols
"""
Juan C. Valdez
Add Operation publish using MQTT
r859
New Operation SendToFTP
r1135 import os
import glob
Juan C. Valdez
Add Operation publish using MQTT
r859 import time
import json
import numpy
Juan C. Valdez
zmq support in PublishData
r883 import zmq
import datetime
New Operation SendToFTP
r1135 import ftplib
Juan C. Valdez
zmq support in PublishData
r883 from functools import wraps
from threading import Thread
from multiprocessing import Process
Juan C. Valdez
Add Operation publish using MQTT
r859
Juan C. Espinoza
Fix SendToFTP operation for v3
r1209 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit, MPDecorator
Juan C. Espinoza
Now there are two receiver units one for data and one for plots
r957 from schainpy.model.data.jrodata import JROData
Juan C. Espinoza
Fix publish and plots operations issue #929
r1062 from schainpy.utils import log
Juan C. Valdez
zmq support in PublishData
r883
New Operation SendToFTP
r1135
# Numeric codes used to tag each kind of plot when building remote file names.
PLOT_CODES = {
    'rti': 0,  # Range time intensity (RTI).
    'spc': 1,  # Spectra (and Cross-spectra) information.
    'cspc': 2,  # Cross-Correlation information.
    'coh': 3,  # Coherence map.
    'base': 4,  # Base lines graphic.
    'row': 5,  # Row Spectra.
    'total': 6,  # Total Power.
    'drift': 7,  # Drifts graphics.
    'height': 8,  # Height profile.
    'phase': 9,  # Signal Phase.
    'power': 16,
    'noise': 17,
    'beacon': 18,
    'wind': 22,
    'skymap': 23,
    'Unknown': 24,
    'V-E': 25,  # PIP Velocity.
    'Z-E': 26,  # PIP Reflectivity.
    'V-A': 27,  # RHI Velocity.
    'Z-A': 28,  # RHI Reflectivity.
}


def get_plot_code(s):
    """Return the numeric plot code for a plot file name.

    The label is the part of ``s`` before the first underscore. An exact
    match against ``PLOT_CODES`` wins; otherwise the longest key contained
    in the label is used, so that e.g. ``'cspc'`` is not mistaken for
    ``'spc'``.

    Parameters
    ----------
    s : str
        File name (or label) such as ``'rti_20200101.png'``.

    Returns
    -------
    int
        The matching code, or ``PLOT_CODES['Unknown']`` when nothing matches.
    """
    label = s.split('_')[0]

    # Exact label first: avoids substring collisions between keys.
    if label in PLOT_CODES:
        return PLOT_CODES[label]

    # Fall back to substring matching, preferring the most specific key.
    candidates = [key for key in PLOT_CODES if key in label]
    if candidates:
        return PLOT_CODES[max(candidates, key=len)]

    return PLOT_CODES['Unknown']
Juan C. Valdez
Add Operation publish using MQTT
r859
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Valdez
Add Operation publish using MQTT
class PublishData(Operation):
    '''
    Operation to send data over zmq.

    Every call to `run` pushes the current `dataOut` object through a
    zmq PUSH socket; `close` pushes it one last time with
    `finished = True` before closing the socket.

    Parameters (forwarded from `run` to `setup`):
        server  : endpoint; used as-is when it contains 'tcp://',
                  otherwise mapped to 'ipc:///tmp/<server>'.
        delay   : seconds to sleep after each published object.
        verbose : log a line for every object sent.
    '''

    # NOTE(review): 'host' and 'port' are not parameters of setup(); the
    # endpoint is given through 'server'. Confirm whether this list
    # should be updated.
    __attrs__ = ['host', 'port', 'delay', 'verbose']

    def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs):
        '''
        Create the PUSH socket and connect it to the configured endpoint.

        `server` and `delay` are named parameters, so they never show up
        in `kwargs`; they must be read directly (reading them from
        `kwargs` always yields the defaults).
        '''
        self.counter = 0
        # float() so a textual value coming from configuration still works
        # with time.sleep().
        self.delay = float(delay or 0)
        self.cnt = 0
        self.verbose = verbose

        context = zmq.Context()
        self.zmq_socket = context.socket(zmq.PUSH)
        if 'tcp://' in server:
            address = server
        else:
            address = 'ipc:///tmp/%s' % server
        self.zmq_socket.connect(address)
        # Give the connection a moment before the first send.
        time.sleep(1)

    def publish_data(self):
        '''
        Push the current `dataOut` object, flagged as not finished.
        '''
        self.dataOut.finished = False

        if self.verbose:
            log.log(
                'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
                self.name
            )
        # send_pyobj pickles the object before sending it.
        self.zmq_socket.send_pyobj(self.dataOut)

    def run(self, dataOut, **kwargs):
        '''
        Publish `dataOut`, configuring the socket on the first call.
        '''
        self.dataOut = dataOut
        if not self.isConfig:
            self.setup(**kwargs)
            self.isConfig = True

        self.publish_data()
        time.sleep(self.delay)

    def close(self):
        '''
        Send a final object flagged as finished and close the socket.

        Does nothing when `setup` never ran (no socket was created).
        '''
        socket = getattr(self, 'zmq_socket', None)
        if socket is None:
            return

        self.dataOut.finished = True
        socket.send_pyobj(self.dataOut)
        time.sleep(0.1)
        socket.close()
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
Juan C. Espinoza
Now there are two receiver units one for data and one for plots
r957
class ReceiverData(ProcessingUnit):
    '''
    Processing unit that receives data objects over a zmq PULL socket.

    The `server` keyword selects the endpoint: a value containing
    'tcp://' is used as-is, anything else becomes 'ipc:///tmp/<server>'.
    '''

    __attrs__ = ['server']

    def __init__(self, **kwargs):
        ProcessingUnit.__init__(self, **kwargs)
        self.isConfig = False
        endpoint = kwargs.get('server', 'zmq.pipe')
        self.address = endpoint if 'tcp://' in endpoint else 'ipc:///tmp/%s' % endpoint
        self.dataOut = JROData()

    def setup(self):
        '''
        Create the PULL socket and bind it to `self.address`.
        '''
        self.context = zmq.Context()
        self.receiver = self.context.socket(zmq.PULL)
        self.receiver.bind(self.address)
        time.sleep(0.5)
        log.success('ReceiverData from {}'.format(self.address))

    def run(self):
        '''
        Receive the next object (binding the socket on first use) and
        store it in `self.dataOut`.
        '''
        if not self.isConfig:
            self.setup()
            self.isConfig = True

        # NOTE(review): recv_pyobj unpickles whatever arrives on the
        # socket; only bind to endpoints reachable by trusted senders.
        received = self.receiver.recv_pyobj()
        self.dataOut = received

        summary = '{} - {}'.format(received.type, received.datatime.ctime())
        log.log(summary, 'Receiving')
Juan C. Espinoza
Now there are two receiver units one for data and one for plots
r957
Juan C. Espinoza
Fix SendToFTP operation for v3
@MPDecorator
class SendToFTP(Operation):
    """Operation to send files over FTP

    This operation is used to send files over FTP, you can send different files
    from different folders by adding as many `pattern` as you wish.

    Parameters:
    -----------
    server : str
        FTP server address.
    username : str
        FTP username
    password : str
        FTP password
    timeout : int
        seconds to wait between reconnection attempts while the
        connection is down
    patternX : list
        detail of files to be sent, must have the following order: local,
        remote, ext, period, exp_code, sub_exp_code. A comma separated
        string with the same six fields is also accepted.

    Example:
    --------

    ftp = proc_unit.addOperation(name='SendToFTP', optype='external')
    ftp.addParameter(name='server', value='ftp.example.com')
    ftp.addParameter(name='username', value='user')
    ftp.addParameter(name='password', value='secret')
    ftp.addParameter(
        name='pattern1',
        value='/local/path/rti,/remote/path,png,300,11,0'
    )
    ftp.addParameter(
        name='pattern2',
        value='/local/path/spc,/remote/path,png,300,11,0'
    )
    ftp.addParameter(
        name='pattern3',
        value='/local/path/param,/remote/path,hdf5,300,,'
    )

    """

    __attrs__ = ['server', 'username', 'password', 'timeout', 'patternX']

    def __init__(self):
        '''
        Start disconnected; the connection is opened on the first `run`.
        '''
        Operation.__init__(self)
        self.ftp = None
        self.ready = False
        # Reference time used by check() to space reconnection attempts.
        self.current_time = time.time()

    def setup(self, server, username, password, timeout, **kwargs):
        '''
        Store the connection settings and collect every keyword whose
        name contains 'pattern' as an upload pattern.
        '''
        self.server = server
        self.username = username
        self.password = password
        # float() so a textual value still compares against elapsed time.
        self.timeout = float(timeout)
        self.patterns = []
        self.times = []     # time of the last successful upload, per pattern
        self.latest = []    # last uploaded file name, per pattern
        for arg, value in kwargs.items():
            if 'pattern' in arg:
                # Accept the comma separated form shown in the class docstring.
                if isinstance(value, str):
                    value = value.split(',')
                self.patterns.append(value)
                self.times.append(0)
                self.latest.append('')

    def _drop_connection(self):
        '''
        Close and forget the current FTP connection, if any.
        '''
        if self.ftp is not None:
            self.ftp.close()
            self.ftp = None

    def connect(self):
        '''
        Open the FTP connection and log in; sets `self.ready` accordingly.
        '''
        log.log('Connecting to ftp://{}'.format(self.server), self.name)
        try:
            self.ftp = ftplib.FTP(self.server, timeout=1)
        except ftplib.all_errors:
            log.error('Server connection fail: {}'.format(self.server), self.name)
            self._drop_connection()
            self.ready = False
            return

        try:
            self.ftp.login(self.username, self.password)
        except ftplib.all_errors:
            log.error('The given username y/o password are incorrect', self.name)
            self._drop_connection()
            self.ready = False
            return

        log.success('Connection success', self.name)
        self.ready = True
        return

    def check(self):
        '''
        Verify the connection with a NOOP and reconnect when it is down.

        While not ready, reconnection is attempted at most once every
        `self.timeout` seconds.
        '''
        if not self.ready:
            if time.time() - self.current_time < self.timeout:
                return
            self.current_time = time.time()

        if self.ftp is not None:
            try:
                self.ftp.voidcmd("NOOP")
            except ftplib.all_errors:
                pass
            else:
                return

        log.warning('Connection lost... trying to reconnect', self.name)
        self._drop_connection()
        self.connect()

    def find_files(self, path, ext):
        '''
        Return the name (without directory) of the last file, in sorted
        order, inside `path` ending with `ext`, or None if there is none.
        '''
        # glob.glob1 is undocumented and deprecated; glob.glob with an
        # escaped directory gives the same listing.
        pattern = os.path.join(glob.escape(path.strip()), '*{}'.format(ext.strip()))
        files = sorted(os.path.basename(found) for found in glob.glob(pattern))
        if files:
            return files[-1]
        return None

    def getftpname(self, filename, exp_code, sub_exp_code):
        '''
        Build the remote file name from year, day of year, experiment
        codes and plot code. The date is read from the second
        underscore-separated field of `filename` (format YYYYMMDD).
        '''
        file_date = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
        YEAR_STR = '%4.4d' % file_date.timetuple().tm_year
        DOY_STR = '%3.3d' % file_date.timetuple().tm_yday
        exp_code = '%3.3d' % exp_code
        sub_exp_code = '%2.2d' % sub_exp_code
        plot_code = '%2.2d' % get_plot_code(filename)
        name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
        return name

    def upload(self, src, dst):
        '''
        Upload local file `src` as remote `dst` and make it world readable.

        Returns 1 on success and 0 on any failure (the error is logged).
        '''
        log.log('Uploading {} -> {} '.format(
            src.split('/')[-1], dst.split('/')[-1]),
            self.name,
            nl=False
        )

        command = 'STOR {}'.format(dst)
        try:
            # The context manager closes the file on every exit path.
            with open(src, 'rb') as fp:
                self.ftp.storbinary(command, fp, blocksize=1024)
        except Exception as e:
            log.error('{}'.format(e), self.name)
            return 0

        try:
            self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
        except Exception as e:
            log.error('{}'.format(e), self.name)
            return 0

        log.success('OK', tag='')
        return 1

    def send_files(self):
        '''
        For every pattern whose period has elapsed, upload the newest
        matching file unless it was already uploaded.
        '''
        for x, pattern in enumerate(self.patterns):
            local, remote, ext, period, exp_code, sub_exp_code = pattern
            # Use the same stripped directory for searching and for opening.
            local = local.strip()

            if (self.dataOut.utctime - self.times[x]) < int(period):
                continue

            srcname = self.find_files(local, ext)

            if srcname is None:
                continue

            if srcname == self.latest[x]:
                log.warning('File alreday uploaded {}'.format(srcname))
                continue

            # An empty exp_code means: keep the local file name.
            if exp_code.strip():
                dstname = self.getftpname(srcname, int(exp_code), int(sub_exp_code))
            else:
                dstname = srcname

            src = os.path.join(local, srcname)
            dst = os.path.join(remote.strip(), dstname)

            if self.upload(src, dst):
                self.times[x] = self.dataOut.utctime
                self.latest[x] = srcname

    def run(self, dataOut, server, username, password, timeout=60, **kwargs):
        '''
        Configure and connect on the first call, then check the
        connection and send pending files when it is ready.
        '''
        if not self.isConfig:
            self.setup(
                server=server,
                username=username,
                password=password,
                timeout=timeout,
                **kwargs
            )
            self.isConfig = True
            self.connect()

        self.dataOut = dataOut
        self.check()
        if self.ready:
            self.send_files()

    def close(self):
        '''
        Close the FTP connection if one is open.
        '''
        if self.ftp is not None:
            self.ftp.close()