##// END OF EJS Templates
Allow to send data to new realtime app
Juan C. Espinoza -
r1212:bae4ef96bf63
parent child
Show More
@@ -3,9 +3,10 import os
3 3 import sys
4 4 import zmq
5 5 import time
6 import numpy
6 7 import datetime
7 8 from functools import wraps
8 import numpy
9 from threading import Thread
9 10 import matplotlib
10 11
11 12 if 'BACKEND' in os.environ:
@@ -13,7 +14,7 if 'BACKEND' in os.environ:
13 14 elif 'linux' in sys.platform:
14 15 matplotlib.use("TkAgg")
15 16 elif 'darwin' in sys.platform:
16 matplotlib.use('TkAgg')
17 matplotlib.use('WxAgg')
17 18 else:
18 19 from schainpy.utils import log
19 20 log.warning('Using default Backend="Agg"', 'INFO')
@@ -40,7 +41,6 CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis',
40 41
41 42 EARTH_RADIUS = 6.3710e3
42 43
43
44 44 def ll2xy(lat1, lon1, lat2, lon2):
45 45
46 46 p = 0.017453292519943295
@@ -138,140 +138,6 def apply_throttle(value):
138 138
139 139 return fnThrottled
140 140
141 @MPDecorator
142 class Plotter(ProcessingUnit):
143 '''
144 Proccessing unit to handle plot operations
145 '''
146
147 def __init__(self):
148
149 ProcessingUnit.__init__(self)
150
151 def setup(self, **kwargs):
152
153 self.connections = 0
154 self.web_address = kwargs.get('web_server', False)
155 self.realtime = kwargs.get('realtime', False)
156 self.localtime = kwargs.get('localtime', True)
157 self.buffering = kwargs.get('buffering', True)
158 self.throttle = kwargs.get('throttle', 2)
159 self.exp_code = kwargs.get('exp_code', None)
160 self.set_ready = apply_throttle(self.throttle)
161 self.dates = []
162 self.data = PlotterData(
163 self.plots, self.throttle, self.exp_code, self.buffering)
164 self.isConfig = True
165
166 def ready(self):
167 '''
168 Set dataOut ready
169 '''
170
171 self.data.ready = True
172 self.dataOut.data_plt = self.data
173
174 def run(self, realtime=True, localtime=True, buffering=True,
175 throttle=2, exp_code=None, web_server=None):
176
177 if not self.isConfig:
178 self.setup(realtime=realtime, localtime=localtime,
179 buffering=buffering, throttle=throttle, exp_code=exp_code,
180 web_server=web_server)
181
182 if self.web_address:
183 log.success(
184 'Sending to web: {}'.format(self.web_address),
185 self.name
186 )
187 self.context = zmq.Context()
188 self.sender_web = self.context.socket(zmq.REQ)
189 self.sender_web.connect(self.web_address)
190 self.poll = zmq.Poller()
191 self.poll.register(self.sender_web, zmq.POLLIN)
192 time.sleep(1)
193
194 # t = Thread(target=self.event_monitor, args=(monitor,))
195 # t.start()
196
197 self.dataOut = self.dataIn
198 self.data.ready = False
199
200 if self.dataOut.flagNoData:
201 coerce = True
202 else:
203 coerce = False
204
205 if self.dataOut.type == 'Parameters':
206 tm = self.dataOut.utctimeInit
207 else:
208 tm = self.dataOut.utctime
209 if self.dataOut.useLocalTime:
210 if not self.localtime:
211 tm += time.timezone
212 dt = datetime.datetime.fromtimestamp(tm).date()
213 else:
214 if self.localtime:
215 tm -= time.timezone
216 dt = datetime.datetime.utcfromtimestamp(tm).date()
217 if dt not in self.dates:
218 if self.data:
219 self.ready()
220 self.data.setup()
221 self.dates.append(dt)
222
223 self.data.update(self.dataOut, tm)
224
225 if False: # TODO check when publishers ends
226 self.connections -= 1
227 if self.connections == 0 and dt in self.dates:
228 self.data.ended = True
229 self.ready()
230 time.sleep(1)
231 else:
232 if self.realtime:
233 self.ready()
234 if self.web_address:
235 retries = 5
236 while True:
237 self.sender_web.send(self.data.jsonify())
238 socks = dict(self.poll.poll(5000))
239 if socks.get(self.sender_web) == zmq.POLLIN:
240 reply = self.sender_web.recv_string()
241 if reply == 'ok':
242 log.log("Response from server ok", self.name)
243 break
244 else:
245 log.warning(
246 "Malformed reply from server: {}".format(reply), self.name)
247
248 else:
249 log.warning(
250 "No response from server, retrying...", self.name)
251 self.sender_web.setsockopt(zmq.LINGER, 0)
252 self.sender_web.close()
253 self.poll.unregister(self.sender_web)
254 retries -= 1
255 if retries == 0:
256 log.error(
257 "Server seems to be offline, abandoning", self.name)
258 self.sender_web = self.context.socket(zmq.REQ)
259 self.sender_web.connect(self.web_address)
260 self.poll.register(self.sender_web, zmq.POLLIN)
261 time.sleep(1)
262 break
263 self.sender_web = self.context.socket(zmq.REQ)
264 self.sender_web.connect(self.web_address)
265 self.poll.register(self.sender_web, zmq.POLLIN)
266 time.sleep(1)
267 else:
268 self.set_ready(self.ready, coerce=coerce)
269
270 return
271
272 def close(self):
273 pass
274
275 141
276 142 @MPDecorator
277 143 class Plot(Operation):
@@ -280,7 +146,7 class Plot(Operation):
280 146 '''
281 147
282 148 CODE = 'Figure'
283 colormap = 'jro'
149 colormap = 'jet'
284 150 bgcolor = 'white'
285 151 __missing = 1E30
286 152
@@ -294,6 +160,8 class Plot(Operation):
294 160 Operation.__init__(self)
295 161 self.isConfig = False
296 162 self.isPlotConfig = False
163 self.save_counter = 1
164 self.sender_counter = 1
297 165
298 166 def __fmtTime(self, x, pos):
299 167 '''
@@ -312,6 +180,7 class Plot(Operation):
312 180 self.localtime = kwargs.pop('localtime', True)
313 181 self.show = kwargs.get('show', True)
314 182 self.save = kwargs.get('save', False)
183 self.save_period = kwargs.get('save_period', 2)
315 184 self.ftp = kwargs.get('ftp', False)
316 185 self.colormap = kwargs.get('colormap', self.colormap)
317 186 self.colormap_coh = kwargs.get('colormap_coh', 'jet')
@@ -353,10 +222,20 class Plot(Operation):
353 222 self.buffering = kwargs.get('buffering', True)
354 223 self.throttle = kwargs.get('throttle', 2)
355 224 self.exp_code = kwargs.get('exp_code', None)
225 self.plot_server = kwargs.get('plot_server', False)
226 self.sender_period = kwargs.get('sender_period', 2)
356 227 self.__throttle_plot = apply_throttle(self.throttle)
357 228 self.data = PlotterData(
358 229 self.CODE, self.throttle, self.exp_code, self.buffering)
359 230
231 if self.plot_server:
232 if not self.plot_server.startswith('tcp://'):
233 self.plot_server = 'tcp://{}'.format(self.plot_server)
234 log.success(
235 'Sending to server: {}'.format(self.plot_server),
236 self.name
237 )
238
360 239 def __setup_plot(self):
361 240 '''
362 241 Common setup for all figures, here figures and axes are created
@@ -538,20 +417,6 class Plot(Operation):
538 417 ax.figure.add_axes(nax)
539 418 return nax
540 419
541 def setup(self):
542 '''
543 This method should be implemented in the child class, the following
544 attributes should be set:
545
546 self.nrows: number of rows
547 self.ncols: number of cols
548 self.nplots: number of plots (channels or pairs)
549 self.ylabel: label for Y axes
550 self.titles: list of axes title
551
552 '''
553 raise NotImplementedError
554
555 420 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
556 421 '''
557 422 Create a masked array for missing data
@@ -722,14 +587,14 class Plot(Operation):
722 587 Main function to plot, format and save figures
723 588 '''
724 589
725 #try:
590 try:
726 591 self.plot()
727 592 self.format()
728 #except Exception as e:
729 # log.warning('{} Plot could not be updated... check data'.format(
730 # self.CODE), self.name)
731 # log.error(str(e), '')
732 # return
593 except Exception as e:
594 log.warning('{} Plot could not be updated... check data'.format(
595 self.CODE), self.name)
596 log.error(str(e), '')
597 return
733 598
734 599 for n, fig in enumerate(self.figures):
735 600 if self.nrows == 0 or self.nplots == 0:
@@ -744,6 +609,24 class Plot(Operation):
744 609 fig.canvas.draw()
745 610
746 611 if self.save:
612 self.save_figure(n)
613
614 if self.plot_server:
615 self.send_to_server()
616 # t = Thread(target=self.send_to_server)
617 # t.start()
618
619 def save_figure(self, n):
620 '''
621 '''
622
623 if self.save_counter < self.save_period:
624 self.save_counter += 1
625 return
626
627 self.save_counter = 1
628
629 fig = self.figures[n]
747 630
748 631 if self.save_labels:
749 632 labels = self.save_labels
@@ -761,7 +644,8 class Plot(Operation):
761 644 self.CODE,
762 645 label,
763 646 self.getDateTime(self.data.max_time).strftime(
764 '%Y%m%d_%H%M%S'),
647 '%Y%m%d_%H%M%S'
648 ),
765 649 )
766 650 )
767 651 log.log('Saving figure: {}'.format(figname), self.name)
@@ -769,6 +653,75 class Plot(Operation):
769 653 os.makedirs(os.path.dirname(figname))
770 654 fig.savefig(figname)
771 655
656 if self.realtime:
657 figname = os.path.join(
658 self.save,
659 '{}{}_{}.png'.format(
660 self.CODE,
661 label,
662 self.getDateTime(self.data.min_time).strftime(
663 '%Y%m%d'
664 ),
665 )
666 )
667 fig.savefig(figname)
668
669 def send_to_server(self):
670 '''
671 '''
672
673 if self.sender_counter < self.sender_period:
674 self.sender_counter += 1
675
676 self.sender_counter = 1
677
678 retries = 2
679 while True:
680 self.socket.send_string(self.data.jsonify())
681 socks = dict(self.poll.poll(5000))
682 if socks.get(self.socket) == zmq.POLLIN:
683 reply = self.socket.recv_string()
684 if reply == 'ok':
685 log.log("Response from server ok", self.name)
686 break
687 else:
688 log.warning(
689 "Malformed reply from server: {}".format(reply), self.name)
690
691 else:
692 log.warning(
693 "No response from server, retrying...", self.name)
694 self.socket.setsockopt(zmq.LINGER, 0)
695 self.socket.close()
696 self.poll.unregister(self.socket)
697 retries -= 1
698 if retries == 0:
699 log.error(
700 "Server seems to be offline, abandoning", self.name)
701 self.socket = self.context.socket(zmq.REQ)
702 self.socket.connect(self.plot_server)
703 self.poll.register(self.socket, zmq.POLLIN)
704 time.sleep(1)
705 break
706 self.socket = self.context.socket(zmq.REQ)
707 self.socket.connect(self.plot_server)
708 self.poll.register(self.socket, zmq.POLLIN)
709 time.sleep(0.5)
710
711 def setup(self):
712 '''
713 This method should be implemented in the child class, the following
714 attributes should be set:
715
716 self.nrows: number of rows
717 self.ncols: number of cols
718 self.nplots: number of plots (channels or pairs)
719 self.ylabel: label for Y axes
720 self.titles: list of axes title
721
722 '''
723 raise NotImplementedError
724
772 725 def plot(self):
773 726 '''
774 727 Must be defined in the child class
@@ -786,6 +739,12 class Plot(Operation):
786 739 self.__setup(**kwargs)
787 740 self.data.setup()
788 741 self.isConfig = True
742 if self.plot_server:
743 self.context = zmq.Context()
744 self.socket = self.context.socket(zmq.REQ)
745 self.socket.connect(self.plot_server)
746 self.poll = zmq.Poller()
747 self.poll.register(self.socket, zmq.POLLIN)
789 748
790 749 if dataOut.type == 'Parameters':
791 750 tm = dataOut.utctimeInit
@@ -12,11 +12,11 Based on:
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 import os
15 16 import inspect
16 17 import zmq
17 18 import time
18 19 import pickle
19 import os
20 20 from multiprocessing import Process
21 21 from zmq.utils.monitor import recv_monitor_message
22 22
General Comments 0
You need to be logged in to leave comments. Login now