##// END OF EJS Templates
sin kwargs 714
José Chávez -
r1033:a86101267310
parent child
Show More
@@ -1,1329 +1,1328
1 1 '''
2 2 Created on September , 2012
3 3 @author:
4 4 '''
5 5
6 6 import sys
7 7 import ast
8 8 import datetime
9 9 import traceback
10 10 import math
11 11 import time
12 12 from multiprocessing import Process, Queue, cpu_count
13 13
14 14 import schainpy
15 15 import schainpy.admin
16 16
17 17 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
18 18 from xml.dom import minidom
19 19
20 20 from schainpy.model import *
21 21 from time import sleep
22 22
23 23
24 24
25 25 def prettify(elem):
26 26 """Return a pretty-printed XML string for the Element.
27 27 """
28 28 rough_string = tostring(elem, 'utf-8')
29 29 reparsed = minidom.parseString(rough_string)
30 30 return reparsed.toprettyxml(indent=" ")
31 31
32 32 def multiSchain(child, nProcess=cpu_count(), startDate=None, endDate=None, by_day=False):
33 33 skip = 0
34 34 cursor = 0
35 35 nFiles = None
36 36 processes = []
37 37 dt1 = datetime.datetime.strptime(startDate, '%Y/%m/%d')
38 38 dt2 = datetime.datetime.strptime(endDate, '%Y/%m/%d')
39 39 days = (dt2 - dt1).days
40 40
41 41 for day in range(days+1):
42 42 skip = 0
43 43 cursor = 0
44 44 q = Queue()
45 45 processes = []
46 46 dt = (dt1 + datetime.timedelta(day)).strftime('%Y/%m/%d')
47 47 firstProcess = Process(target=child, args=(cursor, skip, q, dt))
48 48 firstProcess.start()
49 49 if by_day:
50 50 continue
51 51 nFiles = q.get()
52 52 if nFiles==0:
53 53 continue
54 54 firstProcess.terminate()
55 55 skip = int(math.ceil(nFiles/nProcess))
56 56 while True:
57 57 processes.append(Process(target=child, args=(cursor, skip, q, dt)))
58 58 processes[cursor].start()
59 59 if nFiles < cursor*skip:
60 60 break
61 61 cursor += 1
62 62
63 63 def beforeExit(exctype, value, trace):
64 64 for process in processes:
65 65 process.terminate()
66 66 process.join()
67 67 print traceback.print_tb(trace)
68 68
69 69 sys.excepthook = beforeExit
70 70
71 71 for process in processes:
72 72 process.join()
73 73 process.terminate()
74 74
75 75 time.sleep(3)
76 76
77 77
78 78 class ParameterConf():
79 79
80 80 id = None
81 81 name = None
82 82 value = None
83 83 format = None
84 84
85 85 __formated_value = None
86 86
87 87 ELEMENTNAME = 'Parameter'
88 88
89 89 def __init__(self):
90 90
91 91 self.format = 'str'
92 92
93 93 def getElementName(self):
94 94
95 95 return self.ELEMENTNAME
96 96
97 97 def getValue(self):
98 98
99 99 value = self.value
100 100 format = self.format
101 101
102 102 if self.__formated_value != None:
103 103
104 104 return self.__formated_value
105 105
106 106 if format == 'obj':
107 107 return value
108 108
109 109 if format == 'str':
110 110 self.__formated_value = str(value)
111 111 return self.__formated_value
112 112
113 113 if value == '':
114 114 raise ValueError, "%s: This parameter value is empty" %self.name
115 115
116 116 if format == 'list':
117 117 strList = value.split(',')
118 118
119 119 self.__formated_value = strList
120 120
121 121 return self.__formated_value
122 122
123 123 if format == 'intlist':
124 124 """
125 125 Example:
126 126 value = (0,1,2)
127 127 """
128 128
129 129 new_value = ast.literal_eval(value)
130 130
131 131 if type(new_value) not in (tuple, list):
132 132 new_value = [int(new_value)]
133 133
134 134 self.__formated_value = new_value
135 135
136 136 return self.__formated_value
137 137
138 138 if format == 'floatlist':
139 139 """
140 140 Example:
141 141 value = (0.5, 1.4, 2.7)
142 142 """
143 143
144 144 new_value = ast.literal_eval(value)
145 145
146 146 if type(new_value) not in (tuple, list):
147 147 new_value = [float(new_value)]
148 148
149 149 self.__formated_value = new_value
150 150
151 151 return self.__formated_value
152 152
153 153 if format == 'date':
154 154 strList = value.split('/')
155 155 intList = [int(x) for x in strList]
156 156 date = datetime.date(intList[0], intList[1], intList[2])
157 157
158 158 self.__formated_value = date
159 159
160 160 return self.__formated_value
161 161
162 162 if format == 'time':
163 163 strList = value.split(':')
164 164 intList = [int(x) for x in strList]
165 165 time = datetime.time(intList[0], intList[1], intList[2])
166 166
167 167 self.__formated_value = time
168 168
169 169 return self.__formated_value
170 170
171 171 if format == 'pairslist':
172 172 """
173 173 Example:
174 174 value = (0,1),(1,2)
175 175 """
176 176
177 177 new_value = ast.literal_eval(value)
178 178
179 179 if type(new_value) not in (tuple, list):
180 180 raise ValueError, "%s has to be a tuple or list of pairs" %value
181 181
182 182 if type(new_value[0]) not in (tuple, list):
183 183 if len(new_value) != 2:
184 184 raise ValueError, "%s has to be a tuple or list of pairs" %value
185 185 new_value = [new_value]
186 186
187 187 for thisPair in new_value:
188 188 if len(thisPair) != 2:
189 189 raise ValueError, "%s has to be a tuple or list of pairs" %value
190 190
191 191 self.__formated_value = new_value
192 192
193 193 return self.__formated_value
194 194
195 195 if format == 'multilist':
196 196 """
197 197 Example:
198 198 value = (0,1,2),(3,4,5)
199 199 """
200 200 multiList = ast.literal_eval(value)
201 201
202 202 if type(multiList[0]) == int:
203 203 multiList = ast.literal_eval("(" + value + ")")
204 204
205 205 self.__formated_value = multiList
206 206
207 207 return self.__formated_value
208 208
209 209 if format == 'bool':
210 210 value = int(value)
211 211
212 212 if format == 'int':
213 213 value = float(value)
214 214
215 215 format_func = eval(format)
216 216
217 217 self.__formated_value = format_func(value)
218 218
219 219 return self.__formated_value
220 220
221 221 def updateId(self, new_id):
222 222
223 223 self.id = str(new_id)
224 224
225 225 def setup(self, id, name, value, format='str'):
226 226 self.id = str(id)
227 227 self.name = name
228 228 if format == 'obj':
229 229 self.value = value
230 230 else:
231 231 self.value = str(value)
232 232 self.format = str.lower(format)
233 233
234 234 self.getValue()
235 235
236 236 return 1
237 237
238 238 def update(self, name, value, format='str'):
239 239
240 240 self.name = name
241 241 self.value = str(value)
242 242 self.format = format
243 243
244 244 def makeXml(self, opElement):
245 245 if self.name not in ('queue',):
246 246 parmElement = SubElement(opElement, self.ELEMENTNAME)
247 247 parmElement.set('id', str(self.id))
248 248 parmElement.set('name', self.name)
249 249 parmElement.set('value', self.value)
250 250 parmElement.set('format', self.format)
251 251
252 252 def readXml(self, parmElement):
253 253
254 254 self.id = parmElement.get('id')
255 255 self.name = parmElement.get('name')
256 256 self.value = parmElement.get('value')
257 257 self.format = str.lower(parmElement.get('format'))
258 258
259 259 #Compatible with old signal chain version
260 260 if self.format == 'int' and self.name == 'idfigure':
261 261 self.name = 'id'
262 262
263 263 def printattr(self):
264 264
265 265 print "Parameter[%s]: name = %s, value = %s, format = %s" %(self.id, self.name, self.value, self.format)
266 266
267 267 class OperationConf():
268 268
269 269 id = None
270 270 name = None
271 271 priority = None
272 272 type = None
273 273
274 274 parmConfObjList = []
275 275
276 276 ELEMENTNAME = 'Operation'
277 277
278 278 def __init__(self):
279 279
280 280 self.id = '0'
281 281 self.name = None
282 282 self.priority = None
283 283 self.type = 'self'
284 284
285 285
286 286 def __getNewId(self):
287 287
288 288 return int(self.id)*10 + len(self.parmConfObjList) + 1
289 289
290 290 def updateId(self, new_id):
291 291
292 292 self.id = str(new_id)
293 293
294 294 n = 1
295 295 for parmObj in self.parmConfObjList:
296 296
297 297 idParm = str(int(new_id)*10 + n)
298 298 parmObj.updateId(idParm)
299 299
300 300 n += 1
301 301
302 302 def getElementName(self):
303 303
304 304 return self.ELEMENTNAME
305 305
306 306 def getParameterObjList(self):
307 307
308 308 return self.parmConfObjList
309 309
310 310 def getParameterObj(self, parameterName):
311 311
312 312 for parmConfObj in self.parmConfObjList:
313 313
314 314 if parmConfObj.name != parameterName:
315 315 continue
316 316
317 317 return parmConfObj
318 318
319 319 return None
320 320
321 321 def getParameterObjfromValue(self, parameterValue):
322 322
323 323 for parmConfObj in self.parmConfObjList:
324 324
325 325 if parmConfObj.getValue() != parameterValue:
326 326 continue
327 327
328 328 return parmConfObj.getValue()
329 329
330 330 return None
331 331
332 332 def getParameterValue(self, parameterName):
333 333
334 334 parameterObj = self.getParameterObj(parameterName)
335 335
336 336 # if not parameterObj:
337 337 # return None
338 338
339 339 value = parameterObj.getValue()
340 340
341 341 return value
342 342
343 343
344 344 def getKwargs(self):
345 345
346 346 kwargs = {}
347 347
348 348 for parmConfObj in self.parmConfObjList:
349 349 if self.name == 'run' and parmConfObj.name == 'datatype':
350 350 continue
351 351
352 352 kwargs[parmConfObj.name] = parmConfObj.getValue()
353 353
354 354 return kwargs
355 355
356 356 def setup(self, id, name, priority, type):
357 357
358 358 self.id = str(id)
359 359 self.name = name
360 360 self.type = type
361 361 self.priority = priority
362 362
363 363 self.parmConfObjList = []
364 364
365 365 def removeParameters(self):
366 366
367 367 for obj in self.parmConfObjList:
368 368 del obj
369 369
370 370 self.parmConfObjList = []
371 371
372 372 def addParameter(self, name, value, format='str'):
373 373
374 374 id = self.__getNewId()
375 375
376 376 parmConfObj = ParameterConf()
377 377 if not parmConfObj.setup(id, name, value, format):
378 378 return None
379 379
380 380 self.parmConfObjList.append(parmConfObj)
381 381
382 382 return parmConfObj
383 383
384 384 def changeParameter(self, name, value, format='str'):
385 385
386 386 parmConfObj = self.getParameterObj(name)
387 387 parmConfObj.update(name, value, format)
388 388
389 389 return parmConfObj
390 390
391 391 def makeXml(self, procUnitElement):
392 392
393 393 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
394 394 opElement.set('id', str(self.id))
395 395 opElement.set('name', self.name)
396 396 opElement.set('type', self.type)
397 397 opElement.set('priority', str(self.priority))
398 398
399 399 for parmConfObj in self.parmConfObjList:
400 400 parmConfObj.makeXml(opElement)
401 401
402 402 def readXml(self, opElement):
403 403
404 404 self.id = opElement.get('id')
405 405 self.name = opElement.get('name')
406 406 self.type = opElement.get('type')
407 407 self.priority = opElement.get('priority')
408 408
409 409 #Compatible with old signal chain version
410 410 #Use of 'run' method instead 'init'
411 411 if self.type == 'self' and self.name == 'init':
412 412 self.name = 'run'
413 413
414 414 self.parmConfObjList = []
415 415
416 416 parmElementList = opElement.iter(ParameterConf().getElementName())
417 417
418 418 for parmElement in parmElementList:
419 419 parmConfObj = ParameterConf()
420 420 parmConfObj.readXml(parmElement)
421 421
422 422 #Compatible with old signal chain version
423 423 #If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
424 424 if self.type != 'self' and self.name == 'Plot':
425 425 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
426 426 self.name = parmConfObj.value
427 427 continue
428 428
429 429 self.parmConfObjList.append(parmConfObj)
430 430
431 431 def printattr(self):
432 432
433 433 print "%s[%s]: name = %s, type = %s, priority = %s" %(self.ELEMENTNAME,
434 434 self.id,
435 435 self.name,
436 436 self.type,
437 437 self.priority)
438 438
439 439 for parmConfObj in self.parmConfObjList:
440 440 parmConfObj.printattr()
441 441
442 442 def createObject(self, plotter_queue=None):
443 443
444 444
445 445 if self.type == 'self':
446 446 raise ValueError, "This operation type cannot be created"
447 447
448 448 if self.type == 'plotter':
449 449 #Plotter(plotter_name)
450 450 if not plotter_queue:
451 451 raise ValueError, "plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)"
452 452
453 453 opObj = Plotter(self.name, plotter_queue)
454 454
455 455 if self.type == 'external' or self.type == 'other':
456 456
457 457 className = eval(self.name)
458 458 kwargs = self.getKwargs()
459 459
460 460 opObj = className(**kwargs)
461 461
462 462 return opObj
463 463
464 464
465 465 class ProcUnitConf():
466 466
467 467 id = None
468 468 name = None
469 469 datatype = None
470 470 inputId = None
471 471 parentId = None
472 472
473 473 opConfObjList = []
474 474
475 475 procUnitObj = None
476 476 opObjList = []
477 477
478 478 ELEMENTNAME = 'ProcUnit'
479 479
480 480 def __init__(self):
481 481
482 482 self.id = None
483 483 self.datatype = None
484 484 self.name = None
485 485 self.inputId = None
486 486
487 487 self.opConfObjList = []
488 488
489 489 self.procUnitObj = None
490 490 self.opObjDict = {}
491 491
492 492 def __getPriority(self):
493 493
494 494 return len(self.opConfObjList)+1
495 495
496 496 def __getNewId(self):
497 497
498 498 return int(self.id)*10 + len(self.opConfObjList) + 1
499 499
500 500 def getElementName(self):
501 501
502 502 return self.ELEMENTNAME
503 503
504 504 def getId(self):
505 505
506 506 return self.id
507 507
508 508 def updateId(self, new_id, parentId=parentId):
509 509
510 510
511 511 new_id = int(parentId)*10 + (int(self.id) % 10)
512 512 new_inputId = int(parentId)*10 + (int(self.inputId) % 10)
513 513
514 514 #If this proc unit has not inputs
515 515 if self.inputId == '0':
516 516 new_inputId = 0
517 517
518 518 n = 1
519 519 for opConfObj in self.opConfObjList:
520 520
521 521 idOp = str(int(new_id)*10 + n)
522 522 opConfObj.updateId(idOp)
523 523
524 524 n += 1
525 525
526 526 self.parentId = str(parentId)
527 527 self.id = str(new_id)
528 528 self.inputId = str(new_inputId)
529 529
530 530
531 531 def getInputId(self):
532 532
533 533 return self.inputId
534 534
535 535 def getOperationObjList(self):
536 536
537 537 return self.opConfObjList
538 538
539 539 def getOperationObj(self, name=None):
540 540
541 541 for opConfObj in self.opConfObjList:
542 542
543 543 if opConfObj.name != name:
544 544 continue
545 545
546 546 return opConfObj
547 547
548 548 return None
549 549
550 550 def getOpObjfromParamValue(self, value=None):
551 551
552 552 for opConfObj in self.opConfObjList:
553 553 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
554 554 continue
555 555 return opConfObj
556 556 return None
557 557
558 558 def getProcUnitObj(self):
559 559
560 560 return self.procUnitObj
561 561
562 562 def setup(self, id, name, datatype, inputId, parentId=None):
563 563
564 564 #Compatible with old signal chain version
565 565 if datatype==None and name==None:
566 566 raise ValueError, "datatype or name should be defined"
567 567
568 568 if name==None:
569 569 if 'Proc' in datatype:
570 570 name = datatype
571 571 else:
572 572 name = '%sProc' %(datatype)
573 573
574 574 if datatype==None:
575 575 datatype = name.replace('Proc','')
576 576
577 577 self.id = str(id)
578 578 self.name = name
579 579 self.datatype = datatype
580 580 self.inputId = inputId
581 581 self.parentId = parentId
582 582
583 583 self.opConfObjList = []
584 584
585 585 self.addOperation(name='run', optype='self')
586 586
587 587 def removeOperations(self):
588 588
589 589 for obj in self.opConfObjList:
590 590 del obj
591 591
592 592 self.opConfObjList = []
593 593 self.addOperation(name='run')
594 594
595 595 def addParameter(self, **kwargs):
596 596 '''
597 597 Add parameters to "run" operation
598 598 '''
599 599 opObj = self.opConfObjList[0]
600 600
601 601 opObj.addParameter(**kwargs)
602 602
603 603 return opObj
604 604
605 605 def addOperation(self, name, optype='self'):
606 606
607 607 id = self.__getNewId()
608 608 priority = self.__getPriority()
609 609
610 610 opConfObj = OperationConf()
611 611 opConfObj.setup(id, name=name, priority=priority, type=optype)
612 612
613 613 self.opConfObjList.append(opConfObj)
614 614
615 615 return opConfObj
616 616
617 617 def makeXml(self, projectElement):
618 618
619 619 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
620 620 procUnitElement.set('id', str(self.id))
621 621 procUnitElement.set('name', self.name)
622 622 procUnitElement.set('datatype', self.datatype)
623 623 procUnitElement.set('inputId', str(self.inputId))
624 624
625 625 for opConfObj in self.opConfObjList:
626 626 opConfObj.makeXml(procUnitElement)
627 627
628 628 def readXml(self, upElement):
629 629
630 630 self.id = upElement.get('id')
631 631 self.name = upElement.get('name')
632 632 self.datatype = upElement.get('datatype')
633 633 self.inputId = upElement.get('inputId')
634 634
635 635 if self.ELEMENTNAME == "ReadUnit":
636 636 self.datatype = self.datatype.replace("Reader", "")
637 637
638 638 if self.ELEMENTNAME == "ProcUnit":
639 639 self.datatype = self.datatype.replace("Proc", "")
640 640
641 641 if self.inputId == 'None':
642 642 self.inputId = '0'
643 643
644 644 self.opConfObjList = []
645 645
646 646 opElementList = upElement.iter(OperationConf().getElementName())
647 647
648 648 for opElement in opElementList:
649 649 opConfObj = OperationConf()
650 650 opConfObj.readXml(opElement)
651 651 self.opConfObjList.append(opConfObj)
652 652
653 653 def printattr(self):
654 654
655 655 print "%s[%s]: name = %s, datatype = %s, inputId = %s" %(self.ELEMENTNAME,
656 656 self.id,
657 657 self.name,
658 658 self.datatype,
659 659 self.inputId)
660 660
661 661 for opConfObj in self.opConfObjList:
662 662 opConfObj.printattr()
663 663
664 664
665 665 def getKwargs(self):
666 666
667 667 opObj = self.opConfObjList[0]
668 668 kwargs = opObj.getKwargs()
669 669
670 670 return kwargs
671 671
672 672 def createObjects(self, plotter_queue=None):
673 673
674 674 className = eval(self.name)
675 675 kwargs = self.getKwargs()
676 676 procUnitObj = className(**kwargs)
677 677
678 678 for opConfObj in self.opConfObjList:
679 679
680 680 if opConfObj.type=='self' and self.name=='run':
681 681 continue
682 682 elif opConfObj.type=='self':
683 683 procUnitObj.addOperationKwargs(opConfObj.id, **opConfObj.getKwargs())
684 684 continue
685 685
686 686 opObj = opConfObj.createObject(plotter_queue)
687 687
688 688 self.opObjDict[opConfObj.id] = opObj
689 689
690 690 procUnitObj.addOperation(opObj, opConfObj.id)
691 691
692 692 self.procUnitObj = procUnitObj
693 693
694 694 return procUnitObj
695 695
696 696 def run(self):
697 697
698 698 is_ok = False
699 699
700 700 for opConfObj in self.opConfObjList:
701 701
702 702 kwargs = {}
703 703 for parmConfObj in opConfObj.getParameterObjList():
704 704 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
705 705 continue
706 706
707 707 kwargs[parmConfObj.name] = parmConfObj.getValue()
708 708
709 709 #ini = time.time()
710 710
711 711 #print "\tRunning the '%s' operation with %s" %(opConfObj.name, opConfObj.id)
712 712 sts = self.procUnitObj.call(opType = opConfObj.type,
713 713 opName = opConfObj.name,
714 opId = opConfObj.id,
715 **kwargs)
714 opId = opConfObj.id)
716 715
717 716 # total_time = time.time() - ini
718 717 #
719 718 # if total_time > 0.002:
720 719 # print "%s::%s took %f seconds" %(self.name, opConfObj.name, total_time)
721 720
722 721 is_ok = is_ok or sts
723 722
724 723 return is_ok
725 724
726 725 def close(self):
727 726
728 727 for opConfObj in self.opConfObjList:
729 728 if opConfObj.type == 'self':
730 729 continue
731 730
732 731 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
733 732 opObj.close()
734 733
735 734 self.procUnitObj.close()
736 735
737 736 return
738 737
739 738 class ReadUnitConf(ProcUnitConf):
740 739
741 740 path = None
742 741 startDate = None
743 742 endDate = None
744 743 startTime = None
745 744 endTime = None
746 745
747 746 ELEMENTNAME = 'ReadUnit'
748 747
749 748 def __init__(self):
750 749
751 750 self.id = None
752 751 self.datatype = None
753 752 self.name = None
754 753 self.inputId = None
755 754
756 755 self.parentId = None
757 756
758 757 self.opConfObjList = []
759 758 self.opObjList = []
760 759
761 760 def getElementName(self):
762 761
763 762 return self.ELEMENTNAME
764 763
765 764 def setup(self, id, name, datatype, path='', startDate="", endDate="", startTime="",
766 765 endTime="", parentId=None, queue=None, server=None, **kwargs):
767 766 #Compatible with old signal chain version
768 767 if datatype==None and name==None:
769 768 raise ValueError, "datatype or name should be defined"
770 769
771 770 if name==None:
772 771 if 'Reader' in datatype:
773 772 name = datatype
774 773 else:
775 774 name = '%sReader' %(datatype)
776 775 if datatype==None:
777 776 datatype = name.replace('Reader','')
778 777
779 778 self.id = id
780 779 self.name = name
781 780 self.datatype = datatype
782 781 if path != '':
783 782 self.path = os.path.abspath(path)
784 783 self.startDate = startDate
785 784 self.endDate = endDate
786 785 self.startTime = startTime
787 786 self.endTime = endTime
788 787
789 788 self.inputId = '0'
790 789 self.parentId = parentId
791 790 self.queue = queue
792 791 self.server = server
793 792 self.addRunOperation(**kwargs)
794 793
795 794 def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
796 795
797 796 #Compatible with old signal chain version
798 797 if datatype==None and name==None:
799 798 raise ValueError, "datatype or name should be defined"
800 799
801 800 if name==None:
802 801 if 'Reader' in datatype:
803 802 name = datatype
804 803 else:
805 804 name = '%sReader' %(datatype)
806 805
807 806 if datatype==None:
808 807 datatype = name.replace('Reader','')
809 808
810 809 self.datatype = datatype
811 810 self.name = name
812 811 self.path = path
813 812 self.startDate = startDate
814 813 self.endDate = endDate
815 814 self.startTime = startTime
816 815 self.endTime = endTime
817 816
818 817 self.inputId = '0'
819 818 self.parentId = parentId
820 819
821 820 self.updateRunOperation(**kwargs)
822 821
823 822 def removeOperations(self):
824 823
825 824 for obj in self.opConfObjList:
826 825 del obj
827 826
828 827 self.opConfObjList = []
829 828
830 829 def addRunOperation(self, **kwargs):
831 830
832 831 opObj = self.addOperation(name = 'run', optype = 'self')
833 832
834 833 if self.server is None:
835 834 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
836 835 opObj.addParameter(name='path' , value=self.path, format='str')
837 836 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
838 837 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
839 838 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
840 839 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
841 840 opObj.addParameter(name='queue' , value=self.queue, format='obj')
842 841 for key, value in kwargs.items():
843 842 opObj.addParameter(name=key, value=value, format=type(value).__name__)
844 843 else:
845 844 opObj.addParameter(name='server' , value=self.server, format='str')
846 845
847 846
848 847 return opObj
849 848
850 849 def updateRunOperation(self, **kwargs):
851 850
852 851 opObj = self.getOperationObj(name = 'run')
853 852 opObj.removeParameters()
854 853
855 854 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
856 855 opObj.addParameter(name='path' , value=self.path, format='str')
857 856 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
858 857 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
859 858 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
860 859 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
861 860
862 861 for key, value in kwargs.items():
863 862 opObj.addParameter(name=key, value=value, format=type(value).__name__)
864 863
865 864 return opObj
866 865
867 866 # def makeXml(self, projectElement):
868 867 #
869 868 # procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
870 869 # procUnitElement.set('id', str(self.id))
871 870 # procUnitElement.set('name', self.name)
872 871 # procUnitElement.set('datatype', self.datatype)
873 872 # procUnitElement.set('inputId', str(self.inputId))
874 873 #
875 874 # for opConfObj in self.opConfObjList:
876 875 # opConfObj.makeXml(procUnitElement)
877 876
878 877 def readXml(self, upElement):
879 878
880 879 self.id = upElement.get('id')
881 880 self.name = upElement.get('name')
882 881 self.datatype = upElement.get('datatype')
883 882 self.inputId = upElement.get('inputId')
884 883
885 884 if self.ELEMENTNAME == "ReadUnit":
886 885 self.datatype = self.datatype.replace("Reader", "")
887 886
888 887 if self.inputId == 'None':
889 888 self.inputId = '0'
890 889
891 890 self.opConfObjList = []
892 891
893 892 opElementList = upElement.iter(OperationConf().getElementName())
894 893
895 894 for opElement in opElementList:
896 895 opConfObj = OperationConf()
897 896 opConfObj.readXml(opElement)
898 897 self.opConfObjList.append(opConfObj)
899 898
900 899 if opConfObj.name == 'run':
901 900 self.path = opConfObj.getParameterValue('path')
902 901 self.startDate = opConfObj.getParameterValue('startDate')
903 902 self.endDate = opConfObj.getParameterValue('endDate')
904 903 self.startTime = opConfObj.getParameterValue('startTime')
905 904 self.endTime = opConfObj.getParameterValue('endTime')
906 905
907 906 class Project():
908 907
909 908 id = None
910 909 name = None
911 910 description = None
912 911 filename = None
913 912
914 913 procUnitConfObjDict = None
915 914
916 915 ELEMENTNAME = 'Project'
917 916
918 917 plotterQueue = None
919 918
920 919 def __init__(self, plotter_queue=None):
921 920
922 921 self.id = None
923 922 self.name = None
924 923 self.description = None
925 924
926 925 self.plotterQueue = plotter_queue
927 926
928 927 self.procUnitConfObjDict = {}
929 928
930 929 def __getNewId(self):
931 930
932 931 idList = self.procUnitConfObjDict.keys()
933 932
934 933 id = int(self.id)*10
935 934
936 935 while True:
937 936 id += 1
938 937
939 938 if str(id) in idList:
940 939 continue
941 940
942 941 break
943 942
944 943 return str(id)
945 944
946 945 def getElementName(self):
947 946
948 947 return self.ELEMENTNAME
949 948
950 949 def getId(self):
951 950
952 951 return self.id
953 952
954 953 def updateId(self, new_id):
955 954
956 955 self.id = str(new_id)
957 956
958 957 keyList = self.procUnitConfObjDict.keys()
959 958 keyList.sort()
960 959
961 960 n = 1
962 961 newProcUnitConfObjDict = {}
963 962
964 963 for procKey in keyList:
965 964
966 965 procUnitConfObj = self.procUnitConfObjDict[procKey]
967 966 idProcUnit = str(int(self.id)*10 + n)
968 967 procUnitConfObj.updateId(idProcUnit, parentId = self.id)
969 968
970 969 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
971 970 n += 1
972 971
973 972 self.procUnitConfObjDict = newProcUnitConfObjDict
974 973
975 974 def setup(self, id, name, description):
976 975
977 976 self.id = str(id)
978 977 self.name = name
979 978 self.description = description
980 979
981 980 def update(self, name, description):
982 981
983 982 self.name = name
984 983 self.description = description
985 984
986 985 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
987 986 if id is None:
988 987 idReadUnit = self.__getNewId()
989 988 else:
990 989 idReadUnit = str(id)
991 990
992 991 readUnitConfObj = ReadUnitConf()
993 992 readUnitConfObj.setup(idReadUnit, name, datatype, parentId=self.id, **kwargs)
994 993
995 994 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
996 995
997 996 return readUnitConfObj
998 997
999 998 def addProcUnit(self, inputId='0', datatype=None, name=None):
1000 999
1001 1000 idProcUnit = self.__getNewId()
1002 1001
1003 1002 procUnitConfObj = ProcUnitConf()
1004 1003 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, parentId=self.id)
1005 1004
1006 1005 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1007 1006
1008 1007 return procUnitConfObj
1009 1008
1010 1009 def removeProcUnit(self, id):
1011 1010
1012 1011 if id in self.procUnitConfObjDict.keys():
1013 1012 self.procUnitConfObjDict.pop(id)
1014 1013
1015 1014 def getReadUnitId(self):
1016 1015
1017 1016 readUnitConfObj = self.getReadUnitObj()
1018 1017
1019 1018 return readUnitConfObj.id
1020 1019
1021 1020 def getReadUnitObj(self):
1022 1021
1023 1022 for obj in self.procUnitConfObjDict.values():
1024 1023 if obj.getElementName() == "ReadUnit":
1025 1024 return obj
1026 1025
1027 1026 return None
1028 1027
1029 1028 def getProcUnitObj(self, id=None, name=None):
1030 1029
1031 1030 if id != None:
1032 1031 return self.procUnitConfObjDict[id]
1033 1032
1034 1033 if name != None:
1035 1034 return self.getProcUnitObjByName(name)
1036 1035
1037 1036 return None
1038 1037
1039 1038 def getProcUnitObjByName(self, name):
1040 1039
1041 1040 for obj in self.procUnitConfObjDict.values():
1042 1041 if obj.name == name:
1043 1042 return obj
1044 1043
1045 1044 return None
1046 1045
1047 1046 def procUnitItems(self):
1048 1047
1049 1048 return self.procUnitConfObjDict.items()
1050 1049
1051 1050 def makeXml(self):
1052 1051
1053 1052 projectElement = Element('Project')
1054 1053 projectElement.set('id', str(self.id))
1055 1054 projectElement.set('name', self.name)
1056 1055 projectElement.set('description', self.description)
1057 1056
1058 1057 for procUnitConfObj in self.procUnitConfObjDict.values():
1059 1058 procUnitConfObj.makeXml(projectElement)
1060 1059
1061 1060 self.projectElement = projectElement
1062 1061
1063 1062 def writeXml(self, filename=None):
1064 1063
1065 1064 if filename == None:
1066 1065 if self.filename:
1067 1066 filename = self.filename
1068 1067 else:
1069 1068 filename = "schain.xml"
1070 1069
1071 1070 if not filename:
1072 1071 print "filename has not been defined. Use setFilename(filename) for do it."
1073 1072 return 0
1074 1073
1075 1074 abs_file = os.path.abspath(filename)
1076 1075
1077 1076 if not os.access(os.path.dirname(abs_file), os.W_OK):
1078 1077 print "No write permission on %s" %os.path.dirname(abs_file)
1079 1078 return 0
1080 1079
1081 1080 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1082 1081 print "File %s already exists and it could not be overwriten" %abs_file
1083 1082 return 0
1084 1083
1085 1084 self.makeXml()
1086 1085
1087 1086 ElementTree(self.projectElement).write(abs_file, method='xml')
1088 1087
1089 1088 self.filename = abs_file
1090 1089
1091 1090 return 1
1092 1091
1093 1092 def readXml(self, filename = None):
1094 1093
1095 1094 if not filename:
1096 1095 print "filename is not defined"
1097 1096 return 0
1098 1097
1099 1098 abs_file = os.path.abspath(filename)
1100 1099
1101 1100 if not os.path.isfile(abs_file):
1102 1101 print "%s file does not exist" %abs_file
1103 1102 return 0
1104 1103
1105 1104 self.projectElement = None
1106 1105 self.procUnitConfObjDict = {}
1107 1106
1108 1107 try:
1109 1108 self.projectElement = ElementTree().parse(abs_file)
1110 1109 except:
1111 1110 print "Error reading %s, verify file format" %filename
1112 1111 return 0
1113 1112
1114 1113 self.project = self.projectElement.tag
1115 1114
1116 1115 self.id = self.projectElement.get('id')
1117 1116 self.name = self.projectElement.get('name')
1118 1117 self.description = self.projectElement.get('description')
1119 1118
1120 1119 readUnitElementList = self.projectElement.iter(ReadUnitConf().getElementName())
1121 1120
1122 1121 for readUnitElement in readUnitElementList:
1123 1122 readUnitConfObj = ReadUnitConf()
1124 1123 readUnitConfObj.readXml(readUnitElement)
1125 1124
1126 1125 if readUnitConfObj.parentId == None:
1127 1126 readUnitConfObj.parentId = self.id
1128 1127
1129 1128 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1130 1129
1131 1130 procUnitElementList = self.projectElement.iter(ProcUnitConf().getElementName())
1132 1131
1133 1132 for procUnitElement in procUnitElementList:
1134 1133 procUnitConfObj = ProcUnitConf()
1135 1134 procUnitConfObj.readXml(procUnitElement)
1136 1135
1137 1136 if procUnitConfObj.parentId == None:
1138 1137 procUnitConfObj.parentId = self.id
1139 1138
1140 1139 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1141 1140
1142 1141 self.filename = abs_file
1143 1142
1144 1143 return 1
1145 1144
1146 1145 def printattr(self):
1147 1146
1148 1147 print "Project[%s]: name = %s, description = %s" %(self.id,
1149 1148 self.name,
1150 1149 self.description)
1151 1150
1152 1151 for procUnitConfObj in self.procUnitConfObjDict.values():
1153 1152 procUnitConfObj.printattr()
1154 1153
1155 1154 def createObjects(self):
1156 1155
1157 1156 for procUnitConfObj in self.procUnitConfObjDict.values():
1158 1157 procUnitConfObj.createObjects(self.plotterQueue)
1159 1158
1160 1159 def __connect(self, objIN, thisObj):
1161 1160
1162 1161 thisObj.setInput(objIN.getOutputObj())
1163 1162
1164 1163 def connectObjects(self):
1165 1164
1166 1165 for thisPUConfObj in self.procUnitConfObjDict.values():
1167 1166
1168 1167 inputId = thisPUConfObj.getInputId()
1169 1168
1170 1169 if int(inputId) == 0:
1171 1170 continue
1172 1171
1173 1172 #Get input object
1174 1173 puConfINObj = self.procUnitConfObjDict[inputId]
1175 1174 puObjIN = puConfINObj.getProcUnitObj()
1176 1175
1177 1176 #Get current object
1178 1177 thisPUObj = thisPUConfObj.getProcUnitObj()
1179 1178
1180 1179 self.__connect(puObjIN, thisPUObj)
1181 1180
1182 1181 def __handleError(self, procUnitConfObj, send_email=True):
1183 1182
1184 1183 import socket
1185 1184
1186 1185 err = traceback.format_exception(sys.exc_info()[0],
1187 1186 sys.exc_info()[1],
1188 1187 sys.exc_info()[2])
1189 1188
1190 1189 print "***** Error occurred in %s *****" %(procUnitConfObj.name)
1191 1190 print "***** %s" %err[-1]
1192 1191
1193 1192 message = "".join(err)
1194 1193
1195 1194 sys.stderr.write(message)
1196 1195
1197 1196 if not send_email:
1198 1197 return
1199 1198
1200 1199 subject = "SChain v%s: Error running %s\n" %(schainpy.__version__, procUnitConfObj.name)
1201 1200
1202 1201 subtitle = "%s: %s\n" %(procUnitConfObj.getElementName() ,procUnitConfObj.name)
1203 1202 subtitle += "Hostname: %s\n" %socket.gethostbyname(socket.gethostname())
1204 1203 subtitle += "Working directory: %s\n" %os.path.abspath("./")
1205 1204 subtitle += "Configuration file: %s\n" %self.filename
1206 1205 subtitle += "Time: %s\n" %str(datetime.datetime.now())
1207 1206
1208 1207 readUnitConfObj = self.getReadUnitObj()
1209 1208 if readUnitConfObj:
1210 1209 subtitle += "\nInput parameters:\n"
1211 1210 subtitle += "[Data path = %s]\n" %readUnitConfObj.path
1212 1211 subtitle += "[Data type = %s]\n" %readUnitConfObj.datatype
1213 1212 subtitle += "[Start date = %s]\n" %readUnitConfObj.startDate
1214 1213 subtitle += "[End date = %s]\n" %readUnitConfObj.endDate
1215 1214 subtitle += "[Start time = %s]\n" %readUnitConfObj.startTime
1216 1215 subtitle += "[End time = %s]\n" %readUnitConfObj.endTime
1217 1216
1218 1217 adminObj = schainpy.admin.SchainNotify()
1219 1218 adminObj.sendAlert(message=message,
1220 1219 subject=subject,
1221 1220 subtitle=subtitle,
1222 1221 filename=self.filename)
1223 1222
1224 1223 def isPaused(self):
1225 1224 return 0
1226 1225
1227 1226 def isStopped(self):
1228 1227 return 0
1229 1228
1230 1229 def runController(self):
1231 1230 """
1232 1231 returns 0 when this process has been stopped, 1 otherwise
1233 1232 """
1234 1233
1235 1234 if self.isPaused():
1236 1235 print "Process suspended"
1237 1236
1238 1237 while True:
1239 1238 sleep(0.1)
1240 1239
1241 1240 if not self.isPaused():
1242 1241 break
1243 1242
1244 1243 if self.isStopped():
1245 1244 break
1246 1245
1247 1246 print "Process reinitialized"
1248 1247
1249 1248 if self.isStopped():
1250 1249 print "Process stopped"
1251 1250 return 0
1252 1251
1253 1252 return 1
1254 1253
1255 1254 def setFilename(self, filename):
1256 1255
1257 1256 self.filename = filename
1258 1257
1259 1258 def setPlotterQueue(self, plotter_queue):
1260 1259
1261 1260 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1262 1261
1263 1262 def getPlotterQueue(self):
1264 1263
1265 1264 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1266 1265
1267 1266 def useExternalPlotter(self):
1268 1267
1269 1268 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1270 1269
1271 1270
1272 1271 def run(self):
1273 1272
1274 1273 print
1275 1274 print "*"*60
1276 1275 print " Starting SIGNAL CHAIN PROCESSING v%s " %schainpy.__version__
1277 1276 print "*"*60
1278 1277 print
1279 1278
1280 1279 keyList = self.procUnitConfObjDict.keys()
1281 1280 keyList.sort()
1282 1281
1283 1282 while(True):
1284 1283
1285 1284 is_ok = False
1286 1285
1287 1286 for procKey in keyList:
1288 1287 # print "Running the '%s' process with %s" %(procUnitConfObj.name, procUnitConfObj.id)
1289 1288
1290 1289 procUnitConfObj = self.procUnitConfObjDict[procKey]
1291 1290
1292 1291 try:
1293 1292 sts = procUnitConfObj.run()
1294 1293 is_ok = is_ok or sts
1295 1294 except KeyboardInterrupt:
1296 1295 is_ok = False
1297 1296 break
1298 1297 except ValueError, e:
1299 1298 sleep(0.5)
1300 1299 self.__handleError(procUnitConfObj, send_email=True)
1301 1300 is_ok = False
1302 1301 break
1303 1302 except:
1304 1303 sleep(0.5)
1305 1304 self.__handleError(procUnitConfObj)
1306 1305 is_ok = False
1307 1306 break
1308 1307
1309 1308 #If every process unit finished so end process
1310 1309 if not(is_ok):
1311 1310 # print "Every process unit have finished"
1312 1311 break
1313 1312
1314 1313 if not self.runController():
1315 1314 break
1316 1315
1317 1316 #Closing every process
1318 1317 for procKey in keyList:
1319 1318 procUnitConfObj = self.procUnitConfObjDict[procKey]
1320 1319 procUnitConfObj.close()
1321 1320
1322 1321 print "Process finished"
1323 1322
1324 1323 def start(self, filename=None):
1325 1324
1326 1325 self.writeXml(filename)
1327 1326 self.createObjects()
1328 1327 self.connectObjects()
1329 1328 self.run()
@@ -1,1 +1,1
1 <Project description="Testing USRP data reader" id="191" name="test01"><ReadUnit datatype="Spectra" id="1911" inputId="0" name="SpectraReader"><Operation id="19111" name="run" priority="1" type="self"><Parameter format="str" id="191111" name="datatype" value="Spectra" /><Parameter format="str" id="191112" name="path" value="/home/nanosat/john" /><Parameter format="date" id="191113" name="startDate" value="2017/09/03" /><Parameter format="date" id="191114" name="endDate" value="2017/09/03" /><Parameter format="time" id="191115" name="startTime" value="00:00:00" /><Parameter format="time" id="191116" name="endTime" value="23:59:59" /><Parameter format="int" id="191118" name="cursor" value="7" /><Parameter format="int" id="191119" name="skip" value="3" /><Parameter format="int" id="191120" name="walk" value="1" /><Parameter format="int" id="191121" name="online" value="0" /></Operation></ReadUnit><ProcUnit datatype="SpectraProc" id="1912" inputId="1911" name="SpectraProc"><Operation id="19121" name="run" priority="1" type="self"><Parameter format="int" id="191211" name="nProfiles" value="860" /><Parameter format="int" id="191212" name="nFFTPoints" value="8" /></Operation><Operation id="19122" name="PublishData" priority="2" type="other"><Parameter format="int" id="191221" name="zeromq" value="1" /><Parameter format="bool" id="191222" name="verbose" value="0" /></Operation></ProcUnit></Project> No newline at end of file
1 <Project description="Testing USRP data reader" id="191" name="test01"><ReadUnit datatype="Voltage" id="1911" inputId="0" name="VoltageReader"><Operation id="19111" name="run" priority="1" type="self"><Parameter format="str" id="191111" name="datatype" value="Voltage" /><Parameter format="str" id="191112" name="path" value="/home/nanosat/driftsJohn" /><Parameter format="date" id="191113" name="startDate" value="2017/09/08" /><Parameter format="date" id="191114" name="endDate" value="2017/09/08" /><Parameter format="time" id="191115" name="startTime" value="00:00:00" /><Parameter format="time" id="191116" name="endTime" value="23:59:59" /><Parameter format="int" id="191118" name="cursor" value="5" /><Parameter format="int" id="191119" name="skip" value="102" /><Parameter format="int" id="191120" name="delay" value="10" /><Parameter format="int" id="191121" name="walk" value="1" /><Parameter format="int" id="191122" name="online" value="0" /></Operation></ReadUnit><ProcUnit datatype="SpectraProc" id="1912" inputId="1911" name="SpectraProc"><Operation id="19121" name="run" priority="1" type="self"><Parameter format="int" id="191211" name="nFFTPoints" value="10" /></Operation><Operation id="19122" name="CohInt" priority="2" type="external"><Parameter format="float" id="191221" name="n" value="128" /></Operation><Operation id="19123" name="PublishData" priority="3" type="other"><Parameter format="int" id="191231" name="zeromq" value="1" /><Parameter format="bool" id="191232" name="verbose" value="0" /></Operation></ProcUnit></Project> No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now