##// END OF EJS Templates
quitado el logger
José Chávez -
r1049:5338a74c35d8
parent child
Show More
@@ -1,1330 +1,1324
1 1 '''
2 2 Created on September , 2012
3 3 @author:
4 4 '''
5 5
6 6 import sys
7
8 # save_stdout = sys.stdout
9 # logToFile = newLogger()
10 # sys.stdout = logToFile
11 # sys.stderr = logToFile
12
13 7 import ast
14 8 import datetime
15 9 import traceback
16 10 import math
17 11 import time
18 12 from multiprocessing import Process, Queue, cpu_count
19 13
20 14 import schainpy
21 15 import schainpy.admin
22 16 from schainpy.utils.log import logToFile
23 17
24 18 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
25 19 from xml.dom import minidom
26 20
27 21 from schainpy.model import *
28 22 from time import sleep
29 23
30 24
31 25
32 26 def prettify(elem):
33 27 """Return a pretty-printed XML string for the Element.
34 28 """
35 29 rough_string = tostring(elem, 'utf-8')
36 30 reparsed = minidom.parseString(rough_string)
37 31 return reparsed.toprettyxml(indent=" ")
38 32
39 33 def multiSchain(child, nProcess=cpu_count(), startDate=None, endDate=None, by_day=False):
40 34 skip = 0
41 35 cursor = 0
42 36 nFiles = None
43 37 processes = []
44 38 dt1 = datetime.datetime.strptime(startDate, '%Y/%m/%d')
45 39 dt2 = datetime.datetime.strptime(endDate, '%Y/%m/%d')
46 40 days = (dt2 - dt1).days
47 41
48 42 for day in range(days+1):
49 43 skip = 0
50 44 cursor = 0
51 45 q = Queue()
52 46 processes = []
53 47 dt = (dt1 + datetime.timedelta(day)).strftime('%Y/%m/%d')
54 48 firstProcess = Process(target=child, args=(cursor, skip, q, dt))
55 49 firstProcess.start()
56 50 if by_day:
57 51 continue
58 52 nFiles = q.get()
59 53 if nFiles==0:
60 54 continue
61 55 firstProcess.terminate()
62 56 skip = int(math.ceil(nFiles/nProcess))
63 57 while True:
64 58 processes.append(Process(target=child, args=(cursor, skip, q, dt)))
65 59 processes[cursor].start()
66 60 if nFiles < cursor*skip:
67 61 break
68 62 cursor += 1
69 63
70 64 def beforeExit(exctype, value, trace):
71 65 for process in processes:
72 66 process.terminate()
73 67 process.join()
74 68 print traceback.print_tb(trace)
75 69
76 70 sys.excepthook = beforeExit
77 71
78 72 for process in processes:
79 73 process.join()
80 74 process.terminate()
81 75
82 76 time.sleep(3)
83 77
84 78
85 79 class ParameterConf():
86 80
87 81 id = None
88 82 name = None
89 83 value = None
90 84 format = None
91 85
92 86 __formated_value = None
93 87
94 88 ELEMENTNAME = 'Parameter'
95 89
96 90 def __init__(self):
97 91
98 92 self.format = 'str'
99 93
100 94 def getElementName(self):
101 95
102 96 return self.ELEMENTNAME
103 97
104 98 def getValue(self):
105 99
106 100 value = self.value
107 101 format = self.format
108 102
109 103 if self.__formated_value != None:
110 104
111 105 return self.__formated_value
112 106
113 107 if format == 'obj':
114 108 return value
115 109
116 110 if format == 'str':
117 111 self.__formated_value = str(value)
118 112 return self.__formated_value
119 113
120 114 if value == '':
121 115 raise ValueError, "%s: This parameter value is empty" %self.name
122 116
123 117 if format == 'list':
124 118 strList = value.split(',')
125 119
126 120 self.__formated_value = strList
127 121
128 122 return self.__formated_value
129 123
130 124 if format == 'intlist':
131 125 """
132 126 Example:
133 127 value = (0,1,2)
134 128 """
135 129
136 130 new_value = ast.literal_eval(value)
137 131
138 132 if type(new_value) not in (tuple, list):
139 133 new_value = [int(new_value)]
140 134
141 135 self.__formated_value = new_value
142 136
143 137 return self.__formated_value
144 138
145 139 if format == 'floatlist':
146 140 """
147 141 Example:
148 142 value = (0.5, 1.4, 2.7)
149 143 """
150 144
151 145 new_value = ast.literal_eval(value)
152 146
153 147 if type(new_value) not in (tuple, list):
154 148 new_value = [float(new_value)]
155 149
156 150 self.__formated_value = new_value
157 151
158 152 return self.__formated_value
159 153
160 154 if format == 'date':
161 155 strList = value.split('/')
162 156 intList = [int(x) for x in strList]
163 157 date = datetime.date(intList[0], intList[1], intList[2])
164 158
165 159 self.__formated_value = date
166 160
167 161 return self.__formated_value
168 162
169 163 if format == 'time':
170 164 strList = value.split(':')
171 165 intList = [int(x) for x in strList]
172 166 time = datetime.time(intList[0], intList[1], intList[2])
173 167
174 168 self.__formated_value = time
175 169
176 170 return self.__formated_value
177 171
178 172 if format == 'pairslist':
179 173 """
180 174 Example:
181 175 value = (0,1),(1,2)
182 176 """
183 177
184 178 new_value = ast.literal_eval(value)
185 179
186 180 if type(new_value) not in (tuple, list):
187 181 raise ValueError, "%s has to be a tuple or list of pairs" %value
188 182
189 183 if type(new_value[0]) not in (tuple, list):
190 184 if len(new_value) != 2:
191 185 raise ValueError, "%s has to be a tuple or list of pairs" %value
192 186 new_value = [new_value]
193 187
194 188 for thisPair in new_value:
195 189 if len(thisPair) != 2:
196 190 raise ValueError, "%s has to be a tuple or list of pairs" %value
197 191
198 192 self.__formated_value = new_value
199 193
200 194 return self.__formated_value
201 195
202 196 if format == 'multilist':
203 197 """
204 198 Example:
205 199 value = (0,1,2),(3,4,5)
206 200 """
207 201 multiList = ast.literal_eval(value)
208 202
209 203 if type(multiList[0]) == int:
210 204 multiList = ast.literal_eval("(" + value + ")")
211 205
212 206 self.__formated_value = multiList
213 207
214 208 return self.__formated_value
215 209
216 210 if format == 'bool':
217 211 value = int(value)
218 212
219 213 if format == 'int':
220 214 value = float(value)
221 215
222 216 format_func = eval(format)
223 217
224 218 self.__formated_value = format_func(value)
225 219
226 220 return self.__formated_value
227 221
228 222 def updateId(self, new_id):
229 223
230 224 self.id = str(new_id)
231 225
232 226 def setup(self, id, name, value, format='str'):
233 227 self.id = str(id)
234 228 self.name = name
235 229 if format == 'obj':
236 230 self.value = value
237 231 else:
238 232 self.value = str(value)
239 233 self.format = str.lower(format)
240 234
241 235 self.getValue()
242 236
243 237 return 1
244 238
245 239 def update(self, name, value, format='str'):
246 240
247 241 self.name = name
248 242 self.value = str(value)
249 243 self.format = format
250 244
251 245 def makeXml(self, opElement):
252 246 if self.name not in ('queue',):
253 247 parmElement = SubElement(opElement, self.ELEMENTNAME)
254 248 parmElement.set('id', str(self.id))
255 249 parmElement.set('name', self.name)
256 250 parmElement.set('value', self.value)
257 251 parmElement.set('format', self.format)
258 252
259 253 def readXml(self, parmElement):
260 254
261 255 self.id = parmElement.get('id')
262 256 self.name = parmElement.get('name')
263 257 self.value = parmElement.get('value')
264 258 self.format = str.lower(parmElement.get('format'))
265 259
266 260 #Compatible with old signal chain version
267 261 if self.format == 'int' and self.name == 'idfigure':
268 262 self.name = 'id'
269 263
270 264 def printattr(self):
271 265
272 266 print "Parameter[%s]: name = %s, value = %s, format = %s" %(self.id, self.name, self.value, self.format)
273 267
274 268 class OperationConf():
275 269
276 270 id = None
277 271 name = None
278 272 priority = None
279 273 type = None
280 274
281 275 parmConfObjList = []
282 276
283 277 ELEMENTNAME = 'Operation'
284 278
285 279 def __init__(self):
286 280
287 281 self.id = '0'
288 282 self.name = None
289 283 self.priority = None
290 284 self.type = 'self'
291 285
292 286
293 287 def __getNewId(self):
294 288
295 289 return int(self.id)*10 + len(self.parmConfObjList) + 1
296 290
297 291 def updateId(self, new_id):
298 292
299 293 self.id = str(new_id)
300 294
301 295 n = 1
302 296 for parmObj in self.parmConfObjList:
303 297
304 298 idParm = str(int(new_id)*10 + n)
305 299 parmObj.updateId(idParm)
306 300
307 301 n += 1
308 302
309 303 def getElementName(self):
310 304
311 305 return self.ELEMENTNAME
312 306
313 307 def getParameterObjList(self):
314 308
315 309 return self.parmConfObjList
316 310
317 311 def getParameterObj(self, parameterName):
318 312
319 313 for parmConfObj in self.parmConfObjList:
320 314
321 315 if parmConfObj.name != parameterName:
322 316 continue
323 317
324 318 return parmConfObj
325 319
326 320 return None
327 321
328 322 def getParameterObjfromValue(self, parameterValue):
329 323
330 324 for parmConfObj in self.parmConfObjList:
331 325
332 326 if parmConfObj.getValue() != parameterValue:
333 327 continue
334 328
335 329 return parmConfObj.getValue()
336 330
337 331 return None
338 332
339 333 def getParameterValue(self, parameterName):
340 334
341 335 parameterObj = self.getParameterObj(parameterName)
342 336
343 337 # if not parameterObj:
344 338 # return None
345 339
346 340 value = parameterObj.getValue()
347 341
348 342 return value
349 343
350 344
351 345 def getKwargs(self):
352 346
353 347 kwargs = {}
354 348
355 349 for parmConfObj in self.parmConfObjList:
356 350 if self.name == 'run' and parmConfObj.name == 'datatype':
357 351 continue
358 352
359 353 kwargs[parmConfObj.name] = parmConfObj.getValue()
360 354
361 355 return kwargs
362 356
363 357 def setup(self, id, name, priority, type):
364 358
365 359 self.id = str(id)
366 360 self.name = name
367 361 self.type = type
368 362 self.priority = priority
369 363
370 364 self.parmConfObjList = []
371 365
372 366 def removeParameters(self):
373 367
374 368 for obj in self.parmConfObjList:
375 369 del obj
376 370
377 371 self.parmConfObjList = []
378 372
379 373 def addParameter(self, name, value, format='str'):
380 374
381 375 id = self.__getNewId()
382 376
383 377 parmConfObj = ParameterConf()
384 378 if not parmConfObj.setup(id, name, value, format):
385 379 return None
386 380
387 381 self.parmConfObjList.append(parmConfObj)
388 382
389 383 return parmConfObj
390 384
391 385 def changeParameter(self, name, value, format='str'):
392 386
393 387 parmConfObj = self.getParameterObj(name)
394 388 parmConfObj.update(name, value, format)
395 389
396 390 return parmConfObj
397 391
398 392 def makeXml(self, procUnitElement):
399 393
400 394 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
401 395 opElement.set('id', str(self.id))
402 396 opElement.set('name', self.name)
403 397 opElement.set('type', self.type)
404 398 opElement.set('priority', str(self.priority))
405 399
406 400 for parmConfObj in self.parmConfObjList:
407 401 parmConfObj.makeXml(opElement)
408 402
409 403 def readXml(self, opElement):
410 404
411 405 self.id = opElement.get('id')
412 406 self.name = opElement.get('name')
413 407 self.type = opElement.get('type')
414 408 self.priority = opElement.get('priority')
415 409
416 410 #Compatible with old signal chain version
417 411 #Use of 'run' method instead 'init'
418 412 if self.type == 'self' and self.name == 'init':
419 413 self.name = 'run'
420 414
421 415 self.parmConfObjList = []
422 416
423 417 parmElementList = opElement.iter(ParameterConf().getElementName())
424 418
425 419 for parmElement in parmElementList:
426 420 parmConfObj = ParameterConf()
427 421 parmConfObj.readXml(parmElement)
428 422
429 423 #Compatible with old signal chain version
430 424 #If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
431 425 if self.type != 'self' and self.name == 'Plot':
432 426 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
433 427 self.name = parmConfObj.value
434 428 continue
435 429
436 430 self.parmConfObjList.append(parmConfObj)
437 431
438 432 def printattr(self):
439 433
440 434 print "%s[%s]: name = %s, type = %s, priority = %s" %(self.ELEMENTNAME,
441 435 self.id,
442 436 self.name,
443 437 self.type,
444 438 self.priority)
445 439
446 440 for parmConfObj in self.parmConfObjList:
447 441 parmConfObj.printattr()
448 442
449 443 def createObject(self, plotter_queue=None):
450 444
451 445
452 446 if self.type == 'self':
453 447 raise ValueError, "This operation type cannot be created"
454 448
455 449 if self.type == 'plotter':
456 450 #Plotter(plotter_name)
457 451 if not plotter_queue:
458 452 raise ValueError, "plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)"
459 453
460 454 opObj = Plotter(self.name, plotter_queue)
461 455
462 456 if self.type == 'external' or self.type == 'other':
463 457
464 458 className = eval(self.name)
465 459 kwargs = self.getKwargs()
466 460
467 461 opObj = className(**kwargs)
468 462
469 463 return opObj
470 464
471 465
472 466 class ProcUnitConf():
473 467
474 468 id = None
475 469 name = None
476 470 datatype = None
477 471 inputId = None
478 472 parentId = None
479 473
480 474 opConfObjList = []
481 475
482 476 procUnitObj = None
483 477 opObjList = []
484 478
485 479 ELEMENTNAME = 'ProcUnit'
486 480
487 481 def __init__(self):
488 482
489 483 self.id = None
490 484 self.datatype = None
491 485 self.name = None
492 486 self.inputId = None
493 487
494 488 self.opConfObjList = []
495 489
496 490 self.procUnitObj = None
497 491 self.opObjDict = {}
498 492
499 493 def __getPriority(self):
500 494
501 495 return len(self.opConfObjList)+1
502 496
503 497 def __getNewId(self):
504 498
505 499 return int(self.id)*10 + len(self.opConfObjList) + 1
506 500
507 501 def getElementName(self):
508 502
509 503 return self.ELEMENTNAME
510 504
511 505 def getId(self):
512 506
513 507 return self.id
514 508
515 509 def updateId(self, new_id, parentId=parentId):
516 510
517 511
518 512 new_id = int(parentId)*10 + (int(self.id) % 10)
519 513 new_inputId = int(parentId)*10 + (int(self.inputId) % 10)
520 514
521 515 #If this proc unit has not inputs
522 516 if self.inputId == '0':
523 517 new_inputId = 0
524 518
525 519 n = 1
526 520 for opConfObj in self.opConfObjList:
527 521
528 522 idOp = str(int(new_id)*10 + n)
529 523 opConfObj.updateId(idOp)
530 524
531 525 n += 1
532 526
533 527 self.parentId = str(parentId)
534 528 self.id = str(new_id)
535 529 self.inputId = str(new_inputId)
536 530
537 531
538 532 def getInputId(self):
539 533
540 534 return self.inputId
541 535
542 536 def getOperationObjList(self):
543 537
544 538 return self.opConfObjList
545 539
546 540 def getOperationObj(self, name=None):
547 541
548 542 for opConfObj in self.opConfObjList:
549 543
550 544 if opConfObj.name != name:
551 545 continue
552 546
553 547 return opConfObj
554 548
555 549 return None
556 550
557 551 def getOpObjfromParamValue(self, value=None):
558 552
559 553 for opConfObj in self.opConfObjList:
560 554 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
561 555 continue
562 556 return opConfObj
563 557 return None
564 558
565 559 def getProcUnitObj(self):
566 560
567 561 return self.procUnitObj
568 562
569 563 def setup(self, id, name, datatype, inputId, parentId=None):
570 564
571 565 #Compatible with old signal chain version
572 566 if datatype==None and name==None:
573 567 raise ValueError, "datatype or name should be defined"
574 568
575 569 if name==None:
576 570 if 'Proc' in datatype:
577 571 name = datatype
578 572 else:
579 573 name = '%sProc' %(datatype)
580 574
581 575 if datatype==None:
582 576 datatype = name.replace('Proc','')
583 577
584 578 self.id = str(id)
585 579 self.name = name
586 580 self.datatype = datatype
587 581 self.inputId = inputId
588 582 self.parentId = parentId
589 583
590 584 self.opConfObjList = []
591 585
592 586 self.addOperation(name='run', optype='self')
593 587
594 588 def removeOperations(self):
595 589
596 590 for obj in self.opConfObjList:
597 591 del obj
598 592
599 593 self.opConfObjList = []
600 594 self.addOperation(name='run')
601 595
602 596 def addParameter(self, **kwargs):
603 597 '''
604 598 Add parameters to "run" operation
605 599 '''
606 600 opObj = self.opConfObjList[0]
607 601
608 602 opObj.addParameter(**kwargs)
609 603
610 604 return opObj
611 605
612 606 def addOperation(self, name, optype='self'):
613 607
614 608 id = self.__getNewId()
615 609 priority = self.__getPriority()
616 610
617 611 opConfObj = OperationConf()
618 612 opConfObj.setup(id, name=name, priority=priority, type=optype)
619 613
620 614 self.opConfObjList.append(opConfObj)
621 615
622 616 return opConfObj
623 617
624 618 def makeXml(self, projectElement):
625 619
626 620 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
627 621 procUnitElement.set('id', str(self.id))
628 622 procUnitElement.set('name', self.name)
629 623 procUnitElement.set('datatype', self.datatype)
630 624 procUnitElement.set('inputId', str(self.inputId))
631 625
632 626 for opConfObj in self.opConfObjList:
633 627 opConfObj.makeXml(procUnitElement)
634 628
635 629 def readXml(self, upElement):
636 630
637 631 self.id = upElement.get('id')
638 632 self.name = upElement.get('name')
639 633 self.datatype = upElement.get('datatype')
640 634 self.inputId = upElement.get('inputId')
641 635
642 636 if self.ELEMENTNAME == "ReadUnit":
643 637 self.datatype = self.datatype.replace("Reader", "")
644 638
645 639 if self.ELEMENTNAME == "ProcUnit":
646 640 self.datatype = self.datatype.replace("Proc", "")
647 641
648 642 if self.inputId == 'None':
649 643 self.inputId = '0'
650 644
651 645 self.opConfObjList = []
652 646
653 647 opElementList = upElement.iter(OperationConf().getElementName())
654 648
655 649 for opElement in opElementList:
656 650 opConfObj = OperationConf()
657 651 opConfObj.readXml(opElement)
658 652 self.opConfObjList.append(opConfObj)
659 653
660 654 def printattr(self):
661 655
662 656 print "%s[%s]: name = %s, datatype = %s, inputId = %s" %(self.ELEMENTNAME,
663 657 self.id,
664 658 self.name,
665 659 self.datatype,
666 660 self.inputId)
667 661
668 662 for opConfObj in self.opConfObjList:
669 663 opConfObj.printattr()
670 664
671 665
672 666 def getKwargs(self):
673 667
674 668 opObj = self.opConfObjList[0]
675 669 kwargs = opObj.getKwargs()
676 670
677 671 return kwargs
678 672
679 673 def createObjects(self, plotter_queue=None):
680 674
681 675 className = eval(self.name)
682 676 kwargs = self.getKwargs()
683 677 procUnitObj = className(**kwargs)
684 678
685 679 for opConfObj in self.opConfObjList:
686 680
687 681 if opConfObj.type=='self' and self.name=='run':
688 682 continue
689 683 elif opConfObj.type=='self':
690 684 procUnitObj.addOperationKwargs(opConfObj.id, **opConfObj.getKwargs())
691 685 continue
692 686
693 687 opObj = opConfObj.createObject(plotter_queue)
694 688
695 689 self.opObjDict[opConfObj.id] = opObj
696 690
697 691 procUnitObj.addOperation(opObj, opConfObj.id)
698 692
699 693 self.procUnitObj = procUnitObj
700 694
701 695 return procUnitObj
702 696
703 697 def run(self):
704 698
705 699 is_ok = False
706 700
707 701 for opConfObj in self.opConfObjList:
708 702
709 703 kwargs = {}
710 704 for parmConfObj in opConfObj.getParameterObjList():
711 705 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
712 706 continue
713 707
714 708 kwargs[parmConfObj.name] = parmConfObj.getValue()
715 709
716 710 #ini = time.time()
717 711
718 712 #print "\tRunning the '%s' operation with %s" %(opConfObj.name, opConfObj.id)
719 713 sts = self.procUnitObj.call(opType = opConfObj.type,
720 714 opName = opConfObj.name,
721 715 opId = opConfObj.id)
722 716
723 717 # total_time = time.time() - ini
724 718 #
725 719 # if total_time > 0.002:
726 720 # print "%s::%s took %f seconds" %(self.name, opConfObj.name, total_time)
727 721
728 722 is_ok = is_ok or sts
729 723
730 724 return is_ok
731 725
732 726 def close(self):
733 727
734 728 for opConfObj in self.opConfObjList:
735 729 if opConfObj.type == 'self':
736 730 continue
737 731
738 732 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
739 733 opObj.close()
740 734
741 735 self.procUnitObj.close()
742 736
743 737 return
744 738
745 739 class ReadUnitConf(ProcUnitConf):
746 740
747 741 path = None
748 742 startDate = None
749 743 endDate = None
750 744 startTime = None
751 745 endTime = None
752 746
753 747 ELEMENTNAME = 'ReadUnit'
754 748
755 749 def __init__(self):
756 750
757 751 self.id = None
758 752 self.datatype = None
759 753 self.name = None
760 754 self.inputId = None
761 755
762 756 self.parentId = None
763 757
764 758 self.opConfObjList = []
765 759 self.opObjList = []
766 760
767 761 def getElementName(self):
768 762
769 763 return self.ELEMENTNAME
770 764
771 765 def setup(self, id, name, datatype, path='', startDate="", endDate="", startTime="",
772 766 endTime="", parentId=None, queue=None, server=None, **kwargs):
773 767 #Compatible with old signal chain version
774 768 if datatype==None and name==None:
775 769 raise ValueError, "datatype or name should be defined"
776 770
777 771 if name==None:
778 772 if 'Reader' in datatype:
779 773 name = datatype
780 774 else:
781 775 name = '%sReader' %(datatype)
782 776 if datatype==None:
783 777 datatype = name.replace('Reader','')
784 778
785 779 self.id = id
786 780 self.name = name
787 781 self.datatype = datatype
788 782 if path != '':
789 783 self.path = os.path.abspath(path)
790 784 self.startDate = startDate
791 785 self.endDate = endDate
792 786 self.startTime = startTime
793 787 self.endTime = endTime
794 788
795 789 self.inputId = '0'
796 790 self.parentId = parentId
797 791 self.queue = queue
798 792 self.server = server
799 793 self.addRunOperation(**kwargs)
800 794
801 795 def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
802 796
803 797 #Compatible with old signal chain version
804 798 if datatype==None and name==None:
805 799 raise ValueError, "datatype or name should be defined"
806 800
807 801 if name==None:
808 802 if 'Reader' in datatype:
809 803 name = datatype
810 804 else:
811 805 name = '%sReader' %(datatype)
812 806
813 807 if datatype==None:
814 808 datatype = name.replace('Reader','')
815 809
816 810 self.datatype = datatype
817 811 self.name = name
818 812 self.path = path
819 813 self.startDate = startDate
820 814 self.endDate = endDate
821 815 self.startTime = startTime
822 816 self.endTime = endTime
823 817
824 818 self.inputId = '0'
825 819 self.parentId = parentId
826 820
827 821 self.updateRunOperation(**kwargs)
828 822
829 823 def removeOperations(self):
830 824
831 825 for obj in self.opConfObjList:
832 826 del obj
833 827
834 828 self.opConfObjList = []
835 829
836 830 def addRunOperation(self, **kwargs):
837 831
838 832 opObj = self.addOperation(name = 'run', optype = 'self')
839 833
840 834 if self.server is None:
841 835 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
842 836 opObj.addParameter(name='path' , value=self.path, format='str')
843 837 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
844 838 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
845 839 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
846 840 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
847 841 opObj.addParameter(name='queue' , value=self.queue, format='obj')
848 842 for key, value in kwargs.items():
849 843 opObj.addParameter(name=key, value=value, format=type(value).__name__)
850 844 else:
851 845 opObj.addParameter(name='server' , value=self.server, format='str')
852 846
853 847
854 848 return opObj
855 849
856 850 def updateRunOperation(self, **kwargs):
857 851
858 852 opObj = self.getOperationObj(name = 'run')
859 853 opObj.removeParameters()
860 854
861 855 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
862 856 opObj.addParameter(name='path' , value=self.path, format='str')
863 857 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
864 858 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
865 859 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
866 860 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
867 861
868 862 for key, value in kwargs.items():
869 863 opObj.addParameter(name=key, value=value, format=type(value).__name__)
870 864
871 865 return opObj
872 866
873 867 # def makeXml(self, projectElement):
874 868 #
875 869 # procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
876 870 # procUnitElement.set('id', str(self.id))
877 871 # procUnitElement.set('name', self.name)
878 872 # procUnitElement.set('datatype', self.datatype)
879 873 # procUnitElement.set('inputId', str(self.inputId))
880 874 #
881 875 # for opConfObj in self.opConfObjList:
882 876 # opConfObj.makeXml(procUnitElement)
883 877
884 878 def readXml(self, upElement):
885 879
886 880 self.id = upElement.get('id')
887 881 self.name = upElement.get('name')
888 882 self.datatype = upElement.get('datatype')
889 883 self.inputId = upElement.get('inputId')
890 884
891 885 if self.ELEMENTNAME == "ReadUnit":
892 886 self.datatype = self.datatype.replace("Reader", "")
893 887
894 888 if self.inputId == 'None':
895 889 self.inputId = '0'
896 890
897 891 self.opConfObjList = []
898 892
899 893 opElementList = upElement.iter(OperationConf().getElementName())
900 894
901 895 for opElement in opElementList:
902 896 opConfObj = OperationConf()
903 897 opConfObj.readXml(opElement)
904 898 self.opConfObjList.append(opConfObj)
905 899
906 900 if opConfObj.name == 'run':
907 901 self.path = opConfObj.getParameterValue('path')
908 902 self.startDate = opConfObj.getParameterValue('startDate')
909 903 self.endDate = opConfObj.getParameterValue('endDate')
910 904 self.startTime = opConfObj.getParameterValue('startTime')
911 905 self.endTime = opConfObj.getParameterValue('endTime')
912 906
913 907 class Project(Process):
914 908 id = None
915 909 name = None
916 910 description = None
917 911 filename = None
918 912
919 913 procUnitConfObjDict = None
920 914
921 915 ELEMENTNAME = 'Project'
922 916
923 917 plotterQueue = None
924 918
925 919 def __init__(self, plotter_queue=None, logfile=None):
926 920 Process.__init__(self)
927 921 self.id = None
928 922 self.name = None
929 923 self.description = None
930 924 if logfile is not None:
931 925 logToFile(logfile)
932 926 self.plotterQueue = plotter_queue
933 927
934 928 self.procUnitConfObjDict = {}
935 929
936 930 def __getNewId(self):
937 931
938 932 idList = self.procUnitConfObjDict.keys()
939 933
940 934 id = int(self.id)*10
941 935
942 936 while True:
943 937 id += 1
944 938
945 939 if str(id) in idList:
946 940 continue
947 941
948 942 break
949 943
950 944 return str(id)
951 945
952 946 def getElementName(self):
953 947
954 948 return self.ELEMENTNAME
955 949
956 950 def getId(self):
957 951
958 952 return self.id
959 953
960 954 def updateId(self, new_id):
961 955
962 956 self.id = str(new_id)
963 957
964 958 keyList = self.procUnitConfObjDict.keys()
965 959 keyList.sort()
966 960
967 961 n = 1
968 962 newProcUnitConfObjDict = {}
969 963
970 964 for procKey in keyList:
971 965
972 966 procUnitConfObj = self.procUnitConfObjDict[procKey]
973 967 idProcUnit = str(int(self.id)*10 + n)
974 968 procUnitConfObj.updateId(idProcUnit, parentId = self.id)
975 969
976 970 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
977 971 n += 1
978 972
979 973 self.procUnitConfObjDict = newProcUnitConfObjDict
980 974
981 975 def setup(self, id, name, description):
982 976
983 977 self.id = str(id)
984 978 self.name = name
985 979 self.description = description
986 980
987 981 def update(self, name, description):
988 982
989 983 self.name = name
990 984 self.description = description
991 985
992 986 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
993 987 if id is None:
994 988 idReadUnit = self.__getNewId()
995 989 else:
996 990 idReadUnit = str(id)
997 991
998 992 readUnitConfObj = ReadUnitConf()
999 993 readUnitConfObj.setup(idReadUnit, name, datatype, parentId=self.id, **kwargs)
1000 994
1001 995 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1002 996
1003 997 return readUnitConfObj
1004 998
1005 999 def addProcUnit(self, inputId='0', datatype=None, name=None):
1006 1000
1007 1001 idProcUnit = self.__getNewId()
1008 1002
1009 1003 procUnitConfObj = ProcUnitConf()
1010 1004 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, parentId=self.id)
1011 1005
1012 1006 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1013 1007
1014 1008 return procUnitConfObj
1015 1009
1016 1010 def removeProcUnit(self, id):
1017 1011
1018 1012 if id in self.procUnitConfObjDict.keys():
1019 1013 self.procUnitConfObjDict.pop(id)
1020 1014
1021 1015 def getReadUnitId(self):
1022 1016
1023 1017 readUnitConfObj = self.getReadUnitObj()
1024 1018
1025 1019 return readUnitConfObj.id
1026 1020
1027 1021 def getReadUnitObj(self):
1028 1022
1029 1023 for obj in self.procUnitConfObjDict.values():
1030 1024 if obj.getElementName() == "ReadUnit":
1031 1025 return obj
1032 1026
1033 1027 return None
1034 1028
1035 1029 def getProcUnitObj(self, id=None, name=None):
1036 1030
1037 1031 if id != None:
1038 1032 return self.procUnitConfObjDict[id]
1039 1033
1040 1034 if name != None:
1041 1035 return self.getProcUnitObjByName(name)
1042 1036
1043 1037 return None
1044 1038
1045 1039 def getProcUnitObjByName(self, name):
1046 1040
1047 1041 for obj in self.procUnitConfObjDict.values():
1048 1042 if obj.name == name:
1049 1043 return obj
1050 1044
1051 1045 return None
1052 1046
1053 1047 def procUnitItems(self):
1054 1048
1055 1049 return self.procUnitConfObjDict.items()
1056 1050
1057 1051 def makeXml(self):
1058 1052
1059 1053 projectElement = Element('Project')
1060 1054 projectElement.set('id', str(self.id))
1061 1055 projectElement.set('name', self.name)
1062 1056 projectElement.set('description', self.description)
1063 1057
1064 1058 for procUnitConfObj in self.procUnitConfObjDict.values():
1065 1059 procUnitConfObj.makeXml(projectElement)
1066 1060
1067 1061 self.projectElement = projectElement
1068 1062
1069 1063 def writeXml(self, filename=None):
1070 1064
1071 1065 if filename == None:
1072 1066 if self.filename:
1073 1067 filename = self.filename
1074 1068 else:
1075 1069 filename = "schain.xml"
1076 1070
1077 1071 if not filename:
1078 1072 print "filename has not been defined. Use setFilename(filename) for do it."
1079 1073 return 0
1080 1074
1081 1075 abs_file = os.path.abspath(filename)
1082 1076
1083 1077 if not os.access(os.path.dirname(abs_file), os.W_OK):
1084 1078 print "No write permission on %s" %os.path.dirname(abs_file)
1085 1079 return 0
1086 1080
1087 1081 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1088 1082 print "File %s already exists and it could not be overwriten" %abs_file
1089 1083 return 0
1090 1084
1091 1085 self.makeXml()
1092 1086
1093 1087 ElementTree(self.projectElement).write(abs_file, method='xml')
1094 1088
1095 1089 self.filename = abs_file
1096 1090
1097 1091 return 1
1098 1092
1099 1093 def readXml(self, filename = None):
1100 1094
1101 1095 if not filename:
1102 1096 print "filename is not defined"
1103 1097 return 0
1104 1098
1105 1099 abs_file = os.path.abspath(filename)
1106 1100
1107 1101 if not os.path.isfile(abs_file):
1108 1102 print "%s file does not exist" %abs_file
1109 1103 return 0
1110 1104
1111 1105 self.projectElement = None
1112 1106 self.procUnitConfObjDict = {}
1113 1107
1114 1108 try:
1115 1109 self.projectElement = ElementTree().parse(abs_file)
1116 1110 except:
1117 1111 print "Error reading %s, verify file format" %filename
1118 1112 return 0
1119 1113
1120 1114 self.project = self.projectElement.tag
1121 1115
1122 1116 self.id = self.projectElement.get('id')
1123 1117 self.name = self.projectElement.get('name')
1124 1118 self.description = self.projectElement.get('description')
1125 1119
1126 1120 readUnitElementList = self.projectElement.iter(ReadUnitConf().getElementName())
1127 1121
1128 1122 for readUnitElement in readUnitElementList:
1129 1123 readUnitConfObj = ReadUnitConf()
1130 1124 readUnitConfObj.readXml(readUnitElement)
1131 1125
1132 1126 if readUnitConfObj.parentId == None:
1133 1127 readUnitConfObj.parentId = self.id
1134 1128
1135 1129 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1136 1130
1137 1131 procUnitElementList = self.projectElement.iter(ProcUnitConf().getElementName())
1138 1132
1139 1133 for procUnitElement in procUnitElementList:
1140 1134 procUnitConfObj = ProcUnitConf()
1141 1135 procUnitConfObj.readXml(procUnitElement)
1142 1136
1143 1137 if procUnitConfObj.parentId == None:
1144 1138 procUnitConfObj.parentId = self.id
1145 1139
1146 1140 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1147 1141
1148 1142 self.filename = abs_file
1149 1143
1150 1144 return 1
1151 1145
1152 1146 def printattr(self):
1153 1147
1154 1148 print "Project[%s]: name = %s, description = %s" %(self.id,
1155 1149 self.name,
1156 1150 self.description)
1157 1151
1158 1152 for procUnitConfObj in self.procUnitConfObjDict.values():
1159 1153 procUnitConfObj.printattr()
1160 1154
1161 1155 def createObjects(self):
1162 1156
1163 1157 for procUnitConfObj in self.procUnitConfObjDict.values():
1164 1158 procUnitConfObj.createObjects(self.plotterQueue)
1165 1159
1166 1160 def __connect(self, objIN, thisObj):
1167 1161
1168 1162 thisObj.setInput(objIN.getOutputObj())
1169 1163
1170 1164 def connectObjects(self):
1171 1165
1172 1166 for thisPUConfObj in self.procUnitConfObjDict.values():
1173 1167
1174 1168 inputId = thisPUConfObj.getInputId()
1175 1169
1176 1170 if int(inputId) == 0:
1177 1171 continue
1178 1172
1179 1173 #Get input object
1180 1174 puConfINObj = self.procUnitConfObjDict[inputId]
1181 1175 puObjIN = puConfINObj.getProcUnitObj()
1182 1176
1183 1177 #Get current object
1184 1178 thisPUObj = thisPUConfObj.getProcUnitObj()
1185 1179
1186 1180 self.__connect(puObjIN, thisPUObj)
1187 1181
1188 1182 def __handleError(self, procUnitConfObj, send_email=True):
1189 1183
1190 1184 import socket
1191 1185
1192 1186 err = traceback.format_exception(sys.exc_info()[0],
1193 1187 sys.exc_info()[1],
1194 1188 sys.exc_info()[2])
1195 1189
1196 1190 print "***** Error occurred in %s *****" %(procUnitConfObj.name)
1197 1191 print "***** %s" %err[-1]
1198 1192
1199 1193 message = "".join(err)
1200 1194
1201 1195 sys.stderr.write(message)
1202 1196
1203 1197 if not send_email:
1204 1198 return
1205 1199
1206 1200 subject = "SChain v%s: Error running %s\n" %(schainpy.__version__, procUnitConfObj.name)
1207 1201
1208 1202 subtitle = "%s: %s\n" %(procUnitConfObj.getElementName() ,procUnitConfObj.name)
1209 1203 subtitle += "Hostname: %s\n" %socket.gethostbyname(socket.gethostname())
1210 1204 subtitle += "Working directory: %s\n" %os.path.abspath("./")
1211 1205 subtitle += "Configuration file: %s\n" %self.filename
1212 1206 subtitle += "Time: %s\n" %str(datetime.datetime.now())
1213 1207
1214 1208 readUnitConfObj = self.getReadUnitObj()
1215 1209 if readUnitConfObj:
1216 1210 subtitle += "\nInput parameters:\n"
1217 1211 subtitle += "[Data path = %s]\n" %readUnitConfObj.path
1218 1212 subtitle += "[Data type = %s]\n" %readUnitConfObj.datatype
1219 1213 subtitle += "[Start date = %s]\n" %readUnitConfObj.startDate
1220 1214 subtitle += "[End date = %s]\n" %readUnitConfObj.endDate
1221 1215 subtitle += "[Start time = %s]\n" %readUnitConfObj.startTime
1222 1216 subtitle += "[End time = %s]\n" %readUnitConfObj.endTime
1223 1217
1224 1218 adminObj = schainpy.admin.SchainNotify()
1225 1219 adminObj.sendAlert(message=message,
1226 1220 subject=subject,
1227 1221 subtitle=subtitle,
1228 1222 filename=self.filename)
1229 1223
1230 1224 def isPaused(self):
1231 1225 return 0
1232 1226
1233 1227 def isStopped(self):
1234 1228 return 0
1235 1229
1236 1230 def runController(self):
1237 1231 """
1238 1232 returns 0 when this process has been stopped, 1 otherwise
1239 1233 """
1240 1234
1241 1235 if self.isPaused():
1242 1236 print "Process suspended"
1243 1237
1244 1238 while True:
1245 1239 sleep(0.1)
1246 1240
1247 1241 if not self.isPaused():
1248 1242 break
1249 1243
1250 1244 if self.isStopped():
1251 1245 break
1252 1246
1253 1247 print "Process reinitialized"
1254 1248
1255 1249 if self.isStopped():
1256 1250 print "Process stopped"
1257 1251 return 0
1258 1252
1259 1253 return 1
1260 1254
1261 1255 def setFilename(self, filename):
1262 1256
1263 1257 self.filename = filename
1264 1258
1265 1259 def setPlotterQueue(self, plotter_queue):
1266 1260
1267 1261 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1268 1262
1269 1263 def getPlotterQueue(self):
1270 1264
1271 1265 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1272 1266
1273 1267 def useExternalPlotter(self):
1274 1268
1275 1269 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1276 1270
1277 1271
1278 1272 def run(self, filename=None):
1279 1273
1280 1274 # self.writeXml(filename)
1281 1275 self.createObjects()
1282 1276 self.connectObjects()
1283 1277
1284 1278 print
1285 1279 print "*"*60
1286 1280 print " Starting SIGNAL CHAIN PROCESSING v%s " %schainpy.__version__
1287 1281 print "*"*60
1288 1282 print
1289 1283
1290 1284 keyList = self.procUnitConfObjDict.keys()
1291 1285 keyList.sort()
1292 1286
1293 1287 while(True):
1294 1288
1295 1289 is_ok = False
1296 1290
1297 1291 for procKey in keyList:
1298 1292 # print "Running the '%s' process with %s" %(procUnitConfObj.name, procUnitConfObj.id)
1299 1293
1300 1294 procUnitConfObj = self.procUnitConfObjDict[procKey]
1301 1295
1302 1296 try:
1303 1297 sts = procUnitConfObj.run()
1304 1298 is_ok = is_ok or sts
1305 1299 except KeyboardInterrupt:
1306 1300 is_ok = False
1307 1301 break
1308 1302 except ValueError, e:
1309 1303 sleep(0.5)
1310 1304 self.__handleError(procUnitConfObj, send_email=True)
1311 1305 is_ok = False
1312 1306 break
1313 1307 except:
1314 1308 sleep(0.5)
1315 1309 self.__handleError(procUnitConfObj)
1316 1310 is_ok = False
1317 1311 break
1318 1312
1319 1313 #If every process unit finished so end process
1320 1314 if not(is_ok):
1321 1315 # print "Every process unit have finished"
1322 1316 break
1323 1317
1324 1318 if not self.runController():
1325 1319 break
1326 1320
1327 1321 #Closing every process
1328 1322 for procKey in keyList:
1329 1323 procUnitConfObj = self.procUnitConfObjDict[procKey]
1330 1324 procUnitConfObj.close()
General Comments 0
You need to be logged in to leave comments. Login now