##// END OF EJS Templates
Fix px1000 reading online
jespinoza -
r1138:c6cf5a83f48e
parent child
Show More
@@ -1,1042 +1,1042
1 1
2 2 import os
3 3 import time
4 4 import glob
5 5 import datetime
6 6 from multiprocessing import Process
7 7
8 8 import zmq
9 9 import numpy
10 10 import matplotlib
11 11 import matplotlib.pyplot as plt
12 12 from mpl_toolkits.axes_grid1 import make_axes_locatable
13 13 from matplotlib.ticker import FuncFormatter, LinearLocator, MultipleLocator
14 14
15 15 from schainpy.model.proc.jroproc_base import Operation
16 16 from schainpy.utils import log
17 17
18 18 jet_values = matplotlib.pyplot.get_cmap('jet', 100)(numpy.arange(100))[10:90]
19 19 blu_values = matplotlib.pyplot.get_cmap(
20 20 'seismic_r', 20)(numpy.arange(20))[10:15]
21 21 ncmap = matplotlib.colors.LinearSegmentedColormap.from_list(
22 22 'jro', numpy.vstack((blu_values, jet_values)))
23 23 matplotlib.pyplot.register_cmap(cmap=ncmap)
24 24
25 25 CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis', 'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm', 'spectral')]
26 26
27 27
28 28 def figpause(interval):
29 29 backend = plt.rcParams['backend']
30 30 if backend in matplotlib.rcsetup.interactive_bk:
31 31 figManager = matplotlib._pylab_helpers.Gcf.get_active()
32 32 if figManager is not None:
33 33 canvas = figManager.canvas
34 34 if canvas.figure.stale:
35 35 canvas.draw()
36 36 canvas.start_event_loop(interval)
37 37 return
38 38
39 39
40 40 class PlotData(Operation, Process):
41 41 '''
42 42 Base class for Schain plotting operations
43 43 '''
44 44
45 45 CODE = 'Figure'
46 46 colormap = 'jro'
47 47 bgcolor = 'white'
48 48 CONFLATE = False
49 49 __missing = 1E30
50 50
51 51 __attrs__ = ['show', 'save', 'xmin', 'xmax', 'ymin', 'ymax', 'zmin', 'zmax',
52 52 'zlimits', 'xlabel', 'ylabel', 'xaxis','cb_label', 'title',
53 53 'colorbar', 'bgcolor', 'width', 'height', 'localtime', 'oneFigure',
54 54 'showprofile', 'decimation', 'ftp']
55 55
56 56 def __init__(self, **kwargs):
57 57
58 58 Operation.__init__(self, plot=True, **kwargs)
59 59 Process.__init__(self)
60 60
61 61 self.kwargs['code'] = self.CODE
62 62 self.mp = False
63 63 self.data = None
64 64 self.isConfig = False
65 65 self.figures = []
66 66 self.axes = []
67 67 self.cb_axes = []
68 68 self.localtime = kwargs.pop('localtime', True)
69 69 self.show = kwargs.get('show', True)
70 70 self.save = kwargs.get('save', False)
71 71 self.ftp = kwargs.get('ftp', False)
72 72 self.colormap = kwargs.get('colormap', self.colormap)
73 73 self.colormap_coh = kwargs.get('colormap_coh', 'jet')
74 74 self.colormap_phase = kwargs.get('colormap_phase', 'RdBu_r')
75 75 self.colormaps = kwargs.get('colormaps', None)
76 76 self.bgcolor = kwargs.get('bgcolor', self.bgcolor)
77 77 self.showprofile = kwargs.get('showprofile', False)
78 78 self.title = kwargs.get('wintitle', self.CODE.upper())
79 79 self.cb_label = kwargs.get('cb_label', None)
80 80 self.cb_labels = kwargs.get('cb_labels', None)
81 81 self.labels = kwargs.get('labels', None)
82 82 self.xaxis = kwargs.get('xaxis', 'frequency')
83 83 self.zmin = kwargs.get('zmin', None)
84 84 self.zmax = kwargs.get('zmax', None)
85 85 self.zlimits = kwargs.get('zlimits', None)
86 86 self.xmin = kwargs.get('xmin', None)
87 87 self.xmax = kwargs.get('xmax', None)
88 88 self.xrange = kwargs.get('xrange', 24)
89 89 self.xscale = kwargs.get('xscale', None)
90 90 self.ymin = kwargs.get('ymin', None)
91 91 self.ymax = kwargs.get('ymax', None)
92 92 self.yscale = kwargs.get('yscale', None)
93 93 self.xlabel = kwargs.get('xlabel', None)
94 94 self.decimation = kwargs.get('decimation', None)
95 95 self.showSNR = kwargs.get('showSNR', False)
96 96 self.oneFigure = kwargs.get('oneFigure', True)
97 97 self.width = kwargs.get('width', None)
98 98 self.height = kwargs.get('height', None)
99 99 self.colorbar = kwargs.get('colorbar', True)
100 100 self.factors = kwargs.get('factors', [1, 1, 1, 1, 1, 1, 1, 1])
101 101 self.channels = kwargs.get('channels', None)
102 102 self.titles = kwargs.get('titles', [])
103 103 self.polar = False
104 104
105 105 def __fmtTime(self, x, pos):
106 106 '''
107 107 '''
108 108
109 109 return '{}'.format(self.getDateTime(x).strftime('%H:%M'))
110 110
111 111 def __setup(self):
112 112 '''
113 113 Common setup for all figures, here figures and axes are created
114 114 '''
115 115
116 116 if self.CODE not in self.data:
117 117 raise ValueError(log.error('Missing data for {}'.format(self.CODE),
118 118 self.name))
119 119
120 120 self.setup()
121 121
122 122 self.time_label = 'LT' if self.localtime else 'UTC'
123 123 if self.data.localtime:
124 124 self.getDateTime = datetime.datetime.fromtimestamp
125 125 else:
126 126 self.getDateTime = datetime.datetime.utcfromtimestamp
127 127
128 128 if self.width is None:
129 129 self.width = 8
130 130
131 131 self.figures = []
132 132 self.axes = []
133 133 self.cb_axes = []
134 134 self.pf_axes = []
135 135 self.cmaps = []
136 136
137 137 size = '15%' if self.ncols == 1 else '30%'
138 138 pad = '4%' if self.ncols == 1 else '8%'
139 139
140 140 if self.oneFigure:
141 141 if self.height is None:
142 142 self.height = 1.4 * self.nrows + 1
143 143 fig = plt.figure(figsize=(self.width, self.height),
144 144 edgecolor='k',
145 145 facecolor='w')
146 146 self.figures.append(fig)
147 147 for n in range(self.nplots):
148 148 ax = fig.add_subplot(self.nrows, self.ncols,
149 149 n + 1, polar=self.polar)
150 150 ax.tick_params(labelsize=8)
151 151 ax.firsttime = True
152 152 ax.index = 0
153 153 ax.press = None
154 154 self.axes.append(ax)
155 155 if self.showprofile:
156 156 cax = self.__add_axes(ax, size=size, pad=pad)
157 157 cax.tick_params(labelsize=8)
158 158 self.pf_axes.append(cax)
159 159 else:
160 160 if self.height is None:
161 161 self.height = 3
162 162 for n in range(self.nplots):
163 163 fig = plt.figure(figsize=(self.width, self.height),
164 164 edgecolor='k',
165 165 facecolor='w')
166 166 ax = fig.add_subplot(1, 1, 1, polar=self.polar)
167 167 ax.tick_params(labelsize=8)
168 168 ax.firsttime = True
169 169 ax.index = 0
170 170 ax.press = None
171 171 self.figures.append(fig)
172 172 self.axes.append(ax)
173 173 if self.showprofile:
174 174 cax = self.__add_axes(ax, size=size, pad=pad)
175 175 cax.tick_params(labelsize=8)
176 176 self.pf_axes.append(cax)
177 177
178 178 for n in range(self.nrows):
179 179 if self.colormaps is not None:
180 180 cmap = plt.get_cmap(self.colormaps[n])
181 181 else:
182 182 cmap = plt.get_cmap(self.colormap)
183 183 cmap.set_bad(self.bgcolor, 1.)
184 184 self.cmaps.append(cmap)
185 185
186 186 for fig in self.figures:
187 187 fig.canvas.mpl_connect('key_press_event', self.OnKeyPress)
188 188 fig.canvas.mpl_connect('scroll_event', self.OnBtnScroll)
189 189 fig.canvas.mpl_connect('button_press_event', self.onBtnPress)
190 190 fig.canvas.mpl_connect('motion_notify_event', self.onMotion)
191 191 fig.canvas.mpl_connect('button_release_event', self.onBtnRelease)
192 192 if self.show:
193 193 fig.show()
194 194
195 195 def OnKeyPress(self, event):
196 196 '''
197 197 Event for pressing keys (up, down) change colormap
198 198 '''
199 199 ax = event.inaxes
200 200 if ax in self.axes:
201 201 if event.key == 'down':
202 202 ax.index += 1
203 203 elif event.key == 'up':
204 204 ax.index -= 1
205 205 if ax.index < 0:
206 206 ax.index = len(CMAPS) - 1
207 207 elif ax.index == len(CMAPS):
208 208 ax.index = 0
209 209 cmap = CMAPS[ax.index]
210 210 ax.cbar.set_cmap(cmap)
211 211 ax.cbar.draw_all()
212 212 ax.plt.set_cmap(cmap)
213 213 ax.cbar.patch.figure.canvas.draw()
214 214 self.colormap = cmap.name
215 215
216 216 def OnBtnScroll(self, event):
217 217 '''
218 218 Event for scrolling, scale figure
219 219 '''
220 220 cb_ax = event.inaxes
221 221 if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
222 222 ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
223 223 pt = ax.cbar.ax.bbox.get_points()[:, 1]
224 224 nrm = ax.cbar.norm
225 225 vmin, vmax, p0, p1, pS = (
226 226 nrm.vmin, nrm.vmax, pt[0], pt[1], event.y)
227 227 scale = 2 if event.step == 1 else 0.5
228 228 point = vmin + (vmax - vmin) / (p1 - p0) * (pS - p0)
229 229 ax.cbar.norm.vmin = point - scale * (point - vmin)
230 230 ax.cbar.norm.vmax = point - scale * (point - vmax)
231 231 ax.plt.set_norm(ax.cbar.norm)
232 232 ax.cbar.draw_all()
233 233 ax.cbar.patch.figure.canvas.draw()
234 234
235 235 def onBtnPress(self, event):
236 236 '''
237 237 Event for mouse button press
238 238 '''
239 239 cb_ax = event.inaxes
240 240 if cb_ax is None:
241 241 return
242 242
243 243 if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
244 244 cb_ax.press = event.x, event.y
245 245 else:
246 246 cb_ax.press = None
247 247
248 248 def onMotion(self, event):
249 249 '''
250 250 Event for move inside colorbar
251 251 '''
252 252 cb_ax = event.inaxes
253 253 if cb_ax is None:
254 254 return
255 255 if cb_ax not in [ax.cbar.ax for ax in self.axes if ax.cbar]:
256 256 return
257 257 if cb_ax.press is None:
258 258 return
259 259
260 260 ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
261 261 xprev, yprev = cb_ax.press
262 262 dx = event.x - xprev
263 263 dy = event.y - yprev
264 264 cb_ax.press = event.x, event.y
265 265 scale = ax.cbar.norm.vmax - ax.cbar.norm.vmin
266 266 perc = 0.03
267 267
268 268 if event.button == 1:
269 269 ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
270 270 ax.cbar.norm.vmax -= (perc * scale) * numpy.sign(dy)
271 271 elif event.button == 3:
272 272 ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
273 273 ax.cbar.norm.vmax += (perc * scale) * numpy.sign(dy)
274 274
275 275 ax.cbar.draw_all()
276 276 ax.plt.set_norm(ax.cbar.norm)
277 277 ax.cbar.patch.figure.canvas.draw()
278 278
279 279 def onBtnRelease(self, event):
280 280 '''
281 281 Event for mouse button release
282 282 '''
283 283 cb_ax = event.inaxes
284 284 if cb_ax is not None:
285 285 cb_ax.press = None
286 286
287 287 def __add_axes(self, ax, size='30%', pad='8%'):
288 288 '''
289 289 Add new axes to the given figure
290 290 '''
291 291 divider = make_axes_locatable(ax)
292 292 nax = divider.new_horizontal(size=size, pad=pad)
293 293 ax.figure.add_axes(nax)
294 294 return nax
295 295
296 296 self.setup()
297 297
298 298 def setup(self):
299 299 '''
300 300 This method should be implemented in the child class, the following
301 301 attributes should be set:
302 302
303 303 self.nrows: number of rows
304 304 self.ncols: number of cols
305 305 self.nplots: number of plots (channels or pairs)
306 306 self.ylabel: label for Y axes
307 307 self.titles: list of axes title
308 308
309 309 '''
310 310 raise(NotImplementedError, 'Implement this method in child class')
311 311
312 312 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
313 313 '''
314 314 Create a masked array for missing data
315 315 '''
316 316 if x_buffer.shape[0] < 2:
317 317 return x_buffer, y_buffer, z_buffer
318 318
319 319 deltas = x_buffer[1:] - x_buffer[0:-1]
320 320 x_median = numpy.median(deltas)
321 321
322 322 index = numpy.where(deltas > 5 * x_median)
323 323
324 324 if len(index[0]) != 0:
325 325 z_buffer[::, index[0], ::] = self.__missing
326 326 z_buffer = numpy.ma.masked_inside(z_buffer,
327 327 0.99 * self.__missing,
328 328 1.01 * self.__missing)
329 329
330 330 return x_buffer, y_buffer, z_buffer
331 331
332 332 def decimate(self):
333 333
334 334 # dx = int(len(self.x)/self.__MAXNUMX) + 1
335 335 dy = int(len(self.y) / self.decimation) + 1
336 336
337 337 # x = self.x[::dx]
338 338 x = self.x
339 339 y = self.y[::dy]
340 340 z = self.z[::, ::, ::dy]
341 341
342 342 return x, y, z
343 343
344 344 def format(self):
345 345 '''
346 346 Set min and max values, labels, ticks and titles
347 347 '''
348 348
349 349 if self.xmin is None:
350 350 xmin = self.min_time
351 351 else:
352 352 if self.xaxis is 'time':
353 353 dt = self.getDateTime(self.min_time)
354 354 xmin = (dt.replace(hour=int(self.xmin), minute=0, second=0) -
355 355 datetime.datetime(1970, 1, 1)).total_seconds()
356 356 if self.data.localtime:
357 357 xmin += time.timezone
358 358 else:
359 359 xmin = self.xmin
360 360
361 361 if self.xmax is None:
362 362 xmax = xmin + self.xrange * 60 * 60
363 363 else:
364 364 if self.xaxis is 'time':
365 365 dt = self.getDateTime(self.max_time)
366 366 xmax = (dt.replace(hour=int(self.xmax), minute=59, second=59) -
367 367 datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=1)).total_seconds()
368 368 if self.data.localtime:
369 369 xmax += time.timezone
370 370 else:
371 371 xmax = self.xmax
372 372
373 373 ymin = self.ymin if self.ymin else numpy.nanmin(self.y)
374 374 ymax = self.ymax if self.ymax else numpy.nanmax(self.y)
375 375
376 376 Y = numpy.array([5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000])
377 377 i = 1 if numpy.where(abs(ymax-ymin) <= Y)[0][0] < 0 else numpy.where(abs(ymax-ymin) <= Y)[0][0]
378 378 ystep = Y[i] / 5.
379 379
380 380 for n, ax in enumerate(self.axes):
381 381 if ax.firsttime:
382 382 ax.set_facecolor(self.bgcolor)
383 383 ax.yaxis.set_major_locator(MultipleLocator(ystep))
384 384 ax.xaxis.set_major_locator(MultipleLocator(ystep))
385 385 if self.xscale:
386 386 ax.xaxis.set_major_formatter(FuncFormatter(lambda x, pos: '{0:g}'.format(x*self.xscale)))
387 387 if self.xscale:
388 388 ax.yaxis.set_major_formatter(FuncFormatter(lambda x, pos: '{0:g}'.format(x*self.yscale)))
389 389 if self.xaxis is 'time':
390 390 ax.xaxis.set_major_formatter(FuncFormatter(self.__fmtTime))
391 391 ax.xaxis.set_major_locator(LinearLocator(9))
392 392 if self.xlabel is not None:
393 393 ax.set_xlabel(self.xlabel)
394 394 ax.set_ylabel(self.ylabel)
395 395 ax.firsttime = False
396 396 if self.showprofile:
397 397 self.pf_axes[n].set_ylim(ymin, ymax)
398 398 self.pf_axes[n].set_xlim(self.zmin, self.zmax)
399 399 self.pf_axes[n].set_xlabel('dB')
400 400 self.pf_axes[n].grid(b=True, axis='x')
401 401 [tick.set_visible(False)
402 402 for tick in self.pf_axes[n].get_yticklabels()]
403 403 if self.colorbar:
404 404 ax.cbar = plt.colorbar(
405 405 ax.plt, ax=ax, fraction=0.05, pad=0.02, aspect=10)
406 406 ax.cbar.ax.tick_params(labelsize=8)
407 407 ax.cbar.ax.press = None
408 408 if self.cb_label:
409 409 ax.cbar.set_label(self.cb_label, size=8)
410 410 elif self.cb_labels:
411 411 ax.cbar.set_label(self.cb_labels[n], size=8)
412 412 else:
413 413 ax.cbar = None
414 414
415 415 if not self.polar:
416 416 ax.set_xlim(xmin, xmax)
417 417 ax.set_ylim(ymin, ymax)
418 418 ax.set_title('{} - {} {}'.format(
419 419 self.titles[n],
420 420 self.getDateTime(self.max_time).strftime('%H:%M:%S'),
421 421 self.time_label),
422 422 size=8)
423 423 else:
424 424 ax.set_title('{}'.format(self.titles[n]), size=8)
425 425 ax.set_ylim(0, 90)
426 426 ax.set_yticks(numpy.arange(0, 90, 20))
427 427 ax.yaxis.labelpad = 40
428 428
429 429 def __plot(self):
430 430 '''
431 431 '''
432 432 log.log('Plotting', self.name)
433 433
434 434 try:
435 435 self.plot()
436 436 self.format()
437 437 except:
438 438 log.warning('{} Plot could not be updated... check data'.format(self.CODE), self.name)
439 439
440 440 for n, fig in enumerate(self.figures):
441 441 if self.nrows == 0 or self.nplots == 0:
442 442 log.warning('No data', self.name)
443 443 fig.text(0.5, 0.5, 'No Data', fontsize='large', ha='center')
444 444 fig.canvas.manager.set_window_title(self.CODE)
445 445 continue
446 446
447 447 fig.tight_layout()
448 448 fig.canvas.manager.set_window_title('{} - {}'.format(self.title,
449 449 self.getDateTime(self.max_time).strftime('%Y/%m/%d')))
450 450 fig.canvas.draw()
451 451
452 452 if self.save and (self.data.ended or not self.data.buffering):
453 453
454 454 if self.labels:
455 455 labels = self.labels
456 456 else:
457 457 labels = range(self.nrows)
458 458
459 459 if self.oneFigure:
460 460 label = ''
461 461 else:
462 462 label = '-{}'.format(labels[n])
463 463 figname = os.path.join(
464 464 self.save,
465 465 self.CODE,
466 466 '{}{}_{}.png'.format(
467 467 self.CODE,
468 468 label,
469 469 self.getDateTime(self.saveTime).strftime(
470 470 '%Y%m%d_%H%M%S'),
471 471 )
472 472 )
473 473 log.log('Saving figure: {}'.format(figname), self.name)
474 474 if not os.path.isdir(os.path.dirname(figname)):
475 475 os.makedirs(os.path.dirname(figname))
476 476 fig.savefig(figname)
477 477
478 478 def plot(self):
479 479 '''
480 480 '''
481 481 raise(NotImplementedError, 'Implement this method in child class')
482 482
483 483 def run(self):
484 484
485 485 log.log('Starting', self.name)
486 486
487 487 context = zmq.Context()
488 488 receiver = context.socket(zmq.SUB)
489 489 receiver.setsockopt(zmq.SUBSCRIBE, '')
490 490 receiver.setsockopt(zmq.CONFLATE, self.CONFLATE)
491 491
492 492 if 'server' in self.kwargs['parent']:
493 493 receiver.connect(
494 494 'ipc:///tmp/{}.plots'.format(self.kwargs['parent']['server']))
495 495 else:
496 496 receiver.connect("ipc:///tmp/zmq.plots")
497 497
498 498 while True:
499 499 try:
500 500 self.data = receiver.recv_pyobj(flags=zmq.NOBLOCK)
501 501 if self.data.localtime and self.localtime:
502 502 self.times = self.data.times
503 503 elif self.data.localtime and not self.localtime:
504 504 self.times = self.data.times + time.timezone
505 505 elif not self.data.localtime and self.localtime:
506 506 self.times = self.data.times - time.timezone
507 507 else:
508 508 self.times = self.data.times
509 509
510 510 self.min_time = self.times[0]
511 511 self.max_time = self.times[-1]
512 512
513 513 if self.isConfig is False:
514 514 self.__setup()
515 515 self.isConfig = True
516 516
517 517 self.__plot()
518 518
519 519 except zmq.Again as e:
520 log.log('.', tag='', nl=False)
520 # log.log('.', tag='', nl=False)
521 521 if self.data:
522 522 figpause(self.data.throttle)
523 523 else:
524 524 time.sleep(2)
525 525
526 526 def close(self):
527 527 if self.data:
528 528 self.__plot()
529 529
530 530
531 531 class PlotSpectraData(PlotData):
532 532 '''
533 533 Plot for Spectra data
534 534 '''
535 535
536 536 CODE = 'spc'
537 537 colormap = 'jro'
538 538
539 539 def setup(self):
540 540 self.nplots = len(self.data.channels)
541 541 self.ncols = int(numpy.sqrt(self.nplots) + 0.9)
542 542 self.nrows = int((1.0 * self.nplots / self.ncols) + 0.9)
543 543 self.width = 3.4 * self.ncols
544 544 self.height = 3 * self.nrows
545 545 self.cb_label = 'dB'
546 546 if self.showprofile:
547 547 self.width += 0.8 * self.ncols
548 548
549 549 self.ylabel = 'Range [km]'
550 550
551 551 def plot(self):
552 552 if self.xaxis == "frequency":
553 553 x = self.data.xrange[0]
554 554 self.xlabel = "Frequency (kHz)"
555 555 elif self.xaxis == "time":
556 556 x = self.data.xrange[1]
557 557 self.xlabel = "Time (ms)"
558 558 else:
559 559 x = self.data.xrange[2]
560 560 self.xlabel = "Velocity (m/s)"
561 561
562 562 if self.CODE == 'spc_mean':
563 563 x = self.data.xrange[2]
564 564 self.xlabel = "Velocity (m/s)"
565 565
566 566 self.titles = []
567 567
568 568 y = self.data.heights
569 569 self.y = y
570 570 z = self.data['spc']
571 571
572 572 for n, ax in enumerate(self.axes):
573 573 noise = self.data['noise'][n][-1]
574 574 if self.CODE == 'spc_mean':
575 575 mean = self.data['mean'][n][-1]
576 576 if ax.firsttime:
577 577 self.xmax = self.xmax if self.xmax else numpy.nanmax(x)
578 578 self.xmin = self.xmin if self.xmin else -self.xmax
579 579 self.zmin = self.zmin if self.zmin else numpy.nanmin(z)
580 580 self.zmax = self.zmax if self.zmax else numpy.nanmax(z)
581 581 ax.plt = ax.pcolormesh(x, y, z[n].T,
582 582 vmin=self.zmin,
583 583 vmax=self.zmax,
584 584 cmap=plt.get_cmap(self.colormap)
585 585 )
586 586
587 587 if self.showprofile:
588 588 ax.plt_profile = self.pf_axes[n].plot(
589 589 self.data['rti'][n][-1], y)[0]
590 590 ax.plt_noise = self.pf_axes[n].plot(numpy.repeat(noise, len(y)), y,
591 591 color="k", linestyle="dashed", lw=1)[0]
592 592 if self.CODE == 'spc_mean':
593 593 ax.plt_mean = ax.plot(mean, y, color='k')[0]
594 594 else:
595 595 ax.plt.set_array(z[n].T.ravel())
596 596 if self.showprofile:
597 597 ax.plt_profile.set_data(self.data['rti'][n][-1], y)
598 598 ax.plt_noise.set_data(numpy.repeat(noise, len(y)), y)
599 599 if self.CODE == 'spc_mean':
600 600 ax.plt_mean.set_data(mean, y)
601 601
602 602 self.titles.append('CH {}: {:3.2f}dB'.format(n, noise))
603 603 self.saveTime = self.max_time
604 604
605 605
606 606 class PlotCrossSpectraData(PlotData):
607 607
608 608 CODE = 'cspc'
609 609 zmin_coh = None
610 610 zmax_coh = None
611 611 zmin_phase = None
612 612 zmax_phase = None
613 613
614 614 def setup(self):
615 615
616 616 self.ncols = 4
617 617 self.nrows = len(self.data.pairs)
618 618 self.nplots = self.nrows * 4
619 619 self.width = 3.4 * self.ncols
620 620 self.height = 3 * self.nrows
621 621 self.ylabel = 'Range [km]'
622 622 self.showprofile = False
623 623
624 624 def plot(self):
625 625
626 626 if self.xaxis == "frequency":
627 627 x = self.data.xrange[0]
628 628 self.xlabel = "Frequency (kHz)"
629 629 elif self.xaxis == "time":
630 630 x = self.data.xrange[1]
631 631 self.xlabel = "Time (ms)"
632 632 else:
633 633 x = self.data.xrange[2]
634 634 self.xlabel = "Velocity (m/s)"
635 635
636 636 self.titles = []
637 637
638 638 y = self.data.heights
639 639 self.y = y
640 640 spc = self.data['spc']
641 641 cspc = self.data['cspc']
642 642
643 643 for n in range(self.nrows):
644 644 noise = self.data['noise'][n][-1]
645 645 pair = self.data.pairs[n]
646 646 ax = self.axes[4 * n]
647 647 ax3 = self.axes[4 * n + 3]
648 648 if ax.firsttime:
649 649 self.xmax = self.xmax if self.xmax else numpy.nanmax(x)
650 650 self.xmin = self.xmin if self.xmin else -self.xmax
651 651 self.zmin = self.zmin if self.zmin else numpy.nanmin(spc)
652 652 self.zmax = self.zmax if self.zmax else numpy.nanmax(spc)
653 653 ax.plt = ax.pcolormesh(x, y, spc[pair[0]].T,
654 654 vmin=self.zmin,
655 655 vmax=self.zmax,
656 656 cmap=plt.get_cmap(self.colormap)
657 657 )
658 658 else:
659 659 ax.plt.set_array(spc[pair[0]].T.ravel())
660 660 self.titles.append('CH {}: {:3.2f}dB'.format(n, noise))
661 661
662 662 ax = self.axes[4 * n + 1]
663 663 if ax.firsttime:
664 664 ax.plt = ax.pcolormesh(x, y, spc[pair[1]].T,
665 665 vmin=self.zmin,
666 666 vmax=self.zmax,
667 667 cmap=plt.get_cmap(self.colormap)
668 668 )
669 669 else:
670 670 ax.plt.set_array(spc[pair[1]].T.ravel())
671 671 self.titles.append('CH {}: {:3.2f}dB'.format(n, noise))
672 672
673 673 out = cspc[n] / numpy.sqrt(spc[pair[0]] * spc[pair[1]])
674 674 coh = numpy.abs(out)
675 675 phase = numpy.arctan2(out.imag, out.real) * 180 / numpy.pi
676 676
677 677 ax = self.axes[4 * n + 2]
678 678 if ax.firsttime:
679 679 ax.plt = ax.pcolormesh(x, y, coh.T,
680 680 vmin=0,
681 681 vmax=1,
682 682 cmap=plt.get_cmap(self.colormap_coh)
683 683 )
684 684 else:
685 685 ax.plt.set_array(coh.T.ravel())
686 686 self.titles.append(
687 687 'Coherence Ch{} * Ch{}'.format(pair[0], pair[1]))
688 688
689 689 ax = self.axes[4 * n + 3]
690 690 if ax.firsttime:
691 691 ax.plt = ax.pcolormesh(x, y, phase.T,
692 692 vmin=-180,
693 693 vmax=180,
694 694 cmap=plt.get_cmap(self.colormap_phase)
695 695 )
696 696 else:
697 697 ax.plt.set_array(phase.T.ravel())
698 698 self.titles.append('Phase CH{} * CH{}'.format(pair[0], pair[1]))
699 699
700 700 self.saveTime = self.max_time
701 701
702 702
703 703 class PlotSpectraMeanData(PlotSpectraData):
704 704 '''
705 705 Plot for Spectra and Mean
706 706 '''
707 707 CODE = 'spc_mean'
708 708 colormap = 'jro'
709 709
710 710
711 711 class PlotRTIData(PlotData):
712 712 '''
713 713 Plot for RTI data
714 714 '''
715 715
716 716 CODE = 'rti'
717 717 colormap = 'jro'
718 718
719 719 def setup(self):
720 720 self.xaxis = 'time'
721 721 self.ncols = 1
722 722 self.nrows = len(self.data.channels)
723 723 self.nplots = len(self.data.channels)
724 724 self.ylabel = 'Range [km]'
725 725 self.cb_label = 'dB'
726 726 self.titles = ['{} Channel {}'.format(
727 727 self.CODE.upper(), x) for x in range(self.nrows)]
728 728
729 729 def plot(self):
730 730 self.x = self.times
731 731 self.y = self.data.heights
732 732 self.z = self.data[self.CODE]
733 733 self.z = numpy.ma.masked_invalid(self.z)
734 734
735 735 if self.decimation is None:
736 736 x, y, z = self.fill_gaps(self.x, self.y, self.z)
737 737 else:
738 738 x, y, z = self.fill_gaps(*self.decimate())
739 739
740 740 for n, ax in enumerate(self.axes):
741 741 self.zmin = self.zmin if self.zmin else numpy.min(self.z)
742 742 self.zmax = self.zmax if self.zmax else numpy.max(self.z)
743 743 if ax.firsttime:
744 744 ax.plt = ax.pcolormesh(x, y, z[n].T,
745 745 vmin=self.zmin,
746 746 vmax=self.zmax,
747 747 cmap=plt.get_cmap(self.colormap)
748 748 )
749 749 if self.showprofile:
750 750 ax.plot_profile = self.pf_axes[n].plot(
751 751 self.data['rti'][n][-1], self.y)[0]
752 752 ax.plot_noise = self.pf_axes[n].plot(numpy.repeat(self.data['noise'][n][-1], len(self.y)), self.y,
753 753 color="k", linestyle="dashed", lw=1)[0]
754 754 else:
755 755 ax.collections.remove(ax.collections[0])
756 756 ax.plt = ax.pcolormesh(x, y, z[n].T,
757 757 vmin=self.zmin,
758 758 vmax=self.zmax,
759 759 cmap=plt.get_cmap(self.colormap)
760 760 )
761 761 if self.showprofile:
762 762 ax.plot_profile.set_data(self.data['rti'][n][-1], self.y)
763 763 ax.plot_noise.set_data(numpy.repeat(
764 764 self.data['noise'][n][-1], len(self.y)), self.y)
765 765
766 766 self.saveTime = self.min_time
767 767
768 768
769 769 class PlotCOHData(PlotRTIData):
770 770 '''
771 771 Plot for Coherence data
772 772 '''
773 773
774 774 CODE = 'coh'
775 775
776 776 def setup(self):
777 777 self.xaxis = 'time'
778 778 self.ncols = 1
779 779 self.nrows = len(self.data.pairs)
780 780 self.nplots = len(self.data.pairs)
781 781 self.ylabel = 'Range [km]'
782 782 if self.CODE == 'coh':
783 783 self.cb_label = ''
784 784 self.titles = [
785 785 'Coherence Map Ch{} * Ch{}'.format(x[0], x[1]) for x in self.data.pairs]
786 786 else:
787 787 self.cb_label = 'Degrees'
788 788 self.titles = [
789 789 'Phase Map Ch{} * Ch{}'.format(x[0], x[1]) for x in self.data.pairs]
790 790
791 791
792 792 class PlotPHASEData(PlotCOHData):
793 793 '''
794 794 Plot for Phase map data
795 795 '''
796 796
797 797 CODE = 'phase'
798 798 colormap = 'seismic'
799 799
800 800
801 801 class PlotNoiseData(PlotData):
802 802 '''
803 803 Plot for noise
804 804 '''
805 805
806 806 CODE = 'noise'
807 807
808 808 def setup(self):
809 809 self.xaxis = 'time'
810 810 self.ncols = 1
811 811 self.nrows = 1
812 812 self.nplots = 1
813 813 self.ylabel = 'Intensity [dB]'
814 814 self.titles = ['Noise']
815 815 self.colorbar = False
816 816
817 817 def plot(self):
818 818
819 819 x = self.times
820 820 xmin = self.min_time
821 821 xmax = xmin + self.xrange * 60 * 60
822 822 Y = self.data[self.CODE]
823 823
824 824 if self.axes[0].firsttime:
825 825 for ch in self.data.channels:
826 826 y = Y[ch]
827 827 self.axes[0].plot(x, y, lw=1, label='Ch{}'.format(ch))
828 828 plt.legend()
829 829 else:
830 830 for ch in self.data.channels:
831 831 y = Y[ch]
832 832 self.axes[0].lines[ch].set_data(x, y)
833 833
834 834 self.ymin = numpy.nanmin(Y) - 5
835 835 self.ymax = numpy.nanmax(Y) + 5
836 836 self.saveTime = self.min_time
837 837
838 838
839 839 class PlotSNRData(PlotRTIData):
840 840 '''
841 841 Plot for SNR Data
842 842 '''
843 843
844 844 CODE = 'snr'
845 845 colormap = 'jet'
846 846
847 847
848 848 class PlotDOPData(PlotRTIData):
849 849 '''
850 850 Plot for DOPPLER Data
851 851 '''
852 852
853 853 CODE = 'dop'
854 854 colormap = 'jet'
855 855
856 856
857 857 class PlotSkyMapData(PlotData):
858 858 '''
859 859 Plot for meteors detection data
860 860 '''
861 861
862 862 CODE = 'param'
863 863
864 864 def setup(self):
865 865
866 866 self.ncols = 1
867 867 self.nrows = 1
868 868 self.width = 7.2
869 869 self.height = 7.2
870 870 self.nplots = 1
871 871 self.xlabel = 'Zonal Zenith Angle (deg)'
872 872 self.ylabel = 'Meridional Zenith Angle (deg)'
873 873 self.polar = True
874 874 self.ymin = -180
875 875 self.ymax = 180
876 876 self.colorbar = False
877 877
878 878 def plot(self):
879 879
880 880 arrayParameters = numpy.concatenate(self.data['param'])
881 881 error = arrayParameters[:, -1]
882 882 indValid = numpy.where(error == 0)[0]
883 883 finalMeteor = arrayParameters[indValid, :]
884 884 finalAzimuth = finalMeteor[:, 3]
885 885 finalZenith = finalMeteor[:, 4]
886 886
887 887 x = finalAzimuth * numpy.pi / 180
888 888 y = finalZenith
889 889
890 890 ax = self.axes[0]
891 891
892 892 if ax.firsttime:
893 893 ax.plot = ax.plot(x, y, 'bo', markersize=5)[0]
894 894 else:
895 895 ax.plot.set_data(x, y)
896 896
897 897 dt1 = self.getDateTime(self.min_time).strftime('%y/%m/%d %H:%M:%S')
898 898 dt2 = self.getDateTime(self.max_time).strftime('%y/%m/%d %H:%M:%S')
899 899 title = 'Meteor Detection Sky Map\n %s - %s \n Number of events: %5.0f\n' % (dt1,
900 900 dt2,
901 901 len(x))
902 902 self.titles[0] = title
903 903 self.saveTime = self.max_time
904 904
905 905
906 906 class PlotParamData(PlotRTIData):
907 907 '''
908 908 Plot for data_param object
909 909 '''
910 910
911 911 CODE = 'param'
912 912 colormap = 'seismic'
913 913
914 914 def setup(self):
915 915 self.xaxis = 'time'
916 916 self.ncols = 1
917 917 self.nrows = self.data.shape(self.CODE)[0]
918 918 self.nplots = self.nrows
919 919 if self.showSNR:
920 920 self.nrows += 1
921 921 self.nplots += 1
922 922
923 923 self.ylabel = 'Height [km]'
924 924 if not self.titles:
925 925 self.titles = self.data.parameters \
926 926 if self.data.parameters else ['Param {}'.format(x) for x in xrange(self.nrows)]
927 927 if self.showSNR:
928 928 self.titles.append('SNR')
929 929
930 930 def plot(self):
931 931 self.data.normalize_heights()
932 932 self.x = self.times
933 933 self.y = self.data.heights
934 934 if self.showSNR:
935 935 self.z = numpy.concatenate(
936 936 (self.data[self.CODE], self.data['snr'])
937 937 )
938 938 else:
939 939 self.z = self.data[self.CODE]
940 940
941 941 self.z = numpy.ma.masked_invalid(self.z)
942 942
943 943 if self.decimation is None:
944 944 x, y, z = self.fill_gaps(self.x, self.y, self.z)
945 945 else:
946 946 x, y, z = self.fill_gaps(*self.decimate())
947 947
948 948 for n, ax in enumerate(self.axes):
949 949
950 950 self.zmax = self.zmax if self.zmax is not None else numpy.max(
951 951 self.z[n])
952 952 self.zmin = self.zmin if self.zmin is not None else numpy.min(
953 953 self.z[n])
954 954
955 955 if ax.firsttime:
956 956 if self.zlimits is not None:
957 957 self.zmin, self.zmax = self.zlimits[n]
958 958
959 959 ax.plt = ax.pcolormesh(x, y, z[n].T * self.factors[n],
960 960 vmin=self.zmin,
961 961 vmax=self.zmax,
962 962 cmap=self.cmaps[n]
963 963 )
964 964 else:
965 965 if self.zlimits is not None:
966 966 self.zmin, self.zmax = self.zlimits[n]
967 967 ax.collections.remove(ax.collections[0])
968 968 ax.plt = ax.pcolormesh(x, y, z[n].T * self.factors[n],
969 969 vmin=self.zmin,
970 970 vmax=self.zmax,
971 971 cmap=self.cmaps[n]
972 972 )
973 973
974 974 self.saveTime = self.min_time
975 975
976 976
977 977 class PlotOutputData(PlotParamData):
978 978 '''
979 979 Plot data_output object
980 980 '''
981 981
982 982 CODE = 'output'
983 983 colormap = 'seismic'
984 984
985 985
986 986 class PlotPolarMapData(PlotData):
987 987 '''
988 988 Plot for meteors detection data
989 989 '''
990 990
991 991 CODE = 'param'
992 992 colormap = 'seismic'
993 993
994 994 def setup(self):
995 995 self.ncols = 1
996 996 self.nrows = 1
997 997 self.width = 9
998 998 self.height = 8
999 999 if self.channels is not None:
1000 1000 self.nplots = len(self.channels)
1001 1001 self.nrows = len(self.channels)
1002 1002 else:
1003 1003 self.nplots = self.data.shape(self.CODE)[0]
1004 1004 self.nrows = self.nplots
1005 1005 self.channels = range(self.nplots)
1006 1006 self.xlabel = 'Zonal Distance (km)'
1007 1007 self.ylabel = 'Meridional Distance (km)'
1008 1008 self.bgcolor = 'white'
1009 1009
1010 1010 def plot(self):
1011 1011
1012 1012 for n, ax in enumerate(self.axes):
1013 1013 data = self.data['param'][self.channels[n]]
1014 1014
1015 1015 zeniths = numpy.arange(data.shape[1])
1016 1016 azimuths = -numpy.radians(self.data.heights)+numpy.pi/2
1017 1017 self.y = zeniths
1018 1018
1019 1019 r, theta = numpy.meshgrid(zeniths, azimuths)
1020 1020 x, y = r*numpy.cos(theta), r*numpy.sin(theta)
1021 1021
1022 1022 if ax.firsttime:
1023 1023 if self.zlimits is not None:
1024 1024 self.zmin, self.zmax = self.zlimits[n]
1025 1025 ax.plt = ax.pcolormesh(x, y, numpy.ma.array(data, mask=numpy.isnan(data)),
1026 1026 vmin=self.zmin,
1027 1027 vmax=self.zmax,
1028 1028 cmap=self.cmaps[n])
1029 1029 else:
1030 1030 if self.zlimits is not None:
1031 1031 self.zmin, self.zmax = self.zlimits[n]
1032 1032 ax.collections.remove(ax.collections[0])
1033 1033 ax.plt = ax.pcolormesh(x, y, numpy.ma.array(data, mask=numpy.isnan(data)),
1034 1034 vmin=self.zmin,
1035 1035 vmax=self.zmax,
1036 1036 cmap=self.cmaps[n])
1037 1037
1038 1038
1039 1039 title = ''
1040 1040
1041 1041 self.titles = [self.data.parameters[x] for x in self.channels]
1042 1042 self.saveTime = self.max_time
@@ -1,351 +1,354
1 1 '''
2 2 Created on Dec 27, 2017
3 3
4 4 @author: Juan C. Espinoza
5 5 '''
6 6
7 7 import os
8 8 import sys
9 9 import time
10 10 import json
11 11 import glob
12 12 import datetime
13 13 import tarfile
14 14
15 15 import numpy
16 16 from netCDF4 import Dataset
17 17
18 18 from schainpy.model.io.jroIO_base import JRODataReader
19 19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
20 20 from schainpy.model.data.jrodata import Parameters
21 21 from schainpy.utils import log
22 22
23 23 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
24 24
25 25 def load_json(obj):
26 26 '''
27 27 Parse json as string instead of unicode
28 28 '''
29 29
30 30 if isinstance(obj, str):
31 31 iterable = json.loads(obj)
32 32 else:
33 33 iterable = obj
34 34
35 35 if isinstance(iterable, dict):
36 36 return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, unicode) else v
37 37 for k, v in iterable.items()}
38 38 elif isinstance(iterable, (list, tuple)):
39 39 return [str(v) if isinstance(v, unicode) else v for v in iterable]
40 40
41 41 return iterable
42 42
43 43
44 44 class NCDFReader(JRODataReader, ProcessingUnit):
45 45
46 46 def __init__(self, **kwargs):
47 47
48 48 ProcessingUnit.__init__(self, **kwargs)
49 49
50 50 self.dataOut = Parameters()
51 51 self.counter_records = 0
52 52 self.nrecords = None
53 53 self.flagNoMoreFiles = 0
54 54 self.isConfig = False
55 55 self.filename = None
56 56 self.intervals = set()
57 57 self.ext = ('.nc', '.tgz')
58 58 self.online_mode = False
59 59
60 60 def setup(self,
61 61 path=None,
62 62 startDate=None,
63 63 endDate=None,
64 64 format=None,
65 65 startTime=datetime.time(0, 0, 0),
66 66 endTime=datetime.time(23, 59, 59),
67 67 walk=False,
68 68 **kwargs):
69 69
70 70 self.path = path
71 71 self.startDate = startDate
72 72 self.endDate = endDate
73 73 self.startTime = startTime
74 74 self.endTime = endTime
75 75 self.datatime = datetime.datetime(1900,1,1)
76 76 self.walk = walk
77 77 self.nTries = kwargs.get('nTries', 3)
78 78 self.online = kwargs.get('online', False)
79 79 self.delay = kwargs.get('delay', 30)
80 self.ele = kwargs.get('ext', '')
80 81
81 82 if self.path is None:
82 83 raise ValueError, 'The path is not valid'
83 84
84 85 self.search_files(path, startDate, endDate, startTime, endTime, walk)
85 86 self.cursor = 0
86 87 self.counter_records = 0
87 88
88 89 if not self.files:
89 90 raise Warning, 'There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path)
90 91
91 92 def search_files(self, path, startDate, endDate, startTime, endTime, walk):
92 93 '''
93 94 Searching for NCDF files in path
94 95 Creating a list of files to procces included in [startDate,endDate]
95 96
96 97 Input:
97 98 path - Path to find files
98 99 '''
99 100
100 101 log.log('Searching files {} in {} '.format(self.ext, path), 'NCDFReader')
101 102 if walk:
102 103 paths = [os.path.join(path, p) for p in os.listdir(path) if os.path.isdir(os.path.join(path, p))]
103 104 paths.sort()
104 105 else:
105 106 paths = [path]
106 107
107 108 fileList0 = []
108 109
109 110 for subpath in paths:
110 fileList0 += [os.path.join(subpath, s) for s in glob.glob1(subpath, '*') if os.path.splitext(s)[-1] in self.ext]
111 fileList0 += [os.path.join(subpath, s) for s in glob.glob1(subpath, '*') if os.path.splitext(s)[-1] in self.ext and 'E{}'.format(self.ele) in s]
111 112
112 113 fileList0.sort()
114 if self.online:
115 fileList0 = fileList0[-1:]
113 116
114 117 self.files = {}
115 118
116 119 startDate = startDate - datetime.timedelta(1)
117 120 endDate = endDate + datetime.timedelta(1)
118 121
119 122 for fullname in fileList0:
120 123 thisFile = fullname.split('/')[-1]
121 124 year = thisFile[3:7]
122 125 if not year.isdigit():
123 126 continue
124 127
125 128 month = thisFile[7:9]
126 129 if not month.isdigit():
127 130 continue
128 131
129 132 day = thisFile[9:11]
130 133 if not day.isdigit():
131 134 continue
132 135
133 136 year, month, day = int(year), int(month), int(day)
134 137 dateFile = datetime.date(year, month, day)
135 138 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
136 139
137 140 if (startDate > dateFile) or (endDate < dateFile):
138 141 continue
139 142
140 143 dt = datetime.datetime.combine(dateFile, timeFile)
141 144 if dt not in self.files:
142 145 self.files[dt] = []
143 146 self.files[dt].append(fullname)
144 147
145 148 self.dates = self.files.keys()
146 149 self.dates.sort()
147 150
148 151 return
149 152
150 153 def search_files_online(self):
151 154 '''
152 155 Searching for NCDF files in online mode path
153 156 Creating a list of files to procces included in [startDate,endDate]
154 157
155 158 Input:
156 159 path - Path to find files
157 160 '''
158 161
159 old_files = self.files[self.dt]
160 162 self.files = {}
161 163
162 164 for n in range(self.nTries):
163 165
164 166 if self.walk:
165 167 paths = [os.path.join(self.path, p) for p in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, p))]
166 paths = paths[-2:]
168 path = paths[-1]
167 169 else:
168 paths = [self.path]
170 paths = self.path
169 171
170 new_files = []
171
172 for path in paths:
173 new_files += [os.path.join(path, s) for s in glob.glob1(path, '*') if os.path.splitext(s)[-1] in self.ext and os.path.join(path, s not in old_files)]
172 new_files = [os.path.join(path, s) for s in glob.glob1(path, '*') if os.path.splitext(s)[-1] in self.ext and 'E{}'.format(self.ele) in s]
174 173
175 174 new_files.sort()
176
177 if new_files:
175
176 for fullname in new_files:
177 thisFile = fullname.split('/')[-1]
178 year = thisFile[3:7]
179 if not year.isdigit():
180 continue
181
182 month = thisFile[7:9]
183 if not month.isdigit():
184 continue
185
186 day = thisFile[9:11]
187 if not day.isdigit():
188 continue
189
190 year, month, day = int(year), int(month), int(day)
191 dateFile = datetime.date(year, month, day)
192 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
193
194 dt = datetime.datetime.combine(dateFile, timeFile)
195
196 if self.dt >= dt:
197 continue
198
199 if dt not in self.files:
200 self.dt = dt
201 self.files[dt] = []
202
203 self.files[dt].append(fullname)
204 break
205
206 if self.files:
178 207 break
179 208 else:
180 209 log.warning('Waiting {} seconds for the next file, try {} ...'.format(self.delay, n + 1), 'NCDFReader')
181 210 time.sleep(self.delay)
182 211
183 if not new_files:
184 log.error('No more files found', 'NCDFReader')
212 if not self.files:
185 213 return 0
186 214
187 startDate = self.dt - datetime.timedelta(seconds=1)
188
189 for fullname in new_files:
190 thisFile = fullname.split('/')[-1]
191 year = thisFile[3:7]
192 if not year.isdigit():
193 continue
194
195 month = thisFile[7:9]
196 if not month.isdigit():
197 continue
198
199 day = thisFile[9:11]
200 if not day.isdigit():
201 continue
202
203 year, month, day = int(year), int(month), int(day)
204 dateFile = datetime.date(year, month, day)
205 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
206
207 if (startDate > dateFile):
208 continue
209
210 dt = datetime.datetime.combine(dateFile, timeFile)
211 if dt not in self.files:
212 self.files[dt] = []
213
214 self.files[dt].append(fullname)
215
216 215 self.dates = self.files.keys()
217 216 self.dates.sort()
218 217 self.cursor = 0
219 218
220 219 return 1
221 220
222 221 def parseFile(self):
223 222 '''
224 223 '''
225 224
226 225 self.header = {}
227 226
228 227 for attr in self.fp.ncattrs():
229 228 self.header[str(attr)] = getattr(self.fp, attr)
230 229
231 230 self.data[self.header['TypeName']] = numpy.array(self.fp.variables[self.header['TypeName']])
232 231
233 232 if 'Azimuth' not in self.data:
234 233 self.data['Azimuth'] = numpy.array(self.fp.variables['Azimuth'])
235 234
236 235
237 236 def setNextFile(self):
238 237 '''
239 238 Open next files for the current datetime
240 239 '''
241 240
242 241 cursor = self.cursor
243
244 242 if not self.online_mode:
245 self.dt = self.dates[cursor]
246 243 if cursor == len(self.dates):
247 244 if self.online:
245 cursor = 0
246 self.dt = self.dates[cursor]
248 247 self.online_mode = True
248 if not self.search_files_online():
249 log.success('No more files', 'NCDFReader')
250 return 0
249 251 else:
250 252 log.success('No more files', 'NCDFReader')
251 253 self.flagNoMoreFiles = 1
252 254 return 0
253 255 else:
254 256 if not self.search_files_online():
255 257 return 0
258 cursor = self.cursor
256 259
257 260 log.log(
258 261 'Opening: {}\'s files'.format(self.dates[cursor]),
259 262 'NCDFReader'
260 263 )
261 264
262 265 self.data = {}
263 266
264 267 for fullname in self.files[self.dates[cursor]]:
265 268
266 269 if os.path.splitext(fullname)[-1] == '.tgz':
267 270 tar = tarfile.open(fullname, 'r:gz')
268 271 tar.extractall('/tmp')
269 272 files = [os.path.join('/tmp', member.name) for member in tar.getmembers()]
270 273 else:
271 274 files = [fullname]
272 275
273 276 for filename in files:
274 277 if self.filename is not None:
275 278 self.fp.close()
276 279
277 280 self.filename = filename
278 281 self.filedate = self.dates[cursor]
279 282 self.fp = Dataset(self.filename, 'r')
280 283 self.parseFile()
281 284
282 285 self.counter_records += 1
283 286 self.cursor += 1
284 287 return 1
285 288
286 289 def readNextFile(self):
287 290
288 291 while True:
289 292 self.flagDiscontinuousBlock = 0
290 293 if not self.setNextFile():
291 294 return 0
292 295
293 296 self.datatime = datetime.datetime.utcfromtimestamp(self.header['Time'])
294 297
295 298 if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
296 299 (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
297 300 log.warning(
298 301 'Reading Record No. {}/{} -> {} [Skipping]'.format(
299 302 self.counter_records,
300 303 self.nrecords,
301 304 self.datatime.ctime()),
302 305 'NCDFReader')
303 306 continue
304 307 break
305 308
306 309 log.log(
307 310 'Reading Record No. {}/{} -> {}'.format(
308 311 self.counter_records,
309 312 self.nrecords,
310 313 self.datatime.ctime()),
311 314 'NCDFReader')
312 315
313 316 return 1
314 317
315 318
316 319 def set_output(self):
317 320 '''
318 321 Storing data from buffer to dataOut object
319 322 '''
320 323
321 324 self.dataOut.heightList = self.data.pop('Azimuth')
322 325
323 326 log.log('Parameters found: {}'.format(','.join(self.data.keys())),
324 327 'PXReader')
325 328
326 329 self.dataOut.data_param = numpy.array(self.data.values())
327 330 self.dataOut.data_param[self.dataOut.data_param == -99900.] = numpy.nan
328 331 self.dataOut.parameters = self.data.keys()
329 332 self.dataOut.utctime = self.header['Time']
330 333 self.dataOut.utctimeInit = self.dataOut.utctime
331 334 self.dataOut.useLocalTime = False
332 335 self.dataOut.flagNoData = False
333 336 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
334 337
335 338 def getData(self):
336 339 '''
337 340 Storing data from databuffer to dataOut object
338 341 '''
339 342 if self.flagNoMoreFiles:
340 343 self.dataOut.flagNoData = True
341 344 log.error('No file left to process', 'NCDFReader')
342 345 return 0
343 346
344 347 if not self.readNextFile():
345 348 self.dataOut.flagNoData = True
346 349 return 0
347 350
348 351 self.set_output()
349 352
350 353 return 1
351 354
@@ -1,829 +1,829
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import os
6 6 import glob
7 7 import time
8 8 import json
9 9 import numpy
10 10 import paho.mqtt.client as mqtt
11 11 import zmq
12 12 import datetime
13 13 import ftplib
14 14 from zmq.utils.monitor import recv_monitor_message
15 15 from functools import wraps
16 16 from threading import Thread
17 17 from multiprocessing import Process
18 18
19 19 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
20 20 from schainpy.model.data.jrodata import JROData
21 21 from schainpy.utils import log
22 22
23 23 MAXNUMX = 500
24 24 MAXNUMY = 500
25 25
26 26 PLOT_CODES = {
27 27 'rti': 0, #Range time intensity (RTI).
28 28 'spc': 1, #Spectra (and Cross-spectra) information.
29 29 'cspc': 2, #Cross-Correlation information.
30 30 'coh': 3, #Coherence map.
31 31 'base': 4, #Base lines graphic.
32 32 'row': 5, #Row Spectra.
33 33 'total' : 6, #Total Power.
34 34 'drift' : 7, #Drifts graphics.
35 35 'height' : 8, #Height profile.
36 36 'phase' : 9, #Signal Phase.
37 37 'power' : 16,
38 38 'noise' : 17,
39 39 'beacon' : 18,
40 40 #USED IN jroplot_parameters.py
41 41 'wind' : 22,
42 42 'skymap' : 23,
43 43 # 'MPHASE_CODE' : 24,
44 'moments' : 25,
45 'param' : 26,
44 'V' : 25,
45 'Z' : 26,
46 46 'spc_fit' : 27,
47 47 'ew_drifts' : 28,
48 48 'reflectivity': 30
49 49 }
50 50
51 51 class PrettyFloat(float):
52 52 def __repr__(self):
53 53 return '%.2f' % self
54 54
55 55 def roundFloats(obj):
56 56 if isinstance(obj, list):
57 57 return map(roundFloats, obj)
58 58 elif isinstance(obj, float):
59 59 return round(obj, 2)
60 60
61 61 def decimate(z, MAXNUMY):
62 62 dy = int(len(z[0])/MAXNUMY) + 1
63 63
64 64 return z[::, ::dy]
65 65
66 66 class throttle(object):
67 67 '''
68 68 Decorator that prevents a function from being called more than once every
69 69 time period.
70 70 To create a function that cannot be called more than once a minute, but
71 71 will sleep until it can be called:
72 72 @throttle(minutes=1)
73 73 def foo():
74 74 pass
75 75
76 76 for i in range(10):
77 77 foo()
78 78 print "This function has run %s times." % i
79 79 '''
80 80
81 81 def __init__(self, seconds=0, minutes=0, hours=0):
82 82 self.throttle_period = datetime.timedelta(
83 83 seconds=seconds, minutes=minutes, hours=hours
84 84 )
85 85
86 86 self.time_of_last_call = datetime.datetime.min
87 87
88 88 def __call__(self, fn):
89 89 @wraps(fn)
90 90 def wrapper(*args, **kwargs):
91 91 coerce = kwargs.pop('coerce', None)
92 92 if coerce:
93 93 self.time_of_last_call = datetime.datetime.now()
94 94 return fn(*args, **kwargs)
95 95 else:
96 96 now = datetime.datetime.now()
97 97 time_since_last_call = now - self.time_of_last_call
98 98 time_left = self.throttle_period - time_since_last_call
99 99
100 100 if time_left > datetime.timedelta(seconds=0):
101 101 return
102 102
103 103 self.time_of_last_call = datetime.datetime.now()
104 104 return fn(*args, **kwargs)
105 105
106 106 return wrapper
107 107
108 108 class Data(object):
109 109 '''
110 110 Object to hold data to be plotted
111 111 '''
112 112
113 113 def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
114 114 self.plottypes = plottypes
115 115 self.throttle = throttle_value
116 116 self.exp_code = exp_code
117 117 self.buffering = buffering
118 118 self.ended = False
119 119 self.localtime = False
120 120 self.__times = []
121 121 self.__heights = []
122 122
123 123 def __str__(self):
124 124 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
125 125 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
126 126
127 127 def __len__(self):
128 128 return len(self.__times)
129 129
130 130 def __getitem__(self, key):
131 131 if key not in self.data:
132 132 raise KeyError(log.error('Missing key: {}'.format(key)))
133 133
134 134 if 'spc' in key or not self.buffering:
135 135 ret = self.data[key]
136 136 else:
137 137 ret = numpy.array([self.data[key][x] for x in self.times])
138 138 if ret.ndim > 1:
139 139 ret = numpy.swapaxes(ret, 0, 1)
140 140 return ret
141 141
142 142 def __contains__(self, key):
143 143 return key in self.data
144 144
145 145 def setup(self):
146 146 '''
147 147 Configure object
148 148 '''
149 149
150 150 self.type = ''
151 151 self.ended = False
152 152 self.data = {}
153 153 self.__times = []
154 154 self.__heights = []
155 155 self.__all_heights = set()
156 156 for plot in self.plottypes:
157 157 if 'snr' in plot:
158 158 plot = 'snr'
159 159 self.data[plot] = {}
160 160
161 161 def shape(self, key):
162 162 '''
163 163 Get the shape of the one-element data for the given key
164 164 '''
165 165
166 166 if len(self.data[key]):
167 167 if 'spc' in key or not self.buffering:
168 168 return self.data[key].shape
169 169 return self.data[key][self.__times[0]].shape
170 170 return (0,)
171 171
172 172 def update(self, dataOut, tm):
173 173 '''
174 174 Update data object with new dataOut
175 175 '''
176 176
177 177 if tm in self.__times:
178 178 return
179 179
180 180 self.type = dataOut.type
181 181 self.parameters = getattr(dataOut, 'parameters', [])
182 182 if hasattr(dataOut, 'pairsList'):
183 183 self.pairs = dataOut.pairsList
184 184 self.channels = dataOut.channelList
185 185 self.interval = dataOut.getTimeInterval()
186 186 self.localtime = dataOut.useLocalTime
187 187 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
188 188 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
189 189 self.__heights.append(dataOut.heightList)
190 190 self.__all_heights.update(dataOut.heightList)
191 191 self.__times.append(tm)
192 192
193 193 for plot in self.plottypes:
194 194 if plot == 'spc':
195 195 z = dataOut.data_spc/dataOut.normFactor
196 196 self.data[plot] = 10*numpy.log10(z)
197 197 if plot == 'cspc':
198 198 self.data[plot] = dataOut.data_cspc
199 199 if plot == 'noise':
200 200 buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
201 201 if plot == 'rti':
202 202 buffer = dataOut.getPower()
203 203 if plot == 'snr_db':
204 204 self.data['snr'][tm] = dataOut.data_SNR
205 205 if plot == 'snr':
206 206 buffer = 10*numpy.log10(dataOut.data_SNR)
207 207 if plot == 'dop':
208 208 buffer = 10*numpy.log10(dataOut.data_DOP)
209 209 if plot == 'mean':
210 210 buffer = dataOut.data_MEAN
211 211 if plot == 'std':
212 212 buffer = dataOut.data_STD
213 213 if plot == 'coh':
214 214 buffer = dataOut.getCoherence()
215 215 if plot == 'phase':
216 216 buffer = dataOut.getCoherence(phase=True)
217 217 if plot == 'output':
218 218 buffer = dataOut.data_output
219 219 if plot == 'param':
220 220 buffer = dataOut.data_param
221 221
222 222 if self.buffering:
223 223 self.data[plot][tm] = buffer
224 224 else:
225 225 self.data[plot] = buffer
226 226
227 227 def normalize_heights(self):
228 228 '''
229 229 Ensure same-dimension of the data for different heighList
230 230 '''
231 231
232 232 H = numpy.array(list(self.__all_heights))
233 233 H.sort()
234 234 for key in self.data:
235 235 shape = self.shape(key)[:-1] + H.shape
236 236 for tm, obj in self.data[key].items():
237 237 h = self.__heights[self.__times.index(tm)]
238 238 if H.size == h.size:
239 239 continue
240 240 index = numpy.where(numpy.in1d(H, h))[0]
241 241 dummy = numpy.zeros(shape) + numpy.nan
242 242 if len(shape) == 2:
243 243 dummy[:, index] = obj
244 244 else:
245 245 dummy[index] = obj
246 246 self.data[key][tm] = dummy
247 247
248 248 self.__heights = [H for tm in self.__times]
249 249
250 250 def jsonify(self, decimate=False):
251 251 '''
252 252 Convert data to json
253 253 '''
254 254
255 255 data = {}
256 256 tm = self.times[-1]
257 257
258 258 for key in self.data:
259 259 if key in ('spc', 'cspc') or not self.buffering:
260 260 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
261 261 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
262 262 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
263 263 else:
264 264 data[key] = roundFloats(self.data[key][tm].tolist())
265 265
266 266 ret = {'data': data}
267 267 ret['exp_code'] = self.exp_code
268 268 ret['time'] = tm
269 269 ret['interval'] = self.interval
270 270 ret['localtime'] = self.localtime
271 271 ret['yrange'] = roundFloats(self.heights.tolist())
272 272 if key in ('spc', 'cspc'):
273 273 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
274 274 else:
275 275 ret['xrange'] = []
276 276 if hasattr(self, 'pairs'):
277 277 ret['pairs'] = self.pairs
278 278 return json.dumps(ret)
279 279
280 280 @property
281 281 def times(self):
282 282 '''
283 283 Return the list of times of the current data
284 284 '''
285 285
286 286 ret = numpy.array(self.__times)
287 287 ret.sort()
288 288 return ret
289 289
290 290 @property
291 291 def heights(self):
292 292 '''
293 293 Return the list of heights of the current data
294 294 '''
295 295
296 296 return numpy.array(self.__heights[-1])
297 297
298 298 class PublishData(Operation):
299 299 '''
300 300 Operation to send data over zmq.
301 301 '''
302 302
303 303 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
304 304
305 305 def __init__(self, **kwargs):
306 306 """Inicio."""
307 307 Operation.__init__(self, **kwargs)
308 308 self.isConfig = False
309 309 self.client = None
310 310 self.zeromq = None
311 311 self.mqtt = None
312 312
313 313 def on_disconnect(self, client, userdata, rc):
314 314 if rc != 0:
315 315 log.warning('Unexpected disconnection.')
316 316 self.connect()
317 317
318 318 def connect(self):
319 319 log.warning('trying to connect')
320 320 try:
321 321 self.client.connect(
322 322 host=self.host,
323 323 port=self.port,
324 324 keepalive=60*10,
325 325 bind_address='')
326 326 self.client.loop_start()
327 327 # self.client.publish(
328 328 # self.topic + 'SETUP',
329 329 # json.dumps(setup),
330 330 # retain=True
331 331 # )
332 332 except:
333 333 log.error('MQTT Conection error.')
334 334 self.client = False
335 335
336 336 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
337 337 self.counter = 0
338 338 self.topic = kwargs.get('topic', 'schain')
339 339 self.delay = kwargs.get('delay', 0)
340 340 self.plottype = kwargs.get('plottype', 'spectra')
341 341 self.host = kwargs.get('host', "10.10.10.82")
342 342 self.port = kwargs.get('port', 3000)
343 343 self.clientId = clientId
344 344 self.cnt = 0
345 345 self.zeromq = zeromq
346 346 self.mqtt = kwargs.get('plottype', 0)
347 347 self.client = None
348 348 self.verbose = verbose
349 349 setup = []
350 350 if mqtt is 1:
351 351 self.client = mqtt.Client(
352 352 client_id=self.clientId + self.topic + 'SCHAIN',
353 353 clean_session=True)
354 354 self.client.on_disconnect = self.on_disconnect
355 355 self.connect()
356 356 for plot in self.plottype:
357 357 setup.append({
358 358 'plot': plot,
359 359 'topic': self.topic + plot,
360 360 'title': getattr(self, plot + '_' + 'title', False),
361 361 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
362 362 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
363 363 'xrange': getattr(self, plot + '_' + 'xrange', False),
364 364 'yrange': getattr(self, plot + '_' + 'yrange', False),
365 365 'zrange': getattr(self, plot + '_' + 'zrange', False),
366 366 })
367 367 if zeromq is 1:
368 368 context = zmq.Context()
369 369 self.zmq_socket = context.socket(zmq.PUSH)
370 370 server = kwargs.get('server', 'zmq.pipe')
371 371
372 372 if 'tcp://' in server:
373 373 address = server
374 374 else:
375 375 address = 'ipc:///tmp/%s' % server
376 376
377 377 self.zmq_socket.connect(address)
378 378 time.sleep(1)
379 379
380 380
381 381 def publish_data(self):
382 382 self.dataOut.finished = False
383 383 if self.mqtt is 1:
384 384 yData = self.dataOut.heightList[:2].tolist()
385 385 if self.plottype == 'spectra':
386 386 data = getattr(self.dataOut, 'data_spc')
387 387 z = data/self.dataOut.normFactor
388 388 zdB = 10*numpy.log10(z)
389 389 xlen, ylen = zdB[0].shape
390 390 dx = int(xlen/MAXNUMX) + 1
391 391 dy = int(ylen/MAXNUMY) + 1
392 392 Z = [0 for i in self.dataOut.channelList]
393 393 for i in self.dataOut.channelList:
394 394 Z[i] = zdB[i][::dx, ::dy].tolist()
395 395 payload = {
396 396 'timestamp': self.dataOut.utctime,
397 397 'data': roundFloats(Z),
398 398 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
399 399 'interval': self.dataOut.getTimeInterval(),
400 400 'type': self.plottype,
401 401 'yData': yData
402 402 }
403 403
404 404 elif self.plottype in ('rti', 'power'):
405 405 data = getattr(self.dataOut, 'data_spc')
406 406 z = data/self.dataOut.normFactor
407 407 avg = numpy.average(z, axis=1)
408 408 avgdB = 10*numpy.log10(avg)
409 409 xlen, ylen = z[0].shape
410 410 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
411 411 AVG = [0 for i in self.dataOut.channelList]
412 412 for i in self.dataOut.channelList:
413 413 AVG[i] = avgdB[i][::dy].tolist()
414 414 payload = {
415 415 'timestamp': self.dataOut.utctime,
416 416 'data': roundFloats(AVG),
417 417 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
418 418 'interval': self.dataOut.getTimeInterval(),
419 419 'type': self.plottype,
420 420 'yData': yData
421 421 }
422 422 elif self.plottype == 'noise':
423 423 noise = self.dataOut.getNoise()/self.dataOut.normFactor
424 424 noisedB = 10*numpy.log10(noise)
425 425 payload = {
426 426 'timestamp': self.dataOut.utctime,
427 427 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
428 428 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
429 429 'interval': self.dataOut.getTimeInterval(),
430 430 'type': self.plottype,
431 431 'yData': yData
432 432 }
433 433 elif self.plottype == 'snr':
434 434 data = getattr(self.dataOut, 'data_SNR')
435 435 avgdB = 10*numpy.log10(data)
436 436
437 437 ylen = data[0].size
438 438 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
439 439 AVG = [0 for i in self.dataOut.channelList]
440 440 for i in self.dataOut.channelList:
441 441 AVG[i] = avgdB[i][::dy].tolist()
442 442 payload = {
443 443 'timestamp': self.dataOut.utctime,
444 444 'data': roundFloats(AVG),
445 445 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
446 446 'type': self.plottype,
447 447 'yData': yData
448 448 }
449 449 else:
450 450 print "Tipo de grafico invalido"
451 451 payload = {
452 452 'data': 'None',
453 453 'timestamp': 'None',
454 454 'type': None
455 455 }
456 456
457 457 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
458 458
459 459 if self.zeromq is 1:
460 460 if self.verbose:
461 461 log.log(
462 462 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
463 463 self.name
464 464 )
465 465 self.zmq_socket.send_pyobj(self.dataOut)
466 466
467 467 def run(self, dataOut, **kwargs):
468 468 self.dataOut = dataOut
469 469 if not self.isConfig:
470 470 self.setup(**kwargs)
471 471 self.isConfig = True
472 472
473 473 self.publish_data()
474 474 time.sleep(self.delay)
475 475
476 476 def close(self):
477 477 if self.zeromq is 1:
478 478 self.dataOut.finished = True
479 479 self.zmq_socket.send_pyobj(self.dataOut)
480 480 time.sleep(0.1)
481 481 self.zmq_socket.close()
482 482 if self.client:
483 483 self.client.loop_stop()
484 484 self.client.disconnect()
485 485
486 486
487 487 class ReceiverData(ProcessingUnit):
488 488
489 489 __attrs__ = ['server']
490 490
491 491 def __init__(self, **kwargs):
492 492
493 493 ProcessingUnit.__init__(self, **kwargs)
494 494
495 495 self.isConfig = False
496 496 server = kwargs.get('server', 'zmq.pipe')
497 497 if 'tcp://' in server:
498 498 address = server
499 499 else:
500 500 address = 'ipc:///tmp/%s' % server
501 501
502 502 self.address = address
503 503 self.dataOut = JROData()
504 504
505 505 def setup(self):
506 506
507 507 self.context = zmq.Context()
508 508 self.receiver = self.context.socket(zmq.PULL)
509 509 self.receiver.bind(self.address)
510 510 time.sleep(0.5)
511 511 log.success('ReceiverData from {}'.format(self.address))
512 512
513 513
514 514 def run(self):
515 515
516 516 if not self.isConfig:
517 517 self.setup()
518 518 self.isConfig = True
519 519
520 520 self.dataOut = self.receiver.recv_pyobj()
521 521 log.log('{} - {}'.format(self.dataOut.type,
522 522 self.dataOut.datatime.ctime(),),
523 523 'Receiving')
524 524
525 525
526 526 class PlotterReceiver(ProcessingUnit, Process):
527 527
528 528 throttle_value = 5
529 529 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
530 530 'exp_code', 'web_server', 'buffering']
531 531
532 532 def __init__(self, **kwargs):
533 533
534 534 ProcessingUnit.__init__(self, **kwargs)
535 535 Process.__init__(self)
536 536 self.mp = False
537 537 self.isConfig = False
538 538 self.isWebConfig = False
539 539 self.connections = 0
540 540 server = kwargs.get('server', 'zmq.pipe')
541 541 web_server = kwargs.get('web_server', None)
542 542 if 'tcp://' in server:
543 543 address = server
544 544 else:
545 545 address = 'ipc:///tmp/%s' % server
546 546 self.address = address
547 547 self.web_address = web_server
548 548 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
549 549 self.realtime = kwargs.get('realtime', False)
550 550 self.localtime = kwargs.get('localtime', True)
551 551 self.buffering = kwargs.get('buffering', True)
552 552 self.throttle_value = kwargs.get('throttle', 5)
553 553 self.exp_code = kwargs.get('exp_code', None)
554 554 self.sendData = self.initThrottle(self.throttle_value)
555 555 self.dates = []
556 556 self.setup()
557 557
558 558 def setup(self):
559 559
560 560 self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
561 561 self.isConfig = True
562 562
563 563 def event_monitor(self, monitor):
564 564
565 565 events = {}
566 566
567 567 for name in dir(zmq):
568 568 if name.startswith('EVENT_'):
569 569 value = getattr(zmq, name)
570 570 events[value] = name
571 571
572 572 while monitor.poll():
573 573 evt = recv_monitor_message(monitor)
574 574 if evt['event'] == 32:
575 575 self.connections += 1
576 576 if evt['event'] == 512:
577 577 pass
578 578
579 579 evt.update({'description': events[evt['event']]})
580 580
581 581 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
582 582 break
583 583 monitor.close()
584 584 print('event monitor thread done!')
585 585
586 586 def initThrottle(self, throttle_value):
587 587
588 588 @throttle(seconds=throttle_value)
589 589 def sendDataThrottled(fn_sender, data):
590 590 fn_sender(data)
591 591
592 592 return sendDataThrottled
593 593
594 594 def send(self, data):
595 595 log.log('Sending {}'.format(data), self.name)
596 596 self.sender.send_pyobj(data)
597 597
598 598 def run(self):
599 599
600 600 log.log(
601 601 'Starting from {}'.format(self.address),
602 602 self.name
603 603 )
604 604
605 605 self.context = zmq.Context()
606 606 self.receiver = self.context.socket(zmq.PULL)
607 607 self.receiver.bind(self.address)
608 608 monitor = self.receiver.get_monitor_socket()
609 609 self.sender = self.context.socket(zmq.PUB)
610 610 if self.web_address:
611 611 log.success(
612 612 'Sending to web: {}'.format(self.web_address),
613 613 self.name
614 614 )
615 615 self.sender_web = self.context.socket(zmq.PUSH)
616 616 self.sender_web.connect(self.web_address)
617 617 time.sleep(1)
618 618
619 619 if 'server' in self.kwargs:
620 620 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
621 621 else:
622 622 self.sender.bind("ipc:///tmp/zmq.plots")
623 623
624 624 time.sleep(2)
625 625
626 626 t = Thread(target=self.event_monitor, args=(monitor,))
627 627 t.start()
628 628
629 629 while True:
630 630 dataOut = self.receiver.recv_pyobj()
631 631 if not dataOut.flagNoData:
632 632 if dataOut.type == 'Parameters':
633 633 tm = dataOut.utctimeInit
634 634 else:
635 635 tm = dataOut.utctime
636 636 if dataOut.useLocalTime:
637 637 if not self.localtime:
638 638 tm += time.timezone
639 639 dt = datetime.datetime.fromtimestamp(tm).date()
640 640 else:
641 641 if self.localtime:
642 642 tm -= time.timezone
643 643 dt = datetime.datetime.utcfromtimestamp(tm).date()
644 644 coerce = False
645 645 if dt not in self.dates:
646 646 if self.data:
647 647 self.data.ended = True
648 648 self.send(self.data)
649 649 coerce = True
650 650 self.data.setup()
651 651 self.dates.append(dt)
652 652
653 653 self.data.update(dataOut, tm)
654 654
655 655 if dataOut.finished is True:
656 656 self.connections -= 1
657 657 if self.connections == 0 and dt in self.dates:
658 658 self.data.ended = True
659 659 self.send(self.data)
660 660 self.data.setup()
661 661 else:
662 662 if self.realtime:
663 663 self.send(self.data)
664 664 if self.web_address:
665 665 payload = self.data.jsonify()
666 666 log.log('Sending to web... type:{}, size:{}'.format(dataOut.type, len(payload)), self.name)
667 667 self.sender_web.send(payload)
668 668 else:
669 669 self.sendData(self.send, self.data, coerce=coerce)
670 670 coerce = False
671 671
672 672 return
673 673
674 674
675 675 class SendToFTP(Operation, Process):
676 676
677 677 '''
678 678 Operation to send data over FTP.
679 679 '''
680 680
681 681 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
682 682
683 683 def __init__(self, **kwargs):
684 684 '''
685 685 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
686 686 '''
687 687 Operation.__init__(self, **kwargs)
688 688 Process.__init__(self)
689 689 self.server = kwargs.get('server')
690 690 self.username = kwargs.get('username')
691 691 self.password = kwargs.get('password')
692 692 self.patterns = kwargs.get('patterns')
693 693 self.timeout = kwargs.get('timeout', 10)
694 694 self.times = [time.time() for p in self.patterns]
695 695 self.latest = ['' for p in self.patterns]
696 696 self.mp = False
697 697 self.ftp = None
698 698
699 699 def setup(self):
700 700
701 701 log.log('Connecting to ftp://{}'.format(self.server), self.name)
702 702 try:
703 703 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
704 704 except ftplib.all_errors:
705 705 log.error('Server connection fail: {}'.format(self.server), self.name)
706 706 if self.ftp is not None:
707 707 self.ftp.close()
708 708 self.ftp = None
709 709 self.isConfig = False
710 710 return
711 711
712 712 try:
713 713 self.ftp.login(self.username, self.password)
714 714 except ftplib.all_errors:
715 715 log.error('The given username y/o password are incorrect', self.name)
716 716 if self.ftp is not None:
717 717 self.ftp.close()
718 718 self.ftp = None
719 719 self.isConfig = False
720 720 return
721 721
722 722 log.success('Connection success', self.name)
723 723 self.isConfig = True
724 724 return
725 725
726 726 def check(self):
727 727
728 728 try:
729 729 self.ftp.voidcmd("NOOP")
730 730 except:
731 731 log.warning('Connection lost... trying to reconnect', self.name)
732 732 if self.ftp is not None:
733 733 self.ftp.close()
734 734 self.ftp = None
735 735 self.setup()
736 736
737 737 def find_files(self, path, ext):
738 738
739 739 files = glob.glob1(path, '*{}'.format(ext))
740 740 files.sort()
741 741 if files:
742 742 return files[-1]
743 743 return None
744 744
745 745 def getftpname(self, filename, exp_code, sub_exp_code):
746 746
747 747 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
748 748 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
749 749 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
750 750 exp_code = '%3.3d'%exp_code
751 751 sub_exp_code = '%2.2d'%sub_exp_code
752 plot_code = '%2.2d'% PLOT_CODES[filename.split('_')[0].split('-')[0]]
752 plot_code = '%2.2d'% PLOT_CODES[filename.split('_')[0].split('-')[1]]
753 753 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
754 754 return name
755 755
756 756 def upload(self, src, dst):
757 757
758 758 log.log('Uploading {} '.format(src), self.name, nl=False)
759 759
760 760 fp = open(src, 'rb')
761 761 command = 'STOR {}'.format(dst)
762 762
763 763 try:
764 764 self.ftp.storbinary(command, fp, blocksize=1024)
765 765 except ftplib.all_errors, e:
766 766 log.error('{}'.format(e), self.name)
767 767 if self.ftp is not None:
768 768 self.ftp.close()
769 769 self.ftp = None
770 770 return
771 771
772 772 try:
773 773 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
774 774 except ftplib.all_errors, e:
775 775 log.error('{}'.format(e), self.name)
776 776 if self.ftp is not None:
777 777 self.ftp.close()
778 778 self.ftp = None
779 779
780 780 fp.close()
781 781
782 782 log.success('OK', tag='')
783 783
784 784 def send_files(self):
785 785
786 786 for x, pattern in enumerate(self.patterns):
787 787 local, remote, ext, delay, exp_code, sub_exp_code = pattern
788 788 if time.time()-self.times[x] >= delay:
789 789 srcname = self.find_files(local, ext)
790 790
791 791 if srcname is None or srcname == self.latest[x]:
792 792 continue
793 793
794 794 if 'png' in ext:
795 795 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
796 796 else:
797 797 dstname = srcname
798 798
799 799 src = os.path.join(local, srcname)
800 800
801 801 if os.path.getmtime(src) < time.time() - 30*60:
802 802 continue
803 803
804 804 dst = os.path.join(remote, dstname)
805 805
806 806 if self.ftp is None:
807 807 continue
808 808
809 809 self.upload(src, dst)
810 810
811 811 self.times[x] = time.time()
812 812 self.latest[x] = srcname
813 813
814 814 def run(self):
815 815
816 816 while True:
817 817 if not self.isConfig:
818 818 self.setup()
819 819 if self.ftp is not None:
820 820 self.check()
821 821 self.send_files()
822 822 time.sleep(2)
823 823
824 824 def close():
825 825
826 826 if self.ftp is not None:
827 827 if self.ftp is not None:
828 828 self.ftp.close()
829 829 self.terminate()
General Comments 0
You need to be logged in to leave comments. Login now