##// END OF EJS Templates
Minor bugs
Juan C. Espinoza -
r1278:72237600e769
parent child
Show More
@@ -1,803 +1,800
1 1
2 2 import os
3 3 import sys
4 4 import zmq
5 5 import time
6 6 import numpy
7 7 import datetime
8 8 from functools import wraps
9 9 from threading import Thread
10 10 import matplotlib
11 11
12 12 if 'BACKEND' in os.environ:
13 13 matplotlib.use(os.environ['BACKEND'])
14 14 elif 'linux' in sys.platform:
15 15 matplotlib.use("TkAgg")
16 16 elif 'darwin' in sys.platform:
17 17 matplotlib.use('WxAgg')
18 18 else:
19 19 from schainpy.utils import log
20 20 log.warning('Using default Backend="Agg"', 'INFO')
21 21 matplotlib.use('Agg')
22 22
23 23 import matplotlib.pyplot as plt
24 24 from matplotlib.patches import Polygon
25 25 from mpl_toolkits.axes_grid1 import make_axes_locatable
26 26 from matplotlib.ticker import FuncFormatter, LinearLocator, MultipleLocator
27 27
28 28 from schainpy.model.data.jrodata import PlotterData
29 29 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
30 30 from schainpy.utils import log
31 31
32 32 jet_values = matplotlib.pyplot.get_cmap('jet', 100)(numpy.arange(100))[10:90]
33 33 blu_values = matplotlib.pyplot.get_cmap(
34 34 'seismic_r', 20)(numpy.arange(20))[10:15]
35 35 ncmap = matplotlib.colors.LinearSegmentedColormap.from_list(
36 36 'jro', numpy.vstack((blu_values, jet_values)))
37 37 matplotlib.pyplot.register_cmap(cmap=ncmap)
38 38
39 39 CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis',
40 40 'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm')]
41 41
42 42 EARTH_RADIUS = 6.3710e3
43 43
44 44 def ll2xy(lat1, lon1, lat2, lon2):
45 45
46 46 p = 0.017453292519943295
47 47 a = 0.5 - numpy.cos((lat2 - lat1) * p)/2 + numpy.cos(lat1 * p) * \
48 48 numpy.cos(lat2 * p) * (1 - numpy.cos((lon2 - lon1) * p)) / 2
49 49 r = 12742 * numpy.arcsin(numpy.sqrt(a))
50 50 theta = numpy.arctan2(numpy.sin((lon2-lon1)*p)*numpy.cos(lat2*p), numpy.cos(lat1*p)
51 51 * numpy.sin(lat2*p)-numpy.sin(lat1*p)*numpy.cos(lat2*p)*numpy.cos((lon2-lon1)*p))
52 52 theta = -theta + numpy.pi/2
53 53 return r*numpy.cos(theta), r*numpy.sin(theta)
54 54
55 55
56 56 def km2deg(km):
57 57 '''
58 58 Convert distance in km to degrees
59 59 '''
60 60
61 61 return numpy.rad2deg(km/EARTH_RADIUS)
62 62
63 63
64 64 def figpause(interval):
65 65 backend = plt.rcParams['backend']
66 66 if backend in matplotlib.rcsetup.interactive_bk:
67 67 figManager = matplotlib._pylab_helpers.Gcf.get_active()
68 68 if figManager is not None:
69 69 canvas = figManager.canvas
70 70 if canvas.figure.stale:
71 71 canvas.draw()
72 72 try:
73 73 canvas.start_event_loop(interval)
74 74 except:
75 75 pass
76 76 return
77 77
78 78
79 79 def popup(message):
80 80 '''
81 81 '''
82 82
83 83 fig = plt.figure(figsize=(12, 8), facecolor='r')
84 84 text = '\n'.join([s.strip() for s in message.split(':')])
85 85 fig.text(0.01, 0.5, text, ha='left', va='center',
86 86 size='20', weight='heavy', color='w')
87 87 fig.show()
88 88 figpause(1000)
89 89
90 90
91 91 class Throttle(object):
92 92 '''
93 93 Decorator that prevents a function from being called more than once every
94 94 time period.
95 95 To create a function that cannot be called more than once a minute, but
96 96 will sleep until it can be called:
97 97 @Throttle(minutes=1)
98 98 def foo():
99 99 pass
100 100
101 101 for i in range(10):
102 102 foo()
103 103 print "This function has run %s times." % i
104 104 '''
105 105
106 106 def __init__(self, seconds=0, minutes=0, hours=0):
107 107 self.throttle_period = datetime.timedelta(
108 108 seconds=seconds, minutes=minutes, hours=hours
109 109 )
110 110
111 111 self.time_of_last_call = datetime.datetime.min
112 112
113 113 def __call__(self, fn):
114 114 @wraps(fn)
115 115 def wrapper(*args, **kwargs):
116 116 coerce = kwargs.pop('coerce', None)
117 117 if coerce:
118 118 self.time_of_last_call = datetime.datetime.now()
119 119 return fn(*args, **kwargs)
120 120 else:
121 121 now = datetime.datetime.now()
122 122 time_since_last_call = now - self.time_of_last_call
123 123 time_left = self.throttle_period - time_since_last_call
124 124
125 125 if time_left > datetime.timedelta(seconds=0):
126 126 return
127 127
128 128 self.time_of_last_call = datetime.datetime.now()
129 129 return fn(*args, **kwargs)
130 130
131 131 return wrapper
132 132
133 133 def apply_throttle(value):
134 134
135 135 @Throttle(seconds=value)
136 136 def fnThrottled(fn):
137 137 fn()
138 138
139 139 return fnThrottled
140 140
141 141
142 142 @MPDecorator
143 143 class Plot(Operation):
144 144 '''
145 145 Base class for Schain plotting operations
146 146 '''
147 147
148 148 CODE = 'Figure'
149 149 colormap = 'jet'
150 150 bgcolor = 'white'
151 151 __missing = 1E30
152 152
153 153 __attrs__ = ['show', 'save', 'xmin', 'xmax', 'ymin', 'ymax', 'zmin', 'zmax',
154 154 'zlimits', 'xlabel', 'ylabel', 'xaxis', 'cb_label', 'title',
155 155 'colorbar', 'bgcolor', 'width', 'height', 'localtime', 'oneFigure',
156 156 'showprofile', 'decimation', 'pause']
157 157
158 158 def __init__(self):
159 159
160 160 Operation.__init__(self)
161 161 self.isConfig = False
162 162 self.isPlotConfig = False
163 163 self.save_counter = 1
164 164 self.sender_counter = 1
165 165 self.data = None
166 166
167 167 def __fmtTime(self, x, pos):
168 168 '''
169 169 '''
170 170
171 171 return '{}'.format(self.getDateTime(x).strftime('%H:%M'))
172 172
173 173 def __setup(self, **kwargs):
174 174 '''
175 175 Initialize variables
176 176 '''
177 177
178 178 self.figures = []
179 179 self.axes = []
180 180 self.cb_axes = []
181 181 self.localtime = kwargs.pop('localtime', True)
182 182 self.show = kwargs.get('show', True)
183 183 self.save = kwargs.get('save', False)
184 184 self.save_period = kwargs.get('save_period', 1)
185 185 self.ftp = kwargs.get('ftp', False)
186 186 self.colormap = kwargs.get('colormap', self.colormap)
187 187 self.colormap_coh = kwargs.get('colormap_coh', 'jet')
188 188 self.colormap_phase = kwargs.get('colormap_phase', 'RdBu_r')
189 189 self.colormaps = kwargs.get('colormaps', None)
190 190 self.bgcolor = kwargs.get('bgcolor', self.bgcolor)
191 191 self.showprofile = kwargs.get('showprofile', False)
192 192 self.title = kwargs.get('wintitle', self.CODE.upper())
193 193 self.cb_label = kwargs.get('cb_label', None)
194 194 self.cb_labels = kwargs.get('cb_labels', None)
195 195 self.labels = kwargs.get('labels', None)
196 196 self.xaxis = kwargs.get('xaxis', 'frequency')
197 197 self.zmin = kwargs.get('zmin', None)
198 198 self.zmax = kwargs.get('zmax', None)
199 199 self.zlimits = kwargs.get('zlimits', None)
200 200 self.xmin = kwargs.get('xmin', None)
201 201 self.xmax = kwargs.get('xmax', None)
202 202 self.xrange = kwargs.get('xrange', 24)
203 203 self.xscale = kwargs.get('xscale', None)
204 204 self.ymin = kwargs.get('ymin', None)
205 205 self.ymax = kwargs.get('ymax', None)
206 206 self.yscale = kwargs.get('yscale', None)
207 207 self.xlabel = kwargs.get('xlabel', None)
208 208 self.attr_time = kwargs.get('attr_time', 'utctime')
209 209 self.decimation = kwargs.get('decimation', None)
210 210 self.showSNR = kwargs.get('showSNR', False)
211 211 self.oneFigure = kwargs.get('oneFigure', True)
212 212 self.width = kwargs.get('width', None)
213 213 self.height = kwargs.get('height', None)
214 214 self.colorbar = kwargs.get('colorbar', True)
215 215 self.factors = kwargs.get('factors', [1, 1, 1, 1, 1, 1, 1, 1])
216 216 self.channels = kwargs.get('channels', None)
217 217 self.titles = kwargs.get('titles', [])
218 218 self.polar = False
219 219 self.type = kwargs.get('type', 'iq')
220 220 self.grid = kwargs.get('grid', False)
221 221 self.pause = kwargs.get('pause', False)
222 self.save_labels = kwargs.get('save_labels', None)
222 self.save_code = kwargs.get('save_code', None)
223 223 self.realtime = kwargs.get('realtime', True)
224 224 self.buffering = kwargs.get('buffering', True)
225 self.throttle = kwargs.get('throttle', 2)
225 self.throttle = kwargs.get('throttle', 0)
226 226 self.exp_code = kwargs.get('exp_code', None)
227 227 self.plot_server = kwargs.get('plot_server', False)
228 228 self.sender_period = kwargs.get('sender_period', 1)
229 229 self.__throttle_plot = apply_throttle(self.throttle)
230 230 self.data = PlotterData(
231 231 self.CODE, self.throttle, self.exp_code, self.buffering, snr=self.showSNR)
232 232
233 233 if self.plot_server:
234 234 if not self.plot_server.startswith('tcp://'):
235 235 self.plot_server = 'tcp://{}'.format(self.plot_server)
236 236 log.success(
237 237 'Sending to server: {}'.format(self.plot_server),
238 238 self.name
239 239 )
240 240 if 'plot_name' in kwargs:
241 241 self.plot_name = kwargs['plot_name']
242 242
243 243 def __setup_plot(self):
244 244 '''
245 245 Common setup for all figures, here figures and axes are created
246 246 '''
247 247
248 248 self.setup()
249 249
250 250 self.time_label = 'LT' if self.localtime else 'UTC'
251 251
252 252 if self.width is None:
253 253 self.width = 8
254 254
255 255 self.figures = []
256 256 self.axes = []
257 257 self.cb_axes = []
258 258 self.pf_axes = []
259 259 self.cmaps = []
260 260
261 261 size = '15%' if self.ncols == 1 else '30%'
262 262 pad = '4%' if self.ncols == 1 else '8%'
263 263
264 264 if self.oneFigure:
265 265 if self.height is None:
266 266 self.height = 1.4 * self.nrows + 1
267 267 fig = plt.figure(figsize=(self.width, self.height),
268 268 edgecolor='k',
269 269 facecolor='w')
270 270 self.figures.append(fig)
271 271 for n in range(self.nplots):
272 272 ax = fig.add_subplot(self.nrows, self.ncols,
273 273 n + 1, polar=self.polar)
274 274 ax.tick_params(labelsize=8)
275 275 ax.firsttime = True
276 276 ax.index = 0
277 277 ax.press = None
278 278 self.axes.append(ax)
279 279 if self.showprofile:
280 280 cax = self.__add_axes(ax, size=size, pad=pad)
281 281 cax.tick_params(labelsize=8)
282 282 self.pf_axes.append(cax)
283 283 else:
284 284 if self.height is None:
285 285 self.height = 3
286 286 for n in range(self.nplots):
287 287 fig = plt.figure(figsize=(self.width, self.height),
288 288 edgecolor='k',
289 289 facecolor='w')
290 290 ax = fig.add_subplot(1, 1, 1, polar=self.polar)
291 291 ax.tick_params(labelsize=8)
292 292 ax.firsttime = True
293 293 ax.index = 0
294 294 ax.press = None
295 295 self.figures.append(fig)
296 296 self.axes.append(ax)
297 297 if self.showprofile:
298 298 cax = self.__add_axes(ax, size=size, pad=pad)
299 299 cax.tick_params(labelsize=8)
300 300 self.pf_axes.append(cax)
301 301
302 302 for n in range(self.nrows):
303 303 if self.colormaps is not None:
304 304 cmap = plt.get_cmap(self.colormaps[n])
305 305 else:
306 306 cmap = plt.get_cmap(self.colormap)
307 307 cmap.set_bad(self.bgcolor, 1.)
308 308 self.cmaps.append(cmap)
309 309
310 310 for fig in self.figures:
311 311 fig.canvas.mpl_connect('key_press_event', self.OnKeyPress)
312 312 fig.canvas.mpl_connect('scroll_event', self.OnBtnScroll)
313 313 fig.canvas.mpl_connect('button_press_event', self.onBtnPress)
314 314 fig.canvas.mpl_connect('motion_notify_event', self.onMotion)
315 315 fig.canvas.mpl_connect('button_release_event', self.onBtnRelease)
316 316
317 317 def OnKeyPress(self, event):
318 318 '''
319 319 Event for pressing keys (up, down) change colormap
320 320 '''
321 321 ax = event.inaxes
322 322 if ax in self.axes:
323 323 if event.key == 'down':
324 324 ax.index += 1
325 325 elif event.key == 'up':
326 326 ax.index -= 1
327 327 if ax.index < 0:
328 328 ax.index = len(CMAPS) - 1
329 329 elif ax.index == len(CMAPS):
330 330 ax.index = 0
331 331 cmap = CMAPS[ax.index]
332 332 ax.cbar.set_cmap(cmap)
333 333 ax.cbar.draw_all()
334 334 ax.plt.set_cmap(cmap)
335 335 ax.cbar.patch.figure.canvas.draw()
336 336 self.colormap = cmap.name
337 337
338 338 def OnBtnScroll(self, event):
339 339 '''
340 340 Event for scrolling, scale figure
341 341 '''
342 342 cb_ax = event.inaxes
343 343 if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
344 344 ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
345 345 pt = ax.cbar.ax.bbox.get_points()[:, 1]
346 346 nrm = ax.cbar.norm
347 347 vmin, vmax, p0, p1, pS = (
348 348 nrm.vmin, nrm.vmax, pt[0], pt[1], event.y)
349 349 scale = 2 if event.step == 1 else 0.5
350 350 point = vmin + (vmax - vmin) / (p1 - p0) * (pS - p0)
351 351 ax.cbar.norm.vmin = point - scale * (point - vmin)
352 352 ax.cbar.norm.vmax = point - scale * (point - vmax)
353 353 ax.plt.set_norm(ax.cbar.norm)
354 354 ax.cbar.draw_all()
355 355 ax.cbar.patch.figure.canvas.draw()
356 356
357 357 def onBtnPress(self, event):
358 358 '''
359 359 Event for mouse button press
360 360 '''
361 361 cb_ax = event.inaxes
362 362 if cb_ax is None:
363 363 return
364 364
365 365 if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
366 366 cb_ax.press = event.x, event.y
367 367 else:
368 368 cb_ax.press = None
369 369
370 370 def onMotion(self, event):
371 371 '''
372 372 Event for move inside colorbar
373 373 '''
374 374 cb_ax = event.inaxes
375 375 if cb_ax is None:
376 376 return
377 377 if cb_ax not in [ax.cbar.ax for ax in self.axes if ax.cbar]:
378 378 return
379 379 if cb_ax.press is None:
380 380 return
381 381
382 382 ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
383 383 xprev, yprev = cb_ax.press
384 384 dx = event.x - xprev
385 385 dy = event.y - yprev
386 386 cb_ax.press = event.x, event.y
387 387 scale = ax.cbar.norm.vmax - ax.cbar.norm.vmin
388 388 perc = 0.03
389 389
390 390 if event.button == 1:
391 391 ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
392 392 ax.cbar.norm.vmax -= (perc * scale) * numpy.sign(dy)
393 393 elif event.button == 3:
394 394 ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
395 395 ax.cbar.norm.vmax += (perc * scale) * numpy.sign(dy)
396 396
397 397 ax.cbar.draw_all()
398 398 ax.plt.set_norm(ax.cbar.norm)
399 399 ax.cbar.patch.figure.canvas.draw()
400 400
401 401 def onBtnRelease(self, event):
402 402 '''
403 403 Event for mouse button release
404 404 '''
405 405 cb_ax = event.inaxes
406 406 if cb_ax is not None:
407 407 cb_ax.press = None
408 408
409 409 def __add_axes(self, ax, size='30%', pad='8%'):
410 410 '''
411 411 Add new axes to the given figure
412 412 '''
413 413 divider = make_axes_locatable(ax)
414 414 nax = divider.new_horizontal(size=size, pad=pad)
415 415 ax.figure.add_axes(nax)
416 416 return nax
417 417
418 418 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
419 419 '''
420 420 Create a masked array for missing data
421 421 '''
422 422 if x_buffer.shape[0] < 2:
423 423 return x_buffer, y_buffer, z_buffer
424 424
425 425 deltas = x_buffer[1:] - x_buffer[0:-1]
426 426 x_median = numpy.median(deltas)
427 427
428 428 index = numpy.where(deltas > 5 * x_median)
429 429
430 430 if len(index[0]) != 0:
431 431 z_buffer[::, index[0], ::] = self.__missing
432 432 z_buffer = numpy.ma.masked_inside(z_buffer,
433 433 0.99 * self.__missing,
434 434 1.01 * self.__missing)
435 435
436 436 return x_buffer, y_buffer, z_buffer
437 437
438 438 def decimate(self):
439 439
440 440 # dx = int(len(self.x)/self.__MAXNUMX) + 1
441 441 dy = int(len(self.y) / self.decimation) + 1
442 442
443 443 # x = self.x[::dx]
444 444 x = self.x
445 445 y = self.y[::dy]
446 446 z = self.z[::, ::, ::dy]
447 447
448 448 return x, y, z
449 449
450 450 def format(self):
451 451 '''
452 452 Set min and max values, labels, ticks and titles
453 453 '''
454 454
455 455 if self.xmin is None:
456 456 xmin = self.data.min_time
457 457 else:
458 458 if self.xaxis is 'time':
459 459 dt = self.getDateTime(self.data.min_time)
460 460 xmin = (dt.replace(hour=int(self.xmin), minute=0, second=0) -
461 461 datetime.datetime(1970, 1, 1)).total_seconds()
462 462 if self.data.localtime:
463 463 xmin += time.timezone
464 464 else:
465 465 xmin = self.xmin
466 466
467 467 if self.xmax is None:
468 468 xmax = xmin + self.xrange * 60 * 60
469 469 else:
470 470 if self.xaxis is 'time':
471 471 dt = self.getDateTime(self.data.max_time)
472 472 xmax = self.xmax - 1
473 473 xmax = (dt.replace(hour=int(xmax), minute=59, second=59) -
474 474 datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=1)).total_seconds()
475 475 if self.data.localtime:
476 476 xmax += time.timezone
477 477 else:
478 478 xmax = self.xmax
479 479
480 480 ymin = self.ymin if self.ymin else numpy.nanmin(self.y)
481 481 ymax = self.ymax if self.ymax else numpy.nanmax(self.y)
482 482
483 483 for n, ax in enumerate(self.axes):
484 484 if ax.firsttime:
485 485
486 486 dig = int(numpy.log10(ymax))
487 487 if dig == 0:
488 488 digD = len(str(ymax)) - 2
489 489 ydec = ymax*(10**digD)
490 490
491 491 dig = int(numpy.log10(ydec))
492 492 ystep = ((ydec + (10**(dig)))//10**(dig))*(10**(dig))
493 493 ystep = ystep/5
494 494 ystep = ystep/(10**digD)
495 495
496 496 else:
497 497 ystep = ((ymax + (10**(dig)))//10**(dig))*(10**(dig))
498 498 ystep = ystep/5
499 499
500 500 if self.xaxis is not 'time':
501 501
502 502 dig = int(numpy.log10(xmax))
503 503
504 504 if dig <= 0:
505 505 digD = len(str(xmax)) - 2
506 506 xdec = xmax*(10**digD)
507 507
508 508 dig = int(numpy.log10(xdec))
509 509 xstep = ((xdec + (10**(dig)))//10**(dig))*(10**(dig))
510 510 xstep = xstep*0.5
511 511 xstep = xstep/(10**digD)
512 512
513 513 else:
514 514 xstep = ((xmax + (10**(dig)))//10**(dig))*(10**(dig))
515 515 xstep = xstep/5
516 516
517 517 ax.set_facecolor(self.bgcolor)
518 518 ax.yaxis.set_major_locator(MultipleLocator(ystep))
519 519 if self.xscale:
520 520 ax.xaxis.set_major_formatter(FuncFormatter(
521 521 lambda x, pos: '{0:g}'.format(x*self.xscale)))
522 522 if self.xscale:
523 523 ax.yaxis.set_major_formatter(FuncFormatter(
524 524 lambda x, pos: '{0:g}'.format(x*self.yscale)))
525 525 if self.xaxis is 'time':
526 526 ax.xaxis.set_major_formatter(FuncFormatter(self.__fmtTime))
527 527 ax.xaxis.set_major_locator(LinearLocator(9))
528 528 else:
529 529 ax.xaxis.set_major_locator(MultipleLocator(xstep))
530 530 if self.xlabel is not None:
531 531 ax.set_xlabel(self.xlabel)
532 532 ax.set_ylabel(self.ylabel)
533 533 ax.firsttime = False
534 534 if self.showprofile:
535 535 self.pf_axes[n].set_ylim(ymin, ymax)
536 536 self.pf_axes[n].set_xlim(self.zmin, self.zmax)
537 537 self.pf_axes[n].set_xlabel('dB')
538 538 self.pf_axes[n].grid(b=True, axis='x')
539 539 [tick.set_visible(False)
540 540 for tick in self.pf_axes[n].get_yticklabels()]
541 541 if self.colorbar:
542 542 ax.cbar = plt.colorbar(
543 543 ax.plt, ax=ax, fraction=0.05, pad=0.02, aspect=10)
544 544 ax.cbar.ax.tick_params(labelsize=8)
545 545 ax.cbar.ax.press = None
546 546 if self.cb_label:
547 547 ax.cbar.set_label(self.cb_label, size=8)
548 548 elif self.cb_labels:
549 549 ax.cbar.set_label(self.cb_labels[n], size=8)
550 550 else:
551 551 ax.cbar = None
552 552 if self.grid:
553 553 ax.grid(True)
554 554
555 555 if not self.polar:
556 556 ax.set_xlim(xmin, xmax)
557 557 ax.set_ylim(ymin, ymax)
558 558 ax.set_title('{} {} {}'.format(
559 559 self.titles[n],
560 560 self.getDateTime(self.data.max_time).strftime(
561 561 '%Y-%m-%d %H:%M:%S'),
562 562 self.time_label),
563 563 size=8)
564 564 else:
565 565 ax.set_title('{}'.format(self.titles[n]), size=8)
566 566 ax.set_ylim(0, 90)
567 567 ax.set_yticks(numpy.arange(0, 90, 20))
568 568 ax.yaxis.labelpad = 40
569 569
570 570 def clear_figures(self):
571 571 '''
572 572 Reset axes for redraw plots
573 573 '''
574 574
575 575 for ax in self.axes:
576 576 ax.clear()
577 577 ax.firsttime = True
578 578 if ax.cbar:
579 579 ax.cbar.remove()
580 580
581 581 def __plot(self):
582 582 '''
583 583 Main function to plot, format and save figures
584 584 '''
585 585
586 586 try:
587 587 self.plot()
588 588 self.format()
589 589 except Exception as e:
590 590 log.warning('{} Plot could not be updated... check data'.format(
591 591 self.CODE), self.name)
592 592 log.error(str(e), '')
593 593 return
594 594
595 595 for n, fig in enumerate(self.figures):
596 596 if self.nrows == 0 or self.nplots == 0:
597 597 log.warning('No data', self.name)
598 598 fig.text(0.5, 0.5, 'No Data', fontsize='large', ha='center')
599 599 fig.canvas.manager.set_window_title(self.CODE)
600 600 continue
601 601
602 602 fig.tight_layout()
603 603 fig.canvas.manager.set_window_title('{} - {}'.format(self.title,
604 604 self.getDateTime(self.data.max_time).strftime('%Y/%m/%d')))
605 605 fig.canvas.draw()
606 606 if self.show:
607 607 fig.show()
608 608 # figpause(0.1)
609 609
610 610 if self.save:
611 611 self.save_figure(n)
612 612
613 613 if self.plot_server:
614 614 self.send_to_server()
615 615
616 616 def save_figure(self, n):
617 617 '''
618 618 '''
619 619
620 620 if self.save_counter < self.save_period:
621 621 self.save_counter += 1
622 622 return
623 623
624 624 self.save_counter = 1
625 625
626 626 fig = self.figures[n]
627 627
628 if self.save_labels:
629 labels = self.save_labels
628 if self.save_code:
629 if isinstance(self.save_code, str):
630 labels = [self.save_code for x in self.figures]
631 else:
632 labels = self.save_code
630 633 else:
631 labels = list(range(self.nrows))
634 labels = [self.CODE for x in self.figures]
632 635
633 if self.oneFigure:
634 label = ''
635 else:
636 label = '-{}'.format(labels[n])
637 636 figname = os.path.join(
638 637 self.save,
639 self.CODE,
640 '{}{}_{}.png'.format(
641 self.CODE,
642 label,
638 labels[n],
639 '{}_{}.png'.format(
640 labels[n],
643 641 self.getDateTime(self.data.max_time).strftime(
644 642 '%Y%m%d_%H%M%S'
645 643 ),
646 644 )
647 645 )
648 646 log.log('Saving figure: {}'.format(figname), self.name)
649 647 if not os.path.isdir(os.path.dirname(figname)):
650 648 os.makedirs(os.path.dirname(figname))
651 649 fig.savefig(figname)
652 650
653 if self.realtime:
651 if self.throttle == 0:
654 652 figname = os.path.join(
655 653 self.save,
656 '{}{}_{}.png'.format(
657 self.CODE,
658 label,
654 '{}_{}.png'.format(
655 labels[n],
659 656 self.getDateTime(self.data.min_time).strftime(
660 657 '%Y%m%d'
661 658 ),
662 659 )
663 660 )
664 661 fig.savefig(figname)
665 662
666 663 def send_to_server(self):
667 664 '''
668 665 '''
669 666
670 667 if self.sender_counter < self.sender_period:
671 668 self.sender_counter += 1
672 669 return
673 670
674 671 self.sender_counter = 1
675 672 self.data.meta['titles'] = self.titles
676 673 retries = 2
677 674 while True:
678 675 self.socket.send_string(self.data.jsonify(self.plot_name, self.plot_type))
679 676 socks = dict(self.poll.poll(5000))
680 677 if socks.get(self.socket) == zmq.POLLIN:
681 678 reply = self.socket.recv_string()
682 679 if reply == 'ok':
683 680 log.log("Response from server ok", self.name)
684 681 break
685 682 else:
686 683 log.warning(
687 684 "Malformed reply from server: {}".format(reply), self.name)
688 685
689 686 else:
690 687 log.warning(
691 688 "No response from server, retrying...", self.name)
692 689 self.socket.setsockopt(zmq.LINGER, 0)
693 690 self.socket.close()
694 691 self.poll.unregister(self.socket)
695 692 retries -= 1
696 693 if retries == 0:
697 694 log.error(
698 695 "Server seems to be offline, abandoning", self.name)
699 696 self.socket = self.context.socket(zmq.REQ)
700 697 self.socket.connect(self.plot_server)
701 698 self.poll.register(self.socket, zmq.POLLIN)
702 699 time.sleep(1)
703 700 break
704 701 self.socket = self.context.socket(zmq.REQ)
705 702 self.socket.connect(self.plot_server)
706 703 self.poll.register(self.socket, zmq.POLLIN)
707 704 time.sleep(0.5)
708 705
709 706 def setup(self):
710 707 '''
711 708 This method should be implemented in the child class, the following
712 709 attributes should be set:
713 710
714 711 self.nrows: number of rows
715 712 self.ncols: number of cols
716 713 self.nplots: number of plots (channels or pairs)
717 714 self.ylabel: label for Y axes
718 715 self.titles: list of axes title
719 716
720 717 '''
721 718 raise NotImplementedError
722 719
723 720 def plot(self):
724 721 '''
725 722 Must be defined in the child class
726 723 '''
727 724 raise NotImplementedError
728 725
729 726 def run(self, dataOut, **kwargs):
730 727 '''
731 728 Main plotting routine
732 729 '''
733 730
734 731 if self.isConfig is False:
735 732 self.__setup(**kwargs)
736 733
737 734 t = getattr(dataOut, self.attr_time)
738 735
739 736 if dataOut.useLocalTime:
740 737 self.getDateTime = datetime.datetime.fromtimestamp
741 738 if not self.localtime:
742 739 t += time.timezone
743 740 else:
744 741 self.getDateTime = datetime.datetime.utcfromtimestamp
745 742 if self.localtime:
746 743 t -= time.timezone
747 744
748 745 if 'buffer' in self.plot_type:
749 746 if self.xmin is None:
750 747 self.tmin = t
751 748 self.xmin = self.getDateTime(t).hour
752 749 else:
753 750 self.tmin = (
754 751 self.getDateTime(t).replace(
755 752 hour=int(self.xmin),
756 753 minute=0,
757 754 second=0) - self.getDateTime(0)).total_seconds()
758 755
759 756 self.data.setup()
760 757 self.isConfig = True
761 758 if self.plot_server:
762 759 self.context = zmq.Context()
763 760 self.socket = self.context.socket(zmq.REQ)
764 761 self.socket.connect(self.plot_server)
765 762 self.poll = zmq.Poller()
766 763 self.poll.register(self.socket, zmq.POLLIN)
767 764
768 765 tm = getattr(dataOut, self.attr_time)
769 766
770 767 if not dataOut.useLocalTime and self.localtime:
771 768 tm -= time.timezone
772 769 if dataOut.useLocalTime and not self.localtime:
773 770 tm += time.timezone
774 771
775 772 if self.xaxis is 'time' and self.data and (tm - self.tmin) >= self.xrange*60*60:
776 773 self.save_counter = self.save_period
777 774 self.__plot()
778 775 self.xmin += self.xrange
779 776 if self.xmin >= 24:
780 777 self.xmin -= 24
781 778 self.tmin += self.xrange*60*60
782 779 self.data.setup()
783 780 self.clear_figures()
784 781
785 782 self.data.update(dataOut, tm)
786 783
787 784 if self.isPlotConfig is False:
788 785 self.__setup_plot()
789 786 self.isPlotConfig = True
790 787
791 if self.realtime:
788 if self.throttle == 0:
792 789 self.__plot()
793 790 else:
794 791 self.__throttle_plot(self.__plot)#, coerce=coerce)
795 792
796 793 def close(self):
797 794
798 795 if self.data:
799 796 self.save_counter = self.save_period
800 797 self.__plot()
801 798 if self.data and self.pause:
802 799 figpause(10)
803 800
@@ -1,1574 +1,1575
1 1 """
2 2 Created on Jul 2, 2014
3 3
4 4 @author: roj-idl71
5 5 """
6 6 import os
7 7 import sys
8 8 import glob
9 9 import time
10 10 import numpy
11 11 import fnmatch
12 12 import inspect
13 13 import time
14 14 import datetime
15 15 import zmq
16 16
17 17 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
18 18 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
19 19 from schainpy.utils import log
20 20 import schainpy.admin
21 21
22 22 LOCALTIME = True
23 23 DT_DIRECTIVES = {
24 24 '%Y': 4,
25 25 '%y': 2,
26 26 '%m': 2,
27 27 '%d': 2,
28 28 '%j': 3,
29 29 '%H': 2,
30 30 '%M': 2,
31 31 '%S': 2,
32 32 '%f': 6
33 33 }
34 34
35 35
36 36 def isNumber(cad):
37 37 """
38 38 Chequea si el conjunto de caracteres que componen un string puede ser convertidos a un numero.
39 39
40 40 Excepciones:
41 41 Si un determinado string no puede ser convertido a numero
42 42 Input:
43 43 str, string al cual se le analiza para determinar si convertible a un numero o no
44 44
45 45 Return:
46 46 True : si el string es uno numerico
47 47 False : no es un string numerico
48 48 """
49 49 try:
50 50 float(cad)
51 51 return True
52 52 except:
53 53 return False
54 54
55 55
56 56 def isFileInEpoch(filename, startUTSeconds, endUTSeconds):
57 57 """
58 58 Esta funcion determina si un archivo de datos se encuentra o no dentro del rango de fecha especificado.
59 59
60 60 Inputs:
61 61 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
62 62
63 63 startUTSeconds : fecha inicial del rango seleccionado. La fecha esta dada en
64 64 segundos contados desde 01/01/1970.
65 65 endUTSeconds : fecha final del rango seleccionado. La fecha esta dada en
66 66 segundos contados desde 01/01/1970.
67 67
68 68 Return:
69 69 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
70 70 fecha especificado, de lo contrario retorna False.
71 71
72 72 Excepciones:
73 73 Si el archivo no existe o no puede ser abierto
74 74 Si la cabecera no puede ser leida.
75 75
76 76 """
77 77 basicHeaderObj = BasicHeader(LOCALTIME)
78 78
79 79 try:
80 80 fp = open(filename, 'rb')
81 81 except IOError:
82 82 print("The file %s can't be opened" % (filename))
83 83 return 0
84 84
85 85 sts = basicHeaderObj.read(fp)
86 86 fp.close()
87 87
88 88 if not(sts):
89 89 print("Skipping the file %s because it has not a valid header" % (filename))
90 90 return 0
91 91
92 92 if not ((startUTSeconds <= basicHeaderObj.utc) and (endUTSeconds > basicHeaderObj.utc)):
93 93 return 0
94 94
95 95 return 1
96 96
97 97
98 98 def isTimeInRange(thisTime, startTime, endTime):
99 99 if endTime >= startTime:
100 100 if (thisTime < startTime) or (thisTime > endTime):
101 101 return 0
102 102 return 1
103 103 else:
104 104 if (thisTime < startTime) and (thisTime > endTime):
105 105 return 0
106 106 return 1
107 107
108 108
109 109 def isFileInTimeRange(filename, startDate, endDate, startTime, endTime):
110 110 """
111 111 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
112 112
113 113 Inputs:
114 114 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
115 115
116 116 startDate : fecha inicial del rango seleccionado en formato datetime.date
117 117
118 118 endDate : fecha final del rango seleccionado en formato datetime.date
119 119
120 120 startTime : tiempo inicial del rango seleccionado en formato datetime.time
121 121
122 122 endTime : tiempo final del rango seleccionado en formato datetime.time
123 123
124 124 Return:
125 125 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
126 126 fecha especificado, de lo contrario retorna False.
127 127
128 128 Excepciones:
129 129 Si el archivo no existe o no puede ser abierto
130 130 Si la cabecera no puede ser leida.
131 131
132 132 """
133 133
134 134 try:
135 135 fp = open(filename, 'rb')
136 136 except IOError:
137 137 print("The file %s can't be opened" % (filename))
138 138 return None
139 139
140 140 firstBasicHeaderObj = BasicHeader(LOCALTIME)
141 141 systemHeaderObj = SystemHeader()
142 142 radarControllerHeaderObj = RadarControllerHeader()
143 143 processingHeaderObj = ProcessingHeader()
144 144
145 145 lastBasicHeaderObj = BasicHeader(LOCALTIME)
146 146
147 147 sts = firstBasicHeaderObj.read(fp)
148 148
149 149 if not(sts):
150 150 print("[Reading] Skipping the file %s because it has not a valid header" % (filename))
151 151 return None
152 152
153 153 if not systemHeaderObj.read(fp):
154 154 return None
155 155
156 156 if not radarControllerHeaderObj.read(fp):
157 157 return None
158 158
159 159 if not processingHeaderObj.read(fp):
160 160 return None
161 161
162 162 filesize = os.path.getsize(filename)
163 163
164 164 offset = processingHeaderObj.blockSize + 24 # header size
165 165
166 166 if filesize <= offset:
167 167 print("[Reading] %s: This file has not enough data" % filename)
168 168 return None
169 169
170 170 fp.seek(-offset, 2)
171 171
172 172 sts = lastBasicHeaderObj.read(fp)
173 173
174 174 fp.close()
175 175
176 176 thisDatetime = lastBasicHeaderObj.datatime
177 177 thisTime_last_block = thisDatetime.time()
178 178
179 179 thisDatetime = firstBasicHeaderObj.datatime
180 180 thisDate = thisDatetime.date()
181 181 thisTime_first_block = thisDatetime.time()
182 182
183 183 # General case
184 184 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
185 185 #-----------o----------------------------o-----------
186 186 # startTime endTime
187 187
188 188 if endTime >= startTime:
189 189 if (thisTime_last_block < startTime) or (thisTime_first_block > endTime):
190 190 return None
191 191
192 192 return thisDatetime
193 193
194 194 # If endTime < startTime then endTime belongs to the next day
195 195
196 196 #<<<<<<<<<<<o o>>>>>>>>>>>
197 197 #-----------o----------------------------o-----------
198 198 # endTime startTime
199 199
200 200 if (thisDate == startDate) and (thisTime_last_block < startTime):
201 201 return None
202 202
203 203 if (thisDate == endDate) and (thisTime_first_block > endTime):
204 204 return None
205 205
206 206 if (thisTime_last_block < startTime) and (thisTime_first_block > endTime):
207 207 return None
208 208
209 209 return thisDatetime
210 210
211 211
212 212 def isFolderInDateRange(folder, startDate=None, endDate=None):
213 213 """
214 214 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
215 215
216 216 Inputs:
217 217 folder : nombre completo del directorio.
218 218 Su formato deberia ser "/path_root/?YYYYDDD"
219 219
220 220 siendo:
221 221 YYYY : Anio (ejemplo 2015)
222 222 DDD : Dia del anio (ejemplo 305)
223 223
224 224 startDate : fecha inicial del rango seleccionado en formato datetime.date
225 225
226 226 endDate : fecha final del rango seleccionado en formato datetime.date
227 227
228 228 Return:
229 229 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
230 230 fecha especificado, de lo contrario retorna False.
231 231 Excepciones:
232 232 Si el directorio no tiene el formato adecuado
233 233 """
234 234
235 235 basename = os.path.basename(folder)
236 236
237 237 if not isRadarFolder(basename):
238 238 print("The folder %s has not the rigth format" % folder)
239 239 return 0
240 240
241 241 if startDate and endDate:
242 242 thisDate = getDateFromRadarFolder(basename)
243 243
244 244 if thisDate < startDate:
245 245 return 0
246 246
247 247 if thisDate > endDate:
248 248 return 0
249 249
250 250 return 1
251 251
252 252
253 253 def isFileInDateRange(filename, startDate=None, endDate=None):
254 254 """
255 255 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
256 256
257 257 Inputs:
258 258 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
259 259
260 260 Su formato deberia ser "?YYYYDDDsss"
261 261
262 262 siendo:
263 263 YYYY : Anio (ejemplo 2015)
264 264 DDD : Dia del anio (ejemplo 305)
265 265 sss : set
266 266
267 267 startDate : fecha inicial del rango seleccionado en formato datetime.date
268 268
269 269 endDate : fecha final del rango seleccionado en formato datetime.date
270 270
271 271 Return:
272 272 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
273 273 fecha especificado, de lo contrario retorna False.
274 274 Excepciones:
275 275 Si el archivo no tiene el formato adecuado
276 276 """
277 277
278 278 basename = os.path.basename(filename)
279 279
280 280 if not isRadarFile(basename):
281 281 print("The filename %s has not the rigth format" % filename)
282 282 return 0
283 283
284 284 if startDate and endDate:
285 285 thisDate = getDateFromRadarFile(basename)
286 286
287 287 if thisDate < startDate:
288 288 return 0
289 289
290 290 if thisDate > endDate:
291 291 return 0
292 292
293 293 return 1
294 294
295 295
296 296 def getFileFromSet(path, ext, set):
297 297 validFilelist = []
298 298 fileList = os.listdir(path)
299 299
300 300 # 0 1234 567 89A BCDE
301 301 # H YYYY DDD SSS .ext
302 302
303 303 for thisFile in fileList:
304 304 try:
305 305 year = int(thisFile[1:5])
306 306 doy = int(thisFile[5:8])
307 307 except:
308 308 continue
309 309
310 310 if (os.path.splitext(thisFile)[-1].lower() != ext.lower()):
311 311 continue
312 312
313 313 validFilelist.append(thisFile)
314 314
315 315 myfile = fnmatch.filter(
316 316 validFilelist, '*%4.4d%3.3d%3.3d*' % (year, doy, set))
317 317
318 318 if len(myfile) != 0:
319 319 return myfile[0]
320 320 else:
321 321 filename = '*%4.4d%3.3d%3.3d%s' % (year, doy, set, ext.lower())
322 322 print('the filename %s does not exist' % filename)
323 323 print('...going to the last file: ')
324 324
325 325 if validFilelist:
326 326 validFilelist = sorted(validFilelist, key=str.lower)
327 327 return validFilelist[-1]
328 328
329 329 return None
330 330
331 331
332 332 def getlastFileFromPath(path, ext):
333 333 """
334 334 Depura el fileList dejando solo los que cumplan el formato de "PYYYYDDDSSS.ext"
335 335 al final de la depuracion devuelve el ultimo file de la lista que quedo.
336 336
337 337 Input:
338 338 fileList : lista conteniendo todos los files (sin path) que componen una determinada carpeta
339 339 ext : extension de los files contenidos en una carpeta
340 340
341 341 Return:
342 342 El ultimo file de una determinada carpeta, no se considera el path.
343 343 """
344 344 validFilelist = []
345 345 fileList = os.listdir(path)
346 346
347 347 # 0 1234 567 89A BCDE
348 348 # H YYYY DDD SSS .ext
349 349
350 350 for thisFile in fileList:
351 351
352 352 year = thisFile[1:5]
353 353 if not isNumber(year):
354 354 continue
355 355
356 356 doy = thisFile[5:8]
357 357 if not isNumber(doy):
358 358 continue
359 359
360 360 year = int(year)
361 361 doy = int(doy)
362 362
363 363 if (os.path.splitext(thisFile)[-1].lower() != ext.lower()):
364 364 continue
365 365
366 366 validFilelist.append(thisFile)
367 367
368 368 if validFilelist:
369 369 validFilelist = sorted(validFilelist, key=str.lower)
370 370 return validFilelist[-1]
371 371
372 372 return None
373 373
374 374
375 375 def isRadarFolder(folder):
376 376 try:
377 377 year = int(folder[1:5])
378 378 doy = int(folder[5:8])
379 379 except:
380 380 return 0
381 381
382 382 return 1
383 383
384 384
385 385 def isRadarFile(file):
386 386 try:
387 387 year = int(file[1:5])
388 388 doy = int(file[5:8])
389 389 set = int(file[8:11])
390 390 except:
391 391 return 0
392 392
393 393 return 1
394 394
395 395
396 396 def getDateFromRadarFile(file):
397 397 try:
398 398 year = int(file[1:5])
399 399 doy = int(file[5:8])
400 400 set = int(file[8:11])
401 401 except:
402 402 return None
403 403
404 404 thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
405 405 return thisDate
406 406
407 407
408 408 def getDateFromRadarFolder(folder):
409 409 try:
410 410 year = int(folder[1:5])
411 411 doy = int(folder[5:8])
412 412 except:
413 413 return None
414 414
415 415 thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
416 416 return thisDate
417 417
418 418 def parse_format(s, fmt):
419 419
420 420 for i in range(fmt.count('%')):
421 421 x = fmt.index('%')
422 422 d = DT_DIRECTIVES[fmt[x:x+2]]
423 423 fmt = fmt.replace(fmt[x:x+2], s[x:x+d])
424 424 return fmt
425 425
426 426 class Reader(object):
427 427
428 428 c = 3E8
429 429 isConfig = False
430 430 dtype = None
431 431 pathList = []
432 432 filenameList = []
433 433 datetimeList = []
434 434 filename = None
435 435 ext = None
436 436 flagIsNewFile = 1
437 437 flagDiscontinuousBlock = 0
438 438 flagIsNewBlock = 0
439 439 flagNoMoreFiles = 0
440 440 fp = None
441 441 firstHeaderSize = 0
442 442 basicHeaderSize = 24
443 443 versionFile = 1103
444 444 fileSize = None
445 445 fileSizeByHeader = None
446 446 fileIndex = -1
447 447 profileIndex = None
448 448 blockIndex = 0
449 449 nTotalBlocks = 0
450 450 maxTimeStep = 30
451 451 lastUTTime = None
452 452 datablock = None
453 453 dataOut = None
454 454 getByBlock = False
455 455 path = None
456 456 startDate = None
457 457 endDate = None
458 458 startTime = datetime.time(0, 0, 0)
459 459 endTime = datetime.time(23, 59, 59)
460 460 set = None
461 461 expLabel = ""
462 462 online = False
463 463 delay = 60
464 464 nTries = 3 # quantity tries
465 465 nFiles = 3 # number of files for searching
466 466 walk = True
467 467 getblock = False
468 468 nTxs = 1
469 469 realtime = False
470 470 blocksize = 0
471 471 blocktime = None
472 472 warnings = True
473 473 verbose = True
474 474 server = None
475 475 format = None
476 476 oneDDict = None
477 477 twoDDict = None
478 478 independentParam = None
479 479 filefmt = None
480 480 folderfmt = None
481 481 open_file = open
482 482 open_mode = 'rb'
483 483
484 484 def run(self):
485 485
486 486 raise NotImplementedError
487 487
488 488 def getAllowedArgs(self):
489 489 if hasattr(self, '__attrs__'):
490 490 return self.__attrs__
491 491 else:
492 492 return inspect.getargspec(self.run).args
493 493
494 494 def set_kwargs(self, **kwargs):
495 495
496 496 for key, value in kwargs.items():
497 497 setattr(self, key, value)
498 498
499 499 def find_folders(self, path, startDate, endDate, folderfmt, last=False):
500 500
501 501 folders = [x for f in path.split(',')
502 502 for x in os.listdir(f) if os.path.isdir(os.path.join(f, x))]
503 503 folders.sort()
504 504
505 505 if last:
506 506 folders = [folders[-1]]
507 507
508 508 for folder in folders:
509 509 try:
510 510 dt = datetime.datetime.strptime(parse_format(folder, folderfmt), folderfmt).date()
511 511 if dt >= startDate and dt <= endDate:
512 512 yield os.path.join(path, folder)
513 513 else:
514 514 log.log('Skiping folder {}'.format(folder), self.name)
515 515 except Exception as e:
516 516 log.log('Skiping folder {}'.format(folder), self.name)
517 517 continue
518 518 return
519 519
520 520 def find_files(self, folders, ext, filefmt, startDate=None, endDate=None,
521 521 expLabel='', last=False):
522 522
523 523 for path in folders:
524 524 files = glob.glob1(path, '*{}'.format(ext))
525 525 files.sort()
526 526 if last:
527 527 if files:
528 528 fo = files[-1]
529 529 try:
530 530 dt = datetime.datetime.strptime(parse_format(fo, filefmt), filefmt).date()
531 531 yield os.path.join(path, expLabel, fo)
532 532 except Exception as e:
533 533 pass
534 534 return
535 535 else:
536 536 return
537 537
538 538 for fo in files:
539 539 try:
540 540 dt = datetime.datetime.strptime(parse_format(fo, filefmt), filefmt).date()
541 541 if dt >= startDate and dt <= endDate:
542 542 yield os.path.join(path, expLabel, fo)
543 543 else:
544 544 log.log('Skiping file {}'.format(fo), self.name)
545 545 except Exception as e:
546 546 log.log('Skiping file {}'.format(fo), self.name)
547 547 continue
548 548
549 549 def searchFilesOffLine(self, path, startDate, endDate,
550 550 expLabel, ext, walk,
551 551 filefmt, folderfmt):
552 552 """Search files in offline mode for the given arguments
553 553
554 554 Return:
555 555 Generator of files
556 556 """
557 557
558 558 if walk:
559 559 folders = self.find_folders(
560 560 path, startDate, endDate, folderfmt)
561 561 else:
562 562 folders = path.split(',')
563 563
564 564 return self.find_files(
565 565 folders, ext, filefmt, startDate, endDate, expLabel)
566 566
567 567 def searchFilesOnLine(self, path, startDate, endDate,
568 568 expLabel, ext, walk,
569 569 filefmt, folderfmt):
570 570 """Search for the last file of the last folder
571 571
572 572 Arguments:
573 573 path : carpeta donde estan contenidos los files que contiene data
574 574 expLabel : Nombre del subexperimento (subfolder)
575 575 ext : extension de los files
576 576 walk : Si es habilitado no realiza busquedas dentro de los ubdirectorios (doypath)
577 577
578 578 Return:
579 579 generator with the full path of last filename
580 580 """
581 581
582 582 if walk:
583 583 folders = self.find_folders(
584 584 path, startDate, endDate, folderfmt, last=True)
585 585 else:
586 586 folders = path.split(',')
587 587
588 588 return self.find_files(
589 589 folders, ext, filefmt, startDate, endDate, expLabel, last=True)
590 590
591 591 def setNextFile(self):
592 592 """Set the next file to be readed open it and parse de file header"""
593 593
594 594 while True:
595 595 if self.fp != None:
596 596 self.fp.close()
597 597
598 598 if self.online:
599 599 newFile = self.setNextFileOnline()
600 600 else:
601 601 newFile = self.setNextFileOffline()
602 602
603 603 if not(newFile):
604 604 if self.online:
605 605 raise schainpy.admin.SchainError('Time to wait for new files reach')
606 606 else:
607 607 if self.fileIndex == -1:
608 608 raise schainpy.admin.SchainWarning('No files found in the given path')
609 609 else:
610 610 raise schainpy.admin.SchainWarning('No more files to read')
611 611
612 612 if self.verifyFile(self.filename):
613 613 break
614 614
615 615 log.log('Opening file: %s' % self.filename, self.name)
616 616
617 617 self.readFirstHeader()
618 618 self.nReadBlocks = 0
619 619
620 620 def setNextFileOnline(self):
621 621 """Check for the next file to be readed in online mode.
622 622
623 623 Set:
624 624 self.filename
625 625 self.fp
626 626 self.filesize
627 627
628 628 Return:
629 629 boolean
630 630
631 631 """
632 632 nextFile = True
633 633 nextDay = False
634 634
635 635 for nFiles in range(self.nFiles+1):
636 636 for nTries in range(self.nTries):
637 637 fullfilename, filename = self.checkForRealPath(nextFile, nextDay)
638 638 if fullfilename is not None:
639 639 break
640 640 log.warning(
641 641 "Waiting %0.2f sec for the next file: \"%s\" , try %02d ..." % (self.delay, filename, nTries + 1),
642 642 self.name)
643 643 time.sleep(self.delay)
644 644 nextFile = False
645 645 continue
646 646
647 if fullfilename:
647 if fullfilename is not None:
648 648 break
649 649
650 650 self.nTries = 1
651 651 nextFile = True
652 652
653 653 if nFiles == (self.nFiles - 1):
654 654 log.log('Trying with next day...', self.name)
655 nextDay = True
655 nextDay = True
656 self.nTries = 3
656 657
657 658 if fullfilename:
658 659 self.fileSize = os.path.getsize(fullfilename)
659 660 self.filename = fullfilename
660 661 self.flagIsNewFile = 1
661 662 if self.fp != None:
662 663 self.fp.close()
663 664 self.fp = self.open_file(fullfilename, self.open_mode)
664 665 self.flagNoMoreFiles = 0
665 666 self.fileIndex += 1
666 667 return 1
667 668 else:
668 669 return 0
669 670
670 671 def setNextFileOffline(self):
671 672 """Open the next file to be readed in offline mode"""
672 673
673 674 try:
674 675 filename = next(self.filenameList)
675 676 self.fileIndex +=1
676 677 except StopIteration:
677 678 self.flagNoMoreFiles = 1
678 679 return 0
679 680
680 681 self.filename = filename
681 682 self.fileSize = os.path.getsize(filename)
682 683 self.fp = self.open_file(filename, self.open_mode)
683 684 self.flagIsNewFile = 1
684 685
685 686 return 1
686 687
687 688 @staticmethod
688 689 def isDateTimeInRange(dt, startDate, endDate, startTime, endTime):
689 690 """Check if the given datetime is in range"""
690 691
691 692 if startDate <= dt.date() <= endDate:
692 693 if startTime <= dt.time() <= endTime:
693 694 return True
694 695 return False
695 696
696 697 def verifyFile(self, filename):
697 698 """Check for a valid file
698 699
699 700 Arguments:
700 701 filename -- full path filename
701 702
702 703 Return:
703 704 boolean
704 705 """
705 706
706 707 return True
707 708
708 709 def checkForRealPath(self, nextFile, nextDay):
709 710 """Check if the next file to be readed exists"""
710 711
711 712 raise NotImplementedError
712 713
713 714 def readFirstHeader(self):
714 715 """Parse the file header"""
715 716
716 717 pass
717 718
718 719 class JRODataReader(Reader):
719 720
720 721 utc = 0
721 722 nReadBlocks = 0
722 723 foldercounter = 0
723 724 firstHeaderSize = 0
724 725 basicHeaderSize = 24
725 726 __isFirstTimeOnline = 1
726 727 __printInfo = True
727 728 filefmt = "*%Y%j***"
728 729 folderfmt = "*%Y%j"
729 730
730 731 def getDtypeWidth(self):
731 732
732 733 dtype_index = get_dtype_index(self.dtype)
733 734 dtype_width = get_dtype_width(dtype_index)
734 735
735 736 return dtype_width
736 737
737 738 def checkForRealPath(self, nextFile, nextDay):
738 739 """Check if the next file to be readed exists.
739 740
740 741 Example :
741 742 nombre correcto del file es .../.../D2009307/P2009307367.ext
742 743
743 744 Entonces la funcion prueba con las siguientes combinaciones
744 745 .../.../y2009307367.ext
745 746 .../.../Y2009307367.ext
746 747 .../.../x2009307/y2009307367.ext
747 748 .../.../x2009307/Y2009307367.ext
748 749 .../.../X2009307/y2009307367.ext
749 750 .../.../X2009307/Y2009307367.ext
750 751 siendo para este caso, la ultima combinacion de letras, identica al file buscado
751 752
752 753 Return:
753 754 str -- fullpath of the file
754 755 """
755 756
756 757
757 758 if nextFile:
758 759 self.set += 1
759 760 if nextDay:
760 761 self.set = 0
761 762 self.doy += 1
762 763 foldercounter = 0
763 764 prefixDirList = [None, 'd', 'D']
764 765 if self.ext.lower() == ".r": # voltage
765 766 prefixFileList = ['d', 'D']
766 767 elif self.ext.lower() == ".pdata": # spectra
767 768 prefixFileList = ['p', 'P']
768 769
769 770 # barrido por las combinaciones posibles
770 771 for prefixDir in prefixDirList:
771 772 thispath = self.path
772 773 if prefixDir != None:
773 774 # formo el nombre del directorio xYYYYDDD (x=d o x=D)
774 775 if foldercounter == 0:
775 776 thispath = os.path.join(self.path, "%s%04d%03d" %
776 777 (prefixDir, self.year, self.doy))
777 778 else:
778 779 thispath = os.path.join(self.path, "%s%04d%03d_%02d" % (
779 780 prefixDir, self.year, self.doy, foldercounter))
780 781 for prefixFile in prefixFileList: # barrido por las dos combinaciones posibles de "D"
781 782 # formo el nombre del file xYYYYDDDSSS.ext
782 783 filename = "%s%04d%03d%03d%s" % (prefixFile, self.year, self.doy, self.set, self.ext)
783 784 fullfilename = os.path.join(
784 785 thispath, filename)
785 786
786 787 if os.path.exists(fullfilename):
787 788 return fullfilename, filename
788 789
789 790 return None, filename
790 791
791 792 def __waitNewBlock(self):
792 793 """
793 794 Return 1 si se encontro un nuevo bloque de datos, 0 de otra forma.
794 795
795 796 Si el modo de lectura es OffLine siempre retorn 0
796 797 """
797 798 if not self.online:
798 799 return 0
799 800
800 801 if (self.nReadBlocks >= self.processingHeaderObj.dataBlocksPerFile):
801 802 return 0
802 803
803 804 currentPointer = self.fp.tell()
804 805
805 806 neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize
806 807
807 808 for nTries in range(self.nTries):
808 809
809 810 self.fp.close()
810 811 self.fp = open(self.filename, 'rb')
811 812 self.fp.seek(currentPointer)
812 813
813 814 self.fileSize = os.path.getsize(self.filename)
814 815 currentSize = self.fileSize - currentPointer
815 816
816 817 if (currentSize >= neededSize):
817 818 self.basicHeaderObj.read(self.fp)
818 819 return 1
819 820
820 821 if self.fileSize == self.fileSizeByHeader:
821 822 # self.flagEoF = True
822 823 return 0
823 824
824 825 print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1))
825 826 time.sleep(self.delay)
826 827
827 828 return 0
828 829
829 830 def waitDataBlock(self, pointer_location, blocksize=None):
830 831
831 832 currentPointer = pointer_location
832 833 if blocksize is None:
833 834 neededSize = self.processingHeaderObj.blockSize # + self.basicHeaderSize
834 835 else:
835 836 neededSize = blocksize
836 837
837 838 for nTries in range(self.nTries):
838 839 self.fp.close()
839 840 self.fp = open(self.filename, 'rb')
840 841 self.fp.seek(currentPointer)
841 842
842 843 self.fileSize = os.path.getsize(self.filename)
843 844 currentSize = self.fileSize - currentPointer
844 845
845 846 if (currentSize >= neededSize):
846 847 return 1
847 848
848 849 log.warning(
849 850 "Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1),
850 851 self.name
851 852 )
852 853 time.sleep(self.delay)
853 854
854 855 return 0
855 856
856 857 def __setNewBlock(self):
857 858
858 859 if self.fp == None:
859 860 return 0
860 861
861 862 if self.flagIsNewFile:
862 863 self.lastUTTime = self.basicHeaderObj.utc
863 864 return 1
864 865
865 866 if self.realtime:
866 867 self.flagDiscontinuousBlock = 1
867 868 if not(self.setNextFile()):
868 869 return 0
869 870 else:
870 871 return 1
871 872
872 873 currentSize = self.fileSize - self.fp.tell()
873 874 neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize
874 875
875 876 if (currentSize >= neededSize):
876 877 self.basicHeaderObj.read(self.fp)
877 878 self.lastUTTime = self.basicHeaderObj.utc
878 879 return 1
879 880
880 881 if self.__waitNewBlock():
881 882 self.lastUTTime = self.basicHeaderObj.utc
882 883 return 1
883 884
884 885 if not(self.setNextFile()):
885 886 return 0
886 887
887 888 deltaTime = self.basicHeaderObj.utc - self.lastUTTime
888 889 self.lastUTTime = self.basicHeaderObj.utc
889 890
890 891 self.flagDiscontinuousBlock = 0
891 892
892 893 if deltaTime > self.maxTimeStep:
893 894 self.flagDiscontinuousBlock = 1
894 895
895 896 return 1
896 897
897 898 def readNextBlock(self):
898 899
899 900 while True:
900 901 self.__setNewBlock()
901 902
902 903 if not(self.readBlock()):
903 904 return 0
904 905
905 906 self.getBasicHeader()
906 907
907 908 if not self.isDateTimeInRange(self.dataOut.datatime, self.startDate, self.endDate, self.startTime, self.endTime):
908 909 print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.nReadBlocks,
909 910 self.processingHeaderObj.dataBlocksPerFile,
910 911 self.dataOut.datatime.ctime()))
911 912 continue
912 913
913 914 break
914 915
915 916 if self.verbose:
916 917 print("[Reading] Block No. %d/%d -> %s" % (self.nReadBlocks,
917 918 self.processingHeaderObj.dataBlocksPerFile,
918 919 self.dataOut.datatime.ctime()))
919 920 return 1
920 921
921 922 def readFirstHeader(self):
922 923
923 924 self.basicHeaderObj.read(self.fp)
924 925 self.systemHeaderObj.read(self.fp)
925 926 self.radarControllerHeaderObj.read(self.fp)
926 927 self.processingHeaderObj.read(self.fp)
927 928 self.firstHeaderSize = self.basicHeaderObj.size
928 929
929 930 datatype = int(numpy.log2((self.processingHeaderObj.processFlags &
930 931 PROCFLAG.DATATYPE_MASK)) - numpy.log2(PROCFLAG.DATATYPE_CHAR))
931 932 if datatype == 0:
932 933 datatype_str = numpy.dtype([('real', '<i1'), ('imag', '<i1')])
933 934 elif datatype == 1:
934 935 datatype_str = numpy.dtype([('real', '<i2'), ('imag', '<i2')])
935 936 elif datatype == 2:
936 937 datatype_str = numpy.dtype([('real', '<i4'), ('imag', '<i4')])
937 938 elif datatype == 3:
938 939 datatype_str = numpy.dtype([('real', '<i8'), ('imag', '<i8')])
939 940 elif datatype == 4:
940 941 datatype_str = numpy.dtype([('real', '<f4'), ('imag', '<f4')])
941 942 elif datatype == 5:
942 943 datatype_str = numpy.dtype([('real', '<f8'), ('imag', '<f8')])
943 944 else:
944 945 raise ValueError('Data type was not defined')
945 946
946 947 self.dtype = datatype_str
947 948 #self.ippSeconds = 2 * 1000 * self.radarControllerHeaderObj.ipp / self.c
948 949 self.fileSizeByHeader = self.processingHeaderObj.dataBlocksPerFile * self.processingHeaderObj.blockSize + \
949 950 self.firstHeaderSize + self.basicHeaderSize * \
950 951 (self.processingHeaderObj.dataBlocksPerFile - 1)
951 952 # self.dataOut.channelList = numpy.arange(self.systemHeaderObj.numChannels)
952 953 # self.dataOut.channelIndexList = numpy.arange(self.systemHeaderObj.numChannels)
953 954 self.getBlockDimension()
954 955
955 956 def verifyFile(self, filename, msgFlag=True):
956 957
957 958 msg = None
958 959
959 960 try:
960 961 fp = open(filename, 'rb')
961 962 except IOError:
962 963
963 964 if msgFlag:
964 965 print("[Reading] File %s can't be opened" % (filename))
965 966
966 967 return False
967 968
968 969 if self.waitDataBlock(0):
969 970 basicHeaderObj = BasicHeader(LOCALTIME)
970 971 systemHeaderObj = SystemHeader()
971 972 radarControllerHeaderObj = RadarControllerHeader()
972 973 processingHeaderObj = ProcessingHeader()
973 974
974 975 if not(basicHeaderObj.read(fp)):
975 976 fp.close()
976 977 return False
977 978
978 979 if not(systemHeaderObj.read(fp)):
979 980 fp.close()
980 981 return False
981 982
982 983 if not(radarControllerHeaderObj.read(fp)):
983 984 fp.close()
984 985 return False
985 986
986 987 if not(processingHeaderObj.read(fp)):
987 988 fp.close()
988 989 return False
989 990
990 991 if not self.online:
991 992 dt1 = basicHeaderObj.datatime
992 993 fp.seek(self.fileSize-processingHeaderObj.blockSize-24)
993 994 if not(basicHeaderObj.read(fp)):
994 995 fp.close()
995 996 return False
996 997 dt2 = basicHeaderObj.datatime
997 998 if not self.isDateTimeInRange(dt1, self.startDate, self.endDate, self.startTime, self.endTime) and not \
998 999 self.isDateTimeInRange(dt2, self.startDate, self.endDate, self.startTime, self.endTime):
999 1000 return False
1000 1001
1001 1002 fp.close()
1002 1003
1003 1004 return True
1004 1005
1005 1006 def findDatafiles(self, path, startDate=None, endDate=None, expLabel='', ext='.r', walk=True, include_path=False):
1006 1007
1007 1008 path_empty = True
1008 1009
1009 1010 dateList = []
1010 1011 pathList = []
1011 1012
1012 1013 multi_path = path.split(',')
1013 1014
1014 1015 if not walk:
1015 1016
1016 1017 for single_path in multi_path:
1017 1018
1018 1019 if not os.path.isdir(single_path):
1019 1020 continue
1020 1021
1021 1022 fileList = glob.glob1(single_path, "*" + ext)
1022 1023
1023 1024 if not fileList:
1024 1025 continue
1025 1026
1026 1027 path_empty = False
1027 1028
1028 1029 fileList.sort()
1029 1030
1030 1031 for thisFile in fileList:
1031 1032
1032 1033 if not os.path.isfile(os.path.join(single_path, thisFile)):
1033 1034 continue
1034 1035
1035 1036 if not isRadarFile(thisFile):
1036 1037 continue
1037 1038
1038 1039 if not isFileInDateRange(thisFile, startDate, endDate):
1039 1040 continue
1040 1041
1041 1042 thisDate = getDateFromRadarFile(thisFile)
1042 1043
1043 1044 if thisDate in dateList or single_path in pathList:
1044 1045 continue
1045 1046
1046 1047 dateList.append(thisDate)
1047 1048 pathList.append(single_path)
1048 1049
1049 1050 else:
1050 1051 for single_path in multi_path:
1051 1052
1052 1053 if not os.path.isdir(single_path):
1053 1054 continue
1054 1055
1055 1056 dirList = []
1056 1057
1057 1058 for thisPath in os.listdir(single_path):
1058 1059
1059 1060 if not os.path.isdir(os.path.join(single_path, thisPath)):
1060 1061 continue
1061 1062
1062 1063 if not isRadarFolder(thisPath):
1063 1064 continue
1064 1065
1065 1066 if not isFolderInDateRange(thisPath, startDate, endDate):
1066 1067 continue
1067 1068
1068 1069 dirList.append(thisPath)
1069 1070
1070 1071 if not dirList:
1071 1072 continue
1072 1073
1073 1074 dirList.sort()
1074 1075
1075 1076 for thisDir in dirList:
1076 1077
1077 1078 datapath = os.path.join(single_path, thisDir, expLabel)
1078 1079 fileList = glob.glob1(datapath, "*" + ext)
1079 1080
1080 1081 if not fileList:
1081 1082 continue
1082 1083
1083 1084 path_empty = False
1084 1085
1085 1086 thisDate = getDateFromRadarFolder(thisDir)
1086 1087
1087 1088 pathList.append(datapath)
1088 1089 dateList.append(thisDate)
1089 1090
1090 1091 dateList.sort()
1091 1092
1092 1093 if walk:
1093 1094 pattern_path = os.path.join(multi_path[0], "[dYYYYDDD]", expLabel)
1094 1095 else:
1095 1096 pattern_path = multi_path[0]
1096 1097
1097 1098 if path_empty:
1098 1099 raise schainpy.admin.SchainError("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate))
1099 1100 else:
1100 1101 if not dateList:
1101 1102 raise schainpy.admin.SchainError("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path))
1102 1103
1103 1104 if include_path:
1104 1105 return dateList, pathList
1105 1106
1106 1107 return dateList
1107 1108
1108 1109 def setup(self, **kwargs):
1109 1110
1110 1111 self.set_kwargs(**kwargs)
1111 1112 if not self.ext.startswith('.'):
1112 1113 self.ext = '.{}'.format(self.ext)
1113 1114
1114 1115 if self.server is not None:
1115 1116 if 'tcp://' in self.server:
1116 1117 address = server
1117 1118 else:
1118 1119 address = 'ipc:///tmp/%s' % self.server
1119 1120 self.server = address
1120 1121 self.context = zmq.Context()
1121 1122 self.receiver = self.context.socket(zmq.PULL)
1122 1123 self.receiver.connect(self.server)
1123 1124 time.sleep(0.5)
1124 1125 print('[Starting] ReceiverData from {}'.format(self.server))
1125 1126 else:
1126 1127 self.server = None
1127 1128 if self.path == None:
1128 1129 raise ValueError("[Reading] The path is not valid")
1129 1130
1130 1131 if self.online:
1131 1132 log.log("[Reading] Searching files in online mode...", self.name)
1132 1133
1133 1134 for nTries in range(self.nTries):
1134 1135 fullpath = self.searchFilesOnLine(self.path, self.startDate,
1135 1136 self.endDate, self.expLabel, self.ext, self.walk,
1136 1137 self.filefmt, self.folderfmt)
1137 1138
1138 1139 try:
1139 1140 fullpath = next(fullpath)
1140 1141 except:
1141 1142 fullpath = None
1142 1143
1143 1144 if fullpath:
1144 1145 break
1145 1146
1146 1147 log.warning(
1147 1148 'Waiting {} sec for a valid file in {}: try {} ...'.format(
1148 1149 self.delay, self.path, nTries + 1),
1149 1150 self.name)
1150 1151 time.sleep(self.delay)
1151 1152
1152 1153 if not(fullpath):
1153 1154 raise schainpy.admin.SchainError(
1154 1155 'There isn\'t any valid file in {}'.format(self.path))
1155 1156
1156 1157 pathname, filename = os.path.split(fullpath)
1157 1158 self.year = int(filename[1:5])
1158 1159 self.doy = int(filename[5:8])
1159 1160 self.set = int(filename[8:11]) - 1
1160 1161 else:
1161 1162 log.log("Searching files in {}".format(self.path), self.name)
1162 1163 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
1163 1164 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
1164 1165
1165 1166 self.setNextFile()
1166 1167
1167 1168 return
1168 1169
1169 1170 def getBasicHeader(self):
1170 1171
1171 1172 self.dataOut.utctime = self.basicHeaderObj.utc + self.basicHeaderObj.miliSecond / \
1172 1173 1000. + self.profileIndex * self.radarControllerHeaderObj.ippSeconds
1173 1174
1174 1175 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
1175 1176
1176 1177 self.dataOut.timeZone = self.basicHeaderObj.timeZone
1177 1178
1178 1179 self.dataOut.dstFlag = self.basicHeaderObj.dstFlag
1179 1180
1180 1181 self.dataOut.errorCount = self.basicHeaderObj.errorCount
1181 1182
1182 1183 self.dataOut.useLocalTime = self.basicHeaderObj.useLocalTime
1183 1184
1184 1185 self.dataOut.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
1185 1186
1186 1187 # self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock*self.nTxs
1187 1188
1188 1189 def getFirstHeader(self):
1189 1190
1190 1191 raise NotImplementedError
1191 1192
1192 1193 def getData(self):
1193 1194
1194 1195 raise NotImplementedError
1195 1196
1196 1197 def hasNotDataInBuffer(self):
1197 1198
1198 1199 raise NotImplementedError
1199 1200
1200 1201 def readBlock(self):
1201 1202
1202 1203 raise NotImplementedError
1203 1204
1204 1205 def isEndProcess(self):
1205 1206
1206 1207 return self.flagNoMoreFiles
1207 1208
1208 1209 def printReadBlocks(self):
1209 1210
1210 1211 print("[Reading] Number of read blocks per file %04d" % self.nReadBlocks)
1211 1212
1212 1213 def printTotalBlocks(self):
1213 1214
1214 1215 print("[Reading] Number of read blocks %04d" % self.nTotalBlocks)
1215 1216
1216 1217 def printNumberOfBlock(self):
1217 1218 'SPAM!'
1218 1219
1219 1220 # if self.flagIsNewBlock:
1220 1221 # print "[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks,
1221 1222 # self.processingHeaderObj.dataBlocksPerFile,
1222 1223 # self.dataOut.datatime.ctime())
1223 1224
1224 1225 def printInfo(self):
1225 1226
1226 1227 if self.__printInfo == False:
1227 1228 return
1228 1229
1229 1230 self.basicHeaderObj.printInfo()
1230 1231 self.systemHeaderObj.printInfo()
1231 1232 self.radarControllerHeaderObj.printInfo()
1232 1233 self.processingHeaderObj.printInfo()
1233 1234
1234 1235 self.__printInfo = False
1235 1236
1236 1237 def run(self, **kwargs):
1237 1238 """
1238 1239
1239 1240 Arguments:
1240 1241 path :
1241 1242 startDate :
1242 1243 endDate :
1243 1244 startTime :
1244 1245 endTime :
1245 1246 set :
1246 1247 expLabel :
1247 1248 ext :
1248 1249 online :
1249 1250 delay :
1250 1251 walk :
1251 1252 getblock :
1252 1253 nTxs :
1253 1254 realtime :
1254 1255 blocksize :
1255 1256 blocktime :
1256 1257 skip :
1257 1258 cursor :
1258 1259 warnings :
1259 1260 server :
1260 1261 verbose :
1261 1262 format :
1262 1263 oneDDict :
1263 1264 twoDDict :
1264 1265 independentParam :
1265 1266 """
1266 1267
1267 1268 if not(self.isConfig):
1268 1269 self.setup(**kwargs)
1269 1270 self.isConfig = True
1270 1271 if self.server is None:
1271 1272 self.getData()
1272 1273 else:
1273 1274 self.getFromServer()
1274 1275
1275 1276
1276 1277 class JRODataWriter(Reader):
1277 1278
1278 1279 """
1279 1280 Esta clase permite escribir datos a archivos procesados (.r o ,pdata). La escritura
1280 1281 de los datos siempre se realiza por bloques.
1281 1282 """
1282 1283
1283 1284 setFile = None
1284 1285 profilesPerBlock = None
1285 1286 blocksPerFile = None
1286 1287 nWriteBlocks = 0
1287 1288 fileDate = None
1288 1289
1289 1290 def __init__(self, dataOut=None):
1290 1291 raise NotImplementedError
1291 1292
1292 1293 def hasAllDataInBuffer(self):
1293 1294 raise NotImplementedError
1294 1295
1295 1296 def setBlockDimension(self):
1296 1297 raise NotImplementedError
1297 1298
1298 1299 def writeBlock(self):
1299 1300 raise NotImplementedError
1300 1301
1301 1302 def putData(self):
1302 1303 raise NotImplementedError
1303 1304
1304 1305 def getDtypeWidth(self):
1305 1306
1306 1307 dtype_index = get_dtype_index(self.dtype)
1307 1308 dtype_width = get_dtype_width(dtype_index)
1308 1309
1309 1310 return dtype_width
1310 1311
1311 1312 def getProcessFlags(self):
1312 1313
1313 1314 processFlags = 0
1314 1315
1315 1316 dtype_index = get_dtype_index(self.dtype)
1316 1317 procflag_dtype = get_procflag_dtype(dtype_index)
1317 1318
1318 1319 processFlags += procflag_dtype
1319 1320
1320 1321 if self.dataOut.flagDecodeData:
1321 1322 processFlags += PROCFLAG.DECODE_DATA
1322 1323
1323 1324 if self.dataOut.flagDeflipData:
1324 1325 processFlags += PROCFLAG.DEFLIP_DATA
1325 1326
1326 1327 if self.dataOut.code is not None:
1327 1328 processFlags += PROCFLAG.DEFINE_PROCESS_CODE
1328 1329
1329 1330 if self.dataOut.nCohInt > 1:
1330 1331 processFlags += PROCFLAG.COHERENT_INTEGRATION
1331 1332
1332 1333 if self.dataOut.type == "Spectra":
1333 1334 if self.dataOut.nIncohInt > 1:
1334 1335 processFlags += PROCFLAG.INCOHERENT_INTEGRATION
1335 1336
1336 1337 if self.dataOut.data_dc is not None:
1337 1338 processFlags += PROCFLAG.SAVE_CHANNELS_DC
1338 1339
1339 1340 if self.dataOut.flagShiftFFT:
1340 1341 processFlags += PROCFLAG.SHIFT_FFT_DATA
1341 1342
1342 1343 return processFlags
1343 1344
1344 1345 def setBasicHeader(self):
1345 1346
1346 1347 self.basicHeaderObj.size = self.basicHeaderSize # bytes
1347 1348 self.basicHeaderObj.version = self.versionFile
1348 1349 self.basicHeaderObj.dataBlock = self.nTotalBlocks
1349 1350 utc = numpy.floor(self.dataOut.utctime)
1350 1351 milisecond = (self.dataOut.utctime - utc) * 1000.0
1351 1352 self.basicHeaderObj.utc = utc
1352 1353 self.basicHeaderObj.miliSecond = milisecond
1353 1354 self.basicHeaderObj.timeZone = self.dataOut.timeZone
1354 1355 self.basicHeaderObj.dstFlag = self.dataOut.dstFlag
1355 1356 self.basicHeaderObj.errorCount = self.dataOut.errorCount
1356 1357
1357 1358 def setFirstHeader(self):
1358 1359 """
1359 1360 Obtiene una copia del First Header
1360 1361
1361 1362 Affected:
1362 1363
1363 1364 self.basicHeaderObj
1364 1365 self.systemHeaderObj
1365 1366 self.radarControllerHeaderObj
1366 1367 self.processingHeaderObj self.
1367 1368
1368 1369 Return:
1369 1370 None
1370 1371 """
1371 1372
1372 1373 raise NotImplementedError
1373 1374
1374 1375 def __writeFirstHeader(self):
1375 1376 """
1376 1377 Escribe el primer header del file es decir el Basic header y el Long header (SystemHeader, RadarControllerHeader, ProcessingHeader)
1377 1378
1378 1379 Affected:
1379 1380 __dataType
1380 1381
1381 1382 Return:
1382 1383 None
1383 1384 """
1384 1385
1385 1386 # CALCULAR PARAMETROS
1386 1387
1387 1388 sizeLongHeader = self.systemHeaderObj.size + \
1388 1389 self.radarControllerHeaderObj.size + self.processingHeaderObj.size
1389 1390 self.basicHeaderObj.size = self.basicHeaderSize + sizeLongHeader
1390 1391
1391 1392 self.basicHeaderObj.write(self.fp)
1392 1393 self.systemHeaderObj.write(self.fp)
1393 1394 self.radarControllerHeaderObj.write(self.fp)
1394 1395 self.processingHeaderObj.write(self.fp)
1395 1396
1396 1397 def __setNewBlock(self):
1397 1398 """
1398 1399 Si es un nuevo file escribe el First Header caso contrario escribe solo el Basic Header
1399 1400
1400 1401 Return:
1401 1402 0 : si no pudo escribir nada
1402 1403 1 : Si escribio el Basic el First Header
1403 1404 """
1404 1405 if self.fp == None:
1405 1406 self.setNextFile()
1406 1407
1407 1408 if self.flagIsNewFile:
1408 1409 return 1
1409 1410
1410 1411 if self.blockIndex < self.processingHeaderObj.dataBlocksPerFile:
1411 1412 self.basicHeaderObj.write(self.fp)
1412 1413 return 1
1413 1414
1414 1415 if not(self.setNextFile()):
1415 1416 return 0
1416 1417
1417 1418 return 1
1418 1419
1419 1420 def writeNextBlock(self):
1420 1421 """
1421 1422 Selecciona el bloque siguiente de datos y los escribe en un file
1422 1423
1423 1424 Return:
1424 1425 0 : Si no hizo pudo escribir el bloque de datos
1425 1426 1 : Si no pudo escribir el bloque de datos
1426 1427 """
1427 1428 if not(self.__setNewBlock()):
1428 1429 return 0
1429 1430
1430 1431 self.writeBlock()
1431 1432
1432 1433 print("[Writing] Block No. %d/%d" % (self.blockIndex,
1433 1434 self.processingHeaderObj.dataBlocksPerFile))
1434 1435
1435 1436 return 1
1436 1437
1437 1438 def setNextFile(self):
1438 1439 """Determina el siguiente file que sera escrito
1439 1440
1440 1441 Affected:
1441 1442 self.filename
1442 1443 self.subfolder
1443 1444 self.fp
1444 1445 self.setFile
1445 1446 self.flagIsNewFile
1446 1447
1447 1448 Return:
1448 1449 0 : Si el archivo no puede ser escrito
1449 1450 1 : Si el archivo esta listo para ser escrito
1450 1451 """
1451 1452 ext = self.ext
1452 1453 path = self.path
1453 1454
1454 1455 if self.fp != None:
1455 1456 self.fp.close()
1456 1457
1457 1458 if not os.path.exists(path):
1458 1459 os.mkdir(path)
1459 1460
1460 1461 timeTuple = time.localtime(self.dataOut.utctime)
1461 1462 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
1462 1463
1463 1464 fullpath = os.path.join(path, subfolder)
1464 1465 setFile = self.setFile
1465 1466
1466 1467 if not(os.path.exists(fullpath)):
1467 1468 os.mkdir(fullpath)
1468 1469 setFile = -1 # inicializo mi contador de seteo
1469 1470 else:
1470 1471 filesList = os.listdir(fullpath)
1471 1472 if len(filesList) > 0:
1472 1473 filesList = sorted(filesList, key=str.lower)
1473 1474 filen = filesList[-1]
1474 1475 # el filename debera tener el siguiente formato
1475 1476 # 0 1234 567 89A BCDE (hex)
1476 1477 # x YYYY DDD SSS .ext
1477 1478 if isNumber(filen[8:11]):
1478 1479 # inicializo mi contador de seteo al seteo del ultimo file
1479 1480 setFile = int(filen[8:11])
1480 1481 else:
1481 1482 setFile = -1
1482 1483 else:
1483 1484 setFile = -1 # inicializo mi contador de seteo
1484 1485
1485 1486 setFile += 1
1486 1487
1487 1488 # If this is a new day it resets some values
1488 1489 if self.dataOut.datatime.date() > self.fileDate:
1489 1490 setFile = 0
1490 1491 self.nTotalBlocks = 0
1491 1492
1492 1493 filen = '{}{:04d}{:03d}{:03d}{}'.format(
1493 1494 self.optchar, timeTuple.tm_year, timeTuple.tm_yday, setFile, ext)
1494 1495
1495 1496 filename = os.path.join(path, subfolder, filen)
1496 1497
1497 1498 fp = open(filename, 'wb')
1498 1499
1499 1500 self.blockIndex = 0
1500 1501 self.filename = filename
1501 1502 self.subfolder = subfolder
1502 1503 self.fp = fp
1503 1504 self.setFile = setFile
1504 1505 self.flagIsNewFile = 1
1505 1506 self.fileDate = self.dataOut.datatime.date()
1506 1507 self.setFirstHeader()
1507 1508
1508 1509 print('[Writing] Opening file: %s' % self.filename)
1509 1510
1510 1511 self.__writeFirstHeader()
1511 1512
1512 1513 return 1
1513 1514
1514 1515 def setup(self, dataOut, path, blocksPerFile, profilesPerBlock=64, set=None, ext=None, datatype=4):
1515 1516 """
1516 1517 Setea el tipo de formato en la cual sera guardada la data y escribe el First Header
1517 1518
1518 1519 Inputs:
1519 1520 path : directory where data will be saved
1520 1521 profilesPerBlock : number of profiles per block
1521 1522 set : initial file set
1522 1523 datatype : An integer number that defines data type:
1523 1524 0 : int8 (1 byte)
1524 1525 1 : int16 (2 bytes)
1525 1526 2 : int32 (4 bytes)
1526 1527 3 : int64 (8 bytes)
1527 1528 4 : float32 (4 bytes)
1528 1529 5 : double64 (8 bytes)
1529 1530
1530 1531 Return:
1531 1532 0 : Si no realizo un buen seteo
1532 1533 1 : Si realizo un buen seteo
1533 1534 """
1534 1535
1535 1536 if ext == None:
1536 1537 ext = self.ext
1537 1538
1538 1539 self.ext = ext.lower()
1539 1540
1540 1541 self.path = path
1541 1542
1542 1543 if set is None:
1543 1544 self.setFile = -1
1544 1545 else:
1545 1546 self.setFile = set - 1
1546 1547
1547 1548 self.blocksPerFile = blocksPerFile
1548 1549 self.profilesPerBlock = profilesPerBlock
1549 1550 self.dataOut = dataOut
1550 1551 self.fileDate = self.dataOut.datatime.date()
1551 1552 self.dtype = self.dataOut.dtype
1552 1553
1553 1554 if datatype is not None:
1554 1555 self.dtype = get_numpy_dtype(datatype)
1555 1556
1556 1557 if not(self.setNextFile()):
1557 1558 print("[Writing] There isn't a next file")
1558 1559 return 0
1559 1560
1560 1561 self.setBlockDimension()
1561 1562
1562 1563 return 1
1563 1564
1564 1565 def run(self, dataOut, path, blocksPerFile=100, profilesPerBlock=64, set=None, ext=None, datatype=4, **kwargs):
1565 1566
1566 1567 if not(self.isConfig):
1567 1568
1568 1569 self.setup(dataOut, path, blocksPerFile, profilesPerBlock=profilesPerBlock,
1569 1570 set=set, ext=ext, datatype=datatype, **kwargs)
1570 1571 self.isConfig = True
1571 1572
1572 1573 self.dataOut = dataOut
1573 1574 self.putData()
1574 1575 return self.dataOut
@@ -1,429 +1,429
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 15 import os
16 16 import sys
17 17 import inspect
18 18 import zmq
19 19 import time
20 20 import pickle
21 21 import traceback
22 22 try:
23 23 from queue import Queue
24 24 except:
25 25 from Queue import Queue
26 26 from threading import Thread
27 27 from multiprocessing import Process
28 28
29 29 from schainpy.utils import log
30 30
31 31
32 32 class ProcessingUnit(object):
33 33
34 34 """
35 35 Update - Jan 2018 - MULTIPROCESSING
36 36 All the "call" methods present in the previous base were removed.
37 37 The majority of operations are independant processes, thus
38 38 the decorator is in charge of communicate the operation processes
39 39 with the proccessing unit via IPC.
40 40
41 41 The constructor does not receive any argument. The remaining methods
42 42 are related with the operations to execute.
43 43
44 44
45 45 """
46 46 proc_type = 'processing'
47 47 __attrs__ = []
48 48
49 49 def __init__(self):
50 50
51 51 self.dataIn = None
52 52 self.dataOut = None
53 53 self.isConfig = False
54 54 self.operations = []
55 55 self.plots = []
56 56
57 57 def getAllowedArgs(self):
58 58 if hasattr(self, '__attrs__'):
59 59 return self.__attrs__
60 60 else:
61 61 return inspect.getargspec(self.run).args
62 62
63 63 def addOperation(self, conf, operation):
64 64 """
65 65 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
66 66 posses the id of the operation process (IPC purposes)
67 67
68 68 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
69 69 identificador asociado a este objeto.
70 70
71 71 Input:
72 72
73 73 object : objeto de la clase "Operation"
74 74
75 75 Return:
76 76
77 77 objId : identificador del objeto, necesario para comunicar con master(procUnit)
78 78 """
79 79
80 80 self.operations.append(
81 81 (operation, conf.type, conf.id, conf.getKwargs()))
82 82
83 83 if 'plot' in self.name.lower():
84 84 self.plots.append(operation.CODE)
85 85
86 86 def getOperationObj(self, objId):
87 87
88 88 if objId not in list(self.operations.keys()):
89 89 return None
90 90
91 91 return self.operations[objId]
92 92
93 93 def operation(self, **kwargs):
94 94 """
95 95 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
96 96 atributos del objeto dataOut
97 97
98 98 Input:
99 99
100 100 **kwargs : Diccionario de argumentos de la funcion a ejecutar
101 101 """
102 102
103 103 raise NotImplementedError
104 104
105 105 def setup(self):
106 106
107 107 raise NotImplementedError
108 108
109 109 def run(self):
110 110
111 111 raise NotImplementedError
112 112
113 113 def close(self):
114 114
115 115 return
116 116
117 117
118 118 class Operation(object):
119 119
120 120 """
121 121 Update - Jan 2018 - MULTIPROCESSING
122 122
123 123 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
124 124 The constructor doe snot receive any argument, neither the baseclass.
125 125
126 126
127 127 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
128 128 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
129 129 acumulacion dentro de esta clase
130 130
131 131 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
132 132
133 133 """
134 134 proc_type = 'operation'
135 135 __attrs__ = []
136 136
137 137 def __init__(self):
138 138
139 139 self.id = None
140 140 self.isConfig = False
141 141
142 142 if not hasattr(self, 'name'):
143 143 self.name = self.__class__.__name__
144 144
145 145 def getAllowedArgs(self):
146 146 if hasattr(self, '__attrs__'):
147 147 return self.__attrs__
148 148 else:
149 149 return inspect.getargspec(self.run).args
150 150
151 151 def setup(self):
152 152
153 153 self.isConfig = True
154 154
155 155 raise NotImplementedError
156 156
157 157 def run(self, dataIn, **kwargs):
158 158 """
159 159 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
160 160 atributos del objeto dataIn.
161 161
162 162 Input:
163 163
164 164 dataIn : objeto del tipo JROData
165 165
166 166 Return:
167 167
168 168 None
169 169
170 170 Affected:
171 171 __buffer : buffer de recepcion de datos.
172 172
173 173 """
174 174 if not self.isConfig:
175 175 self.setup(**kwargs)
176 176
177 177 raise NotImplementedError
178 178
179 179 def close(self):
180 180
181 181 return
182 182
183 183 class InputQueue(Thread):
184 184
185 185 '''
186 186 Class to hold input data for Proccessing Units and external Operations,
187 187 '''
188 188
189 189 def __init__(self, project_id, inputId, lock=None):
190 190
191 191 Thread.__init__(self)
192 192 self.queue = Queue()
193 193 self.project_id = project_id
194 194 self.inputId = inputId
195 195 self.lock = lock
196 196 self.islocked = False
197 197 self.size = 0
198 198
199 199 def run(self):
200 200
201 201 c = zmq.Context()
202 202 self.receiver = c.socket(zmq.SUB)
203 203 self.receiver.connect(
204 204 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
205 205 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
206 206
207 207 while True:
208 208 obj = self.receiver.recv_multipart()[1]
209 209 self.size += sys.getsizeof(obj)
210 210 self.queue.put(obj)
211 211
212 212 def get(self):
213 213
214 if not self.islocked and self.size/1000000 > 1024:
214 if not self.islocked and self.size/1000000 > 512:
215 215 self.lock.n.value += 1
216 216 self.islocked = True
217 217 self.lock.clear()
218 elif self.islocked and self.size/1000000 <= 1024:
218 elif self.islocked and self.size/1000000 <= 512:
219 219 self.islocked = False
220 220 self.lock.n.value -= 1
221 221 if self.lock.n.value == 0:
222 222 self.lock.set()
223 223
224 224 obj = self.queue.get()
225 225 self.size -= sys.getsizeof(obj)
226 226 return pickle.loads(obj)
227 227
228 228
229 229 def MPDecorator(BaseClass):
230 230 """
231 231 Multiprocessing class decorator
232 232
233 233 This function add multiprocessing features to a BaseClass. Also, it handle
234 234 the communication beetween processes (readers, procUnits and operations).
235 235 """
236 236
237 237 class MPClass(BaseClass, Process):
238 238
239 239 def __init__(self, *args, **kwargs):
240 240 super(MPClass, self).__init__()
241 241 Process.__init__(self)
242 242 self.operationKwargs = {}
243 243 self.args = args
244 244 self.kwargs = kwargs
245 245 self.sender = None
246 246 self.receiver = None
247 247 self.i = 0
248 248 self.t = time.time()
249 249 self.name = BaseClass.__name__
250 250 self.__doc__ = BaseClass.__doc__
251 251
252 252 if 'plot' in self.name.lower() and not self.name.endswith('_'):
253 253 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
254 254
255 255 self.start_time = time.time()
256 256 self.id = args[0]
257 257 self.inputId = args[1]
258 258 self.project_id = args[2]
259 259 self.err_queue = args[3]
260 260 self.lock = args[4]
261 261 self.typeProc = args[5]
262 262 self.err_queue.put('#_start_#')
263 263 if self.inputId is not None:
264 264 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
265 265
266 266 def subscribe(self):
267 267 '''
268 268 Start the zmq socket receiver and subcribe to input ID.
269 269 '''
270 270
271 271 self.queue.start()
272 272
273 273 def listen(self):
274 274 '''
275 275 This function waits for objects
276 276 '''
277 277
278 278 return self.queue.get()
279 279
280 280 def set_publisher(self):
281 281 '''
282 282 This function create a zmq socket for publishing objects.
283 283 '''
284 284
285 285 time.sleep(0.5)
286 286
287 287 c = zmq.Context()
288 288 self.sender = c.socket(zmq.PUB)
289 289 self.sender.connect(
290 290 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
291 291
292 292 def publish(self, data, id):
293 293 '''
294 294 This function publish an object, to an specific topic.
295 295 It blocks publishing when receiver queue is full to avoid data loss
296 296 '''
297 297
298 298 if self.inputId is None:
299 299 self.lock.wait()
300 300 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
301 301
302 302 def runReader(self):
303 303 '''
304 304 Run fuction for read units
305 305 '''
306 306 while True:
307 307
308 308 try:
309 309 BaseClass.run(self, **self.kwargs)
310 310 except:
311 311 err = traceback.format_exc()
312 312 if 'No more files' in err:
313 313 log.warning('No more files to read', self.name)
314 314 else:
315 315 self.err_queue.put('{}|{}'.format(self.name, err))
316 316 self.dataOut.error = True
317 317
318 318 for op, optype, opId, kwargs in self.operations:
319 319 if optype == 'self' and not self.dataOut.flagNoData:
320 320 op(**kwargs)
321 321 elif optype == 'other' and not self.dataOut.flagNoData:
322 322 self.dataOut = op.run(self.dataOut, **self.kwargs)
323 323 elif optype == 'external':
324 324 self.publish(self.dataOut, opId)
325 325
326 326 if self.dataOut.flagNoData and not self.dataOut.error:
327 327 continue
328 328
329 329 self.publish(self.dataOut, self.id)
330 330
331 331 if self.dataOut.error:
332 332 break
333 333
334 334 time.sleep(0.5)
335 335
336 336 def runProc(self):
337 337 '''
338 338 Run function for proccessing units
339 339 '''
340 340
341 341 while True:
342 342 self.dataIn = self.listen()
343 343
344 344 if self.dataIn.flagNoData and self.dataIn.error is None:
345 345 continue
346 346 elif not self.dataIn.error:
347 347 try:
348 348 BaseClass.run(self, **self.kwargs)
349 349 except:
350 350 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
351 351 self.dataOut.error = True
352 352 elif self.dataIn.error:
353 353 self.dataOut.error = self.dataIn.error
354 354 self.dataOut.flagNoData = True
355 355
356 356 for op, optype, opId, kwargs in self.operations:
357 357 if optype == 'self' and not self.dataOut.flagNoData:
358 358 op(**kwargs)
359 359 elif optype == 'other' and not self.dataOut.flagNoData:
360 360 self.dataOut = op.run(self.dataOut, **kwargs)
361 361 elif optype == 'external' and not self.dataOut.flagNoData:
362 362 self.publish(self.dataOut, opId)
363 363
364 364 self.publish(self.dataOut, self.id)
365 365 for op, optype, opId, kwargs in self.operations:
366 366 if optype == 'external' and self.dataOut.error:
367 367 self.publish(self.dataOut, opId)
368 368
369 369 if self.dataOut.error:
370 370 break
371 371
372 372 time.sleep(0.5)
373 373
374 374 def runOp(self):
375 375 '''
376 376 Run function for external operations (this operations just receive data
377 377 ex: plots, writers, publishers)
378 378 '''
379 379
380 380 while True:
381 381
382 382 dataOut = self.listen()
383 383
384 384 if not dataOut.error:
385 385 try:
386 386 BaseClass.run(self, dataOut, **self.kwargs)
387 387 except:
388 388 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
389 389 dataOut.error = True
390 390 else:
391 391 break
392 392
393 393 def run(self):
394 394 if self.typeProc is "ProcUnit":
395 395
396 396 if self.inputId is not None:
397 397 self.subscribe()
398 398
399 399 self.set_publisher()
400 400
401 401 if 'Reader' not in BaseClass.__name__:
402 402 self.runProc()
403 403 else:
404 404 self.runReader()
405 405
406 406 elif self.typeProc is "Operation":
407 407
408 408 self.subscribe()
409 409 self.runOp()
410 410
411 411 else:
412 412 raise ValueError("Unknown type")
413 413
414 414 self.close()
415 415
416 416 def close(self):
417 417
418 418 BaseClass.close(self)
419 419 self.err_queue.put('#_end_#')
420 420
421 421 if self.sender:
422 422 self.sender.close()
423 423
424 424 if self.receiver:
425 425 self.receiver.close()
426 426
427 427 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
428 428
429 429 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now