##// END OF EJS Templates
controller.py
José Chávez -
r1060:70b07edd23c2
parent child
Show More
This diff has been collapsed as it changes many lines, (834 lines changed) Show them Hide them
@@ -1,1280 +1,1324
1 1 '''
2 2 Created on September , 2012
3 @author:
3 @author:
4 4 '''
5
5
6 6 import sys
7 7 import ast
8 8 import datetime
9 9 import traceback
10 import math
11 import time
12 from multiprocessing import Process, Queue, cpu_count
13
10 14 import schainpy
11 15 import schainpy.admin
12 16 from schainpy.utils.log import logToFile
13 17
14 18 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
15 19 from xml.dom import minidom
16 20
17 from multiprocessing import cpu_count
18 21 from schainpy.model import *
19 22 from time import sleep
20 23
21 24
25
22 26 def prettify(elem):
23 27 """Return a pretty-printed XML string for the Element.
24 28 """
25 29 rough_string = tostring(elem, 'utf-8')
26 30 reparsed = minidom.parseString(rough_string)
27 31 return reparsed.toprettyxml(indent=" ")
28 32
29 33 def multiSchain(child, nProcess=cpu_count(), startDate=None, endDate=None, by_day=False):
30 34 skip = 0
31 35 cursor = 0
32 36 nFiles = None
33 37 processes = []
34 38 dt1 = datetime.datetime.strptime(startDate, '%Y/%m/%d')
35 39 dt2 = datetime.datetime.strptime(endDate, '%Y/%m/%d')
36 40 days = (dt2 - dt1).days
37 41
38 42 for day in range(days+1):
39 43 skip = 0
40 44 cursor = 0
41 45 q = Queue()
42 46 processes = []
43 47 dt = (dt1 + datetime.timedelta(day)).strftime('%Y/%m/%d')
44 48 firstProcess = Process(target=child, args=(cursor, skip, q, dt))
45 49 firstProcess.start()
46 50 if by_day:
47 51 continue
48 52 nFiles = q.get()
49 53 if nFiles==0:
50 54 continue
51 55 firstProcess.terminate()
52 56 skip = int(math.ceil(nFiles/nProcess))
53 57 while True:
54 58 processes.append(Process(target=child, args=(cursor, skip, q, dt)))
55 59 processes[cursor].start()
56 60 if nFiles < cursor*skip:
57 61 break
58 62 cursor += 1
59 63
60 64 def beforeExit(exctype, value, trace):
61 65 for process in processes:
62 66 process.terminate()
63 67 process.join()
64 68 print traceback.print_tb(trace)
65 69
66 70 sys.excepthook = beforeExit
67 71
68 72 for process in processes:
69 73 process.join()
70 74 process.terminate()
71 75
72 76 time.sleep(3)
73 77
74 78
75 79 class ParameterConf():
76
80
77 81 id = None
78 82 name = None
79 83 value = None
80 84 format = None
81
85
82 86 __formated_value = None
83
87
84 88 ELEMENTNAME = 'Parameter'
85
89
86 90 def __init__(self):
87
91
88 92 self.format = 'str'
89
93
90 94 def getElementName(self):
91
95
92 96 return self.ELEMENTNAME
93
97
94 98 def getValue(self):
95 99
96 100 value = self.value
97 101 format = self.format
98
102
99 103 if self.__formated_value != None:
100
104
101 105 return self.__formated_value
102
106
107 if format == 'obj':
108 return value
109
103 110 if format == 'str':
104 111 self.__formated_value = str(value)
105 112 return self.__formated_value
106
113
107 114 if value == '':
108 115 raise ValueError, "%s: This parameter value is empty" %self.name
109
116
110 117 if format == 'list':
111 118 strList = value.split(',')
112
119
113 120 self.__formated_value = strList
114
121
115 122 return self.__formated_value
116
123
117 124 if format == 'intlist':
118 125 """
119 126 Example:
120 127 value = (0,1,2)
121 128 """
122
129
123 130 new_value = ast.literal_eval(value)
124
131
125 132 if type(new_value) not in (tuple, list):
126 133 new_value = [int(new_value)]
127
134
128 135 self.__formated_value = new_value
129
136
130 137 return self.__formated_value
131
138
132 139 if format == 'floatlist':
133 140 """
134 141 Example:
135 142 value = (0.5, 1.4, 2.7)
136 143 """
137
144
138 145 new_value = ast.literal_eval(value)
139
146
140 147 if type(new_value) not in (tuple, list):
141 148 new_value = [float(new_value)]
142
149
143 150 self.__formated_value = new_value
144
151
145 152 return self.__formated_value
146
153
147 154 if format == 'date':
148 155 strList = value.split('/')
149 156 intList = [int(x) for x in strList]
150 157 date = datetime.date(intList[0], intList[1], intList[2])
151
158
152 159 self.__formated_value = date
153
160
154 161 return self.__formated_value
155
162
156 163 if format == 'time':
157 164 strList = value.split(':')
158 165 intList = [int(x) for x in strList]
159 166 time = datetime.time(intList[0], intList[1], intList[2])
160
167
161 168 self.__formated_value = time
162
169
163 170 return self.__formated_value
164
171
165 172 if format == 'pairslist':
166 173 """
167 174 Example:
168 175 value = (0,1),(1,2)
169 176 """
170 177
171 178 new_value = ast.literal_eval(value)
172
179
173 180 if type(new_value) not in (tuple, list):
174 181 raise ValueError, "%s has to be a tuple or list of pairs" %value
175
182
176 183 if type(new_value[0]) not in (tuple, list):
177 184 if len(new_value) != 2:
178 185 raise ValueError, "%s has to be a tuple or list of pairs" %value
179 186 new_value = [new_value]
180
187
181 188 for thisPair in new_value:
182 189 if len(thisPair) != 2:
183 190 raise ValueError, "%s has to be a tuple or list of pairs" %value
184
191
185 192 self.__formated_value = new_value
186
193
187 194 return self.__formated_value
188
195
189 196 if format == 'multilist':
190 197 """
191 198 Example:
192 199 value = (0,1,2),(3,4,5)
193 200 """
194 201 multiList = ast.literal_eval(value)
195
202
196 203 if type(multiList[0]) == int:
197 204 multiList = ast.literal_eval("(" + value + ")")
198
205
199 206 self.__formated_value = multiList
200
207
201 208 return self.__formated_value
202
209
203 210 if format == 'bool':
204 211 value = int(value)
205
212
206 213 if format == 'int':
207 214 value = float(value)
208
215
209 216 format_func = eval(format)
210
217
211 218 self.__formated_value = format_func(value)
212
219
213 220 return self.__formated_value
214 221
215 222 def updateId(self, new_id):
216
223
217 224 self.id = str(new_id)
218
225
219 226 def setup(self, id, name, value, format='str'):
220
221 227 self.id = str(id)
222 228 self.name = name
223 self.value = str(value)
229 if format == 'obj':
230 self.value = value
231 else:
232 self.value = str(value)
224 233 self.format = str.lower(format)
225
234
226 235 self.getValue()
227
236
228 237 return 1
229
238
230 239 def update(self, name, value, format='str'):
231
240
232 241 self.name = name
233 242 self.value = str(value)
234 243 self.format = format
235
244
236 245 def makeXml(self, opElement):
237
238 parmElement = SubElement(opElement, self.ELEMENTNAME)
239 parmElement.set('id', str(self.id))
240 parmElement.set('name', self.name)
241 parmElement.set('value', self.value)
242 parmElement.set('format', self.format)
243
246 if self.name not in ('queue',):
247 parmElement = SubElement(opElement, self.ELEMENTNAME)
248 parmElement.set('id', str(self.id))
249 parmElement.set('name', self.name)
250 parmElement.set('value', self.value)
251 parmElement.set('format', self.format)
252
244 253 def readXml(self, parmElement):
245
254
246 255 self.id = parmElement.get('id')
247 256 self.name = parmElement.get('name')
248 257 self.value = parmElement.get('value')
249 258 self.format = str.lower(parmElement.get('format'))
250
259
251 260 #Compatible with old signal chain version
252 261 if self.format == 'int' and self.name == 'idfigure':
253 262 self.name = 'id'
254
263
255 264 def printattr(self):
256
265
257 266 print "Parameter[%s]: name = %s, value = %s, format = %s" %(self.id, self.name, self.value, self.format)
258 267
259 268 class OperationConf():
260 269
261 270 id = None
262 271 name = None
263 272 priority = None
264 273 type = None
265
274
266 275 parmConfObjList = []
267
276
268 277 ELEMENTNAME = 'Operation'
269
278
270 279 def __init__(self):
271
280
272 281 self.id = '0'
273 282 self.name = None
274 283 self.priority = None
275 284 self.type = 'self'
276
277
285
286
278 287 def __getNewId(self):
279
288
280 289 return int(self.id)*10 + len(self.parmConfObjList) + 1
281 290
282 291 def updateId(self, new_id):
283
292
284 293 self.id = str(new_id)
285
294
286 295 n = 1
287 296 for parmObj in self.parmConfObjList:
288
297
289 298 idParm = str(int(new_id)*10 + n)
290 299 parmObj.updateId(idParm)
291
300
292 301 n += 1
293
302
294 303 def getElementName(self):
295
304
296 305 return self.ELEMENTNAME
297
306
298 307 def getParameterObjList(self):
299
308
300 309 return self.parmConfObjList
301
310
302 311 def getParameterObj(self, parameterName):
303
312
304 313 for parmConfObj in self.parmConfObjList:
305
314
306 315 if parmConfObj.name != parameterName:
307 316 continue
308
317
309 318 return parmConfObj
310
319
311 320 return None
312 321
313 322 def getParameterObjfromValue(self, parameterValue):
314
323
315 324 for parmConfObj in self.parmConfObjList:
316
325
317 326 if parmConfObj.getValue() != parameterValue:
318 327 continue
319
328
320 329 return parmConfObj.getValue()
321
330
322 331 return None
323
332
324 333 def getParameterValue(self, parameterName):
325
334
326 335 parameterObj = self.getParameterObj(parameterName)
327 336
328 337 # if not parameterObj:
329 338 # return None
330 339
331 340 value = parameterObj.getValue()
332
341
333 342 return value
334
343
344
345 def getKwargs(self):
346
347 kwargs = {}
348
349 for parmConfObj in self.parmConfObjList:
350 if self.name == 'run' and parmConfObj.name == 'datatype':
351 continue
352
353 kwargs[parmConfObj.name] = parmConfObj.getValue()
354
355 return kwargs
356
335 357 def setup(self, id, name, priority, type):
336
358
337 359 self.id = str(id)
338 360 self.name = name
339 361 self.type = type
340 362 self.priority = priority
341
363
342 364 self.parmConfObjList = []
343
365
344 366 def removeParameters(self):
345
367
346 368 for obj in self.parmConfObjList:
347 369 del obj
348
370
349 371 self.parmConfObjList = []
350
372
351 373 def addParameter(self, name, value, format='str'):
352
374
353 375 id = self.__getNewId()
354
376
355 377 parmConfObj = ParameterConf()
356 378 if not parmConfObj.setup(id, name, value, format):
357 379 return None
358
380
359 381 self.parmConfObjList.append(parmConfObj)
360
382
361 383 return parmConfObj
362
384
363 385 def changeParameter(self, name, value, format='str'):
364
386
365 387 parmConfObj = self.getParameterObj(name)
366 388 parmConfObj.update(name, value, format)
367
389
368 390 return parmConfObj
369
391
370 392 def makeXml(self, procUnitElement):
371
393
372 394 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
373 395 opElement.set('id', str(self.id))
374 396 opElement.set('name', self.name)
375 397 opElement.set('type', self.type)
376 398 opElement.set('priority', str(self.priority))
377
399
378 400 for parmConfObj in self.parmConfObjList:
379 401 parmConfObj.makeXml(opElement)
380
402
381 403 def readXml(self, opElement):
382
404
383 405 self.id = opElement.get('id')
384 406 self.name = opElement.get('name')
385 407 self.type = opElement.get('type')
386 408 self.priority = opElement.get('priority')
387
409
388 410 #Compatible with old signal chain version
389 411 #Use of 'run' method instead 'init'
390 412 if self.type == 'self' and self.name == 'init':
391 413 self.name = 'run'
392
414
393 415 self.parmConfObjList = []
394
416
395 417 parmElementList = opElement.iter(ParameterConf().getElementName())
396
418
397 419 for parmElement in parmElementList:
398 420 parmConfObj = ParameterConf()
399 421 parmConfObj.readXml(parmElement)
400
422
401 423 #Compatible with old signal chain version
402 424 #If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
403 425 if self.type != 'self' and self.name == 'Plot':
404 426 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
405 427 self.name = parmConfObj.value
406 428 continue
407
429
408 430 self.parmConfObjList.append(parmConfObj)
409
431
410 432 def printattr(self):
411
433
412 434 print "%s[%s]: name = %s, type = %s, priority = %s" %(self.ELEMENTNAME,
413 435 self.id,
414 436 self.name,
415 437 self.type,
416 438 self.priority)
417
439
418 440 for parmConfObj in self.parmConfObjList:
419 441 parmConfObj.printattr()
420
442
421 443 def createObject(self, plotter_queue=None):
422
444
445
423 446 if self.type == 'self':
424 447 raise ValueError, "This operation type cannot be created"
425
448
426 449 if self.type == 'plotter':
427 450 #Plotter(plotter_name)
428 451 if not plotter_queue:
429 452 raise ValueError, "plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)"
430
453
431 454 opObj = Plotter(self.name, plotter_queue)
432
455
433 456 if self.type == 'external' or self.type == 'other':
457
434 458 className = eval(self.name)
435 opObj = className()
436
459 kwargs = self.getKwargs()
460
461 opObj = className(**kwargs)
462
437 463 return opObj
438
464
465
439 466 class ProcUnitConf():
440
467
441 468 id = None
442 469 name = None
443 470 datatype = None
444 471 inputId = None
445 472 parentId = None
446
473
447 474 opConfObjList = []
448
475
449 476 procUnitObj = None
450 477 opObjList = []
451
478
452 479 ELEMENTNAME = 'ProcUnit'
453
480
454 481 def __init__(self):
455
482
456 483 self.id = None
457 484 self.datatype = None
458 485 self.name = None
459 486 self.inputId = None
460
487
461 488 self.opConfObjList = []
462
489
463 490 self.procUnitObj = None
464 491 self.opObjDict = {}
465
492
466 493 def __getPriority(self):
467
494
468 495 return len(self.opConfObjList)+1
469
496
470 497 def __getNewId(self):
471
498
472 499 return int(self.id)*10 + len(self.opConfObjList) + 1
473
500
474 501 def getElementName(self):
475
502
476 503 return self.ELEMENTNAME
477
504
478 505 def getId(self):
479
506
480 507 return self.id
481 508
482 509 def updateId(self, new_id, parentId=parentId):
483
484
510
511
485 512 new_id = int(parentId)*10 + (int(self.id) % 10)
486 513 new_inputId = int(parentId)*10 + (int(self.inputId) % 10)
487
514
488 515 #If this proc unit has not inputs
489 516 if self.inputId == '0':
490 517 new_inputId = 0
491
518
492 519 n = 1
493 520 for opConfObj in self.opConfObjList:
494
521
495 522 idOp = str(int(new_id)*10 + n)
496 523 opConfObj.updateId(idOp)
497
524
498 525 n += 1
499
526
500 527 self.parentId = str(parentId)
501 528 self.id = str(new_id)
502 529 self.inputId = str(new_inputId)
503
504
530
531
505 532 def getInputId(self):
506
533
507 534 return self.inputId
508
535
509 536 def getOperationObjList(self):
510
537
511 538 return self.opConfObjList
512
539
513 540 def getOperationObj(self, name=None):
514
541
515 542 for opConfObj in self.opConfObjList:
516
543
517 544 if opConfObj.name != name:
518 545 continue
519
546
520 547 return opConfObj
521
548
522 549 return None
523
550
524 551 def getOpObjfromParamValue(self, value=None):
525
552
526 553 for opConfObj in self.opConfObjList:
527 554 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
528 555 continue
529 556 return opConfObj
530 557 return None
531
558
532 559 def getProcUnitObj(self):
533
560
534 561 return self.procUnitObj
535
562
536 563 def setup(self, id, name, datatype, inputId, parentId=None):
537
564
538 565 #Compatible with old signal chain version
539 566 if datatype==None and name==None:
540 567 raise ValueError, "datatype or name should be defined"
541
568
542 569 if name==None:
543 570 if 'Proc' in datatype:
544 571 name = datatype
545 572 else:
546 573 name = '%sProc' %(datatype)
547
574
548 575 if datatype==None:
549 576 datatype = name.replace('Proc','')
550
577
551 578 self.id = str(id)
552 579 self.name = name
553 580 self.datatype = datatype
554 581 self.inputId = inputId
555 582 self.parentId = parentId
556
583
557 584 self.opConfObjList = []
558
585
559 586 self.addOperation(name='run', optype='self')
560
587
561 588 def removeOperations(self):
562
589
563 590 for obj in self.opConfObjList:
564 591 del obj
565
592
566 593 self.opConfObjList = []
567 594 self.addOperation(name='run')
568
595
569 596 def addParameter(self, **kwargs):
570 597 '''
571 598 Add parameters to "run" operation
572 599 '''
573 600 opObj = self.opConfObjList[0]
574
601
575 602 opObj.addParameter(**kwargs)
576
603
577 604 return opObj
578
605
579 606 def addOperation(self, name, optype='self'):
580
607
581 608 id = self.__getNewId()
582 priority = self.__getPriority()
583
609 priority = self.__getPriority()
610
584 611 opConfObj = OperationConf()
585 612 opConfObj.setup(id, name=name, priority=priority, type=optype)
586
613
587 614 self.opConfObjList.append(opConfObj)
588
615
589 616 return opConfObj
590
617
591 618 def makeXml(self, projectElement):
592
619
593 620 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
594 621 procUnitElement.set('id', str(self.id))
595 622 procUnitElement.set('name', self.name)
596 623 procUnitElement.set('datatype', self.datatype)
597 624 procUnitElement.set('inputId', str(self.inputId))
598
625
599 626 for opConfObj in self.opConfObjList:
600 627 opConfObj.makeXml(procUnitElement)
601
628
602 629 def readXml(self, upElement):
603
630
604 631 self.id = upElement.get('id')
605 632 self.name = upElement.get('name')
606 633 self.datatype = upElement.get('datatype')
607 634 self.inputId = upElement.get('inputId')
608
635
609 636 if self.ELEMENTNAME == "ReadUnit":
610 637 self.datatype = self.datatype.replace("Reader", "")
611
638
612 639 if self.ELEMENTNAME == "ProcUnit":
613 640 self.datatype = self.datatype.replace("Proc", "")
614
641
615 642 if self.inputId == 'None':
616 643 self.inputId = '0'
617
644
618 645 self.opConfObjList = []
619
646
620 647 opElementList = upElement.iter(OperationConf().getElementName())
621
648
622 649 for opElement in opElementList:
623 650 opConfObj = OperationConf()
624 651 opConfObj.readXml(opElement)
625 652 self.opConfObjList.append(opConfObj)
626
653
627 654 def printattr(self):
628
655
629 656 print "%s[%s]: name = %s, datatype = %s, inputId = %s" %(self.ELEMENTNAME,
630 657 self.id,
631 658 self.name,
632 659 self.datatype,
633 660 self.inputId)
634 661
635 662 for opConfObj in self.opConfObjList:
636 663 opConfObj.printattr()
637
664
665
666 def getKwargs(self):
667
668 opObj = self.opConfObjList[0]
669 kwargs = opObj.getKwargs()
670
671 return kwargs
672
638 673 def createObjects(self, plotter_queue=None):
639
674
640 675 className = eval(self.name)
641 procUnitObj = className()
642
676 kwargs = self.getKwargs()
677 procUnitObj = className(**kwargs)
678
643 679 for opConfObj in self.opConfObjList:
644
645 if opConfObj.type == 'self':
680
681 if opConfObj.type=='self' and self.name=='run':
646 682 continue
647
683 elif opConfObj.type=='self':
684 procUnitObj.addOperationKwargs(opConfObj.id, **opConfObj.getKwargs())
685 continue
686
648 687 opObj = opConfObj.createObject(plotter_queue)
649
688
650 689 self.opObjDict[opConfObj.id] = opObj
690
651 691 procUnitObj.addOperation(opObj, opConfObj.id)
652
692
653 693 self.procUnitObj = procUnitObj
654
694
655 695 return procUnitObj
656
696
657 697 def run(self):
658
698
659 699 is_ok = False
660
700
661 701 for opConfObj in self.opConfObjList:
662
702
663 703 kwargs = {}
664 704 for parmConfObj in opConfObj.getParameterObjList():
665 705 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
666 706 continue
667
707
668 708 kwargs[parmConfObj.name] = parmConfObj.getValue()
669
709
670 710 #ini = time.time()
671
711
672 712 #print "\tRunning the '%s' operation with %s" %(opConfObj.name, opConfObj.id)
673 713 sts = self.procUnitObj.call(opType = opConfObj.type,
674 714 opName = opConfObj.name,
675 715 opId = opConfObj.id)
676 716
677 717 # total_time = time.time() - ini
678 718 #
679 719 # if total_time > 0.002:
680 720 # print "%s::%s took %f seconds" %(self.name, opConfObj.name, total_time)
681 721
682 722 is_ok = is_ok or sts
683
723
684 724 return is_ok
685 725
686 726 def close(self):
687
727
688 728 for opConfObj in self.opConfObjList:
689 729 if opConfObj.type == 'self':
690 730 continue
691
731
692 732 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
693 733 opObj.close()
694
734
695 735 self.procUnitObj.close()
696
736
697 737 return
698 738
699 739 class ReadUnitConf(ProcUnitConf):
700
740
701 741 path = None
702 742 startDate = None
703 743 endDate = None
704 744 startTime = None
705 745 endTime = None
706
746
707 747 ELEMENTNAME = 'ReadUnit'
708
748
709 749 def __init__(self):
710
750
711 751 self.id = None
712 752 self.datatype = None
713 753 self.name = None
714 754 self.inputId = None
715
755
716 756 self.parentId = None
717
757
718 758 self.opConfObjList = []
719 759 self.opObjList = []
720
760
721 761 def getElementName(self):
722
762
723 763 return self.ELEMENTNAME
724 764
725 765 def setup(self, id, name, datatype, path='', startDate="", endDate="", startTime="",
726 766 endTime="", parentId=None, queue=None, server=None, **kwargs):
727 767 #Compatible with old signal chain version
728 768 if datatype==None and name==None:
729 769 raise ValueError, "datatype or name should be defined"
730 770
731 771 if name==None:
732 772 if 'Reader' in datatype:
733 773 name = datatype
734 774 else:
735 775 name = '%sReader' %(datatype)
736
737 776 if datatype==None:
738 777 datatype = name.replace('Reader','')
739
778
740 779 self.id = id
741 780 self.name = name
742 781 self.datatype = datatype
743
744 self.path = os.path.abspath(path)
782 if path != '':
783 self.path = os.path.abspath(path)
745 784 self.startDate = startDate
746 785 self.endDate = endDate
747 786 self.startTime = startTime
748 787 self.endTime = endTime
749
788
750 789 self.inputId = '0'
751 790 self.parentId = parentId
752
791 self.queue = queue
792 self.server = server
753 793 self.addRunOperation(**kwargs)
754
794
755 795 def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
756 796
757 797 #Compatible with old signal chain version
758 798 if datatype==None and name==None:
759 799 raise ValueError, "datatype or name should be defined"
760
800
761 801 if name==None:
762 802 if 'Reader' in datatype:
763 803 name = datatype
764 804 else:
765 805 name = '%sReader' %(datatype)
766
806
767 807 if datatype==None:
768 808 datatype = name.replace('Reader','')
769
809
770 810 self.datatype = datatype
771 811 self.name = name
772 812 self.path = path
773 813 self.startDate = startDate
774 814 self.endDate = endDate
775 815 self.startTime = startTime
776 816 self.endTime = endTime
777
817
778 818 self.inputId = '0'
779 819 self.parentId = parentId
780
820
781 821 self.updateRunOperation(**kwargs)
782
822
783 823 def removeOperations(self):
784
824
785 825 for obj in self.opConfObjList:
786 826 del obj
787
827
788 828 self.opConfObjList = []
789
829
790 830 def addRunOperation(self, **kwargs):
791
831
792 832 opObj = self.addOperation(name = 'run', optype = 'self')
833
834 if self.server is None:
835 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
836 opObj.addParameter(name='path' , value=self.path, format='str')
837 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
838 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
839 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
840 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
841 opObj.addParameter(name='queue' , value=self.queue, format='obj')
842 for key, value in kwargs.items():
843 opObj.addParameter(name=key, value=value, format=type(value).__name__)
844 else:
845 opObj.addParameter(name='server' , value=self.server, format='str')
793 846
794 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
795 opObj.addParameter(name='path' , value=self.path, format='str')
796 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
797 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
798 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
799 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
800
801 for key, value in kwargs.items():
802 opObj.addParameter(name=key, value=value, format=type(value).__name__)
803
847
804 848 return opObj
805
849
806 850 def updateRunOperation(self, **kwargs):
807
851
808 852 opObj = self.getOperationObj(name = 'run')
809 853 opObj.removeParameters()
810
854
811 855 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
812 856 opObj.addParameter(name='path' , value=self.path, format='str')
813 857 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
814 858 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
815 859 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
816 860 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
817
861
818 862 for key, value in kwargs.items():
819 863 opObj.addParameter(name=key, value=value, format=type(value).__name__)
820
864
821 865 return opObj
822 866
823 867 # def makeXml(self, projectElement):
824 868 #
825 869 # procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
826 870 # procUnitElement.set('id', str(self.id))
827 871 # procUnitElement.set('name', self.name)
828 872 # procUnitElement.set('datatype', self.datatype)
829 873 # procUnitElement.set('inputId', str(self.inputId))
830 874 #
831 875 # for opConfObj in self.opConfObjList:
832 876 # opConfObj.makeXml(procUnitElement)
833 877
834 878 def readXml(self, upElement):
835
879
836 880 self.id = upElement.get('id')
837 881 self.name = upElement.get('name')
838 882 self.datatype = upElement.get('datatype')
839 883 self.inputId = upElement.get('inputId')
840
884
841 885 if self.ELEMENTNAME == "ReadUnit":
842 886 self.datatype = self.datatype.replace("Reader", "")
843
887
844 888 if self.inputId == 'None':
845 889 self.inputId = '0'
846
890
847 891 self.opConfObjList = []
848
892
849 893 opElementList = upElement.iter(OperationConf().getElementName())
850
894
851 895 for opElement in opElementList:
852 896 opConfObj = OperationConf()
853 897 opConfObj.readXml(opElement)
854 898 self.opConfObjList.append(opConfObj)
855
899
856 900 if opConfObj.name == 'run':
857 901 self.path = opConfObj.getParameterValue('path')
858 902 self.startDate = opConfObj.getParameterValue('startDate')
859 903 self.endDate = opConfObj.getParameterValue('endDate')
860 904 self.startTime = opConfObj.getParameterValue('startTime')
861 905 self.endTime = opConfObj.getParameterValue('endTime')
862 906
863 907 class Project(Process):
864 908 id = None
865 909 name = None
866 910 description = None
867 911 filename = None
868
912
869 913 procUnitConfObjDict = None
870
914
871 915 ELEMENTNAME = 'Project'
872
916
873 917 plotterQueue = None
874 918
875 919 def __init__(self, plotter_queue=None, logfile=None):
876 920 Process.__init__(self)
877 921 self.id = None
878 922 self.name = None
879 923 self.description = None
880 924 if logfile is not None:
881 925 logToFile(logfile)
882 926 self.plotterQueue = plotter_queue
883
927
884 928 self.procUnitConfObjDict = {}
885 929
886 930 def __getNewId(self):
887
931
888 932 idList = self.procUnitConfObjDict.keys()
889
933
890 934 id = int(self.id)*10
891
935
892 936 while True:
893 937 id += 1
894
938
895 939 if str(id) in idList:
896 940 continue
897
941
898 942 break
899
943
900 944 return str(id)
901
945
902 946 def getElementName(self):
903
947
904 948 return self.ELEMENTNAME
905 949
906 950 def getId(self):
907
951
908 952 return self.id
909
953
910 954 def updateId(self, new_id):
911
955
912 956 self.id = str(new_id)
913
957
914 958 keyList = self.procUnitConfObjDict.keys()
915 959 keyList.sort()
916
960
917 961 n = 1
918 962 newProcUnitConfObjDict = {}
919
963
920 964 for procKey in keyList:
921
965
922 966 procUnitConfObj = self.procUnitConfObjDict[procKey]
923 967 idProcUnit = str(int(self.id)*10 + n)
924 968 procUnitConfObj.updateId(idProcUnit, parentId = self.id)
925
969
926 970 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
927 971 n += 1
928
972
929 973 self.procUnitConfObjDict = newProcUnitConfObjDict
930
974
931 975 def setup(self, id, name, description):
932
976
933 977 self.id = str(id)
934 978 self.name = name
935 979 self.description = description
936 980
937 981 def update(self, name, description):
938
982
939 983 self.name = name
940 984 self.description = description
941
985
942 986 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
943 987 if id is None:
944 988 idReadUnit = self.__getNewId()
945 989 else:
946 990 idReadUnit = str(id)
947
991
948 992 readUnitConfObj = ReadUnitConf()
949 993 readUnitConfObj.setup(idReadUnit, name, datatype, parentId=self.id, **kwargs)
950
994
951 995 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
952
996
953 997 return readUnitConfObj
954
998
955 999 def addProcUnit(self, inputId='0', datatype=None, name=None):
956
1000
957 1001 idProcUnit = self.__getNewId()
958
1002
959 1003 procUnitConfObj = ProcUnitConf()
960 1004 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, parentId=self.id)
961
1005
962 1006 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
963
1007
964 1008 return procUnitConfObj
965
1009
966 1010 def removeProcUnit(self, id):
967
1011
968 1012 if id in self.procUnitConfObjDict.keys():
969 1013 self.procUnitConfObjDict.pop(id)
970
1014
971 1015 def getReadUnitId(self):
972
1016
973 1017 readUnitConfObj = self.getReadUnitObj()
974
1018
975 1019 return readUnitConfObj.id
976
1020
977 1021 def getReadUnitObj(self):
978
1022
979 1023 for obj in self.procUnitConfObjDict.values():
980 1024 if obj.getElementName() == "ReadUnit":
981 1025 return obj
982
1026
983 1027 return None
984
1028
985 1029 def getProcUnitObj(self, id=None, name=None):
986
1030
987 1031 if id != None:
988 1032 return self.procUnitConfObjDict[id]
989
1033
990 1034 if name != None:
991 1035 return self.getProcUnitObjByName(name)
992
1036
993 1037 return None
994
1038
995 1039 def getProcUnitObjByName(self, name):
996
1040
997 1041 for obj in self.procUnitConfObjDict.values():
998 1042 if obj.name == name:
999 1043 return obj
1000
1044
1001 1045 return None
1002
1046
1003 1047 def procUnitItems(self):
1004
1048
1005 1049 return self.procUnitConfObjDict.items()
1006
1007 def makeXml(self):
1008
1050
1051 def makeXml(self):
1052
1009 1053 projectElement = Element('Project')
1010 1054 projectElement.set('id', str(self.id))
1011 1055 projectElement.set('name', self.name)
1012 1056 projectElement.set('description', self.description)
1013
1057
1014 1058 for procUnitConfObj in self.procUnitConfObjDict.values():
1015 1059 procUnitConfObj.makeXml(projectElement)
1016
1060
1017 1061 self.projectElement = projectElement
1018
1062
1019 1063 def writeXml(self, filename=None):
1020
1064
1021 1065 if filename == None:
1022 1066 if self.filename:
1023 1067 filename = self.filename
1024 1068 else:
1025 1069 filename = "schain.xml"
1026
1070
1027 1071 if not filename:
1028 1072 print "filename has not been defined. Use setFilename(filename) for do it."
1029 1073 return 0
1030
1074
1031 1075 abs_file = os.path.abspath(filename)
1032
1076
1033 1077 if not os.access(os.path.dirname(abs_file), os.W_OK):
1034 1078 print "No write permission on %s" %os.path.dirname(abs_file)
1035 1079 return 0
1036
1080
1037 1081 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1038 1082 print "File %s already exists and it could not be overwriten" %abs_file
1039 1083 return 0
1040
1084
1041 1085 self.makeXml()
1042
1086
1043 1087 ElementTree(self.projectElement).write(abs_file, method='xml')
1044
1088
1045 1089 self.filename = abs_file
1046
1090
1047 1091 return 1
1048 1092
1049 1093 def readXml(self, filename = None):
1050
1094
1051 1095 if not filename:
1052 1096 print "filename is not defined"
1053 1097 return 0
1054
1098
1055 1099 abs_file = os.path.abspath(filename)
1056
1100
1057 1101 if not os.path.isfile(abs_file):
1058 1102 print "%s file does not exist" %abs_file
1059 1103 return 0
1060
1104
1061 1105 self.projectElement = None
1062 1106 self.procUnitConfObjDict = {}
1063
1107
1064 1108 try:
1065 1109 self.projectElement = ElementTree().parse(abs_file)
1066 1110 except:
1067 1111 print "Error reading %s, verify file format" %filename
1068 1112 return 0
1069
1113
1070 1114 self.project = self.projectElement.tag
1071
1115
1072 1116 self.id = self.projectElement.get('id')
1073 1117 self.name = self.projectElement.get('name')
1074 1118 self.description = self.projectElement.get('description')
1075 1119
1076 1120 readUnitElementList = self.projectElement.iter(ReadUnitConf().getElementName())
1077
1121
1078 1122 for readUnitElement in readUnitElementList:
1079 1123 readUnitConfObj = ReadUnitConf()
1080 1124 readUnitConfObj.readXml(readUnitElement)
1081
1125
1082 1126 if readUnitConfObj.parentId == None:
1083 1127 readUnitConfObj.parentId = self.id
1084
1128
1085 1129 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1086
1130
1087 1131 procUnitElementList = self.projectElement.iter(ProcUnitConf().getElementName())
1088
1132
1089 1133 for procUnitElement in procUnitElementList:
1090 1134 procUnitConfObj = ProcUnitConf()
1091 1135 procUnitConfObj.readXml(procUnitElement)
1092
1136
1093 1137 if procUnitConfObj.parentId == None:
1094 1138 procUnitConfObj.parentId = self.id
1095
1139
1096 1140 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1097
1141
1098 1142 self.filename = abs_file
1099
1143
1100 1144 return 1
1101
1145
1102 1146 def printattr(self):
1103
1147
1104 1148 print "Project[%s]: name = %s, description = %s" %(self.id,
1105 1149 self.name,
1106 1150 self.description)
1107 1151
1108 1152 for procUnitConfObj in self.procUnitConfObjDict.values():
1109 1153 procUnitConfObj.printattr()
1110
1154
1111 1155 def createObjects(self):
1112
1156
1113 1157 for procUnitConfObj in self.procUnitConfObjDict.values():
1114 1158 procUnitConfObj.createObjects(self.plotterQueue)
1115
1159
1116 1160 def __connect(self, objIN, thisObj):
1117
1161
1118 1162 thisObj.setInput(objIN.getOutputObj())
1119
1163
1120 1164 def connectObjects(self):
1121
1165
1122 1166 for thisPUConfObj in self.procUnitConfObjDict.values():
1123
1167
1124 1168 inputId = thisPUConfObj.getInputId()
1125
1169
1126 1170 if int(inputId) == 0:
1127 1171 continue
1128
1172
1129 1173 #Get input object
1130 1174 puConfINObj = self.procUnitConfObjDict[inputId]
1131 1175 puObjIN = puConfINObj.getProcUnitObj()
1132
1176
1133 1177 #Get current object
1134 1178 thisPUObj = thisPUConfObj.getProcUnitObj()
1135
1179
1136 1180 self.__connect(puObjIN, thisPUObj)
1137
1181
1138 1182 def __handleError(self, procUnitConfObj, send_email=True):
1139
1183
1140 1184 import socket
1141
1185
1142 1186 err = traceback.format_exception(sys.exc_info()[0],
1143 1187 sys.exc_info()[1],
1144 1188 sys.exc_info()[2])
1145 1189
1146 1190 print "***** Error occurred in %s *****" %(procUnitConfObj.name)
1147 1191 print "***** %s" %err[-1]
1148 1192
1149 1193 message = "".join(err)
1150
1194
1151 1195 sys.stderr.write(message)
1152
1196
1153 1197 if not send_email:
1154 1198 return
1155
1199
1156 1200 subject = "SChain v%s: Error running %s\n" %(schainpy.__version__, procUnitConfObj.name)
1157
1201
1158 1202 subtitle = "%s: %s\n" %(procUnitConfObj.getElementName() ,procUnitConfObj.name)
1159 1203 subtitle += "Hostname: %s\n" %socket.gethostbyname(socket.gethostname())
1160 1204 subtitle += "Working directory: %s\n" %os.path.abspath("./")
1161 1205 subtitle += "Configuration file: %s\n" %self.filename
1162 1206 subtitle += "Time: %s\n" %str(datetime.datetime.now())
1163
1207
1164 1208 readUnitConfObj = self.getReadUnitObj()
1165 1209 if readUnitConfObj:
1166 1210 subtitle += "\nInput parameters:\n"
1167 1211 subtitle += "[Data path = %s]\n" %readUnitConfObj.path
1168 1212 subtitle += "[Data type = %s]\n" %readUnitConfObj.datatype
1169 1213 subtitle += "[Start date = %s]\n" %readUnitConfObj.startDate
1170 1214 subtitle += "[End date = %s]\n" %readUnitConfObj.endDate
1171 1215 subtitle += "[Start time = %s]\n" %readUnitConfObj.startTime
1172 1216 subtitle += "[End time = %s]\n" %readUnitConfObj.endTime
1173
1217
1174 1218 adminObj = schainpy.admin.SchainNotify()
1175 1219 adminObj.sendAlert(message=message,
1176 1220 subject=subject,
1177 1221 subtitle=subtitle,
1178 1222 filename=self.filename)
1179 1223
1180 1224 def isPaused(self):
1181 1225 return 0
1182
1226
1183 1227 def isStopped(self):
1184 1228 return 0
1185
1229
1186 1230 def runController(self):
1187 1231 """
1188 1232 returns 0 when this process has been stopped, 1 otherwise
1189 1233 """
1190
1234
1191 1235 if self.isPaused():
1192 1236 print "Process suspended"
1193
1237
1194 1238 while True:
1195 1239 sleep(0.1)
1196
1240
1197 1241 if not self.isPaused():
1198 1242 break
1199
1243
1200 1244 if self.isStopped():
1201 1245 break
1202
1246
1203 1247 print "Process reinitialized"
1204
1248
1205 1249 if self.isStopped():
1206 1250 print "Process stopped"
1207 1251 return 0
1208
1252
1209 1253 return 1
1210 1254
1211 1255 def setFilename(self, filename):
1212
1256
1213 1257 self.filename = filename
1214
1258
1215 1259 def setPlotterQueue(self, plotter_queue):
1216
1260
1217 1261 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1218 1262
1219 1263 def getPlotterQueue(self):
1220
1264
1221 1265 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1222 1266
1223 1267 def useExternalPlotter(self):
1224
1268
1225 1269 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1226 1270
1227 1271
1228 1272 def run(self, filename=None):
1229 1273
1230 1274 # self.writeXml(filename)
1231 1275 self.createObjects()
1232 1276 self.connectObjects()
1233 1277
1234 1278 print
1235 1279 print "*"*60
1236 1280 print " Starting SIGNAL CHAIN PROCESSING v%s " %schainpy.__version__
1237 1281 print "*"*60
1238 1282 print
1239
1283
1240 1284 keyList = self.procUnitConfObjDict.keys()
1241 1285 keyList.sort()
1242
1286
1243 1287 while(True):
1244
1288
1245 1289 is_ok = False
1246
1290
1247 1291 for procKey in keyList:
1248 1292 # print "Running the '%s' process with %s" %(procUnitConfObj.name, procUnitConfObj.id)
1249
1293
1250 1294 procUnitConfObj = self.procUnitConfObjDict[procKey]
1251
1295
1252 1296 try:
1253 1297 sts = procUnitConfObj.run()
1254 1298 is_ok = is_ok or sts
1255 1299 except KeyboardInterrupt:
1256 1300 is_ok = False
1257 1301 break
1258 1302 except ValueError, e:
1259 1303 sleep(0.5)
1260 1304 self.__handleError(procUnitConfObj, send_email=True)
1261 1305 is_ok = False
1262 1306 break
1263 1307 except:
1264 1308 sleep(0.5)
1265 1309 self.__handleError(procUnitConfObj)
1266 1310 is_ok = False
1267 1311 break
1268
1312
1269 1313 #If every process unit finished so end process
1270 1314 if not(is_ok):
1271 1315 # print "Every process unit have finished"
1272 1316 break
1273 1317
1274 1318 if not self.runController():
1275 1319 break
1276
1320
1277 1321 #Closing every process
1278 1322 for procKey in keyList:
1279 1323 procUnitConfObj = self.procUnitConfObjDict[procKey]
1280 1324 procUnitConfObj.close()
General Comments 0
You need to be logged in to leave comments. Login now