##// END OF EJS Templates
Allow to send data to new realtime app
Juan C. Espinoza -
r1212:bae4ef96bf63
parent child
Show More
@@ -1,824 +1,783
1 1
2 2 import os
3 3 import sys
4 4 import zmq
5 5 import time
6 import numpy
6 7 import datetime
7 8 from functools import wraps
8 import numpy
9 from threading import Thread
9 10 import matplotlib
10 11
11 12 if 'BACKEND' in os.environ:
12 13 matplotlib.use(os.environ['BACKEND'])
13 14 elif 'linux' in sys.platform:
14 15 matplotlib.use("TkAgg")
15 16 elif 'darwin' in sys.platform:
16 matplotlib.use('TkAgg')
17 matplotlib.use('WxAgg')
17 18 else:
18 19 from schainpy.utils import log
19 20 log.warning('Using default Backend="Agg"', 'INFO')
20 21 matplotlib.use('Agg')
21 22
22 23 import matplotlib.pyplot as plt
23 24 from matplotlib.patches import Polygon
24 25 from mpl_toolkits.axes_grid1 import make_axes_locatable
25 26 from matplotlib.ticker import FuncFormatter, LinearLocator, MultipleLocator
26 27
27 28 from schainpy.model.data.jrodata import PlotterData
28 29 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
29 30 from schainpy.utils import log
30 31
31 32 jet_values = matplotlib.pyplot.get_cmap('jet', 100)(numpy.arange(100))[10:90]
32 33 blu_values = matplotlib.pyplot.get_cmap(
33 34 'seismic_r', 20)(numpy.arange(20))[10:15]
34 35 ncmap = matplotlib.colors.LinearSegmentedColormap.from_list(
35 36 'jro', numpy.vstack((blu_values, jet_values)))
36 37 matplotlib.pyplot.register_cmap(cmap=ncmap)
37 38
38 39 CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis',
39 40 'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm')]
40 41
41 42 EARTH_RADIUS = 6.3710e3
42 43
43
44 44 def ll2xy(lat1, lon1, lat2, lon2):
45 45
46 46 p = 0.017453292519943295
47 47 a = 0.5 - numpy.cos((lat2 - lat1) * p)/2 + numpy.cos(lat1 * p) * \
48 48 numpy.cos(lat2 * p) * (1 - numpy.cos((lon2 - lon1) * p)) / 2
49 49 r = 12742 * numpy.arcsin(numpy.sqrt(a))
50 50 theta = numpy.arctan2(numpy.sin((lon2-lon1)*p)*numpy.cos(lat2*p), numpy.cos(lat1*p)
51 51 * numpy.sin(lat2*p)-numpy.sin(lat1*p)*numpy.cos(lat2*p)*numpy.cos((lon2-lon1)*p))
52 52 theta = -theta + numpy.pi/2
53 53 return r*numpy.cos(theta), r*numpy.sin(theta)
54 54
55 55
56 56 def km2deg(km):
57 57 '''
58 58 Convert distance in km to degrees
59 59 '''
60 60
61 61 return numpy.rad2deg(km/EARTH_RADIUS)
62 62
63 63
64 64 def figpause(interval):
65 65 backend = plt.rcParams['backend']
66 66 if backend in matplotlib.rcsetup.interactive_bk:
67 67 figManager = matplotlib._pylab_helpers.Gcf.get_active()
68 68 if figManager is not None:
69 69 canvas = figManager.canvas
70 70 if canvas.figure.stale:
71 71 canvas.draw()
72 72 try:
73 73 canvas.start_event_loop(interval)
74 74 except:
75 75 pass
76 76 return
77 77
78 78
79 79 def popup(message):
80 80 '''
81 81 '''
82 82
83 83 fig = plt.figure(figsize=(12, 8), facecolor='r')
84 84 text = '\n'.join([s.strip() for s in message.split(':')])
85 85 fig.text(0.01, 0.5, text, ha='left', va='center',
86 86 size='20', weight='heavy', color='w')
87 87 fig.show()
88 88 figpause(1000)
89 89
90 90
91 91 class Throttle(object):
92 92 '''
93 93 Decorator that prevents a function from being called more than once every
94 94 time period.
95 95 To create a function that cannot be called more than once a minute, but
96 96 will sleep until it can be called:
97 97 @Throttle(minutes=1)
98 98 def foo():
99 99 pass
100 100
101 101 for i in range(10):
102 102 foo()
103 103 print "This function has run %s times." % i
104 104 '''
105 105
106 106 def __init__(self, seconds=0, minutes=0, hours=0):
107 107 self.throttle_period = datetime.timedelta(
108 108 seconds=seconds, minutes=minutes, hours=hours
109 109 )
110 110
111 111 self.time_of_last_call = datetime.datetime.min
112 112
113 113 def __call__(self, fn):
114 114 @wraps(fn)
115 115 def wrapper(*args, **kwargs):
116 116 coerce = kwargs.pop('coerce', None)
117 117 if coerce:
118 118 self.time_of_last_call = datetime.datetime.now()
119 119 return fn(*args, **kwargs)
120 120 else:
121 121 now = datetime.datetime.now()
122 122 time_since_last_call = now - self.time_of_last_call
123 123 time_left = self.throttle_period - time_since_last_call
124 124
125 125 if time_left > datetime.timedelta(seconds=0):
126 126 return
127 127
128 128 self.time_of_last_call = datetime.datetime.now()
129 129 return fn(*args, **kwargs)
130 130
131 131 return wrapper
132 132
133 133 def apply_throttle(value):
134 134
135 135 @Throttle(seconds=value)
136 136 def fnThrottled(fn):
137 137 fn()
138 138
139 139 return fnThrottled
140 140
141 @MPDecorator
142 class Plotter(ProcessingUnit):
143 '''
144 Proccessing unit to handle plot operations
145 '''
146
147 def __init__(self):
148
149 ProcessingUnit.__init__(self)
150
151 def setup(self, **kwargs):
152
153 self.connections = 0
154 self.web_address = kwargs.get('web_server', False)
155 self.realtime = kwargs.get('realtime', False)
156 self.localtime = kwargs.get('localtime', True)
157 self.buffering = kwargs.get('buffering', True)
158 self.throttle = kwargs.get('throttle', 2)
159 self.exp_code = kwargs.get('exp_code', None)
160 self.set_ready = apply_throttle(self.throttle)
161 self.dates = []
162 self.data = PlotterData(
163 self.plots, self.throttle, self.exp_code, self.buffering)
164 self.isConfig = True
165
166 def ready(self):
167 '''
168 Set dataOut ready
169 '''
170
171 self.data.ready = True
172 self.dataOut.data_plt = self.data
173
174 def run(self, realtime=True, localtime=True, buffering=True,
175 throttle=2, exp_code=None, web_server=None):
176
177 if not self.isConfig:
178 self.setup(realtime=realtime, localtime=localtime,
179 buffering=buffering, throttle=throttle, exp_code=exp_code,
180 web_server=web_server)
181
182 if self.web_address:
183 log.success(
184 'Sending to web: {}'.format(self.web_address),
185 self.name
186 )
187 self.context = zmq.Context()
188 self.sender_web = self.context.socket(zmq.REQ)
189 self.sender_web.connect(self.web_address)
190 self.poll = zmq.Poller()
191 self.poll.register(self.sender_web, zmq.POLLIN)
192 time.sleep(1)
193
194 # t = Thread(target=self.event_monitor, args=(monitor,))
195 # t.start()
196
197 self.dataOut = self.dataIn
198 self.data.ready = False
199
200 if self.dataOut.flagNoData:
201 coerce = True
202 else:
203 coerce = False
204
205 if self.dataOut.type == 'Parameters':
206 tm = self.dataOut.utctimeInit
207 else:
208 tm = self.dataOut.utctime
209 if self.dataOut.useLocalTime:
210 if not self.localtime:
211 tm += time.timezone
212 dt = datetime.datetime.fromtimestamp(tm).date()
213 else:
214 if self.localtime:
215 tm -= time.timezone
216 dt = datetime.datetime.utcfromtimestamp(tm).date()
217 if dt not in self.dates:
218 if self.data:
219 self.ready()
220 self.data.setup()
221 self.dates.append(dt)
222
223 self.data.update(self.dataOut, tm)
224
225 if False: # TODO check when publishers ends
226 self.connections -= 1
227 if self.connections == 0 and dt in self.dates:
228 self.data.ended = True
229 self.ready()
230 time.sleep(1)
231 else:
232 if self.realtime:
233 self.ready()
234 if self.web_address:
235 retries = 5
236 while True:
237 self.sender_web.send(self.data.jsonify())
238 socks = dict(self.poll.poll(5000))
239 if socks.get(self.sender_web) == zmq.POLLIN:
240 reply = self.sender_web.recv_string()
241 if reply == 'ok':
242 log.log("Response from server ok", self.name)
243 break
244 else:
245 log.warning(
246 "Malformed reply from server: {}".format(reply), self.name)
247
248 else:
249 log.warning(
250 "No response from server, retrying...", self.name)
251 self.sender_web.setsockopt(zmq.LINGER, 0)
252 self.sender_web.close()
253 self.poll.unregister(self.sender_web)
254 retries -= 1
255 if retries == 0:
256 log.error(
257 "Server seems to be offline, abandoning", self.name)
258 self.sender_web = self.context.socket(zmq.REQ)
259 self.sender_web.connect(self.web_address)
260 self.poll.register(self.sender_web, zmq.POLLIN)
261 time.sleep(1)
262 break
263 self.sender_web = self.context.socket(zmq.REQ)
264 self.sender_web.connect(self.web_address)
265 self.poll.register(self.sender_web, zmq.POLLIN)
266 time.sleep(1)
267 else:
268 self.set_ready(self.ready, coerce=coerce)
269
270 return
271
272 def close(self):
273 pass
274
275 141
276 142 @MPDecorator
277 143 class Plot(Operation):
278 144 '''
279 145 Base class for Schain plotting operations
280 146 '''
281 147
282 148 CODE = 'Figure'
283 colormap = 'jro'
149 colormap = 'jet'
284 150 bgcolor = 'white'
285 151 __missing = 1E30
286 152
287 153 __attrs__ = ['show', 'save', 'xmin', 'xmax', 'ymin', 'ymax', 'zmin', 'zmax',
288 154 'zlimits', 'xlabel', 'ylabel', 'xaxis', 'cb_label', 'title',
289 155 'colorbar', 'bgcolor', 'width', 'height', 'localtime', 'oneFigure',
290 156 'showprofile', 'decimation', 'pause']
291 157
292 158 def __init__(self):
293 159
294 160 Operation.__init__(self)
295 161 self.isConfig = False
296 162 self.isPlotConfig = False
163 self.save_counter = 1
164 self.sender_counter = 1
297 165
298 166 def __fmtTime(self, x, pos):
299 167 '''
300 168 '''
301 169
302 170 return '{}'.format(self.getDateTime(x).strftime('%H:%M'))
303 171
304 172 def __setup(self, **kwargs):
305 173 '''
306 174 Initialize variables
307 175 '''
308 176
309 177 self.figures = []
310 178 self.axes = []
311 179 self.cb_axes = []
312 180 self.localtime = kwargs.pop('localtime', True)
313 181 self.show = kwargs.get('show', True)
314 182 self.save = kwargs.get('save', False)
183 self.save_period = kwargs.get('save_period', 2)
315 184 self.ftp = kwargs.get('ftp', False)
316 185 self.colormap = kwargs.get('colormap', self.colormap)
317 186 self.colormap_coh = kwargs.get('colormap_coh', 'jet')
318 187 self.colormap_phase = kwargs.get('colormap_phase', 'RdBu_r')
319 188 self.colormaps = kwargs.get('colormaps', None)
320 189 self.bgcolor = kwargs.get('bgcolor', self.bgcolor)
321 190 self.showprofile = kwargs.get('showprofile', False)
322 191 self.title = kwargs.get('wintitle', self.CODE.upper())
323 192 self.cb_label = kwargs.get('cb_label', None)
324 193 self.cb_labels = kwargs.get('cb_labels', None)
325 194 self.labels = kwargs.get('labels', None)
326 195 self.xaxis = kwargs.get('xaxis', 'frequency')
327 196 self.zmin = kwargs.get('zmin', None)
328 197 self.zmax = kwargs.get('zmax', None)
329 198 self.zlimits = kwargs.get('zlimits', None)
330 199 self.xmin = kwargs.get('xmin', None)
331 200 self.xmax = kwargs.get('xmax', None)
332 201 self.xrange = kwargs.get('xrange', 12)
333 202 self.xscale = kwargs.get('xscale', None)
334 203 self.ymin = kwargs.get('ymin', None)
335 204 self.ymax = kwargs.get('ymax', None)
336 205 self.yscale = kwargs.get('yscale', None)
337 206 self.xlabel = kwargs.get('xlabel', None)
338 207 self.decimation = kwargs.get('decimation', None)
339 208 self.showSNR = kwargs.get('showSNR', False)
340 209 self.oneFigure = kwargs.get('oneFigure', True)
341 210 self.width = kwargs.get('width', None)
342 211 self.height = kwargs.get('height', None)
343 212 self.colorbar = kwargs.get('colorbar', True)
344 213 self.factors = kwargs.get('factors', [1, 1, 1, 1, 1, 1, 1, 1])
345 214 self.channels = kwargs.get('channels', None)
346 215 self.titles = kwargs.get('titles', [])
347 216 self.polar = False
348 217 self.type = kwargs.get('type', 'iq')
349 218 self.grid = kwargs.get('grid', False)
350 219 self.pause = kwargs.get('pause', False)
351 220 self.save_labels = kwargs.get('save_labels', None)
352 221 self.realtime = kwargs.get('realtime', True)
353 222 self.buffering = kwargs.get('buffering', True)
354 223 self.throttle = kwargs.get('throttle', 2)
355 224 self.exp_code = kwargs.get('exp_code', None)
225 self.plot_server = kwargs.get('plot_server', False)
226 self.sender_period = kwargs.get('sender_period', 2)
356 227 self.__throttle_plot = apply_throttle(self.throttle)
357 228 self.data = PlotterData(
358 229 self.CODE, self.throttle, self.exp_code, self.buffering)
230
231 if self.plot_server:
232 if not self.plot_server.startswith('tcp://'):
233 self.plot_server = 'tcp://{}'.format(self.plot_server)
234 log.success(
235 'Sending to server: {}'.format(self.plot_server),
236 self.name
237 )
359 238
360 239 def __setup_plot(self):
361 240 '''
362 241 Common setup for all figures, here figures and axes are created
363 242 '''
364 243
365 244 self.setup()
366 245
367 246 self.time_label = 'LT' if self.localtime else 'UTC'
368 247 if self.data.localtime:
369 248 self.getDateTime = datetime.datetime.fromtimestamp
370 249 else:
371 250 self.getDateTime = datetime.datetime.utcfromtimestamp
372 251
373 252 if self.width is None:
374 253 self.width = 8
375 254
376 255 self.figures = []
377 256 self.axes = []
378 257 self.cb_axes = []
379 258 self.pf_axes = []
380 259 self.cmaps = []
381 260
382 261 size = '15%' if self.ncols == 1 else '30%'
383 262 pad = '4%' if self.ncols == 1 else '8%'
384 263
385 264 if self.oneFigure:
386 265 if self.height is None:
387 266 self.height = 1.4 * self.nrows + 1
388 267 fig = plt.figure(figsize=(self.width, self.height),
389 268 edgecolor='k',
390 269 facecolor='w')
391 270 self.figures.append(fig)
392 271 for n in range(self.nplots):
393 272 ax = fig.add_subplot(self.nrows, self.ncols,
394 273 n + 1, polar=self.polar)
395 274 ax.tick_params(labelsize=8)
396 275 ax.firsttime = True
397 276 ax.index = 0
398 277 ax.press = None
399 278 self.axes.append(ax)
400 279 if self.showprofile:
401 280 cax = self.__add_axes(ax, size=size, pad=pad)
402 281 cax.tick_params(labelsize=8)
403 282 self.pf_axes.append(cax)
404 283 else:
405 284 if self.height is None:
406 285 self.height = 3
407 286 for n in range(self.nplots):
408 287 fig = plt.figure(figsize=(self.width, self.height),
409 288 edgecolor='k',
410 289 facecolor='w')
411 290 ax = fig.add_subplot(1, 1, 1, polar=self.polar)
412 291 ax.tick_params(labelsize=8)
413 292 ax.firsttime = True
414 293 ax.index = 0
415 294 ax.press = None
416 295 self.figures.append(fig)
417 296 self.axes.append(ax)
418 297 if self.showprofile:
419 298 cax = self.__add_axes(ax, size=size, pad=pad)
420 299 cax.tick_params(labelsize=8)
421 300 self.pf_axes.append(cax)
422 301
423 302 for n in range(self.nrows):
424 303 if self.colormaps is not None:
425 304 cmap = plt.get_cmap(self.colormaps[n])
426 305 else:
427 306 cmap = plt.get_cmap(self.colormap)
428 307 cmap.set_bad(self.bgcolor, 1.)
429 308 self.cmaps.append(cmap)
430 309
431 310 for fig in self.figures:
432 311 fig.canvas.mpl_connect('key_press_event', self.OnKeyPress)
433 312 fig.canvas.mpl_connect('scroll_event', self.OnBtnScroll)
434 313 fig.canvas.mpl_connect('button_press_event', self.onBtnPress)
435 314 fig.canvas.mpl_connect('motion_notify_event', self.onMotion)
436 315 fig.canvas.mpl_connect('button_release_event', self.onBtnRelease)
437 316 if self.show:
438 317 fig.show()
439 318
440 319 def OnKeyPress(self, event):
441 320 '''
442 321 Event for pressing keys (up, down) change colormap
443 322 '''
444 323 ax = event.inaxes
445 324 if ax in self.axes:
446 325 if event.key == 'down':
447 326 ax.index += 1
448 327 elif event.key == 'up':
449 328 ax.index -= 1
450 329 if ax.index < 0:
451 330 ax.index = len(CMAPS) - 1
452 331 elif ax.index == len(CMAPS):
453 332 ax.index = 0
454 333 cmap = CMAPS[ax.index]
455 334 ax.cbar.set_cmap(cmap)
456 335 ax.cbar.draw_all()
457 336 ax.plt.set_cmap(cmap)
458 337 ax.cbar.patch.figure.canvas.draw()
459 338 self.colormap = cmap.name
460 339
461 340 def OnBtnScroll(self, event):
462 341 '''
463 342 Event for scrolling, scale figure
464 343 '''
465 344 cb_ax = event.inaxes
466 345 if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
467 346 ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
468 347 pt = ax.cbar.ax.bbox.get_points()[:, 1]
469 348 nrm = ax.cbar.norm
470 349 vmin, vmax, p0, p1, pS = (
471 350 nrm.vmin, nrm.vmax, pt[0], pt[1], event.y)
472 351 scale = 2 if event.step == 1 else 0.5
473 352 point = vmin + (vmax - vmin) / (p1 - p0) * (pS - p0)
474 353 ax.cbar.norm.vmin = point - scale * (point - vmin)
475 354 ax.cbar.norm.vmax = point - scale * (point - vmax)
476 355 ax.plt.set_norm(ax.cbar.norm)
477 356 ax.cbar.draw_all()
478 357 ax.cbar.patch.figure.canvas.draw()
479 358
480 359 def onBtnPress(self, event):
481 360 '''
482 361 Event for mouse button press
483 362 '''
484 363 cb_ax = event.inaxes
485 364 if cb_ax is None:
486 365 return
487 366
488 367 if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
489 368 cb_ax.press = event.x, event.y
490 369 else:
491 370 cb_ax.press = None
492 371
493 372 def onMotion(self, event):
494 373 '''
495 374 Event for move inside colorbar
496 375 '''
497 376 cb_ax = event.inaxes
498 377 if cb_ax is None:
499 378 return
500 379 if cb_ax not in [ax.cbar.ax for ax in self.axes if ax.cbar]:
501 380 return
502 381 if cb_ax.press is None:
503 382 return
504 383
505 384 ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
506 385 xprev, yprev = cb_ax.press
507 386 dx = event.x - xprev
508 387 dy = event.y - yprev
509 388 cb_ax.press = event.x, event.y
510 389 scale = ax.cbar.norm.vmax - ax.cbar.norm.vmin
511 390 perc = 0.03
512 391
513 392 if event.button == 1:
514 393 ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
515 394 ax.cbar.norm.vmax -= (perc * scale) * numpy.sign(dy)
516 395 elif event.button == 3:
517 396 ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
518 397 ax.cbar.norm.vmax += (perc * scale) * numpy.sign(dy)
519 398
520 399 ax.cbar.draw_all()
521 400 ax.plt.set_norm(ax.cbar.norm)
522 401 ax.cbar.patch.figure.canvas.draw()
523 402
524 403 def onBtnRelease(self, event):
525 404 '''
526 405 Event for mouse button release
527 406 '''
528 407 cb_ax = event.inaxes
529 408 if cb_ax is not None:
530 409 cb_ax.press = None
531 410
532 411 def __add_axes(self, ax, size='30%', pad='8%'):
533 412 '''
534 413 Add new axes to the given figure
535 414 '''
536 415 divider = make_axes_locatable(ax)
537 416 nax = divider.new_horizontal(size=size, pad=pad)
538 417 ax.figure.add_axes(nax)
539 418 return nax
540 419
541 def setup(self):
542 '''
543 This method should be implemented in the child class, the following
544 attributes should be set:
545
546 self.nrows: number of rows
547 self.ncols: number of cols
548 self.nplots: number of plots (channels or pairs)
549 self.ylabel: label for Y axes
550 self.titles: list of axes title
551
552 '''
553 raise NotImplementedError
554
555 420 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
556 421 '''
557 422 Create a masked array for missing data
558 423 '''
559 424 if x_buffer.shape[0] < 2:
560 425 return x_buffer, y_buffer, z_buffer
561 426
562 427 deltas = x_buffer[1:] - x_buffer[0:-1]
563 428 x_median = numpy.median(deltas)
564 429
565 430 index = numpy.where(deltas > 5 * x_median)
566 431
567 432 if len(index[0]) != 0:
568 433 z_buffer[::, index[0], ::] = self.__missing
569 434 z_buffer = numpy.ma.masked_inside(z_buffer,
570 435 0.99 * self.__missing,
571 436 1.01 * self.__missing)
572 437
573 438 return x_buffer, y_buffer, z_buffer
574 439
575 440 def decimate(self):
576 441
577 442 # dx = int(len(self.x)/self.__MAXNUMX) + 1
578 443 dy = int(len(self.y) / self.decimation) + 1
579 444
580 445 # x = self.x[::dx]
581 446 x = self.x
582 447 y = self.y[::dy]
583 448 z = self.z[::, ::, ::dy]
584 449
585 450 return x, y, z
586 451
587 452 def format(self):
588 453 '''
589 454 Set min and max values, labels, ticks and titles
590 455 '''
591 456
592 457 if self.xmin is None:
593 458 xmin = self.data.min_time
594 459 else:
595 460 if self.xaxis is 'time':
596 461 dt = self.getDateTime(self.data.min_time)
597 462 xmin = (dt.replace(hour=int(self.xmin), minute=0, second=0) -
598 463 datetime.datetime(1970, 1, 1)).total_seconds()
599 464 if self.data.localtime:
600 465 xmin += time.timezone
601 466 else:
602 467 xmin = self.xmin
603 468
604 469 if self.xmax is None:
605 470 xmax = xmin + self.xrange * 60 * 60
606 471 else:
607 472 if self.xaxis is 'time':
608 473 dt = self.getDateTime(self.data.max_time)
609 474 xmax = (dt.replace(hour=int(self.xmax), minute=59, second=59) -
610 475 datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=1)).total_seconds()
611 476 if self.data.localtime:
612 477 xmax += time.timezone
613 478 else:
614 479 xmax = self.xmax
615 480
616 481 ymin = self.ymin if self.ymin else numpy.nanmin(self.y)
617 482 ymax = self.ymax if self.ymax else numpy.nanmax(self.y)
618 483 #Y = numpy.array([1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000])
619 484
620 485 #i = 1 if numpy.where(
621 486 # abs(ymax-ymin) <= Y)[0][0] < 0 else numpy.where(abs(ymax-ymin) <= Y)[0][0]
622 487 #ystep = Y[i] / 10.
623 488 dig = int(numpy.log10(ymax))
624 489 if dig == 0:
625 490 digD = len(str(ymax)) - 2
626 491 ydec = ymax*(10**digD)
627 492
628 493 dig = int(numpy.log10(ydec))
629 494 ystep = ((ydec + (10**(dig)))//10**(dig))*(10**(dig))
630 495 ystep = ystep/5
631 496 ystep = ystep/(10**digD)
632 497
633 498 else:
634 499 ystep = ((ymax + (10**(dig)))//10**(dig))*(10**(dig))
635 500 ystep = ystep/5
636 501
637 502 if self.xaxis is not 'time':
638 503
639 504 dig = int(numpy.log10(xmax))
640 505
641 506 if dig <= 0:
642 507 digD = len(str(xmax)) - 2
643 508 xdec = xmax*(10**digD)
644 509
645 510 dig = int(numpy.log10(xdec))
646 511 xstep = ((xdec + (10**(dig)))//10**(dig))*(10**(dig))
647 512 xstep = xstep*0.5
648 513 xstep = xstep/(10**digD)
649 514
650 515 else:
651 516 xstep = ((xmax + (10**(dig)))//10**(dig))*(10**(dig))
652 517 xstep = xstep/5
653 518
654 519 for n, ax in enumerate(self.axes):
655 520 if ax.firsttime:
656 521 ax.set_facecolor(self.bgcolor)
657 522 ax.yaxis.set_major_locator(MultipleLocator(ystep))
658 523 if self.xscale:
659 524 ax.xaxis.set_major_formatter(FuncFormatter(
660 525 lambda x, pos: '{0:g}'.format(x*self.xscale)))
661 526 if self.xscale:
662 527 ax.yaxis.set_major_formatter(FuncFormatter(
663 528 lambda x, pos: '{0:g}'.format(x*self.yscale)))
664 529 if self.xaxis is 'time':
665 530 ax.xaxis.set_major_formatter(FuncFormatter(self.__fmtTime))
666 531 ax.xaxis.set_major_locator(LinearLocator(9))
667 532 else:
668 533 ax.xaxis.set_major_locator(MultipleLocator(xstep))
669 534 if self.xlabel is not None:
670 535 ax.set_xlabel(self.xlabel)
671 536 ax.set_ylabel(self.ylabel)
672 537 ax.firsttime = False
673 538 if self.showprofile:
674 539 self.pf_axes[n].set_ylim(ymin, ymax)
675 540 self.pf_axes[n].set_xlim(self.zmin, self.zmax)
676 541 self.pf_axes[n].set_xlabel('dB')
677 542 self.pf_axes[n].grid(b=True, axis='x')
678 543 [tick.set_visible(False)
679 544 for tick in self.pf_axes[n].get_yticklabels()]
680 545 if self.colorbar:
681 546 ax.cbar = plt.colorbar(
682 547 ax.plt, ax=ax, fraction=0.05, pad=0.02, aspect=10)
683 548 ax.cbar.ax.tick_params(labelsize=8)
684 549 ax.cbar.ax.press = None
685 550 if self.cb_label:
686 551 ax.cbar.set_label(self.cb_label, size=8)
687 552 elif self.cb_labels:
688 553 ax.cbar.set_label(self.cb_labels[n], size=8)
689 554 else:
690 555 ax.cbar = None
691 556 if self.grid:
692 557 ax.grid(True)
693 558
694 559 if not self.polar:
695 560 ax.set_xlim(xmin, xmax)
696 561 ax.set_ylim(ymin, ymax)
697 562 ax.set_title('{} {} {}'.format(
698 563 self.titles[n],
699 564 self.getDateTime(self.data.max_time).strftime(
700 565 '%H:%M:%S'),
701 566 self.time_label),
702 567 size=8)
703 568 else:
704 569 ax.set_title('{}'.format(self.titles[n]), size=8)
705 570 ax.set_ylim(0, 90)
706 571 ax.set_yticks(numpy.arange(0, 90, 20))
707 572 ax.yaxis.labelpad = 40
708 573
709 574 def clear_figures(self):
710 575 '''
711 576 Reset axes for redraw plots
712 577 '''
713 578
714 579 for ax in self.axes:
715 580 ax.clear()
716 581 ax.firsttime = True
717 582 if ax.cbar:
718 583 ax.cbar.remove()
719 584
720 585 def __plot(self):
721 586 '''
722 587 Main function to plot, format and save figures
723 588 '''
724 589
725 #try:
726 self.plot()
727 self.format()
728 #except Exception as e:
729 # log.warning('{} Plot could not be updated... check data'.format(
730 # self.CODE), self.name)
731 # log.error(str(e), '')
732 # return
590 try:
591 self.plot()
592 self.format()
593 except Exception as e:
594 log.warning('{} Plot could not be updated... check data'.format(
595 self.CODE), self.name)
596 log.error(str(e), '')
597 return
733 598
734 599 for n, fig in enumerate(self.figures):
735 600 if self.nrows == 0 or self.nplots == 0:
736 601 log.warning('No data', self.name)
737 602 fig.text(0.5, 0.5, 'No Data', fontsize='large', ha='center')
738 603 fig.canvas.manager.set_window_title(self.CODE)
739 604 continue
740 605
741 606 fig.tight_layout()
742 607 fig.canvas.manager.set_window_title('{} - {}'.format(self.title,
743 608 self.getDateTime(self.data.max_time).strftime('%Y/%m/%d')))
744 609 fig.canvas.draw()
745 610
746 611 if self.save:
612 self.save_figure(n)
613
614 if self.plot_server:
615 self.send_to_server()
616 # t = Thread(target=self.send_to_server)
617 # t.start()
747 618
748 if self.save_labels:
749 labels = self.save_labels
750 else:
751 labels = list(range(self.nrows))
619 def save_figure(self, n):
620 '''
621 '''
752 622
753 if self.oneFigure:
754 label = ''
755 else:
756 label = '-{}'.format(labels[n])
757 figname = os.path.join(
758 self.save,
623 if self.save_counter < self.save_period:
624 self.save_counter += 1
625 return
626
627 self.save_counter = 1
628
629 fig = self.figures[n]
630
631 if self.save_labels:
632 labels = self.save_labels
633 else:
634 labels = list(range(self.nrows))
635
636 if self.oneFigure:
637 label = ''
638 else:
639 label = '-{}'.format(labels[n])
640 figname = os.path.join(
641 self.save,
642 self.CODE,
643 '{}{}_{}.png'.format(
644 self.CODE,
645 label,
646 self.getDateTime(self.data.max_time).strftime(
647 '%Y%m%d_%H%M%S'
648 ),
649 )
650 )
651 log.log('Saving figure: {}'.format(figname), self.name)
652 if not os.path.isdir(os.path.dirname(figname)):
653 os.makedirs(os.path.dirname(figname))
654 fig.savefig(figname)
655
656 if self.realtime:
657 figname = os.path.join(
658 self.save,
659 '{}{}_{}.png'.format(
759 660 self.CODE,
760 '{}{}_{}.png'.format(
761 self.CODE,
762 label,
763 self.getDateTime(self.data.max_time).strftime(
764 '%Y%m%d_%H%M%S'),
661 label,
662 self.getDateTime(self.data.min_time).strftime(
663 '%Y%m%d'
664 ),
765 665 )
766 666 )
767 log.log('Saving figure: {}'.format(figname), self.name)
768 if not os.path.isdir(os.path.dirname(figname)):
769 os.makedirs(os.path.dirname(figname))
770 fig.savefig(figname)
667 fig.savefig(figname)
668
669 def send_to_server(self):
670 '''
671 '''
672
673 if self.sender_counter < self.sender_period:
674 self.sender_counter += 1
675
676 self.sender_counter = 1
677
678 retries = 2
679 while True:
680 self.socket.send_string(self.data.jsonify())
681 socks = dict(self.poll.poll(5000))
682 if socks.get(self.socket) == zmq.POLLIN:
683 reply = self.socket.recv_string()
684 if reply == 'ok':
685 log.log("Response from server ok", self.name)
686 break
687 else:
688 log.warning(
689 "Malformed reply from server: {}".format(reply), self.name)
690
691 else:
692 log.warning(
693 "No response from server, retrying...", self.name)
694 self.socket.setsockopt(zmq.LINGER, 0)
695 self.socket.close()
696 self.poll.unregister(self.socket)
697 retries -= 1
698 if retries == 0:
699 log.error(
700 "Server seems to be offline, abandoning", self.name)
701 self.socket = self.context.socket(zmq.REQ)
702 self.socket.connect(self.plot_server)
703 self.poll.register(self.socket, zmq.POLLIN)
704 time.sleep(1)
705 break
706 self.socket = self.context.socket(zmq.REQ)
707 self.socket.connect(self.plot_server)
708 self.poll.register(self.socket, zmq.POLLIN)
709 time.sleep(0.5)
710
711 def setup(self):
712 '''
713 This method should be implemented in the child class, the following
714 attributes should be set:
715
716 self.nrows: number of rows
717 self.ncols: number of cols
718 self.nplots: number of plots (channels or pairs)
719 self.ylabel: label for Y axes
720 self.titles: list of axes title
721
722 '''
723 raise NotImplementedError
771 724
772 725 def plot(self):
773 726 '''
774 727 Must be defined in the child class
775 728 '''
776 729 raise NotImplementedError
777 730
778 731 def run(self, dataOut, **kwargs):
779 732
780 733 if dataOut.error:
781 734 coerce = True
782 735 else:
783 736 coerce = False
784 737
785 738 if self.isConfig is False:
786 739 self.__setup(**kwargs)
787 740 self.data.setup()
788 741 self.isConfig = True
742 if self.plot_server:
743 self.context = zmq.Context()
744 self.socket = self.context.socket(zmq.REQ)
745 self.socket.connect(self.plot_server)
746 self.poll = zmq.Poller()
747 self.poll.register(self.socket, zmq.POLLIN)
789 748
790 749 if dataOut.type == 'Parameters':
791 750 tm = dataOut.utctimeInit
792 751 else:
793 752 tm = dataOut.utctime
794 753
795 754 if dataOut.useLocalTime:
796 755 if not self.localtime:
797 756 tm += time.timezone
798 757 else:
799 758 if self.localtime:
800 759 tm -= time.timezone
801 760
802 761 if self.data and (tm - self.data.min_time) >= self.xrange*60*60:
803 762 self.__plot()
804 763 self.data.setup()
805 764 self.clear_figures()
806 765
807 766 self.data.update(dataOut, tm)
808 767
809 768 if self.isPlotConfig is False:
810 769 self.__setup_plot()
811 770 self.isPlotConfig = True
812 771
813 772 if self.realtime:
814 773 self.__plot()
815 774 else:
816 775 self.__throttle_plot(self.__plot, coerce=coerce)
817 776
818 777 figpause(0.001)
819 778
820 779 def close(self):
821 780
822 781 if self.data and self.pause:
823 782 figpause(10)
824 783
@@ -1,390 +1,390
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 import os
15 16 import inspect
16 17 import zmq
17 18 import time
18 19 import pickle
19 import os
20 20 from multiprocessing import Process
21 21 from zmq.utils.monitor import recv_monitor_message
22 22
23 23 from schainpy.utils import log
24 24
25 25
26 26 class ProcessingUnit(object):
27 27
28 28 """
29 29 Update - Jan 2018 - MULTIPROCESSING
30 30 All the "call" methods present in the previous base were removed.
31 31 The majority of operations are independant processes, thus
32 32 the decorator is in charge of communicate the operation processes
33 33 with the proccessing unit via IPC.
34 34
35 35 The constructor does not receive any argument. The remaining methods
36 36 are related with the operations to execute.
37 37
38 38
39 39 """
40 40
41 41 def __init__(self):
42 42
43 43 self.dataIn = None
44 44 self.dataOut = None
45 45 self.isConfig = False
46 46 self.operations = []
47 47 self.plots = []
48 48
49 49 def getAllowedArgs(self):
50 50 if hasattr(self, '__attrs__'):
51 51 return self.__attrs__
52 52 else:
53 53 return inspect.getargspec(self.run).args
54 54
55 55 def addOperation(self, conf, operation):
56 56 """
57 57 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
58 58 posses the id of the operation process (IPC purposes)
59 59
60 60 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
61 61 identificador asociado a este objeto.
62 62
63 63 Input:
64 64
65 65 object : objeto de la clase "Operation"
66 66
67 67 Return:
68 68
69 69 objId : identificador del objeto, necesario para comunicar con master(procUnit)
70 70 """
71 71
72 72 self.operations.append(
73 73 (operation, conf.type, conf.id, conf.getKwargs()))
74 74
75 75 if 'plot' in self.name.lower():
76 76 self.plots.append(operation.CODE)
77 77
78 78 def getOperationObj(self, objId):
79 79
80 80 if objId not in list(self.operations.keys()):
81 81 return None
82 82
83 83 return self.operations[objId]
84 84
85 85 def operation(self, **kwargs):
86 86 """
87 87 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
88 88 atributos del objeto dataOut
89 89
90 90 Input:
91 91
92 92 **kwargs : Diccionario de argumentos de la funcion a ejecutar
93 93 """
94 94
95 95 raise NotImplementedError
96 96
97 97 def setup(self):
98 98
99 99 raise NotImplementedError
100 100
101 101 def run(self):
102 102
103 103 raise NotImplementedError
104 104
105 105 def close(self):
106 106
107 107 return
108 108
109 109
110 110 class Operation(object):
111 111
112 112 """
113 113 Update - Jan 2018 - MULTIPROCESSING
114 114
115 115 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
116 116 The constructor doe snot receive any argument, neither the baseclass.
117 117
118 118
119 119 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
120 120 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
121 121 acumulacion dentro de esta clase
122 122
123 123 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
124 124
125 125 """
126 126
127 127 def __init__(self):
128 128
129 129 self.id = None
130 130 self.isConfig = False
131 131
132 132 if not hasattr(self, 'name'):
133 133 self.name = self.__class__.__name__
134 134
135 135 def getAllowedArgs(self):
136 136 if hasattr(self, '__attrs__'):
137 137 return self.__attrs__
138 138 else:
139 139 return inspect.getargspec(self.run).args
140 140
141 141 def setup(self):
142 142
143 143 self.isConfig = True
144 144
145 145 raise NotImplementedError
146 146
147 147 def run(self, dataIn, **kwargs):
148 148 """
149 149 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
150 150 atributos del objeto dataIn.
151 151
152 152 Input:
153 153
154 154 dataIn : objeto del tipo JROData
155 155
156 156 Return:
157 157
158 158 None
159 159
160 160 Affected:
161 161 __buffer : buffer de recepcion de datos.
162 162
163 163 """
164 164 if not self.isConfig:
165 165 self.setup(**kwargs)
166 166
167 167 raise NotImplementedError
168 168
169 169 def close(self):
170 170
171 171 return
172 172
173 173
174 174 def MPDecorator(BaseClass):
175 175 """
176 176 Multiprocessing class decorator
177 177
178 178 This function add multiprocessing features to a BaseClass. Also, it handle
179 179 the communication beetween processes (readers, procUnits and operations).
180 180 """
181 181
182 182 class MPClass(BaseClass, Process):
183 183
184 184 def __init__(self, *args, **kwargs):
185 185 super(MPClass, self).__init__()
186 186 Process.__init__(self)
187 187 self.operationKwargs = {}
188 188 self.args = args
189 189 self.kwargs = kwargs
190 190 self.sender = None
191 191 self.receiver = None
192 192 self.name = BaseClass.__name__
193 193 if 'plot' in self.name.lower() and not self.name.endswith('_'):
194 194 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
195 195 self.start_time = time.time()
196 196
197 197 if len(self.args) is 3:
198 198 self.typeProc = "ProcUnit"
199 199 self.id = args[0]
200 200 self.inputId = args[1]
201 201 self.project_id = args[2]
202 202 elif len(self.args) is 2:
203 203 self.id = args[0]
204 204 self.inputId = args[0]
205 205 self.project_id = args[1]
206 206 self.typeProc = "Operation"
207 207
208 208 def subscribe(self):
209 209 '''
210 210 This function create a socket to receive objects from the
211 211 topic `inputId`.
212 212 '''
213 213
214 214 c = zmq.Context()
215 215 self.receiver = c.socket(zmq.SUB)
216 216 self.receiver.connect(
217 217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
219 219
220 220 def listen(self):
221 221 '''
222 222 This function waits for objects and deserialize using pickle
223 223 '''
224 224
225 225 data = pickle.loads(self.receiver.recv_multipart()[1])
226 226
227 227 return data
228 228
229 229 def set_publisher(self):
230 230 '''
231 231 This function create a socket for publishing purposes.
232 232 '''
233 233
234 234 time.sleep(1)
235 235 c = zmq.Context()
236 236 self.sender = c.socket(zmq.PUB)
237 237 self.sender.connect(
238 238 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
239 239
240 240 def publish(self, data, id):
241 241 '''
242 242 This function publish an object, to a specific topic.
243 243 '''
244 244 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
245 245
246 246 def runReader(self):
247 247 '''
248 248 Run fuction for read units
249 249 '''
250 250 while True:
251 251
252 252 BaseClass.run(self, **self.kwargs)
253
253
254 254 for op, optype, opId, kwargs in self.operations:
255 255 if optype == 'self' and not self.dataOut.flagNoData:
256 256 op(**kwargs)
257 257 elif optype == 'other' and not self.dataOut.flagNoData:
258 258 self.dataOut = op.run(self.dataOut, **self.kwargs)
259 259 elif optype == 'external':
260 260 self.publish(self.dataOut, opId)
261 261
262 262 if self.dataOut.flagNoData and not self.dataOut.error:
263 263 continue
264 264
265 265 self.publish(self.dataOut, self.id)
266 266
267 267 if self.dataOut.error:
268 268 log.error(self.dataOut.error, self.name)
269 269 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
270 270 break
271 271
272 272 time.sleep(1)
273 273
274 274 def runProc(self):
275 275 '''
276 276 Run function for proccessing units
277 277 '''
278 278
279 279 while True:
280 280 self.dataIn = self.listen()
281 281
282 282 if self.dataIn.flagNoData and self.dataIn.error is None:
283 283 continue
284 284
285 285 BaseClass.run(self, **self.kwargs)
286 286
287 287 if self.dataIn.error:
288 288 self.dataOut.error = self.dataIn.error
289 289 self.dataOut.flagNoData = True
290 290
291 291 for op, optype, opId, kwargs in self.operations:
292 292 if optype == 'self' and not self.dataOut.flagNoData:
293 293 op(**kwargs)
294 294 elif optype == 'other' and not self.dataOut.flagNoData:
295 295 self.dataOut = op.run(self.dataOut, **kwargs)
296 296 elif optype == 'external' and not self.dataOut.flagNoData:
297 297 self.publish(self.dataOut, opId)
298 298
299 299 if not self.dataOut.flagNoData or self.dataOut.error:
300 300 self.publish(self.dataOut, self.id)
301 301 for op, optype, opId, kwargs in self.operations:
302 302 if optype == 'self' and self.dataOut.error:
303 303 op(**kwargs)
304 304 elif optype == 'other' and self.dataOut.error:
305 305 self.dataOut = op.run(self.dataOut, **kwargs)
306 306 elif optype == 'external' and self.dataOut.error:
307 307 self.publish(self.dataOut, opId)
308 308
309 309 if self.dataIn.error:
310 310 break
311 311
312 312 time.sleep(1)
313 313
314 314 def runOp(self):
315 315 '''
316 316 Run function for external operations (this operations just receive data
317 317 ex: plots, writers, publishers)
318 318 '''
319 319
320 320 while True:
321 321
322 322 dataOut = self.listen()
323 323
324 324 BaseClass.run(self, dataOut, **self.kwargs)
325 325
326 326 if dataOut.error:
327 327 break
328 328
329 329 time.sleep(1)
330 330
331 331 def run(self):
332 332 if self.typeProc is "ProcUnit":
333 333
334 334 if self.inputId is not None:
335 335
336 336 self.subscribe()
337 337
338 338 self.set_publisher()
339 339
340 340 if 'Reader' not in BaseClass.__name__:
341 341 self.runProc()
342 342 else:
343 343 self.runReader()
344 344
345 345 elif self.typeProc is "Operation":
346 346
347 347 self.subscribe()
348 348 self.runOp()
349 349
350 350 else:
351 351 raise ValueError("Unknown type")
352 352
353 353 self.close()
354 354
355 355 def event_monitor(self, monitor):
356 356
357 357 events = {}
358 358
359 359 for name in dir(zmq):
360 360 if name.startswith('EVENT_'):
361 361 value = getattr(zmq, name)
362 362 events[value] = name
363 363
364 364 while monitor.poll():
365 365 evt = recv_monitor_message(monitor)
366 366 if evt['event'] == 32:
367 367 self.connections += 1
368 368 if evt['event'] == 512:
369 369 pass
370 370
371 371 evt.update({'description': events[evt['event']]})
372 372
373 373 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
374 374 break
375 375 monitor.close()
376 376 print('event monitor thread done!')
377 377
378 378 def close(self):
379 379
380 380 BaseClass.close(self)
381 381
382 382 if self.sender:
383 383 self.sender.close()
384 384
385 385 if self.receiver:
386 386 self.receiver.close()
387 387
388 388 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
389 389
390 390 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now