diff --git a/schainpy/model/graphics/jroplot_data.py b/schainpy/model/graphics/jroplot_data.py index ef4c80a..f2ac422 100644 --- a/schainpy/model/graphics/jroplot_data.py +++ b/schainpy/model/graphics/jroplot_data.py @@ -613,6 +613,7 @@ class PolarMapPlot(Plot): self.titles = ['{} {}'.format( self.data.parameters[x], title) for x in self.channels] + class ScopePlot(Plot): ''' diff --git a/schainpy/model/utils/jroutils_publish.py b/schainpy/model/utils/jroutils_publish.py index 2013f89..d631130 100644 --- a/schainpy/model/utils/jroutils_publish.py +++ b/schainpy/model/utils/jroutils_publish.py @@ -14,7 +14,7 @@ from functools import wraps from threading import Thread from multiprocessing import Process -from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit +from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit, MPDecorator from schainpy.model.data.jrodata import JROData from schainpy.utils import log @@ -65,17 +65,11 @@ class PublishData(Operation): __attrs__ = ['host', 'port', 'delay', 'verbose'] - def __init__(self, **kwargs): - """Inicio.""" - Operation.__init__(self, **kwargs) - self.isConfig = False - def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs): self.counter = 0 self.delay = kwargs.get('delay', 0) self.cnt = 0 self.verbose = verbose - setup = [] context = zmq.Context() self.zmq_socket = context.socket(zmq.PUSH) server = kwargs.get('server', 'zmq.pipe') @@ -154,32 +148,43 @@ class ReceiverData(ProcessingUnit): self.dataOut.datatime.ctime(),), 'Receiving') - -class SendToFTP(Operation, Process): +@MPDecorator +class SendToFTP(Operation): ''' Operation to send data over FTP. + patternX = 'local, remote, ext, period, exp_code, sub_exp_code' ''' - __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout'] + __attrs__ = ['server', 'username', 'password', 'timeout', 'patternX'] - def __init__(self, **kwargs): + def __init__(self): ''' - patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...] ''' - Operation.__init__(self, **kwargs) - Process.__init__(self) - self.server = kwargs.get('server') - self.username = kwargs.get('username') - self.password = kwargs.get('password') - self.patterns = kwargs.get('patterns') - self.timeout = kwargs.get('timeout', 30) - self.times = [time.time() for p in self.patterns] - self.latest = ['' for p in self.patterns] - self.mp = False + Operation.__init__(self) self.ftp = None + self.ready = False - def setup(self): + def setup(self, server, username, password, timeout, **kwargs): + ''' + ''' + + self.server = server + self.username = username + self.password = password + self.timeout = timeout + self.patterns = [] + self.times = [] + self.latest = [] + for arg, value in kwargs.items(): + if 'pattern' in arg: + self.patterns.append(value) + self.times.append(time.time()) + self.latest.append('') + + def connect(self): + ''' + ''' log.log('Connecting to ftp://{}'.format(self.server), self.name) try: @@ -189,7 +194,7 @@ class SendToFTP(Operation, Process): if self.ftp is not None: self.ftp.close() self.ftp = None - self.isConfig = False + self.ready = False return try: @@ -199,11 +204,11 @@ class SendToFTP(Operation, Process): if self.ftp is not None: self.ftp.close() self.ftp = None - self.isConfig = False + self.ready = False return log.success('Connection success', self.name) - self.isConfig = True + self.ready = True return def check(self): @@ -215,7 +220,7 @@ class SendToFTP(Operation, Process): if self.ftp is not None: self.ftp.close() self.ftp = None - self.setup() + self.connect() def find_files(self, path, ext): @@ -228,17 +233,21 @@ class SendToFTP(Operation, Process): def getftpname(self, filename, exp_code, sub_exp_code): thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d') - YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year - DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday - exp_code = '%3.3d'%exp_code - sub_exp_code = '%2.2d'%sub_exp_code - plot_code = '%2.2d'% get_plot_code(filename) + YEAR_STR = '%4.4d' % thisDatetime.timetuple().tm_year + DOY_STR = '%3.3d' % thisDatetime.timetuple().tm_yday + exp_code = '%3.3d' % exp_code + sub_exp_code = '%2.2d' % sub_exp_code + plot_code = '%2.2d' % get_plot_code(filename) name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png' return name def upload(self, src, dst): - log.log('Uploading {} '.format(src), self.name, nl=False) + log.log('Uploading {} -> {} '.format( + src.split('/')[-1], dst.split('/')[-1]), + self.name, + nl=False + ) fp = open(src, 'rb') command = 'STOR {}'.format(dst) @@ -268,18 +277,20 @@ class SendToFTP(Operation, Process): def send_files(self): for x, pattern in enumerate(self.patterns): - local, remote, ext, delay, exp_code, sub_exp_code = pattern - if time.time()-self.times[x] >= delay: - srcname = self.find_files(local, ext) - src = os.path.join(local, srcname) - if os.path.getmtime(src) < time.time() - 30*60: + local, remote, ext, period, exp_code, sub_exp_code = pattern + if time.time()-self.times[x] >= int(period): + srcname = self.find_files(local, ext) + src = os.path.join(local, srcname) + if os.path.getmtime(src) < time.time() - 30*60: + log.warning('Skipping old file {}'.format(srcname)) continue if srcname is None or srcname == self.latest[x]: + log.warning('File alreday uploaded {}'.format(srcname)) continue if 'png' in ext: - dstname = self.getftpname(srcname, exp_code, sub_exp_code) + dstname = self.getftpname(srcname, int(exp_code), int(sub_exp_code)) else: dstname = srcname @@ -289,21 +300,27 @@ class SendToFTP(Operation, Process): self.times[x] = time.time() self.latest[x] = srcname else: - self.isConfig = False + self.ready = False break - def run(self): + def run(self, dataOut, server, username, password, timeout=10, **kwargs): - while True: - if not self.isConfig: - self.setup() - if self.ftp is not None: - self.check() - self.send_files() - time.sleep(10) + if not self.isConfig: + self.setup( + server=server, + username=username, + password=password, + timeout=timeout, + **kwargs + ) + self.isConfig = True + if not self.ready: + self.connect() + if self.ftp is not None: + self.check() + self.send_files() - def close(): + def close(self): if self.ftp is not None: self.ftp.close() - self.terminate() \ No newline at end of file