##// END OF EJS Templates
remove unused kwargs in controller
Juan C. Espinoza -
r1031:f48e7568a388
parent child
Show More
@@ -1,1328 +1,1319
1 1 '''
2 2 Created on September , 2012
3 3 @author:
4 4 '''
5 5
6 6 import sys
7 7 import ast
8 8 import datetime
9 9 import traceback
10 10 import math
11 11 import time
12 12 from multiprocessing import Process, Queue, cpu_count
13 13
14 14 import schainpy
15 15 import schainpy.admin
16 16
17 17 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
18 18 from xml.dom import minidom
19 19
20 20 from schainpy.model import *
21 21 from time import sleep
22 22
23 23 def prettify(elem):
24 24 """Return a pretty-printed XML string for the Element.
25 25 """
26 26 rough_string = tostring(elem, 'utf-8')
27 27 reparsed = minidom.parseString(rough_string)
28 28 return reparsed.toprettyxml(indent=" ")
29 29
30 30 def multiSchain(child, nProcess=cpu_count(), startDate=None, endDate=None, by_day=False):
31 31 skip = 0
32 32 cursor = 0
33 33 nFiles = None
34 34 processes = []
35 35 dt1 = datetime.datetime.strptime(startDate, '%Y/%m/%d')
36 36 dt2 = datetime.datetime.strptime(endDate, '%Y/%m/%d')
37 37 days = (dt2 - dt1).days
38 38
39 39 for day in range(days+1):
40 40 skip = 0
41 41 cursor = 0
42 42 q = Queue()
43 43 processes = []
44 44 dt = (dt1 + datetime.timedelta(day)).strftime('%Y/%m/%d')
45 45 firstProcess = Process(target=child, args=(cursor, skip, q, dt))
46 46 firstProcess.start()
47 47 if by_day:
48 48 continue
49 49 nFiles = q.get()
50 50 if nFiles==0:
51 51 continue
52 52 firstProcess.terminate()
53 53 skip = int(math.ceil(nFiles/nProcess))
54 54 while True:
55 55 processes.append(Process(target=child, args=(cursor, skip, q, dt)))
56 56 processes[cursor].start()
57 57 if nFiles < cursor*skip:
58 58 break
59 59 cursor += 1
60 60
61 61 def beforeExit(exctype, value, trace):
62 62 for process in processes:
63 63 process.terminate()
64 64 process.join()
65 65 print traceback.print_tb(trace)
66 66
67 67 sys.excepthook = beforeExit
68 68
69 69 for process in processes:
70 70 process.join()
71 71 process.terminate()
72 72
73 73 time.sleep(3)
74 74
75 75
76 76 class ParameterConf():
77 77
78 78 id = None
79 79 name = None
80 80 value = None
81 81 format = None
82 82
83 83 __formated_value = None
84 84
85 85 ELEMENTNAME = 'Parameter'
86 86
87 87 def __init__(self):
88 88
89 89 self.format = 'str'
90 90
91 91 def getElementName(self):
92 92
93 93 return self.ELEMENTNAME
94 94
95 95 def getValue(self):
96 96
97 97 value = self.value
98 98 format = self.format
99 99
100 100 if self.__formated_value != None:
101 101
102 102 return self.__formated_value
103 103
104 104 if format == 'obj':
105 105 return value
106 106
107 107 if format == 'str':
108 108 self.__formated_value = str(value)
109 109 return self.__formated_value
110 110
111 111 if value == '':
112 112 raise ValueError, "%s: This parameter value is empty" %self.name
113 113
114 114 if format == 'list':
115 115 strList = value.split(',')
116 116
117 117 self.__formated_value = strList
118 118
119 119 return self.__formated_value
120 120
121 121 if format == 'intlist':
122 122 """
123 123 Example:
124 124 value = (0,1,2)
125 125 """
126 126
127 127 new_value = ast.literal_eval(value)
128 128
129 129 if type(new_value) not in (tuple, list):
130 130 new_value = [int(new_value)]
131 131
132 132 self.__formated_value = new_value
133 133
134 134 return self.__formated_value
135 135
136 136 if format == 'floatlist':
137 137 """
138 138 Example:
139 139 value = (0.5, 1.4, 2.7)
140 140 """
141 141
142 142 new_value = ast.literal_eval(value)
143 143
144 144 if type(new_value) not in (tuple, list):
145 145 new_value = [float(new_value)]
146 146
147 147 self.__formated_value = new_value
148 148
149 149 return self.__formated_value
150 150
151 151 if format == 'date':
152 152 strList = value.split('/')
153 153 intList = [int(x) for x in strList]
154 154 date = datetime.date(intList[0], intList[1], intList[2])
155 155
156 156 self.__formated_value = date
157 157
158 158 return self.__formated_value
159 159
160 160 if format == 'time':
161 161 strList = value.split(':')
162 162 intList = [int(x) for x in strList]
163 163 time = datetime.time(intList[0], intList[1], intList[2])
164 164
165 165 self.__formated_value = time
166 166
167 167 return self.__formated_value
168 168
169 169 if format == 'pairslist':
170 170 """
171 171 Example:
172 172 value = (0,1),(1,2)
173 173 """
174 174
175 175 new_value = ast.literal_eval(value)
176 176
177 177 if type(new_value) not in (tuple, list):
178 178 raise ValueError, "%s has to be a tuple or list of pairs" %value
179 179
180 180 if type(new_value[0]) not in (tuple, list):
181 181 if len(new_value) != 2:
182 182 raise ValueError, "%s has to be a tuple or list of pairs" %value
183 183 new_value = [new_value]
184 184
185 185 for thisPair in new_value:
186 186 if len(thisPair) != 2:
187 187 raise ValueError, "%s has to be a tuple or list of pairs" %value
188 188
189 189 self.__formated_value = new_value
190 190
191 191 return self.__formated_value
192 192
193 193 if format == 'multilist':
194 194 """
195 195 Example:
196 196 value = (0,1,2),(3,4,5)
197 197 """
198 198 multiList = ast.literal_eval(value)
199 199
200 200 if type(multiList[0]) == int:
201 201 multiList = ast.literal_eval("(" + value + ")")
202 202
203 203 self.__formated_value = multiList
204 204
205 205 return self.__formated_value
206 206
207 207 if format == 'bool':
208 208 value = int(value)
209 209
210 210 if format == 'int':
211 211 value = float(value)
212 212
213 213 format_func = eval(format)
214 214
215 215 self.__formated_value = format_func(value)
216 216
217 217 return self.__formated_value
218 218
219 219 def updateId(self, new_id):
220 220
221 221 self.id = str(new_id)
222 222
223 223 def setup(self, id, name, value, format='str'):
224 224 self.id = str(id)
225 225 self.name = name
226 226 if format == 'obj':
227 227 self.value = value
228 228 else:
229 229 self.value = str(value)
230 230 self.format = str.lower(format)
231 231
232 232 self.getValue()
233 233
234 234 return 1
235 235
236 236 def update(self, name, value, format='str'):
237 237
238 238 self.name = name
239 239 self.value = str(value)
240 240 self.format = format
241 241
242 242 def makeXml(self, opElement):
243 243 if self.name not in ('queue',):
244 244 parmElement = SubElement(opElement, self.ELEMENTNAME)
245 245 parmElement.set('id', str(self.id))
246 246 parmElement.set('name', self.name)
247 247 parmElement.set('value', self.value)
248 248 parmElement.set('format', self.format)
249 249
250 250 def readXml(self, parmElement):
251 251
252 252 self.id = parmElement.get('id')
253 253 self.name = parmElement.get('name')
254 254 self.value = parmElement.get('value')
255 255 self.format = str.lower(parmElement.get('format'))
256 256
257 257 #Compatible with old signal chain version
258 258 if self.format == 'int' and self.name == 'idfigure':
259 259 self.name = 'id'
260 260
261 261 def printattr(self):
262 262
263 263 print "Parameter[%s]: name = %s, value = %s, format = %s" %(self.id, self.name, self.value, self.format)
264 264
265 265 class OperationConf():
266 266
267 267 id = None
268 268 name = None
269 269 priority = None
270 270 type = None
271 271
272 272 parmConfObjList = []
273 273
274 274 ELEMENTNAME = 'Operation'
275 275
276 276 def __init__(self):
277 277
278 278 self.id = '0'
279 279 self.name = None
280 280 self.priority = None
281 281 self.type = 'self'
282 282
283 283
284 284 def __getNewId(self):
285 285
286 286 return int(self.id)*10 + len(self.parmConfObjList) + 1
287 287
288 288 def updateId(self, new_id):
289 289
290 290 self.id = str(new_id)
291 291
292 292 n = 1
293 293 for parmObj in self.parmConfObjList:
294 294
295 295 idParm = str(int(new_id)*10 + n)
296 296 parmObj.updateId(idParm)
297 297
298 298 n += 1
299 299
300 300 def getElementName(self):
301 301
302 302 return self.ELEMENTNAME
303 303
304 304 def getParameterObjList(self):
305 305
306 306 return self.parmConfObjList
307 307
308 308 def getParameterObj(self, parameterName):
309 309
310 310 for parmConfObj in self.parmConfObjList:
311 311
312 312 if parmConfObj.name != parameterName:
313 313 continue
314 314
315 315 return parmConfObj
316 316
317 317 return None
318 318
319 319 def getParameterObjfromValue(self, parameterValue):
320 320
321 321 for parmConfObj in self.parmConfObjList:
322 322
323 323 if parmConfObj.getValue() != parameterValue:
324 324 continue
325 325
326 326 return parmConfObj.getValue()
327 327
328 328 return None
329 329
330 330 def getParameterValue(self, parameterName):
331 331
332 332 parameterObj = self.getParameterObj(parameterName)
333 333
334 334 # if not parameterObj:
335 335 # return None
336 336
337 337 value = parameterObj.getValue()
338 338
339 339 return value
340 340
341 341
342 342 def getKwargs(self):
343 343
344 344 kwargs = {}
345 345
346 346 for parmConfObj in self.parmConfObjList:
347 347 if self.name == 'run' and parmConfObj.name == 'datatype':
348 348 continue
349 349
350 350 kwargs[parmConfObj.name] = parmConfObj.getValue()
351 351
352 352 return kwargs
353 353
354 354 def setup(self, id, name, priority, type):
355 355
356 356 self.id = str(id)
357 357 self.name = name
358 358 self.type = type
359 359 self.priority = priority
360 360
361 361 self.parmConfObjList = []
362 362
363 363 def removeParameters(self):
364 364
365 365 for obj in self.parmConfObjList:
366 366 del obj
367 367
368 368 self.parmConfObjList = []
369 369
370 370 def addParameter(self, name, value, format='str'):
371 371
372 372 id = self.__getNewId()
373 373
374 374 parmConfObj = ParameterConf()
375 375 if not parmConfObj.setup(id, name, value, format):
376 376 return None
377 377
378 378 self.parmConfObjList.append(parmConfObj)
379 379
380 380 return parmConfObj
381 381
382 382 def changeParameter(self, name, value, format='str'):
383 383
384 384 parmConfObj = self.getParameterObj(name)
385 385 parmConfObj.update(name, value, format)
386 386
387 387 return parmConfObj
388 388
389 389 def makeXml(self, procUnitElement):
390 390
391 391 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
392 392 opElement.set('id', str(self.id))
393 393 opElement.set('name', self.name)
394 394 opElement.set('type', self.type)
395 395 opElement.set('priority', str(self.priority))
396 396
397 397 for parmConfObj in self.parmConfObjList:
398 398 parmConfObj.makeXml(opElement)
399 399
400 400 def readXml(self, opElement):
401 401
402 402 self.id = opElement.get('id')
403 403 self.name = opElement.get('name')
404 404 self.type = opElement.get('type')
405 405 self.priority = opElement.get('priority')
406 406
407 407 #Compatible with old signal chain version
408 408 #Use of 'run' method instead 'init'
409 409 if self.type == 'self' and self.name == 'init':
410 410 self.name = 'run'
411 411
412 412 self.parmConfObjList = []
413 413
414 414 parmElementList = opElement.iter(ParameterConf().getElementName())
415 415
416 416 for parmElement in parmElementList:
417 417 parmConfObj = ParameterConf()
418 418 parmConfObj.readXml(parmElement)
419 419
420 420 #Compatible with old signal chain version
421 421 #If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
422 422 if self.type != 'self' and self.name == 'Plot':
423 423 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
424 424 self.name = parmConfObj.value
425 425 continue
426 426
427 427 self.parmConfObjList.append(parmConfObj)
428 428
429 429 def printattr(self):
430 430
431 431 print "%s[%s]: name = %s, type = %s, priority = %s" %(self.ELEMENTNAME,
432 432 self.id,
433 433 self.name,
434 434 self.type,
435 435 self.priority)
436 436
437 437 for parmConfObj in self.parmConfObjList:
438 438 parmConfObj.printattr()
439 439
440 440 def createObject(self, plotter_queue=None):
441 441
442 442
443 443 if self.type == 'self':
444 444 raise ValueError, "This operation type cannot be created"
445 445
446 446 if self.type == 'plotter':
447 447 #Plotter(plotter_name)
448 448 if not plotter_queue:
449 449 raise ValueError, "plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)"
450 450
451 451 opObj = Plotter(self.name, plotter_queue)
452 452
453 453 if self.type == 'external' or self.type == 'other':
454 454
455 455 className = eval(self.name)
456 456 kwargs = self.getKwargs()
457 457
458 458 opObj = className(**kwargs)
459 459
460 460 return opObj
461 461
462 462
463 463 class ProcUnitConf():
464 464
465 465 id = None
466 466 name = None
467 467 datatype = None
468 468 inputId = None
469 469 parentId = None
470 470
471 471 opConfObjList = []
472 472
473 473 procUnitObj = None
474 474 opObjList = []
475 475
476 476 ELEMENTNAME = 'ProcUnit'
477 477
478 478 def __init__(self):
479 479
480 480 self.id = None
481 481 self.datatype = None
482 482 self.name = None
483 483 self.inputId = None
484 484
485 485 self.opConfObjList = []
486 486
487 487 self.procUnitObj = None
488 488 self.opObjDict = {}
489 489
490 490 def __getPriority(self):
491 491
492 492 return len(self.opConfObjList)+1
493 493
494 494 def __getNewId(self):
495 495
496 496 return int(self.id)*10 + len(self.opConfObjList) + 1
497 497
498 498 def getElementName(self):
499 499
500 500 return self.ELEMENTNAME
501 501
502 502 def getId(self):
503 503
504 504 return self.id
505 505
506 506 def updateId(self, new_id, parentId=parentId):
507 507
508 508
509 509 new_id = int(parentId)*10 + (int(self.id) % 10)
510 510 new_inputId = int(parentId)*10 + (int(self.inputId) % 10)
511 511
512 512 #If this proc unit has not inputs
513 513 if self.inputId == '0':
514 514 new_inputId = 0
515 515
516 516 n = 1
517 517 for opConfObj in self.opConfObjList:
518 518
519 519 idOp = str(int(new_id)*10 + n)
520 520 opConfObj.updateId(idOp)
521 521
522 522 n += 1
523 523
524 524 self.parentId = str(parentId)
525 525 self.id = str(new_id)
526 526 self.inputId = str(new_inputId)
527 527
528 528
529 529 def getInputId(self):
530 530
531 531 return self.inputId
532 532
533 533 def getOperationObjList(self):
534 534
535 535 return self.opConfObjList
536 536
537 537 def getOperationObj(self, name=None):
538 538
539 539 for opConfObj in self.opConfObjList:
540 540
541 541 if opConfObj.name != name:
542 542 continue
543 543
544 544 return opConfObj
545 545
546 546 return None
547 547
548 548 def getOpObjfromParamValue(self, value=None):
549 549
550 550 for opConfObj in self.opConfObjList:
551 551 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
552 552 continue
553 553 return opConfObj
554 554 return None
555 555
556 556 def getProcUnitObj(self):
557 557
558 558 return self.procUnitObj
559 559
560 560 def setup(self, id, name, datatype, inputId, parentId=None):
561 561
562 562 #Compatible with old signal chain version
563 563 if datatype==None and name==None:
564 564 raise ValueError, "datatype or name should be defined"
565 565
566 566 if name==None:
567 567 if 'Proc' in datatype:
568 568 name = datatype
569 569 else:
570 570 name = '%sProc' %(datatype)
571 571
572 572 if datatype==None:
573 573 datatype = name.replace('Proc','')
574 574
575 575 self.id = str(id)
576 576 self.name = name
577 577 self.datatype = datatype
578 578 self.inputId = inputId
579 579 self.parentId = parentId
580 580
581 581 self.opConfObjList = []
582 582
583 583 self.addOperation(name='run', optype='self')
584 584
585 585 def removeOperations(self):
586 586
587 587 for obj in self.opConfObjList:
588 588 del obj
589 589
590 590 self.opConfObjList = []
591 591 self.addOperation(name='run')
592 592
593 593 def addParameter(self, **kwargs):
594 594 '''
595 595 Add parameters to "run" operation
596 596 '''
597 597 opObj = self.opConfObjList[0]
598 598
599 599 opObj.addParameter(**kwargs)
600 600
601 601 return opObj
602 602
603 603 def addOperation(self, name, optype='self'):
604 604
605 605 id = self.__getNewId()
606 606 priority = self.__getPriority()
607 607
608 608 opConfObj = OperationConf()
609 609 opConfObj.setup(id, name=name, priority=priority, type=optype)
610 610
611 611 self.opConfObjList.append(opConfObj)
612 612
613 613 return opConfObj
614 614
615 615 def makeXml(self, projectElement):
616 616
617 617 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
618 618 procUnitElement.set('id', str(self.id))
619 619 procUnitElement.set('name', self.name)
620 620 procUnitElement.set('datatype', self.datatype)
621 621 procUnitElement.set('inputId', str(self.inputId))
622 622
623 623 for opConfObj in self.opConfObjList:
624 624 opConfObj.makeXml(procUnitElement)
625 625
626 626 def readXml(self, upElement):
627 627
628 628 self.id = upElement.get('id')
629 629 self.name = upElement.get('name')
630 630 self.datatype = upElement.get('datatype')
631 631 self.inputId = upElement.get('inputId')
632 632
633 633 if self.ELEMENTNAME == "ReadUnit":
634 634 self.datatype = self.datatype.replace("Reader", "")
635 635
636 636 if self.ELEMENTNAME == "ProcUnit":
637 637 self.datatype = self.datatype.replace("Proc", "")
638 638
639 639 if self.inputId == 'None':
640 640 self.inputId = '0'
641 641
642 642 self.opConfObjList = []
643 643
644 644 opElementList = upElement.iter(OperationConf().getElementName())
645 645
646 646 for opElement in opElementList:
647 647 opConfObj = OperationConf()
648 648 opConfObj.readXml(opElement)
649 649 self.opConfObjList.append(opConfObj)
650 650
651 651 def printattr(self):
652 652
653 653 print "%s[%s]: name = %s, datatype = %s, inputId = %s" %(self.ELEMENTNAME,
654 654 self.id,
655 655 self.name,
656 656 self.datatype,
657 657 self.inputId)
658 658
659 659 for opConfObj in self.opConfObjList:
660 660 opConfObj.printattr()
661 661
662 662
663 663 def getKwargs(self):
664 664
665 665 opObj = self.opConfObjList[0]
666 666 kwargs = opObj.getKwargs()
667 667
668 668 return kwargs
669 669
670 670 def createObjects(self, plotter_queue=None):
671 671
672 672 className = eval(self.name)
673 673 kwargs = self.getKwargs()
674 674 procUnitObj = className(**kwargs)
675 675
676 676 for opConfObj in self.opConfObjList:
677 677
678 678 if opConfObj.type=='self' and self.name=='run':
679 679 continue
680 680 elif opConfObj.type=='self':
681 681 procUnitObj.addOperationKwargs(opConfObj.id, **opConfObj.getKwargs())
682 682 continue
683 683
684 684 opObj = opConfObj.createObject(plotter_queue)
685 685
686 686 self.opObjDict[opConfObj.id] = opObj
687 687
688 688 procUnitObj.addOperation(opObj, opConfObj.id)
689 689
690 690 self.procUnitObj = procUnitObj
691 691
692 692 return procUnitObj
693 693
694 694 def run(self):
695 695
696 696 is_ok = False
697 697
698 698 for opConfObj in self.opConfObjList:
699 699
700 700 kwargs = {}
701 701 for parmConfObj in opConfObj.getParameterObjList():
702 702 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
703 703 continue
704 704
705 705 kwargs[parmConfObj.name] = parmConfObj.getValue()
706 706
707 #ini = time.time()
708
709 #print "\tRunning the '%s' operation with %s" %(opConfObj.name, opConfObj.id)
710 707 sts = self.procUnitObj.call(opType = opConfObj.type,
711 708 opName = opConfObj.name,
712 opId = opConfObj.id,
713 **kwargs)
714
715 # total_time = time.time() - ini
716 #
717 # if total_time > 0.002:
718 # print "%s::%s took %f seconds" %(self.name, opConfObj.name, total_time)
709 opId = opConfObj.id)
719 710
720 711 is_ok = is_ok or sts
721 712
722 713 return is_ok
723 714
724 715 def close(self):
725 716
726 717 for opConfObj in self.opConfObjList:
727 718 if opConfObj.type == 'self':
728 719 continue
729 720
730 721 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
731 722 opObj.close()
732 723
733 724 self.procUnitObj.close()
734 725
735 726 return
736 727
737 728 class ReadUnitConf(ProcUnitConf):
738 729
739 730 path = None
740 731 startDate = None
741 732 endDate = None
742 733 startTime = None
743 734 endTime = None
744 735
745 736 ELEMENTNAME = 'ReadUnit'
746 737
747 738 def __init__(self):
748 739
749 740 self.id = None
750 741 self.datatype = None
751 742 self.name = None
752 743 self.inputId = None
753 744
754 745 self.parentId = None
755 746
756 747 self.opConfObjList = []
757 748 self.opObjList = []
758 749
759 750 def getElementName(self):
760 751
761 752 return self.ELEMENTNAME
762 753
763 754 def setup(self, id, name, datatype, path='', startDate="", endDate="", startTime="",
764 755 endTime="", parentId=None, queue=None, server=None, **kwargs):
765 756
766 757 #Compatible with old signal chain version
767 758 if datatype==None and name==None:
768 759 raise ValueError, "datatype or name should be defined"
769 760
770 761 if name==None:
771 762 if 'Reader' in datatype:
772 763 name = datatype
773 764 else:
774 765 name = '%sReader' %(datatype)
775 766 if datatype==None:
776 767 datatype = name.replace('Reader','')
777 768
778 769 self.id = id
779 770 self.name = name
780 771 self.datatype = datatype
781 772 if path != '':
782 773 self.path = os.path.abspath(path)
783 774 self.startDate = startDate
784 775 self.endDate = endDate
785 776 self.startTime = startTime
786 777 self.endTime = endTime
787 778
788 779 self.inputId = '0'
789 780 self.parentId = parentId
790 781 self.queue = queue
791 782 self.server = server
792 783 self.addRunOperation(**kwargs)
793 784
794 785 def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
795 786
796 787 #Compatible with old signal chain version
797 788 if datatype==None and name==None:
798 789 raise ValueError, "datatype or name should be defined"
799 790
800 791 if name==None:
801 792 if 'Reader' in datatype:
802 793 name = datatype
803 794 else:
804 795 name = '%sReader' %(datatype)
805 796
806 797 if datatype==None:
807 798 datatype = name.replace('Reader','')
808 799
809 800 self.datatype = datatype
810 801 self.name = name
811 802 self.path = path
812 803 self.startDate = startDate
813 804 self.endDate = endDate
814 805 self.startTime = startTime
815 806 self.endTime = endTime
816 807
817 808 self.inputId = '0'
818 809 self.parentId = parentId
819 810
820 811 self.updateRunOperation(**kwargs)
821 812
822 813 def removeOperations(self):
823 814
824 815 for obj in self.opConfObjList:
825 816 del obj
826 817
827 818 self.opConfObjList = []
828 819
829 820 def addRunOperation(self, **kwargs):
830 821
831 822 opObj = self.addOperation(name = 'run', optype = 'self')
832 823
833 824 if self.server is None:
834 825 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
835 826 opObj.addParameter(name='path' , value=self.path, format='str')
836 827 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
837 828 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
838 829 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
839 830 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
840 831 opObj.addParameter(name='queue' , value=self.queue, format='obj')
841 832 for key, value in kwargs.items():
842 833 opObj.addParameter(name=key, value=value, format=type(value).__name__)
843 834 else:
844 835 opObj.addParameter(name='server' , value=self.server, format='str')
845 836
846 837
847 838 return opObj
848 839
849 840 def updateRunOperation(self, **kwargs):
850 841
851 842 opObj = self.getOperationObj(name = 'run')
852 843 opObj.removeParameters()
853 844
854 845 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
855 846 opObj.addParameter(name='path' , value=self.path, format='str')
856 847 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
857 848 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
858 849 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
859 850 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
860 851
861 852 for key, value in kwargs.items():
862 853 opObj.addParameter(name=key, value=value, format=type(value).__name__)
863 854
864 855 return opObj
865 856
866 857 # def makeXml(self, projectElement):
867 858 #
868 859 # procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
869 860 # procUnitElement.set('id', str(self.id))
870 861 # procUnitElement.set('name', self.name)
871 862 # procUnitElement.set('datatype', self.datatype)
872 863 # procUnitElement.set('inputId', str(self.inputId))
873 864 #
874 865 # for opConfObj in self.opConfObjList:
875 866 # opConfObj.makeXml(procUnitElement)
876 867
877 868 def readXml(self, upElement):
878 869
879 870 self.id = upElement.get('id')
880 871 self.name = upElement.get('name')
881 872 self.datatype = upElement.get('datatype')
882 873 self.inputId = upElement.get('inputId')
883 874
884 875 if self.ELEMENTNAME == "ReadUnit":
885 876 self.datatype = self.datatype.replace("Reader", "")
886 877
887 878 if self.inputId == 'None':
888 879 self.inputId = '0'
889 880
890 881 self.opConfObjList = []
891 882
892 883 opElementList = upElement.iter(OperationConf().getElementName())
893 884
894 885 for opElement in opElementList:
895 886 opConfObj = OperationConf()
896 887 opConfObj.readXml(opElement)
897 888 self.opConfObjList.append(opConfObj)
898 889
899 890 if opConfObj.name == 'run':
900 891 self.path = opConfObj.getParameterValue('path')
901 892 self.startDate = opConfObj.getParameterValue('startDate')
902 893 self.endDate = opConfObj.getParameterValue('endDate')
903 894 self.startTime = opConfObj.getParameterValue('startTime')
904 895 self.endTime = opConfObj.getParameterValue('endTime')
905 896
906 897 class Project():
907 898
908 899 id = None
909 900 name = None
910 901 description = None
911 902 filename = None
912 903
913 904 procUnitConfObjDict = None
914 905
915 906 ELEMENTNAME = 'Project'
916 907
917 908 plotterQueue = None
918 909
919 910 def __init__(self, plotter_queue=None):
920 911
921 912 self.id = None
922 913 self.name = None
923 914 self.description = None
924 915
925 916 self.plotterQueue = plotter_queue
926 917
927 918 self.procUnitConfObjDict = {}
928 919
929 920 def __getNewId(self):
930 921
931 922 idList = self.procUnitConfObjDict.keys()
932 923
933 924 id = int(self.id)*10
934 925
935 926 while True:
936 927 id += 1
937 928
938 929 if str(id) in idList:
939 930 continue
940 931
941 932 break
942 933
943 934 return str(id)
944 935
945 936 def getElementName(self):
946 937
947 938 return self.ELEMENTNAME
948 939
949 940 def getId(self):
950 941
951 942 return self.id
952 943
953 944 def updateId(self, new_id):
954 945
955 946 self.id = str(new_id)
956 947
957 948 keyList = self.procUnitConfObjDict.keys()
958 949 keyList.sort()
959 950
960 951 n = 1
961 952 newProcUnitConfObjDict = {}
962 953
963 954 for procKey in keyList:
964 955
965 956 procUnitConfObj = self.procUnitConfObjDict[procKey]
966 957 idProcUnit = str(int(self.id)*10 + n)
967 958 procUnitConfObj.updateId(idProcUnit, parentId = self.id)
968 959
969 960 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
970 961 n += 1
971 962
972 963 self.procUnitConfObjDict = newProcUnitConfObjDict
973 964
974 965 def setup(self, id, name, description):
975 966
976 967 self.id = str(id)
977 968 self.name = name
978 969 self.description = description
979 970
980 971 def update(self, name, description):
981 972
982 973 self.name = name
983 974 self.description = description
984 975
985 976 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
986 977
987 978 if id is None:
988 979 idReadUnit = self.__getNewId()
989 980 else:
990 981 idReadUnit = str(id)
991 982
992 983 readUnitConfObj = ReadUnitConf()
993 984 readUnitConfObj.setup(idReadUnit, name, datatype, parentId=self.id, **kwargs)
994 985
995 986 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
996 987
997 988 return readUnitConfObj
998 989
999 990 def addProcUnit(self, inputId='0', datatype=None, name=None):
1000 991
1001 992 idProcUnit = self.__getNewId()
1002 993
1003 994 procUnitConfObj = ProcUnitConf()
1004 995 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, parentId=self.id)
1005 996
1006 997 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1007 998
1008 999 return procUnitConfObj
1009 1000
1010 1001 def removeProcUnit(self, id):
1011 1002
1012 1003 if id in self.procUnitConfObjDict.keys():
1013 1004 self.procUnitConfObjDict.pop(id)
1014 1005
1015 1006 def getReadUnitId(self):
1016 1007
1017 1008 readUnitConfObj = self.getReadUnitObj()
1018 1009
1019 1010 return readUnitConfObj.id
1020 1011
1021 1012 def getReadUnitObj(self):
1022 1013
1023 1014 for obj in self.procUnitConfObjDict.values():
1024 1015 if obj.getElementName() == "ReadUnit":
1025 1016 return obj
1026 1017
1027 1018 return None
1028 1019
1029 1020 def getProcUnitObj(self, id=None, name=None):
1030 1021
1031 1022 if id != None:
1032 1023 return self.procUnitConfObjDict[id]
1033 1024
1034 1025 if name != None:
1035 1026 return self.getProcUnitObjByName(name)
1036 1027
1037 1028 return None
1038 1029
1039 1030 def getProcUnitObjByName(self, name):
1040 1031
1041 1032 for obj in self.procUnitConfObjDict.values():
1042 1033 if obj.name == name:
1043 1034 return obj
1044 1035
1045 1036 return None
1046 1037
1047 1038 def procUnitItems(self):
1048 1039
1049 1040 return self.procUnitConfObjDict.items()
1050 1041
1051 1042 def makeXml(self):
1052 1043
1053 1044 projectElement = Element('Project')
1054 1045 projectElement.set('id', str(self.id))
1055 1046 projectElement.set('name', self.name)
1056 1047 projectElement.set('description', self.description)
1057 1048
1058 1049 for procUnitConfObj in self.procUnitConfObjDict.values():
1059 1050 procUnitConfObj.makeXml(projectElement)
1060 1051
1061 1052 self.projectElement = projectElement
1062 1053
1063 1054 def writeXml(self, filename=None):
1064 1055
1065 1056 if filename == None:
1066 1057 if self.filename:
1067 1058 filename = self.filename
1068 1059 else:
1069 1060 filename = "schain.xml"
1070 1061
1071 1062 if not filename:
1072 1063 print "filename has not been defined. Use setFilename(filename) for do it."
1073 1064 return 0
1074 1065
1075 1066 abs_file = os.path.abspath(filename)
1076 1067
1077 1068 if not os.access(os.path.dirname(abs_file), os.W_OK):
1078 1069 print "No write permission on %s" %os.path.dirname(abs_file)
1079 1070 return 0
1080 1071
1081 1072 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1082 1073 print "File %s already exists and it could not be overwriten" %abs_file
1083 1074 return 0
1084 1075
1085 1076 self.makeXml()
1086 1077
1087 1078 ElementTree(self.projectElement).write(abs_file, method='xml')
1088 1079
1089 1080 self.filename = abs_file
1090 1081
1091 1082 return 1
1092 1083
1093 1084 def readXml(self, filename = None):
1094 1085
1095 1086 if not filename:
1096 1087 print "filename is not defined"
1097 1088 return 0
1098 1089
1099 1090 abs_file = os.path.abspath(filename)
1100 1091
1101 1092 if not os.path.isfile(abs_file):
1102 1093 print "%s file does not exist" %abs_file
1103 1094 return 0
1104 1095
1105 1096 self.projectElement = None
1106 1097 self.procUnitConfObjDict = {}
1107 1098
1108 1099 try:
1109 1100 self.projectElement = ElementTree().parse(abs_file)
1110 1101 except:
1111 1102 print "Error reading %s, verify file format" %filename
1112 1103 return 0
1113 1104
1114 1105 self.project = self.projectElement.tag
1115 1106
1116 1107 self.id = self.projectElement.get('id')
1117 1108 self.name = self.projectElement.get('name')
1118 1109 self.description = self.projectElement.get('description')
1119 1110
1120 1111 readUnitElementList = self.projectElement.iter(ReadUnitConf().getElementName())
1121 1112
1122 1113 for readUnitElement in readUnitElementList:
1123 1114 readUnitConfObj = ReadUnitConf()
1124 1115 readUnitConfObj.readXml(readUnitElement)
1125 1116
1126 1117 if readUnitConfObj.parentId == None:
1127 1118 readUnitConfObj.parentId = self.id
1128 1119
1129 1120 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1130 1121
1131 1122 procUnitElementList = self.projectElement.iter(ProcUnitConf().getElementName())
1132 1123
1133 1124 for procUnitElement in procUnitElementList:
1134 1125 procUnitConfObj = ProcUnitConf()
1135 1126 procUnitConfObj.readXml(procUnitElement)
1136 1127
1137 1128 if procUnitConfObj.parentId == None:
1138 1129 procUnitConfObj.parentId = self.id
1139 1130
1140 1131 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1141 1132
1142 1133 self.filename = abs_file
1143 1134
1144 1135 return 1
1145 1136
1146 1137 def printattr(self):
1147 1138
1148 1139 print "Project[%s]: name = %s, description = %s" %(self.id,
1149 1140 self.name,
1150 1141 self.description)
1151 1142
1152 1143 for procUnitConfObj in self.procUnitConfObjDict.values():
1153 1144 procUnitConfObj.printattr()
1154 1145
1155 1146 def createObjects(self):
1156 1147
1157 1148 for procUnitConfObj in self.procUnitConfObjDict.values():
1158 1149 procUnitConfObj.createObjects(self.plotterQueue)
1159 1150
1160 1151 def __connect(self, objIN, thisObj):
1161 1152
1162 1153 thisObj.setInput(objIN.getOutputObj())
1163 1154
1164 1155 def connectObjects(self):
1165 1156
1166 1157 for thisPUConfObj in self.procUnitConfObjDict.values():
1167 1158
1168 1159 inputId = thisPUConfObj.getInputId()
1169 1160
1170 1161 if int(inputId) == 0:
1171 1162 continue
1172 1163
1173 1164 #Get input object
1174 1165 puConfINObj = self.procUnitConfObjDict[inputId]
1175 1166 puObjIN = puConfINObj.getProcUnitObj()
1176 1167
1177 1168 #Get current object
1178 1169 thisPUObj = thisPUConfObj.getProcUnitObj()
1179 1170
1180 1171 self.__connect(puObjIN, thisPUObj)
1181 1172
1182 1173 def __handleError(self, procUnitConfObj, send_email=True):
1183 1174
1184 1175 import socket
1185 1176
1186 1177 err = traceback.format_exception(sys.exc_info()[0],
1187 1178 sys.exc_info()[1],
1188 1179 sys.exc_info()[2])
1189 1180
1190 1181 print "***** Error occurred in %s *****" %(procUnitConfObj.name)
1191 1182 print "***** %s" %err[-1]
1192 1183
1193 1184 message = "".join(err)
1194 1185
1195 1186 sys.stderr.write(message)
1196 1187
1197 1188 if not send_email:
1198 1189 return
1199 1190
1200 1191 subject = "SChain v%s: Error running %s\n" %(schainpy.__version__, procUnitConfObj.name)
1201 1192
1202 1193 subtitle = "%s: %s\n" %(procUnitConfObj.getElementName() ,procUnitConfObj.name)
1203 1194 subtitle += "Hostname: %s\n" %socket.gethostbyname(socket.gethostname())
1204 1195 subtitle += "Working directory: %s\n" %os.path.abspath("./")
1205 1196 subtitle += "Configuration file: %s\n" %self.filename
1206 1197 subtitle += "Time: %s\n" %str(datetime.datetime.now())
1207 1198
1208 1199 readUnitConfObj = self.getReadUnitObj()
1209 1200 if readUnitConfObj:
1210 1201 subtitle += "\nInput parameters:\n"
1211 1202 subtitle += "[Data path = %s]\n" %readUnitConfObj.path
1212 1203 subtitle += "[Data type = %s]\n" %readUnitConfObj.datatype
1213 1204 subtitle += "[Start date = %s]\n" %readUnitConfObj.startDate
1214 1205 subtitle += "[End date = %s]\n" %readUnitConfObj.endDate
1215 1206 subtitle += "[Start time = %s]\n" %readUnitConfObj.startTime
1216 1207 subtitle += "[End time = %s]\n" %readUnitConfObj.endTime
1217 1208
1218 1209 adminObj = schainpy.admin.SchainNotify()
1219 1210 adminObj.sendAlert(message=message,
1220 1211 subject=subject,
1221 1212 subtitle=subtitle,
1222 1213 filename=self.filename)
1223 1214
1224 1215 def isPaused(self):
1225 1216 return 0
1226 1217
1227 1218 def isStopped(self):
1228 1219 return 0
1229 1220
1230 1221 def runController(self):
1231 1222 """
1232 1223 returns 0 when this process has been stopped, 1 otherwise
1233 1224 """
1234 1225
1235 1226 if self.isPaused():
1236 1227 print "Process suspended"
1237 1228
1238 1229 while True:
1239 1230 sleep(0.1)
1240 1231
1241 1232 if not self.isPaused():
1242 1233 break
1243 1234
1244 1235 if self.isStopped():
1245 1236 break
1246 1237
1247 1238 print "Process reinitialized"
1248 1239
1249 1240 if self.isStopped():
1250 1241 print "Process stopped"
1251 1242 return 0
1252 1243
1253 1244 return 1
1254 1245
1255 1246 def setFilename(self, filename):
1256 1247
1257 1248 self.filename = filename
1258 1249
1259 1250 def setPlotterQueue(self, plotter_queue):
1260 1251
1261 1252 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1262 1253
1263 1254 def getPlotterQueue(self):
1264 1255
1265 1256 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1266 1257
1267 1258 def useExternalPlotter(self):
1268 1259
1269 1260 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1270 1261
1271 1262 def run(self):
1272 1263
1273 1264 print
1274 1265 print "*"*60
1275 1266 print " Starting SIGNAL CHAIN PROCESSING v%s " %schainpy.__version__
1276 1267 print "*"*60
1277 1268 print
1278 1269
1279 1270 keyList = self.procUnitConfObjDict.keys()
1280 1271 keyList.sort()
1281 1272
1282 1273 while(True):
1283 1274
1284 1275 is_ok = False
1285 1276
1286 1277 for procKey in keyList:
1287 1278 # print "Running the '%s' process with %s" %(procUnitConfObj.name, procUnitConfObj.id)
1288 1279
1289 1280 procUnitConfObj = self.procUnitConfObjDict[procKey]
1290 1281
1291 1282 try:
1292 1283 sts = procUnitConfObj.run()
1293 1284 is_ok = is_ok or sts
1294 1285 except KeyboardInterrupt:
1295 1286 is_ok = False
1296 1287 break
1297 1288 except ValueError, e:
1298 1289 sleep(0.5)
1299 1290 self.__handleError(procUnitConfObj, send_email=True)
1300 1291 is_ok = False
1301 1292 break
1302 1293 except:
1303 1294 sleep(0.5)
1304 1295 self.__handleError(procUnitConfObj)
1305 1296 is_ok = False
1306 1297 break
1307 1298
1308 1299 #If every process unit finished so end process
1309 1300 if not(is_ok):
1310 1301 # print "Every process unit have finished"
1311 1302 break
1312 1303
1313 1304 if not self.runController():
1314 1305 break
1315 1306
1316 1307 #Closing every process
1317 1308 for procKey in keyList:
1318 1309 procUnitConfObj = self.procUnitConfObjDict[procKey]
1319 1310 procUnitConfObj.close()
1320 1311
1321 1312 print "Process finished"
1322 1313
1323 1314 def start(self, filename=None):
1324 1315
1325 1316 self.writeXml(filename)
1326 1317 self.createObjects()
1327 1318 self.connectObjects()
1328 1319 self.run()
General Comments 0
You need to be logged in to leave comments. Login now