''' @author: Juan C. Espinoza ''' import os import glob import time import json import numpy import zmq import datetime import ftplib from functools import wraps from threading import Thread from multiprocessing import Process from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit, MPDecorator from schainpy.model.data.jrodata import JROData from schainpy.utils import log MAXNUMX = 500 MAXNUMY = 500 PLOT_CODES = { 'rti': 0, # Range time intensity (RTI). 'spc': 1, # Spectra (and Cross-spectra) information. 'cspc': 2, # Cross-Correlation information. 'coh': 3, # Coherence map. 'base': 4, # Base lines graphic. 'row': 5, # Row Spectra. 'total': 6, # Total Power. 'drift': 7, # Drifts graphics. 'height': 8, # Height profile. 'phase': 9, # Signal Phase. 'power': 16, 'noise': 17, 'beacon': 18, 'wind': 22, 'skymap': 23, 'Unknown': 24, 'V-E': 25, # PIP Velocity. 'Z-E': 26, # PIP Reflectivity. 'V-A': 27, # RHI Velocity. 'Z-A': 28, # RHI Reflectivity. } def get_plot_code(s): label = s.split('_')[0] codes = [key for key in PLOT_CODES if key in label] if codes: return PLOT_CODES[codes[0]] else: return 24 def decimate(z, MAXNUMY): dy = int(len(z[0])/MAXNUMY) + 1 return z[::, ::dy] class PublishData(Operation): ''' Operation to send data over zmq. ''' __attrs__ = ['host', 'port', 'delay', 'verbose'] def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs): self.counter = 0 self.delay = kwargs.get('delay', 0) self.cnt = 0 self.verbose = verbose context = zmq.Context() self.zmq_socket = context.socket(zmq.PUSH) server = kwargs.get('server', 'zmq.pipe') if 'tcp://' in server: address = server else: address = 'ipc:///tmp/%s' % server self.zmq_socket.connect(address) time.sleep(1) def publish_data(self): self.dataOut.finished = False if self.verbose: log.log( 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime), self.name ) self.zmq_socket.send_pyobj(self.dataOut) def run(self, dataOut, **kwargs): self.dataOut = dataOut if not self.isConfig: self.setup(**kwargs) self.isConfig = True self.publish_data() time.sleep(self.delay) def close(self): self.dataOut.finished = True self.zmq_socket.send_pyobj(self.dataOut) time.sleep(0.1) self.zmq_socket.close() class ReceiverData(ProcessingUnit): __attrs__ = ['server'] def __init__(self, **kwargs): ProcessingUnit.__init__(self, **kwargs) self.isConfig = False server = kwargs.get('server', 'zmq.pipe') if 'tcp://' in server: address = server else: address = 'ipc:///tmp/%s' % server self.address = address self.dataOut = JROData() def setup(self): self.context = zmq.Context() self.receiver = self.context.socket(zmq.PULL) self.receiver.bind(self.address) time.sleep(0.5) log.success('ReceiverData from {}'.format(self.address)) def run(self): if not self.isConfig: self.setup() self.isConfig = True self.dataOut = self.receiver.recv_pyobj() log.log('{} - {}'.format(self.dataOut.type, self.dataOut.datatime.ctime(),), 'Receiving') @MPDecorator class SendToFTP(Operation): ''' Operation to send data over FTP. patternX = 'local, remote, ext, period, exp_code, sub_exp_code' ''' __attrs__ = ['server', 'username', 'password', 'timeout', 'patternX'] def __init__(self): ''' ''' Operation.__init__(self) self.ftp = None self.ready = False def setup(self, server, username, password, timeout, **kwargs): ''' ''' self.server = server self.username = username self.password = password self.timeout = timeout self.patterns = [] self.times = [] self.latest = [] for arg, value in kwargs.items(): if 'pattern' in arg: self.patterns.append(value) self.times.append(time.time()) self.latest.append('') def connect(self): ''' ''' log.log('Connecting to ftp://{}'.format(self.server), self.name) try: self.ftp = ftplib.FTP(self.server, timeout=self.timeout) except ftplib.all_errors: log.error('Server connection fail: {}'.format(self.server), self.name) if self.ftp is not None: self.ftp.close() self.ftp = None self.ready = False return try: self.ftp.login(self.username, self.password) except ftplib.all_errors: log.error('The given username y/o password are incorrect', self.name) if self.ftp is not None: self.ftp.close() self.ftp = None self.ready = False return log.success('Connection success', self.name) self.ready = True return def check(self): try: self.ftp.voidcmd("NOOP") except: log.warning('Connection lost... trying to reconnect', self.name) if self.ftp is not None: self.ftp.close() self.ftp = None self.connect() def find_files(self, path, ext): files = glob.glob1(path, '*{}'.format(ext)) files.sort() if files: return files[-1] return None def getftpname(self, filename, exp_code, sub_exp_code): thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d') YEAR_STR = '%4.4d' % thisDatetime.timetuple().tm_year DOY_STR = '%3.3d' % thisDatetime.timetuple().tm_yday exp_code = '%3.3d' % exp_code sub_exp_code = '%2.2d' % sub_exp_code plot_code = '%2.2d' % get_plot_code(filename) name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png' return name def upload(self, src, dst): log.log('Uploading {} -> {} '.format( src.split('/')[-1], dst.split('/')[-1]), self.name, nl=False ) fp = open(src, 'rb') command = 'STOR {}'.format(dst) try: self.ftp.storbinary(command, fp, blocksize=1024) except Exception as e: log.error('{}'.format(e), self.name) if self.ftp is not None: self.ftp.close() self.ftp = None return 0 try: self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst)) except Exception as e: log.error('{}'.format(e), self.name) if self.ftp is not None: self.ftp.close() self.ftp = None return 0 fp.close() log.success('OK', tag='') return 1 def send_files(self): for x, pattern in enumerate(self.patterns): local, remote, ext, period, exp_code, sub_exp_code = pattern if time.time()-self.times[x] >= int(period): srcname = self.find_files(local, ext) src = os.path.join(local, srcname) if os.path.getmtime(src) < time.time() - 30*60: log.warning('Skipping old file {}'.format(srcname)) continue if srcname is None or srcname == self.latest[x]: log.warning('File alreday uploaded {}'.format(srcname)) continue if 'png' in ext: dstname = self.getftpname(srcname, int(exp_code), int(sub_exp_code)) else: dstname = srcname dst = os.path.join(remote, dstname) if self.upload(src, dst): self.times[x] = time.time() self.latest[x] = srcname else: self.ready = False break def run(self, dataOut, server, username, password, timeout=10, **kwargs): if not self.isConfig: self.setup( server=server, username=username, password=password, timeout=timeout, **kwargs ) self.isConfig = True if not self.ready: self.connect() if self.ftp is not None: self.check() self.send_files() def close(self): if self.ftp is not None: self.ftp.close()