##// END OF EJS Templates
Allow to send data to new realtime app
Juan C. Espinoza -
r1212:bae4ef96bf63
parent child
Show More
@@ -3,9 +3,10 import os
3 import sys
3 import sys
4 import zmq
4 import zmq
5 import time
5 import time
6 import numpy
6 import datetime
7 import datetime
7 from functools import wraps
8 from functools import wraps
8 import numpy
9 from threading import Thread
9 import matplotlib
10 import matplotlib
10
11
11 if 'BACKEND' in os.environ:
12 if 'BACKEND' in os.environ:
@@ -13,7 +14,7 if 'BACKEND' in os.environ:
13 elif 'linux' in sys.platform:
14 elif 'linux' in sys.platform:
14 matplotlib.use("TkAgg")
15 matplotlib.use("TkAgg")
15 elif 'darwin' in sys.platform:
16 elif 'darwin' in sys.platform:
16 matplotlib.use('TkAgg')
17 matplotlib.use('WxAgg')
17 else:
18 else:
18 from schainpy.utils import log
19 from schainpy.utils import log
19 log.warning('Using default Backend="Agg"', 'INFO')
20 log.warning('Using default Backend="Agg"', 'INFO')
@@ -40,7 +41,6 CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis',
40
41
41 EARTH_RADIUS = 6.3710e3
42 EARTH_RADIUS = 6.3710e3
42
43
43
44 def ll2xy(lat1, lon1, lat2, lon2):
44 def ll2xy(lat1, lon1, lat2, lon2):
45
45
46 p = 0.017453292519943295
46 p = 0.017453292519943295
@@ -138,140 +138,6 def apply_throttle(value):
138
138
139 return fnThrottled
139 return fnThrottled
140
140
141 @MPDecorator
142 class Plotter(ProcessingUnit):
143 '''
144 Proccessing unit to handle plot operations
145 '''
146
147 def __init__(self):
148
149 ProcessingUnit.__init__(self)
150
151 def setup(self, **kwargs):
152
153 self.connections = 0
154 self.web_address = kwargs.get('web_server', False)
155 self.realtime = kwargs.get('realtime', False)
156 self.localtime = kwargs.get('localtime', True)
157 self.buffering = kwargs.get('buffering', True)
158 self.throttle = kwargs.get('throttle', 2)
159 self.exp_code = kwargs.get('exp_code', None)
160 self.set_ready = apply_throttle(self.throttle)
161 self.dates = []
162 self.data = PlotterData(
163 self.plots, self.throttle, self.exp_code, self.buffering)
164 self.isConfig = True
165
166 def ready(self):
167 '''
168 Set dataOut ready
169 '''
170
171 self.data.ready = True
172 self.dataOut.data_plt = self.data
173
174 def run(self, realtime=True, localtime=True, buffering=True,
175 throttle=2, exp_code=None, web_server=None):
176
177 if not self.isConfig:
178 self.setup(realtime=realtime, localtime=localtime,
179 buffering=buffering, throttle=throttle, exp_code=exp_code,
180 web_server=web_server)
181
182 if self.web_address:
183 log.success(
184 'Sending to web: {}'.format(self.web_address),
185 self.name
186 )
187 self.context = zmq.Context()
188 self.sender_web = self.context.socket(zmq.REQ)
189 self.sender_web.connect(self.web_address)
190 self.poll = zmq.Poller()
191 self.poll.register(self.sender_web, zmq.POLLIN)
192 time.sleep(1)
193
194 # t = Thread(target=self.event_monitor, args=(monitor,))
195 # t.start()
196
197 self.dataOut = self.dataIn
198 self.data.ready = False
199
200 if self.dataOut.flagNoData:
201 coerce = True
202 else:
203 coerce = False
204
205 if self.dataOut.type == 'Parameters':
206 tm = self.dataOut.utctimeInit
207 else:
208 tm = self.dataOut.utctime
209 if self.dataOut.useLocalTime:
210 if not self.localtime:
211 tm += time.timezone
212 dt = datetime.datetime.fromtimestamp(tm).date()
213 else:
214 if self.localtime:
215 tm -= time.timezone
216 dt = datetime.datetime.utcfromtimestamp(tm).date()
217 if dt not in self.dates:
218 if self.data:
219 self.ready()
220 self.data.setup()
221 self.dates.append(dt)
222
223 self.data.update(self.dataOut, tm)
224
225 if False: # TODO check when publishers ends
226 self.connections -= 1
227 if self.connections == 0 and dt in self.dates:
228 self.data.ended = True
229 self.ready()
230 time.sleep(1)
231 else:
232 if self.realtime:
233 self.ready()
234 if self.web_address:
235 retries = 5
236 while True:
237 self.sender_web.send(self.data.jsonify())
238 socks = dict(self.poll.poll(5000))
239 if socks.get(self.sender_web) == zmq.POLLIN:
240 reply = self.sender_web.recv_string()
241 if reply == 'ok':
242 log.log("Response from server ok", self.name)
243 break
244 else:
245 log.warning(
246 "Malformed reply from server: {}".format(reply), self.name)
247
248 else:
249 log.warning(
250 "No response from server, retrying...", self.name)
251 self.sender_web.setsockopt(zmq.LINGER, 0)
252 self.sender_web.close()
253 self.poll.unregister(self.sender_web)
254 retries -= 1
255 if retries == 0:
256 log.error(
257 "Server seems to be offline, abandoning", self.name)
258 self.sender_web = self.context.socket(zmq.REQ)
259 self.sender_web.connect(self.web_address)
260 self.poll.register(self.sender_web, zmq.POLLIN)
261 time.sleep(1)
262 break
263 self.sender_web = self.context.socket(zmq.REQ)
264 self.sender_web.connect(self.web_address)
265 self.poll.register(self.sender_web, zmq.POLLIN)
266 time.sleep(1)
267 else:
268 self.set_ready(self.ready, coerce=coerce)
269
270 return
271
272 def close(self):
273 pass
274
275
141
276 @MPDecorator
142 @MPDecorator
277 class Plot(Operation):
143 class Plot(Operation):
@@ -280,7 +146,7 class Plot(Operation):
280 '''
146 '''
281
147
282 CODE = 'Figure'
148 CODE = 'Figure'
283 colormap = 'jro'
149 colormap = 'jet'
284 bgcolor = 'white'
150 bgcolor = 'white'
285 __missing = 1E30
151 __missing = 1E30
286
152
@@ -294,6 +160,8 class Plot(Operation):
294 Operation.__init__(self)
160 Operation.__init__(self)
295 self.isConfig = False
161 self.isConfig = False
296 self.isPlotConfig = False
162 self.isPlotConfig = False
163 self.save_counter = 1
164 self.sender_counter = 1
297
165
298 def __fmtTime(self, x, pos):
166 def __fmtTime(self, x, pos):
299 '''
167 '''
@@ -312,6 +180,7 class Plot(Operation):
312 self.localtime = kwargs.pop('localtime', True)
180 self.localtime = kwargs.pop('localtime', True)
313 self.show = kwargs.get('show', True)
181 self.show = kwargs.get('show', True)
314 self.save = kwargs.get('save', False)
182 self.save = kwargs.get('save', False)
183 self.save_period = kwargs.get('save_period', 2)
315 self.ftp = kwargs.get('ftp', False)
184 self.ftp = kwargs.get('ftp', False)
316 self.colormap = kwargs.get('colormap', self.colormap)
185 self.colormap = kwargs.get('colormap', self.colormap)
317 self.colormap_coh = kwargs.get('colormap_coh', 'jet')
186 self.colormap_coh = kwargs.get('colormap_coh', 'jet')
@@ -353,10 +222,20 class Plot(Operation):
353 self.buffering = kwargs.get('buffering', True)
222 self.buffering = kwargs.get('buffering', True)
354 self.throttle = kwargs.get('throttle', 2)
223 self.throttle = kwargs.get('throttle', 2)
355 self.exp_code = kwargs.get('exp_code', None)
224 self.exp_code = kwargs.get('exp_code', None)
225 self.plot_server = kwargs.get('plot_server', False)
226 self.sender_period = kwargs.get('sender_period', 2)
356 self.__throttle_plot = apply_throttle(self.throttle)
227 self.__throttle_plot = apply_throttle(self.throttle)
357 self.data = PlotterData(
228 self.data = PlotterData(
358 self.CODE, self.throttle, self.exp_code, self.buffering)
229 self.CODE, self.throttle, self.exp_code, self.buffering)
359
230
231 if self.plot_server:
232 if not self.plot_server.startswith('tcp://'):
233 self.plot_server = 'tcp://{}'.format(self.plot_server)
234 log.success(
235 'Sending to server: {}'.format(self.plot_server),
236 self.name
237 )
238
360 def __setup_plot(self):
239 def __setup_plot(self):
361 '''
240 '''
362 Common setup for all figures, here figures and axes are created
241 Common setup for all figures, here figures and axes are created
@@ -538,20 +417,6 class Plot(Operation):
538 ax.figure.add_axes(nax)
417 ax.figure.add_axes(nax)
539 return nax
418 return nax
540
419
541 def setup(self):
542 '''
543 This method should be implemented in the child class, the following
544 attributes should be set:
545
546 self.nrows: number of rows
547 self.ncols: number of cols
548 self.nplots: number of plots (channels or pairs)
549 self.ylabel: label for Y axes
550 self.titles: list of axes title
551
552 '''
553 raise NotImplementedError
554
555 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
420 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
556 '''
421 '''
557 Create a masked array for missing data
422 Create a masked array for missing data
@@ -722,14 +587,14 class Plot(Operation):
722 Main function to plot, format and save figures
587 Main function to plot, format and save figures
723 '''
588 '''
724
589
725 #try:
590 try:
726 self.plot()
591 self.plot()
727 self.format()
592 self.format()
728 #except Exception as e:
593 except Exception as e:
729 # log.warning('{} Plot could not be updated... check data'.format(
594 log.warning('{} Plot could not be updated... check data'.format(
730 # self.CODE), self.name)
595 self.CODE), self.name)
731 # log.error(str(e), '')
596 log.error(str(e), '')
732 # return
597 return
733
598
734 for n, fig in enumerate(self.figures):
599 for n, fig in enumerate(self.figures):
735 if self.nrows == 0 or self.nplots == 0:
600 if self.nrows == 0 or self.nplots == 0:
@@ -744,6 +609,24 class Plot(Operation):
744 fig.canvas.draw()
609 fig.canvas.draw()
745
610
746 if self.save:
611 if self.save:
612 self.save_figure(n)
613
614 if self.plot_server:
615 self.send_to_server()
616 # t = Thread(target=self.send_to_server)
617 # t.start()
618
619 def save_figure(self, n):
620 '''
621 '''
622
623 if self.save_counter < self.save_period:
624 self.save_counter += 1
625 return
626
627 self.save_counter = 1
628
629 fig = self.figures[n]
747
630
748 if self.save_labels:
631 if self.save_labels:
749 labels = self.save_labels
632 labels = self.save_labels
@@ -761,7 +644,8 class Plot(Operation):
761 self.CODE,
644 self.CODE,
762 label,
645 label,
763 self.getDateTime(self.data.max_time).strftime(
646 self.getDateTime(self.data.max_time).strftime(
764 '%Y%m%d_%H%M%S'),
647 '%Y%m%d_%H%M%S'
648 ),
765 )
649 )
766 )
650 )
767 log.log('Saving figure: {}'.format(figname), self.name)
651 log.log('Saving figure: {}'.format(figname), self.name)
@@ -769,6 +653,75 class Plot(Operation):
769 os.makedirs(os.path.dirname(figname))
653 os.makedirs(os.path.dirname(figname))
770 fig.savefig(figname)
654 fig.savefig(figname)
771
655
656 if self.realtime:
657 figname = os.path.join(
658 self.save,
659 '{}{}_{}.png'.format(
660 self.CODE,
661 label,
662 self.getDateTime(self.data.min_time).strftime(
663 '%Y%m%d'
664 ),
665 )
666 )
667 fig.savefig(figname)
668
669 def send_to_server(self):
670 '''
671 '''
672
673 if self.sender_counter < self.sender_period:
674 self.sender_counter += 1
675
676 self.sender_counter = 1
677
678 retries = 2
679 while True:
680 self.socket.send_string(self.data.jsonify())
681 socks = dict(self.poll.poll(5000))
682 if socks.get(self.socket) == zmq.POLLIN:
683 reply = self.socket.recv_string()
684 if reply == 'ok':
685 log.log("Response from server ok", self.name)
686 break
687 else:
688 log.warning(
689 "Malformed reply from server: {}".format(reply), self.name)
690
691 else:
692 log.warning(
693 "No response from server, retrying...", self.name)
694 self.socket.setsockopt(zmq.LINGER, 0)
695 self.socket.close()
696 self.poll.unregister(self.socket)
697 retries -= 1
698 if retries == 0:
699 log.error(
700 "Server seems to be offline, abandoning", self.name)
701 self.socket = self.context.socket(zmq.REQ)
702 self.socket.connect(self.plot_server)
703 self.poll.register(self.socket, zmq.POLLIN)
704 time.sleep(1)
705 break
706 self.socket = self.context.socket(zmq.REQ)
707 self.socket.connect(self.plot_server)
708 self.poll.register(self.socket, zmq.POLLIN)
709 time.sleep(0.5)
710
711 def setup(self):
712 '''
713 This method should be implemented in the child class, the following
714 attributes should be set:
715
716 self.nrows: number of rows
717 self.ncols: number of cols
718 self.nplots: number of plots (channels or pairs)
719 self.ylabel: label for Y axes
720 self.titles: list of axes title
721
722 '''
723 raise NotImplementedError
724
772 def plot(self):
725 def plot(self):
773 '''
726 '''
774 Must be defined in the child class
727 Must be defined in the child class
@@ -786,6 +739,12 class Plot(Operation):
786 self.__setup(**kwargs)
739 self.__setup(**kwargs)
787 self.data.setup()
740 self.data.setup()
788 self.isConfig = True
741 self.isConfig = True
742 if self.plot_server:
743 self.context = zmq.Context()
744 self.socket = self.context.socket(zmq.REQ)
745 self.socket.connect(self.plot_server)
746 self.poll = zmq.Poller()
747 self.poll.register(self.socket, zmq.POLLIN)
789
748
790 if dataOut.type == 'Parameters':
749 if dataOut.type == 'Parameters':
791 tm = dataOut.utctimeInit
750 tm = dataOut.utctimeInit
@@ -12,11 +12,11 Based on:
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 '''
13 '''
14
14
15 import os
15 import inspect
16 import inspect
16 import zmq
17 import zmq
17 import time
18 import time
18 import pickle
19 import pickle
19 import os
20 from multiprocessing import Process
20 from multiprocessing import Process
21 from zmq.utils.monitor import recv_monitor_message
21 from zmq.utils.monitor import recv_monitor_message
22
22
General Comments 0
You need to be logged in to leave comments. Login now