@@ -3,9 +3,10 import os | |||||
3 | import sys |
|
3 | import sys | |
4 | import zmq |
|
4 | import zmq | |
5 | import time |
|
5 | import time | |
|
6 | import numpy | |||
6 | import datetime |
|
7 | import datetime | |
7 | from functools import wraps |
|
8 | from functools import wraps | |
8 | import numpy |
|
9 | from threading import Thread | |
9 | import matplotlib |
|
10 | import matplotlib | |
10 |
|
11 | |||
11 | if 'BACKEND' in os.environ: |
|
12 | if 'BACKEND' in os.environ: | |
@@ -13,7 +14,7 if 'BACKEND' in os.environ: | |||||
13 | elif 'linux' in sys.platform: |
|
14 | elif 'linux' in sys.platform: | |
14 | matplotlib.use("TkAgg") |
|
15 | matplotlib.use("TkAgg") | |
15 | elif 'darwin' in sys.platform: |
|
16 | elif 'darwin' in sys.platform: | |
16 |
matplotlib.use(' |
|
17 | matplotlib.use('WxAgg') | |
17 | else: |
|
18 | else: | |
18 | from schainpy.utils import log |
|
19 | from schainpy.utils import log | |
19 | log.warning('Using default Backend="Agg"', 'INFO') |
|
20 | log.warning('Using default Backend="Agg"', 'INFO') | |
@@ -40,7 +41,6 CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis', | |||||
40 |
|
41 | |||
41 | EARTH_RADIUS = 6.3710e3 |
|
42 | EARTH_RADIUS = 6.3710e3 | |
42 |
|
43 | |||
43 |
|
||||
44 | def ll2xy(lat1, lon1, lat2, lon2): |
|
44 | def ll2xy(lat1, lon1, lat2, lon2): | |
45 |
|
45 | |||
46 | p = 0.017453292519943295 |
|
46 | p = 0.017453292519943295 | |
@@ -138,140 +138,6 def apply_throttle(value): | |||||
138 |
|
138 | |||
139 | return fnThrottled |
|
139 | return fnThrottled | |
140 |
|
140 | |||
141 | @MPDecorator |
|
|||
142 | class Plotter(ProcessingUnit): |
|
|||
143 | ''' |
|
|||
144 | Proccessing unit to handle plot operations |
|
|||
145 | ''' |
|
|||
146 |
|
||||
147 | def __init__(self): |
|
|||
148 |
|
||||
149 | ProcessingUnit.__init__(self) |
|
|||
150 |
|
||||
151 | def setup(self, **kwargs): |
|
|||
152 |
|
||||
153 | self.connections = 0 |
|
|||
154 | self.web_address = kwargs.get('web_server', False) |
|
|||
155 | self.realtime = kwargs.get('realtime', False) |
|
|||
156 | self.localtime = kwargs.get('localtime', True) |
|
|||
157 | self.buffering = kwargs.get('buffering', True) |
|
|||
158 | self.throttle = kwargs.get('throttle', 2) |
|
|||
159 | self.exp_code = kwargs.get('exp_code', None) |
|
|||
160 | self.set_ready = apply_throttle(self.throttle) |
|
|||
161 | self.dates = [] |
|
|||
162 | self.data = PlotterData( |
|
|||
163 | self.plots, self.throttle, self.exp_code, self.buffering) |
|
|||
164 | self.isConfig = True |
|
|||
165 |
|
||||
166 | def ready(self): |
|
|||
167 | ''' |
|
|||
168 | Set dataOut ready |
|
|||
169 | ''' |
|
|||
170 |
|
||||
171 | self.data.ready = True |
|
|||
172 | self.dataOut.data_plt = self.data |
|
|||
173 |
|
||||
174 | def run(self, realtime=True, localtime=True, buffering=True, |
|
|||
175 | throttle=2, exp_code=None, web_server=None): |
|
|||
176 |
|
||||
177 | if not self.isConfig: |
|
|||
178 | self.setup(realtime=realtime, localtime=localtime, |
|
|||
179 | buffering=buffering, throttle=throttle, exp_code=exp_code, |
|
|||
180 | web_server=web_server) |
|
|||
181 |
|
||||
182 | if self.web_address: |
|
|||
183 | log.success( |
|
|||
184 | 'Sending to web: {}'.format(self.web_address), |
|
|||
185 | self.name |
|
|||
186 | ) |
|
|||
187 | self.context = zmq.Context() |
|
|||
188 | self.sender_web = self.context.socket(zmq.REQ) |
|
|||
189 | self.sender_web.connect(self.web_address) |
|
|||
190 | self.poll = zmq.Poller() |
|
|||
191 | self.poll.register(self.sender_web, zmq.POLLIN) |
|
|||
192 | time.sleep(1) |
|
|||
193 |
|
||||
194 | # t = Thread(target=self.event_monitor, args=(monitor,)) |
|
|||
195 | # t.start() |
|
|||
196 |
|
||||
197 | self.dataOut = self.dataIn |
|
|||
198 | self.data.ready = False |
|
|||
199 |
|
||||
200 | if self.dataOut.flagNoData: |
|
|||
201 | coerce = True |
|
|||
202 | else: |
|
|||
203 | coerce = False |
|
|||
204 |
|
||||
205 | if self.dataOut.type == 'Parameters': |
|
|||
206 | tm = self.dataOut.utctimeInit |
|
|||
207 | else: |
|
|||
208 | tm = self.dataOut.utctime |
|
|||
209 | if self.dataOut.useLocalTime: |
|
|||
210 | if not self.localtime: |
|
|||
211 | tm += time.timezone |
|
|||
212 | dt = datetime.datetime.fromtimestamp(tm).date() |
|
|||
213 | else: |
|
|||
214 | if self.localtime: |
|
|||
215 | tm -= time.timezone |
|
|||
216 | dt = datetime.datetime.utcfromtimestamp(tm).date() |
|
|||
217 | if dt not in self.dates: |
|
|||
218 | if self.data: |
|
|||
219 | self.ready() |
|
|||
220 | self.data.setup() |
|
|||
221 | self.dates.append(dt) |
|
|||
222 |
|
||||
223 | self.data.update(self.dataOut, tm) |
|
|||
224 |
|
||||
225 | if False: # TODO check when publishers ends |
|
|||
226 | self.connections -= 1 |
|
|||
227 | if self.connections == 0 and dt in self.dates: |
|
|||
228 | self.data.ended = True |
|
|||
229 | self.ready() |
|
|||
230 | time.sleep(1) |
|
|||
231 | else: |
|
|||
232 | if self.realtime: |
|
|||
233 | self.ready() |
|
|||
234 | if self.web_address: |
|
|||
235 | retries = 5 |
|
|||
236 | while True: |
|
|||
237 | self.sender_web.send(self.data.jsonify()) |
|
|||
238 | socks = dict(self.poll.poll(5000)) |
|
|||
239 | if socks.get(self.sender_web) == zmq.POLLIN: |
|
|||
240 | reply = self.sender_web.recv_string() |
|
|||
241 | if reply == 'ok': |
|
|||
242 | log.log("Response from server ok", self.name) |
|
|||
243 | break |
|
|||
244 | else: |
|
|||
245 | log.warning( |
|
|||
246 | "Malformed reply from server: {}".format(reply), self.name) |
|
|||
247 |
|
||||
248 | else: |
|
|||
249 | log.warning( |
|
|||
250 | "No response from server, retrying...", self.name) |
|
|||
251 | self.sender_web.setsockopt(zmq.LINGER, 0) |
|
|||
252 | self.sender_web.close() |
|
|||
253 | self.poll.unregister(self.sender_web) |
|
|||
254 | retries -= 1 |
|
|||
255 | if retries == 0: |
|
|||
256 | log.error( |
|
|||
257 | "Server seems to be offline, abandoning", self.name) |
|
|||
258 | self.sender_web = self.context.socket(zmq.REQ) |
|
|||
259 | self.sender_web.connect(self.web_address) |
|
|||
260 | self.poll.register(self.sender_web, zmq.POLLIN) |
|
|||
261 | time.sleep(1) |
|
|||
262 | break |
|
|||
263 | self.sender_web = self.context.socket(zmq.REQ) |
|
|||
264 | self.sender_web.connect(self.web_address) |
|
|||
265 | self.poll.register(self.sender_web, zmq.POLLIN) |
|
|||
266 | time.sleep(1) |
|
|||
267 | else: |
|
|||
268 | self.set_ready(self.ready, coerce=coerce) |
|
|||
269 |
|
||||
270 | return |
|
|||
271 |
|
||||
272 | def close(self): |
|
|||
273 | pass |
|
|||
274 |
|
||||
275 |
|
141 | |||
276 | @MPDecorator |
|
142 | @MPDecorator | |
277 | class Plot(Operation): |
|
143 | class Plot(Operation): | |
@@ -280,7 +146,7 class Plot(Operation): | |||||
280 | ''' |
|
146 | ''' | |
281 |
|
147 | |||
282 | CODE = 'Figure' |
|
148 | CODE = 'Figure' | |
283 |
colormap = 'j |
|
149 | colormap = 'jet' | |
284 | bgcolor = 'white' |
|
150 | bgcolor = 'white' | |
285 | __missing = 1E30 |
|
151 | __missing = 1E30 | |
286 |
|
152 | |||
@@ -294,6 +160,8 class Plot(Operation): | |||||
294 | Operation.__init__(self) |
|
160 | Operation.__init__(self) | |
295 | self.isConfig = False |
|
161 | self.isConfig = False | |
296 | self.isPlotConfig = False |
|
162 | self.isPlotConfig = False | |
|
163 | self.save_counter = 1 | |||
|
164 | self.sender_counter = 1 | |||
297 |
|
165 | |||
298 | def __fmtTime(self, x, pos): |
|
166 | def __fmtTime(self, x, pos): | |
299 | ''' |
|
167 | ''' | |
@@ -312,6 +180,7 class Plot(Operation): | |||||
312 | self.localtime = kwargs.pop('localtime', True) |
|
180 | self.localtime = kwargs.pop('localtime', True) | |
313 | self.show = kwargs.get('show', True) |
|
181 | self.show = kwargs.get('show', True) | |
314 | self.save = kwargs.get('save', False) |
|
182 | self.save = kwargs.get('save', False) | |
|
183 | self.save_period = kwargs.get('save_period', 2) | |||
315 | self.ftp = kwargs.get('ftp', False) |
|
184 | self.ftp = kwargs.get('ftp', False) | |
316 | self.colormap = kwargs.get('colormap', self.colormap) |
|
185 | self.colormap = kwargs.get('colormap', self.colormap) | |
317 | self.colormap_coh = kwargs.get('colormap_coh', 'jet') |
|
186 | self.colormap_coh = kwargs.get('colormap_coh', 'jet') | |
@@ -353,10 +222,20 class Plot(Operation): | |||||
353 | self.buffering = kwargs.get('buffering', True) |
|
222 | self.buffering = kwargs.get('buffering', True) | |
354 | self.throttle = kwargs.get('throttle', 2) |
|
223 | self.throttle = kwargs.get('throttle', 2) | |
355 | self.exp_code = kwargs.get('exp_code', None) |
|
224 | self.exp_code = kwargs.get('exp_code', None) | |
|
225 | self.plot_server = kwargs.get('plot_server', False) | |||
|
226 | self.sender_period = kwargs.get('sender_period', 2) | |||
356 | self.__throttle_plot = apply_throttle(self.throttle) |
|
227 | self.__throttle_plot = apply_throttle(self.throttle) | |
357 | self.data = PlotterData( |
|
228 | self.data = PlotterData( | |
358 | self.CODE, self.throttle, self.exp_code, self.buffering) |
|
229 | self.CODE, self.throttle, self.exp_code, self.buffering) | |
359 |
|
230 | |||
|
231 | if self.plot_server: | |||
|
232 | if not self.plot_server.startswith('tcp://'): | |||
|
233 | self.plot_server = 'tcp://{}'.format(self.plot_server) | |||
|
234 | log.success( | |||
|
235 | 'Sending to server: {}'.format(self.plot_server), | |||
|
236 | self.name | |||
|
237 | ) | |||
|
238 | ||||
360 | def __setup_plot(self): |
|
239 | def __setup_plot(self): | |
361 | ''' |
|
240 | ''' | |
362 | Common setup for all figures, here figures and axes are created |
|
241 | Common setup for all figures, here figures and axes are created | |
@@ -538,20 +417,6 class Plot(Operation): | |||||
538 | ax.figure.add_axes(nax) |
|
417 | ax.figure.add_axes(nax) | |
539 | return nax |
|
418 | return nax | |
540 |
|
419 | |||
541 | def setup(self): |
|
|||
542 | ''' |
|
|||
543 | This method should be implemented in the child class, the following |
|
|||
544 | attributes should be set: |
|
|||
545 |
|
||||
546 | self.nrows: number of rows |
|
|||
547 | self.ncols: number of cols |
|
|||
548 | self.nplots: number of plots (channels or pairs) |
|
|||
549 | self.ylabel: label for Y axes |
|
|||
550 | self.titles: list of axes title |
|
|||
551 |
|
||||
552 | ''' |
|
|||
553 | raise NotImplementedError |
|
|||
554 |
|
||||
555 | def fill_gaps(self, x_buffer, y_buffer, z_buffer): |
|
420 | def fill_gaps(self, x_buffer, y_buffer, z_buffer): | |
556 | ''' |
|
421 | ''' | |
557 | Create a masked array for missing data |
|
422 | Create a masked array for missing data | |
@@ -722,14 +587,14 class Plot(Operation): | |||||
722 | Main function to plot, format and save figures |
|
587 | Main function to plot, format and save figures | |
723 | ''' |
|
588 | ''' | |
724 |
|
589 | |||
725 |
|
|
590 | try: | |
726 | self.plot() |
|
591 | self.plot() | |
727 | self.format() |
|
592 | self.format() | |
728 |
|
|
593 | except Exception as e: | |
729 |
|
|
594 | log.warning('{} Plot could not be updated... check data'.format( | |
730 |
|
|
595 | self.CODE), self.name) | |
731 |
|
|
596 | log.error(str(e), '') | |
732 |
|
|
597 | return | |
733 |
|
598 | |||
734 | for n, fig in enumerate(self.figures): |
|
599 | for n, fig in enumerate(self.figures): | |
735 | if self.nrows == 0 or self.nplots == 0: |
|
600 | if self.nrows == 0 or self.nplots == 0: | |
@@ -744,6 +609,24 class Plot(Operation): | |||||
744 | fig.canvas.draw() |
|
609 | fig.canvas.draw() | |
745 |
|
610 | |||
746 | if self.save: |
|
611 | if self.save: | |
|
612 | self.save_figure(n) | |||
|
613 | ||||
|
614 | if self.plot_server: | |||
|
615 | self.send_to_server() | |||
|
616 | # t = Thread(target=self.send_to_server) | |||
|
617 | # t.start() | |||
|
618 | ||||
|
619 | def save_figure(self, n): | |||
|
620 | ''' | |||
|
621 | ''' | |||
|
622 | ||||
|
623 | if self.save_counter < self.save_period: | |||
|
624 | self.save_counter += 1 | |||
|
625 | return | |||
|
626 | ||||
|
627 | self.save_counter = 1 | |||
|
628 | ||||
|
629 | fig = self.figures[n] | |||
747 |
|
630 | |||
748 |
|
|
631 | if self.save_labels: | |
749 |
|
|
632 | labels = self.save_labels | |
@@ -761,7 +644,8 class Plot(Operation): | |||||
761 |
|
|
644 | self.CODE, | |
762 |
|
|
645 | label, | |
763 |
|
|
646 | self.getDateTime(self.data.max_time).strftime( | |
764 |
|
|
647 | '%Y%m%d_%H%M%S' | |
|
648 | ), | |||
765 |
|
|
649 | ) | |
766 |
|
|
650 | ) | |
767 |
|
|
651 | log.log('Saving figure: {}'.format(figname), self.name) | |
@@ -769,6 +653,75 class Plot(Operation): | |||||
769 |
|
|
653 | os.makedirs(os.path.dirname(figname)) | |
770 |
|
|
654 | fig.savefig(figname) | |
771 |
|
655 | |||
|
656 | if self.realtime: | |||
|
657 | figname = os.path.join( | |||
|
658 | self.save, | |||
|
659 | '{}{}_{}.png'.format( | |||
|
660 | self.CODE, | |||
|
661 | label, | |||
|
662 | self.getDateTime(self.data.min_time).strftime( | |||
|
663 | '%Y%m%d' | |||
|
664 | ), | |||
|
665 | ) | |||
|
666 | ) | |||
|
667 | fig.savefig(figname) | |||
|
668 | ||||
|
669 | def send_to_server(self): | |||
|
670 | ''' | |||
|
671 | ''' | |||
|
672 | ||||
|
673 | if self.sender_counter < self.sender_period: | |||
|
674 | self.sender_counter += 1 | |||
|
675 | ||||
|
676 | self.sender_counter = 1 | |||
|
677 | ||||
|
678 | retries = 2 | |||
|
679 | while True: | |||
|
680 | self.socket.send_string(self.data.jsonify()) | |||
|
681 | socks = dict(self.poll.poll(5000)) | |||
|
682 | if socks.get(self.socket) == zmq.POLLIN: | |||
|
683 | reply = self.socket.recv_string() | |||
|
684 | if reply == 'ok': | |||
|
685 | log.log("Response from server ok", self.name) | |||
|
686 | break | |||
|
687 | else: | |||
|
688 | log.warning( | |||
|
689 | "Malformed reply from server: {}".format(reply), self.name) | |||
|
690 | ||||
|
691 | else: | |||
|
692 | log.warning( | |||
|
693 | "No response from server, retrying...", self.name) | |||
|
694 | self.socket.setsockopt(zmq.LINGER, 0) | |||
|
695 | self.socket.close() | |||
|
696 | self.poll.unregister(self.socket) | |||
|
697 | retries -= 1 | |||
|
698 | if retries == 0: | |||
|
699 | log.error( | |||
|
700 | "Server seems to be offline, abandoning", self.name) | |||
|
701 | self.socket = self.context.socket(zmq.REQ) | |||
|
702 | self.socket.connect(self.plot_server) | |||
|
703 | self.poll.register(self.socket, zmq.POLLIN) | |||
|
704 | time.sleep(1) | |||
|
705 | break | |||
|
706 | self.socket = self.context.socket(zmq.REQ) | |||
|
707 | self.socket.connect(self.plot_server) | |||
|
708 | self.poll.register(self.socket, zmq.POLLIN) | |||
|
709 | time.sleep(0.5) | |||
|
710 | ||||
|
711 | def setup(self): | |||
|
712 | ''' | |||
|
713 | This method should be implemented in the child class, the following | |||
|
714 | attributes should be set: | |||
|
715 | ||||
|
716 | self.nrows: number of rows | |||
|
717 | self.ncols: number of cols | |||
|
718 | self.nplots: number of plots (channels or pairs) | |||
|
719 | self.ylabel: label for Y axes | |||
|
720 | self.titles: list of axes title | |||
|
721 | ||||
|
722 | ''' | |||
|
723 | raise NotImplementedError | |||
|
724 | ||||
772 | def plot(self): |
|
725 | def plot(self): | |
773 | ''' |
|
726 | ''' | |
774 | Must be defined in the child class |
|
727 | Must be defined in the child class | |
@@ -786,6 +739,12 class Plot(Operation): | |||||
786 | self.__setup(**kwargs) |
|
739 | self.__setup(**kwargs) | |
787 | self.data.setup() |
|
740 | self.data.setup() | |
788 | self.isConfig = True |
|
741 | self.isConfig = True | |
|
742 | if self.plot_server: | |||
|
743 | self.context = zmq.Context() | |||
|
744 | self.socket = self.context.socket(zmq.REQ) | |||
|
745 | self.socket.connect(self.plot_server) | |||
|
746 | self.poll = zmq.Poller() | |||
|
747 | self.poll.register(self.socket, zmq.POLLIN) | |||
789 |
|
748 | |||
790 | if dataOut.type == 'Parameters': |
|
749 | if dataOut.type == 'Parameters': | |
791 | tm = dataOut.utctimeInit |
|
750 | tm = dataOut.utctimeInit |
@@ -12,11 +12,11 Based on: | |||||
12 | $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $ |
|
12 | $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $ | |
13 | ''' |
|
13 | ''' | |
14 |
|
14 | |||
|
15 | import os | |||
15 | import inspect |
|
16 | import inspect | |
16 | import zmq |
|
17 | import zmq | |
17 | import time |
|
18 | import time | |
18 | import pickle |
|
19 | import pickle | |
19 | import os |
|
|||
20 | from multiprocessing import Process |
|
20 | from multiprocessing import Process | |
21 | from zmq.utils.monitor import recv_monitor_message |
|
21 | from zmq.utils.monitor import recv_monitor_message | |
22 |
|
22 |
General Comments 0
You need to be logged in to leave comments.
Login now