@@ -2,12 +2,15 | |||||
2 | @author: Juan C. Espinoza |
|
2 | @author: Juan C. Espinoza | |
3 | ''' |
|
3 | ''' | |
4 |
|
4 | |||
|
5 | import os | |||
|
6 | import glob | |||
5 | import time |
|
7 | import time | |
6 | import json |
|
8 | import json | |
7 | import numpy |
|
9 | import numpy | |
8 | import paho.mqtt.client as mqtt |
|
10 | import paho.mqtt.client as mqtt | |
9 | import zmq |
|
11 | import zmq | |
10 | import datetime |
|
12 | import datetime | |
|
13 | import ftplib | |||
11 | from zmq.utils.monitor import recv_monitor_message |
|
14 | from zmq.utils.monitor import recv_monitor_message | |
12 | from functools import wraps |
|
15 | from functools import wraps | |
13 | from threading import Thread |
|
16 | from threading import Thread | |
@@ -17,8 +20,33 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit | |||||
17 | from schainpy.model.data.jrodata import JROData |
|
20 | from schainpy.model.data.jrodata import JROData | |
18 | from schainpy.utils import log |
|
21 | from schainpy.utils import log | |
19 |
|
22 | |||
20 |
MAXNUMX = |
|
23 | MAXNUMX = 500 | |
21 |
MAXNUMY = |
|
24 | MAXNUMY = 500 | |
|
25 | ||||
|
26 | PLOT_CODES = { | |||
|
27 | 'rti': 0, #Range time intensity (RTI). | |||
|
28 | 'spc': 1, #Spectra (and Cross-spectra) information. | |||
|
29 | 'cspc': 2, #Cross-Correlation information. | |||
|
30 | 'coh': 3, #Coherence map. | |||
|
31 | 'base': 4, #Base lines graphic. | |||
|
32 | 'row': 5, #Row Spectra. | |||
|
33 | 'total' : 6, #Total Power. | |||
|
34 | 'drift' : 7, #Drifts graphics. | |||
|
35 | 'height' : 8, #Height profile. | |||
|
36 | 'phase' : 9, #Signal Phase. | |||
|
37 | 'power' : 16, | |||
|
38 | 'noise' : 17, | |||
|
39 | 'beacon' : 18, | |||
|
40 | #USED IN jroplot_parameters.py | |||
|
41 | 'wind' : 22, | |||
|
42 | 'skymap' : 23, | |||
|
43 | # 'MPHASE_CODE' : 24, | |||
|
44 | 'moments' : 25, | |||
|
45 | 'param' : 26, | |||
|
46 | 'spc_fit' : 27, | |||
|
47 | 'ew_drifts' : 28, | |||
|
48 | 'reflectivity': 30 | |||
|
49 | } | |||
22 |
|
50 | |||
23 | class PrettyFloat(float): |
|
51 | class PrettyFloat(float): | |
24 | def __repr__(self): |
|
52 | def __repr__(self): | |
@@ -82,10 +110,11 class Data(object): | |||||
82 | Object to hold data to be plotted |
|
110 | Object to hold data to be plotted | |
83 | ''' |
|
111 | ''' | |
84 |
|
112 | |||
85 | def __init__(self, plottypes, throttle_value, exp_code): |
|
113 | def __init__(self, plottypes, throttle_value, exp_code, buffering=True): | |
86 | self.plottypes = plottypes |
|
114 | self.plottypes = plottypes | |
87 | self.throttle = throttle_value |
|
115 | self.throttle = throttle_value | |
88 | self.exp_code = exp_code |
|
116 | self.exp_code = exp_code | |
|
117 | self.buffering = buffering | |||
89 | self.ended = False |
|
118 | self.ended = False | |
90 | self.localtime = False |
|
119 | self.localtime = False | |
91 | self.__times = [] |
|
120 | self.__times = [] | |
@@ -102,7 +131,7 class Data(object): | |||||
102 | if key not in self.data: |
|
131 | if key not in self.data: | |
103 | raise KeyError(log.error('Missing key: {}'.format(key))) |
|
132 | raise KeyError(log.error('Missing key: {}'.format(key))) | |
104 |
|
133 | |||
105 | if 'spc' in key: |
|
134 | if 'spc' in key or not self.buffering: | |
106 | ret = self.data[key] |
|
135 | ret = self.data[key] | |
107 | else: |
|
136 | else: | |
108 | ret = numpy.array([self.data[key][x] for x in self.times]) |
|
137 | ret = numpy.array([self.data[key][x] for x in self.times]) | |
@@ -118,6 +147,7 class Data(object): | |||||
118 | Configure object |
|
147 | Configure object | |
119 | ''' |
|
148 | ''' | |
120 |
|
149 | |||
|
150 | self.type = '' | |||
121 | self.ended = False |
|
151 | self.ended = False | |
122 | self.data = {} |
|
152 | self.data = {} | |
123 | self.__times = [] |
|
153 | self.__times = [] | |
@@ -134,7 +164,7 class Data(object): | |||||
134 | ''' |
|
164 | ''' | |
135 |
|
165 | |||
136 | if len(self.data[key]): |
|
166 | if len(self.data[key]): | |
137 | if 'spc' in key: |
|
167 | if 'spc' in key or not self.buffering: | |
138 | return self.data[key].shape |
|
168 | return self.data[key].shape | |
139 | return self.data[key][self.__times[0]].shape |
|
169 | return self.data[key][self.__times[0]].shape | |
140 | return (0,) |
|
170 | return (0,) | |
@@ -147,6 +177,7 class Data(object): | |||||
147 | if tm in self.__times: |
|
177 | if tm in self.__times: | |
148 | return |
|
178 | return | |
149 |
|
179 | |||
|
180 | self.type = dataOut.type | |||
150 | self.parameters = getattr(dataOut, 'parameters', []) |
|
181 | self.parameters = getattr(dataOut, 'parameters', []) | |
151 | if hasattr(dataOut, 'pairsList'): |
|
182 | if hasattr(dataOut, 'pairsList'): | |
152 | self.pairs = dataOut.pairsList |
|
183 | self.pairs = dataOut.pairsList | |
@@ -166,27 +197,32 class Data(object): | |||||
166 | if plot == 'cspc': |
|
197 | if plot == 'cspc': | |
167 | self.data[plot] = dataOut.data_cspc |
|
198 | self.data[plot] = dataOut.data_cspc | |
168 | if plot == 'noise': |
|
199 | if plot == 'noise': | |
169 |
|
|
200 | buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor) | |
170 | if plot == 'rti': |
|
201 | if plot == 'rti': | |
171 |
|
|
202 | buffer = dataOut.getPower() | |
172 | if plot == 'snr_db': |
|
203 | if plot == 'snr_db': | |
173 | self.data['snr'][tm] = dataOut.data_SNR |
|
204 | self.data['snr'][tm] = dataOut.data_SNR | |
174 | if plot == 'snr': |
|
205 | if plot == 'snr': | |
175 |
|
|
206 | buffer = 10*numpy.log10(dataOut.data_SNR) | |
176 | if plot == 'dop': |
|
207 | if plot == 'dop': | |
177 |
|
|
208 | buffer = 10*numpy.log10(dataOut.data_DOP) | |
178 | if plot == 'mean': |
|
209 | if plot == 'mean': | |
179 |
|
|
210 | buffer = dataOut.data_MEAN | |
180 | if plot == 'std': |
|
211 | if plot == 'std': | |
181 |
|
|
212 | buffer = dataOut.data_STD | |
182 | if plot == 'coh': |
|
213 | if plot == 'coh': | |
183 |
|
|
214 | buffer = dataOut.getCoherence() | |
184 | if plot == 'phase': |
|
215 | if plot == 'phase': | |
185 |
|
|
216 | buffer = dataOut.getCoherence(phase=True) | |
186 | if plot == 'output': |
|
217 | if plot == 'output': | |
187 |
|
|
218 | buffer = dataOut.data_output | |
188 | if plot == 'param': |
|
219 | if plot == 'param': | |
189 |
|
|
220 | buffer = dataOut.data_param | |
|
221 | ||||
|
222 | if self.buffering: | |||
|
223 | self.data[plot][tm] = buffer | |||
|
224 | else: | |||
|
225 | self.data[plot] = buffer | |||
190 |
|
226 | |||
191 | def normalize_heights(self): |
|
227 | def normalize_heights(self): | |
192 | ''' |
|
228 | ''' | |
@@ -220,7 +256,7 class Data(object): | |||||
220 | tm = self.times[-1] |
|
256 | tm = self.times[-1] | |
221 |
|
257 | |||
222 | for key in self.data: |
|
258 | for key in self.data: | |
223 | if key in ('spc', 'cspc'): |
|
259 | if key in ('spc', 'cspc') or not self.buffering: | |
224 | dx = int(self.data[key].shape[1]/MAXNUMX) + 1 |
|
260 | dx = int(self.data[key].shape[1]/MAXNUMX) + 1 | |
225 | dy = int(self.data[key].shape[2]/MAXNUMY) + 1 |
|
261 | dy = int(self.data[key].shape[2]/MAXNUMY) + 1 | |
226 | data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist()) |
|
262 | data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist()) | |
@@ -235,6 +271,8 class Data(object): | |||||
235 | ret['yrange'] = roundFloats(self.heights.tolist()) |
|
271 | ret['yrange'] = roundFloats(self.heights.tolist()) | |
236 | if key in ('spc', 'cspc'): |
|
272 | if key in ('spc', 'cspc'): | |
237 | ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist()) |
|
273 | ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist()) | |
|
274 | else: | |||
|
275 | ret['xrange'] = [] | |||
238 | if hasattr(self, 'pairs'): |
|
276 | if hasattr(self, 'pairs'): | |
239 | ret['pairs'] = self.pairs |
|
277 | ret['pairs'] = self.pairs | |
240 | return json.dumps(ret) |
|
278 | return json.dumps(ret) | |
@@ -489,7 +527,7 class PlotterReceiver(ProcessingUnit, Process): | |||||
489 |
|
527 | |||
490 | throttle_value = 5 |
|
528 | throttle_value = 5 | |
491 | __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle', |
|
529 | __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle', | |
492 | 'exp_code', 'web_server'] |
|
530 | 'exp_code', 'web_server', 'buffering'] | |
493 |
|
531 | |||
494 | def __init__(self, **kwargs): |
|
532 | def __init__(self, **kwargs): | |
495 |
|
533 | |||
@@ -510,6 +548,7 class PlotterReceiver(ProcessingUnit, Process): | |||||
510 | self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')] |
|
548 | self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')] | |
511 | self.realtime = kwargs.get('realtime', False) |
|
549 | self.realtime = kwargs.get('realtime', False) | |
512 | self.localtime = kwargs.get('localtime', True) |
|
550 | self.localtime = kwargs.get('localtime', True) | |
|
551 | self.buffering = kwargs.get('buffering', True) | |||
513 | self.throttle_value = kwargs.get('throttle', 5) |
|
552 | self.throttle_value = kwargs.get('throttle', 5) | |
514 | self.exp_code = kwargs.get('exp_code', None) |
|
553 | self.exp_code = kwargs.get('exp_code', None) | |
515 | self.sendData = self.initThrottle(self.throttle_value) |
|
554 | self.sendData = self.initThrottle(self.throttle_value) | |
@@ -518,8 +557,8 class PlotterReceiver(ProcessingUnit, Process): | |||||
518 |
|
557 | |||
519 | def setup(self): |
|
558 | def setup(self): | |
520 |
|
559 | |||
521 | self.data = Data(self.plottypes, self.throttle_value, self.exp_code) |
|
560 | self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering) | |
522 |
self.isConfig = True |
|
561 | self.isConfig = True | |
523 |
|
562 | |||
524 | def event_monitor(self, monitor): |
|
563 | def event_monitor(self, monitor): | |
525 |
|
564 | |||
@@ -553,12 +592,12 class PlotterReceiver(ProcessingUnit, Process): | |||||
553 | return sendDataThrottled |
|
592 | return sendDataThrottled | |
554 |
|
593 | |||
555 | def send(self, data): |
|
594 | def send(self, data): | |
556 |
log. |
|
595 | log.log('Sending {}'.format(data), self.name) | |
557 | self.sender.send_pyobj(data) |
|
596 | self.sender.send_pyobj(data) | |
558 |
|
597 | |||
559 | def run(self): |
|
598 | def run(self): | |
560 |
|
599 | |||
561 |
log. |
|
600 | log.log( | |
562 | 'Starting from {}'.format(self.address), |
|
601 | 'Starting from {}'.format(self.address), | |
563 | self.name |
|
602 | self.name | |
564 | ) |
|
603 | ) | |
@@ -573,7 +612,7 class PlotterReceiver(ProcessingUnit, Process): | |||||
573 | 'Sending to web: {}'.format(self.web_address), |
|
612 | 'Sending to web: {}'.format(self.web_address), | |
574 | self.name |
|
613 | self.name | |
575 | ) |
|
614 | ) | |
576 |
self.sender_web = self.context.socket(zmq.PU |
|
615 | self.sender_web = self.context.socket(zmq.PUSH) | |
577 | self.sender_web.connect(self.web_address) |
|
616 | self.sender_web.connect(self.web_address) | |
578 | time.sleep(1) |
|
617 | time.sleep(1) | |
579 |
|
618 | |||
@@ -623,9 +662,168 class PlotterReceiver(ProcessingUnit, Process): | |||||
623 | if self.realtime: |
|
662 | if self.realtime: | |
624 | self.send(self.data) |
|
663 | self.send(self.data) | |
625 | if self.web_address: |
|
664 | if self.web_address: | |
626 |
|
|
665 | payload = self.data.jsonify() | |
|
666 | log.log('Sending to web... type:{}, size:{}'.format(dataOut.type, len(payload)), self.name) | |||
|
667 | self.sender_web.send(payload) | |||
627 | else: |
|
668 | else: | |
628 | self.sendData(self.send, self.data, coerce=coerce) |
|
669 | self.sendData(self.send, self.data, coerce=coerce) | |
629 | coerce = False |
|
670 | coerce = False | |
630 |
|
671 | |||
631 | return |
|
672 | return | |
|
673 | ||||
|
674 | ||||
|
675 | class SendToFTP(Operation, Process): | |||
|
676 | ||||
|
677 | ''' | |||
|
678 | Operation to send data over FTP. | |||
|
679 | ''' | |||
|
680 | ||||
|
681 | __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout'] | |||
|
682 | ||||
|
683 | def __init__(self, **kwargs): | |||
|
684 | ''' | |||
|
685 | patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...] | |||
|
686 | ''' | |||
|
687 | Operation.__init__(self, **kwargs) | |||
|
688 | Process.__init__(self) | |||
|
689 | self.server = kwargs.get('server') | |||
|
690 | self.username = kwargs.get('username') | |||
|
691 | self.password = kwargs.get('password') | |||
|
692 | self.patterns = kwargs.get('patterns') | |||
|
693 | self.timeout = kwargs.get('timeout', 10) | |||
|
694 | self.times = [time.time() for p in self.patterns] | |||
|
695 | self.latest = ['' for p in self.patterns] | |||
|
696 | self.mp = False | |||
|
697 | self.ftp = None | |||
|
698 | ||||
|
699 | def setup(self): | |||
|
700 | ||||
|
701 | log.log('Connecting to ftp://{}'.format(self.server), self.name) | |||
|
702 | try: | |||
|
703 | self.ftp = ftplib.FTP(self.server, timeout=self.timeout) | |||
|
704 | except ftplib.all_errors: | |||
|
705 | log.error('Server connection fail: {}'.format(self.server), self.name) | |||
|
706 | if self.ftp is not None: | |||
|
707 | self.ftp.close() | |||
|
708 | self.ftp = None | |||
|
709 | self.isConfig = False | |||
|
710 | return | |||
|
711 | ||||
|
712 | try: | |||
|
713 | self.ftp.login(self.username, self.password) | |||
|
714 | except ftplib.all_errors: | |||
|
715 | log.error('The given username y/o password are incorrect', self.name) | |||
|
716 | if self.ftp is not None: | |||
|
717 | self.ftp.close() | |||
|
718 | self.ftp = None | |||
|
719 | self.isConfig = False | |||
|
720 | return | |||
|
721 | ||||
|
722 | log.success('Connection success', self.name) | |||
|
723 | self.isConfig = True | |||
|
724 | return | |||
|
725 | ||||
|
726 | def check(self): | |||
|
727 | ||||
|
728 | try: | |||
|
729 | self.ftp.voidcmd("NOOP") | |||
|
730 | except: | |||
|
731 | log.warning('Connection lost... trying to reconnect', self.name) | |||
|
732 | if self.ftp is not None: | |||
|
733 | self.ftp.close() | |||
|
734 | self.ftp = None | |||
|
735 | self.setup() | |||
|
736 | ||||
|
737 | def find_files(self, path, ext): | |||
|
738 | ||||
|
739 | files = glob.glob1(path, '*{}'.format(ext)) | |||
|
740 | files.sort() | |||
|
741 | if files: | |||
|
742 | return files[-1] | |||
|
743 | return None | |||
|
744 | ||||
|
745 | def getftpname(self, filename, exp_code, sub_exp_code): | |||
|
746 | ||||
|
747 | thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d') | |||
|
748 | YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year | |||
|
749 | DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday | |||
|
750 | exp_code = '%3.3d'%exp_code | |||
|
751 | sub_exp_code = '%2.2d'%sub_exp_code | |||
|
752 | plot_code = '%2.2d'% PLOT_CODES[filename.split('_')[0].split('-')[0]] | |||
|
753 | name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png' | |||
|
754 | return name | |||
|
755 | ||||
|
756 | def upload(self, src, dst): | |||
|
757 | ||||
|
758 | log.log('Uploading {} '.format(src), self.name, nl=False) | |||
|
759 | ||||
|
760 | fp = open(src, 'rb') | |||
|
761 | command = 'STOR {}'.format(dst) | |||
|
762 | ||||
|
763 | try: | |||
|
764 | self.ftp.storbinary(command, fp, blocksize=1024) | |||
|
765 | except ftplib.all_errors, e: | |||
|
766 | log.error('{}'.format(e), self.name) | |||
|
767 | if self.ftp is not None: | |||
|
768 | self.ftp.close() | |||
|
769 | self.ftp = None | |||
|
770 | return | |||
|
771 | ||||
|
772 | try: | |||
|
773 | self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst)) | |||
|
774 | except ftplib.all_errors, e: | |||
|
775 | log.error('{}'.format(e), self.name) | |||
|
776 | if self.ftp is not None: | |||
|
777 | self.ftp.close() | |||
|
778 | self.ftp = None | |||
|
779 | ||||
|
780 | fp.close() | |||
|
781 | ||||
|
782 | log.success('OK', tag='') | |||
|
783 | ||||
|
784 | def send_files(self): | |||
|
785 | ||||
|
786 | for x, pattern in enumerate(self.patterns): | |||
|
787 | local, remote, ext, delay, exp_code, sub_exp_code = pattern | |||
|
788 | if time.time()-self.times[x] >= delay: | |||
|
789 | srcname = self.find_files(local, ext) | |||
|
790 | ||||
|
791 | if srcname is None or srcname == self.latest[x]: | |||
|
792 | continue | |||
|
793 | ||||
|
794 | if 'png' in ext: | |||
|
795 | dstname = self.getftpname(srcname, exp_code, sub_exp_code) | |||
|
796 | else: | |||
|
797 | dstname = srcname | |||
|
798 | ||||
|
799 | src = os.path.join(local, srcname) | |||
|
800 | ||||
|
801 | if os.path.getmtime(src) < time.time() - 30*60: | |||
|
802 | continue | |||
|
803 | ||||
|
804 | dst = os.path.join(remote, dstname) | |||
|
805 | ||||
|
806 | if self.ftp is None: | |||
|
807 | continue | |||
|
808 | ||||
|
809 | self.upload(src, dst) | |||
|
810 | ||||
|
811 | self.times[x] = time.time() | |||
|
812 | self.latest[x] = srcname | |||
|
813 | ||||
|
814 | def run(self): | |||
|
815 | ||||
|
816 | while True: | |||
|
817 | if not self.isConfig: | |||
|
818 | self.setup() | |||
|
819 | if self.ftp is not None: | |||
|
820 | self.check() | |||
|
821 | self.send_files() | |||
|
822 | time.sleep(2) | |||
|
823 | ||||
|
824 | def close(): | |||
|
825 | ||||
|
826 | if self.ftp is not None: | |||
|
827 | if self.ftp is not None: | |||
|
828 | self.ftp.close() | |||
|
829 | self.terminate() |
General Comments 0
You need to be logged in to leave comments.
Login now