Modified kmamisr to run under version 3 and created scripts with a v3 suffix to tell them apart. Line #720 of JroIO_param.py was commented out because it reset the file list, which caused the hdf5 file to be rewritten. Any other apparent changes come from minor whitespace variations introduced when using the print() function.
joabAM -
r1279:c53fe2a4a291
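The commit message notes that line #720 of JroIO_param.py was commented out because it re-initialized the file list, causing the hdf5 output file to be rewritten. The sketch below is only a hypothetical illustration of that kind of guard, assuming a writer that keeps a list of registered output files; the names (HDFWriterSketch, filenameList, setup) are invented for the example and do not reproduce the actual JroIO_param.py code.

# Minimal sketch: initialize the file list only once so later calls append
# to it instead of wiping the accumulated state and recreating the HDF5 output.
class HDFWriterSketch(object):

    def __init__(self):
        self.filenameList = None  # output files registered so far

    def setup(self, filename):
        # An unconditional reset here (e.g. "self.filenameList = []") is the
        # kind of line the commit comments out: it would discard the list on
        # every call and force the file to be written from scratch.
        if self.filenameList is None:
            self.filenameList = []
        self.filenameList.append(filename)
        return self.filenameList

writer = HDFWriterSketch()
writer.setup('d2024001.hdf5')
writer.setup('d2024002.hdf5')
print(writer.filenameList)  # both entries kept; nothing is reset between calls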

The requested changes are too big and content was truncated.

@@ -1,1290 +1,1295
1 1 '''
2 2 Updated on January , 2018, for multiprocessing purposes
3 3 Author: Sergio Cortez
4 4 Created on September , 2012
5 5 '''
6 6 from platform import python_version
7 7 import sys
8 8 import ast
9 9 import datetime
10 10 import traceback
11 11 import math
12 12 import time
13 13 import zmq
14 14 from multiprocessing import Process, Queue, Event, Value, cpu_count
15 15 from threading import Thread
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
18 18
19 19
20 20 from schainpy.admin import Alarm, SchainWarning
21 21 from schainpy.model import *
22 22 from schainpy.utils import log
23 23
24 24
25 25 DTYPES = {
26 26 'Voltage': '.r',
27 27 'Spectra': '.pdata'
28 28 }
29 29
30 30
31 31 def MPProject(project, n=cpu_count()):
32 32 '''
33 33 Project wrapper to run schain in n processes
34 34 '''
35 35
36 36 rconf = project.getReadUnitObj()
37 37 op = rconf.getOperationObj('run')
38 38 dt1 = op.getParameterValue('startDate')
39 39 dt2 = op.getParameterValue('endDate')
40 40 tm1 = op.getParameterValue('startTime')
41 41 tm2 = op.getParameterValue('endTime')
42 42 days = (dt2 - dt1).days
43 43
44 44 for day in range(days + 1):
45 45 skip = 0
46 46 cursor = 0
47 47 processes = []
48 48 dt = dt1 + datetime.timedelta(day)
49 49 dt_str = dt.strftime('%Y/%m/%d')
50 50 reader = JRODataReader()
51 51 paths, files = reader.searchFilesOffLine(path=rconf.path,
52 52 startDate=dt,
53 53 endDate=dt,
54 54 startTime=tm1,
55 55 endTime=tm2,
56 56 ext=DTYPES[rconf.datatype])
57 57 nFiles = len(files)
58 58 if nFiles == 0:
59 59 continue
60 60 skip = int(math.ceil(nFiles / n))
61 61 while nFiles > cursor * skip:
62 62 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
63 63 skip=skip)
64 64 p = project.clone()
65 65 p.start()
66 66 processes.append(p)
67 67 cursor += 1
68 68
69 69 def beforeExit(exctype, value, trace):
70 70 for process in processes:
71 71 process.terminate()
72 72 process.join()
73 73 print(traceback.print_tb(trace))
74 74
75 75 sys.excepthook = beforeExit
76 76
77 77 for process in processes:
78 78 process.join()
79 79 process.terminate()
80 80
81 81 time.sleep(3)
82 82
83 83 def wait(context):
84 84
85 85 time.sleep(1)
86 86 c = zmq.Context()
87 87 receiver = c.socket(zmq.SUB)
88 88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
89 89 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
90 90 msg = receiver.recv_multipart()[1]
91 91 context.terminate()
92 92
93 93 class ParameterConf():
94 94
95 95 id = None
96 96 name = None
97 97 value = None
98 98 format = None
99 99
100 100 __formated_value = None
101 101
102 102 ELEMENTNAME = 'Parameter'
103 103
104 104 def __init__(self):
105 105
106 106 self.format = 'str'
107 107
108 108 def getElementName(self):
109 109
110 110 return self.ELEMENTNAME
111 111
112 112 def getValue(self):
113 113
114 114 value = self.value
115 115 format = self.format
116 116
117 117 if self.__formated_value != None:
118 118
119 119 return self.__formated_value
120 120
121 121 if format == 'obj':
122 122 return value
123 123
124 124 if format == 'str':
125 125 self.__formated_value = str(value)
126 126 return self.__formated_value
127 127
128 128 if value == '':
129 129 raise ValueError('%s: This parameter value is empty' % self.name)
130 130
131 131 if format == 'list':
132 132 strList = [s.strip() for s in value.split(',')]
133 133 self.__formated_value = strList
134 134
135 135 return self.__formated_value
136 136
137 137 if format == 'intlist':
138 138 '''
139 139 Example:
140 140 value = (0,1,2)
141 141 '''
142 142
143 143 new_value = ast.literal_eval(value)
144 144
145 145 if type(new_value) not in (tuple, list):
146 146 new_value = [int(new_value)]
147 147
148 148 self.__formated_value = new_value
149 149
150 150 return self.__formated_value
151 151
152 152 if format == 'floatlist':
153 153 '''
154 154 Example:
155 155 value = (0.5, 1.4, 2.7)
156 156 '''
157 157
158 158 new_value = ast.literal_eval(value)
159 159
160 160 if type(new_value) not in (tuple, list):
161 161 new_value = [float(new_value)]
162 162
163 163 self.__formated_value = new_value
164 164
165 165 return self.__formated_value
166 166
167 167 if format == 'date':
168 168 strList = value.split('/')
169 169 intList = [int(x) for x in strList]
170 170 date = datetime.date(intList[0], intList[1], intList[2])
171 171
172 172 self.__formated_value = date
173 173
174 174 return self.__formated_value
175 175
176 176 if format == 'time':
177 177 strList = value.split(':')
178 178 intList = [int(x) for x in strList]
179 179 time = datetime.time(intList[0], intList[1], intList[2])
180 180
181 181 self.__formated_value = time
182 182
183 183 return self.__formated_value
184 184
185 185 if format == 'pairslist':
186 186 '''
187 187 Example:
188 188 value = (0,1),(1,2)
189 189 '''
190 190
191 191 new_value = ast.literal_eval(value)
192 192
193 193 if type(new_value) not in (tuple, list):
194 194 raise ValueError('%s has to be a tuple or list of pairs' % value)
195 195
196 196 if type(new_value[0]) not in (tuple, list):
197 197 if len(new_value) != 2:
198 198 raise ValueError('%s has to be a tuple or list of pairs' % value)
199 199 new_value = [new_value]
200 200
201 201 for thisPair in new_value:
202 202 if len(thisPair) != 2:
203 203 raise ValueError('%s has to be a tuple or list of pairs' % value)
204 204
205 205 self.__formated_value = new_value
206 206
207 207 return self.__formated_value
208 208
209 209 if format == 'multilist':
210 210 '''
211 211 Example:
212 212 value = (0,1,2),(3,4,5)
213 213 '''
214 214 multiList = ast.literal_eval(value)
215 215
216 216 if type(multiList[0]) == int:
217 217 multiList = ast.literal_eval('(' + value + ')')
218 218
219 219 self.__formated_value = multiList
220 220
221 221 return self.__formated_value
222 222
223 223 if format == 'bool':
224 224 value = int(value)
225 225
226 226 if format == 'int':
227 227 value = float(value)
228 228
229 229 format_func = eval(format)
230 230
231 231 self.__formated_value = format_func(value)
232 232
233 233 return self.__formated_value
234 234
235 235 def updateId(self, new_id):
236 236
237 237 self.id = str(new_id)
238 238
239 239 def setup(self, id, name, value, format='str'):
240 240 self.id = str(id)
241 241 self.name = name
242 242 if format == 'obj':
243 243 self.value = value
244 244 else:
245 245 self.value = str(value)
246 246 self.format = str.lower(format)
247 247
248 248 self.getValue()
249 249
250 250 return 1
251 251
252 252 def update(self, name, value, format='str'):
253 253
254 254 self.name = name
255 255 self.value = str(value)
256 256 self.format = format
257 257
258 258 def makeXml(self, opElement):
259 259 if self.name not in ('queue',):
260 260 parmElement = SubElement(opElement, self.ELEMENTNAME)
261 261 parmElement.set('id', str(self.id))
262 262 parmElement.set('name', self.name)
263 263 parmElement.set('value', self.value)
264 264 parmElement.set('format', self.format)
265 265
266 266 def readXml(self, parmElement):
267 267
268 268 self.id = parmElement.get('id')
269 269 self.name = parmElement.get('name')
270 270 self.value = parmElement.get('value')
271 271 self.format = str.lower(parmElement.get('format'))
272 272
273 273 # Compatible with old signal chain version
274 274 if self.format == 'int' and self.name == 'idfigure':
275 275 self.name = 'id'
276 276
277 277 def printattr(self):
278 278
279 279 print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id))
280 280
281 281 class OperationConf():
282 282
283 283 ELEMENTNAME = 'Operation'
284 284
285 285 def __init__(self):
286 286
287 287 self.id = '0'
288 288 self.name = None
289 289 self.priority = None
290 290 self.topic = None
291 291
292 292 def __getNewId(self):
293 293
294 294 return int(self.id) * 10 + len(self.parmConfObjList) + 1
295 295
296 296 def getId(self):
297 297 return self.id
298 298
299 299 def updateId(self, new_id):
300 300
301 301 self.id = str(new_id)
302 302
303 303 n = 1
304 304 for parmObj in self.parmConfObjList:
305 305
306 306 idParm = str(int(new_id) * 10 + n)
307 307 parmObj.updateId(idParm)
308 308
309 309 n += 1
310 310
311 311 def getElementName(self):
312 312
313 313 return self.ELEMENTNAME
314 314
315 315 def getParameterObjList(self):
316 316
317 317 return self.parmConfObjList
318 318
319 319 def getParameterObj(self, parameterName):
320 320
321 321 for parmConfObj in self.parmConfObjList:
322 322
323 323 if parmConfObj.name != parameterName:
324 324 continue
325 325
326 326 return parmConfObj
327 327
328 328 return None
329 329
330 330 def getParameterObjfromValue(self, parameterValue):
331 331
332 332 for parmConfObj in self.parmConfObjList:
333 333
334 334 if parmConfObj.getValue() != parameterValue:
335 335 continue
336 336
337 337 return parmConfObj.getValue()
338 338
339 339 return None
340 340
341 341 def getParameterValue(self, parameterName):
342 342
343 343 parameterObj = self.getParameterObj(parameterName)
344 344
345 345 # if not parameterObj:
346 346 # return None
347 347
348 348 value = parameterObj.getValue()
349 349
350 350 return value
351 351
352 352 def getKwargs(self):
353 353
354 354 kwargs = {}
355 355
356 356 for parmConfObj in self.parmConfObjList:
357 357 if self.name == 'run' and parmConfObj.name == 'datatype':
358 358 continue
359 359
360 360 kwargs[parmConfObj.name] = parmConfObj.getValue()
361 361
362 362 return kwargs
363 363
364 364 def setup(self, id, name, priority, type, project_id, err_queue, lock):
365 365
366 366 self.id = str(id)
367 367 self.project_id = project_id
368 368 self.name = name
369 369 self.type = type
370 370 self.priority = priority
371 371 self.err_queue = err_queue
372 372 self.lock = lock
373 373 self.parmConfObjList = []
374 374
375 375 def removeParameters(self):
376 376
377 377 for obj in self.parmConfObjList:
378 378 del obj
379 379
380 380 self.parmConfObjList = []
381 381
382 382 def addParameter(self, name, value, format='str'):
383 383
384 384 if value is None:
385 385 return None
386 386 id = self.__getNewId()
387 387
388 388 parmConfObj = ParameterConf()
389 389 if not parmConfObj.setup(id, name, value, format):
390 390 return None
391 391
392 392 self.parmConfObjList.append(parmConfObj)
393 393
394 394 return parmConfObj
395 395
396 396 def changeParameter(self, name, value, format='str'):
397 397
398 398 parmConfObj = self.getParameterObj(name)
399 399 parmConfObj.update(name, value, format)
400 400
401 401 return parmConfObj
402 402
403 403 def makeXml(self, procUnitElement):
404 404
405 405 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
406 406 opElement.set('id', str(self.id))
407 407 opElement.set('name', self.name)
408 408 opElement.set('type', self.type)
409 409 opElement.set('priority', str(self.priority))
410 410
411 411 for parmConfObj in self.parmConfObjList:
412 412 parmConfObj.makeXml(opElement)
413 413
414 414 def readXml(self, opElement, project_id):
415 415
416 416 self.id = opElement.get('id')
417 417 self.name = opElement.get('name')
418 418 self.type = opElement.get('type')
419 419 self.priority = opElement.get('priority')
420 420 self.project_id = str(project_id)
421 421
422 422 # Compatible with old signal chain version
 423 423 # Use of 'run' method instead of 'init'
424 424 if self.type == 'self' and self.name == 'init':
425 425 self.name = 'run'
426 426
427 427 self.parmConfObjList = []
428 428
429 429 parmElementList = opElement.iter(ParameterConf().getElementName())
430 430
431 431 for parmElement in parmElementList:
432 432 parmConfObj = ParameterConf()
433 433 parmConfObj.readXml(parmElement)
434 434
435 435 # Compatible with old signal chain version
 436 436 # If a 'Plot' OPERATION is found, replace the operation name with the value of its 'type' PARAMETER
437 437 if self.type != 'self' and self.name == 'Plot':
438 438 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
439 439 self.name = parmConfObj.value
440 440 continue
441 441
442 442 self.parmConfObjList.append(parmConfObj)
443 443
444 444 def printattr(self):
445 445
446 446 print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
447 447 self.id,
448 448 self.name,
449 449 self.type,
450 450 self.priority,
451 451 self.project_id))
452 452
453 453 for parmConfObj in self.parmConfObjList:
454 454 parmConfObj.printattr()
455 455
456 456 def createObject(self):
457 457
458 458 className = eval(self.name)
459 459
460 460 if self.type == 'other':
461 461 opObj = className()
462 462 elif self.type == 'external':
463 463 kwargs = self.getKwargs()
464 464 opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs)
465 465 opObj.start()
466 466 self.opObj = opObj
467 467
468 468 return opObj
469 469
470 470 class ProcUnitConf():
471 471
472 472 ELEMENTNAME = 'ProcUnit'
473 473
474 474 def __init__(self):
475 475
476 476 self.id = None
477 477 self.datatype = None
478 478 self.name = None
479 479 self.inputId = None
480 480 self.opConfObjList = []
481 481 self.procUnitObj = None
482 482 self.opObjDict = {}
483 483
484 484 def __getPriority(self):
485 485
486 486 return len(self.opConfObjList) + 1
487 487
488 488 def __getNewId(self):
489 489
490 490 return int(self.id) * 10 + len(self.opConfObjList) + 1
491 491
492 492 def getElementName(self):
493 493
494 494 return self.ELEMENTNAME
495 495
496 496 def getId(self):
497 497
498 498 return self.id
499 499
500 500 def updateId(self, new_id):
501 501 '''
502 502 new_id = int(parentId) * 10 + (int(self.id) % 10)
503 503 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
504 504
505 505 # If this proc unit has not inputs
506 506 #if self.inputId == '0':
507 507 #new_inputId = 0
508 508
509 509 n = 1
510 510 for opConfObj in self.opConfObjList:
511 511
512 512 idOp = str(int(new_id) * 10 + n)
513 513 opConfObj.updateId(idOp)
514 514
515 515 n += 1
516 516
517 517 self.parentId = str(parentId)
518 518 self.id = str(new_id)
519 519 #self.inputId = str(new_inputId)
520 520 '''
521 521 n = 1
522 522
523 523 def getInputId(self):
524 524
525 525 return self.inputId
526 526
527 527 def getOperationObjList(self):
528 528
529 529 return self.opConfObjList
530 530
531 531 def getOperationObj(self, name=None):
532 532
533 533 for opConfObj in self.opConfObjList:
534 534
535 535 if opConfObj.name != name:
536 536 continue
537 537
538 538 return opConfObj
539 539
540 540 return None
541 541
542 542 def getOpObjfromParamValue(self, value=None):
543 543
544 544 for opConfObj in self.opConfObjList:
545 545 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
546 546 continue
547 547 return opConfObj
548 548 return None
549 549
550 550 def getProcUnitObj(self):
551 551
552 552 return self.procUnitObj
553 553
554 554 def setup(self, project_id, id, name, datatype, inputId, err_queue, lock):
555 555 '''
 556 556 id will be the topic to publish
 557 557 inputId will be the topic to subscribe to
558 558 '''
559 559
560 560 # Compatible with old signal chain version
561 561 if datatype == None and name == None:
562 562 raise ValueError('datatype or name should be defined')
563 563
 564 564 #Define a condition for inputId when it is 0
565 565
566 566 if name == None:
567 567 if 'Proc' in datatype:
568 568 name = datatype
569 569 else:
570 570 name = '%sProc' % (datatype)
571 571
572 572 if datatype == None:
573 573 datatype = name.replace('Proc', '')
574 574
575 575 self.id = str(id)
576 576 self.project_id = project_id
577 577 self.name = name
578 578 self.datatype = datatype
579 579 self.inputId = inputId
580 580 self.err_queue = err_queue
581 581 self.lock = lock
582 582 self.opConfObjList = []
583 583
584 584 self.addOperation(name='run', optype='self')
585 585
586 586 def removeOperations(self):
587 587
588 588 for obj in self.opConfObjList:
589 589 del obj
590 590
591 591 self.opConfObjList = []
592 592 self.addOperation(name='run')
593 593
594 594 def addParameter(self, **kwargs):
595 595 '''
596 596 Add parameters to 'run' operation
597 597 '''
598 598 opObj = self.opConfObjList[0]
599 599
600 600 opObj.addParameter(**kwargs)
601 601
602 602 return opObj
603 603
604 604 def addOperation(self, name, optype='self'):
605 605 '''
 606 606 Update -> communication process
 607 607 In the case of optype='self', remove it. Define IPC communication -> Topic
 608 608 define the socket type or ipc++ communication
609 609
610 610 '''
611 611
612 612 id = self.__getNewId()
 613 613 priority = self.__getPriority() # Not very meaningful, but it can be used
614 614 opConfObj = OperationConf()
615 615 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock)
616 616 self.opConfObjList.append(opConfObj)
617 617
618 618 return opConfObj
619 619
620 620 def makeXml(self, projectElement):
621 621
622 622 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
623 623 procUnitElement.set('id', str(self.id))
624 624 procUnitElement.set('name', self.name)
625 625 procUnitElement.set('datatype', self.datatype)
626 626 procUnitElement.set('inputId', str(self.inputId))
627 627
628 628 for opConfObj in self.opConfObjList:
629 629 opConfObj.makeXml(procUnitElement)
630 630
631 631 def readXml(self, upElement, project_id):
632 632
633 633 self.id = upElement.get('id')
634 634 self.name = upElement.get('name')
635 635 self.datatype = upElement.get('datatype')
636 636 self.inputId = upElement.get('inputId')
637 637 self.project_id = str(project_id)
638 638
639 639 if self.ELEMENTNAME == 'ReadUnit':
640 640 self.datatype = self.datatype.replace('Reader', '')
641 641
642 642 if self.ELEMENTNAME == 'ProcUnit':
643 643 self.datatype = self.datatype.replace('Proc', '')
644 644
645 645 if self.inputId == 'None':
646 646 self.inputId = '0'
647 647
648 648 self.opConfObjList = []
649 649
650 650 opElementList = upElement.iter(OperationConf().getElementName())
651 651
652 652 for opElement in opElementList:
653 653 opConfObj = OperationConf()
654 654 opConfObj.readXml(opElement, project_id)
655 655 self.opConfObjList.append(opConfObj)
656 656
657 657 def printattr(self):
658 658
659 659 print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
660 660 self.id,
661 661 self.name,
662 662 self.datatype,
663 663 self.inputId,
664 664 self.project_id))
665 665
666 666 for opConfObj in self.opConfObjList:
667 667 opConfObj.printattr()
668 668
669 669 def getKwargs(self):
670 670
671 671 opObj = self.opConfObjList[0]
672 672 kwargs = opObj.getKwargs()
673 673
674 674 return kwargs
675 675
676 676 def createObjects(self):
677 677 '''
 678 678 Instantiates the processing units.
679 679 '''
680 680
681 681 className = eval(self.name)
682 #print(self.name)
682 683 kwargs = self.getKwargs()
684 #print(kwargs)
685 #print("mark_a")
683 686 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs)
687 #print("mark_b")
684 688 log.success('creating process...', self.name)
685 689
686 690 for opConfObj in self.opConfObjList:
687 691
688 692 if opConfObj.type == 'self' and opConfObj.name == 'run':
689 693 continue
690 694 elif opConfObj.type == 'self':
691 695 opObj = getattr(procUnitObj, opConfObj.name)
692 696 else:
693 697 opObj = opConfObj.createObject()
694 698
695 699 log.success('adding operation: {}, type:{}'.format(
696 700 opConfObj.name,
697 701 opConfObj.type), self.name)
698 702
699 703 procUnitObj.addOperation(opConfObj, opObj)
700 704
701 705 procUnitObj.start()
702 706 self.procUnitObj = procUnitObj
703 707
704 708 def close(self):
705 709
706 710 for opConfObj in self.opConfObjList:
707 711 if opConfObj.type == 'self':
708 712 continue
709 713
710 714 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
711 715 opObj.close()
712 716
713 717 self.procUnitObj.close()
714 718
715 719 return
716 720
717 721
718 722 class ReadUnitConf(ProcUnitConf):
719 723
720 724 ELEMENTNAME = 'ReadUnit'
721 725
722 726 def __init__(self):
723 727
724 728 self.id = None
725 729 self.datatype = None
726 730 self.name = None
727 731 self.inputId = None
728 732 self.opConfObjList = []
729 733 self.lock = Event()
730 734 self.lock.set()
731 735 self.lock.n = Value('d', 0)
732 736
733 737 def getElementName(self):
734 738
735 739 return self.ELEMENTNAME
736 740
737 741 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
738 742 startTime='', endTime='', server=None, **kwargs):
739 743
740 744
741 745 '''
 742 746 ***** the process id will be the Topic
 743 747
 744 748 Addition of {topic}; if it is not present -> error
 745 749 kwargs must be passed on at instantiation
746 750
747 751 '''
748 752
749 753 # Compatible with old signal chain version
750 754 if datatype == None and name == None:
751 755 raise ValueError('datatype or name should be defined')
752 756 if name == None:
753 757 if 'Reader' in datatype:
754 758 name = datatype
755 759 datatype = name.replace('Reader','')
756 760 else:
757 761 name = '{}Reader'.format(datatype)
758 762 if datatype == None:
759 763 if 'Reader' in name:
760 764 datatype = name.replace('Reader','')
761 765 else:
762 766 datatype = name
763 767 name = '{}Reader'.format(name)
764 768
765 769 self.id = id
766 770 self.project_id = project_id
767 771 self.name = name
768 772 self.datatype = datatype
769 773 if path != '':
770 774 self.path = os.path.abspath(path)
775 print (self.path)
771 776 self.startDate = startDate
772 777 self.endDate = endDate
773 778 self.startTime = startTime
774 779 self.endTime = endTime
775 780 self.server = server
776 781 self.err_queue = err_queue
777 782 self.addRunOperation(**kwargs)
778 783
779 784 def update(self, **kwargs):
780 785
781 786 if 'datatype' in kwargs:
782 787 datatype = kwargs.pop('datatype')
783 788 if 'Reader' in datatype:
784 789 self.name = datatype
785 790 else:
786 791 self.name = '%sReader' % (datatype)
787 792 self.datatype = self.name.replace('Reader', '')
788 793
789 794 attrs = ('path', 'startDate', 'endDate',
790 795 'startTime', 'endTime')
791 796
792 797 for attr in attrs:
793 798 if attr in kwargs:
794 799 setattr(self, attr, kwargs.pop(attr))
795 800
796 801 self.updateRunOperation(**kwargs)
797 802
798 803 def removeOperations(self):
799 804
800 805 for obj in self.opConfObjList:
801 806 del obj
802 807
803 808 self.opConfObjList = []
804 809
805 810 def addRunOperation(self, **kwargs):
806 811
807 812 opObj = self.addOperation(name='run', optype='self')
808 813
809 814 if self.server is None:
810 815 opObj.addParameter(
811 816 name='datatype', value=self.datatype, format='str')
812 817 opObj.addParameter(name='path', value=self.path, format='str')
813 818 opObj.addParameter(
814 819 name='startDate', value=self.startDate, format='date')
815 820 opObj.addParameter(
816 821 name='endDate', value=self.endDate, format='date')
817 822 opObj.addParameter(
818 823 name='startTime', value=self.startTime, format='time')
819 824 opObj.addParameter(
820 825 name='endTime', value=self.endTime, format='time')
821 826
822 827 for key, value in list(kwargs.items()):
823 828 opObj.addParameter(name=key, value=value,
824 829 format=type(value).__name__)
825 830 else:
826 831 opObj.addParameter(name='server', value=self.server, format='str')
827 832
828 833 return opObj
829 834
830 835 def updateRunOperation(self, **kwargs):
831 836
832 837 opObj = self.getOperationObj(name='run')
833 838 opObj.removeParameters()
834 839
835 840 opObj.addParameter(name='datatype', value=self.datatype, format='str')
836 841 opObj.addParameter(name='path', value=self.path, format='str')
837 842 opObj.addParameter(
838 843 name='startDate', value=self.startDate, format='date')
839 844 opObj.addParameter(name='endDate', value=self.endDate, format='date')
840 845 opObj.addParameter(
841 846 name='startTime', value=self.startTime, format='time')
842 847 opObj.addParameter(name='endTime', value=self.endTime, format='time')
843 848
844 849 for key, value in list(kwargs.items()):
845 850 opObj.addParameter(name=key, value=value,
846 851 format=type(value).__name__)
847 852
848 853 return opObj
849 854
850 855 def readXml(self, upElement, project_id):
851 856
852 857 self.id = upElement.get('id')
853 858 self.name = upElement.get('name')
854 859 self.datatype = upElement.get('datatype')
855 860 self.project_id = str(project_id) #yong
856 861
857 862 if self.ELEMENTNAME == 'ReadUnit':
858 863 self.datatype = self.datatype.replace('Reader', '')
859 864
860 865 self.opConfObjList = []
861 866
862 867 opElementList = upElement.iter(OperationConf().getElementName())
863 868
864 869 for opElement in opElementList:
865 870 opConfObj = OperationConf()
866 871 opConfObj.readXml(opElement, project_id)
867 872 self.opConfObjList.append(opConfObj)
868 873
869 874 if opConfObj.name == 'run':
870 875 self.path = opConfObj.getParameterValue('path')
871 876 self.startDate = opConfObj.getParameterValue('startDate')
872 877 self.endDate = opConfObj.getParameterValue('endDate')
873 878 self.startTime = opConfObj.getParameterValue('startTime')
874 879 self.endTime = opConfObj.getParameterValue('endTime')
875 880
876 881
877 882 class Project(Process):
878 883
879 884 ELEMENTNAME = 'Project'
880 885
881 886 def __init__(self):
882 887
883 888 Process.__init__(self)
884 889 self.id = None
885 890 self.filename = None
886 891 self.description = None
887 892 self.email = None
888 893 self.alarm = None
889 894 self.procUnitConfObjDict = {}
890 895 self.err_queue = Queue()
891 896
892 897 def __getNewId(self):
893 898
894 899 idList = list(self.procUnitConfObjDict.keys())
895 900 id = int(self.id) * 10
896 901
897 902 while True:
898 903 id += 1
899 904
900 905 if str(id) in idList:
901 906 continue
902 907
903 908 break
904 909
905 910 return str(id)
906 911
907 912 def getElementName(self):
908 913
909 914 return self.ELEMENTNAME
910 915
911 916 def getId(self):
912 917
913 918 return self.id
914 919
915 920 def updateId(self, new_id):
916 921
917 922 self.id = str(new_id)
918 923
919 924 keyList = list(self.procUnitConfObjDict.keys())
920 925 keyList.sort()
921 926
922 927 n = 1
923 928 newProcUnitConfObjDict = {}
924 929
925 930 for procKey in keyList:
926 931
927 932 procUnitConfObj = self.procUnitConfObjDict[procKey]
928 933 idProcUnit = str(int(self.id) * 10 + n)
929 934 procUnitConfObj.updateId(idProcUnit)
930 935 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
931 936 n += 1
932 937
933 938 self.procUnitConfObjDict = newProcUnitConfObjDict
934 939
935 940 def setup(self, id=1, name='', description='', email=None, alarm=[]):
936 941
937 942 print(' ')
938 943 print('*' * 60)
939 944 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
940 945 print('*' * 60)
941 946 print("* Python " + python_version() + " *")
942 947 print('*' * 19)
943 948 print(' ')
944 949 self.id = str(id)
945 950 self.description = description
946 951 self.email = email
947 952 self.alarm = alarm
948 953 if name:
949 954 self.name = '{} ({})'.format(Process.__name__, name)
950 955
951 956 def update(self, **kwargs):
952 957
953 958 for key, value in list(kwargs.items()):
954 959 setattr(self, key, value)
955 960
956 961 def clone(self):
957 962
958 963 p = Project()
959 964 p.procUnitConfObjDict = self.procUnitConfObjDict
960 965 return p
961 966
962 967 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
963 968
964 969 '''
 965 970 Update:
 966 971 A new argument was added: topic - related to how the simultaneous processes communicate
 967 972
 968 973 * The process id will be the topic that the procUnits must subscribe to in order to receive the information (data)
969 974
970 975 '''
971 976
972 977 if id is None:
973 978 idReadUnit = self.__getNewId()
974 979 else:
975 980 idReadUnit = str(id)
976 981
977 982 readUnitConfObj = ReadUnitConf()
978 983 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
979 984 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
980 985
981 986 return readUnitConfObj
982 987
983 988 def addProcUnit(self, inputId='0', datatype=None, name=None):
984 989
985 990 '''
 986 991 Update:
 987 992 Two new arguments were added: topic_read (reads data from another procUnit) and topic_write (writes or sends data to another procUnit)
 988 993 They should replace "inputId"
 989 994
 990 995 ** In order to keep the inputId, it will be the representation of the topic to subscribe to. The instance's (process's) own ID
 991 996 will be the topic of the publication; everything will be assigned dynamically.
992 997
993 998 '''
994 999
995 1000 idProcUnit = self.__getNewId()
996 1001 procUnitConfObj = ProcUnitConf()
997 1002 input_proc = self.procUnitConfObjDict[inputId]
998 1003 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock)
999 1004 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1000 1005
1001 1006 return procUnitConfObj
1002 1007
1003 1008 def removeProcUnit(self, id):
1004 1009
1005 1010 if id in list(self.procUnitConfObjDict.keys()):
1006 1011 self.procUnitConfObjDict.pop(id)
1007 1012
1008 1013 def getReadUnitId(self):
1009 1014
1010 1015 readUnitConfObj = self.getReadUnitObj()
1011 1016
1012 1017 return readUnitConfObj.id
1013 1018
1014 1019 def getReadUnitObj(self):
1015 1020
1016 1021 for obj in list(self.procUnitConfObjDict.values()):
1017 1022 if obj.getElementName() == 'ReadUnit':
1018 1023 return obj
1019 1024
1020 1025 return None
1021 1026
1022 1027 def getProcUnitObj(self, id=None, name=None):
1023 1028
1024 1029 if id != None:
1025 1030 return self.procUnitConfObjDict[id]
1026 1031
1027 1032 if name != None:
1028 1033 return self.getProcUnitObjByName(name)
1029 1034
1030 1035 return None
1031 1036
1032 1037 def getProcUnitObjByName(self, name):
1033 1038
1034 1039 for obj in list(self.procUnitConfObjDict.values()):
1035 1040 if obj.name == name:
1036 1041 return obj
1037 1042
1038 1043 return None
1039 1044
1040 1045 def procUnitItems(self):
1041 1046
1042 1047 return list(self.procUnitConfObjDict.items())
1043 1048
1044 1049 def makeXml(self):
1045 1050
1046 1051 projectElement = Element('Project')
1047 1052 projectElement.set('id', str(self.id))
1048 1053 projectElement.set('name', self.name)
1049 1054 projectElement.set('description', self.description)
1050 1055
1051 1056 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1052 1057 procUnitConfObj.makeXml(projectElement)
1053 1058
1054 1059 self.projectElement = projectElement
1055 1060
1056 1061 def writeXml(self, filename=None):
1057 1062
1058 1063 if filename == None:
1059 1064 if self.filename:
1060 1065 filename = self.filename
1061 1066 else:
1062 1067 filename = 'schain.xml'
1063 1068
1064 1069 if not filename:
 1065 1070 print('filename has not been defined. Use setFilename(filename) to do it.')
1066 1071 return 0
1067 1072
1068 1073 abs_file = os.path.abspath(filename)
1069 1074
1070 1075 if not os.access(os.path.dirname(abs_file), os.W_OK):
1071 1076 print('No write permission on %s' % os.path.dirname(abs_file))
1072 1077 return 0
1073 1078
1074 1079 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
 1075 1080 print('File %s already exists and it could not be overwritten' % abs_file)
1076 1081 return 0
1077 1082
1078 1083 self.makeXml()
1079 1084
1080 1085 ElementTree(self.projectElement).write(abs_file, method='xml')
1081 1086
1082 1087 self.filename = abs_file
1083 1088
1084 1089 return 1
1085 1090
1086 1091 def readXml(self, filename=None):
1087 1092
1088 1093 if not filename:
1089 1094 print('filename is not defined')
1090 1095 return 0
1091 1096
1092 1097 abs_file = os.path.abspath(filename)
1093 1098
1094 1099 if not os.path.isfile(abs_file):
1095 1100 print('%s file does not exist' % abs_file)
1096 1101 return 0
1097 1102
1098 1103 self.projectElement = None
1099 1104 self.procUnitConfObjDict = {}
1100 1105
1101 1106 try:
1102 1107 self.projectElement = ElementTree().parse(abs_file)
1103 1108 except:
1104 1109 print('Error reading %s, verify file format' % filename)
1105 1110 return 0
1106 1111
1107 1112 self.project = self.projectElement.tag
1108 1113
1109 1114 self.id = self.projectElement.get('id')
1110 1115 self.name = self.projectElement.get('name')
1111 1116 self.description = self.projectElement.get('description')
1112 1117
1113 1118 readUnitElementList = self.projectElement.iter(
1114 1119 ReadUnitConf().getElementName())
1115 1120
1116 1121 for readUnitElement in readUnitElementList:
1117 1122 readUnitConfObj = ReadUnitConf()
1118 1123 readUnitConfObj.readXml(readUnitElement, self.id)
1119 1124 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1120 1125
1121 1126 procUnitElementList = self.projectElement.iter(
1122 1127 ProcUnitConf().getElementName())
1123 1128
1124 1129 for procUnitElement in procUnitElementList:
1125 1130 procUnitConfObj = ProcUnitConf()
1126 1131 procUnitConfObj.readXml(procUnitElement, self.id)
1127 1132 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1128 1133
1129 1134 self.filename = abs_file
1130 1135
1131 1136 return 1
1132 1137
1133 1138 def __str__(self):
1134 1139
1135 1140 print('Project: name = %s, description = %s, id = %s' % (
1136 1141 self.name,
1137 1142 self.description,
1138 1143 self.id))
1139 1144
1140 1145 for procUnitConfObj in self.procUnitConfObjDict.values():
1141 1146 print(procUnitConfObj)
1142 1147
1143 1148 def createObjects(self):
1144 1149
1145 1150
1146 1151 keys = list(self.procUnitConfObjDict.keys())
1147 1152 keys.sort()
1148 1153 for key in keys:
1149 1154 self.procUnitConfObjDict[key].createObjects()
1150 1155
1151 1156 def monitor(self):
1152 1157
1153 1158 t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx))
1154 1159 t.start()
1155 1160
1156 1161 def __monitor(self, queue, ctx):
1157 1162
1158 1163 import socket
1159 1164
1160 1165 procs = 0
1161 1166 err_msg = ''
1162 1167
1163 1168 while True:
1164 1169 msg = queue.get()
1165 1170 if '#_start_#' in msg:
1166 1171 procs += 1
1167 1172 elif '#_end_#' in msg:
1168 1173 procs -=1
1169 1174 else:
1170 1175 err_msg = msg
1171 1176
1172 1177 if procs == 0 or 'Traceback' in err_msg:
1173 1178 break
1174 1179 time.sleep(0.1)
1175 1180
1176 1181 if '|' in err_msg:
1177 1182 name, err = err_msg.split('|')
1178 1183 if 'SchainWarning' in err:
1179 1184 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
1180 1185 elif 'SchainError' in err:
1181 1186 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
1182 1187 else:
1183 1188 log.error(err, name)
1184 1189 else:
1185 1190 name, err = self.name, err_msg
1186 1191
1187 1192 time.sleep(2)
1188 1193
1189 1194 for conf in self.procUnitConfObjDict.values():
1190 1195 for confop in conf.opConfObjList:
1191 1196 if confop.type == 'external':
1192 1197 confop.opObj.terminate()
1193 1198 conf.procUnitObj.terminate()
1194 1199
1195 1200 ctx.term()
1196 1201
1197 1202 message = ''.join(err)
1198 1203
1199 1204 if err_msg:
1200 1205 subject = 'SChain v%s: Error running %s\n' % (
1201 1206 schainpy.__version__, self.name)
1202 1207
1203 1208 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
1204 1209 socket.gethostname())
1205 1210 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1206 1211 subtitle += 'Configuration file: %s\n' % self.filename
1207 1212 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1208 1213
1209 1214 readUnitConfObj = self.getReadUnitObj()
1210 1215 if readUnitConfObj:
1211 1216 subtitle += '\nInput parameters:\n'
1212 1217 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1213 1218 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1214 1219 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1215 1220 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1216 1221 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1217 1222 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1218 1223
1219 1224 a = Alarm(
1220 1225 modes=self.alarm,
1221 1226 email=self.email,
1222 1227 message=message,
1223 1228 subject=subject,
1224 1229 subtitle=subtitle,
1225 1230 filename=self.filename
1226 1231 )
1227 1232
1228 1233 a.start()
1229 1234
1230 1235 def isPaused(self):
1231 1236 return 0
1232 1237
1233 1238 def isStopped(self):
1234 1239 return 0
1235 1240
1236 1241 def runController(self):
1237 1242 '''
1238 1243 returns 0 when this process has been stopped, 1 otherwise
1239 1244 '''
1240 1245
1241 1246 if self.isPaused():
1242 1247 print('Process suspended')
1243 1248
1244 1249 while True:
1245 1250 time.sleep(0.1)
1246 1251
1247 1252 if not self.isPaused():
1248 1253 break
1249 1254
1250 1255 if self.isStopped():
1251 1256 break
1252 1257
1253 1258 print('Process reinitialized')
1254 1259
1255 1260 if self.isStopped():
1256 1261 print('Process stopped')
1257 1262 return 0
1258 1263
1259 1264 return 1
1260 1265
1261 1266 def setFilename(self, filename):
1262 1267
1263 1268 self.filename = filename
1264 1269
1265 1270 def setProxy(self):
1266 1271
1267 1272 if not os.path.exists('/tmp/schain'):
1268 1273 os.mkdir('/tmp/schain')
1269 1274
1270 1275 self.ctx = zmq.Context()
1271 1276 xpub = self.ctx.socket(zmq.XPUB)
1272 1277 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1273 1278 xsub = self.ctx.socket(zmq.XSUB)
1274 1279 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1275 1280 self.monitor()
1276 1281 try:
1277 1282 zmq.proxy(xpub, xsub)
1278 1283 except zmq.ContextTerminated:
1279 1284 xpub.close()
1280 1285 xsub.close()
1281 1286
1282 1287 def run(self):
1283 1288
1284 1289 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1285 1290 self.start_time = time.time()
1286 1291 self.createObjects()
1287 1292 self.setProxy()
1288 1293 log.success('{} Done (Time: {}s)'.format(
1289 1294 self.name,
1290 1295 time.time()-self.start_time), '')
1 NO CONTENT: modified file
1 NO CONTENT: modified file
@@ -1,810 +1,808
1 1
2 2 import os
3 3 import sys
4 4 import zmq
5 5 import time
6 6 import numpy
7 7 import datetime
8 8 from functools import wraps
9 9 from threading import Thread
10 10 import matplotlib
11 11
12 12 if 'BACKEND' in os.environ:
13 13 matplotlib.use(os.environ['BACKEND'])
14 14 elif 'linux' in sys.platform:
15 15 matplotlib.use("TkAgg")
16 16 elif 'darwin' in sys.platform:
17 17 matplotlib.use('WxAgg')
18 18 else:
19 19 from schainpy.utils import log
20 20 log.warning('Using default Backend="Agg"', 'INFO')
21 21 matplotlib.use('Agg')
22 22
23 23 import matplotlib.pyplot as plt
24 24 from matplotlib.patches import Polygon
25 25 from mpl_toolkits.axes_grid1 import make_axes_locatable
26 26 from matplotlib.ticker import FuncFormatter, LinearLocator, MultipleLocator
27 27
28 28 from schainpy.model.data.jrodata import PlotterData
29 29 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
30 30 from schainpy.utils import log
31 31
32 32 jet_values = matplotlib.pyplot.get_cmap('jet', 100)(numpy.arange(100))[10:90]
33 33 blu_values = matplotlib.pyplot.get_cmap(
34 34 'seismic_r', 20)(numpy.arange(20))[10:15]
35 35 ncmap = matplotlib.colors.LinearSegmentedColormap.from_list(
36 36 'jro', numpy.vstack((blu_values, jet_values)))
37 37 matplotlib.pyplot.register_cmap(cmap=ncmap)
38 38
39 39 CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis',
40 40 'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm')]
41 41
42 42 EARTH_RADIUS = 6.3710e3
43 43
44 44 def ll2xy(lat1, lon1, lat2, lon2):
45 45
46 46 p = 0.017453292519943295
47 47 a = 0.5 - numpy.cos((lat2 - lat1) * p)/2 + numpy.cos(lat1 * p) * \
48 48 numpy.cos(lat2 * p) * (1 - numpy.cos((lon2 - lon1) * p)) / 2
49 49 r = 12742 * numpy.arcsin(numpy.sqrt(a))
50 50 theta = numpy.arctan2(numpy.sin((lon2-lon1)*p)*numpy.cos(lat2*p), numpy.cos(lat1*p)
51 51 * numpy.sin(lat2*p)-numpy.sin(lat1*p)*numpy.cos(lat2*p)*numpy.cos((lon2-lon1)*p))
52 52 theta = -theta + numpy.pi/2
53 53 return r*numpy.cos(theta), r*numpy.sin(theta)
54 54
55 55
56 56 def km2deg(km):
57 57 '''
58 58 Convert distance in km to degrees
59 59 '''
60 60
61 61 return numpy.rad2deg(km/EARTH_RADIUS)
62 62
63 63
64 64 def figpause(interval):
65 65 backend = plt.rcParams['backend']
66 66 if backend in matplotlib.rcsetup.interactive_bk:
67 67 figManager = matplotlib._pylab_helpers.Gcf.get_active()
68 68 if figManager is not None:
69 69 canvas = figManager.canvas
70 70 if canvas.figure.stale:
71 71 canvas.draw()
72 72 try:
73 73 canvas.start_event_loop(interval)
74 74 except:
75 75 pass
76 76 return
77 77
78 78
79 79 def popup(message):
80 80 '''
81 81 '''
82 82
83 83 fig = plt.figure(figsize=(12, 8), facecolor='r')
84 84 text = '\n'.join([s.strip() for s in message.split(':')])
85 85 fig.text(0.01, 0.5, text, ha='left', va='center',
86 86 size='20', weight='heavy', color='w')
87 87 fig.show()
88 88 figpause(1000)
89 89
90 90
91 91 class Throttle(object):
92 92 '''
93 93 Decorator that prevents a function from being called more than once every
94 94 time period.
95 95 To create a function that cannot be called more than once a minute, but
96 96 will sleep until it can be called:
97 97 @Throttle(minutes=1)
98 98 def foo():
99 99 pass
100 100
101 101 for i in range(10):
102 102 foo()
 103 103 print("This function has run %s times." % i)
104 104 '''
105 105
106 106 def __init__(self, seconds=0, minutes=0, hours=0):
107 107 self.throttle_period = datetime.timedelta(
108 108 seconds=seconds, minutes=minutes, hours=hours
109 109 )
110 110
111 111 self.time_of_last_call = datetime.datetime.min
112 112
113 113 def __call__(self, fn):
114 114 @wraps(fn)
115 115 def wrapper(*args, **kwargs):
116 116 coerce = kwargs.pop('coerce', None)
117 117 if coerce:
118 118 self.time_of_last_call = datetime.datetime.now()
119 119 return fn(*args, **kwargs)
120 120 else:
121 121 now = datetime.datetime.now()
122 122 time_since_last_call = now - self.time_of_last_call
123 123 time_left = self.throttle_period - time_since_last_call
124 124
125 125 if time_left > datetime.timedelta(seconds=0):
126 126 return
127 127
128 128 self.time_of_last_call = datetime.datetime.now()
129 129 return fn(*args, **kwargs)
130 130
131 131 return wrapper
132 132
133 133 def apply_throttle(value):
134 134
135 135 @Throttle(seconds=value)
136 136 def fnThrottled(fn):
137 137 fn()
138 138
139 139 return fnThrottled
140 140
141 141
142 142 @MPDecorator
143 143 class Plot(Operation):
144 144 '''
145 145 Base class for Schain plotting operations
146 146 '''
147 147
148 148 CODE = 'Figure'
149 149 colormap = 'jet'
150 150 bgcolor = 'white'
151 151 __missing = 1E30
152 152
153 153 __attrs__ = ['show', 'save', 'xmin', 'xmax', 'ymin', 'ymax', 'zmin', 'zmax',
154 154 'zlimits', 'xlabel', 'ylabel', 'xaxis', 'cb_label', 'title',
155 155 'colorbar', 'bgcolor', 'width', 'height', 'localtime', 'oneFigure',
156 156 'showprofile', 'decimation', 'pause']
157 157
158 158 def __init__(self):
159 159
160 160 Operation.__init__(self)
161 161 self.isConfig = False
162 162 self.isPlotConfig = False
163 163 self.save_counter = 1
164 164 self.sender_counter = 1
165 165 self.data = None
166 166
167 167 def __fmtTime(self, x, pos):
168 168 '''
169 169 '''
170 170
171 171 return '{}'.format(self.getDateTime(x).strftime('%H:%M'))
172 172
173 173 def __setup(self, **kwargs):
174 174 '''
175 175 Initialize variables
176 176 '''
177 177
178 178 self.figures = []
179 179 self.axes = []
180 180 self.cb_axes = []
181 181 self.localtime = kwargs.pop('localtime', True)
182 182 self.show = kwargs.get('show', True)
183 183 self.save = kwargs.get('save', False)
184 184 self.save_period = kwargs.get('save_period', 1)
185 185 self.ftp = kwargs.get('ftp', False)
186 186 self.colormap = kwargs.get('colormap', self.colormap)
187 187 self.colormap_coh = kwargs.get('colormap_coh', 'jet')
188 188 self.colormap_phase = kwargs.get('colormap_phase', 'RdBu_r')
189 189 self.colormaps = kwargs.get('colormaps', None)
190 190 self.bgcolor = kwargs.get('bgcolor', self.bgcolor)
191 191 self.showprofile = kwargs.get('showprofile', False)
192 192 self.title = kwargs.get('wintitle', self.CODE.upper())
193 193 self.cb_label = kwargs.get('cb_label', None)
194 194 self.cb_labels = kwargs.get('cb_labels', None)
195 195 self.labels = kwargs.get('labels', None)
196 196 self.xaxis = kwargs.get('xaxis', 'frequency')
197 197 self.zmin = kwargs.get('zmin', None)
198 198 self.zmax = kwargs.get('zmax', None)
199 199 self.zlimits = kwargs.get('zlimits', None)
200 200 self.xmin = kwargs.get('xmin', None)
201 201 self.xmax = kwargs.get('xmax', None)
202 202 self.xrange = kwargs.get('xrange', 24)
203 203 self.xscale = kwargs.get('xscale', None)
204 204 self.ymin = kwargs.get('ymin', None)
205 205 self.ymax = kwargs.get('ymax', None)
206 206 self.yscale = kwargs.get('yscale', None)
207 207 self.xlabel = kwargs.get('xlabel', None)
208 208 self.decimation = kwargs.get('decimation', None)
209 209 self.showSNR = kwargs.get('showSNR', False)
210 210 self.oneFigure = kwargs.get('oneFigure', True)
211 211 self.width = kwargs.get('width', None)
212 212 self.height = kwargs.get('height', None)
213 213 self.colorbar = kwargs.get('colorbar', True)
214 214 self.factors = kwargs.get('factors', [1, 1, 1, 1, 1, 1, 1, 1])
215 215 self.channels = kwargs.get('channels', None)
216 216 self.titles = kwargs.get('titles', [])
217 217 self.polar = False
218 218 self.type = kwargs.get('type', 'iq')
219 219 self.grid = kwargs.get('grid', False)
220 220 self.pause = kwargs.get('pause', False)
221 221 self.save_labels = kwargs.get('save_labels', None)
222 222 self.realtime = kwargs.get('realtime', True)
223 223 self.buffering = kwargs.get('buffering', True)
224 224 self.throttle = kwargs.get('throttle', 2)
225 225 self.exp_code = kwargs.get('exp_code', None)
226 226 self.plot_server = kwargs.get('plot_server', False)
227 227 self.sender_period = kwargs.get('sender_period', 1)
228 228 self.__throttle_plot = apply_throttle(self.throttle)
229 229 self.data = PlotterData(
230 230 self.CODE, self.throttle, self.exp_code, self.buffering, snr=self.showSNR)
231 231
232 232 if self.plot_server:
233 233 if not self.plot_server.startswith('tcp://'):
234 234 self.plot_server = 'tcp://{}'.format(self.plot_server)
235 235 log.success(
236 236 'Sending to server: {}'.format(self.plot_server),
237 237 self.name
238 238 )
239 239 if 'plot_name' in kwargs:
240 240 self.plot_name = kwargs['plot_name']
241 241
242 242 def __setup_plot(self):
243 243 '''
244 244 Common setup for all figures, here figures and axes are created
245 245 '''
246 246
247 247 self.setup()
248 248
249 249 self.time_label = 'LT' if self.localtime else 'UTC'
250 250
251 251 if self.width is None:
252 252 self.width = 8
253 253
254 254 self.figures = []
255 255 self.axes = []
256 256 self.cb_axes = []
257 257 self.pf_axes = []
258 258 self.cmaps = []
259 259
260 260 size = '15%' if self.ncols == 1 else '30%'
261 261 pad = '4%' if self.ncols == 1 else '8%'
262 262
263 263 if self.oneFigure:
264 264 if self.height is None:
265 265 self.height = 1.4 * self.nrows + 1
266 266 fig = plt.figure(figsize=(self.width, self.height),
267 267 edgecolor='k',
268 268 facecolor='w')
269 269 self.figures.append(fig)
270 270 for n in range(self.nplots):
271 271 ax = fig.add_subplot(self.nrows, self.ncols,
272 272 n + 1, polar=self.polar)
273 273 ax.tick_params(labelsize=8)
274 274 ax.firsttime = True
275 275 ax.index = 0
276 276 ax.press = None
277 277 self.axes.append(ax)
278 278 if self.showprofile:
279 279 cax = self.__add_axes(ax, size=size, pad=pad)
280 280 cax.tick_params(labelsize=8)
281 281 self.pf_axes.append(cax)
282 282 else:
283 283 if self.height is None:
284 284 self.height = 3
285 285 for n in range(self.nplots):
286 286 fig = plt.figure(figsize=(self.width, self.height),
287 287 edgecolor='k',
288 288 facecolor='w')
289 289 ax = fig.add_subplot(1, 1, 1, polar=self.polar)
290 290 ax.tick_params(labelsize=8)
291 291 ax.firsttime = True
292 292 ax.index = 0
293 293 ax.press = None
294 294 self.figures.append(fig)
295 295 self.axes.append(ax)
296 296 if self.showprofile:
297 297 cax = self.__add_axes(ax, size=size, pad=pad)
298 298 cax.tick_params(labelsize=8)
299 299 self.pf_axes.append(cax)
300 300
301 301 for n in range(self.nrows):
302 302 if self.colormaps is not None:
303 303 cmap = plt.get_cmap(self.colormaps[n])
304 304 else:
305 305 cmap = plt.get_cmap(self.colormap)
306 306 cmap.set_bad(self.bgcolor, 1.)
307 307 self.cmaps.append(cmap)
308 308
309 309 for fig in self.figures:
310 310 fig.canvas.mpl_connect('key_press_event', self.OnKeyPress)
311 311 fig.canvas.mpl_connect('scroll_event', self.OnBtnScroll)
312 312 fig.canvas.mpl_connect('button_press_event', self.onBtnPress)
313 313 fig.canvas.mpl_connect('motion_notify_event', self.onMotion)
314 314 fig.canvas.mpl_connect('button_release_event', self.onBtnRelease)
315 315
316 316 def OnKeyPress(self, event):
317 317 '''
318 318 Event for pressing keys (up, down) change colormap
319 319 '''
320 320 ax = event.inaxes
321 321 if ax in self.axes:
322 322 if event.key == 'down':
323 323 ax.index += 1
324 324 elif event.key == 'up':
325 325 ax.index -= 1
326 326 if ax.index < 0:
327 327 ax.index = len(CMAPS) - 1
328 328 elif ax.index == len(CMAPS):
329 329 ax.index = 0
330 330 cmap = CMAPS[ax.index]
331 331 ax.cbar.set_cmap(cmap)
332 332 ax.cbar.draw_all()
333 333 ax.plt.set_cmap(cmap)
334 334 ax.cbar.patch.figure.canvas.draw()
335 335 self.colormap = cmap.name
336 336
337 337 def OnBtnScroll(self, event):
338 338 '''
339 339 Event for scrolling, scale figure
340 340 '''
341 341 cb_ax = event.inaxes
342 342 if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
343 343 ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
344 344 pt = ax.cbar.ax.bbox.get_points()[:, 1]
345 345 nrm = ax.cbar.norm
346 346 vmin, vmax, p0, p1, pS = (
347 347 nrm.vmin, nrm.vmax, pt[0], pt[1], event.y)
348 348 scale = 2 if event.step == 1 else 0.5
349 349 point = vmin + (vmax - vmin) / (p1 - p0) * (pS - p0)
350 350 ax.cbar.norm.vmin = point - scale * (point - vmin)
351 351 ax.cbar.norm.vmax = point - scale * (point - vmax)
352 352 ax.plt.set_norm(ax.cbar.norm)
353 353 ax.cbar.draw_all()
354 354 ax.cbar.patch.figure.canvas.draw()
355 355
356 356 def onBtnPress(self, event):
357 357 '''
358 358 Event for mouse button press
359 359 '''
360 360 cb_ax = event.inaxes
361 361 if cb_ax is None:
362 362 return
363 363
364 364 if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
365 365 cb_ax.press = event.x, event.y
366 366 else:
367 367 cb_ax.press = None
368 368
369 369 def onMotion(self, event):
370 370 '''
371 371 Event for move inside colorbar
372 372 '''
373 373 cb_ax = event.inaxes
374 374 if cb_ax is None:
375 375 return
376 376 if cb_ax not in [ax.cbar.ax for ax in self.axes if ax.cbar]:
377 377 return
378 378 if cb_ax.press is None:
379 379 return
380 380
381 381 ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
382 382 xprev, yprev = cb_ax.press
383 383 dx = event.x - xprev
384 384 dy = event.y - yprev
385 385 cb_ax.press = event.x, event.y
386 386 scale = ax.cbar.norm.vmax - ax.cbar.norm.vmin
387 387 perc = 0.03
388 388
389 389 if event.button == 1:
390 390 ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
391 391 ax.cbar.norm.vmax -= (perc * scale) * numpy.sign(dy)
392 392 elif event.button == 3:
393 393 ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
394 394 ax.cbar.norm.vmax += (perc * scale) * numpy.sign(dy)
395 395
396 396 ax.cbar.draw_all()
397 397 ax.plt.set_norm(ax.cbar.norm)
398 398 ax.cbar.patch.figure.canvas.draw()
399 399
400 400 def onBtnRelease(self, event):
401 401 '''
402 402 Event for mouse button release
403 403 '''
404 404 cb_ax = event.inaxes
405 405 if cb_ax is not None:
406 406 cb_ax.press = None
407 407
408 408 def __add_axes(self, ax, size='30%', pad='8%'):
409 409 '''
410 410 Add new axes to the given figure
411 411 '''
412 412 divider = make_axes_locatable(ax)
413 413 nax = divider.new_horizontal(size=size, pad=pad)
414 414 ax.figure.add_axes(nax)
415 415 return nax
416 416
417 417 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
418 418 '''
419 419 Create a masked array for missing data
420 420 '''
421 421 if x_buffer.shape[0] < 2:
422 422 return x_buffer, y_buffer, z_buffer
423 423
424 424 deltas = x_buffer[1:] - x_buffer[0:-1]
425 425 x_median = numpy.median(deltas)
426 426
427 427 index = numpy.where(deltas > 5 * x_median)
428 428
429 429 if len(index[0]) != 0:
430 430 z_buffer[::, index[0], ::] = self.__missing
431 431 z_buffer = numpy.ma.masked_inside(z_buffer,
432 432 0.99 * self.__missing,
433 433 1.01 * self.__missing)
434 434
435 435 return x_buffer, y_buffer, z_buffer
436 436
437 437 def decimate(self):
438 438
439 439 # dx = int(len(self.x)/self.__MAXNUMX) + 1
440 440 dy = int(len(self.y) / self.decimation) + 1
441 441
442 442 # x = self.x[::dx]
443 443 x = self.x
444 444 y = self.y[::dy]
445 445 z = self.z[::, ::, ::dy]
446 446
447 447 return x, y, z
448 448
449 449 def format(self):
450 450 '''
451 451 Set min and max values, labels, ticks and titles
452 452 '''
453 453
454 454 if self.xmin is None:
455 455 xmin = self.data.min_time
456 456 else:
 457 457 if self.xaxis == 'time':
458 458 dt = self.getDateTime(self.data.min_time)
459 459 xmin = (dt.replace(hour=int(self.xmin), minute=0, second=0) -
460 460 datetime.datetime(1970, 1, 1)).total_seconds()
461 461 if self.data.localtime:
462 462 xmin += time.timezone
463 463 else:
464 464 xmin = self.xmin
465 465
466 466 if self.xmax is None:
467 467 xmax = xmin + self.xrange * 60 * 60
468 468 else:
 469 469 if self.xaxis == 'time':
470 470 dt = self.getDateTime(self.data.max_time)
471 471 xmax = (dt.replace(hour=int(self.xmax), minute=59, second=59) -
472 472 datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=1)).total_seconds()
473 473 if self.data.localtime:
474 474 xmax += time.timezone
475 475 else:
476 476 xmax = self.xmax
477 477
478 478 ymin = self.ymin if self.ymin else numpy.nanmin(self.y)
479 479 ymax = self.ymax if self.ymax else numpy.nanmax(self.y)
480 480 #Y = numpy.array([1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000])
481 481
482 482 #i = 1 if numpy.where(
483 483 # abs(ymax-ymin) <= Y)[0][0] < 0 else numpy.where(abs(ymax-ymin) <= Y)[0][0]
484 484 #ystep = Y[i] / 10.
485 485 dig = int(numpy.log10(ymax))
486 486 if dig == 0:
487 487 digD = len(str(ymax)) - 2
488 488 ydec = ymax*(10**digD)
489 489
490 490 dig = int(numpy.log10(ydec))
491 491 ystep = ((ydec + (10**(dig)))//10**(dig))*(10**(dig))
492 492 ystep = ystep/5
493 493 ystep = ystep/(10**digD)
494 494
495 495 else:
496 496 ystep = ((ymax + (10**(dig)))//10**(dig))*(10**(dig))
497 497 ystep = ystep/5
498 498
 499 499 if self.xaxis != 'time':
500 500
501 501 dig = int(numpy.log10(xmax))
502 502
503 503 if dig <= 0:
504 504 digD = len(str(xmax)) - 2
505 505 xdec = xmax*(10**digD)
506 506
507 507 dig = int(numpy.log10(xdec))
508 508 xstep = ((xdec + (10**(dig)))//10**(dig))*(10**(dig))
509 509 xstep = xstep*0.5
510 510 xstep = xstep/(10**digD)
511 511
512 512 else:
513 513 xstep = ((xmax + (10**(dig)))//10**(dig))*(10**(dig))
514 514 xstep = xstep/5
515 515
516 516 for n, ax in enumerate(self.axes):
517 517 if ax.firsttime:
518 518 ax.set_facecolor(self.bgcolor)
519 519 ax.yaxis.set_major_locator(MultipleLocator(ystep))
520 520 if self.xscale:
521 521 ax.xaxis.set_major_formatter(FuncFormatter(
522 522 lambda x, pos: '{0:g}'.format(x*self.xscale)))
 523 523 if self.yscale:
524 524 ax.yaxis.set_major_formatter(FuncFormatter(
525 525 lambda x, pos: '{0:g}'.format(x*self.yscale)))
 526 526 if self.xaxis == 'time':
527 527 ax.xaxis.set_major_formatter(FuncFormatter(self.__fmtTime))
528 528 ax.xaxis.set_major_locator(LinearLocator(9))
529 529 else:
530 530 ax.xaxis.set_major_locator(MultipleLocator(xstep))
531 531 if self.xlabel is not None:
532 532 ax.set_xlabel(self.xlabel)
533 533 ax.set_ylabel(self.ylabel)
534 534 ax.firsttime = False
535 535 if self.showprofile:
536 536 self.pf_axes[n].set_ylim(ymin, ymax)
537 537 self.pf_axes[n].set_xlim(self.zmin, self.zmax)
538 538 self.pf_axes[n].set_xlabel('dB')
539 539 self.pf_axes[n].grid(b=True, axis='x')
540 540 [tick.set_visible(False)
541 541 for tick in self.pf_axes[n].get_yticklabels()]
542 542 if self.colorbar:
543 543 ax.cbar = plt.colorbar(
544 544 ax.plt, ax=ax, fraction=0.05, pad=0.02, aspect=10)
545 545 ax.cbar.ax.tick_params(labelsize=8)
546 546 ax.cbar.ax.press = None
547 547 if self.cb_label:
548 548 ax.cbar.set_label(self.cb_label, size=8)
549 549 elif self.cb_labels:
550 550 ax.cbar.set_label(self.cb_labels[n], size=8)
551 551 else:
552 552 ax.cbar = None
553 553 if self.grid:
554 554 ax.grid(True)
555 555
556 556 if not self.polar:
557 557 ax.set_xlim(xmin, xmax)
558 558 ax.set_ylim(ymin, ymax)
559 559 ax.set_title('{} {} {}'.format(
560 560 self.titles[n],
561 561 self.getDateTime(self.data.max_time).strftime(
562 562 '%Y-%m-%d %H:%M:%S'),
563 563 self.time_label),
564 564 size=8)
565 565 else:
566 566 ax.set_title('{}'.format(self.titles[n]), size=8)
567 567 ax.set_ylim(0, 90)
568 568 ax.set_yticks(numpy.arange(0, 90, 20))
569 569 ax.yaxis.labelpad = 40
570 570
571 571 def clear_figures(self):
572 572 '''
573 573 Reset axes for redraw plots
574 574 '''
575 575
576 576 for ax in self.axes:
577 577 ax.clear()
578 578 ax.firsttime = True
579 579 if ax.cbar:
580 580 ax.cbar.remove()
581 581
582 582 def __plot(self):
583 583 '''
584 584 Main function to plot, format and save figures
585 585 '''
586 586
587 587 try:
588 588 self.plot()
589 589 self.format()
590 590 except Exception as e:
591 591 log.warning('{} Plot could not be updated... check data'.format(
592 592 self.CODE), self.name)
593 593 log.error(str(e), '')
594 594 return
595 595
596 596 for n, fig in enumerate(self.figures):
597 597 if self.nrows == 0 or self.nplots == 0:
598 598 log.warning('No data', self.name)
599 599 fig.text(0.5, 0.5, 'No Data', fontsize='large', ha='center')
600 600 fig.canvas.manager.set_window_title(self.CODE)
601 601 continue
602 602
603 603 fig.tight_layout()
604 604 fig.canvas.manager.set_window_title('{} - {}'.format(self.title,
605 605 self.getDateTime(self.data.max_time).strftime('%Y/%m/%d')))
606 606 fig.canvas.draw()
607 607 if self.show:
608 608 fig.show()
609 609 figpause(0.1)
610 610
611 611 if self.save:
612 612 self.save_figure(n)
613 613
614 614 if self.plot_server:
615 615 self.send_to_server()
616 616 # t = Thread(target=self.send_to_server)
617 617 # t.start()
618 618
619 619 def save_figure(self, n):
620 620 '''
621 621 '''
622 622
623 623 if self.save_counter < self.save_period:
624 624 self.save_counter += 1
625 625 return
626 626
627 627 self.save_counter = 1
628 628
629 629 fig = self.figures[n]
630 630
631 631 if self.save_labels:
632 632 labels = self.save_labels
633 633 else:
634 634 labels = list(range(self.nrows))
635 635
636 636 if self.oneFigure:
637 637 label = ''
638 638 else:
639 639 label = '-{}'.format(labels[n])
640 640 figname = os.path.join(
641 641 self.save,
642 642 self.CODE,
643 643 '{}{}_{}.png'.format(
644 644 self.CODE,
645 645 label,
646 self.getDateTime(self.data.max_time).strftime(
647 '%Y%m%d_%H%M%S'
648 ),
646 self.getDateTime(self.data.max_time).strftime('%Y%m%d_%H%M%S'),
649 647 )
650 648 )
649
651 650 log.log('Saving figure: {}'.format(figname), self.name)
652 651 if not os.path.isdir(os.path.dirname(figname)):
653 652 os.makedirs(os.path.dirname(figname))
654 653 fig.savefig(figname)
655 654
656 655 if self.realtime:
657 656 figname = os.path.join(
658 657 self.save,
659 658 '{}{}_{}.png'.format(
660 659 self.CODE,
661 660 label,
662 661 self.getDateTime(self.data.min_time).strftime(
663 662 '%Y%m%d'
664 663 ),
665 664 )
666 665 )
667 666 fig.savefig(figname)
668 667
669 668 def send_to_server(self):
670 669 '''
671 670 '''
672 671
673 672 if self.sender_counter < self.sender_period:
674 673 self.sender_counter += 1
675 674 return
676 675
677 676 self.sender_counter = 1
678 677 self.data.meta['titles'] = self.titles
679 678 retries = 2
680 679 while True:
681 680 self.socket.send_string(self.data.jsonify(self.plot_name, self.plot_type))
682 681 socks = dict(self.poll.poll(5000))
683 682 if socks.get(self.socket) == zmq.POLLIN:
684 683 reply = self.socket.recv_string()
685 684 if reply == 'ok':
686 685 log.log("Response from server ok", self.name)
687 686 break
688 687 else:
689 688 log.warning(
690 689 "Malformed reply from server: {}".format(reply), self.name)
691 690
692 691 else:
693 692 log.warning(
694 693 "No response from server, retrying...", self.name)
695 694 self.socket.setsockopt(zmq.LINGER, 0)
696 695 self.socket.close()
697 696 self.poll.unregister(self.socket)
698 697 retries -= 1
699 698 if retries == 0:
700 699 log.error(
701 700 "Server seems to be offline, abandoning", self.name)
702 701 self.socket = self.context.socket(zmq.REQ)
703 702 self.socket.connect(self.plot_server)
704 703 self.poll.register(self.socket, zmq.POLLIN)
705 704 time.sleep(1)
706 705 break
707 706 self.socket = self.context.socket(zmq.REQ)
708 707 self.socket.connect(self.plot_server)
709 708 self.poll.register(self.socket, zmq.POLLIN)
710 709 time.sleep(0.5)
711 710
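# send_to_server() above follows the ZMQ "lazy pirate" REQ/REP pattern: send the
# jsonified plot data, poll up to 5 s for an 'ok' reply, and rebuild the REQ socket
# before retrying. A minimal sketch of a compatible receiving end is shown here; the
# bind address is hypothetical and the real plot server is implemented elsewhere in
# schainpy:
#
#     import zmq
#     context = zmq.Context()
#     receiver = context.socket(zmq.REP)
#     receiver.bind('tcp://*:4444')          # hypothetical address/port
#     while True:
#         payload = receiver.recv_string()   # jsonified plot data
#         receiver.send_string('ok')         # acknowledge so the sender stops retrying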
712 711 def setup(self):
713 712 '''
714 713 This method should be implemented in the child class, the following
715 714 attributes should be set:
716 715
717 716 self.nrows: number of rows
718 717 self.ncols: number of cols
719 718 self.nplots: number of plots (channels or pairs)
720 719 self.ylabel: label for Y axes
721 720 self.titles: list of axes title
722 721
723 722 '''
724 723 raise NotImplementedError
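# A minimal sketch of what a child class's setup() is expected to define, using the
# attributes listed in the docstring above (the values here are hypothetical):
#
#     def setup(self):
#         self.nrows = 2
#         self.ncols = 1
#         self.nplots = 2
#         self.ylabel = 'Range [km]'
#         self.titles = ['Channel {}'.format(x) for x in range(self.nplots)]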
725 724
726 725 def plot(self):
727 726 '''
728 727 Must be defined in the child class
729 728 '''
730 729 raise NotImplementedError
731 730
732 731 def run(self, dataOut, **kwargs):
733 732 '''
734 733 Main plotting routine
735 734 '''
736 735
737 736 if self.isConfig is False:
738 737 self.__setup(**kwargs)
739 738 if dataOut.type == 'Parameters':
740 739 t = dataOut.utctimeInit
741 740 else:
742 741 t = dataOut.utctime
743 742
744 743 if dataOut.useLocalTime:
745 744 self.getDateTime = datetime.datetime.fromtimestamp
746 745 if not self.localtime:
747 746 t += time.timezone
748 747 else:
749 748 self.getDateTime = datetime.datetime.utcfromtimestamp
750 749 if self.localtime:
751 750 t -= time.timezone
752 751
753 752 if 'buffer' in self.plot_type:
754 753 if self.xmin is None:
755 754 self.tmin = t
756 755 else:
757 756 self.tmin = (
758 757 self.getDateTime(t).replace(
759 758 hour=self.xmin,
760 759 minute=0,
761 760 second=0) - self.getDateTime(0)).total_seconds()
762 761
763 762 self.data.setup()
764 763 self.isConfig = True
765 764 if self.plot_server:
766 765 self.context = zmq.Context()
767 766 self.socket = self.context.socket(zmq.REQ)
768 767 self.socket.connect(self.plot_server)
769 768 self.poll = zmq.Poller()
770 769 self.poll.register(self.socket, zmq.POLLIN)
771 770
772 771 if dataOut.type == 'Parameters':
773 772 tm = dataOut.utctimeInit
774 773 else:
775 774 tm = dataOut.utctime
776 775
777 776 if not dataOut.useLocalTime and self.localtime:
778 777 tm -= time.timezone
779 778 if dataOut.useLocalTime and not self.localtime:
780 779 tm += time.timezone
781 780
782 781 if self.xaxis == 'time' and self.data and (tm - self.tmin) >= self.xrange*60*60:
783 782 self.save_counter = self.save_period
784 783 self.__plot()
785 784 self.xmin += self.xrange
786 785 if self.xmin >= 24:
787 786 self.xmin -= 24
788 787 self.tmin += self.xrange*60*60
789 788 self.data.setup()
790 789 self.clear_figures()
791 790
792 791 self.data.update(dataOut, tm)
793 792
794 793 if self.isPlotConfig is False:
795 794 self.__setup_plot()
796 795 self.isPlotConfig = True
797 796
798 797 if self.realtime:
799 798 self.__plot()
800 799 else:
801 800 self.__throttle_plot(self.__plot)#, coerce=coerce)
802 801
803 802 def close(self):
804 803
805 804 if self.data:
806 805 self.save_counter = self.save_period
807 806 self.__plot()
808 807 if self.data and self.pause:
809 808 figpause(10)
810
@@ -1,629 +1,649
1 1 '''
2 2 Created on Set 9, 2015
3 3
4 4 @author: roj-idl71 Karim Kuyeng
5 5 '''
6 6
7 7 import os
8 8 import sys
9 9 import glob
10 10 import fnmatch
11 11 import datetime
12 12 import time
13 13 import re
14 14 import h5py
15 15 import numpy
16 16
17 17 try:
18 18 from gevent import sleep
19 19 except:
20 20 from time import sleep
21 21
22 22 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
23 23 from schainpy.model.data.jrodata import Voltage
24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
25 25 from numpy import imag
26 26
27 @MPDecorator
27 28 class AMISRReader(ProcessingUnit):
28 29 '''
29 30 Reads AMISR HDF5 voltage files (Raw11/Data) and delivers Voltage objects
30 31 '''
31 32
32 33 def __init__(self):
33 34 '''
34 35 Constructor
35 36 '''
36 37
37 38 ProcessingUnit.__init__(self)
38 39
39 40 self.set = None
40 41 self.subset = None
41 42 self.extension_file = '.h5'
42 43 self.dtc_str = 'dtc'
43 44 self.dtc_id = 0
44 45 self.status = True
45 46 self.isConfig = False
46 47 self.dirnameList = []
47 48 self.filenameList = []
48 49 self.fileIndex = None
49 50 self.flagNoMoreFiles = False
50 51 self.flagIsNewFile = 0
51 52 self.filename = ''
52 53 self.amisrFilePointer = None
53 54
54 55
55 self.dataset = None
56 #self.dataset = None
56 57
57 58
58 59
59 60
60 61 self.profileIndex = 0
61 62
62 63
63 64 self.beamCodeByFrame = None
64 65 self.radacTimeByFrame = None
65 66
66 67 self.dataset = None
67 68
68 69
69 70
70 71
71 72 self.__firstFile = True
72 73
73 74 self.buffer = None
74 75
75 76
76 77 self.timezone = 'ut'
77 78
78 79 self.__waitForNewFile = 20
79 80 self.__filename_online = None
80 81 #Is it really necessary to create the output object in the initializer?
81 82 self.dataOut = Voltage()
83 self.dataOut.error=False
82 84
83 85 def setup(self,path=None,
84 86 startDate=None,
85 87 endDate=None,
86 88 startTime=None,
87 89 endTime=None,
88 90 walk=True,
89 91 timezone='ut',
90 92 all=0,
91 93 code = None,
92 94 nCode = 0,
93 95 nBaud = 0,
94 96 online=False):
95 97
98 #print ("T",path)
99
96 100 self.timezone = timezone
97 101 self.all = all
98 102 self.online = online
99 103
100 104 self.code = code
101 105 self.nCode = int(nCode)
102 106 self.nBaud = int(nBaud)
103 107
104 108
105 109
106 110 #self.findFiles()
107 111 if not(online):
108 112 #Offline file search
109 113 self.searchFilesOffLine(path, startDate, endDate, startTime, endTime, walk)
110 114 else:
111 115 self.searchFilesOnLine(path, startDate, endDate, startTime,endTime,walk)
112 116
113 117 if not(self.filenameList):
114 118 print("There are no files in the folder: %s"%(path))
115
116 119 sys.exit(-1)
117 120
118 121 self.fileIndex = -1
119 122
120 123 self.readNextFile(online)
121 124
122 125 '''
123 126 Add code
124 127 '''
125 128 self.isConfig = True
126 129
127 130 pass
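# In normal use this unit is created and driven by the schainpy controller; the call
# below is only a hypothetical illustration of the argument types setup() expects
# (path and dates are made up):
#
#     import datetime
#     reader.setup(path='/data/amisr',
#                  startDate=datetime.date(2019, 12, 16),
#                  endDate=datetime.date(2019, 12, 16),
#                  startTime=datetime.time(9, 0, 0),
#                  endTime=datetime.time(11, 0, 0),
#                  walk=True, timezone='lt', online=False)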
128 131
129 132
130 133 def readAMISRHeader(self,fp):
131 134 header = 'Raw11/Data/RadacHeader'
132 135 self.beamCodeByPulse = fp.get(header+'/BeamCode') # list of beams per profile, used to rearrange channels
133 136 self.beamCode = fp.get('Raw11/Data/Beamcodes') # number of channels; identifies each beam's position so a file can be created with that info
134 137 #self.code = fp.get(header+'/Code') # not used here
135 138 self.frameCount = fp.get(header+'/FrameCount')# not used here
136 139 self.modeGroup = fp.get(header+'/ModeGroup')# not used here
137 140 self.nsamplesPulse = fp.get(header+'/NSamplesPulse')# used to get nsa (or derive it from the data)
138 141 self.pulseCount = fp.get(header+'/PulseCount')# not used here
139 142 self.radacTime = fp.get(header+'/RadacTime')# first time in the file; the rest is computed as IPP*profile index
140 143 self.timeCount = fp.get(header+'/TimeCount')# not used here
141 144 self.timeStatus = fp.get(header+'/TimeStatus')# not used here
142 145 self.rangeFromFile = fp.get('Raw11/Data/Samples/Range')
143 146 self.frequency = fp.get('Rx/Frequency')
144 147 txAus = fp.get('Raw11/Data/Pulsewidth')
145 148
146 149
147 150 self.nblocks = self.pulseCount.shape[0] #nblocks
148 151
149 152 self.nprofiles = self.pulseCount.shape[1] #nprofile
150 153 self.nsa = self.nsamplesPulse[0,0] #ngates
151 154 self.nchannels = self.beamCode.shape[1]
152 155 self.ippSeconds = (self.radacTime[0][1] -self.radacTime[0][0]) #Ipp in seconds
153 156 #self.__waitForNewFile = self.nblocks # wait depending on the number of blocks since each block is 1 sec
154 157 self.__waitForNewFile = self.nblocks * self.nprofiles * self.ippSeconds # wait until new file is created
155 158
156 159 #filling radar controller header parameters
157 160 self.__ippKm = self.ippSeconds *.15*1e6 # in km
158 161 self.__txA = (txAus.value)*.15 #(ipp[us]*.15km/1us) in km
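# Both conversions above use 0.15 km per microsecond (half the speed of light), so a
# hypothetical IPP of 2 ms corresponds to an inter-pulse range of 300 km:
#
#     ippSeconds = 2e-3
#     ippKm = ippSeconds * .15 * 1e6   # -> 300.0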
159 162 self.__txB = 0
160 163 nWindows=1
161 164 self.__nSamples = self.nsa
162 165 self.__firstHeight = self.rangeFromFile[0][0]/1000 #in km
163 166 self.__deltaHeight = (self.rangeFromFile[0][1] - self.rangeFromFile[0][0])/1000
164 167
165 168 #for now until understand why the code saved is different (code included even though code not in tuf file)
166 169 #self.__codeType = 0
167 170 # self.__nCode = None
168 171 # self.__nBaud = None
169 172 self.__code = self.code
170 173 self.__codeType = 0
171 174 if self.code != None:
172 175 self.__codeType = 1
173 176 self.__nCode = self.nCode
174 177 self.__nBaud = self.nBaud
175 178 #self.__code = 0
176 179
177 180 #filling system header parameters
178 181 self.__nSamples = self.nsa
179 182 self.newProfiles = self.nprofiles/self.nchannels
180 183 self.__channelList = list(range(self.nchannels))
181 184
182 185 self.__frequency = self.frequency[0][0]
183 186
184 187
185 188
186 189 def createBuffers(self):
187 190
188 191 pass
189 192
190 193 def __setParameters(self,path='', startDate='',endDate='',startTime='', endTime='', walk=''):
191 194 self.path = path
192 195 self.startDate = startDate
193 196 self.endDate = endDate
194 197 self.startTime = startTime
195 198 self.endTime = endTime
196 199 self.walk = walk
197 200
198 201 def __checkPath(self):
199 202 if os.path.exists(self.path):
200 203 self.status = 1
201 204 else:
202 205 self.status = 0
203 206 print('Path: %s does not exist'%self.path)
204 207
205 208 return
206 209
207 210
208 211 def __selDates(self, amisr_dirname_format):
209 212 try:
210 213 year = int(amisr_dirname_format[0:4])
211 214 month = int(amisr_dirname_format[4:6])
212 215 dom = int(amisr_dirname_format[6:8])
213 216 thisDate = datetime.date(year,month,dom)
214 217
215 218 if (thisDate>=self.startDate and thisDate <= self.endDate):
216 219 return amisr_dirname_format
217 220 except:
218 221 return None
219 222
220 223
221 224 def __findDataForDates(self,online=False):
222 225
223 226 if not(self.status):
224 227 return None
225 228
226 229 pat = r'\d+.\d+'
227 230 dirnameList = [re.search(pat,x) for x in os.listdir(self.path)]
228 231 dirnameList = [x for x in dirnameList if x!=None]
229 232 dirnameList = [x.string for x in dirnameList]
230 233 if not(online):
231 234 dirnameList = [self.__selDates(x) for x in dirnameList]
232 235 dirnameList = [x for x in dirnameList if x!=None]
233 236 if len(dirnameList)>0:
234 237 self.status = 1
235 238 self.dirnameList = dirnameList
236 239 self.dirnameList.sort()
237 240 else:
238 241 self.status = 0
239 242 return None
240 243
241 244 def __getTimeFromData(self):
242 245 startDateTime_Reader = datetime.datetime.combine(self.startDate,self.startTime)
243 246 endDateTime_Reader = datetime.datetime.combine(self.endDate,self.endTime)
244 247
245 248 print('Filtering Files from %s to %s'%(startDateTime_Reader, endDateTime_Reader))
246 249 print('........................................')
247 250 filter_filenameList = []
248 251 self.filenameList.sort()
249 252 #for i in range(len(self.filenameList)-1):
250 253 for i in range(len(self.filenameList)):
251 254 filename = self.filenameList[i]
252 255 fp = h5py.File(filename,'r')
253 256 time_str = fp.get('Time/RadacTimeString')
254 257
255 startDateTimeStr_File = time_str[0][0].split('.')[0]
258 startDateTimeStr_File = time_str[0][0].decode('UTF-8').split('.')[0]
259 #startDateTimeStr_File = "2019-12-16 09:21:11"
256 260 junk = time.strptime(startDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
257 261 startDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
258 262
259 endDateTimeStr_File = time_str[-1][-1].split('.')[0]
263 #endDateTimeStr_File = "2019-12-16 11:10:11"
264 endDateTimeStr_File = time_str[-1][-1].decode('UTF-8').split('.')[0]
260 265 junk = time.strptime(endDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
261 266 endDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
262 267
263 268 fp.close()
264 269
270 #print("check time", startDateTime_File)
265 271 if self.timezone == 'lt':
266 272 startDateTime_File = startDateTime_File - datetime.timedelta(minutes = 300)
267 273 endDateTime_File = endDateTime_File - datetime.timedelta(minutes = 300)
268
269 274 if (endDateTime_File>=startDateTime_Reader and endDateTime_File<endDateTime_Reader):
270 275 #self.filenameList.remove(filename)
271 276 filter_filenameList.append(filename)
272 277
273 278 if (endDateTime_File>=endDateTime_Reader):
274 279 break
275 280
276 281
277 282 filter_filenameList.sort()
278 283 self.filenameList = filter_filenameList
279 284 return 1
280 285
281 286 def __filterByGlob1(self, dirName):
282 287 filter_files = glob.glob1(dirName, '*.*%s'%self.extension_file)
283 288 filter_files.sort()
284 289 filterDict = {}
285 290 filterDict.setdefault(dirName)
286 291 filterDict[dirName] = filter_files
287 292 return filterDict
288 293
289 294 def __getFilenameList(self, fileListInKeys, dirList):
290 295 for value in fileListInKeys:
291 296 dirName = list(value.keys())[0]
292 297 for file in value[dirName]:
293 298 filename = os.path.join(dirName, file)
294 299 self.filenameList.append(filename)
295 300
296 301
297 302 def __selectDataForTimes(self, online=False):
298 303 #the time filter is not implemented yet
299 304 if not(self.status):
300 305 return None
301 306
302 307 dirList = [os.path.join(self.path,x) for x in self.dirnameList]
303 308
304 309 fileListInKeys = [self.__filterByGlob1(x) for x in dirList]
305 310
306 311 self.__getFilenameList(fileListInKeys, dirList)
307 312 if not(online):
308 313 #filter by time
309 314 if not(self.all):
310 315 self.__getTimeFromData()
311 316
312 317 if len(self.filenameList)>0:
313 318 self.status = 1
314 319 self.filenameList.sort()
315 320 else:
316 321 self.status = 0
317 322 return None
318 323
319 324 else:
320 325 #get the second-to-last file
321 326 self.filenameList = [self.filenameList[-2]]
322 327
323 328 new_dirnameList = []
324 329 for dirname in self.dirnameList:
325 330 junk = numpy.array([dirname in x for x in self.filenameList])
326 331 junk_sum = junk.sum()
327 332 if junk_sum > 0:
328 333 new_dirnameList.append(dirname)
329 334 self.dirnameList = new_dirnameList
330 335 return 1
331 336
332 337 def searchFilesOnLine(self, path, startDate, endDate, startTime=datetime.time(0,0,0),
333 338 endTime=datetime.time(23,59,59),walk=True):
334 339
335 340 if endDate ==None:
336 341 startDate = datetime.datetime.utcnow().date()
337 342 endDate = datetime.datetime.utcnow().date()
338 343
339 344 self.__setParameters(path=path, startDate=startDate, endDate=endDate,startTime = startTime,endTime=endTime, walk=walk)
340 345
341 346 self.__checkPath()
342 347
343 348 self.__findDataForDates(online=True)
344 349
345 350 self.dirnameList = [self.dirnameList[-1]]
346 351
347 352 self.__selectDataForTimes(online=True)
348 353
349 354 return
350 355
351 356
352 357 def searchFilesOffLine(self,
353 358 path,
354 359 startDate,
355 360 endDate,
356 361 startTime=datetime.time(0,0,0),
357 362 endTime=datetime.time(23,59,59),
358 363 walk=True):
359 364
360 365 self.__setParameters(path, startDate, endDate, startTime, endTime, walk)
361 366
362 367 self.__checkPath()
363 368
364 369 self.__findDataForDates()
365 370
366 371 self.__selectDataForTimes()
367 372
368 373 for i in range(len(self.filenameList)):
369 374 print("%s" %(self.filenameList[i]))
370 375
371 376 return
372 377
373 378 def __setNextFileOffline(self):
374 379 idFile = self.fileIndex
375 380
376 381 while (True):
377 382 idFile += 1
378 383 if not(idFile < len(self.filenameList)):
379 384 self.flagNoMoreFiles = 1
380 385 print("No more Files")
386 self.dataOut.error = True
381 387 return 0
382 388
383 389 filename = self.filenameList[idFile]
384 390
385 391 amisrFilePointer = h5py.File(filename,'r')
386 392
387 393 break
388 394
389 395 self.flagIsNewFile = 1
390 396 self.fileIndex = idFile
391 397 self.filename = filename
392 398
393 399 self.amisrFilePointer = amisrFilePointer
394 400
395 401 print("Setting the file: %s"%self.filename)
396 402
397 403 return 1
398 404
399 405
400 406 def __setNextFileOnline(self):
401 407 filename = self.filenameList[0]
402 408 if self.__filename_online != None:
403 409 self.__selectDataForTimes(online=True)
404 410 filename = self.filenameList[0]
405 411 wait = 0
406 412 while self.__filename_online == filename:
407 413 print('waiting %d seconds to get a new file...'%(self.__waitForNewFile))
408 414 if wait == 5:
409 415 return 0
410 416 sleep(self.__waitForNewFile)
411 417 self.__selectDataForTimes(online=True)
412 418 filename = self.filenameList[0]
413 419 wait += 1
414 420
415 421 self.__filename_online = filename
416 422
417 423 self.amisrFilePointer = h5py.File(filename,'r')
418 424 self.flagIsNewFile = 1
419 425 self.filename = filename
420 426 print("Setting the file: %s"%self.filename)
421 427 return 1
422 428
423 429
424 430 def readData(self):
425 431 buffer = self.amisrFilePointer.get('Raw11/Data/Samples/Data')
426 432 re = buffer[:,:,:,0]
427 433 im = buffer[:,:,:,1]
428 434 dataset = re + im*1j
435
429 436 self.radacTime = self.amisrFilePointer.get('Raw11/Data/RadacHeader/RadacTime')
430 437 timeset = self.radacTime[:,0]
438
431 439 return dataset,timeset
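# The 'Raw11/Data/Samples/Data' dataset is a 4-D array of shape
# (nblocks, nprofiles, nsamples, 2) with the real and imaginary parts in the last
# axis. A standalone sketch of the same read, using a hypothetical filename:
#
#     import h5py
#     with h5py.File('20191216.001_dt0.h5', 'r') as fp:
#         raw = fp['Raw11/Data/Samples/Data'][:]
#         volts = raw[..., 0] + 1j * raw[..., 1]                    # complex voltages
#         radacTime = fp['Raw11/Data/RadacHeader/RadacTime'][:, 0]  # block start times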
432 440
433 441 def reshapeData(self):
434 442 #self.beamCodeByPulse, self.beamCode, self.nblocks, self.nprofiles, self.nsa,
435 443 channels = self.beamCodeByPulse[0,:]
436 444 nchan = self.nchannels
437 445 #self.newProfiles = self.nprofiles/nchan #must be defined on filljroheader
438 446 nblocks = self.nblocks
439 447 nsamples = self.nsa
440 448
441 449 #Dimensions : nChannels, nProfiles, nSamples
442 new_block = numpy.empty((nblocks, nchan, self.newProfiles, nsamples), dtype="complex64")
450 new_block = numpy.empty((nblocks, nchan, numpy.int_(self.newProfiles), nsamples), dtype="complex64")
443 451 ############################################
444 452
445 453 for thisChannel in range(nchan):
446 454 new_block[:,thisChannel,:,:] = self.dataset[:,numpy.where(channels==self.beamCode[0][thisChannel])[0],:]
447 455
448 456
449 457 new_block = numpy.transpose(new_block, (1,0,2,3))
450 458 new_block = numpy.reshape(new_block, (nchan,-1, nsamples))
451 459
452 460 return new_block
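# reshapeData() gathers the profiles belonging to each beam code into a channel axis
# and then folds the block axis into the profile axis. A hedged numpy sketch of those
# last two steps with made-up sizes:
#
#     import numpy
#     nblocks, nchan, newProfiles, nsamples = 4, 2, 10, 128
#     block = numpy.zeros((nblocks, nchan, newProfiles, nsamples), dtype='complex64')
#     out = numpy.transpose(block, (1, 0, 2, 3)).reshape(nchan, -1, nsamples)
#     # out.shape -> (2, 40, 128): (channels, nblocks*newProfiles, samples)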
453 461
454 462 def updateIndexes(self):
455 463
456 464 pass
457 465
458 466 def fillJROHeader(self):
459 467
460 468 #fill radar controller header
461 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(ippKm=self.__ippKm,
469 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(ipp=self.__ippKm,
462 470 txA=self.__txA,
463 471 txB=0,
464 472 nWindows=1,
465 473 nHeights=self.__nSamples,
466 474 firstHeight=self.__firstHeight,
467 475 deltaHeight=self.__deltaHeight,
468 476 codeType=self.__codeType,
469 477 nCode=self.__nCode, nBaud=self.__nBaud,
470 478 code = self.__code,
471 479 fClock=1)
472 480
473
474
475 481 #fill system header
476 482 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
477 483 nProfiles=self.newProfiles,
478 484 nChannels=len(self.__channelList),
479 485 adcResolution=14,
480 pciDioBusWith=32)
486 pciDioBusWidth=32)
481 487
482 488 self.dataOut.type = "Voltage"
483 489
484 490 self.dataOut.data = None
485 491
486 492 self.dataOut.dtype = numpy.dtype([('real','<i8'),('imag','<i8')])
487 493
488 494 # self.dataOut.nChannels = 0
489 495
490 496 # self.dataOut.nHeights = 0
491 497
492 498 self.dataOut.nProfiles = self.newProfiles*self.nblocks
493 499
494 500 #self.dataOut.heightList = self.__firstHeigth + numpy.arange(self.__nSamples, dtype = numpy.float)*self.__deltaHeigth
495 501 ranges = numpy.reshape(self.rangeFromFile.value,(-1))
496 502 self.dataOut.heightList = ranges/1000.0 #km
497 503
498 504
499 505 self.dataOut.channelList = self.__channelList
500 506
501 507 self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights()
502 508
503 509 # self.dataOut.channelIndexList = None
504 510
505 511 self.dataOut.flagNoData = True
506 512
507 513 #Set to TRUE if the data is discontinuous
508 514 self.dataOut.flagDiscontinuousBlock = False
509 515
510 516 self.dataOut.utctime = None
511 517
512 518 #self.dataOut.timeZone = -5 #self.__timezone/60 #timezone like jroheader, difference in minutes between UTC and localtime
513 519 if self.timezone == 'lt':
514 520 self.dataOut.timeZone = time.timezone / 60. #get the timezone in minutes
515 521 else:
516 522 self.dataOut.timeZone = 0 #by default time is UTC
517 523
518 524 self.dataOut.dstFlag = 0
519 525
520 526 self.dataOut.errorCount = 0
521 527
522 528 self.dataOut.nCohInt = 1
523 529
524 530 self.dataOut.flagDecodeData = False #assume the data is already decoded
525 531
526 532 self.dataOut.flagDeflipData = False #assume the data has no flip applied
527 533
528 534 self.dataOut.flagShiftFFT = False
529 535
530 536 self.dataOut.ippSeconds = self.ippSeconds
531 537
532 538 #Time interval between profiles
533 539 #self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
534 540
535 541 self.dataOut.frequency = self.__frequency
536
537 542 self.dataOut.realtime = self.online
538 543 pass
539 544
540 545 def readNextFile(self,online=False):
541 546
542 547 if not(online):
543 548 newFile = self.__setNextFileOffline()
544 549 else:
545 550 newFile = self.__setNextFileOnline()
546 551
547 552 if not(newFile):
548 553 return 0
549
550 554 #if self.__firstFile:
551 555 self.readAMISRHeader(self.amisrFilePointer)
556
552 557 self.createBuffers()
558
553 559 self.fillJROHeader()
560
554 561 #self.__firstFile = False
555 562
556 563
557 564
558 565 self.dataset,self.timeset = self.readData()
559 566
560 567 if self.endDate!=None:
561 568 endDateTime_Reader = datetime.datetime.combine(self.endDate,self.endTime)
562 569 time_str = self.amisrFilePointer.get('Time/RadacTimeString')
563 startDateTimeStr_File = time_str[0][0].split('.')[0]
570 startDateTimeStr_File = time_str[0][0].decode('UTF-8').split('.')[0]
564 571 junk = time.strptime(startDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
565 572 startDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
566 573 if self.timezone == 'lt':
567 574 startDateTime_File = startDateTime_File - datetime.timedelta(minutes = 300)
568 575 if (startDateTime_File>endDateTime_Reader):
569 576 return 0
570 577
571 578 self.jrodataset = self.reshapeData()
572 579 #----self.updateIndexes()
573 580 self.profileIndex = 0
574 581
575 582 return 1
576 583
577 584
578 585 def __hasNotDataInBuffer(self):
579 586 if self.profileIndex >= (self.newProfiles*self.nblocks):
580 587 return 1
581 588 return 0
582 589
583 590
584 591 def getData(self):
585 592
586 593 if self.flagNoMoreFiles:
587 594 self.dataOut.flagNoData = True
588 595 return 0
589 596
590 597 if self.__hasNotDataInBuffer():
591 598 if not (self.readNextFile(self.online)):
592 599 return 0
593 600
594 601
595 if self.dataset is None: # setear esta condicion cuando no hayan datos por leers
602 if self.dataset is None: # set this condition when there is no more data to read
596 603 self.dataOut.flagNoData = True
597 604 return 0
598 605
599 606 #self.dataOut.data = numpy.reshape(self.jrodataset[self.profileIndex,:],(1,-1))
600 607
601 608 self.dataOut.data = self.jrodataset[:,self.profileIndex,:]
602 609
610 #print("R_t",self.timeset)
611
603 612 #self.dataOut.utctime = self.jrotimeset[self.profileIndex]
604 613 #check the jro data basic header and see whether it is compatible with this value
605 614 #self.dataOut.utctime = self.timeset + (self.profileIndex * self.ippSeconds * self.nchannels)
606 615 indexprof = numpy.mod(self.profileIndex, self.newProfiles)
607 616 indexblock = self.profileIndex/self.newProfiles
608 #print indexblock, indexprof
609 self.dataOut.utctime = self.timeset[indexblock] + (indexprof * self.ippSeconds * self.nchannels)
617 #print (indexblock, indexprof)
618 diffUTC = 1.8e4 #UTC diference from peru in seconds --Joab
619 diffUTC = 0
620 t_comp = (indexprof * self.ippSeconds * self.nchannels) + diffUTC #
621 #cambio posible 18/02/2020
622
623
624
625 #print("utc :",indexblock," __ ",t_comp)
626 #print(numpy.shape(self.timeset))
627 self.dataOut.utctime = self.timeset[numpy.int_(indexblock)] + t_comp
628 #self.dataOut.utctime = self.timeset[self.profileIndex] + t_comp
629 #print(self.dataOut.utctime)
610 630 self.dataOut.profileIndex = self.profileIndex
611 631 self.dataOut.flagNoData = False
612 632 # if indexprof == 0:
613 633 # print self.dataOut.utctime
614 634
615 635 self.profileIndex += 1
616 636
617 637 return self.dataOut.data
618 638
619 639
620 640 def run(self, **kwargs):
621 641 '''
622 642 This method will be called many times so here you should put all your code
623 643 '''
624 644
625 645 if not self.isConfig:
626 646 self.setup(**kwargs)
627 647 self.isConfig = True
628 648
629 649 self.getData()
@@ -1,1435 +1,1435
1 1 import numpy
2 2 import time
3 3 import os
4 4 import h5py
5 5 import re
6 6 import datetime
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14 @MPDecorator
15 15 class ParamReader(JRODataReader,ProcessingUnit):
16 16 '''
17 17 Reads HDF5 format files
18 18 path
19 19 startDate
20 20 endDate
21 21 startTime
22 22 endTime
23 23 '''
24 24
25 25 ext = ".hdf5"
26 26 optchar = "D"
27 27 timezone = None
28 28 startTime = None
29 29 endTime = None
30 30 fileIndex = None
31 31 utcList = None #To select data in the utctime list
32 32 blockList = None #List to blocks to be read from the file
33 33 blocksPerFile = None #Number of blocks to be read
34 34 blockIndex = None
35 35 path = None
36 36 #List of Files
37 37 filenameList = None
38 38 datetimeList = None
39 39 #Hdf5 File
40 40 listMetaname = None
41 41 listMeta = None
42 42 listDataname = None
43 43 listData = None
44 44 listShapes = None
45 45 fp = None
46 46 #dataOut reconstruction
47 47 dataOut = None
48 48
49 49 def __init__(self):#, **kwargs):
50 50 ProcessingUnit.__init__(self) #, **kwargs)
51 51 self.dataOut = Parameters()
52 52 return
53 53
54 54 def setup(self, **kwargs):
55 55
56 56 path = kwargs['path']
57 57 startDate = kwargs['startDate']
58 58 endDate = kwargs['endDate']
59 59 startTime = kwargs['startTime']
60 60 endTime = kwargs['endTime']
61 61 walk = kwargs['walk']
62 62 if 'ext' in kwargs:
63 63 ext = kwargs['ext']
64 64 else:
65 65 ext = '.hdf5'
66 66 if 'timezone' in kwargs:
67 67 self.timezone = kwargs['timezone']
68 68 else:
69 69 self.timezone = 'lt'
70 70
71 71 print("[Reading] Searching files in offline mode ...")
72 72 pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
73 73 startTime=startTime, endTime=endTime,
74 74 ext=ext, walk=walk)
75 75
76 76 if not(filenameList):
77 77 print("There are no files in the folder: %s"%(path))
78 78 sys.exit(-1)
79 79
80 80 self.fileIndex = -1
81 81 self.startTime = startTime
82 82 self.endTime = endTime
83 83
84 84 self.__readMetadata()
85 85
86 86 self.__setNextFileOffline()
87 87
88 88 return
89 89
90 90 def searchFilesOffLine(self,
91 91 path,
92 92 startDate=None,
93 93 endDate=None,
94 94 startTime=datetime.time(0,0,0),
95 95 endTime=datetime.time(23,59,59),
96 96 ext='.hdf5',
97 97 walk=True):
98 98
99 99 expLabel = ''
100 100 self.filenameList = []
101 101 self.datetimeList = []
102 102
103 103 pathList = []
104 104
105 105 JRODataObj = JRODataReader()
106 106 dateList, pathList = JRODataObj.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)
107 107
108 108 if dateList == []:
109 109 print("[Reading] No *%s files in %s from %s to %s"%(ext, path,
110 110 datetime.datetime.combine(startDate,startTime).ctime(),
111 111 datetime.datetime.combine(endDate,endTime).ctime()))
112 112
113 113 return None, None
114 114
115 115 if len(dateList) > 1:
116 116 print("[Reading] %d days were found in date range: %s - %s" %(len(dateList), startDate, endDate))
117 117 else:
118 118 print("[Reading] data was found for the date %s" %(dateList[0]))
119 119
120 120 filenameList = []
121 121 datetimeList = []
122 122
123 123 #----------------------------------------------------------------------------------
124 124
125 125 for thisPath in pathList:
126 126
127 127 fileList = glob.glob1(thisPath, "*%s" %ext)
128 128 fileList.sort()
129 129
130 130 for file in fileList:
131 131
132 132 filename = os.path.join(thisPath,file)
133 133
134 134 if not isFileInDateRange(filename, startDate, endDate):
135 135 continue
136 136
137 137 thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime)
138 138
139 139 if not(thisDatetime):
140 140 continue
141 141
142 142 filenameList.append(filename)
143 143 datetimeList.append(thisDatetime)
144 144
145 145 if not(filenameList):
146 146 print("[Reading] No files were found in the time range %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()))
147 147 return None, None
148 148
149 149 print("[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime))
150 150 print()
151 151
152 152 self.filenameList = filenameList
153 153 self.datetimeList = datetimeList
154 154
155 155 return pathList, filenameList
156 156
157 157 def __isFileInTimeRange(self,filename, startDate, endDate, startTime, endTime):
158 158
159 159 """
160 160 Returns 1 if the data file falls within the specified time range.
161 161
162 162 Inputs:
163 163 filename : full name of the data file, in Jicamarca format (.r)
164 164 startDate : start date of the selected range, as a datetime.date
165 165 endDate : end date of the selected range, as a datetime.date
166 166 startTime : start time of the selected range, as a datetime.time
167 167 endTime : end time of the selected range, as a datetime.time
168 168
169 169 Return:
170 170 Boolean : returns True if the data file contains data within the specified
171 171 date range, otherwise returns False.
172 172
173 173 Exceptions:
174 174 If the file does not exist or cannot be opened
175 175 If the header cannot be read.
176 176
177 177 """
178 178
179 179 try:
180 180 fp = h5py.File(filename,'r')
181 181 grp1 = fp['Data']
182 182
183 183 except IOError:
184 184 traceback.print_exc()
185 185 raise IOError("The file %s can't be opened" %(filename))
186 186
187 187 #In case has utctime attribute
188 188 grp2 = grp1['utctime']
189 189 # thisUtcTime = grp2.value[0] - 5*3600 #To convert to local time
190 190 thisUtcTime = grp2.value[0]
191 191
192 192 fp.close()
193 193
194 194 if self.timezone == 'lt':
195 195 thisUtcTime -= 5*3600
196 196
197 197 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
198 198 thisDate = thisDatetime.date()
199 199 thisTime = thisDatetime.time()
200 200
201 201 startUtcTime = (datetime.datetime.combine(thisDate,startTime)- datetime.datetime(1970, 1, 1)).total_seconds()
202 202 endUtcTime = (datetime.datetime.combine(thisDate,endTime)- datetime.datetime(1970, 1, 1)).total_seconds()
203 203
204 204 #General case
205 205 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
206 206 #-----------o----------------------------o-----------
207 207 # startTime endTime
208 208
209 209 if endTime >= startTime:
210 210 thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime)
211 211 if numpy.any(thisUtcLog): #If there is one block between the hours mentioned
212 212 return thisDatetime
213 213 return None
214 214
215 215 #If endTime < startTime then endTime belongs to the next day
216 216 #<<<<<<<<<<<o o>>>>>>>>>>>
217 217 #-----------o----------------------------o-----------
218 218 # endTime startTime
219 219
220 220 if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime):
221 221 return None
222 222
223 223 if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime):
224 224 return None
225 225
226 226 if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime):
227 227 return None
228 228
229 229 return thisDatetime
230 230
231 231 def __setNextFileOffline(self):
232 232
233 233 self.fileIndex += 1
234 234 idFile = self.fileIndex
235 235
236 236 if not(idFile < len(self.filenameList)):
237 237 raise schainpy.admin.SchainError("No more Files")
238 238 return 0
239 239
240 240 filename = self.filenameList[idFile]
241 241 filePointer = h5py.File(filename,'r')
242 242 self.filename = filename
243 243 self.fp = filePointer
244 244
245 245 print("Setting the file: %s"%self.filename)
246 246
247 247 self.__setBlockList()
248 248 self.__readData()
249 249 self.blockIndex = 0
250 250 return 1
251 251
252 252 def __setBlockList(self):
253 253 '''
254 254 Selects the data within the times defined
255 255
256 256 self.fp
257 257 self.startTime
258 258 self.endTime
259 259
260 260 self.blockList
261 261 self.blocksPerFile
262 262
263 263 '''
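# In short: startTime/endTime are converted to epoch seconds on the file's own date
# and only the blocks whose utctime falls inside that window are kept, e.g. with
# made-up numbers:
#
#     import numpy
#     utc = numpy.array([100., 200., 300., 400.])
#     lo, hi = 150., 350.
#     ind = numpy.where(numpy.logical_and(utc >= lo, utc < hi))[0]   # -> array([1, 2])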
264 264 fp = self.fp
265 265 startTime = self.startTime
266 266 endTime = self.endTime
267 267
268 268 grp = fp['Data']
269 269 thisUtcTime = grp['utctime'].value.astype(numpy.float)[0]
270 270
271 271 #ERROOOOR
272 272 if self.timezone == 'lt':
273 273 thisUtcTime -= 5*3600
274 274
275 275 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
276 276
277 277 thisDate = thisDatetime.date()
278 278 thisTime = thisDatetime.time()
279 279
280 280 startUtcTime = (datetime.datetime.combine(thisDate,startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
281 281 endUtcTime = (datetime.datetime.combine(thisDate,endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
282 282
283 283 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
284 284
285 285 self.blockList = ind
286 286 self.blocksPerFile = len(ind)
287 287
288 288 return
289 289
290 290 def __readMetadata(self):
291 291 '''
292 292 Reads Metadata
293 293
294 294 self.pathMeta
295 295 self.listShapes
296 296 self.listMetaname
297 297 self.listMeta
298 298
299 299 '''
300 300
301 301 filename = self.filenameList[0]
302 302 fp = h5py.File(filename,'r')
303 303 gp = fp['Metadata']
304 304
305 305 listMetaname = []
306 306 listMetadata = []
307 307 for item in list(gp.items()):
308 308 name = item[0]
309 309
310 310 if name=='array dimensions':
311 311 table = gp[name][:]
312 312 listShapes = {}
313 313 for shapes in table:
314 314 listShapes[shapes[0]] = numpy.array([shapes[1],shapes[2],shapes[3],shapes[4],shapes[5]])
315 315 else:
316 316 data = gp[name].value
317 317 listMetaname.append(name)
318 318 listMetadata.append(data)
319 319
320 320 self.listShapes = listShapes
321 321 self.listMetaname = listMetaname
322 322 self.listMeta = listMetadata
323 323
324 324 fp.close()
325 325 return
326 326
327 327 def __readData(self):
328 328 grp = self.fp['Data']
329 329 listdataname = []
330 330 listdata = []
331 331
332 332 for item in list(grp.items()):
333 333 name = item[0]
334 334 listdataname.append(name)
335 335
336 336 array = self.__setDataArray(grp[name],self.listShapes[name])
337 337 listdata.append(array)
338 338
339 339 self.listDataname = listdataname
340 340 self.listData = listdata
341 341 return
342 342
343 343 def __setDataArray(self, dataset, shapes):
344 344
345 345 nDims = shapes[0]
346 346 nDim2 = shapes[1] #Dimension 0
347 347 nDim1 = shapes[2] #Dimension 1, number of Points or Parameters
348 348 nDim0 = shapes[3] #Dimension 2, number of samples or ranges
349 349 mode = shapes[4] #Mode of storing
350 350 blockList = self.blockList
351 351 blocksPerFile = self.blocksPerFile
352 352
353 353 #Depending on what mode the data was stored
354 354 if mode == 0: #Divided in channels
355 355 arrayData = dataset.value.astype(numpy.float)[0][blockList]
356 356 if mode == 1: #Divided in parameter
357 357 strds = 'table'
358 358 nDatas = nDim1
359 359 newShapes = (blocksPerFile,nDim2,nDim0)
360 360 elif mode==2: #Concatenated in a table
361 361 strds = 'table0'
362 362 arrayData = dataset[strds].value
363 363 #Selecting part of the dataset
364 364 utctime = arrayData[:,0]
365 365 u, indices = numpy.unique(utctime, return_index=True)
366 366
367 367 if blockList.size != indices.size:
368 368 indMin = indices[blockList[0]]
369 369 if blockList[1] + 1 >= indices.size:
370 370 arrayData = arrayData[indMin:,:]
371 371 else:
372 372 indMax = indices[blockList[1] + 1]
373 373 arrayData = arrayData[indMin:indMax,:]
374 374 return arrayData
375 375
376 376 # One dimension
377 377 if nDims == 0:
378 378 arrayData = dataset.value.astype(numpy.float)[0][blockList]
379 379
380 380 # Two dimensions
381 381 elif nDims == 2:
382 382 arrayData = numpy.zeros((blocksPerFile,nDim1,nDim0))
383 383 newShapes = (blocksPerFile,nDim0)
384 384 nDatas = nDim1
385 385
386 386 for i in range(nDatas):
387 387 data = dataset[strds + str(i)].value
388 388 arrayData[:,i,:] = data[blockList,:]
389 389
390 390 # Three dimensions
391 391 else:
392 392 arrayData = numpy.zeros((blocksPerFile,nDim2,nDim1,nDim0))
393 393 for i in range(nDatas):
394 394
395 395 data = dataset[strds + str(i)].value
396 396
397 397 for b in range(blockList.size):
398 398 arrayData[b,:,i,:] = data[:,:,blockList[b]]
399 399
400 400 return arrayData
401 401
402 402 def __setDataOut(self):
403 403 listMeta = self.listMeta
404 404 listMetaname = self.listMetaname
405 405 listDataname = self.listDataname
406 406 listData = self.listData
407 407 listShapes = self.listShapes
408 408
409 409 blockIndex = self.blockIndex
410 410 # blockList = self.blockList
411 411
412 412 for i in range(len(listMeta)):
413 413 setattr(self.dataOut,listMetaname[i],listMeta[i])
414 414
415 415 for j in range(len(listData)):
416 416 nShapes = listShapes[listDataname[j]][0]
417 417 mode = listShapes[listDataname[j]][4]
418 418 if nShapes == 1:
419 419 setattr(self.dataOut,listDataname[j],listData[j][blockIndex])
420 420 elif nShapes > 1:
421 421 setattr(self.dataOut,listDataname[j],listData[j][blockIndex,:])
422 422 elif mode==0:
423 423 setattr(self.dataOut,listDataname[j],listData[j][blockIndex])
424 424 #Mode Meteors
425 425 elif mode ==2:
426 426 selectedData = self.__selectDataMode2(listData[j], blockIndex)
427 427 setattr(self.dataOut, listDataname[j], selectedData)
428 428 return
429 429
430 430 def __selectDataMode2(self, data, blockIndex):
431 431 utctime = data[:,0]
432 432 aux, indices = numpy.unique(utctime, return_inverse=True)
433 433 selInd = numpy.where(indices == blockIndex)[0]
434 434 selData = data[selInd,:]
435 435
436 436 return selData
437 437
438 438 def getData(self):
439 439
440 440 if self.blockIndex==self.blocksPerFile:
441 441 if not( self.__setNextFileOffline() ):
442 442 self.dataOut.flagNoData = True
443 443 return 0
444 444
445 445 self.__setDataOut()
446 446 self.dataOut.flagNoData = False
447 447
448 448 self.blockIndex += 1
449 449
450 450 return
451 451
452 452 def run(self, **kwargs):
453 453
454 454 if not(self.isConfig):
455 455 self.setup(**kwargs)
456 456 self.isConfig = True
457 457
458 458 self.getData()
459 459
460 460 return
461 461
462 462 @MPDecorator
463 463 class ParamWriter(Operation):
464 464 '''
465 465 HDF5 Writer, stores parameters data in HDF5 format files
466 466
467 467 path: path where the files will be stored
468 468 blocksPerFile: number of blocks that will be saved per HDF5 file
469 469 mode: selects the data stacking mode: '0' channels, '1' parameters, '2' table (for meteors)
470 470 metadataList: list of attributes that will be stored as metadata
471 471 dataList: list of attributes that will be stored as data
472 472 '''
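# Files produced by this writer contain a /Metadata group (including the
# 'array dimensions' table) and a /Data group with one dataset or sub-group per
# variable, grown one block at a time. A hedged sketch of reading one back with h5py;
# the filename and the 'data_output' variable name are hypothetical:
#
#     import h5py
#     with h5py.File('D2019350000.hdf5', 'r') as fp:
#         dims = fp['Metadata/array dimensions'][:]   # per-variable shape/mode table
#         utctime = fp['Data/utctime'][0, :]          # scalar variables: one value per block
#         table0 = fp['Data/data_output/table0'][:]   # array variables: per-dimension tables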
473 473
474 474 ext = ".hdf5"
475 475 optchar = "D"
476 476 metaoptchar = "M"
477 477 metaFile = None
478 478 filename = None
479 479 path = None
480 480 setFile = None
481 481 fp = None
482 482 grp = None
483 483 ds = None
484 484 firsttime = True
485 485 #Configurations
486 486 blocksPerFile = None
487 487 blockIndex = None
488 488 dataOut = None
489 489 #Data Arrays
490 490 dataList = None
491 491 metadataList = None
492 492 dsList = None #List of dictionaries with dataset properties
493 493 tableDim = None
494 494 dtype = [('arrayName', 'S20'),('nDimensions', 'i'), ('dim2', 'i'), ('dim1', 'i'),('dim0', 'i'),('mode', 'b')]
495 495 currentDay = None
496 496 lastTime = None
497 497 setType = None
498 498
499 499 def __init__(self):
500 500
501 501 Operation.__init__(self)
502 502 return
503 503
504 504 def setup(self, dataOut, path=None, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
505 505 self.path = path
506 506 self.blocksPerFile = blocksPerFile
507 507 self.metadataList = metadataList
508 508 self.dataList = dataList
509 509 self.dataOut = dataOut
510 510 self.mode = mode
511 511 if self.mode is not None:
512 512 self.mode = numpy.zeros(len(self.dataList)) + mode
513 513 else:
514 514 self.mode = numpy.ones(len(self.dataList))
515 515
516 516 self.setType = setType
517 517
518 518 arrayDim = numpy.zeros((len(self.dataList),5))
519 519
520 520 #Table dimensions
521 521 dtype0 = self.dtype
522 522 tableList = []
523 523
524 524 #Dictionary and list of tables
525 525 dsList = []
526 526
527 527 for i in range(len(self.dataList)):
528 528 dsDict = {}
529 529 dataAux = getattr(self.dataOut, self.dataList[i])
530 530 dsDict['variable'] = self.dataList[i]
531 531 #--------------------- Conditionals ------------------------
532 532 #There is no data
533 533
534 534 if dataAux is None:
535 535
536 536 return 0
537 537
538 538 if isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
539 539 dsDict['mode'] = 0
540 540 dsDict['nDim'] = 0
541 541 arrayDim[i,0] = 0
542 542 dsList.append(dsDict)
543 543
544 544 #Mode 2: meteors
545 545 elif self.mode[i] == 2:
546 546 dsDict['dsName'] = 'table0'
547 547 dsDict['mode'] = 2 # Mode meteors
548 548 dsDict['shape'] = dataAux.shape[-1]
549 549 dsDict['nDim'] = 0
550 550 dsDict['dsNumber'] = 1
551 551 arrayDim[i,3] = dataAux.shape[-1]
552 552 arrayDim[i,4] = self.mode[i] #Mode the data was stored
553 553 dsList.append(dsDict)
554 554
555 555 #Mode 1
556 556 else:
557 557 arrayDim0 = dataAux.shape #Data dimensions
558 558 arrayDim[i,0] = len(arrayDim0) #Number of array dimensions
559 559 arrayDim[i,4] = self.mode[i] #Mode the data was stored
560 560 strtable = 'table'
561 561 dsDict['mode'] = 1 # Mode parameters
562 562
563 563 # Three-dimension arrays
564 564 if len(arrayDim0) == 3:
565 565 arrayDim[i,1:-1] = numpy.array(arrayDim0)
566 566 nTables = int(arrayDim[i,2])
567 567 dsDict['dsNumber'] = nTables
568 568 dsDict['shape'] = arrayDim[i,2:4]
569 569 dsDict['nDim'] = 3
570 570
571 571 for j in range(nTables):
572 572 dsDict = dsDict.copy()
573 573 dsDict['dsName'] = strtable + str(j)
574 574 dsList.append(dsDict)
575 575
576 576 # Two-dimension arrays
577 577 elif len(arrayDim0) == 2:
578 578 arrayDim[i,2:-1] = numpy.array(arrayDim0)
579 579 nTables = int(arrayDim[i,2])
580 580 dsDict['dsNumber'] = nTables
581 581 dsDict['shape'] = arrayDim[i,3]
582 582 dsDict['nDim'] = 2
583 583
584 584 for j in range(nTables):
585 585 dsDict = dsDict.copy()
586 586 dsDict['dsName'] = strtable + str(j)
587 587 dsList.append(dsDict)
588 588
589 589 # One-dimension arrays
590 590 elif len(arrayDim0) == 1:
591 591 arrayDim[i,3] = arrayDim0[0]
592 592 dsDict['shape'] = arrayDim0[0]
593 593 dsDict['dsNumber'] = 1
594 594 dsDict['dsName'] = strtable + str(0)
595 595 dsDict['nDim'] = 1
596 596 dsList.append(dsDict)
597 597
598 598 table = numpy.array((self.dataList[i],) + tuple(arrayDim[i,:]),dtype = dtype0)
599 599 tableList.append(table)
600 600
601 601 self.dsList = dsList
602 602 self.tableDim = numpy.array(tableList, dtype = dtype0)
603 603 self.blockIndex = 0
604 604 timeTuple = time.localtime(dataOut.utctime)
605 605 self.currentDay = timeTuple.tm_yday
606 606
607 607 def putMetadata(self):
608 608
609 609 fp = self.createMetadataFile()
610 610 self.writeMetadata(fp)
611 611 fp.close()
612 612 return
613 613
614 614 def createMetadataFile(self):
615 615 ext = self.ext
616 616 path = self.path
617 617 setFile = self.setFile
618 618
619 619 timeTuple = time.localtime(self.dataOut.utctime)
620 620
621 621 subfolder = ''
622 622 fullpath = os.path.join( path, subfolder )
623 623
624 624 if not( os.path.exists(fullpath) ):
625 625 os.mkdir(fullpath)
626 626 setFile = -1 #initialize the set counter
627 627
628 628 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
629 629 fullpath = os.path.join( path, subfolder )
630 630
631 631 if not( os.path.exists(fullpath) ):
632 632 os.mkdir(fullpath)
633 633 setFile = -1 #initialize the set counter
634 634
635 635 else:
636 636 filesList = os.listdir( fullpath )
637 637 filesList = sorted( filesList, key=str.lower )
638 638 if len( filesList ) > 0:
639 639 filesList = [k for k in filesList if k.startswith(self.metaoptchar)]
640 640 filen = filesList[-1]
641 641 # the filename must have the following format
642 642 # 0 1234 567 89A BCDE (hex)
643 643 # x YYYY DDD SSS .ext
644 644 if isNumber( filen[8:11] ):
645 645 setFile = int( filen[8:11] ) #initialize the set counter to the last file's set number
646 646 else:
647 647 setFile = -1
648 648 else:
649 649 setFile = -1 #initialize the set counter
650 650
651 651 if self.setType is None:
652 652 setFile += 1
653 653 file = '%s%4.4d%3.3d%03d%s' % (self.metaoptchar,
654 654 timeTuple.tm_year,
655 655 timeTuple.tm_yday,
656 656 setFile,
657 657 ext )
658 658 else:
659 659 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
660 660 file = '%s%4.4d%3.3d%04d%s' % (self.metaoptchar,
661 661 timeTuple.tm_year,
662 662 timeTuple.tm_yday,
663 663 setFile,
664 664 ext )
665 665
666 666 filename = os.path.join( path, subfolder, file )
667 667 self.metaFile = file
668 668 #Setting HDF5 File
669 669 fp = h5py.File(filename,'w')
670 670
671 671 return fp
672 672
673 673 def writeMetadata(self, fp):
674 674
675 675 grp = fp.create_group("Metadata")
676 676 grp.create_dataset('array dimensions', data = self.tableDim, dtype = self.dtype)
677 677
678 678 for i in range(len(self.metadataList)):
679 679 grp.create_dataset(self.metadataList[i], data=getattr(self.dataOut, self.metadataList[i]))
680 680 return
681 681
682 682 def timeFlag(self):
683 683 currentTime = self.dataOut.utctime
684 684
685 685 if self.lastTime is None:
686 686 self.lastTime = currentTime
687 687
688 688 #Day
689 689 timeTuple = time.localtime(currentTime)
690 690 dataDay = timeTuple.tm_yday
691 691
692 692 #Time
693 693 timeDiff = currentTime - self.lastTime
694 694
695 695 #If the day changed or the time gap between samples exceeds the limit
696 696 if dataDay != self.currentDay:
697 697 self.currentDay = dataDay
698 698 return True
699 699 elif timeDiff > 3*60*60:
700 700 self.lastTime = currentTime
701 701 return True
702 702 else:
703 703 self.lastTime = currentTime
704 704 return False
705 705
706 706 def setNextFile(self):
707 707
708 708 ext = self.ext
709 709 path = self.path
710 710 setFile = self.setFile
711 711 mode = self.mode
712 712
713 713 timeTuple = time.localtime(self.dataOut.utctime)
714 714 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
715 715
716 716 fullpath = os.path.join( path, subfolder )
717 717
718 718 if os.path.exists(fullpath):
719 719 filesList = os.listdir( fullpath )
720 filesList = [k for k in filesList if 'M' in k]
720 ##filesList = [k for k in filesList if 'M' in k]
721 721 if len( filesList ) > 0:
722 722 filesList = sorted( filesList, key=str.lower )
723 723 filen = filesList[-1]
724 724 # the filename must have the following format
725 725 # 0 1234 567 89A BCDE (hex)
726 726 # x YYYY DDD SSS .ext
727 727 if isNumber( filen[8:11] ):
728 728 setFile = int( filen[8:11] ) #initialize the set counter to the last file's set number
729 729 else:
730 730 setFile = -1
731 731 else:
732 732 setFile = -1 #initialize the set counter
733 733 else:
734 734 os.makedirs(fullpath)
735 735 setFile = -1 #initialize the set counter
736 736
737 737 if self.setType is None:
738 738 setFile += 1
739 739 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
740 740 timeTuple.tm_year,
741 741 timeTuple.tm_yday,
742 742 setFile,
743 743 ext )
744 744 else:
745 745 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
746 746 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
747 747 timeTuple.tm_year,
748 748 timeTuple.tm_yday,
749 749 setFile,
750 750 ext )
751 751
752 752 filename = os.path.join( path, subfolder, file )
753 753
754 754 #Setting HDF5 File
755 755 fp = h5py.File(filename,'w')
756 756 #write metadata
757 757 self.writeMetadata(fp)
758 758 #Write data
759 759 grp = fp.create_group("Data")
760 760 ds = []
761 761 data = []
762 762 dsList = self.dsList
763 763 i = 0
764 764 while i < len(dsList):
765 765 dsInfo = dsList[i]
766 766 #One-dimension data
767 767 if dsInfo['mode'] == 0:
768 768 ds0 = grp.create_dataset(dsInfo['variable'], (1,1), maxshape=(1,self.blocksPerFile) , chunks = True, dtype=numpy.float64)
769 769 ds.append(ds0)
770 770 data.append([])
771 771 i += 1
772 772 continue
773 773
774 774 elif dsInfo['mode'] == 2:
775 775 grp0 = grp.create_group(dsInfo['variable'])
776 776 ds0 = grp0.create_dataset(dsInfo['dsName'], (1,dsInfo['shape']), data = numpy.zeros((1,dsInfo['shape'])) , maxshape=(None,dsInfo['shape']), chunks=True)
777 777 ds.append(ds0)
778 778 data.append([])
779 779 i += 1
780 780 continue
781 781
782 782 elif dsInfo['mode'] == 1:
783 783 grp0 = grp.create_group(dsInfo['variable'])
784 784
785 785 for j in range(dsInfo['dsNumber']):
786 786 dsInfo = dsList[i]
787 787 tableName = dsInfo['dsName']
788 788
789 789
790 790 if dsInfo['nDim'] == 3:
791 791 shape = dsInfo['shape'].astype(int)
792 792 ds0 = grp0.create_dataset(tableName, (shape[0],shape[1],1) , data = numpy.zeros((shape[0],shape[1],1)), maxshape = (None,shape[1],None), chunks=True)
793 793 else:
794 794 shape = int(dsInfo['shape'])
795 795 ds0 = grp0.create_dataset(tableName, (1,shape), data = numpy.zeros((1,shape)) , maxshape=(None,shape), chunks=True)
796 796
797 797 ds.append(ds0)
798 798 data.append([])
799 799 i += 1
800 800
801 801 fp.flush()
802 802 fp.close()
803 803
804 804 log.log('creating file: {}'.format(filename), 'Writing')
805 805 self.filename = filename
806 806 self.ds = ds
807 807 self.data = data
808 808 self.firsttime = True
809 809 self.blockIndex = 0
810 810 return
811 811
812 812 def putData(self):
813 813
814 814 if self.blockIndex == self.blocksPerFile or self.timeFlag():
815 815 self.setNextFile()
816 816
817 817 self.readBlock()
818 818 self.setBlock() #Prepare data to be written
819 819 self.writeBlock() #Write data
820 820
821 821 return
822 822
823 823 def readBlock(self):
824 824
825 825 '''
826 826 data Array configured
827 827
828 828
829 829 self.data
830 830 '''
831 831 dsList = self.dsList
832 832 ds = self.ds
833 833 #Setting HDF5 File
834 834 fp = h5py.File(self.filename,'r+')
835 835 grp = fp["Data"]
836 836 ind = 0
837 837
838 838 while ind < len(dsList):
839 839 dsInfo = dsList[ind]
840 840
841 841 if dsInfo['mode'] == 0:
842 842 ds0 = grp[dsInfo['variable']]
843 843 ds[ind] = ds0
844 844 ind += 1
845 845 else:
846 846
847 847 grp0 = grp[dsInfo['variable']]
848 848
849 849 for j in range(dsInfo['dsNumber']):
850 850 dsInfo = dsList[ind]
851 851 ds0 = grp0[dsInfo['dsName']]
852 852 ds[ind] = ds0
853 853 ind += 1
854 854
855 855 self.fp = fp
856 856 self.grp = grp
857 857 self.ds = ds
858 858
859 859 return
860 860
861 861 def setBlock(self):
862 862 '''
863 863 data Array configured
864 864
865 865
866 866 self.data
867 867 '''
868 868 #Creating Arrays
869 869 dsList = self.dsList
870 870 data = self.data
871 871 ind = 0
872 872
873 873 while ind < len(dsList):
874 874 dsInfo = dsList[ind]
875 875 dataAux = getattr(self.dataOut, dsInfo['variable'])
876 876
877 877 mode = dsInfo['mode']
878 878 nDim = dsInfo['nDim']
879 879
880 880 if mode == 0 or mode == 2 or nDim == 1:
881 881 data[ind] = dataAux
882 882 ind += 1
883 883 # elif nDim == 1:
884 884 # data[ind] = numpy.reshape(dataAux,(numpy.size(dataAux),1))
885 885 # ind += 1
886 886 elif nDim == 2:
887 887 for j in range(dsInfo['dsNumber']):
888 888 data[ind] = dataAux[j,:]
889 889 ind += 1
890 890 elif nDim == 3:
891 891 for j in range(dsInfo['dsNumber']):
892 892 data[ind] = dataAux[:,j,:]
893 893 ind += 1
894 894
895 895 self.data = data
896 896 return
897 897
898 898 def writeBlock(self):
899 899 '''
900 900 Saves the block in the HDF5 file
901 901 '''
902 902 dsList = self.dsList
903 903
904 904 for i in range(len(self.ds)):
905 905 dsInfo = dsList[i]
906 906 nDim = dsInfo['nDim']
907 907 mode = dsInfo['mode']
908 908
909 909 # First time
910 910 if self.firsttime:
911 911 if type(self.data[i]) == numpy.ndarray:
912 912
913 913 if nDim == 3:
914 914 self.data[i] = self.data[i].reshape((self.data[i].shape[0],self.data[i].shape[1],1))
915 915 self.ds[i].resize(self.data[i].shape)
916 916 if mode == 2:
917 917 self.ds[i].resize(self.data[i].shape)
918 918 self.ds[i][:] = self.data[i]
919 919 else:
920 920
921 921 # From second time
922 922 # Meteors!
923 923 if mode == 2:
924 924 dataShape = self.data[i].shape
925 925 dsShape = self.ds[i].shape
926 926 self.ds[i].resize((self.ds[i].shape[0] + dataShape[0],self.ds[i].shape[1]))
927 927 self.ds[i][dsShape[0]:,:] = self.data[i]
928 928 # No dimension
929 929 elif mode == 0:
930 930 self.ds[i].resize((self.ds[i].shape[0], self.ds[i].shape[1] + 1))
931 931 self.ds[i][0,-1] = self.data[i]
932 932 # One dimension
933 933 elif nDim == 1:
934 934 self.ds[i].resize((self.ds[i].shape[0] + 1, self.ds[i].shape[1]))
935 935 self.ds[i][-1,:] = self.data[i]
936 936 # Two dimension
937 937 elif nDim == 2:
938 938 self.ds[i].resize((self.ds[i].shape[0] + 1,self.ds[i].shape[1]))
939 939 self.ds[i][self.blockIndex,:] = self.data[i]
940 940 # Three dimensions
941 941 elif nDim == 3:
942 942 self.ds[i].resize((self.ds[i].shape[0],self.ds[i].shape[1],self.ds[i].shape[2]+1))
943 943 self.ds[i][:,:,-1] = self.data[i]
944 944
945 945 self.firsttime = False
946 946 self.blockIndex += 1
947 947
948 948 #Close to save changes
949 949 self.fp.flush()
950 950 self.fp.close()
951 951 return
952 952
953 953 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
954 954
955 955 self.dataOut = dataOut
956 956 if not(self.isConfig):
957 957 self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
958 958 metadataList=metadataList, dataList=dataList, mode=mode,
959 959 setType=setType)
960 960
961 961 self.isConfig = True
962 962 self.setNextFile()
963 963
964 964 self.putData()
965 965 return
966 966
967 967
968 968 @MPDecorator
969 969 class ParameterReader(Reader, ProcessingUnit):
970 970 '''
971 971 Reads HDF5 format files
972 972 '''
973 973
974 974 def __init__(self):
975 975 ProcessingUnit.__init__(self)
976 976 self.dataOut = Parameters()
977 977 self.ext = ".hdf5"
978 978 self.optchar = "D"
979 979 self.timezone = "lt"
980 980 self.listMetaname = []
981 981 self.listMeta = []
982 982 self.listDataname = []
983 983 self.listData = []
984 984 self.listShapes = []
985 985 self.open_file = h5py.File
986 986 self.open_mode = 'r'
987 987 self.metadata = False
988 988 self.filefmt = "*%Y%j***"
989 989 self.folderfmt = "*%Y%j"
990 990
991 991 def setup(self, **kwargs):
992 992
993 993 self.set_kwargs(**kwargs)
994 994 if not self.ext.startswith('.'):
995 995 self.ext = '.{}'.format(self.ext)
996 996
997 997 if self.online:
998 998 log.log("Searching files in online mode...", self.name)
999 999
1000 1000 for nTries in range(self.nTries):
1001 1001 fullpath = self.searchFilesOnLine(self.path, self.startDate,
1002 1002 self.endDate, self.expLabel, self.ext, self.walk,
1003 1003 self.filefmt, self.folderfmt)
1004 1004
1005 1005 try:
1006 1006 fullpath = next(fullpath)
1007 1007 except:
1008 1008 fullpath = None
1009 1009
1010 1010 if fullpath:
1011 1011 break
1012 1012
1013 1013 log.warning(
1014 1014 'Waiting {} sec for a valid file in {}: try {} ...'.format(
1015 1015 self.delay, self.path, nTries + 1),
1016 1016 self.name)
1017 1017 time.sleep(self.delay)
1018 1018
1019 1019 if not(fullpath):
1020 1020 raise schainpy.admin.SchainError(
1021 1021 'There isn\'t any valid file in {}'.format(self.path))
1022 1022
1023 1023 pathname, filename = os.path.split(fullpath)
1024 1024 self.year = int(filename[1:5])
1025 1025 self.doy = int(filename[5:8])
1026 1026 self.set = int(filename[8:11]) - 1
1027 1027 else:
1028 1028 log.log("Searching files in {}".format(self.path), self.name)
1029 1029 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
1030 1030 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
1031 1031
1032 1032 self.setNextFile()
1033 1033
1034 1034 return
1035 1035
1036 1036 def readFirstHeader(self):
1037 1037 '''Read metadata and data'''
1038 1038
1039 1039 self.__readMetadata()
1040 1040 self.__readData()
1041 1041 self.__setBlockList()
1042 1042 self.blockIndex = 0
1043 1043
1044 1044 return
1045 1045
1046 1046 def __setBlockList(self):
1047 1047 '''
1048 1048 Selects the data blocks within the defined time range
1049 1049
1050 1050 self.fp
1051 1051 self.startTime
1052 1052 self.endTime
1053 1053 self.blockList
1054 1054 self.blocksPerFile
1055 1055
1056 1056 '''
1057 1057
1058 1058 startTime = self.startTime
1059 1059 endTime = self.endTime
1060 1060
1061 1061 index = self.listDataname.index('utctime')
1062 1062 thisUtcTime = self.listData[index]
1063 1063 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
1064 1064
1065 1065 if self.timezone == 'lt':
1066 1066 thisUtcTime -= 5*3600
1067 1067
1068 1068 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
1069 1069
1070 1070 thisDate = thisDatetime.date()
1071 1071 thisTime = thisDatetime.time()
1072 1072
1073 1073 startUtcTime = (datetime.datetime.combine(thisDate,startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
1074 1074 endUtcTime = (datetime.datetime.combine(thisDate,endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
1075 1075
1076 1076 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
1077 1077
1078 1078 self.blockList = ind
1079 1079 self.blocksPerFile = len(ind)
1080 1080 return
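# Illustrative only (not part of the original source): with toy epoch values
# the selection above reduces to
# >>> import numpy
# >>> thisUtcTime = numpy.array([100., 160., 7300.])   # hypothetical timestamps
# >>> numpy.where(numpy.logical_and(thisUtcTime >= 100., thisUtcTime < 7200.))[0]
# array([0, 1])
# so blockList keeps only the blocks recorded inside the requested window.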
1081 1081
1082 1082 def __readMetadata(self):
1083 1083 '''
1084 1084 Reads Metadata
1085 1085 '''
1086 1086
1087 1087 listMetaname = []
1088 1088 listMetadata = []
1089 1089 if 'Metadata' in self.fp:
1090 1090 gp = self.fp['Metadata']
1091 1091 for item in list(gp.items()):
1092 1092 name = item[0]
1093 1093
1094 1094 if name=='variables':
1095 1095 table = gp[name][:]
1096 1096 listShapes = {}
1097 1097 for shapes in table:
1098 1098 listShapes[shapes[0].decode()] = numpy.array([shapes[1]])
1099 1099 else:
1100 1100 data = gp[name].value
1101 1101 listMetaname.append(name)
1102 1102 listMetadata.append(data)
1103 1103 elif self.metadata:
1104 1104 metadata = json.loads(self.metadata)
1105 1105 listShapes = {}
1106 1106 for tup in metadata:
1107 1107 name, values, dim = tup
1108 1108 if dim == -1:
1109 1109 listMetaname.append(name)
1110 1110 listMetadata.append(self.fp[values].value)
1111 1111 else:
1112 1112 listShapes[name] = numpy.array([dim])
1113 1113 else:
1114 1114 raise IOError('Missing Metadata group in file or metadata info')
1115 1115
1116 1116 self.listShapes = listShapes
1117 1117 self.listMetaname = listMetaname
1118 1118 self.listMeta = listMetadata
1119 1119
1120 1120 return
1121 1121
1122 1122 def __readData(self):
1123 1123
1124 1124 listdataname = []
1125 1125 listdata = []
1126 1126
1127 1127 if 'Data' in self.fp:
1128 1128 grp = self.fp['Data']
1129 1129 for item in list(grp.items()):
1130 1130 name = item[0]
1131 1131 listdataname.append(name)
1132 1132 dim = self.listShapes[name][0]
1133 1133 if dim == 0:
1134 1134 array = grp[name].value
1135 1135 else:
1136 1136 array = []
1137 1137 for i in range(dim):
1138 1138 array.append(grp[name]['table{:02d}'.format(i)].value)
1139 1139 array = numpy.array(array)
1140 1140
1141 1141 listdata.append(array)
1142 1142 elif self.metadata:
1143 1143 metadata = json.loads(self.metadata)
1144 1144 for tup in metadata:
1145 1145 name, values, dim = tup
1146 1146 listdataname.append(name)
1147 1147 if dim == -1:
1148 1148 continue
1149 1149 elif dim == 0:
1150 1150 array = self.fp[values].value
1151 1151 else:
1152 1152 array = []
1153 1153 for var in values:
1154 1154 array.append(self.fp[var].value)
1155 1155 array = numpy.array(array)
1156 1156 listdata.append(array)
1157 1157 else:
1158 1158 raise IOError('Missing Data group in file or metadata info')
1159 1159
1160 1160 self.listDataname = listdataname
1161 1161 self.listData = listdata
1162 1162 return
1163 1163
1164 1164 def getData(self):
1165 1165
1166 1166 for i in range(len(self.listMeta)):
1167 1167 setattr(self.dataOut, self.listMetaname[i], self.listMeta[i])
1168 1168
1169 1169 for j in range(len(self.listData)):
1170 1170 dim = self.listShapes[self.listDataname[j]][0]
1171 1171 if dim == 0:
1172 1172 setattr(self.dataOut, self.listDataname[j], self.listData[j][self.blockIndex])
1173 1173 else:
1174 1174 setattr(self.dataOut, self.listDataname[j], self.listData[j][:,self.blockIndex])
1175 1175
1176 1176 self.dataOut.paramInterval = self.interval
1177 1177 self.dataOut.flagNoData = False
1178 1178 self.blockIndex += 1
1179 1179
1180 1180 return
1181 1181
1182 1182 def run(self, **kwargs):
1183 1183
1184 1184 if not(self.isConfig):
1185 1185 self.setup(**kwargs)
1186 1186 self.isConfig = True
1187 1187
1188 1188 if self.blockIndex == self.blocksPerFile:
1189 1189 self.setNextFile()
1190 1190
1191 1191 self.getData()
1192 1192
1193 1193 return
1194 1194
1195 1195 @MPDecorator
1196 1196 class ParameterWriter(Operation):
1197 1197 '''
1198 1198 HDF5 Writer, stores parameter data in HDF5 format files
1199 1199
1200 1200 path: path where the files will be stored
1201 1201 blocksPerFile: number of blocks that will be saved per HDF5 file
1202 1202 mode: selects the data stacking mode: '0' channels, '1' parameters, '3' table (for meteors)
1203 1203 metadataList: list of attributes that will be stored as metadata
1204 1204 dataList: list of attributes that will be stored as data
1205 1205 '''
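# A minimal sketch (not part of the original source) of how a file produced by
# this operation can be inspected with h5py, based on writeMetadata() and
# writeData() below; the file name and the attribute names are hypothetical:
# >>> import h5py
# >>> fp = h5py.File('D2019065000.hdf5', 'r')
# >>> fp['Metadata/variables'][:]        # (name, nDim) table built from dataList
# >>> fp['Data/utctime'][:]              # scalar attributes: one value per block
# >>> fp['Data/data_param/table00'][:]   # array attributes: one dataset per channel
# >>> fp.close()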
1206 1206
1207 1207
1208 1208 ext = ".hdf5"
1209 1209 optchar = "D"
1210 1210 metaoptchar = "M"
1211 1211 metaFile = None
1212 1212 filename = None
1213 1213 path = None
1214 1214 setFile = None
1215 1215 fp = None
1216 1216 grp = None
1217 1217 ds = None
1218 1218 firsttime = True
1219 1219 #Configurations
1220 1220 blocksPerFile = None
1221 1221 blockIndex = None
1222 1222 dataOut = None
1223 1223 #Data Arrays
1224 1224 dataList = None
1225 1225 metadataList = None
1226 1226 dsList = None #List of dictionaries with dataset properties
1227 1227 tableDim = None
1228 1228 dtype = [('name', 'S20'),('nDim', 'i')]
1229 1229 currentDay = None
1230 1230 lastTime = None
1231 1231
1232 1232 def __init__(self):
1233 1233
1234 1234 Operation.__init__(self)
1235 1235 return
1236 1236
1237 1237 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None):
1238 1238 self.path = path
1239 1239 self.blocksPerFile = blocksPerFile
1240 1240 self.metadataList = metadataList
1241 1241 self.dataList = dataList
1242 1242 self.setType = setType
1243 1243
1244 1244 tableList = []
1245 1245 dsList = []
1246 1246
1247 1247 for i in range(len(self.dataList)):
1248 1248 dsDict = {}
1249 1249 dataAux = getattr(self.dataOut, self.dataList[i])
1250 1250 dsDict['variable'] = self.dataList[i]
1251 1251
1252 1252 if dataAux is None:
1253 1253 continue
1254 1254 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
1255 1255 dsDict['nDim'] = 0
1256 1256 else:
1257 1257 dsDict['nDim'] = len(dataAux.shape)
1258 1258 dsDict['shape'] = dataAux.shape
1259 1259 dsDict['dsNumber'] = dataAux.shape[0]
1260 1260
1261 1261 dsList.append(dsDict)
1262 1262 tableList.append((self.dataList[i], dsDict['nDim']))
1263 1263
1264 1264 self.dsList = dsList
1265 1265 self.tableDim = numpy.array(tableList, dtype=self.dtype)
1266 1266 self.currentDay = self.dataOut.datatime.date()
1267 1267
1268 1268 def timeFlag(self):
1269 1269 currentTime = self.dataOut.utctime
1270 1270 timeTuple = time.localtime(currentTime)
1271 1271 dataDay = timeTuple.tm_yday
1272 1272
1273 1273 if self.lastTime is None:
1274 1274 self.lastTime = currentTime
1275 1275 self.currentDay = dataDay
1276 1276 return False
1277 1277
1278 1278 timeDiff = currentTime - self.lastTime
1279 1279
1280 1280 #If the day changes or if the time difference between consecutive samples exceeds the limit
1281 1281 if dataDay != self.currentDay:
1282 1282 self.currentDay = dataDay
1283 1283 return True
1284 1284 elif timeDiff > 3*60*60:
1285 1285 self.lastTime = currentTime
1286 1286 return True
1287 1287 else:
1288 1288 self.lastTime = currentTime
1289 1289 return False
1290 1290
1291 1291 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, setType=None):
1292 1292
1293 1293 self.dataOut = dataOut
1294 1294 if not(self.isConfig):
1295 1295 self.setup(path=path, blocksPerFile=blocksPerFile,
1296 1296 metadataList=metadataList, dataList=dataList,
1297 1297 setType=setType)
1298 1298
1299 1299 self.isConfig = True
1300 1300 self.setNextFile()
1301 1301
1302 1302 self.putData()
1303 1303 return
1304 1304
1305 1305 def setNextFile(self):
1306 1306
1307 1307 ext = self.ext
1308 1308 path = self.path
1309 1309 setFile = self.setFile
1310 1310
1311 1311 timeTuple = time.localtime(self.dataOut.utctime)
1312 1312 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
1313 1313 fullpath = os.path.join(path, subfolder)
1314 1314
1315 1315 if os.path.exists(fullpath):
1316 1316 filesList = os.listdir(fullpath)
1317 1317 filesList = [k for k in filesList if k.startswith(self.optchar)]
1318 1318 if len( filesList ) > 0:
1319 1319 filesList = sorted(filesList, key=str.lower)
1320 1320 filen = filesList[-1]
1321 1321 # the filename must have the following format
1322 1322 # 0 1234 567 89A BCDE (hex)
1323 1323 # x YYYY DDD SSS .ext
1324 1324 if isNumber(filen[8:11]):
1325 1325 setFile = int(filen[8:11]) #initialize the set counter with the set number of the last file
1326 1326 else:
1327 1327 setFile = -1
1328 1328 else:
1329 1329 setFile = -1 #initialize the set counter
1330 1330 else:
1331 1331 os.makedirs(fullpath)
1332 1332 setFile = -1 #initialize the set counter
1333 1333
1334 1334 if self.setType is None:
1335 1335 setFile += 1
1336 1336 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
1337 1337 timeTuple.tm_year,
1338 1338 timeTuple.tm_yday,
1339 1339 setFile,
1340 1340 ext )
1341 1341 else:
1342 1342 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
1343 1343 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
1344 1344 timeTuple.tm_year,
1345 1345 timeTuple.tm_yday,
1346 1346 setFile,
1347 1347 ext )
1348 1348
1349 1349 self.filename = os.path.join( path, subfolder, file )
1350 1350
1351 1351 #Setting HDF5 File
1352 1352 self.fp = h5py.File(self.filename, 'w')
1353 1353 #write metadata
1354 1354 self.writeMetadata(self.fp)
1355 1355 #Write data
1356 1356 self.writeData(self.fp)
1357 1357
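# A worked example (illustrative, not in the original source): with optchar 'D',
# a date of 2019-03-06 (day-of-year 065) and setFile 0, the format string above
# yields 'D2019065000.hdf5'; when setType is given, the 4-digit counter holds
# the minutes of the day (tm_hour*60 + tm_min) instead of a sequential set number.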
1358 1358 def writeMetadata(self, fp):
1359 1359
1360 1360 grp = fp.create_group("Metadata")
1361 1361 grp.create_dataset('variables', data=self.tableDim, dtype=self.dtype)
1362 1362
1363 1363 for i in range(len(self.metadataList)):
1364 1364 if not hasattr(self.dataOut, self.metadataList[i]):
1365 1365 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
1366 1366 continue
1367 1367 value = getattr(self.dataOut, self.metadataList[i])
1368 1368 grp.create_dataset(self.metadataList[i], data=value)
1369 1369 return
1370 1370
1371 1371 def writeData(self, fp):
1372 1372
1373 1373 grp = fp.create_group("Data")
1374 1374 dtsets = []
1375 1375 data = []
1376 1376
1377 1377 for dsInfo in self.dsList:
1378 1378 if dsInfo['nDim'] == 0:
1379 1379 ds = grp.create_dataset(
1380 1380 dsInfo['variable'],
1381 1381 (self.blocksPerFile, ),
1382 1382 chunks=True,
1383 1383 dtype=numpy.float64)
1384 1384 dtsets.append(ds)
1385 1385 data.append((dsInfo['variable'], -1))
1386 1386 else:
1387 1387 sgrp = grp.create_group(dsInfo['variable'])
1388 1388 for i in range(dsInfo['dsNumber']):
1389 1389 ds = sgrp.create_dataset(
1390 1390 'table{:02d}'.format(i),
1391 1391 (self.blocksPerFile, ) + dsInfo['shape'][1:],
1392 1392 chunks=True)
1393 1393 dtsets.append(ds)
1394 1394 data.append((dsInfo['variable'], i))
1395 1395 fp.flush()
1396 1396
1397 1397 log.log('Creating file: {}'.format(fp.filename), self.name)
1398 1398
1399 1399 self.ds = dtsets
1400 1400 self.data = data
1401 1401 self.firsttime = True
1402 1402 self.blockIndex = 0
1403 1403 return
1404 1404
1405 1405 def putData(self):
1406 1406
1407 1407 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
1408 1408 self.closeFile()
1409 1409 self.setNextFile()
1410 1410
1411 1411 for i, ds in enumerate(self.ds):
1412 1412 attr, ch = self.data[i]
1413 1413 if ch == -1:
1414 1414 ds[self.blockIndex] = getattr(self.dataOut, attr)
1415 1415 else:
1416 1416 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
1417 1417
1418 1418 self.fp.flush()
1419 1419 self.blockIndex += 1
1420 1420 log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)
1421 1421
1422 1422 return
1423 1423
1424 1424 def closeFile(self):
1425 1425
1426 1426 if self.blockIndex != self.blocksPerFile:
1427 1427 for ds in self.ds:
1428 1428 ds.resize(self.blockIndex, axis=0)
1429 1429
1430 1430 self.fp.flush()
1431 1431 self.fp.close()
1432 1432
1433 1433 def close(self):
1434 1434
1435 1435 self.closeFile()
@@ -1,429 +1,426
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the processes created.
7 7 The arguments (kwargs) sent from the controller are parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator also handles the methods inside the processing unit that are called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
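# A minimal sketch (not from the original source) of how the decorator described
# above is applied; 'MyOperation' and its 'scale' argument are hypothetical, the
# pattern follows the decorated classes elsewhere in this project:
#
# @MPDecorator
# class MyOperation(Operation):
#     def run(self, dataOut, scale=1.0):
#         dataOut.data = dataOut.data * scale
#         return dataOut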
14 14
15 15 import os
16 16 import sys
17 17 import inspect
18 18 import zmq
19 19 import time
20 20 import pickle
21 21 import traceback
22 22 try:
23 23 from queue import Queue
24 24 except:
25 25 from Queue import Queue
26 26 from threading import Thread
27 27 from multiprocessing import Process
28 28
29 29 from schainpy.utils import log
30 30
31 31
32 32 class ProcessingUnit(object):
33 33
34 34 """
35 35 Update - Jan 2018 - MULTIPROCESSING
36 36 All the "call" methods present in the previous base were removed.
37 37 The majority of operations are independent processes, thus
38 38 the decorator is in charge of communicating the operation processes
39 39 with the processing unit via IPC.
40 40
41 41 The constructor does not receive any argument. The remaining methods
42 42 are related with the operations to execute.
43 43
44 44
45 45 """
46 46 proc_type = 'processing'
47 47 __attrs__ = []
48 48
49 49 def __init__(self):
50 50
51 51 self.dataIn = None
52 52 self.dataOut = None
53 53 self.isConfig = False
54 54 self.operations = []
55 55 self.plots = []
56 56
57 57 def getAllowedArgs(self):
58 58 if hasattr(self, '__attrs__'):
59 59 return self.__attrs__
60 60 else:
61 61 return inspect.getargspec(self.run).args
62 62
63 63 def addOperation(self, conf, operation):
64 64 """
65 65 This method is used in the controller and updates the list containing the operations to execute. The list
66 66 holds the id of the operation process (for IPC purposes)
67 67
68 68 Adds an object of type "Operation" (opObj) to the list "self.objectList" and returns the
69 69 identifier associated with this object.
70 70
71 71 Input:
72 72
73 73 object : object of class "Operation"
74 74
75 75 Return:
76 76
77 77 objId : identifier of the object, needed to communicate with the master (procUnit)
78 78 """
79 79
80 80 self.operations.append(
81 81 (operation, conf.type, conf.id, conf.getKwargs()))
82 82
83 83 if 'plot' in self.name.lower():
84 84 self.plots.append(operation.CODE)
85 85
86 86 def getOperationObj(self, objId):
87 87
88 88 if objId not in list(self.operations.keys()):
89 89 return None
90 90
91 91 return self.operations[objId]
92 92
93 93 def operation(self, **kwargs):
94 94 """
95 95 Direct operation on the data (dataOut.data). The values of the
96 96 attributes of the dataOut object must be updated
97 97
98 98 Input:
99 99
100 100 **kwargs : Dictionary of arguments of the function to execute
101 101 """
102 102
103 103 raise NotImplementedError
104 104
105 105 def setup(self):
106 106
107 107 raise NotImplementedError
108 108
109 109 def run(self):
110 110
111 111 raise NotImplementedError
112 112
113 113 def close(self):
114 114
115 115 return
116 116
117 117
118 118 class Operation(object):
119 119
120 120 """
121 121 Update - Jan 2018 - MULTIPROCESSING
122 122
123 123 Most of the methods remained the same. The decorator parses the arguments and executes the run() method for each process.
124 124 The constructor does not receive any argument, nor does the base class.
125 125
126 126
127 127 Base class used to define the additional operations that can be added to the ProcessingUnit class
128 128 and that need to accumulate previous information about the data to be processed. Preferably use an
129 129 accumulation buffer inside this class
130 130
131 131 Example: coherent integration, which needs the previous information of the n preceding profiles (buffer)
132 132
133 133 """
134 134 proc_type = 'operation'
135 135 __attrs__ = []
136 136
137 137 def __init__(self):
138 138
139 139 self.id = None
140 140 self.isConfig = False
141 141
142 142 if not hasattr(self, 'name'):
143 143 self.name = self.__class__.__name__
144 144
145 145 def getAllowedArgs(self):
146 146 if hasattr(self, '__attrs__'):
147 147 return self.__attrs__
148 148 else:
149 149 return inspect.getargspec(self.run).args
150 150
151 151 def setup(self):
152 152
153 153 self.isConfig = True
154 154
155 155 raise NotImplementedError
156 156
157 157 def run(self, dataIn, **kwargs):
158 158 """
159 159 Performs the required operations on dataIn.data and updates the
160 160 attributes of the dataIn object.
161 161
162 162 Input:
163 163
164 164 dataIn : object of type JROData
165 165
166 166 Return:
167 167
168 168 None
169 169
170 170 Affected:
171 171 __buffer : data reception buffer.
172 172
173 173 """
174 174 if not self.isConfig:
175 175 self.setup(**kwargs)
176 176
177 177 raise NotImplementedError
178 178
179 179 def close(self):
180 180
181 181 return
182 182
183 183 class InputQueue(Thread):
184 184
185 185 '''
186 186 Class to hold input data for Processing Units and external Operations.
187 187 '''
188 188
189 189 def __init__(self, project_id, inputId, lock=None):
190 190
191 191 Thread.__init__(self)
192 192 self.queue = Queue()
193 193 self.project_id = project_id
194 194 self.inputId = inputId
195 195 self.lock = lock
196 196 self.islocked = False
197 197 self.size = 0
198 198
199 199 def run(self):
200 200
201 201 c = zmq.Context()
202 202 self.receiver = c.socket(zmq.SUB)
203 203 self.receiver.connect(
204 204 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
205 205 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
206 206
207 207 while True:
208 208 obj = self.receiver.recv_multipart()[1]
209 209 self.size += sys.getsizeof(obj)
210 210 self.queue.put(obj)
211 211
212 212 def get(self):
213 213
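# Simple backpressure (descriptive note, not in the original source): when more
# than ~512 MB is buffered here, the shared event is cleared so that the
# reader's publish() blocks until every throttled queue drains below the limit.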
214 214 if not self.islocked and self.size/1000000 > 512:
215 215 self.lock.n.value += 1
216 216 self.islocked = True
217 217 self.lock.clear()
218 218 elif self.islocked and self.size/1000000 <= 512:
219 219 self.islocked = False
220 220 self.lock.n.value -= 1
221 221 if self.lock.n.value == 0:
222 222 self.lock.set()
223 223
224 224 obj = self.queue.get()
225 225 self.size -= sys.getsizeof(obj)
226 226 return pickle.loads(obj)
227 227
228 228
229 229 def MPDecorator(BaseClass):
230 230 """
231 231 Multiprocessing class decorator
232 232
233 233 This function adds multiprocessing features to a BaseClass. It also handles
234 234 the communication between processes (readers, procUnits and operations).
235 235 """
236 236
237 237 class MPClass(BaseClass, Process):
238 238
239 239 def __init__(self, *args, **kwargs):
240 240 super(MPClass, self).__init__()
241 241 Process.__init__(self)
242 242 self.operationKwargs = {}
243 243 self.args = args
244 244 self.kwargs = kwargs
245 245 self.sender = None
246 246 self.receiver = None
247 247 self.i = 0
248 248 self.t = time.time()
249 249 self.name = BaseClass.__name__
250 250 self.__doc__ = BaseClass.__doc__
251 251
252 252 if 'plot' in self.name.lower() and not self.name.endswith('_'):
253 253 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
254 254
255 255 self.start_time = time.time()
256 256 self.id = args[0]
257 257 self.inputId = args[1]
258 258 self.project_id = args[2]
259 259 self.err_queue = args[3]
260 260 self.lock = args[4]
261 261 self.typeProc = args[5]
262 262 self.err_queue.put('#_start_#')
263 263 if self.inputId is not None:
264 264 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
265 265
266 266 def subscribe(self):
267 267 '''
268 268 Start the zmq socket receiver and subscribe to the input ID.
269 269 '''
270 270
271 271 self.queue.start()
272 272
273 273 def listen(self):
274 274 '''
275 275 This function waits for objects
276 276 '''
277 277
278 278 return self.queue.get()
279 279
280 280 def set_publisher(self):
281 281 '''
282 282 This function creates a zmq socket for publishing objects.
283 283 '''
284 284
285 285 time.sleep(0.5)
286 286
287 287 c = zmq.Context()
288 288 self.sender = c.socket(zmq.PUB)
289 289 self.sender.connect(
290 290 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
291 291
292 292 def publish(self, data, id):
293 293 '''
294 294 This function publishes an object to a specific topic.
295 295 It blocks publishing when the receiver queue is full to avoid data loss
296 296 '''
297 297
298 298 if self.inputId is None:
299 299 self.lock.wait()
300 300 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
301
302 301 def runReader(self):
303 302 '''
304 303 Run function for reader units
305 304 '''
306 305 while True:
307 306
308 307 try:
309 308 BaseClass.run(self, **self.kwargs)
310 309 except:
311 310 err = traceback.format_exc()
312 311 if 'No more files' in err:
313 312 log.warning('No more files to read', self.name)
314 313 else:
315 314 self.err_queue.put('{}|{}'.format(self.name, err))
316 315 self.dataOut.error = True
317 316
318 317 for op, optype, opId, kwargs in self.operations:
319 318 if optype == 'self' and not self.dataOut.flagNoData:
320 319 op(**kwargs)
321 320 elif optype == 'other' and not self.dataOut.flagNoData:
322 321 self.dataOut = op.run(self.dataOut, **self.kwargs)
323 322 elif optype == 'external':
324 323 self.publish(self.dataOut, opId)
325 324
326 325 if self.dataOut.flagNoData and not self.dataOut.error:
327 326 continue
328 327
329 328 self.publish(self.dataOut, self.id)
330
331 329 if self.dataOut.error:
332 330 break
333 331
334 332 time.sleep(0.5)
335 333
336 334 def runProc(self):
337 335 '''
338 336 Run function for processing units
339 337 '''
340 338
341 339 while True:
342 340 self.dataIn = self.listen()
343 341
344 342 if self.dataIn.flagNoData and self.dataIn.error is None:
345 343 continue
346 344 elif not self.dataIn.error:
347 345 try:
348 346 BaseClass.run(self, **self.kwargs)
349 347 except:
350 348 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
351 349 self.dataOut.error = True
352 350 elif self.dataIn.error:
353 351 self.dataOut.error = self.dataIn.error
354 352 self.dataOut.flagNoData = True
355 353
356 354 for op, optype, opId, kwargs in self.operations:
357 355 if optype == 'self' and not self.dataOut.flagNoData:
358 356 op(**kwargs)
359 357 elif optype == 'other' and not self.dataOut.flagNoData:
360 358 self.dataOut = op.run(self.dataOut, **kwargs)
361 359 elif optype == 'external' and not self.dataOut.flagNoData:
362 360 self.publish(self.dataOut, opId)
363 361
364 362 self.publish(self.dataOut, self.id)
365 363 for op, optype, opId, kwargs in self.operations:
366 364 if optype == 'external' and self.dataOut.error:
367 365 self.publish(self.dataOut, opId)
368 366
369 367 if self.dataOut.error:
370 368 break
371 369
372 370 time.sleep(0.5)
373 371
374 372 def runOp(self):
375 373 '''
376 374 Run function for external operations (these operations only receive data,
377 375 e.g.: plots, writers, publishers)
378 376 '''
379 377
380 378 while True:
381 379
382 380 dataOut = self.listen()
383 381
384 382 if not dataOut.error:
385 383 try:
386 384 BaseClass.run(self, dataOut, **self.kwargs)
387 385 except:
388 386 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
389 387 dataOut.error = True
390 388 else:
391 389 break
392 390
393 391 def run(self):
394 392 if self.typeProc == "ProcUnit":
395 393
396 394 if self.inputId is not None:
397 395 self.subscribe()
398 396
399 397 self.set_publisher()
400 398
401 399 if 'Reader' not in BaseClass.__name__:
402 400 self.runProc()
403 401 else:
404 402 self.runReader()
405
406 403 elif self.typeProc == "Operation":
407 404
408 405 self.subscribe()
409 406 self.runOp()
410 407
411 408 else:
412 409 raise ValueError("Unknown type")
413 410
414 411 self.close()
415 412
416 413 def close(self):
417 414
418 415 BaseClass.close(self)
419 416 self.err_queue.put('#_end_#')
420 417
421 418 if self.sender:
422 419 self.sender.close()
423 420
424 421 if self.receiver:
425 422 self.receiver.close()
426 423
427 424 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
428 425
429 426 return MPClass
@@ -1,1328 +1,1327
1 1 import sys
2 2 import numpy
3 3 from scipy import interpolate
4 4 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
5 5 from schainpy.model.data.jrodata import Voltage
6 6 from schainpy.utils import log
7 7 from time import time
8 8
9 9
10 10 @MPDecorator
11 11 class VoltageProc(ProcessingUnit):
12 12
13 13 def __init__(self):
14 14
15 15 ProcessingUnit.__init__(self)
16 16
17 17 self.dataOut = Voltage()
18 18 self.flip = 1
19 19 self.setupReq = False
20 20
21 21 def run(self):
22 22
23 23 if self.dataIn.type == 'AMISR':
24 24 self.__updateObjFromAmisrInput()
25 25
26 26 if self.dataIn.type == 'Voltage':
27 27 self.dataOut.copy(self.dataIn)
28 28
29 29 # self.dataOut.copy(self.dataIn)
30 30
31 31 def __updateObjFromAmisrInput(self):
32 32
33 33 self.dataOut.timeZone = self.dataIn.timeZone
34 34 self.dataOut.dstFlag = self.dataIn.dstFlag
35 35 self.dataOut.errorCount = self.dataIn.errorCount
36 36 self.dataOut.useLocalTime = self.dataIn.useLocalTime
37 37
38 38 self.dataOut.flagNoData = self.dataIn.flagNoData
39 39 self.dataOut.data = self.dataIn.data
40 40 self.dataOut.utctime = self.dataIn.utctime
41 41 self.dataOut.channelList = self.dataIn.channelList
42 42 #self.dataOut.timeInterval = self.dataIn.timeInterval
43 43 self.dataOut.heightList = self.dataIn.heightList
44 44 self.dataOut.nProfiles = self.dataIn.nProfiles
45 45
46 46 self.dataOut.nCohInt = self.dataIn.nCohInt
47 47 self.dataOut.ippSeconds = self.dataIn.ippSeconds
48 48 self.dataOut.frequency = self.dataIn.frequency
49 49
50 50 self.dataOut.azimuth = self.dataIn.azimuth
51 51 self.dataOut.zenith = self.dataIn.zenith
52 52
53 53 self.dataOut.beam.codeList = self.dataIn.beam.codeList
54 54 self.dataOut.beam.azimuthList = self.dataIn.beam.azimuthList
55 55 self.dataOut.beam.zenithList = self.dataIn.beam.zenithList
56 56 #
57 57 # pass#
58 58 #
59 59 # def init(self):
60 60 #
61 61 #
62 62 # if self.dataIn.type == 'AMISR':
63 63 # self.__updateObjFromAmisrInput()
64 64 #
65 65 # if self.dataIn.type == 'Voltage':
66 66 # self.dataOut.copy(self.dataIn)
67 67 # # There is no need to copy the dataIn attributes on every init()
68 68 # # the copy should be done for each new data block
69 69
70 70 def selectChannels(self, channelList):
71 71
72 72 channelIndexList = []
73 73
74 74 for channel in channelList:
75 75 if channel not in self.dataOut.channelList:
76 76 raise ValueError("Channel %d is not in %s" %(channel, str(self.dataOut.channelList)))
77 77
78 78 index = self.dataOut.channelList.index(channel)
79 79 channelIndexList.append(index)
80 80
81 81 self.selectChannelsByIndex(channelIndexList)
82 82
83 83 def selectChannelsByIndex(self, channelIndexList):
84 84 """
85 85 Selects a block of data by channel according to channelIndexList
86 86
87 87 Input:
88 88 channelIndexList : plain list of channel indexes to select, e.g. [2,3,7]
89 89
90 90 Affected:
91 91 self.dataOut.data
92 92 self.dataOut.channelIndexList
93 93 self.dataOut.nChannels
94 94 self.dataOut.m_ProcessingHeader.totalSpectra
95 95 self.dataOut.systemHeaderObj.numChannels
96 96 self.dataOut.m_ProcessingHeader.blockSize
97 97
98 98 Return:
99 99 None
100 100 """
101 101
102 102 for channelIndex in channelIndexList:
103 103 if channelIndex not in self.dataOut.channelIndexList:
104 104 print(channelIndexList)
105 105 raise ValueError("The value %d in channelIndexList is not valid" %channelIndex)
106 106
107 107 if self.dataOut.flagDataAsBlock:
108 108 """
109 109 If the data is read as blocks, dimension = [nChannels, nProfiles, nHeis]
110 110 """
111 111 data = self.dataOut.data[channelIndexList,:,:]
112 112 else:
113 113 data = self.dataOut.data[channelIndexList,:]
114 114
115 115 self.dataOut.data = data
116 116 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
117 117 self.dataOut.channelList = range(len(channelIndexList))
118 118
119 119 return 1
120 120
121 121 def selectHeights(self, minHei=None, maxHei=None):
122 122 """
123 123 Selects a block of data based on a group of height values within the range
124 124 minHei <= height <= maxHei
125 125
126 126 Input:
127 127 minHei : minimum height value to consider
128 128 maxHei : maximum height value to consider
129 129
130 130 Affected:
131 131 Several values are changed indirectly through the selectHeightsByIndex method
132 132
133 133 Return:
134 134 1 if the method executed successfully, otherwise returns 0
135 135 """
136 136
137 137 if minHei == None:
138 138 minHei = self.dataOut.heightList[0]
139 139
140 140 if maxHei == None:
141 141 maxHei = self.dataOut.heightList[-1]
142 142
143 143 if (minHei < self.dataOut.heightList[0]):
144 144 minHei = self.dataOut.heightList[0]
145 145
146 146 if (maxHei > self.dataOut.heightList[-1]):
147 147 maxHei = self.dataOut.heightList[-1]
148 148
149 149 minIndex = 0
150 150 maxIndex = 0
151 151 heights = self.dataOut.heightList
152 152
153 153 inda = numpy.where(heights >= minHei)
154 154 indb = numpy.where(heights <= maxHei)
155 155
156 156 try:
157 157 minIndex = inda[0][0]
158 158 except:
159 159 minIndex = 0
160 160
161 161 try:
162 162 maxIndex = indb[0][-1]
163 163 except:
164 164 maxIndex = len(heights)
165 165
166 166 self.selectHeightsByIndex(minIndex, maxIndex)
167 167
168 168 return 1
169 169
170 170
171 171 def selectHeightsByIndex(self, minIndex, maxIndex):
172 172 """
173 173 Selects a block of data based on a group of height indexes within the range
174 174 minIndex <= index <= maxIndex
175 175
176 176 Input:
177 177 minIndex : minimum height index to consider
178 178 maxIndex : maximum height index to consider
179 179
180 180 Affected:
181 181 self.dataOut.data
182 182 self.dataOut.heightList
183 183
184 184 Return:
185 185 1 if the method executed successfully, otherwise returns 0
186 186 """
187 187
188 188 if (minIndex < 0) or (minIndex > maxIndex):
189 189 raise ValueError("Height index range (%d,%d) is not valid" % (minIndex, maxIndex))
190 190
191 191 if (maxIndex >= self.dataOut.nHeights):
192 192 maxIndex = self.dataOut.nHeights
193 193
194 194 #voltage
195 195 if self.dataOut.flagDataAsBlock:
196 196 """
197 197 If the data is read as blocks, dimension = [nChannels, nProfiles, nHeis]
198 198 """
199 199 data = self.dataOut.data[:,:, minIndex:maxIndex]
200 200 else:
201 201 data = self.dataOut.data[:, minIndex:maxIndex]
202 202
203 203 # firstHeight = self.dataOut.heightList[minIndex]
204 204
205 205 self.dataOut.data = data
206 206 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex]
207 207
208 208 if self.dataOut.nHeights <= 1:
209 209 raise ValueError("selectHeights: Too few heights. Current number of heights is %d" %(self.dataOut.nHeights))
210 210
211 211 return 1
212 212
213 213
214 214 def filterByHeights(self, window):
215 215
216 216 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
217 217
218 218 if window == None:
219 219 window = (self.dataOut.radarControllerHeaderObj.txA/self.dataOut.radarControllerHeaderObj.nBaud) / deltaHeight
220 220
221 221 newdelta = deltaHeight * window
222 222 r = self.dataOut.nHeights % window
223 223 newheights = (self.dataOut.nHeights-r)/window
224 224
225 225 if newheights <= 1:
226 226 raise ValueError("filterByHeights: Too few heights. Current number of heights is %d and window is %d" %(self.dataOut.nHeights, window))
227 227
228 228 if self.dataOut.flagDataAsBlock:
229 229 """
230 230 If the data is read as blocks, dimension = [nChannels, nProfiles, nHeis]
231 231 """
232 232 buffer = self.dataOut.data[:, :, 0:int(self.dataOut.nHeights-r)]
233 233 buffer = buffer.reshape(self.dataOut.nChannels, self.dataOut.nProfiles, int(self.dataOut.nHeights/window), window)
234 234 buffer = numpy.sum(buffer,3)
235 235
236 236 else:
237 237 buffer = self.dataOut.data[:,0:int(self.dataOut.nHeights-r)]
238 238 buffer = buffer.reshape(self.dataOut.nChannels,int(self.dataOut.nHeights/window),int(window))
239 239 buffer = numpy.sum(buffer,2)
240 240
241 241 self.dataOut.data = buffer
242 242 self.dataOut.heightList = self.dataOut.heightList[0] + numpy.arange( newheights )*newdelta
243 243 self.dataOut.windowOfFilter = window
244 244
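# A small numpy sketch (illustrative, not part of the original file) of the
# decimation performed above for profile-by-profile data: heights are grouped
# in windows of 'window' samples and summed, so 100 heights with window=4
# become 25 coarser heights:
# >>> import numpy
# >>> data = numpy.ones((2, 100))            # [nChannels, nHeights], toy values
# >>> data.reshape(2, 25, 4).sum(axis=2).shape
# (2, 25)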
245 245 def setH0(self, h0, deltaHeight = None):
246 246
247 247 if not deltaHeight:
248 248 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
249 249
250 250 nHeights = self.dataOut.nHeights
251 251
252 252 newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight
253 253
254 254 self.dataOut.heightList = newHeiRange
255 255
256 256 def deFlip(self, channelList = []):
257 257
258 258 data = self.dataOut.data.copy()
259 259
260 260 if self.dataOut.flagDataAsBlock:
261 261 flip = self.flip
262 262 profileList = list(range(self.dataOut.nProfiles))
263 263
264 264 if not channelList:
265 265 for thisProfile in profileList:
266 266 data[:,thisProfile,:] = data[:,thisProfile,:]*flip
267 267 flip *= -1.0
268 268 else:
269 269 for thisChannel in channelList:
270 270 if thisChannel not in self.dataOut.channelList:
271 271 continue
272 272
273 273 for thisProfile in profileList:
274 274 data[thisChannel,thisProfile,:] = data[thisChannel,thisProfile,:]*flip
275 275 flip *= -1.0
276 276
277 277 self.flip = flip
278 278
279 279 else:
280 280 if not channelList:
281 281 data[:,:] = data[:,:]*self.flip
282 282 else:
283 283 for thisChannel in channelList:
284 284 if thisChannel not in self.dataOut.channelList:
285 285 continue
286 286
287 287 data[thisChannel,:] = data[thisChannel,:]*self.flip
288 288
289 289 self.flip *= -1.
290 290
291 291 self.dataOut.data = data
292 292
293 293 def setRadarFrequency(self, frequency=None):
294 294
295 295 if frequency != None:
296 296 self.dataOut.frequency = frequency
297 297
298 298 return 1
299 299
300 300 def interpolateHeights(self, topLim, botLim):
301 301 #heights 69 to 72 for julia
302 302 #82-84 for meteors
303 303 if len(numpy.shape(self.dataOut.data))==2:
304 304 sampInterp = (self.dataOut.data[:,botLim-1] + self.dataOut.data[:,topLim+1])/2
305 305 sampInterp = numpy.transpose(numpy.tile(sampInterp,(topLim-botLim + 1,1)))
306 306 #self.dataOut.data[:,botLim:limSup+1] = sampInterp
307 307 self.dataOut.data[:,botLim:topLim+1] = sampInterp
308 308 else:
309 309 nHeights = self.dataOut.data.shape[2]
310 310 x = numpy.hstack((numpy.arange(botLim),numpy.arange(topLim+1,nHeights)))
311 311 y = self.dataOut.data[:,:,list(range(botLim))+list(range(topLim+1,nHeights))]
312 312 f = interpolate.interp1d(x, y, axis = 2)
313 313 xnew = numpy.arange(botLim,topLim+1)
314 314 ynew = f(xnew)
315 315
316 316 self.dataOut.data[:,:,botLim:topLim+1] = ynew
317 317
318 318 # import collections
319 319
320 320 class CohInt(Operation):
321 321
322 322 isConfig = False
323 323 __profIndex = 0
324 324 __byTime = False
325 325 __initime = None
326 326 __lastdatatime = None
327 327 __integrationtime = None
328 328 __buffer = None
329 329 __bufferStride = []
330 330 __dataReady = False
331 331 __profIndexStride = 0
332 332 __dataToPutStride = False
333 333 n = None
334 334
335 335 def __init__(self, **kwargs):
336 336
337 337 Operation.__init__(self, **kwargs)
338 338
339 339 # self.isConfig = False
340 340
341 341 def setup(self, n=None, timeInterval=None, stride=None, overlapping=False, byblock=False):
342 342 """
343 343 Set the parameters of the integration class.
344 344
345 345 Inputs:
346 346
347 347 n : Number of coherent integrations
348 348 timeInterval : Time of integration. If the parameter "n" is selected this one does not work
349 349 overlapping :
350 350 """
351 351
352 352 self.__initime = None
353 353 self.__lastdatatime = 0
354 354 self.__buffer = None
355 355 self.__dataReady = False
356 356 self.byblock = byblock
357 357 self.stride = stride
358 358
359 359 if n == None and timeInterval == None:
360 360 raise ValueError("n or timeInterval should be specified ...")
361 361
362 362 if n != None:
363 363 self.n = n
364 364 self.__byTime = False
365 365 else:
366 366 self.__integrationtime = timeInterval #* 60. #if (type(timeInterval)!=integer) -> change this line
367 367 self.n = 9999
368 368 self.__byTime = True
369 369
370 370 if overlapping:
371 371 self.__withOverlapping = True
372 372 self.__buffer = None
373 373 else:
374 374 self.__withOverlapping = False
375 375 self.__buffer = 0
376 376
377 377 self.__profIndex = 0
378 378
379 379 def putData(self, data):
380 380
381 381 """
382 382 Add a profile to the __buffer and increase the __profileIndex by one
383 383
384 384 """
385 385
386 386 if not self.__withOverlapping:
387 387 self.__buffer += data.copy()
388 388 self.__profIndex += 1
389 389 return
390 390
391 391 #Overlapping data
392 392 nChannels, nHeis = data.shape
393 393 data = numpy.reshape(data, (1, nChannels, nHeis))
394 394
395 395 #If the buffer is empty then it takes the data value
396 396 if self.__buffer is None:
397 397 self.__buffer = data
398 398 self.__profIndex += 1
399 399 return
400 400
401 401 #If the buffer length is lower than n then stack the data value
402 402 if self.__profIndex < self.n:
403 403 self.__buffer = numpy.vstack((self.__buffer, data))
404 404 self.__profIndex += 1
405 405 return
406 406
407 407 #If the buffer length is equal to n then replace the last buffer value with the data value
408 408 self.__buffer = numpy.roll(self.__buffer, -1, axis=0)
409 409 self.__buffer[self.n-1] = data
410 410 self.__profIndex = self.n
411 411 return
412 412
413 413
414 414 def pushData(self):
415 415 """
416 416 Return the sum of the last profiles and the number of profiles used in the sum.
417 417
418 418 Affected:
419 419
420 420 self.__profileIndex
421 421
422 422 """
423 423
424 424 if not self.__withOverlapping:
425 425 data = self.__buffer
426 426 n = self.__profIndex
427 427
428 428 self.__buffer = 0
429 429 self.__profIndex = 0
430 430
431 431 return data, n
432 432
433 433 #Integration with Overlapping
434 434 data = numpy.sum(self.__buffer, axis=0)
435 435 # print data
436 436 # raise
437 437 n = self.__profIndex
438 438
439 439 return data, n
440 440
441 441 def byProfiles(self, data):
442 442
443 443 self.__dataReady = False
444 444 avgdata = None
445 445 # n = None
446 446 # print data
447 447 # raise
448 448 self.putData(data)
449 449
450 450 if self.__profIndex == self.n:
451 451 avgdata, n = self.pushData()
452 452 self.__dataReady = True
453 453
454 454 return avgdata
455 455
456 456 def byTime(self, data, datatime):
457 457
458 458 self.__dataReady = False
459 459 avgdata = None
460 460 n = None
461 461
462 462 self.putData(data)
463 463
464 464 if (datatime - self.__initime) >= self.__integrationtime:
465 465 avgdata, n = self.pushData()
466 466 self.n = n
467 467 self.__dataReady = True
468 468
469 469 return avgdata
470 470
471 471 def integrateByStride(self, data, datatime):
472 472 # print data
473 473 if self.__profIndex == 0:
474 474 self.__buffer = [[data.copy(), datatime]]
475 475 else:
476 476 self.__buffer.append([data.copy(),datatime])
477 477 self.__profIndex += 1
478 478 self.__dataReady = False
479 479
480 480 if self.__profIndex == self.n * self.stride :
481 481 self.__dataToPutStride = True
482 482 self.__profIndexStride = 0
483 483 self.__profIndex = 0
484 484 self.__bufferStride = []
485 485 for i in range(self.stride):
486 486 current = self.__buffer[i::self.stride]
487 487 data = numpy.sum([t[0] for t in current], axis=0)
488 488 avgdatatime = numpy.average([t[1] for t in current])
489 489 # print data
490 490 self.__bufferStride.append((data, avgdatatime))
491 491
492 492 if self.__dataToPutStride:
493 493 self.__dataReady = True
494 494 self.__profIndexStride += 1
495 495 if self.__profIndexStride == self.stride:
496 496 self.__dataToPutStride = False
497 497 # print self.__bufferStride[self.__profIndexStride - 1]
498 498 # raise
499 499 return self.__bufferStride[self.__profIndexStride - 1]
500 500
501 501
502 502 return None, None
503 503
504 504 def integrate(self, data, datatime=None):
505 505
506 506 if self.__initime == None:
507 507 self.__initime = datatime
508 508
509 509 if self.__byTime:
510 510 avgdata = self.byTime(data, datatime)
511 511 else:
512 512 avgdata = self.byProfiles(data)
513 513
514 514
515 515 self.__lastdatatime = datatime
516 516
517 517 if avgdata is None:
518 518 return None, None
519 519
520 520 avgdatatime = self.__initime
521 521
522 522 deltatime = datatime - self.__lastdatatime
523 523
524 524 if not self.__withOverlapping:
525 525 self.__initime = datatime
526 526 else:
527 527 self.__initime += deltatime
528 528
529 529 return avgdata, avgdatatime
530 530
531 531 def integrateByBlock(self, dataOut):
532 532
533 533 times = int(dataOut.data.shape[1]/self.n)
534 534 avgdata = numpy.zeros((dataOut.nChannels, times, dataOut.nHeights), dtype=numpy.complex)
535 535
536 536 id_min = 0
537 537 id_max = self.n
538 538
539 539 for i in range(times):
540 540 junk = dataOut.data[:,id_min:id_max,:]
541 541 avgdata[:,i,:] = junk.sum(axis=1)
542 542 id_min += self.n
543 543 id_max += self.n
544 544
545 545 timeInterval = dataOut.ippSeconds*self.n
546 546 avgdatatime = (times - 1) * timeInterval + dataOut.utctime
547 547 self.__dataReady = True
548 548 return avgdata, avgdatatime
549 549
550 550 def run(self, dataOut, n=None, timeInterval=None, stride=None, overlapping=False, byblock=False, **kwargs):
551 551
552 552 if not self.isConfig:
553 553 self.setup(n=n, stride=stride, timeInterval=timeInterval, overlapping=overlapping, byblock=byblock, **kwargs)
554 554 self.isConfig = True
555 555
556 556 if dataOut.flagDataAsBlock:
557 557 """
558 558 If the data is read in blocks, dimension = [nChannels, nProfiles, nHeis]
559 559 """
560 560 avgdata, avgdatatime = self.integrateByBlock(dataOut)
561 561 dataOut.nProfiles /= self.n
562 562 else:
563 563 if stride is None:
564 564 avgdata, avgdatatime = self.integrate(dataOut.data, dataOut.utctime)
565 565 else:
566 566 avgdata, avgdatatime = self.integrateByStride(dataOut.data, dataOut.utctime)
567 567
568 568
569 569 # dataOut.timeInterval *= n
570 570 dataOut.flagNoData = True
571 571
572 572 if self.__dataReady:
573 573 dataOut.data = avgdata
574 574 dataOut.nCohInt *= self.n
575 575 dataOut.utctime = avgdatatime
576 576 # print avgdata, avgdatatime
577 577 # raise
578 578 # dataOut.timeInterval = dataOut.ippSeconds * dataOut.nCohInt
579 579 dataOut.flagNoData = False
580 580 return dataOut
581 581
582 582 class Decoder(Operation):
583 583
584 584 isConfig = False
585 585 __profIndex = 0
586 586
587 587 code = None
588 588
589 589 nCode = None
590 590 nBaud = None
591 591
592 592 def __init__(self, **kwargs):
593 593
594 594 Operation.__init__(self, **kwargs)
595 595
596 596 self.times = None
597 597 self.osamp = None
598 598 # self.__setValues = False
599 599 self.isConfig = False
600 600 self.setupReq = False
601 601 def setup(self, code, osamp, dataOut):
602 602
603 603 self.__profIndex = 0
604 604
605 605 self.code = code
606 606
607 607 self.nCode = len(code)
608 608 self.nBaud = len(code[0])
609
610 609 if (osamp != None) and (osamp >1):
611 610 self.osamp = osamp
612 611 self.code = numpy.repeat(code, repeats=self.osamp, axis=1)
613 612 self.nBaud = self.nBaud*self.osamp
614 613
615 614 self.__nChannels = dataOut.nChannels
616 615 self.__nProfiles = dataOut.nProfiles
617 616 self.__nHeis = dataOut.nHeights
618 617
619 618 if self.__nHeis < self.nBaud:
620 619 raise ValueError('Number of heights (%d) should be greater than number of bauds (%d)' %(self.__nHeis, self.nBaud))
621 620
622 621 #Frequency
623 622 __codeBuffer = numpy.zeros((self.nCode, self.__nHeis), dtype=numpy.complex)
624 623
625 624 __codeBuffer[:,0:self.nBaud] = self.code
626 625
627 626 self.fft_code = numpy.conj(numpy.fft.fft(__codeBuffer, axis=1))
628 627
629 628 if dataOut.flagDataAsBlock:
630 629
631 630 self.ndatadec = self.__nHeis #- self.nBaud + 1
632 631
633 632 self.datadecTime = numpy.zeros((self.__nChannels, self.__nProfiles, self.ndatadec), dtype=numpy.complex)
634 633
635 634 else:
636 635
637 636 #Time
638 637 self.ndatadec = self.__nHeis #- self.nBaud + 1
639 638
640 639 self.datadecTime = numpy.zeros((self.__nChannels, self.ndatadec), dtype=numpy.complex)
641 640
642 641 def __convolutionInFreq(self, data):
643 642
644 643 fft_code = self.fft_code[self.__profIndex].reshape(1,-1)
645 644
646 645 fft_data = numpy.fft.fft(data, axis=1)
647 646
648 647 conv = fft_data*fft_code
649 648
650 649 data = numpy.fft.ifft(conv,axis=1)
651 650
652 651 return data
653 652
654 653 def __convolutionInFreqOpt(self, data):
655 654
656 655 raise NotImplementedError
657 656
658 657 def __convolutionInTime(self, data):
659 658
660 659 code = self.code[self.__profIndex]
661 660 for i in range(self.__nChannels):
662 661 self.datadecTime[i,:] = numpy.correlate(data[i,:], code, mode='full')[self.nBaud-1:]
663 662
664 663 return self.datadecTime
665 664
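# A toy sketch (not from the original file) of the time-domain decoding used
# above: each profile is correlated with its phase code and the first nBaud-1
# partial products are discarded, collapsing the code into a single range gate:
# >>> import numpy
# >>> code = numpy.array([1, 1, -1])                      # hypothetical 3-baud code
# >>> profile = numpy.array([1, 1, -1, 0, 0], dtype=complex)
# >>> numpy.correlate(profile, code, mode='full')[len(code)-1:]
# # -> approximately [ 3, 0, -1, 0, 0 ]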
666 665 def __convolutionByBlockInTime(self, data):
667 666
668 667 repetitions = int(self.__nProfiles / self.nCode)
669 668 junk = numpy.lib.stride_tricks.as_strided(self.code, (repetitions, self.code.size), (0, self.code.itemsize))
670 669 junk = junk.flatten()
671 670 code_block = numpy.reshape(junk, (self.nCode*repetitions, self.nBaud))
672 671 profilesList = range(self.__nProfiles)
673 672
674 673 for i in range(self.__nChannels):
675 674 for j in profilesList:
676 675 self.datadecTime[i,j,:] = numpy.correlate(data[i,j,:], code_block[j,:], mode='full')[self.nBaud-1:]
677 676 return self.datadecTime
678 677
679 678 def __convolutionByBlockInFreq(self, data):
680 679
681 680 raise NotImplementedError("Decoder by frequency fro Blocks not implemented")
682 681
683 682
684 683 fft_code = self.fft_code[self.__profIndex].reshape(1,-1)
685 684
686 685 fft_data = numpy.fft.fft(data, axis=2)
687 686
688 687 conv = fft_data*fft_code
689 688
690 689 data = numpy.fft.ifft(conv,axis=2)
691 690
692 691 return data
693 692
694 693
695 694 def run(self, dataOut, code=None, nCode=None, nBaud=None, mode = 0, osamp=None, times=None):
696 695
697 696 if dataOut.flagDecodeData:
698 697 print("This data is already decoded, recoding again ...")
699 698
700 699 if not self.isConfig:
701 700
702 701 if code is None:
703 702 if dataOut.code is None:
704 703 raise ValueError("Code could not be read from %s instance. Enter a value in Code parameter" %dataOut.type)
705 704
706 705 code = dataOut.code
707 706 else:
708 707 code = numpy.array(code).reshape(nCode,nBaud)
709 708 self.setup(code, osamp, dataOut)
710 709
711 710 self.isConfig = True
712 711
713 712 if mode == 3:
714 713 sys.stderr.write("Decoder Warning: mode=%d is not valid, using mode=0\n" %mode)
715 714
716 715 if times != None:
717 716 sys.stderr.write("Decoder Warning: Argument 'times' in not used anymore\n")
718 717
719 718 if self.code is None:
720 719 print("Fail decoding: Code is not defined.")
721 720 return
722 721
723 722 self.__nProfiles = dataOut.nProfiles
724 723 datadec = None
725 724
726 725 if mode == 3:
727 726 mode = 0
728 727
729 728 if dataOut.flagDataAsBlock:
730 729 """
731 730 Decoding when data have been read as a block.
732 731 """
733 732
734 733 if mode == 0:
735 734 datadec = self.__convolutionByBlockInTime(dataOut.data)
736 735 if mode == 1:
737 736 datadec = self.__convolutionByBlockInFreq(dataOut.data)
738 737 else:
739 738 """
740 739 Decoding when data have been read profile by profile
741 740 """
742 741 if mode == 0:
743 742 datadec = self.__convolutionInTime(dataOut.data)
744 743
745 744 if mode == 1:
746 745 datadec = self.__convolutionInFreq(dataOut.data)
747 746
748 747 if mode == 2:
749 748 datadec = self.__convolutionInFreqOpt(dataOut.data)
750 749
751 750 if datadec is None:
752 751 raise ValueError("Codification mode selected is not valid: mode=%d. Try selecting 0 or 1" %mode)
753 752
754 753 dataOut.code = self.code
755 754 dataOut.nCode = self.nCode
756 755 dataOut.nBaud = self.nBaud
757 756
758 757 dataOut.data = datadec
759 758
760 759 dataOut.heightList = dataOut.heightList[0:datadec.shape[-1]]
761 760
762 761 dataOut.flagDecodeData = True #assume the data is now decoded
763 762
764 763 if self.__profIndex == self.nCode-1:
765 764 self.__profIndex = 0
766 765 return dataOut
767 766
768 767 self.__profIndex += 1
769 768
770 769 return dataOut
771 770 # dataOut.flagDeflipData = True #assume the data has not been deflipped
772 771
773 772
774 773 class ProfileConcat(Operation):
775 774
776 775 isConfig = False
777 776 buffer = None
778 777
779 778 def __init__(self, **kwargs):
780 779
781 780 Operation.__init__(self, **kwargs)
782 781 self.profileIndex = 0
783 782
784 783 def reset(self):
785 784 self.buffer = numpy.zeros_like(self.buffer)
786 785 self.start_index = 0
787 786 self.times = 1
788 787
789 788 def setup(self, data, m, n=1):
790 789 self.buffer = numpy.zeros((data.shape[0],data.shape[1]*m),dtype=type(data[0,0]))
791 790 self.nHeights = data.shape[1]#.nHeights
792 791 self.start_index = 0
793 792 self.times = 1
794 793
795 794 def concat(self, data):
796 795
797 796 self.buffer[:,self.start_index:self.nHeights*self.times] = data.copy()
798 797 self.start_index = self.start_index + self.nHeights
799 798
800 799 def run(self, dataOut, m):
801 800 dataOut.flagNoData = True
802 801
803 802 if not self.isConfig:
804 803 self.setup(dataOut.data, m, 1)
805 804 self.isConfig = True
806 805
807 806 if dataOut.flagDataAsBlock:
808 807 raise ValueError("ProfileConcat can only be used when voltage have been read profile by profile, getBlock = False")
809 808
810 809 else:
811 810 self.concat(dataOut.data)
812 811 self.times += 1
813 812 if self.times > m:
814 813 dataOut.data = self.buffer
815 814 self.reset()
816 815 dataOut.flagNoData = False
817 816 # more properties of the header and of the dataOut object must be updated, for example the heights
818 817 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
819 818 xf = dataOut.heightList[0] + dataOut.nHeights * deltaHeight * m
820 819 dataOut.heightList = numpy.arange(dataOut.heightList[0], xf, deltaHeight)
821 820 dataOut.ippSeconds *= m
822 821 return dataOut
823 822
824 823 class ProfileSelector(Operation):
825 824
826 825 profileIndex = None
827 826 # Tamanho total de los perfiles
828 827 nProfiles = None
829 828
830 829 def __init__(self, **kwargs):
831 830
832 831 Operation.__init__(self, **kwargs)
833 832 self.profileIndex = 0
834 833
835 834 def incProfileIndex(self):
836 835
837 836 self.profileIndex += 1
838 837
839 838 if self.profileIndex >= self.nProfiles:
840 839 self.profileIndex = 0
841 840
842 841 def isThisProfileInRange(self, profileIndex, minIndex, maxIndex):
843 842
844 843 if profileIndex < minIndex:
845 844 return False
846 845
847 846 if profileIndex > maxIndex:
848 847 return False
849 848
850 849 return True
851 850
852 851 def isThisProfileInList(self, profileIndex, profileList):
853 852
854 853 if profileIndex not in profileList:
855 854 return False
856 855
857 856 return True
858 857
859 858 def run(self, dataOut, profileList=None, profileRangeList=None, beam=None, byblock=False, rangeList = None, nProfiles=None):
860 859
861 860 """
862 861 ProfileSelector:
863 862
864 863 Inputs:
865 864 profileList : Index of profiles selected. Example: profileList = (0,1,2,7,8)
866 865
867 866 profileRangeList : Minimum and maximum profile indexes. Example: profileRangeList = (4, 30)
868 867
869 868 rangeList : List of profile ranges. Example: rangeList = ((4, 30), (32, 64), (128, 256))
870 869
871 870 """
872 871
873 872 if rangeList is not None:
874 873 if type(rangeList[0]) not in (tuple, list):
875 874 rangeList = [rangeList]
876 875
877 876 dataOut.flagNoData = True
878 877
879 878 if dataOut.flagDataAsBlock:
880 879 """
881 880 data dimension = [nChannels, nProfiles, nHeis]
882 881 """
883 882 if profileList != None:
884 883 dataOut.data = dataOut.data[:,profileList,:]
885 884
886 885 if profileRangeList != None:
887 886 minIndex = profileRangeList[0]
888 887 maxIndex = profileRangeList[1]
889 888 profileList = list(range(minIndex, maxIndex+1))
890 889
891 890 dataOut.data = dataOut.data[:,minIndex:maxIndex+1,:]
892 891
893 892 if rangeList != None:
894 893
895 894 profileList = []
896 895
897 896 for thisRange in rangeList:
898 897 minIndex = thisRange[0]
899 898 maxIndex = thisRange[1]
900 899
901 900 profileList.extend(list(range(minIndex, maxIndex+1)))
902 901
903 902 dataOut.data = dataOut.data[:,profileList,:]
904 903
905 904 dataOut.nProfiles = len(profileList)
906 905 dataOut.profileIndex = dataOut.nProfiles - 1
907 906 dataOut.flagNoData = False
908 907
909 908 return dataOut
910 909
911 910 """
912 911 data dimension = [nChannels, nHeis]
913 912 """
914 913
915 914 if profileList != None:
916 915
917 916 if self.isThisProfileInList(dataOut.profileIndex, profileList):
918 917
919 918 self.nProfiles = len(profileList)
920 919 dataOut.nProfiles = self.nProfiles
921 920 dataOut.profileIndex = self.profileIndex
922 921 dataOut.flagNoData = False
923 922
924 923 self.incProfileIndex()
925 924 return dataOut
926 925
927 926 if profileRangeList != None:
928 927
929 928 minIndex = profileRangeList[0]
930 929 maxIndex = profileRangeList[1]
931 930
932 931 if self.isThisProfileInRange(dataOut.profileIndex, minIndex, maxIndex):
933 932
934 933 self.nProfiles = maxIndex - minIndex + 1
935 934 dataOut.nProfiles = self.nProfiles
936 935 dataOut.profileIndex = self.profileIndex
937 936 dataOut.flagNoData = False
938 937
939 938 self.incProfileIndex()
940 939 return dataOut
941 940
942 941 if rangeList != None:
943 942
944 943 nProfiles = 0
945 944
946 945 for thisRange in rangeList:
947 946 minIndex = thisRange[0]
948 947 maxIndex = thisRange[1]
949 948
950 949 nProfiles += maxIndex - minIndex + 1
951 950
952 951 for thisRange in rangeList:
953 952
954 953 minIndex = thisRange[0]
955 954 maxIndex = thisRange[1]
956 955
957 956 if self.isThisProfileInRange(dataOut.profileIndex, minIndex, maxIndex):
958 957
959 958 self.nProfiles = nProfiles
960 959 dataOut.nProfiles = self.nProfiles
961 960 dataOut.profileIndex = self.profileIndex
962 961 dataOut.flagNoData = False
963 962
964 963 self.incProfileIndex()
965 964
966 965 break
967 966
968 967 return dataOut
969 968
970 969
971 970 if beam != None: #beam is only for AMISR data
972 971 if self.isThisProfileInList(dataOut.profileIndex, dataOut.beamRangeDict[beam]):
973 972 dataOut.flagNoData = False
974 973 dataOut.profileIndex = self.profileIndex
975 974
976 975 self.incProfileIndex()
977 976
978 977 return dataOut
979 978
980 979 raise ValueError("ProfileSelector needs a profileList, profileRangeList or rangeList parameter")
981 980
982 981 #return False
983 982 return dataOut
984 983
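# A sketch of the block-mode selection logic in ProfileSelector.run: a rangeList is
# expanded into a flat profile index list and used to slice the
# (nChannels, nProfiles, nHeights) array (array sizes here are illustrative assumptions).
import numpy

data = numpy.zeros((2, 128, 100))            # nChannels, nProfiles, nHeights
rangeList = ((4, 30), (32, 64))
profileList = []
for minIndex, maxIndex in rangeList:
    profileList.extend(range(minIndex, maxIndex + 1))
selected = data[:, profileList, :]
assert selected.shape == (2, len(profileList), 100)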
985 984 class Reshaper(Operation):
986 985
987 986 def __init__(self, **kwargs):
988 987
989 988 Operation.__init__(self, **kwargs)
990 989
991 990 self.__buffer = None
992 991 self.__nitems = 0
993 992
994 993 def __appendProfile(self, dataOut, nTxs):
995 994
996 995 if self.__buffer is None:
997 996 shape = (dataOut.nChannels, int(dataOut.nHeights/nTxs) )
998 997 self.__buffer = numpy.empty(shape, dtype = dataOut.data.dtype)
999 998
1000 999 ini = dataOut.nHeights * self.__nitems
1001 1000 end = ini + dataOut.nHeights
1002 1001
1003 1002 self.__buffer[:, ini:end] = dataOut.data
1004 1003
1005 1004 self.__nitems += 1
1006 1005
1007 1006 return int(self.__nitems*nTxs)
1008 1007
1009 1008 def __getBuffer(self):
1010 1009
1011 1010 if self.__nitems == int(1./self.__nTxs):
1012 1011
1013 1012 self.__nitems = 0
1014 1013
1015 1014 return self.__buffer.copy()
1016 1015
1017 1016 return None
1018 1017
1019 1018 def __checkInputs(self, dataOut, shape, nTxs):
1020 1019
1021 1020 if shape is None and nTxs is None:
1022 1021 raise ValueError("Reshaper: either shape or the nTxs factor should be defined")
1023 1022
1024 1023 if nTxs:
1025 1024 if nTxs < 0:
1026 1025 raise ValueError("nTxs should be greater than 0")
1027 1026
1028 1027 if nTxs < 1 and dataOut.nProfiles % (1./nTxs) != 0:
1029 1028 raise ValueError("nProfiles = %d is not divisible by (1./nTxs) = %f" %(dataOut.nProfiles, (1./nTxs)))
1030 1029
1031 1030 shape = [dataOut.nChannels, dataOut.nProfiles*nTxs, dataOut.nHeights/nTxs]
1032 1031
1033 1032 return shape, nTxs
1034 1033
1035 1034 if len(shape) != 2 and len(shape) != 3:
1036 1035 raise ValueError("shape dimension should be equal to 2 or 3: shape = (nProfiles, nHeis) or (nChannels, nProfiles, nHeis). Current shape = (%d, %d, %d)" %(dataOut.nChannels, dataOut.nProfiles, dataOut.nHeights))
1037 1036
1038 1037 if len(shape) == 2:
1039 1038 shape_tuple = [dataOut.nChannels]
1040 1039 shape_tuple.extend(shape)
1041 1040 else:
1042 1041 shape_tuple = list(shape)
1043 1042
1044 1043 nTxs = 1.0*shape_tuple[1]/dataOut.nProfiles
1045 1044
1046 1045 return shape_tuple, nTxs
1047 1046
1048 1047 def run(self, dataOut, shape=None, nTxs=None):
1049 1048
1050 1049 shape_tuple, self.__nTxs = self.__checkInputs(dataOut, shape, nTxs)
1051 1050
1052 1051 dataOut.flagNoData = True
1053 1052 profileIndex = None
1054 1053
1055 1054 if dataOut.flagDataAsBlock:
1056 1055
1057 1056 dataOut.data = numpy.reshape(dataOut.data, shape_tuple)
1058 1057 dataOut.flagNoData = False
1059 1058
1060 1059 profileIndex = int(dataOut.nProfiles*self.__nTxs) - 1
1061 1060
1062 1061 else:
1063 1062
1064 1063 if self.__nTxs < 1:
1065 1064
1066 1065 self.__appendProfile(dataOut, self.__nTxs)
1067 1066 new_data = self.__getBuffer()
1068 1067
1069 1068 if new_data is not None:
1070 1069 dataOut.data = new_data
1071 1070 dataOut.flagNoData = False
1072 1071
1073 1072 profileIndex = dataOut.profileIndex*self.__nTxs # use the stored factor; nTxs may be None when shape was given
1074 1073
1075 1074 else:
1076 1075 raise ValueError("nTxs should be greater than 0 and lower than 1, or use VoltageReader(..., getblock=True)")
1077 1076
1078 1077 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
1079 1078
1080 1079 dataOut.heightList = numpy.arange(dataOut.nHeights/self.__nTxs) * deltaHeight + dataOut.heightList[0]
1081 1080
1082 1081 dataOut.nProfiles = int(dataOut.nProfiles*self.__nTxs)
1083 1082
1084 1083 dataOut.profileIndex = profileIndex
1085 1084
1086 1085 dataOut.ippSeconds /= self.__nTxs
1087 1086
1088 1087 return dataOut
1089 1088
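# A sketch of the Reshaper bookkeeping for block data: with a factor nTxs the number of
# profiles is multiplied and the number of heights divided by the same amount, and
# ippSeconds shrinks accordingly (sizes below are assumptions for the demo).
import numpy

nChannels, nProfiles, nHeights, nTxs = 2, 100, 200, 2
data = numpy.zeros((nChannels, nProfiles, nHeights), dtype=complex)
reshaped = numpy.reshape(data, (nChannels, int(nProfiles * nTxs), int(nHeights / nTxs)))
assert reshaped.shape == (2, 200, 100)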
1090 1089 class SplitProfiles(Operation):
1091 1090
1092 1091 def __init__(self, **kwargs):
1093 1092
1094 1093 Operation.__init__(self, **kwargs)
1095 1094
1096 1095 def run(self, dataOut, n):
1097 1096
1098 1097 dataOut.flagNoData = True
1099 1098 profileIndex = None
1100 1099
1101 1100 if dataOut.flagDataAsBlock:
1102 1101
1103 1102 #nchannels, nprofiles, nsamples
1104 1103 shape = dataOut.data.shape
1105 1104
1106 1105 if shape[2] % n != 0:
1107 1106 raise ValueError("Could not split the data: the number of samples (%d) has to be a multiple of n=%d" %(shape[2], n))
1108 1107
1109 1108 new_shape = shape[0], shape[1]*n, int(shape[2]/n)
1110 1109
1111 1110 dataOut.data = numpy.reshape(dataOut.data, new_shape)
1112 1111 dataOut.flagNoData = False
1113 1112
1114 1113 profileIndex = int(dataOut.nProfiles/n) - 1
1115 1114
1116 1115 else:
1117 1116
1118 1117 raise ValueError("Could not split the data when it is read profile by profile. Use VoltageReader(..., getblock=True)")
1119 1118
1120 1119 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
1121 1120
1122 1121 dataOut.heightList = numpy.arange(dataOut.nHeights/n) * deltaHeight + dataOut.heightList[0]
1123 1122
1124 1123 dataOut.nProfiles = int(dataOut.nProfiles*n)
1125 1124
1126 1125 dataOut.profileIndex = profileIndex
1127 1126
1128 1127 dataOut.ippSeconds /= n
1129 1128
1130 1129 return dataOut
1131 1130
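# A sketch of what SplitProfiles(n) does to a data block: each profile of nSamples
# becomes n shorter profiles of nSamples/n (sizes are illustrative; the real operation
# also updates heightList, nProfiles and ippSeconds as shown above).
import numpy

n = 4
block = numpy.arange(2 * 10 * 80).reshape(2, 10, 80)   # nChannels, nProfiles, nSamples
split = block.reshape(2, 10 * n, 80 // n)
assert split.shape == (2, 40, 20)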
1132 1131 class CombineProfiles(Operation):
1133 1132 def __init__(self, **kwargs):
1134 1133
1135 1134 Operation.__init__(self, **kwargs)
1136 1135
1137 1136 self.__remData = None
1138 1137 self.__profileIndex = 0
1139 1138
1140 1139 def run(self, dataOut, n):
1141 1140
1142 1141 dataOut.flagNoData = True
1143 1142 profileIndex = None
1144 1143
1145 1144 if dataOut.flagDataAsBlock:
1146 1145
1147 1146 #nchannels, nprofiles, nsamples
1148 1147 shape = dataOut.data.shape
1149 1148 new_shape = shape[0], shape[1]//n, shape[2]*n # integer division so numpy.reshape gets an integer dimension
1150 1149
1151 1150 if shape[1] % n != 0:
1152 1151 raise ValueError("Could not combine the data: the number of profiles (%d) has to be a multiple of n=%d" %(shape[1], n))
1153 1152
1154 1153 dataOut.data = numpy.reshape(dataOut.data, new_shape)
1155 1154 dataOut.flagNoData = False
1156 1155
1157 1156 profileIndex = int(dataOut.nProfiles*n) - 1
1158 1157
1159 1158 else:
1160 1159
1161 1160 #nchannels, nsamples
1162 1161 if self.__remData is None:
1163 1162 newData = dataOut.data
1164 1163 else:
1165 1164 newData = numpy.concatenate((self.__remData, dataOut.data), axis=1)
1166 1165
1167 1166 self.__profileIndex += 1
1168 1167
1169 1168 if self.__profileIndex < n:
1170 1169 self.__remData = newData
1171 1170 #continue
1172 1171 return
1173 1172
1174 1173 self.__profileIndex = 0
1175 1174 self.__remData = None
1176 1175
1177 1176 dataOut.data = newData
1178 1177 dataOut.flagNoData = False
1179 1178
1180 1179 profileIndex = dataOut.profileIndex//n # keep the profile index an integer
1181 1180
1182 1181
1183 1182 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
1184 1183
1185 1184 dataOut.heightList = numpy.arange(dataOut.nHeights*n) * deltaHeight + dataOut.heightList[0]
1186 1185
1187 1186 dataOut.nProfiles = int(dataOut.nProfiles/n)
1188 1187
1189 1188 dataOut.profileIndex = profileIndex
1190 1189
1191 1190 dataOut.ippSeconds *= n
1192 1191
1193 1192 return dataOut
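# In block mode CombineProfiles(n) is essentially the inverse reshape of SplitProfiles(n):
# n consecutive profiles are merged into one longer profile. A minimal sketch with assumed
# sizes:
import numpy

n = 4
block = numpy.arange(2 * 40 * 20).reshape(2, 40, 20)   # nChannels, nProfiles, nSamples
combined = block.reshape(2, 40 // n, 20 * n)
assert combined.shape == (2, 10, 80)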
1194 1193 # import collections
1195 1194 # from scipy.stats import mode
1196 1195 #
1197 1196 # class Synchronize(Operation):
1198 1197 #
1199 1198 # isConfig = False
1200 1199 # __profIndex = 0
1201 1200 #
1202 1201 # def __init__(self, **kwargs):
1203 1202 #
1204 1203 # Operation.__init__(self, **kwargs)
1205 1204 # # self.isConfig = False
1206 1205 # self.__powBuffer = None
1207 1206 # self.__startIndex = 0
1208 1207 # self.__pulseFound = False
1209 1208 #
1210 1209 # def __findTxPulse(self, dataOut, channel=0, pulse_with = None):
1211 1210 #
1212 1211 # #Read data
1213 1212 #
1214 1213 # powerdB = dataOut.getPower(channel = channel)
1215 1214 # noisedB = dataOut.getNoise(channel = channel)[0]
1216 1215 #
1217 1216 # self.__powBuffer.extend(powerdB.flatten())
1218 1217 #
1219 1218 # dataArray = numpy.array(self.__powBuffer)
1220 1219 #
1221 1220 # filteredPower = numpy.correlate(dataArray, dataArray[0:self.__nSamples], "same")
1222 1221 #
1223 1222 # maxValue = numpy.nanmax(filteredPower)
1224 1223 #
1225 1224 # if maxValue < noisedB + 10:
1226 1225 # #No transmission pulse was found
1227 1226 # return None
1228 1227 #
1229 1228 # maxValuesIndex = numpy.where(filteredPower > maxValue - 0.1*abs(maxValue))[0]
1230 1229 #
1231 1230 # if len(maxValuesIndex) < 2:
1232 1231 # #Only a single one-baud transmission pulse was found, waiting for the next TX
1233 1232 # return None
1234 1233 #
1235 1234 # phasedMaxValuesIndex = maxValuesIndex - self.__nSamples
1236 1235 #
1237 1236 # #Keep only values spaced nSamples apart
1238 1237 # pulseIndex = numpy.intersect1d(maxValuesIndex, phasedMaxValuesIndex)
1239 1238 #
1240 1239 # if len(pulseIndex) < 2:
1241 1240 # #Only one transmission pulse wider than one baud was found
1242 1241 # return None
1243 1242 #
1244 1243 # spacing = pulseIndex[1:] - pulseIndex[:-1]
1245 1244 #
1246 1245 # #Remove signals that are less than 10 samples apart
1247 1246 # #(IPPs shorter than 10 samples should not exist)
1248 1247 #
1249 1248 # realIndex = numpy.where(spacing > 10 )[0]
1250 1249 #
1251 1250 # if len(realIndex) < 2:
1252 1251 # #Only one transmission pulse wider than one baud was found
1253 1252 # return None
1254 1253 #
1255 1254 # #Remove wide pulses (keep only the spacing between IPPs)
1256 1255 # realPulseIndex = pulseIndex[realIndex]
1257 1256 #
1258 1257 # period = mode(realPulseIndex[1:] - realPulseIndex[:-1])[0][0]
1259 1258 #
1260 1259 # print("IPP = %d samples" %period)
1261 1260 #
1262 1261 # self.__newNSamples = dataOut.nHeights #int(period)
1263 1262 # self.__startIndex = int(realPulseIndex[0])
1264 1263 #
1265 1264 # return 1
1266 1265 #
1267 1266 #
1268 1267 # def setup(self, nSamples, nChannels, buffer_size = 4):
1269 1268 #
1270 1269 # self.__powBuffer = collections.deque(numpy.zeros( buffer_size*nSamples,dtype=numpy.float),
1271 1270 # maxlen = buffer_size*nSamples)
1272 1271 #
1273 1272 # bufferList = []
1274 1273 #
1275 1274 # for i in range(nChannels):
1276 1275 # bufferByChannel = collections.deque(numpy.zeros( buffer_size*nSamples, dtype=numpy.complex) + numpy.NAN,
1277 1276 # maxlen = buffer_size*nSamples)
1278 1277 #
1279 1278 # bufferList.append(bufferByChannel)
1280 1279 #
1281 1280 # self.__nSamples = nSamples
1282 1281 # self.__nChannels = nChannels
1283 1282 # self.__bufferList = bufferList
1284 1283 #
1285 1284 # def run(self, dataOut, channel = 0):
1286 1285 #
1287 1286 # if not self.isConfig:
1288 1287 # nSamples = dataOut.nHeights
1289 1288 # nChannels = dataOut.nChannels
1290 1289 # self.setup(nSamples, nChannels)
1291 1290 # self.isConfig = True
1292 1291 #
1293 1292 # #Append new data to internal buffer
1294 1293 # for thisChannel in range(self.__nChannels):
1295 1294 # bufferByChannel = self.__bufferList[thisChannel]
1296 1295 # bufferByChannel.extend(dataOut.data[thisChannel])
1297 1296 #
1298 1297 # if self.__pulseFound:
1299 1298 # self.__startIndex -= self.__nSamples
1300 1299 #
1301 1300 # #Finding Tx Pulse
1302 1301 # if not self.__pulseFound:
1303 1302 # indexFound = self.__findTxPulse(dataOut, channel)
1304 1303 #
1305 1304 # if indexFound == None:
1306 1305 # dataOut.flagNoData = True
1307 1306 # return
1308 1307 #
1309 1308 # self.__arrayBuffer = numpy.zeros((self.__nChannels, self.__newNSamples), dtype = numpy.complex)
1310 1309 # self.__pulseFound = True
1311 1310 # self.__startIndex = indexFound
1312 1311 #
1313 1312 # #If pulse was found ...
1314 1313 # for thisChannel in range(self.__nChannels):
1315 1314 # bufferByChannel = self.__bufferList[thisChannel]
1316 1315 # #print self.__startIndex
1317 1316 # x = numpy.array(bufferByChannel)
1318 1317 # self.__arrayBuffer[thisChannel] = x[self.__startIndex:self.__startIndex+self.__newNSamples]
1319 1318 #
1320 1319 # deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
1321 1320 # dataOut.heightList = numpy.arange(self.__newNSamples)*deltaHeight
1322 1321 # # dataOut.ippSeconds = (self.__newNSamples / deltaHeight)/1e6
1323 1322 #
1324 1323 # dataOut.data = self.__arrayBuffer
1325 1324 #
1326 1325 # self.__startIndex += self.__newNSamples
1327 1326 #
1328 1327 # return
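# A sketch of the pulse-search idea used by the commented-out Synchronize class above:
# correlate the power series with its leading segment and keep only peaks well above the
# noise floor (threshold and names below are illustrative, not the original code).
import numpy

def find_pulse_candidates(power_db, noise_db, n_samples):
    filtered = numpy.correlate(power_db, power_db[:n_samples], "same")
    max_value = numpy.nanmax(filtered)
    if max_value < noise_db + 10:
        return None                                   # no transmission pulse found
    return numpy.where(filtered > max_value - 0.1 * abs(max_value))[0]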