Multiprocessing for voltage (all operations) working
George Yong -
r1173:0148df60175f
@@ -1,1332 +1,1333
1 1 '''
2 2 Updated on January , 2018, for multiprocessing purposes
3 3 Author: Sergio Cortez
4 4 Created on September , 2012
5 5 '''
6 6 from platform import python_version
7 7 import sys
8 8 import ast
9 9 import datetime
10 10 import traceback
11 11 import math
12 12 import time
13 13 import zmq
14 14 from multiprocessing import Process, cpu_count
15 15
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
18 18
19 19
20 20 from schainpy.admin import Alarm, SchainWarning
21 21
22 22 ### Temporary imports!!!
23 23 # from schainpy.model import *
24 24 from schainpy.model.io import *
25 25 from schainpy.model.graphics import *
26 26 from schainpy.model.proc.jroproc_base import *
27 27 from schainpy.model.proc.bltrproc_parameters import *
28 28 from schainpy.model.proc.jroproc_spectra import *
29 from schainpy.model.proc.jroproc_voltage import *
29 30 from schainpy.model.proc.jroproc_parameters import *
30 31 from schainpy.model.utils.jroutils_publish import *
31 32 from schainpy.utils import log
32 33 ###
33 34
34 35 DTYPES = {
35 36 'Voltage': '.r',
36 37 'Spectra': '.pdata'
37 38 }
38 39
39 40
40 41 def MPProject(project, n=cpu_count()):
41 42 '''
42 43 Project wrapper to run schain in n processes
43 44 '''
44 45
45 46 rconf = project.getReadUnitObj()
46 47 op = rconf.getOperationObj('run')
47 48 dt1 = op.getParameterValue('startDate')
48 49 dt2 = op.getParameterValue('endDate')
49 50 tm1 = op.getParameterValue('startTime')
50 51 tm2 = op.getParameterValue('endTime')
51 52 days = (dt2 - dt1).days
52 53
53 54 for day in range(days + 1):
54 55 skip = 0
55 56 cursor = 0
56 57 processes = []
57 58 dt = dt1 + datetime.timedelta(day)
58 59 dt_str = dt.strftime('%Y/%m/%d')
59 60 reader = JRODataReader()
60 61 paths, files = reader.searchFilesOffLine(path=rconf.path,
61 62 startDate=dt,
62 63 endDate=dt,
63 64 startTime=tm1,
64 65 endTime=tm2,
65 66 ext=DTYPES[rconf.datatype])
66 67 nFiles = len(files)
67 68 if nFiles == 0:
68 69 continue
69 70 skip = int(math.ceil(nFiles / n))
70 71 while nFiles > cursor * skip:
71 72 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
72 73 skip=skip)
73 74 p = project.clone()
74 75 p.start()
75 76 processes.append(p)
76 77 cursor += 1
77 78
78 79 def beforeExit(exctype, value, trace):
79 80 for process in processes:
80 81 process.terminate()
81 82 process.join()
82 83 print(traceback.print_tb(trace))
83 84
84 85 sys.excepthook = beforeExit
85 86
86 87 for process in processes:
87 88 process.join()
88 89 process.terminate()
89 90
90 91 time.sleep(3)
91 92
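A minimal usage sketch for MPProject above, assuming a previously generated XML configuration; the filename is only an illustrative placeholder.

project = Project()
project.readXml('schain.xml')   # placeholder: an existing signal-chain configuration
MPProject(project, n=4)         # clone the project and split the read unit's files across 4 processes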
92 93 class ParameterConf():
93 94
94 95 id = None
95 96 name = None
96 97 value = None
97 98 format = None
98 99
99 100 __formated_value = None
100 101
101 102 ELEMENTNAME = 'Parameter'
102 103
103 104 def __init__(self):
104 105
105 106 self.format = 'str'
106 107
107 108 def getElementName(self):
108 109
109 110 return self.ELEMENTNAME
110 111
111 112 def getValue(self):
112 113
113 114 value = self.value
114 115 format = self.format
115 116
116 117 if self.__formated_value != None:
117 118
118 119 return self.__formated_value
119 120
120 121 if format == 'obj':
121 122 return value
122 123
123 124 if format == 'str':
124 125 self.__formated_value = str(value)
125 126 return self.__formated_value
126 127
127 128 if value == '':
128 129 raise ValueError('%s: This parameter value is empty' % self.name)
129 130
130 131 if format == 'list':
131 132 strList = value.split(',')
132 133
133 134 self.__formated_value = strList
134 135
135 136 return self.__formated_value
136 137
137 138 if format == 'intlist':
138 139 '''
139 140 Example:
140 141 value = (0,1,2)
141 142 '''
142 143
143 144 new_value = ast.literal_eval(value)
144 145
145 146 if type(new_value) not in (tuple, list):
146 147 new_value = [int(new_value)]
147 148
148 149 self.__formated_value = new_value
149 150
150 151 return self.__formated_value
151 152
152 153 if format == 'floatlist':
153 154 '''
154 155 Example:
155 156 value = (0.5, 1.4, 2.7)
156 157 '''
157 158
158 159 new_value = ast.literal_eval(value)
159 160
160 161 if type(new_value) not in (tuple, list):
161 162 new_value = [float(new_value)]
162 163
163 164 self.__formated_value = new_value
164 165
165 166 return self.__formated_value
166 167
167 168 if format == 'date':
168 169 strList = value.split('/')
169 170 intList = [int(x) for x in strList]
170 171 date = datetime.date(intList[0], intList[1], intList[2])
171 172
172 173 self.__formated_value = date
173 174
174 175 return self.__formated_value
175 176
176 177 if format == 'time':
177 178 strList = value.split(':')
178 179 intList = [int(x) for x in strList]
179 180 time = datetime.time(intList[0], intList[1], intList[2])
180 181
181 182 self.__formated_value = time
182 183
183 184 return self.__formated_value
184 185
185 186 if format == 'pairslist':
186 187 '''
187 188 Example:
188 189 value = (0,1),(1,2)
189 190 '''
190 191
191 192 new_value = ast.literal_eval(value)
192 193
193 194 if type(new_value) not in (tuple, list):
194 195 raise ValueError('%s has to be a tuple or list of pairs' % value)
195 196
196 197 if type(new_value[0]) not in (tuple, list):
197 198 if len(new_value) != 2:
198 199 raise ValueError('%s has to be a tuple or list of pairs' % value)
199 200 new_value = [new_value]
200 201
201 202 for thisPair in new_value:
202 203 if len(thisPair) != 2:
203 204 raise ValueError('%s has to be a tuple or list of pairs' % value)
204 205
205 206 self.__formated_value = new_value
206 207
207 208 return self.__formated_value
208 209
209 210 if format == 'multilist':
210 211 '''
211 212 Example:
212 213 value = (0,1,2),(3,4,5)
213 214 '''
214 215 multiList = ast.literal_eval(value)
215 216
216 217 if type(multiList[0]) == int:
217 218 multiList = ast.literal_eval('(' + value + ')')
218 219
219 220 self.__formated_value = multiList
220 221
221 222 return self.__formated_value
222 223
223 224 if format == 'bool':
224 225 value = int(value)
225 226
226 227 if format == 'int':
227 228 value = float(value)
228 229
229 230 format_func = eval(format)
230 231
231 232 self.__formated_value = format_func(value)
232 233
233 234 return self.__formated_value
234 235
235 236 def updateId(self, new_id):
236 237
237 238 self.id = str(new_id)
238 239
239 240 def setup(self, id, name, value, format='str'):
240 241 self.id = str(id)
241 242 self.name = name
242 243 if format == 'obj':
243 244 self.value = value
244 245 else:
245 246 self.value = str(value)
246 247 self.format = str.lower(format)
247 248
248 249 self.getValue()
249 250
250 251 return 1
251 252
252 253 def update(self, name, value, format='str'):
253 254
254 255 self.name = name
255 256 self.value = str(value)
256 257 self.format = format
257 258
258 259 def makeXml(self, opElement):
259 260 if self.name not in ('queue',):
260 261 parmElement = SubElement(opElement, self.ELEMENTNAME)
261 262 parmElement.set('id', str(self.id))
262 263 parmElement.set('name', self.name)
263 264 parmElement.set('value', self.value)
264 265 parmElement.set('format', self.format)
265 266
266 267 def readXml(self, parmElement):
267 268
268 269 self.id = parmElement.get('id')
269 270 self.name = parmElement.get('name')
270 271 self.value = parmElement.get('value')
271 272 self.format = str.lower(parmElement.get('format'))
272 273
273 274 # Compatible with old signal chain version
274 275 if self.format == 'int' and self.name == 'idfigure':
275 276 self.name = 'id'
276 277
277 278 def printattr(self):
278 279
279 280 print('Parameter[%s]: name = %s, value = %s, format = %s' % (self.id, self.name, self.value, self.format))
280 281
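A short illustration of how ParameterConf converts raw string values according to its format field, using only the setup()/getValue() methods defined above; parameter names and values are illustrative.

p = ParameterConf()
p.setup(id=11, name='channelList', value='(0,1,2)', format='intlist')
print(p.getValue())    # (0, 1, 2)

d = ParameterConf()
d.setup(id=12, name='startDate', value='2018/01/26', format='date')
print(d.getValue())    # 2018-01-26 (a datetime.date)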
281 282 class OperationConf():
282 283
283 284 id = None
284 285 name = None
285 286 priority = None
286 287 type = None
287 288
288 289 parmConfObjList = []
289 290
290 291 ELEMENTNAME = 'Operation'
291 292
292 293 def __init__(self):
293 294
294 295 self.id = '0'
295 296 self.name = None
296 297 self.priority = None
297 298 self.topic = None
298 299
299 300 def __getNewId(self):
300 301
301 302 return int(self.id) * 10 + len(self.parmConfObjList) + 1
302 303
303 304 def getId(self):
304 305 return self.id
305 306
306 307 def updateId(self, new_id):
307 308
308 309 self.id = str(new_id)
309 310
310 311 n = 1
311 312 for parmObj in self.parmConfObjList:
312 313
313 314 idParm = str(int(new_id) * 10 + n)
314 315 parmObj.updateId(idParm)
315 316
316 317 n += 1
317 318
318 319 def getElementName(self):
319 320
320 321 return self.ELEMENTNAME
321 322
322 323 def getParameterObjList(self):
323 324
324 325 return self.parmConfObjList
325 326
326 327 def getParameterObj(self, parameterName):
327 328
328 329 for parmConfObj in self.parmConfObjList:
329 330
330 331 if parmConfObj.name != parameterName:
331 332 continue
332 333
333 334 return parmConfObj
334 335
335 336 return None
336 337
337 338 def getParameterObjfromValue(self, parameterValue):
338 339
339 340 for parmConfObj in self.parmConfObjList:
340 341
341 342 if parmConfObj.getValue() != parameterValue:
342 343 continue
343 344
344 345 return parmConfObj.getValue()
345 346
346 347 return None
347 348
348 349 def getParameterValue(self, parameterName):
349 350
350 351 parameterObj = self.getParameterObj(parameterName)
351 352
352 353 # if not parameterObj:
353 354 # return None
354 355
355 356 value = parameterObj.getValue()
356 357
357 358 return value
358 359
359 360 def getKwargs(self):
360 361
361 362 kwargs = {}
362 363
363 364 for parmConfObj in self.parmConfObjList:
364 365 if self.name == 'run' and parmConfObj.name == 'datatype':
365 366 continue
366 367
367 368 kwargs[parmConfObj.name] = parmConfObj.getValue()
368 369
369 370 return kwargs
370 371
371 372 def setup(self, id, name, priority, type):
372 373
373 374 self.id = str(id)
374 375 self.name = name
375 376 self.type = type
376 377 self.priority = priority
377 378 self.parmConfObjList = []
378 379
379 380 def removeParameters(self):
380 381
381 382 for obj in self.parmConfObjList:
382 383 del obj
383 384
384 385 self.parmConfObjList = []
385 386
386 387 def addParameter(self, name, value, format='str'):
387 388
388 389 if value is None:
389 390 return None
390 391 id = self.__getNewId()
391 392
392 393 parmConfObj = ParameterConf()
393 394 if not parmConfObj.setup(id, name, value, format):
394 395 return None
395 396
396 397 self.parmConfObjList.append(parmConfObj)
397 398
398 399 return parmConfObj
399 400
400 401 def changeParameter(self, name, value, format='str'):
401 402
402 403 parmConfObj = self.getParameterObj(name)
403 404 parmConfObj.update(name, value, format)
404 405
405 406 return parmConfObj
406 407
407 408 def makeXml(self, procUnitElement):
408 409
409 410 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
410 411 opElement.set('id', str(self.id))
411 412 opElement.set('name', self.name)
412 413 opElement.set('type', self.type)
413 414 opElement.set('priority', str(self.priority))
414 415
415 416 for parmConfObj in self.parmConfObjList:
416 417 parmConfObj.makeXml(opElement)
417 418
418 419 def readXml(self, opElement):
419 420
420 421 self.id = opElement.get('id')
421 422 self.name = opElement.get('name')
422 423 self.type = opElement.get('type')
423 424 self.priority = opElement.get('priority')
424 425
425 426 # Compatible with old signal chain version
426 427 # Use the 'run' method instead of 'init'
427 428 if self.type == 'self' and self.name == 'init':
428 429 self.name = 'run'
429 430
430 431 self.parmConfObjList = []
431 432
432 433 parmElementList = opElement.iter(ParameterConf().getElementName())
433 434
434 435 for parmElement in parmElementList:
435 436 parmConfObj = ParameterConf()
436 437 parmConfObj.readXml(parmElement)
437 438
438 439 # Compatible with old signal chain version
439 440 # If a 'Plot' OPERATION is found, rename the operation to the value of its 'type' PARAMETER
440 441 if self.type != 'self' and self.name == 'Plot':
441 442 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
442 443 self.name = parmConfObj.value
443 444 continue
444 445
445 446 self.parmConfObjList.append(parmConfObj)
446 447
447 448 def printattr(self):
448 449
449 450 print('%s[%s]: name = %s, type = %s, priority = %s' % (self.ELEMENTNAME,
450 451 self.id,
451 452 self.name,
452 453 self.type,
453 454 self.priority))
454 455
455 456 for parmConfObj in self.parmConfObjList:
456 457 parmConfObj.printattr()
457 458
458 459 def createObject(self):
459 460
460 461 className = eval(self.name)
461 462 kwargs = self.getKwargs()
462 463
463 464 opObj = className(self.id, **kwargs)
464 465
465 466 opObj.start()
466 467
467 468 print(' Operation created')
468 469
469 470 return opObj
470 471
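A sketch of OperationConf in isolation, assuming the classes above are in scope. getKwargs() applies each parameter's declared format, so the 'int' parameter comes back as a Python int; the names, values and the 'other' type are illustrative.

op = OperationConf()
op.setup(id=11, name='Scope', priority=1, type='other')
op.addParameter(name='id', value='100', format='int')
op.addParameter(name='wintitle', value='RTI', format='str')
print(op.getKwargs())    # {'id': 100, 'wintitle': 'RTI'}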
471 472 class ProcUnitConf():
472 473
473 474 id = None
474 475 name = None
475 476 datatype = None
476 477 inputId = None
477 478 parentId = None
478 479
479 480 opConfObjList = []
480 481
481 482 procUnitObj = None
482 483 opObjList = []
483 484
484 485 ELEMENTNAME = 'ProcUnit'
485 486
486 487 def __init__(self):
487 488
488 489 self.id = None
489 490 self.datatype = None
490 491 self.name = None
491 492 self.inputId = None
492 493
493 494 self.opConfObjList = []
494 495
495 496 self.procUnitObj = None
496 497 self.opObjDict = {}
497 498
498 499 def __getPriority(self):
499 500
500 501 return len(self.opConfObjList) + 1
501 502
502 503 def __getNewId(self):
503 504
504 505 return int(self.id) * 10 + len(self.opConfObjList) + 1
505 506
506 507 def getElementName(self):
507 508
508 509 return self.ELEMENTNAME
509 510
510 511 def getId(self):
511 512
512 513 return self.id
513 514
514 515 def updateId(self, new_id, parentId=parentId):
515 516 '''
516 517 new_id = int(parentId) * 10 + (int(self.id) % 10)
517 518 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
518 519
519 520 # If this proc unit has no inputs
520 521 #if self.inputId == '0':
521 522 #new_inputId = 0
522 523
523 524 n = 1
524 525 for opConfObj in self.opConfObjList:
525 526
526 527 idOp = str(int(new_id) * 10 + n)
527 528 opConfObj.updateId(idOp)
528 529
529 530 n += 1
530 531
531 532 self.parentId = str(parentId)
532 533 self.id = str(new_id)
533 534 #self.inputId = str(new_inputId)
534 535 '''
535 536 n = 1
536 537 def getInputId(self):
537 538
538 539 return self.inputId
539 540
540 541 def getOperationObjList(self):
541 542
542 543 return self.opConfObjList
543 544
544 545 def getOperationObj(self, name=None):
545 546
546 547 for opConfObj in self.opConfObjList:
547 548
548 549 if opConfObj.name != name:
549 550 continue
550 551
551 552 return opConfObj
552 553
553 554 return None
554 555
555 556 def getOpObjfromParamValue(self, value=None):
556 557
557 558 for opConfObj in self.opConfObjList:
558 559 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
559 560 continue
560 561 return opConfObj
561 562 return None
562 563
563 564 def getProcUnitObj(self):
564 565
565 566 return self.procUnitObj
566 567
567 568 def setup(self, id, name, datatype, inputId, parentId=None):
568 569 '''
569 570 id will be the topic to publish to
570 571 inputId will be the topic to subscribe to
571 572 '''
572 573
573 574 # Compatible with old signal chain version
574 575 if datatype == None and name == None:
575 576 raise ValueError('datatype or name should be defined')
576 577
577 578 # Define a condition for inputId when it is 0
578 579
579 580 if name == None:
580 581 if 'Proc' in datatype:
581 582 name = datatype
582 583 else:
583 584 name = '%sProc' % (datatype)
584 585
585 586 if datatype == None:
586 587 datatype = name.replace('Proc', '')
587 588
588 589 self.id = str(id)
589 590 self.name = name
590 591 self.datatype = datatype
591 592 self.inputId = inputId
592 593 self.parentId = parentId
593 594 self.opConfObjList = []
594 595
595 596 self.addOperation(name='run', optype='self')
596 597
597 598 def removeOperations(self):
598 599
599 600 for obj in self.opConfObjList:
600 601 del obj
601 602
602 603 self.opConfObjList = []
603 604 self.addOperation(name='run')
604 605
605 606 def addParameter(self, **kwargs):
606 607 '''
607 608 Add parameters to 'run' operation
608 609 '''
609 610 opObj = self.opConfObjList[0]
610 611
611 612 opObj.addParameter(**kwargs)
612 613
613 614 return opObj
614 615
615 616 def addOperation(self, name, optype = 'self'):
616 617 '''
617 618 Update -> communication process
618 619 For optype='self', remove. Define IPC communication -> Topic
619 620 define the socket type or IPC communication
620 621
621 622 '''
622 623
623 624 id = self.__getNewId()
624 625 priority = self.__getPriority() # Not very meaningful, but it can be used
625 626
626 627 opConfObj = OperationConf()
627 628 opConfObj.setup(id, name=name, priority=priority, type=optype)
628 629
629 630 self.opConfObjList.append(opConfObj)
630 631
631 632 return opConfObj
632 633
633 634 def makeXml(self, projectElement):
634 635
635 636 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
636 637 procUnitElement.set('id', str(self.id))
637 638 procUnitElement.set('name', self.name)
638 639 procUnitElement.set('datatype', self.datatype)
639 640 procUnitElement.set('inputId', str(self.inputId))
640 641
641 642 for opConfObj in self.opConfObjList:
642 643 opConfObj.makeXml(procUnitElement)
643 644
644 645 def readXml(self, upElement):
645 646
646 647 self.id = upElement.get('id')
647 648 self.name = upElement.get('name')
648 649 self.datatype = upElement.get('datatype')
649 650 self.inputId = upElement.get('inputId')
650 651
651 652 if self.ELEMENTNAME == 'ReadUnit':
652 653 self.datatype = self.datatype.replace('Reader', '')
653 654
654 655 if self.ELEMENTNAME == 'ProcUnit':
655 656 self.datatype = self.datatype.replace('Proc', '')
656 657
657 658 if self.inputId == 'None':
658 659 self.inputId = '0'
659 660
660 661 self.opConfObjList = []
661 662
662 663 opElementList = upElement.iter(OperationConf().getElementName())
663 664
664 665 for opElement in opElementList:
665 666 opConfObj = OperationConf()
666 667 opConfObj.readXml(opElement)
667 668 self.opConfObjList.append(opConfObj)
668 669
669 670 def printattr(self):
670 671
671 672 print('%s[%s]: name = %s, datatype = %s, inputId = %s' % (self.ELEMENTNAME,
672 673 self.id,
673 674 self.name,
674 675 self.datatype,
675 676 self.inputId))
676 677
677 678 for opConfObj in self.opConfObjList:
678 679 opConfObj.printattr()
679 680
680 681 def getKwargs(self):
681 682
682 683 opObj = self.opConfObjList[0]
683 684 kwargs = opObj.getKwargs()
684 685
685 686 return kwargs
686 687
687 688 def createObjects(self, dictUnits):
688 689 '''
689 690 Instantiates the processing units.
690 691
691 692 '''
692 693 className = eval(self.name)
693 694 kwargs = self.getKwargs()
694 695 procUnitObj = className(self.id, self.inputId, dictUnits, **kwargs) # they need to know their id and their input for IPC purposes
695 696
696 697
697 698 for opConfObj in self.opConfObjList:
698 699
699 700 if opConfObj.type == 'self' and self.name == 'run':
700 701 continue
701 702 elif opConfObj.type == 'self':
702 703 procUnitObj.addOperationKwargs(
703 704 opConfObj.id, **opConfObj.getKwargs())
704 705 continue
705 706 print("Creating operation process:", opConfObj.name, "for", self.name)
706 707 opObj = opConfObj.createObject()
707 708
708 709
709 710 #self.opObjDict[opConfObj.id] = opObj.name
710 711
711 712 procUnitObj.addOperation(opConfObj.name, opConfObj.id)
712 713
713 714 procUnitObj.start()
714 715
715 716 self.procUnitObj = procUnitObj
716 717
717 718
718 719 return procUnitObj
719 720
720 721 def run(self):
721 722
722 723 is_ok = True
723 724 """
724 725 for opConfObj in self.opConfObjList:
725 726
726 727 kwargs = {}
727 728 for parmConfObj in opConfObj.getParameterObjList():
728 729 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
729 730 continue
730 731
731 732 kwargs[parmConfObj.name] = parmConfObj.getValue()
732 733
733 734 sts = self.procUnitObj.call(opType=opConfObj.type,
734 735 opName=opConfObj.name,
735 736 opId=opConfObj.id)
736 737
737 738 is_ok = is_ok or sts
738 739
739 740 """
740 741 return is_ok
741 742
742 743
743 744 def close(self):
744 745
745 746 for opConfObj in self.opConfObjList:
746 747 if opConfObj.type == 'self':
747 748 continue
748 749
749 750 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
750 751 opObj.close()
751 752
752 753 self.procUnitObj.close()
753 754
754 755 return
755 756
756 757
757 758 class ReadUnitConf(ProcUnitConf):
758 759
759 760 path = None
760 761 startDate = None
761 762 endDate = None
762 763 startTime = None
763 764 endTime = None
764 765
765 766 ELEMENTNAME = 'ReadUnit'
766 767
767 768 def __init__(self):
768 769
769 770 self.id = None
770 771 self.datatype = None
771 772 self.name = None
772 773 self.inputId = None
773 774
774 775 self.parentId = None
775 776
776 777 self.opConfObjList = []
777 778 self.opObjList = []
778 779
779 780 def getElementName(self):
780 781
781 782 return self.ELEMENTNAME
782 783
783 784 def setup(self, id, name, datatype, path='', startDate='', endDate='',
784 785 startTime='', endTime='', parentId=None, server=None, **kwargs):
785 786
786 787
787 788 '''
788 789 ***** the process id will be the Topic
789 790
790 791 Addition of {topic}; if it is not present -> error
791 792 kwargs must be passed along at instantiation
792 793
793 794 '''
794 795
795 796 # Compatible with old signal chain version
796 797 if datatype == None and name == None:
797 798 raise ValueError('datatype or name should be defined')
798 799 if name == None:
799 800 if 'Reader' in datatype:
800 801 name = datatype
801 802 datatype = name.replace('Reader','')
802 803 else:
803 804 name = '{}Reader'.format(datatype)
804 805 if datatype == None:
805 806 if 'Reader' in name:
806 807 datatype = name.replace('Reader','')
807 808 else:
808 809 datatype = name
809 810 name = '{}Reader'.format(name)
810 811
811 812 self.id = id
812 813 self.name = name
813 814 self.datatype = datatype
814 815 if path != '':
815 816 self.path = os.path.abspath(path)
816 817 self.startDate = startDate
817 818 self.endDate = endDate
818 819 self.startTime = startTime
819 820 self.endTime = endTime
820 821 self.inputId = '0'
821 822 self.parentId = parentId
822 823 self.server = server
823 824 self.addRunOperation(**kwargs)
824 825
825 826 def update(self, **kwargs):
826 827
827 828 if 'datatype' in kwargs:
828 829 datatype = kwargs.pop('datatype')
829 830 if 'Reader' in datatype:
830 831 self.name = datatype
831 832 else:
832 833 self.name = '%sReader' % (datatype)
833 834 self.datatype = self.name.replace('Reader', '')
834 835
835 836 attrs = ('path', 'startDate', 'endDate',
836 837 'startTime', 'endTime', 'parentId')
837 838
838 839 for attr in attrs:
839 840 if attr in kwargs:
840 841 setattr(self, attr, kwargs.pop(attr))
841 842
842 843 self.inputId = '0'
843 844 self.updateRunOperation(**kwargs)
844 845
845 846 def removeOperations(self):
846 847
847 848 for obj in self.opConfObjList:
848 849 del obj
849 850
850 851 self.opConfObjList = []
851 852
852 853 def addRunOperation(self, **kwargs):
853 854
854 855 opObj = self.addOperation(name='run', optype='self')
855 856
856 857 if self.server is None:
857 858 opObj.addParameter(
858 859 name='datatype', value=self.datatype, format='str')
859 860 opObj.addParameter(name='path', value=self.path, format='str')
860 861 opObj.addParameter(
861 862 name='startDate', value=self.startDate, format='date')
862 863 opObj.addParameter(
863 864 name='endDate', value=self.endDate, format='date')
864 865 opObj.addParameter(
865 866 name='startTime', value=self.startTime, format='time')
866 867 opObj.addParameter(
867 868 name='endTime', value=self.endTime, format='time')
868 869
869 870 for key, value in list(kwargs.items()):
870 871 opObj.addParameter(name=key, value=value,
871 872 format=type(value).__name__)
872 873 else:
873 874 opObj.addParameter(name='server', value=self.server, format='str')
874 875
875 876 return opObj
876 877
877 878 def updateRunOperation(self, **kwargs):
878 879
879 880 opObj = self.getOperationObj(name='run')
880 881 opObj.removeParameters()
881 882
882 883 opObj.addParameter(name='datatype', value=self.datatype, format='str')
883 884 opObj.addParameter(name='path', value=self.path, format='str')
884 885 opObj.addParameter(
885 886 name='startDate', value=self.startDate, format='date')
886 887 opObj.addParameter(name='endDate', value=self.endDate, format='date')
887 888 opObj.addParameter(
888 889 name='startTime', value=self.startTime, format='time')
889 890 opObj.addParameter(name='endTime', value=self.endTime, format='time')
890 891
891 892 for key, value in list(kwargs.items()):
892 893 opObj.addParameter(name=key, value=value,
893 894 format=type(value).__name__)
894 895
895 896 return opObj
896 897
897 898 def readXml(self, upElement):
898 899
899 900 self.id = upElement.get('id')
900 901 self.name = upElement.get('name')
901 902 self.datatype = upElement.get('datatype')
902 903 self.inputId = upElement.get('inputId')
903 904
904 905 if self.ELEMENTNAME == 'ReadUnit':
905 906 self.datatype = self.datatype.replace('Reader', '')
906 907
907 908 if self.inputId == 'None':
908 909 self.inputId = '0'
909 910
910 911 self.opConfObjList = []
911 912
912 913 opElementList = upElement.iter(OperationConf().getElementName())
913 914
914 915 for opElement in opElementList:
915 916 opConfObj = OperationConf()
916 917 opConfObj.readXml(opElement)
917 918 self.opConfObjList.append(opConfObj)
918 919
919 920 if opConfObj.name == 'run':
920 921 self.path = opConfObj.getParameterValue('path')
921 922 self.startDate = opConfObj.getParameterValue('startDate')
922 923 self.endDate = opConfObj.getParameterValue('endDate')
923 924 self.startTime = opConfObj.getParameterValue('startTime')
924 925 self.endTime = opConfObj.getParameterValue('endTime')
925 926
926 927
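A sketch of configuring a read unit directly through the setup() defined above; the path and dates are placeholders, and this is normally done via Project.addReadUnit below.

reader = ReadUnitConf()
reader.setup(id='11', name=None, datatype='Voltage',
             path='/data/campaign',                       # placeholder path
             startDate='2018/01/26', endDate='2018/01/26',
             startTime='00:00:00', endTime='23:59:59')
print(reader.name, reader.datatype)                       # VoltageReader Voltage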
927 928 class Project(Process):
928 929
929 930 id = None
930 931 description = None
931 932 filename = None
932 933
933 934 procUnitConfObjDict = None
934 935
935 936 ELEMENTNAME = 'Project'
936 937
937 938
938 939
939 940 def __init__(self):
940 941
941 942 Process.__init__(self)
942 943 self.id = None
943 944 self.description = None
944 945 self.email = None
945 946 self.alarm = None
946 947 self.procUnitConfObjDict = {}
947 948
948 949 def __getNewId(self):
949 950
950 951 idList = list(self.procUnitConfObjDict.keys())
951 952
952 953 id = int(self.id) * 10
953 954
954 955 while True:
955 956 id += 1
956 957
957 958 if str(id) in idList:
958 959 continue
959 960
960 961 break
961 962
962 963 return str(id)
963 964
964 965 def getElementName(self):
965 966
966 967 return self.ELEMENTNAME
967 968
968 969 def getId(self):
969 970
970 971 return self.id
971 972
972 973 def updateId(self, new_id):
973 974
974 975 self.id = str(new_id)
975 976
976 977 keyList = list(self.procUnitConfObjDict.keys())
977 978 keyList.sort()
978 979
979 980 n = 1
980 981 newProcUnitConfObjDict = {}
981 982
982 983 for procKey in keyList:
983 984
984 985 procUnitConfObj = self.procUnitConfObjDict[procKey]
985 986 idProcUnit = str(int(self.id) * 10 + n)
986 987 procUnitConfObj.updateId(idProcUnit, parentId=self.id)
987 988 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
988 989 n += 1
989 990
990 991 self.procUnitConfObjDict = newProcUnitConfObjDict
991 992
992 993 def setup(self, id, name='', description='', email=None, alarm=[]):
993 994
994 995 print(' ')
995 996 print('*' * 60)
996 997 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
997 998 print('*' * 60)
998 999 print("* Python " + python_version() + " *")
999 1000 print('*' * 19)
1000 1001 print(' ')
1001 1002 self.id = str(id)
1002 1003 self.description = description
1003 1004 self.email = email
1004 1005 self.alarm = alarm
1005 1006
1006 1007 def update(self, **kwargs):
1007 1008
1008 1009 for key, value in list(kwargs.items()):
1009 1010 setattr(self, key, value)
1010 1011
1011 1012 def clone(self):
1012 1013
1013 1014 p = Project()
1014 1015 p.procUnitConfObjDict = self.procUnitConfObjDict
1015 1016 return p
1016 1017
1017 1018 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
1018 1019
1019 1020 '''
1020 1021 Update:
1021 1022 A new argument was added: topic, related to how the simultaneous processes communicate
1022 1023
1023 1024 * The process id will be the topic that the procUnits must subscribe to in order to receive the information (data)
1024 1025
1025 1026 '''
1026 1027
1027 1028 if id is None:
1028 1029 idReadUnit = self.__getNewId()
1029 1030 else:
1030 1031 idReadUnit = str(id)
1031 1032
1032 1033 readUnitConfObj = ReadUnitConf()
1033 1034 readUnitConfObj.setup(idReadUnit, name, datatype,
1034 1035 parentId=self.id, **kwargs)
1035 1036
1036 1037 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1037 1038
1038 1039 return readUnitConfObj
1039 1040
1040 1041 def addProcUnit(self, inputId='0', datatype=None, name=None):
1041 1042
1042 1043 '''
1043 1044 Update:
1044 1045 Two new arguments were added: topic_read (reads data from another procUnit) and topic_write (writes or sends data to another procUnit)
1045 1046 They should replace "inputId"
1046 1047
1047 1048 ** To keep inputId, it will represent the topic to subscribe to. The instance's (process's) own ID
1048 1049 will be the publication topic; everything will be assigned dynamically.
1049 1050
1050 1051 '''
1051 1052
1052 1053 idProcUnit = self.__getNewId() # Topic for subscription
1053 1054
1054 1055 procUnitConfObj = ProcUnitConf()
1055 1056 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, #topic_read, topic_write,
1056 1057 parentId=self.id)
1057 1058
1058 1059 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1059 1060
1060 1061 return procUnitConfObj
1061 1062
1062 1063 def removeProcUnit(self, id):
1063 1064
1064 1065 if id in list(self.procUnitConfObjDict.keys()):
1065 1066 self.procUnitConfObjDict.pop(id)
1066 1067
1067 1068 def getReadUnitId(self):
1068 1069
1069 1070 readUnitConfObj = self.getReadUnitObj()
1070 1071
1071 1072 return readUnitConfObj.id
1072 1073
1073 1074 def getReadUnitObj(self):
1074 1075
1075 1076 for obj in list(self.procUnitConfObjDict.values()):
1076 1077 if obj.getElementName() == 'ReadUnit':
1077 1078 return obj
1078 1079
1079 1080 return None
1080 1081
1081 1082 def getProcUnitObj(self, id=None, name=None):
1082 1083
1083 1084 if id != None:
1084 1085 return self.procUnitConfObjDict[id]
1085 1086
1086 1087 if name != None:
1087 1088 return self.getProcUnitObjByName(name)
1088 1089
1089 1090 return None
1090 1091
1091 1092 def getProcUnitObjByName(self, name):
1092 1093
1093 1094 for obj in list(self.procUnitConfObjDict.values()):
1094 1095 if obj.name == name:
1095 1096 return obj
1096 1097
1097 1098 return None
1098 1099
1099 1100 def procUnitItems(self):
1100 1101
1101 1102 return list(self.procUnitConfObjDict.items())
1102 1103
1103 1104 def makeXml(self):
1104 1105
1105 1106 projectElement = Element('Project')
1106 1107 projectElement.set('id', str(self.id))
1107 1108 projectElement.set('name', self.name)
1108 1109 projectElement.set('description', self.description)
1109 1110
1110 1111 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1111 1112 procUnitConfObj.makeXml(projectElement)
1112 1113
1113 1114 self.projectElement = projectElement
1114 1115
1115 1116 def writeXml(self, filename=None):
1116 1117
1117 1118 if filename == None:
1118 1119 if self.filename:
1119 1120 filename = self.filename
1120 1121 else:
1121 1122 filename = 'schain.xml'
1122 1123
1123 1124 if not filename:
1124 1125 print('filename has not been defined. Use setFilename(filename) to define it.')
1125 1126 return 0
1126 1127
1127 1128 abs_file = os.path.abspath(filename)
1128 1129
1129 1130 if not os.access(os.path.dirname(abs_file), os.W_OK):
1130 1131 print('No write permission on %s' % os.path.dirname(abs_file))
1131 1132 return 0
1132 1133
1133 1134 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1134 1135 print('File %s already exists and cannot be overwritten' % abs_file)
1135 1136 return 0
1136 1137
1137 1138 self.makeXml()
1138 1139
1139 1140 ElementTree(self.projectElement).write(abs_file, method='xml')
1140 1141
1141 1142 self.filename = abs_file
1142 1143
1143 1144 return 1
1144 1145
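A brief, hedged sketch of the XML round trip offered by writeXml()/readXml() above; the id, name and file path are placeholders.

p = Project()
p.setup(id='191', name='test', description='example project')
p.writeXml('/tmp/schain_test.xml')      # serializes every ProcUnit/Operation/Parameter
restored = Project()
restored.readXml('/tmp/schain_test.xml')
restored.printattr()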
1145 1146 def readXml(self, filename=None):
1146 1147
1147 1148 if not filename:
1148 1149 print('filename is not defined')
1149 1150 return 0
1150 1151
1151 1152 abs_file = os.path.abspath(filename)
1152 1153
1153 1154 if not os.path.isfile(abs_file):
1154 1155 print('%s file does not exist' % abs_file)
1155 1156 return 0
1156 1157
1157 1158 self.projectElement = None
1158 1159 self.procUnitConfObjDict = {}
1159 1160
1160 1161 try:
1161 1162 self.projectElement = ElementTree().parse(abs_file)
1162 1163 except:
1163 1164 print('Error reading %s, verify file format' % filename)
1164 1165 return 0
1165 1166
1166 1167 self.project = self.projectElement.tag
1167 1168
1168 1169 self.id = self.projectElement.get('id')
1169 1170 self.name = self.projectElement.get('name')
1170 1171 self.description = self.projectElement.get('description')
1171 1172
1172 1173 readUnitElementList = self.projectElement.iter(
1173 1174 ReadUnitConf().getElementName())
1174 1175
1175 1176 for readUnitElement in readUnitElementList:
1176 1177 readUnitConfObj = ReadUnitConf()
1177 1178 readUnitConfObj.readXml(readUnitElement)
1178 1179
1179 1180 if readUnitConfObj.parentId == None:
1180 1181 readUnitConfObj.parentId = self.id
1181 1182
1182 1183 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1183 1184
1184 1185 procUnitElementList = self.projectElement.iter(
1185 1186 ProcUnitConf().getElementName())
1186 1187
1187 1188 for procUnitElement in procUnitElementList:
1188 1189 procUnitConfObj = ProcUnitConf()
1189 1190 procUnitConfObj.readXml(procUnitElement)
1190 1191
1191 1192 if procUnitConfObj.parentId == None:
1192 1193 procUnitConfObj.parentId = self.id
1193 1194
1194 1195 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1195 1196
1196 1197 self.filename = abs_file
1197 1198
1198 1199 return 1
1199 1200
1200 1201 def printattr(self):
1201 1202
1202 1203 print('Project[%s]: name = %s, description = %s' % (self.id,
1203 1204 self.name,
1204 1205 self.description))
1205 1206
1206 1207 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1207 1208 procUnitConfObj.printattr()
1208 1209
1209 1210 def createObjects(self):
1210 1211
1211 1212 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1212 1213 print("Creating process:", procUnitConfObj.name)
1213 1214 procUnitConfObj.createObjects(self.procUnitConfObjDict)
1214 1215
1215 1216
1216 1217 print('All processes were created')
1217 1218
1218 1219 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1219 1220
1220 1221 import socket
1221 1222
1222 1223 if modes is None:
1223 1224 modes = self.alarm
1224 1225
1225 1226 if not self.alarm:
1226 1227 modes = []
1227 1228
1228 1229 err = traceback.format_exception(sys.exc_info()[0],
1229 1230 sys.exc_info()[1],
1230 1231 sys.exc_info()[2])
1231 1232
1232 1233 log.error('{}'.format(err[-1]), procUnitConfObj.name)
1233 1234
1234 1235 message = ''.join(err)
1235 1236
1236 1237 if stdout:
1237 1238 sys.stderr.write(message)
1238 1239
1239 1240 subject = 'SChain v%s: Error running %s\n' % (
1240 1241 schainpy.__version__, procUnitConfObj.name)
1241 1242
1242 1243 subtitle = '%s: %s\n' % (
1243 1244 procUnitConfObj.getElementName(), procUnitConfObj.name)
1244 1245 subtitle += 'Hostname: %s\n' % socket.gethostbyname(
1245 1246 socket.gethostname())
1246 1247 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1247 1248 subtitle += 'Configuration file: %s\n' % self.filename
1248 1249 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1249 1250
1250 1251 readUnitConfObj = self.getReadUnitObj()
1251 1252 if readUnitConfObj:
1252 1253 subtitle += '\nInput parameters:\n'
1253 1254 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1254 1255 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1255 1256 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1256 1257 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1257 1258 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1258 1259 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1259 1260
1260 1261 a = Alarm(
1261 1262 modes=modes,
1262 1263 email=self.email,
1263 1264 message=message,
1264 1265 subject=subject,
1265 1266 subtitle=subtitle,
1266 1267 filename=self.filename
1267 1268 )
1268 1269
1269 1270 return a
1270 1271
1271 1272 def isPaused(self):
1272 1273 return 0
1273 1274
1274 1275 def isStopped(self):
1275 1276 return 0
1276 1277
1277 1278 def runController(self):
1278 1279 '''
1279 1280 returns 0 when this process has been stopped, 1 otherwise
1280 1281 '''
1281 1282
1282 1283 if self.isPaused():
1283 1284 print('Process suspended')
1284 1285
1285 1286 while True:
1286 1287 time.sleep(0.1)
1287 1288
1288 1289 if not self.isPaused():
1289 1290 break
1290 1291
1291 1292 if self.isStopped():
1292 1293 break
1293 1294
1294 1295 print('Process reinitialized')
1295 1296
1296 1297 if self.isStopped():
1297 1298 print('Process stopped')
1298 1299 return 0
1299 1300
1300 1301 return 1
1301 1302
1302 1303 def setFilename(self, filename):
1303 1304
1304 1305 self.filename = filename
1305 1306
1306 1307 def setProxyCom(self):
1307 1308
1308 1309 ctx = zmq.Context()
1309 1310 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
1310 1311 xsub = ctx.socket(zmq.XSUB)
1311 1312 xsub.bind('ipc:///tmp/socketTmp/a')
1312 1313 xpub = ctx.socket(zmq.XPUB)
1313 1314 xpub.bind('ipc:///tmp/socketTmp/b')
1314 1315
1315 1316 print("Controller Ready: Processes and proxy created")
1316 1317 zmq.proxy(xsub, xpub)
1317 1318
1318 1319
1319 1320
1320 1321 def run(self):
1321 1322
1322 1323 log.success('Starting {}'.format(self.name), tag='')
1323 1324 self.start_time = time.time()
1324 1325 self.createObjects()
1325 1326 self.setProxyCom()
1326 1327
1327 1328 # Start all processes with .start() and monitor them. Remove the code below.
1328 1329
1329 1330 # Closing every process
1330 1331 log.success('{} finished (time: {}s)'.format(
1331 1332 self.name,
1332 1333 time.time()-self.start_time)) No newline at end of file
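For context, a hedged end-to-end sketch of driving this multiprocessing controller, using only the API shown above (setup, addReadUnit, addProcUnit, addOperation, addParameter, start). Paths, dates and ids are placeholders; note that run() blocks inside the zmq proxy once the units are created.

controller = Project()
controller.setup(id='191', name='voltage_test', description='Scope plot over raw voltage')

read_unit = controller.addReadUnit(datatype='VoltageReader',
                                   path='/data/campaign',          # placeholder
                                   startDate='2018/01/26', endDate='2018/01/26',
                                   startTime='00:00:00', endTime='23:59:59')

proc_unit = controller.addProcUnit(datatype='VoltageProc', inputId=read_unit.getId())

scope = proc_unit.addOperation(name='Scope', optype='other')
scope.addParameter(name='id', value='10', format='int')

controller.start()    # Project is a multiprocessing.Process; its run() creates the units and the proxy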
@@ -1,225 +1,232
1 1 '''
2 2 Created on Jul 9, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6 import os
7 7 import datetime
8 8 import numpy
9
9 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator #YONG
10 from schainpy.utils import log
10 11 from .figure import Figure
11 12
13
14 @MPDecorator
12 15 class Scope(Figure):
13 16
14 17 isConfig = None
15 18
16 def __init__(self, **kwargs):
17 Figure.__init__(self, **kwargs)
19 def __init__(self):#, **kwargs): #YONG
20 Figure.__init__(self)#, **kwargs)
18 21 self.isConfig = False
19 22 self.WIDTH = 300
20 23 self.HEIGHT = 200
21 24 self.counter_imagwr = 0
22 25
23 26 def getSubplots(self):
24 27
25 28 nrow = self.nplots
26 29 ncol = 3
27 30 return nrow, ncol
28 31
29 32 def setup(self, id, nplots, wintitle, show):
30 33
31 34 self.nplots = nplots
32 35
33 36 self.createFigure(id=id,
34 37 wintitle=wintitle,
35 38 show=show)
36 39
37 40 nrow,ncol = self.getSubplots()
38 41 colspan = 3
39 42 rowspan = 1
40 43
41 44 for i in range(nplots):
42 45 self.addAxes(nrow, ncol, i, 0, colspan, rowspan)
43 46
44 47 def plot_iq(self, x, y, id, channelIndexList, thisDatetime, wintitle, show, xmin, xmax, ymin, ymax):
45 48 yreal = y[channelIndexList,:].real
46 49 yimag = y[channelIndexList,:].imag
47 50
48 51 title = wintitle + " Scope: %s" %(thisDatetime.strftime("%d-%b-%Y %H:%M:%S"))
49 52 xlabel = "Range (Km)"
50 53 ylabel = "Intensity - IQ"
51 54
52 55 if not self.isConfig:
53 56 nplots = len(channelIndexList)
54 57
55 58 self.setup(id=id,
56 59 nplots=nplots,
57 60 wintitle='',
58 61 show=show)
59 62
60 63 if xmin == None: xmin = numpy.nanmin(x)
61 64 if xmax == None: xmax = numpy.nanmax(x)
62 65 if ymin == None: ymin = min(numpy.nanmin(yreal),numpy.nanmin(yimag))
63 66 if ymax == None: ymax = max(numpy.nanmax(yreal),numpy.nanmax(yimag))
64 67
65 68 self.isConfig = True
66 69
67 70 self.setWinTitle(title)
68 71
69 72 for i in range(len(self.axesList)):
70 73 title = "Channel %d" %(i)
71 74 axes = self.axesList[i]
72 75
73 76 axes.pline(x, yreal[i,:],
74 77 xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax,
75 78 xlabel=xlabel, ylabel=ylabel, title=title)
76 79
77 80 axes.addpline(x, yimag[i,:], idline=1, color="red", linestyle="solid", lw=2)
78 81
79 82 def plot_power(self, x, y, id, channelIndexList, thisDatetime, wintitle, show, xmin, xmax, ymin, ymax):
80 83 y = y[channelIndexList,:] * numpy.conjugate(y[channelIndexList,:])
81 84 yreal = y.real
82 85
83 86 title = wintitle + " Scope: %s" %(thisDatetime.strftime("%d-%b-%Y %H:%M:%S"))
84 87 xlabel = "Range (Km)"
85 88 ylabel = "Intensity"
86 89
87 90 if not self.isConfig:
88 91 nplots = len(channelIndexList)
89 92
90 93 self.setup(id=id,
91 94 nplots=nplots,
92 95 wintitle='',
93 96 show=show)
94 97
95 98 if xmin == None: xmin = numpy.nanmin(x)
96 99 if xmax == None: xmax = numpy.nanmax(x)
97 100 if ymin == None: ymin = numpy.nanmin(yreal)
98 101 if ymax == None: ymax = numpy.nanmax(yreal)
99 102
100 103 self.isConfig = True
101 104
102 105 self.setWinTitle(title)
103 106
104 107 for i in range(len(self.axesList)):
105 108 title = "Channel %d" %(i)
106 109 axes = self.axesList[i]
107 110 ychannel = yreal[i,:]
108 111 axes.pline(x, ychannel,
109 112 xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax,
110 113 xlabel=xlabel, ylabel=ylabel, title=title)
111 114
112 115
113 116 def run(self, dataOut, id, wintitle="", channelList=None,
114 117 xmin=None, xmax=None, ymin=None, ymax=None, save=False,
115 118 figpath='./', figfile=None, show=True, wr_period=1,
116 119 ftp=False, server=None, folder=None, username=None, password=None, type='power', **kwargs):
117 120
118 121 """
119 122
120 123 Input:
121 124 dataOut :
122 125 id :
123 126 wintitle :
124 127 channelList :
125 128 xmin : None,
126 129 xmax : None,
127 130 ymin : None,
128 131 ymax : None,
129 132 """
133 if dataOut.flagNoData:
134 return dataOut
130 135
131 136 if channelList == None:
132 137 channelIndexList = dataOut.channelIndexList
133 138 else:
134 139 channelIndexList = []
135 140 for channel in channelList:
136 141 if channel not in dataOut.channelList:
137 142 raise ValueError("Channel %d is not in dataOut.channelList" % channel)
138 143 channelIndexList.append(dataOut.channelList.index(channel))
139 144
140 145 thisDatetime = datetime.datetime.utcfromtimestamp(dataOut.getTimeRange()[0])
141 146
142 147 if dataOut.flagDataAsBlock:
143 148
144 149 for i in range(dataOut.nProfiles):
145 150
146 151 wintitle1 = wintitle + " [Profile = %d] " %i
147 152
148 153 if type == "power":
149 154 self.plot_power(dataOut.heightList,
150 155 dataOut.data[:,i,:],
151 156 id,
152 157 channelIndexList,
153 158 thisDatetime,
154 159 wintitle1,
155 160 show,
156 161 xmin,
157 162 xmax,
158 163 ymin,
159 164 ymax)
160 165
161 166 if type == "iq":
162 167 self.plot_iq(dataOut.heightList,
163 168 dataOut.data[:,i,:],
164 169 id,
165 170 channelIndexList,
166 171 thisDatetime,
167 172 wintitle1,
168 173 show,
169 174 xmin,
170 175 xmax,
171 176 ymin,
172 177 ymax)
173 178
174 179 self.draw()
175 180
176 181 str_datetime = thisDatetime.strftime("%Y%m%d_%H%M%S")
177 182 figfile = self.getFilename(name = str_datetime) + "_" + str(i)
178 183
179 184 self.save(figpath=figpath,
180 185 figfile=figfile,
181 186 save=save,
182 187 ftp=ftp,
183 188 wr_period=wr_period,
184 189 thisDatetime=thisDatetime)
185 190
186 191 else:
187 192 wintitle += " [Profile = %d] " %dataOut.profileIndex
188 193
189 194 if type == "power":
190 195 self.plot_power(dataOut.heightList,
191 196 dataOut.data,
192 197 id,
193 198 channelIndexList,
194 199 thisDatetime,
195 200 wintitle,
196 201 show,
197 202 xmin,
198 203 xmax,
199 204 ymin,
200 205 ymax)
201 206
202 207 if type == "iq":
203 208 self.plot_iq(dataOut.heightList,
204 209 dataOut.data,
205 210 id,
206 211 channelIndexList,
207 212 thisDatetime,
208 213 wintitle,
209 214 show,
210 215 xmin,
211 216 xmax,
212 217 ymin,
213 218 ymax)
214 219
215 220 self.draw()
216 221
217 222 str_datetime = thisDatetime.strftime("%Y%m%d_%H%M%S") + "_" + str(dataOut.profileIndex)
218 223 figfile = self.getFilename(name = str_datetime)
219 224
220 225 self.save(figpath=figpath,
221 226 figfile=figfile,
222 227 save=save,
223 228 ftp=ftp,
224 229 wr_period=wr_period,
225 thisDatetime=thisDatetime) No newline at end of file
230 thisDatetime=thisDatetime)
231
232 return dataOut No newline at end of file
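Based on the pattern this commit applies to Scope (MPDecorator on the class, run() receiving and returning dataOut, early return on flagNoData), a minimal sketch of what a user-defined operation could look like; the class name is hypothetical and this is not part of the commit.

from schainpy.model.proc.jroproc_base import Operation, MPDecorator

@MPDecorator
class PassThrough(Operation):
    '''Illustrative no-op operation following the new multiprocessing interface.'''

    def run(self, dataOut):
        if dataOut.flagNoData:
            return dataOut
        # inspect or modify dataOut here before handing it downstream
        return dataOut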
@@ -1,764 +1,765
1 1 '''
2 2 Created on Jul 2, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6
7 7 import numpy
8 8
9 9 from .jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
12 12 from schainpy.model.data.jrodata import Voltage
13 13 import zmq
14 14 import tempfile
15 15 from io import StringIO
16 16 # from _sha import blocksize
17 17
18
18 @MPDecorator
19 19 class VoltageReader(JRODataReader, ProcessingUnit):
20 20 """
21 21 This class reads voltage data from files in rawdata (.r) format. Data is always
22 22 read in blocks. The data read (a 3-dimensional array:
23 23 profiles*heights*channels) is stored in the "buffer" variable.
24 24
25 25 profiles * heights * channels
26 26
27 27 This class holds instances (objects) of the BasicHeader, SystemHeader,
28 28 RadarControllerHeader and Voltage classes. The first three store the data
29 29 header information (metadata), and the fourth (Voltage) obtains and stores one data
30 30 profile from the "buffer" each time the "getData" method is executed.
31 31
32 32 Example:
33 33
34 34 dpath = "/home/myuser/data"
35 35
36 36 startTime = datetime.datetime(2010,1,20,0,0,0,0,0,0)
37 37
38 38 endTime = datetime.datetime(2010,1,21,23,59,59,0,0,0)
39 39
40 40 readerObj = VoltageReader()
41 41
42 42 readerObj.setup(dpath, startTime, endTime)
43 43
44 44 while(True):
45 45
46 46 #to get one profile
47 47 profile = readerObj.getData()
48 48
49 49 #print the profile
50 50 print(profile)
51 51
52 52 #If you want to see the whole datablock
53 53 print(readerObj.datablock)
54 54
55 55 if readerObj.flagNoMoreFiles:
56 56 break
57 57
58 58 """
59 59
60 60 ext = ".r"
61 61
62 62 optchar = "D"
63 63 dataOut = None
64 64
65 def __init__(self, **kwargs):
65 def __init__(self):#, **kwargs):
66 66 """
67 67 Initializer of the VoltageReader class for reading voltage data.
68 68
69 69 Input:
70 70 dataOut : Object of the Voltage class. This object is used to
71 71 store one data profile each time a request is made
72 72 (getData). The profile is obtained from the data buffer;
73 73 if the buffer is empty, a new read of a data block
74 74 is performed.
75 75 If this parameter is not passed, one is created internally.
76 76
77 77 Affected variables:
78 78 self.dataOut
79 79
80 80 Return:
81 81 None
82 82 """
83 83
84 ProcessingUnit.__init__(self, **kwargs)
84 ProcessingUnit.__init__(self)#, **kwargs)
85 85
86 86 self.isConfig = False
87 87
88 88 self.datablock = None
89 89
90 90 self.utc = 0
91 91
92 92 self.ext = ".r"
93 93
94 94 self.optchar = "D"
95 95
96 96 self.basicHeaderObj = BasicHeader(LOCALTIME)
97 97
98 98 self.systemHeaderObj = SystemHeader()
99 99
100 100 self.radarControllerHeaderObj = RadarControllerHeader()
101 101
102 102 self.processingHeaderObj = ProcessingHeader()
103 103
104 104 self.online = 0
105 105
106 106 self.fp = None
107 107
108 108 self.idFile = None
109 109
110 110 self.dtype = None
111 111
112 112 self.fileSizeByHeader = None
113 113
114 114 self.filenameList = []
115 115
116 116 self.filename = None
117 117
118 118 self.fileSize = None
119 119
120 120 self.firstHeaderSize = 0
121 121
122 122 self.basicHeaderSize = 24
123 123
124 124 self.pathList = []
125 125
126 126 self.filenameList = []
127 127
128 128 self.lastUTTime = 0
129 129
130 130 self.maxTimeStep = 30
131 131
132 132 self.flagNoMoreFiles = 0
133 133
134 134 self.set = 0
135 135
136 136 self.path = None
137 137
138 138 self.profileIndex = 2**32 - 1
139 139
140 140 self.delay = 3 # seconds
141 141
142 142 self.nTries = 3 # number of retries
143 143
144 144 self.nFiles = 3 # number of files for searching
145 145
146 146 self.nReadBlocks = 0
147 147
148 148 self.flagIsNewFile = 1
149 149
150 150 self.__isFirstTimeOnline = 1
151 151
152 152 # self.ippSeconds = 0
153 153
154 154 self.flagDiscontinuousBlock = 0
155 155
156 156 self.flagIsNewBlock = 0
157 157
158 158 self.nTotalBlocks = 0
159 159
160 160 self.blocksize = 0
161 161
162 162 self.dataOut = self.createObjByDefault()
163 163
164 164 self.nTxs = 1
165 165
166 166 self.txIndex = 0
167 167
168 168 def createObjByDefault(self):
169 169
170 170 dataObj = Voltage()
171 171
172 172 return dataObj
173 173
174 174 def __hasNotDataInBuffer(self):
175 175
176 176 if self.profileIndex >= self.processingHeaderObj.profilesPerBlock * self.nTxs:
177 177 return 1
178 178
179 179 return 0
180 180
181 181 def getBlockDimension(self):
182 182 """
183 183 Gets the number of points to read per data block
184 184
185 185 Affected:
186 186 self.blocksize
187 187
188 188 Return:
189 189 None
190 190 """
191 191 pts2read = self.processingHeaderObj.profilesPerBlock * \
192 192 self.processingHeaderObj.nHeights * self.systemHeaderObj.nChannels
193 193 self.blocksize = pts2read
194 194
195 195 def readBlock(self):
196 196 """
197 197 readBlock reads the data block from the current position of the file pointer
198 198 (self.fp) and updates all parameters related to the data block
199 199 (metadata + data). The data read is stored in the buffer and the buffer counter
200 200 is reset to 0
201 201
202 202 Inputs:
203 203 None
204 204
205 205 Return:
206 206 None
207 207
208 208 Affected:
209 209 self.profileIndex
210 210 self.datablock
211 211 self.flagIsNewFile
212 212 self.flagIsNewBlock
213 213 self.nTotalBlocks
214 214
215 215 Exceptions:
216 216 Si un bloque leido no es un bloque valido
217 217 """
218 218
219 219 # if self.server is not None:
220 220 # self.zBlock = self.receiver.recv()
221 221 # self.zHeader = self.zBlock[:24]
222 222 # self.zDataBlock = self.zBlock[24:]
223 223 # junk = numpy.fromstring(self.zDataBlock, numpy.dtype([('real','<i4'),('imag','<i4')]))
224 224 # self.processingHeaderObj.profilesPerBlock = 240
225 225 # self.processingHeaderObj.nHeights = 248
226 226 # self.systemHeaderObj.nChannels
227 227 # else:
228 228 current_pointer_location = self.fp.tell()
229 229 junk = numpy.fromfile(self.fp, self.dtype, self.blocksize)
230 230
231 231 try:
232 232 junk = junk.reshape((self.processingHeaderObj.profilesPerBlock,
233 233 self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels))
234 234 except:
235 235 # print "The read block (%3d) has not enough data" %self.nReadBlocks
236 236
237 237 if self.waitDataBlock(pointer_location=current_pointer_location):
238 238 junk = numpy.fromfile(self.fp, self.dtype, self.blocksize)
239 239 junk = junk.reshape((self.processingHeaderObj.profilesPerBlock,
240 240 self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels))
241 241 # return 0
242 242
243 243 # Dimensions : nChannels, nProfiles, nSamples
244 244
245 245 junk = numpy.transpose(junk, (2, 0, 1))
246 246 self.datablock = junk['real'] + junk['imag'] * 1j
247 247
248 248 self.profileIndex = 0
249 249
250 250 self.flagIsNewFile = 0
251 251 self.flagIsNewBlock = 1
252 252
253 253 self.nTotalBlocks += 1
254 254 self.nReadBlocks += 1
255 255
256 256 return 1
257 257
258 258 def getFirstHeader(self):
259 259
260 260 self.getBasicHeader()
261 261
262 262 self.dataOut.processingHeaderObj = self.processingHeaderObj.copy()
263 263
264 264 self.dataOut.systemHeaderObj = self.systemHeaderObj.copy()
265 265
266 266 self.dataOut.radarControllerHeaderObj = self.radarControllerHeaderObj.copy()
267 267
268 268 if self.nTxs > 1:
269 269 self.dataOut.radarControllerHeaderObj.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
270 270 # Time interval and code are properties of dataOut. Their values depend on radarControllerHeaderObj.
271 271
272 272 # self.dataOut.timeInterval = self.radarControllerHeaderObj.ippSeconds * self.processingHeaderObj.nCohInt
273 273 #
274 274 # if self.radarControllerHeaderObj.code is not None:
275 275 #
276 276 # self.dataOut.nCode = self.radarControllerHeaderObj.nCode
277 277 #
278 278 # self.dataOut.nBaud = self.radarControllerHeaderObj.nBaud
279 279 #
280 280 # self.dataOut.code = self.radarControllerHeaderObj.code
281 281
282 282 self.dataOut.dtype = self.dtype
283 283
284 284 self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock
285 285
286 286 self.dataOut.heightList = numpy.arange(
287 287 self.processingHeaderObj.nHeights) * self.processingHeaderObj.deltaHeight + self.processingHeaderObj.firstHeight
288 288
289 289 self.dataOut.channelList = list(range(self.systemHeaderObj.nChannels))
290 290
291 291 self.dataOut.nCohInt = self.processingHeaderObj.nCohInt
292 292
293 293 # assume the data is not decoded
294 294 self.dataOut.flagDecodeData = self.processingHeaderObj.flag_decode
295 295
296 296 # assume the data is not deflipped
297 297 self.dataOut.flagDeflipData = self.processingHeaderObj.flag_deflip
298 298
299 299 self.dataOut.flagShiftFFT = self.processingHeaderObj.shif_fft
300 300
301 301 def reshapeData(self):
302 302
303 303 if self.nTxs < 0:
304 304 return
305 305
306 306 if self.nTxs == 1:
307 307 return
308 308
309 309 if self.nTxs < 1 and self.processingHeaderObj.profilesPerBlock % (1. / self.nTxs) != 0:
310 310 raise ValueError("1./nTxs (=%f), should be a multiple of nProfiles (=%d)" % (
311 311 1. / self.nTxs, self.processingHeaderObj.profilesPerBlock))
312 312
313 313 if self.nTxs > 1 and self.processingHeaderObj.nHeights % self.nTxs != 0:
314 314 raise ValueError("nTxs (=%d), should be a multiple of nHeights (=%d)" % (
315 315 self.nTxs, self.processingHeaderObj.nHeights))
316 316
317 317 self.datablock = self.datablock.reshape(
318 318 (self.systemHeaderObj.nChannels, self.processingHeaderObj.profilesPerBlock * self.nTxs, self.processingHeaderObj.nHeights / self.nTxs))
319 319
320 320 self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock * self.nTxs
321 321 self.dataOut.heightList = numpy.arange(self.processingHeaderObj.nHeights / self.nTxs) * \
322 322 self.processingHeaderObj.deltaHeight + self.processingHeaderObj.firstHeight
323 323 self.dataOut.radarControllerHeaderObj.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
324 324
325 325 return
326 326
327 327 def readFirstHeaderFromServer(self):
328 328
329 329 self.getFirstHeader()
330 330
331 331 self.firstHeaderSize = self.basicHeaderObj.size
332 332
333 333 datatype = int(numpy.log2((self.processingHeaderObj.processFlags &
334 334 PROCFLAG.DATATYPE_MASK)) - numpy.log2(PROCFLAG.DATATYPE_CHAR))
335 335 if datatype == 0:
336 336 datatype_str = numpy.dtype([('real', '<i1'), ('imag', '<i1')])
337 337 elif datatype == 1:
338 338 datatype_str = numpy.dtype([('real', '<i2'), ('imag', '<i2')])
339 339 elif datatype == 2:
340 340 datatype_str = numpy.dtype([('real', '<i4'), ('imag', '<i4')])
341 341 elif datatype == 3:
342 342 datatype_str = numpy.dtype([('real', '<i8'), ('imag', '<i8')])
343 343 elif datatype == 4:
344 344 datatype_str = numpy.dtype([('real', '<f4'), ('imag', '<f4')])
345 345 elif datatype == 5:
346 346 datatype_str = numpy.dtype([('real', '<f8'), ('imag', '<f8')])
347 347 else:
348 348 raise ValueError('Data type was not defined')
349 349
350 350 self.dtype = datatype_str
351 351 #self.ippSeconds = 2 * 1000 * self.radarControllerHeaderObj.ipp / self.c
352 352 self.fileSizeByHeader = self.processingHeaderObj.dataBlocksPerFile * self.processingHeaderObj.blockSize + \
353 353 self.firstHeaderSize + self.basicHeaderSize * \
354 354 (self.processingHeaderObj.dataBlocksPerFile - 1)
355 355 # self.dataOut.channelList = numpy.arange(self.systemHeaderObj.numChannels)
356 356 # self.dataOut.channelIndexList = numpy.arange(self.systemHeaderObj.numChannels)
357 357 self.getBlockDimension()
358 358
359 359 def getFromServer(self):
360 360 self.flagDiscontinuousBlock = 0
361 361 self.profileIndex = 0
362 362 self.flagIsNewBlock = 1
363 363 self.dataOut.flagNoData = False
364 364 self.nTotalBlocks += 1
365 365 self.nReadBlocks += 1
366 366 self.blockPointer = 0
367 367
368 368 block = self.receiver.recv()
369 369
370 370 self.basicHeaderObj.read(block[self.blockPointer:])
371 371 self.blockPointer += self.basicHeaderObj.length
372 372 self.systemHeaderObj.read(block[self.blockPointer:])
373 373 self.blockPointer += self.systemHeaderObj.length
374 374 self.radarControllerHeaderObj.read(block[self.blockPointer:])
375 375 self.blockPointer += self.radarControllerHeaderObj.length
376 376 self.processingHeaderObj.read(block[self.blockPointer:])
377 377 self.blockPointer += self.processingHeaderObj.length
378 378 self.readFirstHeaderFromServer()
379 379
380 380 timestamp = self.basicHeaderObj.get_datatime()
381 381 print('[Reading] - Block {} - {}'.format(self.nTotalBlocks, timestamp))
382 382 current_pointer_location = self.blockPointer
383 383 junk = numpy.fromstring(
384 384 block[self.blockPointer:], self.dtype, self.blocksize)
385 385
386 386 try:
387 387 junk = junk.reshape((self.processingHeaderObj.profilesPerBlock,
388 388 self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels))
389 389 except:
390 390 # print "The read block (%3d) has not enough data" %self.nReadBlocks
391 391 if self.waitDataBlock(pointer_location=current_pointer_location):
392 392 junk = numpy.fromstring(
393 393 block[self.blockPointer:], self.dtype, self.blocksize)
394 394 junk = junk.reshape((self.processingHeaderObj.profilesPerBlock,
395 395 self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels))
396 396 # return 0
397 397
398 398 # Dimensions : nChannels, nProfiles, nSamples
399 399
400 400 junk = numpy.transpose(junk, (2, 0, 1))
401 401 self.datablock = junk['real'] + junk['imag'] * 1j
402 402 self.profileIndex = 0
403 403 if self.selBlocksize == None:
404 404 self.selBlocksize = self.dataOut.nProfiles
405 405 if self.selBlocktime != None:
406 406 if self.dataOut.nCohInt is not None:
407 407 nCohInt = self.dataOut.nCohInt
408 408 else:
409 409 nCohInt = 1
410 410 self.selBlocksize = int(self.dataOut.nProfiles * round(self.selBlocktime / (
411 411 nCohInt * self.dataOut.ippSeconds * self.dataOut.nProfiles)))
412 412 self.dataOut.data = self.datablock[:,
413 413 self.profileIndex:self.profileIndex + self.selBlocksize, :]
414 414 datasize = self.dataOut.data.shape[1]
415 415 if datasize < self.selBlocksize:
416 416 buffer = numpy.zeros(
417 417 (self.dataOut.data.shape[0], self.selBlocksize, self.dataOut.data.shape[2]), dtype='complex')
418 418 buffer[:, :datasize, :] = self.dataOut.data
419 419 self.dataOut.data = buffer
420 420 self.profileIndex = blockIndex
421 421
422 422 self.dataOut.flagDataAsBlock = True
423 423 self.flagIsNewBlock = 1
424 424 self.dataOut.realtime = self.online
425 425
426 426 return self.dataOut.data
427 427
428 428 def getData(self):
429 429 """
430 430         getData fetches one unit of data (a single profile) from the read buffer and copies it to the
431 431         self.dataOut object of type "Voltage", together with all of its associated parameters (metadata).
432 432         When there is no data left in the read buffer, a new read of the data blocks is done using
433 433         "readNextBlock"
434 434 
435 435         It also increments the buffer counter "self.profileIndex" by 1.
436 436 
437 437         Return:
438 438 
439 439             If the flag self.getByBlock has been set, the whole block is copied to self.dataOut and self.profileIndex
440 440             equals the total number of profiles read from the file.
441 441 
442 442             If self.getByBlock == False:
443 443 
444 444                 self.dataOut.data = buffer[:, thisProfile, :]
445 445 
446 446                 shape = [nChannels, nHeis]
447 447 
448 448             If self.getByBlock == True:
449 449 
450 450                 self.dataOut.data = buffer[:, :, :]
451 451 
452 452                 shape = [nChannels, nProfiles, nHeis]
453 453 
454 454         Affected variables:
455 455             self.dataOut
456 456             self.profileIndex
457 457 
458 458         Affected:
459 459             self.dataOut
460 460             self.profileIndex
461 461             self.flagDiscontinuousBlock
462 462             self.flagIsNewBlock
463 463 """
464 464 if self.flagNoMoreFiles:
465 465 self.dataOut.flagNoData = True
466 466 print('Process finished')
467 467 return 0
468 468 self.flagDiscontinuousBlock = 0
469 469 self.flagIsNewBlock = 0
470 470 if self.__hasNotDataInBuffer():
471 471 if not(self.readNextBlock()):
472 472 return 0
473 473
474 474 self.getFirstHeader()
475 475
476 476 self.reshapeData()
477 477 if self.datablock is None:
478 478 self.dataOut.flagNoData = True
479 479 return 0
480 480
481 481 if not self.getByBlock:
482 482
483 483 """
484 484 Return profile by profile
485 485
486 486 If nTxs > 1 then one profile is divided by nTxs and number of total
487 487 blocks is increased by nTxs (nProfiles *= nTxs)
488 488 """
489 489 self.dataOut.flagDataAsBlock = False
490 490 self.dataOut.data = self.datablock[:, self.profileIndex, :]
491 491 self.dataOut.profileIndex = self.profileIndex
492 492
493 493 self.profileIndex += 1
494 494
495 495 # elif self.selBlocksize==None or self.selBlocksize==self.dataOut.nProfiles:
496 496 # """
497 497 # Return all block
498 498 # """
499 499 # self.dataOut.flagDataAsBlock = True
500 500 # self.dataOut.data = self.datablock
501 501 # self.dataOut.profileIndex = self.dataOut.nProfiles - 1
502 502 #
503 503 # self.profileIndex = self.dataOut.nProfiles
504 504
505 505 else:
506 506 """
507 507 Return a block
508 508 """
509 509 if self.selBlocksize == None:
510 510 self.selBlocksize = self.dataOut.nProfiles
511 511 if self.selBlocktime != None:
512 512 if self.dataOut.nCohInt is not None:
513 513 nCohInt = self.dataOut.nCohInt
514 514 else:
515 515 nCohInt = 1
516 516 self.selBlocksize = int(self.dataOut.nProfiles * round(self.selBlocktime / (
517 517 nCohInt * self.dataOut.ippSeconds * self.dataOut.nProfiles)))
518 518
519 519 self.dataOut.data = self.datablock[:,
520 520 self.profileIndex:self.profileIndex + self.selBlocksize, :]
521 521 self.profileIndex += self.selBlocksize
522 522 datasize = self.dataOut.data.shape[1]
523 523
524 524 if datasize < self.selBlocksize:
525 525 buffer = numpy.zeros(
526 526 (self.dataOut.data.shape[0], self.selBlocksize, self.dataOut.data.shape[2]), dtype='complex')
527 527 buffer[:, :datasize, :] = self.dataOut.data
528 528
529 529 while datasize < self.selBlocksize: # Not enough profiles to fill the block
530 530 if not(self.readNextBlock()):
531 531 return 0
532 532 self.getFirstHeader()
533 533 self.reshapeData()
534 534 if self.datablock is None:
535 535 self.dataOut.flagNoData = True
536 536 return 0
537 537 # stack data
538 538 blockIndex = self.selBlocksize - datasize
539 539 datablock1 = self.datablock[:, :blockIndex, :]
540 540
541 541 buffer[:, datasize:datasize +
542 542 datablock1.shape[1], :] = datablock1
543 543 datasize += datablock1.shape[1]
544 544
545 545 self.dataOut.data = buffer
546 546 self.profileIndex = blockIndex
547 547
548 548 self.dataOut.flagDataAsBlock = True
549 549 self.dataOut.nProfiles = self.dataOut.data.shape[1]
550 550
551 551 self.dataOut.flagNoData = False
552 552
553 553 self.getBasicHeader()
554 554
555 555 self.dataOut.realtime = self.online
556 556
557 557 return self.dataOut.data
558 558
559 559
560 560 class VoltageWriter(JRODataWriter, Operation):
561 561 """
562 562     This class writes voltage data to processed (.r) files. Data is always
563 563     written block by block.
564 564 """
565 565
566 566 ext = ".r"
567 567
568 568 optchar = "D"
569 569
570 570 shapeBuffer = None
571 571
572 572 def __init__(self, **kwargs):
573 573 """
574 574         VoltageWriter class constructor, for writing voltage data to file.
575 575
576 576 Affected:
577 577 self.dataOut
578 578
579 579 Return: None
580 580 """
581 581 Operation.__init__(self, **kwargs)
582 582
583 583 self.nTotalBlocks = 0
584 584
585 585 self.profileIndex = 0
586 586
587 587 self.isConfig = False
588 588
589 589 self.fp = None
590 590
591 591 self.flagIsNewFile = 1
592 592
593 593 self.blockIndex = 0
594 594
595 595 self.flagIsNewBlock = 0
596 596
597 597 self.setFile = None
598 598
599 599 self.dtype = None
600 600
601 601 self.path = None
602 602
603 603 self.filename = None
604 604
605 605 self.basicHeaderObj = BasicHeader(LOCALTIME)
606 606
607 607 self.systemHeaderObj = SystemHeader()
608 608
609 609 self.radarControllerHeaderObj = RadarControllerHeader()
610 610
611 611 self.processingHeaderObj = ProcessingHeader()
612 612
613 613 def hasAllDataInBuffer(self):
614 614 if self.profileIndex >= self.processingHeaderObj.profilesPerBlock:
615 615 return 1
616 616 return 0
617 617
618 618 def setBlockDimension(self):
619 619 """
620 620         Computes the dimensional shapes of the data sub-blocks that make up a block
621 621
622 622 Affected:
623 623             self.shapeBuffer
624 624             self.datablock
625 625 
626 626
627 627 Return: None
628 628 """
629 629 self.shapeBuffer = (self.processingHeaderObj.profilesPerBlock,
630 630 self.processingHeaderObj.nHeights,
631 631 self.systemHeaderObj.nChannels)
632 632
633 633 self.datablock = numpy.zeros((self.systemHeaderObj.nChannels,
634 634 self.processingHeaderObj.profilesPerBlock,
635 635 self.processingHeaderObj.nHeights),
636 636 dtype=numpy.dtype('complex64'))
637 637
638 638 def writeBlock(self):
639 639 """
640 640         Writes the buffer to the designated file
641 641
642 642 Affected:
643 643 self.profileIndex
644 644 self.flagIsNewFile
645 645 self.flagIsNewBlock
646 646 self.nTotalBlocks
647 647 self.blockIndex
648 648
649 649 Return: None
650 650 """
651 651 data = numpy.zeros(self.shapeBuffer, self.dtype)
652 652
653 653 junk = numpy.transpose(self.datablock, (1, 2, 0))
654 654
655 655 data['real'] = junk.real
656 656 data['imag'] = junk.imag
657 657
658 658 data = data.reshape((-1))
659 659
660 660 data.tofile(self.fp)
661 661
662 662 self.datablock.fill(0)
663 663
664 664 self.profileIndex = 0
665 665 self.flagIsNewFile = 0
666 666 self.flagIsNewBlock = 1
667 667
668 668 self.blockIndex += 1
669 669 self.nTotalBlocks += 1
670 670
671 671 # print "[Writing] Block = %04d" %self.blockIndex
672 672
673 673 def putData(self):
674 674 """
675 675         Fills a block of data and then writes it to a file
676 676
677 677 Affected:
678 678 self.flagIsNewBlock
679 679 self.profileIndex
680 680
681 681 Return:
682 682             0 : if there is no data or no more files can be written
683 683             1 : if a block of data was written to a file
684 684 """
685 685 if self.dataOut.flagNoData:
686 686 return 0
687 687
688 688 self.flagIsNewBlock = 0
689 689
690 690 if self.dataOut.flagDiscontinuousBlock:
691 691 self.datablock.fill(0)
692 692 self.profileIndex = 0
693 693 self.setNextFile()
694 694
695 695 if self.profileIndex == 0:
696 696 self.setBasicHeader()
697 697
698 698 self.datablock[:, self.profileIndex, :] = self.dataOut.data
699 699
700 700 self.profileIndex += 1
701 701
702 702 if self.hasAllDataInBuffer():
703 703 # if self.flagIsNewFile:
704 704 self.writeNextBlock()
705 705 # self.setFirstHeader()
706 706
707 707 return 1
708 708
709 709 def __getBlockSize(self):
710 710 '''
711 711         This method determines the number of bytes in a Voltage data block
712 712 '''
713 713
714 714 dtype_width = self.getDtypeWidth()
715 715
716 716 blocksize = int(self.dataOut.nHeights * self.dataOut.nChannels *
717 717 self.profilesPerBlock * dtype_width * 2)
718 718
719 719 return blocksize
720 720
721 721 def setFirstHeader(self):
722 722 """
723 723         Gets a copy of the first header
724 724
725 725 Affected:
726 726 self.systemHeaderObj
727 727 self.radarControllerHeaderObj
728 728 self.dtype
729 729
730 730 Return:
731 731 None
732 732 """
733 733
734 734 self.systemHeaderObj = self.dataOut.systemHeaderObj.copy()
735 735 self.systemHeaderObj.nChannels = self.dataOut.nChannels
736 736 self.radarControllerHeaderObj = self.dataOut.radarControllerHeaderObj.copy()
737 737
738 738 self.processingHeaderObj.dtype = 0 # Voltage
739 739 self.processingHeaderObj.blockSize = self.__getBlockSize()
740 740 self.processingHeaderObj.profilesPerBlock = self.profilesPerBlock
741 741 self.processingHeaderObj.dataBlocksPerFile = self.blocksPerFile
742 742         # could be 1 or self.dataOut.processingHeaderObj.nWindows
743 743 self.processingHeaderObj.nWindows = 1
744 744 self.processingHeaderObj.nCohInt = self.dataOut.nCohInt
745 745         # When the source data is of type Voltage
746 746 self.processingHeaderObj.nIncohInt = 1
747 747         # When the source data is of type Voltage
748 748 self.processingHeaderObj.totalSpectra = 0
749 749
750 750 if self.dataOut.code is not None:
751 751 self.processingHeaderObj.code = self.dataOut.code
752 752 self.processingHeaderObj.nCode = self.dataOut.nCode
753 753 self.processingHeaderObj.nBaud = self.dataOut.nBaud
754 754
755 755 if self.processingHeaderObj.nWindows != 0:
756 756 self.processingHeaderObj.firstHeight = self.dataOut.heightList[0]
757 757 self.processingHeaderObj.deltaHeight = self.dataOut.heightList[1] - \
758 758 self.dataOut.heightList[0]
759 759 self.processingHeaderObj.nHeights = self.dataOut.nHeights
760 760 self.processingHeaderObj.samplesWin = self.dataOut.nHeights
761 761
762 762 self.processingHeaderObj.processFlags = self.getProcessFlags()
763 763
764 self.setBasicHeader() No newline at end of file
764 self.setBasicHeader()
765 No newline at end of file
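For reference, the following is a minimal round-trip sketch (editor's addition, not part of the changeset) of the block layout that VoltageWriter.writeBlock() and the reader's getFromServer() assume: interleaved (real, imag) integer samples ordered as (profiles, heights, channels) on disk, and a complex array of shape (channels, profiles, heights) in memory. The toy sizes below are made up.

import numpy

nProfiles, nHeights, nChannels = 4, 8, 2
pair_dtype = numpy.dtype([('real', '<i2'), ('imag', '<i2')])

# In-memory block, as in VoltageWriter.datablock: (channels, profiles, heights)
datablock = (numpy.random.randint(-10, 10, (nChannels, nProfiles, nHeights))
             + 1j * numpy.random.randint(-10, 10, (nChannels, nProfiles, nHeights)))

# Write path (writeBlock): reorder to (profiles, heights, channels), split real/imag
out = numpy.zeros((nProfiles, nHeights, nChannels), pair_dtype)
junk = numpy.transpose(datablock, (1, 2, 0))
out['real'], out['imag'] = junk.real, junk.imag
raw = out.tobytes()

# Read path (getFromServer): rebuild the complex block and put channels first
junk = numpy.frombuffer(raw, pair_dtype).reshape((nProfiles, nHeights, nChannels))
junk = numpy.transpose(junk, (2, 0, 1))
recovered = junk['real'] + 1j * junk['imag']
assert numpy.array_equal(recovered, datablock)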
@@ -1,538 +1,538
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the created processes.
7 7 The arguments (kwargs) sent from the controller are parsed and filtered by the decorator for each instantiated processing unit or operation.
8 8 The decorator also handles the methods inside the processing unit that are called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14 from platform import python_version
15 15 import inspect
16 16 import zmq
17 17 import time
18 18 import pickle
19 19 import os
20 20 from multiprocessing import Process
21 21
22 22 from schainpy.utils import log
23 23
24 24
25 25 class ProcessingUnit(object):
26 26
27 27 """
28 28 Update - Jan 2018 - MULTIPROCESSING
29 29 All the "call" methods present in the previous base were removed.
30 30     Most operations are independent processes, so
31 31     the decorator is in charge of communicating the operation processes
32 32     with the processing unit via IPC.
33 33
34 34 The constructor does not receive any argument. The remaining methods
35 35     are related to the operations to execute.
36 36
37 37
38 38 """
39 39     # input data object (Voltage, Spectra or Correlation)
40 40 dataIn = None
41 41 dataInList = []
42 42
43 43     # input data object (Voltage, Spectra or Correlation)
44 44
45 45 id = None
46 46 inputId = None
47 47
48 48 dataOut = None
49 49
50 50 dictProcs = None
51 51
52 52 operations2RunDict = None
53 53
54 54 isConfig = False
55 55
56 56 def __init__(self):
57 57
58 58 self.dataIn = None
59 59 self.dataOut = None
60 60
61 61 self.isConfig = False
62 62
63 63 def getAllowedArgs(self):
64 64 if hasattr(self, '__attrs__'):
65 65 return self.__attrs__
66 66 else:
67 67 return inspect.getargspec(self.run).args
68 68
69 69 def addOperationKwargs(self, objId, **kwargs):
70 70 '''
71 71 '''
72 72
73 73 self.operationKwargs[objId] = kwargs
74 74
75 75 def addOperation(self, opObj, objId):
76 76
77 77 """
78 78         This method is used by the controller to update the dictionary containing the operations to execute. The dict
79 79         holds the id of the operation process (for IPC purposes)
80 80
81 81         Adds an object of type "Operation" (opObj) to the operations dictionary "self.operations2RunDict" and returns the
82 82         identifier associated with this object.
83 83
84 84 Input:
85 85
86 86 object : objeto de la clase "Operation"
87 87             object : an object of class "Operation"
88 88 Return:
89 89
90 90             objId : identifier of the object, needed to communicate with the master (procUnit)
91 91 """
92 92
93 93 self.operations2RunDict[objId] = opObj
94 94
95 95 return objId
96 96
97 97
98 98 def getOperationObj(self, objId):
99 99
100 100 if objId not in list(self.operations2RunDict.keys()):
101 101 return None
102 102
103 103 return self.operations2RunDict[objId]
104 104
105 105 def operation(self, **kwargs):
106 106
107 107 """
108 108         Direct operation on the data (dataOut.data). The attribute values of the
109 109         dataOut object must be updated
110 110
111 111 Input:
112 112
113 113             **kwargs : dictionary of arguments for the function to execute
114 114 """
115 115
116 116 raise NotImplementedError
117 117
118 118 def setup(self):
119 119
120 120 raise NotImplementedError
121 121
122 122 def run(self):
123 123
124 124 raise NotImplementedError
125 125
126 126 def close(self):
127 127         # Close every thread, queue or any other object here if it is necessary.
128 128 return
129 129
130 130 class Operation(object):
131 131
132 132 """
133 133 Update - Jan 2018 - MULTIPROCESSING
134 134
135 135     Most of the methods remain the same. The decorator parses the arguments and executes the run() method for each process.
136 136     The constructor does not receive any argument, nor does the base class.
137 137
138 138
139 139     Base class for defining the additional operations that can be added to the ProcessingUnit class
140 140     and that need to accumulate previous information about the data to be processed. Preferably, use an
141 141     accumulation buffer inside this class
142 142 
143 143     Example: coherent integration, which needs the previous n profiles (buffer)
144 144
145 145 """
146 146 id = None
147 147 __buffer = None
148 148 dest = None
149 149 isConfig = False
150 150 readyFlag = None
151 151
152 152 def __init__(self):
153 153
154 154 self.buffer = None
155 155 self.dest = None
156 156 self.isConfig = False
157 157 self.readyFlag = False
158 158
159 159 if not hasattr(self, 'name'):
160 160 self.name = self.__class__.__name__
161 161
162 162 def getAllowedArgs(self):
163 163 if hasattr(self, '__attrs__'):
164 164 return self.__attrs__
165 165 else:
166 166 return inspect.getargspec(self.run).args
167 167
168 168 def setup(self):
169 169
170 170 self.isConfig = True
171 171
172 172 raise NotImplementedError
173 173
174 174
175 175 def run(self, dataIn, **kwargs):
176 176
177 177 """
178 178         Performs the necessary operations on dataIn.data and updates the
179 179         attributes of the dataIn object.
180 180
181 181 Input:
182 182
183 183             dataIn : object of type JROData
184 184
185 185 Return:
186 186
187 187 None
188 188
189 189 Affected:
190 190             __buffer : data reception buffer.
191 191
192 192 """
193 193 if not self.isConfig:
194 194 self.setup(**kwargs)
195 195
196 196 raise NotImplementedError
197 197
198 198 def close(self):
199 199
200 200 pass
201 201
202 202
203 203 ######### Decorator #########
204 204
205 205
206 206 def MPDecorator(BaseClass):
207 207
208 208 """
209 209 "Multiprocessing class decorator"
210 210
211 211     This function adds multiprocessing features to the base class. It also
212 212     handles the communication between processes (readers, procUnits and operations).
213 213     It receives the arguments at instantiation time and, based on them, determines whether the
214 214     decorated class is a procUnit or an operation
215 215 """
216 216
217 217 class MPClass(BaseClass, Process):
218 218
219 219 "This is the overwritten class"
220 220 operations2RunDict = None
221 221 socket_l = None
222 222 socket_p = None
223 223 socketOP = None
224 224 socket_router = None
225 225 dictProcs = None
226 226 typeProc = None
227 227 def __init__(self, *args, **kwargs):
228 228 super(MPClass, self).__init__()
229 229 Process.__init__(self)
230 230
231 231
232 232 self.operationKwargs = {}
233 233 self.args = args
234 234
235 235
236 236 self.operations2RunDict = {}
237 237 self.kwargs = kwargs
238 238
239 239             # The number of arguments (args) determines the type of process
240 240
241 241             if len(self.args) == 3:
242 242 self.typeProc = "ProcUnit"
243 243                 self.id = args[0]          # publication topic
244 244                 self.inputId = args[1]     # subscription topic
245 245                 self.dictProcs = args[2]   # dictionary of global processes
246 246 else:
247 247 self.id = args[0]
248 248 self.typeProc = "Operation"
249 249
250 250 def addOperationKwargs(self, objId, **kwargs):
251 251
252 252 self.operationKwargs[objId] = kwargs
253 253
254 254 def getAllowedArgs(self):
255 255
256 256 if hasattr(self, '__attrs__'):
257 257 return self.__attrs__
258 258 else:
259 259 return inspect.getargspec(self.run).args
260 260
261 261
262 262 def sockListening(self, topic):
263 263
264 264 """
265 265             This function creates a socket to receive objects.
266 266             The 'topic' argument identifies the publisher process this process is
267 267             listening to (for data).
268 268             In the case where this process is listening to a Reader (proc unit),
269 269             special conditions are introduced to maximize parallelism.
270 270 """
271 271
272 272 cont = zmq.Context()
273 273 zmq_socket = cont.socket(zmq.SUB)
274 274 if not os.path.exists('/tmp/socketTmp'):
275 275 os.mkdir('/tmp/socketTmp')
276 276
277 277 if 'Reader' in self.dictProcs[self.inputId].name:
278 278 zmq_socket.connect('ipc:///tmp/socketTmp/b')
279 279
280 280 else:
281 281 zmq_socket.connect('ipc:///tmp/socketTmp/%s' % self.inputId)
282 282
283 283 #log.error('RECEIVING FROM {} {}'.format(self.inputId, str(topic).encode()))
284 284 zmq_socket.setsockopt(zmq.SUBSCRIBE, str(topic).encode()) #yong
285 285
286 286 return zmq_socket
287 287
288 288
289 289 def listenProc(self, sock):
290 290
291 291 """
292 292             This function listens on an IPC address until a message is received. The
293 293             data (object) is serialized with pickle.
294 294             The 'sock' argument is a socket previously connected to an IPC address, with a topic subscription.
295 295 """
296 296
297 297 a = sock.recv_multipart()
298 298 a = pickle.loads(a[1])
299 299 return a
300 300
301 301 def sockPublishing(self):
302 302
303 303 """
304 304             This function creates a socket for publishing purposes.
305 305             Depending on the type of process that creates it, it binds or connects
306 306             to special IPC addresses.
307 307 """
308 308 time.sleep(4) #yong
309 309 context = zmq.Context()
310 310 zmq_socket = context.socket(zmq.PUB)
311 311 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
312 312 if 'Reader' in self.dictProcs[self.id].name:
313 313 zmq_socket.connect('ipc:///tmp/socketTmp/a')
314 314 else:
315 315 zmq_socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
316 316
317 317 return zmq_socket
318 318
319 319 def publishProc(self, sock, data):
320 320
321 321 """
322 322             This function publishes a python object (data) under a specific topic on a socket (sock).
323 323             Usually, the topic is the id of this process.
324 324 """
325 325
326 326 sock.send_multipart([str(self.id).encode(), pickle.dumps(data)]) #yong
327 327
328 328 return True
329 329
330 330 def sockOp(self):
331 331
332 332 """
333 333             This function creates a socket to communicate with operation processes.
334 334 """
335 335
336 336 cont = zmq.Context()
337 337 zmq_socket = cont.socket(zmq.DEALER)
338 338
339 339 if python_version()[0] == '2':
340 340 zmq_socket.setsockopt(zmq.IDENTITY, self.id)
341 341 if python_version()[0] == '3':
342 342 zmq_socket.setsockopt_string(zmq.IDENTITY, self.id)
343 343
344 344
345 345 return zmq_socket
346 346
347 347
348 348 def execOp(self, socket, opId, dataObj):
349 349
350 350 """
351 351             This function 'executes' an operation's main routine by establishing a
352 352             connection with it and sending it a python object (dataOut).
353 353 """
354 354 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
355 355 socket.connect('ipc:///tmp/socketTmp/%s' %opId)
356 356
357 357
358 358 socket.send(pickle.dumps(dataObj)) #yong
359 359
360 360 argument = socket.recv_multipart()[0]
361 361
362 362 argument = pickle.loads(argument)
363 363
364 364 return argument
365 365
366 366 def sockIO(self):
367 367
368 368 """
369 369             Socket defined for an operation process. It is able to recover the object sent from another process, as well as an
370 370             identifier of who sent it.
371 371 """
372 372
373 373 cont = zmq.Context()
374 374 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
375 375 socket = cont.socket(zmq.ROUTER)
376 376 socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
377 377
378 378 return socket
379 379
380 380 def funIOrec(self, socket):
381 381
382 382 """
383 383             Operation method; recovers the id of the process that sent a python object.
384 384             The 'socket' argument is the socket bound to a specific process IPC address.
385 385 """
386 386
387 387 #id_proc = socket.recv()
388 388
389 389 #dataObj = socket.recv_pyobj()
390 390
391 391 dataObj = socket.recv_multipart()
392 392
393 393 dataObj[1] = pickle.loads(dataObj[1])
394 394 return dataObj[0], dataObj[1]
395 395
396 396 def funIOsen(self, socket, data, dest):
397 397
398 398 """
399 399             Operation method; sends a python object to a specific destination.
400 400             The 'dest' argument is the id of a processing unit.
401 401 """
402 402
403 403 socket.send_multipart([dest, pickle.dumps(data)]) #yong
404 404
405 405 return True
406 406
407 407
408 408 def runReader(self):
409 409
410 410 # time.sleep(3)
411 411 while True:
412 412
413 413 BaseClass.run(self, **self.kwargs)
414 414
415 415
416 416 keyList = list(self.operations2RunDict.keys())
417 417 keyList.sort()
418 418
419 419 for key in keyList:
420 420 self.socketOP = self.sockOp()
421 421 self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
422 422
423 423
424 424                 if self.flagNoMoreFiles: # use an object with flags to know whether the processing finished or an error occurred
425 425 self.publishProc(self.socket_p, "Finish")
426 426 break
427 427
428 428 if self.dataOut.flagNoData:
429 429 continue
430 430
431 print("Publishing data...")
431 #print("Publishing data...")
432 432 self.publishProc(self.socket_p, self.dataOut)
433 433 # time.sleep(2)
434 434
435 435
436 436 print("%s done" %BaseClass.__name__)
437 437 return 0
438 438
439 439 def runProc(self):
440 440
441 441 # All the procUnits with kwargs that require a setup initialization must be defined here.
442 442
443 443 if self.setupReq:
444 444 BaseClass.setup(self, **self.kwargs)
445 445
446 446 while True:
447 447 self.dataIn = self.listenProc(self.socket_l)
448 print("%s received data" %BaseClass.__name__)
448 #print("%s received data" %BaseClass.__name__)
449 449
450 450 if self.dataIn == "Finish":
451 451 break
452 452
453 453 m_arg = list(self.kwargs.keys())
454 454 num_arg = list(range(1,int(BaseClass.run.__code__.co_argcount)))
455 455
456 456 run_arg = {}
457 457
458 458 for var in num_arg:
459 459 if BaseClass.run.__code__.co_varnames[var] in m_arg:
460 460 run_arg[BaseClass.run.__code__.co_varnames[var]] = self.kwargs[BaseClass.run.__code__.co_varnames[var]]
461 461
462 462 #BaseClass.run(self, **self.kwargs)
463 463 BaseClass.run(self, **run_arg)
464 464
465 465                 ## Iterate over a series of methods that could be applied to the data
466 466
467 467 for m_name in BaseClass.METHODS:
468 468
469 469 met_arg = {}
470 470
471 471 for arg in m_arg:
472 472 if arg in BaseClass.METHODS[m_name]:
473 473 for att in BaseClass.METHODS[m_name]:
474 474 met_arg[att] = self.kwargs[att]
475 475
476 476 method = getattr(BaseClass, m_name)
477 477 method(self, **met_arg)
478 478 break
479 479
480 480 if self.dataOut.flagNoData:
481 481 continue
482 482
483 483 keyList = list(self.operations2RunDict.keys())
484 484 keyList.sort()
485 485
486 486 for key in keyList:
487 487
488 488 self.socketOP = self.sockOp()
489 489 self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
490 490
491 491
492 492 self.publishProc(self.socket_p, self.dataOut)
493 493
494 494
495 495 print("%s done" %BaseClass.__name__)
496 496
497 497 return 0
498 498
499 499 def runOp(self):
500 500
501 501 while True:
502 502
503 503 [self.dest ,self.buffer] = self.funIOrec(self.socket_router)
504 504
505 505 self.buffer = BaseClass.run(self, self.buffer, **self.kwargs)
506 506
507 507 self.funIOsen(self.socket_router, self.buffer, self.dest)
508 508
509 509 print("%s done" %BaseClass.__name__)
510 510 return 0
511 511
512 512
513 513 def run(self):
514 514
515 515             if self.typeProc == "ProcUnit":
516 516
517 517 self.socket_p = self.sockPublishing()
518 518
519 519 if 'Reader' not in self.dictProcs[self.id].name:
520 520 self.socket_l = self.sockListening(self.inputId)
521 521 self.runProc()
522 522
523 523 else:
524 524
525 525 self.runReader()
526 526
527 527             elif self.typeProc == "Operation":
528 528
529 529 self.socket_router = self.sockIO()
530 530
531 531 self.runOp()
532 532
533 533 else:
534 534 raise ValueError("Unknown type")
535 535
536 536 return 0
537 537
538 538 return MPClass No newline at end of file
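As a rough orientation (editor's sketch, not part of the changeset), the decorator above combines two ZeroMQ patterns: PUB/SUB with a pickled [topic, payload] multipart frame for procUnit output (publishProc/listenProc), and DEALER-to-ROUTER request/reply for running operations (sockOp/execOp on one side, sockIO/funIOrec/funIOsen on the other). The endpoints and payloads below are invented; the real code uses ipc:///tmp/socketTmp/<id> addresses.

import pickle
import zmq

context = zmq.Context.instance()

# Operation side: ROUTER bound to the operation id (sockIO)
router = context.socket(zmq.ROUTER)
router.bind('inproc://operation-1234')

# Processing-unit side: DEALER with an explicit identity (sockOp), then execOp
dealer = context.socket(zmq.DEALER)
dealer.setsockopt_string(zmq.IDENTITY, 'procunit-1')
dealer.connect('inproc://operation-1234')

dealer.send(pickle.dumps({'data': 21}))              # execOp: send dataOut
ident, payload = router.recv_multipart()             # funIOrec: sender id + object
obj = pickle.loads(payload)
obj['data'] *= 2                                     # stand-in for BaseClass.run()
router.send_multipart([ident, pickle.dumps(obj)])    # funIOsen: reply to the sender
print(pickle.loads(dealer.recv()))                   # {'data': 42}

# Publishing side (publishProc): topic frame followed by the pickled object
pub = context.socket(zmq.PUB)
pub.bind('inproc://procunit-1')
pub.send_multipart([b'procunit-1', pickle.dumps(obj)])
# a subscriber (sockListening/listenProc) would setsockopt(zmq.SUBSCRIBE, b'procunit-1')
# and then unpickle the second frame returned by recv_multipart()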
@@ -1,1321 +1,1335
1 1 import sys
2 2 import numpy
3 3 from scipy import interpolate
4 4 #TODO
5 5 #from schainpy import cSchain
6 from .jroproc_base import ProcessingUnit, Operation
6 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation
7 7 from schainpy.model.data.jrodata import Voltage
8 from time import time
9 8 from schainpy.utils import log
9 from time import time
10 10
11 11
12 @MPDecorator
12 13 class VoltageProc(ProcessingUnit):
14
15 METHODS = {} #yong
13 16
17 def __init__(self):#, **kwargs): #yong
14 18
15 def __init__(self, **kwargs):
16
17 ProcessingUnit.__init__(self, **kwargs)
19 ProcessingUnit.__init__(self)#, **kwargs)
18 20
19 21 # self.objectDict = {}
20 22 self.dataOut = Voltage()
21 23 self.flip = 1
24 self.setupReq = False #yong
22 25
23 26 def run(self):
27
24 28 if self.dataIn.type == 'AMISR':
25 29 self.__updateObjFromAmisrInput()
26 30
27 31 if self.dataIn.type == 'Voltage':
28 32 self.dataOut.copy(self.dataIn)
29 33
30 34 # self.dataOut.copy(self.dataIn)
31 35
32 36 def __updateObjFromAmisrInput(self):
33 37
34 38 self.dataOut.timeZone = self.dataIn.timeZone
35 39 self.dataOut.dstFlag = self.dataIn.dstFlag
36 40 self.dataOut.errorCount = self.dataIn.errorCount
37 41 self.dataOut.useLocalTime = self.dataIn.useLocalTime
38 42
39 43 self.dataOut.flagNoData = self.dataIn.flagNoData
40 44 self.dataOut.data = self.dataIn.data
41 45 self.dataOut.utctime = self.dataIn.utctime
42 46 self.dataOut.channelList = self.dataIn.channelList
43 47 # self.dataOut.timeInterval = self.dataIn.timeInterval
44 48 self.dataOut.heightList = self.dataIn.heightList
45 49 self.dataOut.nProfiles = self.dataIn.nProfiles
46 50
47 51 self.dataOut.nCohInt = self.dataIn.nCohInt
48 52 self.dataOut.ippSeconds = self.dataIn.ippSeconds
49 53 self.dataOut.frequency = self.dataIn.frequency
50 54
51 55 self.dataOut.azimuth = self.dataIn.azimuth
52 56 self.dataOut.zenith = self.dataIn.zenith
53 57
54 58 self.dataOut.beam.codeList = self.dataIn.beam.codeList
55 59 self.dataOut.beam.azimuthList = self.dataIn.beam.azimuthList
56 60 self.dataOut.beam.zenithList = self.dataIn.beam.zenithList
57 61 #
58 62 # pass#
59 63 #
60 64 # def init(self):
61 65 #
62 66 #
63 67 # if self.dataIn.type == 'AMISR':
64 68 # self.__updateObjFromAmisrInput()
65 69 #
66 70 # if self.dataIn.type == 'Voltage':
67 71 # self.dataOut.copy(self.dataIn)
68 72 # # No need to copy the dataIn attributes on every init()
69 73 # # the copy should be done for each new data block
70 74
71 75 def selectChannels(self, channelList):
72 76
73 77 channelIndexList = []
74 78
75 79 for channel in channelList:
76 80 if channel not in self.dataOut.channelList:
77 81 raise ValueError("Channel %d is not in %s" %(channel, str(self.dataOut.channelList)))
78 82
79 83 index = self.dataOut.channelList.index(channel)
80 84 channelIndexList.append(index)
81 85
82 86 self.selectChannelsByIndex(channelIndexList)
83 87
84 88 def selectChannelsByIndex(self, channelIndexList):
85 89 """
86 90         Selects a block of data by channel according to channelIndexList
87 91
88 92 Input:
89 93             channelIndexList : simple list of channel indexes to select, e.g. [2,3,7]
90 94
91 95 Affected:
92 96 self.dataOut.data
93 97 self.dataOut.channelIndexList
94 98 self.dataOut.nChannels
95 99 self.dataOut.m_ProcessingHeader.totalSpectra
96 100 self.dataOut.systemHeaderObj.numChannels
97 101 self.dataOut.m_ProcessingHeader.blockSize
98 102
99 103 Return:
100 104 None
101 105 """
102 106
103 107 for channelIndex in channelIndexList:
104 108 if channelIndex not in self.dataOut.channelIndexList:
105 109 print(channelIndexList)
106 110 raise ValueError("The value %d in channelIndexList is not valid" %channelIndex)
107 111
108 112 if self.dataOut.flagDataAsBlock:
109 113 """
110 114 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
111 115 """
112 116 data = self.dataOut.data[channelIndexList,:,:]
113 117 else:
114 118 data = self.dataOut.data[channelIndexList,:]
115 119
116 120 self.dataOut.data = data
117 121 self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
118 122 # self.dataOut.nChannels = nChannels
119 123
120 124 return 1
121 125
122 126 def selectHeights(self, minHei=None, maxHei=None):
123 127 """
124 128         Selects a block of data based on a group of height values within the range
125 129         minHei <= height <= maxHei
126 130 
127 131         Input:
128 132             minHei : minimum height to consider
129 133             maxHei : maximum height to consider
130 134 
131 135         Affected:
132 136             Several values are changed indirectly through the selectHeightsByIndex method
133 137 
134 138         Return:
135 139             1 if the method ran successfully, otherwise 0
136 140 """
137 141
138 142 if minHei == None:
139 143 minHei = self.dataOut.heightList[0]
140 144
141 145 if maxHei == None:
142 146 maxHei = self.dataOut.heightList[-1]
143 147
144 148 if (minHei < self.dataOut.heightList[0]):
145 149 minHei = self.dataOut.heightList[0]
146 150
147 151 if (maxHei > self.dataOut.heightList[-1]):
148 152 maxHei = self.dataOut.heightList[-1]
149 153
150 154 minIndex = 0
151 155 maxIndex = 0
152 156 heights = self.dataOut.heightList
153 157
154 158 inda = numpy.where(heights >= minHei)
155 159 indb = numpy.where(heights <= maxHei)
156 160
157 161 try:
158 162 minIndex = inda[0][0]
159 163 except:
160 164 minIndex = 0
161 165
162 166 try:
163 167 maxIndex = indb[0][-1]
164 168 except:
165 169 maxIndex = len(heights)
166 170
167 171 self.selectHeightsByIndex(minIndex, maxIndex)
168 172
169 173 return 1
170 174
171 175
172 176 def selectHeightsByIndex(self, minIndex, maxIndex):
173 177 """
174 178         Selects a block of data based on a group of height indexes within the range
175 179         minIndex <= index <= maxIndex
176 180 
177 181         Input:
178 182             minIndex : minimum height index to consider
179 183             maxIndex : maximum height index to consider
180 184 
181 185         Affected:
182 186             self.dataOut.data
183 187             self.dataOut.heightList
184 188 
185 189         Return:
186 190             1 if the method ran successfully, otherwise 0
187 191 """
188 192
189 193 if (minIndex < 0) or (minIndex > maxIndex):
190 194 raise ValueError("Height index range (%d,%d) is not valid" % (minIndex, maxIndex))
191 195
192 196 if (maxIndex >= self.dataOut.nHeights):
193 197 maxIndex = self.dataOut.nHeights
194 198
195 199 #voltage
196 200 if self.dataOut.flagDataAsBlock:
197 201 """
198 202             If the data was read as blocks, dimension = [nChannels, nProfiles, nHeis]
199 203 """
200 204 data = self.dataOut.data[:,:, minIndex:maxIndex]
201 205 else:
202 206 data = self.dataOut.data[:, minIndex:maxIndex]
203 207
204 208 # firstHeight = self.dataOut.heightList[minIndex]
205 209
206 210 self.dataOut.data = data
207 211 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex]
208 212
209 213 if self.dataOut.nHeights <= 1:
210 214 raise ValueError("selectHeights: Too few heights. Current number of heights is %d" %(self.dataOut.nHeights))
211 215
212 216 return 1
213 217
214 218
215 219 def filterByHeights(self, window):
216 220
217 221 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
218 222
219 223 if window == None:
220 224 window = (self.dataOut.radarControllerHeaderObj.txA/self.dataOut.radarControllerHeaderObj.nBaud) / deltaHeight
221 225
222 226 newdelta = deltaHeight * window
223 227 r = self.dataOut.nHeights % window
224 228 newheights = (self.dataOut.nHeights-r)/window
225 229
226 230 if newheights <= 1:
227 231 raise ValueError("filterByHeights: Too few heights. Current number of heights is %d and window is %d" %(self.dataOut.nHeights, window))
228 232
229 233 if self.dataOut.flagDataAsBlock:
230 234 """
231 235             If the data was read as blocks, dimension = [nChannels, nProfiles, nHeis]
232 236 """
233 237 buffer = self.dataOut.data[:, :, 0:self.dataOut.nHeights-r]
234 238 buffer = buffer.reshape(self.dataOut.nChannels,self.dataOut.nProfiles,self.dataOut.nHeights/window,window)
235 239 buffer = numpy.sum(buffer,3)
236 240
237 241 else:
238 242 buffer = self.dataOut.data[:,0:self.dataOut.nHeights-r]
239 243 buffer = buffer.reshape(self.dataOut.nChannels,self.dataOut.nHeights/window,window)
240 244 buffer = numpy.sum(buffer,2)
241 245
242 246 self.dataOut.data = buffer
243 247 self.dataOut.heightList = self.dataOut.heightList[0] + numpy.arange( newheights )*newdelta
244 248 self.dataOut.windowOfFilter = window
245 249
246 250 def setH0(self, h0, deltaHeight = None):
247 251
248 252 if not deltaHeight:
249 253 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
250 254
251 255 nHeights = self.dataOut.nHeights
252 256
253 257 newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight
254 258
255 259 self.dataOut.heightList = newHeiRange
256 260
257 261 def deFlip(self, channelList = []):
258 262
259 263 data = self.dataOut.data.copy()
260 264
261 265 if self.dataOut.flagDataAsBlock:
262 266 flip = self.flip
263 267 profileList = list(range(self.dataOut.nProfiles))
264 268
265 269 if not channelList:
266 270 for thisProfile in profileList:
267 271 data[:,thisProfile,:] = data[:,thisProfile,:]*flip
268 272 flip *= -1.0
269 273 else:
270 274 for thisChannel in channelList:
271 275 if thisChannel not in self.dataOut.channelList:
272 276 continue
273 277
274 278 for thisProfile in profileList:
275 279 data[thisChannel,thisProfile,:] = data[thisChannel,thisProfile,:]*flip
276 280 flip *= -1.0
277 281
278 282 self.flip = flip
279 283
280 284 else:
281 285 if not channelList:
282 286 data[:,:] = data[:,:]*self.flip
283 287 else:
284 288 for thisChannel in channelList:
285 289 if thisChannel not in self.dataOut.channelList:
286 290 continue
287 291
288 292 data[thisChannel,:] = data[thisChannel,:]*self.flip
289 293
290 294 self.flip *= -1.
291 295
292 296 self.dataOut.data = data
293 297
294 298 def setRadarFrequency(self, frequency=None):
295 299
296 300 if frequency != None:
297 301 self.dataOut.frequency = frequency
298 302
299 303 return 1
300 304
301 305 def interpolateHeights(self, topLim, botLim):
302 306         # 69 to 72 for julia
303 307         # 82-84 for meteors
304 308 if len(numpy.shape(self.dataOut.data))==2:
305 309 sampInterp = (self.dataOut.data[:,botLim-1] + self.dataOut.data[:,topLim+1])/2
306 310 sampInterp = numpy.transpose(numpy.tile(sampInterp,(topLim-botLim + 1,1)))
307 311 #self.dataOut.data[:,botLim:limSup+1] = sampInterp
308 312 self.dataOut.data[:,botLim:topLim+1] = sampInterp
309 313 else:
310 314 nHeights = self.dataOut.data.shape[2]
311 315 x = numpy.hstack((numpy.arange(botLim),numpy.arange(topLim+1,nHeights)))
312 316 y = self.dataOut.data[:,:,list(range(botLim))+list(range(topLim+1,nHeights))]
313 317 f = interpolate.interp1d(x, y, axis = 2)
314 318 xnew = numpy.arange(botLim,topLim+1)
315 319 ynew = f(xnew)
316 320
317 321 self.dataOut.data[:,:,botLim:topLim+1] = ynew
318 322
319 323 # import collections
320
324 @MPDecorator
321 325 class CohInt(Operation):
322 326
323 327 isConfig = False
324 328 __profIndex = 0
325 329 __byTime = False
326 330 __initime = None
327 331 __lastdatatime = None
328 332 __integrationtime = None
329 333 __buffer = None
330 334 __bufferStride = []
331 335 __dataReady = False
332 336 __profIndexStride = 0
333 337 __dataToPutStride = False
334 338 n = None
335 339
336 def __init__(self, **kwargs):
340 def __init__(self):#, **kwargs):
337 341
338 Operation.__init__(self, **kwargs)
342 Operation.__init__(self)#, **kwargs)
339 343
340 344 # self.isConfig = False
341 345
342 346 def setup(self, n=None, timeInterval=None, stride=None, overlapping=False, byblock=False):
343 347 """
344 348 Set the parameters of the integration class.
345 349
346 350 Inputs:
347 351
348 352 n : Number of coherent integrations
349 353             timeInterval : Time of integration. If the parameter "n" is given, this one is ignored
350 354 overlapping :
351 355 """
352 356
353 357 self.__initime = None
354 358 self.__lastdatatime = 0
355 359 self.__buffer = None
356 360 self.__dataReady = False
357 361 self.byblock = byblock
358 362 self.stride = stride
359 363
360 364 if n == None and timeInterval == None:
361 365 raise ValueError("n or timeInterval should be specified ...")
362 366
363 367 if n != None:
364 368 self.n = n
365 369 self.__byTime = False
366 370 else:
367 371 self.__integrationtime = timeInterval #* 60. #if (type(timeInterval)!=integer) -> change this line
368 372 self.n = 9999
369 373 self.__byTime = True
370 374
371 375 if overlapping:
372 376 self.__withOverlapping = True
373 377 self.__buffer = None
374 378 else:
375 379 self.__withOverlapping = False
376 380 self.__buffer = 0
377 381
378 382 self.__profIndex = 0
379 383
380 384 def putData(self, data):
381 385
382 386 """
383 387 Add a profile to the __buffer and increase in one the __profileIndex
384 388
385 389 """
386 390
387 391 if not self.__withOverlapping:
388 392 self.__buffer += data.copy()
389 393 self.__profIndex += 1
390 394 return
391 395
392 396 #Overlapping data
393 397 nChannels, nHeis = data.shape
394 398 data = numpy.reshape(data, (1, nChannels, nHeis))
395 399
396 400 #If the buffer is empty then it takes the data value
397 401 if self.__buffer is None:
398 402 self.__buffer = data
399 403 self.__profIndex += 1
400 404 return
401 405
402 406         # If the buffer length is lower than n then stack the data value
403 407 if self.__profIndex < self.n:
404 408 self.__buffer = numpy.vstack((self.__buffer, data))
405 409 self.__profIndex += 1
406 410 return
407 411
408 412         # If the buffer length is equal to n then replace the last buffer value with the data value
409 413 self.__buffer = numpy.roll(self.__buffer, -1, axis=0)
410 414 self.__buffer[self.n-1] = data
411 415 self.__profIndex = self.n
412 416 return
413 417
414 418
415 419 def pushData(self):
416 420 """
417 421 Return the sum of the last profiles and the profiles used in the sum.
418 422
419 423 Affected:
420 424
421 425 self.__profileIndex
422 426
423 427 """
424 428
425 429 if not self.__withOverlapping:
426 430 data = self.__buffer
427 431 n = self.__profIndex
428 432
429 433 self.__buffer = 0
430 434 self.__profIndex = 0
431 435
432 436 return data, n
433 437
434 438 #Integration with Overlapping
435 439 data = numpy.sum(self.__buffer, axis=0)
436 440 # print data
437 441 # raise
438 442 n = self.__profIndex
439 443
440 444 return data, n
441 445
442 446 def byProfiles(self, data):
443 447
444 448 self.__dataReady = False
445 449 avgdata = None
446 450 # n = None
447 451 # print data
448 452 # raise
449 453 self.putData(data)
450 454
451 455 if self.__profIndex == self.n:
452 456 avgdata, n = self.pushData()
453 457 self.__dataReady = True
454 458
455 459 return avgdata
456 460
457 461 def byTime(self, data, datatime):
458 462
459 463 self.__dataReady = False
460 464 avgdata = None
461 465 n = None
462 466
463 467 self.putData(data)
464 468
465 469 if (datatime - self.__initime) >= self.__integrationtime:
466 470 avgdata, n = self.pushData()
467 471 self.n = n
468 472 self.__dataReady = True
469 473
470 474 return avgdata
471 475
472 476 def integrateByStride(self, data, datatime):
473 477 # print data
474 478 if self.__profIndex == 0:
475 479 self.__buffer = [[data.copy(), datatime]]
476 480 else:
477 481 self.__buffer.append([data.copy(),datatime])
478 482 self.__profIndex += 1
479 483 self.__dataReady = False
480 484
481 485 if self.__profIndex == self.n * self.stride :
482 486 self.__dataToPutStride = True
483 487 self.__profIndexStride = 0
484 488 self.__profIndex = 0
485 489 self.__bufferStride = []
486 490 for i in range(self.stride):
487 491 current = self.__buffer[i::self.stride]
488 492 data = numpy.sum([t[0] for t in current], axis=0)
489 493 avgdatatime = numpy.average([t[1] for t in current])
490 494 # print data
491 495 self.__bufferStride.append((data, avgdatatime))
492 496
493 497 if self.__dataToPutStride:
494 498 self.__dataReady = True
495 499 self.__profIndexStride += 1
496 500 if self.__profIndexStride == self.stride:
497 501 self.__dataToPutStride = False
498 502 # print self.__bufferStride[self.__profIndexStride - 1]
499 503 # raise
500 504 return self.__bufferStride[self.__profIndexStride - 1]
501 505
502 506
503 507 return None, None
504 508
505 509 def integrate(self, data, datatime=None):
506 510
507 511 if self.__initime == None:
508 512 self.__initime = datatime
509 513
510 514 if self.__byTime:
511 515 avgdata = self.byTime(data, datatime)
512 516 else:
513 517 avgdata = self.byProfiles(data)
514 518
515 519
516 520 self.__lastdatatime = datatime
517 521
518 522 if avgdata is None:
519 523 return None, None
520 524
521 525 avgdatatime = self.__initime
522 526
523 527 deltatime = datatime - self.__lastdatatime
524 528
525 529 if not self.__withOverlapping:
526 530 self.__initime = datatime
527 531 else:
528 532 self.__initime += deltatime
529 533
530 534 return avgdata, avgdatatime
531 535
532 536 def integrateByBlock(self, dataOut):
533 537
534 538 times = int(dataOut.data.shape[1]/self.n)
535 539 avgdata = numpy.zeros((dataOut.nChannels, times, dataOut.nHeights), dtype=numpy.complex)
536 540
537 541 id_min = 0
538 542 id_max = self.n
539 543
540 544 for i in range(times):
541 545 junk = dataOut.data[:,id_min:id_max,:]
542 546 avgdata[:,i,:] = junk.sum(axis=1)
543 547 id_min += self.n
544 548 id_max += self.n
545 549
546 550 timeInterval = dataOut.ippSeconds*self.n
547 551 avgdatatime = (times - 1) * timeInterval + dataOut.utctime
548 552 self.__dataReady = True
549 553 return avgdata, avgdatatime
550 554
551 555 def run(self, dataOut, n=None, timeInterval=None, stride=None, overlapping=False, byblock=False, **kwargs):
556
552 557 if not self.isConfig:
553 558 self.setup(n=n, stride=stride, timeInterval=timeInterval, overlapping=overlapping, byblock=byblock, **kwargs)
554 559 self.isConfig = True
555 560
556 561 if dataOut.flagDataAsBlock:
557 562 """
558 563 Si la data es leida por bloques, dimension = [nChannels, nProfiles, nHeis]
559 564 """
560 565 avgdata, avgdatatime = self.integrateByBlock(dataOut)
561 566 dataOut.nProfiles /= self.n
562 567 else:
563 568 if stride is None:
564 569 avgdata, avgdatatime = self.integrate(dataOut.data, dataOut.utctime)
565 570 else:
566 571 avgdata, avgdatatime = self.integrateByStride(dataOut.data, dataOut.utctime)
567 572
568 573
569 574 # dataOut.timeInterval *= n
570 575 dataOut.flagNoData = True
571 576
572 577 if self.__dataReady:
573 578 dataOut.data = avgdata
574 579 dataOut.nCohInt *= self.n
575 580 dataOut.utctime = avgdatatime
576 581 # print avgdata, avgdatatime
577 582 # raise
578 583 # dataOut.timeInterval = dataOut.ippSeconds * dataOut.nCohInt
579 584 dataOut.flagNoData = False
580
585 return dataOut
586 @MPDecorator
581 587 class Decoder(Operation):
582 588
583 589 isConfig = False
584 590 __profIndex = 0
585 591
586 592 code = None
587 593
588 594 nCode = None
589 595 nBaud = None
590 596
591 def __init__(self, **kwargs):
597 def __init__(self):#, **kwargs):
592 598
593 Operation.__init__(self, **kwargs)
599 Operation.__init__(self)#, **kwargs)
594 600
595 601 self.times = None
596 602 self.osamp = None
597 603 # self.__setValues = False
598 self.isConfig = False
599
604 # self.isConfig = False
605 self.setupReq = False
600 606 def setup(self, code, osamp, dataOut):
601 607
602 608 self.__profIndex = 0
603 609
604 610 self.code = code
605 611
606 612 self.nCode = len(code)
607 613 self.nBaud = len(code[0])
608 614
609 615 if (osamp != None) and (osamp >1):
610 616 self.osamp = osamp
611 617 self.code = numpy.repeat(code, repeats=self.osamp, axis=1)
612 618 self.nBaud = self.nBaud*self.osamp
613 619
614 620 self.__nChannels = dataOut.nChannels
615 621 self.__nProfiles = dataOut.nProfiles
616 622 self.__nHeis = dataOut.nHeights
617 623
618 624 if self.__nHeis < self.nBaud:
619 625 raise ValueError('Number of heights (%d) should be greater than number of bauds (%d)' %(self.__nHeis, self.nBaud))
620 626
621 627 #Frequency
622 628 __codeBuffer = numpy.zeros((self.nCode, self.__nHeis), dtype=numpy.complex)
623 629
624 630 __codeBuffer[:,0:self.nBaud] = self.code
625 631
626 632 self.fft_code = numpy.conj(numpy.fft.fft(__codeBuffer, axis=1))
627 633
628 634 if dataOut.flagDataAsBlock:
629 635
630 636 self.ndatadec = self.__nHeis #- self.nBaud + 1
631 637
632 638 self.datadecTime = numpy.zeros((self.__nChannels, self.__nProfiles, self.ndatadec), dtype=numpy.complex)
633 639
634 640 else:
635 641
636 642 #Time
637 643 self.ndatadec = self.__nHeis #- self.nBaud + 1
638 644
639 645 self.datadecTime = numpy.zeros((self.__nChannels, self.ndatadec), dtype=numpy.complex)
640 646
641 647 def __convolutionInFreq(self, data):
642 648
643 649 fft_code = self.fft_code[self.__profIndex].reshape(1,-1)
644 650
645 651 fft_data = numpy.fft.fft(data, axis=1)
646 652
647 653 conv = fft_data*fft_code
648 654
649 655 data = numpy.fft.ifft(conv,axis=1)
650 656
651 657 return data
652 658
653 659 def __convolutionInFreqOpt(self, data):
654 660
655 661 raise NotImplementedError
656 662
657 663 def __convolutionInTime(self, data):
658 664
659 665 code = self.code[self.__profIndex]
660 666 for i in range(self.__nChannels):
661 667 self.datadecTime[i,:] = numpy.correlate(data[i,:], code, mode='full')[self.nBaud-1:]
662 668
663 669 return self.datadecTime
664 670
665 671 def __convolutionByBlockInTime(self, data):
666 672
667 673 repetitions = self.__nProfiles / self.nCode
668 674
669 675 junk = numpy.lib.stride_tricks.as_strided(self.code, (repetitions, self.code.size), (0, self.code.itemsize))
670 676 junk = junk.flatten()
671 677 code_block = numpy.reshape(junk, (self.nCode*repetitions, self.nBaud))
672 678 profilesList = range(self.__nProfiles)
673 679
674 680 for i in range(self.__nChannels):
675 681 for j in profilesList:
676 682 self.datadecTime[i,j,:] = numpy.correlate(data[i,j,:], code_block[j,:], mode='full')[self.nBaud-1:]
677 683 return self.datadecTime
678 684
679 685 def __convolutionByBlockInFreq(self, data):
680 686
681 687         raise NotImplementedError("Decoder by frequency for blocks not implemented")
682 688
683 689
684 690 fft_code = self.fft_code[self.__profIndex].reshape(1,-1)
685 691
686 692 fft_data = numpy.fft.fft(data, axis=2)
687 693
688 694 conv = fft_data*fft_code
689 695
690 696 data = numpy.fft.ifft(conv,axis=2)
691 697
692 698 return data
693 699
694 700
695 701 def run(self, dataOut, code=None, nCode=None, nBaud=None, mode = 0, osamp=None, times=None):
696 702
697 703 if dataOut.flagDecodeData:
698 704             print("This data is already decoded, decoding it again ...")
699 705
700 706 if not self.isConfig:
701 707
702 708 if code is None:
703 709 if dataOut.code is None:
704 710 raise ValueError("Code could not be read from %s instance. Enter a value in Code parameter" %dataOut.type)
705 711
706 712 code = dataOut.code
707 713 else:
708 714 code = numpy.array(code).reshape(nCode,nBaud)
709 715 self.setup(code, osamp, dataOut)
710 716
711 717 self.isConfig = True
712 718
713 719 if mode == 3:
714 720 sys.stderr.write("Decoder Warning: mode=%d is not valid, using mode=0\n" %mode)
715 721
716 722 if times != None:
717 723 sys.stderr.write("Decoder Warning: Argument 'times' in not used anymore\n")
718 724
719 725 if self.code is None:
720 726 print("Fail decoding: Code is not defined.")
721 727 return
722 728
723 729 self.__nProfiles = dataOut.nProfiles
724 730 datadec = None
725 731
726 732 if mode == 3:
727 733 mode = 0
728 734
729 735 if dataOut.flagDataAsBlock:
730 736 """
731 737             Decoding when data has been read as a block,
732 738 """
733 739
734 740 if mode == 0:
735 741 datadec = self.__convolutionByBlockInTime(dataOut.data)
736 742 if mode == 1:
737 743 datadec = self.__convolutionByBlockInFreq(dataOut.data)
738 744 else:
739 745 """
740 746             Decoding when data has been read profile by profile
741 747 """
742 748 if mode == 0:
743 749 datadec = self.__convolutionInTime(dataOut.data)
744 750
745 751 if mode == 1:
746 752 datadec = self.__convolutionInFreq(dataOut.data)
747 753
748 754 if mode == 2:
749 755 datadec = self.__convolutionInFreqOpt(dataOut.data)
750 756
751 757 if datadec is None:
752 758 raise ValueError("Codification mode selected is not valid: mode=%d. Try selecting 0 or 1" %mode)
753 759
754 760 dataOut.code = self.code
755 761 dataOut.nCode = self.nCode
756 762 dataOut.nBaud = self.nBaud
757 763
758 764 dataOut.data = datadec
759 765
760 766 dataOut.heightList = dataOut.heightList[0:datadec.shape[-1]]
761 767
762 768         dataOut.flagDecodeData = True # assume the data is decoded
763 769
764 770 if self.__profIndex == self.nCode-1:
765 771 self.__profIndex = 0
766 return 1
772 return dataOut
767 773
768 774 self.__profIndex += 1
769 775
770 return 1
776 return dataOut
771 777         # dataOut.flagDeflipData = True # assume the data has not been de-flipped
772 778
773
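A small example (editor's sketch, not part of the changeset) of the time-domain decoding used by Decoder.__convolutionInTime() above: each profile is cross-correlated with the transmitted code and the causal part is kept, compressing the coded pulse back to a single range gate. The code and target below are invented.

import numpy

code = numpy.array([1, 1, -1, 1], dtype=float)            # nBaud = 4
nBaud = len(code)
nHeis = 16
profile = numpy.zeros(nHeis, dtype=complex)
profile[5:5 + nBaud] = code * (3 + 1j)                    # a single target at range gate 5

decoded = numpy.correlate(profile, code, mode='full')[nBaud - 1:]
print(numpy.argmax(numpy.abs(decoded)))                   # 5: the compressed peak at the target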
779 @MPDecorator
774 780 class ProfileConcat(Operation):
775 781
776 782 isConfig = False
777 783 buffer = None
778 784
779 def __init__(self, **kwargs):
785 def __init__(self):#, **kwargs):
780 786
781 Operation.__init__(self, **kwargs)
787 Operation.__init__(self)#, **kwargs)
782 788 self.profileIndex = 0
783 789
784 790 def reset(self):
785 791 self.buffer = numpy.zeros_like(self.buffer)
786 792 self.start_index = 0
787 793 self.times = 1
788 794
789 795 def setup(self, data, m, n=1):
790 796 self.buffer = numpy.zeros((data.shape[0],data.shape[1]*m),dtype=type(data[0,0]))
791 797 self.nHeights = data.shape[1]#.nHeights
792 798 self.start_index = 0
793 799 self.times = 1
794 800
795 801 def concat(self, data):
796 802
797 803 self.buffer[:,self.start_index:self.nHeights*self.times] = data.copy()
798 804 self.start_index = self.start_index + self.nHeights
799 805
800 806 def run(self, dataOut, m):
801 807
802 808 dataOut.flagNoData = True
803 809
804 810 if not self.isConfig:
805 811 self.setup(dataOut.data, m, 1)
806 812 self.isConfig = True
807 813
808 814 if dataOut.flagDataAsBlock:
809 815 raise ValueError("ProfileConcat can only be used when voltage have been read profile by profile, getBlock = False")
810 816
811 817 else:
812 818 self.concat(dataOut.data)
813 819 self.times += 1
814 820 if self.times > m:
815 821 dataOut.data = self.buffer
816 822 self.reset()
817 823 dataOut.flagNoData = False
818 824                 # more header and dataOut properties should be updated here, e.g. the heights
819 825 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
820 826 xf = dataOut.heightList[0] + dataOut.nHeights * deltaHeight * m
821 827 dataOut.heightList = numpy.arange(dataOut.heightList[0], xf, deltaHeight)
822 828 dataOut.ippSeconds *= m
823
829 return dataOut
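A quick sketch (editor's addition, not part of the changeset) of ProfileConcat above: m consecutive profiles are appended along the height axis, so one output profile covers m times the original height span and ippSeconds grows by m. The sizes below are made up.

import numpy

m = 3
nChannels, nHeights = 2, 4
profiles = [numpy.full((nChannels, nHeights), k, dtype=complex) for k in range(m)]

buffer = numpy.zeros((nChannels, nHeights * m), dtype=complex)
for k, profile in enumerate(profiles):     # concat(): place each profile after the previous one
    buffer[:, k * nHeights:(k + 1) * nHeights] = profile
print(buffer.shape)                        # (2, 12)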
830 @MPDecorator
824 831 class ProfileSelector(Operation):
825 832
826 833 profileIndex = None
827 834     # Total number of profiles
828 835 nProfiles = None
829 836
830 def __init__(self, **kwargs):
837 def __init__(self):#, **kwargs):
831 838
832 Operation.__init__(self, **kwargs)
839 Operation.__init__(self)#, **kwargs)
833 840 self.profileIndex = 0
834 841
835 842 def incProfileIndex(self):
836 843
837 844 self.profileIndex += 1
838 845
839 846 if self.profileIndex >= self.nProfiles:
840 847 self.profileIndex = 0
841 848
842 849 def isThisProfileInRange(self, profileIndex, minIndex, maxIndex):
843 850
844 851 if profileIndex < minIndex:
845 852 return False
846 853
847 854 if profileIndex > maxIndex:
848 855 return False
849 856
850 857 return True
851 858
852 859 def isThisProfileInList(self, profileIndex, profileList):
853 860
854 861 if profileIndex not in profileList:
855 862 return False
856 863
857 864 return True
858 865
859 866 def run(self, dataOut, profileList=None, profileRangeList=None, beam=None, byblock=False, rangeList = None, nProfiles=None):
860 867
861 868 """
862 869 ProfileSelector:
863 870
864 871 Inputs:
865 872 profileList : Index of profiles selected. Example: profileList = (0,1,2,7,8)
866 873
867 874 profileRangeList : Minimum and maximum profile indexes. Example: profileRangeList = (4, 30)
868 875
869 876 rangeList : List of profile ranges. Example: rangeList = ((4, 30), (32, 64), (128, 256))
870 877
871 878 """
872 879
873 880 if rangeList is not None:
874 881 if type(rangeList[0]) not in (tuple, list):
875 882 rangeList = [rangeList]
876 883
877 884 dataOut.flagNoData = True
878 885
879 886 if dataOut.flagDataAsBlock:
880 887 """
881 888 data dimension = [nChannels, nProfiles, nHeis]
882 889 """
883 890 if profileList != None:
884 891 dataOut.data = dataOut.data[:,profileList,:]
885 892
886 893 if profileRangeList != None:
887 894 minIndex = profileRangeList[0]
888 895 maxIndex = profileRangeList[1]
889 896 profileList = list(range(minIndex, maxIndex+1))
890 897
891 898 dataOut.data = dataOut.data[:,minIndex:maxIndex+1,:]
892 899
893 900 if rangeList != None:
894 901
895 902 profileList = []
896 903
897 904 for thisRange in rangeList:
898 905 minIndex = thisRange[0]
899 906 maxIndex = thisRange[1]
900 907
901 908 profileList.extend(list(range(minIndex, maxIndex+1)))
902 909
903 910 dataOut.data = dataOut.data[:,profileList,:]
904 911
905 912 dataOut.nProfiles = len(profileList)
906 913 dataOut.profileIndex = dataOut.nProfiles - 1
907 914 dataOut.flagNoData = False
908 915
909 916 return True
910 917
911 918 """
912 919 data dimension = [nChannels, nHeis]
913 920 """
914 921
915 922 if profileList != None:
916 923
917 924 if self.isThisProfileInList(dataOut.profileIndex, profileList):
918 925
919 926 self.nProfiles = len(profileList)
920 927 dataOut.nProfiles = self.nProfiles
921 928 dataOut.profileIndex = self.profileIndex
922 929 dataOut.flagNoData = False
923 930
924 931 self.incProfileIndex()
925 932 return True
926 933
927 934 if profileRangeList != None:
928 935
929 936 minIndex = profileRangeList[0]
930 937 maxIndex = profileRangeList[1]
931 938
932 939 if self.isThisProfileInRange(dataOut.profileIndex, minIndex, maxIndex):
933 940
934 941 self.nProfiles = maxIndex - minIndex + 1
935 942 dataOut.nProfiles = self.nProfiles
936 943 dataOut.profileIndex = self.profileIndex
937 944 dataOut.flagNoData = False
938 945
939 946 self.incProfileIndex()
940 947 return True
941 948
942 949 if rangeList != None:
943 950
944 951 nProfiles = 0
945 952
946 953 for thisRange in rangeList:
947 954 minIndex = thisRange[0]
948 955 maxIndex = thisRange[1]
949 956
950 957 nProfiles += maxIndex - minIndex + 1
951 958
952 959 for thisRange in rangeList:
953 960
954 961 minIndex = thisRange[0]
955 962 maxIndex = thisRange[1]
956 963
957 964 if self.isThisProfileInRange(dataOut.profileIndex, minIndex, maxIndex):
958 965
959 966 self.nProfiles = nProfiles
960 967 dataOut.nProfiles = self.nProfiles
961 968 dataOut.profileIndex = self.profileIndex
962 969 dataOut.flagNoData = False
963 970
964 971 self.incProfileIndex()
965 972
966 973 break
967 974
968 975 return True
969 976
970 977
971 978 if beam != None: #beam is only for AMISR data
972 979 if self.isThisProfileInList(dataOut.profileIndex, dataOut.beamRangeDict[beam]):
973 980 dataOut.flagNoData = False
974 981 dataOut.profileIndex = self.profileIndex
975 982
976 983 self.incProfileIndex()
977 984
978 985 return True
979 986
980 987 raise ValueError("ProfileSelector needs profileList, profileRangeList or rangeList parameter")
981 988
982 return False
983
989 #return False
990 return dataOut
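# A minimal numpy sketch of the block-mode selection described in the run() docstring
# above, assuming a hypothetical block of 2 channels, 16 profiles and 8 heights;
# profileList and rangeList mirror the documented examples.
#
# import numpy
# block = numpy.arange(2 * 16 * 8).reshape(2, 16, 8)      # [nChannels, nProfiles, nHeis]
# profileList = [0, 1, 2, 7, 8]
# selected = block[:, profileList, :]                     # keeps only the listed profiles, shape (2, 5, 8)
# rangeList = ((4, 6), (10, 12))
# indexes = []
# for minIndex, maxIndex in rangeList:
#     indexes.extend(range(minIndex, maxIndex + 1))       # both ends inclusive, as in run()
# selected_ranges = block[:, indexes, :]                  # shape (2, 6, 8)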
991 @MPDecorator
984 992 class Reshaper(Operation):
985 993
986 def __init__(self, **kwargs):
994 def __init__(self):#, **kwargs):
987 995
988 Operation.__init__(self, **kwargs)
996 Operation.__init__(self)#, **kwargs)
989 997
990 998 self.__buffer = None
991 999 self.__nitems = 0
992 1000
993 1001 def __appendProfile(self, dataOut, nTxs):
994 1002
995 1003 if self.__buffer is None:
996 1004 shape = (dataOut.nChannels, int(dataOut.nHeights/nTxs) )
997 1005 self.__buffer = numpy.empty(shape, dtype = dataOut.data.dtype)
998 1006
999 1007 ini = dataOut.nHeights * self.__nitems
1000 1008 end = ini + dataOut.nHeights
1001 1009
1002 1010 self.__buffer[:, ini:end] = dataOut.data
1003 1011
1004 1012 self.__nitems += 1
1005 1013
1006 1014 return int(self.__nitems*nTxs)
1007 1015
1008 1016 def __getBuffer(self):
1009 1017
1010 1018 if self.__nitems == int(1./self.__nTxs):
1011 1019
1012 1020 self.__nitems = 0
1013 1021
1014 1022 return self.__buffer.copy()
1015 1023
1016 1024 return None
1017 1025
1018 1026 def __checkInputs(self, dataOut, shape, nTxs):
1019 1027
1020 1028 if shape is None and nTxs is None:
1021 1029 raise ValueError("Reshaper: shape or nTxs factor should be defined")
1022 1030
1023 1031 if nTxs:
1024 1032 if nTxs < 0:
1025 1033 raise ValueError("nTxs should be greater than 0")
1026 1034
1027 1035 if nTxs < 1 and dataOut.nProfiles % (1./nTxs) != 0:
1028 1036 raise ValueError("nProfiles= %d is not divisible by (1./nTxs) = %f" %(dataOut.nProfiles, (1./nTxs)))
1029 1037
1030 1038 shape = [dataOut.nChannels, dataOut.nProfiles*nTxs, dataOut.nHeights/nTxs]
1031 1039
1032 1040 return shape, nTxs
1033 1041
1034 1042 if len(shape) != 2 and len(shape) != 3:
1035 1043 raise ValueError("shape dimension should be equal to 2 or 3. shape = (nProfiles, nHeis) or (nChannels, nProfiles, nHeis). Current data shape = (%d, %d, %d)" %(dataOut.nChannels, dataOut.nProfiles, dataOut.nHeights))
1036 1044
1037 1045 if len(shape) == 2:
1038 1046 shape_tuple = [dataOut.nChannels]
1039 1047 shape_tuple.extend(shape)
1040 1048 else:
1041 1049 shape_tuple = list(shape)
1042 1050
1043 1051 nTxs = 1.0*shape_tuple[1]/dataOut.nProfiles
1044 1052
1045 1053 return shape_tuple, nTxs
1046 1054
1047 1055 def run(self, dataOut, shape=None, nTxs=None):
1048 1056
1049 1057 shape_tuple, self.__nTxs = self.__checkInputs(dataOut, shape, nTxs)
1050 1058
1051 1059 dataOut.flagNoData = True
1052 1060 profileIndex = None
1053 1061
1054 1062 if dataOut.flagDataAsBlock:
1055 1063
1056 1064 dataOut.data = numpy.reshape(dataOut.data, shape_tuple)
1057 1065 dataOut.flagNoData = False
1058 1066
1059 1067 profileIndex = int(dataOut.nProfiles*self.__nTxs) - 1
1060 1068
1061 1069 else:
1062 1070
1063 1071 if self.__nTxs < 1:
1064 1072
1065 1073 self.__appendProfile(dataOut, self.__nTxs)
1066 1074 new_data = self.__getBuffer()
1067 1075
1068 1076 if new_data is not None:
1069 1077 dataOut.data = new_data
1070 1078 dataOut.flagNoData = False
1071 1079
1072 1080 profileIndex = dataOut.profileIndex*nTxs
1073 1081
1074 1082 else:
1075 1083 raise ValueError("nTxs should be greater than 0 and less than 1, or use VoltageReader(..., getblock=True)")
1076 1084
1077 1085 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
1078 1086
1079 1087 dataOut.heightList = numpy.arange(dataOut.nHeights/self.__nTxs) * deltaHeight + dataOut.heightList[0]
1080 1088
1081 1089 dataOut.nProfiles = int(dataOut.nProfiles*self.__nTxs)
1082 1090
1083 1091 dataOut.profileIndex = profileIndex
1084 1092
1085 1093 dataOut.ippSeconds /= self.__nTxs
1086 1094
1095 return dataOut
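# A minimal numpy sketch of the block-mode reshape that Reshaper derives from the
# nTxs factor (shape = [nChannels, nProfiles*nTxs, nHeights/nTxs]); the dimensions
# used here are hypothetical.
#
# import numpy
# nChannels, nProfiles, nHeights, nTxs = 2, 100, 1000, 2
# block = numpy.zeros((nChannels, nProfiles, nHeights), dtype=numpy.complex64)
# reshaped = numpy.reshape(block, (nChannels, int(nProfiles * nTxs), int(nHeights / nTxs)))
# # (2, 100, 1000) -> (2, 200, 500); heightList shrinks to nHeights/nTxs samples
# # and ippSeconds is divided by nTxs, as done at the end of run()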
1096 @MPDecorator
1087 1097 class SplitProfiles(Operation):
1088 1098
1089 def __init__(self, **kwargs):
1099 def __init__(self):#, **kwargs):
1090 1100
1091 Operation.__init__(self, **kwargs)
1101 Operation.__init__(self)#, **kwargs)
1092 1102
1093 1103 def run(self, dataOut, n):
1094 1104
1095 1105 dataOut.flagNoData = True
1096 1106 profileIndex = None
1097 1107
1098 1108 if dataOut.flagDataAsBlock:
1099 1109
1100 1110 #nchannels, nprofiles, nsamples
1101 1111 shape = dataOut.data.shape
1102 1112
1103 1113 if shape[2] % n != 0:
1104 1114 raise ValueError("Could not split the data: n=%d has to be a divisor of %d" %(n, shape[2]))
1105
1115
1106 1116 new_shape = shape[0], shape[1]*n, int(shape[2]/n)
1117
1107 1118 dataOut.data = numpy.reshape(dataOut.data, new_shape)
1108 1119 dataOut.flagNoData = False
1109 1120
1110 1121 profileIndex = int(dataOut.nProfiles/n) - 1
1111 1122
1112 1123 else:
1113 1124
1114 1125 raise ValueError("Could not split the data when it is read profile by profile. Use VoltageReader(..., getblock=True)")
1115 1126
1116 1127 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
1117 1128
1118 1129 dataOut.heightList = numpy.arange(dataOut.nHeights/n) * deltaHeight + dataOut.heightList[0]
1119 1130
1120 1131 dataOut.nProfiles = int(dataOut.nProfiles*n)
1121 1132
1122 1133 dataOut.profileIndex = profileIndex
1123 1134
1124 1135 dataOut.ippSeconds /= n
1125 1136
1137 return dataOut
1138 @MPDecorator
1126 1139 class CombineProfiles(Operation):
1127 def __init__(self, **kwargs):
1140 def __init__(self):#, **kwargs):
1128 1141
1129 Operation.__init__(self, **kwargs)
1142 Operation.__init__(self)#, **kwargs)
1130 1143
1131 1144 self.__remData = None
1132 1145 self.__profileIndex = 0
1133 1146
1134 1147 def run(self, dataOut, n):
1135 1148
1136 1149 dataOut.flagNoData = True
1137 1150 profileIndex = None
1138 1151
1139 1152 if dataOut.flagDataAsBlock:
1140 1153
1141 1154 #nchannels, nprofiles, nsamples
1142 1155 shape = dataOut.data.shape
1143 1156 new_shape = shape[0], int(shape[1]/n), shape[2]*n
1144 1157
1145 1158 if shape[1] % n != 0:
1146 1159 raise ValueError("Could not combine the data: n=%d has to be a divisor of %d" %(n, shape[1]))
1147 1160
1148 1161 dataOut.data = numpy.reshape(dataOut.data, new_shape)
1149 1162 dataOut.flagNoData = False
1150 1163
1151 1164 profileIndex = int(dataOut.nProfiles*n) - 1
1152 1165
1153 1166 else:
1154 1167
1155 1168 #nchannels, nsamples
1156 1169 if self.__remData is None:
1157 1170 newData = dataOut.data
1158 1171 else:
1159 1172 newData = numpy.concatenate((self.__remData, dataOut.data), axis=1)
1160 1173
1161 1174 self.__profileIndex += 1
1162 1175
1163 1176 if self.__profileIndex < n:
1164 1177 self.__remData = newData
1165 1178 #continue
1166 1179 return
1167 1180
1168 1181 self.__profileIndex = 0
1169 1182 self.__remData = None
1170 1183
1171 1184 dataOut.data = newData
1172 1185 dataOut.flagNoData = False
1173 1186
1174 1187 profileIndex = int(dataOut.profileIndex/n)
1175 1188
1176 1189
1177 1190 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
1178 1191
1179 1192 dataOut.heightList = numpy.arange(dataOut.nHeights*n) * deltaHeight + dataOut.heightList[0]
1180 1193
1181 1194 dataOut.nProfiles = int(dataOut.nProfiles/n)
1182 1195
1183 1196 dataOut.profileIndex = profileIndex
1184 1197
1185 1198 dataOut.ippSeconds *= n
1186 1199
1200 return dataOut
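# A minimal numpy sketch showing that, in block mode, SplitProfiles and CombineProfiles
# apply inverse reshapes of each other; the dimensions used here are hypothetical.
#
# import numpy
# nChannels, nProfiles, nHeights, n = 2, 50, 400, 4
# block = numpy.arange(nChannels * nProfiles * nHeights).reshape(nChannels, nProfiles, nHeights)
# split = numpy.reshape(block, (nChannels, nProfiles * n, int(nHeights / n)))               # SplitProfiles
# combined = numpy.reshape(split, (nChannels, int(split.shape[1] / n), split.shape[2] * n)) # CombineProfiles
# assert numpy.array_equal(block, combined)   # the round trip recovers the original block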
1187 1201 # import collections
1188 1202 # from scipy.stats import mode
1189 1203 #
1190 1204 # class Synchronize(Operation):
1191 1205 #
1192 1206 # isConfig = False
1193 1207 # __profIndex = 0
1194 1208 #
1195 1209 # def __init__(self, **kwargs):
1196 1210 #
1197 1211 # Operation.__init__(self, **kwargs)
1198 1212 # # self.isConfig = False
1199 1213 # self.__powBuffer = None
1200 1214 # self.__startIndex = 0
1201 1215 # self.__pulseFound = False
1202 1216 #
1203 1217 # def __findTxPulse(self, dataOut, channel=0, pulse_with = None):
1204 1218 #
1205 1219 # #Read data
1206 1220 #
1207 1221 # powerdB = dataOut.getPower(channel = channel)
1208 1222 # noisedB = dataOut.getNoise(channel = channel)[0]
1209 1223 #
1210 1224 # self.__powBuffer.extend(powerdB.flatten())
1211 1225 #
1212 1226 # dataArray = numpy.array(self.__powBuffer)
1213 1227 #
1214 1228 # filteredPower = numpy.correlate(dataArray, dataArray[0:self.__nSamples], "same")
1215 1229 #
1216 1230 # maxValue = numpy.nanmax(filteredPower)
1217 1231 #
1218 1232 # if maxValue < noisedB + 10:
1219 1233 # #No transmission pulse was found
1220 1234 # return None
1221 1235 #
1222 1236 # maxValuesIndex = numpy.where(filteredPower > maxValue - 0.1*abs(maxValue))[0]
1223 1237 #
1224 1238 # if len(maxValuesIndex) < 2:
1225 1239 # #Only a single one-baud transmission pulse was found; waiting for the next TX
1226 1240 # return None
1227 1241 #
1228 1242 # phasedMaxValuesIndex = maxValuesIndex - self.__nSamples
1229 1243 #
1230 1244 # #Select only values spaced nSamples apart
1231 1245 # pulseIndex = numpy.intersect1d(maxValuesIndex, phasedMaxValuesIndex)
1232 1246 #
1233 1247 # if len(pulseIndex) < 2:
1234 1248 # #Only one transmission pulse with width greater than 1 was found
1235 1249 # return None
1236 1250 #
1237 1251 # spacing = pulseIndex[1:] - pulseIndex[:-1]
1238 1252 #
1239 1253 # #remove signals separated by less than 10 units or samples
1240 1254 # #(there should be no IPP shorter than 10 units)
1241 1255 #
1242 1256 # realIndex = numpy.where(spacing > 10 )[0]
1243 1257 #
1244 1258 # if len(realIndex) < 2:
1245 1259 # #Only one transmission pulse with width greater than 1 was found
1246 1260 # return None
1247 1261 #
1248 1262 # #Remove wide pulses (keep only the spacing between IPPs)
1249 1263 # realPulseIndex = pulseIndex[realIndex]
1250 1264 #
1251 1265 # period = mode(realPulseIndex[1:] - realPulseIndex[:-1])[0][0]
1252 1266 #
1253 1267 # print("IPP = %d samples" % period)
1254 1268 #
1255 1269 # self.__newNSamples = dataOut.nHeights #int(period)
1256 1270 # self.__startIndex = int(realPulseIndex[0])
1257 1271 #
1258 1272 # return 1
1259 1273 #
1260 1274 #
1261 1275 # def setup(self, nSamples, nChannels, buffer_size = 4):
1262 1276 #
1263 1277 # self.__powBuffer = collections.deque(numpy.zeros( buffer_size*nSamples,dtype=numpy.float),
1264 1278 # maxlen = buffer_size*nSamples)
1265 1279 #
1266 1280 # bufferList = []
1267 1281 #
1268 1282 # for i in range(nChannels):
1269 1283 # bufferByChannel = collections.deque(numpy.zeros( buffer_size*nSamples, dtype=numpy.complex) + numpy.NAN,
1270 1284 # maxlen = buffer_size*nSamples)
1271 1285 #
1272 1286 # bufferList.append(bufferByChannel)
1273 1287 #
1274 1288 # self.__nSamples = nSamples
1275 1289 # self.__nChannels = nChannels
1276 1290 # self.__bufferList = bufferList
1277 1291 #
1278 1292 # def run(self, dataOut, channel = 0):
1279 1293 #
1280 1294 # if not self.isConfig:
1281 1295 # nSamples = dataOut.nHeights
1282 1296 # nChannels = dataOut.nChannels
1283 1297 # self.setup(nSamples, nChannels)
1284 1298 # self.isConfig = True
1285 1299 #
1286 1300 # #Append new data to internal buffer
1287 1301 # for thisChannel in range(self.__nChannels):
1288 1302 # bufferByChannel = self.__bufferList[thisChannel]
1289 1303 # bufferByChannel.extend(dataOut.data[thisChannel])
1290 1304 #
1291 1305 # if self.__pulseFound:
1292 1306 # self.__startIndex -= self.__nSamples
1293 1307 #
1294 1308 # #Finding Tx Pulse
1295 1309 # if not self.__pulseFound:
1296 1310 # indexFound = self.__findTxPulse(dataOut, channel)
1297 1311 #
1298 1312 # if indexFound == None:
1299 1313 # dataOut.flagNoData = True
1300 1314 # return
1301 1315 #
1302 1316 # self.__arrayBuffer = numpy.zeros((self.__nChannels, self.__newNSamples), dtype = numpy.complex)
1303 1317 # self.__pulseFound = True
1304 1318 # self.__startIndex = indexFound
1305 1319 #
1306 1320 # #If pulse was found ...
1307 1321 # for thisChannel in range(self.__nChannels):
1308 1322 # bufferByChannel = self.__bufferList[thisChannel]
1309 1323 # #print self.__startIndex
1310 1324 # x = numpy.array(bufferByChannel)
1311 1325 # self.__arrayBuffer[thisChannel] = x[self.__startIndex:self.__startIndex+self.__newNSamples]
1312 1326 #
1313 1327 # deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
1314 1328 # dataOut.heightList = numpy.arange(self.__newNSamples)*deltaHeight
1315 1329 # # dataOut.ippSeconds = (self.__newNSamples / deltaHeight)/1e6
1316 1330 #
1317 1331 # dataOut.data = self.__arrayBuffer
1318 1332 #
1319 1333 # self.__startIndex += self.__newNSamples
1320 1334 #
1321 # return
\ No newline at end of file
1335 # return