##// END OF EJS Templates
New Operation SendToFTP
jespinoza -
r1135:eed97f9f9bf5
parent child
Show More
@@ -2,12 +2,15
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 import os
6 import glob
5 7 import time
6 8 import json
7 9 import numpy
8 10 import paho.mqtt.client as mqtt
9 11 import zmq
10 12 import datetime
13 import ftplib
11 14 from zmq.utils.monitor import recv_monitor_message
12 15 from functools import wraps
13 16 from threading import Thread
@@ -17,8 +20,33 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 20 from schainpy.model.data.jrodata import JROData
18 21 from schainpy.utils import log
19 22
20 MAXNUMX = 100
21 MAXNUMY = 100
23 MAXNUMX = 500
24 MAXNUMY = 500
25
26 PLOT_CODES = {
27 'rti': 0, #Range time intensity (RTI).
28 'spc': 1, #Spectra (and Cross-spectra) information.
29 'cspc': 2, #Cross-Correlation information.
30 'coh': 3, #Coherence map.
31 'base': 4, #Base lines graphic.
32 'row': 5, #Row Spectra.
33 'total' : 6, #Total Power.
34 'drift' : 7, #Drifts graphics.
35 'height' : 8, #Height profile.
36 'phase' : 9, #Signal Phase.
37 'power' : 16,
38 'noise' : 17,
39 'beacon' : 18,
40 #USED IN jroplot_parameters.py
41 'wind' : 22,
42 'skymap' : 23,
43 # 'MPHASE_CODE' : 24,
44 'moments' : 25,
45 'param' : 26,
46 'spc_fit' : 27,
47 'ew_drifts' : 28,
48 'reflectivity': 30
49 }
22 50
23 51 class PrettyFloat(float):
24 52 def __repr__(self):
@@ -82,10 +110,11 class Data(object):
82 110 Object to hold data to be plotted
83 111 '''
84 112
85 def __init__(self, plottypes, throttle_value, exp_code):
113 def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
86 114 self.plottypes = plottypes
87 115 self.throttle = throttle_value
88 116 self.exp_code = exp_code
117 self.buffering = buffering
89 118 self.ended = False
90 119 self.localtime = False
91 120 self.__times = []
@@ -102,7 +131,7 class Data(object):
102 131 if key not in self.data:
103 132 raise KeyError(log.error('Missing key: {}'.format(key)))
104 133
105 if 'spc' in key:
134 if 'spc' in key or not self.buffering:
106 135 ret = self.data[key]
107 136 else:
108 137 ret = numpy.array([self.data[key][x] for x in self.times])
@@ -118,6 +147,7 class Data(object):
118 147 Configure object
119 148 '''
120 149
150 self.type = ''
121 151 self.ended = False
122 152 self.data = {}
123 153 self.__times = []
@@ -134,7 +164,7 class Data(object):
134 164 '''
135 165
136 166 if len(self.data[key]):
137 if 'spc' in key:
167 if 'spc' in key or not self.buffering:
138 168 return self.data[key].shape
139 169 return self.data[key][self.__times[0]].shape
140 170 return (0,)
@@ -147,6 +177,7 class Data(object):
147 177 if tm in self.__times:
148 178 return
149 179
180 self.type = dataOut.type
150 181 self.parameters = getattr(dataOut, 'parameters', [])
151 182 if hasattr(dataOut, 'pairsList'):
152 183 self.pairs = dataOut.pairsList
@@ -166,27 +197,32 class Data(object):
166 197 if plot == 'cspc':
167 198 self.data[plot] = dataOut.data_cspc
168 199 if plot == 'noise':
169 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
200 buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
170 201 if plot == 'rti':
171 self.data[plot][tm] = dataOut.getPower()
202 buffer = dataOut.getPower()
172 203 if plot == 'snr_db':
173 204 self.data['snr'][tm] = dataOut.data_SNR
174 205 if plot == 'snr':
175 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
206 buffer = 10*numpy.log10(dataOut.data_SNR)
176 207 if plot == 'dop':
177 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
208 buffer = 10*numpy.log10(dataOut.data_DOP)
178 209 if plot == 'mean':
179 self.data[plot][tm] = dataOut.data_MEAN
210 buffer = dataOut.data_MEAN
180 211 if plot == 'std':
181 self.data[plot][tm] = dataOut.data_STD
212 buffer = dataOut.data_STD
182 213 if plot == 'coh':
183 self.data[plot][tm] = dataOut.getCoherence()
214 buffer = dataOut.getCoherence()
184 215 if plot == 'phase':
185 self.data[plot][tm] = dataOut.getCoherence(phase=True)
216 buffer = dataOut.getCoherence(phase=True)
186 217 if plot == 'output':
187 self.data[plot][tm] = dataOut.data_output
218 buffer = dataOut.data_output
188 219 if plot == 'param':
189 self.data[plot][tm] = dataOut.data_param
220 buffer = dataOut.data_param
221
222 if self.buffering:
223 self.data[plot][tm] = buffer
224 else:
225 self.data[plot] = buffer
190 226
191 227 def normalize_heights(self):
192 228 '''
@@ -220,7 +256,7 class Data(object):
220 256 tm = self.times[-1]
221 257
222 258 for key in self.data:
223 if key in ('spc', 'cspc'):
259 if key in ('spc', 'cspc') or not self.buffering:
224 260 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
225 261 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
226 262 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
@@ -235,6 +271,8 class Data(object):
235 271 ret['yrange'] = roundFloats(self.heights.tolist())
236 272 if key in ('spc', 'cspc'):
237 273 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
274 else:
275 ret['xrange'] = []
238 276 if hasattr(self, 'pairs'):
239 277 ret['pairs'] = self.pairs
240 278 return json.dumps(ret)
@@ -489,7 +527,7 class PlotterReceiver(ProcessingUnit, Process):
489 527
490 528 throttle_value = 5
491 529 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
492 'exp_code', 'web_server']
530 'exp_code', 'web_server', 'buffering']
493 531
494 532 def __init__(self, **kwargs):
495 533
@@ -510,6 +548,7 class PlotterReceiver(ProcessingUnit, Process):
510 548 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
511 549 self.realtime = kwargs.get('realtime', False)
512 550 self.localtime = kwargs.get('localtime', True)
551 self.buffering = kwargs.get('buffering', True)
513 552 self.throttle_value = kwargs.get('throttle', 5)
514 553 self.exp_code = kwargs.get('exp_code', None)
515 554 self.sendData = self.initThrottle(self.throttle_value)
@@ -518,8 +557,8 class PlotterReceiver(ProcessingUnit, Process):
518 557
519 558 def setup(self):
520 559
521 self.data = Data(self.plottypes, self.throttle_value, self.exp_code)
522 self.isConfig = True
560 self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
561 self.isConfig = True
523 562
524 563 def event_monitor(self, monitor):
525 564
@@ -553,12 +592,12 class PlotterReceiver(ProcessingUnit, Process):
553 592 return sendDataThrottled
554 593
555 594 def send(self, data):
556 log.success('Sending {}'.format(data), self.name)
595 log.log('Sending {}'.format(data), self.name)
557 596 self.sender.send_pyobj(data)
558 597
559 598 def run(self):
560 599
561 log.success(
600 log.log(
562 601 'Starting from {}'.format(self.address),
563 602 self.name
564 603 )
@@ -573,7 +612,7 class PlotterReceiver(ProcessingUnit, Process):
573 612 'Sending to web: {}'.format(self.web_address),
574 613 self.name
575 614 )
576 self.sender_web = self.context.socket(zmq.PUB)
615 self.sender_web = self.context.socket(zmq.PUSH)
577 616 self.sender_web.connect(self.web_address)
578 617 time.sleep(1)
579 618
@@ -623,9 +662,168 class PlotterReceiver(ProcessingUnit, Process):
623 662 if self.realtime:
624 663 self.send(self.data)
625 664 if self.web_address:
626 self.sender_web.send(self.data.jsonify())
665 payload = self.data.jsonify()
666 log.log('Sending to web... type:{}, size:{}'.format(dataOut.type, len(payload)), self.name)
667 self.sender_web.send(payload)
627 668 else:
628 669 self.sendData(self.send, self.data, coerce=coerce)
629 670 coerce = False
630 671
631 672 return
673
674
675 class SendToFTP(Operation, Process):
676
677 '''
678 Operation to send data over FTP.
679 '''
680
681 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
682
683 def __init__(self, **kwargs):
684 '''
685 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
686 '''
687 Operation.__init__(self, **kwargs)
688 Process.__init__(self)
689 self.server = kwargs.get('server')
690 self.username = kwargs.get('username')
691 self.password = kwargs.get('password')
692 self.patterns = kwargs.get('patterns')
693 self.timeout = kwargs.get('timeout', 10)
694 self.times = [time.time() for p in self.patterns]
695 self.latest = ['' for p in self.patterns]
696 self.mp = False
697 self.ftp = None
698
699 def setup(self):
700
701 log.log('Connecting to ftp://{}'.format(self.server), self.name)
702 try:
703 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
704 except ftplib.all_errors:
705 log.error('Server connection fail: {}'.format(self.server), self.name)
706 if self.ftp is not None:
707 self.ftp.close()
708 self.ftp = None
709 self.isConfig = False
710 return
711
712 try:
713 self.ftp.login(self.username, self.password)
714 except ftplib.all_errors:
715 log.error('The given username y/o password are incorrect', self.name)
716 if self.ftp is not None:
717 self.ftp.close()
718 self.ftp = None
719 self.isConfig = False
720 return
721
722 log.success('Connection success', self.name)
723 self.isConfig = True
724 return
725
726 def check(self):
727
728 try:
729 self.ftp.voidcmd("NOOP")
730 except:
731 log.warning('Connection lost... trying to reconnect', self.name)
732 if self.ftp is not None:
733 self.ftp.close()
734 self.ftp = None
735 self.setup()
736
737 def find_files(self, path, ext):
738
739 files = glob.glob1(path, '*{}'.format(ext))
740 files.sort()
741 if files:
742 return files[-1]
743 return None
744
745 def getftpname(self, filename, exp_code, sub_exp_code):
746
747 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
748 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
749 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
750 exp_code = '%3.3d'%exp_code
751 sub_exp_code = '%2.2d'%sub_exp_code
752 plot_code = '%2.2d'% PLOT_CODES[filename.split('_')[0].split('-')[0]]
753 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
754 return name
755
756 def upload(self, src, dst):
757
758 log.log('Uploading {} '.format(src), self.name, nl=False)
759
760 fp = open(src, 'rb')
761 command = 'STOR {}'.format(dst)
762
763 try:
764 self.ftp.storbinary(command, fp, blocksize=1024)
765 except ftplib.all_errors, e:
766 log.error('{}'.format(e), self.name)
767 if self.ftp is not None:
768 self.ftp.close()
769 self.ftp = None
770 return
771
772 try:
773 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
774 except ftplib.all_errors, e:
775 log.error('{}'.format(e), self.name)
776 if self.ftp is not None:
777 self.ftp.close()
778 self.ftp = None
779
780 fp.close()
781
782 log.success('OK', tag='')
783
784 def send_files(self):
785
786 for x, pattern in enumerate(self.patterns):
787 local, remote, ext, delay, exp_code, sub_exp_code = pattern
788 if time.time()-self.times[x] >= delay:
789 srcname = self.find_files(local, ext)
790
791 if srcname is None or srcname == self.latest[x]:
792 continue
793
794 if 'png' in ext:
795 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
796 else:
797 dstname = srcname
798
799 src = os.path.join(local, srcname)
800
801 if os.path.getmtime(src) < time.time() - 30*60:
802 continue
803
804 dst = os.path.join(remote, dstname)
805
806 if self.ftp is None:
807 continue
808
809 self.upload(src, dst)
810
811 self.times[x] = time.time()
812 self.latest[x] = srcname
813
814 def run(self):
815
816 while True:
817 if not self.isConfig:
818 self.setup()
819 if self.ftp is not None:
820 self.check()
821 self.send_files()
822 time.sleep(2)
823
824 def close():
825
826 if self.ftp is not None:
827 if self.ftp is not None:
828 self.ftp.close()
829 self.terminate()
General Comments 0
You need to be logged in to leave comments. Login now