@@ -2,12 +2,15 | |||
|
2 | 2 | @author: Juan C. Espinoza |
|
3 | 3 | ''' |
|
4 | 4 | |
|
5 | import os | |
|
6 | import glob | |
|
5 | 7 | import time |
|
6 | 8 | import json |
|
7 | 9 | import numpy |
|
8 | 10 | import paho.mqtt.client as mqtt |
|
9 | 11 | import zmq |
|
10 | 12 | import datetime |
|
13 | import ftplib | |
|
11 | 14 | from zmq.utils.monitor import recv_monitor_message |
|
12 | 15 | from functools import wraps |
|
13 | 16 | from threading import Thread |
@@ -17,8 +20,33 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit | |||
|
17 | 20 | from schainpy.model.data.jrodata import JROData |
|
18 | 21 | from schainpy.utils import log |
|
19 | 22 | |
|
20 |
MAXNUMX = |
|
|
21 |
MAXNUMY = |
|
|
23 | MAXNUMX = 500 | |
|
24 | MAXNUMY = 500 | |
|
25 | ||
|
26 | PLOT_CODES = { | |
|
27 | 'rti': 0, #Range time intensity (RTI). | |
|
28 | 'spc': 1, #Spectra (and Cross-spectra) information. | |
|
29 | 'cspc': 2, #Cross-Correlation information. | |
|
30 | 'coh': 3, #Coherence map. | |
|
31 | 'base': 4, #Base lines graphic. | |
|
32 | 'row': 5, #Row Spectra. | |
|
33 | 'total' : 6, #Total Power. | |
|
34 | 'drift' : 7, #Drifts graphics. | |
|
35 | 'height' : 8, #Height profile. | |
|
36 | 'phase' : 9, #Signal Phase. | |
|
37 | 'power' : 16, | |
|
38 | 'noise' : 17, | |
|
39 | 'beacon' : 18, | |
|
40 | #USED IN jroplot_parameters.py | |
|
41 | 'wind' : 22, | |
|
42 | 'skymap' : 23, | |
|
43 | # 'MPHASE_CODE' : 24, | |
|
44 | 'moments' : 25, | |
|
45 | 'param' : 26, | |
|
46 | 'spc_fit' : 27, | |
|
47 | 'ew_drifts' : 28, | |
|
48 | 'reflectivity': 30 | |
|
49 | } | |
|
22 | 50 | |
|
23 | 51 | class PrettyFloat(float): |
|
24 | 52 | def __repr__(self): |
@@ -82,10 +110,11 class Data(object): | |||
|
82 | 110 | Object to hold data to be plotted |
|
83 | 111 | ''' |
|
84 | 112 | |
|
85 | def __init__(self, plottypes, throttle_value, exp_code): | |
|
113 | def __init__(self, plottypes, throttle_value, exp_code, buffering=True): | |
|
86 | 114 | self.plottypes = plottypes |
|
87 | 115 | self.throttle = throttle_value |
|
88 | 116 | self.exp_code = exp_code |
|
117 | self.buffering = buffering | |
|
89 | 118 | self.ended = False |
|
90 | 119 | self.localtime = False |
|
91 | 120 | self.__times = [] |
@@ -102,7 +131,7 class Data(object): | |||
|
102 | 131 | if key not in self.data: |
|
103 | 132 | raise KeyError(log.error('Missing key: {}'.format(key))) |
|
104 | 133 | |
|
105 | if 'spc' in key: | |
|
134 | if 'spc' in key or not self.buffering: | |
|
106 | 135 | ret = self.data[key] |
|
107 | 136 | else: |
|
108 | 137 | ret = numpy.array([self.data[key][x] for x in self.times]) |
@@ -118,6 +147,7 class Data(object): | |||
|
118 | 147 | Configure object |
|
119 | 148 | ''' |
|
120 | 149 | |
|
150 | self.type = '' | |
|
121 | 151 | self.ended = False |
|
122 | 152 | self.data = {} |
|
123 | 153 | self.__times = [] |
@@ -134,7 +164,7 class Data(object): | |||
|
134 | 164 | ''' |
|
135 | 165 | |
|
136 | 166 | if len(self.data[key]): |
|
137 | if 'spc' in key: | |
|
167 | if 'spc' in key or not self.buffering: | |
|
138 | 168 | return self.data[key].shape |
|
139 | 169 | return self.data[key][self.__times[0]].shape |
|
140 | 170 | return (0,) |
@@ -147,6 +177,7 class Data(object): | |||
|
147 | 177 | if tm in self.__times: |
|
148 | 178 | return |
|
149 | 179 | |
|
180 | self.type = dataOut.type | |
|
150 | 181 | self.parameters = getattr(dataOut, 'parameters', []) |
|
151 | 182 | if hasattr(dataOut, 'pairsList'): |
|
152 | 183 | self.pairs = dataOut.pairsList |
@@ -166,27 +197,32 class Data(object): | |||
|
166 | 197 | if plot == 'cspc': |
|
167 | 198 | self.data[plot] = dataOut.data_cspc |
|
168 | 199 | if plot == 'noise': |
|
169 |
|
|
|
200 | buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor) | |
|
170 | 201 | if plot == 'rti': |
|
171 |
|
|
|
202 | buffer = dataOut.getPower() | |
|
172 | 203 | if plot == 'snr_db': |
|
173 | 204 | self.data['snr'][tm] = dataOut.data_SNR |
|
174 | 205 | if plot == 'snr': |
|
175 |
|
|
|
206 | buffer = 10*numpy.log10(dataOut.data_SNR) | |
|
176 | 207 | if plot == 'dop': |
|
177 |
|
|
|
208 | buffer = 10*numpy.log10(dataOut.data_DOP) | |
|
178 | 209 | if plot == 'mean': |
|
179 |
|
|
|
210 | buffer = dataOut.data_MEAN | |
|
180 | 211 | if plot == 'std': |
|
181 |
|
|
|
212 | buffer = dataOut.data_STD | |
|
182 | 213 | if plot == 'coh': |
|
183 |
|
|
|
214 | buffer = dataOut.getCoherence() | |
|
184 | 215 | if plot == 'phase': |
|
185 |
|
|
|
216 | buffer = dataOut.getCoherence(phase=True) | |
|
186 | 217 | if plot == 'output': |
|
187 |
|
|
|
218 | buffer = dataOut.data_output | |
|
188 | 219 | if plot == 'param': |
|
189 |
|
|
|
220 | buffer = dataOut.data_param | |
|
221 | ||
|
222 | if self.buffering: | |
|
223 | self.data[plot][tm] = buffer | |
|
224 | else: | |
|
225 | self.data[plot] = buffer | |
|
190 | 226 | |
|
191 | 227 | def normalize_heights(self): |
|
192 | 228 | ''' |
@@ -220,7 +256,7 class Data(object): | |||
|
220 | 256 | tm = self.times[-1] |
|
221 | 257 | |
|
222 | 258 | for key in self.data: |
|
223 | if key in ('spc', 'cspc'): | |
|
259 | if key in ('spc', 'cspc') or not self.buffering: | |
|
224 | 260 | dx = int(self.data[key].shape[1]/MAXNUMX) + 1 |
|
225 | 261 | dy = int(self.data[key].shape[2]/MAXNUMY) + 1 |
|
226 | 262 | data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist()) |
@@ -235,6 +271,8 class Data(object): | |||
|
235 | 271 | ret['yrange'] = roundFloats(self.heights.tolist()) |
|
236 | 272 | if key in ('spc', 'cspc'): |
|
237 | 273 | ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist()) |
|
274 | else: | |
|
275 | ret['xrange'] = [] | |
|
238 | 276 | if hasattr(self, 'pairs'): |
|
239 | 277 | ret['pairs'] = self.pairs |
|
240 | 278 | return json.dumps(ret) |
@@ -489,7 +527,7 class PlotterReceiver(ProcessingUnit, Process): | |||
|
489 | 527 | |
|
490 | 528 | throttle_value = 5 |
|
491 | 529 | __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle', |
|
492 | 'exp_code', 'web_server'] | |
|
530 | 'exp_code', 'web_server', 'buffering'] | |
|
493 | 531 | |
|
494 | 532 | def __init__(self, **kwargs): |
|
495 | 533 | |
@@ -510,6 +548,7 class PlotterReceiver(ProcessingUnit, Process): | |||
|
510 | 548 | self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')] |
|
511 | 549 | self.realtime = kwargs.get('realtime', False) |
|
512 | 550 | self.localtime = kwargs.get('localtime', True) |
|
551 | self.buffering = kwargs.get('buffering', True) | |
|
513 | 552 | self.throttle_value = kwargs.get('throttle', 5) |
|
514 | 553 | self.exp_code = kwargs.get('exp_code', None) |
|
515 | 554 | self.sendData = self.initThrottle(self.throttle_value) |
@@ -518,8 +557,8 class PlotterReceiver(ProcessingUnit, Process): | |||
|
518 | 557 | |
|
519 | 558 | def setup(self): |
|
520 | 559 | |
|
521 | self.data = Data(self.plottypes, self.throttle_value, self.exp_code) | |
|
522 |
self.isConfig = True |
|
|
560 | self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering) | |
|
561 | self.isConfig = True | |
|
523 | 562 | |
|
524 | 563 | def event_monitor(self, monitor): |
|
525 | 564 | |
@@ -553,12 +592,12 class PlotterReceiver(ProcessingUnit, Process): | |||
|
553 | 592 | return sendDataThrottled |
|
554 | 593 | |
|
555 | 594 | def send(self, data): |
|
556 |
log. |
|
|
595 | log.log('Sending {}'.format(data), self.name) | |
|
557 | 596 | self.sender.send_pyobj(data) |
|
558 | 597 | |
|
559 | 598 | def run(self): |
|
560 | 599 | |
|
561 |
log. |
|
|
600 | log.log( | |
|
562 | 601 | 'Starting from {}'.format(self.address), |
|
563 | 602 | self.name |
|
564 | 603 | ) |
@@ -573,7 +612,7 class PlotterReceiver(ProcessingUnit, Process): | |||
|
573 | 612 | 'Sending to web: {}'.format(self.web_address), |
|
574 | 613 | self.name |
|
575 | 614 | ) |
|
576 |
self.sender_web = self.context.socket(zmq.PU |
|
|
615 | self.sender_web = self.context.socket(zmq.PUSH) | |
|
577 | 616 | self.sender_web.connect(self.web_address) |
|
578 | 617 | time.sleep(1) |
|
579 | 618 | |
@@ -623,9 +662,168 class PlotterReceiver(ProcessingUnit, Process): | |||
|
623 | 662 | if self.realtime: |
|
624 | 663 | self.send(self.data) |
|
625 | 664 | if self.web_address: |
|
626 |
|
|
|
665 | payload = self.data.jsonify() | |
|
666 | log.log('Sending to web... type:{}, size:{}'.format(dataOut.type, len(payload)), self.name) | |
|
667 | self.sender_web.send(payload) | |
|
627 | 668 | else: |
|
628 | 669 | self.sendData(self.send, self.data, coerce=coerce) |
|
629 | 670 | coerce = False |
|
630 | 671 | |
|
631 | 672 | return |
|
673 | ||
|
674 | ||
|
675 | class SendToFTP(Operation, Process): | |
|
676 | ||
|
677 | ''' | |
|
678 | Operation to send data over FTP. | |
|
679 | ''' | |
|
680 | ||
|
681 | __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout'] | |
|
682 | ||
|
683 | def __init__(self, **kwargs): | |
|
684 | ''' | |
|
685 | patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...] | |
|
686 | ''' | |
|
687 | Operation.__init__(self, **kwargs) | |
|
688 | Process.__init__(self) | |
|
689 | self.server = kwargs.get('server') | |
|
690 | self.username = kwargs.get('username') | |
|
691 | self.password = kwargs.get('password') | |
|
692 | self.patterns = kwargs.get('patterns') | |
|
693 | self.timeout = kwargs.get('timeout', 10) | |
|
694 | self.times = [time.time() for p in self.patterns] | |
|
695 | self.latest = ['' for p in self.patterns] | |
|
696 | self.mp = False | |
|
697 | self.ftp = None | |
|
698 | ||
|
699 | def setup(self): | |
|
700 | ||
|
701 | log.log('Connecting to ftp://{}'.format(self.server), self.name) | |
|
702 | try: | |
|
703 | self.ftp = ftplib.FTP(self.server, timeout=self.timeout) | |
|
704 | except ftplib.all_errors: | |
|
705 | log.error('Server connection fail: {}'.format(self.server), self.name) | |
|
706 | if self.ftp is not None: | |
|
707 | self.ftp.close() | |
|
708 | self.ftp = None | |
|
709 | self.isConfig = False | |
|
710 | return | |
|
711 | ||
|
712 | try: | |
|
713 | self.ftp.login(self.username, self.password) | |
|
714 | except ftplib.all_errors: | |
|
715 | log.error('The given username y/o password are incorrect', self.name) | |
|
716 | if self.ftp is not None: | |
|
717 | self.ftp.close() | |
|
718 | self.ftp = None | |
|
719 | self.isConfig = False | |
|
720 | return | |
|
721 | ||
|
722 | log.success('Connection success', self.name) | |
|
723 | self.isConfig = True | |
|
724 | return | |
|
725 | ||
|
726 | def check(self): | |
|
727 | ||
|
728 | try: | |
|
729 | self.ftp.voidcmd("NOOP") | |
|
730 | except: | |
|
731 | log.warning('Connection lost... trying to reconnect', self.name) | |
|
732 | if self.ftp is not None: | |
|
733 | self.ftp.close() | |
|
734 | self.ftp = None | |
|
735 | self.setup() | |
|
736 | ||
|
737 | def find_files(self, path, ext): | |
|
738 | ||
|
739 | files = glob.glob1(path, '*{}'.format(ext)) | |
|
740 | files.sort() | |
|
741 | if files: | |
|
742 | return files[-1] | |
|
743 | return None | |
|
744 | ||
|
745 | def getftpname(self, filename, exp_code, sub_exp_code): | |
|
746 | ||
|
747 | thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d') | |
|
748 | YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year | |
|
749 | DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday | |
|
750 | exp_code = '%3.3d'%exp_code | |
|
751 | sub_exp_code = '%2.2d'%sub_exp_code | |
|
752 | plot_code = '%2.2d'% PLOT_CODES[filename.split('_')[0].split('-')[0]] | |
|
753 | name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png' | |
|
754 | return name | |
|
755 | ||
|
756 | def upload(self, src, dst): | |
|
757 | ||
|
758 | log.log('Uploading {} '.format(src), self.name, nl=False) | |
|
759 | ||
|
760 | fp = open(src, 'rb') | |
|
761 | command = 'STOR {}'.format(dst) | |
|
762 | ||
|
763 | try: | |
|
764 | self.ftp.storbinary(command, fp, blocksize=1024) | |
|
765 | except ftplib.all_errors, e: | |
|
766 | log.error('{}'.format(e), self.name) | |
|
767 | if self.ftp is not None: | |
|
768 | self.ftp.close() | |
|
769 | self.ftp = None | |
|
770 | return | |
|
771 | ||
|
772 | try: | |
|
773 | self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst)) | |
|
774 | except ftplib.all_errors, e: | |
|
775 | log.error('{}'.format(e), self.name) | |
|
776 | if self.ftp is not None: | |
|
777 | self.ftp.close() | |
|
778 | self.ftp = None | |
|
779 | ||
|
780 | fp.close() | |
|
781 | ||
|
782 | log.success('OK', tag='') | |
|
783 | ||
|
784 | def send_files(self): | |
|
785 | ||
|
786 | for x, pattern in enumerate(self.patterns): | |
|
787 | local, remote, ext, delay, exp_code, sub_exp_code = pattern | |
|
788 | if time.time()-self.times[x] >= delay: | |
|
789 | srcname = self.find_files(local, ext) | |
|
790 | ||
|
791 | if srcname is None or srcname == self.latest[x]: | |
|
792 | continue | |
|
793 | ||
|
794 | if 'png' in ext: | |
|
795 | dstname = self.getftpname(srcname, exp_code, sub_exp_code) | |
|
796 | else: | |
|
797 | dstname = srcname | |
|
798 | ||
|
799 | src = os.path.join(local, srcname) | |
|
800 | ||
|
801 | if os.path.getmtime(src) < time.time() - 30*60: | |
|
802 | continue | |
|
803 | ||
|
804 | dst = os.path.join(remote, dstname) | |
|
805 | ||
|
806 | if self.ftp is None: | |
|
807 | continue | |
|
808 | ||
|
809 | self.upload(src, dst) | |
|
810 | ||
|
811 | self.times[x] = time.time() | |
|
812 | self.latest[x] = srcname | |
|
813 | ||
|
814 | def run(self): | |
|
815 | ||
|
816 | while True: | |
|
817 | if not self.isConfig: | |
|
818 | self.setup() | |
|
819 | if self.ftp is not None: | |
|
820 | self.check() | |
|
821 | self.send_files() | |
|
822 | time.sleep(2) | |
|
823 | ||
|
824 | def close(): | |
|
825 | ||
|
826 | if self.ftp is not None: | |
|
827 | if self.ftp is not None: | |
|
828 | self.ftp.close() | |
|
829 | self.terminate() |
General Comments 0
You need to be logged in to leave comments.
Login now