jroutils_publish.py
326 lines
| 9.0 KiB
| text/x-python
|
PythonLexer
|
r859 | ''' | ||
@author: Juan C. Espinoza | ||||
''' | ||||
r1135 | import os | |||
import glob | ||||
|
r859 | import time | ||
import json | ||||
import numpy | ||||
|
r883 | import zmq | ||
import datetime | ||||
r1135 | import ftplib | |||
|
r883 | from functools import wraps | ||
from threading import Thread | ||||
from multiprocessing import Process | ||||
|
r859 | |||
|
r1209 | from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit, MPDecorator | ||
|
r957 | from schainpy.model.data.jrodata import JROData | ||
|
r1062 | from schainpy.utils import log | ||
|
r883 | |||
# NOTE(review): presumably the maximum number of samples kept per axis when
# data is decimated -- neither constant is referenced in this part of the
# file, confirm against the callers.
MAXNUMX = 500
MAXNUMY = 500

# Numeric code of every known plot type.  get_plot_code() maps the leading
# label of a file name onto one of these values.
PLOT_CODES = {
    'rti': 0,       # Range time intensity (RTI).
    'spc': 1,       # Spectra (and Cross-spectra) information.
    'cspc': 2,      # Cross-Correlation information.
    'coh': 3,       # Coherence map.
    'base': 4,      # Base lines graphic.
    'row': 5,       # Row Spectra.
    'total': 6,     # Total Power.
    'drift': 7,     # Drifts graphics.
    'height': 8,    # Height profile.
    'phase': 9,     # Signal Phase.
    'power': 16,
    'noise': 17,
    'beacon': 18,
    'wind': 22,
    'skymap': 23,
    'Unknown': 24,
    'V-E': 25,      # PIP Velocity.
    'Z-E': 26,      # PIP Reflectivity.
    'V-A': 27,      # RHI Velocity.
    'Z-A': 28,      # RHI Reflectivity.
}


def get_plot_code(s):
    '''
    Return the numeric plot code for the file name (or label) `s`.

    The label is the text of `s` before its first underscore.  Every key
    of PLOT_CODES that occurs inside the label is a candidate and the
    longest candidate wins, so a 'cspc...' label resolves to the 'cspc'
    code instead of the shorter 'spc' key it also contains.  Candidates of
    equal length are resolved in PLOT_CODES order.

    When no key occurs in the label the 'Unknown' code is returned.
    '''
    label = s.split('_')[0]
    matches = [key for key in PLOT_CODES if key in label]
    if not matches:
        return PLOT_CODES['Unknown']
    # max() keeps the first of several equally long keys, i.e. dict order.
    return PLOT_CODES[max(matches, key=len)]
|
r859 | |||
|
def decimate(z, MAXNUMY):
    '''
    Return `z` with only every step-th column kept.

    The step is int(len(z[0]) / MAXNUMY) + 1, so an input whose rows are
    shorter than MAXNUMY is returned with all of its columns.  `z` has to
    support two-axis slicing (e.g. a 2-D numpy array); rows are never
    dropped.
    '''
    n_columns = len(z[0])
    step = int(n_columns / MAXNUMY) + 1
    return z[:, ::step]
|
r859 | |||
|
r860 | |||
|
class PublishData(Operation):
    '''
    Operation to send data over zmq.

    Every data object handed to run() is pushed, as a pickled Python
    object, through a zmq PUSH socket.  Each object is tagged with a
    `finished` attribute: False for regular data, True for the last
    message that close() sends before shutting the socket down.
    '''

    # NOTE(review): 'host' and 'port' are not parameters of setup(), which
    # takes 'server' instead -- confirm what this list is meant to advertise.
    __attrs__ = ['host', 'port', 'delay', 'verbose']

    def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs):
        '''
        Create the PUSH socket and connect it to `server`.

        server  -- full 'tcp://...' address, or the name of an ipc pipe
                   that is looked up under /tmp.
        delay   -- seconds run() sleeps after every published object.
        verbose -- log a line for every object sent.
        '''
        self.counter = 0
        self.cnt = 0

        # Use the named parameters themselves: named arguments never show
        # up in **kwargs, so reading them from there always gave defaults.
        self.delay = delay
        self.verbose = verbose

        context = zmq.Context()
        self.zmq_socket = context.socket(zmq.PUSH)
        if 'tcp://' in server:
            address = server
        else:
            address = 'ipc:///tmp/%s' % server
        self.zmq_socket.connect(address)
        time.sleep(1)

    def publish_data(self):
        '''
        Mark the current data object as not finished and send it.
        '''
        self.dataOut.finished = False

        if self.verbose:
            log.log(
                'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
                self.name
            )
        self.zmq_socket.send_pyobj(self.dataOut)

    def run(self, dataOut, **kwargs):
        '''
        Publish `dataOut`, configuring the socket on the first call.

        The keyword arguments are forwarded to setup() once; afterwards
        they are ignored.
        '''
        self.dataOut = dataOut
        if not self.isConfig:
            self.setup(**kwargs)
            self.isConfig = True

        self.publish_data()
        time.sleep(self.delay)

    def close(self):
        '''
        Send the last data object flagged as finished and close the socket.

        Does nothing when the socket was never opened or is already
        closed, so calling close() early or twice is harmless.
        '''
        socket = getattr(self, 'zmq_socket', None)
        if socket is None:
            return

        self.dataOut.finished = True
        socket.send_pyobj(self.dataOut)
        # Short pause before closing, kept from the original implementation.
        time.sleep(0.1)
        socket.close()
        self.zmq_socket = None
|
r957 | |||
class ReceiverData(ProcessingUnit):
    '''
    Processing unit that receives data objects through a zmq PULL socket.

    The socket is bound lazily on the first call to run(); every call then
    blocks until one object arrives and stores it in `dataOut`.
    '''

    __attrs__ = ['server']

    def __init__(self, **kwargs):
        '''
        Store the address to bind to; the `server` keyword is either a
        full 'tcp://...' address or the name of an ipc pipe under /tmp.
        '''
        ProcessingUnit.__init__(self, **kwargs)
        self.isConfig = False
        endpoint = kwargs.get('server', 'zmq.pipe')
        self.address = endpoint if 'tcp://' in endpoint else 'ipc:///tmp/%s' % endpoint
        self.dataOut = JROData()

    def setup(self):
        '''
        Create the PULL socket and bind it to the configured address.
        '''
        self.context = zmq.Context()
        self.receiver = self.context.socket(zmq.PULL)
        self.receiver.bind(self.address)
        time.sleep(0.5)
        log.success('ReceiverData from {}'.format(self.address))

    def run(self):
        '''
        Receive the next data object and log its type and time.
        '''
        if not self.isConfig:
            self.setup()
            self.isConfig = True

        # NOTE(review): recv_pyobj() unpickles whatever arrives on the
        # socket -- only bind to addresses reachable by trusted senders.
        self.dataOut = self.receiver.recv_pyobj()

        message = '{} - {}'.format(
            self.dataOut.type,
            self.dataOut.datatime.ctime(),
        )
        log.log(message, 'Receiving')
|
r957 | |||
|
@MPDecorator
class SendToFTP(Operation):
    '''
    Operation to send data over FTP.

    patternX = 'local, remote, ext, period, exp_code, sub_exp_code'

    Every keyword argument whose name contains 'pattern' describes one
    upload job.  Each job must unpack into the six items listed above:
    the newest file (by sorted name) with extension `ext` in the `local`
    folder is uploaded to the `remote` folder at most once per `period`
    seconds.  Files with 'png' in `ext` are renamed on the server using
    `exp_code`, `sub_exp_code` and the plot code of the file name.
    '''

    __attrs__ = ['server', 'username', 'password', 'timeout', 'patternX']

    def __init__(self):
        '''
        Start without a connection; run() connects on demand.
        '''
        Operation.__init__(self)
        self.ftp = None
        self.ready = False

    def setup(self, server, username, password, timeout, **kwargs):
        '''
        Store the connection parameters and collect the upload patterns.

        For every pattern a start time and the name of the latest uploaded
        file (initially empty) are tracked at the same list index.
        '''
        self.server = server
        self.username = username
        self.password = password
        self.timeout = timeout
        self.patterns = []
        self.times = []
        self.latest = []
        for arg, value in kwargs.items():
            if 'pattern' in arg:
                self.patterns.append(value)
                self.times.append(time.time())
                self.latest.append('')

    def _disconnect(self):
        '''
        Close the current FTP connection, if any, and forget it.
        '''
        if self.ftp is not None:
            self.ftp.close()
            self.ftp = None

    def connect(self):
        '''
        Open the FTP connection and log in.

        On success `self.ready` is True.  On any FTP error the connection
        is dropped, `self.ftp` is None and `self.ready` is False.
        '''
        log.log('Connecting to ftp://{}'.format(self.server), self.name)
        try:
            self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
        except ftplib.all_errors:
            log.error('Server connection fail: {}'.format(self.server), self.name)
            self._disconnect()
            self.ready = False
            return

        try:
            self.ftp.login(self.username, self.password)
        except ftplib.all_errors:
            log.error('The given username y/o password are incorrect', self.name)
            self._disconnect()
            self.ready = False
            return

        log.success('Connection success', self.name)
        self.ready = True
        return

    def check(self):
        '''
        Probe the connection with NOOP and reconnect when the probe fails.
        '''
        try:
            self.ftp.voidcmd("NOOP")
        except Exception:
            # Any failure of the probe is treated as a lost connection;
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            log.warning('Connection lost... trying to reconnect', self.name)
            self._disconnect()
            self.connect()

    def find_files(self, path, ext):
        '''
        Return the last file name (sorted by name) in `path` that ends
        with `ext`, or None when there is no such file.
        '''
        # glob.escape keeps special characters of the folder name literal;
        # only the '*<ext>' part acts as a pattern.
        pattern = os.path.join(glob.escape(path), '*{}'.format(ext))
        names = sorted(os.path.basename(match) for match in glob.glob(pattern))
        if names:
            return names[-1]
        return None

    def getftpname(self, filename, exp_code, sub_exp_code):
        '''
        Build the remote name of a png file.

        The date is parsed (as %Y%m%d) from the second '_'-separated field
        of `filename`.  The result is
        YYYY + DDD (day of year) + '00' + exp_code (3 digits) +
        sub_exp_code (2 digits) + plot code (2 digits) + '00.png'.
        '''
        thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
        date_fields = thisDatetime.timetuple()

        YEAR_STR = '%4.4d' % date_fields.tm_year
        DOY_STR = '%3.3d' % date_fields.tm_yday
        exp_code = '%3.3d' % exp_code
        sub_exp_code = '%2.2d' % sub_exp_code
        plot_code = '%2.2d' % get_plot_code(filename)
        name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
        return name

    def upload(self, src, dst):
        '''
        Upload the local file `src` as `dst` and make it world readable.

        Returns 1 on success.  When the transfer or the CHMOD command
        fails, the error is logged, the connection is dropped and 0 is
        returned.  The local file is always closed.
        '''
        log.log('Uploading {} -> {} '.format(
            src.split('/')[-1], dst.split('/')[-1]),
            self.name,
            nl=False
        )

        command = 'STOR {}'.format(dst)
        # The context manager closes the file on the error path as well.
        with open(src, 'rb') as fp:
            try:
                self.ftp.storbinary(command, fp, blocksize=1024)
            except Exception as e:
                log.error('{}'.format(e), self.name)
                self._disconnect()
                return 0

        try:
            self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
        except Exception as e:
            log.error('{}'.format(e), self.name)
            self._disconnect()
            return 0

        log.success('OK', tag='')
        return 1

    def send_files(self):
        '''
        Upload the newest file of every pattern whose period has elapsed.

        A pattern is skipped when its folder holds no matching file, when
        the newest file is older than 30 minutes, or when that file was
        already uploaded.  The first failed upload marks the operation as
        not ready and stops the loop, so the next run() reconnects.
        '''
        for x, pattern in enumerate(self.patterns):
            local, remote, ext, period, exp_code, sub_exp_code = pattern
            if time.time() - self.times[x] < int(period):
                continue

            srcname = self.find_files(local, ext)
            if srcname is None:
                # Must be tested before the name is joined into a path.
                log.warning(
                    'No {} files found in {}'.format(ext, local), self.name)
                continue

            src = os.path.join(local, srcname)
            if os.path.getmtime(src) < time.time() - 30*60:
                log.warning('Skipping old file {}'.format(srcname))
                continue
            if srcname == self.latest[x]:
                log.warning('File alreday uploaded {}'.format(srcname))
                continue

            if 'png' in ext:
                dstname = self.getftpname(srcname, int(exp_code), int(sub_exp_code))
            else:
                dstname = srcname

            dst = os.path.join(remote, dstname)
            if self.upload(src, dst):
                self.times[x] = time.time()
                self.latest[x] = srcname
            else:
                self.ready = False
                break

    def run(self, dataOut, server, username, password, timeout=10, **kwargs):
        '''
        Configure on the first call, make sure the connection is alive and
        upload the pending files.

        `dataOut` is not used; the remaining arguments are passed to
        setup() once.
        '''
        if not self.isConfig:
            self.setup(
                server=server,
                username=username,
                password=password,
                timeout=timeout,
                **kwargs
            )
            self.isConfig = True

        if not self.ready:
            self.connect()
        if self.ftp is None:
            return

        self.check()
        # check() drops the connection when its reconnect attempt fails.
        if self.ftp is not None:
            self.send_files()

    def close(self):
        '''
        Close the FTP connection if one is open.
        '''
        self._disconnect()
        self.ready = False