@@ -3,9 +3,10 import os | |||
|
3 | 3 | import sys |
|
4 | 4 | import zmq |
|
5 | 5 | import time |
|
6 | import numpy | |
|
6 | 7 | import datetime |
|
7 | 8 | from functools import wraps |
|
8 | import numpy | |
|
9 | from threading import Thread | |
|
9 | 10 | import matplotlib |
|
10 | 11 | |
|
11 | 12 | if 'BACKEND' in os.environ: |
@@ -13,7 +14,7 if 'BACKEND' in os.environ: | |||
|
13 | 14 | elif 'linux' in sys.platform: |
|
14 | 15 | matplotlib.use("TkAgg") |
|
15 | 16 | elif 'darwin' in sys.platform: |
|
16 |
matplotlib.use(' |
|
|
17 | matplotlib.use('WxAgg') | |
|
17 | 18 | else: |
|
18 | 19 | from schainpy.utils import log |
|
19 | 20 | log.warning('Using default Backend="Agg"', 'INFO') |
@@ -40,7 +41,6 CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis', | |||
|
40 | 41 | |
|
41 | 42 | EARTH_RADIUS = 6.3710e3 |
|
42 | 43 | |
|
43 | ||
|
44 | 44 | def ll2xy(lat1, lon1, lat2, lon2): |
|
45 | 45 | |
|
46 | 46 | p = 0.017453292519943295 |
@@ -138,140 +138,6 def apply_throttle(value): | |||
|
138 | 138 | |
|
139 | 139 | return fnThrottled |
|
140 | 140 | |
|
141 | @MPDecorator | |
|
142 | class Plotter(ProcessingUnit): | |
|
143 | ''' | |
|
144 | Proccessing unit to handle plot operations | |
|
145 | ''' | |
|
146 | ||
|
147 | def __init__(self): | |
|
148 | ||
|
149 | ProcessingUnit.__init__(self) | |
|
150 | ||
|
151 | def setup(self, **kwargs): | |
|
152 | ||
|
153 | self.connections = 0 | |
|
154 | self.web_address = kwargs.get('web_server', False) | |
|
155 | self.realtime = kwargs.get('realtime', False) | |
|
156 | self.localtime = kwargs.get('localtime', True) | |
|
157 | self.buffering = kwargs.get('buffering', True) | |
|
158 | self.throttle = kwargs.get('throttle', 2) | |
|
159 | self.exp_code = kwargs.get('exp_code', None) | |
|
160 | self.set_ready = apply_throttle(self.throttle) | |
|
161 | self.dates = [] | |
|
162 | self.data = PlotterData( | |
|
163 | self.plots, self.throttle, self.exp_code, self.buffering) | |
|
164 | self.isConfig = True | |
|
165 | ||
|
166 | def ready(self): | |
|
167 | ''' | |
|
168 | Set dataOut ready | |
|
169 | ''' | |
|
170 | ||
|
171 | self.data.ready = True | |
|
172 | self.dataOut.data_plt = self.data | |
|
173 | ||
|
174 | def run(self, realtime=True, localtime=True, buffering=True, | |
|
175 | throttle=2, exp_code=None, web_server=None): | |
|
176 | ||
|
177 | if not self.isConfig: | |
|
178 | self.setup(realtime=realtime, localtime=localtime, | |
|
179 | buffering=buffering, throttle=throttle, exp_code=exp_code, | |
|
180 | web_server=web_server) | |
|
181 | ||
|
182 | if self.web_address: | |
|
183 | log.success( | |
|
184 | 'Sending to web: {}'.format(self.web_address), | |
|
185 | self.name | |
|
186 | ) | |
|
187 | self.context = zmq.Context() | |
|
188 | self.sender_web = self.context.socket(zmq.REQ) | |
|
189 | self.sender_web.connect(self.web_address) | |
|
190 | self.poll = zmq.Poller() | |
|
191 | self.poll.register(self.sender_web, zmq.POLLIN) | |
|
192 | time.sleep(1) | |
|
193 | ||
|
194 | # t = Thread(target=self.event_monitor, args=(monitor,)) | |
|
195 | # t.start() | |
|
196 | ||
|
197 | self.dataOut = self.dataIn | |
|
198 | self.data.ready = False | |
|
199 | ||
|
200 | if self.dataOut.flagNoData: | |
|
201 | coerce = True | |
|
202 | else: | |
|
203 | coerce = False | |
|
204 | ||
|
205 | if self.dataOut.type == 'Parameters': | |
|
206 | tm = self.dataOut.utctimeInit | |
|
207 | else: | |
|
208 | tm = self.dataOut.utctime | |
|
209 | if self.dataOut.useLocalTime: | |
|
210 | if not self.localtime: | |
|
211 | tm += time.timezone | |
|
212 | dt = datetime.datetime.fromtimestamp(tm).date() | |
|
213 | else: | |
|
214 | if self.localtime: | |
|
215 | tm -= time.timezone | |
|
216 | dt = datetime.datetime.utcfromtimestamp(tm).date() | |
|
217 | if dt not in self.dates: | |
|
218 | if self.data: | |
|
219 | self.ready() | |
|
220 | self.data.setup() | |
|
221 | self.dates.append(dt) | |
|
222 | ||
|
223 | self.data.update(self.dataOut, tm) | |
|
224 | ||
|
225 | if False: # TODO check when publishers ends | |
|
226 | self.connections -= 1 | |
|
227 | if self.connections == 0 and dt in self.dates: | |
|
228 | self.data.ended = True | |
|
229 | self.ready() | |
|
230 | time.sleep(1) | |
|
231 | else: | |
|
232 | if self.realtime: | |
|
233 | self.ready() | |
|
234 | if self.web_address: | |
|
235 | retries = 5 | |
|
236 | while True: | |
|
237 | self.sender_web.send(self.data.jsonify()) | |
|
238 | socks = dict(self.poll.poll(5000)) | |
|
239 | if socks.get(self.sender_web) == zmq.POLLIN: | |
|
240 | reply = self.sender_web.recv_string() | |
|
241 | if reply == 'ok': | |
|
242 | log.log("Response from server ok", self.name) | |
|
243 | break | |
|
244 | else: | |
|
245 | log.warning( | |
|
246 | "Malformed reply from server: {}".format(reply), self.name) | |
|
247 | ||
|
248 | else: | |
|
249 | log.warning( | |
|
250 | "No response from server, retrying...", self.name) | |
|
251 | self.sender_web.setsockopt(zmq.LINGER, 0) | |
|
252 | self.sender_web.close() | |
|
253 | self.poll.unregister(self.sender_web) | |
|
254 | retries -= 1 | |
|
255 | if retries == 0: | |
|
256 | log.error( | |
|
257 | "Server seems to be offline, abandoning", self.name) | |
|
258 | self.sender_web = self.context.socket(zmq.REQ) | |
|
259 | self.sender_web.connect(self.web_address) | |
|
260 | self.poll.register(self.sender_web, zmq.POLLIN) | |
|
261 | time.sleep(1) | |
|
262 | break | |
|
263 | self.sender_web = self.context.socket(zmq.REQ) | |
|
264 | self.sender_web.connect(self.web_address) | |
|
265 | self.poll.register(self.sender_web, zmq.POLLIN) | |
|
266 | time.sleep(1) | |
|
267 | else: | |
|
268 | self.set_ready(self.ready, coerce=coerce) | |
|
269 | ||
|
270 | return | |
|
271 | ||
|
272 | def close(self): | |
|
273 | pass | |
|
274 | ||
|
275 | 141 | |
|
276 | 142 | @MPDecorator |
|
277 | 143 | class Plot(Operation): |
@@ -280,7 +146,7 class Plot(Operation): | |||
|
280 | 146 | ''' |
|
281 | 147 | |
|
282 | 148 | CODE = 'Figure' |
|
283 |
colormap = 'j |
|
|
149 | colormap = 'jet' | |
|
284 | 150 | bgcolor = 'white' |
|
285 | 151 | __missing = 1E30 |
|
286 | 152 | |
@@ -294,6 +160,8 class Plot(Operation): | |||
|
294 | 160 | Operation.__init__(self) |
|
295 | 161 | self.isConfig = False |
|
296 | 162 | self.isPlotConfig = False |
|
163 | self.save_counter = 1 | |
|
164 | self.sender_counter = 1 | |
|
297 | 165 | |
|
298 | 166 | def __fmtTime(self, x, pos): |
|
299 | 167 | ''' |
@@ -312,6 +180,7 class Plot(Operation): | |||
|
312 | 180 | self.localtime = kwargs.pop('localtime', True) |
|
313 | 181 | self.show = kwargs.get('show', True) |
|
314 | 182 | self.save = kwargs.get('save', False) |
|
183 | self.save_period = kwargs.get('save_period', 2) | |
|
315 | 184 | self.ftp = kwargs.get('ftp', False) |
|
316 | 185 | self.colormap = kwargs.get('colormap', self.colormap) |
|
317 | 186 | self.colormap_coh = kwargs.get('colormap_coh', 'jet') |
@@ -353,9 +222,19 class Plot(Operation): | |||
|
353 | 222 | self.buffering = kwargs.get('buffering', True) |
|
354 | 223 | self.throttle = kwargs.get('throttle', 2) |
|
355 | 224 | self.exp_code = kwargs.get('exp_code', None) |
|
225 | self.plot_server = kwargs.get('plot_server', False) | |
|
226 | self.sender_period = kwargs.get('sender_period', 2) | |
|
356 | 227 | self.__throttle_plot = apply_throttle(self.throttle) |
|
357 | 228 | self.data = PlotterData( |
|
358 | 229 | self.CODE, self.throttle, self.exp_code, self.buffering) |
|
230 | ||
|
231 | if self.plot_server: | |
|
232 | if not self.plot_server.startswith('tcp://'): | |
|
233 | self.plot_server = 'tcp://{}'.format(self.plot_server) | |
|
234 | log.success( | |
|
235 | 'Sending to server: {}'.format(self.plot_server), | |
|
236 | self.name | |
|
237 | ) | |
|
359 | 238 | |
|
360 | 239 | def __setup_plot(self): |
|
361 | 240 | ''' |
@@ -538,20 +417,6 class Plot(Operation): | |||
|
538 | 417 | ax.figure.add_axes(nax) |
|
539 | 418 | return nax |
|
540 | 419 | |
|
541 | def setup(self): | |
|
542 | ''' | |
|
543 | This method should be implemented in the child class, the following | |
|
544 | attributes should be set: | |
|
545 | ||
|
546 | self.nrows: number of rows | |
|
547 | self.ncols: number of cols | |
|
548 | self.nplots: number of plots (channels or pairs) | |
|
549 | self.ylabel: label for Y axes | |
|
550 | self.titles: list of axes title | |
|
551 | ||
|
552 | ''' | |
|
553 | raise NotImplementedError | |
|
554 | ||
|
555 | 420 | def fill_gaps(self, x_buffer, y_buffer, z_buffer): |
|
556 | 421 | ''' |
|
557 | 422 | Create a masked array for missing data |
@@ -722,14 +587,14 class Plot(Operation): | |||
|
722 | 587 | Main function to plot, format and save figures |
|
723 | 588 | ''' |
|
724 | 589 | |
|
725 |
|
|
|
726 | self.plot() | |
|
727 | self.format() | |
|
728 |
|
|
|
729 |
|
|
|
730 |
|
|
|
731 |
|
|
|
732 |
|
|
|
590 | try: | |
|
591 | self.plot() | |
|
592 | self.format() | |
|
593 | except Exception as e: | |
|
594 | log.warning('{} Plot could not be updated... check data'.format( | |
|
595 | self.CODE), self.name) | |
|
596 | log.error(str(e), '') | |
|
597 | return | |
|
733 | 598 | |
|
734 | 599 | for n, fig in enumerate(self.figures): |
|
735 | 600 | if self.nrows == 0 or self.nplots == 0: |
@@ -744,30 +609,118 class Plot(Operation): | |||
|
744 | 609 | fig.canvas.draw() |
|
745 | 610 | |
|
746 | 611 | if self.save: |
|
612 | self.save_figure(n) | |
|
613 | ||
|
614 | if self.plot_server: | |
|
615 | self.send_to_server() | |
|
616 | # t = Thread(target=self.send_to_server) | |
|
617 | # t.start() | |
|
747 | 618 | |
|
748 | if self.save_labels: | |
|
749 | labels = self.save_labels | |
|
750 | else: | |
|
751 | labels = list(range(self.nrows)) | |
|
619 | def save_figure(self, n): | |
|
620 | ''' | |
|
621 | ''' | |
|
752 | 622 | |
|
753 | if self.oneFigure: | |
|
754 | label = '' | |
|
755 |
|
|
|
756 | label = '-{}'.format(labels[n]) | |
|
757 | figname = os.path.join( | |
|
758 | self.save, | |
|
623 | if self.save_counter < self.save_period: | |
|
624 | self.save_counter += 1 | |
|
625 | return | |
|
626 | ||
|
627 | self.save_counter = 1 | |
|
628 | ||
|
629 | fig = self.figures[n] | |
|
630 | ||
|
631 | if self.save_labels: | |
|
632 | labels = self.save_labels | |
|
633 | else: | |
|
634 | labels = list(range(self.nrows)) | |
|
635 | ||
|
636 | if self.oneFigure: | |
|
637 | label = '' | |
|
638 | else: | |
|
639 | label = '-{}'.format(labels[n]) | |
|
640 | figname = os.path.join( | |
|
641 | self.save, | |
|
642 | self.CODE, | |
|
643 | '{}{}_{}.png'.format( | |
|
644 | self.CODE, | |
|
645 | label, | |
|
646 | self.getDateTime(self.data.max_time).strftime( | |
|
647 | '%Y%m%d_%H%M%S' | |
|
648 | ), | |
|
649 | ) | |
|
650 | ) | |
|
651 | log.log('Saving figure: {}'.format(figname), self.name) | |
|
652 | if not os.path.isdir(os.path.dirname(figname)): | |
|
653 | os.makedirs(os.path.dirname(figname)) | |
|
654 | fig.savefig(figname) | |
|
655 | ||
|
656 | if self.realtime: | |
|
657 | figname = os.path.join( | |
|
658 | self.save, | |
|
659 | '{}{}_{}.png'.format( | |
|
759 | 660 | self.CODE, |
|
760 |
|
|
|
761 | self.CODE, | |
|
762 |
|
|
|
763 | self.getDateTime(self.data.max_time).strftime( | |
|
764 | '%Y%m%d_%H%M%S'), | |
|
661 | label, | |
|
662 | self.getDateTime(self.data.min_time).strftime( | |
|
663 | '%Y%m%d' | |
|
664 | ), | |
|
765 | 665 | ) |
|
766 | 666 | ) |
|
767 | log.log('Saving figure: {}'.format(figname), self.name) | |
|
768 | if not os.path.isdir(os.path.dirname(figname)): | |
|
769 | os.makedirs(os.path.dirname(figname)) | |
|
770 | fig.savefig(figname) | |
|
667 | fig.savefig(figname) | |
|
668 | ||
|
669 | def send_to_server(self): | |
|
670 | ''' | |
|
671 | ''' | |
|
672 | ||
|
673 | if self.sender_counter < self.sender_period: | |
|
674 | self.sender_counter += 1 | |
|
675 | ||
|
676 | self.sender_counter = 1 | |
|
677 | ||
|
678 | retries = 2 | |
|
679 | while True: | |
|
680 | self.socket.send_string(self.data.jsonify()) | |
|
681 | socks = dict(self.poll.poll(5000)) | |
|
682 | if socks.get(self.socket) == zmq.POLLIN: | |
|
683 | reply = self.socket.recv_string() | |
|
684 | if reply == 'ok': | |
|
685 | log.log("Response from server ok", self.name) | |
|
686 | break | |
|
687 | else: | |
|
688 | log.warning( | |
|
689 | "Malformed reply from server: {}".format(reply), self.name) | |
|
690 | ||
|
691 | else: | |
|
692 | log.warning( | |
|
693 | "No response from server, retrying...", self.name) | |
|
694 | self.socket.setsockopt(zmq.LINGER, 0) | |
|
695 | self.socket.close() | |
|
696 | self.poll.unregister(self.socket) | |
|
697 | retries -= 1 | |
|
698 | if retries == 0: | |
|
699 | log.error( | |
|
700 | "Server seems to be offline, abandoning", self.name) | |
|
701 | self.socket = self.context.socket(zmq.REQ) | |
|
702 | self.socket.connect(self.plot_server) | |
|
703 | self.poll.register(self.socket, zmq.POLLIN) | |
|
704 | time.sleep(1) | |
|
705 | break | |
|
706 | self.socket = self.context.socket(zmq.REQ) | |
|
707 | self.socket.connect(self.plot_server) | |
|
708 | self.poll.register(self.socket, zmq.POLLIN) | |
|
709 | time.sleep(0.5) | |
|
710 | ||
|
711 | def setup(self): | |
|
712 | ''' | |
|
713 | This method should be implemented in the child class, the following | |
|
714 | attributes should be set: | |
|
715 | ||
|
716 | self.nrows: number of rows | |
|
717 | self.ncols: number of cols | |
|
718 | self.nplots: number of plots (channels or pairs) | |
|
719 | self.ylabel: label for Y axes | |
|
720 | self.titles: list of axes title | |
|
721 | ||
|
722 | ''' | |
|
723 | raise NotImplementedError | |
|
771 | 724 | |
|
772 | 725 | def plot(self): |
|
773 | 726 | ''' |
@@ -786,6 +739,12 class Plot(Operation): | |||
|
786 | 739 | self.__setup(**kwargs) |
|
787 | 740 | self.data.setup() |
|
788 | 741 | self.isConfig = True |
|
742 | if self.plot_server: | |
|
743 | self.context = zmq.Context() | |
|
744 | self.socket = self.context.socket(zmq.REQ) | |
|
745 | self.socket.connect(self.plot_server) | |
|
746 | self.poll = zmq.Poller() | |
|
747 | self.poll.register(self.socket, zmq.POLLIN) | |
|
789 | 748 | |
|
790 | 749 | if dataOut.type == 'Parameters': |
|
791 | 750 | tm = dataOut.utctimeInit |
@@ -12,11 +12,11 Based on: | |||
|
12 | 12 | $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $ |
|
13 | 13 | ''' |
|
14 | 14 | |
|
15 | import os | |
|
15 | 16 | import inspect |
|
16 | 17 | import zmq |
|
17 | 18 | import time |
|
18 | 19 | import pickle |
|
19 | import os | |
|
20 | 20 | from multiprocessing import Process |
|
21 | 21 | from zmq.utils.monitor import recv_monitor_message |
|
22 | 22 | |
@@ -250,7 +250,7 def MPDecorator(BaseClass): | |||
|
250 | 250 | while True: |
|
251 | 251 | |
|
252 | 252 | BaseClass.run(self, **self.kwargs) |
|
253 | ||
|
253 | ||
|
254 | 254 | for op, optype, opId, kwargs in self.operations: |
|
255 | 255 | if optype == 'self' and not self.dataOut.flagNoData: |
|
256 | 256 | op(**kwargs) |
General Comments 0
You need to be logged in to leave comments.
Login now