jroutils_publish.py
308 lines
| 8.6 KiB
| text/x-python
|
PythonLexer
|
r859 | ''' | |
@author: Juan C. Espinoza | |||
''' | |||
r1135 | import os | ||
import glob | |||
|
r859 | import time | |
import json | |||
import numpy | |||
|
r883 | import zmq | |
import datetime | |||
r1135 | import ftplib | ||
|
r883 | from functools import wraps | |
from threading import Thread | |||
from multiprocessing import Process | |||
|
r859 | ||
|
r883 | from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit | |
|
r957 | from schainpy.model.data.jrodata import JROData | |
|
r1062 | from schainpy.utils import log | |
|
r883 | ||
r1135 | MAXNUMX = 500 | ||
MAXNUMY = 500 | |||
PLOT_CODES = { | |||
r1139 | 'rti': 0, # Range time intensity (RTI). | ||
'spc': 1, # Spectra (and Cross-spectra) information. | |||
'cspc': 2, # Cross-Correlation information. | |||
'coh': 3, # Coherence map. | |||
'base': 4, # Base lines graphic. | |||
'row': 5, # Row Spectra. | |||
r1142 | 'total': 6, # Total Power. | ||
'drift': 7, # Drifts graphics. | |||
'height': 8, # Height profile. | |||
'phase': 9, # Signal Phase. | |||
'power': 16, | |||
'noise': 17, | |||
'beacon': 18, | |||
'wind': 22, | |||
'skymap': 23, | |||
'Unknown': 24, | |||
'V-E': 25, # PIP Velocity. | |||
'Z-E': 26, # PIP Reflectivity. | |||
'V-A': 27, # RHI Velocity. | |||
'Z-A': 28, # RHI Reflectivity. | |||
r1135 | } | ||
|
r859 | ||
r1142 | def get_plot_code(s): | ||
label = s.split('_')[0] | |||
codes = [key for key in PLOT_CODES if key in label] | |||
if codes: | |||
return PLOT_CODES[codes[0]] | |||
else: | |||
return 24 | |||
|
r859 | ||
|
r931 | def decimate(z, MAXNUMY): | |
|
r904 | dy = int(len(z[0])/MAXNUMY) + 1 | |
r911 | |||
|
r904 | return z[::, ::dy] | |
|
r859 | ||
|
r860 | ||
|
r859 | class PublishData(Operation): | |
|
r1062 | ''' | |
Operation to send data over zmq. | |||
''' | |||
|
r860 | ||
|
r1187 | __attrs__ = ['host', 'port', 'delay', 'verbose'] | |
|
r1097 | ||
|
r883 | def __init__(self, **kwargs): | |
|
r860 | """Inicio.""" | |
|
r883 | Operation.__init__(self, **kwargs) | |
|
r859 | self.isConfig = False | |
|
r1187 | ||
def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs): | |||
|
r883 | self.counter = 0 | |
|
r859 | self.delay = kwargs.get('delay', 0) | |
self.cnt = 0 | |||
|
r1062 | self.verbose = verbose | |
|
r860 | setup = [] | |
|
r1187 | context = zmq.Context() | |
self.zmq_socket = context.socket(zmq.PUSH) | |||
server = kwargs.get('server', 'zmq.pipe') | |||
if 'tcp://' in server: | |||
address = server | |||
else: | |||
address = 'ipc:///tmp/%s' % server | |||
self.zmq_socket.connect(address) | |||
time.sleep(1) | |||
|
r883 | ||
|
r931 | ||
|
r883 | def publish_data(self): | |
self.dataOut.finished = False | |||
|
r1187 | ||
if self.verbose: | |||
log.log( | |||
'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime), | |||
self.name | |||
) | |||
self.zmq_socket.send_pyobj(self.dataOut) | |||
|
r883 | ||
def run(self, dataOut, **kwargs): | |||
|
r860 | self.dataOut = dataOut | |
if not self.isConfig: | |||
|
r883 | self.setup(**kwargs) | |
|
r859 | self.isConfig = True | |
|
r860 | ||
|
r883 | self.publish_data() | |
|
r860 | time.sleep(self.delay) | |
|
r859 | def close(self): | |
|
r1187 | ||
self.dataOut.finished = True | |||
self.zmq_socket.send_pyobj(self.dataOut) | |||
time.sleep(0.1) | |||
self.zmq_socket.close() | |||
|
r957 | ||
class ReceiverData(ProcessingUnit): | |||
|
r1097 | __attrs__ = ['server'] | |
|
r957 | def __init__(self, **kwargs): | |
ProcessingUnit.__init__(self, **kwargs) | |||
self.isConfig = False | |||
server = kwargs.get('server', 'zmq.pipe') | |||
if 'tcp://' in server: | |||
address = server | |||
else: | |||
address = 'ipc:///tmp/%s' % server | |||
self.address = address | |||
self.dataOut = JROData() | |||
def setup(self): | |||
self.context = zmq.Context() | |||
self.receiver = self.context.socket(zmq.PULL) | |||
self.receiver.bind(self.address) | |||
time.sleep(0.5) | |||
|
r1062 | log.success('ReceiverData from {}'.format(self.address)) | |
|
r957 | ||
def run(self): | |||
if not self.isConfig: | |||
self.setup() | |||
self.isConfig = True | |||
self.dataOut = self.receiver.recv_pyobj() | |||
|
r1062 | log.log('{} - {}'.format(self.dataOut.type, | |
self.dataOut.datatime.ctime(),), | |||
'Receiving') | |||
|
r957 | ||
r1135 | class SendToFTP(Operation, Process): | ||
''' | |||
Operation to send data over FTP. | |||
''' | |||
__attrs__ = ['server', 'username', 'password', 'patterns', 'timeout'] | |||
def __init__(self, **kwargs): | |||
''' | |||
patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...] | |||
''' | |||
Operation.__init__(self, **kwargs) | |||
Process.__init__(self) | |||
self.server = kwargs.get('server') | |||
self.username = kwargs.get('username') | |||
self.password = kwargs.get('password') | |||
self.patterns = kwargs.get('patterns') | |||
r1139 | self.timeout = kwargs.get('timeout', 30) | ||
r1135 | self.times = [time.time() for p in self.patterns] | ||
self.latest = ['' for p in self.patterns] | |||
self.mp = False | |||
self.ftp = None | |||
def setup(self): | |||
log.log('Connecting to ftp://{}'.format(self.server), self.name) | |||
try: | |||
self.ftp = ftplib.FTP(self.server, timeout=self.timeout) | |||
except ftplib.all_errors: | |||
log.error('Server connection fail: {}'.format(self.server), self.name) | |||
if self.ftp is not None: | |||
self.ftp.close() | |||
self.ftp = None | |||
self.isConfig = False | |||
return | |||
try: | |||
self.ftp.login(self.username, self.password) | |||
except ftplib.all_errors: | |||
log.error('The given username y/o password are incorrect', self.name) | |||
if self.ftp is not None: | |||
self.ftp.close() | |||
self.ftp = None | |||
self.isConfig = False | |||
return | |||
log.success('Connection success', self.name) | |||
self.isConfig = True | |||
return | |||
def check(self): | |||
try: | |||
self.ftp.voidcmd("NOOP") | |||
except: | |||
log.warning('Connection lost... trying to reconnect', self.name) | |||
if self.ftp is not None: | |||
self.ftp.close() | |||
self.ftp = None | |||
self.setup() | |||
def find_files(self, path, ext): | |||
files = glob.glob1(path, '*{}'.format(ext)) | |||
files.sort() | |||
if files: | |||
return files[-1] | |||
return None | |||
def getftpname(self, filename, exp_code, sub_exp_code): | |||
thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d') | |||
YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year | |||
DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday | |||
exp_code = '%3.3d'%exp_code | |||
sub_exp_code = '%2.2d'%sub_exp_code | |||
r1142 | plot_code = '%2.2d'% get_plot_code(filename) | ||
r1135 | name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png' | ||
return name | |||
def upload(self, src, dst): | |||
log.log('Uploading {} '.format(src), self.name, nl=False) | |||
fp = open(src, 'rb') | |||
command = 'STOR {}'.format(dst) | |||
try: | |||
self.ftp.storbinary(command, fp, blocksize=1024) | |||
|
r1167 | except Exception as e: | |
r1135 | log.error('{}'.format(e), self.name) | ||
if self.ftp is not None: | |||
self.ftp.close() | |||
self.ftp = None | |||
r1142 | return 0 | ||
r1135 | |||
try: | |||
self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst)) | |||
|
r1167 | except Exception as e: | |
r1135 | log.error('{}'.format(e), self.name) | ||
if self.ftp is not None: | |||
self.ftp.close() | |||
self.ftp = None | |||
r1142 | return 0 | ||
r1135 | |||
fp.close() | |||
log.success('OK', tag='') | |||
r1142 | return 1 | ||
r1135 | |||
def send_files(self): | |||
for x, pattern in enumerate(self.patterns): | |||
local, remote, ext, delay, exp_code, sub_exp_code = pattern | |||
if time.time()-self.times[x] >= delay: | |||
r1142 | srcname = self.find_files(local, ext) | ||
src = os.path.join(local, srcname) | |||
if os.path.getmtime(src) < time.time() - 30*60: | |||
continue | |||
r1135 | if srcname is None or srcname == self.latest[x]: | ||
continue | |||
if 'png' in ext: | |||
dstname = self.getftpname(srcname, exp_code, sub_exp_code) | |||
else: | |||
r1142 | dstname = srcname | ||
r1135 | |||
dst = os.path.join(remote, dstname) | |||
r1142 | if self.upload(src, dst): | ||
self.times[x] = time.time() | |||
self.latest[x] = srcname | |||
else: | |||
self.isConfig = False | |||
break | |||
r1135 | |||
def run(self): | |||
while True: | |||
if not self.isConfig: | |||
self.setup() | |||
if self.ftp is not None: | |||
self.check() | |||
r1142 | self.send_files() | ||
r1139 | time.sleep(10) | ||
r1135 | |||
def close(): | |||
if self.ftp is not None: | |||
r1139 | self.ftp.close() | ||
|
r1167 | self.terminate() |