jroutils_publish.py
358 lines
| 9.8 KiB
| text/x-python
|
PythonLexer
r1334 | # Copyright (c) 2012-2020 Jicamarca Radio Observatory | |||
# All rights reserved. | ||||
# | ||||
# Distributed under the terms of the BSD 3-clause license. | ||||
"""Utilities for publish/send data, files & plots over different protocols | ||||
""" | ||||
|
r859 | |||
r1135 | import os | |||
import glob | ||||
|
r859 | import time | ||
import json | ||||
import numpy | ||||
|
r883 | import zmq | ||
import datetime | ||||
r1135 | import ftplib | |||
|
r883 | from functools import wraps | ||
from threading import Thread | ||||
from multiprocessing import Process | ||||
|
r859 | |||
|
r1209 | from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit, MPDecorator | ||
|
r957 | from schainpy.model.data.jrodata import JROData | ||
|
r1062 | from schainpy.utils import log | ||
|
r883 | |||
r1135 | ||||
PLOT_CODES = { | ||||
r1139 | 'rti': 0, # Range time intensity (RTI). | |||
'spc': 1, # Spectra (and Cross-spectra) information. | ||||
'cspc': 2, # Cross-Correlation information. | ||||
'coh': 3, # Coherence map. | ||||
'base': 4, # Base lines graphic. | ||||
'row': 5, # Row Spectra. | ||||
r1142 | 'total': 6, # Total Power. | |||
'drift': 7, # Drifts graphics. | ||||
'height': 8, # Height profile. | ||||
'phase': 9, # Signal Phase. | ||||
'power': 16, | ||||
'noise': 17, | ||||
'beacon': 18, | ||||
'wind': 22, | ||||
'skymap': 23, | ||||
'Unknown': 24, | ||||
'V-E': 25, # PIP Velocity. | ||||
'Z-E': 26, # PIP Reflectivity. | ||||
'V-A': 27, # RHI Velocity. | ||||
'Z-A': 28, # RHI Reflectivity. | ||||
r1135 | } | |||
|
r859 | |||
r1142 | def get_plot_code(s): | |||
label = s.split('_')[0] | ||||
codes = [key for key in PLOT_CODES if key in label] | ||||
r1279 | if codes: | |||
r1142 | return PLOT_CODES[codes[0]] | |||
else: | ||||
return 24 | ||||
|
r859 | |||
|
r860 | |||
|
r859 | class PublishData(Operation): | ||
|
r1062 | ''' | ||
Operation to send data over zmq. | ||||
''' | ||||
|
r860 | |||
|
r1187 | __attrs__ = ['host', 'port', 'delay', 'verbose'] | ||
|
r1097 | |||
|
r1187 | def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs): | ||
|
r883 | self.counter = 0 | ||
|
r859 | self.delay = kwargs.get('delay', 0) | ||
self.cnt = 0 | ||||
r1279 | self.verbose = verbose | |||
|
r1187 | context = zmq.Context() | ||
self.zmq_socket = context.socket(zmq.PUSH) | ||||
server = kwargs.get('server', 'zmq.pipe') | ||||
if 'tcp://' in server: | ||||
address = server | ||||
else: | ||||
address = 'ipc:///tmp/%s' % server | ||||
self.zmq_socket.connect(address) | ||||
time.sleep(1) | ||||
|
r883 | |||
|
r931 | |||
|
r883 | def publish_data(self): | ||
self.dataOut.finished = False | ||||
r1279 | ||||
|
r1187 | if self.verbose: | ||
log.log( | ||||
'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime), | ||||
self.name | ||||
) | ||||
self.zmq_socket.send_pyobj(self.dataOut) | ||||
|
r883 | |||
def run(self, dataOut, **kwargs): | ||||
|
r860 | self.dataOut = dataOut | ||
if not self.isConfig: | ||||
|
r883 | self.setup(**kwargs) | ||
|
r859 | self.isConfig = True | ||
|
r860 | |||
|
r883 | self.publish_data() | ||
|
r860 | time.sleep(self.delay) | ||
|
r859 | def close(self): | ||
r1279 | ||||
|
r1187 | self.dataOut.finished = True | ||
self.zmq_socket.send_pyobj(self.dataOut) | ||||
time.sleep(0.1) | ||||
self.zmq_socket.close() | ||||
r1279 | ||||
|
r957 | |||
class ReceiverData(ProcessingUnit): | ||||
|
r1097 | __attrs__ = ['server'] | ||
|
r957 | def __init__(self, **kwargs): | ||
ProcessingUnit.__init__(self, **kwargs) | ||||
self.isConfig = False | ||||
server = kwargs.get('server', 'zmq.pipe') | ||||
if 'tcp://' in server: | ||||
address = server | ||||
else: | ||||
address = 'ipc:///tmp/%s' % server | ||||
self.address = address | ||||
self.dataOut = JROData() | ||||
def setup(self): | ||||
self.context = zmq.Context() | ||||
self.receiver = self.context.socket(zmq.PULL) | ||||
self.receiver.bind(self.address) | ||||
time.sleep(0.5) | ||||
|
r1062 | log.success('ReceiverData from {}'.format(self.address)) | ||
|
r957 | |||
def run(self): | ||||
if not self.isConfig: | ||||
self.setup() | ||||
self.isConfig = True | ||||
self.dataOut = self.receiver.recv_pyobj() | ||||
|
r1062 | log.log('{} - {}'.format(self.dataOut.type, | ||
self.dataOut.datatime.ctime(),), | ||||
'Receiving') | ||||
|
r957 | |||
|
r1209 | @MPDecorator | ||
class SendToFTP(Operation): | ||||
r1334 | """Operation for send files over FTP | |||
This operation is used to send files over FTP, you can send different files | ||||
from different folders by adding as many `pattern` as you wish. | ||||
Parameters: | ||||
----------- | ||||
server : str | ||||
FTP server address. | ||||
username : str | ||||
FTP username | ||||
password : str | ||||
FTP password | ||||
timeout : int | ||||
timeout to restart the connection | ||||
patternX : list | ||||
detail of files to be send must have the following order: local, remote | ||||
r1370 | ext, period, exp_code, sub_exp_code | |||
r1334 | Example: | |||
-------- | ||||
r1370 | ||||
r1334 | ftp = proc_unit.addOperation(name='SendToFTP', optype='external') | |||
ftp.addParameter(name='server', value='jro-app.igp.gob.pe') | ||||
ftp.addParameter(name='username', value='wmaster') | ||||
ftp.addParameter(name='password', value='mst2010vhf') | ||||
ftp.addParameter( | ||||
r1370 | name='pattern1', | |||
r1334 | value='/local/path/rti,/remote/path,png,300,11,0' | |||
) | ||||
ftp.addParameter( | ||||
name='pattern2', | ||||
value='/local/path/spc,/remote/path,png,300,11,0' | ||||
) | ||||
ftp.addParameter( | ||||
r1370 | name='pattern3', | |||
r1334 | value='/local/path/param,/remote/path,hdf5,300,,' | |||
) | ||||
""" | ||||
r1135 | ||||
|
r1209 | __attrs__ = ['server', 'username', 'password', 'timeout', 'patternX'] | ||
r1135 | ||||
|
r1209 | def __init__(self): | ||
r1135 | ''' | |||
''' | ||||
|
r1209 | Operation.__init__(self) | ||
r1135 | self.ftp = None | |||
|
r1209 | self.ready = False | ||
r1337 | self.current_time = time.time() | |||
r1135 | ||||
|
r1209 | def setup(self, server, username, password, timeout, **kwargs): | ||
''' | ||||
''' | ||||
self.server = server | ||||
self.username = username | ||||
self.password = password | ||||
self.timeout = timeout | ||||
self.patterns = [] | ||||
self.times = [] | ||||
self.latest = [] | ||||
for arg, value in kwargs.items(): | ||||
if 'pattern' in arg: | ||||
self.patterns.append(value) | ||||
r1334 | self.times.append(0) | |||
|
r1209 | self.latest.append('') | ||
def connect(self): | ||||
''' | ||||
''' | ||||
r1135 | ||||
log.log('Connecting to ftp://{}'.format(self.server), self.name) | ||||
try: | ||||
r1337 | self.ftp = ftplib.FTP(self.server, timeout=1) | |||
r1135 | except ftplib.all_errors: | |||
log.error('Server connection fail: {}'.format(self.server), self.name) | ||||
if self.ftp is not None: | ||||
self.ftp.close() | ||||
self.ftp = None | ||||
|
r1209 | self.ready = False | ||
r1279 | return | |||
r1135 | ||||
try: | ||||
self.ftp.login(self.username, self.password) | ||||
except ftplib.all_errors: | ||||
log.error('The given username y/o password are incorrect', self.name) | ||||
if self.ftp is not None: | ||||
self.ftp.close() | ||||
self.ftp = None | ||||
|
r1209 | self.ready = False | ||
r1135 | return | |||
log.success('Connection success', self.name) | ||||
|
r1209 | self.ready = True | ||
r1135 | return | |||
def check(self): | ||||
try: | ||||
r1337 | if not self.ready: | |||
if time.time()-self.current_time < self.timeout: | ||||
return | ||||
else: | ||||
self.current_time = time.time() | ||||
r1135 | self.ftp.voidcmd("NOOP") | |||
except: | ||||
log.warning('Connection lost... trying to reconnect', self.name) | ||||
if self.ftp is not None: | ||||
self.ftp.close() | ||||
self.ftp = None | ||||
|
r1209 | self.connect() | ||
r1135 | ||||
def find_files(self, path, ext): | ||||
r1334 | files = glob.glob1(path.strip(), '*{}'.format(ext.strip())) | |||
r1135 | files.sort() | |||
if files: | ||||
return files[-1] | ||||
return None | ||||
def getftpname(self, filename, exp_code, sub_exp_code): | ||||
thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d') | ||||
|
r1209 | YEAR_STR = '%4.4d' % thisDatetime.timetuple().tm_year | ||
DOY_STR = '%3.3d' % thisDatetime.timetuple().tm_yday | ||||
exp_code = '%3.3d' % exp_code | ||||
sub_exp_code = '%2.2d' % sub_exp_code | ||||
plot_code = '%2.2d' % get_plot_code(filename) | ||||
r1135 | name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png' | |||
return name | ||||
def upload(self, src, dst): | ||||
|
r1209 | log.log('Uploading {} -> {} '.format( | ||
r1279 | src.split('/')[-1], dst.split('/')[-1]), | |||
self.name, | ||||
|
r1209 | nl=False | ||
) | ||||
r1135 | ||||
fp = open(src, 'rb') | ||||
command = 'STOR {}'.format(dst) | ||||
try: | ||||
self.ftp.storbinary(command, fp, blocksize=1024) | ||||
|
r1167 | except Exception as e: | ||
r1135 | log.error('{}'.format(e), self.name) | |||
r1142 | return 0 | |||
r1135 | ||||
try: | ||||
self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst)) | ||||
|
r1167 | except Exception as e: | ||
r1135 | log.error('{}'.format(e), self.name) | |||
r1142 | return 0 | |||
r1135 | ||||
fp.close() | ||||
log.success('OK', tag='') | ||||
r1142 | return 1 | |||
r1279 | ||||
r1135 | def send_files(self): | |||
for x, pattern in enumerate(self.patterns): | ||||
|
r1209 | local, remote, ext, period, exp_code, sub_exp_code = pattern | ||
r1279 | ||||
r1334 | if (self.dataOut.utctime - self.times[x]) < int(period): | |||
continue | ||||
r1135 | ||||
r1334 | srcname = self.find_files(local, ext) | |||
r1370 | ||||
r1334 | if srcname is None: | |||
continue | ||||
r1370 | ||||
r1334 | if srcname == self.latest[x]: | |||
r1370 | log.warning('File alreday uploaded {}'.format(srcname)) | |||
r1334 | continue | |||
r1370 | ||||
r1334 | if exp_code.strip(): | |||
dstname = self.getftpname(srcname, int(exp_code), int(sub_exp_code)) | ||||
else: | ||||
r1370 | dstname = srcname | |||
r1135 | ||||
r1334 | src = os.path.join(local, srcname) | |||
dst = os.path.join(remote.strip(), dstname) | ||||
if self.upload(src, dst): | ||||
self.times[x] = self.dataOut.utctime | ||||
r1370 | self.latest[x] = srcname | |||
r1135 | ||||
r1337 | def run(self, dataOut, server, username, password, timeout=60, **kwargs): | |||
r1135 | ||||
|
r1209 | if not self.isConfig: | ||
self.setup( | ||||
r1279 | server=server, | |||
username=username, | ||||
password=password, | ||||
timeout=timeout, | ||||
|
r1209 | **kwargs | ||
) | ||||
self.isConfig = True | ||||
self.connect() | ||||
r1370 | ||||
r1334 | self.dataOut = dataOut | |||
self.check() | ||||
r1337 | if self.ready: | |||
|
r1209 | self.send_files() | ||
r1135 | ||||
|
r1209 | def close(self): | ||
r1135 | ||||
if self.ftp is not None: | ||||
r1139 | self.ftp.close() | |||