##// END OF EJS Templates
New Operation SendToFTP
jespinoza -
r1135:eed97f9f9bf5
parent child
Show More
@@ -2,12 +2,15
2 @author: Juan C. Espinoza
2 @author: Juan C. Espinoza
3 '''
3 '''
4
4
5 import os
6 import glob
5 import time
7 import time
6 import json
8 import json
7 import numpy
9 import numpy
8 import paho.mqtt.client as mqtt
10 import paho.mqtt.client as mqtt
9 import zmq
11 import zmq
10 import datetime
12 import datetime
13 import ftplib
11 from zmq.utils.monitor import recv_monitor_message
14 from zmq.utils.monitor import recv_monitor_message
12 from functools import wraps
15 from functools import wraps
13 from threading import Thread
16 from threading import Thread
@@ -17,8 +20,33 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 from schainpy.model.data.jrodata import JROData
20 from schainpy.model.data.jrodata import JROData
18 from schainpy.utils import log
21 from schainpy.utils import log
19
22
20 MAXNUMX = 100
23 MAXNUMX = 500
21 MAXNUMY = 100
24 MAXNUMY = 500
25
26 PLOT_CODES = {
27 'rti': 0, #Range time intensity (RTI).
28 'spc': 1, #Spectra (and Cross-spectra) information.
29 'cspc': 2, #Cross-Correlation information.
30 'coh': 3, #Coherence map.
31 'base': 4, #Base lines graphic.
32 'row': 5, #Row Spectra.
33 'total' : 6, #Total Power.
34 'drift' : 7, #Drifts graphics.
35 'height' : 8, #Height profile.
36 'phase' : 9, #Signal Phase.
37 'power' : 16,
38 'noise' : 17,
39 'beacon' : 18,
40 #USED IN jroplot_parameters.py
41 'wind' : 22,
42 'skymap' : 23,
43 # 'MPHASE_CODE' : 24,
44 'moments' : 25,
45 'param' : 26,
46 'spc_fit' : 27,
47 'ew_drifts' : 28,
48 'reflectivity': 30
49 }
22
50
23 class PrettyFloat(float):
51 class PrettyFloat(float):
24 def __repr__(self):
52 def __repr__(self):
@@ -82,10 +110,11 class Data(object):
82 Object to hold data to be plotted
110 Object to hold data to be plotted
83 '''
111 '''
84
112
85 def __init__(self, plottypes, throttle_value, exp_code):
113 def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
86 self.plottypes = plottypes
114 self.plottypes = plottypes
87 self.throttle = throttle_value
115 self.throttle = throttle_value
88 self.exp_code = exp_code
116 self.exp_code = exp_code
117 self.buffering = buffering
89 self.ended = False
118 self.ended = False
90 self.localtime = False
119 self.localtime = False
91 self.__times = []
120 self.__times = []
@@ -102,7 +131,7 class Data(object):
102 if key not in self.data:
131 if key not in self.data:
103 raise KeyError(log.error('Missing key: {}'.format(key)))
132 raise KeyError(log.error('Missing key: {}'.format(key)))
104
133
105 if 'spc' in key:
134 if 'spc' in key or not self.buffering:
106 ret = self.data[key]
135 ret = self.data[key]
107 else:
136 else:
108 ret = numpy.array([self.data[key][x] for x in self.times])
137 ret = numpy.array([self.data[key][x] for x in self.times])
@@ -118,6 +147,7 class Data(object):
118 Configure object
147 Configure object
119 '''
148 '''
120
149
150 self.type = ''
121 self.ended = False
151 self.ended = False
122 self.data = {}
152 self.data = {}
123 self.__times = []
153 self.__times = []
@@ -134,7 +164,7 class Data(object):
134 '''
164 '''
135
165
136 if len(self.data[key]):
166 if len(self.data[key]):
137 if 'spc' in key:
167 if 'spc' in key or not self.buffering:
138 return self.data[key].shape
168 return self.data[key].shape
139 return self.data[key][self.__times[0]].shape
169 return self.data[key][self.__times[0]].shape
140 return (0,)
170 return (0,)
@@ -147,6 +177,7 class Data(object):
147 if tm in self.__times:
177 if tm in self.__times:
148 return
178 return
149
179
180 self.type = dataOut.type
150 self.parameters = getattr(dataOut, 'parameters', [])
181 self.parameters = getattr(dataOut, 'parameters', [])
151 if hasattr(dataOut, 'pairsList'):
182 if hasattr(dataOut, 'pairsList'):
152 self.pairs = dataOut.pairsList
183 self.pairs = dataOut.pairsList
@@ -166,27 +197,32 class Data(object):
166 if plot == 'cspc':
197 if plot == 'cspc':
167 self.data[plot] = dataOut.data_cspc
198 self.data[plot] = dataOut.data_cspc
168 if plot == 'noise':
199 if plot == 'noise':
169 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
200 buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
170 if plot == 'rti':
201 if plot == 'rti':
171 self.data[plot][tm] = dataOut.getPower()
202 buffer = dataOut.getPower()
172 if plot == 'snr_db':
203 if plot == 'snr_db':
173 self.data['snr'][tm] = dataOut.data_SNR
204 self.data['snr'][tm] = dataOut.data_SNR
174 if plot == 'snr':
205 if plot == 'snr':
175 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
206 buffer = 10*numpy.log10(dataOut.data_SNR)
176 if plot == 'dop':
207 if plot == 'dop':
177 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
208 buffer = 10*numpy.log10(dataOut.data_DOP)
178 if plot == 'mean':
209 if plot == 'mean':
179 self.data[plot][tm] = dataOut.data_MEAN
210 buffer = dataOut.data_MEAN
180 if plot == 'std':
211 if plot == 'std':
181 self.data[plot][tm] = dataOut.data_STD
212 buffer = dataOut.data_STD
182 if plot == 'coh':
213 if plot == 'coh':
183 self.data[plot][tm] = dataOut.getCoherence()
214 buffer = dataOut.getCoherence()
184 if plot == 'phase':
215 if plot == 'phase':
185 self.data[plot][tm] = dataOut.getCoherence(phase=True)
216 buffer = dataOut.getCoherence(phase=True)
186 if plot == 'output':
217 if plot == 'output':
187 self.data[plot][tm] = dataOut.data_output
218 buffer = dataOut.data_output
188 if plot == 'param':
219 if plot == 'param':
189 self.data[plot][tm] = dataOut.data_param
220 buffer = dataOut.data_param
221
222 if self.buffering:
223 self.data[plot][tm] = buffer
224 else:
225 self.data[plot] = buffer
190
226
191 def normalize_heights(self):
227 def normalize_heights(self):
192 '''
228 '''
@@ -220,7 +256,7 class Data(object):
220 tm = self.times[-1]
256 tm = self.times[-1]
221
257
222 for key in self.data:
258 for key in self.data:
223 if key in ('spc', 'cspc'):
259 if key in ('spc', 'cspc') or not self.buffering:
224 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
260 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
225 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
261 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
226 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
262 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
@@ -235,6 +271,8 class Data(object):
235 ret['yrange'] = roundFloats(self.heights.tolist())
271 ret['yrange'] = roundFloats(self.heights.tolist())
236 if key in ('spc', 'cspc'):
272 if key in ('spc', 'cspc'):
237 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
273 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
274 else:
275 ret['xrange'] = []
238 if hasattr(self, 'pairs'):
276 if hasattr(self, 'pairs'):
239 ret['pairs'] = self.pairs
277 ret['pairs'] = self.pairs
240 return json.dumps(ret)
278 return json.dumps(ret)
@@ -489,7 +527,7 class PlotterReceiver(ProcessingUnit, Process):
489
527
490 throttle_value = 5
528 throttle_value = 5
491 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
529 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
492 'exp_code', 'web_server']
530 'exp_code', 'web_server', 'buffering']
493
531
494 def __init__(self, **kwargs):
532 def __init__(self, **kwargs):
495
533
@@ -510,6 +548,7 class PlotterReceiver(ProcessingUnit, Process):
510 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
548 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
511 self.realtime = kwargs.get('realtime', False)
549 self.realtime = kwargs.get('realtime', False)
512 self.localtime = kwargs.get('localtime', True)
550 self.localtime = kwargs.get('localtime', True)
551 self.buffering = kwargs.get('buffering', True)
513 self.throttle_value = kwargs.get('throttle', 5)
552 self.throttle_value = kwargs.get('throttle', 5)
514 self.exp_code = kwargs.get('exp_code', None)
553 self.exp_code = kwargs.get('exp_code', None)
515 self.sendData = self.initThrottle(self.throttle_value)
554 self.sendData = self.initThrottle(self.throttle_value)
@@ -518,8 +557,8 class PlotterReceiver(ProcessingUnit, Process):
518
557
519 def setup(self):
558 def setup(self):
520
559
521 self.data = Data(self.plottypes, self.throttle_value, self.exp_code)
560 self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
522 self.isConfig = True
561 self.isConfig = True
523
562
524 def event_monitor(self, monitor):
563 def event_monitor(self, monitor):
525
564
@@ -553,12 +592,12 class PlotterReceiver(ProcessingUnit, Process):
553 return sendDataThrottled
592 return sendDataThrottled
554
593
555 def send(self, data):
594 def send(self, data):
556 log.success('Sending {}'.format(data), self.name)
595 log.log('Sending {}'.format(data), self.name)
557 self.sender.send_pyobj(data)
596 self.sender.send_pyobj(data)
558
597
559 def run(self):
598 def run(self):
560
599
561 log.success(
600 log.log(
562 'Starting from {}'.format(self.address),
601 'Starting from {}'.format(self.address),
563 self.name
602 self.name
564 )
603 )
@@ -573,7 +612,7 class PlotterReceiver(ProcessingUnit, Process):
573 'Sending to web: {}'.format(self.web_address),
612 'Sending to web: {}'.format(self.web_address),
574 self.name
613 self.name
575 )
614 )
576 self.sender_web = self.context.socket(zmq.PUB)
615 self.sender_web = self.context.socket(zmq.PUSH)
577 self.sender_web.connect(self.web_address)
616 self.sender_web.connect(self.web_address)
578 time.sleep(1)
617 time.sleep(1)
579
618
@@ -623,9 +662,168 class PlotterReceiver(ProcessingUnit, Process):
623 if self.realtime:
662 if self.realtime:
624 self.send(self.data)
663 self.send(self.data)
625 if self.web_address:
664 if self.web_address:
626 self.sender_web.send(self.data.jsonify())
665 payload = self.data.jsonify()
666 log.log('Sending to web... type:{}, size:{}'.format(dataOut.type, len(payload)), self.name)
667 self.sender_web.send(payload)
627 else:
668 else:
628 self.sendData(self.send, self.data, coerce=coerce)
669 self.sendData(self.send, self.data, coerce=coerce)
629 coerce = False
670 coerce = False
630
671
631 return
672 return
673
674
675 class SendToFTP(Operation, Process):
676
677 '''
678 Operation to send data over FTP.
679 '''
680
681 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
682
683 def __init__(self, **kwargs):
684 '''
685 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
686 '''
687 Operation.__init__(self, **kwargs)
688 Process.__init__(self)
689 self.server = kwargs.get('server')
690 self.username = kwargs.get('username')
691 self.password = kwargs.get('password')
692 self.patterns = kwargs.get('patterns')
693 self.timeout = kwargs.get('timeout', 10)
694 self.times = [time.time() for p in self.patterns]
695 self.latest = ['' for p in self.patterns]
696 self.mp = False
697 self.ftp = None
698
699 def setup(self):
700
701 log.log('Connecting to ftp://{}'.format(self.server), self.name)
702 try:
703 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
704 except ftplib.all_errors:
705 log.error('Server connection fail: {}'.format(self.server), self.name)
706 if self.ftp is not None:
707 self.ftp.close()
708 self.ftp = None
709 self.isConfig = False
710 return
711
712 try:
713 self.ftp.login(self.username, self.password)
714 except ftplib.all_errors:
715 log.error('The given username y/o password are incorrect', self.name)
716 if self.ftp is not None:
717 self.ftp.close()
718 self.ftp = None
719 self.isConfig = False
720 return
721
722 log.success('Connection success', self.name)
723 self.isConfig = True
724 return
725
726 def check(self):
727
728 try:
729 self.ftp.voidcmd("NOOP")
730 except:
731 log.warning('Connection lost... trying to reconnect', self.name)
732 if self.ftp is not None:
733 self.ftp.close()
734 self.ftp = None
735 self.setup()
736
737 def find_files(self, path, ext):
738
739 files = glob.glob1(path, '*{}'.format(ext))
740 files.sort()
741 if files:
742 return files[-1]
743 return None
744
745 def getftpname(self, filename, exp_code, sub_exp_code):
746
747 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
748 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
749 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
750 exp_code = '%3.3d'%exp_code
751 sub_exp_code = '%2.2d'%sub_exp_code
752 plot_code = '%2.2d'% PLOT_CODES[filename.split('_')[0].split('-')[0]]
753 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
754 return name
755
756 def upload(self, src, dst):
757
758 log.log('Uploading {} '.format(src), self.name, nl=False)
759
760 fp = open(src, 'rb')
761 command = 'STOR {}'.format(dst)
762
763 try:
764 self.ftp.storbinary(command, fp, blocksize=1024)
765 except ftplib.all_errors, e:
766 log.error('{}'.format(e), self.name)
767 if self.ftp is not None:
768 self.ftp.close()
769 self.ftp = None
770 return
771
772 try:
773 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
774 except ftplib.all_errors, e:
775 log.error('{}'.format(e), self.name)
776 if self.ftp is not None:
777 self.ftp.close()
778 self.ftp = None
779
780 fp.close()
781
782 log.success('OK', tag='')
783
784 def send_files(self):
785
786 for x, pattern in enumerate(self.patterns):
787 local, remote, ext, delay, exp_code, sub_exp_code = pattern
788 if time.time()-self.times[x] >= delay:
789 srcname = self.find_files(local, ext)
790
791 if srcname is None or srcname == self.latest[x]:
792 continue
793
794 if 'png' in ext:
795 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
796 else:
797 dstname = srcname
798
799 src = os.path.join(local, srcname)
800
801 if os.path.getmtime(src) < time.time() - 30*60:
802 continue
803
804 dst = os.path.join(remote, dstname)
805
806 if self.ftp is None:
807 continue
808
809 self.upload(src, dst)
810
811 self.times[x] = time.time()
812 self.latest[x] = srcname
813
814 def run(self):
815
816 while True:
817 if not self.isConfig:
818 self.setup()
819 if self.ftp is not None:
820 self.check()
821 self.send_files()
822 time.sleep(2)
823
824 def close():
825
826 if self.ftp is not None:
827 if self.ftp is not None:
828 self.ftp.close()
829 self.terminate()
General Comments 0
You need to be logged in to leave comments. Login now