##// END OF EJS Templates
Fix support for ZMQ in VoltageReader
Juan C. Espinoza -
r1564:d1f6bedb3696
parent child
Show More
@@ -1,659 +1,661
1 1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 2 # All rights reserved.
3 3 #
4 4 # Distributed under the terms of the BSD 3-clause license.
5 5 """API to create signal chain projects
6 6
7 7 The API is provide through class: Project
8 8 """
9 9
10 10 import re
11 11 import sys
12 12 import ast
13 13 import datetime
14 14 import traceback
15 15 import time
16 16 import multiprocessing
17 17 from multiprocessing import Process, Queue
18 18 from threading import Thread
19 19 from xml.etree.ElementTree import ElementTree, Element, SubElement
20 20
21 21 from schainpy.admin import Alarm, SchainWarning
22 22 from schainpy.model import *
23 23 from schainpy.utils import log
24 24
25 25 if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
26 26 multiprocessing.set_start_method('fork')
27 27
28 28 class ConfBase():
29 29
30 30 def __init__(self):
31 31
32 32 self.id = '0'
33 33 self.name = None
34 34 self.priority = None
35 35 self.parameters = {}
36 36 self.object = None
37 37 self.operations = []
38 38
39 39 def getId(self):
40 40
41 41 return self.id
42 42
43 43 def getNewId(self):
44 44
45 45 return int(self.id) * 10 + len(self.operations) + 1
46 46
47 47 def updateId(self, new_id):
48 48
49 49 self.id = str(new_id)
50 50
51 51 n = 1
52 52 for conf in self.operations:
53 53 conf_id = str(int(new_id) * 10 + n)
54 54 conf.updateId(conf_id)
55 55 n += 1
56 56
57 57 def getKwargs(self):
58 58
59 59 params = {}
60 60
61 61 for key, value in self.parameters.items():
62 62 if value not in (None, '', ' '):
63 63 params[key] = value
64 64
65 65 return params
66 66
67 67 def update(self, **kwargs):
68 68
69 69 for key, value in kwargs.items():
70 70 self.addParameter(name=key, value=value)
71 71
72 72 def addParameter(self, name, value, format=None):
73 73 '''
74 74 '''
75 75
76 76 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
77 77 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
78 78 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
79 79 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
80 80 else:
81 81 try:
82 82 self.parameters[name] = ast.literal_eval(value)
83 83 except:
84 84 if isinstance(value, str) and ',' in value:
85 85 self.parameters[name] = value.split(',')
86 86 else:
87 87 self.parameters[name] = value
88 88
89 89 def getParameters(self):
90 90
91 91 params = {}
92 92 for key, value in self.parameters.items():
93 93 s = type(value).__name__
94 94 if s == 'date':
95 95 params[key] = value.strftime('%Y/%m/%d')
96 96 elif s == 'time':
97 97 params[key] = value.strftime('%H:%M:%S')
98 98 else:
99 99 params[key] = str(value)
100 100
101 101 return params
102 102
103 103 def makeXml(self, element):
104 104
105 105 xml = SubElement(element, self.ELEMENTNAME)
106 106 for label in self.xml_labels:
107 107 xml.set(label, str(getattr(self, label)))
108 108
109 109 for key, value in self.getParameters().items():
110 110 xml_param = SubElement(xml, 'Parameter')
111 111 xml_param.set('name', key)
112 112 xml_param.set('value', value)
113 113
114 114 for conf in self.operations:
115 115 conf.makeXml(xml)
116 116
117 117 def __str__(self):
118 118
119 119 if self.ELEMENTNAME == 'Operation':
120 120 s = ' {}[id={}]\n'.format(self.name, self.id)
121 121 else:
122 122 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
123 123
124 124 for key, value in self.parameters.items():
125 125 if self.ELEMENTNAME == 'Operation':
126 126 s += ' {}: {}\n'.format(key, value)
127 127 else:
128 128 s += ' {}: {}\n'.format(key, value)
129 129
130 130 for conf in self.operations:
131 131 s += str(conf)
132 132
133 133 return s
134 134
135 135 class OperationConf(ConfBase):
136 136
137 137 ELEMENTNAME = 'Operation'
138 138 xml_labels = ['id', 'name']
139 139
140 140 def setup(self, id, name, priority, project_id, err_queue):
141 141
142 142 self.id = str(id)
143 143 self.project_id = project_id
144 144 self.name = name
145 145 self.type = 'other'
146 146 self.err_queue = err_queue
147 147
148 148 def readXml(self, element, project_id, err_queue):
149 149
150 150 self.id = element.get('id')
151 151 self.name = element.get('name')
152 152 self.type = 'other'
153 153 self.project_id = str(project_id)
154 154 self.err_queue = err_queue
155 155
156 156 for elm in element.iter('Parameter'):
157 157 self.addParameter(elm.get('name'), elm.get('value'))
158 158
159 159 def createObject(self):
160 160
161 161 className = eval(self.name)
162 162
163 163 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
164 164 kwargs = self.getKwargs()
165 165 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
166 166 opObj.start()
167 167 self.type = 'external'
168 168 else:
169 169 opObj = className()
170 170
171 171 self.object = opObj
172 172 return opObj
173 173
174 174 class ProcUnitConf(ConfBase):
175 175
176 176 ELEMENTNAME = 'ProcUnit'
177 177 xml_labels = ['id', 'inputId', 'name']
178 178
179 179 def setup(self, project_id, id, name, datatype, inputId, err_queue):
180 180 '''
181 181 '''
182 182
183 183 if datatype == None and name == None:
184 184 raise ValueError('datatype or name should be defined')
185 185
186 186 if name == None:
187 187 if 'Proc' in datatype:
188 188 name = datatype
189 189 else:
190 190 name = '%sProc' % (datatype)
191 191
192 192 if datatype == None:
193 193 datatype = name.replace('Proc', '')
194 194
195 195 self.id = str(id)
196 196 self.project_id = project_id
197 197 self.name = name
198 198 self.datatype = datatype
199 199 self.inputId = inputId
200 200 self.err_queue = err_queue
201 201 self.operations = []
202 202 self.parameters = {}
203 203
204 204 def removeOperation(self, id):
205 205
206 206 i = [1 if x.id == id else 0 for x in self.operations]
207 207 self.operations.pop(i.index(1))
208 208
209 209 def getOperation(self, id):
210 210
211 211 for conf in self.operations:
212 212 if conf.id == id:
213 213 return conf
214 214
215 215 def addOperation(self, name, optype='self'):
216 216 '''
217 217 '''
218 218
219 219 id = self.getNewId()
220 220 conf = OperationConf()
221 221 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
222 222 self.operations.append(conf)
223 223
224 224 return conf
225 225
226 226 def readXml(self, element, project_id, err_queue):
227 227
228 228 self.id = element.get('id')
229 229 self.name = element.get('name')
230 230 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
231 231 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
232 232 self.project_id = str(project_id)
233 233 self.err_queue = err_queue
234 234 self.operations = []
235 235 self.parameters = {}
236 236
237 237 for elm in element:
238 238 if elm.tag == 'Parameter':
239 239 self.addParameter(elm.get('name'), elm.get('value'))
240 240 elif elm.tag == 'Operation':
241 241 conf = OperationConf()
242 242 conf.readXml(elm, project_id, err_queue)
243 243 self.operations.append(conf)
244 244
245 245 def createObjects(self):
246 246 '''
247 247 Instancia de unidades de procesamiento.
248 248 '''
249 249
250 250 className = eval(self.name)
251 251 kwargs = self.getKwargs()
252 252 procUnitObj = className()
253 253 procUnitObj.name = self.name
254 254 log.success('creating process...', self.name)
255 255
256 256 for conf in self.operations:
257 257
258 258 opObj = conf.createObject()
259 259
260 260 log.success('adding operation: {}, type:{}'.format(
261 261 conf.name,
262 262 conf.type), self.name)
263 263
264 264 procUnitObj.addOperation(conf, opObj)
265 265
266 266 self.object = procUnitObj
267 267
268 268 def run(self):
269 269 '''
270 270 '''
271 271
272 272 return self.object.call(**self.getKwargs())
273 273
274 274
275 275 class ReadUnitConf(ProcUnitConf):
276 276
277 277 ELEMENTNAME = 'ReadUnit'
278 278
279 279 def __init__(self):
280 280
281 281 self.id = None
282 282 self.datatype = None
283 283 self.name = None
284 284 self.inputId = None
285 285 self.operations = []
286 286 self.parameters = {}
287 287
288 288 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
289 startTime='', endTime='', server=None, **kwargs):
289 startTime='', endTime='', server=None, topic='', **kwargs):
290 290
291 291 if datatype == None and name == None:
292 292 raise ValueError('datatype or name should be defined')
293 293 if name == None:
294 294 if 'Reader' in datatype:
295 295 name = datatype
296 296 datatype = name.replace('Reader', '')
297 297 else:
298 298 name = '{}Reader'.format(datatype)
299 299 if datatype == None:
300 300 if 'Reader' in name:
301 301 datatype = name.replace('Reader', '')
302 302 else:
303 303 datatype = name
304 304 name = '{}Reader'.format(name)
305 305
306 306 self.id = id
307 307 self.project_id = project_id
308 308 self.name = name
309 309 self.datatype = datatype
310 310 self.err_queue = err_queue
311 311
312 312 self.addParameter(name='path', value=path)
313 313 self.addParameter(name='startDate', value=startDate)
314 314 self.addParameter(name='endDate', value=endDate)
315 315 self.addParameter(name='startTime', value=startTime)
316 316 self.addParameter(name='endTime', value=endTime)
317 self.addParameter(name='server', value=server)
318 self.addParameter(name='topic', value=topic)
317 319
318 320 for key, value in kwargs.items():
319 321 self.addParameter(name=key, value=value)
320 322
321 323
322 324 class Project(Process):
323 325 """API to create signal chain projects"""
324 326
325 327 ELEMENTNAME = 'Project'
326 328
327 329 def __init__(self, name=''):
328 330
329 331 Process.__init__(self)
330 332 self.id = '1'
331 333 if name:
332 334 self.name = '{} ({})'.format(Process.__name__, name)
333 335 self.filename = None
334 336 self.description = None
335 337 self.email = None
336 338 self.alarm = []
337 339 self.configurations = {}
338 340 # self.err_queue = Queue()
339 341 self.err_queue = None
340 342 self.started = False
341 343
342 344 def getNewId(self):
343 345
344 346 idList = list(self.configurations.keys())
345 347 id = int(self.id) * 10
346 348
347 349 while True:
348 350 id += 1
349 351
350 352 if str(id) in idList:
351 353 continue
352 354
353 355 break
354 356
355 357 return str(id)
356 358
357 359 def updateId(self, new_id):
358 360
359 361 self.id = str(new_id)
360 362
361 363 keyList = list(self.configurations.keys())
362 364 keyList.sort()
363 365
364 366 n = 1
365 367 new_confs = {}
366 368
367 369 for procKey in keyList:
368 370
369 371 conf = self.configurations[procKey]
370 372 idProcUnit = str(int(self.id) * 10 + n)
371 373 conf.updateId(idProcUnit)
372 374 new_confs[idProcUnit] = conf
373 375 n += 1
374 376
375 377 self.configurations = new_confs
376 378
377 379 def setup(self, id=1, name='', description='', email=None, alarm=[]):
378 380
379 381 self.id = str(id)
380 382 self.description = description
381 383 self.email = email
382 384 self.alarm = alarm
383 385 if name:
384 386 self.name = '{} ({})'.format(Process.__name__, name)
385 387
386 388 def update(self, **kwargs):
387 389
388 390 for key, value in kwargs.items():
389 391 setattr(self, key, value)
390 392
391 393 def clone(self):
392 394
393 395 p = Project()
394 396 p.id = self.id
395 397 p.name = self.name
396 398 p.description = self.description
397 399 p.configurations = self.configurations.copy()
398 400
399 401 return p
400 402
401 403 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
402 404
403 405 '''
404 406 '''
405 407
406 408 if id is None:
407 409 idReadUnit = self.getNewId()
408 410 else:
409 411 idReadUnit = str(id)
410 412
411 413 conf = ReadUnitConf()
412 414 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
413 415 self.configurations[conf.id] = conf
414 416
415 417 return conf
416 418
417 419 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
418 420
419 421 '''
420 422 '''
421 423
422 424 if id is None:
423 425 idProcUnit = self.getNewId()
424 426 else:
425 427 idProcUnit = id
426 428
427 429 conf = ProcUnitConf()
428 430 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
429 431 self.configurations[conf.id] = conf
430 432
431 433 return conf
432 434
433 435 def removeProcUnit(self, id):
434 436
435 437 if id in self.configurations:
436 438 self.configurations.pop(id)
437 439
438 440 def getReadUnit(self):
439 441
440 442 for obj in list(self.configurations.values()):
441 443 if obj.ELEMENTNAME == 'ReadUnit':
442 444 return obj
443 445
444 446 return None
445 447
446 448 def getProcUnit(self, id):
447 449
448 450 return self.configurations[id]
449 451
450 452 def getUnits(self):
451 453
452 454 keys = list(self.configurations)
453 455 keys.sort()
454 456
455 457 for key in keys:
456 458 yield self.configurations[key]
457 459
458 460 def updateUnit(self, id, **kwargs):
459 461
460 462 conf = self.configurations[id].update(**kwargs)
461 463
462 464 def makeXml(self):
463 465
464 466 xml = Element('Project')
465 467 xml.set('id', str(self.id))
466 468 xml.set('name', self.name)
467 469 xml.set('description', self.description)
468 470
469 471 for conf in self.configurations.values():
470 472 conf.makeXml(xml)
471 473
472 474 self.xml = xml
473 475
474 476 def writeXml(self, filename=None):
475 477
476 478 if filename == None:
477 479 if self.filename:
478 480 filename = self.filename
479 481 else:
480 482 filename = 'schain.xml'
481 483
482 484 if not filename:
483 485 print('filename has not been defined. Use setFilename(filename) for do it.')
484 486 return 0
485 487
486 488 abs_file = os.path.abspath(filename)
487 489
488 490 if not os.access(os.path.dirname(abs_file), os.W_OK):
489 491 print('No write permission on %s' % os.path.dirname(abs_file))
490 492 return 0
491 493
492 494 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
493 495 print('File %s already exists and it could not be overwriten' % abs_file)
494 496 return 0
495 497
496 498 self.makeXml()
497 499
498 500 ElementTree(self.xml).write(abs_file, method='xml')
499 501
500 502 self.filename = abs_file
501 503
502 504 return 1
503 505
504 506 def readXml(self, filename):
505 507
506 508 abs_file = os.path.abspath(filename)
507 509
508 510 self.configurations = {}
509 511
510 512 try:
511 513 self.xml = ElementTree().parse(abs_file)
512 514 except:
513 515 log.error('Error reading %s, verify file format' % filename)
514 516 return 0
515 517
516 518 self.id = self.xml.get('id')
517 519 self.name = self.xml.get('name')
518 520 self.description = self.xml.get('description')
519 521
520 522 for element in self.xml:
521 523 if element.tag == 'ReadUnit':
522 524 conf = ReadUnitConf()
523 525 conf.readXml(element, self.id, self.err_queue)
524 526 self.configurations[conf.id] = conf
525 527 elif element.tag == 'ProcUnit':
526 528 conf = ProcUnitConf()
527 529 input_proc = self.configurations[element.get('inputId')]
528 530 conf.readXml(element, self.id, self.err_queue)
529 531 self.configurations[conf.id] = conf
530 532
531 533 self.filename = abs_file
532 534
533 535 return 1
534 536
535 537 def __str__(self):
536 538
537 539 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
538 540 self.id,
539 541 self.name,
540 542 self.description,
541 543 )
542 544
543 545 for conf in self.configurations.values():
544 546 text += '{}'.format(conf)
545 547
546 548 return text
547 549
548 550 def createObjects(self):
549 551
550 552 keys = list(self.configurations.keys())
551 553 keys.sort()
552 554 for key in keys:
553 555 conf = self.configurations[key]
554 556 conf.createObjects()
555 557 if conf.inputId is not None:
556 558 conf.object.setInput(self.configurations[conf.inputId].object)
557 559
558 560 def monitor(self):
559 561
560 562 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
561 563 t.start()
562 564
563 565 def _monitor(self, queue, ctx):
564 566
565 567 import socket
566 568
567 569 procs = 0
568 570 err_msg = ''
569 571
570 572 while True:
571 573 msg = queue.get()
572 574 if '#_start_#' in msg:
573 575 procs += 1
574 576 elif '#_end_#' in msg:
575 577 procs -= 1
576 578 else:
577 579 err_msg = msg
578 580
579 581 if procs == 0 or 'Traceback' in err_msg:
580 582 break
581 583 time.sleep(0.1)
582 584
583 585 if '|' in err_msg:
584 586 name, err = err_msg.split('|')
585 587 if 'SchainWarning' in err:
586 588 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
587 589 elif 'SchainError' in err:
588 590 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
589 591 else:
590 592 log.error(err, name)
591 593 else:
592 594 name, err = self.name, err_msg
593 595
594 596 time.sleep(1)
595 597
596 598 ctx.term()
597 599
598 600 message = ''.join(err)
599 601
600 602 if err_msg:
601 603 subject = 'SChain v%s: Error running %s\n' % (
602 604 schainpy.__version__, self.name)
603 605
604 606 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
605 607 socket.gethostname())
606 608 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
607 609 subtitle += 'Configuration file: %s\n' % self.filename
608 610 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
609 611
610 612 readUnitConfObj = self.getReadUnit()
611 613 if readUnitConfObj:
612 614 subtitle += '\nInput parameters:\n'
613 615 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
614 616 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
615 617 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
616 618 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
617 619 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
618 620
619 621 a = Alarm(
620 622 modes=self.alarm,
621 623 email=self.email,
622 624 message=message,
623 625 subject=subject,
624 626 subtitle=subtitle,
625 627 filename=self.filename
626 628 )
627 629
628 630 a.start()
629 631
630 632 def setFilename(self, filename):
631 633
632 634 self.filename = filename
633 635
634 636 def runProcs(self):
635 637
636 638 err = False
637 639 n = len(self.configurations)
638 640
639 641 while not err:
640 642 for conf in self.getUnits():
641 643 ok = conf.run()
642 644 if ok == 'Error':
643 645 n -= 1
644 646 continue
645 647 elif not ok:
646 648 break
647 649 if n == 0:
648 650 err = True
649 651
650 652 def run(self):
651 653
652 654 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
653 655 self.started = True
654 656 self.start_time = time.time()
655 657 self.createObjects()
656 658 self.runProcs()
657 659 log.success('{} Done (Time: {:4.2f}s)'.format(
658 660 self.name,
659 661 time.time() - self.start_time), '')
@@ -1,697 +1,697
1 1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 2 # All rights reserved.
3 3 #
4 4 # Distributed under the terms of the BSD 3-clause license.
5 5 """Base class to create plot operations
6 6
7 7 """
8 8
9 9 import os
10 10 import sys
11 11 import zmq
12 12 import time
13 13 import numpy
14 14 import datetime
15 15 from collections import deque
16 16 from functools import wraps
17 17 from threading import Thread
18 18 import matplotlib
19 19
20 20 if 'BACKEND' in os.environ:
21 21 matplotlib.use(os.environ['BACKEND'])
22 22 elif 'linux' in sys.platform:
23 23 matplotlib.use("TkAgg")
24 24 elif 'darwin' in sys.platform:
25 25 matplotlib.use('MacOSX')
26 26 else:
27 27 from schainpy.utils import log
28 28 log.warning('Using default Backend="Agg"', 'INFO')
29 29 matplotlib.use('Agg')
30 30
31 31 import matplotlib.pyplot as plt
32 32 from matplotlib.patches import Polygon
33 33 from mpl_toolkits.axes_grid1 import make_axes_locatable
34 34 from matplotlib.ticker import FuncFormatter, LinearLocator, MultipleLocator
35 35
36 36 from schainpy.model.data.jrodata import PlotterData
37 37 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
38 38 from schainpy.utils import log
39 39
40 40 jet_values = matplotlib.pyplot.get_cmap('jet', 100)(numpy.arange(100))[10:90]
41 41 blu_values = matplotlib.pyplot.get_cmap(
42 42 'seismic_r', 20)(numpy.arange(20))[10:15]
43 43 ncmap = matplotlib.colors.LinearSegmentedColormap.from_list(
44 44 'jro', numpy.vstack((blu_values, jet_values)))
45 45 matplotlib.pyplot.register_cmap(cmap=ncmap)
46 46
47 47 CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis',
48 48 'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm')]
49 49
50 50 EARTH_RADIUS = 6.3710e3
51 51
52 52 def ll2xy(lat1, lon1, lat2, lon2):
53 53
54 54 p = 0.017453292519943295
55 55 a = 0.5 - numpy.cos((lat2 - lat1) * p) / 2 + numpy.cos(lat1 * p) * \
56 56 numpy.cos(lat2 * p) * (1 - numpy.cos((lon2 - lon1) * p)) / 2
57 57 r = 12742 * numpy.arcsin(numpy.sqrt(a))
58 58 theta = numpy.arctan2(numpy.sin((lon2 - lon1) * p) * numpy.cos(lat2 * p), numpy.cos(lat1 * p)
59 59 * numpy.sin(lat2 * p) - numpy.sin(lat1 * p) * numpy.cos(lat2 * p) * numpy.cos((lon2 - lon1) * p))
60 60 theta = -theta + numpy.pi / 2
61 61 return r * numpy.cos(theta), r * numpy.sin(theta)
62 62
63 63
64 64 def km2deg(km):
65 65 '''
66 66 Convert distance in km to degrees
67 67 '''
68 68
69 69 return numpy.rad2deg(km / EARTH_RADIUS)
70 70
71 71
72 72 def figpause(interval):
73 73 backend = plt.rcParams['backend']
74 74 if backend in matplotlib.rcsetup.interactive_bk:
75 75 figManager = matplotlib._pylab_helpers.Gcf.get_active()
76 76 if figManager is not None:
77 77 canvas = figManager.canvas
78 78 if canvas.figure.stale:
79 79 canvas.draw()
80 80 try:
81 81 canvas.start_event_loop(interval)
82 82 except:
83 83 pass
84 84 return
85 85
86 86 def popup(message):
87 87 '''
88 88 '''
89 89
90 90 fig = plt.figure(figsize=(12, 8), facecolor='r')
91 91 text = '\n'.join([s.strip() for s in message.split(':')])
92 92 fig.text(0.01, 0.5, text, ha='left', va='center',
93 93 size='20', weight='heavy', color='w')
94 94 fig.show()
95 95 figpause(1000)
96 96
97 97
98 98 class Throttle(object):
99 99 '''
100 100 Decorator that prevents a function from being called more than once every
101 101 time period.
102 102 To create a function that cannot be called more than once a minute, but
103 103 will sleep until it can be called:
104 104 @Throttle(minutes=1)
105 105 def foo():
106 106 pass
107 107
108 108 for i in range(10):
109 109 foo()
110 110 print "This function has run %s times." % i
111 111 '''
112 112
113 113 def __init__(self, seconds=0, minutes=0, hours=0):
114 114 self.throttle_period = datetime.timedelta(
115 115 seconds=seconds, minutes=minutes, hours=hours
116 116 )
117 117
118 118 self.time_of_last_call = datetime.datetime.min
119 119
120 120 def __call__(self, fn):
121 121 @wraps(fn)
122 122 def wrapper(*args, **kwargs):
123 123 coerce = kwargs.pop('coerce', None)
124 124 if coerce:
125 125 self.time_of_last_call = datetime.datetime.now()
126 126 return fn(*args, **kwargs)
127 127 else:
128 128 now = datetime.datetime.now()
129 129 time_since_last_call = now - self.time_of_last_call
130 130 time_left = self.throttle_period - time_since_last_call
131 131
132 132 if time_left > datetime.timedelta(seconds=0):
133 133 return
134 134
135 135 self.time_of_last_call = datetime.datetime.now()
136 136 return fn(*args, **kwargs)
137 137
138 138 return wrapper
139 139
140 140 def apply_throttle(value):
141 141
142 142 @Throttle(seconds=value)
143 143 def fnThrottled(fn):
144 144 fn()
145 145
146 146 return fnThrottled
147 147
148 148
149 149 @MPDecorator
150 150 class Plot(Operation):
151 151 """Base class for Schain plotting operations
152 152
153 153 This class should never be use directtly you must subclass a new operation,
154 154 children classes must be defined as follow:
155 155
156 156 ExamplePlot(Plot):
157 157
158 158 CODE = 'code'
159 159 colormap = 'jet'
160 160 plot_type = 'pcolor' # options are ('pcolor', 'pcolorbuffer', 'scatter', 'scatterbuffer')
161 161
162 162 def setup(self):
163 163 pass
164 164
165 165 def plot(self):
166 166 pass
167 167
168 168 """
169 169
170 170 CODE = 'Figure'
171 171 colormap = 'jet'
172 172 bgcolor = 'white'
173 173 buffering = True
174 174 __missing = 1E30
175 175
176 176 __attrs__ = ['show', 'save', 'ymin', 'ymax', 'zmin', 'zmax', 'title',
177 177 'showprofile']
178 178
179 179 def __init__(self):
180 180
181 181 Operation.__init__(self)
182 182 self.isConfig = False
183 183 self.isPlotConfig = False
184 184 self.save_time = 0
185 185 self.sender_time = 0
186 186 self.data = None
187 187 self.firsttime = True
188 188 self.sender_queue = deque(maxlen=10)
189 189 self.plots_adjust = {'left': 0.125, 'right': 0.9, 'bottom': 0.15, 'top': 0.9, 'wspace': 0.2, 'hspace': 0.2}
190 190
191 191 def __fmtTime(self, x, pos):
192 192 '''
193 193 '''
194 194
195 195 return '{}'.format(self.getDateTime(x).strftime('%H:%M'))
196 196
197 197 def __setup(self, **kwargs):
198 198 '''
199 199 Initialize variables
200 200 '''
201 201
202 202 self.figures = []
203 203 self.axes = []
204 204 self.cb_axes = []
205 205 self.localtime = kwargs.pop('localtime', True)
206 206 self.show = kwargs.get('show', True)
207 207 self.save = kwargs.get('save', False)
208 208 self.save_period = kwargs.get('save_period', 0)
209 209 self.colormap = kwargs.get('colormap', self.colormap)
210 210 self.colormap_coh = kwargs.get('colormap_coh', 'jet')
211 211 self.colormap_phase = kwargs.get('colormap_phase', 'RdBu_r')
212 212 self.colormaps = kwargs.get('colormaps', None)
213 213 self.bgcolor = kwargs.get('bgcolor', self.bgcolor)
214 214 self.showprofile = kwargs.get('showprofile', False)
215 215 self.title = kwargs.get('wintitle', self.CODE.upper())
216 216 self.cb_label = kwargs.get('cb_label', None)
217 217 self.cb_labels = kwargs.get('cb_labels', None)
218 218 self.labels = kwargs.get('labels', None)
219 219 self.xaxis = kwargs.get('xaxis', 'frequency')
220 220 self.zmin = kwargs.get('zmin', None)
221 221 self.zmax = kwargs.get('zmax', None)
222 222 self.zlimits = kwargs.get('zlimits', None)
223 223 self.xlimits = kwargs.get('xlimits', None)
224 224 self.xstep_given = kwargs.get('xstep_given', None)
225 225 self.ystep_given = kwargs.get('ystep_given', None)
226 226 self.autoxticks = kwargs.get('autoxticks', True)
227 227 self.xmin = kwargs.get('xmin', None)
228 228 self.xmax = kwargs.get('xmax', None)
229 229 self.xrange = kwargs.get('xrange', 12)
230 230 self.xscale = kwargs.get('xscale', None)
231 231 self.ymin = kwargs.get('ymin', None)
232 232 self.ymax = kwargs.get('ymax', None)
233 233 self.yscale = kwargs.get('yscale', None)
234 234 self.xlabel = kwargs.get('xlabel', None)
235 235 self.attr_time = kwargs.get('attr_time', 'utctime')
236 236 self.attr_data = kwargs.get('attr_data', 'data_param')
237 237 self.decimation = kwargs.get('decimation', None)
238 238 self.oneFigure = kwargs.get('oneFigure', True)
239 239 self.width = kwargs.get('width', None)
240 240 self.height = kwargs.get('height', None)
241 241 self.colorbar = kwargs.get('colorbar', True)
242 242 self.factors = kwargs.get('factors', [1, 1, 1, 1, 1, 1, 1, 1])
243 243 self.channels = kwargs.get('channels', None)
244 244 self.titles = kwargs.get('titles', [])
245 245 self.polar = False
246 246 self.type = kwargs.get('type', 'iq')
247 247 self.grid = kwargs.get('grid', False)
248 248 self.pause = kwargs.get('pause', False)
249 249 self.save_code = kwargs.get('save_code', self.CODE)
250 250 self.throttle = kwargs.get('throttle', 0)
251 251 self.exp_code = kwargs.get('exp_code', None)
252 252 self.server = kwargs.get('server', False)
253 253 self.sender_period = kwargs.get('sender_period', 60)
254 254 self.tag = kwargs.get('tag', '')
255 255 self.height_index = kwargs.get('height_index', None)
256 256 self.__throttle_plot = apply_throttle(self.throttle)
257 257 code = self.attr_data if self.attr_data else self.CODE
258 258 self.data = PlotterData(self.CODE, self.exp_code, self.localtime)
259 259
260 260 if self.server:
261 261 if not self.server.startswith('tcp://'):
262 262 self.server = 'tcp://{}'.format(self.server)
263 263 log.success(
264 264 'Sending to server: {}'.format(self.server),
265 265 self.name
266 266 )
267 267
268 268 if isinstance(self.attr_data, str):
269 269 self.attr_data = [self.attr_data]
270 270
271 271 def __setup_plot(self):
272 272 '''
273 273 Common setup for all figures, here figures and axes are created
274 274 '''
275 275
276 276 self.setup()
277 277
278 278 self.time_label = 'LT' if self.localtime else 'UTC'
279 279
280 280 if self.width is None:
281 281 self.width = 8
282 282
283 283 self.figures = []
284 284 self.axes = []
285 285 self.cb_axes = []
286 286 self.pf_axes = []
287 287 self.cmaps = []
288 288
289 289 size = '15%' if self.ncols == 1 else '30%'
290 290 pad = '4%' if self.ncols == 1 else '8%'
291 291
292 292 if self.oneFigure:
293 293 if self.height is None:
294 294 self.height = 1.4 * self.nrows + 1
295 295 fig = plt.figure(figsize=(self.width, self.height),
296 296 edgecolor='k',
297 297 facecolor='w')
298 298 self.figures.append(fig)
299 299 for n in range(self.nplots):
300 300 ax = fig.add_subplot(self.nrows, self.ncols,
301 301 n + 1, polar=self.polar)
302 302 ax.tick_params(labelsize=8)
303 303 ax.firsttime = True
304 304 ax.index = 0
305 305 ax.press = None
306 306 self.axes.append(ax)
307 307 if self.showprofile:
308 308 cax = self.__add_axes(ax, size=size, pad=pad)
309 309 cax.tick_params(labelsize=8)
310 310 self.pf_axes.append(cax)
311 311 else:
312 312 if self.height is None:
313 313 self.height = 3
314 314 for n in range(self.nplots):
315 315 fig = plt.figure(figsize=(self.width, self.height),
316 316 edgecolor='k',
317 317 facecolor='w')
318 318 ax = fig.add_subplot(1, 1, 1, polar=self.polar)
319 319 ax.tick_params(labelsize=8)
320 320 ax.firsttime = True
321 321 ax.index = 0
322 322 ax.press = None
323 323 self.figures.append(fig)
324 324 self.axes.append(ax)
325 325 if self.showprofile:
326 326 cax = self.__add_axes(ax, size=size, pad=pad)
327 327 cax.tick_params(labelsize=8)
328 328 self.pf_axes.append(cax)
329 329
330 330 for n in range(self.nrows):
331 331 if self.colormaps is not None:
332 332 cmap = plt.get_cmap(self.colormaps[n])
333 333 else:
334 334 cmap = plt.get_cmap(self.colormap)
335 335 cmap.set_bad(self.bgcolor, 1.)
336 336 self.cmaps.append(cmap)
337 337
338 338 def __add_axes(self, ax, size='30%', pad='8%'):
339 339 '''
340 340 Add new axes to the given figure
341 341 '''
342 342 divider = make_axes_locatable(ax)
343 343 nax = divider.new_horizontal(size=size, pad=pad)
344 344 ax.figure.add_axes(nax)
345 345 return nax
346 346
347 347 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
348 348 '''
349 349 Create a masked array for missing data
350 350 '''
351 351 if x_buffer.shape[0] < 2:
352 352 return x_buffer, y_buffer, z_buffer
353 353
354 354 deltas = x_buffer[1:] - x_buffer[0:-1]
355 355 x_median = numpy.median(deltas)
356 356
357 357 index = numpy.where(deltas > 5 * x_median)
358 358
359 359 if len(index[0]) != 0:
360 360 z_buffer[::, index[0], ::] = self.__missing
361 361 z_buffer = numpy.ma.masked_inside(z_buffer,
362 362 0.99 * self.__missing,
363 363 1.01 * self.__missing)
364 364
365 365 return x_buffer, y_buffer, z_buffer
366 366
367 367 def decimate(self):
368 368
369 369 # dx = int(len(self.x)/self.__MAXNUMX) + 1
370 370 dy = int(len(self.y) / self.decimation) + 1
371 371
372 372 # x = self.x[::dx]
373 373 x = self.x
374 374 y = self.y[::dy]
375 375 z = self.z[::, ::, ::dy]
376 376
377 377 return x, y, z
378 378
379 379 def format(self):
380 380 '''
381 381 Set min and max values, labels, ticks and titles
382 382 '''
383 383
384 384 for n, ax in enumerate(self.axes):
385 385 if ax.firsttime:
386 386 if self.xaxis != 'time':
387 387 xmin = self.xmin
388 388 xmax = self.xmax
389 389 else:
390 390 xmin = self.tmin
391 391 xmax = self.tmin + self.xrange * 60 * 60
392 392 ax.xaxis.set_major_formatter(FuncFormatter(self.__fmtTime))
393 393 ax.xaxis.set_major_locator(LinearLocator(9))
394 394 ymin = self.ymin if self.ymin is not None else numpy.nanmin(self.y[numpy.isfinite(self.y)])
395 395 ymax = self.ymax if self.ymax is not None else numpy.nanmax(self.y[numpy.isfinite(self.y)])
396 396 ax.set_facecolor(self.bgcolor)
397 397 if self.xscale:
398 398 ax.xaxis.set_major_formatter(FuncFormatter(
399 399 lambda x, pos: '{0:g}'.format(x * self.xscale)))
400 400 if self.yscale:
401 401 ax.yaxis.set_major_formatter(FuncFormatter(
402 402 lambda x, pos: '{0:g}'.format(x * self.yscale)))
403 403 if self.xlabel is not None:
404 404 ax.set_xlabel(self.xlabel)
405 405 if self.ylabel is not None:
406 406 ax.set_ylabel(self.ylabel)
407 407 if self.showprofile:
408 408 self.pf_axes[n].set_ylim(ymin, ymax)
409 409 self.pf_axes[n].set_xlim(self.zmin, self.zmax)
410 410 self.pf_axes[n].set_xlabel('dB')
411 411 self.pf_axes[n].grid(b=True, axis='x')
412 412 [tick.set_visible(False)
413 413 for tick in self.pf_axes[n].get_yticklabels()]
414 414 if self.colorbar:
415 415 ax.cbar = plt.colorbar(
416 416 ax.plt, ax=ax, fraction=0.05, pad=0.02, aspect=10)
417 417 ax.cbar.ax.tick_params(labelsize=8)
418 418 ax.cbar.ax.press = None
419 419 if self.cb_label:
420 420 ax.cbar.set_label(self.cb_label, size=8)
421 421 elif self.cb_labels:
422 422 ax.cbar.set_label(self.cb_labels[n], size=8)
423 423 else:
424 424 ax.cbar = None
425 425 ax.set_xlim(xmin, xmax)
426 426 ax.set_ylim(ymin, ymax)
427 427 ax.firsttime = False
428 428 if self.grid:
429 429 ax.grid(True)
430 430 if not self.polar:
431 431 ax.set_title('{} {} {}'.format(
432 432 self.titles[n],
433 433 self.getDateTime(self.data.max_time).strftime(
434 434 '%Y-%m-%d %H:%M:%S'),
435 435 self.time_label),
436 436 size=8)
437 437 else:
438 438 ax.set_title('{}'.format(self.titles[n]), size=8)
439 439 ax.set_ylim(0, 90)
440 440 ax.set_yticks(numpy.arange(0, 90, 20))
441 441 ax.yaxis.labelpad = 40
442 442
443 443 if self.firsttime:
444 444 for n, fig in enumerate(self.figures):
445 445 fig.subplots_adjust(**self.plots_adjust)
446 446 self.firsttime = False
447 447
448 448 def clear_figures(self):
449 449 '''
450 450 Reset axes for redraw plots
451 451 '''
452 452
453 453 for ax in self.axes + self.pf_axes + self.cb_axes:
454 454 ax.clear()
455 455 ax.firsttime = True
456 456 if hasattr(ax, 'cbar') and ax.cbar:
457 457 ax.cbar.remove()
458 458
459 459 def __plot(self):
460 460 '''
461 461 Main function to plot, format and save figures
462 462 '''
463 463
464 464 self.plot()
465 465 self.format()
466 466
467 467 for n, fig in enumerate(self.figures):
468 468 if self.nrows == 0 or self.nplots == 0:
469 469 log.warning('No data', self.name)
470 470 fig.text(0.5, 0.5, 'No Data', fontsize='large', ha='center')
471 471 fig.canvas.manager.set_window_title(self.CODE)
472 472 continue
473 473
474 474 fig.canvas.manager.set_window_title('{} - {}'.format(self.title,
475 475 self.getDateTime(self.data.max_time).strftime('%Y/%m/%d')))
476 476 fig.canvas.draw()
477 477 if self.show:
478 478 fig.show()
479 479 figpause(0.01)
480 480
481 481 if self.save:
482 482 self.save_figure(n)
483 483
484 484 if self.server:
485 485 self.send_to_server()
486 486
487 487 def __update(self, dataOut, timestamp):
488 488 '''
489 489 '''
490 490
491 491 metadata = {
492 492 'yrange': dataOut.heightList,
493 493 'interval': dataOut.timeInterval,
494 494 'channels': dataOut.channelList
495 495 }
496 496
497 497 data, meta = self.update(dataOut)
498 498 metadata.update(meta)
499 499 self.data.update(data, timestamp, metadata)
500 500
501 501 def save_figure(self, n):
502 502 '''
503 503 '''
504 504
505 505 if (self.data.max_time - self.save_time) <= self.save_period:
506 506 return
507 507
508 508 self.save_time = self.data.max_time
509 509
510 510 fig = self.figures[n]
511 511
512 512 if self.throttle == 0:
513 513 figname = os.path.join(
514 514 self.save,
515 515 self.save_code,
516 516 '{}_{}.png'.format(
517 517 self.save_code,
518 518 self.getDateTime(self.data.max_time).strftime(
519 519 '%Y%m%d_%H%M%S'
520 520 ),
521 521 )
522 522 )
523 523 log.log('Saving figure: {}'.format(figname), self.name)
524 524 if not os.path.isdir(os.path.dirname(figname)):
525 525 os.makedirs(os.path.dirname(figname))
526 526 fig.savefig(figname)
527 527
528 528 figname = os.path.join(
529 529 self.save,
530 530 self.save_code,
531 531 '{}_{}.png'.format(
532 532 self.save_code,
533 533 self.getDateTime(self.data.min_time).strftime(
534 534 '%Y%m%d'
535 535 ),
536 536 )
537 537 )
538 538 log.log('Saving figure: {}'.format(figname), self.name)
539 539 if not os.path.isdir(os.path.dirname(figname)):
540 540 os.makedirs(os.path.dirname(figname))
541 541 fig.savefig(figname)
542 542
543 543 def send_to_server(self):
544 544 '''
545 545 '''
546 546
547 547 if self.exp_code == None:
548 548 log.warning('Missing `exp_code` skipping sending to server...')
549 549
550 550 last_time = self.data.max_time
551 551 interval = last_time - self.sender_time
552 552 if interval < self.sender_period:
553 553 return
554 554
555 555 self.sender_time = last_time
556 556
557 attrs = ['titles', 'zmin', 'zmax', 'tag', 'ymin', 'ymax']
557 attrs = ['titles', 'zmin', 'zmax', 'tag', 'ymin', 'ymax', 'zlimits']
558 558 for attr in attrs:
559 559 value = getattr(self, attr)
560 560 if value:
561 561 if isinstance(value, (numpy.float32, numpy.float64)):
562 562 value = round(float(value), 2)
563 563 self.data.meta[attr] = value
564 564 if self.colormap == 'jet':
565 565 self.data.meta['colormap'] = 'Jet'
566 566 elif 'RdBu' in self.colormap:
567 567 self.data.meta['colormap'] = 'RdBu'
568 568 else:
569 569 self.data.meta['colormap'] = 'Viridis'
570 570 self.data.meta['interval'] = int(interval)
571 571
572 572 self.sender_queue.append(last_time)
573 573
574 574 while True:
575 575 try:
576 576 tm = self.sender_queue.popleft()
577 577 except IndexError:
578 578 break
579 579 msg = self.data.jsonify(tm, self.save_code, self.plot_type)
580 580 self.socket.send_string(msg)
581 581 socks = dict(self.poll.poll(2000))
582 582 if socks.get(self.socket) == zmq.POLLIN:
583 583 reply = self.socket.recv_string()
584 584 if reply == 'ok':
585 585 log.log("Response from server ok", self.name)
586 586 time.sleep(0.1)
587 587 continue
588 588 else:
589 589 log.warning(
590 590 "Malformed reply from server: {}".format(reply), self.name)
591 591 else:
592 592 log.warning(
593 593 "No response from server, retrying...", self.name)
594 594 self.sender_queue.appendleft(tm)
595 595 self.socket.setsockopt(zmq.LINGER, 0)
596 596 self.socket.close()
597 597 self.poll.unregister(self.socket)
598 598 self.socket = self.context.socket(zmq.REQ)
599 599 self.socket.connect(self.server)
600 600 self.poll.register(self.socket, zmq.POLLIN)
601 601 break
602 602
603 603 def setup(self):
604 604 '''
605 605 This method should be implemented in the child class, the following
606 606 attributes should be set:
607 607
608 608 self.nrows: number of rows
609 609 self.ncols: number of cols
610 610 self.nplots: number of plots (channels or pairs)
611 611 self.ylabel: label for Y axes
612 612 self.titles: list of axes title
613 613
614 614 '''
615 615 raise NotImplementedError
616 616
617 617 def plot(self):
618 618 '''
619 619 Must be defined in the child class, the actual plotting method
620 620 '''
621 621 raise NotImplementedError
622 622
623 623 def update(self, dataOut):
624 624 '''
625 625 Must be defined in the child class, update self.data with new data
626 626 '''
627 627
628 628 data = {
629 629 self.CODE: getattr(dataOut, 'data_{}'.format(self.CODE))
630 630 }
631 631 meta = {}
632 632
633 633 return data, meta
634 634
635 635 def run(self, dataOut, **kwargs):
636 636 '''
637 637 Main plotting routine
638 638 '''
639 639
640 640 if self.isConfig is False:
641 641 self.__setup(**kwargs)
642 642
643 643 if self.localtime:
644 644 self.getDateTime = datetime.datetime.fromtimestamp
645 645 else:
646 646 self.getDateTime = datetime.datetime.utcfromtimestamp
647 647
648 648 self.data.setup()
649 649 self.isConfig = True
650 650 if self.server:
651 651 self.context = zmq.Context()
652 652 self.socket = self.context.socket(zmq.REQ)
653 653 self.socket.connect(self.server)
654 654 self.poll = zmq.Poller()
655 655 self.poll.register(self.socket, zmq.POLLIN)
656 656
657 657 tm = getattr(dataOut, self.attr_time)
658 658
659 659 if self.data and 'time' in self.xaxis and (tm - self.tmin) >= self.xrange * 60 * 60:
660 660 self.save_time = tm
661 661 self.__plot()
662 662 self.tmin += self.xrange * 60 * 60
663 663 self.data.setup()
664 664 self.clear_figures()
665 665
666 666 self.__update(dataOut, tm)
667 667
668 668 if self.isPlotConfig is False:
669 669 self.__setup_plot()
670 670 self.isPlotConfig = True
671 671 if self.xaxis == 'time':
672 672 dt = self.getDateTime(tm)
673 673 if self.xmin is None:
674 674 self.tmin = tm
675 675 self.xmin = dt.hour
676 676 minutes = (self.xmin - int(self.xmin)) * 60
677 677 seconds = (minutes - int(minutes)) * 60
678 678 self.tmin = (dt.replace(hour=int(self.xmin), minute=int(minutes), second=int(seconds)) -
679 679 datetime.datetime(1970, 1, 1)).total_seconds()
680 680 if self.localtime:
681 681 self.tmin += time.timezone
682 682
683 683 if self.xmin is not None and self.xmax is not None:
684 684 self.xrange = self.xmax - self.xmin
685 685
686 686 if self.throttle == 0:
687 687 self.__plot()
688 688 else:
689 689 self.__throttle_plot(self.__plot) # , coerce=coerce)
690 690
691 691 def close(self):
692 692
693 693 if self.data and not self.data.flagNoData:
694 694 self.save_time = 0
695 695 self.__plot()
696 696 if self.data and not self.data.flagNoData and self.pause:
697 697 figpause(10)
@@ -1,1615 +1,1621
1 1 """
2 2 Created on Jul 2, 2014
3 3
4 4 @author: roj-idl71
5 5 """
6 6 import os
7 7 import sys
8 8 import glob
9 9 import time
10 10 import numpy
11 11 import fnmatch
12 12 import inspect
13 13 import time
14 14 import datetime
15 15 import zmq
16 16
17 17 from schainpy.model.proc.jroproc_base import Operation, MPDecorator
18 18 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
19 19 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
20 20 from schainpy.utils import log
21 21 import schainpy.admin
22 22
23 23 LOCALTIME = True
24 24 DT_DIRECTIVES = {
25 25 '%Y': 4,
26 26 '%y': 2,
27 27 '%m': 2,
28 28 '%d': 2,
29 29 '%j': 3,
30 30 '%H': 2,
31 31 '%M': 2,
32 32 '%S': 2,
33 33 '%f': 6
34 34 }
35 35
36 36
37 37 def isNumber(cad):
38 38 """
39 39 Chequea si el conjunto de caracteres que componen un string puede ser convertidos a un numero.
40 40
41 41 Excepciones:
42 42 Si un determinado string no puede ser convertido a numero
43 43 Input:
44 44 str, string al cual se le analiza para determinar si convertible a un numero o no
45 45
46 46 Return:
47 47 True : si el string es uno numerico
48 48 False : no es un string numerico
49 49 """
50 50 try:
51 51 float(cad)
52 52 return True
53 53 except:
54 54 return False
55 55
56 56
57 57 def isFileInEpoch(filename, startUTSeconds, endUTSeconds):
58 58 """
59 59 Esta funcion determina si un archivo de datos se encuentra o no dentro del rango de fecha especificado.
60 60
61 61 Inputs:
62 62 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
63 63
64 64 startUTSeconds : fecha inicial del rango seleccionado. La fecha esta dada en
65 65 segundos contados desde 01/01/1970.
66 66 endUTSeconds : fecha final del rango seleccionado. La fecha esta dada en
67 67 segundos contados desde 01/01/1970.
68 68
69 69 Return:
70 70 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
71 71 fecha especificado, de lo contrario retorna False.
72 72
73 73 Excepciones:
74 74 Si el archivo no existe o no puede ser abierto
75 75 Si la cabecera no puede ser leida.
76 76
77 77 """
78 78 basicHeaderObj = BasicHeader(LOCALTIME)
79 79
80 80 try:
81 81
82 82 fp = open(filename, 'rb')
83 83 except IOError:
84 84 print("The file %s can't be opened" % (filename))
85 85 return 0
86 86
87 87 sts = basicHeaderObj.read(fp)
88 88 fp.close()
89 89
90 90 if not(sts):
91 91 print("Skipping the file %s because it has not a valid header" % (filename))
92 92 return 0
93 93
94 94 if not ((startUTSeconds <= basicHeaderObj.utc) and (endUTSeconds > basicHeaderObj.utc)):
95 95 return 0
96 96
97 97 return 1
98 98
99 99
100 100 def isTimeInRange(thisTime, startTime, endTime):
101 101 if endTime >= startTime:
102 102 if (thisTime < startTime) or (thisTime > endTime):
103 103 return 0
104 104 return 1
105 105 else:
106 106 if (thisTime < startTime) and (thisTime > endTime):
107 107 return 0
108 108 return 1
109 109
110 110
111 111 def isFileInTimeRange(filename, startDate, endDate, startTime, endTime):
112 112 """
113 113 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
114 114
115 115 Inputs:
116 116 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
117 117
118 118 startDate : fecha inicial del rango seleccionado en formato datetime.date
119 119
120 120 endDate : fecha final del rango seleccionado en formato datetime.date
121 121
122 122 startTime : tiempo inicial del rango seleccionado en formato datetime.time
123 123
124 124 endTime : tiempo final del rango seleccionado en formato datetime.time
125 125
126 126 Return:
127 127 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
128 128 fecha especificado, de lo contrario retorna False.
129 129
130 130 Excepciones:
131 131 Si el archivo no existe o no puede ser abierto
132 132 Si la cabecera no puede ser leida.
133 133
134 134 """
135 135
136 136 try:
137 137 fp = open(filename, 'rb')
138 138 except IOError:
139 139 print("The file %s can't be opened" % (filename))
140 140 return None
141 141
142 142 firstBasicHeaderObj = BasicHeader(LOCALTIME)
143 143 systemHeaderObj = SystemHeader()
144 144
145 145 radarControllerHeaderObj = RadarControllerHeader()
146 146 processingHeaderObj = ProcessingHeader()
147 147
148 148 lastBasicHeaderObj = BasicHeader(LOCALTIME)
149 149
150 150 sts = firstBasicHeaderObj.read(fp)
151 151
152 152 if not(sts):
153 153 print("[Reading] Skipping the file %s because it has not a valid header" % (filename))
154 154 return None
155 155
156 156 if not systemHeaderObj.read(fp):
157 157 return None
158 158
159 159 if not radarControllerHeaderObj.read(fp):
160 160 return None
161 161
162 162 if not processingHeaderObj.read(fp):
163 163 return None
164 164
165 165 filesize = os.path.getsize(filename)
166 166
167 167 offset = processingHeaderObj.blockSize + 24 # header size
168 168
169 169 if filesize <= offset:
170 170 print("[Reading] %s: This file has not enough data" % filename)
171 171 return None
172 172
173 173 fp.seek(-offset, 2)
174 174
175 175 sts = lastBasicHeaderObj.read(fp)
176 176
177 177 fp.close()
178 178
179 179 thisDatetime = lastBasicHeaderObj.datatime
180 180 thisTime_last_block = thisDatetime.time()
181 181
182 182 thisDatetime = firstBasicHeaderObj.datatime
183 183 thisDate = thisDatetime.date()
184 184 thisTime_first_block = thisDatetime.time()
185 185
186 186 # General case
187 187 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
188 188 #-----------o----------------------------o-----------
189 189 # startTime endTime
190 190
191 191 if endTime >= startTime:
192 192 if (thisTime_last_block < startTime) or (thisTime_first_block > endTime):
193 193 return None
194 194
195 195 return thisDatetime
196 196
197 197 # If endTime < startTime then endTime belongs to the next day
198 198
199 199 # <<<<<<<<<<<o o>>>>>>>>>>>
200 200 #-----------o----------------------------o-----------
201 201 # endTime startTime
202 202
203 203 if (thisDate == startDate) and (thisTime_last_block < startTime):
204 204 return None
205 205
206 206 if (thisDate == endDate) and (thisTime_first_block > endTime):
207 207 return None
208 208
209 209 if (thisTime_last_block < startTime) and (thisTime_first_block > endTime):
210 210 return None
211 211
212 212 return thisDatetime
213 213
214 214
215 215 def isFolderInDateRange(folder, startDate=None, endDate=None):
216 216 """
217 217 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
218 218
219 219 Inputs:
220 220 folder : nombre completo del directorio.
221 221 Su formato deberia ser "/path_root/?YYYYDDD"
222 222
223 223 siendo:
224 224 YYYY : Anio (ejemplo 2015)
225 225 DDD : Dia del anio (ejemplo 305)
226 226
227 227 startDate : fecha inicial del rango seleccionado en formato datetime.date
228 228
229 229 endDate : fecha final del rango seleccionado en formato datetime.date
230 230
231 231 Return:
232 232 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
233 233 fecha especificado, de lo contrario retorna False.
234 234 Excepciones:
235 235 Si el directorio no tiene el formato adecuado
236 236 """
237 237
238 238 basename = os.path.basename(folder)
239 239
240 240 if not isRadarFolder(basename):
241 241 print("The folder %s has not the rigth format" % folder)
242 242 return 0
243 243
244 244 if startDate and endDate:
245 245 thisDate = getDateFromRadarFolder(basename)
246 246
247 247 if thisDate < startDate:
248 248 return 0
249 249
250 250 if thisDate > endDate:
251 251 return 0
252 252
253 253 return 1
254 254
255 255
256 256 def isFileInDateRange(filename, startDate=None, endDate=None):
257 257 """
258 258 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
259 259
260 260 Inputs:
261 261 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
262 262
263 263 Su formato deberia ser "?YYYYDDDsss"
264 264
265 265 siendo:
266 266 YYYY : Anio (ejemplo 2015)
267 267 DDD : Dia del anio (ejemplo 305)
268 268 sss : set
269 269
270 270 startDate : fecha inicial del rango seleccionado en formato datetime.date
271 271
272 272 endDate : fecha final del rango seleccionado en formato datetime.date
273 273
274 274 Return:
275 275 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
276 276 fecha especificado, de lo contrario retorna False.
277 277 Excepciones:
278 278 Si el archivo no tiene el formato adecuado
279 279 """
280 280
281 281 basename = os.path.basename(filename)
282 282
283 283 if not isRadarFile(basename):
284 284 print("The filename %s has not the rigth format" % filename)
285 285 return 0
286 286
287 287 if startDate and endDate:
288 288 thisDate = getDateFromRadarFile(basename)
289 289
290 290 if thisDate < startDate:
291 291 return 0
292 292
293 293 if thisDate > endDate:
294 294 return 0
295 295
296 296 return 1
297 297
298 298
299 299 def getFileFromSet(path, ext, set):
300 300 validFilelist = []
301 301 fileList = os.listdir(path)
302 302
303 303 # 0 1234 567 89A BCDE
304 304 # H YYYY DDD SSS .ext
305 305
306 306 for thisFile in fileList:
307 307 try:
308 308 year = int(thisFile[1:5])
309 309 doy = int(thisFile[5:8])
310 310 except:
311 311 continue
312 312
313 313 if (os.path.splitext(thisFile)[-1].lower() != ext.lower()):
314 314 continue
315 315
316 316 validFilelist.append(thisFile)
317 317
318 318 myfile = fnmatch.filter(
319 319 validFilelist, '*%4.4d%3.3d%3.3d*' % (year, doy, set))
320 320
321 321 if len(myfile) != 0:
322 322 return myfile[0]
323 323 else:
324 324 filename = '*%4.4d%3.3d%3.3d%s' % (year, doy, set, ext.lower())
325 325 print('the filename %s does not exist' % filename)
326 326 print('...going to the last file: ')
327 327
328 328 if validFilelist:
329 329 validFilelist = sorted(validFilelist, key=str.lower)
330 330 return validFilelist[-1]
331 331
332 332 return None
333 333
334 334
335 335 def getlastFileFromPath(path, ext):
336 336 """
337 337 Depura el fileList dejando solo los que cumplan el formato de "PYYYYDDDSSS.ext"
338 338 al final de la depuracion devuelve el ultimo file de la lista que quedo.
339 339
340 340 Input:
341 341 fileList : lista conteniendo todos los files (sin path) que componen una determinada carpeta
342 342 ext : extension de los files contenidos en una carpeta
343 343
344 344 Return:
345 345 El ultimo file de una determinada carpeta, no se considera el path.
346 346 """
347 347 validFilelist = []
348 348 fileList = os.listdir(path)
349 349
350 350 # 0 1234 567 89A BCDE
351 351 # H YYYY DDD SSS .ext
352 352
353 353 for thisFile in fileList:
354 354
355 355 year = thisFile[1:5]
356 356 if not isNumber(year):
357 357 continue
358 358
359 359 doy = thisFile[5:8]
360 360 if not isNumber(doy):
361 361 continue
362 362
363 363 year = int(year)
364 364 doy = int(doy)
365 365
366 366 if (os.path.splitext(thisFile)[-1].lower() != ext.lower()):
367 367 continue
368 368
369 369 validFilelist.append(thisFile)
370 370
371 371 if validFilelist:
372 372 validFilelist = sorted(validFilelist, key=str.lower)
373 373 return validFilelist[-1]
374 374
375 375 return None
376 376
377 377
378 378 def isRadarFolder(folder):
379 379 try:
380 380 year = int(folder[1:5])
381 381 doy = int(folder[5:8])
382 382 except:
383 383 return 0
384 384
385 385 return 1
386 386
387 387
388 388 def isRadarFile(file):
389 389 try:
390 390 year = int(file[1:5])
391 391 doy = int(file[5:8])
392 392 set = int(file[8:11])
393 393 except:
394 394 return 0
395 395
396 396 return 1
397 397
398 398
399 399 def getDateFromRadarFile(file):
400 400 try:
401 401 year = int(file[1:5])
402 402 doy = int(file[5:8])
403 403 set = int(file[8:11])
404 404 except:
405 405 return None
406 406
407 407 thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
408 408 return thisDate
409 409
410 410
411 411 def getDateFromRadarFolder(folder):
412 412 try:
413 413 year = int(folder[1:5])
414 414 doy = int(folder[5:8])
415 415 except:
416 416 return None
417 417
418 418 thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
419 419 return thisDate
420 420
421 421 def parse_format(s, fmt):
422 422
423 423 for i in range(fmt.count('%')):
424 424 x = fmt.index('%')
425 425 d = DT_DIRECTIVES[fmt[x:x + 2]]
426 426 fmt = fmt.replace(fmt[x:x + 2], s[x:x + d])
427 427 return fmt
428 428
429 429 class Reader(object):
430 430
431 431 c = 3E8
432 432 isConfig = False
433 433 dtype = None
434 434 pathList = []
435 435 filenameList = []
436 436 datetimeList = []
437 437 filename = None
438 438 ext = None
439 439 flagIsNewFile = 1
440 440 flagDiscontinuousBlock = 0
441 441 flagIsNewBlock = 0
442 442 flagNoMoreFiles = 0
443 443 fp = None
444 444 firstHeaderSize = 0
445 445 basicHeaderSize = 24
446 446 versionFile = 1103
447 447 fileSize = None
448 448 fileSizeByHeader = None
449 449 fileIndex = -1
450 450 profileIndex = None
451 451 blockIndex = 0
452 452 nTotalBlocks = 0
453 453 maxTimeStep = 30
454 454 lastUTTime = None
455 455 datablock = None
456 456 dataOut = None
457 457 getByBlock = False
458 458 path = None
459 459 startDate = None
460 460 endDate = None
461 461 startTime = datetime.time(0, 0, 0)
462 462 endTime = datetime.time(23, 59, 59)
463 463 set = None
464 464 expLabel = ""
465 465 online = False
466 466 delay = 60
467 467 nTries = 3 # quantity tries
468 468 nFiles = 3 # number of files for searching
469 469 walk = True
470 470 getblock = False
471 471 nTxs = 1
472 472 realtime = False
473 473 blocksize = 0
474 474 blocktime = None
475 475 warnings = True
476 476 verbose = True
477 477 server = None
478 478 format = None
479 479 oneDDict = None
480 480 twoDDict = None
481 481 independentParam = None
482 482 filefmt = None
483 483 folderfmt = None
484 484 open_file = open
485 485 open_mode = 'rb'
486 486
487 487 def run(self):
488 488
489 489 raise NotImplementedError
490 490
491 491 def getAllowedArgs(self):
492 492 if hasattr(self, '__attrs__'):
493 493 return self.__attrs__
494 494 else:
495 495 return inspect.getargspec(self.run).args
496 496
497 497 def set_kwargs(self, **kwargs):
498 498
499 499 for key, value in kwargs.items():
500 500 setattr(self, key, value)
501 501
502 502 def find_folders(self, path, startDate, endDate, folderfmt, last=False):
503 503
504 504 folders = [x for f in path.split(',')
505 505 for x in os.listdir(f) if os.path.isdir(os.path.join(f, x))]
506 506 folders.sort()
507 507
508 508 if last:
509 509 folders = [folders[-1]]
510 510
511 511 for folder in folders:
512 512 try:
513 513 dt = datetime.datetime.strptime(parse_format(folder, folderfmt), folderfmt).date()
514 514 if dt >= startDate and dt <= endDate:
515 515 yield os.path.join(path, folder)
516 516 else:
517 517 log.log('Skiping folder {}'.format(folder), self.name)
518 518 except Exception as e:
519 519 log.log('Skiping folder {}'.format(folder), self.name)
520 520 continue
521 521 return
522 522
523 523 def find_files(self, folders, ext, filefmt, startDate=None, endDate=None,
524 524 expLabel='', last=False):
525 525
526 526 for path in folders:
527 527 files = glob.glob1(path, '*{}'.format(ext))
528 528 files.sort()
529 529 if last:
530 530 if files:
531 531 fo = files[-1]
532 532 try:
533 533 dt = datetime.datetime.strptime(parse_format(fo, filefmt), filefmt).date()
534 534 yield os.path.join(path, expLabel, fo)
535 535 except Exception as e:
536 536 pass
537 537 return
538 538 else:
539 539 return
540 540
541 541 for fo in files:
542 542 try:
543 543 dt = datetime.datetime.strptime(parse_format(fo, filefmt), filefmt).date()
544 544 #print(dt)
545 545 #print(startDate)
546 546 #print(endDate)
547 547 if dt >= startDate and dt <= endDate:
548 548
549 549 yield os.path.join(path, expLabel, fo)
550 550
551 551 else:
552 552
553 553 log.log('Skiping file {}'.format(fo), self.name)
554 554 except Exception as e:
555 555 log.log('Skiping file {}'.format(fo), self.name)
556 556 continue
557 557
558 558 def searchFilesOffLine(self, path, startDate, endDate,
559 559 expLabel, ext, walk,
560 560 filefmt, folderfmt):
561 561 """Search files in offline mode for the given arguments
562 562
563 563 Return:
564 564 Generator of files
565 565 """
566 566
567 567 if walk:
568 568 folders = self.find_folders(
569 569 path, startDate, endDate, folderfmt)
570 570 else:
571 571 folders = path.split(',')
572 572
573 573 return self.find_files(
574 574 folders, ext, filefmt, startDate, endDate, expLabel)
575 575
576 576 def searchFilesOnLine(self, path, startDate, endDate,
577 577 expLabel, ext, walk,
578 578 filefmt, folderfmt):
579 579 """Search for the last file of the last folder
580 580
581 581 Arguments:
582 582 path : carpeta donde estan contenidos los files que contiene data
583 583 expLabel : Nombre del subexperimento (subfolder)
584 584 ext : extension de los files
585 585 walk : Si es habilitado no realiza busquedas dentro de los ubdirectorios (doypath)
586 586
587 587 Return:
588 588 generator with the full path of last filename
589 589 """
590 590
591 591 if walk:
592 592 folders = self.find_folders(
593 593 path, startDate, endDate, folderfmt, last=True)
594 594 else:
595 595 folders = path.split(',')
596 596
597 597 return self.find_files(
598 598 folders, ext, filefmt, startDate, endDate, expLabel, last=True)
599 599
600 600 def setNextFile(self):
601 601 """Set the next file to be readed open it and parse de file header"""
602 602
603 603 #print("fp: ",self.fp)
604 604 while True:
605 605
606 606 #print(self.fp)
607 607 if self.fp != None:
608 608 self.fp.close()
609 609
610 610 #print("setNextFile")
611 611 #print("BEFORE OPENING",self.filename)
612 612 if self.online:
613 613 newFile = self.setNextFileOnline()
614 614
615 615 else:
616 616
617 617 newFile = self.setNextFileOffline()
618 618
619 619 #print("newFile: ",newFile)
620 620 if not(newFile):
621 621
622 622 if self.online:
623 623 raise schainpy.admin.SchainError('Time to wait for new files reach')
624 624 else:
625 625 if self.fileIndex == -1:
626 626 #print("OKK")
627 627 raise schainpy.admin.SchainWarning('No files found in the given path')
628 628 else:
629 629
630 630 raise schainpy.admin.SchainWarning('No more files to read')
631 631
632 632 if self.verifyFile(self.filename):
633 633
634 634 break
635 635
636 636 ##print("BEFORE OPENING",self.filename)
637 637
638 638 log.log('Opening file: %s' % self.filename, self.name)
639 639
640 640 self.readFirstHeader()
641 641 self.nReadBlocks = 0
642 642
643 643 def setNextFileOnline(self):
644 644 """Check for the next file to be readed in online mode.
645 645
646 646 Set:
647 647 self.filename
648 648 self.fp
649 649 self.filesize
650 650
651 651 Return:
652 652 boolean
653 653
654 654 """
655 655
656 656 nextFile = True
657 657 nextDay = False
658 658
659 659 for nFiles in range(self.nFiles + 1):
660 660 for nTries in range(self.nTries):
661 661 fullfilename, filename = self.checkForRealPath(nextFile, nextDay)
662 662 if fullfilename is not None:
663 663 break
664 664 log.warning(
665 665 "Waiting %0.2f sec for the next file: \"%s\" , try %02d ..." % (self.delay, filename, nTries + 1),
666 666 self.name)
667 667 time.sleep(self.delay)
668 668 nextFile = False
669 669 continue
670 670
671 671 if fullfilename is not None:
672 672 break
673 673
674 674 #self.nTries = 1
675 675 nextFile = True
676 676
677 677 if nFiles == (self.nFiles - 1):
678 678 log.log('Trying with next day...', self.name)
679 679 nextDay = True
680 680 self.nTries = 3
681 681
682 682 if fullfilename:
683 683 self.fileSize = os.path.getsize(fullfilename)
684 684 self.filename = fullfilename
685 685 self.flagIsNewFile = 1
686 686 if self.fp != None:
687 687 self.fp.close()
688 688 #print(fullfilename)
689 689 self.fp = self.open_file(fullfilename, self.open_mode)
690 690
691 691 self.flagNoMoreFiles = 0
692 692 self.fileIndex += 1
693 693 return 1
694 694 else:
695 695 return 0
696 696
697 697 def setNextFileOffline(self):
698 698 """Open the next file to be readed in offline mode"""
699 699
700 700 try:
701 701 filename = next(self.filenameList)
702 702 self.fileIndex += 1
703 703 except StopIteration:
704 704 self.flagNoMoreFiles = 1
705 705 return 0
706 706 #print(self.fileIndex)
707 707 #print(filename)
708 708 self.filename = filename
709 709 self.fileSize = os.path.getsize(filename)
710 710 self.fp = self.open_file(filename, self.open_mode)
711 711 self.flagIsNewFile = 1
712 712
713 713 return 1
714 714
715 715 @staticmethod
716 716 def isDateTimeInRange(dt, startDate, endDate, startTime, endTime):
717 717 """Check if the given datetime is in range"""
718 718
719 719 if startDate <= dt.date() <= endDate:
720 720 if startTime <= dt.time() <= endTime:
721 721 return True
722 722 return False
723 723
724 724 def verifyFile(self, filename):
725 725 """Check for a valid file
726 726
727 727 Arguments:
728 728 filename -- full path filename
729 729
730 730 Return:
731 731 boolean
732 732 """
733 733
734 734 return True
735 735
736 736 def checkForRealPath(self, nextFile, nextDay):
737 737 """Check if the next file to be readed exists"""
738 738
739 739 raise NotImplementedError
740 740
741 741 def readFirstHeader(self):
742 742 """Parse the file header"""
743 743
744 744
745 745 pass
746 746
747 747 def waitDataBlock(self, pointer_location, blocksize=None):
748 748 """
749 749 """
750 750
751 751 currentPointer = pointer_location
752 752 if blocksize is None:
753 753 neededSize = self.processingHeaderObj.blockSize # + self.basicHeaderSize
754 754 else:
755 755 neededSize = blocksize
756 756
757 757 for nTries in range(self.nTries):
758 758 self.fp.close()
759 759 self.fp = open(self.filename, 'rb')
760 760 self.fp.seek(currentPointer)
761 761
762 762 self.fileSize = os.path.getsize(self.filename)
763 763 currentSize = self.fileSize - currentPointer
764 764
765 765 if (currentSize >= neededSize):
766 766 return 1
767 767
768 768 log.warning(
769 769 "Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1),
770 770 self.name
771 771 )
772 772 time.sleep(self.delay)
773 773
774 774 return 0
775 775
776 776 class JRODataReader(Reader):
777 777
778 778 utc = 0
779 779 nReadBlocks = 0
780 780 foldercounter = 0
781 781 firstHeaderSize = 0
782 782 basicHeaderSize = 24
783 783 __isFirstTimeOnline = 1
784 topic = ''
784 785 filefmt = "*%Y%j***"
785 786 folderfmt = "*%Y%j"
786 787 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'online', 'delay', 'walk']
787 788
788 789 def getDtypeWidth(self):
789 790
790 791 dtype_index = get_dtype_index(self.dtype)
791 792 dtype_width = get_dtype_width(dtype_index)
792 793
793 794 return dtype_width
794 795
795 796 def checkForRealPath(self, nextFile, nextDay):
796 797 """Check if the next file to be readed exists.
797 798
798 799 Example :
799 800 nombre correcto del file es .../.../D2009307/P2009307367.ext
800 801
801 802 Entonces la funcion prueba con las siguientes combinaciones
802 803 .../.../y2009307367.ext
803 804 .../.../Y2009307367.ext
804 805 .../.../x2009307/y2009307367.ext
805 806 .../.../x2009307/Y2009307367.ext
806 807 .../.../X2009307/y2009307367.ext
807 808 .../.../X2009307/Y2009307367.ext
808 809 siendo para este caso, la ultima combinacion de letras, identica al file buscado
809 810
810 811 Return:
811 812 str -- fullpath of the file
812 813 """
813 814
814 815
815 816 if nextFile:
816 817 self.set += 1
817 818 if nextDay:
818 819 self.set = 0
819 820 self.doy += 1
820 821 foldercounter = 0
821 822 prefixDirList = [None, 'd', 'D']
822 823 if self.ext.lower() == ".r": # voltage
823 824 prefixFileList = ['d', 'D']
824 825 elif self.ext.lower() == ".pdata": # spectra
825 826 prefixFileList = ['p', 'P']
826 827
827 828 ##############DP##############
828 829
829 830 elif self.ext.lower() == ".dat": # dat
830 831 prefixFileList = ['z', 'Z']
831 832
832 833
833 834
834 835 ##############DP##############
835 836 # barrido por las combinaciones posibles
836 837 for prefixDir in prefixDirList:
837 838 thispath = self.path
838 839 if prefixDir != None:
839 840 # formo el nombre del directorio xYYYYDDD (x=d o x=D)
840 841 if foldercounter == 0:
841 842 thispath = os.path.join(self.path, "%s%04d%03d" %
842 843 (prefixDir, self.year, self.doy))
843 844 else:
844 845 thispath = os.path.join(self.path, "%s%04d%03d_%02d" % (
845 846 prefixDir, self.year, self.doy, foldercounter))
846 847 for prefixFile in prefixFileList: # barrido por las dos combinaciones posibles de "D"
847 848 # formo el nombre del file xYYYYDDDSSS.ext
848 849 filename = "%s%04d%03d%03d%s" % (prefixFile, self.year, self.doy, self.set, self.ext)
849 850 fullfilename = os.path.join(
850 851 thispath, filename)
851 852
852 853 if os.path.exists(fullfilename):
853 854 return fullfilename, filename
854 855
855 856 return None, filename
856 857
857 858 def __waitNewBlock(self):
858 859 """
859 860 Return 1 si se encontro un nuevo bloque de datos, 0 de otra forma.
860 861
861 862 Si el modo de lectura es OffLine siempre retorn 0
862 863 """
863 864 if not self.online:
864 865 return 0
865 866
866 867 if (self.nReadBlocks >= self.processingHeaderObj.dataBlocksPerFile):
867 868 return 0
868 869
869 870 currentPointer = self.fp.tell()
870 871
871 872 neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize
872 873
873 874 for nTries in range(self.nTries):
874 875
875 876 self.fp.close()
876 877 self.fp = open(self.filename, 'rb')
877 878 self.fp.seek(currentPointer)
878 879
879 880 self.fileSize = os.path.getsize(self.filename)
880 881 currentSize = self.fileSize - currentPointer
881 882
882 883 if (currentSize >= neededSize):
883 884 self.basicHeaderObj.read(self.fp)
884 885 return 1
885 886
886 887 if self.fileSize == self.fileSizeByHeader:
887 888 # self.flagEoF = True
888 889 return 0
889 890
890 891 print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1))
891 892 #print(self.filename)
892 893 time.sleep(self.delay)
893 894
894 895 return 0
895 896
896 897 def __setNewBlock(self):
897 898
898 899 if self.fp == None:
899 900 return 0
900 901
901 902 if self.flagIsNewFile:
902 903 self.lastUTTime = self.basicHeaderObj.utc
903 904 return 1
904 905
905 906 if self.realtime:
906 907 self.flagDiscontinuousBlock = 1
907 908 if not(self.setNextFile()):
908 909 return 0
909 910 else:
910 911 return 1
911 912
912 913 currentSize = self.fileSize - self.fp.tell()
913 914 neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize
914 915
915 916 if (currentSize >= neededSize):
916 917 self.basicHeaderObj.read(self.fp)
917 918 self.lastUTTime = self.basicHeaderObj.utc
918 919 return 1
919 920
920 921 if self.__waitNewBlock():
921 922 self.lastUTTime = self.basicHeaderObj.utc
922 923 return 1
923 924
924 925 if not(self.setNextFile()):
925 926 return 0
926 927
927 928 deltaTime = self.basicHeaderObj.utc - self.lastUTTime
928 929 self.lastUTTime = self.basicHeaderObj.utc
929 930
930 931 self.flagDiscontinuousBlock = 0
931 932
932 933 if deltaTime > self.maxTimeStep:
933 934 self.flagDiscontinuousBlock = 1
934 935
935 936 return 1
936 937
937 938 def readNextBlock(self):
938 939
939 940 while True:
940 941 if not(self.__setNewBlock()):
941 942 continue
942 943
943 944 if not(self.readBlock()):
944 945 return 0
945 946
946 947 self.getBasicHeader()
947 948
948 949 if not self.isDateTimeInRange(self.dataOut.datatime, self.startDate, self.endDate, self.startTime, self.endTime):
949 950 print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.nReadBlocks,
950 951 self.processingHeaderObj.dataBlocksPerFile,
951 952 self.dataOut.datatime.ctime()))
952 953 continue
953 954
954 955 break
955 956
956 957 if self.verbose:
957 958 print("[Reading] Block No. %d/%d -> %s" % (self.nReadBlocks,
958 959 self.processingHeaderObj.dataBlocksPerFile,
959 960 self.dataOut.datatime.ctime()))
960 961 #################DP#################
961 962 self.dataOut.TimeBlockDate=self.dataOut.datatime.ctime()
962 963 self.dataOut.TimeBlockSeconds=time.mktime(time.strptime(self.dataOut.datatime.ctime()))
963 964 #################DP#################
964 965 return 1
965 966
966 967 def readFirstHeader(self):
967 968
968 969 self.basicHeaderObj.read(self.fp)
969 970 self.systemHeaderObj.read(self.fp)
970 971 self.radarControllerHeaderObj.read(self.fp)
971 972 self.processingHeaderObj.read(self.fp)
972 973 self.firstHeaderSize = self.basicHeaderObj.size
973 974
974 975 datatype = int(numpy.log2((self.processingHeaderObj.processFlags &
975 976 PROCFLAG.DATATYPE_MASK)) - numpy.log2(PROCFLAG.DATATYPE_CHAR))
976 977 if datatype == 0:
977 978 datatype_str = numpy.dtype([('real', '<i1'), ('imag', '<i1')])
978 979 elif datatype == 1:
979 980 datatype_str = numpy.dtype([('real', '<i2'), ('imag', '<i2')])
980 981 elif datatype == 2:
981 982 datatype_str = numpy.dtype([('real', '<i4'), ('imag', '<i4')])
982 983 elif datatype == 3:
983 984 datatype_str = numpy.dtype([('real', '<i8'), ('imag', '<i8')])
984 985 elif datatype == 4:
985 986 datatype_str = numpy.dtype([('real', '<f4'), ('imag', '<f4')])
986 987 elif datatype == 5:
987 988 datatype_str = numpy.dtype([('real', '<f8'), ('imag', '<f8')])
988 989 else:
989 990 raise ValueError('Data type was not defined')
990 991
991 992 self.dtype = datatype_str
992 993 # self.ippSeconds = 2 * 1000 * self.radarControllerHeaderObj.ipp / self.c
993 994 self.fileSizeByHeader = self.processingHeaderObj.dataBlocksPerFile * self.processingHeaderObj.blockSize + \
994 995 self.firstHeaderSize + self.basicHeaderSize * \
995 996 (self.processingHeaderObj.dataBlocksPerFile - 1)
996 997 # self.dataOut.channelList = numpy.arange(self.systemHeaderObj.numChannels)
997 998 # self.dataOut.channelIndexList = numpy.arange(self.systemHeaderObj.numChannels)
998 999 self.getBlockDimension()
999 1000
1000 1001 def verifyFile(self, filename):
1001 1002
1002 1003 flag = True
1003 1004
1004 1005 try:
1005 1006 fp = open(filename, 'rb')
1006 1007 except IOError:
1007 1008 log.error("File {} can't be opened".format(filename), self.name)
1008 1009 return False
1009 1010
1010 1011 if self.online and self.waitDataBlock(0):
1011 1012 pass
1012 1013
1013 1014 basicHeaderObj = BasicHeader(LOCALTIME)
1014 1015 systemHeaderObj = SystemHeader()
1015 1016 radarControllerHeaderObj = RadarControllerHeader()
1016 1017 processingHeaderObj = ProcessingHeader()
1017 1018
1018 1019 if not(basicHeaderObj.read(fp)):
1019 1020 flag = False
1020 1021 if not(systemHeaderObj.read(fp)):
1021 1022 flag = False
1022 1023 if not(radarControllerHeaderObj.read(fp)):
1023 1024 flag = False
1024 1025 if not(processingHeaderObj.read(fp)):
1025 1026 flag = False
1026 1027 if not self.online:
1027 1028 dt1 = basicHeaderObj.datatime
1028 1029 pos = self.fileSize - processingHeaderObj.blockSize - 24
1029 1030 if pos < 0:
1030 1031 flag = False
1031 1032 log.error('Invalid size for file: {}'.format(self.filename), self.name)
1032 1033 else:
1033 1034 fp.seek(pos)
1034 1035 if not(basicHeaderObj.read(fp)):
1035 1036 flag = False
1036 1037 dt2 = basicHeaderObj.datatime
1037 1038 if not self.isDateTimeInRange(dt1, self.startDate, self.endDate, self.startTime, self.endTime) and not \
1038 1039 self.isDateTimeInRange(dt2, self.startDate, self.endDate, self.startTime, self.endTime):
1039 1040 flag = False
1040 1041
1041 1042 fp.close()
1042 1043 return flag
1043 1044
1044 1045 def findDatafiles(self, path, startDate=None, endDate=None, expLabel='', ext='.r', walk=True, include_path=False):
1045 1046
1046 1047 path_empty = True
1047 1048
1048 1049 dateList = []
1049 1050 pathList = []
1050 1051
1051 1052 multi_path = path.split(',')
1052 1053
1053 1054 if not walk:
1054 1055
1055 1056 for single_path in multi_path:
1056 1057
1057 1058 if not os.path.isdir(single_path):
1058 1059 continue
1059 1060
1060 1061 fileList = glob.glob1(single_path, "*" + ext)
1061 1062
1062 1063 if not fileList:
1063 1064 continue
1064 1065
1065 1066 path_empty = False
1066 1067
1067 1068 fileList.sort()
1068 1069
1069 1070 for thisFile in fileList:
1070 1071
1071 1072 if not os.path.isfile(os.path.join(single_path, thisFile)):
1072 1073 continue
1073 1074
1074 1075 if not isRadarFile(thisFile):
1075 1076 continue
1076 1077
1077 1078 if not isFileInDateRange(thisFile, startDate, endDate):
1078 1079 continue
1079 1080
1080 1081 thisDate = getDateFromRadarFile(thisFile)
1081 1082
1082 1083 if thisDate in dateList or single_path in pathList:
1083 1084 continue
1084 1085
1085 1086 dateList.append(thisDate)
1086 1087 pathList.append(single_path)
1087 1088
1088 1089 else:
1089 1090 for single_path in multi_path:
1090 1091
1091 1092 if not os.path.isdir(single_path):
1092 1093 continue
1093 1094
1094 1095 dirList = []
1095 1096
1096 1097 for thisPath in os.listdir(single_path):
1097 1098
1098 1099 if not os.path.isdir(os.path.join(single_path, thisPath)):
1099 1100 continue
1100 1101
1101 1102 if not isRadarFolder(thisPath):
1102 1103 continue
1103 1104
1104 1105 if not isFolderInDateRange(thisPath, startDate, endDate):
1105 1106 continue
1106 1107
1107 1108 dirList.append(thisPath)
1108 1109
1109 1110 if not dirList:
1110 1111 continue
1111 1112
1112 1113 dirList.sort()
1113 1114
1114 1115 for thisDir in dirList:
1115 1116
1116 1117 datapath = os.path.join(single_path, thisDir, expLabel)
1117 1118 fileList = glob.glob1(datapath, "*" + ext)
1118 1119
1119 1120 if not fileList:
1120 1121 continue
1121 1122
1122 1123 path_empty = False
1123 1124
1124 1125 thisDate = getDateFromRadarFolder(thisDir)
1125 1126
1126 1127 pathList.append(datapath)
1127 1128 dateList.append(thisDate)
1128 1129
1129 1130 dateList.sort()
1130 1131
1131 1132 if walk:
1132 1133 pattern_path = os.path.join(multi_path[0], "[dYYYYDDD]", expLabel)
1133 1134 else:
1134 1135 pattern_path = multi_path[0]
1135 1136
1136 1137 if path_empty:
1137 1138 raise schainpy.admin.SchainError("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate))
1138 1139 else:
1139 1140 if not dateList:
1140 1141 raise schainpy.admin.SchainError("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path))
1141 1142
1142 1143 if include_path:
1143 1144 return dateList, pathList
1144 1145
1145 1146 return dateList
1146 1147
1147 1148 def setup(self, **kwargs):
1148 1149
1149 1150 self.set_kwargs(**kwargs)
1150 1151 if not self.ext.startswith('.'):
1151 1152 self.ext = '.{}'.format(self.ext)
1152 1153
1153 1154 if self.server is not None:
1154 1155 if 'tcp://' in self.server:
1155 address = server
1156 address = self.server
1156 1157 else:
1157 1158 address = 'ipc:///tmp/%s' % self.server
1158 1159 self.server = address
1159 1160 self.context = zmq.Context()
1160 self.receiver = self.context.socket(zmq.PULL)
1161 self.receiver = self.context.socket(zmq.SUB)
1161 1162 self.receiver.connect(self.server)
1163 self.receiver.setsockopt(zmq.SUBSCRIBE, str.encode(str(self.topic)))
1162 1164 time.sleep(0.5)
1163 1165 print('[Starting] ReceiverData from {}'.format(self.server))
1164 1166 else:
1165 1167 self.server = None
1166 1168 if self.path == None:
1167 1169 raise ValueError("[Reading] The path is not valid")
1168 1170
1169 1171 if self.online:
1170 1172 log.log("[Reading] Searching files in online mode...", self.name)
1171 1173
1172 1174 for nTries in range(self.nTries):
1173 1175 fullpath = self.searchFilesOnLine(self.path, self.startDate,
1174 1176 self.endDate, self.expLabel, self.ext, self.walk,
1175 1177 self.filefmt, self.folderfmt)
1176 1178
1177 1179 try:
1178 1180 fullpath = next(fullpath)
1179 1181 except:
1180 1182 fullpath = None
1181 1183
1182 1184 if fullpath:
1183 1185 break
1184 1186
1185 1187 log.warning(
1186 1188 'Waiting {} sec for a valid file in {}: try {} ...'.format(
1187 1189 self.delay, self.path, nTries + 1),
1188 1190 self.name)
1189 1191 time.sleep(self.delay)
1190 1192
1191 1193 if not(fullpath):
1192 1194 raise schainpy.admin.SchainError(
1193 1195 'There isn\'t any valid file in {}'.format(self.path))
1194 1196
1195 1197 pathname, filename = os.path.split(fullpath)
1196 1198 self.year = int(filename[1:5])
1197 1199 self.doy = int(filename[5:8])
1198 1200 self.set = int(filename[8:11]) - 1
1199 1201 else:
1200 1202 log.log("Searching files in {}".format(self.path), self.name)
1201 1203 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
1202 1204 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
1203 1205
1204 1206 self.setNextFile()
1205 1207
1206 1208 return
1207 1209
1208 1210 def getBasicHeader(self):
1209 1211
1210 1212 self.dataOut.utctime = self.basicHeaderObj.utc + self.basicHeaderObj.miliSecond / \
1211 1213 1000. + self.profileIndex * self.radarControllerHeaderObj.ippSeconds
1212 1214
1213 1215 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
1214 1216
1215 1217 self.dataOut.timeZone = self.basicHeaderObj.timeZone
1216 1218
1217 1219 self.dataOut.dstFlag = self.basicHeaderObj.dstFlag
1218 1220
1219 1221 self.dataOut.errorCount = self.basicHeaderObj.errorCount
1220 1222
1221 1223 self.dataOut.useLocalTime = self.basicHeaderObj.useLocalTime
1222 1224
1223 1225 self.dataOut.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
1224 1226
1225 1227 def getFirstHeader(self):
1226 1228
1227 1229 raise NotImplementedError
1228 1230
1229 1231 def getData(self):
1230 1232
1231 1233 raise NotImplementedError
1232 1234
1233 1235 def hasNotDataInBuffer(self):
1234 1236
1235 1237 raise NotImplementedError
1236 1238
1237 1239 def readBlock(self):
1238 1240
1239 1241 raise NotImplementedError
1240 1242
1241 1243 def isEndProcess(self):
1242 1244
1243 1245 return self.flagNoMoreFiles
1244 1246
1245 1247 def printReadBlocks(self):
1246 1248
1247 1249 print("[Reading] Number of read blocks per file %04d" % self.nReadBlocks)
1248 1250
1249 1251 def printTotalBlocks(self):
1250 1252
1251 1253 print("[Reading] Number of read blocks %04d" % self.nTotalBlocks)
1252 1254
1253 1255 def run(self, **kwargs):
1254 1256 """
1255 1257
1256 1258 Arguments:
1257 1259 path :
1258 1260 startDate :
1259 1261 endDate :
1260 1262 startTime :
1261 1263 endTime :
1262 1264 set :
1263 1265 expLabel :
1264 1266 ext :
1265 1267 online :
1266 1268 delay :
1267 1269 walk :
1268 1270 getblock :
1269 1271 nTxs :
1270 1272 realtime :
1271 1273 blocksize :
1272 1274 blocktime :
1273 1275 skip :
1274 1276 cursor :
1275 1277 warnings :
1276 1278 server :
1277 1279 verbose :
1278 1280 format :
1279 1281 oneDDict :
1280 1282 twoDDict :
1281 1283 independentParam :
1282 1284 """
1283 1285
1284 1286 if not(self.isConfig):
1285 1287 self.setup(**kwargs)
1286 1288 self.isConfig = True
1287 1289 if self.server is None:
1288 1290 self.getData()
1289 1291 else:
1290 self.getFromServer()
1292 try:
1293 self.getFromServer()
1294 except Exception as e:
1295 log.warning('Invalid block...')
1296 self.dataOut.flagNoData = True
1291 1297
1292 1298
1293 1299 class JRODataWriter(Reader):
1294 1300
1295 1301 """
1296 1302 Esta clase permite escribir datos a archivos procesados (.r o ,pdata). La escritura
1297 1303 de los datos siempre se realiza por bloques.
1298 1304 """
1299 1305
1300 1306 setFile = None
1301 1307 profilesPerBlock = None
1302 1308 blocksPerFile = None
1303 1309 nWriteBlocks = 0
1304 1310 fileDate = None
1305 1311
1306 1312 def __init__(self, dataOut=None):
1307 1313 raise NotImplementedError
1308 1314
1309 1315 def hasAllDataInBuffer(self):
1310 1316 raise NotImplementedError
1311 1317
1312 1318 def setBlockDimension(self):
1313 1319 raise NotImplementedError
1314 1320
1315 1321 def writeBlock(self):
1316 1322 raise NotImplementedError
1317 1323
1318 1324 def putData(self):
1319 1325 raise NotImplementedError
1320 1326
1321 1327 def getDtypeWidth(self):
1322 1328
1323 1329 dtype_index = get_dtype_index(self.dtype)
1324 1330 dtype_width = get_dtype_width(dtype_index)
1325 1331
1326 1332 return dtype_width
1327 1333
1328 1334 def getProcessFlags(self):
1329 1335
1330 1336 processFlags = 0
1331 1337
1332 1338 dtype_index = get_dtype_index(self.dtype)
1333 1339 procflag_dtype = get_procflag_dtype(dtype_index)
1334 1340
1335 1341 processFlags += procflag_dtype
1336 1342
1337 1343 if self.dataOut.flagDecodeData:
1338 1344 processFlags += PROCFLAG.DECODE_DATA
1339 1345
1340 1346 if self.dataOut.flagDeflipData:
1341 1347 processFlags += PROCFLAG.DEFLIP_DATA
1342 1348
1343 1349 if self.dataOut.code is not None:
1344 1350 processFlags += PROCFLAG.DEFINE_PROCESS_CODE
1345 1351
1346 1352 if self.dataOut.nCohInt > 1:
1347 1353 processFlags += PROCFLAG.COHERENT_INTEGRATION
1348 1354
1349 1355 if self.dataOut.type == "Spectra":
1350 1356 if self.dataOut.nIncohInt > 1:
1351 1357 processFlags += PROCFLAG.INCOHERENT_INTEGRATION
1352 1358
1353 1359 if self.dataOut.data_dc is not None:
1354 1360 processFlags += PROCFLAG.SAVE_CHANNELS_DC
1355 1361
1356 1362 if self.dataOut.flagShiftFFT:
1357 1363 processFlags += PROCFLAG.SHIFT_FFT_DATA
1358 1364
1359 1365 return processFlags
1360 1366
1361 1367 def setBasicHeader(self):
1362 1368
1363 1369 self.basicHeaderObj.size = self.basicHeaderSize # bytes
1364 1370 self.basicHeaderObj.version = self.versionFile
1365 1371 self.basicHeaderObj.dataBlock = self.nTotalBlocks
1366 1372 utc = numpy.floor(self.dataOut.utctime)
1367 1373 milisecond = (self.dataOut.utctime - utc) * 1000.0
1368 1374 self.basicHeaderObj.utc = utc
1369 1375 self.basicHeaderObj.miliSecond = milisecond
1370 1376 self.basicHeaderObj.timeZone = self.dataOut.timeZone
1371 1377 self.basicHeaderObj.dstFlag = self.dataOut.dstFlag
1372 1378 self.basicHeaderObj.errorCount = self.dataOut.errorCount
1373 1379
1374 1380 def setFirstHeader(self):
1375 1381 """
1376 1382 Obtiene una copia del First Header
1377 1383
1378 1384 Affected:
1379 1385
1380 1386 self.basicHeaderObj
1381 1387 self.systemHeaderObj
1382 1388 self.radarControllerHeaderObj
1383 1389 self.processingHeaderObj self.
1384 1390
1385 1391 Return:
1386 1392 None
1387 1393 """
1388 1394
1389 1395 raise NotImplementedError
1390 1396
1391 1397 def __writeFirstHeader(self):
1392 1398 """
1393 1399 Escribe el primer header del file es decir el Basic header y el Long header (SystemHeader, RadarControllerHeader, ProcessingHeader)
1394 1400
1395 1401 Affected:
1396 1402 __dataType
1397 1403
1398 1404 Return:
1399 1405 None
1400 1406 """
1401 1407
1402 1408 # CALCULAR PARAMETROS
1403 1409
1404 1410 sizeLongHeader = self.systemHeaderObj.size + \
1405 1411 self.radarControllerHeaderObj.size + self.processingHeaderObj.size
1406 1412 self.basicHeaderObj.size = self.basicHeaderSize + sizeLongHeader
1407 1413
1408 1414 self.basicHeaderObj.write(self.fp)
1409 1415 self.systemHeaderObj.write(self.fp)
1410 1416 self.radarControllerHeaderObj.write(self.fp)
1411 1417 self.processingHeaderObj.write(self.fp)
1412 1418
1413 1419 def __setNewBlock(self):
1414 1420 """
1415 1421 Si es un nuevo file escribe el First Header caso contrario escribe solo el Basic Header
1416 1422
1417 1423 Return:
1418 1424 0 : si no pudo escribir nada
1419 1425 1 : Si escribio el Basic el First Header
1420 1426 """
1421 1427 if self.fp == None:
1422 1428 self.setNextFile()
1423 1429
1424 1430 if self.flagIsNewFile:
1425 1431 return 1
1426 1432
1427 1433 if self.blockIndex < self.processingHeaderObj.dataBlocksPerFile:
1428 1434 self.basicHeaderObj.write(self.fp)
1429 1435 return 1
1430 1436
1431 1437 if not(self.setNextFile()):
1432 1438 return 0
1433 1439
1434 1440 return 1
1435 1441
1436 1442 def writeNextBlock(self):
1437 1443 """
1438 1444 Selecciona el bloque siguiente de datos y los escribe en un file
1439 1445
1440 1446 Return:
1441 1447 0 : Si no hizo pudo escribir el bloque de datos
1442 1448 1 : Si no pudo escribir el bloque de datos
1443 1449 """
1444 1450 if not(self.__setNewBlock()):
1445 1451 return 0
1446 1452
1447 1453 self.writeBlock()
1448 1454
1449 1455 print("[Writing] Block No. %d/%d" % (self.blockIndex,
1450 1456 self.processingHeaderObj.dataBlocksPerFile))
1451 1457
1452 1458 return 1
1453 1459
1454 1460 def setNextFile(self):
1455 1461 """Determina el siguiente file que sera escrito
1456 1462
1457 1463 Affected:
1458 1464 self.filename
1459 1465 self.subfolder
1460 1466 self.fp
1461 1467 self.setFile
1462 1468 self.flagIsNewFile
1463 1469
1464 1470 Return:
1465 1471 0 : Si el archivo no puede ser escrito
1466 1472 1 : Si el archivo esta listo para ser escrito
1467 1473 """
1468 1474 ext = self.ext
1469 1475 path = self.path
1470 1476
1471 1477 if self.fp != None:
1472 1478 self.fp.close()
1473 1479
1474 1480 if not os.path.exists(path):
1475 1481 os.mkdir(path)
1476 1482
1477 1483 timeTuple = time.localtime(self.dataOut.utctime)
1478 1484 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
1479 1485
1480 1486 fullpath = os.path.join(path, subfolder)
1481 1487 setFile = self.setFile
1482 1488
1483 1489 if not(os.path.exists(fullpath)):
1484 1490 os.mkdir(fullpath)
1485 1491 setFile = -1 # inicializo mi contador de seteo
1486 1492 else:
1487 1493 filesList = os.listdir(fullpath)
1488 1494 if len(filesList) > 0:
1489 1495 filesList = sorted(filesList, key=str.lower)
1490 1496 filen = filesList[-1]
1491 1497 # el filename debera tener el siguiente formato
1492 1498 # 0 1234 567 89A BCDE (hex)
1493 1499 # x YYYY DDD SSS .ext
1494 1500 if isNumber(filen[8:11]):
1495 1501 # inicializo mi contador de seteo al seteo del ultimo file
1496 1502 setFile = int(filen[8:11])
1497 1503 else:
1498 1504 setFile = -1
1499 1505 else:
1500 1506 setFile = -1 # inicializo mi contador de seteo
1501 1507
1502 1508 setFile += 1
1503 1509
1504 1510 # If this is a new day it resets some values
1505 1511 if self.dataOut.datatime.date() > self.fileDate:
1506 1512 setFile = 0
1507 1513 self.nTotalBlocks = 0
1508 1514
1509 1515 filen = '{}{:04d}{:03d}{:03d}{}'.format(
1510 1516 self.optchar, timeTuple.tm_year, timeTuple.tm_yday, setFile, ext)
1511 1517
1512 1518 filename = os.path.join(path, subfolder, filen)
1513 1519
1514 1520 fp = open(filename, 'wb')
1515 1521
1516 1522 self.blockIndex = 0
1517 1523 self.filename = filename
1518 1524 self.subfolder = subfolder
1519 1525 self.fp = fp
1520 1526 self.setFile = setFile
1521 1527 self.flagIsNewFile = 1
1522 1528 self.fileDate = self.dataOut.datatime.date()
1523 1529 self.setFirstHeader()
1524 1530
1525 1531 print('[Writing] Opening file: %s' % self.filename)
1526 1532
1527 1533 self.__writeFirstHeader()
1528 1534
1529 1535 return 1
1530 1536
1531 1537 def setup(self, dataOut, path, blocksPerFile, profilesPerBlock=64, set=None, ext=None, datatype=4):
1532 1538 """
1533 1539 Setea el tipo de formato en la cual sera guardada la data y escribe el First Header
1534 1540
1535 1541 Inputs:
1536 1542 path : directory where data will be saved
1537 1543 profilesPerBlock : number of profiles per block
1538 1544 set : initial file set
1539 1545 datatype : An integer number that defines data type:
1540 1546 0 : int8 (1 byte)
1541 1547 1 : int16 (2 bytes)
1542 1548 2 : int32 (4 bytes)
1543 1549 3 : int64 (8 bytes)
1544 1550 4 : float32 (4 bytes)
1545 1551 5 : double64 (8 bytes)
1546 1552
1547 1553 Return:
1548 1554 0 : Si no realizo un buen seteo
1549 1555 1 : Si realizo un buen seteo
1550 1556 """
1551 1557
1552 1558 if ext == None:
1553 1559 ext = self.ext
1554 1560
1555 1561 self.ext = ext.lower()
1556 1562
1557 1563 self.path = path
1558 1564
1559 1565 if set is None:
1560 1566 self.setFile = -1
1561 1567 else:
1562 1568 self.setFile = set - 1
1563 1569
1564 1570 self.blocksPerFile = blocksPerFile
1565 1571 self.profilesPerBlock = profilesPerBlock
1566 1572 self.dataOut = dataOut
1567 1573 self.fileDate = self.dataOut.datatime.date()
1568 1574 self.dtype = self.dataOut.dtype
1569 1575
1570 1576 if datatype is not None:
1571 1577 self.dtype = get_numpy_dtype(datatype)
1572 1578
1573 1579 if not(self.setNextFile()):
1574 1580 print("[Writing] There isn't a next file")
1575 1581 return 0
1576 1582
1577 1583 self.setBlockDimension()
1578 1584
1579 1585 return 1
1580 1586
1581 1587 def run(self, dataOut, path, blocksPerFile=100, profilesPerBlock=64, set=None, ext=None, datatype=4, **kwargs):
1582 1588
1583 1589 if not(self.isConfig):
1584 1590
1585 1591 self.setup(dataOut, path, blocksPerFile, profilesPerBlock=profilesPerBlock,
1586 1592 set=set, ext=ext, datatype=datatype, **kwargs)
1587 1593 self.isConfig = True
1588 1594
1589 1595 self.dataOut = dataOut
1590 1596 self.putData()
1591 1597 return self.dataOut
1592 1598
1593 1599 @MPDecorator
1594 1600 class printInfo(Operation):
1595 1601
1596 1602 def __init__(self):
1597 1603
1598 1604 Operation.__init__(self)
1599 1605 self.__printInfo = True
1600 1606
1601 1607 def run(self, dataOut, headers=['systemHeaderObj', 'radarControllerHeaderObj', 'processingHeaderObj']):
1602 1608 if self.__printInfo == False:
1603 1609 return
1604 1610
1605 1611 for header in headers:
1606 1612 if hasattr(dataOut, header):
1607 1613 obj = getattr(dataOut, header)
1608 1614 if hasattr(obj, 'printInfo'):
1609 1615 obj.printInfo()
1610 1616 else:
1611 1617 print(obj)
1612 1618 else:
1613 1619 log.warning('Header {} Not found in object'.format(header))
1614 1620
1615 1621 self.__printInfo = False
@@ -1,692 +1,699
1 1 '''
2 2 Created on Jul 2, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6
7 7 import numpy
8 8
9 9 from .jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
12 12 from schainpy.model.data.jrodata import Voltage
13 13
14 14
15 15 class VoltageReader(JRODataReader, ProcessingUnit):
16 16 """
17 17 Esta clase permite leer datos de voltage desde archivos en formato rawdata (.r). La lectura
18 18 de los datos siempre se realiza por bloques. Los datos leidos (array de 3 dimensiones:
19 19 perfiles*alturas*canales) son almacenados en la variable "buffer".
20 20
21 21 perfiles * alturas * canales
22 22
23 23 Esta clase contiene instancias (objetos) de las clases BasicHeader, SystemHeader,
24 24 RadarControllerHeader y Voltage. Los tres primeros se usan para almacenar informacion de la
25 25 cabecera de datos (metadata), y el cuarto (Voltage) para obtener y almacenar un perfil de
26 26 datos desde el "buffer" cada vez que se ejecute el metodo "getData".
27 27
28 28 Example:
29 29
30 30 dpath = "/home/myuser/data"
31 31
32 32 startTime = datetime.datetime(2010,1,20,0,0,0,0,0,0)
33 33
34 34 endTime = datetime.datetime(2010,1,21,23,59,59,0,0,0)
35 35
36 36 readerObj = VoltageReader()
37 37
38 38 readerObj.setup(dpath, startTime, endTime)
39 39
40 40 while(True):
41 41
42 42 #to get one profile
43 43 profile = readerObj.getData()
44 44
45 45 #print the profile
46 46 print profile
47 47
48 48 #If you want to see all datablock
49 49 print readerObj.datablock
50 50
51 51 if readerObj.flagNoMoreFiles:
52 52 break
53 53
54 54 """
55 55
56 56 def __init__(self):
57 57 """
58 58 Inicializador de la clase VoltageReader para la lectura de datos de voltage.
59 59
60 60 Input:
61 61 dataOut : Objeto de la clase Voltage. Este objeto sera utilizado para
62 62 almacenar un perfil de datos cada vez que se haga un requerimiento
63 63 (getData). El perfil sera obtenido a partir del buffer de datos,
64 64 si el buffer esta vacio se hara un nuevo proceso de lectura de un
65 65 bloque de datos.
66 66 Si este parametro no es pasado se creara uno internamente.
67 67
68 68 Variables afectadas:
69 69 self.dataOut
70 70
71 71 Return:
72 72 None
73 73 """
74 74
75 75 ProcessingUnit.__init__(self)
76 76
77 77 self.ext = ".r"
78 78 self.optchar = "D"
79 79 self.basicHeaderObj = BasicHeader(LOCALTIME)
80 80 self.systemHeaderObj = SystemHeader()
81 81 self.radarControllerHeaderObj = RadarControllerHeader()
82 82
83 83 self.processingHeaderObj = ProcessingHeader()
84 84 self.lastUTTime = 0
85 85 self.profileIndex = 2 ** 32 - 1
86 86 self.dataOut = Voltage()
87 87 self.selBlocksize = None
88 88 self.selBlocktime = None
89 89 ##print("1--OKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKK")
90 90 def createObjByDefault(self):
91 91 ##print("2--OKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKK")
92 92 dataObj = Voltage()
93 93
94 94 return dataObj
95 95
96 96 def __hasNotDataInBuffer(self):
97 97 ##print("3--OKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKK")
98 98 if self.profileIndex >= self.processingHeaderObj.profilesPerBlock * self.nTxs:
99 99 return 1
100 100
101 101 return 0
102 102
103 103 def getBlockDimension(self):
104 104 """
105 105 Obtiene la cantidad de puntos a leer por cada bloque de datos
106 106
107 107 Affected:
108 108 self.blocksize
109 109
110 110 Return:
111 111 None
112 112 """
113 113 ##print("4--OKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKK")
114 114 pts2read = self.processingHeaderObj.profilesPerBlock * \
115 115 self.processingHeaderObj.nHeights * self.systemHeaderObj.nChannels
116 116 self.blocksize = pts2read
117 117
118 118 def readBlock(self):
119 119
120 120 """
121 121 readBlock lee el bloque de datos desde la posicion actual del puntero del archivo
122 122 (self.fp) y actualiza todos los parametros relacionados al bloque de datos
123 123 (metadata + data). La data leida es almacenada en el buffer y el contador del buffer
124 124 es seteado a 0
125 125
126 126 Inputs:
127 127 None
128 128
129 129 Return:
130 130 None
131 131
132 132 Affected:
133 133 self.profileIndex
134 134 self.datablock
135 135 self.flagIsNewFile
136 136 self.flagIsNewBlock
137 137 self.nTotalBlocks
138 138
139 139 Exceptions:
140 140 Si un bloque leido no es un bloque valido
141 141 """
142 142 ##print("5--OKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKK")
143 143 # if self.server is not None:
144 144 # self.zBlock = self.receiver.recv()
145 145 # self.zHeader = self.zBlock[:24]
146 146 # self.zDataBlock = self.zBlock[24:]
147 147 # junk = numpy.fromstring(self.zDataBlock, numpy.dtype([('real','<i4'),('imag','<i4')]))
148 148 # self.processingHeaderObj.profilesPerBlock = 240
149 149 # self.processingHeaderObj.nHeights = 248
150 150 # self.systemHeaderObj.nChannels
151 151 # else:
152 152 current_pointer_location = self.fp.tell()
153 153 junk = numpy.fromfile(self.fp, self.dtype, self.blocksize)
154 154
155 155 try:
156 156 junk = junk.reshape((self.processingHeaderObj.profilesPerBlock,
157 157 self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels))
158 158 except:
159 159 # print "The read block (%3d) has not enough data" %self.nReadBlocks
160 160
161 161 if self.waitDataBlock(pointer_location=current_pointer_location):
162 162 junk = numpy.fromfile(self.fp, self.dtype, self.blocksize)
163 163 junk = junk.reshape((self.processingHeaderObj.profilesPerBlock,
164 164 self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels))
165 165 # return 0
166 166
167 167 # Dimensions : nChannels, nProfiles, nSamples
168 168
169 169 junk = numpy.transpose(junk, (2, 0, 1))
170 170 self.datablock = junk['real'] + junk['imag'] * 1j
171 171
172 172 self.profileIndex = 0
173 173
174 174 self.flagIsNewFile = 0
175 175 self.flagIsNewBlock = 1
176 176
177 177 self.nTotalBlocks += 1
178 178 self.nReadBlocks += 1
179 179
180 180 return 1
181 181
182 182 def getFirstHeader(self):
183 183 ##print("6--OKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKK")
184 184
185 185 self.getBasicHeader()
186 186
187 187 self.dataOut.processingHeaderObj = self.processingHeaderObj.copy()
188 188
189 189 self.dataOut.systemHeaderObj = self.systemHeaderObj.copy()
190 190
191 191 self.dataOut.radarControllerHeaderObj = self.radarControllerHeaderObj.copy()
192 192
193 193 #self.dataOut.ippSeconds_general=self.radarControllerHeaderObj.ippSeconds
194 194 #print(self.nTxs)
195 195 if self.nTxs > 1:
196 196 #print(self.radarControllerHeaderObj.ippSeconds)
197 197 self.dataOut.radarControllerHeaderObj.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
198 198 #print(self.radarControllerHeaderObj.ippSeconds)
199 199 # Time interval and code are propierties of dataOut. Its value depends of radarControllerHeaderObj.
200 200
201 201 # self.dataOut.timeInterval = self.radarControllerHeaderObj.ippSeconds * self.processingHeaderObj.nCohInt
202 202 #
203 203 # if self.radarControllerHeaderObj.code is not None:
204 204 #
205 205 # self.dataOut.nCode = self.radarControllerHeaderObj.nCode
206 206 #
207 207 # self.dataOut.nBaud = self.radarControllerHeaderObj.nBaud
208 208 #
209 209 # self.dataOut.code = self.radarControllerHeaderObj.code
210 210
211 211 self.dataOut.dtype = self.dtype
212 212
213 213 self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock
214 214
215 215 self.dataOut.heightList = numpy.arange(
216 216 self.processingHeaderObj.nHeights) * self.processingHeaderObj.deltaHeight + self.processingHeaderObj.firstHeight
217 217
218 218 self.dataOut.channelList = list(range(self.systemHeaderObj.nChannels))
219 219
220 220 self.dataOut.nCohInt = self.processingHeaderObj.nCohInt
221 221
222 222 # asumo q la data no esta decodificada
223 223 self.dataOut.flagDecodeData = self.processingHeaderObj.flag_decode
224 224
225 225 # asumo q la data no esta sin flip
226 226 self.dataOut.flagDeflipData = self.processingHeaderObj.flag_deflip
227 227
228 228 self.dataOut.flagShiftFFT = self.processingHeaderObj.shif_fft
229 229
230 230 def reshapeData(self):
231 231 ##print("7--OKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKK")
232 232 if self.nTxs < 0:
233 233 return
234 234
235 235 if self.nTxs == 1:
236 236 return
237 237
238 238 if self.nTxs < 1 and self.processingHeaderObj.profilesPerBlock % (1. / self.nTxs) != 0:
239 239 raise ValueError("1./nTxs (=%f), should be a multiple of nProfiles (=%d)" % (
240 240 1. / self.nTxs, self.processingHeaderObj.profilesPerBlock))
241 241
242 242 if self.nTxs > 1 and self.processingHeaderObj.nHeights % self.nTxs != 0:
243 243 raise ValueError("nTxs (=%d), should be a multiple of nHeights (=%d)" % (
244 244 self.nTxs, self.processingHeaderObj.nHeights))
245 245
246 246 self.datablock = self.datablock.reshape(
247 247 (self.systemHeaderObj.nChannels, self.processingHeaderObj.profilesPerBlock * self.nTxs, int(self.processingHeaderObj.nHeights / self.nTxs)))
248 248
249 249 self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock * self.nTxs
250 250 self.dataOut.heightList = numpy.arange(self.processingHeaderObj.nHeights / self.nTxs) * \
251 251 self.processingHeaderObj.deltaHeight + self.processingHeaderObj.firstHeight
252 252 self.dataOut.radarControllerHeaderObj.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
253 253
254 254 return
255 255
256 256 def readFirstHeaderFromServer(self):
257 257
258 258 ##print("8--OKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKK")
259 259 self.getFirstHeader()
260 260
261 261 self.firstHeaderSize = self.basicHeaderObj.size
262 262
263 263 datatype = int(numpy.log2((self.processingHeaderObj.processFlags &
264 264 PROCFLAG.DATATYPE_MASK)) - numpy.log2(PROCFLAG.DATATYPE_CHAR))
265 265 if datatype == 0:
266 266 datatype_str = numpy.dtype([('real', '<i1'), ('imag', '<i1')])
267 267 elif datatype == 1:
268 268 datatype_str = numpy.dtype([('real', '<i2'), ('imag', '<i2')])
269 269 elif datatype == 2:
270 270 datatype_str = numpy.dtype([('real', '<i4'), ('imag', '<i4')])
271 271 elif datatype == 3:
272 272 datatype_str = numpy.dtype([('real', '<i8'), ('imag', '<i8')])
273 273 elif datatype == 4:
274 274 datatype_str = numpy.dtype([('real', '<f4'), ('imag', '<f4')])
275 275 elif datatype == 5:
276 276 datatype_str = numpy.dtype([('real', '<f8'), ('imag', '<f8')])
277 277 else:
278 278 raise ValueError('Data type was not defined')
279 279
280 280 self.dtype = datatype_str
281 281 # self.ippSeconds = 2 * 1000 * self.radarControllerHeaderObj.ipp / self.c
282 282 self.fileSizeByHeader = self.processingHeaderObj.dataBlocksPerFile * self.processingHeaderObj.blockSize + \
283 283 self.firstHeaderSize + self.basicHeaderSize * \
284 284 (self.processingHeaderObj.dataBlocksPerFile - 1)
285 285 # self.dataOut.channelList = numpy.arange(self.systemHeaderObj.numChannels)
286 286 # self.dataOut.channelIndexList = numpy.arange(self.systemHeaderObj.numChannels)
287 287 self.getBlockDimension()
288 288
289 289 def getFromServer(self):
290 290 ##print("9--OKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKK")
291 291 self.flagDiscontinuousBlock = 0
292 292 self.profileIndex = 0
293 293 self.flagIsNewBlock = 1
294 294 self.dataOut.flagNoData = False
295 295 self.nTotalBlocks += 1
296 296 self.nReadBlocks += 1
297 297 self.blockPointer = 0
298 298
299 299 block = self.receiver.recv()
300 300
301 301 self.basicHeaderObj.read(block[self.blockPointer:])
302 302 self.blockPointer += self.basicHeaderObj.length
303 303 self.systemHeaderObj.read(block[self.blockPointer:])
304 304 self.blockPointer += self.systemHeaderObj.length
305 305 self.radarControllerHeaderObj.read(block[self.blockPointer:])
306 306 self.blockPointer += self.radarControllerHeaderObj.length
307 307 self.processingHeaderObj.read(block[self.blockPointer:])
308 308 self.blockPointer += self.processingHeaderObj.length
309 309 self.readFirstHeaderFromServer()
310 310
311 311 timestamp = self.basicHeaderObj.get_datatime()
312 print('[Reading] - Block {} - {}'.format(self.nTotalBlocks, timestamp))
312 print('[Receiving] - Block {} - {}'.format(self.nTotalBlocks, timestamp))
313 if self.nTotalBlocks == self.processingHeaderObj.dataBlocksPerFile:
314 self.nTotalBlocks = 0
315 self.nReadBlocks = 0
316 print('Receiving the next stream...')
313 317 current_pointer_location = self.blockPointer
314 318 junk = numpy.fromstring(
315 319 block[self.blockPointer:], self.dtype, self.blocksize)
316 320
317 321 try:
318 322 junk = junk.reshape((self.processingHeaderObj.profilesPerBlock,
319 323 self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels))
320 324 except:
321 325 # print "The read block (%3d) has not enough data" %self.nReadBlocks
322 326 if self.waitDataBlock(pointer_location=current_pointer_location):
323 327 junk = numpy.fromstring(
324 328 block[self.blockPointer:], self.dtype, self.blocksize)
325 329 junk = junk.reshape((self.processingHeaderObj.profilesPerBlock,
326 330 self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels))
327 331 # return 0
328 332
329 333 # Dimensions : nChannels, nProfiles, nSamples
330 334
331 335 junk = numpy.transpose(junk, (2, 0, 1))
332 336 self.datablock = junk['real'] + junk['imag'] * 1j
333 337 self.profileIndex = 0
334 338 if self.selBlocksize == None:
335 339 self.selBlocksize = self.dataOut.nProfiles
336 340 if self.selBlocktime != None:
337 341 if self.dataOut.nCohInt is not None:
338 342 nCohInt = self.dataOut.nCohInt
339 343 else:
340 344 nCohInt = 1
341 345 self.selBlocksize = int(self.dataOut.nProfiles * round(self.selBlocktime / (
342 346 nCohInt * self.dataOut.ippSeconds * self.dataOut.nProfiles)))
343 347 self.dataOut.data = self.datablock[:,
344 348 self.profileIndex:self.profileIndex + self.selBlocksize, :]
345 349 datasize = self.dataOut.data.shape[1]
346 350 if datasize < self.selBlocksize:
347 351 buffer = numpy.zeros(
348 352 (self.dataOut.data.shape[0], self.selBlocksize, self.dataOut.data.shape[2]), dtype='complex')
349 353 buffer[:, :datasize, :] = self.dataOut.data
350 354 self.dataOut.data = buffer
351 355 self.profileIndex = blockIndex
352 356
353 357 self.dataOut.flagDataAsBlock = True
354 358 self.flagIsNewBlock = 1
355 359 self.dataOut.realtime = self.online
356 360
357 361 return self.dataOut.data
358 362
359 363 def getData(self):
360 364 """
361 365 getData obtiene una unidad de datos del buffer de lectura, un perfil, y la copia al objeto self.dataOut
362 366 del tipo "Voltage" con todos los parametros asociados a este (metadata). cuando no hay datos
363 367 en el buffer de lectura es necesario hacer una nueva lectura de los bloques de datos usando
364 368 "readNextBlock"
365 369
366 370 Ademas incrementa el contador del buffer "self.profileIndex" en 1.
367 371
368 372 Return:
369 373
370 374 Si el flag self.getByBlock ha sido seteado el bloque completo es copiado a self.dataOut y el self.profileIndex
371 375 es igual al total de perfiles leidos desde el archivo.
372 376
373 377 Si self.getByBlock == False:
374 378
375 379 self.dataOut.data = buffer[:, thisProfile, :]
376 380
377 381 shape = [nChannels, nHeis]
378 382
379 383 Si self.getByBlock == True:
380 384
381 385 self.dataOut.data = buffer[:, :, :]
382 386
383 387 shape = [nChannels, nProfiles, nHeis]
384 388
385 389 Variables afectadas:
386 390 self.dataOut
387 391 self.profileIndex
388 392
389 393 Affected:
390 394 self.dataOut
391 395 self.profileIndex
392 396 self.flagDiscontinuousBlock
393 397 self.flagIsNewBlock
394 398 """
395 399
396 400 ##print("10--OKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKKK")
397 401 if self.flagNoMoreFiles:
398 402 self.dataOut.flagNoData = True
399 403 return 0
400 404 self.flagDiscontinuousBlock = 0
401 405 self.flagIsNewBlock = 0
402 406 if self.__hasNotDataInBuffer():
403 407 if not(self.readNextBlock()):
404 408 return 0
405 409
406 410 self.getFirstHeader()
407 411
408 412 self.reshapeData()
409 413 if self.datablock is None:
410 414 self.dataOut.flagNoData = True
411 415 return 0
412 416
413 417 if not self.getByBlock:
414 418
415 419 """
416 420 Return profile by profile
417 421
418 422 If nTxs > 1 then one profile is divided by nTxs and number of total
419 423 blocks is increased by nTxs (nProfiles *= nTxs)
420 424 """
421 425 self.dataOut.flagDataAsBlock = False
422 426 self.dataOut.data = self.datablock[:, self.profileIndex, :]
423 427 self.dataOut.profileIndex = self.profileIndex
424 428
425 429
426 430 self.profileIndex += 1
427 431
428 432 else:
429 433 """
430 434 Return a block
431 435 """
432 436 if self.selBlocksize == None:
433 437 self.selBlocksize = self.dataOut.nProfiles
434 438 if self.selBlocktime != None:
435 439 if self.dataOut.nCohInt is not None:
436 440 nCohInt = self.dataOut.nCohInt
437 441 else:
438 442 nCohInt = 1
439 443 self.selBlocksize = int(self.dataOut.nProfiles * round(self.selBlocktime / (
440 444 nCohInt * self.dataOut.ippSeconds * self.dataOut.nProfiles)))
441 445
442 446 self.dataOut.data = self.datablock[:,
443 447 self.profileIndex:self.profileIndex + self.selBlocksize, :]
444 448 self.profileIndex += self.selBlocksize
445 449 datasize = self.dataOut.data.shape[1]
446 450
447 451 if datasize < self.selBlocksize:
448 452 buffer = numpy.zeros(
449 453 (self.dataOut.data.shape[0], self.selBlocksize, self.dataOut.data.shape[2]), dtype='complex')
450 454 buffer[:, :datasize, :] = self.dataOut.data
451 455
452 456 while datasize < self.selBlocksize: # Not enough profiles to fill the block
453 457 if not(self.readNextBlock()):
454 458 return 0
455 459 self.getFirstHeader()
456 460 self.reshapeData()
457 461 if self.datablock is None:
458 462 self.dataOut.flagNoData = True
459 463 return 0
460 464 # stack data
461 465 blockIndex = self.selBlocksize - datasize
462 466 datablock1 = self.datablock[:, :blockIndex, :]
463 467
464 468 buffer[:, datasize:datasize +
465 469 datablock1.shape[1], :] = datablock1
466 470 datasize += datablock1.shape[1]
467 471
468 472 self.dataOut.data = buffer
469 473 self.profileIndex = blockIndex
470 474
471 475 self.dataOut.flagDataAsBlock = True
472 476 self.dataOut.nProfiles = self.dataOut.data.shape[1]
473 477
474 478 #######################DP#######################
475 479 self.dataOut.CurrentBlock=self.nReadBlocks
476 480 self.dataOut.LastBlock=self.processingHeaderObj.dataBlocksPerFile
477 481 #######################DP#######################
478 482 self.dataOut.flagNoData = False
479 483
480 484 #self.getBasicHeader()
481 485
482 486 self.dataOut.realtime = self.online
483 487
484 488 return self.dataOut.data
485 489
486 490
487 491 @MPDecorator
488 492 class VoltageWriter(JRODataWriter, Operation):
489 493 """
490 494 Esta clase permite escribir datos de voltajes a archivos procesados (.r). La escritura
491 495 de los datos siempre se realiza por bloques.
492 496 """
493 497
494 498 ext = ".r"
495 499
496 500 optchar = "D"
497 501
498 502 shapeBuffer = None
499 503
500 504 def __init__(self): # , **kwargs):
501 505 """
502 506 Inicializador de la clase VoltageWriter para la escritura de datos de espectros.
503 507
504 508 Affected:
505 509 self.dataOut
506 510
507 511 Return: None
508 512 """
509 513 Operation.__init__(self) # , **kwargs)
510 514
511 515 self.nTotalBlocks = 0
512 516
513 517 self.profileIndex = 0
514 518
515 519 self.isConfig = False
516 520
517 521 self.fp = None
518 522
519 523 self.flagIsNewFile = 1
520 524
521 525 self.blockIndex = 0
522 526
523 527 self.flagIsNewBlock = 0
524 528
525 529 self.setFile = None
526 530
527 531 self.dtype = None
528 532
529 533 self.path = None
530 534
531 535 self.filename = None
532 536
533 537 self.basicHeaderObj = BasicHeader(LOCALTIME)
534 538
535 539 self.systemHeaderObj = SystemHeader()
536 540
537 541 self.radarControllerHeaderObj = RadarControllerHeader()
538 542
539 543 self.processingHeaderObj = ProcessingHeader()
540 544
541 545 def hasAllDataInBuffer(self):
542 546 if self.profileIndex >= self.processingHeaderObj.profilesPerBlock:
543 547 return 1
544 548 return 0
545 549
546 550 def setBlockDimension(self):
547 551 """
548 552 Obtiene las formas dimensionales del los subbloques de datos que componen un bloque
549 553
550 554 Affected:
551 555 self.shape_spc_Buffer
552 556 self.shape_cspc_Buffer
553 557 self.shape_dc_Buffer
554 558
555 559 Return: None
556 560 """
557 561 self.shapeBuffer = (self.processingHeaderObj.profilesPerBlock,
558 562 self.processingHeaderObj.nHeights,
559 563 self.systemHeaderObj.nChannels)
560 564
561 565 self.datablock = numpy.zeros((self.systemHeaderObj.nChannels,
562 566 self.processingHeaderObj.profilesPerBlock,
563 567 self.processingHeaderObj.nHeights),
564 568 dtype=numpy.dtype('complex64'))
565 569
566 570 def writeBlock(self):
567 571 """
568 572 Escribe el buffer en el file designado
569 573
570 574 Affected:
571 575 self.profileIndex
572 576 self.flagIsNewFile
573 577 self.flagIsNewBlock
574 578 self.nTotalBlocks
575 579 self.blockIndex
576 580
577 581 Return: None
578 582 """
579 583 data = numpy.zeros(self.shapeBuffer, self.dtype)
580 584
581 585 junk = numpy.transpose(self.datablock, (1, 2, 0))
582 586
583 587 data['real'] = junk.real
584 588 data['imag'] = junk.imag
585 589
586 590 data = data.reshape((-1))
587 591
588 592 data.tofile(self.fp)
589 593
590 594 self.datablock.fill(0)
591 595
592 596 self.profileIndex = 0
593 597 self.flagIsNewFile = 0
594 598 self.flagIsNewBlock = 1
595 599
596 600 self.blockIndex += 1
597 601 self.nTotalBlocks += 1
598 602
599 603 # print "[Writing] Block = %04d" %self.blockIndex
600 604
601 605 def putData(self):
602 606 """
603 607 Setea un bloque de datos y luego los escribe en un file
604 608
605 609 Affected:
606 610 self.flagIsNewBlock
607 611 self.profileIndex
608 612
609 613 Return:
610 614 0 : Si no hay data o no hay mas files que puedan escribirse
611 615 1 : Si se escribio la data de un bloque en un file
612 616 """
613 617 if self.dataOut.flagNoData:
614 618 return 0
615 619
616 620 self.flagIsNewBlock = 0
617 621
618 622 if self.dataOut.flagDiscontinuousBlock:
619 623 self.datablock.fill(0)
620 624 self.profileIndex = 0
621 625 self.setNextFile()
622 626
623 627 if self.profileIndex == 0:
624 628 self.setBasicHeader()
625 629
626 self.datablock[:, self.profileIndex, :] = self.dataOut.data
627
628 self.profileIndex += 1
629
630 if not self.dataOut.flagDataAsBlock:
631 self.datablock[:, self.profileIndex, :] = self.dataOut.data
632 self.profileIndex += 1
633 else:
634 self.datablock[:, :, :] = self.dataOut.data
635 self.profileIndex = self.processingHeaderObj.profilesPerBlock
636
630 637 if self.hasAllDataInBuffer():
631 638 # if self.flagIsNewFile:
632 639 self.writeNextBlock()
633 640 # self.setFirstHeader()
634 641
635 642 return 1
636 643
637 644 def __getBlockSize(self):
638 645 '''
639 646 Este metodos determina el cantidad de bytes para un bloque de datos de tipo Voltage
640 647 '''
641 648
642 649 dtype_width = self.getDtypeWidth()
643 650
644 651 blocksize = int(self.dataOut.nHeights * self.dataOut.nChannels *
645 652 self.profilesPerBlock * dtype_width * 2)
646 653
647 654 return blocksize
648 655
649 656 def setFirstHeader(self):
650 657 """
651 658 Obtiene una copia del First Header
652 659
653 660 Affected:
654 661 self.systemHeaderObj
655 662 self.radarControllerHeaderObj
656 663 self.dtype
657 664
658 665 Return:
659 666 None
660 667 """
661 668
662 669 self.systemHeaderObj = self.dataOut.systemHeaderObj.copy()
663 670 self.systemHeaderObj.nChannels = self.dataOut.nChannels
664 671 self.radarControllerHeaderObj = self.dataOut.radarControllerHeaderObj.copy()
665 672
666 673 self.processingHeaderObj.dtype = 0 # Voltage
667 674 self.processingHeaderObj.blockSize = self.__getBlockSize()
668 675 self.processingHeaderObj.profilesPerBlock = self.profilesPerBlock
669 676 self.processingHeaderObj.dataBlocksPerFile = self.blocksPerFile
670 677 # podria ser 1 o self.dataOut.processingHeaderObj.nWindows
671 678 self.processingHeaderObj.nWindows = 1
672 679 self.processingHeaderObj.nCohInt = self.dataOut.nCohInt
673 680 # Cuando la data de origen es de tipo Voltage
674 681 self.processingHeaderObj.nIncohInt = 1
675 682 # Cuando la data de origen es de tipo Voltage
676 683 self.processingHeaderObj.totalSpectra = 0
677 684
678 685 if self.dataOut.code is not None:
679 686 self.processingHeaderObj.code = self.dataOut.code
680 687 self.processingHeaderObj.nCode = self.dataOut.nCode
681 688 self.processingHeaderObj.nBaud = self.dataOut.nBaud
682 689
683 690 if self.processingHeaderObj.nWindows != 0:
684 691 self.processingHeaderObj.firstHeight = self.dataOut.heightList[0]
685 692 self.processingHeaderObj.deltaHeight = self.dataOut.heightList[1] - \
686 693 self.dataOut.heightList[0]
687 694 self.processingHeaderObj.nHeights = self.dataOut.nHeights
688 695 self.processingHeaderObj.samplesWin = self.dataOut.nHeights
689 696
690 697 self.processingHeaderObj.processFlags = self.getProcessFlags()
691 698
692 699 self.setBasicHeader()
General Comments 0
You need to be logged in to leave comments. Login now