##// END OF EJS Templates
fix multiSchain
jespinoza -
r895:d092fa5fcc1d
parent child
Show More
@@ -1,1314 +1,1318
1 1 '''
2 2 Created on September , 2012
3 3 @author:
4 4 '''
5 5
6 6 import sys
7 7 import ast
8 8 import datetime
9 9 import traceback
10 10 from multiprocessing import Process, Queue, cpu_count
11 11
12 12 import schainpy
13 13 import schainpy.admin
14 14
15 15 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
16 16 from xml.dom import minidom
17 17
18 18 from schainpy.model import *
19 19 from time import sleep
20 20
21 21 def prettify(elem):
22 22 """Return a pretty-printed XML string for the Element.
23 23 """
24 24 rough_string = tostring(elem, 'utf-8')
25 25 reparsed = minidom.parseString(rough_string)
26 26 return reparsed.toprettyxml(indent=" ")
27 27
28 28 def multiSchain(child, nProcess=cpu_count(), startDate=None, endDate=None):
29 29 skip = 0
30 30 cursor = 0
31 31 nFiles = None
32 32 processes = []
33 33
34 34 dt1 = datetime.datetime.strptime(startDate, '%Y/%m/%d')
35 35 dt2 = datetime.datetime.strptime(endDate, '%Y/%m/%d')
36 36 days = (dt2 - dt1).days
37 37 print days
38 38 for day in range(days+1):
39 39 skip = 0
40 40 cursor = 0
41 41 q = Queue()
42 42 processes = []
43 43 dt = (dt1 + datetime.timedelta(day)).strftime('%Y/%m/%d')
44 44 firstProcess = Process(target=child, args=(cursor, skip, q, dt))
45 45 firstProcess.start()
46 print 'a'
46 47 nFiles = q.get()
48
49 print nFiles
47 50 firstProcess.terminate()
48 51 skip = int(math.ceil(nFiles/nProcess))
49 52 try:
50 53 while True:
51 54 processes.append(Process(target=child, args=(cursor, skip, q, dt)))
52 55 processes[cursor].start()
53 56 if nFiles < cursor*skip:
54 57 break
55 58 cursor += 1
56 59 except KeyboardInterrupt:
57 60 for process in processes:
58 61 process.terminate()
59 62 process.join()
60 63 for process in processes:
61 64 process.join()
62 65 #process.terminate()
63 66 sleep(3)
64 67
65 68 try:
66 69 while True:
67 70 pass
68 71 except KeyboardInterrupt:
69 72 for process in processes:
70 73 process.terminate()
71 74 process.join()
72 75
73 76 class ParameterConf():
74 77
75 78 id = None
76 79 name = None
77 80 value = None
78 81 format = None
79 82
80 83 __formated_value = None
81 84
82 85 ELEMENTNAME = 'Parameter'
83 86
84 87 def __init__(self):
85 88
86 89 self.format = 'str'
87 90
88 91 def getElementName(self):
89 92
90 93 return self.ELEMENTNAME
91 94
92 95 def getValue(self):
93 96
94 97 value = self.value
95 98 format = self.format
96 99
97 100 if self.__formated_value != None:
98 101
99 102 return self.__formated_value
100 103
101 104 if format == 'obj':
102 105 return value
103 106
104 107 if format == 'str':
105 108 self.__formated_value = str(value)
106 109 return self.__formated_value
107 110
108 111 if value == '':
109 112 raise ValueError, "%s: This parameter value is empty" %self.name
110 113
111 114 if format == 'list':
112 115 strList = value.split(',')
113 116
114 117 self.__formated_value = strList
115 118
116 119 return self.__formated_value
117 120
118 121 if format == 'intlist':
119 122 """
120 123 Example:
121 124 value = (0,1,2)
122 125 """
123 126
124 127 new_value = ast.literal_eval(value)
125 128
126 129 if type(new_value) not in (tuple, list):
127 130 new_value = [int(new_value)]
128 131
129 132 self.__formated_value = new_value
130 133
131 134 return self.__formated_value
132 135
133 136 if format == 'floatlist':
134 137 """
135 138 Example:
136 139 value = (0.5, 1.4, 2.7)
137 140 """
138 141
139 142 new_value = ast.literal_eval(value)
140 143
141 144 if type(new_value) not in (tuple, list):
142 145 new_value = [float(new_value)]
143 146
144 147 self.__formated_value = new_value
145 148
146 149 return self.__formated_value
147 150
148 151 if format == 'date':
149 152 strList = value.split('/')
150 153 intList = [int(x) for x in strList]
151 154 date = datetime.date(intList[0], intList[1], intList[2])
152 155
153 156 self.__formated_value = date
154 157
155 158 return self.__formated_value
156 159
157 160 if format == 'time':
158 161 strList = value.split(':')
159 162 intList = [int(x) for x in strList]
160 163 time = datetime.time(intList[0], intList[1], intList[2])
161 164
162 165 self.__formated_value = time
163 166
164 167 return self.__formated_value
165 168
166 169 if format == 'pairslist':
167 170 """
168 171 Example:
169 172 value = (0,1),(1,2)
170 173 """
171 174
172 175 new_value = ast.literal_eval(value)
173 176
174 177 if type(new_value) not in (tuple, list):
175 178 raise ValueError, "%s has to be a tuple or list of pairs" %value
176 179
177 180 if type(new_value[0]) not in (tuple, list):
178 181 if len(new_value) != 2:
179 182 raise ValueError, "%s has to be a tuple or list of pairs" %value
180 183 new_value = [new_value]
181 184
182 185 for thisPair in new_value:
183 186 if len(thisPair) != 2:
184 187 raise ValueError, "%s has to be a tuple or list of pairs" %value
185 188
186 189 self.__formated_value = new_value
187 190
188 191 return self.__formated_value
189 192
190 193 if format == 'multilist':
191 194 """
192 195 Example:
193 196 value = (0,1,2),(3,4,5)
194 197 """
195 198 multiList = ast.literal_eval(value)
196 199
197 200 if type(multiList[0]) == int:
198 201 multiList = ast.literal_eval("(" + value + ")")
199 202
200 203 self.__formated_value = multiList
201 204
202 205 return self.__formated_value
203 206
204 207 if format == 'bool':
205 208 value = int(value)
206 209
207 210 if format == 'int':
208 211 value = float(value)
209 212
210 213 format_func = eval(format)
211 214
212 215 self.__formated_value = format_func(value)
213 216
214 217 return self.__formated_value
215 218
216 219 def updateId(self, new_id):
217 220
218 221 self.id = str(new_id)
219 222
220 223 def setup(self, id, name, value, format='str'):
221 224
222 225 self.id = str(id)
223 226 self.name = name
224 227 if format == 'obj':
225 228 self.value = value
226 229 else:
227 230 self.value = str(value)
228 231 self.format = str.lower(format)
229 232
230 233 self.getValue()
231 234
232 235 return 1
233 236
234 237 def update(self, name, value, format='str'):
235 238
236 239 self.name = name
237 240 self.value = str(value)
238 241 self.format = format
239 242
240 243 def makeXml(self, opElement):
241 244
242 245 parmElement = SubElement(opElement, self.ELEMENTNAME)
243 246 parmElement.set('id', str(self.id))
244 247 parmElement.set('name', self.name)
245 248 parmElement.set('value', self.value)
246 249 parmElement.set('format', self.format)
247 250
248 251 def readXml(self, parmElement):
249 252
250 253 self.id = parmElement.get('id')
251 254 self.name = parmElement.get('name')
252 255 self.value = parmElement.get('value')
253 256 self.format = str.lower(parmElement.get('format'))
254 257
255 258 #Compatible with old signal chain version
256 259 if self.format == 'int' and self.name == 'idfigure':
257 260 self.name = 'id'
258 261
259 262 def printattr(self):
260 263
261 264 print "Parameter[%s]: name = %s, value = %s, format = %s" %(self.id, self.name, self.value, self.format)
262 265
263 266 class OperationConf():
264 267
265 268 id = None
266 269 name = None
267 270 priority = None
268 271 type = None
269 272
270 273 parmConfObjList = []
271 274
272 275 ELEMENTNAME = 'Operation'
273 276
274 277 def __init__(self):
275 278
276 279 self.id = '0'
277 280 self.name = None
278 281 self.priority = None
279 282 self.type = 'self'
280 283
281 284
282 285 def __getNewId(self):
283 286
284 287 return int(self.id)*10 + len(self.parmConfObjList) + 1
285 288
286 289 def updateId(self, new_id):
287 290
288 291 self.id = str(new_id)
289 292
290 293 n = 1
291 294 for parmObj in self.parmConfObjList:
292 295
293 296 idParm = str(int(new_id)*10 + n)
294 297 parmObj.updateId(idParm)
295 298
296 299 n += 1
297 300
298 301 def getElementName(self):
299 302
300 303 return self.ELEMENTNAME
301 304
302 305 def getParameterObjList(self):
303 306
304 307 return self.parmConfObjList
305 308
306 309 def getParameterObj(self, parameterName):
307 310
308 311 for parmConfObj in self.parmConfObjList:
309 312
310 313 if parmConfObj.name != parameterName:
311 314 continue
312 315
313 316 return parmConfObj
314 317
315 318 return None
316 319
317 320 def getParameterObjfromValue(self, parameterValue):
318 321
319 322 for parmConfObj in self.parmConfObjList:
320 323
321 324 if parmConfObj.getValue() != parameterValue:
322 325 continue
323 326
324 327 return parmConfObj.getValue()
325 328
326 329 return None
327 330
328 331 def getParameterValue(self, parameterName):
329 332
330 333 parameterObj = self.getParameterObj(parameterName)
331 334
332 335 # if not parameterObj:
333 336 # return None
334 337
335 338 value = parameterObj.getValue()
336 339
337 340 return value
338 341
339 342
340 343 def getKwargs(self):
341 344
342 345 kwargs = {}
343 346
344 347 for parmConfObj in self.parmConfObjList:
345 348 if self.name == 'run' and parmConfObj.name == 'datatype':
346 349 continue
347 350
348 351 kwargs[parmConfObj.name] = parmConfObj.getValue()
349 352
350 353 return kwargs
351 354
352 355 def setup(self, id, name, priority, type):
353 356
354 357 self.id = str(id)
355 358 self.name = name
356 359 self.type = type
357 360 self.priority = priority
358 361
359 362 self.parmConfObjList = []
360 363
361 364 def removeParameters(self):
362 365
363 366 for obj in self.parmConfObjList:
364 367 del obj
365 368
366 369 self.parmConfObjList = []
367 370
368 371 def addParameter(self, name, value, format='str'):
369 372
370 373 id = self.__getNewId()
371 374
372 375 parmConfObj = ParameterConf()
373 376 if not parmConfObj.setup(id, name, value, format):
374 377 return None
375 378
376 379 self.parmConfObjList.append(parmConfObj)
377 380
378 381 return parmConfObj
379 382
380 383 def changeParameter(self, name, value, format='str'):
381 384
382 385 parmConfObj = self.getParameterObj(name)
383 386 parmConfObj.update(name, value, format)
384 387
385 388 return parmConfObj
386 389
387 390 def makeXml(self, procUnitElement):
388 391
389 392 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
390 393 opElement.set('id', str(self.id))
391 394 opElement.set('name', self.name)
392 395 opElement.set('type', self.type)
393 396 opElement.set('priority', str(self.priority))
394 397
395 398 for parmConfObj in self.parmConfObjList:
396 399 parmConfObj.makeXml(opElement)
397 400
398 401 def readXml(self, opElement):
399 402
400 403 self.id = opElement.get('id')
401 404 self.name = opElement.get('name')
402 405 self.type = opElement.get('type')
403 406 self.priority = opElement.get('priority')
404 407
405 408 #Compatible with old signal chain version
406 409 #Use of 'run' method instead 'init'
407 410 if self.type == 'self' and self.name == 'init':
408 411 self.name = 'run'
409 412
410 413 self.parmConfObjList = []
411 414
412 415 parmElementList = opElement.iter(ParameterConf().getElementName())
413 416
414 417 for parmElement in parmElementList:
415 418 parmConfObj = ParameterConf()
416 419 parmConfObj.readXml(parmElement)
417 420
418 421 #Compatible with old signal chain version
419 422 #If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
420 423 if self.type != 'self' and self.name == 'Plot':
421 424 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
422 425 self.name = parmConfObj.value
423 426 continue
424 427
425 428 self.parmConfObjList.append(parmConfObj)
426 429
427 430 def printattr(self):
428 431
429 432 print "%s[%s]: name = %s, type = %s, priority = %s" %(self.ELEMENTNAME,
430 433 self.id,
431 434 self.name,
432 435 self.type,
433 436 self.priority)
434 437
435 438 for parmConfObj in self.parmConfObjList:
436 439 parmConfObj.printattr()
437 440
438 441 def createObject(self, plotter_queue=None):
439 442
440 443 if self.type == 'self':
441 444 raise ValueError, "This operation type cannot be created"
442 445
443 446 if self.type == 'plotter':
444 447 #Plotter(plotter_name)
445 448 if not plotter_queue:
446 449 raise ValueError, "plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)"
447 450
448 451 opObj = Plotter(self.name, plotter_queue)
449 452
450 453 if self.type == 'external' or self.type == 'other':
451 454 className = eval(self.name)
452 455 kwargs = self.getKwargs()
453 456 opObj = className(**kwargs)
454 457
455 458 return opObj
456 459
457 460
458 461 class ProcUnitConf():
459 462
460 463 id = None
461 464 name = None
462 465 datatype = None
463 466 inputId = None
464 467 parentId = None
465 468
466 469 opConfObjList = []
467 470
468 471 procUnitObj = None
469 472 opObjList = []
470 473
471 474 ELEMENTNAME = 'ProcUnit'
472 475
473 476 def __init__(self):
474 477
475 478 self.id = None
476 479 self.datatype = None
477 480 self.name = None
478 481 self.inputId = None
479 482
480 483 self.opConfObjList = []
481 484
482 485 self.procUnitObj = None
483 486 self.opObjDict = {}
484 487
485 488 def __getPriority(self):
486 489
487 490 return len(self.opConfObjList)+1
488 491
489 492 def __getNewId(self):
490 493
491 494 return int(self.id)*10 + len(self.opConfObjList) + 1
492 495
493 496 def getElementName(self):
494 497
495 498 return self.ELEMENTNAME
496 499
497 500 def getId(self):
498 501
499 502 return self.id
500 503
501 504 def updateId(self, new_id, parentId=parentId):
502 505
503 506
504 507 new_id = int(parentId)*10 + (int(self.id) % 10)
505 508 new_inputId = int(parentId)*10 + (int(self.inputId) % 10)
506 509
507 510 #If this proc unit has not inputs
508 511 if self.inputId == '0':
509 512 new_inputId = 0
510 513
511 514 n = 1
512 515 for opConfObj in self.opConfObjList:
513 516
514 517 idOp = str(int(new_id)*10 + n)
515 518 opConfObj.updateId(idOp)
516 519
517 520 n += 1
518 521
519 522 self.parentId = str(parentId)
520 523 self.id = str(new_id)
521 524 self.inputId = str(new_inputId)
522 525
523 526
524 527 def getInputId(self):
525 528
526 529 return self.inputId
527 530
528 531 def getOperationObjList(self):
529 532
530 533 return self.opConfObjList
531 534
532 535 def getOperationObj(self, name=None):
533 536
534 537 for opConfObj in self.opConfObjList:
535 538
536 539 if opConfObj.name != name:
537 540 continue
538 541
539 542 return opConfObj
540 543
541 544 return None
542 545
543 546 def getOpObjfromParamValue(self, value=None):
544 547
545 548 for opConfObj in self.opConfObjList:
546 549 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
547 550 continue
548 551 return opConfObj
549 552 return None
550 553
551 554 def getProcUnitObj(self):
552 555
553 556 return self.procUnitObj
554 557
555 558 def setup(self, id, name, datatype, inputId, parentId=None):
556 559
557 560 #Compatible with old signal chain version
558 561 if datatype==None and name==None:
559 562 raise ValueError, "datatype or name should be defined"
560 563
561 564 if name==None:
562 565 if 'Proc' in datatype:
563 566 name = datatype
564 567 else:
565 568 name = '%sProc' %(datatype)
566 569
567 570 if datatype==None:
568 571 datatype = name.replace('Proc','')
569 572
570 573 self.id = str(id)
571 574 self.name = name
572 575 self.datatype = datatype
573 576 self.inputId = inputId
574 577 self.parentId = parentId
575 578
576 579 self.opConfObjList = []
577 580
578 581 self.addOperation(name='run', optype='self')
579 582
580 583 def removeOperations(self):
581 584
582 585 for obj in self.opConfObjList:
583 586 del obj
584 587
585 588 self.opConfObjList = []
586 589 self.addOperation(name='run')
587 590
588 591 def addParameter(self, **kwargs):
589 592 '''
590 593 Add parameters to "run" operation
591 594 '''
592 595 opObj = self.opConfObjList[0]
593 596
594 597 opObj.addParameter(**kwargs)
595 598
596 599 return opObj
597 600
598 601 def addOperation(self, name, optype='self'):
599 602
600 603 id = self.__getNewId()
601 604 priority = self.__getPriority()
602 605
603 606 opConfObj = OperationConf()
604 607 opConfObj.setup(id, name=name, priority=priority, type=optype)
605 608
606 609 self.opConfObjList.append(opConfObj)
607 610
608 611 return opConfObj
609 612
610 613 def makeXml(self, projectElement):
611 614
612 615 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
613 616 procUnitElement.set('id', str(self.id))
614 617 procUnitElement.set('name', self.name)
615 618 procUnitElement.set('datatype', self.datatype)
616 619 procUnitElement.set('inputId', str(self.inputId))
617 620
618 621 for opConfObj in self.opConfObjList:
619 622 opConfObj.makeXml(procUnitElement)
620 623
621 624 def readXml(self, upElement):
622 625
623 626 self.id = upElement.get('id')
624 627 self.name = upElement.get('name')
625 628 self.datatype = upElement.get('datatype')
626 629 self.inputId = upElement.get('inputId')
627 630
628 631 if self.ELEMENTNAME == "ReadUnit":
629 632 self.datatype = self.datatype.replace("Reader", "")
630 633
631 634 if self.ELEMENTNAME == "ProcUnit":
632 635 self.datatype = self.datatype.replace("Proc", "")
633 636
634 637 if self.inputId == 'None':
635 638 self.inputId = '0'
636 639
637 640 self.opConfObjList = []
638 641
639 642 opElementList = upElement.iter(OperationConf().getElementName())
640 643
641 644 for opElement in opElementList:
642 645 opConfObj = OperationConf()
643 646 opConfObj.readXml(opElement)
644 647 self.opConfObjList.append(opConfObj)
645 648
646 649 def printattr(self):
647 650
648 651 print "%s[%s]: name = %s, datatype = %s, inputId = %s" %(self.ELEMENTNAME,
649 652 self.id,
650 653 self.name,
651 654 self.datatype,
652 655 self.inputId)
653 656
654 657 for opConfObj in self.opConfObjList:
655 658 opConfObj.printattr()
656 659
657 660
658 661 def getKwargs(self):
659 662
660 663 opObj = self.opConfObjList[0]
661 664 kwargs = opObj.getKwargs()
662 665
663 666 return kwargs
664 667
665 668 def createObjects(self, plotter_queue=None):
666 669
667 670 className = eval(self.name)
668 671 kwargs = self.getKwargs()
669 672 procUnitObj = className(**kwargs)
670 673
671 674 for opConfObj in self.opConfObjList:
672 675
673 676 if opConfObj.type == 'self':
674 677 continue
675 678
676 679 opObj = opConfObj.createObject(plotter_queue)
677 680
678 681 self.opObjDict[opConfObj.id] = opObj
679 682 procUnitObj.addOperation(opObj, opConfObj.id)
680 683
681 684 self.procUnitObj = procUnitObj
682 685
683 686 return procUnitObj
684 687
685 688 def run(self):
686 689
687 690 is_ok = False
688 691
689 692 for opConfObj in self.opConfObjList:
690 693
691 694 kwargs = {}
692 695 for parmConfObj in opConfObj.getParameterObjList():
693 696 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
694 697 continue
695 698
696 699 kwargs[parmConfObj.name] = parmConfObj.getValue()
697 700
698 701 #ini = time.time()
699 702
700 703 #print "\tRunning the '%s' operation with %s" %(opConfObj.name, opConfObj.id)
701 704 sts = self.procUnitObj.call(opType = opConfObj.type,
702 705 opName = opConfObj.name,
703 706 opId = opConfObj.id,
704 707 )
705 708
706 709 # total_time = time.time() - ini
707 710 #
708 711 # if total_time > 0.002:
709 712 # print "%s::%s took %f seconds" %(self.name, opConfObj.name, total_time)
710 713
711 714 is_ok = is_ok or sts
712 715
713 716 return is_ok
714 717
715 718 def close(self):
716 719
717 720 for opConfObj in self.opConfObjList:
718 721 if opConfObj.type == 'self':
719 722 continue
720 723
721 724 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
722 725 opObj.close()
723 726
724 727 self.procUnitObj.close()
725 728
726 729 return
727 730
728 731 class ReadUnitConf(ProcUnitConf):
729 732
730 733 path = None
731 734 startDate = None
732 735 endDate = None
733 736 startTime = None
734 737 endTime = None
735 738
736 739 ELEMENTNAME = 'ReadUnit'
737 740
738 741 def __init__(self):
739 742
740 743 self.id = None
741 744 self.datatype = None
742 745 self.name = None
743 746 self.inputId = None
744 747
745 748 self.parentId = None
746 749
747 750 self.opConfObjList = []
748 751 self.opObjList = []
749 752
750 753 def getElementName(self):
751 754
752 755 return self.ELEMENTNAME
753 756
754 757 def setup(self, id, name, datatype, path, startDate="", endDate="", startTime="", endTime="", parentId=None, queue=None, **kwargs):
755 758
756 759 #Compatible with old signal chain version
757 760 if datatype==None and name==None:
758 761 raise ValueError, "datatype or name should be defined"
759 762
760 763 if name==None:
761 764 if 'Reader' in datatype:
762 765 name = datatype
763 766 else:
764 767 name = '%sReader' %(datatype)
765 768
766 769 if datatype==None:
767 770 datatype = name.replace('Reader','')
768 771
769 772 self.id = id
770 773 self.name = name
771 774 self.datatype = datatype
772 775
773 776 self.path = os.path.abspath(path)
774 777 self.startDate = startDate
775 778 self.endDate = endDate
776 779 self.startTime = startTime
777 780 self.endTime = endTime
778 781
779 782 self.inputId = '0'
780 783 self.parentId = parentId
781 784 self.queue = queue
782 785 self.addRunOperation(**kwargs)
783 786
784 787 def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
785 788
786 789 #Compatible with old signal chain version
787 790 if datatype==None and name==None:
788 791 raise ValueError, "datatype or name should be defined"
789 792
790 793 if name==None:
791 794 if 'Reader' in datatype:
792 795 name = datatype
793 796 else:
794 797 name = '%sReader' %(datatype)
795 798
796 799 if datatype==None:
797 800 datatype = name.replace('Reader','')
798 801
799 802 self.datatype = datatype
800 803 self.name = name
801 804 self.path = path
802 805 self.startDate = startDate
803 806 self.endDate = endDate
804 807 self.startTime = startTime
805 808 self.endTime = endTime
806 809
807 810 self.inputId = '0'
808 811 self.parentId = parentId
809 812
810 813 self.updateRunOperation(**kwargs)
811 814
812 815 def removeOperations(self):
813 816
814 817 for obj in self.opConfObjList:
815 818 del obj
816 819
817 820 self.opConfObjList = []
818 821
819 822 def addRunOperation(self, **kwargs):
820 823
821 824 opObj = self.addOperation(name = 'run', optype = 'self')
822 825
823 826 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
824 827 opObj.addParameter(name='path' , value=self.path, format='str')
825 828 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
826 829 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
827 830 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
828 831 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
832 opObj.addParameter(name='queue' , value=self.queue, format='obj')
829 833
830 834 for key, value in kwargs.items():
831 835 opObj.addParameter(name=key, value=value, format=type(value).__name__)
832 836
833 837 return opObj
834 838
835 839 def updateRunOperation(self, **kwargs):
836 840
837 841 opObj = self.getOperationObj(name = 'run')
838 842 opObj.removeParameters()
839 843
840 844 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
841 845 opObj.addParameter(name='path' , value=self.path, format='str')
842 846 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
843 847 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
844 848 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
845 849 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
846 850
847 851 for key, value in kwargs.items():
848 852 opObj.addParameter(name=key, value=value, format=type(value).__name__)
849 853
850 854 return opObj
851 855
852 856 # def makeXml(self, projectElement):
853 857 #
854 858 # procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
855 859 # procUnitElement.set('id', str(self.id))
856 860 # procUnitElement.set('name', self.name)
857 861 # procUnitElement.set('datatype', self.datatype)
858 862 # procUnitElement.set('inputId', str(self.inputId))
859 863 #
860 864 # for opConfObj in self.opConfObjList:
861 865 # opConfObj.makeXml(procUnitElement)
862 866
863 867 def readXml(self, upElement):
864 868
865 869 self.id = upElement.get('id')
866 870 self.name = upElement.get('name')
867 871 self.datatype = upElement.get('datatype')
868 872 self.inputId = upElement.get('inputId')
869 873
870 874 if self.ELEMENTNAME == "ReadUnit":
871 875 self.datatype = self.datatype.replace("Reader", "")
872 876
873 877 if self.inputId == 'None':
874 878 self.inputId = '0'
875 879
876 880 self.opConfObjList = []
877 881
878 882 opElementList = upElement.iter(OperationConf().getElementName())
879 883
880 884 for opElement in opElementList:
881 885 opConfObj = OperationConf()
882 886 opConfObj.readXml(opElement)
883 887 self.opConfObjList.append(opConfObj)
884 888
885 889 if opConfObj.name == 'run':
886 890 self.path = opConfObj.getParameterValue('path')
887 891 self.startDate = opConfObj.getParameterValue('startDate')
888 892 self.endDate = opConfObj.getParameterValue('endDate')
889 893 self.startTime = opConfObj.getParameterValue('startTime')
890 894 self.endTime = opConfObj.getParameterValue('endTime')
891 895
892 896 class Project():
893 897
894 898 id = None
895 899 name = None
896 900 description = None
897 901 filename = None
898 902
899 903 procUnitConfObjDict = None
900 904
901 905 ELEMENTNAME = 'Project'
902 906
903 907 plotterQueue = None
904 908
905 909 def __init__(self, plotter_queue=None):
906 910
907 911 self.id = None
908 912 self.name = None
909 913 self.description = None
910 914
911 915 self.plotterQueue = plotter_queue
912 916
913 917 self.procUnitConfObjDict = {}
914 918
915 919 def __getNewId(self):
916 920
917 921 idList = self.procUnitConfObjDict.keys()
918 922
919 923 id = int(self.id)*10
920 924
921 925 while True:
922 926 id += 1
923 927
924 928 if str(id) in idList:
925 929 continue
926 930
927 931 break
928 932
929 933 return str(id)
930 934
931 935 def getElementName(self):
932 936
933 937 return self.ELEMENTNAME
934 938
935 939 def getId(self):
936 940
937 941 return self.id
938 942
939 943 def updateId(self, new_id):
940 944
941 945 self.id = str(new_id)
942 946
943 947 keyList = self.procUnitConfObjDict.keys()
944 948 keyList.sort()
945 949
946 950 n = 1
947 951 newProcUnitConfObjDict = {}
948 952
949 953 for procKey in keyList:
950 954
951 955 procUnitConfObj = self.procUnitConfObjDict[procKey]
952 956 idProcUnit = str(int(self.id)*10 + n)
953 957 procUnitConfObj.updateId(idProcUnit, parentId = self.id)
954 958
955 959 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
956 960 n += 1
957 961
958 962 self.procUnitConfObjDict = newProcUnitConfObjDict
959 963
960 964 def setup(self, id, name, description):
961 965
962 966 self.id = str(id)
963 967 self.name = name
964 968 self.description = description
965 969
966 970 def update(self, name, description):
967 971
968 972 self.name = name
969 973 self.description = description
970 974
971 975 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
972 976
973 977 if id is None:
974 978 idReadUnit = self.__getNewId()
975 979 else:
976 980 idReadUnit = str(id)
977 981
978 982 readUnitConfObj = ReadUnitConf()
979 983 readUnitConfObj.setup(idReadUnit, name, datatype, parentId=self.id, **kwargs)
980 984
981 985 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
982 986
983 987 return readUnitConfObj
984 988
985 989 def addProcUnit(self, inputId='0', datatype=None, name=None):
986 990
987 991 idProcUnit = self.__getNewId()
988 992
989 993 procUnitConfObj = ProcUnitConf()
990 994 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, parentId=self.id)
991 995
992 996 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
993 997
994 998 return procUnitConfObj
995 999
996 1000 def removeProcUnit(self, id):
997 1001
998 1002 if id in self.procUnitConfObjDict.keys():
999 1003 self.procUnitConfObjDict.pop(id)
1000 1004
1001 1005 def getReadUnitId(self):
1002 1006
1003 1007 readUnitConfObj = self.getReadUnitObj()
1004 1008
1005 1009 return readUnitConfObj.id
1006 1010
1007 1011 def getReadUnitObj(self):
1008 1012
1009 1013 for obj in self.procUnitConfObjDict.values():
1010 1014 if obj.getElementName() == "ReadUnit":
1011 1015 return obj
1012 1016
1013 1017 return None
1014 1018
1015 1019 def getProcUnitObj(self, id=None, name=None):
1016 1020
1017 1021 if id != None:
1018 1022 return self.procUnitConfObjDict[id]
1019 1023
1020 1024 if name != None:
1021 1025 return self.getProcUnitObjByName(name)
1022 1026
1023 1027 return None
1024 1028
1025 1029 def getProcUnitObjByName(self, name):
1026 1030
1027 1031 for obj in self.procUnitConfObjDict.values():
1028 1032 if obj.name == name:
1029 1033 return obj
1030 1034
1031 1035 return None
1032 1036
1033 1037 def procUnitItems(self):
1034 1038
1035 1039 return self.procUnitConfObjDict.items()
1036 1040
1037 1041 def makeXml(self):
1038 1042
1039 1043 projectElement = Element('Project')
1040 1044 projectElement.set('id', str(self.id))
1041 1045 projectElement.set('name', self.name)
1042 1046 projectElement.set('description', self.description)
1043 1047
1044 1048 for procUnitConfObj in self.procUnitConfObjDict.values():
1045 1049 procUnitConfObj.makeXml(projectElement)
1046 1050
1047 1051 self.projectElement = projectElement
1048 1052
1049 1053 def writeXml(self, filename=None):
1050 1054
1051 1055 if filename == None:
1052 1056 if self.filename:
1053 1057 filename = self.filename
1054 1058 else:
1055 1059 filename = "schain.xml"
1056 1060
1057 1061 if not filename:
1058 1062 print "filename has not been defined. Use setFilename(filename) for do it."
1059 1063 return 0
1060 1064
1061 1065 abs_file = os.path.abspath(filename)
1062 1066
1063 1067 if not os.access(os.path.dirname(abs_file), os.W_OK):
1064 1068 print "No write permission on %s" %os.path.dirname(abs_file)
1065 1069 return 0
1066 1070
1067 1071 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1068 1072 print "File %s already exists and it could not be overwriten" %abs_file
1069 1073 return 0
1070 1074
1071 1075 self.makeXml()
1072 1076
1073 1077 ElementTree(self.projectElement).write(abs_file, method='xml')
1074 1078
1075 1079 self.filename = abs_file
1076 1080
1077 1081 return 1
1078 1082
1079 1083 def readXml(self, filename = None):
1080 1084
1081 1085 if not filename:
1082 1086 print "filename is not defined"
1083 1087 return 0
1084 1088
1085 1089 abs_file = os.path.abspath(filename)
1086 1090
1087 1091 if not os.path.isfile(abs_file):
1088 1092 print "%s file does not exist" %abs_file
1089 1093 return 0
1090 1094
1091 1095 self.projectElement = None
1092 1096 self.procUnitConfObjDict = {}
1093 1097
1094 1098 try:
1095 1099 self.projectElement = ElementTree().parse(abs_file)
1096 1100 except:
1097 1101 print "Error reading %s, verify file format" %filename
1098 1102 return 0
1099 1103
1100 1104 self.project = self.projectElement.tag
1101 1105
1102 1106 self.id = self.projectElement.get('id')
1103 1107 self.name = self.projectElement.get('name')
1104 1108 self.description = self.projectElement.get('description')
1105 1109
1106 1110 readUnitElementList = self.projectElement.iter(ReadUnitConf().getElementName())
1107 1111
1108 1112 for readUnitElement in readUnitElementList:
1109 1113 readUnitConfObj = ReadUnitConf()
1110 1114 readUnitConfObj.readXml(readUnitElement)
1111 1115
1112 1116 if readUnitConfObj.parentId == None:
1113 1117 readUnitConfObj.parentId = self.id
1114 1118
1115 1119 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1116 1120
1117 1121 procUnitElementList = self.projectElement.iter(ProcUnitConf().getElementName())
1118 1122
1119 1123 for procUnitElement in procUnitElementList:
1120 1124 procUnitConfObj = ProcUnitConf()
1121 1125 procUnitConfObj.readXml(procUnitElement)
1122 1126
1123 1127 if procUnitConfObj.parentId == None:
1124 1128 procUnitConfObj.parentId = self.id
1125 1129
1126 1130 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1127 1131
1128 1132 self.filename = abs_file
1129 1133
1130 1134 return 1
1131 1135
1132 1136 def printattr(self):
1133 1137
1134 1138 print "Project[%s]: name = %s, description = %s" %(self.id,
1135 1139 self.name,
1136 1140 self.description)
1137 1141
1138 1142 for procUnitConfObj in self.procUnitConfObjDict.values():
1139 1143 procUnitConfObj.printattr()
1140 1144
1141 1145 def createObjects(self):
1142 1146
1143 1147 for procUnitConfObj in self.procUnitConfObjDict.values():
1144 1148 procUnitConfObj.createObjects(self.plotterQueue)
1145 1149
1146 1150 def __connect(self, objIN, thisObj):
1147 1151
1148 1152 thisObj.setInput(objIN.getOutputObj())
1149 1153
1150 1154 def connectObjects(self):
1151 1155
1152 1156 for thisPUConfObj in self.procUnitConfObjDict.values():
1153 1157
1154 1158 inputId = thisPUConfObj.getInputId()
1155 1159
1156 1160 if int(inputId) == 0:
1157 1161 continue
1158 1162
1159 1163 #Get input object
1160 1164 puConfINObj = self.procUnitConfObjDict[inputId]
1161 1165 puObjIN = puConfINObj.getProcUnitObj()
1162 1166
1163 1167 #Get current object
1164 1168 thisPUObj = thisPUConfObj.getProcUnitObj()
1165 1169
1166 1170 self.__connect(puObjIN, thisPUObj)
1167 1171
1168 1172 def __handleError(self, procUnitConfObj, send_email=True):
1169 1173
1170 1174 import socket
1171 1175
1172 1176 err = traceback.format_exception(sys.exc_info()[0],
1173 1177 sys.exc_info()[1],
1174 1178 sys.exc_info()[2])
1175 1179
1176 1180 print "***** Error occurred in %s *****" %(procUnitConfObj.name)
1177 1181 print "***** %s" %err[-1]
1178 1182
1179 1183 message = "".join(err)
1180 1184
1181 1185 sys.stderr.write(message)
1182 1186
1183 1187 if not send_email:
1184 1188 return
1185 1189
1186 1190 subject = "SChain v%s: Error running %s\n" %(schainpy.__version__, procUnitConfObj.name)
1187 1191
1188 1192 subtitle = "%s: %s\n" %(procUnitConfObj.getElementName() ,procUnitConfObj.name)
1189 1193 subtitle += "Hostname: %s\n" %socket.gethostbyname(socket.gethostname())
1190 1194 subtitle += "Working directory: %s\n" %os.path.abspath("./")
1191 1195 subtitle += "Configuration file: %s\n" %self.filename
1192 1196 subtitle += "Time: %s\n" %str(datetime.datetime.now())
1193 1197
1194 1198 readUnitConfObj = self.getReadUnitObj()
1195 1199 if readUnitConfObj:
1196 1200 subtitle += "\nInput parameters:\n"
1197 1201 subtitle += "[Data path = %s]\n" %readUnitConfObj.path
1198 1202 subtitle += "[Data type = %s]\n" %readUnitConfObj.datatype
1199 1203 subtitle += "[Start date = %s]\n" %readUnitConfObj.startDate
1200 1204 subtitle += "[End date = %s]\n" %readUnitConfObj.endDate
1201 1205 subtitle += "[Start time = %s]\n" %readUnitConfObj.startTime
1202 1206 subtitle += "[End time = %s]\n" %readUnitConfObj.endTime
1203 1207
1204 1208 adminObj = schainpy.admin.SchainNotify()
1205 1209 adminObj.sendAlert(message=message,
1206 1210 subject=subject,
1207 1211 subtitle=subtitle,
1208 1212 filename=self.filename)
1209 1213
1210 1214 def isPaused(self):
1211 1215 return 0
1212 1216
1213 1217 def isStopped(self):
1214 1218 return 0
1215 1219
1216 1220 def runController(self):
1217 1221 """
1218 1222 returns 0 when this process has been stopped, 1 otherwise
1219 1223 """
1220 1224
1221 1225 if self.isPaused():
1222 1226 print "Process suspended"
1223 1227
1224 1228 while True:
1225 1229 sleep(0.1)
1226 1230
1227 1231 if not self.isPaused():
1228 1232 break
1229 1233
1230 1234 if self.isStopped():
1231 1235 break
1232 1236
1233 1237 print "Process reinitialized"
1234 1238
1235 1239 if self.isStopped():
1236 1240 print "Process stopped"
1237 1241 return 0
1238 1242
1239 1243 return 1
1240 1244
1241 1245 def setFilename(self, filename):
1242 1246
1243 1247 self.filename = filename
1244 1248
1245 1249 def setPlotterQueue(self, plotter_queue):
1246 1250
1247 1251 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1248 1252
1249 1253 def getPlotterQueue(self):
1250 1254
1251 1255 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1252 1256
1253 1257 def useExternalPlotter(self):
1254 1258
1255 1259 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1256 1260
1257 1261 def run(self):
1258 1262
1259 1263 print
1260 1264 print "*"*60
1261 1265 print " Starting SIGNAL CHAIN PROCESSING v%s " %schainpy.__version__
1262 1266 print "*"*60
1263 1267 print
1264 1268
1265 1269 keyList = self.procUnitConfObjDict.keys()
1266 1270 keyList.sort()
1267 1271
1268 1272 while(True):
1269 1273
1270 1274 is_ok = False
1271 1275
1272 1276 for procKey in keyList:
1273 1277 # print "Running the '%s' process with %s" %(procUnitConfObj.name, procUnitConfObj.id)
1274 1278
1275 1279 procUnitConfObj = self.procUnitConfObjDict[procKey]
1276 1280
1277 1281 try:
1278 1282 sts = procUnitConfObj.run()
1279 1283 is_ok = is_ok or sts
1280 1284 except KeyboardInterrupt:
1281 1285 is_ok = False
1282 1286 break
1283 1287 except ValueError, e:
1284 1288 sleep(0.5)
1285 1289 self.__handleError(procUnitConfObj, send_email=True)
1286 1290 is_ok = False
1287 1291 break
1288 1292 except:
1289 1293 sleep(0.5)
1290 1294 self.__handleError(procUnitConfObj)
1291 1295 is_ok = False
1292 1296 break
1293 1297
1294 1298 #If every process unit finished so end process
1295 1299 if not(is_ok):
1296 1300 # print "Every process unit have finished"
1297 1301 break
1298 1302
1299 1303 if not self.runController():
1300 1304 break
1301 1305
1302 1306 #Closing every process
1303 1307 for procKey in keyList:
1304 1308 procUnitConfObj = self.procUnitConfObjDict[procKey]
1305 1309 procUnitConfObj.close()
1306 1310
1307 1311 print "Process finished"
1308 1312
1309 1313 def start(self):
1310 1314
1311 1315 self.writeXml()
1312 1316 self.createObjects()
1313 1317 self.connectObjects()
1314 1318 self.run()
General Comments 0
You need to be logged in to leave comments. Login now