##// END OF EJS Templates
Multiprocessing for digitalRF just for reading Unit(Task #1461 )
George Yong -
r1192:fa272605c13d
parent child
Show More
@@ -1,1254 +1,1257
1 1 '''
2 2 Updated on January , 2018, for multiprocessing purposes
3 3 Author: Sergio Cortez
4 4 Created on September , 2012
5 5 '''
6 6 from platform import python_version
7 7 import sys
8 8 import ast
9 9 import datetime
10 10 import traceback
11 11 import math
12 12 import time
13 13 import zmq
14 14 from multiprocessing import Process, cpu_count
15 15 from threading import Thread
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
18 18
19 19
20 20 from schainpy.admin import Alarm, SchainWarning
21 21 from schainpy.model import *
22 22 from schainpy.utils import log
23 23
24 24
25 25 DTYPES = {
26 26 'Voltage': '.r',
27 27 'Spectra': '.pdata'
28 28 }
29 29
30 30
31 31 def MPProject(project, n=cpu_count()):
32 32 '''
33 33 Project wrapper to run schain in n processes
34 34 '''
35 35
36 36 rconf = project.getReadUnitObj()
37 37 op = rconf.getOperationObj('run')
38 38 dt1 = op.getParameterValue('startDate')
39 39 dt2 = op.getParameterValue('endDate')
40 40 tm1 = op.getParameterValue('startTime')
41 41 tm2 = op.getParameterValue('endTime')
42 42 days = (dt2 - dt1).days
43 43
44 44 for day in range(days + 1):
45 45 skip = 0
46 46 cursor = 0
47 47 processes = []
48 48 dt = dt1 + datetime.timedelta(day)
49 49 dt_str = dt.strftime('%Y/%m/%d')
50 50 reader = JRODataReader()
51 51 paths, files = reader.searchFilesOffLine(path=rconf.path,
52 52 startDate=dt,
53 53 endDate=dt,
54 54 startTime=tm1,
55 55 endTime=tm2,
56 56 ext=DTYPES[rconf.datatype])
57 57 nFiles = len(files)
58 58 if nFiles == 0:
59 59 continue
60 60 skip = int(math.ceil(nFiles / n))
61 61 while nFiles > cursor * skip:
62 62 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
63 63 skip=skip)
64 64 p = project.clone()
65 65 p.start()
66 66 processes.append(p)
67 67 cursor += 1
68 68
69 69 def beforeExit(exctype, value, trace):
70 70 for process in processes:
71 71 process.terminate()
72 72 process.join()
73 73 print(traceback.print_tb(trace))
74 74
75 75 sys.excepthook = beforeExit
76 76
77 77 for process in processes:
78 78 process.join()
79 79 process.terminate()
80 80
81 81 time.sleep(3)
82 82
83 83 def wait(context):
84 84
85 85 time.sleep(1)
86 86 c = zmq.Context()
87 87 receiver = c.socket(zmq.SUB)
88 88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
89 89 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
90 90 msg = receiver.recv_multipart()[1]
91 91 context.terminate()
92 92
93 93 class ParameterConf():
94 94
95 95 id = None
96 96 name = None
97 97 value = None
98 98 format = None
99 99
100 100 __formated_value = None
101 101
102 102 ELEMENTNAME = 'Parameter'
103 103
104 104 def __init__(self):
105 105
106 106 self.format = 'str'
107 107
108 108 def getElementName(self):
109 109
110 110 return self.ELEMENTNAME
111 111
112 112 def getValue(self):
113 113
114 114 value = self.value
115 115 format = self.format
116 116
117 117 if self.__formated_value != None:
118 118
119 119 return self.__formated_value
120 120
121 121 if format == 'obj':
122 122 return value
123 123
124 124 if format == 'str':
125 125 self.__formated_value = str(value)
126 126 return self.__formated_value
127 127
128 128 if value == '':
129 129 raise ValueError('%s: This parameter value is empty' % self.name)
130 130
131 131 if format == 'list':
132 132 strList = value.split(',')
133 133
134 134 self.__formated_value = strList
135 135
136 136 return self.__formated_value
137 137
138 138 if format == 'intlist':
139 139 '''
140 140 Example:
141 141 value = (0,1,2)
142 142 '''
143 143
144 144 new_value = ast.literal_eval(value)
145 145
146 146 if type(new_value) not in (tuple, list):
147 147 new_value = [int(new_value)]
148 148
149 149 self.__formated_value = new_value
150 150
151 151 return self.__formated_value
152 152
153 153 if format == 'floatlist':
154 154 '''
155 155 Example:
156 156 value = (0.5, 1.4, 2.7)
157 157 '''
158 158
159 159 new_value = ast.literal_eval(value)
160 160
161 161 if type(new_value) not in (tuple, list):
162 162 new_value = [float(new_value)]
163 163
164 164 self.__formated_value = new_value
165 165
166 166 return self.__formated_value
167 167
168 168 if format == 'date':
169 169 strList = value.split('/')
170 170 intList = [int(x) for x in strList]
171 171 date = datetime.date(intList[0], intList[1], intList[2])
172 172
173 173 self.__formated_value = date
174 174
175 175 return self.__formated_value
176 176
177 177 if format == 'time':
178 178 strList = value.split(':')
179 179 intList = [int(x) for x in strList]
180 180 time = datetime.time(intList[0], intList[1], intList[2])
181 181
182 182 self.__formated_value = time
183 183
184 184 return self.__formated_value
185 185
186 186 if format == 'pairslist':
187 187 '''
188 188 Example:
189 189 value = (0,1),(1,2)
190 190 '''
191 191
192 192 new_value = ast.literal_eval(value)
193 193
194 194 if type(new_value) not in (tuple, list):
195 195 raise ValueError('%s has to be a tuple or list of pairs' % value)
196 196
197 197 if type(new_value[0]) not in (tuple, list):
198 198 if len(new_value) != 2:
199 199 raise ValueError('%s has to be a tuple or list of pairs' % value)
200 200 new_value = [new_value]
201 201
202 202 for thisPair in new_value:
203 203 if len(thisPair) != 2:
204 204 raise ValueError('%s has to be a tuple or list of pairs' % value)
205 205
206 206 self.__formated_value = new_value
207 207
208 208 return self.__formated_value
209 209
210 210 if format == 'multilist':
211 211 '''
212 212 Example:
213 213 value = (0,1,2),(3,4,5)
214 214 '''
215 215 multiList = ast.literal_eval(value)
216 216
217 217 if type(multiList[0]) == int:
218 218 multiList = ast.literal_eval('(' + value + ')')
219 219
220 220 self.__formated_value = multiList
221 221
222 222 return self.__formated_value
223 223
224 224 if format == 'bool':
225 225 value = int(value)
226 226
227 227 if format == 'int':
228 228 value = float(value)
229 229
230 230 format_func = eval(format)
231 231
232 232 self.__formated_value = format_func(value)
233 233
234 234 return self.__formated_value
235 235
236 236 def updateId(self, new_id):
237 237
238 238 self.id = str(new_id)
239 239
240 240 def setup(self, id, name, value, format='str'):
241 241 self.id = str(id)
242 242 self.name = name
243 243 if format == 'obj':
244 244 self.value = value
245 245 else:
246 246 self.value = str(value)
247 247 self.format = str.lower(format)
248 248
249 249 self.getValue()
250 250
251 251 return 1
252 252
253 253 def update(self, name, value, format='str'):
254 254
255 255 self.name = name
256 256 self.value = str(value)
257 257 self.format = format
258 258
259 259 def makeXml(self, opElement):
260 260 if self.name not in ('queue',):
261 261 parmElement = SubElement(opElement, self.ELEMENTNAME)
262 262 parmElement.set('id', str(self.id))
263 263 parmElement.set('name', self.name)
264 264 parmElement.set('value', self.value)
265 265 parmElement.set('format', self.format)
266 266
267 267 def readXml(self, parmElement):
268 268
269 269 self.id = parmElement.get('id')
270 270 self.name = parmElement.get('name')
271 271 self.value = parmElement.get('value')
272 272 self.format = str.lower(parmElement.get('format'))
273 273
274 274 # Compatible with old signal chain version
275 275 if self.format == 'int' and self.name == 'idfigure':
276 276 self.name = 'id'
277 277
278 278 def printattr(self):
279 279
280 280 print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id))
281 281
282 282 class OperationConf():
283 283
284 284 ELEMENTNAME = 'Operation'
285 285
286 286 def __init__(self):
287 287
288 288 self.id = '0'
289 289 self.name = None
290 290 self.priority = None
291 291 self.topic = None
292 292
293 293 def __getNewId(self):
294 294
295 295 return int(self.id) * 10 + len(self.parmConfObjList) + 1
296 296
297 297 def getId(self):
298 298 return self.id
299 299
300 300 def updateId(self, new_id):
301 301
302 302 self.id = str(new_id)
303 303
304 304 n = 1
305 305 for parmObj in self.parmConfObjList:
306 306
307 307 idParm = str(int(new_id) * 10 + n)
308 308 parmObj.updateId(idParm)
309 309
310 310 n += 1
311 311
312 312 def getElementName(self):
313 313
314 314 return self.ELEMENTNAME
315 315
316 316 def getParameterObjList(self):
317 317
318 318 return self.parmConfObjList
319 319
320 320 def getParameterObj(self, parameterName):
321 321
322 322 for parmConfObj in self.parmConfObjList:
323 323
324 324 if parmConfObj.name != parameterName:
325 325 continue
326 326
327 327 return parmConfObj
328 328
329 329 return None
330 330
331 331 def getParameterObjfromValue(self, parameterValue):
332 332
333 333 for parmConfObj in self.parmConfObjList:
334 334
335 335 if parmConfObj.getValue() != parameterValue:
336 336 continue
337 337
338 338 return parmConfObj.getValue()
339 339
340 340 return None
341 341
342 342 def getParameterValue(self, parameterName):
343 343
344 344 parameterObj = self.getParameterObj(parameterName)
345 345
346 346 # if not parameterObj:
347 347 # return None
348 348
349 349 value = parameterObj.getValue()
350 350
351 351 return value
352 352
353 353 def getKwargs(self):
354 354
355 355 kwargs = {}
356 356
357 357 for parmConfObj in self.parmConfObjList:
358 358 if self.name == 'run' and parmConfObj.name == 'datatype':
359 359 continue
360 360
361 361 kwargs[parmConfObj.name] = parmConfObj.getValue()
362 362
363 363 return kwargs
364 364
365 365 def setup(self, id, name, priority, type, project_id):
366 366
367 367 self.id = str(id)
368 368 self.project_id = project_id
369 369 self.name = name
370 370 self.type = type
371 371 self.priority = priority
372 372 self.parmConfObjList = []
373 373
374 374 def removeParameters(self):
375 375
376 376 for obj in self.parmConfObjList:
377 377 del obj
378 378
379 379 self.parmConfObjList = []
380 380
381 381 def addParameter(self, name, value, format='str'):
382 382
383 383 if value is None:
384 384 return None
385 385 id = self.__getNewId()
386 386
387 387 parmConfObj = ParameterConf()
388 388 if not parmConfObj.setup(id, name, value, format):
389 389 return None
390 390
391 391 self.parmConfObjList.append(parmConfObj)
392 392
393 393 return parmConfObj
394 394
395 395 def changeParameter(self, name, value, format='str'):
396 396
397 397 parmConfObj = self.getParameterObj(name)
398 398 parmConfObj.update(name, value, format)
399 399
400 400 return parmConfObj
401 401
402 402 def makeXml(self, procUnitElement):
403 403
404 404 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
405 405 opElement.set('id', str(self.id))
406 406 opElement.set('name', self.name)
407 407 opElement.set('type', self.type)
408 408 opElement.set('priority', str(self.priority))
409 409
410 410 for parmConfObj in self.parmConfObjList:
411 411 parmConfObj.makeXml(opElement)
412 412
413 413 def readXml(self, opElement, project_id):
414 414
415 415 self.id = opElement.get('id')
416 416 self.name = opElement.get('name')
417 417 self.type = opElement.get('type')
418 418 self.priority = opElement.get('priority')
419 self.project_id = str(project_id) #yong
419 self.project_id = str(project_id)
420 420
421 421 # Compatible with old signal chain version
422 422 # Use of 'run' method instead 'init'
423 423 if self.type == 'self' and self.name == 'init':
424 424 self.name = 'run'
425 425
426 426 self.parmConfObjList = []
427 427
428 428 parmElementList = opElement.iter(ParameterConf().getElementName())
429 429
430 430 for parmElement in parmElementList:
431 431 parmConfObj = ParameterConf()
432 432 parmConfObj.readXml(parmElement)
433 433
434 434 # Compatible with old signal chain version
435 435 # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
436 436 if self.type != 'self' and self.name == 'Plot':
437 437 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
438 438 self.name = parmConfObj.value
439 439 continue
440 440
441 441 self.parmConfObjList.append(parmConfObj)
442 442
443 443 def printattr(self):
444 444
445 445 print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
446 446 self.id,
447 447 self.name,
448 448 self.type,
449 449 self.priority,
450 450 self.project_id))
451 451
452 452 for parmConfObj in self.parmConfObjList:
453 453 parmConfObj.printattr()
454 454
455 455 def createObject(self):
456 456
457 457 className = eval(self.name)
458 458
459 459 if self.type == 'other':
460 460 opObj = className()
461 461 elif self.type == 'external':
462 462 kwargs = self.getKwargs()
463 463 opObj = className(self.id, self.project_id, **kwargs)
464 464 opObj.start()
465 465
466 466 return opObj
467 467
468 468 class ProcUnitConf():
469 469
470 470 ELEMENTNAME = 'ProcUnit'
471 471
472 472 def __init__(self):
473 473
474 474 self.id = None
475 475 self.datatype = None
476 476 self.name = None
477 477 self.inputId = None
478 478 self.opConfObjList = []
479 479 self.procUnitObj = None
480 480 self.opObjDict = {}
481 481
482 482 def __getPriority(self):
483 483
484 484 return len(self.opConfObjList) + 1
485 485
486 486 def __getNewId(self):
487 487
488 488 return int(self.id) * 10 + len(self.opConfObjList) + 1
489 489
490 490 def getElementName(self):
491 491
492 492 return self.ELEMENTNAME
493 493
494 494 def getId(self):
495 495
496 496 return self.id
497 497
498 498 def updateId(self, new_id):
499 499 '''
500 500 new_id = int(parentId) * 10 + (int(self.id) % 10)
501 501 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
502 502
503 503 # If this proc unit has not inputs
504 504 #if self.inputId == '0':
505 505 #new_inputId = 0
506 506
507 507 n = 1
508 508 for opConfObj in self.opConfObjList:
509 509
510 510 idOp = str(int(new_id) * 10 + n)
511 511 opConfObj.updateId(idOp)
512 512
513 513 n += 1
514 514
515 515 self.parentId = str(parentId)
516 516 self.id = str(new_id)
517 517 #self.inputId = str(new_inputId)
518 518 '''
519 519 n = 1
520 520
521 521 def getInputId(self):
522 522
523 523 return self.inputId
524 524
525 525 def getOperationObjList(self):
526 526
527 527 return self.opConfObjList
528 528
529 529 def getOperationObj(self, name=None):
530 530
531 531 for opConfObj in self.opConfObjList:
532 532
533 533 if opConfObj.name != name:
534 534 continue
535 535
536 536 return opConfObj
537 537
538 538 return None
539 539
540 540 def getOpObjfromParamValue(self, value=None):
541 541
542 542 for opConfObj in self.opConfObjList:
543 543 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
544 544 continue
545 545 return opConfObj
546 546 return None
547 547
548 548 def getProcUnitObj(self):
549 549
550 550 return self.procUnitObj
551 551
552 552 def setup(self, project_id, id, name, datatype, inputId):
553 553 '''
554 554 id sera el topico a publicar
555 555 inputId sera el topico a subscribirse
556 556 '''
557 557
558 558 # Compatible with old signal chain version
559 559 if datatype == None and name == None:
560 560 raise ValueError('datatype or name should be defined')
561 561
562 562 #Definir una condicion para inputId cuando sea 0
563 563
564 564 if name == None:
565 565 if 'Proc' in datatype:
566 566 name = datatype
567 567 else:
568 568 name = '%sProc' % (datatype)
569 569
570 570 if datatype == None:
571 571 datatype = name.replace('Proc', '')
572 572
573 573 self.id = str(id)
574 574 self.project_id = project_id
575 575 self.name = name
576 576 self.datatype = datatype
577 577 self.inputId = inputId
578 578 self.opConfObjList = []
579 579
580 580 self.addOperation(name='run', optype='self')
581 581
582 582 def removeOperations(self):
583 583
584 584 for obj in self.opConfObjList:
585 585 del obj
586 586
587 587 self.opConfObjList = []
588 588 self.addOperation(name='run')
589 589
590 590 def addParameter(self, **kwargs):
591 591 '''
592 592 Add parameters to 'run' operation
593 593 '''
594 594 opObj = self.opConfObjList[0]
595 595
596 596 opObj.addParameter(**kwargs)
597 597
598 598 return opObj
599 599
600 600 def addOperation(self, name, optype='self'):
601 601 '''
602 602 Actualizacion - > proceso comunicacion
603 603 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
604 604 definir el tipoc de socket o comunicacion ipc++
605 605
606 606 '''
607 607
608 608 id = self.__getNewId()
609 609 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
610 610 opConfObj = OperationConf()
611 611 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id)
612 612 self.opConfObjList.append(opConfObj)
613 613
614 614 return opConfObj
615 615
616 616 def makeXml(self, projectElement):
617 617
618 618 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
619 619 procUnitElement.set('id', str(self.id))
620 620 procUnitElement.set('name', self.name)
621 621 procUnitElement.set('datatype', self.datatype)
622 622 procUnitElement.set('inputId', str(self.inputId))
623 623
624 624 for opConfObj in self.opConfObjList:
625 625 opConfObj.makeXml(procUnitElement)
626 626
627 627 def readXml(self, upElement, project_id):
628 628
629 629 self.id = upElement.get('id')
630 630 self.name = upElement.get('name')
631 631 self.datatype = upElement.get('datatype')
632 632 self.inputId = upElement.get('inputId')
633 633 self.project_id = str(project_id)
634 634
635 635 if self.ELEMENTNAME == 'ReadUnit':
636 636 self.datatype = self.datatype.replace('Reader', '')
637 637
638 638 if self.ELEMENTNAME == 'ProcUnit':
639 639 self.datatype = self.datatype.replace('Proc', '')
640 640
641 641 if self.inputId == 'None':
642 642 self.inputId = '0'
643 643
644 644 self.opConfObjList = []
645 645
646 646 opElementList = upElement.iter(OperationConf().getElementName())
647 647
648 648 for opElement in opElementList:
649 649 opConfObj = OperationConf()
650 650 opConfObj.readXml(opElement, project_id)
651 651 self.opConfObjList.append(opConfObj)
652 652
653 653 def printattr(self):
654 654
655 655 print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
656 656 self.id,
657 657 self.name,
658 658 self.datatype,
659 659 self.inputId,
660 660 self.project_id))
661 661
662 662 for opConfObj in self.opConfObjList:
663 663 opConfObj.printattr()
664 664
665 665 def getKwargs(self):
666 666
667 667 opObj = self.opConfObjList[0]
668 668 kwargs = opObj.getKwargs()
669 669
670 670 return kwargs
671 671
672 672 def createObjects(self):
673 673 '''
674 674 Instancia de unidades de procesamiento.
675 675 '''
676 676
677 677 className = eval(self.name)
678 678 kwargs = self.getKwargs()
679 679 procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) # necesitan saber su id y su entrada por fines de ipc
680 680 log.success('creating process...', self.name)
681 681
682 682 for opConfObj in self.opConfObjList:
683 683
684 684 if opConfObj.type == 'self' and opConfObj.name == 'run':
685 685 continue
686 686 elif opConfObj.type == 'self':
687 687 opObj = getattr(procUnitObj, opConfObj.name)
688 688 else:
689 689 opObj = opConfObj.createObject()
690 690
691 691 log.success('creating operation: {}, type:{}'.format(
692 692 opConfObj.name,
693 693 opConfObj.type), self.name)
694 694
695 695 procUnitObj.addOperation(opConfObj, opObj)
696 696
697 697 procUnitObj.start()
698 698 self.procUnitObj = procUnitObj
699 699
700 700 def close(self):
701 701
702 702 for opConfObj in self.opConfObjList:
703 703 if opConfObj.type == 'self':
704 704 continue
705 705
706 706 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
707 707 opObj.close()
708 708
709 709 self.procUnitObj.close()
710 710
711 711 return
712 712
713 713
714 714 class ReadUnitConf(ProcUnitConf):
715 715
716 716 ELEMENTNAME = 'ReadUnit'
717 717
718 718 def __init__(self):
719 719
720 720 self.id = None
721 721 self.datatype = None
722 722 self.name = None
723 723 self.inputId = None
724 724 self.opConfObjList = []
725 725
726 726 def getElementName(self):
727 727
728 728 return self.ELEMENTNAME
729 729
730 730 def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='',
731 731 startTime='', endTime='', server=None, **kwargs):
732 732
733 733
734 734 '''
735 735 *****el id del proceso sera el Topico
736 736
737 737 Adicion de {topic}, si no esta presente -> error
738 738 kwargs deben ser trasmitidos en la instanciacion
739 739
740 740 '''
741 741
742 742 # Compatible with old signal chain version
743 743 if datatype == None and name == None:
744 744 raise ValueError('datatype or name should be defined')
745 745 if name == None:
746 746 if 'Reader' in datatype:
747 747 name = datatype
748 748 datatype = name.replace('Reader','')
749 749 else:
750 750 name = '{}Reader'.format(datatype)
751 751 if datatype == None:
752 752 if 'Reader' in name:
753 753 datatype = name.replace('Reader','')
754 754 else:
755 755 datatype = name
756 756 name = '{}Reader'.format(name)
757 757
758 758 self.id = id
759 759 self.project_id = project_id
760 760 self.name = name
761 761 self.datatype = datatype
762 762 if path != '':
763 763 self.path = os.path.abspath(path)
764 764 self.startDate = startDate
765 765 self.endDate = endDate
766 766 self.startTime = startTime
767 767 self.endTime = endTime
768 768 self.server = server
769 769 self.addRunOperation(**kwargs)
770 770
771 771 def update(self, **kwargs):
772 772
773 773 if 'datatype' in kwargs:
774 774 datatype = kwargs.pop('datatype')
775 775 if 'Reader' in datatype:
776 776 self.name = datatype
777 777 else:
778 778 self.name = '%sReader' % (datatype)
779 779 self.datatype = self.name.replace('Reader', '')
780 780
781 781 attrs = ('path', 'startDate', 'endDate',
782 782 'startTime', 'endTime')
783 783
784 784 for attr in attrs:
785 785 if attr in kwargs:
786 786 setattr(self, attr, kwargs.pop(attr))
787 787
788 788 self.updateRunOperation(**kwargs)
789 789
790 790 def removeOperations(self):
791 791
792 792 for obj in self.opConfObjList:
793 793 del obj
794 794
795 795 self.opConfObjList = []
796 796
797 797 def addRunOperation(self, **kwargs):
798 798
799 799 opObj = self.addOperation(name='run', optype='self')
800 800
801 801 if self.server is None:
802 802 opObj.addParameter(
803 803 name='datatype', value=self.datatype, format='str')
804 804 opObj.addParameter(name='path', value=self.path, format='str')
805 805 opObj.addParameter(
806 806 name='startDate', value=self.startDate, format='date')
807 807 opObj.addParameter(
808 808 name='endDate', value=self.endDate, format='date')
809 809 opObj.addParameter(
810 810 name='startTime', value=self.startTime, format='time')
811 811 opObj.addParameter(
812 812 name='endTime', value=self.endTime, format='time')
813 813
814 814 for key, value in list(kwargs.items()):
815 815 opObj.addParameter(name=key, value=value,
816 816 format=type(value).__name__)
817 817 else:
818 818 opObj.addParameter(name='server', value=self.server, format='str')
819 819
820 820 return opObj
821 821
822 822 def updateRunOperation(self, **kwargs):
823 823
824 824 opObj = self.getOperationObj(name='run')
825 825 opObj.removeParameters()
826 826
827 827 opObj.addParameter(name='datatype', value=self.datatype, format='str')
828 828 opObj.addParameter(name='path', value=self.path, format='str')
829 829 opObj.addParameter(
830 830 name='startDate', value=self.startDate, format='date')
831 831 opObj.addParameter(name='endDate', value=self.endDate, format='date')
832 832 opObj.addParameter(
833 833 name='startTime', value=self.startTime, format='time')
834 834 opObj.addParameter(name='endTime', value=self.endTime, format='time')
835 835
836 836 for key, value in list(kwargs.items()):
837 837 opObj.addParameter(name=key, value=value,
838 838 format=type(value).__name__)
839 839
840 840 return opObj
841 841
842 842 def readXml(self, upElement, project_id):
843 843
844 844 self.id = upElement.get('id')
845 845 self.name = upElement.get('name')
846 846 self.datatype = upElement.get('datatype')
847 847 self.project_id = str(project_id) #yong
848 848
849 849 if self.ELEMENTNAME == 'ReadUnit':
850 850 self.datatype = self.datatype.replace('Reader', '')
851 851
852 852 self.opConfObjList = []
853 853
854 854 opElementList = upElement.iter(OperationConf().getElementName())
855 855
856 856 for opElement in opElementList:
857 857 opConfObj = OperationConf()
858 858 opConfObj.readXml(opElement, project_id)
859 859 self.opConfObjList.append(opConfObj)
860 860
861 861 if opConfObj.name == 'run':
862 862 self.path = opConfObj.getParameterValue('path')
863 863 self.startDate = opConfObj.getParameterValue('startDate')
864 864 self.endDate = opConfObj.getParameterValue('endDate')
865 865 self.startTime = opConfObj.getParameterValue('startTime')
866 866 self.endTime = opConfObj.getParameterValue('endTime')
867 867
868 868
869 869 class Project(Process):
870 870
871 871 ELEMENTNAME = 'Project'
872 872
873 873 def __init__(self):
874 874
875 875 Process.__init__(self)
876 876 self.id = None
877 877 self.filename = None
878 878 self.description = None
879 879 self.email = None
880 880 self.alarm = None
881 881 self.procUnitConfObjDict = {}
882 882
883 883 def __getNewId(self):
884 884
885 885 idList = list(self.procUnitConfObjDict.keys())
886 886 id = int(self.id) * 10
887 887
888 888 while True:
889 889 id += 1
890 890
891 891 if str(id) in idList:
892 892 continue
893 893
894 894 break
895 895
896 896 return str(id)
897 897
898 898 def getElementName(self):
899 899
900 900 return self.ELEMENTNAME
901 901
902 902 def getId(self):
903 903
904 904 return self.id
905 905
906 906 def updateId(self, new_id):
907 907
908 908 self.id = str(new_id)
909 909
910 910 keyList = list(self.procUnitConfObjDict.keys())
911 911 keyList.sort()
912 912
913 913 n = 1
914 914 newProcUnitConfObjDict = {}
915 915
916 916 for procKey in keyList:
917 917
918 918 procUnitConfObj = self.procUnitConfObjDict[procKey]
919 919 idProcUnit = str(int(self.id) * 10 + n)
920 920 procUnitConfObj.updateId(idProcUnit)
921 921 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
922 922 n += 1
923 923
924 924 self.procUnitConfObjDict = newProcUnitConfObjDict
925 925
926 926 def setup(self, id=1, name='', description='', email=None, alarm=[]):
927 927
928 928 print(' ')
929 929 print('*' * 60)
930 930 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
931 931 print('*' * 60)
932 932 print("* Python " + python_version() + " *")
933 933 print('*' * 19)
934 934 print(' ')
935 935 self.id = str(id)
936 936 self.description = description
937 937 self.email = email
938 938 self.alarm = alarm
939 939
940 940 def update(self, **kwargs):
941 941
942 942 for key, value in list(kwargs.items()):
943 943 setattr(self, key, value)
944 944
945 945 def clone(self):
946 946
947 947 p = Project()
948 948 p.procUnitConfObjDict = self.procUnitConfObjDict
949 949 return p
950 950
951 951 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
952 952
953 953 '''
954 954 Actualizacion:
955 955 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
956 956
957 957 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
958 958
959 959 '''
960 960
961 961 if id is None:
962 962 idReadUnit = self.__getNewId()
963 963 else:
964 964 idReadUnit = str(id)
965 965
966 966 readUnitConfObj = ReadUnitConf()
967 967 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs)
968 968 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
969 969
970 970 return readUnitConfObj
971 971
972 972 def addProcUnit(self, inputId='0', datatype=None, name=None):
973 973
974 974 '''
975 975 Actualizacion:
976 976 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
977 977 Deberia reemplazar a "inputId"
978 978
979 979 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
980 980 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
981 981
982 982 '''
983 983
984 984 idProcUnit = self.__getNewId() #Topico para subscripcion
985 985 procUnitConfObj = ProcUnitConf()
986 986 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) #topic_read, topic_write,
987 987 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
988 988
989 989 return procUnitConfObj
990 990
991 991 def removeProcUnit(self, id):
992 992
993 993 if id in list(self.procUnitConfObjDict.keys()):
994 994 self.procUnitConfObjDict.pop(id)
995 995
996 996 def getReadUnitId(self):
997 997
998 998 readUnitConfObj = self.getReadUnitObj()
999 999
1000 1000 return readUnitConfObj.id
1001 1001
1002 1002 def getReadUnitObj(self):
1003 1003
1004 1004 for obj in list(self.procUnitConfObjDict.values()):
1005 1005 if obj.getElementName() == 'ReadUnit':
1006 1006 return obj
1007 1007
1008 1008 return None
1009 1009
1010 1010 def getProcUnitObj(self, id=None, name=None):
1011 1011
1012 1012 if id != None:
1013 1013 return self.procUnitConfObjDict[id]
1014 1014
1015 1015 if name != None:
1016 1016 return self.getProcUnitObjByName(name)
1017 1017
1018 1018 return None
1019 1019
1020 1020 def getProcUnitObjByName(self, name):
1021 1021
1022 1022 for obj in list(self.procUnitConfObjDict.values()):
1023 1023 if obj.name == name:
1024 1024 return obj
1025 1025
1026 1026 return None
1027 1027
1028 1028 def procUnitItems(self):
1029 1029
1030 1030 return list(self.procUnitConfObjDict.items())
1031 1031
1032 1032 def makeXml(self):
1033 1033
1034 1034 projectElement = Element('Project')
1035 1035 projectElement.set('id', str(self.id))
1036 1036 projectElement.set('name', self.name)
1037 1037 projectElement.set('description', self.description)
1038 1038
1039 1039 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1040 1040 procUnitConfObj.makeXml(projectElement)
1041 1041
1042 1042 self.projectElement = projectElement
1043 1043
1044 1044 def writeXml(self, filename=None):
1045 1045
1046 1046 if filename == None:
1047 1047 if self.filename:
1048 1048 filename = self.filename
1049 1049 else:
1050 1050 filename = 'schain.xml'
1051 1051
1052 1052 if not filename:
1053 1053 print('filename has not been defined. Use setFilename(filename) for do it.')
1054 1054 return 0
1055 1055
1056 1056 abs_file = os.path.abspath(filename)
1057 1057
1058 1058 if not os.access(os.path.dirname(abs_file), os.W_OK):
1059 1059 print('No write permission on %s' % os.path.dirname(abs_file))
1060 1060 return 0
1061 1061
1062 1062 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1063 1063 print('File %s already exists and it could not be overwriten' % abs_file)
1064 1064 return 0
1065 1065
1066 1066 self.makeXml()
1067 1067
1068 1068 ElementTree(self.projectElement).write(abs_file, method='xml')
1069 1069
1070 1070 self.filename = abs_file
1071 1071
1072 1072 return 1
1073 1073
1074 1074 def readXml(self, filename=None):
1075 1075
1076 1076 if not filename:
1077 1077 print('filename is not defined')
1078 1078 return 0
1079 1079
1080 1080 abs_file = os.path.abspath(filename)
1081 1081
1082 1082 if not os.path.isfile(abs_file):
1083 1083 print('%s file does not exist' % abs_file)
1084 1084 return 0
1085 1085
1086 1086 self.projectElement = None
1087 1087 self.procUnitConfObjDict = {}
1088 1088
1089 1089 try:
1090 1090 self.projectElement = ElementTree().parse(abs_file)
1091 1091 except:
1092 1092 print('Error reading %s, verify file format' % filename)
1093 1093 return 0
1094 1094
1095 1095 self.project = self.projectElement.tag
1096 1096
1097 1097 self.id = self.projectElement.get('id')
1098 1098 self.name = self.projectElement.get('name')
1099 1099 self.description = self.projectElement.get('description')
1100 1100
1101 1101 readUnitElementList = self.projectElement.iter(
1102 1102 ReadUnitConf().getElementName())
1103 1103
1104 1104 for readUnitElement in readUnitElementList:
1105 1105 readUnitConfObj = ReadUnitConf()
1106 1106 readUnitConfObj.readXml(readUnitElement, self.id)
1107 1107 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1108 1108
1109 1109 procUnitElementList = self.projectElement.iter(
1110 1110 ProcUnitConf().getElementName())
1111 1111
1112 1112 for procUnitElement in procUnitElementList:
1113 1113 procUnitConfObj = ProcUnitConf()
1114 1114 procUnitConfObj.readXml(procUnitElement, self.id)
1115 1115 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1116 1116
1117 1117 self.filename = abs_file
1118 1118
1119 1119 return 1
1120 1120
1121 1121 def __str__(self):
1122 1122
1123 1123 print('Project[%s]: name = %s, description = %s, project_id = %s' % (self.id,
1124 1124 self.name,
1125 1125 self.description,
1126 1126 self.project_id))
1127 1127
1128 1128 for procUnitConfObj in self.procUnitConfObjDict.values():
1129 1129 print(procUnitConfObj)
1130 1130
1131 1131 def createObjects(self):
1132 1132
1133 for procUnitConfObj in self.procUnitConfObjDict.values():
1134 procUnitConfObj.createObjects()
1133
1134 keys = list(self.procUnitConfObjDict.keys())
1135 keys.sort()
1136 for key in keys:
1137 self.procUnitConfObjDict[key].createObjects()
1135 1138
1136 1139 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1137 1140
1138 1141 import socket
1139 1142
1140 1143 if modes is None:
1141 1144 modes = self.alarm
1142 1145
1143 1146 if not self.alarm:
1144 1147 modes = []
1145 1148
1146 1149 err = traceback.format_exception(sys.exc_info()[0],
1147 1150 sys.exc_info()[1],
1148 1151 sys.exc_info()[2])
1149 1152
1150 1153 log.error('{}'.format(err[-1]), procUnitConfObj.name)
1151 1154
1152 1155 message = ''.join(err)
1153 1156
1154 1157 if stdout:
1155 1158 sys.stderr.write(message)
1156 1159
1157 1160 subject = 'SChain v%s: Error running %s\n' % (
1158 1161 schainpy.__version__, procUnitConfObj.name)
1159 1162
1160 1163 subtitle = '%s: %s\n' % (
1161 1164 procUnitConfObj.getElementName(), procUnitConfObj.name)
1162 1165 subtitle += 'Hostname: %s\n' % socket.gethostbyname(
1163 1166 socket.gethostname())
1164 1167 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1165 1168 subtitle += 'Configuration file: %s\n' % self.filename
1166 1169 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1167 1170
1168 1171 readUnitConfObj = self.getReadUnitObj()
1169 1172 if readUnitConfObj:
1170 1173 subtitle += '\nInput parameters:\n'
1171 1174 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1172 1175 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1173 1176 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1174 1177 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1175 1178 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1176 1179 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1177 1180
1178 1181 a = Alarm(
1179 1182 modes=modes,
1180 1183 email=self.email,
1181 1184 message=message,
1182 1185 subject=subject,
1183 1186 subtitle=subtitle,
1184 1187 filename=self.filename
1185 1188 )
1186 1189
1187 1190 return a
1188 1191
1189 1192 def isPaused(self):
1190 1193 return 0
1191 1194
1192 1195 def isStopped(self):
1193 1196 return 0
1194 1197
1195 1198 def runController(self):
1196 1199 '''
1197 1200 returns 0 when this process has been stopped, 1 otherwise
1198 1201 '''
1199 1202
1200 1203 if self.isPaused():
1201 1204 print('Process suspended')
1202 1205
1203 1206 while True:
1204 1207 time.sleep(0.1)
1205 1208
1206 1209 if not self.isPaused():
1207 1210 break
1208 1211
1209 1212 if self.isStopped():
1210 1213 break
1211 1214
1212 1215 print('Process reinitialized')
1213 1216
1214 1217 if self.isStopped():
1215 1218 print('Process stopped')
1216 1219 return 0
1217 1220
1218 1221 return 1
1219 1222
1220 1223 def setFilename(self, filename):
1221 1224
1222 1225 self.filename = filename
1223 1226
1224 1227 def setProxyCom(self):
1225 1228
1226 1229 if not os.path.exists('/tmp/schain'):
1227 1230 os.mkdir('/tmp/schain')
1228 1231
1229 1232 self.ctx = zmq.Context()
1230 1233 xpub = self.ctx.socket(zmq.XPUB)
1231 1234 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1232 1235 xsub = self.ctx.socket(zmq.XSUB)
1233 1236 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1234 1237
1235 1238 try:
1236 1239 zmq.proxy(xpub, xsub)
1237 1240 except: # zmq.ContextTerminated:
1238 1241 xpub.close()
1239 1242 xsub.close()
1240 1243
1241 1244 def run(self):
1242 1245
1243 1246 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1244 1247 self.start_time = time.time()
1245 1248 self.createObjects()
1246 1249 # t = Thread(target=wait, args=(self.ctx, ))
1247 1250 # t.start()
1248 1251 self.setProxyCom()
1249 1252
1250 1253 # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo
1251 1254
1252 1255 log.success('{} Done (time: {}s)'.format(
1253 1256 self.name,
1254 1257 time.time()-self.start_time))
@@ -1,800 +1,797
1 1
2 2 '''
3 3 Created on Jul 3, 2014
4 4
5 5 @author: roj-idl71
6 6 '''
7 7 # SUBCHANNELS EN VEZ DE CHANNELS
8 8 # BENCHMARKS -> PROBLEMAS CON ARCHIVOS GRANDES -> INCONSTANTE EN EL TIEMPO
9 9 # ACTUALIZACION DE VERSION
10 10 # HEADERS
11 11 # MODULO DE ESCRITURA
12 12 # METADATA
13 13
14 14 import os
15 15 import datetime
16 16 import numpy
17 17 import timeit
18 18 from fractions import Fraction
19 19
20 20 try:
21 21 from gevent import sleep
22 22 except:
23 23 from time import sleep
24 24
25 25 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
26 26 from schainpy.model.data.jrodata import Voltage
27 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
27 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
28 28 from time import time
29 29
30 30 import pickle
31 31 try:
32 32 import digital_rf
33 33 except:
34 34 print('You should install "digital_rf" module if you want to read Digital RF data')
35 35
36
36 @MPDecorator
37 37 class DigitalRFReader(ProcessingUnit):
38 38 '''
39 39 classdocs
40 40 '''
41 41
42 def __init__(self, **kwargs):
42 def __init__(self):
43 43 '''
44 44 Constructor
45 45 '''
46 46
47 ProcessingUnit.__init__(self, **kwargs)
47 ProcessingUnit.__init__(self)
48 48
49 49 self.dataOut = Voltage()
50 50 self.__printInfo = True
51 51 self.__flagDiscontinuousBlock = False
52 52 self.__bufferIndex = 9999999
53 53 self.__ippKm = None
54 54 self.__codeType = 0
55 55 self.__nCode = None
56 56 self.__nBaud = None
57 57 self.__code = None
58 58 self.dtype = None
59 59 self.oldAverage = None
60 60
61 61 def close(self):
62 62 print('Average of writing to digital rf format is ', self.oldAverage * 1000)
63 63 return
64 64
65 65 def __getCurrentSecond(self):
66 66
67 67 return self.__thisUnixSample / self.__sample_rate
68 68
69 69 thisSecond = property(__getCurrentSecond, "I'm the 'thisSecond' property.")
70 70
71 71 def __setFileHeader(self):
72 72 '''
73 73 In this method will be initialized every parameter of dataOut object (header, no data)
74 74 '''
75 75 ippSeconds = 1.0 * self.__nSamples / self.__sample_rate
76 76
77 77 nProfiles = 1.0 / ippSeconds # Number of profiles in one second
78 78
79 79 try:
80 80 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
81 81 self.__radarControllerHeader)
82 82 except:
83 83 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
84 84 txA=0,
85 85 txB=0,
86 86 nWindows=1,
87 87 nHeights=self.__nSamples,
88 88 firstHeight=self.__firstHeigth,
89 89 deltaHeight=self.__deltaHeigth,
90 90 codeType=self.__codeType,
91 91 nCode=self.__nCode, nBaud=self.__nBaud,
92 92 code=self.__code)
93 93
94 94 try:
95 95 self.dataOut.systemHeaderObj = SystemHeader(self.__systemHeader)
96 96 except:
97 97 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
98 98 nProfiles=nProfiles,
99 99 nChannels=len(
100 100 self.__channelList),
101 101 adcResolution=14)
102 102 self.dataOut.type = "Voltage"
103 103
104 104 self.dataOut.data = None
105 105
106 106 self.dataOut.dtype = self.dtype
107 107
108 108 # self.dataOut.nChannels = 0
109 109
110 110 # self.dataOut.nHeights = 0
111 111
112 112 self.dataOut.nProfiles = int(nProfiles)
113 113
114 114 self.dataOut.heightList = self.__firstHeigth + \
115 115 numpy.arange(self.__nSamples, dtype=numpy.float) * \
116 116 self.__deltaHeigth
117 117
118 118 self.dataOut.channelList = list(range(self.__num_subchannels))
119 119
120 120 self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights()
121 121
122 122 # self.dataOut.channelIndexList = None
123 123
124 124 self.dataOut.flagNoData = True
125 125
126 126 self.dataOut.flagDataAsBlock = False
127 127 # Set to TRUE if the data is discontinuous
128 128 self.dataOut.flagDiscontinuousBlock = False
129 129
130 130 self.dataOut.utctime = None
131 131
132 132 # timezone like jroheader, difference in minutes between UTC and localtime
133 133 self.dataOut.timeZone = self.__timezone / 60
134 134
135 135 self.dataOut.dstFlag = 0
136 136
137 137 self.dataOut.errorCount = 0
138 138
139 139 try:
140 140 self.dataOut.nCohInt = self.fixed_metadata_dict.get(
141 141 'nCohInt', self.nCohInt)
142 142
143 143 # asumo que la data esta decodificada
144 144 self.dataOut.flagDecodeData = self.fixed_metadata_dict.get(
145 145 'flagDecodeData', self.flagDecodeData)
146 146
147 147 # asumo que la data esta sin flip
148 148 self.dataOut.flagDeflipData = self.fixed_metadata_dict['flagDeflipData']
149 149
150 150 self.dataOut.flagShiftFFT = self.fixed_metadata_dict['flagShiftFFT']
151 151
152 152 self.dataOut.useLocalTime = self.fixed_metadata_dict['useLocalTime']
153 153 except:
154 154 pass
155 155
156 156 self.dataOut.ippSeconds = ippSeconds
157 157
158 158 # Time interval between profiles
159 159 # self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
160 160
161 161 self.dataOut.frequency = self.__frequency
162 162
163 163 self.dataOut.realtime = self.__online
164 164
165 165 def findDatafiles(self, path, startDate=None, endDate=None):
166 166
167 167 if not os.path.isdir(path):
168 168 return []
169 169
170 170 try:
171 171 digitalReadObj = digital_rf.DigitalRFReader(
172 172 path, load_all_metadata=True)
173 173 except:
174 174 digitalReadObj = digital_rf.DigitalRFReader(path)
175 175
176 176 channelNameList = digitalReadObj.get_channels()
177 177
178 178 if not channelNameList:
179 179 return []
180 180
181 181 metadata_dict = digitalReadObj.get_rf_file_metadata(channelNameList[0])
182 182
183 183 sample_rate = metadata_dict['sample_rate'][0]
184 184
185 185 this_metadata_file = digitalReadObj.get_metadata(channelNameList[0])
186 186
187 187 try:
188 188 timezone = this_metadata_file['timezone'].value
189 189 except:
190 190 timezone = 0
191 191
192 192 startUTCSecond, endUTCSecond = digitalReadObj.get_bounds(
193 193 channelNameList[0]) / sample_rate - timezone
194 194
195 195 startDatetime = datetime.datetime.utcfromtimestamp(startUTCSecond)
196 196 endDatatime = datetime.datetime.utcfromtimestamp(endUTCSecond)
197 197
198 198 if not startDate:
199 199 startDate = startDatetime.date()
200 200
201 201 if not endDate:
202 202 endDate = endDatatime.date()
203 203
204 204 dateList = []
205 205
206 206 thisDatetime = startDatetime
207 207
208 208 while(thisDatetime <= endDatatime):
209 209
210 210 thisDate = thisDatetime.date()
211 211
212 212 if thisDate < startDate:
213 213 continue
214 214
215 215 if thisDate > endDate:
216 216 break
217 217
218 218 dateList.append(thisDate)
219 219 thisDatetime += datetime.timedelta(1)
220 220
221 221 return dateList
222 222
223 223 def setup(self, path=None,
224 224 startDate=None,
225 225 endDate=None,
226 226 startTime=datetime.time(0, 0, 0),
227 227 endTime=datetime.time(23, 59, 59),
228 228 channelList=None,
229 229 nSamples=None,
230 230 online=False,
231 231 delay=60,
232 232 buffer_size=1024,
233 233 ippKm=None,
234 234 nCohInt=1,
235 235 nCode=1,
236 236 nBaud=1,
237 237 flagDecodeData=False,
238 238 code=numpy.ones((1, 1), dtype=numpy.int),
239 239 **kwargs):
240 240 '''
241 241 In this method we should set all initial parameters.
242 242
243 243 Inputs:
244 244 path
245 245 startDate
246 246 endDate
247 247 startTime
248 248 endTime
249 249 set
250 250 expLabel
251 251 ext
252 252 online
253 253 delay
254 254 '''
255 255 self.nCohInt = nCohInt
256 256 self.flagDecodeData = flagDecodeData
257 257 self.i = 0
258 258 if not os.path.isdir(path):
259 259 raise ValueError("[Reading] Directory %s does not exist" % path)
260 260
261 261 try:
262 262 self.digitalReadObj = digital_rf.DigitalRFReader(
263 263 path, load_all_metadata=True)
264 264 except:
265 265 self.digitalReadObj = digital_rf.DigitalRFReader(path)
266 266
267 267 channelNameList = self.digitalReadObj.get_channels()
268 268
269 269 if not channelNameList:
270 270 raise ValueError("[Reading] Directory %s does not have any files" % path)
271 271
272 272 if not channelList:
273 273 channelList = list(range(len(channelNameList)))
274 274
275 275 ########## Reading metadata ######################
276 276
277 277 top_properties = self.digitalReadObj.get_properties(
278 278 channelNameList[channelList[0]])
279 279
280 280 self.__num_subchannels = top_properties['num_subchannels']
281 281 self.__sample_rate = 1.0 * \
282 282 top_properties['sample_rate_numerator'] / \
283 283 top_properties['sample_rate_denominator']
284 284 # self.__samples_per_file = top_properties['samples_per_file'][0]
285 285 self.__deltaHeigth = 1e6 * 0.15 / self.__sample_rate # why 0.15?
286 286
287 287 this_metadata_file = self.digitalReadObj.get_digital_metadata(
288 288 channelNameList[channelList[0]])
289 289 metadata_bounds = this_metadata_file.get_bounds()
290 290 self.fixed_metadata_dict = this_metadata_file.read(
291 291 metadata_bounds[0])[metadata_bounds[0]] # GET FIRST HEADER
292 292
293 293 try:
294 294 self.__processingHeader = self.fixed_metadata_dict['processingHeader']
295 295 self.__radarControllerHeader = self.fixed_metadata_dict['radarControllerHeader']
296 296 self.__systemHeader = self.fixed_metadata_dict['systemHeader']
297 297 self.dtype = pickle.loads(self.fixed_metadata_dict['dtype'])
298 298 except:
299 299 pass
300 300
301 301 self.__frequency = None
302 302
303 303 self.__frequency = self.fixed_metadata_dict.get('frequency', 1)
304 304
305 305 self.__timezone = self.fixed_metadata_dict.get('timezone', 300)
306 306
307 307 try:
308 308 nSamples = self.fixed_metadata_dict['nSamples']
309 309 except:
310 310 nSamples = None
311 311
312 312 self.__firstHeigth = 0
313 313
314 314 try:
315 315 codeType = self.__radarControllerHeader['codeType']
316 316 except:
317 317 codeType = 0
318 318
319 319 try:
320 320 if codeType:
321 321 nCode = self.__radarControllerHeader['nCode']
322 322 nBaud = self.__radarControllerHeader['nBaud']
323 323 code = self.__radarControllerHeader['code']
324 324 except:
325 325 pass
326 326
327 327 if not ippKm:
328 328 try:
329 329 # seconds to km
330 330 ippKm = self.__radarControllerHeader['ipp']
331 331 except:
332 332 ippKm = None
333 333 ####################################################
334 334 self.__ippKm = ippKm
335 335 startUTCSecond = None
336 336 endUTCSecond = None
337 337
338 338 if startDate:
339 339 startDatetime = datetime.datetime.combine(startDate, startTime)
340 340 startUTCSecond = (
341 341 startDatetime - datetime.datetime(1970, 1, 1)).total_seconds() + self.__timezone
342 342
343 343 if endDate:
344 344 endDatetime = datetime.datetime.combine(endDate, endTime)
345 345 endUTCSecond = (endDatetime - datetime.datetime(1970,
346 346 1, 1)).total_seconds() + self.__timezone
347 347
348 348 start_index, end_index = self.digitalReadObj.get_bounds(
349 349 channelNameList[channelList[0]])
350 350
351 351 if not startUTCSecond:
352 352 startUTCSecond = start_index / self.__sample_rate
353 353
354 354 if start_index > startUTCSecond * self.__sample_rate:
355 355 startUTCSecond = start_index / self.__sample_rate
356 356
357 357 if not endUTCSecond:
358 358 endUTCSecond = end_index / self.__sample_rate
359 359
360 360 if end_index < endUTCSecond * self.__sample_rate:
361 361 endUTCSecond = end_index / self.__sample_rate
362 362 if not nSamples:
363 363 if not ippKm:
364 364 raise ValueError("[Reading] nSamples or ippKm should be defined")
365 365 nSamples = int(ippKm / (1e6 * 0.15 / self.__sample_rate))
366 366 channelBoundList = []
367 367 channelNameListFiltered = []
368 368
369 369 for thisIndexChannel in channelList:
370 370 thisChannelName = channelNameList[thisIndexChannel]
371 371 start_index, end_index = self.digitalReadObj.get_bounds(
372 372 thisChannelName)
373 373 channelBoundList.append((start_index, end_index))
374 374 channelNameListFiltered.append(thisChannelName)
375 375
376 376 self.profileIndex = 0
377 377 self.i = 0
378 378 self.__delay = delay
379 379
380 380 self.__codeType = codeType
381 381 self.__nCode = nCode
382 382 self.__nBaud = nBaud
383 383 self.__code = code
384 384
385 385 self.__datapath = path
386 386 self.__online = online
387 387 self.__channelList = channelList
388 388 self.__channelNameList = channelNameListFiltered
389 389 self.__channelBoundList = channelBoundList
390 390 self.__nSamples = nSamples
391 391 self.__samples_to_read = int(nSamples) # FIJO: AHORA 40
392 392 self.__nChannels = len(self.__channelList)
393 393
394 394 self.__startUTCSecond = startUTCSecond
395 395 self.__endUTCSecond = endUTCSecond
396 396
397 397 self.__timeInterval = 1.0 * self.__samples_to_read / \
398 398 self.__sample_rate # Time interval
399 399
400 400 if online:
401 401 # self.__thisUnixSample = int(endUTCSecond*self.__sample_rate - 4*self.__samples_to_read)
402 402 startUTCSecond = numpy.floor(endUTCSecond)
403 403
404 404 # por que en el otro metodo lo primero q se hace es sumar samplestoread
405 405 self.__thisUnixSample = int(
406 406 startUTCSecond * self.__sample_rate) - self.__samples_to_read
407 407
408 408 self.__data_buffer = numpy.zeros(
409 409 (self.__num_subchannels, self.__samples_to_read), dtype=numpy.complex)
410 410
411 411 self.__setFileHeader()
412 412 self.isConfig = True
413 413
414 414 print("[Reading] Digital RF Data was found from %s to %s " % (
415 415 datetime.datetime.utcfromtimestamp(
416 416 self.__startUTCSecond - self.__timezone),
417 417 datetime.datetime.utcfromtimestamp(
418 418 self.__endUTCSecond - self.__timezone)
419 419 ))
420 420
421 421 print("[Reading] Starting process from %s to %s" % (datetime.datetime.utcfromtimestamp(startUTCSecond - self.__timezone),
422 422 datetime.datetime.utcfromtimestamp(
423 423 endUTCSecond - self.__timezone)
424 424 ))
425 425 self.oldAverage = None
426 426 self.count = 0
427 427 self.executionTime = 0
428 428
429 429 def __reload(self):
430 430 # print
431 431 # print "%s not in range [%s, %s]" %(
432 432 # datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
433 433 # datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
434 434 # datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
435 435 # )
436 436 print("[Reading] reloading metadata ...")
437 437
438 438 try:
439 439 self.digitalReadObj.reload(complete_update=True)
440 440 except:
441 441 self.digitalReadObj.reload()
442 442
443 443 start_index, end_index = self.digitalReadObj.get_bounds(
444 444 self.__channelNameList[self.__channelList[0]])
445 445
446 446 if start_index > self.__startUTCSecond * self.__sample_rate:
447 447 self.__startUTCSecond = 1.0 * start_index / self.__sample_rate
448 448
449 449 if end_index > self.__endUTCSecond * self.__sample_rate:
450 450 self.__endUTCSecond = 1.0 * end_index / self.__sample_rate
451 451 print()
452 452 print("[Reading] New timerange found [%s, %s] " % (
453 453 datetime.datetime.utcfromtimestamp(
454 454 self.__startUTCSecond - self.__timezone),
455 455 datetime.datetime.utcfromtimestamp(
456 456 self.__endUTCSecond - self.__timezone)
457 457 ))
458 458
459 459 return True
460 460
461 461 return False
462 462
463 463 def timeit(self, toExecute):
464 464 t0 = time()
465 465 toExecute()
466 466 self.executionTime = time() - t0
467 467 if self.oldAverage is None:
468 468 self.oldAverage = self.executionTime
469 469 self.oldAverage = (self.executionTime + self.count *
470 470 self.oldAverage) / (self.count + 1.0)
471 471 self.count = self.count + 1.0
472 472 return
473 473
474 474 def __readNextBlock(self, seconds=30, volt_scale=1):
475 475 '''
476 476 '''
477 477
478 478 # Set the next data
479 479 self.__flagDiscontinuousBlock = False
480 480 self.__thisUnixSample += self.__samples_to_read
481 481
482 482 if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
483 483 print("[Reading] There are no more data into selected time-range")
484 484 if self.__online:
485 485 self.__reload()
486 486 else:
487 487 return False
488 488
489 489 if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
490 490 return False
491 491 self.__thisUnixSample -= self.__samples_to_read
492 492
493 493 indexChannel = 0
494 494
495 495 dataOk = False
496 496 for thisChannelName in self.__channelNameList: # TODO VARIOS CHANNELS?
497 497 for indexSubchannel in range(self.__num_subchannels):
498 498 try:
499 499 t0 = time()
500 500 result = self.digitalReadObj.read_vector_c81d(self.__thisUnixSample,
501 501 self.__samples_to_read,
502 502 thisChannelName, sub_channel=indexSubchannel)
503 503 self.executionTime = time() - t0
504 504 if self.oldAverage is None:
505 505 self.oldAverage = self.executionTime
506 506 self.oldAverage = (
507 507 self.executionTime + self.count * self.oldAverage) / (self.count + 1.0)
508 508 self.count = self.count + 1.0
509 509
510 510 except IOError as e:
511 511 # read next profile
512 512 self.__flagDiscontinuousBlock = True
513 513 print("[Reading] %s" % datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone), e)
514 514 break
515 515
516 516 if result.shape[0] != self.__samples_to_read:
517 517 self.__flagDiscontinuousBlock = True
518 518 print("[Reading] %s: Too few samples were found, just %d/%d samples" % (datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
519 519 result.shape[0],
520 520 self.__samples_to_read))
521 521 break
522 522
523 523 self.__data_buffer[indexSubchannel, :] = result * volt_scale
524 524
525 525 indexChannel += 1
526 526
527 527 dataOk = True
528 528
529 529 self.__utctime = self.__thisUnixSample / self.__sample_rate
530 530
531 531 if not dataOk:
532 532 return False
533 533
534 534 print("[Reading] %s: %d samples <> %f sec" % (datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
535 535 self.__samples_to_read,
536 536 self.__timeInterval))
537 537
538 538 self.__bufferIndex = 0
539 539
540 540 return True
541 541
542 542 def __isBufferEmpty(self):
543 543 return self.__bufferIndex > self.__samples_to_read - self.__nSamples # 40960 - 40
544 544
545 545 def getData(self, seconds=30, nTries=5):
546 546 '''
547 547 This method gets the data from files and put the data into the dataOut object
548 548
549 549 In addition, increase el the buffer counter in one.
550 550
551 551 Return:
552 552 data : retorna un perfil de voltages (alturas * canales) copiados desde el
553 553 buffer. Si no hay mas archivos a leer retorna None.
554 554
555 555 Affected:
556 556 self.dataOut
557 557 self.profileIndex
558 558 self.flagDiscontinuousBlock
559 559 self.flagIsNewBlock
560 560 '''
561 561
562 562 err_counter = 0
563 563 self.dataOut.flagNoData = True
564 564
565 565 if self.__isBufferEmpty():
566 566 self.__flagDiscontinuousBlock = False
567 567
568 568 while True:
569 569 if self.__readNextBlock():
570 570 break
571 571 if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate:
572 return False
572 self.dataOut.error = (1, '')
573 return
573 574
574 575 if self.__flagDiscontinuousBlock:
575 576 print('[Reading] discontinuous block found ... continue with the next block')
576 continue
577 self.dataOut.error = (1, '')
578 return
577 579
578 580 if not self.__online:
579 return False
581 self.dataOut.error = (1, '')
582 return
580 583
581 584 err_counter += 1
582 585 if err_counter > nTries:
583 return False
586 self.dataOut.error = (1, '')
587 return
584 588
585 589 print('[Reading] waiting %d seconds to read a new block' % seconds)
586 590 sleep(seconds)
587 591
588 592 self.dataOut.data = self.__data_buffer[:,
589 593 self.__bufferIndex:self.__bufferIndex + self.__nSamples]
590 594 self.dataOut.utctime = (
591 595 self.__thisUnixSample + self.__bufferIndex) / self.__sample_rate
592 596 self.dataOut.flagNoData = False
593 597 self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
594 598 self.dataOut.profileIndex = self.profileIndex
595 599
596 600 self.__bufferIndex += self.__nSamples
597 601 self.profileIndex += 1
598 602
599 603 if self.profileIndex == self.dataOut.nProfiles:
600 604 self.profileIndex = 0
601 605
602 return True
606 return
603 607
604 608 def printInfo(self):
605 609 '''
606 610 '''
607 611 if self.__printInfo == False:
608 612 return
609 613
610 614 # self.systemHeaderObj.printInfo()
611 615 # self.radarControllerHeaderObj.printInfo()
612 616
613 617 self.__printInfo = False
614 618
615 619 def printNumberOfBlock(self):
616 620 '''
617 621 '''
618 622 return
619 623 # print self.profileIndex
620 624
621 625 def run(self, **kwargs):
622 626 '''
623 627 This method will be called many times so here you should put all your code
624 628 '''
625 629
626 630 if not self.isConfig:
627 631 self.setup(**kwargs)
628 632 #self.i = self.i+1
629 633 self.getData(seconds=self.__delay)
630 634
631 635 return
632 636
633 637
634 638 class DigitalRFWriter(Operation):
635 639 '''
636 640 classdocs
637 641 '''
638 642
639 643 def __init__(self, **kwargs):
640 644 '''
641 645 Constructor
642 646 '''
643 647 Operation.__init__(self, **kwargs)
644 648 self.metadata_dict = {}
645 649 self.dataOut = None
646 650 self.dtype = None
647 651 self.oldAverage = 0
648 652
649 653 def setHeader(self):
650 654
651 655 self.metadata_dict['frequency'] = self.dataOut.frequency
652 656 self.metadata_dict['timezone'] = self.dataOut.timeZone
653 657 self.metadata_dict['dtype'] = pickle.dumps(self.dataOut.dtype)
654 658 self.metadata_dict['nProfiles'] = self.dataOut.nProfiles
655 659 self.metadata_dict['heightList'] = self.dataOut.heightList
656 660 self.metadata_dict['channelList'] = self.dataOut.channelList
657 661 self.metadata_dict['flagDecodeData'] = self.dataOut.flagDecodeData
658 662 self.metadata_dict['flagDeflipData'] = self.dataOut.flagDeflipData
659 663 self.metadata_dict['flagShiftFFT'] = self.dataOut.flagShiftFFT
660 664 self.metadata_dict['useLocalTime'] = self.dataOut.useLocalTime
661 665 self.metadata_dict['nCohInt'] = self.dataOut.nCohInt
662 666 self.metadata_dict['type'] = self.dataOut.type
663 667 self.metadata_dict['flagDataAsBlock'] = getattr(
664 668 self.dataOut, 'flagDataAsBlock', None) # chequear
665 669
666 670 def setup(self, dataOut, path, frequency, fileCadence, dirCadence, metadataCadence, set=0, metadataFile='metadata', ext='.h5'):
667 671 '''
668 672 In this method we should set all initial parameters.
669 673 Input:
670 674 dataOut: Input data will also be outputa data
671 675 '''
672 676 self.setHeader()
673 677 self.__ippSeconds = dataOut.ippSeconds
674 678 self.__deltaH = dataOut.getDeltaH()
675 679 self.__sample_rate = 1e6 * 0.15 / self.__deltaH
676 680 self.__dtype = dataOut.dtype
677 681 if len(dataOut.dtype) == 2:
678 682 self.__dtype = dataOut.dtype[0]
679 683 self.__nSamples = dataOut.systemHeaderObj.nSamples
680 684 self.__nProfiles = dataOut.nProfiles
681 685
682 686 if self.dataOut.type != 'Voltage':
683 687 raise 'Digital RF cannot be used with this data type'
684 688 self.arr_data = numpy.ones((1, dataOut.nFFTPoints * len(
685 689 self.dataOut.channelList)), dtype=[('r', self.__dtype), ('i', self.__dtype)])
686 690 else:
687 691 self.arr_data = numpy.ones((self.__nSamples, len(
688 692 self.dataOut.channelList)), dtype=[('r', self.__dtype), ('i', self.__dtype)])
689 693
690 694 file_cadence_millisecs = 1000
691 695
692 696 sample_rate_fraction = Fraction(self.__sample_rate).limit_denominator()
693 697 sample_rate_numerator = int(sample_rate_fraction.numerator)
694 698 sample_rate_denominator = int(sample_rate_fraction.denominator)
695 699 start_global_index = dataOut.utctime * self.__sample_rate
696 700
697 701 uuid = 'prueba'
698 702 compression_level = 0
699 703 checksum = False
700 704 is_complex = True
701 705 num_subchannels = len(dataOut.channelList)
702 706 is_continuous = True
703 707 marching_periods = False
704 708
705 709 self.digitalWriteObj = digital_rf.DigitalRFWriter(path, self.__dtype, dirCadence,
706 710 fileCadence, start_global_index,
707 711 sample_rate_numerator, sample_rate_denominator, uuid, compression_level, checksum,
708 712 is_complex, num_subchannels, is_continuous, marching_periods)
709 713 metadata_dir = os.path.join(path, 'metadata')
710 714 os.system('mkdir %s' % (metadata_dir))
711 715 self.digitalMetadataWriteObj = digital_rf.DigitalMetadataWriter(metadata_dir, dirCadence, 1, # 236, file_cadence_millisecs / 1000
712 716 sample_rate_numerator, sample_rate_denominator,
713 717 metadataFile)
714 718 self.isConfig = True
715 719 self.currentSample = 0
716 720 self.oldAverage = 0
717 721 self.count = 0
718 722 return
719 723
720 724 def writeMetadata(self):
721 725 start_idx = self.__sample_rate * self.dataOut.utctime
722 726
723 727 self.metadata_dict['processingHeader'] = self.dataOut.processingHeaderObj.getAsDict(
724 728 )
725 729 self.metadata_dict['radarControllerHeader'] = self.dataOut.radarControllerHeaderObj.getAsDict(
726 730 )
727 731 self.metadata_dict['systemHeader'] = self.dataOut.systemHeaderObj.getAsDict(
728 732 )
729 733 self.digitalMetadataWriteObj.write(start_idx, self.metadata_dict)
730 734 return
731 735
732 736 def timeit(self, toExecute):
733 737 t0 = time()
734 738 toExecute()
735 739 self.executionTime = time() - t0
736 740 if self.oldAverage is None:
737 741 self.oldAverage = self.executionTime
738 742 self.oldAverage = (self.executionTime + self.count *
739 743 self.oldAverage) / (self.count + 1.0)
740 744 self.count = self.count + 1.0
741 745 return
742 746
743 747 def writeData(self):
744 748 if self.dataOut.type != 'Voltage':
745 749 raise 'Digital RF cannot be used with this data type'
746 750 for channel in self.dataOut.channelList:
747 751 for i in range(self.dataOut.nFFTPoints):
748 752 self.arr_data[1][channel * self.dataOut.nFFTPoints +
749 753 i]['r'] = self.dataOut.data[channel][i].real
750 754 self.arr_data[1][channel * self.dataOut.nFFTPoints +
751 755 i]['i'] = self.dataOut.data[channel][i].imag
752 756 else:
753 757 for i in range(self.dataOut.systemHeaderObj.nSamples):
754 758 for channel in self.dataOut.channelList:
755 759 self.arr_data[i][channel]['r'] = self.dataOut.data[channel][i].real
756 760 self.arr_data[i][channel]['i'] = self.dataOut.data[channel][i].imag
757 761
758 762 def f(): return self.digitalWriteObj.rf_write(self.arr_data)
759 763 self.timeit(f)
760 764
761 765 return
762 766
763 767 def run(self, dataOut, frequency=49.92e6, path=None, fileCadence=1000, dirCadence=36000, metadataCadence=1, **kwargs):
764 768 '''
765 769 This method will be called many times so here you should put all your code
766 770 Inputs:
767 771 dataOut: object with the data
768 772 '''
769 773 # print dataOut.__dict__
770 774 self.dataOut = dataOut
771 775 if not self.isConfig:
772 776 self.setup(dataOut, path, frequency, fileCadence,
773 777 dirCadence, metadataCadence, **kwargs)
774 778 self.writeMetadata()
775 779
776 780 self.writeData()
777 781
778 782 ## self.currentSample += 1
779 783 # if self.dataOut.flagDataAsBlock or self.currentSample == 1:
780 784 # self.writeMetadata()
781 785 ## if self.currentSample == self.__nProfiles: self.currentSample = 0
782 786
787 return dataOut
788
783 789 def close(self):
784 790 print('[Writing] - Closing files ')
785 791 print('Average of writing to digital rf format is ', self.oldAverage * 1000)
786 792 try:
787 793 self.digitalWriteObj.close()
788 794 except:
789 795 pass
790 796
791 797
792 # raise
793 if __name__ == '__main__':
794
795 readObj = DigitalRFReader()
796
797 while True:
798 readObj.run(path='/home/jchavez/jicamarca/mocked_data/')
799 # readObj.printInfo()
800 # readObj.printNumberOfBlock() No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now