##// END OF EJS Templates
Review decorator logic for ending process
Juan C. Espinoza -
r1193:c3967e412107
parent child
Show More
@@ -1,1257 +1,1256
'''
Updated on January, 2018, for multiprocessing purposes
Author: Sergio Cortez
Created on September, 2012
'''
import ast
import datetime
import math
import os
import sys
import time
import traceback
from multiprocessing import Process, cpu_count
from platform import python_version
from threading import Thread
from xml.dom import minidom
from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring

import zmq

from schainpy.admin import Alarm, SchainWarning
from schainpy.model import *
from schainpy.utils import log
23 23
24 24
# Maps a data-object type name to the file extension its reader consumes
# (used by MPProject when searching files offline).
DTYPES = {
    'Voltage': '.r',
    'Spectra': '.pdata'
}
29 29
30 30
def MPProject(project, n=cpu_count()):
    '''
    Project wrapper to run schain in n processes.

    Splits the configured date range into single days, searches the files
    available for each day, and launches cloned projects so that each clone
    reads a disjoint slice (cursor/skip) of that day's file list.

    NOTE(review): the default for ``n`` is evaluated once at import time;
    pass ``n`` explicitly to override the detected CPU count.
    '''

    # Read the date/time window from the project's read unit configuration
    rconf = project.getReadUnitObj()
    op = rconf.getOperationObj('run')
    dt1 = op.getParameterValue('startDate')
    dt2 = op.getParameterValue('endDate')
    tm1 = op.getParameterValue('startTime')
    tm2 = op.getParameterValue('endTime')
    days = (dt2 - dt1).days

    for day in range(days + 1):
        skip = 0
        cursor = 0
        processes = []
        dt = dt1 + datetime.timedelta(day)
        dt_str = dt.strftime('%Y/%m/%d')
        reader = JRODataReader()
        paths, files = reader.searchFilesOffLine(path=rconf.path,
                                                 startDate=dt,
                                                 endDate=dt,
                                                 startTime=tm1,
                                                 endTime=tm2,
                                                 ext=DTYPES[rconf.datatype])
        nFiles = len(files)
        if nFiles == 0:
            continue
        # Files per process: ceil so the last worker picks up the remainder
        skip = int(math.ceil(nFiles / n))
        while nFiles > cursor * skip:
            # Each clone reads the slice [cursor*skip, (cursor+1)*skip)
            rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
                         skip=skip)
            p = project.clone()
            p.start()
            processes.append(p)
            cursor += 1

        def beforeExit(exctype, value, trace):
            # Kill every worker of the current day if the parent crashes
            for process in processes:
                process.terminate()
                process.join()
            print(traceback.print_tb(trace))

        # NOTE(review): the hook is re-bound on every day iteration and
        # closes over that day's `processes` list.
        sys.excepthook = beforeExit

        for process in processes:
            process.join()
            # terminate() after join() acts only as a safeguard; the
            # process has normally already exited.
            process.terminate()

        time.sleep(3)
82 82
def wait(context):
    '''
    Block until a termination message is published on the project's IPC
    channel, then terminate the given ZMQ context.

    NOTE(review): this module-level function references ``self.id``, but
    there is no ``self`` in scope — calling it raises NameError. It looks
    like code moved out of a method; confirm where the id should come from
    before relying on this function.
    '''

    time.sleep(1)
    c = zmq.Context()
    receiver = c.socket(zmq.SUB)
    receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
    receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
    msg = receiver.recv_multipart()[1]
    context.terminate()
92 92
class ParameterConf():
    '''
    Configuration of a single operation parameter.

    The parameter is stored as a raw string (``value``) together with a
    ``format`` keyword; `getValue` lazily converts it to the matching
    Python object and caches the result.
    '''

    # Raw fields; filled by setup()/readXml()
    id = None
    name = None
    value = None
    format = None

    # Cache for the converted value (None = not converted yet)
    __formated_value = None

    ELEMENTNAME = 'Parameter'

    def __init__(self):

        self.format = 'str'

    def getElementName(self):
        '''Return the XML tag name used by makeXml/readXml.'''

        return self.ELEMENTNAME

    def getValue(self):
        '''
        Return ``self.value`` converted according to ``self.format``.

        Supported formats: obj, str, list, intlist, floatlist, date,
        time, pairslist, multilist, bool, int and any builtin type name
        (e.g. 'float'). The converted object is cached, so repeated calls
        are cheap.

        Raises:
            ValueError: if the value is empty (for non-str formats) or a
                pairslist value is malformed.
        '''

        value = self.value
        format = self.format

        # Serve the cached conversion when available
        if self.__formated_value is not None:
            return self.__formated_value

        if format == 'obj':
            # Arbitrary Python object stored as-is (never cached)
            return value

        if format == 'str':
            self.__formated_value = str(value)
            return self.__formated_value

        if value == '':
            raise ValueError('%s: This parameter value is empty' % self.name)

        if format == 'list':
            # Comma-separated string -> list of stripped strings
            strList = [s.strip() for s in value.split(',')]
            self.__formated_value = strList

            return self.__formated_value

        if format == 'intlist':
            '''
            Example:
            value = (0,1,2)
            '''

            new_value = ast.literal_eval(value)

            if type(new_value) not in (tuple, list):
                new_value = [int(new_value)]

            self.__formated_value = new_value

            return self.__formated_value

        if format == 'floatlist':
            '''
            Example:
            value = (0.5, 1.4, 2.7)
            '''

            new_value = ast.literal_eval(value)

            if type(new_value) not in (tuple, list):
                new_value = [float(new_value)]

            self.__formated_value = new_value

            return self.__formated_value

        if format == 'date':
            # 'YYYY/MM/DD' -> datetime.date
            strList = value.split('/')
            intList = [int(x) for x in strList]
            date = datetime.date(intList[0], intList[1], intList[2])

            self.__formated_value = date

            return self.__formated_value

        if format == 'time':
            # 'HH:MM:SS' -> datetime.time
            strList = value.split(':')
            intList = [int(x) for x in strList]
            time = datetime.time(intList[0], intList[1], intList[2])

            self.__formated_value = time

            return self.__formated_value

        if format == 'pairslist':
            '''
            Example:
            value = (0,1),(1,2)
            '''

            new_value = ast.literal_eval(value)

            if type(new_value) not in (tuple, list):
                raise ValueError('%s has to be a tuple or list of pairs' % value)

            if type(new_value[0]) not in (tuple, list):
                # A single pair was given; wrap it in a list
                if len(new_value) != 2:
                    raise ValueError('%s has to be a tuple or list of pairs' % value)
                new_value = [new_value]

            for thisPair in new_value:
                if len(thisPair) != 2:
                    raise ValueError('%s has to be a tuple or list of pairs' % value)

            self.__formated_value = new_value

            return self.__formated_value

        if format == 'multilist':
            '''
            Example:
            value = (0,1,2),(3,4,5)
            '''
            multiList = ast.literal_eval(value)

            if type(multiList[0]) == int:
                # Legacy normalization: re-parse with outer parentheses
                multiList = ast.literal_eval('(' + value + ')')

            self.__formated_value = multiList

            return self.__formated_value

        if format == 'bool':
            value = int(value)

        if format == 'int':
            # Go through float first so strings like '1.5' do not raise
            value = float(value)

        # NOTE(review): eval() maps the format name to a builtin callable
        # (int, float, bool, ...). Formats come from local configuration
        # files; never feed untrusted format strings here.
        format_func = eval(format)

        self.__formated_value = format_func(value)

        return self.__formated_value

    def updateId(self, new_id):
        '''Replace this parameter's id.'''

        self.id = str(new_id)

    def setup(self, id, name, value, format='str'):
        '''
        Initialize the parameter and eagerly validate it via getValue().

        Returns 1 on success (getValue raises on invalid values).
        '''
        self.id = str(id)
        self.name = name
        if format == 'obj':
            self.value = value
        else:
            self.value = str(value)
        self.format = str.lower(format)

        self.getValue()

        return 1

    def update(self, name, value, format='str'):
        '''Replace name/value/format and drop the cached conversion.'''

        self.name = name
        self.value = str(value)
        self.format = format
        # Invalidate the cache so getValue() reflects the new value
        # (previously the stale pre-update conversion was returned).
        self.__formated_value = None

    def makeXml(self, opElement):
        '''Append this parameter as a child <Parameter> XML element.'''
        if self.name not in ('queue',):
            parmElement = SubElement(opElement, self.ELEMENTNAME)
            parmElement.set('id', str(self.id))
            parmElement.set('name', self.name)
            parmElement.set('value', self.value)
            parmElement.set('format', self.format)

    def readXml(self, parmElement):
        '''Load id/name/value/format from a <Parameter> XML element.'''

        self.id = parmElement.get('id')
        self.name = parmElement.get('name')
        self.value = parmElement.get('value')
        self.format = str.lower(parmElement.get('format'))

        # Compatible with old signal chain version
        if self.format == 'int' and self.name == 'idfigure':
            self.name = 'id'

    def printattr(self):
        '''Print a one-line description of this parameter.'''

        # getattr fallback: ParameterConf never assigns project_id, so the
        # previous plain attribute access raised AttributeError here.
        print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (
            self.id, self.name, self.value, self.format,
            getattr(self, 'project_id', None)))
281 280
class OperationConf():
    '''
    Configuration of one operation inside a processing unit: a name, a
    type ('self'/'other'/'external'), a priority and a list of
    ParameterConf objects.
    '''

    ELEMENTNAME = 'Operation'

    def __init__(self):

        # NOTE: parmConfObjList is NOT created here; it is assigned by
        # setup() or readXml(). Methods that touch it assume one of those
        # was called first.
        self.id = '0'
        self.name = None
        self.priority = None
        self.topic = None

    def __getNewId(self):
        '''Derive the next parameter id from this operation's id.'''

        return int(self.id) * 10 + len(self.parmConfObjList) + 1

    def getId(self):
        return self.id

    def updateId(self, new_id):
        '''Re-number this operation and all of its parameters.'''

        self.id = str(new_id)

        n = 1
        for parmObj in self.parmConfObjList:

            idParm = str(int(new_id) * 10 + n)
            parmObj.updateId(idParm)

            n += 1

    def getElementName(self):
        '''Return the XML tag name used by makeXml/readXml.'''

        return self.ELEMENTNAME

    def getParameterObjList(self):

        return self.parmConfObjList

    def getParameterObj(self, parameterName):
        '''Return the first parameter named parameterName, or None.'''

        for parmConfObj in self.parmConfObjList:

            if parmConfObj.name != parameterName:
                continue

            return parmConfObj

        return None

    def getParameterObjfromValue(self, parameterValue):
        '''
        Return the (converted) value of the first parameter whose value
        equals parameterValue, or None.

        NOTE(review): despite the name, this returns the VALUE (via
        getValue()), not the ParameterConf object — confirm callers
        expect that.
        '''

        for parmConfObj in self.parmConfObjList:

            if parmConfObj.getValue() != parameterValue:
                continue

            return parmConfObj.getValue()

        return None

    def getParameterValue(self, parameterName):
        '''
        Return the converted value of the named parameter.

        NOTE(review): raises AttributeError if the parameter does not
        exist (the None guard below was deliberately disabled).
        '''

        parameterObj = self.getParameterObj(parameterName)

        # if not parameterObj:
        #     return None

        value = parameterObj.getValue()

        return value

    def getKwargs(self):
        '''
        Return {name: converted value} for all parameters, skipping
        'datatype' on the 'run' operation (it is consumed elsewhere).
        '''

        kwargs = {}

        for parmConfObj in self.parmConfObjList:
            if self.name == 'run' and parmConfObj.name == 'datatype':
                continue

            kwargs[parmConfObj.name] = parmConfObj.getValue()

        return kwargs

    def setup(self, id, name, priority, type, project_id):
        '''Initialize the operation with an empty parameter list.'''

        self.id = str(id)
        self.project_id = project_id
        self.name = name
        self.type = type
        self.priority = priority
        self.parmConfObjList = []

    def removeParameters(self):
        '''Drop every parameter of this operation.'''

        # NOTE(review): `del obj` only unbinds the loop variable; the
        # actual cleanup is the list reassignment below.
        for obj in self.parmConfObjList:
            del obj

        self.parmConfObjList = []

    def addParameter(self, name, value, format='str'):
        '''
        Create and append a ParameterConf; returns it, or None when
        value is None (parameter silently skipped).
        '''

        if value is None:
            return None
        id = self.__getNewId()

        parmConfObj = ParameterConf()
        if not parmConfObj.setup(id, name, value, format):
            return None

        self.parmConfObjList.append(parmConfObj)

        return parmConfObj

    def changeParameter(self, name, value, format='str'):
        '''Update an existing parameter in place and return it.'''

        parmConfObj = self.getParameterObj(name)
        parmConfObj.update(name, value, format)

        return parmConfObj

    def makeXml(self, procUnitElement):
        '''Append this operation (and its parameters) as XML.'''

        opElement = SubElement(procUnitElement, self.ELEMENTNAME)
        opElement.set('id', str(self.id))
        opElement.set('name', self.name)
        opElement.set('type', self.type)
        opElement.set('priority', str(self.priority))

        for parmConfObj in self.parmConfObjList:
            parmConfObj.makeXml(opElement)

    def readXml(self, opElement, project_id):
        '''Load this operation and its parameters from an XML element.'''

        self.id = opElement.get('id')
        self.name = opElement.get('name')
        self.type = opElement.get('type')
        self.priority = opElement.get('priority')
        self.project_id = str(project_id)

        # Compatible with old signal chain version
        # Use of 'run' method instead 'init'
        if self.type == 'self' and self.name == 'init':
            self.name = 'run'

        self.parmConfObjList = []

        parmElementList = opElement.iter(ParameterConf().getElementName())

        for parmElement in parmElementList:
            parmConfObj = ParameterConf()
            parmConfObj.readXml(parmElement)

            # Compatible with old signal chain version
            # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
            if self.type != 'self' and self.name == 'Plot':
                if parmConfObj.format == 'str' and parmConfObj.name == 'type':
                    self.name = parmConfObj.value
                    continue

            self.parmConfObjList.append(parmConfObj)

    def printattr(self):
        '''Print this operation and all of its parameters.'''

        print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
                                                                                self.id,
                                                                                self.name,
                                                                                self.type,
                                                                                self.priority,
                                                                                self.project_id))

        for parmConfObj in self.parmConfObjList:
            parmConfObj.printattr()

    def createObject(self):
        '''
        Instantiate the operation class named by self.name.

        'other' -> plain instance; 'external' -> instance started as a
        separate process with this operation's kwargs.

        NOTE(review): eval() resolves the class name from the names
        imported by this module; for any other self.type value, opObj is
        never bound and this raises UnboundLocalError — callers only
        invoke it for non-'self' operations.
        '''

        className = eval(self.name)

        if self.type == 'other':
            opObj = className()
        elif self.type == 'external':
            kwargs = self.getKwargs()
            opObj = className(self.id, self.project_id, **kwargs)
            opObj.start()

        return opObj
467 466
class ProcUnitConf():
    '''
    Configuration of a processing unit: a datatype/name, the id of the
    unit it reads from (inputId), and a list of OperationConf objects.
    The unit id doubles as the IPC topic it publishes on.
    '''

    ELEMENTNAME = 'ProcUnit'

    def __init__(self):

        self.id = None
        self.datatype = None
        self.name = None
        self.inputId = None
        self.opConfObjList = []
        self.procUnitObj = None
        self.opObjDict = {}

    def __getPriority(self):
        '''Next operation priority (sequential position).'''

        return len(self.opConfObjList) + 1

    def __getNewId(self):
        '''Derive the next operation id from this unit's id.'''

        return int(self.id) * 10 + len(self.opConfObjList) + 1

    def getElementName(self):
        '''Return the XML tag name used by makeXml/readXml.'''

        return self.ELEMENTNAME

    def getId(self):

        return self.id

    def updateId(self, new_id):
        '''
        NOTE(review): the re-numbering logic below was disabled (kept as a
        string for reference) and this method is currently a no-op.

        new_id = int(parentId) * 10 + (int(self.id) % 10)
        new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)

        # If this proc unit has not inputs
        #if self.inputId == '0':
        #new_inputId = 0

        n = 1
        for opConfObj in self.opConfObjList:

            idOp = str(int(new_id) * 10 + n)
            opConfObj.updateId(idOp)

            n += 1

        self.parentId = str(parentId)
        self.id = str(new_id)
        #self.inputId = str(new_inputId)
        '''
        n = 1

    def getInputId(self):

        return self.inputId

    def getOperationObjList(self):

        return self.opConfObjList

    def getOperationObj(self, name=None):
        '''Return the first operation named `name`, or None.'''

        for opConfObj in self.opConfObjList:

            if opConfObj.name != name:
                continue

            return opConfObj

        return None

    def getOpObjfromParamValue(self, value=None):
        '''Return the first operation holding a parameter equal to value.'''

        for opConfObj in self.opConfObjList:
            if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
                continue
            return opConfObj
        return None

    def getProcUnitObj(self):
        '''Return the instantiated processing unit (after createObjects).'''

        return self.procUnitObj

    def setup(self, project_id, id, name, datatype, inputId):
        '''
        Initialize the unit and add the default 'run' operation.

        `id` is the topic this unit publishes on; `inputId` is the topic
        it subscribes to. Either `name` or `datatype` may be omitted; the
        missing one is derived from the other ('<datatype>Proc').
        '''

        # Compatible with old signal chain version
        if datatype == None and name == None:
            raise ValueError('datatype or name should be defined')

        # TODO(review): define a condition for inputId when it is 0

        if name == None:
            if 'Proc' in datatype:
                name = datatype
            else:
                name = '%sProc' % (datatype)

        if datatype == None:
            datatype = name.replace('Proc', '')

        self.id = str(id)
        self.project_id = project_id
        self.name = name
        self.datatype = datatype
        self.inputId = inputId
        self.opConfObjList = []

        self.addOperation(name='run', optype='self')

    def removeOperations(self):
        '''Drop all operations and re-add a bare 'run' operation.'''

        # NOTE(review): `del obj` only unbinds the loop variable; the
        # actual cleanup is the list reassignment below.
        for obj in self.opConfObjList:
            del obj

        self.opConfObjList = []
        self.addOperation(name='run')

    def addParameter(self, **kwargs):
        '''
        Add parameters to 'run' operation (assumed to be the first one).
        '''
        opObj = self.opConfObjList[0]

        opObj.addParameter(**kwargs)

        return opObj

    def addOperation(self, name, optype='self'):
        '''
        Append an OperationConf to this unit and return it.

        (Legacy note, translated: for optype='self' the plan was to drop
        this and define IPC/topic communication instead.)
        '''

        id = self.__getNewId()
        priority = self.__getPriority()  # not very meaningful, but usable
        opConfObj = OperationConf()
        opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id)
        self.opConfObjList.append(opConfObj)

        return opConfObj

    def makeXml(self, projectElement):
        '''Append this unit (and its operations) as XML.'''

        procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
        procUnitElement.set('id', str(self.id))
        procUnitElement.set('name', self.name)
        procUnitElement.set('datatype', self.datatype)
        procUnitElement.set('inputId', str(self.inputId))

        for opConfObj in self.opConfObjList:
            opConfObj.makeXml(procUnitElement)

    def readXml(self, upElement, project_id):
        '''Load this unit and its operations from an XML element.'''

        self.id = upElement.get('id')
        self.name = upElement.get('name')
        self.datatype = upElement.get('datatype')
        self.inputId = upElement.get('inputId')
        self.project_id = str(project_id)

        # Stored datatype keeps only the base name (no Reader/Proc suffix)
        if self.ELEMENTNAME == 'ReadUnit':
            self.datatype = self.datatype.replace('Reader', '')

        if self.ELEMENTNAME == 'ProcUnit':
            self.datatype = self.datatype.replace('Proc', '')

        if self.inputId == 'None':
            self.inputId = '0'

        self.opConfObjList = []

        opElementList = upElement.iter(OperationConf().getElementName())

        for opElement in opElementList:
            opConfObj = OperationConf()
            opConfObj.readXml(opElement, project_id)
            self.opConfObjList.append(opConfObj)

    def printattr(self):
        '''Print this unit and all of its operations.'''

        print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
                                                                                   self.id,
                                                                                   self.name,
                                                                                   self.datatype,
                                                                                   self.inputId,
                                                                                   self.project_id))

        for opConfObj in self.opConfObjList:
            opConfObj.printattr()

    def getKwargs(self):
        '''Return the kwargs of the 'run' operation (first in the list).'''

        opObj = self.opConfObjList[0]
        kwargs = opObj.getKwargs()

        return kwargs

    def createObjects(self):
        '''
        Instantiate the processing unit and attach its operations, then
        start the unit process.
        '''

        className = eval(self.name)
        kwargs = self.getKwargs()
        procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs)  # units need their id and input for IPC purposes
        log.success('creating process...', self.name)

        for opConfObj in self.opConfObjList:

            if opConfObj.type == 'self' and opConfObj.name == 'run':
                # 'run' is the unit's own entry point, not an operation
                continue
            elif opConfObj.type == 'self':
                # Bound method of the unit itself
                opObj = getattr(procUnitObj, opConfObj.name)
            else:
                opObj = opConfObj.createObject()

            log.success('creating operation: {}, type:{}'.format(
                opConfObj.name,
                opConfObj.type), self.name)

            procUnitObj.addOperation(opConfObj, opObj)

        procUnitObj.start()
        self.procUnitObj = procUnitObj

    def close(self):
        '''Close every non-'self' operation, then the unit itself.'''

        for opConfObj in self.opConfObjList:
            if opConfObj.type == 'self':
                continue

            opObj = self.procUnitObj.getOperationObj(opConfObj.id)
            opObj.close()

        self.procUnitObj.close()

        return
712 711
713 712
class ReadUnitConf(ProcUnitConf):
    '''
    Configuration of the reading unit: a reader name/datatype plus the
    path and date/time window, materialized as parameters of its 'run'
    operation.
    '''

    ELEMENTNAME = 'ReadUnit'

    def __init__(self):

        # NOTE(review): does not call ProcUnitConf.__init__ and does not
        # create procUnitObj/opObjDict — confirm nothing relies on them
        # for read units.
        self.id = None
        self.datatype = None
        self.name = None
        self.inputId = None
        self.opConfObjList = []

    def getElementName(self):
        '''Return the XML tag name used by makeXml/readXml.'''

        return self.ELEMENTNAME

    def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='',
              startTime='', endTime='', server=None, **kwargs):


        '''
        Initialize the read unit and create its 'run' operation.

        (Legacy note, translated: the process id is the publish topic;
        extra kwargs must be forwarded at instantiation time.)
        '''

        # Compatible with old signal chain version: derive the missing
        # one of name/datatype from the other ('<datatype>Reader')
        if datatype == None and name == None:
            raise ValueError('datatype or name should be defined')
        if name == None:
            if 'Reader' in datatype:
                name = datatype
                datatype = name.replace('Reader','')
            else:
                name = '{}Reader'.format(datatype)
        if datatype == None:
            if 'Reader' in name:
                datatype = name.replace('Reader','')
            else:
                datatype = name
                name = '{}Reader'.format(name)

        self.id = id
        self.project_id = project_id
        self.name = name
        self.datatype = datatype
        # NOTE(review): self.path stays unset when path == '' — later
        # addRunOperation/updateRunOperation read self.path and would
        # raise AttributeError; confirm callers always pass a path when
        # server is None.
        if path != '':
            self.path = os.path.abspath(path)
        self.startDate = startDate
        self.endDate = endDate
        self.startTime = startTime
        self.endTime = endTime
        self.server = server
        self.addRunOperation(**kwargs)

    def update(self, **kwargs):
        '''
        Update reader attributes and rebuild the 'run' operation's
        parameters; leftover kwargs become 'run' parameters.
        '''

        if 'datatype' in kwargs:
            datatype = kwargs.pop('datatype')
            if 'Reader' in datatype:
                self.name = datatype
            else:
                self.name = '%sReader' % (datatype)
            self.datatype = self.name.replace('Reader', '')

        attrs = ('path', 'startDate', 'endDate',
                 'startTime', 'endTime')

        for attr in attrs:
            if attr in kwargs:
                setattr(self, attr, kwargs.pop(attr))

        self.updateRunOperation(**kwargs)

    def removeOperations(self):
        '''Drop all operations (no default 'run' is re-added here).'''

        # NOTE(review): `del obj` only unbinds the loop variable; the
        # actual cleanup is the list reassignment below.
        for obj in self.opConfObjList:
            del obj

        self.opConfObjList = []

    def addRunOperation(self, **kwargs):
        '''
        Create the 'run' operation with the reader parameters; when a
        server is configured only the server parameter is set.
        '''

        opObj = self.addOperation(name='run', optype='self')

        if self.server is None:
            opObj.addParameter(
                name='datatype', value=self.datatype, format='str')
            opObj.addParameter(name='path', value=self.path, format='str')
            opObj.addParameter(
                name='startDate', value=self.startDate, format='date')
            opObj.addParameter(
                name='endDate', value=self.endDate, format='date')
            opObj.addParameter(
                name='startTime', value=self.startTime, format='time')
            opObj.addParameter(
                name='endTime', value=self.endTime, format='time')

            for key, value in list(kwargs.items()):
                opObj.addParameter(name=key, value=value,
                                   format=type(value).__name__)
        else:
            opObj.addParameter(name='server', value=self.server, format='str')

        return opObj

    def updateRunOperation(self, **kwargs):
        '''Rebuild the 'run' operation parameters from current attributes.'''

        opObj = self.getOperationObj(name='run')
        opObj.removeParameters()

        opObj.addParameter(name='datatype', value=self.datatype, format='str')
        opObj.addParameter(name='path', value=self.path, format='str')
        opObj.addParameter(
            name='startDate', value=self.startDate, format='date')
        opObj.addParameter(name='endDate', value=self.endDate, format='date')
        opObj.addParameter(
            name='startTime', value=self.startTime, format='time')
        opObj.addParameter(name='endTime', value=self.endTime, format='time')

        for key, value in list(kwargs.items()):
            opObj.addParameter(name=key, value=value,
                               format=type(value).__name__)

        return opObj

    def readXml(self, upElement, project_id):
        '''
        Load the read unit from XML and mirror the 'run' operation's
        path/date/time parameters onto the unit attributes.
        '''

        self.id = upElement.get('id')
        self.name = upElement.get('name')
        self.datatype = upElement.get('datatype')
        self.project_id = str(project_id)

        if self.ELEMENTNAME == 'ReadUnit':
            self.datatype = self.datatype.replace('Reader', '')

        self.opConfObjList = []

        opElementList = upElement.iter(OperationConf().getElementName())

        for opElement in opElementList:
            opConfObj = OperationConf()
            opConfObj.readXml(opElement, project_id)
            self.opConfObjList.append(opConfObj)

            if opConfObj.name == 'run':
                self.path = opConfObj.getParameterValue('path')
                self.startDate = opConfObj.getParameterValue('startDate')
                self.endDate = opConfObj.getParameterValue('endDate')
                self.startTime = opConfObj.getParameterValue('startTime')
                self.endTime = opConfObj.getParameterValue('endTime')
867 866
868 867
class Project(Process):
    '''
    Top-level configuration: a set of read/processing units that can be
    serialized to/from XML and run as a multiprocessing.Process.
    '''

    ELEMENTNAME = 'Project'

    def __init__(self):

        Process.__init__(self)
        self.id = None
        self.filename = None
        self.description = None
        self.email = None
        self.alarm = None
        self.procUnitConfObjDict = {}

    def __getNewId(self):
        '''Return the first free unit id of the form id*10 + n.'''

        idList = list(self.procUnitConfObjDict.keys())
        id = int(self.id) * 10

        while True:
            id += 1

            if str(id) in idList:
                continue

            break

        return str(id)

    def getElementName(self):
        '''Return the XML tag name used by makeXml/readXml.'''

        return self.ELEMENTNAME

    def getId(self):

        return self.id

    def updateId(self, new_id):
        '''Re-number the project and all of its units consistently.'''

        self.id = str(new_id)

        keyList = list(self.procUnitConfObjDict.keys())
        keyList.sort()

        n = 1
        newProcUnitConfObjDict = {}

        for procKey in keyList:

            procUnitConfObj = self.procUnitConfObjDict[procKey]
            idProcUnit = str(int(self.id) * 10 + n)
            procUnitConfObj.updateId(idProcUnit)
            newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
            n += 1

        self.procUnitConfObjDict = newProcUnitConfObjDict

    def setup(self, id=1, name='', description='', email=None, alarm=[]):
        '''
        Initialize project metadata and print the startup banner.

        NOTE(review): alarm=[] is a mutable default argument — it is
        shared across calls that rely on the default; callers should pass
        their own list.
        '''

        print(' ')
        print('*' * 60)
        print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
        print('*' * 60)
        print("* Python " + python_version() + " *")
        print('*' * 19)
        print(' ')
        self.id = str(id)
        self.description = description
        self.email = email
        self.alarm = alarm

    def update(self, **kwargs):
        '''Set arbitrary attributes on the project from kwargs.'''

        for key, value in list(kwargs.items()):
            setattr(self, key, value)

    def clone(self):
        '''
        Return a new Project sharing this project's unit dict.

        NOTE(review): this is a shallow share — the clone and the
        original reference the SAME unit configuration objects.
        '''

        p = Project()
        p.procUnitConfObjDict = self.procUnitConfObjDict
        return p

    def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):

        '''
        Create, register and return a ReadUnitConf.

        (Legacy note, translated: the unit id is the topic that
        downstream procUnits subscribe to in order to receive the data.)
        '''

        if id is None:
            idReadUnit = self.__getNewId()
        else:
            idReadUnit = str(id)

        readUnitConfObj = ReadUnitConf()
        readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs)
        self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj

        return readUnitConfObj

    def addProcUnit(self, inputId='0', datatype=None, name=None):

        '''
        Create, register and return a ProcUnitConf.

        (Legacy note, translated: inputId is the topic the unit
        subscribes to; the unit's own id is its publish topic, assigned
        dynamically.)
        '''

        idProcUnit = self.__getNewId()  # subscription topic
        procUnitConfObj = ProcUnitConf()
        procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId)  # topic_read, topic_write,
        self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj

        return procUnitConfObj

    def removeProcUnit(self, id):
        '''Remove a unit by id (silently ignores unknown ids).'''

        if id in list(self.procUnitConfObjDict.keys()):
            self.procUnitConfObjDict.pop(id)

    def getReadUnitId(self):
        '''Return the id of the (first) read unit.'''

        readUnitConfObj = self.getReadUnitObj()

        return readUnitConfObj.id

    def getReadUnitObj(self):
        '''Return the first ReadUnit configuration, or None.'''

        for obj in list(self.procUnitConfObjDict.values()):
            if obj.getElementName() == 'ReadUnit':
                return obj

        return None

    def getProcUnitObj(self, id=None, name=None):
        '''Look up a unit by id (preferred) or by name.'''

        if id != None:
            return self.procUnitConfObjDict[id]

        if name != None:
            return self.getProcUnitObjByName(name)

        return None

    def getProcUnitObjByName(self, name):
        '''Return the first unit whose name matches, or None.'''

        for obj in list(self.procUnitConfObjDict.values()):
            if obj.name == name:
                return obj

        return None

    def procUnitItems(self):

        return list(self.procUnitConfObjDict.items())

    def makeXml(self):
        '''Build the <Project> XML tree into self.projectElement.'''

        projectElement = Element('Project')
        projectElement.set('id', str(self.id))
        projectElement.set('name', self.name)
        projectElement.set('description', self.description)

        for procUnitConfObj in list(self.procUnitConfObjDict.values()):
            procUnitConfObj.makeXml(projectElement)

        self.projectElement = projectElement

    def writeXml(self, filename=None):
        '''
        Serialize the project to an XML file.

        Returns 1 on success, 0 on any (printed) error.
        '''

        if filename == None:
            if self.filename:
                filename = self.filename
            else:
                filename = 'schain.xml'

        if not filename:
            print('filename has not been defined. Use setFilename(filename) for do it.')
            return 0

        abs_file = os.path.abspath(filename)

        if not os.access(os.path.dirname(abs_file), os.W_OK):
            print('No write permission on %s' % os.path.dirname(abs_file))
            return 0

        if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
            print('File %s already exists and it could not be overwriten' % abs_file)
            return 0

        self.makeXml()

        ElementTree(self.projectElement).write(abs_file, method='xml')

        self.filename = abs_file

        return 1

    def readXml(self, filename=None):
        '''
        Load the project from an XML file, rebuilding all read and
        processing unit configurations.

        Returns 1 on success, 0 on any (printed) error.
        '''

        if not filename:
            print('filename is not defined')
            return 0

        abs_file = os.path.abspath(filename)

        if not os.path.isfile(abs_file):
            print('%s file does not exist' % abs_file)
            return 0

        self.projectElement = None
        self.procUnitConfObjDict = {}

        # NOTE(review): bare except deliberately turns any parse failure
        # into a printed message + return 0.
        try:
            self.projectElement = ElementTree().parse(abs_file)
        except:
            print('Error reading %s, verify file format' % filename)
            return 0

        self.project = self.projectElement.tag

        self.id = self.projectElement.get('id')
        self.name = self.projectElement.get('name')
        self.description = self.projectElement.get('description')

        readUnitElementList = self.projectElement.iter(
            ReadUnitConf().getElementName())

        for readUnitElement in readUnitElementList:
            readUnitConfObj = ReadUnitConf()
            readUnitConfObj.readXml(readUnitElement, self.id)
            self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj

        procUnitElementList = self.projectElement.iter(
            ProcUnitConf().getElementName())

        for procUnitElement in procUnitElementList:
            procUnitConfObj = ProcUnitConf()
            procUnitConfObj.readXml(procUnitElement, self.id)
            self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj

        self.filename = abs_file

        return 1
1120 1119
1121 1120 def __str__(self):
1122 1121
1123 1122 print('Project[%s]: name = %s, description = %s, project_id = %s' % (self.id,
1124 1123 self.name,
1125 1124 self.description,
1126 1125 self.project_id))
1127 1126
1128 1127 for procUnitConfObj in self.procUnitConfObjDict.values():
1129 1128 print(procUnitConfObj)
1130 1129
1131 1130 def createObjects(self):
1132 1131
1133 1132
1134 1133 keys = list(self.procUnitConfObjDict.keys())
1135 1134 keys.sort()
1136 1135 for key in keys:
1137 1136 self.procUnitConfObjDict[key].createObjects()
1138 1137
    def __handleError(self, procUnitConfObj, modes=None, stdout=True):
        '''
        Report the exception currently being handled for a processing unit:
        log it, optionally echo the traceback to stderr, and build an Alarm
        notification (email, sound, etc. depending on `modes`).

        Must be called from inside an ``except`` block (uses sys.exc_info()).

        Parameters:
            procUnitConfObj: configuration object of the unit that failed.
            modes: alarm modes to trigger; defaults to ``self.alarm``.
            stdout: when True, also write the full traceback to stderr.

        Returns the created Alarm object.
        '''

        import socket

        if modes is None:
            modes = self.alarm

        # NOTE(review): this disables all alarms whenever self.alarm is
        # empty, even if the caller passed explicit `modes` — confirm intended.
        if not self.alarm:
            modes = []

        # Full formatted traceback of the active exception
        err = traceback.format_exception(sys.exc_info()[0],
                                         sys.exc_info()[1],
                                         sys.exc_info()[2])

        # err[-1] is the one-line "ExceptionType: message" summary
        log.error('{}'.format(err[-1]), procUnitConfObj.name)

        message = ''.join(err)

        if stdout:
            sys.stderr.write(message)

        subject = 'SChain v%s: Error running %s\n' % (
            schainpy.__version__, procUnitConfObj.name)

        # Context block for the notification: host, cwd, config file, time
        subtitle = '%s: %s\n' % (
            procUnitConfObj.getElementName(), procUnitConfObj.name)
        subtitle += 'Hostname: %s\n' % socket.gethostbyname(
            socket.gethostname())
        subtitle += 'Working directory: %s\n' % os.path.abspath('./')
        subtitle += 'Configuration file: %s\n' % self.filename
        subtitle += 'Time: %s\n' % str(datetime.datetime.now())

        # Append the reader parameters when a read unit is configured
        readUnitConfObj = self.getReadUnitObj()
        if readUnitConfObj:
            subtitle += '\nInput parameters:\n'
            subtitle += '[Data path = %s]\n' % readUnitConfObj.path
            subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
            subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
            subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
            subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
            subtitle += '[End time = %s]\n' % readUnitConfObj.endTime

        a = Alarm(
            modes=modes,
            email=self.email,
            message=message,
            subject=subject,
            subtitle=subtitle,
            filename=self.filename
        )

        return a
1191 1190
    def isPaused(self):
        # Pause control is not implemented; the project never reports paused.
        return 0
1194 1193
    def isStopped(self):
        # Stop control is not implemented; the project never reports stopped.
        return 0
1197 1196
1198 1197 def runController(self):
1199 1198 '''
1200 1199 returns 0 when this process has been stopped, 1 otherwise
1201 1200 '''
1202 1201
1203 1202 if self.isPaused():
1204 1203 print('Process suspended')
1205 1204
1206 1205 while True:
1207 1206 time.sleep(0.1)
1208 1207
1209 1208 if not self.isPaused():
1210 1209 break
1211 1210
1212 1211 if self.isStopped():
1213 1212 break
1214 1213
1215 1214 print('Process reinitialized')
1216 1215
1217 1216 if self.isStopped():
1218 1217 print('Process stopped')
1219 1218 return 0
1220 1219
1221 1220 return 1
1222 1221
    def setFilename(self, filename):
        # Path of the XML configuration file associated with this project.
        self.filename = filename
1226 1225
1227 1226 def setProxyCom(self):
1228 1227
1229 1228 if not os.path.exists('/tmp/schain'):
1230 1229 os.mkdir('/tmp/schain')
1231 1230
1232 1231 self.ctx = zmq.Context()
1233 1232 xpub = self.ctx.socket(zmq.XPUB)
1234 1233 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1235 1234 xsub = self.ctx.socket(zmq.XSUB)
1236 1235 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1237 1236
1238 1237 try:
1239 1238 zmq.proxy(xpub, xsub)
1240 1239 except: # zmq.ContextTerminated:
1241 1240 xpub.close()
1242 1241 xsub.close()
1243 1242
    def run(self):
        '''
        Start the project: create the processing unit objects and then
        serve the pub/sub proxy, blocking until the run finishes.
        '''

        log.success('Starting {}: {}'.format(self.name, self.id), tag='')
        self.start_time = time.time()
        self.createObjects()
        # t = Thread(target=wait, args=(self.ctx, ))
        # t.start()
        # Blocks here until the zmq context is terminated
        self.setProxyCom()

        # TODO (translated): start all processes with .start() and monitor
        # them; remove the code below.

        log.success('{} Done (time: {}s)'.format(
            self.name,
            time.time()-self.start_time))
@@ -1,803 +1,800
1 1
2 2 import os
3 3 import sys
4 4 import zmq
5 5 import time
6 6 import datetime
7 7 from functools import wraps
8 8 import numpy
9 9 import matplotlib
10 10
# Select the matplotlib backend: honor $BACKEND when set, default to the
# interactive TkAgg on Linux/macOS, fall back to the headless Agg elsewhere.
if 'BACKEND' in os.environ:
    matplotlib.use(os.environ['BACKEND'])
elif 'linux' in sys.platform:
    matplotlib.use("TkAgg")
elif 'darwin' in sys.platform:
    matplotlib.use('TkAgg')
else:
    from schainpy.utils import log
    log.warning('Using default Backend="Agg"', 'INFO')
    matplotlib.use('Agg')

import matplotlib.pyplot as plt
from matplotlib.patches import Polygon
from mpl_toolkits.axes_grid1 import make_axes_locatable
from matplotlib.ticker import FuncFormatter, LinearLocator, MultipleLocator

from schainpy.model.data.jrodata import PlotterData
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
from schainpy.utils import log

# Custom 'jro' colormap: a few seismic_r blues prepended to the core of jet
jet_values = matplotlib.pyplot.get_cmap('jet', 100)(numpy.arange(100))[10:90]
blu_values = matplotlib.pyplot.get_cmap(
    'seismic_r', 20)(numpy.arange(20))[10:15]
ncmap = matplotlib.colors.LinearSegmentedColormap.from_list(
    'jro', numpy.vstack((blu_values, jet_values)))
matplotlib.pyplot.register_cmap(cmap=ncmap)

# Colormaps cycled from the plot windows with the up/down keys (OnKeyPress)
CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis',
                                   'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm')]

EARTH_RADIUS = 6.3710e3  # km
42 42
43 43
def ll2xy(lat1, lon1, lat2, lon2):
    '''
    Convert the offset between two lat/lon points (degrees) into local
    x/y distances in km: haversine great-circle distance decomposed
    along the initial bearing from point 1 to point 2.
    '''

    rad = 0.017453292519943295  # degrees -> radians
    # Haversine formula; 12742 km is the Earth's diameter
    hav = (0.5 - numpy.cos((lat2 - lat1) * rad) / 2 +
           numpy.cos(lat1 * rad) * numpy.cos(lat2 * rad) *
           (1 - numpy.cos((lon2 - lon1) * rad)) / 2)
    dist = 12742 * numpy.arcsin(numpy.sqrt(hav))
    # Initial bearing, measured from north
    east = numpy.sin((lon2 - lon1) * rad) * numpy.cos(lat2 * rad)
    north = (numpy.cos(lat1 * rad) * numpy.sin(lat2 * rad) -
             numpy.sin(lat1 * rad) * numpy.cos(lat2 * rad) *
             numpy.cos((lon2 - lon1) * rad))
    # Rotate so the returned pair is (x=east, y=north)
    azimuth = numpy.pi / 2 - numpy.arctan2(east, north)
    return dist * numpy.cos(azimuth), dist * numpy.sin(azimuth)
54 54
55 55
def km2deg(km):
    '''
    Convert distance in km to degrees of arc along the Earth's surface.
    '''

    return numpy.degrees(km / EARTH_RADIUS)
62 62
63 63
def figpause(interval):
    '''
    Pause for `interval` seconds by running the GUI event loop of the
    active figure. Unlike plt.pause(), does nothing on non-interactive
    backends and does not raise the window.
    '''
    backend = plt.rcParams['backend']
    if backend in matplotlib.rcsetup.interactive_bk:
        figManager = matplotlib._pylab_helpers.Gcf.get_active()
        if figManager is not None:
            canvas = figManager.canvas
            if canvas.figure.stale:
                canvas.draw()
            try:
                canvas.start_event_loop(interval)
            except Exception:
                # was a bare `except:`; a failed pause (e.g. window closed
                # mid-loop) is not fatal for plotting
                pass
    return
77 77
78 78
def popup(message):
    '''
    Show `message` on a red full-window figure, one line per
    colon-separated field, and keep it on screen.
    '''

    fig = plt.figure(figsize=(12, 8), facecolor='r')
    lines = [part.strip() for part in message.split(':')]
    fig.text(0.01, 0.5, '\n'.join(lines), ha='left', va='center',
             size='20', weight='heavy', color='w')
    fig.show()
    figpause(1000)
89 89
90 90
class Throttle(object):
    '''
    Decorator that prevents a function from being called more than once every
    time period.
    To create a function that cannot be called more than once a minute, but
    will sleep until it can be called:
    @Throttle(minutes=1)
    def foo():
        pass

    for i in range(10):
        foo()
        print "This function has run %s times." % i
    '''

    def __init__(self, seconds=0, minutes=0, hours=0):
        # Minimum interval between two effective calls.
        self.throttle_period = datetime.timedelta(
            seconds=seconds, minutes=minutes, hours=hours
        )
        # datetime.min guarantees the very first call always goes through.
        self.time_of_last_call = datetime.datetime.min

    def __call__(self, fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            # coerce=True bypasses the throttle entirely
            if kwargs.pop('coerce', None):
                self.time_of_last_call = datetime.datetime.now()
                return fn(*args, **kwargs)

            now = datetime.datetime.now()
            elapsed = now - self.time_of_last_call
            if self.throttle_period - elapsed > datetime.timedelta(seconds=0):
                # Still inside the quiet period: drop the call silently
                return

            self.time_of_last_call = datetime.datetime.now()
            return fn(*args, **kwargs)

        return wrapper
132 132
def apply_throttle(value):
    '''
    Build a rate-limited invoker: the returned callable executes the
    function passed to it, at most once every `value` seconds (extra
    calls are dropped unless invoked with coerce=True).
    '''

    @Throttle(seconds=value)
    def fnThrottled(fn):
        fn()

    return fnThrottled
140 140
@MPDecorator
class Plotter(ProcessingUnit):
    '''
    Proccessing unit to handle plot operations
    '''

    def __init__(self):

        ProcessingUnit.__init__(self)

    def setup(self, **kwargs):
        # One-time configuration; called lazily from run() on the first block.

        self.connections = 0
        self.web_address = kwargs.get('web_server', False)
        self.realtime = kwargs.get('realtime', False)
        self.localtime = kwargs.get('localtime', True)
        self.buffering = kwargs.get('buffering', True)
        self.throttle = kwargs.get('throttle', 2)
        self.exp_code = kwargs.get('exp_code', None)
        # Rate-limited callable: flags data ready at most once per `throttle` s
        self.set_ready = apply_throttle(self.throttle)
        self.dates = []
        self.data = PlotterData(
            self.plots, self.throttle, self.exp_code, self.buffering)
        self.isConfig = True

    def ready(self):
        '''
        Set dataOut ready
        '''

        self.data.ready = True
        self.dataOut.data_plt = self.data

    def run(self, realtime=True, localtime=True, buffering=True,
            throttle=2, exp_code=None, web_server=None):
        # Accumulate incoming data and mark it ready for the plot operations,
        # optionally pushing the JSON-serialized buffer to a web server.

        if not self.isConfig:
            self.setup(realtime=realtime, localtime=localtime,
                       buffering=buffering, throttle=throttle, exp_code=exp_code,
                       web_server=web_server)

            if self.web_address:
                log.success(
                    'Sending to web: {}'.format(self.web_address),
                    self.name
                )
                self.context = zmq.Context()
                self.sender_web = self.context.socket(zmq.REQ)
                self.sender_web.connect(self.web_address)
                self.poll = zmq.Poller()
                self.poll.register(self.sender_web, zmq.POLLIN)
                time.sleep(1)

            # t = Thread(target=self.event_monitor, args=(monitor,))
            # t.start()

        self.dataOut = self.dataIn
        self.data.ready = False

        # Force the throttled ready() through when the block carries no data
        if self.dataOut.flagNoData:
            coerce = True
        else:
            coerce = False

        if self.dataOut.type == 'Parameters':
            tm = self.dataOut.utctimeInit
        else:
            tm = self.dataOut.utctime
        # Normalize the timestamp to the configured localtime/UTC convention
        if self.dataOut.useLocalTime:
            if not self.localtime:
                tm += time.timezone
            dt = datetime.datetime.fromtimestamp(tm).date()
        else:
            if self.localtime:
                tm -= time.timezone
            dt = datetime.datetime.utcfromtimestamp(tm).date()
        # On a day change: flush the current buffer and start a fresh one
        if dt not in self.dates:
            if self.data:
                self.ready()
            self.data.setup()
            self.dates.append(dt)

        self.data.update(self.dataOut, tm)

        if False:  # TODO check when publishers ends
            self.connections -= 1
            if self.connections == 0 and dt in self.dates:
                self.data.ended = True
                self.ready()
                time.sleep(1)
        else:
            if self.realtime:
                self.ready()
                if self.web_address:
                    # REQ/REP with timeout + reconnect ("lazy pirate" style):
                    # resend until the server answers 'ok' or retries run out
                    retries = 5
                    while True:
                        self.sender_web.send(self.data.jsonify())
                        socks = dict(self.poll.poll(5000))
                        if socks.get(self.sender_web) == zmq.POLLIN:
                            reply = self.sender_web.recv_string()
                            if reply == 'ok':
                                log.log("Response from server ok", self.name)
                                break
                            else:
                                log.warning(
                                    "Malformed reply from server: {}".format(reply), self.name)

                        else:
                            # Timed out: discard the socket (REQ state machine
                            # cannot resend on the same socket) and reconnect
                            log.warning(
                                "No response from server, retrying...", self.name)
                            self.sender_web.setsockopt(zmq.LINGER, 0)
                            self.sender_web.close()
                            self.poll.unregister(self.sender_web)
                            retries -= 1
                            if retries == 0:
                                log.error(
                                    "Server seems to be offline, abandoning", self.name)
                                self.sender_web = self.context.socket(zmq.REQ)
                                self.sender_web.connect(self.web_address)
                                self.poll.register(self.sender_web, zmq.POLLIN)
                                time.sleep(1)
                                break
                            self.sender_web = self.context.socket(zmq.REQ)
                            self.sender_web.connect(self.web_address)
                            self.poll.register(self.sender_web, zmq.POLLIN)
                            time.sleep(1)
            else:
                # Non-realtime: ready() at most once per throttle period
                self.set_ready(self.ready, coerce=coerce)

        return

    def close(self):
        pass
274 274
275 275
@MPDecorator
class Plot(Operation):
    '''
    Base class for Schain plotting operations
    '''

    CODE = 'Figure'
    colormap = 'jro'
    bgcolor = 'white'
    __missing = 1E30  # sentinel used to mask data gaps in fill_gaps()

    # Parameters accepted through **kwargs in __setup()
    __attrs__ = ['show', 'save', 'xmin', 'xmax', 'ymin', 'ymax', 'zmin', 'zmax',
                 'zlimits', 'xlabel', 'ylabel', 'xaxis', 'cb_label', 'title',
                 'colorbar', 'bgcolor', 'width', 'height', 'localtime', 'oneFigure',
                 'showprofile', 'decimation', 'pause']

    def __init__(self):

        Operation.__init__(self)
        self.isConfig = False
        self.isPlotConfig = False

    def __fmtTime(self, x, pos):
        '''
        Axis tick formatter: timestamp -> 'HH:MM'
        '''

        return '{}'.format(self.getDateTime(x).strftime('%H:%M'))

    def __setup(self, **kwargs):
        '''
        Initialize variables
        '''

        self.figures = []
        self.axes = []
        self.cb_axes = []
        self.localtime = kwargs.pop('localtime', True)
        self.show = kwargs.get('show', True)
        self.save = kwargs.get('save', False)
        self.ftp = kwargs.get('ftp', False)
        self.colormap = kwargs.get('colormap', self.colormap)
        self.colormap_coh = kwargs.get('colormap_coh', 'jet')
        self.colormap_phase = kwargs.get('colormap_phase', 'RdBu_r')
        self.colormaps = kwargs.get('colormaps', None)
        self.bgcolor = kwargs.get('bgcolor', self.bgcolor)
        self.showprofile = kwargs.get('showprofile', False)
        self.title = kwargs.get('wintitle', self.CODE.upper())
        self.cb_label = kwargs.get('cb_label', None)
        self.cb_labels = kwargs.get('cb_labels', None)
        self.labels = kwargs.get('labels', None)
        self.xaxis = kwargs.get('xaxis', 'frequency')
        self.zmin = kwargs.get('zmin', None)
        self.zmax = kwargs.get('zmax', None)
        self.zlimits = kwargs.get('zlimits', None)
        self.xmin = kwargs.get('xmin', None)
        self.xmax = kwargs.get('xmax', None)
        self.xrange = kwargs.get('xrange', 12)
        self.xscale = kwargs.get('xscale', None)
        self.ymin = kwargs.get('ymin', None)
        self.ymax = kwargs.get('ymax', None)
        self.yscale = kwargs.get('yscale', None)
        self.xlabel = kwargs.get('xlabel', None)
        self.decimation = kwargs.get('decimation', None)
        self.showSNR = kwargs.get('showSNR', False)
        self.oneFigure = kwargs.get('oneFigure', True)
        self.width = kwargs.get('width', None)
        self.height = kwargs.get('height', None)
        self.colorbar = kwargs.get('colorbar', True)
        self.factors = kwargs.get('factors', [1, 1, 1, 1, 1, 1, 1, 1])
        self.channels = kwargs.get('channels', None)
        self.titles = kwargs.get('titles', [])
        self.polar = False
        self.grid = kwargs.get('grid', False)
        self.pause = kwargs.get('pause', False)
        self.save_labels = kwargs.get('save_labels', None)
        self.realtime = kwargs.get('realtime', True)
        self.buffering = kwargs.get('buffering', True)
        self.throttle = kwargs.get('throttle', 2)
        self.exp_code = kwargs.get('exp_code', None)
        self.__throttle_plot = apply_throttle(self.throttle)
        self.data = PlotterData(
            self.CODE, self.throttle, self.exp_code, self.buffering)

    def __setup_plot(self):
        '''
        Common setup for all figures, here figures and axes are created
        '''

        self.setup()

        self.time_label = 'LT' if self.localtime else 'UTC'
        if self.data.localtime:
            self.getDateTime = datetime.datetime.fromtimestamp
        else:
            self.getDateTime = datetime.datetime.utcfromtimestamp

        if self.width is None:
            self.width = 8

        self.figures = []
        self.axes = []
        self.cb_axes = []
        self.pf_axes = []
        self.cmaps = []

        size = '15%' if self.ncols == 1 else '30%'
        pad = '4%' if self.ncols == 1 else '8%'

        if self.oneFigure:
            if self.height is None:
                self.height = 1.4 * self.nrows + 1
            fig = plt.figure(figsize=(self.width, self.height),
                             edgecolor='k',
                             facecolor='w')
            self.figures.append(fig)
            for n in range(self.nplots):
                ax = fig.add_subplot(self.nrows, self.ncols,
                                     n + 1, polar=self.polar)
                ax.tick_params(labelsize=8)
                ax.firsttime = True
                ax.index = 0
                ax.press = None
                self.axes.append(ax)
                if self.showprofile:
                    cax = self.__add_axes(ax, size=size, pad=pad)
                    cax.tick_params(labelsize=8)
                    self.pf_axes.append(cax)
        else:
            if self.height is None:
                self.height = 3
            for n in range(self.nplots):
                fig = plt.figure(figsize=(self.width, self.height),
                                 edgecolor='k',
                                 facecolor='w')
                ax = fig.add_subplot(1, 1, 1, polar=self.polar)
                ax.tick_params(labelsize=8)
                ax.firsttime = True
                ax.index = 0
                ax.press = None
                self.figures.append(fig)
                self.axes.append(ax)
                if self.showprofile:
                    cax = self.__add_axes(ax, size=size, pad=pad)
                    cax.tick_params(labelsize=8)
                    self.pf_axes.append(cax)

        for n in range(self.nrows):
            if self.colormaps is not None:
                cmap = plt.get_cmap(self.colormaps[n])
            else:
                cmap = plt.get_cmap(self.colormap)
            cmap.set_bad(self.bgcolor, 1.)
            self.cmaps.append(cmap)

        for fig in self.figures:
            fig.canvas.mpl_connect('key_press_event', self.OnKeyPress)
            fig.canvas.mpl_connect('scroll_event', self.OnBtnScroll)
            fig.canvas.mpl_connect('button_press_event', self.onBtnPress)
            fig.canvas.mpl_connect('motion_notify_event', self.onMotion)
            fig.canvas.mpl_connect('button_release_event', self.onBtnRelease)
            if self.show:
                fig.show()

    def OnKeyPress(self, event):
        '''
        Event for pressing keys (up, down) change colormap
        '''
        ax = event.inaxes
        if ax in self.axes:
            if event.key == 'down':
                ax.index += 1
            elif event.key == 'up':
                ax.index -= 1
            # Wrap around the CMAPS list in both directions
            if ax.index < 0:
                ax.index = len(CMAPS) - 1
            elif ax.index == len(CMAPS):
                ax.index = 0
            cmap = CMAPS[ax.index]
            ax.cbar.set_cmap(cmap)
            ax.cbar.draw_all()
            ax.plt.set_cmap(cmap)
            ax.cbar.patch.figure.canvas.draw()
            self.colormap = cmap.name

    def OnBtnScroll(self, event):
        '''
        Event for scrolling, scale figure
        '''
        cb_ax = event.inaxes
        if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
            ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
            pt = ax.cbar.ax.bbox.get_points()[:, 1]
            nrm = ax.cbar.norm
            vmin, vmax, p0, p1, pS = (
                nrm.vmin, nrm.vmax, pt[0], pt[1], event.y)
            scale = 2 if event.step == 1 else 0.5
            # Zoom the color scale around the value under the cursor
            point = vmin + (vmax - vmin) / (p1 - p0) * (pS - p0)
            ax.cbar.norm.vmin = point - scale * (point - vmin)
            ax.cbar.norm.vmax = point - scale * (point - vmax)
            ax.plt.set_norm(ax.cbar.norm)
            ax.cbar.draw_all()
            ax.cbar.patch.figure.canvas.draw()

    def onBtnPress(self, event):
        '''
        Event for mouse button press
        '''
        cb_ax = event.inaxes
        if cb_ax is None:
            return

        if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
            cb_ax.press = event.x, event.y
        else:
            cb_ax.press = None

    def onMotion(self, event):
        '''
        Event for move inside colorbar
        '''
        cb_ax = event.inaxes
        if cb_ax is None:
            return
        if cb_ax not in [ax.cbar.ax for ax in self.axes if ax.cbar]:
            return
        if cb_ax.press is None:
            return

        ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
        xprev, yprev = cb_ax.press
        dx = event.x - xprev
        dy = event.y - yprev
        cb_ax.press = event.x, event.y
        scale = ax.cbar.norm.vmax - ax.cbar.norm.vmin
        perc = 0.03

        # Left button drags the scale, right button stretches it
        if event.button == 1:
            ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
            ax.cbar.norm.vmax -= (perc * scale) * numpy.sign(dy)
        elif event.button == 3:
            ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
            ax.cbar.norm.vmax += (perc * scale) * numpy.sign(dy)

        ax.cbar.draw_all()
        ax.plt.set_norm(ax.cbar.norm)
        ax.cbar.patch.figure.canvas.draw()

    def onBtnRelease(self, event):
        '''
        Event for mouse button release
        '''
        cb_ax = event.inaxes
        if cb_ax is not None:
            cb_ax.press = None

    def __add_axes(self, ax, size='30%', pad='8%'):
        '''
        Add new axes to the given figure
        '''
        divider = make_axes_locatable(ax)
        nax = divider.new_horizontal(size=size, pad=pad)
        ax.figure.add_axes(nax)
        return nax

    def setup(self):
        '''
        This method should be implemented in the child class, the following
        attributes should be set:

        self.nrows: number of rows
        self.ncols: number of cols
        self.nplots: number of plots (channels or pairs)
        self.ylabel: label for Y axes
        self.titles: list of axes title

        '''
        raise NotImplementedError

    def fill_gaps(self, x_buffer, y_buffer, z_buffer):
        '''
        Create a masked array for missing data
        '''
        if x_buffer.shape[0] < 2:
            return x_buffer, y_buffer, z_buffer

        deltas = x_buffer[1:] - x_buffer[0:-1]
        x_median = numpy.median(deltas)

        # Gaps are x-steps much larger than the typical sampling interval
        index = numpy.where(deltas > 5 * x_median)

        if len(index[0]) != 0:
            z_buffer[::, index[0], ::] = self.__missing
            z_buffer = numpy.ma.masked_inside(z_buffer,
                                              0.99 * self.__missing,
                                              1.01 * self.__missing)

        return x_buffer, y_buffer, z_buffer

    def decimate(self):
        '''
        Subsample the y axis (and z accordingly) by self.decimation.
        '''

        # dx = int(len(self.x)/self.__MAXNUMX) + 1
        dy = int(len(self.y) / self.decimation) + 1

        # x = self.x[::dx]
        x = self.x
        y = self.y[::dy]
        z = self.z[::, ::, ::dy]

        return x, y, z

    def format(self):
        '''
        Set min and max values, labels, ticks and titles
        '''

        if self.xmin is None:
            xmin = self.data.min_time
        else:
            # fixed: compared strings with 'is' (identity), not '=='
            if self.xaxis == 'time':
                dt = self.getDateTime(self.data.min_time)
                xmin = (dt.replace(hour=int(self.xmin), minute=0, second=0) -
                        datetime.datetime(1970, 1, 1)).total_seconds()
                if self.data.localtime:
                    xmin += time.timezone
            else:
                xmin = self.xmin

        if self.xmax is None:
            xmax = xmin + self.xrange * 60 * 60
        else:
            if self.xaxis == 'time':
                dt = self.getDateTime(self.data.max_time)
                xmax = (dt.replace(hour=int(self.xmax), minute=59, second=59) -
                        datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=1)).total_seconds()
                if self.data.localtime:
                    xmax += time.timezone
            else:
                xmax = self.xmax

        # fixed: 'if self.ymin' ignored an explicit 0; use 'is not None'
        # for consistency with the xmin/xmax handling above
        ymin = self.ymin if self.ymin is not None else numpy.nanmin(self.y)
        ymax = self.ymax if self.ymax is not None else numpy.nanmax(self.y)

        Y = numpy.array([1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000])
        i = 1 if numpy.where(
            abs(ymax-ymin) <= Y)[0][0] < 0 else numpy.where(abs(ymax-ymin) <= Y)[0][0]
        ystep = Y[i] / 10.

        if self.xaxis != 'time':
            X = numpy.array([1, 2, 5, 10, 20, 50, 100,
                             200, 500, 1000, 2000, 5000])/2.
            i = 1 if numpy.where(
                abs(xmax-xmin) <= X)[0][0] < 0 else numpy.where(abs(xmax-xmin) <= X)[0][0]
            xstep = X[i] / 10.

        for n, ax in enumerate(self.axes):
            if ax.firsttime:
                ax.set_facecolor(self.bgcolor)
                ax.yaxis.set_major_locator(MultipleLocator(ystep))
                if self.xscale:
                    ax.xaxis.set_major_formatter(FuncFormatter(
                        lambda x, pos: '{0:g}'.format(x*self.xscale)))
                # fixed: this guard read 'if self.xscale' (copy-paste bug);
                # the formatter uses self.yscale and raised TypeError when
                # xscale was set but yscale was None
                if self.yscale:
                    ax.yaxis.set_major_formatter(FuncFormatter(
                        lambda x, pos: '{0:g}'.format(x*self.yscale)))
                if self.xaxis == 'time':
                    ax.xaxis.set_major_formatter(FuncFormatter(self.__fmtTime))
                    ax.xaxis.set_major_locator(LinearLocator(9))
                else:
                    ax.xaxis.set_major_locator(MultipleLocator(xstep))
                if self.xlabel is not None:
                    ax.set_xlabel(self.xlabel)
                ax.set_ylabel(self.ylabel)
                ax.firsttime = False
                if self.showprofile:
                    self.pf_axes[n].set_ylim(ymin, ymax)
                    self.pf_axes[n].set_xlim(self.zmin, self.zmax)
                    self.pf_axes[n].set_xlabel('dB')
                    self.pf_axes[n].grid(b=True, axis='x')
                    [tick.set_visible(False)
                     for tick in self.pf_axes[n].get_yticklabels()]
                if self.colorbar:
                    ax.cbar = plt.colorbar(
                        ax.plt, ax=ax, fraction=0.05, pad=0.02, aspect=10)
                    ax.cbar.ax.tick_params(labelsize=8)
                    ax.cbar.ax.press = None
                    if self.cb_label:
                        ax.cbar.set_label(self.cb_label, size=8)
                    elif self.cb_labels:
                        ax.cbar.set_label(self.cb_labels[n], size=8)
                else:
                    ax.cbar = None
                if self.grid:
                    ax.grid(True)

            if not self.polar:
                ax.set_xlim(xmin, xmax)
                ax.set_ylim(ymin, ymax)
                ax.set_title('{} {} {}'.format(
                    self.titles[n],
                    self.getDateTime(self.data.max_time).strftime(
                        '%Y-%m-%dT%H:%M:%S'),
                    self.time_label),
                    size=8)
            else:
                ax.set_title('{}'.format(self.titles[n]), size=8)
                ax.set_ylim(0, 90)
                ax.set_yticks(numpy.arange(0, 90, 20))
                ax.yaxis.labelpad = 40

    def clear_figures(self):
        '''
        Reset axes for redraw plots
        '''

        for ax in self.axes:
            ax.clear()
            ax.firsttime = True
            if ax.cbar:
                ax.cbar.remove()

    def __plot(self):
        '''
        Main function to plot, format and save figures
        '''

        #try:
        self.plot()
        self.format()
        #except Exception as e:
        #    log.warning('{} Plot could not be updated... check data'.format(
        #        self.CODE), self.name)
        #    log.error(str(e), '')
        #    return

        for n, fig in enumerate(self.figures):
            if self.nrows == 0 or self.nplots == 0:
                log.warning('No data', self.name)
                fig.text(0.5, 0.5, 'No Data', fontsize='large', ha='center')
                fig.canvas.manager.set_window_title(self.CODE)
                continue

            fig.tight_layout()
            fig.canvas.manager.set_window_title('{} - {}'.format(self.title,
                                                                 self.getDateTime(self.data.max_time).strftime('%Y/%m/%d')))
            fig.canvas.draw()

            if self.save:

                if self.save_labels:
                    labels = self.save_labels
                else:
                    labels = list(range(self.nrows))

                if self.oneFigure:
                    label = ''
                else:
                    label = '-{}'.format(labels[n])
                figname = os.path.join(
                    self.save,
                    self.CODE,
                    '{}{}_{}.png'.format(
                        self.CODE,
                        label,
                        self.getDateTime(self.data.max_time).strftime(
                            '%Y%m%d_%H%M%S'),
                    )
                )
                log.log('Saving figure: {}'.format(figname), self.name)
                if not os.path.isdir(os.path.dirname(figname)):
                    os.makedirs(os.path.dirname(figname))
                fig.savefig(figname)

    def plot(self):
        '''
        Must be defined in the child class
        '''
        raise NotImplementedError

    def run(self, dataOut, **kwargs):
        '''
        Accumulate incoming data and (re)draw the figures, either on every
        block (realtime) or throttled; `coerce` forces a throttled draw
        through when the block carries an error.
        '''

        if dataOut.error:
            coerce = True
        else:
            coerce = False

        if self.isConfig is False:
            self.__setup(**kwargs)
            self.data.setup()
            self.isConfig = True

        if dataOut.type == 'Parameters':
            tm = dataOut.utctimeInit
        else:
            tm = dataOut.utctime

        # Normalize the timestamp to the configured localtime/UTC convention
        if dataOut.useLocalTime:
            if not self.localtime:
                tm += time.timezone
        else:
            if self.localtime:
                tm -= time.timezone

        # When the buffered time span exceeds xrange hours, draw and reset
        if self.data and (tm - self.data.min_time) >= self.xrange*60*60:
            self.__plot()
            self.data.setup()
            self.clear_figures()

        self.data.update(dataOut, tm)

        if self.isPlotConfig is False:
            self.__setup_plot()
            self.isPlotConfig = True

        if self.realtime:
            self.__plot()
        else:
            self.__throttle_plot(self.__plot, coerce=coerce)

        # Let the GUI event loop breathe
        figpause(0.001)

    def close(self):

        # On shutdown, keep the last figures on screen for a while
        if self.data and self.pause:
            figpause(10)
803 800
@@ -1,368 +1,368
1 1 '''
2 2 Created on Nov 9, 2016
3 3
4 4 @author: roj- LouVD
5 5 '''
6 6
7 7
8 8 import os
9 9 import sys
10 10 import time
11 11 import glob
12 12 import datetime
13 13
14 14 import numpy
15 15
16 16 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator
17 17 from schainpy.model.data.jrodata import Parameters
18 18 from schainpy.model.io.jroIO_base import JRODataReader, isNumber
19 19 from schainpy.utils import log
20 20
# On-disk layout of the .sswma file header (little-endian).
FILE_HEADER_STRUCTURE = numpy.dtype([
    ('FMN', '<u4'),
    ('nrec', '<u4'),        # number of records in the file
    ('fr_offset', '<u4'),
    ('id', '<u4'),
    ('site', 'u1', (32,))
])

# Fixed part of each record header; per-channel arrays are appended at
# read time (see BLTRParamReader.readHeader).
REC_HEADER_STRUCTURE = numpy.dtype([
    ('rmn', '<u4'),
    ('rcounter', '<u4'),
    ('nr_offset', '<u4'),
    ('tr_offset', '<u4'),
    ('time', '<u4'),        # record timestamp (seconds, UTC epoch)
    ('time_msec', '<u4'),
    ('tag', 'u1', (32,)),
    ('comments', 'u1', (32,)),
    ('lat', '<f4'),
    ('lon', '<f4'),
    ('gps_status', '<u4'),
    ('freq', '<u4'),
    ('freq0', '<u4'),
    ('nchan', '<u4'),
    ('delta_r', '<u4'),
    ('nranges', '<u4'),
    ('r0', '<u4'),
    ('prf', '<u4'),
    ('ncoh', '<u4'),
    ('npoints', '<u4'),
    ('polarization', '<i4'),
    ('rx_filter', '<u4'),
    ('nmodes', '<u4'),
    ('dmode_index', '<u4'),
    ('dmode_rngcorr', '<u4'),
    ('nrxs', '<u4'),
    ('acf_length', '<u4'),
    ('acf_lags', '<u4'),
    ('sea_to_atmos', '<f4'),
    ('sea_notch', '<u4'),
    ('lh_sea', '<u4'),
    ('hh_sea', '<u4'),
    ('nbins_sea', '<u4'),
    ('min_snr', '<f4'),
    ('min_cc', '<f4'),
    ('max_time_diff', '<f4')
])

# Per-range data record; per-channel arrays are appended at read time
# (see BLTRParamReader.readData).
DATA_STRUCTURE = numpy.dtype([
    ('range', '<u4'),
    ('status', '<u4'),
    ('zonal', '<f4'),
    ('meridional', '<f4'),
    ('vertical', '<f4'),
    ('zonal_a', '<f4'),
    ('meridional_a', '<f4'),
    ('corrected_fading', '<f4'),  # seconds
    ('uncorrected_fading', '<f4'),  # seconds
    ('time_diff', '<f4'),
    ('major_axis', '<f4'),
    ('axial_ratio', '<f4'),
    ('orientation', '<f4'),
    ('sea_power', '<u4'),
    ('sea_algorithm', '<u4')
])
85 85
@MPDecorator
class BLTRParamReader(JRODataReader, ProcessingUnit):
    '''
    Boundary Layer and Tropospheric Radar (BLTR) reader, Wind velocities and SNR from *.sswma files
    '''

    ext = '.sswma'

    def __init__(self):
        # ProcessingUnit wiring plus per-file reading state.
        ProcessingUnit.__init__(self)

        self.dataOut = Parameters()
        self.counter_records = 0      # records consumed from the current file
        self.flagNoMoreFiles = 0
        self.isConfig = False
        self.filename = None

    def setup(self,
              path=None,
              startDate=None,
              endDate=None,
              ext=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              timezone=0,
              status_value=0,
              **kwargs):
        '''
        Configure the reader: collect the *.sswma files inside
        [startDate, endDate] under `path` and open the first one.

        Raises:
            ValueError -- when `path` is not given.
            Warning    -- when no file matches the requested dates.
        '''
        self.path = path
        self.startDate = startDate
        self.endDate = endDate
        self.startTime = startTime
        self.endTime = endTime
        self.status_value = status_value
        # Sentinel "epoch" so the first record always compares as newer.
        self.datatime = datetime.datetime(1900, 1, 1)

        if self.path is None:
            raise ValueError("The path is not valid")

        if ext is None:
            ext = self.ext

        self.search_files(self.path, startDate, endDate, ext)
        self.timezone = timezone
        self.fileIndex = 0

        if not self.fileList:
            raise Warning("There is no files matching these date in the folder: %s. \n Check 'startDate' and 'endDate' " % (
                path))

        self.setNextFile()

    def search_files(self, path, startDate, endDate, ext):
        '''
        Searching for BLTR rawdata file in path
        Creating a list of file to proces included in [startDate,endDate]

        Input:
            path - Path to find BLTR rawdata files
            startDate - Select file from this date
            enDate - Select file until this date
            ext - Extension of the file to read
        '''

        log.success('Searching files in {} '.format(path), 'BLTRParamReader')
        foldercounter = 0
        fileList0 = glob.glob1(path, "*%s" % ext)
        fileList0.sort()

        self.fileList = []
        self.dateFileList = []

        # The date (YYYYMMDD) is parsed from fixed positions counted from
        # the end of the file name.
        for thisFile in fileList0:
            year = thisFile[-14:-10]
            if not isNumber(year):
                continue

            month = thisFile[-10:-8]
            if not isNumber(month):
                continue

            day = thisFile[-8:-6]
            if not isNumber(day):
                continue

            year, month, day = int(year), int(month), int(day)
            dateFile = datetime.date(year, month, day)

            if (startDate > dateFile) or (endDate < dateFile):
                continue

            self.fileList.append(thisFile)
            self.dateFileList.append(dateFile)

        return

    def setNextFile(self):
        '''
        Open the next file in the list.

        Return 1 on success, 0 when the list is exhausted (in that case
        flagNoMoreFiles is set).
        '''
        file_id = self.fileIndex

        if file_id == len(self.fileList):
            self.flagNoMoreFiles = 1
            return 0

        log.success('Opening {}'.format(self.fileList[file_id]), 'BLTRParamReader')
        filename = os.path.join(self.path, self.fileList[file_id])

        dirname, name = os.path.split(filename)
        # 'peru2' ---> Piura - 'peru1' ---> Huancayo or Porcuya
        self.siteFile = name.split('.')[0]
        if self.filename is not None:
            # Close the previously open file before switching.
            self.fp.close()
        self.filename = filename
        self.fp = open(self.filename, 'rb')
        self.header_file = numpy.fromfile(self.fp, FILE_HEADER_STRUCTURE, 1)
        self.nrecords = self.header_file['nrec'][0]
        self.sizeOfFile = os.path.getsize(self.filename)
        self.counter_records = 0
        self.flagIsNewFile = 0
        self.fileIndex += 1

        return 1

    def readNextBlock(self):
        '''
        Read records until one falls inside the requested time window.

        Return 1 when a record was accepted, 0 when no more files remain.
        '''
        while True:
            if self.counter_records == self.nrecords:
                self.flagIsNewFile = 1
                if not self.setNextFile():
                    return 0

            self.readBlock()

            # Skip records outside [startDate+startTime, endDate+endTime].
            if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
               (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
                log.warning(
                    'Reading Record No. {}/{} -> {} [Skipping]'.format(
                        self.counter_records,
                        self.nrecords,
                        self.datatime.ctime()),
                    'BLTRParamReader')
                continue
            break

        log.log('Reading Record No. {}/{} -> {}'.format(
            self.counter_records,
            self.nrecords,
            self.datatime.ctime()), 'BLTRParamReader')

        return 1

    def readBlock(self):
        '''
        Read one multi-mode record: peek at the record header for the
        dimensions, then read height/winds/SNR for every mode.
        '''
        pointer = self.fp.tell()
        header_rec = numpy.fromfile(self.fp, REC_HEADER_STRUCTURE, 1)
        self.nchannels = int(header_rec['nchan'][0] / 2)
        self.kchan = header_rec['nrxs'][0]
        self.nmodes = header_rec['nmodes'][0]
        self.nranges = header_rec['nranges'][0]
        # Rewind: readHeader() re-reads the header with the full dtype
        # (fixed part + per-channel arrays).
        self.fp.seek(pointer)
        self.height = numpy.empty((self.nmodes, self.nranges))
        self.snr = numpy.empty((self.nmodes, int(self.nchannels), self.nranges))
        self.buffer = numpy.empty((self.nmodes, 3, self.nranges))
        self.flagDiscontinuousBlock = 0

        for mode in range(self.nmodes):
            self.readHeader()
            data = self.readData()
            # Range correction, then metres -> kilometres.
            self.height[mode] = (data[0] - self.correction) / 1000.
            self.buffer[mode] = data[1]
            self.snr[mode] = data[2]

        self.counter_records = self.counter_records + self.nmodes

        return

    def readHeader(self):
        '''
        RecordHeader of BLTR rawdata file
        '''
        # Full record header: fixed structure plus per-channel arrays.
        header_structure = numpy.dtype(
            REC_HEADER_STRUCTURE.descr + [
                ('antenna_coord', 'f4', (2, int(self.nchannels))),
                ('rx_gains', 'u4', (int(self.nchannels),)),
                ('rx_analysis', 'u4', (int(self.nchannels),))
            ]
        )

        self.header_rec = numpy.fromfile(self.fp, header_structure, 1)
        self.lat = self.header_rec['lat'][0]
        self.lon = self.header_rec['lon'][0]
        self.delta = self.header_rec['delta_r'][0]
        self.correction = self.header_rec['dmode_rngcorr'][0]
        self.imode = self.header_rec['dmode_index'][0]
        self.antenna = self.header_rec['antenna_coord']
        self.rx_gains = self.header_rec['rx_gains']
        self.time = self.header_rec['time'][0]
        dt = datetime.datetime.utcfromtimestamp(self.time)
        # A date change between consecutive records marks a discontinuity.
        if dt.date() > self.datatime.date():
            self.flagDiscontinuousBlock = 1
        self.datatime = dt

    def readData(self):
        '''
        Reading and filtering data block record of BLTR rawdata file, filtering is according to status_value.

        Input:
            status_value - Array data is set to NAN for values that are not equal to status_value

        '''
        self.nchannels = int(self.nchannels)

        data_structure = numpy.dtype(
            DATA_STRUCTURE.descr + [
                ('rx_saturation', 'u4', (self.nchannels,)),
                ('chan_offset', 'u4', (2 * self.nchannels,)),
                ('rx_amp', 'u4', (self.nchannels,)),
                ('rx_snr', 'f4', (self.nchannels,)),
                ('cross_snr', 'f4', (self.kchan,)),
                ('sea_power_relative', 'f4', (self.kchan,))]
        )

        data = numpy.fromfile(self.fp, data_structure, self.nranges)

        height = data['range']
        winds = numpy.array(
            (data['zonal'], data['meridional'], data['vertical']))
        snr = data['rx_snr'].T

        # -9999. is the file's missing-data marker.
        winds[numpy.where(winds == -9999.)] = numpy.nan
        winds[:, numpy.where(data['status'] != self.status_value)] = numpy.nan
        snr[numpy.where(snr == -9999.)] = numpy.nan
        snr[:, numpy.where(data['status'] != self.status_value)] = numpy.nan
        # Convert SNR from dB to linear power.
        snr = numpy.power(10, snr / 10)

        return height, winds, snr

    def set_output(self):
        '''
        Storing data from databuffer to dataOut object
        '''

        self.dataOut.data_SNR = self.snr
        self.dataOut.height = self.height
        self.dataOut.data = self.buffer
        self.dataOut.utctimeInit = self.time
        self.dataOut.utctime = self.dataOut.utctimeInit
        self.dataOut.useLocalTime = False
        # NOTE(review): 157 looks like a magic interval (seconds?) -- confirm.
        self.dataOut.paramInterval = 157
        self.dataOut.timezone = self.timezone
        self.dataOut.site = self.siteFile
        self.dataOut.nrecords = self.nrecords / self.nmodes
        self.dataOut.sizeOfFile = self.sizeOfFile
        self.dataOut.lat = self.lat
        self.dataOut.lon = self.lon
        self.dataOut.channelList = list(range(self.nchannels))
        self.dataOut.kchan = self.kchan
        self.dataOut.delta = self.delta
        self.dataOut.correction = self.correction
        self.dataOut.nmodes = self.nmodes
        self.dataOut.imode = self.imode
        self.dataOut.antenna = self.antenna
        self.dataOut.rx_gains = self.rx_gains
        self.dataOut.flagNoData = False
        self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock

    def getData(self):
        '''
        Storing data from databuffer to dataOut object
        '''
        if self.flagNoMoreFiles:
            self.dataOut.flagNoData = True
            self.dataOut.error = 'No More files to read'
            # NOTE(review): execution falls through to readNextBlock() here;
            # a `return 0` after setting the error may be intended -- confirm
            # against MPDecorator's end-of-process handling.

        if not self.readNextBlock():
            self.dataOut.flagNoData = True
            return 0

        self.set_output()

        return 1
368 368 No newline at end of file
@@ -1,1828 +1,1828
1 1 '''
2 2 Created on Jul 2, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6 import os
7 7 import sys
8 8 import glob
9 9 import time
10 10 import numpy
11 11 import fnmatch
12 12 import inspect
13 13 import time
14 14 import datetime
15 15 import traceback
16 16 import zmq
17 17
18 18 try:
19 19 from gevent import sleep
20 20 except:
21 21 from time import sleep
22 22
23 23 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
24 24 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
25 25 from schainpy.utils import log
26 26 import schainpy.admin
27 27
28 28 LOCALTIME = True
29 29
30 30
def isNumber(cad):
    """
    Check whether a value can be converted to a float.

    Input:
        cad -- value to test for numeric convertibility (usually a string)

    Return:
        True  -- float(cad) succeeds
        False -- the conversion raises (non-numeric string, None, etc.)
    """
    try:
        float(cad)
    except:
        return False
    return True
49 49
50 50
def isFileInEpoch(filename, startUTSeconds, endUTSeconds):
    """
    Check whether a Jicamarca data file (.r) holds data inside the given
    epoch range.

    Inputs:
        filename       -- full path of the data file
        startUTSeconds -- range start, seconds since 1970-01-01
        endUTSeconds   -- range end, seconds since 1970-01-01

    Return:
        1 when the file's basic-header timestamp lies in
        [startUTSeconds, endUTSeconds); 0 when it does not, when the file
        cannot be opened, or when its header is invalid.
    """
    basicHeaderObj = BasicHeader(LOCALTIME)

    try:
        fp = open(filename, 'rb')
    except IOError:
        print("The file %s can't be opened" % (filename))
        return 0

    sts = basicHeaderObj.read(fp)
    fp.close()

    if not sts:
        print("Skipping the file %s because it has not a valid header" % (filename))
        return 0

    # Inclusive at the start of the range, exclusive at the end.
    if startUTSeconds <= basicHeaderObj.utc < endUTSeconds:
        return 1
    return 0
91 91
92 92
def isTimeInRange(thisTime, startTime, endTime):
    """
    Return 1 when thisTime falls in [startTime, endTime], else 0.

    When endTime < startTime the window is taken to wrap past midnight,
    so the excluded region is the open interval (endTime, startTime).
    """
    if endTime >= startTime:
        # Plain (non-wrapping) window.
        return 1 if startTime <= thisTime <= endTime else 0
    # Wrapping window: outside only when strictly between the two bounds.
    return 0 if endTime < thisTime < startTime else 1
102 102
103 103
def isFileInTimeRange(filename, startDate, endDate, startTime, endTime):
    """
    Return the datetime of the file's first block when the Jicamarca data
    file holds data inside the requested time window, otherwise None.

    The first and last basic headers are read; the file qualifies when its
    [first-block, last-block] time span overlaps [startTime, endTime].
    When endTime < startTime the window wraps past midnight and endTime
    belongs to the next day.

    Inputs:
        filename  -- full path of the data file (Jicamarca .r format)
        startDate -- datetime.date, start of the selected range
        endDate   -- datetime.date, end of the selected range
        startTime -- datetime.time, start of the daily window
        endTime   -- datetime.time, end of the daily window

    Return:
        datetime of the first block, or None when the file cannot be
        opened, any header is invalid, or the file is out of range.
    """

    try:
        fp = open(filename, 'rb')
    except IOError:
        print("The file %s can't be opened" % (filename))
        return None

    firstBasicHeaderObj = BasicHeader(LOCALTIME)
    systemHeaderObj = SystemHeader()
    radarControllerHeaderObj = RadarControllerHeader()
    processingHeaderObj = ProcessingHeader()

    lastBasicHeaderObj = BasicHeader(LOCALTIME)

    # Fix: previously the descriptor leaked on every early return below;
    # close it on all paths.
    try:
        if not firstBasicHeaderObj.read(fp):
            print("[Reading] Skipping the file %s because it has not a valid header" % (filename))
            return None

        if not systemHeaderObj.read(fp):
            return None

        if not radarControllerHeaderObj.read(fp):
            return None

        if not processingHeaderObj.read(fp):
            return None

        filesize = os.path.getsize(filename)

        offset = processingHeaderObj.blockSize + 24  # header size

        if filesize <= offset:
            print("[Reading] %s: This file has not enough data" % filename)
            return None

        # Seek to the last block's basic header, counted from the file end.
        fp.seek(-offset, 2)

        # NOTE: the read status of the last header is intentionally not
        # checked, preserving the historical behavior.
        lastBasicHeaderObj.read(fp)
    finally:
        fp.close()

    thisDatetime = lastBasicHeaderObj.datatime
    thisTime_last_block = thisDatetime.time()

    thisDatetime = firstBasicHeaderObj.datatime
    thisDate = thisDatetime.date()
    thisTime_first_block = thisDatetime.time()

    # General case
    #              o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
    # -----------o----------------------------o-----------
    #            startTime                    endTime

    if endTime >= startTime:
        if (thisTime_last_block < startTime) or (thisTime_first_block > endTime):
            return None

        return thisDatetime

    # If endTime < startTime then endTime belongs to the next day

    # <<<<<<<<<<<o                            o>>>>>>>>>>>
    # -----------o----------------------------o-----------
    #            endTime                      startTime

    if (thisDate == startDate) and (thisTime_last_block < startTime):
        return None

    if (thisDate == endDate) and (thisTime_first_block > endTime):
        return None

    if (thisTime_last_block < startTime) and (thisTime_first_block > endTime):
        return None

    return thisDatetime
205 205
206 206
def isFolderInDateRange(folder, startDate=None, endDate=None):
    """
    Return 1 when a radar day folder falls inside [startDate, endDate].

    Inputs:
        folder    -- full folder path; its basename must look like
                     "?YYYYDDD" (YYYY = year, DDD = day of year)
        startDate -- datetime.date range start (optional)
        endDate   -- datetime.date range end (optional)

    Return:
        1 when the folder name is valid and (if both dates are given) its
        date lies inside the range; 0 otherwise.
    """
    basename = os.path.basename(folder)

    if not isRadarFolder(basename):
        print("The folder %s has not the rigth format" % folder)
        return 0

    if startDate and endDate:
        thisDate = getDateFromRadarFolder(basename)
        if not (startDate <= thisDate <= endDate):
            return 0

    return 1
246 246
247 247
def isFileInDateRange(filename, startDate=None, endDate=None):
    """
    Return 1 when a radar data file falls inside [startDate, endDate].

    Inputs:
        filename  -- full file path; its basename must look like
                     "?YYYYDDDsss" (YYYY = year, DDD = day of year,
                     sss = set)
        startDate -- datetime.date range start (optional)
        endDate   -- datetime.date range end (optional)

    Return:
        1 when the file name is valid and (if both dates are given) its
        date lies inside the range; 0 otherwise.
    """
    basename = os.path.basename(filename)

    if not isRadarFile(basename):
        print("The filename %s has not the rigth format" % filename)
        return 0

    if startDate and endDate:
        thisDate = getDateFromRadarFile(basename)
        if not (startDate <= thisDate <= endDate):
            return 0

    return 1
289 289
290 290
def getFileFromSet(path, ext, set):
    """
    Return the file in `path` matching extension `ext` and set number
    `set`; fall back to the newest valid file (case-insensitive sort)
    when the requested set does not exist, or None when the folder holds
    no valid files at all.

    NOTE(review): `year`/`doy` keep the values parsed from the *last*
    iterated file; if no file name parses as numbers, the fnmatch call
    below raises NameError -- confirm callers guarantee non-empty folders.
    NOTE(review): the parameter `set` shadows the builtin of the same name.
    """
    validFilelist = []
    fileList = os.listdir(path)

    # Expected layout: 0 1234 567 89A BCDE
    #                  H YYYY DDD SSS .ext

    for thisFile in fileList:
        try:
            year = int(thisFile[1:5])
            doy = int(thisFile[5:8])
        except:
            continue

        if (os.path.splitext(thisFile)[-1].lower() != ext.lower()):
            continue

        validFilelist.append(thisFile)

    # Exact YYYYDDDSSS match among the candidates.
    myfile = fnmatch.filter(
        validFilelist, '*%4.4d%3.3d%3.3d*' % (year, doy, set))

    if len(myfile) != 0:
        return myfile[0]
    else:
        filename = '*%4.4d%3.3d%3.3d%s' % (year, doy, set, ext.lower())
        print('the filename %s does not exist' % filename)
        print('...going to the last file: ')

        if validFilelist:
            validFilelist = sorted(validFilelist, key=str.lower)
            return validFilelist[-1]

    return None
325 325
326 326
def getlastFileFromPath(path, ext):
    """
    Return the newest file (case-insensitive sort) in `path` whose name
    matches the "PYYYYDDDSSS.ext" layout, or None when no file qualifies.

    Input:
        path -- folder to scan
        ext  -- required file extension
    """
    validFilelist = []

    # Expected layout: 0 1234 567 89A BCDE
    #                  H YYYY DDD SSS .ext
    for entry in os.listdir(path):

        year, doy = entry[1:5], entry[5:8]
        if not (isNumber(year) and isNumber(doy)):
            continue
        # Parsed for parity with the historical behavior; values unused.
        year, doy = int(year), int(doy)

        if os.path.splitext(entry)[-1].lower() != ext.lower():
            continue

        validFilelist.append(entry)

    if validFilelist:
        return sorted(validFilelist, key=str.lower)[-1]

    return None
368 368
369 369
def checkForRealPath(path, foldercounter, year, doy, set, ext):
    """
    Resolve the real (case-sensitive) path of a data file by probing every
    upper/lower-case combination of the day-folder prefix (none, d, D) and
    the file prefix (d/D for .r voltage, p/P for .pdata spectra).

    Example:
        the actual file .../D2009307/P2009307367.ext is found by trying
        .../y2009307367.ext, .../Y2009307367.ext,
        .../x2009307/y2009307367.ext, ... until one exists.

    Return:
        (fullfilename, filename) when a combination exists on disk;
        (None, last-tried-filename) otherwise.
    """
    fullfilename = None
    find_flag = False
    filename = None

    prefixDirList = [None, 'd', 'D']
    if ext.lower() == ".r":  # voltage
        prefixFileList = ['d', 'D']
    elif ext.lower() == ".pdata":  # spectra
        prefixFileList = ['p', 'P']
    else:
        # Unknown extension: nothing to probe.
        return None, filename

    # Probe every prefix combination until one exists on disk.
    for prefixDir in prefixDirList:
        thispath = path
        if prefixDir != None:
            # Day folder named xYYYYDDD (x = d or D), with a _NN suffix
            # when the folder counter is non-zero.
            if foldercounter == 0:
                thispath = os.path.join(path, "%s%04d%03d" %
                                        (prefixDir, year, doy))
            else:
                thispath = os.path.join(path, "%s%04d%03d_%02d" % (
                    prefixDir, year, doy, foldercounter))
        for prefixFile in prefixFileList:
            # File named xYYYYDDDSSS.ext
            filename = "%s%04d%03d%03d%s" % (prefixFile, year, doy, set, ext)
            fullfilename = os.path.join(
                thispath, filename)  # full candidate path

            if os.path.exists(fullfilename):  # check existence
                find_flag = True
                break
        if find_flag:
            break

    if not(find_flag):
        return None, filename

    return fullfilename, filename
432 432
433 433
def isRadarFolder(folder):
    """Return 1 when `folder` is named like xYYYYDDD (numeric year and day of year), else 0."""
    try:
        int(folder[1:5])  # year
        int(folder[5:8])  # day of year
    except:
        return 0

    return 1
442 442
443 443
def isRadarFile(file):
    """Return 1 when `file` is named like xYYYYDDDSSS.* (numeric year, day of year and set), else 0."""
    try:
        int(file[1:5])   # year
        int(file[5:8])   # day of year
        int(file[8:11])  # set
    except:
        return 0

    return 1
453 453
454 454
def getDateFromRadarFile(file):
    """
    Parse a file name shaped like xYYYYDDDSSS.* and return the
    corresponding datetime.date, or None when the name does not match.
    """
    try:
        year = int(file[1:5])
        doy = int(file[5:8])
        int(file[8:11])  # set number must be numeric as well
    except:
        return None

    return datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
465 465
466 466
def getDateFromRadarFolder(folder):
    """
    Parse a folder name shaped like xYYYYDDD and return the corresponding
    datetime.date, or None when the name does not match.
    """
    try:
        year = int(folder[1:5])
        doy = int(folder[5:8])
    except:
        return None

    return datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
476 476
477 477
class JRODataIO:
    """
    Base class holding the state shared by JRO data readers and writers:
    header objects, the open file handle and block/profile bookkeeping.

    Abstract: subclasses must implement __init__ and run.
    """

    # Speed of light (m/s).
    c = 3E8

    isConfig = False

    basicHeaderObj = None

    systemHeaderObj = None

    radarControllerHeaderObj = None

    processingHeaderObj = None

    dtype = None

    # NOTE(review): mutable class-level defaults are shared across
    # instances unless reassigned.
    pathList = []

    filenameList = []

    filename = None

    ext = None

    flagIsNewFile = 1

    flagDiscontinuousBlock = 0

    flagIsNewBlock = 0

    fp = None

    firstHeaderSize = 0

    basicHeaderSize = 24

    versionFile = 1103

    fileSize = None

    # ippSeconds = None

    fileSizeByHeader = None

    fileIndex = None

    profileIndex = None

    blockIndex = None

    nTotalBlocks = None

    # Maximum gap (seconds) before a block is flagged as discontinuous.
    maxTimeStep = 30

    lastUTTime = None

    datablock = None

    dataOut = None

    blocksize = None

    getByBlock = False

    def __init__(self):
        # Abstract: concrete IO classes define their own construction.
        raise NotImplementedError

    def run(self):
        # Abstract: the processing entry point.
        raise NotImplementedError

    def getDtypeWidth(self):
        """Return the sample width in bytes for the current dtype."""
        dtype_index = get_dtype_index(self.dtype)
        dtype_width = get_dtype_width(dtype_index)

        return dtype_width

    def getAllowedArgs(self):
        """Return the argument names accepted by run() (or __attrs__ when declared)."""
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            # inspect.getargspec was deprecated since Python 3.0 and removed
            # in 3.11; getfullargspec is the drop-in replacement (same .args).
            return inspect.getfullargspec(self.run).args
562 562
563 563
class JRODataReader(JRODataIO):
    """
    Base reader: locates JRO data files on disk (offline) or as they are
    being written (online) and iterates over their blocks.
    """

    online = 0            # 1 when watching a folder for newly written files

    realtime = 0

    nReadBlocks = 0

    delay = 10  # number of seconds waiting a new file

    nTries = 3  # quantity tries

    nFiles = 3  # number of files for searching

    path = None

    foldercounter = 0     # _NN suffix of the current day folder (0 = none)

    flagNoMoreFiles = 0

    # NOTE(review): mutable class-level default, shared across instances
    # unless reassigned.
    datetimeList = []

    __isFirstTimeOnline = 1

    __printInfo = True

    profileIndex = None

    nTxs = 1

    txIndex = None

    # Added--------------------

    selBlocksize = None

    selBlocktime = None
601 601
    def __init__(self):
        """
        This class is used to find data files

        Example:
            reader = JRODataReader()
            fileList = reader.findDataFiles()
        """
        # No state is initialized here; concrete readers set up their own.
        pass
612 612
613 613 def createObjByDefault(self):
614 614 """
615 615
616 616 """
617 617 raise NotImplementedError
618 618
619 619 def getBlockDimension(self):
620 620
621 621 raise NotImplementedError
622 622
    def searchFilesOffLine(self,
                           path,
                           startDate=None,
                           endDate=None,
                           startTime=datetime.time(0, 0, 0),
                           endTime=datetime.time(23, 59, 59),
                           set=None,
                           expLabel='',
                           ext='.r',
                           cursor=None,
                           skip=None,
                           walk=True):
        """
        Build self.filenameList / self.datetimeList with the files under
        `path` whose data overlaps [startDate+startTime, endDate+endTime].

        cursor/skip slice the resulting list (used to split work between
        processes). Returns (pathList, filenameList); both empty when
        nothing matches.
        """
        self.filenameList = []
        self.datetimeList = []

        pathList = []

        dateList, pathList = self.findDatafiles(
            path, startDate, endDate, expLabel, ext, walk, include_path=True)

        if dateList == []:
            return [], []

        if len(dateList) > 1:
            print("[Reading] Data found for date range [%s - %s]: total days = %d" % (startDate, endDate, len(dateList)))
        else:
            print("[Reading] Data found for date range [%s - %s]: date = %s" % (startDate, endDate, dateList[0]))

        filenameList = []
        datetimeList = []

        for thisPath in pathList:

            fileList = glob.glob1(thisPath, "*%s" % ext)
            fileList.sort()

            for file in fileList:

                filename = os.path.join(thisPath, file)

                if not isFileInDateRange(filename, startDate, endDate):
                    continue

                # First-block datetime, or None when out of the window.
                thisDatetime = isFileInTimeRange(
                    filename, startDate, endDate, startTime, endTime)

                if not(thisDatetime):
                    continue

                filenameList.append(filename)
                datetimeList.append(thisDatetime)

        # Slice for this process when multiprocess splitting is requested.
        if cursor is not None and skip is not None:
            filenameList = filenameList[cursor * skip:cursor * skip + skip]
            datetimeList = datetimeList[cursor * skip:cursor * skip + skip]

        if not(filenameList):
            print("[Reading] Time range selected invalid [%s - %s]: No *%s files in %s)" % (startTime, endTime, ext, path))
            return [], []

        print("[Reading] %d file(s) was(were) found in time range: %s - %s" % (len(filenameList), startTime, endTime))

        # for i in range(len(filenameList)):
        #     print "[Reading] %s -> [%s]" %(filenameList[i], datetimeList[i].ctime())

        self.filenameList = filenameList
        self.datetimeList = datetimeList

        return pathList, filenameList
693 693
    def __searchFilesOnLine(self, path, expLabel="", ext=None, walk=True, set=None):
        """
        Locate the newest data file in the newest day folder under `path`.

        Input:
            path     -- folder holding the data files
            expLabel -- sub-experiment (subfolder) name
            ext      -- file extension to look for
            walk     -- when True, descend into xYYYYDDD day folders;
                        when False, look directly inside `path`
            set      -- specific set number to pick, or None for the latest

        Return:
            (directory, foldercounter, filename, year, doy, set), or a
            tuple of six Nones when nothing valid is found.
        """
        if not os.path.isdir(path):
            return None, None, None, None, None, None

        dirList = []

        if not walk:
            fullpath = path
            foldercounter = 0
        else:
            # Keep only radar day folders (xYYYYDDD).
            for thisPath in os.listdir(path):
                if not os.path.isdir(os.path.join(path, thisPath)):
                    continue
                if not isRadarFolder(thisPath):
                    continue

                dirList.append(thisPath)

            if not(dirList):
                return None, None, None, None, None, None

            dirList = sorted(dirList, key=str.lower)

            # Latest folder; a _NN suffix carries the folder counter.
            doypath = dirList[-1]
            foldercounter = int(doypath.split('_')[1]) if len(
                doypath.split('_')) > 1 else 0
            fullpath = os.path.join(path, doypath, expLabel)

        print("[Reading] %s folder was found: " % (fullpath))

        if set == None:
            filename = getlastFileFromPath(fullpath, ext)
        else:
            filename = getFileFromSet(fullpath, ext, set)

        if not(filename):
            return None, None, None, None, None, None

        print("[Reading] %s file was found" % (filename))

        if not(self.__verifyFile(os.path.join(fullpath, filename))):
            return None, None, None, None, None, None

        # File name layout: xYYYYDDDSSS.ext
        year = int(filename[1:5])
        doy = int(filename[5:8])
        set = int(filename[8:11])

        return fullpath, foldercounter, filename, year, doy, set
765 765
766 766 def __setNextFileOffline(self):
767 767
768 768 idFile = self.fileIndex
769 769
770 770 while (True):
771 771 idFile += 1
772 772 if not(idFile < len(self.filenameList)):
773 773 self.flagNoMoreFiles = 1
774 774 # print "[Reading] No more Files"
775 775 return 0
776 776
777 777 filename = self.filenameList[idFile]
778 778
779 779 if not(self.__verifyFile(filename)):
780 780 continue
781 781
782 782 fileSize = os.path.getsize(filename)
783 783 fp = open(filename, 'rb')
784 784 break
785 785
786 786 self.flagIsNewFile = 1
787 787 self.fileIndex = idFile
788 788 self.filename = filename
789 789 self.fileSize = fileSize
790 790 self.fp = fp
791 791
792 792 # print "[Reading] Setting the file: %s"%self.filename
793 793
794 794 return 1
795 795
    def __setNextFileOnline(self):
        """
        Look for the next file with enough data to be read inside the current
        folder; if no valid file is found, wait a fixed delay and retry over
        the next candidate files (and, ultimately, the next folder).

        Affected:
            self.flagIsNewFile
            self.filename
            self.fileSize
            self.fp
            self.set
            self.flagNoMoreFiles

        Return:
            0 : if after searching, no valid next file could be found
            1 : if the file was opened successfully and is ready to be read

        Exceptions:
            If a given file cannot be opened
        """
        nFiles = 0
        fileOk_flag = False
        firstTime_flag = True

        self.set += 1

        # set numbers are three digits; past 999 roll over to the next folder
        if self.set > 999:
            self.set = 0
            self.foldercounter += 1

        # look for the first available file
        fullfilename, filename = checkForRealPath(
            self.path, self.foldercounter, self.year, self.doy, self.set, self.ext)
        if fullfilename:
            if self.__verifyFile(fullfilename, False):
                fileOk_flag = True

        # if no file was found, wait and search again
        if not(fileOk_flag):
            # check the next self.nFiles+1 possible files
            for nFiles in range(self.nFiles + 1):

                if firstTime_flag:  # on the first pass retry self.nTries times
                    tries = self.nTries
                else:
                    tries = 1  # afterwards, only try once per candidate

                for nTries in range(tries):
                    if firstTime_flag:
                        print("\t[Reading] Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % (self.delay, filename, nTries + 1))
                        sleep(self.delay)
                    else:
                        print("\t[Reading] Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext))

                    fullfilename, filename = checkForRealPath(
                        self.path, self.foldercounter, self.year, self.doy, self.set, self.ext)
                    if fullfilename:
                        if self.__verifyFile(fullfilename):
                            fileOk_flag = True
                            break

                if fileOk_flag:
                    break

                firstTime_flag = False

                log.warning('Skipping the file {} due to this file doesn\'t exist'.format(filename))
                self.set += 1

                # candidate set exhausted for this folder: move to the next day
                if nFiles == (self.nFiles - 1):
                    self.set = 0
                    self.doy += 1
                    self.foldercounter = 0

        if fileOk_flag:
            self.fileSize = os.path.getsize(fullfilename)
            self.filename = fullfilename
            self.flagIsNewFile = 1
            if self.fp != None:
                self.fp.close()
            self.fp = open(fullfilename, 'rb')
            self.flagNoMoreFiles = 0
        else:
            self.fileSize = 0
            self.filename = None
            self.flagIsNewFile = 0
            self.fp = None
            self.flagNoMoreFiles = 1

        return fileOk_flag
890 890
891 891 def setNextFile(self):
892 892 if self.fp != None:
893 893 self.fp.close()
894 894
895 895 if self.online:
896 896 newFile = self.__setNextFileOnline()
897 897 else:
898 898 newFile = self.__setNextFileOffline()
899 899
900 900 if not(newFile):
901 self.dataOut.error = (-1, 'No more files to read')
901 self.dataOut.error = 'No more files to read'
902 902 return 0
903 903
904 904 if self.verbose:
905 905 print('[Reading] Setting the file: %s' % self.filename)
906 906
907 907 self.__readFirstHeader()
908 908 self.nReadBlocks = 0
909 909 return 1
910 910
    def __waitNewBlock(self):
        """
        Return 1 if a new data block was found, 0 otherwise.

        If the reading mode is offline this always returns 0.
        """
        if not self.online:
            return 0

        # the file already holds every block its header announced
        if (self.nReadBlocks >= self.processingHeaderObj.dataBlocksPerFile):
            return 0

        currentPointer = self.fp.tell()

        neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize

        for nTries in range(self.nTries):

            # reopen so the file size reflects data appended by the
            # acquisition process since the last check
            self.fp.close()
            self.fp = open(self.filename, 'rb')
            self.fp.seek(currentPointer)

            self.fileSize = os.path.getsize(self.filename)
            currentSize = self.fileSize - currentPointer

            if (currentSize >= neededSize):
                self.basicHeaderObj.read(self.fp)
                return 1

            # file reached its header-declared size: nothing more will come
            if self.fileSize == self.fileSizeByHeader:
                return 0

            print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1))
            sleep(self.delay)

        return 0
948 948
949 949 def waitDataBlock(self, pointer_location):
950 950
951 951 currentPointer = pointer_location
952 952
953 953 neededSize = self.processingHeaderObj.blockSize # + self.basicHeaderSize
954 954
955 955 for nTries in range(self.nTries):
956 956 self.fp.close()
957 957 self.fp = open(self.filename, 'rb')
958 958 self.fp.seek(currentPointer)
959 959
960 960 self.fileSize = os.path.getsize(self.filename)
961 961 currentSize = self.fileSize - currentPointer
962 962
963 963 if (currentSize >= neededSize):
964 964 return 1
965 965
966 966 print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1))
967 967 sleep(self.delay)
968 968
969 969 return 0
970 970
    def __jumpToLastBlock(self):
        """Advance the file pointer near the last complete block.

        Only acts the first time a file is processed in online mode
        (guarded by ``__isFirstTimeOnline``); otherwise returns immediately.
        """
        if not(self.__isFirstTimeOnline):
            return

        csize = self.fileSize - self.fp.tell()
        blocksize = self.processingHeaderObj.blockSize

        # skip the first data block
        if csize > self.processingHeaderObj.blockSize:
            self.fp.seek(self.fp.tell() + blocksize)
        else:
            return

        csize = self.fileSize - self.fp.tell()
        neededsize = self.processingHeaderObj.blockSize + self.basicHeaderSize
        # step forward block by block; once the pointer passes the end of
        # the file, step back one block so it sits at the last block start
        while True:

            if self.fp.tell() < self.fileSize:
                self.fp.seek(self.fp.tell() + neededsize)
            else:
                self.fp.seek(self.fp.tell() - neededsize)
                break

        self.flagIsNewFile = 0
        self.__isFirstTimeOnline = 0
1003 1003
    def __setNewBlock(self):
        """Position the reader at the next block's data.

        Reads the next basic header from the current file when a full block
        is available; otherwise waits for one (online) or rolls over to the
        next file. Also maintains ``lastUTTime`` and raises
        ``flagDiscontinuousBlock`` when the inter-block time gap exceeds
        ``maxTimeStep``. Returns 1 on success, 0 when no block is available.
        """
        if self.fp == None:
            return 0

        if self.flagIsNewFile:
            # first header of the file was already read by setNextFile()
            self.lastUTTime = self.basicHeaderObj.utc
            return 1

        if self.realtime:
            # realtime mode never waits on a file: jump straight to the next one
            self.flagDiscontinuousBlock = 1
            if not(self.setNextFile()):
                return 0
            else:
                return 1

        currentSize = self.fileSize - self.fp.tell()
        neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize
        if (currentSize >= neededSize):
            self.basicHeaderObj.read(self.fp)
            self.lastUTTime = self.basicHeaderObj.utc
            return 1

        if self.__waitNewBlock():
            self.lastUTTime = self.basicHeaderObj.utc
            return 1

        if not(self.setNextFile()):
            return 0

        deltaTime = self.basicHeaderObj.utc - self.lastUTTime
        self.lastUTTime = self.basicHeaderObj.utc

        self.flagDiscontinuousBlock = 0

        if deltaTime > self.maxTimeStep:
            self.flagDiscontinuousBlock = 1

        return 1
1049 1049
    def readNextBlock(self):
        """Read the next in-range data block into ``dataOut``.

        Blocks whose timestamp falls outside
        [startDate+startTime, endDate+endTime] are read and discarded.
        Returns 1 on success; 0 when there are no more blocks/files (setting
        ``dataOut.error``) or when the block read itself fails.
        """
        # Skip block out of startTime and endTime
        while True:
            if not(self.__setNewBlock()):
                self.dataOut.error = 'No more files to read'
                return 0

            if not(self.readBlock()):
                return 0

            self.getBasicHeader()
            if (self.dataOut.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or (self.dataOut.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
                print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.nReadBlocks,
                                                                      self.processingHeaderObj.dataBlocksPerFile,
                                                                      self.dataOut.datatime.ctime()))
                continue

            break

        if self.verbose:
            print("[Reading] Block No. %d/%d -> %s" % (self.nReadBlocks,
                                                       self.processingHeaderObj.dataBlocksPerFile,
                                                       self.dataOut.datatime.ctime()))
        return 1
1075 1075
1076 1076 def __readFirstHeader(self):
1077 1077
1078 1078 self.basicHeaderObj.read(self.fp)
1079 1079 self.systemHeaderObj.read(self.fp)
1080 1080 self.radarControllerHeaderObj.read(self.fp)
1081 1081 self.processingHeaderObj.read(self.fp)
1082 1082
1083 1083 self.firstHeaderSize = self.basicHeaderObj.size
1084 1084
1085 1085 datatype = int(numpy.log2((self.processingHeaderObj.processFlags &
1086 1086 PROCFLAG.DATATYPE_MASK)) - numpy.log2(PROCFLAG.DATATYPE_CHAR))
1087 1087 if datatype == 0:
1088 1088 datatype_str = numpy.dtype([('real', '<i1'), ('imag', '<i1')])
1089 1089 elif datatype == 1:
1090 1090 datatype_str = numpy.dtype([('real', '<i2'), ('imag', '<i2')])
1091 1091 elif datatype == 2:
1092 1092 datatype_str = numpy.dtype([('real', '<i4'), ('imag', '<i4')])
1093 1093 elif datatype == 3:
1094 1094 datatype_str = numpy.dtype([('real', '<i8'), ('imag', '<i8')])
1095 1095 elif datatype == 4:
1096 1096 datatype_str = numpy.dtype([('real', '<f4'), ('imag', '<f4')])
1097 1097 elif datatype == 5:
1098 1098 datatype_str = numpy.dtype([('real', '<f8'), ('imag', '<f8')])
1099 1099 else:
1100 1100 raise ValueError('Data type was not defined')
1101 1101
1102 1102 self.dtype = datatype_str
1103 1103 #self.ippSeconds = 2 * 1000 * self.radarControllerHeaderObj.ipp / self.c
1104 1104 self.fileSizeByHeader = self.processingHeaderObj.dataBlocksPerFile * self.processingHeaderObj.blockSize + \
1105 1105 self.firstHeaderSize + self.basicHeaderSize * \
1106 1106 (self.processingHeaderObj.dataBlocksPerFile - 1)
1107 1107 # self.dataOut.channelList = numpy.arange(self.systemHeaderObj.numChannels)
1108 1108 # self.dataOut.channelIndexList = numpy.arange(self.systemHeaderObj.numChannels)
1109 1109 self.getBlockDimension()
1110 1110
1111 1111 def __verifyFile(self, filename, msgFlag=True):
1112 1112
1113 1113 msg = None
1114 1114
1115 1115 try:
1116 1116 fp = open(filename, 'rb')
1117 1117 except IOError:
1118 1118
1119 1119 if msgFlag:
1120 1120 print("[Reading] File %s can't be opened" % (filename))
1121 1121
1122 1122 return False
1123 1123
1124 1124 currentPosition = fp.tell()
1125 1125 neededSize = self.processingHeaderObj.blockSize + self.firstHeaderSize
1126 1126
1127 1127 if neededSize == 0:
1128 1128 basicHeaderObj = BasicHeader(LOCALTIME)
1129 1129 systemHeaderObj = SystemHeader()
1130 1130 radarControllerHeaderObj = RadarControllerHeader()
1131 1131 processingHeaderObj = ProcessingHeader()
1132 1132
1133 1133 if not(basicHeaderObj.read(fp)):
1134 1134 fp.close()
1135 1135 return False
1136 1136
1137 1137 if not(systemHeaderObj.read(fp)):
1138 1138 fp.close()
1139 1139 return False
1140 1140
1141 1141 if not(radarControllerHeaderObj.read(fp)):
1142 1142 fp.close()
1143 1143 return False
1144 1144
1145 1145 if not(processingHeaderObj.read(fp)):
1146 1146 fp.close()
1147 1147 return False
1148 1148
1149 1149 neededSize = processingHeaderObj.blockSize + basicHeaderObj.size
1150 1150 else:
1151 1151 msg = "[Reading] Skipping the file %s due to it hasn't enough data" % filename
1152 1152
1153 1153 fp.close()
1154 1154
1155 1155 fileSize = os.path.getsize(filename)
1156 1156 currentSize = fileSize - currentPosition
1157 1157
1158 1158 if currentSize < neededSize:
1159 1159 if msgFlag and (msg != None):
1160 1160 print(msg)
1161 1161 return False
1162 1162
1163 1163 return True
1164 1164
    def findDatafiles(self, path, startDate=None, endDate=None, expLabel='', ext='.r', walk=True, include_path=False):
        """Return the sorted list of dates for which data files exist.

        Input:
            path         : one or several search roots separated by commas
            startDate    : first date to accept (None = no lower bound)
            endDate      : last date to accept (None = no upper bound)
            expLabel     : sub-experiment subfolder (walk mode only)
            ext          : file extension to look for
            walk         : when True, scan dated subfolders (dYYYYDDD);
                           when False, scan each root directly
            include_path : when True, also return the matching data paths

        Return:
            dateList, or (dateList, pathList) when include_path is True.
        """
        path_empty = True

        dateList = []
        pathList = []

        multi_path = path.split(',')

        if not walk:

            for single_path in multi_path:

                if not os.path.isdir(single_path):
                    continue

                fileList = glob.glob1(single_path, "*" + ext)

                if not fileList:
                    continue

                path_empty = False

                fileList.sort()

                for thisFile in fileList:

                    if not os.path.isfile(os.path.join(single_path, thisFile)):
                        continue

                    if not isRadarFile(thisFile):
                        continue

                    if not isFileInDateRange(thisFile, startDate, endDate):
                        continue

                    thisDate = getDateFromRadarFile(thisFile)

                    # one entry per date, even if several files share it
                    if thisDate in dateList:
                        continue

                    dateList.append(thisDate)
                    pathList.append(single_path)

        else:
            for single_path in multi_path:

                if not os.path.isdir(single_path):
                    continue

                dirList = []

                for thisPath in os.listdir(single_path):

                    if not os.path.isdir(os.path.join(single_path, thisPath)):
                        continue

                    if not isRadarFolder(thisPath):
                        continue

                    if not isFolderInDateRange(thisPath, startDate, endDate):
                        continue

                    dirList.append(thisPath)

                if not dirList:
                    continue

                dirList.sort()

                for thisDir in dirList:

                    datapath = os.path.join(single_path, thisDir, expLabel)
                    fileList = glob.glob1(datapath, "*" + ext)

                    if not fileList:
                        continue

                    path_empty = False

                    thisDate = getDateFromRadarFolder(thisDir)

                    pathList.append(datapath)
                    dateList.append(thisDate)

        dateList.sort()

        if walk:
            pattern_path = os.path.join(multi_path[0], "[dYYYYDDD]", expLabel)
        else:
            pattern_path = multi_path[0]

        if path_empty:
            print("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate))
        else:
            if not dateList:
                print("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path))

        if include_path:
            return dateList, pathList

        return dateList
1267 1267
    def setup(self,
              path=None,
              startDate=None,
              endDate=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              set=None,
              expLabel="",
              ext=None,
              online=False,
              delay=60,
              walk=True,
              getblock=False,
              nTxs=1,
              realtime=False,
              blocksize=None,
              blocktime=None,
              skip=None,
              cursor=None,
              warnings=True,
              verbose=True,
              server=None,
              format=None,
              oneDDict=None,
              twoDDict=None,
              ind2DList=None):
        """Configure the reader and open the first file.

        Three modes:
          * server  : connect a ZMQ PULL socket to *server* and return;
          * online  : poll *path* (retrying up to nTries times) for the
                      newest valid file;
          * offline : build the full file list for the date/time range.

        NOTE(review): on failure this sets ``self.dataOut.error`` (online) or
        clears the file lists (offline) and returns None; callers appear to
        check those attributes rather than a return value — confirm.
        """
        if server is not None:
            if 'tcp://' in server:
                address = server
            else:
                address = 'ipc:///tmp/%s' % server
            self.server = address
            self.context = zmq.Context()
            self.receiver = self.context.socket(zmq.PULL)
            self.receiver.connect(self.server)
            time.sleep(0.5)
            print('[Starting] ReceiverData from {}'.format(self.server))
        else:
            self.server = None
            if path == None:
                raise ValueError("[Reading] The path is not valid")

            if ext == None:
                ext = self.ext

            if online:
                print("[Reading] Searching files in online mode...")

                for nTries in range(self.nTries):
                    fullpath, foldercounter, file, year, doy, set = self.__searchFilesOnLine(
                        path=path, expLabel=expLabel, ext=ext, walk=walk, set=set)

                    if fullpath:
                        break

                    print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries + 1))
                    sleep(self.delay)

                if not(fullpath):
                    self.dataOut.error = 'There isn\'t any valid file in {}'.format(path)
                    return

                self.year = year
                self.doy = doy
                # one behind: the next online search increments it first
                self.set = set - 1
                self.path = path
                self.foldercounter = foldercounter
                last_set = None
            else:
                print("[Reading] Searching files in offline mode ...")
                pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
                                                                 startTime=startTime, endTime=endTime,
                                                                 set=set, expLabel=expLabel, ext=ext,
                                                                 walk=walk, cursor=cursor,
                                                                 skip=skip)

                if not(pathList):
                    self.fileIndex = -1
                    self.pathList = []
                    self.filenameList = []
                    return

                self.fileIndex = -1
                self.pathList = pathList
                self.filenameList = filenameList
                file_name = os.path.basename(filenameList[-1])
                basename, ext = os.path.splitext(file_name)
                last_set = int(basename[-3:])

            self.online = online
            self.realtime = realtime
            self.delay = delay
            ext = ext.lower()
            self.ext = ext
            self.getByBlock = getblock
            self.nTxs = nTxs
            self.startTime = startTime
            self.endTime = endTime
            self.endDate = endDate
            self.startDate = startDate
            # Added-----------------
            self.selBlocksize = blocksize
            self.selBlocktime = blocktime

            # Verbose-----------
            self.verbose = verbose
            self.warnings = warnings

            if not(self.setNextFile()):
                if (startDate != None) and (endDate != None):
                    print("[Reading] No files in range: %s - %s" % (datetime.datetime.combine(startDate, startTime).ctime(), datetime.datetime.combine(endDate, endTime).ctime()))
                elif startDate != None:
                    print("[Reading] No files in range: %s" % (datetime.datetime.combine(startDate, startTime).ctime()))
                else:
                    print("[Reading] No files")

                self.fileIndex = -1
                self.pathList = []
                self.filenameList = []
                return

            if last_set != None:
                self.dataOut.last_block = last_set * \
                    self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock
        return
1395 1395
1396 1396 def getBasicHeader(self):
1397 1397
1398 1398 self.dataOut.utctime = self.basicHeaderObj.utc + self.basicHeaderObj.miliSecond / \
1399 1399 1000. + self.profileIndex * self.radarControllerHeaderObj.ippSeconds
1400 1400
1401 1401 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
1402 1402
1403 1403 self.dataOut.timeZone = self.basicHeaderObj.timeZone
1404 1404
1405 1405 self.dataOut.dstFlag = self.basicHeaderObj.dstFlag
1406 1406
1407 1407 self.dataOut.errorCount = self.basicHeaderObj.errorCount
1408 1408
1409 1409 self.dataOut.useLocalTime = self.basicHeaderObj.useLocalTime
1410 1410
1411 1411 self.dataOut.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
1412 1412
1413 1413 # self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock*self.nTxs
1414 1414
    def getFirstHeader(self):
        # Abstract: subclasses copy the first-header values into dataOut.
        raise NotImplementedError
1418 1418
    def getData(self):
        # Abstract: subclasses deliver the next data unit through dataOut.
        raise NotImplementedError
1422 1422
    def hasNotDataInBuffer(self):
        # Abstract: subclasses report whether the read buffer is exhausted.
        raise NotImplementedError
1426 1426
    def readBlock(self):
        # Abstract: subclasses read one data block from the current file.
        raise NotImplementedError
1430 1430
    def isEndProcess(self):
        # Truthy once every available file has been consumed.
        return self.flagNoMoreFiles
1434 1434
    def printReadBlocks(self):
        # Report how many blocks have been read from the current file.
        print("[Reading] Number of read blocks per file %04d" % self.nReadBlocks)
1438 1438
    def printTotalBlocks(self):
        # Report how many blocks have been read overall.
        print("[Reading] Number of read blocks %04d" % self.nTotalBlocks)
1442 1442
    def printNumberOfBlock(self):
        """No-op placeholder: the per-block progress print below is disabled."""

        # if self.flagIsNewBlock:
        #     print "[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks,
        #                                               self.processingHeaderObj.dataBlocksPerFile,
        #                                               self.dataOut.datatime.ctime())
1450 1450
1451 1451 def printInfo(self):
1452 1452
1453 1453 if self.__printInfo == False:
1454 1454 return
1455 1455
1456 1456 self.basicHeaderObj.printInfo()
1457 1457 self.systemHeaderObj.printInfo()
1458 1458 self.radarControllerHeaderObj.printInfo()
1459 1459 self.processingHeaderObj.printInfo()
1460 1460
1461 1461 self.__printInfo = False
1462 1462
    def run(self,
            path=None,
            startDate=None,
            endDate=None,
            startTime=datetime.time(0, 0, 0),
            endTime=datetime.time(23, 59, 59),
            set=None,
            expLabel="",
            ext=None,
            online=False,
            delay=60,
            walk=True,
            getblock=False,
            nTxs=1,
            realtime=False,
            blocksize=None,
            blocktime=None,
            skip=None,
            cursor=None,
            warnings=True,
            server=None,
            verbose=True,
            format=None,
            oneDDict=None,
            twoDDict=None,
            ind2DList=None, **kwargs):
        """Processing-unit entry point.

        Configures the reader on the first call (forwarding every keyword to
        ``setup``) and then delivers the next data unit: from files via
        ``getData`` or, when *server* is given, from a ZMQ socket via
        ``getFromServer``.
        """
        if not(self.isConfig):
            self.setup(path=path,
                       startDate=startDate,
                       endDate=endDate,
                       startTime=startTime,
                       endTime=endTime,
                       set=set,
                       expLabel=expLabel,
                       ext=ext,
                       online=online,
                       delay=delay,
                       walk=walk,
                       getblock=getblock,
                       nTxs=nTxs,
                       realtime=realtime,
                       blocksize=blocksize,
                       blocktime=blocktime,
                       skip=skip,
                       cursor=cursor,
                       warnings=warnings,
                       server=server,
                       verbose=verbose,
                       format=format,
                       oneDDict=oneDDict,
                       twoDDict=twoDDict,
                       ind2DList=ind2DList)
            self.isConfig = True
        if server is None:
            self.getData()
        else:
            self.getFromServer()
1521 1521
1522 1522
class JRODataWriter(JRODataIO):

    """
    Writer for processed JRO data files (.r or .pdata). Data is always
    written block by block: each file carries a first header followed by
    basic-header + data-block pairs. Subclasses supply the data-type
    specific pieces (writeBlock, setFirstHeader, hasAllDataInBuffer, ...).
    """

    # index of the block being written inside the current file
    blockIndex = 0

    # output directory root
    path = None

    # current file set number (the SSS field of xYYYYDDDSSS.ext)
    setFile = None

    # profiles stored per data block
    profilesPerBlock = None

    # maximum number of data blocks per output file
    blocksPerFile = None

    # total number of blocks written so far
    nWriteBlocks = 0

    # date of the file currently being written (used to reset the set
    # counter when the day changes)
    fileDate = None

    def __init__(self, dataOut=None):
        raise NotImplementedError

    def hasAllDataInBuffer(self):
        # Abstract: True when a full block is buffered and ready to write.
        raise NotImplementedError

    def setBlockDimension(self):
        # Abstract: define the shape of the block buffers.
        raise NotImplementedError

    def writeBlock(self):
        # Abstract: serialize one buffered data block to self.fp.
        raise NotImplementedError

    def putData(self):
        # Abstract: accumulate incoming data and trigger block writes.
        raise NotImplementedError

    def getProcessFlags(self):
        """Build the PROCFLAG bitmask describing the sample dtype and the
        processing that was applied to ``dataOut``."""
        processFlags = 0

        dtype_index = get_dtype_index(self.dtype)
        procflag_dtype = get_procflag_dtype(dtype_index)

        processFlags += procflag_dtype

        if self.dataOut.flagDecodeData:
            processFlags += PROCFLAG.DECODE_DATA

        if self.dataOut.flagDeflipData:
            processFlags += PROCFLAG.DEFLIP_DATA

        if self.dataOut.code is not None:
            processFlags += PROCFLAG.DEFINE_PROCESS_CODE

        if self.dataOut.nCohInt > 1:
            processFlags += PROCFLAG.COHERENT_INTEGRATION

        if self.dataOut.type == "Spectra":
            if self.dataOut.nIncohInt > 1:
                processFlags += PROCFLAG.INCOHERENT_INTEGRATION

            if self.dataOut.data_dc is not None:
                processFlags += PROCFLAG.SAVE_CHANNELS_DC

            if self.dataOut.flagShiftFFT:
                processFlags += PROCFLAG.SHIFT_FFT_DATA

        return processFlags

    def setBasicHeader(self):
        """Fill the basic header from dataOut's current timestamp and
        status fields."""
        self.basicHeaderObj.size = self.basicHeaderSize  # bytes
        self.basicHeaderObj.version = self.versionFile
        self.basicHeaderObj.dataBlock = self.nTotalBlocks

        # split the float timestamp into whole seconds + milliseconds
        utc = numpy.floor(self.dataOut.utctime)
        milisecond = (self.dataOut.utctime - utc) * 1000.0

        self.basicHeaderObj.utc = utc
        self.basicHeaderObj.miliSecond = milisecond
        self.basicHeaderObj.timeZone = self.dataOut.timeZone
        self.basicHeaderObj.dstFlag = self.dataOut.dstFlag
        self.basicHeaderObj.errorCount = self.dataOut.errorCount

    def setFirstHeader(self):
        """
        Obtain a copy of the first header.

        Affected:

            self.basicHeaderObj
            self.systemHeaderObj
            self.radarControllerHeaderObj
            self.processingHeaderObj

        Return:
            None
        """

        raise NotImplementedError

    def __writeFirstHeader(self):
        """
        Write the file's first header: the basic header plus the long header
        (SystemHeader, RadarControllerHeader, ProcessingHeader).

        Affected:
            __dataType

        Return:
            None
        """

        # compute the total first-header size before writing
        sizeLongHeader = self.systemHeaderObj.size + \
            self.radarControllerHeaderObj.size + self.processingHeaderObj.size
        self.basicHeaderObj.size = self.basicHeaderSize + sizeLongHeader

        self.basicHeaderObj.write(self.fp)
        self.systemHeaderObj.write(self.fp)
        self.radarControllerHeaderObj.write(self.fp)
        self.processingHeaderObj.write(self.fp)

    def __setNewBlock(self):
        """
        If the current file is new, the first header was already written;
        otherwise only a basic header is written (or the file is rolled
        over when full).

        Return:
            0 : if nothing could be written
            1 : if the basic or the first header is in place
        """
        if self.fp == None:
            self.setNextFile()

        if self.flagIsNewFile:
            return 1

        if self.blockIndex < self.processingHeaderObj.dataBlocksPerFile:
            self.basicHeaderObj.write(self.fp)
            return 1

        # current file is full: roll over to the next one
        if not(self.setNextFile()):
            return 0

        return 1

    def writeNextBlock(self):
        """
        Prepare the next block slot and write the buffered data block.

        Return:
            0 : if the data block could not be written
            1 : if the data block was written
        """
        if not(self.__setNewBlock()):
            return 0

        self.writeBlock()

        print("[Writing] Block No. %d/%d" % (self.blockIndex,
                                             self.processingHeaderObj.dataBlocksPerFile))

        return 1

    def setNextFile(self):
        """
        Determine and open the next file to be written.

        Affected:
            self.filename
            self.subfolder
            self.fp
            self.setFile
            self.flagIsNewFile

        Return:
            0 : if the file cannot be written
            1 : if the file is ready to be written
        """
        ext = self.ext
        path = self.path

        if self.fp != None:
            self.fp.close()

        timeTuple = time.localtime(self.dataOut.utctime)
        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)

        fullpath = os.path.join(path, subfolder)
        setFile = self.setFile

        if not(os.path.exists(fullpath)):
            os.mkdir(fullpath)
            setFile = -1  # initialize the set counter
        else:
            filesList = os.listdir(fullpath)
            if len(filesList) > 0:
                filesList = sorted(filesList, key=str.lower)
                filen = filesList[-1]
                # the filename must have the following format:
                # 0 1234 567 89A BCDE (hex)
                # x YYYY DDD SSS .ext
                if isNumber(filen[8:11]):
                    # continue from the set number of the last existing file
                    setFile = int(filen[8:11])
                else:
                    setFile = -1
            else:
                setFile = -1  # initialize the set counter

        setFile += 1

        # If this is a new day it resets some values
        if self.dataOut.datatime.date() > self.fileDate:
            setFile = 0
            self.nTotalBlocks = 0

        filen = '{}{:04d}{:03d}{:03d}{}'.format(
            self.optchar, timeTuple.tm_year, timeTuple.tm_yday, setFile, ext)

        filename = os.path.join(path, subfolder, filen)

        fp = open(filename, 'wb')

        self.blockIndex = 0

        # save attributes
        self.filename = filename
        self.subfolder = subfolder
        self.fp = fp
        self.setFile = setFile
        self.flagIsNewFile = 1
        self.fileDate = self.dataOut.datatime.date()

        self.setFirstHeader()

        print('[Writing] Opening file: %s' % self.filename)

        self.__writeFirstHeader()

        return 1

    def setup(self, dataOut, path, blocksPerFile, profilesPerBlock=64, set=None, ext=None, datatype=4):
        """
        Set the output format and write the first header.

        Inputs:
            path : directory where data will be saved
            profilesPerBlock : number of profiles per block
            set : initial file set
            datatype : An integer number that defines data type:
                0 : int8 (1 byte)
                1 : int16 (2 bytes)
                2 : int32 (4 bytes)
                3 : int64 (8 bytes)
                4 : float32 (4 bytes)
                5 : double64 (8 bytes)

        Return:
            0 : if the setup failed
            1 : if the setup succeeded
        """

        if ext == None:
            ext = self.ext

        self.ext = ext.lower()

        self.path = path

        if set is None:
            self.setFile = -1
        else:
            self.setFile = set - 1

        self.blocksPerFile = blocksPerFile

        self.profilesPerBlock = profilesPerBlock

        self.dataOut = dataOut
        self.fileDate = self.dataOut.datatime.date()
        # By default
        self.dtype = self.dataOut.dtype

        if datatype is not None:
            self.dtype = get_numpy_dtype(datatype)

        if not(self.setNextFile()):
            print("[Writing] There isn't a next file")
            return 0

        self.setBlockDimension()

        return 1

    def run(self, dataOut, path, blocksPerFile=100, profilesPerBlock=64, set=None, ext=None, datatype=4, **kwargs):
        """Operation entry point: configure on the first call, then write the
        incoming data and return it unchanged."""
        if not(self.isConfig):

            self.setup(dataOut, path, blocksPerFile, profilesPerBlock=profilesPerBlock,
                       set=set, ext=ext, datatype=datatype, **kwargs)
            self.isConfig = True

        self.dataOut = dataOut
        self.putData()
        return self.dataOut
@@ -1,797 +1,796
1 1
2 2 '''
3 3 Created on Jul 3, 2014
4 4
5 5 @author: roj-idl71
6 6 '''
7 7 # SUBCHANNELS EN VEZ DE CHANNELS
8 8 # BENCHMARKS -> PROBLEMAS CON ARCHIVOS GRANDES -> INCONSTANTE EN EL TIEMPO
9 9 # ACTUALIZACION DE VERSION
10 10 # HEADERS
11 11 # MODULO DE ESCRITURA
12 12 # METADATA
13 13
14 14 import os
15 15 import datetime
16 16 import numpy
17 17 import timeit
18 18 from fractions import Fraction
19 19
20 20 try:
21 21 from gevent import sleep
22 22 except:
23 23 from time import sleep
24 24
25 25 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
26 26 from schainpy.model.data.jrodata import Voltage
27 27 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
28 28 from time import time
29 29
30 30 import pickle
31 31 try:
32 32 import digital_rf
33 33 except:
34 34 print('You should install "digital_rf" module if you want to read Digital RF data')
35 35
36 36 @MPDecorator
37 37 class DigitalRFReader(ProcessingUnit):
38 38 '''
39 39 classdocs
40 40 '''
41 41
42 42 def __init__(self):
43 43 '''
44 44 Constructor
45 45 '''
46 46
47 47 ProcessingUnit.__init__(self)
48 48
49 49 self.dataOut = Voltage()
50 50 self.__printInfo = True
51 51 self.__flagDiscontinuousBlock = False
52 52 self.__bufferIndex = 9999999
53 53 self.__ippKm = None
54 54 self.__codeType = 0
55 55 self.__nCode = None
56 56 self.__nBaud = None
57 57 self.__code = None
58 58 self.dtype = None
59 59 self.oldAverage = None
60 60
61 61 def close(self):
62 62 print('Average of writing to digital rf format is ', self.oldAverage * 1000)
63 63 return
64 64
    def __getCurrentSecond(self):
        # Current read-cursor position converted from sample index to Unix seconds.
        return self.__thisUnixSample / self.__sample_rate

    # Read-only accessor so callers can use `self.thisSecond`.
    thisSecond = property(__getCurrentSecond, "I'm the 'thisSecond' property.")
70 70
71 71 def __setFileHeader(self):
72 72 '''
73 73 In this method will be initialized every parameter of dataOut object (header, no data)
74 74 '''
75 75 ippSeconds = 1.0 * self.__nSamples / self.__sample_rate
76 76
77 77 nProfiles = 1.0 / ippSeconds # Number of profiles in one second
78 78
79 79 try:
80 80 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
81 81 self.__radarControllerHeader)
82 82 except:
83 83 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
84 84 txA=0,
85 85 txB=0,
86 86 nWindows=1,
87 87 nHeights=self.__nSamples,
88 88 firstHeight=self.__firstHeigth,
89 89 deltaHeight=self.__deltaHeigth,
90 90 codeType=self.__codeType,
91 91 nCode=self.__nCode, nBaud=self.__nBaud,
92 92 code=self.__code)
93 93
94 94 try:
95 95 self.dataOut.systemHeaderObj = SystemHeader(self.__systemHeader)
96 96 except:
97 97 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
98 98 nProfiles=nProfiles,
99 99 nChannels=len(
100 100 self.__channelList),
101 101 adcResolution=14)
102 102 self.dataOut.type = "Voltage"
103 103
104 104 self.dataOut.data = None
105 105
106 106 self.dataOut.dtype = self.dtype
107 107
108 108 # self.dataOut.nChannels = 0
109 109
110 110 # self.dataOut.nHeights = 0
111 111
112 112 self.dataOut.nProfiles = int(nProfiles)
113 113
114 114 self.dataOut.heightList = self.__firstHeigth + \
115 115 numpy.arange(self.__nSamples, dtype=numpy.float) * \
116 116 self.__deltaHeigth
117 117
118 118 self.dataOut.channelList = list(range(self.__num_subchannels))
119 119
120 120 self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights()
121 121
122 122 # self.dataOut.channelIndexList = None
123 123
124 124 self.dataOut.flagNoData = True
125 125
126 126 self.dataOut.flagDataAsBlock = False
127 127 # Set to TRUE if the data is discontinuous
128 128 self.dataOut.flagDiscontinuousBlock = False
129 129
130 130 self.dataOut.utctime = None
131 131
132 132 # timezone like jroheader, difference in minutes between UTC and localtime
133 133 self.dataOut.timeZone = self.__timezone / 60
134 134
135 135 self.dataOut.dstFlag = 0
136 136
137 137 self.dataOut.errorCount = 0
138 138
139 139 try:
140 140 self.dataOut.nCohInt = self.fixed_metadata_dict.get(
141 141 'nCohInt', self.nCohInt)
142 142
143 143 # asumo que la data esta decodificada
144 144 self.dataOut.flagDecodeData = self.fixed_metadata_dict.get(
145 145 'flagDecodeData', self.flagDecodeData)
146 146
147 147 # asumo que la data esta sin flip
148 148 self.dataOut.flagDeflipData = self.fixed_metadata_dict['flagDeflipData']
149 149
150 150 self.dataOut.flagShiftFFT = self.fixed_metadata_dict['flagShiftFFT']
151 151
152 152 self.dataOut.useLocalTime = self.fixed_metadata_dict['useLocalTime']
153 153 except:
154 154 pass
155 155
156 156 self.dataOut.ippSeconds = ippSeconds
157 157
158 158 # Time interval between profiles
159 159 # self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
160 160
161 161 self.dataOut.frequency = self.__frequency
162 162
163 163 self.dataOut.realtime = self.__online
164 164
165 165 def findDatafiles(self, path, startDate=None, endDate=None):
166 166
167 167 if not os.path.isdir(path):
168 168 return []
169 169
170 170 try:
171 171 digitalReadObj = digital_rf.DigitalRFReader(
172 172 path, load_all_metadata=True)
173 173 except:
174 174 digitalReadObj = digital_rf.DigitalRFReader(path)
175 175
176 176 channelNameList = digitalReadObj.get_channels()
177 177
178 178 if not channelNameList:
179 179 return []
180 180
181 181 metadata_dict = digitalReadObj.get_rf_file_metadata(channelNameList[0])
182 182
183 183 sample_rate = metadata_dict['sample_rate'][0]
184 184
185 185 this_metadata_file = digitalReadObj.get_metadata(channelNameList[0])
186 186
187 187 try:
188 188 timezone = this_metadata_file['timezone'].value
189 189 except:
190 190 timezone = 0
191 191
192 192 startUTCSecond, endUTCSecond = digitalReadObj.get_bounds(
193 193 channelNameList[0]) / sample_rate - timezone
194 194
195 195 startDatetime = datetime.datetime.utcfromtimestamp(startUTCSecond)
196 196 endDatatime = datetime.datetime.utcfromtimestamp(endUTCSecond)
197 197
198 198 if not startDate:
199 199 startDate = startDatetime.date()
200 200
201 201 if not endDate:
202 202 endDate = endDatatime.date()
203 203
204 204 dateList = []
205 205
206 206 thisDatetime = startDatetime
207 207
208 208 while(thisDatetime <= endDatatime):
209 209
210 210 thisDate = thisDatetime.date()
211 211
212 212 if thisDate < startDate:
213 213 continue
214 214
215 215 if thisDate > endDate:
216 216 break
217 217
218 218 dateList.append(thisDate)
219 219 thisDatetime += datetime.timedelta(1)
220 220
221 221 return dateList
222 222
    def setup(self, path=None,
              startDate=None,
              endDate=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              channelList=None,
              nSamples=None,
              online=False,
              delay=60,
              buffer_size=1024,
              ippKm=None,
              nCohInt=1,
              nCode=1,
              nBaud=1,
              flagDecodeData=False,
              code=numpy.ones((1, 1), dtype=numpy.int),
              **kwargs):
        '''
        In this method we should set all initial parameters.

        Opens the Digital RF dataset at *path*, reads its fixed metadata
        (headers, sample rate, code description), resolves the requested
        time range against the available data bounds, allocates the read
        buffer and initializes dataOut via __setFileHeader().

        Inputs:
            path
            startDate
            endDate
            startTime
            endTime
            set
            expLabel
            ext
            online
            delay
        '''
        self.nCohInt = nCohInt
        self.flagDecodeData = flagDecodeData
        self.i = 0
        if not os.path.isdir(path):
            raise ValueError("[Reading] Directory %s does not exist" % path)

        # load_all_metadata is only supported by newer digital_rf versions.
        try:
            self.digitalReadObj = digital_rf.DigitalRFReader(
                path, load_all_metadata=True)
        except:
            self.digitalReadObj = digital_rf.DigitalRFReader(path)

        channelNameList = self.digitalReadObj.get_channels()

        if not channelNameList:
            raise ValueError("[Reading] Directory %s does not have any files" % path)

        if not channelList:
            channelList = list(range(len(channelNameList)))

        ########## Reading metadata ######################

        top_properties = self.digitalReadObj.get_properties(
            channelNameList[channelList[0]])

        self.__num_subchannels = top_properties['num_subchannels']
        self.__sample_rate = 1.0 * \
            top_properties['sample_rate_numerator'] / \
            top_properties['sample_rate_denominator']
        # self.__samples_per_file = top_properties['samples_per_file'][0]
        self.__deltaHeigth = 1e6 * 0.15 / self.__sample_rate  # why 0.15?

        this_metadata_file = self.digitalReadObj.get_digital_metadata(
            channelNameList[channelList[0]])
        metadata_bounds = this_metadata_file.get_bounds()
        self.fixed_metadata_dict = this_metadata_file.read(
            metadata_bounds[0])[metadata_bounds[0]]  # GET FIRST HEADER

        # Stored headers are optional; missing keys leave the attrs unset.
        try:
            self.__processingHeader = self.fixed_metadata_dict['processingHeader']
            self.__radarControllerHeader = self.fixed_metadata_dict['radarControllerHeader']
            self.__systemHeader = self.fixed_metadata_dict['systemHeader']
            self.dtype = pickle.loads(self.fixed_metadata_dict['dtype'])
        except:
            pass

        self.__frequency = None

        self.__frequency = self.fixed_metadata_dict.get('frequency', 1)

        # Default 300 min offset -- presumably Peru local time; verify.
        self.__timezone = self.fixed_metadata_dict.get('timezone', 300)

        try:
            nSamples = self.fixed_metadata_dict['nSamples']
        except:
            nSamples = None

        self.__firstHeigth = 0

        try:
            codeType = self.__radarControllerHeader['codeType']
        except:
            codeType = 0

        try:
            if codeType:
                nCode = self.__radarControllerHeader['nCode']
                nBaud = self.__radarControllerHeader['nBaud']
                code = self.__radarControllerHeader['code']
        except:
            pass

        if not ippKm:
            try:
                # seconds to km
                ippKm = self.__radarControllerHeader['ipp']
            except:
                ippKm = None
        ####################################################
        self.__ippKm = ippKm
        startUTCSecond = None
        endUTCSecond = None

        # Requested dates are converted to UTC seconds; __timezone shifts
        # the naive local datetimes onto the UTC epoch.
        if startDate:
            startDatetime = datetime.datetime.combine(startDate, startTime)
            startUTCSecond = (
                startDatetime - datetime.datetime(1970, 1, 1)).total_seconds() + self.__timezone

        if endDate:
            endDatetime = datetime.datetime.combine(endDate, endTime)
            endUTCSecond = (endDatetime - datetime.datetime(1970,
                                                            1, 1)).total_seconds() + self.__timezone

        start_index, end_index = self.digitalReadObj.get_bounds(
            channelNameList[channelList[0]])

        # Clamp the requested range to the data actually on disk.
        if not startUTCSecond:
            startUTCSecond = start_index / self.__sample_rate

        if start_index > startUTCSecond * self.__sample_rate:
            startUTCSecond = start_index / self.__sample_rate

        if not endUTCSecond:
            endUTCSecond = end_index / self.__sample_rate

        if end_index < endUTCSecond * self.__sample_rate:
            endUTCSecond = end_index / self.__sample_rate
        if not nSamples:
            if not ippKm:
                raise ValueError("[Reading] nSamples or ippKm should be defined")
            # Derive the sample count from the IPP and the height resolution.
            nSamples = int(ippKm / (1e6 * 0.15 / self.__sample_rate))
        channelBoundList = []
        channelNameListFiltered = []

        for thisIndexChannel in channelList:
            thisChannelName = channelNameList[thisIndexChannel]
            start_index, end_index = self.digitalReadObj.get_bounds(
                thisChannelName)
            channelBoundList.append((start_index, end_index))
            channelNameListFiltered.append(thisChannelName)

        self.profileIndex = 0
        self.i = 0
        self.__delay = delay

        self.__codeType = codeType
        self.__nCode = nCode
        self.__nBaud = nBaud
        self.__code = code

        self.__datapath = path
        self.__online = online
        self.__channelList = channelList
        self.__channelNameList = channelNameListFiltered
        self.__channelBoundList = channelBoundList
        self.__nSamples = nSamples
        self.__samples_to_read = int(nSamples)  # FIXED: currently 40
        self.__nChannels = len(self.__channelList)

        self.__startUTCSecond = startUTCSecond
        self.__endUTCSecond = endUTCSecond

        self.__timeInterval = 1.0 * self.__samples_to_read / \
            self.__sample_rate  # Time interval

        if online:
            # self.__thisUnixSample = int(endUTCSecond*self.__sample_rate - 4*self.__samples_to_read)
            startUTCSecond = numpy.floor(endUTCSecond)

        # because the first thing the read method does is add samples_to_read
        self.__thisUnixSample = int(
            startUTCSecond * self.__sample_rate) - self.__samples_to_read

        self.__data_buffer = numpy.zeros(
            (self.__num_subchannels, self.__samples_to_read), dtype=numpy.complex)

        self.__setFileHeader()
        self.isConfig = True

        print("[Reading] Digital RF Data was found from %s to %s " % (
            datetime.datetime.utcfromtimestamp(
                self.__startUTCSecond - self.__timezone),
            datetime.datetime.utcfromtimestamp(
                self.__endUTCSecond - self.__timezone)
        ))

        print("[Reading] Starting process from %s to %s" % (datetime.datetime.utcfromtimestamp(startUTCSecond - self.__timezone),
                                                            datetime.datetime.utcfromtimestamp(
                                                                endUTCSecond - self.__timezone)
                                                            ))
        self.oldAverage = None
        self.count = 0
        self.executionTime = 0
428 428
    def __reload(self):
        '''
        Re-scan the dataset (online mode) looking for newly written samples.

        Returns True when the end bound grew (new data available), False
        otherwise.
        '''
        print("[Reading] reloading metadata ...")

        # complete_update is only available in newer digital_rf versions.
        try:
            self.digitalReadObj.reload(complete_update=True)
        except:
            self.digitalReadObj.reload()

        start_index, end_index = self.digitalReadObj.get_bounds(
            self.__channelNameList[self.__channelList[0]])

        # Data may have been purged from the front of the ring buffer.
        if start_index > self.__startUTCSecond * self.__sample_rate:
            self.__startUTCSecond = 1.0 * start_index / self.__sample_rate

        if end_index > self.__endUTCSecond * self.__sample_rate:
            self.__endUTCSecond = 1.0 * end_index / self.__sample_rate
            print()
            print("[Reading] New timerange found [%s, %s] " % (
                datetime.datetime.utcfromtimestamp(
                    self.__startUTCSecond - self.__timezone),
                datetime.datetime.utcfromtimestamp(
                    self.__endUTCSecond - self.__timezone)
            ))

            return True

        return False
462 462
463 463 def timeit(self, toExecute):
464 464 t0 = time()
465 465 toExecute()
466 466 self.executionTime = time() - t0
467 467 if self.oldAverage is None:
468 468 self.oldAverage = self.executionTime
469 469 self.oldAverage = (self.executionTime + self.count *
470 470 self.oldAverage) / (self.count + 1.0)
471 471 self.count = self.count + 1.0
472 472 return
473 473
    def __readNextBlock(self, seconds=30, volt_scale=1):
        '''
        Advance the cursor one block and read samples_to_read samples per
        subchannel into the internal buffer.  Returns True on success,
        False when the time range is exhausted or the read failed.
        '''

        # Set the next data
        self.__flagDiscontinuousBlock = False
        self.__thisUnixSample += self.__samples_to_read

        # Need headroom of two blocks before the end bound.
        if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
            print("[Reading] There are no more data into selected time-range")
            if self.__online:
                self.__reload()
            else:
                return False

            # Still no room after reloading: give up and rewind the cursor.
            if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
                return False
            self.__thisUnixSample -= self.__samples_to_read

        indexChannel = 0

        dataOk = False
        for thisChannelName in self.__channelNameList:  # TODO SEVERAL CHANNELS?
            for indexSubchannel in range(self.__num_subchannels):
                try:
                    t0 = time()
                    result = self.digitalReadObj.read_vector_c81d(self.__thisUnixSample,
                                                                  self.__samples_to_read,
                                                                  thisChannelName, sub_channel=indexSubchannel)
                    self.executionTime = time() - t0
                    if self.oldAverage is None:
                        self.oldAverage = self.executionTime
                    self.oldAverage = (
                        self.executionTime + self.count * self.oldAverage) / (self.count + 1.0)
                    self.count = self.count + 1.0

                except IOError as e:
                    # read next profile
                    self.__flagDiscontinuousBlock = True
                    print("[Reading] %s" % datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone), e)
                    break

                # Short read => gap in the recording.
                if result.shape[0] != self.__samples_to_read:
                    self.__flagDiscontinuousBlock = True
                    print("[Reading] %s: Too few samples were found, just %d/%d  samples" % (datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
                                                                                             result.shape[0],
                                                                                             self.__samples_to_read))
                    break

                self.__data_buffer[indexSubchannel, :] = result * volt_scale

            indexChannel += 1

            dataOk = True

        self.__utctime = self.__thisUnixSample / self.__sample_rate

        if not dataOk:
            return False

        print("[Reading] %s: %d samples <> %f sec" % (datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
                                                      self.__samples_to_read,
                                                      self.__timeInterval))

        self.__bufferIndex = 0

        return True
541 541
542 542 def __isBufferEmpty(self):
543 543 return self.__bufferIndex > self.__samples_to_read - self.__nSamples # 40960 - 40
544 544
    def getData(self, seconds=30, nTries=5):
        '''
        This method gets the data from files and put the data into the dataOut object

        In addition, it advances the buffer index by one profile.  When the
        buffer is exhausted it reads the next block, retrying up to nTries
        times in online mode (sleeping *seconds* between attempts).  Failure
        is reported by setting dataOut.error and returning early.

        Return:
            data : one profile of voltages (heights * channels) copied from
                   the buffer into dataOut.  Nothing is returned directly.

        Affected:
            self.dataOut
            self.profileIndex
            self.flagDiscontinuousBlock
            self.flagIsNewBlock
        '''

        err_counter = 0
        self.dataOut.flagNoData = True

        if self.__isBufferEmpty():
            self.__flagDiscontinuousBlock = False

            while True:
                if self.__readNextBlock():
                    break
                # Past the end of the selected range: hard stop.
                if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate:
                    self.dataOut.error = 'Error'
                    return

                if self.__flagDiscontinuousBlock:
                    self.dataOut.error = 'discontinuous block found'
                    return

                # Offline data cannot grow; no point retrying.
                if not self.__online:
                    self.dataOut.error = 'Online?'
                    return

                err_counter += 1
                if err_counter > nTries:
                    # NOTE(review): message has typos ('retrys reach'); kept
                    # verbatim since callers may match on this string.
                    self.dataOut.error = 'Max retrys reach'
                    return

                print('[Reading] waiting %d seconds to read a new block' % seconds)
                sleep(seconds)

        # Slice one profile (all subchannels, nSamples heights) from the buffer.
        self.dataOut.data = self.__data_buffer[:,
                                               self.__bufferIndex:self.__bufferIndex + self.__nSamples]
        self.dataOut.utctime = (
            self.__thisUnixSample + self.__bufferIndex) / self.__sample_rate
        self.dataOut.flagNoData = False
        self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
        self.dataOut.profileIndex = self.profileIndex

        self.__bufferIndex += self.__nSamples
        self.profileIndex += 1

        if self.profileIndex == self.dataOut.nProfiles:
            self.profileIndex = 0

        return
607 606
608 607 def printInfo(self):
609 608 '''
610 609 '''
611 610 if self.__printInfo == False:
612 611 return
613 612
614 613 # self.systemHeaderObj.printInfo()
615 614 # self.radarControllerHeaderObj.printInfo()
616 615
617 616 self.__printInfo = False
618 617
619 618 def printNumberOfBlock(self):
620 619 '''
621 620 '''
622 621 return
623 622 # print self.profileIndex
624 623
625 624 def run(self, **kwargs):
626 625 '''
627 626 This method will be called many times so here you should put all your code
628 627 '''
629 628
630 629 if not self.isConfig:
631 630 self.setup(**kwargs)
632 631 #self.i = self.i+1
633 632 self.getData(seconds=self.__delay)
634 633
635 634 return
636 635
637 636
638 637 class DigitalRFWriter(Operation):
639 638 '''
640 639 classdocs
641 640 '''
642 641
643 642 def __init__(self, **kwargs):
644 643 '''
645 644 Constructor
646 645 '''
647 646 Operation.__init__(self, **kwargs)
648 647 self.metadata_dict = {}
649 648 self.dataOut = None
650 649 self.dtype = None
651 650 self.oldAverage = 0
652 651
    def setHeader(self):
        # Snapshot the dataOut attributes that describe the stream into the
        # metadata record later persisted by writeMetadata().

        self.metadata_dict['frequency'] = self.dataOut.frequency
        self.metadata_dict['timezone'] = self.dataOut.timeZone
        # dtype is pickled so the reader can reconstruct the numpy dtype object.
        self.metadata_dict['dtype'] = pickle.dumps(self.dataOut.dtype)
        self.metadata_dict['nProfiles'] = self.dataOut.nProfiles
        self.metadata_dict['heightList'] = self.dataOut.heightList
        self.metadata_dict['channelList'] = self.dataOut.channelList
        self.metadata_dict['flagDecodeData'] = self.dataOut.flagDecodeData
        self.metadata_dict['flagDeflipData'] = self.dataOut.flagDeflipData
        self.metadata_dict['flagShiftFFT'] = self.dataOut.flagShiftFFT
        self.metadata_dict['useLocalTime'] = self.dataOut.useLocalTime
        self.metadata_dict['nCohInt'] = self.dataOut.nCohInt
        self.metadata_dict['type'] = self.dataOut.type
        # Older dataOut objects may lack this attribute -- hence getattr.
        self.metadata_dict['flagDataAsBlock'] = getattr(
            self.dataOut, 'flagDataAsBlock', None)  # check
669 668
670 669 def setup(self, dataOut, path, frequency, fileCadence, dirCadence, metadataCadence, set=0, metadataFile='metadata', ext='.h5'):
671 670 '''
672 671 In this method we should set all initial parameters.
673 672 Input:
674 673 dataOut: Input data will also be outputa data
675 674 '''
676 675 self.setHeader()
677 676 self.__ippSeconds = dataOut.ippSeconds
678 677 self.__deltaH = dataOut.getDeltaH()
679 678 self.__sample_rate = 1e6 * 0.15 / self.__deltaH
680 679 self.__dtype = dataOut.dtype
681 680 if len(dataOut.dtype) == 2:
682 681 self.__dtype = dataOut.dtype[0]
683 682 self.__nSamples = dataOut.systemHeaderObj.nSamples
684 683 self.__nProfiles = dataOut.nProfiles
685 684
686 685 if self.dataOut.type != 'Voltage':
687 686 raise 'Digital RF cannot be used with this data type'
688 687 self.arr_data = numpy.ones((1, dataOut.nFFTPoints * len(
689 688 self.dataOut.channelList)), dtype=[('r', self.__dtype), ('i', self.__dtype)])
690 689 else:
691 690 self.arr_data = numpy.ones((self.__nSamples, len(
692 691 self.dataOut.channelList)), dtype=[('r', self.__dtype), ('i', self.__dtype)])
693 692
694 693 file_cadence_millisecs = 1000
695 694
696 695 sample_rate_fraction = Fraction(self.__sample_rate).limit_denominator()
697 696 sample_rate_numerator = int(sample_rate_fraction.numerator)
698 697 sample_rate_denominator = int(sample_rate_fraction.denominator)
699 698 start_global_index = dataOut.utctime * self.__sample_rate
700 699
701 700 uuid = 'prueba'
702 701 compression_level = 0
703 702 checksum = False
704 703 is_complex = True
705 704 num_subchannels = len(dataOut.channelList)
706 705 is_continuous = True
707 706 marching_periods = False
708 707
709 708 self.digitalWriteObj = digital_rf.DigitalRFWriter(path, self.__dtype, dirCadence,
710 709 fileCadence, start_global_index,
711 710 sample_rate_numerator, sample_rate_denominator, uuid, compression_level, checksum,
712 711 is_complex, num_subchannels, is_continuous, marching_periods)
713 712 metadata_dir = os.path.join(path, 'metadata')
714 713 os.system('mkdir %s' % (metadata_dir))
715 714 self.digitalMetadataWriteObj = digital_rf.DigitalMetadataWriter(metadata_dir, dirCadence, 1, # 236, file_cadence_millisecs / 1000
716 715 sample_rate_numerator, sample_rate_denominator,
717 716 metadataFile)
718 717 self.isConfig = True
719 718 self.currentSample = 0
720 719 self.oldAverage = 0
721 720 self.count = 0
722 721 return
723 722
    def writeMetadata(self):
        # Persist the current headers at the sample index matching the
        # current dataOut timestamp.
        start_idx = self.__sample_rate * self.dataOut.utctime

        self.metadata_dict['processingHeader'] = self.dataOut.processingHeaderObj.getAsDict(
        )
        self.metadata_dict['radarControllerHeader'] = self.dataOut.radarControllerHeaderObj.getAsDict(
        )
        self.metadata_dict['systemHeader'] = self.dataOut.systemHeaderObj.getAsDict(
        )
        self.digitalMetadataWriteObj.write(start_idx, self.metadata_dict)
        return
735 734
736 735 def timeit(self, toExecute):
737 736 t0 = time()
738 737 toExecute()
739 738 self.executionTime = time() - t0
740 739 if self.oldAverage is None:
741 740 self.oldAverage = self.executionTime
742 741 self.oldAverage = (self.executionTime + self.count *
743 742 self.oldAverage) / (self.count + 1.0)
744 743 self.count = self.count + 1.0
745 744 return
746 745
747 746 def writeData(self):
748 747 if self.dataOut.type != 'Voltage':
749 748 raise 'Digital RF cannot be used with this data type'
750 749 for channel in self.dataOut.channelList:
751 750 for i in range(self.dataOut.nFFTPoints):
752 751 self.arr_data[1][channel * self.dataOut.nFFTPoints +
753 752 i]['r'] = self.dataOut.data[channel][i].real
754 753 self.arr_data[1][channel * self.dataOut.nFFTPoints +
755 754 i]['i'] = self.dataOut.data[channel][i].imag
756 755 else:
757 756 for i in range(self.dataOut.systemHeaderObj.nSamples):
758 757 for channel in self.dataOut.channelList:
759 758 self.arr_data[i][channel]['r'] = self.dataOut.data[channel][i].real
760 759 self.arr_data[i][channel]['i'] = self.dataOut.data[channel][i].imag
761 760
762 761 def f(): return self.digitalWriteObj.rf_write(self.arr_data)
763 762 self.timeit(f)
764 763
765 764 return
766 765
767 766 def run(self, dataOut, frequency=49.92e6, path=None, fileCadence=1000, dirCadence=36000, metadataCadence=1, **kwargs):
768 767 '''
769 768 This method will be called many times so here you should put all your code
770 769 Inputs:
771 770 dataOut: object with the data
772 771 '''
773 772 # print dataOut.__dict__
774 773 self.dataOut = dataOut
775 774 if not self.isConfig:
776 775 self.setup(dataOut, path, frequency, fileCadence,
777 776 dirCadence, metadataCadence, **kwargs)
778 777 self.writeMetadata()
779 778
780 779 self.writeData()
781 780
782 781 ## self.currentSample += 1
783 782 # if self.dataOut.flagDataAsBlock or self.currentSample == 1:
784 783 # self.writeMetadata()
785 784 ## if self.currentSample == self.__nProfiles: self.currentSample = 0
786 785
787 786 return dataOut
788 787
    def close(self):
        # Report the running average write time (ms) and release the writer.
        print('[Writing] - Closing files ')
        print('Average of writing to digital rf format is ', self.oldAverage * 1000)
        # digitalWriteObj does not exist if setup() never ran; the bare
        # except also swallows close() failures -- best-effort shutdown.
        try:
            self.digitalWriteObj.close()
        except:
            pass
796 795
797 796
@@ -1,642 +1,642
1 1 '''
2 2 Created on Aug 1, 2017
3 3
4 4 @author: Juan C. Espinoza
5 5 '''
6 6
7 7 import os
8 8 import sys
9 9 import time
10 10 import json
11 11 import glob
12 12 import datetime
13 13
14 14 import numpy
15 15 import h5py
16 16
17 17 from schainpy.model.io.jroIO_base import JRODataReader
18 18 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
19 19 from schainpy.model.data.jrodata import Parameters
20 20 from schainpy.utils import log
21 21
# madrigal is optional: only Madrigal read/write needs it, the rest of the
# module works without it.
try:
    import madrigal.cedar
except:
    log.warning(
        'You should install "madrigal library" module if you want to read/write Madrigal data'
    )

# Default catalog metadata used when the caller supplies none.
DEF_CATALOG = {
    'principleInvestigator': 'Marco Milla',
    'expPurpose': None,
    'cycleTime': None,
    'correlativeExp': None,
    'sciRemarks': None,
    'instRemarks': None
}
# Default header metadata used when the caller supplies none.
DEF_HEADER = {
    'kindatDesc': None,
    'analyst': 'Jicamarca User',
    'comments': None,
    'history': None
}
# Madrigal instrument code (kinst) -> site mnemonic used in output file names.
MNEMONICS = {
    10: 'jro',
    11: 'jbr',
    840: 'jul',
    13: 'jas',
    1000: 'pbr',
    1001: 'hbr',
    1002: 'obr',
}

# Unix epoch expressed in local time (epoch minus the local UTC offset).
UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
54 54
def load_json(obj):
    '''
    Normalise *obj* (a JSON string or an already-parsed structure) into
    plain Python containers whose keys and string values are ``str``.
    '''

    parsed = json.loads(obj) if isinstance(obj, str) else obj

    if isinstance(parsed, dict):
        result = {}
        for key, val in parsed.items():
            if isinstance(val, dict):
                result[str(key)] = load_json(val)
            elif isinstance(val, str):
                result[str(key)] = str(val)
            else:
                result[str(key)] = val
        return result

    if isinstance(parsed, (list, tuple)):
        return [str(item) if isinstance(item, str) else item for item in parsed]

    return parsed
72 72
73 73 @MPDecorator
74 74 class MADReader(JRODataReader, ProcessingUnit):
75 75
76 76 def __init__(self):
77 77
78 78 ProcessingUnit.__init__(self)
79 79
80 80 self.dataOut = Parameters()
81 81 self.counter_records = 0
82 82 self.nrecords = None
83 83 self.flagNoMoreFiles = 0
84 84 self.isConfig = False
85 85 self.filename = None
86 86 self.intervals = set()
87 87
88 88 def setup(self,
89 89 path=None,
90 90 startDate=None,
91 91 endDate=None,
92 92 format=None,
93 93 startTime=datetime.time(0, 0, 0),
94 94 endTime=datetime.time(23, 59, 59),
95 95 **kwargs):
96 96
97 97 self.path = path
98 98 self.startDate = startDate
99 99 self.endDate = endDate
100 100 self.startTime = startTime
101 101 self.endTime = endTime
102 102 self.datatime = datetime.datetime(1900,1,1)
103 103 self.oneDDict = load_json(kwargs.get('oneDDict',
104 104 "{\"GDLATR\":\"lat\", \"GDLONR\":\"lon\"}"))
105 105 self.twoDDict = load_json(kwargs.get('twoDDict',
106 106 "{\"GDALT\": \"heightList\"}"))
107 107 self.ind2DList = load_json(kwargs.get('ind2DList',
108 108 "[\"GDALT\"]"))
109 109 if self.path is None:
110 110 raise ValueError('The path is not valid')
111 111
112 112 if format is None:
113 113 raise ValueError('The format is not valid choose simple or hdf5')
114 114 elif format.lower() in ('simple', 'txt'):
115 115 self.ext = '.txt'
116 116 elif format.lower() in ('cedar',):
117 117 self.ext = '.001'
118 118 else:
119 119 self.ext = '.hdf5'
120 120
121 121 self.search_files(self.path)
122 122 self.fileId = 0
123 123
124 124 if not self.fileList:
125 125 raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path))
126 126
127 127 self.setNextFile()
128 128
129 129 def search_files(self, path):
130 130 '''
131 131 Searching for madrigal files in path
132 132 Creating a list of files to procces included in [startDate,endDate]
133 133
134 134 Input:
135 135 path - Path to find files
136 136 '''
137 137
138 138 log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader')
139 139 foldercounter = 0
140 140 fileList0 = glob.glob1(path, '*{}'.format(self.ext))
141 141 fileList0.sort()
142 142
143 143 self.fileList = []
144 144 self.dateFileList = []
145 145
146 146 startDate = self.startDate - datetime.timedelta(1)
147 147 endDate = self.endDate + datetime.timedelta(1)
148 148
149 149 for thisFile in fileList0:
150 150 year = thisFile[3:7]
151 151 if not year.isdigit():
152 152 continue
153 153
154 154 month = thisFile[7:9]
155 155 if not month.isdigit():
156 156 continue
157 157
158 158 day = thisFile[9:11]
159 159 if not day.isdigit():
160 160 continue
161 161
162 162 year, month, day = int(year), int(month), int(day)
163 163 dateFile = datetime.date(year, month, day)
164 164
165 165 if (startDate > dateFile) or (endDate < dateFile):
166 166 continue
167 167
168 168 self.fileList.append(thisFile)
169 169 self.dateFileList.append(dateFile)
170 170
171 171 return
172 172
    def parseHeader(self):
        '''
        Extract the parameter names (and, for HDF5, their dimensionality)
        from the freshly opened file, then drop requested parameters that
        the file does not contain.
        '''

        self.output = {}
        self.version = '2'
        s_parameters = None
        if self.ext == '.txt':
            # First line of a simple text file is the space-separated header.
            self.parameters = [s.strip().lower() for s in self.fp.readline().strip().split(' ') if s]
        elif self.ext == '.hdf5':
            metadata = self.fp['Metadata']
            data = self.fp['Data']['Array Layout']
            # Presence of this group marks the newer (v3) Madrigal layout.
            if 'Independent Spatial Parameters' in metadata:
                s_parameters = [s[0].lower() for s in metadata['Independent Spatial Parameters']]
                self.version = '3'
            one = [s[0].lower() for s in data['1D Parameters']['Data Parameters']]
            one_d = [1 for s in one]
            two = [s[0].lower() for s in data['2D Parameters']['Data Parameters']]
            two_d = [2 for s in two]
            self.parameters = one + two
            # NOTE(review): parameters_d is only set on the HDF5 branch; the
            # .txt path never defines it - confirm readBlock's users of
            # parameters_d are HDF5-only.
            self.parameters_d = one_d + two_d

        log.success('Parameters found: {}'.format(','.join(self.parameters)),
                    'MADReader')
        if s_parameters:
            log.success('Spatial parameters: {}'.format(','.join(s_parameters)),
                        'MADReader')

        # Drop mapped 1D parameters absent from this file.
        for param in list(self.oneDDict.keys()):
            if param.lower() not in self.parameters:
                log.warning(
                    'Parameter {} not found will be ignored'.format(
                        param),
                    'MADReader')
                self.oneDDict.pop(param, None)

        # Drop mapped 2D parameters absent from this file; for list-mapped
        # values reserve a slot in the output table.
        for param, value in list(self.twoDDict.items()):
            if param.lower() not in self.parameters:
                log.warning(
                    'Parameter {} not found, it will be ignored'.format(
                        param),
                    'MADReader')
                self.twoDDict.pop(param, None)
                continue
            if isinstance(value, list):
                if value[0] not in self.output:
                    self.output[value[0]] = []
                self.output[value[0]].append(None)
221 221
222 222 def parseData(self):
223 223 '''
224 224 '''
225 225
226 226 if self.ext == '.txt':
227 227 self.data = numpy.genfromtxt(self.fp, missing_values=('missing'))
228 228 self.nrecords = self.data.shape[0]
229 229 self.ranges = numpy.unique(self.data[:,self.parameters.index(self.ind2DList[0].lower())])
230 230 elif self.ext == '.hdf5':
231 231 self.data = self.fp['Data']['Array Layout']
232 232 self.nrecords = len(self.data['timestamps'].value)
233 233 self.ranges = self.data['range'].value
234 234
235 235 def setNextFile(self):
236 236 '''
237 237 '''
238 238
239 239 file_id = self.fileId
240 240
241 241 if file_id == len(self.fileList):
242 242 log.success('No more files', 'MADReader')
243 243 self.flagNoMoreFiles = 1
244 244 return 0
245 245
246 246 log.success(
247 247 'Opening: {}'.format(self.fileList[file_id]),
248 248 'MADReader'
249 249 )
250 250
251 251 filename = os.path.join(self.path, self.fileList[file_id])
252 252
253 253 if self.filename is not None:
254 254 self.fp.close()
255 255
256 256 self.filename = filename
257 257 self.filedate = self.dateFileList[file_id]
258 258
259 259 if self.ext=='.hdf5':
260 260 self.fp = h5py.File(self.filename, 'r')
261 261 else:
262 262 self.fp = open(self.filename, 'rb')
263 263
264 264 self.parseHeader()
265 265 self.parseData()
266 266 self.sizeOfFile = os.path.getsize(self.filename)
267 267 self.counter_records = 0
268 268 self.flagIsNewFile = 0
269 269 self.fileId += 1
270 270
271 271 return 1
272 272
    def readNextBlock(self):
        """Read records until one inside [startTime, endTime] is found.

        Returns 1 when a block was read, 0 when the file list ran out."""

        while True:
            self.flagDiscontinuousBlock = 0
            # The previous read exhausted the current file: advance.
            if self.flagIsNewFile:
                if not self.setNextFile():
                    return 0

            self.readBlock()

            # Skip records whose timestamp falls outside the requested window.
            if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
               (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
                log.warning(
                    'Reading Record No. {}/{} -> {} [Skipping]'.format(
                        self.counter_records,
                        self.nrecords,
                        self.datatime.ctime()),
                    'MADReader')
                continue
            break

        log.log(
            'Reading Record No. {}/{} -> {}'.format(
                self.counter_records,
                self.nrecords,
                self.datatime.ctime()),
            'MADReader')

        return 1
302 302
    def readBlock(self):
        '''
        Read all rows sharing the next timestamp into ``self.buffer``.

        Side effects: advances ``counter_records``, updates ``datatime`` and
        the set of observed ``intervals``, and raises ``flagIsNewFile`` /
        ``flagDiscontinuousBlock`` when appropriate.
        '''
        dum = []
        if self.ext == '.txt':
            dt = self.data[self.counter_records][:6].astype(int)
            # A jump to a later date means a gap in the data stream.
            if datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]).date() > self.datatime.date():
                self.flagDiscontinuousBlock = 1
            self.datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
            # Accumulate consecutive rows with the same timestamp.
            while True:
                dt = self.data[self.counter_records][:6].astype(int)
                datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
                if datatime == self.datatime:
                    dum.append(self.data[self.counter_records])
                    self.counter_records += 1
                    if self.counter_records == self.nrecords:
                        self.flagIsNewFile = True
                        break
                    continue
                # First row of the next record: remember the time step.
                self.intervals.add((datatime - self.datatime).seconds)
                break
        elif self.ext == '.hdf5':
            datatime = datetime.datetime.utcfromtimestamp(
                self.data['timestamps'][self.counter_records])
            nHeights = len(self.ranges)
            for n, param in enumerate(self.parameters):
                if self.parameters_d[n] == 1:
                    # 1D parameter: replicate the scalar along the height axis.
                    dum.append(numpy.ones(nHeights)*self.data['1D Parameters'][param][self.counter_records])
                else:
                    if self.version == '2':
                        dum.append(self.data['2D Parameters'][param][self.counter_records])
                    else:
                        # Version-3 files store 2D parameters transposed.
                        tmp = self.data['2D Parameters'][param].value.T
                        dum.append(tmp[self.counter_records])
            self.intervals.add((datatime-self.datatime).seconds)
            if datatime.date() > self.datatime.date():
                self.flagDiscontinuousBlock = 1
            self.datatime = datatime
            self.counter_records += 1
            if self.counter_records == self.nrecords:
                self.flagIsNewFile = True

        self.buffer = numpy.array(dum)
        return
347 347
    def set_output(self):
        '''
        Storing data from buffer to dataOut object
        '''

        parameters = [None for __ in self.parameters]

        # 1D parameters: one scalar per record.
        for param, attr in list(self.oneDDict.items()):
            x = self.parameters.index(param.lower())
            setattr(self.dataOut, attr, self.buffer[0][x])

        # 2D parameters: align values onto the full height grid.
        for param, value in list(self.twoDDict.items()):
            x = self.parameters.index(param.lower())
            if self.ext == '.txt':
                y = self.parameters.index(self.ind2DList[0].lower())
                ranges = self.buffer[:, y]
                # NOTE(review): when the record already covers every height
                # this `continue` skips the parameter entirely instead of
                # storing it unmasked - confirm this is intended.
                if self.ranges.size == ranges.size:
                    continue
                # Mask heights missing from this record with NaN.
                index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
                dummy = numpy.zeros(self.ranges.shape) + numpy.nan
                dummy[index] = self.buffer[:, x]
            else:
                dummy = self.buffer[x]

            if isinstance(value, str):
                if value not in self.ind2DList:
                    setattr(self.dataOut, value, dummy.reshape(1, -1))
            elif isinstance(value, list):
                self.output[value[0]][value[1]] = dummy
                parameters[value[1]] = param

        for key, value in list(self.output.items()):
            setattr(self.dataOut, key, numpy.array(value))

        self.dataOut.parameters = [s for s in parameters if s]
        self.dataOut.heightList = self.ranges
        self.dataOut.utctime = (self.datatime - datetime.datetime(1970, 1, 1)).total_seconds()
        self.dataOut.utctimeInit = self.dataOut.utctime
        self.dataOut.paramInterval = min(self.intervals)
        self.dataOut.useLocalTime = False
        self.dataOut.flagNoData = False
        self.dataOut.nrecords = self.nrecords
        self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
391 391
392 392 def getData(self):
393 393 '''
394 394 Storing data from databuffer to dataOut object
395 395 '''
396 396 if self.flagNoMoreFiles:
397 397 self.dataOut.flagNoData = True
398 log.error('No file left to process', 'MADReader')
398 self.dataOut.error = 'No file left to process'
399 399 return 0
400 400
401 401 if not self.readNextBlock():
402 402 self.dataOut.flagNoData = True
403 403 return 0
404 404
405 405 self.set_output()
406 406
407 407 return 1
408 408
409 409
class MADWriter(Operation):
    """Writes Madrigal cedar/HDF5 files from Parameters data."""

    # Cedar sentinel stored in place of NaN/invalid values.
    missing = -32767
413 413
414 414 def __init__(self, **kwargs):
415 415
416 416 Operation.__init__(self, **kwargs)
417 417 self.dataOut = Parameters()
418 418 self.counter = 0
419 419 self.path = None
420 420 self.fp = None
421 421
    def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}',
            metadata='{}', format='cedar', **kwargs):
        '''
        Inputs:
            path - path where files will be created
            oneDDict - json of one-dimensional parameters in record where keys
            are Madrigal codes (integers or mnemonics) and values the corresponding
            dataOut attribute e.g: {
                'gdlatr': 'lat',
                'gdlonr': 'lon',
                'gdlat2':'lat',
                'glon2':'lon'}
            ind2DList - list of independent spatial two-dimensional parameters e.g:
                ['heighList']
            twoDDict - json of two-dimensional parameters in record where keys
            are Madrigal codes (integers or mnemonics) and values the corresponding
            dataOut attribute if multidimensional array specify as tupple
            ('attr', pos) e.g: {
                'gdalt': 'heightList',
                'vn1p2': ('data_output', 0),
                'vn2p2': ('data_output', 1),
                'vn3': ('data_output', 2),
                'snl': ('data_SNR', 'db')
                }
            metadata - json of madrigal metadata (kinst, kindat, catalog and header)
        '''
        # Lazy one-time configuration on the first record.
        if not self.isConfig:
            self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs)
            self.isConfig = True

        self.dataOut = dataOut
        self.putData()
        return
455 455
456 456 def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs):
457 457 '''
458 458 Configure Operation
459 459 '''
460 460
461 461 self.path = path
462 462 self.blocks = kwargs.get('blocks', None)
463 463 self.counter = 0
464 464 self.oneDDict = load_json(oneDDict)
465 465 self.twoDDict = load_json(twoDDict)
466 466 self.ind2DList = load_json(ind2DList)
467 467 meta = load_json(metadata)
468 468 self.kinst = meta.get('kinst')
469 469 self.kindat = meta.get('kindat')
470 470 self.catalog = meta.get('catalog', DEF_CATALOG)
471 471 self.header = meta.get('header', DEF_HEADER)
472 472 if format == 'cedar':
473 473 self.ext = '.dat'
474 474 self.extra_args = {}
475 475 elif format == 'hdf5':
476 476 self.ext = '.hdf5'
477 477 self.extra_args = {'ind2DList': self.ind2DList}
478 478
479 479 self.keys = [k.lower() for k in self.twoDDict]
480 480 if 'range' in self.keys:
481 481 self.keys.remove('range')
482 482 if 'gdalt' in self.keys:
483 483 self.keys.remove('gdalt')
484 484
    def setFile(self):
        '''
        Create new cedar file object

        The file name is '<mnemonic><YYYYmmdd_HHMMSS><ext>' derived from
        the current record's utctime; an existing file is deleted first.
        '''

        self.mnemonic = MNEMONICS[self.kinst]  # TODO get mnemonic from madrigal
        date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)

        filename = '{}{}{}'.format(self.mnemonic,
                                   date.strftime('%Y%m%d_%H%M%S'),
                                   self.ext)

        self.fullname = os.path.join(self.path, filename)

        if os.path.isfile(self.fullname):
            log.warning(
                'Destination file {} already exists, previous file deleted.'.format(
                    self.fullname),
                'MADWriter')
            os.remove(self.fullname)

        try:
            log.success(
                'Creating file: {}'.format(self.fullname),
                'MADWriter')
            self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
        except ValueError as e:
            log.error(
                'Impossible to create a cedar object with "madrigal.cedar.MadrigalCedarFile"',
                'MADWriter')
            # NOTE(review): returns None on failure while success returns 1;
            # putData() does not check the result, so a failure surfaces
            # later when self.fp is used.
            return

        return 1
518 518
    def writeBlock(self):
        '''
        Add data records to cedar file taking data from oneDDict and twoDDict
        attributes.
        Allowed parameters in: parcodes.tab
        '''

        startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
        endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval)
        heights = self.dataOut.heightList

        if self.ext == '.dat':
            # Replace NaNs in-place with the cedar missing-value sentinel.
            # NOTE(review): the loop variable `key` is clobbered by the tuple
            # unpack below; harmless (key is unused afterwards) but easy to
            # misread.
            for key, value in list(self.twoDDict.items()):
                if isinstance(value, str):
                    data = getattr(self.dataOut, value)
                    invalid = numpy.isnan(data)
                    data[invalid] = self.missing
                elif isinstance(value, (tuple, list)):
                    attr, key = value
                    data = getattr(self.dataOut, attr)
                    invalid = numpy.isnan(data)
                    data[invalid] = self.missing

        # Build per-key flat arrays of the values to be written.
        out = {}
        for key, value in list(self.twoDDict.items()):
            key = key.lower()
            if isinstance(value, str):
                if 'db' in value.lower():
                    # '<attr>_db' means: average over channels, then to dB.
                    tmp = getattr(self.dataOut, value.replace('_db', ''))
                    SNRavg = numpy.average(tmp, axis=0)
                    tmp = 10*numpy.log10(SNRavg)
                else:
                    tmp = getattr(self.dataOut, value)
                out[key] = tmp.flatten()
            elif isinstance(value, (tuple, list)):
                attr, x = value
                data = getattr(self.dataOut, attr)
                out[key] = data[int(x)]

        # Keep only the height rows holding at least one valid value.
        a = numpy.array([out[k] for k in self.keys])
        nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
        index = numpy.where(nrows == False)[0]

        rec = madrigal.cedar.MadrigalDataRecord(
            self.kinst,
            self.kindat,
            startTime.year,
            startTime.month,
            startTime.day,
            startTime.hour,
            startTime.minute,
            startTime.second,
            startTime.microsecond/10000,
            endTime.year,
            endTime.month,
            endTime.day,
            endTime.hour,
            endTime.minute,
            endTime.second,
            endTime.microsecond/10000,
            list(self.oneDDict.keys()),
            list(self.twoDDict.keys()),
            len(index),
            **self.extra_args
        )

        # Setting 1d values
        for key in self.oneDDict:
            rec.set1D(key, getattr(self.dataOut, self.oneDDict[key]))

        # Setting 2d values
        nrec = 0
        for n in index:
            for key in out:
                rec.set2D(key, nrec, out[key][n])
            nrec += 1

        self.fp.append(rec)
        # HDF5 output is flushed to disk every 500 records.
        if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0:
            self.fp.dump()
        if self.counter % 100 == 0 and self.counter > 0:
            log.log(
                'Writing {} records'.format(
                    self.counter),
                'MADWriter')
604 604
605 605 def setHeader(self):
606 606 '''
607 607 Create an add catalog and header to cedar file
608 608 '''
609 609
610 610 log.success('Closing file {}'.format(self.fullname), 'MADWriter')
611 611
612 612 if self.ext == '.dat':
613 613 self.fp.write()
614 614 else:
615 615 self.fp.dump()
616 616 self.fp.close()
617 617
618 618 header = madrigal.cedar.CatalogHeaderCreator(self.fullname)
619 619 header.createCatalog(**self.catalog)
620 620 header.createHeader(**self.header)
621 621 header.write()
622 622
623 623 def putData(self):
624 624
625 625 if self.dataOut.flagNoData:
626 626 return 0
627 627
628 628 if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks:
629 629 if self.counter > 0:
630 630 self.setHeader()
631 631 self.counter = 0
632 632
633 633 if self.counter == 0:
634 634 self.setFile()
635 635
636 636 self.writeBlock()
637 637 self.counter += 1
638 638
639 639 def close(self):
640 640
641 641 if self.counter > 0:
642 642 self.setHeader() No newline at end of file
@@ -1,1093 +1,1036
1 1 import numpy
2 2 import time
3 3 import os
4 4 import h5py
5 5 import re
6 6 import datetime
7 7
8 8 from schainpy.model.data.jrodata import *
9 9 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
10 10 # from .jroIO_base import *
11 11 from schainpy.model.io.jroIO_base import *
12 12 import schainpy
13 13 from schainpy.utils import log
14 14
@MPDecorator
class ParamReader(JRODataReader,ProcessingUnit):
    '''
    Reads HDF5 format files

    path

    startDate

    endDate

    startTime

    endTime
    '''

    ext = ".hdf5"

    optchar = "D"

    timezone = None

    startTime = None

    endTime = None

    fileIndex = None

    utcList = None #To select data in the utctime list

    blockList = None #List to blocks to be read from the file

    blocksPerFile = None #Number of blocks to be read

    blockIndex = None

    path = None

    #List of Files

    filenameList = None

    datetimeList = None

    #Hdf5 File

    listMetaname = None

    listMeta = None

    listDataname = None

    listData = None

    listShapes = None

    fp = None

    #dataOut reconstruction

    dataOut = None


    def __init__(self):#, **kwargs):
        ProcessingUnit.__init__(self) #, **kwargs)
        # Output container rebuilt from the HDF5 file contents.
        self.dataOut = Parameters()
        return
82 82
83 83 def setup(self, **kwargs):
84 84
85 85 path = kwargs['path']
86 86 startDate = kwargs['startDate']
87 87 endDate = kwargs['endDate']
88 88 startTime = kwargs['startTime']
89 89 endTime = kwargs['endTime']
90 90 walk = kwargs['walk']
91 91 if 'ext' in kwargs:
92 92 ext = kwargs['ext']
93 93 else:
94 94 ext = '.hdf5'
95 95 if 'timezone' in kwargs:
96 96 self.timezone = kwargs['timezone']
97 97 else:
98 98 self.timezone = 'lt'
99 99
100 100 print("[Reading] Searching files in offline mode ...")
101 101 pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
102 102 startTime=startTime, endTime=endTime,
103 103 ext=ext, walk=walk)
104 104
105 105 if not(filenameList):
106 106 print("There is no files into the folder: %s"%(path))
107 107 sys.exit(-1)
108 108
109 109 self.fileIndex = -1
110 110 self.startTime = startTime
111 111 self.endTime = endTime
112 112
113 113 self.__readMetadata()
114 114
115 115 self.__setNextFileOffline()
116 116
117 117 return
118 118
    def searchFilesOffLine(self,
                           path,
                           startDate=None,
                           endDate=None,
                           startTime=datetime.time(0,0,0),
                           endTime=datetime.time(23,59,59),
                           ext='.hdf5',
                           walk=True):
        """Find data files under *path* within [startDate, endDate] and the
        daily [startTime, endTime] window.

        Fills ``filenameList``/``datetimeList`` and returns
        (pathList, filenameList), or (None, None) when nothing matches."""

        expLabel = ''
        self.filenameList = []
        self.datetimeList = []

        pathList = []

        JRODataObj = JRODataReader()
        dateList, pathList = JRODataObj.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)

        if dateList == []:
            print("[Reading] No *%s files in %s from %s to %s)"%(ext, path,
                                                                 datetime.datetime.combine(startDate,startTime).ctime(),
                                                                 datetime.datetime.combine(endDate,endTime).ctime()))

            return None, None

        if len(dateList) > 1:
            print("[Reading] %d days were found in date range: %s - %s" %(len(dateList), startDate, endDate))
        else:
            print("[Reading] data was found for the date %s" %(dateList[0]))

        filenameList = []
        datetimeList = []

        for thisPath in pathList:

            fileList = glob.glob1(thisPath, "*%s" %ext)
            fileList.sort()

            for file in fileList:

                filename = os.path.join(thisPath,file)

                # Coarse filter by the date encoded in the file, then a
                # finer check against the requested time window.
                if not isFileInDateRange(filename, startDate, endDate):
                    continue

                thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime)

                if not(thisDatetime):
                    continue

                filenameList.append(filename)
                datetimeList.append(thisDatetime)

        if not(filenameList):
            print("[Reading] Any file was found int time range %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()))
            return None, None

        print("[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime))
        print()

        self.filenameList = filenameList
        self.datetimeList = datetimeList

        return pathList, filenameList
186 186
    def __isFileInTimeRange(self,filename, startDate, endDate, startTime, endTime):

        """
        Return the file's datetime when the data file holds records inside
        the requested time range, otherwise ``None``.

        Inputs:
            filename : full path of a data file (HDF5)
            startDate : first date of the selected range (datetime.date)
            endDate : last date of the selected range (datetime.date)
            startTime : start time of the selected range (datetime.time)
            endTime : end time of the selected range (datetime.time)

        Raises:
            IOError when the file does not exist or cannot be opened.
        """

        try:
            fp = h5py.File(filename,'r')
            grp1 = fp['Data']

        except IOError:
            traceback.print_exc()
            raise IOError("The file %s can't be opened" %(filename))

        #In case has utctime attribute
        grp2 = grp1['utctime']
        thisUtcTime = grp2.value[0]

        fp.close()

        # NOTE(review): the 5h local-time shift is re-added unconditionally
        # below regardless of timezone - confirm the intended handling.
        if self.timezone == 'lt':
            thisUtcTime -= 5*3600

        thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
        thisDate = thisDatetime.date()
        thisTime = thisDatetime.time()

        startUtcTime = (datetime.datetime.combine(thisDate,startTime)- datetime.datetime(1970, 1, 1)).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate,endTime)- datetime.datetime(1970, 1, 1)).total_seconds()

        #General case
        # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
        #-----------o----------------------------o-----------
        # startTime endTime

        if endTime >= startTime:
            thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime)
            if numpy.any(thisUtcLog): #If there is one block between the hours mentioned
                return thisDatetime
            return None

        #If endTime < startTime then endTime belongs to the next day
        #<<<<<<<<<<<o o>>>>>>>>>>>
        #-----------o----------------------------o-----------
        # endTime startTime

        if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime):
            return None

        if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime):
            return None

        if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime):
            return None

        return thisDatetime
265 265
266 266 def __setNextFileOffline(self):
267 267
268 268 self.fileIndex += 1
269 269 idFile = self.fileIndex
270 270
271 271 if not(idFile < len(self.filenameList)):
272 272 print("No more Files")
273 273 return 0
274 274
275 275 filename = self.filenameList[idFile]
276 276
277 277 filePointer = h5py.File(filename,'r')
278 278
279 279 self.filename = filename
280 280
281 281 self.fp = filePointer
282 282
283 283 print("Setting the file: %s"%self.filename)
284 284
285 # self.__readMetadata()
285 # self.__readMetadata()
286 286 self.__setBlockList()
287 287 self.__readData()
288 # self.nRecords = self.fp['Data'].attrs['blocksPerFile']
289 # self.nRecords = self.fp['Data'].attrs['nRecords']
288 # self.nRecords = self.fp['Data'].attrs['blocksPerFile']
289 # self.nRecords = self.fp['Data'].attrs['nRecords']
290 290 self.blockIndex = 0
291 291 return 1
292 292
    def __setBlockList(self):
        '''
        Selects the data within the times defined

        Reads:
            self.fp
            self.startTime
            self.endTime

        Sets:
            self.blockList
            self.blocksPerFile
        '''
        fp = self.fp
        startTime = self.startTime
        endTime = self.endTime

        grp = fp['Data']
        thisUtcTime = grp['utctime'].value.astype(numpy.float)[0]

        # NOTE(review): original marker '#ERROOOOR' - the local-time shift
        # is subtracted here but unconditionally added back (5h) below;
        # confirm the intended timezone handling.
        if self.timezone == 'lt':
            thisUtcTime -= 5*3600

        thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)

        thisDate = thisDatetime.date()
        thisTime = thisDatetime.time()

        startUtcTime = (datetime.datetime.combine(thisDate,startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate,endTime) - datetime.datetime(1970, 1, 1)).total_seconds()

        # Indices of the blocks falling inside [startTime, endTime).
        ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]

        self.blockList = ind
        self.blocksPerFile = len(ind)

        return
330 330
    def __readMetadata(self):
        '''
        Reads Metadata

        Reads the 'Metadata' group of the FIRST file in the list (metadata is
        assumed identical across files) and sets:
            self.listShapes
            self.listMetaname
            self.listMeta
        '''

        filename = self.filenameList[0]

        fp = h5py.File(filename,'r')

        gp = fp['Metadata']

        listMetaname = []
        listMetadata = []
        for item in list(gp.items()):
            name = item[0]

            if name=='array dimensions':
                # Table mapping dataset name -> (nDims, dim sizes..., mode).
                table = gp[name][:]
                listShapes = {}
                for shapes in table:
                    listShapes[shapes[0]] = numpy.array([shapes[1],shapes[2],shapes[3],shapes[4],shapes[5]])
            else:
                data = gp[name].value
                listMetaname.append(name)
                listMetadata.append(data)

        self.listShapes = listShapes
        self.listMetaname = listMetaname
        self.listMeta = listMetadata

        fp.close()
        return
384 384
385 385 def __readData(self):
386 386 grp = self.fp['Data']
387 387 listdataname = []
388 388 listdata = []
389 389
390 390 for item in list(grp.items()):
391 391 name = item[0]
392 392 listdataname.append(name)
393 393
394 394 array = self.__setDataArray(grp[name],self.listShapes[name])
395 395 listdata.append(array)
396 396
397 397 self.listDataname = listdataname
398 398 self.listData = listdata
399 399 return
400 400
401 401 def __setDataArray(self, dataset, shapes):
402 402
403 403 nDims = shapes[0]
404 404
405 405 nDim2 = shapes[1] #Dimension 0
406 406
407 407 nDim1 = shapes[2] #Dimension 1, number of Points or Parameters
408 408
409 409 nDim0 = shapes[3] #Dimension 2, number of samples or ranges
410 410
411 411 mode = shapes[4] #Mode of storing
412 412
413 413 blockList = self.blockList
414 414
415 415 blocksPerFile = self.blocksPerFile
416 416
417 417 #Depending on what mode the data was stored
418 418 if mode == 0: #Divided in channels
419 419 arrayData = dataset.value.astype(numpy.float)[0][blockList]
420 420 if mode == 1: #Divided in parameter
421 421 strds = 'table'
422 422 nDatas = nDim1
423 423 newShapes = (blocksPerFile,nDim2,nDim0)
424 424 elif mode==2: #Concatenated in a table
425 425 strds = 'table0'
426 426 arrayData = dataset[strds].value
427 427 #Selecting part of the dataset
428 428 utctime = arrayData[:,0]
429 429 u, indices = numpy.unique(utctime, return_index=True)
430 430
431 431 if blockList.size != indices.size:
432 432 indMin = indices[blockList[0]]
433 433 if blockList[1] + 1 >= indices.size:
434 434 arrayData = arrayData[indMin:,:]
435 435 else:
436 436 indMax = indices[blockList[1] + 1]
437 437 arrayData = arrayData[indMin:indMax,:]
438 438 return arrayData
439 439
440 440 # One dimension
441 441 if nDims == 0:
442 442 arrayData = dataset.value.astype(numpy.float)[0][blockList]
443 443
444 444 # Two dimensions
445 445 elif nDims == 2:
446 446 arrayData = numpy.zeros((blocksPerFile,nDim1,nDim0))
447 447 newShapes = (blocksPerFile,nDim0)
448 448 nDatas = nDim1
449 449
450 450 for i in range(nDatas):
451 451 data = dataset[strds + str(i)].value
452 452 arrayData[:,i,:] = data[blockList,:]
453 453
454 454 # Three dimensions
455 455 else:
456 456 arrayData = numpy.zeros((blocksPerFile,nDim2,nDim1,nDim0))
457 457 for i in range(nDatas):
458 458
459 459 data = dataset[strds + str(i)].value
460 460
461 461 for b in range(blockList.size):
462 462 arrayData[b,:,i,:] = data[:,:,blockList[b]]
463 463
464 464 return arrayData
465 465
466 466 def __setDataOut(self):
467 467 listMeta = self.listMeta
468 468 listMetaname = self.listMetaname
469 469 listDataname = self.listDataname
470 470 listData = self.listData
471 471 listShapes = self.listShapes
472 472
473 473 blockIndex = self.blockIndex
474 # blockList = self.blockList
474 # blockList = self.blockList
475 475
476 476 for i in range(len(listMeta)):
477 477 setattr(self.dataOut,listMetaname[i],listMeta[i])
478 478
479 479 for j in range(len(listData)):
480 480 nShapes = listShapes[listDataname[j]][0]
481 481 mode = listShapes[listDataname[j]][4]
482 482 if nShapes == 1:
483 483 setattr(self.dataOut,listDataname[j],listData[j][blockIndex])
484 484 elif nShapes > 1:
485 485 setattr(self.dataOut,listDataname[j],listData[j][blockIndex,:])
486 486 elif mode==0:
487 487 setattr(self.dataOut,listDataname[j],listData[j][blockIndex])
488 488 #Mode Meteors
489 489 elif mode ==2:
490 490 selectedData = self.__selectDataMode2(listData[j], blockIndex)
491 491 setattr(self.dataOut, listDataname[j], selectedData)
492 492 return
493 493
494 494 def __selectDataMode2(self, data, blockIndex):
495 495 utctime = data[:,0]
496 496 aux, indices = numpy.unique(utctime, return_inverse=True)
497 497 selInd = numpy.where(indices == blockIndex)[0]
498 498 selData = data[selInd,:]
499 499
500 500 return selData
501 501
502 502 def getData(self):
503 503
504 504 if self.blockIndex==self.blocksPerFile:
505 505 if not( self.__setNextFileOffline() ):
506 506 self.dataOut.flagNoData = True
507 507 return 0
508 508
509 509 self.__setDataOut()
510 510 self.dataOut.flagNoData = False
511 511
512 512 self.blockIndex += 1
513 513
514 514 return
515 515
516 516 def run(self, **kwargs):
517 517
518 518 if not(self.isConfig):
519 519 self.setup(**kwargs)
520 # self.setObjProperties()
520 # self.setObjProperties()
521 521 self.isConfig = True
522 522
523 523 self.getData()
524 524
525 525 return
526
@MPDecorator
class ParamWriter(Operation):
    '''
    HDF5 Writer, stores parameters data in HDF5 format files

    path: path where the files will be stored

    blocksPerFile: number of blocks that will be saved in per HDF5 format file

    mode: selects the data stacking mode: '0' channels, '1' parameters, '3' table (for meteors)

    metadataList: list of attributes that will be stored as metadata

    dataList: list of attributes that will be stores as data
    '''

    ext = ".hdf5"          # data file extension
    optchar = "D"          # data file prefix
    metaoptchar = "M"      # metadata file prefix
    metaFile = None
    filename = None
    path = None
    setFile = None
    fp = None
    grp = None
    ds = None
    firsttime = True
    # Configurations
    blocksPerFile = None
    blockIndex = None
    dataOut = None
    # Data Arrays
    dataList = None
    metadataList = None
    dsList = None          # List of dictionaries with dataset properties
    tableDim = None
    dtype = [('arrayName', 'S20'), ('nDimensions', 'i'), ('dim2', 'i'),
             ('dim1', 'i'), ('dim0', 'i'), ('mode', 'b')]
    currentDay = None
    lastTime = None

    def __init__(self):
        Operation.__init__(self)
        return
598 573
599 def setup(self, dataOut, path=None, blocksPerFile=10, metadataList=None, dataList=None, mode=None, **kwargs):
574 def setup(self, dataOut, path=None, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
600 575 self.path = path
601 576 self.blocksPerFile = blocksPerFile
602 577 self.metadataList = metadataList
603 578 self.dataList = dataList
604 579 self.dataOut = dataOut
605 580 self.mode = mode
606 581 if self.mode is not None:
607 582 self.mode = numpy.zeros(len(self.dataList)) + mode
608 583 else:
609 #self.mode = numpy.ones(len(self.dataList),int)
610 584 self.mode = numpy.ones(len(self.dataList))
611 log.error(self.mode)#yong
585
586 self.setType = setType
612 587
613 588 arrayDim = numpy.zeros((len(self.dataList),5))
614 589
615 590 #Table dimensions
616 591 dtype0 = self.dtype
617 592 tableList = []
618 593
619 594 #Dictionary and list of tables
620 595 dsList = []
621 596
622 597 for i in range(len(self.dataList)):
623 598 dsDict = {}
624 599 dataAux = getattr(self.dataOut, self.dataList[i])
625 600 dsDict['variable'] = self.dataList[i]
626 601 #--------------------- Conditionals ------------------------
627 602 #There is no data
628 603
629 604
630 605 if dataAux is None:
631 606
632 607 return 0
633 608
634 #Not array, just a number
635 #Mode 0
636 #log.error(mode)#yong
637 #log.error(len(mode))#yong
638 #log.error(type(mode))#yong
639 609 if type(dataAux)==float or type(dataAux)==int:
640 610 dsDict['mode'] = 0
641 611 dsDict['nDim'] = 0
642 612 arrayDim[i,0] = 0
643 613 dsList.append(dsDict)
644 614
645 615 #Mode 2: meteors
646 616 elif self.mode[i] == 2:
647 # dsDict['nDim'] = 0
648 617 dsDict['dsName'] = 'table0'
649 618 dsDict['mode'] = 2 # Mode meteors
650 619 dsDict['shape'] = dataAux.shape[-1]
651 620 dsDict['nDim'] = 0
652 621 dsDict['dsNumber'] = 1
653
654 622 arrayDim[i,3] = dataAux.shape[-1]
655 623 arrayDim[i,4] = self.mode[i] #Mode the data was stored
656
657 624 dsList.append(dsDict)
658 625
659 626 #Mode 1
660 627 else:
661 628 arrayDim0 = dataAux.shape #Data dimensions
662 629 arrayDim[i,0] = len(arrayDim0) #Number of array dimensions
663 630 arrayDim[i,4] = self.mode[i] #Mode the data was stored
664
665 631 strtable = 'table'
666 632 dsDict['mode'] = 1 # Mode parameters
667 633
668 634 # Three-dimension arrays
669 635 if len(arrayDim0) == 3:
670 636 arrayDim[i,1:-1] = numpy.array(arrayDim0)
671 637 nTables = int(arrayDim[i,2])
672 638 dsDict['dsNumber'] = nTables
673 639 dsDict['shape'] = arrayDim[i,2:4]
674 640 dsDict['nDim'] = 3
675 641
676 642 for j in range(nTables):
677 643 dsDict = dsDict.copy()
678 644 dsDict['dsName'] = strtable + str(j)
679 645 dsList.append(dsDict)
680 646
681 647 # Two-dimension arrays
682 648 elif len(arrayDim0) == 2:
683 649 arrayDim[i,2:-1] = numpy.array(arrayDim0)
684 650 nTables = int(arrayDim[i,2])
685 651 dsDict['dsNumber'] = nTables
686 652 dsDict['shape'] = arrayDim[i,3]
687 653 dsDict['nDim'] = 2
688 654
689 655 for j in range(nTables):
690 656 dsDict = dsDict.copy()
691 657 dsDict['dsName'] = strtable + str(j)
692 658 dsList.append(dsDict)
693 659
694 660 # One-dimension arrays
695 661 elif len(arrayDim0) == 1:
696 662 arrayDim[i,3] = arrayDim0[0]
697 663 dsDict['shape'] = arrayDim0[0]
698 664 dsDict['dsNumber'] = 1
699 665 dsDict['dsName'] = strtable + str(0)
700 666 dsDict['nDim'] = 1
701 667 dsList.append(dsDict)
702 668
703 669 table = numpy.array((self.dataList[i],) + tuple(arrayDim[i,:]),dtype = dtype0)
704 670 tableList.append(table)
705 671
706 # self.arrayDim = arrayDim
707 672 self.dsList = dsList
708 673 self.tableDim = numpy.array(tableList, dtype = dtype0)
709 674 self.blockIndex = 0
710
711 675 timeTuple = time.localtime(dataOut.utctime)
712 676 self.currentDay = timeTuple.tm_yday
713 return 1
714 677
715 678 def putMetadata(self):
716 679
717 680 fp = self.createMetadataFile()
718 681 self.writeMetadata(fp)
719 682 fp.close()
720 683 return
721 684
722 685 def createMetadataFile(self):
723 686 ext = self.ext
724 687 path = self.path
725 688 setFile = self.setFile
726 689
727 690 timeTuple = time.localtime(self.dataOut.utctime)
728 691
729 692 subfolder = ''
730 693 fullpath = os.path.join( path, subfolder )
731 694
732 695 if not( os.path.exists(fullpath) ):
733 696 os.mkdir(fullpath)
734 697 setFile = -1 #inicializo mi contador de seteo
735 698
736 699 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
737 700 fullpath = os.path.join( path, subfolder )
738 701
739 702 if not( os.path.exists(fullpath) ):
740 703 os.mkdir(fullpath)
741 704 setFile = -1 #inicializo mi contador de seteo
742 705
743 706 else:
744 707 filesList = os.listdir( fullpath )
745 708 filesList = sorted( filesList, key=str.lower )
746 709 if len( filesList ) > 0:
747 filesList = [k for k in filesList if 'M' in k]
710 filesList = [k for k in filesList if k.startswith(self.metaoptchar)]
748 711 filen = filesList[-1]
749 712 # el filename debera tener el siguiente formato
750 713 # 0 1234 567 89A BCDE (hex)
751 714 # x YYYY DDD SSS .ext
752 715 if isNumber( filen[8:11] ):
753 716 setFile = int( filen[8:11] ) #inicializo mi contador de seteo al seteo del ultimo file
754 717 else:
755 718 setFile = -1
756 719 else:
757 720 setFile = -1 #inicializo mi contador de seteo
758 721
759 722 if self.setType is None:
760 723 setFile += 1
761 724 file = '%s%4.4d%3.3d%03d%s' % (self.metaoptchar,
762 725 timeTuple.tm_year,
763 726 timeTuple.tm_yday,
764 727 setFile,
765 728 ext )
766 729 else:
767 730 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
768 731 file = '%s%4.4d%3.3d%04d%s' % (self.metaoptchar,
769 732 timeTuple.tm_year,
770 733 timeTuple.tm_yday,
771 734 setFile,
772 735 ext )
773 736
774 737 filename = os.path.join( path, subfolder, file )
775 738 self.metaFile = file
776 739 #Setting HDF5 File
777 740 fp = h5py.File(filename,'w')
778 741
779 742 return fp
780 743
781 744 def writeMetadata(self, fp):
782 745
783 746 grp = fp.create_group("Metadata")
784 747 grp.create_dataset('array dimensions', data = self.tableDim, dtype = self.dtype)
785 748
786 749 for i in range(len(self.metadataList)):
787 750 grp.create_dataset(self.metadataList[i], data=getattr(self.dataOut, self.metadataList[i]))
788 751 return
789 752
790 753 def timeFlag(self):
791 754 currentTime = self.dataOut.utctime
792 755
793 756 if self.lastTime is None:
794 757 self.lastTime = currentTime
795 758
796 759 #Day
797 760 timeTuple = time.localtime(currentTime)
798 761 dataDay = timeTuple.tm_yday
799 762
800 763 #Time
801 764 timeDiff = currentTime - self.lastTime
802 765
803 766 #Si el dia es diferente o si la diferencia entre un dato y otro supera la hora
804 767 if dataDay != self.currentDay:
805 768 self.currentDay = dataDay
806 769 return True
807 770 elif timeDiff > 3*60*60:
808 771 self.lastTime = currentTime
809 772 return True
810 773 else:
811 774 self.lastTime = currentTime
812 775 return False
813 776
814 777 def setNextFile(self):
815 778
816 779 ext = self.ext
817 780 path = self.path
818 781 setFile = self.setFile
819 782 mode = self.mode
820 783
821 784 timeTuple = time.localtime(self.dataOut.utctime)
822 785 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
823 786
824 787 fullpath = os.path.join( path, subfolder )
825 788
826 789 if os.path.exists(fullpath):
827 790 filesList = os.listdir( fullpath )
828 filesList = [k for k in filesList if 'D' in k]
791 filesList = [k for k in filesList if k.startswith(self.optchar)]
829 792 if len( filesList ) > 0:
830 793 filesList = sorted( filesList, key=str.lower )
831 794 filen = filesList[-1]
832 795 # el filename debera tener el siguiente formato
833 796 # 0 1234 567 89A BCDE (hex)
834 797 # x YYYY DDD SSS .ext
835 798 if isNumber( filen[8:11] ):
836 799 setFile = int( filen[8:11] ) #inicializo mi contador de seteo al seteo del ultimo file
837 800 else:
838 801 setFile = -1
839 802 else:
840 803 setFile = -1 #inicializo mi contador de seteo
841 804 else:
842 805 os.makedirs(fullpath)
843 806 setFile = -1 #inicializo mi contador de seteo
844 807
845 if None is None:
808 if self.setType is None:
846 809 setFile += 1
847 file = '%s%4.4d%3.3d%03d%s' % (self.metaoptchar,
810 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
848 811 timeTuple.tm_year,
849 812 timeTuple.tm_yday,
850 813 setFile,
851 814 ext )
852 815 else:
853 816 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
854 file = '%s%4.4d%3.3d%04d%s' % (self.metaoptchar,
817 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
855 818 timeTuple.tm_year,
856 819 timeTuple.tm_yday,
857 820 setFile,
858 821 ext )
859 822
860 823 filename = os.path.join( path, subfolder, file )
861 824
862 825 #Setting HDF5 File
863 826 fp = h5py.File(filename,'w')
864 827 #write metadata
865 828 self.writeMetadata(fp)
866 829 #Write data
867 830 grp = fp.create_group("Data")
868 # grp.attrs['metadata'] = self.metaFile
869
870 # grp.attrs['blocksPerFile'] = 0
871 831 ds = []
872 832 data = []
873 833 dsList = self.dsList
874 834 i = 0
875 835 while i < len(dsList):
876 836 dsInfo = dsList[i]
877 837 #One-dimension data
878 838 if dsInfo['mode'] == 0:
879 # ds0 = grp.create_dataset(self.dataList[i], (1,1), maxshape=(1,self.blocksPerFile) , chunks = True, dtype='S20')
880 839 ds0 = grp.create_dataset(dsInfo['variable'], (1,1), maxshape=(1,self.blocksPerFile) , chunks = True, dtype=numpy.float64)
881 840 ds.append(ds0)
882 841 data.append([])
883 842 i += 1
884 843 continue
885 # nDimsForDs.append(nDims[i])
886 844
887 845 elif dsInfo['mode'] == 2:
888 846 grp0 = grp.create_group(dsInfo['variable'])
889 847 ds0 = grp0.create_dataset(dsInfo['dsName'], (1,dsInfo['shape']), data = numpy.zeros((1,dsInfo['shape'])) , maxshape=(None,dsInfo['shape']), chunks=True)
890 848 ds.append(ds0)
891 849 data.append([])
892 850 i += 1
893 851 continue
894 852
895 853 elif dsInfo['mode'] == 1:
896 854 grp0 = grp.create_group(dsInfo['variable'])
897 855
898 856 for j in range(dsInfo['dsNumber']):
899 857 dsInfo = dsList[i]
900 858 tableName = dsInfo['dsName']
901 859
902 860
903 861 if dsInfo['nDim'] == 3:
904 862 shape = dsInfo['shape'].astype(int)
905 863 ds0 = grp0.create_dataset(tableName, (shape[0],shape[1],1) , data = numpy.zeros((shape[0],shape[1],1)), maxshape = (None,shape[1],None), chunks=True)
906 864 else:
907 865 shape = int(dsInfo['shape'])
908 866 ds0 = grp0.create_dataset(tableName, (1,shape), data = numpy.zeros((1,shape)) , maxshape=(None,shape), chunks=True)
909 867
910 868 ds.append(ds0)
911 869 data.append([])
912 870 i += 1
913 # nDimsForDs.append(nDims[i])
914 871
915 872 fp.flush()
916 873 fp.close()
917 874
918 # self.nDatas = nDatas
919 # self.nDims = nDims
920 # self.nDimsForDs = nDimsForDs
921 #Saving variables
922 print('Writing the file: %s'%filename)
875 log.log('creating file: {}'.format(filename), 'Writing')
923 876 self.filename = filename
924 # self.fp = fp
925 # self.grp = grp
926 # self.grp.attrs.modify('nRecords', 1)
927 877 self.ds = ds
928 878 self.data = data
929 # self.setFile = setFile
930 879 self.firsttime = True
931 880 self.blockIndex = 0
932 881 return
933 882
934 883 def putData(self):
935 884
936 885 if self.blockIndex == self.blocksPerFile or self.timeFlag():
937 886 self.setNextFile()
938 887
939 # if not self.firsttime:
940 888 self.readBlock()
941 889 self.setBlock() #Prepare data to be written
942 890 self.writeBlock() #Write data
943 891
944 892 return
945 893
946 894 def readBlock(self):
947 895
948 896 '''
949 897 data Array configured
950 898
951 899
952 900 self.data
953 901 '''
954 902 dsList = self.dsList
955 903 ds = self.ds
956 904 #Setting HDF5 File
957 905 fp = h5py.File(self.filename,'r+')
958 906 grp = fp["Data"]
959 907 ind = 0
960 908
961 # grp.attrs['blocksPerFile'] = 0
962 909 while ind < len(dsList):
963 910 dsInfo = dsList[ind]
964 911
965 912 if dsInfo['mode'] == 0:
966 913 ds0 = grp[dsInfo['variable']]
967 914 ds[ind] = ds0
968 915 ind += 1
969 916 else:
970 917
971 918 grp0 = grp[dsInfo['variable']]
972 919
973 920 for j in range(dsInfo['dsNumber']):
974 921 dsInfo = dsList[ind]
975 922 ds0 = grp0[dsInfo['dsName']]
976 923 ds[ind] = ds0
977 924 ind += 1
978 925
979 926 self.fp = fp
980 927 self.grp = grp
981 928 self.ds = ds
982 929
983 930 return
984 931
985 932 def setBlock(self):
986 933 '''
987 934 data Array configured
988 935
989 936
990 937 self.data
991 938 '''
992 939 #Creating Arrays
993 940 dsList = self.dsList
994 941 data = self.data
995 942 ind = 0
996 943
997 944 while ind < len(dsList):
998 945 dsInfo = dsList[ind]
999 946 dataAux = getattr(self.dataOut, dsInfo['variable'])
1000 947
1001 948 mode = dsInfo['mode']
1002 949 nDim = dsInfo['nDim']
1003 950
1004 951 if mode == 0 or mode == 2 or nDim == 1:
1005 952 data[ind] = dataAux
1006 953 ind += 1
1007 # elif nDim == 1:
1008 # data[ind] = numpy.reshape(dataAux,(numpy.size(dataAux),1))
1009 # ind += 1
954 # elif nDim == 1:
955 # data[ind] = numpy.reshape(dataAux,(numpy.size(dataAux),1))
956 # ind += 1
1010 957 elif nDim == 2:
1011 958 for j in range(dsInfo['dsNumber']):
1012 959 data[ind] = dataAux[j,:]
1013 960 ind += 1
1014 961 elif nDim == 3:
1015 962 for j in range(dsInfo['dsNumber']):
1016 963 data[ind] = dataAux[:,j,:]
1017 964 ind += 1
1018 965
1019 966 self.data = data
1020 967 return
1021 968
1022 969 def writeBlock(self):
1023 970 '''
1024 971 Saves the block in the HDF5 file
1025 972 '''
1026 973 dsList = self.dsList
1027 974
1028 975 for i in range(len(self.ds)):
1029 976 dsInfo = dsList[i]
1030 977 nDim = dsInfo['nDim']
1031 978 mode = dsInfo['mode']
1032 979
1033 980 # First time
1034 981 if self.firsttime:
1035 # self.ds[i].resize(self.data[i].shape)
1036 # self.ds[i][self.blockIndex,:] = self.data[i]
1037 982 if type(self.data[i]) == numpy.ndarray:
1038 983
1039 984 if nDim == 3:
1040 985 self.data[i] = self.data[i].reshape((self.data[i].shape[0],self.data[i].shape[1],1))
1041 986 self.ds[i].resize(self.data[i].shape)
1042 987 if mode == 2:
1043 988 self.ds[i].resize(self.data[i].shape)
1044 989 self.ds[i][:] = self.data[i]
1045 990 else:
1046 991
1047 992 # From second time
1048 993 # Meteors!
1049 994 if mode == 2:
1050 995 dataShape = self.data[i].shape
1051 996 dsShape = self.ds[i].shape
1052 997 self.ds[i].resize((self.ds[i].shape[0] + dataShape[0],self.ds[i].shape[1]))
1053 998 self.ds[i][dsShape[0]:,:] = self.data[i]
1054 999 # No dimension
1055 1000 elif mode == 0:
1056 1001 self.ds[i].resize((self.ds[i].shape[0], self.ds[i].shape[1] + 1))
1057 1002 self.ds[i][0,-1] = self.data[i]
1058 1003 # One dimension
1059 1004 elif nDim == 1:
1060 1005 self.ds[i].resize((self.ds[i].shape[0] + 1, self.ds[i].shape[1]))
1061 1006 self.ds[i][-1,:] = self.data[i]
1062 1007 # Two dimension
1063 1008 elif nDim == 2:
1064 1009 self.ds[i].resize((self.ds[i].shape[0] + 1,self.ds[i].shape[1]))
1065 1010 self.ds[i][self.blockIndex,:] = self.data[i]
1066 1011 # Three dimensions
1067 1012 elif nDim == 3:
1068 1013 self.ds[i].resize((self.ds[i].shape[0],self.ds[i].shape[1],self.ds[i].shape[2]+1))
1069 1014 self.ds[i][:,:,-1] = self.data[i]
1070 1015
1071 1016 self.firsttime = False
1072 1017 self.blockIndex += 1
1073 1018
1074 1019 #Close to save changes
1075 1020 self.fp.flush()
1076 1021 self.fp.close()
1077 1022 return
1078 1023
1079 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, mode=None, **kwargs):
1024 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
1080 1025
1081 1026 if not(self.isConfig):
1082 flagdata = self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
1083 metadataList=metadataList, dataList=dataList, mode=mode, **kwargs)
1084
1085 if not(flagdata):
1086 return
1027 self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
1028 metadataList=metadataList, dataList=dataList, mode=mode,
1029 setType=setType)
1087 1030
1088 1031 self.isConfig = True
1089 1032 self.setNextFile()
1090 1033
1091 1034 self.putData()
1092 1035 return
1093 1036 No newline at end of file
@@ -1,380 +1,382
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 15 import inspect
16 16 import zmq
17 17 import time
18 18 import pickle
19 19 import os
20 20 from multiprocessing import Process
21 21 from zmq.utils.monitor import recv_monitor_message
22 22
23 23 from schainpy.utils import log
24 24
25 25
class ProcessingUnit(object):

    """
    Update - Jan 2018 - MULTIPROCESSING
    All the "call" methods present in the previous base were removed.
    The majority of operations are independant processes, thus
    the decorator is in charge of communicate the operation processes
    with the proccessing unit via IPC.

    The constructor does not receive any argument. The remaining methods
    are related with the operations to execute.
    """

    def __init__(self):

        self.dataIn = None
        self.dataOut = None
        self.isConfig = False
        # list of (operation, type, id, kwargs) tuples
        self.operations = []
        self.plots = []

    def getAllowedArgs(self):
        """Return the argument names accepted by run() (or __attrs__)."""
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        # getfullargspec: inspect.getargspec was removed in Python 3.11
        return inspect.getfullargspec(self.run).args

    def addOperation(self, conf, operation):
        """
        Register an operation to execute; used by the controller.

        Appends (operation, conf.type, conf.id, conf.getKwargs()) to
        self.operations; the id is kept for IPC purposes.

        Input:
            conf      : configuration object with type/id/getKwargs()
            operation : object of class "Operation"
        """

        self.operations.append(
            (operation, conf.type, conf.id, conf.getKwargs()))

        if 'plot' in self.name.lower():
            self.plots.append(operation.CODE)

    def getOperationObj(self, objId):
        """
        Return the operation registered under `objId`, or None.

        Fix: self.operations is a list of tuples (see addOperation), not
        a dict — the previous `objId not in self.operations.keys()`
        check raised AttributeError.
        """
        for operation, optype, opid, kwargs in self.operations:
            if opid == objId:
                return operation
        return None

    def operation(self, **kwargs):
        """
        Direct operation over the data (dataOut.data); subclasses must
        update the dataOut attributes.

        Input:
            **kwargs : keyword arguments of the function to execute
        """

        raise NotImplementedError

    def setup(self):

        raise NotImplementedError

    def run(self):

        raise NotImplementedError

    def close(self):

        return
108 108
109 109
class Operation(object):

    """
    Update - Jan 2018 - MULTIPROCESSING

    Most of the methods remained the same. The decorator parses the
    arguments and executes the run() method for each process. The
    constructor does not receive any argument, neither does the base
    class.

    Base class for additional operations attached to a ProcessingUnit
    that need to accumulate information from previous data (preferably
    via an internal buffer).

    Example: coherent integrations, which need the previous n profiles.
    """

    def __init__(self):

        self.id = None
        self.isConfig = False

        # default the display name to the subclass name
        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def getAllowedArgs(self):
        """Return the argument names accepted by run() (or __attrs__)."""
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        # getfullargspec: inspect.getargspec was removed in Python 3.11
        return inspect.getfullargspec(self.run).args

    def setup(self):

        self.isConfig = True

        raise NotImplementedError

    def run(self, dataIn, **kwargs):
        """
        Perform the operation over dataIn.data and update the dataIn
        attributes.

        Input:
            dataIn : JROData object

        Return:
            None

        Affected:
            __buffer : data reception buffer.
        """
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):

        return
172 172
173 173
def MPDecorator(BaseClass):
    """
    Multiprocessing class decorator

    This function adds multiprocessing features to a BaseClass. Also, it
    handles the communication between processes (readers, procUnits and
    operations) over ZeroMQ IPC sockets.

    Fixes: integer and string comparisons previously used `is`
    (`len(self.args) is 3`, `self.typeProc is "ProcUnit"`), which relies
    on CPython object caching and emits a SyntaxWarning on 3.8+; both are
    now value comparisons with `==`.
    """

    class MPClass(BaseClass, Process):

        def __init__(self, *args, **kwargs):
            super(MPClass, self).__init__()
            Process.__init__(self)
            self.operationKwargs = {}
            self.args = args
            self.kwargs = kwargs
            self.sender = None
            self.receiver = None
            self.name = BaseClass.__name__
            # Plot operations get a friendlier process name based on CODE
            if 'plot' in self.name.lower():
                self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
            self.start_time = time.time()

            if len(self.args) == 3:
                # (id, inputId, project_id) -> processing unit
                self.typeProc = "ProcUnit"
                self.id = args[0]
                self.inputId = args[1]
                self.project_id = args[2]
            elif len(self.args) == 2:
                # (id, project_id) -> external operation
                self.id = args[0]
                self.inputId = args[0]
                self.project_id = args[1]
                self.typeProc = "Operation"

        def subscribe(self):
            '''
            This function creates a socket to receive objects from the
            topic `inputId`.
            '''

            c = zmq.Context()
            self.receiver = c.socket(zmq.SUB)
            self.receiver.connect(
                'ipc:///tmp/schain/{}_pub'.format(self.project_id))
            self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())

        def listen(self):
            '''
            This function waits for objects and deserializes using pickle.
            '''

            data = pickle.loads(self.receiver.recv_multipart()[1])

            return data

        def set_publisher(self):
            '''
            This function creates a socket for publishing purposes.
            '''

            time.sleep(1)
            c = zmq.Context()
            self.sender = c.socket(zmq.PUB)
            self.sender.connect(
                'ipc:///tmp/schain/{}_sub'.format(self.project_id))

        def publish(self, data, id):
            '''
            This function publishes an object to a specific topic.
            '''
            self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])

        def runReader(self):
            '''
            Run function for read units
            '''
            while True:

                BaseClass.run(self, **self.kwargs)

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self':
                        op(**kwargs)
                    elif optype == 'other':
                        self.dataOut = op.run(self.dataOut, **self.kwargs)
                    elif optype == 'external':
                        self.publish(self.dataOut, opId)

                # nothing produced and no error: keep reading
                if self.dataOut.flagNoData and not self.dataOut.error:
                    continue

                self.publish(self.dataOut, self.id)

                if self.dataOut.error:
                    log.error(self.dataOut.error, self.name)
                    # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
                    break

                time.sleep(1)

        def runProc(self):
            '''
            Run function for proccessing units
            '''

            while True:
                self.dataIn = self.listen()

                if self.dataIn.flagNoData and self.dataIn.error is None:
                    continue

                BaseClass.run(self, **self.kwargs)

                # Propagate upstream errors so consumers terminate cleanly
                if self.dataIn.error:
                    self.dataOut.error = self.dataIn.error
                    self.dataOut.flagNoData = True

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self':
                        op(**kwargs)
                    elif optype == 'other':
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external':
                        # only publish when there is data or an error to forward
                        if not self.dataOut.flagNoData or self.dataOut.error:
                            self.publish(self.dataOut, opId)

                self.publish(self.dataOut, self.id)
                if self.dataIn.error:
                    break

                time.sleep(1)

        def runOp(self):
            '''
            Run function for external operations (this operations just receive data
            ex: plots, writers, publishers)
            '''

            while True:

                dataOut = self.listen()

                BaseClass.run(self, dataOut, **self.kwargs)

                if dataOut.error:
                    break

                time.sleep(1)

        def run(self):
            if self.typeProc == "ProcUnit":

                if self.inputId is not None:
                    self.subscribe()

                self.set_publisher()

                if 'Reader' not in BaseClass.__name__:
                    self.runProc()
                else:
                    self.runReader()

            elif self.typeProc == "Operation":

                self.subscribe()
                self.runOp()

            else:
                raise ValueError("Unknown type")

            self.close()

        def event_monitor(self, monitor):
            # Watch the zmq monitor socket, counting connections, until
            # the monitor is stopped.
            events = {}

            for name in dir(zmq):
                if name.startswith('EVENT_'):
                    value = getattr(zmq, name)
                    events[value] = name

            while monitor.poll():
                evt = recv_monitor_message(monitor)
                if evt['event'] == 32:
                    self.connections += 1
                if evt['event'] == 512:
                    pass

                evt.update({'description': events[evt['event']]})

                if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
                    break
            monitor.close()
            print('event monitor thread done!')

        def close(self):

            BaseClass.close(self)

            if self.sender:
                self.sender.close()

            if self.receiver:
                self.receiver.close()

            log.success('Done...(Time:{:4.2f} secs)'.format(
                time.time() - self.start_time), self.name)

    return MPClass
General Comments 0
You need to be logged in to leave comments. Login now