##// END OF EJS Templates
Bug saving plots when throttle, add utcoffset to HDFReader
jespinoza -
r1363:1c8e307bd47c
parent child
Show More
@@ -1,691 +1,693
1 1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 2 # All rights reserved.
3 3 #
4 4 # Distributed under the terms of the BSD 3-clause license.
5 5 """Base class to create plot operations
6 6
7 7 """
8 8
9 9 import os
10 10 import sys
11 11 import zmq
12 12 import time
13 13 import numpy
14 14 import datetime
15 15 from collections import deque
16 16 from functools import wraps
17 17 from threading import Thread
18 18 import matplotlib
19 19
20 20 if 'BACKEND' in os.environ:
21 21 matplotlib.use(os.environ['BACKEND'])
22 22 elif 'linux' in sys.platform:
23 23 matplotlib.use("TkAgg")
24 24 elif 'darwin' in sys.platform:
25 25 matplotlib.use('MacOSX')
26 26 else:
27 27 from schainpy.utils import log
28 28 log.warning('Using default Backend="Agg"', 'INFO')
29 29 matplotlib.use('Agg')
30 30
31 31 import matplotlib.pyplot as plt
32 32 from matplotlib.patches import Polygon
33 33 from mpl_toolkits.axes_grid1 import make_axes_locatable
34 34 from matplotlib.ticker import FuncFormatter, LinearLocator, MultipleLocator
35 35
36 36 from schainpy.model.data.jrodata import PlotterData
37 37 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
38 38 from schainpy.utils import log
39 39
40 40 jet_values = matplotlib.pyplot.get_cmap('jet', 100)(numpy.arange(100))[10:90]
41 41 blu_values = matplotlib.pyplot.get_cmap(
42 42 'seismic_r', 20)(numpy.arange(20))[10:15]
43 43 ncmap = matplotlib.colors.LinearSegmentedColormap.from_list(
44 44 'jro', numpy.vstack((blu_values, jet_values)))
45 45 matplotlib.pyplot.register_cmap(cmap=ncmap)
46 46
47 47 CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis',
48 48 'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm')]
49 49
50 50 EARTH_RADIUS = 6.3710e3
51 51
52 52 def ll2xy(lat1, lon1, lat2, lon2):
53 53
54 54 p = 0.017453292519943295
55 55 a = 0.5 - numpy.cos((lat2 - lat1) * p)/2 + numpy.cos(lat1 * p) * \
56 56 numpy.cos(lat2 * p) * (1 - numpy.cos((lon2 - lon1) * p)) / 2
57 57 r = 12742 * numpy.arcsin(numpy.sqrt(a))
58 58 theta = numpy.arctan2(numpy.sin((lon2-lon1)*p)*numpy.cos(lat2*p), numpy.cos(lat1*p)
59 59 * numpy.sin(lat2*p)-numpy.sin(lat1*p)*numpy.cos(lat2*p)*numpy.cos((lon2-lon1)*p))
60 60 theta = -theta + numpy.pi/2
61 61 return r*numpy.cos(theta), r*numpy.sin(theta)
62 62
63 63
64 64 def km2deg(km):
65 65 '''
66 66 Convert distance in km to degrees
67 67 '''
68 68
69 69 return numpy.rad2deg(km/EARTH_RADIUS)
70 70
71 71
72 72 def figpause(interval):
73 73 backend = plt.rcParams['backend']
74 74 if backend in matplotlib.rcsetup.interactive_bk:
75 75 figManager = matplotlib._pylab_helpers.Gcf.get_active()
76 76 if figManager is not None:
77 77 canvas = figManager.canvas
78 78 if canvas.figure.stale:
79 79 canvas.draw()
80 80 try:
81 81 canvas.start_event_loop(interval)
82 82 except:
83 83 pass
84 84 return
85 85
86 86 def popup(message):
87 87 '''
88 88 '''
89 89
90 90 fig = plt.figure(figsize=(12, 8), facecolor='r')
91 91 text = '\n'.join([s.strip() for s in message.split(':')])
92 92 fig.text(0.01, 0.5, text, ha='left', va='center',
93 93 size='20', weight='heavy', color='w')
94 94 fig.show()
95 95 figpause(1000)
96 96
97 97
98 98 class Throttle(object):
99 99 '''
100 100 Decorator that prevents a function from being called more than once every
101 101 time period.
102 102 To create a function that cannot be called more than once a minute, but
103 103 will sleep until it can be called:
104 104 @Throttle(minutes=1)
105 105 def foo():
106 106 pass
107 107
108 108 for i in range(10):
109 109 foo()
110 110 print "This function has run %s times." % i
111 111 '''
112 112
113 113 def __init__(self, seconds=0, minutes=0, hours=0):
114 114 self.throttle_period = datetime.timedelta(
115 115 seconds=seconds, minutes=minutes, hours=hours
116 116 )
117 117
118 118 self.time_of_last_call = datetime.datetime.min
119 119
120 120 def __call__(self, fn):
121 121 @wraps(fn)
122 122 def wrapper(*args, **kwargs):
123 123 coerce = kwargs.pop('coerce', None)
124 124 if coerce:
125 125 self.time_of_last_call = datetime.datetime.now()
126 126 return fn(*args, **kwargs)
127 127 else:
128 128 now = datetime.datetime.now()
129 129 time_since_last_call = now - self.time_of_last_call
130 130 time_left = self.throttle_period - time_since_last_call
131 131
132 132 if time_left > datetime.timedelta(seconds=0):
133 133 return
134 134
135 135 self.time_of_last_call = datetime.datetime.now()
136 136 return fn(*args, **kwargs)
137 137
138 138 return wrapper
139 139
140 140 def apply_throttle(value):
141 141
142 142 @Throttle(seconds=value)
143 143 def fnThrottled(fn):
144 144 fn()
145 145
146 146 return fnThrottled
147 147
148 148
149 149 @MPDecorator
150 150 class Plot(Operation):
151 151 """Base class for Schain plotting operations
152 152
153 153 This class should never be use directtly you must subclass a new operation,
154 154 children classes must be defined as follow:
155 155
156 156 ExamplePlot(Plot):
157 157
158 158 CODE = 'code'
159 159 colormap = 'jet'
160 160 plot_type = 'pcolor' # options are ('pcolor', 'pcolorbuffer', 'scatter', 'scatterbuffer')
161 161
162 162 def setup(self):
163 163 pass
164 164
165 165 def plot(self):
166 166 pass
167 167
168 168 """
169 169
170 170 CODE = 'Figure'
171 171 colormap = 'jet'
172 172 bgcolor = 'white'
173 173 buffering = True
174 174 __missing = 1E30
175 175
176 176 __attrs__ = ['show', 'save', 'ymin', 'ymax', 'zmin', 'zmax', 'title',
177 177 'showprofile']
178 178
179 179 def __init__(self):
180 180
181 181 Operation.__init__(self)
182 182 self.isConfig = False
183 183 self.isPlotConfig = False
184 184 self.save_time = 0
185 185 self.sender_time = 0
186 186 self.data = None
187 187 self.firsttime = True
188 188 self.sender_queue = deque(maxlen=10)
189 189 self.plots_adjust = {'left': 0.125, 'right': 0.9, 'bottom': 0.15, 'top': 0.9, 'wspace': 0.2, 'hspace': 0.2}
190 190
191 191 def __fmtTime(self, x, pos):
192 192 '''
193 193 '''
194 194
195 195 return '{}'.format(self.getDateTime(x).strftime('%H:%M'))
196 196
197 197 def __setup(self, **kwargs):
198 198 '''
199 199 Initialize variables
200 200 '''
201 201
202 202 self.figures = []
203 203 self.axes = []
204 204 self.cb_axes = []
205 205 self.localtime = kwargs.pop('localtime', True)
206 206 self.show = kwargs.get('show', True)
207 207 self.save = kwargs.get('save', False)
208 208 self.save_period = kwargs.get('save_period', 0)
209 209 self.colormap = kwargs.get('colormap', self.colormap)
210 210 self.colormap_coh = kwargs.get('colormap_coh', 'jet')
211 211 self.colormap_phase = kwargs.get('colormap_phase', 'RdBu_r')
212 212 self.colormaps = kwargs.get('colormaps', None)
213 213 self.bgcolor = kwargs.get('bgcolor', self.bgcolor)
214 214 self.showprofile = kwargs.get('showprofile', False)
215 215 self.title = kwargs.get('wintitle', self.CODE.upper())
216 216 self.cb_label = kwargs.get('cb_label', None)
217 217 self.cb_labels = kwargs.get('cb_labels', None)
218 218 self.labels = kwargs.get('labels', None)
219 219 self.xaxis = kwargs.get('xaxis', 'frequency')
220 220 self.zmin = kwargs.get('zmin', None)
221 221 self.zmax = kwargs.get('zmax', None)
222 222 self.zlimits = kwargs.get('zlimits', None)
223 223 self.xmin = kwargs.get('xmin', None)
224 224 self.xmax = kwargs.get('xmax', None)
225 225 self.xrange = kwargs.get('xrange', 12)
226 226 self.xscale = kwargs.get('xscale', None)
227 227 self.ymin = kwargs.get('ymin', None)
228 228 self.ymax = kwargs.get('ymax', None)
229 229 self.yscale = kwargs.get('yscale', None)
230 230 self.xlabel = kwargs.get('xlabel', None)
231 231 self.attr_time = kwargs.get('attr_time', 'utctime')
232 232 self.attr_data = kwargs.get('attr_data', 'data_param')
233 233 self.decimation = kwargs.get('decimation', None)
234 234 self.oneFigure = kwargs.get('oneFigure', True)
235 235 self.width = kwargs.get('width', None)
236 236 self.height = kwargs.get('height', None)
237 237 self.colorbar = kwargs.get('colorbar', True)
238 238 self.factors = kwargs.get('factors', [1, 1, 1, 1, 1, 1, 1, 1])
239 239 self.channels = kwargs.get('channels', None)
240 240 self.titles = kwargs.get('titles', [])
241 241 self.polar = False
242 242 self.type = kwargs.get('type', 'iq')
243 243 self.grid = kwargs.get('grid', False)
244 244 self.pause = kwargs.get('pause', False)
245 245 self.save_code = kwargs.get('save_code', self.CODE)
246 246 self.throttle = kwargs.get('throttle', 0)
247 247 self.exp_code = kwargs.get('exp_code', None)
248 248 self.server = kwargs.get('server', False)
249 249 self.sender_period = kwargs.get('sender_period', 60)
250 250 self.tag = kwargs.get('tag', '')
251 251 self.height_index = kwargs.get('height_index', None)
252 252 self.__throttle_plot = apply_throttle(self.throttle)
253 253 code = self.attr_data if self.attr_data else self.CODE
254 254 self.data = PlotterData(self.CODE, self.exp_code, self.localtime)
255 255
256 256 if self.server:
257 257 if not self.server.startswith('tcp://'):
258 258 self.server = 'tcp://{}'.format(self.server)
259 259 log.success(
260 260 'Sending to server: {}'.format(self.server),
261 261 self.name
262 262 )
263 263
264 264 if isinstance(self.attr_data, str):
265 265 self.attr_data = [self.attr_data]
266 266
267 267 def __setup_plot(self):
268 268 '''
269 269 Common setup for all figures, here figures and axes are created
270 270 '''
271 271
272 272 self.setup()
273 273
274 274 self.time_label = 'LT' if self.localtime else 'UTC'
275 275
276 276 if self.width is None:
277 277 self.width = 8
278 278
279 279 self.figures = []
280 280 self.axes = []
281 281 self.cb_axes = []
282 282 self.pf_axes = []
283 283 self.cmaps = []
284 284
285 285 size = '15%' if self.ncols == 1 else '30%'
286 286 pad = '4%' if self.ncols == 1 else '8%'
287 287
288 288 if self.oneFigure:
289 289 if self.height is None:
290 290 self.height = 1.4 * self.nrows + 1
291 291 fig = plt.figure(figsize=(self.width, self.height),
292 292 edgecolor='k',
293 293 facecolor='w')
294 294 self.figures.append(fig)
295 295 for n in range(self.nplots):
296 296 ax = fig.add_subplot(self.nrows, self.ncols,
297 297 n + 1, polar=self.polar)
298 298 ax.tick_params(labelsize=8)
299 299 ax.firsttime = True
300 300 ax.index = 0
301 301 ax.press = None
302 302 self.axes.append(ax)
303 303 if self.showprofile:
304 304 cax = self.__add_axes(ax, size=size, pad=pad)
305 305 cax.tick_params(labelsize=8)
306 306 self.pf_axes.append(cax)
307 307 else:
308 308 if self.height is None:
309 309 self.height = 3
310 310 for n in range(self.nplots):
311 311 fig = plt.figure(figsize=(self.width, self.height),
312 312 edgecolor='k',
313 313 facecolor='w')
314 314 ax = fig.add_subplot(1, 1, 1, polar=self.polar)
315 315 ax.tick_params(labelsize=8)
316 316 ax.firsttime = True
317 317 ax.index = 0
318 318 ax.press = None
319 319 self.figures.append(fig)
320 320 self.axes.append(ax)
321 321 if self.showprofile:
322 322 cax = self.__add_axes(ax, size=size, pad=pad)
323 323 cax.tick_params(labelsize=8)
324 324 self.pf_axes.append(cax)
325 325
326 326 for n in range(self.nrows):
327 print(self.nrows)
328 327 if self.colormaps is not None:
329 328 cmap = plt.get_cmap(self.colormaps[n])
330 329 else:
331 330 cmap = plt.get_cmap(self.colormap)
332 331 cmap.set_bad(self.bgcolor, 1.)
333 332 self.cmaps.append(cmap)
334 333
335 334 def __add_axes(self, ax, size='30%', pad='8%'):
336 335 '''
337 336 Add new axes to the given figure
338 337 '''
339 338 divider = make_axes_locatable(ax)
340 339 nax = divider.new_horizontal(size=size, pad=pad)
341 340 ax.figure.add_axes(nax)
342 341 return nax
343 342
344 343 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
345 344 '''
346 345 Create a masked array for missing data
347 346 '''
348 347 if x_buffer.shape[0] < 2:
349 348 return x_buffer, y_buffer, z_buffer
350 349
351 350 deltas = x_buffer[1:] - x_buffer[0:-1]
352 351 x_median = numpy.median(deltas)
353 352
354 353 index = numpy.where(deltas > 5 * x_median)
355 354
356 355 if len(index[0]) != 0:
357 356 z_buffer[::, index[0], ::] = self.__missing
358 357 z_buffer = numpy.ma.masked_inside(z_buffer,
359 358 0.99 * self.__missing,
360 359 1.01 * self.__missing)
361 360
362 361 return x_buffer, y_buffer, z_buffer
363 362
364 363 def decimate(self):
365 364
366 365 # dx = int(len(self.x)/self.__MAXNUMX) + 1
367 366 dy = int(len(self.y) / self.decimation) + 1
368 367
369 368 # x = self.x[::dx]
370 369 x = self.x
371 370 y = self.y[::dy]
372 371 z = self.z[::, ::, ::dy]
373 372
374 373 return x, y, z
375 374
376 375 def format(self):
377 376 '''
378 377 Set min and max values, labels, ticks and titles
379 378 '''
380 379
381 380 for n, ax in enumerate(self.axes):
382 381 if ax.firsttime:
383 382 if self.xaxis != 'time':
384 383 xmin = self.xmin
385 384 xmax = self.xmax
386 385 else:
387 386 xmin = self.tmin
388 387 xmax = self.tmin + self.xrange*60*60
389 388 ax.xaxis.set_major_formatter(FuncFormatter(self.__fmtTime))
390 389 ax.xaxis.set_major_locator(LinearLocator(9))
391 390 ymin = self.ymin if self.ymin is not None else numpy.nanmin(self.y[numpy.isfinite(self.y)])
392 391 ymax = self.ymax if self.ymax is not None else numpy.nanmax(self.y[numpy.isfinite(self.y)])
393 392 ax.set_facecolor(self.bgcolor)
394 393 if self.xscale:
395 394 ax.xaxis.set_major_formatter(FuncFormatter(
396 395 lambda x, pos: '{0:g}'.format(x*self.xscale)))
397 396 if self.yscale:
398 397 ax.yaxis.set_major_formatter(FuncFormatter(
399 398 lambda x, pos: '{0:g}'.format(x*self.yscale)))
400 399 if self.xlabel is not None:
401 400 ax.set_xlabel(self.xlabel)
402 401 if self.ylabel is not None:
403 402 ax.set_ylabel(self.ylabel)
404 403 if self.showprofile:
405 404 self.pf_axes[n].set_ylim(ymin, ymax)
406 405 self.pf_axes[n].set_xlim(self.zmin, self.zmax)
407 406 self.pf_axes[n].set_xlabel('dB')
408 407 self.pf_axes[n].grid(b=True, axis='x')
409 408 [tick.set_visible(False)
410 409 for tick in self.pf_axes[n].get_yticklabels()]
411 410 if self.colorbar:
412 411 ax.cbar = plt.colorbar(
413 412 ax.plt, ax=ax, fraction=0.05, pad=0.02, aspect=10)
414 413 ax.cbar.ax.tick_params(labelsize=8)
415 414 ax.cbar.ax.press = None
416 415 if self.cb_label:
417 416 ax.cbar.set_label(self.cb_label, size=8)
418 417 elif self.cb_labels:
419 418 ax.cbar.set_label(self.cb_labels[n], size=8)
420 419 else:
421 420 ax.cbar = None
422 421 ax.set_xlim(xmin, xmax)
423 422 ax.set_ylim(ymin, ymax)
424 423 ax.firsttime = False
425 424 if self.grid:
426 425 ax.grid(True)
427 426 if not self.polar:
428 427 ax.set_title('{} {} {}'.format(
429 428 self.titles[n],
430 429 self.getDateTime(self.data.max_time).strftime(
431 430 '%Y-%m-%d %H:%M:%S'),
432 431 self.time_label),
433 432 size=8)
434 433 else:
435 434 ax.set_title('{}'.format(self.titles[n]), size=8)
436 435 ax.set_ylim(0, 90)
437 436 ax.set_yticks(numpy.arange(0, 90, 20))
438 437 ax.yaxis.labelpad = 40
439 438
440 439 if self.firsttime:
441 440 for n, fig in enumerate(self.figures):
442 441 fig.subplots_adjust(**self.plots_adjust)
443 442 self.firsttime = False
444 443
445 444 def clear_figures(self):
446 445 '''
447 446 Reset axes for redraw plots
448 447 '''
449 448
450 449 for ax in self.axes+self.pf_axes+self.cb_axes:
451 450 ax.clear()
452 451 ax.firsttime = True
453 452 if hasattr(ax, 'cbar') and ax.cbar:
454 453 ax.cbar.remove()
455 454
456 455 def __plot(self):
457 456 '''
458 457 Main function to plot, format and save figures
459 458 '''
460 459
461 460 self.plot()
462 461 self.format()
463 462
464 463 for n, fig in enumerate(self.figures):
465 464 if self.nrows == 0 or self.nplots == 0:
466 465 log.warning('No data', self.name)
467 466 fig.text(0.5, 0.5, 'No Data', fontsize='large', ha='center')
468 467 fig.canvas.manager.set_window_title(self.CODE)
469 468 continue
470 469
471 470 fig.canvas.manager.set_window_title('{} - {}'.format(self.title,
472 471 self.getDateTime(self.data.max_time).strftime('%Y/%m/%d')))
473 472 fig.canvas.draw()
474 473 if self.show:
475 474 fig.show()
476 475 figpause(0.01)
477 476
478 477 if self.save:
479 478 self.save_figure(n)
480 479
481 480 if self.server:
482 481 self.send_to_server()
483 482
484 483 def __update(self, dataOut, timestamp):
485 484 '''
486 485 '''
487 486
488 487 metadata = {
489 488 'yrange': dataOut.heightList,
490 489 'interval': dataOut.timeInterval,
491 490 'channels': dataOut.channelList
492 491 }
493 492
494 493 data, meta = self.update(dataOut)
495 494 metadata.update(meta)
496 495 self.data.update(data, timestamp, metadata)
497 496
498 497 def save_figure(self, n):
499 498 '''
500 499 '''
501 500
502 501 if (self.data.max_time - self.save_time) <= self.save_period:
503 502 return
504 503
505 504 self.save_time = self.data.max_time
506 505
507 506 fig = self.figures[n]
508 507
509 508 if self.throttle == 0:
510 509 figname = os.path.join(
511 510 self.save,
512 511 self.save_code,
513 512 '{}_{}.png'.format(
514 513 self.save_code,
515 514 self.getDateTime(self.data.max_time).strftime(
516 515 '%Y%m%d_%H%M%S'
517 516 ),
518 517 )
519 518 )
520 519 log.log('Saving figure: {}'.format(figname), self.name)
521 520 if not os.path.isdir(os.path.dirname(figname)):
522 521 os.makedirs(os.path.dirname(figname))
523 522 fig.savefig(figname)
524 523
525 524 figname = os.path.join(
526 525 self.save,
527 526 '{}_{}.png'.format(
528 527 self.save_code,
529 528 self.getDateTime(self.data.min_time).strftime(
530 529 '%Y%m%d'
531 530 ),
532 531 )
533 532 )
533 log.log('Saving figure: {}'.format(figname), self.name)
534 if not os.path.isdir(os.path.dirname(figname)):
535 os.makedirs(os.path.dirname(figname))
534 536 fig.savefig(figname)
535 537
536 538 def send_to_server(self):
537 539 '''
538 540 '''
539 541
540 542 if self.exp_code == None:
541 543 log.warning('Missing `exp_code` skipping sending to server...')
542 544
543 545 last_time = self.data.max_time
544 546 interval = last_time - self.sender_time
545 547 if interval < self.sender_period:
546 548 return
547 549
548 550 self.sender_time = last_time
549 551
550 552 attrs = ['titles', 'zmin', 'zmax', 'tag', 'ymin', 'ymax']
551 553 for attr in attrs:
552 554 value = getattr(self, attr)
553 555 if value:
554 556 if isinstance(value, (numpy.float32, numpy.float64)):
555 557 value = round(float(value), 2)
556 558 self.data.meta[attr] = value
557 559 if self.colormap == 'jet':
558 560 self.data.meta['colormap'] = 'Jet'
559 561 elif 'RdBu' in self.colormap:
560 562 self.data.meta['colormap'] = 'RdBu'
561 563 else:
562 564 self.data.meta['colormap'] = 'Viridis'
563 565 self.data.meta['interval'] = int(interval)
564 566
565 567 self.sender_queue.append(last_time)
566 568
567 569 while True:
568 570 try:
569 571 tm = self.sender_queue.popleft()
570 572 except IndexError:
571 573 break
572 574 msg = self.data.jsonify(tm, self.save_code, self.plot_type)
573 575 self.socket.send_string(msg)
574 576 socks = dict(self.poll.poll(2000))
575 577 if socks.get(self.socket) == zmq.POLLIN:
576 578 reply = self.socket.recv_string()
577 579 if reply == 'ok':
578 580 log.log("Response from server ok", self.name)
579 581 time.sleep(0.1)
580 582 continue
581 583 else:
582 584 log.warning(
583 585 "Malformed reply from server: {}".format(reply), self.name)
584 586 else:
585 587 log.warning(
586 588 "No response from server, retrying...", self.name)
587 589 self.sender_queue.appendleft(tm)
588 590 self.socket.setsockopt(zmq.LINGER, 0)
589 591 self.socket.close()
590 592 self.poll.unregister(self.socket)
591 593 self.socket = self.context.socket(zmq.REQ)
592 594 self.socket.connect(self.server)
593 595 self.poll.register(self.socket, zmq.POLLIN)
594 596 break
595 597
596 598 def setup(self):
597 599 '''
598 600 This method should be implemented in the child class, the following
599 601 attributes should be set:
600 602
601 603 self.nrows: number of rows
602 604 self.ncols: number of cols
603 605 self.nplots: number of plots (channels or pairs)
604 606 self.ylabel: label for Y axes
605 607 self.titles: list of axes title
606 608
607 609 '''
608 610 raise NotImplementedError
609 611
610 612 def plot(self):
611 613 '''
612 614 Must be defined in the child class, the actual plotting method
613 615 '''
614 616 raise NotImplementedError
615 617
616 618 def update(self, dataOut):
617 619 '''
618 620 Must be defined in the child class, update self.data with new data
619 621 '''
620 622
621 623 data = {
622 624 self.CODE: getattr(dataOut, 'data_{}'.format(self.CODE))
623 625 }
624 626 meta = {}
625 627
626 628 return data, meta
627 629
628 630 def run(self, dataOut, **kwargs):
629 631 '''
630 632 Main plotting routine
631 633 '''
632 634
633 635 if self.isConfig is False:
634 636 self.__setup(**kwargs)
635 637
636 638 if self.localtime:
637 639 self.getDateTime = datetime.datetime.fromtimestamp
638 640 else:
639 641 self.getDateTime = datetime.datetime.utcfromtimestamp
640 642
641 643 self.data.setup()
642 644 self.isConfig = True
643 645 if self.server:
644 646 self.context = zmq.Context()
645 647 self.socket = self.context.socket(zmq.REQ)
646 648 self.socket.connect(self.server)
647 649 self.poll = zmq.Poller()
648 650 self.poll.register(self.socket, zmq.POLLIN)
649 651
650 652 tm = getattr(dataOut, self.attr_time)
651 653
652 654 if self.data and 'time' in self.xaxis and (tm - self.tmin) >= self.xrange*60*60:
653 655 self.save_time = tm
654 656 self.__plot()
655 657 self.tmin += self.xrange*60*60
656 658 self.data.setup()
657 659 self.clear_figures()
658 660
659 661 self.__update(dataOut, tm)
660 662
661 663 if self.isPlotConfig is False:
662 664 self.__setup_plot()
663 665 self.isPlotConfig = True
664 666 if self.xaxis == 'time':
665 667 dt = self.getDateTime(tm)
666 668 if self.xmin is None:
667 669 self.tmin = tm
668 670 self.xmin = dt.hour
669 671 minutes = (self.xmin-int(self.xmin)) * 60
670 672 seconds = (minutes - int(minutes)) * 60
671 673 self.tmin = (dt.replace(hour=int(self.xmin), minute=int(minutes), second=int(seconds)) -
672 674 datetime.datetime(1970, 1, 1)).total_seconds()
673 675 if self.localtime:
674 676 self.tmin += time.timezone
675 677
676 678 if self.xmin is not None and self.xmax is not None:
677 679 self.xrange = self.xmax - self.xmin
678 680
679 681 if self.throttle == 0:
680 682 self.__plot()
681 683 else:
682 684 self.__throttle_plot(self.__plot)#, coerce=coerce)
683 685
684 686 def close(self):
685 687
686 688 if self.data and not self.data.flagNoData:
687 self.save_time = self.data.max_time
689 self.save_time = 0
688 690 self.__plot()
689 691 if self.data and not self.data.flagNoData and self.pause:
690 692 figpause(10)
691 693
@@ -1,627 +1,626
1 1 import os
2 2 import time
3 3 import datetime
4 4
5 5 import numpy
6 6 import h5py
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14
15 15 class HDFReader(Reader, ProcessingUnit):
16 16 """Processing unit to read HDF5 format files
17 17
18 18 This unit reads HDF5 files created with `HDFWriter` operation contains
19 19 by default two groups Data and Metadata all variables would be saved as `dataOut`
20 20 attributes.
21 21 It is possible to read any HDF5 file by given the structure in the `description`
22 22 parameter, also you can add extra values to metadata with the parameter `extras`.
23 23
24 24 Parameters:
25 25 -----------
26 26 path : str
27 27 Path where files are located.
28 28 startDate : date
29 29 Start date of the files
30 30 endDate : list
31 31 End date of the files
32 32 startTime : time
33 33 Start time of the files
34 34 endTime : time
35 35 End time of the files
36 36 description : dict, optional
37 37 Dictionary with the description of the HDF5 file
38 38 extras : dict, optional
39 39 Dictionary with extra metadata to be be added to `dataOut`
40 40
41 41 Examples
42 42 --------
43 43
44 44 desc = {
45 45 'Data': {
46 46 'data_output': ['u', 'v', 'w'],
47 47 'utctime': 'timestamps',
48 48 } ,
49 49 'Metadata': {
50 50 'heightList': 'heights'
51 51 }
52 52 }
53 53
54 54 desc = {
55 55 'Data': {
56 56 'data_output': 'winds',
57 57 'utctime': 'timestamps'
58 58 },
59 59 'Metadata': {
60 60 'heightList': 'heights'
61 61 }
62 62 }
63 63
64 64 extras = {
65 65 'timeZone': 300
66 66 }
67 67
68 68 reader = project.addReadUnit(
69 69 name='HDFReader',
70 70 path='/path/to/files',
71 71 startDate='2019/01/01',
72 72 endDate='2019/01/31',
73 73 startTime='00:00:00',
74 74 endTime='23:59:59',
75 75 # description=json.dumps(desc),
76 76 # extras=json.dumps(extras),
77 77 )
78 78
79 79 """
80 80
81 81 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']
82 82
83 83 def __init__(self):
84 84 ProcessingUnit.__init__(self)
85 85 self.dataOut = Parameters()
86 86 self.ext = ".hdf5"
87 87 self.optchar = "D"
88 88 self.meta = {}
89 89 self.data = {}
90 90 self.open_file = h5py.File
91 91 self.open_mode = 'r'
92 92 self.description = {}
93 93 self.extras = {}
94 94 self.filefmt = "*%Y%j***"
95 95 self.folderfmt = "*%Y%j"
96 self.utcoffset = 0
96 97
97 98 def setup(self, **kwargs):
98 99
99 100 self.set_kwargs(**kwargs)
100 101 if not self.ext.startswith('.'):
101 102 self.ext = '.{}'.format(self.ext)
102 103
103 104 if self.online:
104 105 log.log("Searching files in online mode...", self.name)
105 106
106 107 for nTries in range(self.nTries):
107 108 fullpath = self.searchFilesOnLine(self.path, self.startDate,
108 109 self.endDate, self.expLabel, self.ext, self.walk,
109 110 self.filefmt, self.folderfmt)
110 111 try:
111 112 fullpath = next(fullpath)
112 113 except:
113 114 fullpath = None
114 115
115 116 if fullpath:
116 117 break
117 118
118 119 log.warning(
119 120 'Waiting {} sec for a valid file in {}: try {} ...'.format(
120 121 self.delay, self.path, nTries + 1),
121 122 self.name)
122 123 time.sleep(self.delay)
123 124
124 125 if not(fullpath):
125 126 raise schainpy.admin.SchainError(
126 127 'There isn\'t any valid file in {}'.format(self.path))
127 128
128 129 pathname, filename = os.path.split(fullpath)
129 130 self.year = int(filename[1:5])
130 131 self.doy = int(filename[5:8])
131 132 self.set = int(filename[8:11]) - 1
132 133 else:
133 134 log.log("Searching files in {}".format(self.path), self.name)
134 135 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
135 136 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
136 137
137 138 self.setNextFile()
138 139
139 140 return
140 141
141 142 def readFirstHeader(self):
142 143 '''Read metadata and data'''
143 144
144 145 self.__readMetadata()
145 146 self.__readData()
146 147 self.__setBlockList()
147 148
148 149 if 'type' in self.meta:
149 150 self.dataOut = eval(self.meta['type'])()
150 151
151 152 for attr in self.meta:
152 153 setattr(self.dataOut, attr, self.meta[attr])
153 154
154 155 self.blockIndex = 0
155 156
156 157 return
157 158
158 159 def __setBlockList(self):
159 160 '''
160 161 Selects the data within the times defined
161 162
162 163 self.fp
163 164 self.startTime
164 165 self.endTime
165 166 self.blockList
166 167 self.blocksPerFile
167 168
168 169 '''
169 170
170 171 startTime = self.startTime
171 172 endTime = self.endTime
172
173 thisUtcTime = self.data['utctime']
173 thisUtcTime = self.data['utctime'] + self.utcoffset
174 174 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
175
176 175 thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
177 176
178 177 thisDate = thisDatetime.date()
179 178 thisTime = thisDatetime.time()
180 179
181 180 startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
182 181 endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
183 182
184 183 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
185 184
186 185 self.blockList = ind
187 186 self.blocksPerFile = len(ind)
188 187 return
189 188
190 189 def __readMetadata(self):
191 190 '''
192 191 Reads Metadata
193 192 '''
194 193
195 194 meta = {}
196 195
197 196 if self.description:
198 197 for key, value in self.description['Metadata'].items():
199 198 meta[key] = self.fp[value][()]
200 199 else:
201 200 grp = self.fp['Metadata']
202 201 for name in grp:
203 202 meta[name] = grp[name][()]
204 203
205 204 if self.extras:
206 205 for key, value in self.extras.items():
207 206 meta[key] = value
208 207 self.meta = meta
209 208
210 209 return
211 210
212 211 def __readData(self):
213 212
214 213 data = {}
215 214
216 215 if self.description:
217 216 for key, value in self.description['Data'].items():
218 217 if isinstance(value, str):
219 218 if isinstance(self.fp[value], h5py.Dataset):
220 219 data[key] = self.fp[value][()]
221 220 elif isinstance(self.fp[value], h5py.Group):
222 221 array = []
223 222 for ch in self.fp[value]:
224 223 array.append(self.fp[value][ch][()])
225 224 data[key] = numpy.array(array)
226 225 elif isinstance(value, list):
227 226 array = []
228 227 for ch in value:
229 228 array.append(self.fp[ch][()])
230 229 data[key] = numpy.array(array)
231 230 else:
232 231 grp = self.fp['Data']
233 232 for name in grp:
234 233 if isinstance(grp[name], h5py.Dataset):
235 234 array = grp[name][()]
236 235 elif isinstance(grp[name], h5py.Group):
237 236 array = []
238 237 for ch in grp[name]:
239 238 array.append(grp[name][ch][()])
240 239 array = numpy.array(array)
241 240 else:
242 241 log.warning('Unknown type: {}'.format(name))
243 242
244 243 if name in self.description:
245 244 key = self.description[name]
246 245 else:
247 246 key = name
248 247 data[key] = array
249 248
250 249 self.data = data
251 250 return
252 251
253 252 def getData(self):
254 253
255 254 for attr in self.data:
256 255 if self.data[attr].ndim == 1:
257 256 setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
258 257 else:
259 258 setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])
260 259
261 260 self.dataOut.flagNoData = False
262 261 self.blockIndex += 1
263 262
264 263 log.log("Block No. {}/{} -> {}".format(
265 264 self.blockIndex,
266 265 self.blocksPerFile,
267 266 self.dataOut.datatime.ctime()), self.name)
268 267
269 268 return
270 269
271 270 def run(self, **kwargs):
272 271
273 272 if not(self.isConfig):
274 273 self.setup(**kwargs)
275 274 self.isConfig = True
276 275
277 276 if self.blockIndex == self.blocksPerFile:
278 277 self.setNextFile()
279 278
280 279 self.getData()
281 280
282 281 return
283 282
284 283 @MPDecorator
285 284 class HDFWriter(Operation):
286 285 """Operation to write HDF5 files.
287 286
288 287 The HDF5 file contains by default two groups Data and Metadata where
289 288 you can save any `dataOut` attribute specified by `dataList` and `metadataList`
290 289 parameters, data attributes are normaly time dependent where the metadata
291 290 are not.
292 291 It is possible to customize the structure of the HDF5 file with the
293 292 optional description parameter see the examples.
294 293
295 294 Parameters:
296 295 -----------
297 296 path : str
298 297 Path where files will be saved.
299 298 blocksPerFile : int
300 299 Number of blocks per file
301 300 metadataList : list
302 301 List of the dataOut attributes that will be saved as metadata
303 302 dataList : int
304 303 List of the dataOut attributes that will be saved as data
305 304 setType : bool
306 305 If True the name of the files corresponds to the timestamp of the data
307 306 description : dict, optional
308 307 Dictionary with the desired description of the HDF5 file
309 308
310 309 Examples
311 310 --------
312 311
313 312 desc = {
314 313 'data_output': {'winds': ['z', 'w', 'v']},
315 314 'utctime': 'timestamps',
316 315 'heightList': 'heights'
317 316 }
318 317 desc = {
319 318 'data_output': ['z', 'w', 'v'],
320 319 'utctime': 'timestamps',
321 320 'heightList': 'heights'
322 321 }
323 322 desc = {
324 323 'Data': {
325 324 'data_output': 'winds',
326 325 'utctime': 'timestamps'
327 326 },
328 327 'Metadata': {
329 328 'heightList': 'heights'
330 329 }
331 330 }
332 331
333 332 writer = proc_unit.addOperation(name='HDFWriter')
334 333 writer.addParameter(name='path', value='/path/to/file')
335 334 writer.addParameter(name='blocksPerFile', value='32')
336 335 writer.addParameter(name='metadataList', value='heightList,timeZone')
337 336 writer.addParameter(name='dataList',value='data_output,utctime')
338 337 # writer.addParameter(name='description',value=json.dumps(desc))
339 338
340 339 """
341 340
342 341 ext = ".hdf5"
343 342 optchar = "D"
344 343 filename = None
345 344 path = None
346 345 setFile = None
347 346 fp = None
348 347 firsttime = True
349 348 #Configurations
350 349 blocksPerFile = None
351 350 blockIndex = None
352 351 dataOut = None
353 352 #Data Arrays
354 353 dataList = None
355 354 metadataList = None
356 355 currentDay = None
357 356 lastTime = None
358 357
359 358 def __init__(self):
360 359
361 360 Operation.__init__(self)
362 361 return
363 362
364 363 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None):
365 364 self.path = path
366 365 self.blocksPerFile = blocksPerFile
367 366 self.metadataList = metadataList
368 367 self.dataList = [s.strip() for s in dataList]
369 368 self.setType = setType
370 369 self.description = description
371 370
372 371 if self.metadataList is None:
373 372 self.metadataList = self.dataOut.metadata_list
374 373
375 374 tableList = []
376 375 dsList = []
377 376
378 377 for i in range(len(self.dataList)):
379 378 dsDict = {}
380 379 if hasattr(self.dataOut, self.dataList[i]):
381 380 dataAux = getattr(self.dataOut, self.dataList[i])
382 381 dsDict['variable'] = self.dataList[i]
383 382 else:
384 383 log.warning('Attribute {} not found in dataOut', self.name)
385 384 continue
386 385
387 386 if dataAux is None:
388 387 continue
389 388 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
390 389 dsDict['nDim'] = 0
391 390 else:
392 391 dsDict['nDim'] = len(dataAux.shape)
393 392 dsDict['shape'] = dataAux.shape
394 393 dsDict['dsNumber'] = dataAux.shape[0]
395 394 dsDict['dtype'] = dataAux.dtype
396 395
397 396 dsList.append(dsDict)
398 397
399 398 self.dsList = dsList
400 399 self.currentDay = self.dataOut.datatime.date()
401 400
402 401 def timeFlag(self):
403 402 currentTime = self.dataOut.utctime
404 403 timeTuple = time.localtime(currentTime)
405 404 dataDay = timeTuple.tm_yday
406 405
407 406 if self.lastTime is None:
408 407 self.lastTime = currentTime
409 408 self.currentDay = dataDay
410 409 return False
411 410
412 411 timeDiff = currentTime - self.lastTime
413 412
414 413 #Si el dia es diferente o si la diferencia entre un dato y otro supera la hora
415 414 if dataDay != self.currentDay:
416 415 self.currentDay = dataDay
417 416 return True
418 417 elif timeDiff > 3*60*60:
419 418 self.lastTime = currentTime
420 419 return True
421 420 else:
422 421 self.lastTime = currentTime
423 422 return False
424 423
425 424 def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
426 425 dataList=[], setType=None, description={}):
427 426
428 427 self.dataOut = dataOut
429 428 if not(self.isConfig):
430 429 self.setup(path=path, blocksPerFile=blocksPerFile,
431 430 metadataList=metadataList, dataList=dataList,
432 431 setType=setType, description=description)
433 432
434 433 self.isConfig = True
435 434 self.setNextFile()
436 435
437 436 self.putData()
438 437 return
439 438
440 439 def setNextFile(self):
441 440
442 441 ext = self.ext
443 442 path = self.path
444 443 setFile = self.setFile
445 444
446 445 timeTuple = time.localtime(self.dataOut.utctime)
447 446 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
448 447 fullpath = os.path.join(path, subfolder)
449 448
450 449 if os.path.exists(fullpath):
451 450 filesList = os.listdir(fullpath)
452 451 filesList = [k for k in filesList if k.startswith(self.optchar)]
453 452 if len( filesList ) > 0:
454 453 filesList = sorted(filesList, key=str.lower)
455 454 filen = filesList[-1]
456 455 # el filename debera tener el siguiente formato
457 456 # 0 1234 567 89A BCDE (hex)
458 457 # x YYYY DDD SSS .ext
459 458 if isNumber(filen[8:11]):
460 459 setFile = int(filen[8:11]) #inicializo mi contador de seteo al seteo del ultimo file
461 460 else:
462 461 setFile = -1
463 462 else:
464 463 setFile = -1 #inicializo mi contador de seteo
465 464 else:
466 465 os.makedirs(fullpath)
467 466 setFile = -1 #inicializo mi contador de seteo
468 467
469 468 if self.setType is None:
470 469 setFile += 1
471 470 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
472 471 timeTuple.tm_year,
473 472 timeTuple.tm_yday,
474 473 setFile,
475 474 ext )
476 475 else:
477 476 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
478 477 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
479 478 timeTuple.tm_year,
480 479 timeTuple.tm_yday,
481 480 setFile,
482 481 ext )
483 482
484 483 self.filename = os.path.join( path, subfolder, file )
485 484
486 485 #Setting HDF5 File
487 486 self.fp = h5py.File(self.filename, 'w')
488 487 #write metadata
489 488 self.writeMetadata(self.fp)
490 489 #Write data
491 490 self.writeData(self.fp)
492 491
493 492 def getLabel(self, name, x=None):
494 493
495 494 if x is None:
496 495 if 'Data' in self.description:
497 496 data = self.description['Data']
498 497 if 'Metadata' in self.description:
499 498 data.update(self.description['Metadata'])
500 499 else:
501 500 data = self.description
502 501 if name in data:
503 502 if isinstance(data[name], str):
504 503 return data[name]
505 504 elif isinstance(data[name], list):
506 505 return None
507 506 elif isinstance(data[name], dict):
508 507 for key, value in data[name].items():
509 508 return key
510 509 return name
511 510 else:
512 511 if 'Metadata' in self.description:
513 512 meta = self.description['Metadata']
514 513 else:
515 514 meta = self.description
516 515 if name in meta:
517 516 if isinstance(meta[name], list):
518 517 return meta[name][x]
519 518 elif isinstance(meta[name], dict):
520 519 for key, value in meta[name].items():
521 520 return value[x]
522 521 if 'cspc' in name:
523 522 return 'pair{:02d}'.format(x)
524 523 else:
525 524 return 'channel{:02d}'.format(x)
526 525
527 526 def writeMetadata(self, fp):
528 527
529 528 if self.description:
530 529 if 'Metadata' in self.description:
531 530 grp = fp.create_group('Metadata')
532 531 else:
533 532 grp = fp
534 533 else:
535 534 grp = fp.create_group('Metadata')
536 535
537 536 for i in range(len(self.metadataList)):
538 537 if not hasattr(self.dataOut, self.metadataList[i]):
539 538 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
540 539 continue
541 540 value = getattr(self.dataOut, self.metadataList[i])
542 541 if isinstance(value, bool):
543 542 if value is True:
544 543 value = 1
545 544 else:
546 545 value = 0
547 546 grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)
548 547 return
549 548
550 549 def writeData(self, fp):
551 550
552 551 if self.description:
553 552 if 'Data' in self.description:
554 553 grp = fp.create_group('Data')
555 554 else:
556 555 grp = fp
557 556 else:
558 557 grp = fp.create_group('Data')
559 558
560 559 dtsets = []
561 560 data = []
562 561
563 562 for dsInfo in self.dsList:
564 563 if dsInfo['nDim'] == 0:
565 564 ds = grp.create_dataset(
566 565 self.getLabel(dsInfo['variable']),
567 566 (self.blocksPerFile, ),
568 567 chunks=True,
569 568 dtype=numpy.float64)
570 569 dtsets.append(ds)
571 570 data.append((dsInfo['variable'], -1))
572 571 else:
573 572 label = self.getLabel(dsInfo['variable'])
574 573 if label is not None:
575 574 sgrp = grp.create_group(label)
576 575 else:
577 576 sgrp = grp
578 577 for i in range(dsInfo['dsNumber']):
579 578 ds = sgrp.create_dataset(
580 579 self.getLabel(dsInfo['variable'], i),
581 580 (self.blocksPerFile, ) + dsInfo['shape'][1:],
582 581 chunks=True,
583 582 dtype=dsInfo['dtype'])
584 583 dtsets.append(ds)
585 584 data.append((dsInfo['variable'], i))
586 585 fp.flush()
587 586
588 587 log.log('Creating file: {}'.format(fp.filename), self.name)
589 588
590 589 self.ds = dtsets
591 590 self.data = data
592 591 self.firsttime = True
593 592 self.blockIndex = 0
594 593 return
595 594
596 595 def putData(self):
597 596
598 597 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
599 598 self.closeFile()
600 599 self.setNextFile()
601 600
602 601 for i, ds in enumerate(self.ds):
603 602 attr, ch = self.data[i]
604 603 if ch == -1:
605 604 ds[self.blockIndex] = getattr(self.dataOut, attr)
606 605 else:
607 606 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
608 607
609 608 self.fp.flush()
610 609 self.blockIndex += 1
611 610 log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)
612 611
613 612 return
614 613
615 614 def closeFile(self):
616 615
617 616 if self.blockIndex != self.blocksPerFile:
618 617 for ds in self.ds:
619 618 ds.resize(self.blockIndex, axis=0)
620 619
621 620 if self.fp:
622 621 self.fp.flush()
623 622 self.fp.close()
624 623
625 624 def close(self):
626 625
627 626 self.closeFile()
General Comments 0
You need to be logged in to leave comments. Login now