##// END OF EJS Templates
controller.py
José Chávez -
r1060:70b07edd23c2
parent child
Show More
@@ -1,1280 +1,1324
1 1 '''
2 2 Created on September , 2012
3 3 @author:
4 4 '''
5 5
6 6 import sys
7 7 import ast
8 8 import datetime
9 9 import traceback
10 import math
11 import time
12 from multiprocessing import Process, Queue, cpu_count
13
10 14 import schainpy
11 15 import schainpy.admin
12 16 from schainpy.utils.log import logToFile
13 17
14 18 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
15 19 from xml.dom import minidom
16 20
17 from multiprocessing import cpu_count
18 21 from schainpy.model import *
19 22 from time import sleep
20 23
21 24
25
22 26 def prettify(elem):
23 27 """Return a pretty-printed XML string for the Element.
24 28 """
25 29 rough_string = tostring(elem, 'utf-8')
26 30 reparsed = minidom.parseString(rough_string)
27 31 return reparsed.toprettyxml(indent=" ")
28 32
29 33 def multiSchain(child, nProcess=cpu_count(), startDate=None, endDate=None, by_day=False):
30 34 skip = 0
31 35 cursor = 0
32 36 nFiles = None
33 37 processes = []
34 38 dt1 = datetime.datetime.strptime(startDate, '%Y/%m/%d')
35 39 dt2 = datetime.datetime.strptime(endDate, '%Y/%m/%d')
36 40 days = (dt2 - dt1).days
37 41
38 42 for day in range(days+1):
39 43 skip = 0
40 44 cursor = 0
41 45 q = Queue()
42 46 processes = []
43 47 dt = (dt1 + datetime.timedelta(day)).strftime('%Y/%m/%d')
44 48 firstProcess = Process(target=child, args=(cursor, skip, q, dt))
45 49 firstProcess.start()
46 50 if by_day:
47 51 continue
48 52 nFiles = q.get()
49 53 if nFiles==0:
50 54 continue
51 55 firstProcess.terminate()
52 56 skip = int(math.ceil(nFiles/nProcess))
53 57 while True:
54 58 processes.append(Process(target=child, args=(cursor, skip, q, dt)))
55 59 processes[cursor].start()
56 60 if nFiles < cursor*skip:
57 61 break
58 62 cursor += 1
59 63
60 64 def beforeExit(exctype, value, trace):
61 65 for process in processes:
62 66 process.terminate()
63 67 process.join()
64 68 print traceback.print_tb(trace)
65 69
66 70 sys.excepthook = beforeExit
67 71
68 72 for process in processes:
69 73 process.join()
70 74 process.terminate()
71 75
72 76 time.sleep(3)
73 77
74 78
75 79 class ParameterConf():
76 80
77 81 id = None
78 82 name = None
79 83 value = None
80 84 format = None
81 85
82 86 __formated_value = None
83 87
84 88 ELEMENTNAME = 'Parameter'
85 89
86 90 def __init__(self):
87 91
88 92 self.format = 'str'
89 93
90 94 def getElementName(self):
91 95
92 96 return self.ELEMENTNAME
93 97
94 98 def getValue(self):
95 99
96 100 value = self.value
97 101 format = self.format
98 102
99 103 if self.__formated_value != None:
100 104
101 105 return self.__formated_value
102 106
107 if format == 'obj':
108 return value
109
103 110 if format == 'str':
104 111 self.__formated_value = str(value)
105 112 return self.__formated_value
106 113
107 114 if value == '':
108 115 raise ValueError, "%s: This parameter value is empty" %self.name
109 116
110 117 if format == 'list':
111 118 strList = value.split(',')
112 119
113 120 self.__formated_value = strList
114 121
115 122 return self.__formated_value
116 123
117 124 if format == 'intlist':
118 125 """
119 126 Example:
120 127 value = (0,1,2)
121 128 """
122 129
123 130 new_value = ast.literal_eval(value)
124 131
125 132 if type(new_value) not in (tuple, list):
126 133 new_value = [int(new_value)]
127 134
128 135 self.__formated_value = new_value
129 136
130 137 return self.__formated_value
131 138
132 139 if format == 'floatlist':
133 140 """
134 141 Example:
135 142 value = (0.5, 1.4, 2.7)
136 143 """
137 144
138 145 new_value = ast.literal_eval(value)
139 146
140 147 if type(new_value) not in (tuple, list):
141 148 new_value = [float(new_value)]
142 149
143 150 self.__formated_value = new_value
144 151
145 152 return self.__formated_value
146 153
147 154 if format == 'date':
148 155 strList = value.split('/')
149 156 intList = [int(x) for x in strList]
150 157 date = datetime.date(intList[0], intList[1], intList[2])
151 158
152 159 self.__formated_value = date
153 160
154 161 return self.__formated_value
155 162
156 163 if format == 'time':
157 164 strList = value.split(':')
158 165 intList = [int(x) for x in strList]
159 166 time = datetime.time(intList[0], intList[1], intList[2])
160 167
161 168 self.__formated_value = time
162 169
163 170 return self.__formated_value
164 171
165 172 if format == 'pairslist':
166 173 """
167 174 Example:
168 175 value = (0,1),(1,2)
169 176 """
170 177
171 178 new_value = ast.literal_eval(value)
172 179
173 180 if type(new_value) not in (tuple, list):
174 181 raise ValueError, "%s has to be a tuple or list of pairs" %value
175 182
176 183 if type(new_value[0]) not in (tuple, list):
177 184 if len(new_value) != 2:
178 185 raise ValueError, "%s has to be a tuple or list of pairs" %value
179 186 new_value = [new_value]
180 187
181 188 for thisPair in new_value:
182 189 if len(thisPair) != 2:
183 190 raise ValueError, "%s has to be a tuple or list of pairs" %value
184 191
185 192 self.__formated_value = new_value
186 193
187 194 return self.__formated_value
188 195
189 196 if format == 'multilist':
190 197 """
191 198 Example:
192 199 value = (0,1,2),(3,4,5)
193 200 """
194 201 multiList = ast.literal_eval(value)
195 202
196 203 if type(multiList[0]) == int:
197 204 multiList = ast.literal_eval("(" + value + ")")
198 205
199 206 self.__formated_value = multiList
200 207
201 208 return self.__formated_value
202 209
203 210 if format == 'bool':
204 211 value = int(value)
205 212
206 213 if format == 'int':
207 214 value = float(value)
208 215
209 216 format_func = eval(format)
210 217
211 218 self.__formated_value = format_func(value)
212 219
213 220 return self.__formated_value
214 221
215 222 def updateId(self, new_id):
216 223
217 224 self.id = str(new_id)
218 225
219 226 def setup(self, id, name, value, format='str'):
220
221 227 self.id = str(id)
222 228 self.name = name
229 if format == 'obj':
230 self.value = value
231 else:
223 232 self.value = str(value)
224 233 self.format = str.lower(format)
225 234
226 235 self.getValue()
227 236
228 237 return 1
229 238
230 239 def update(self, name, value, format='str'):
231 240
232 241 self.name = name
233 242 self.value = str(value)
234 243 self.format = format
235 244
236 245 def makeXml(self, opElement):
237
246 if self.name not in ('queue',):
238 247 parmElement = SubElement(opElement, self.ELEMENTNAME)
239 248 parmElement.set('id', str(self.id))
240 249 parmElement.set('name', self.name)
241 250 parmElement.set('value', self.value)
242 251 parmElement.set('format', self.format)
243 252
244 253 def readXml(self, parmElement):
245 254
246 255 self.id = parmElement.get('id')
247 256 self.name = parmElement.get('name')
248 257 self.value = parmElement.get('value')
249 258 self.format = str.lower(parmElement.get('format'))
250 259
251 260 #Compatible with old signal chain version
252 261 if self.format == 'int' and self.name == 'idfigure':
253 262 self.name = 'id'
254 263
255 264 def printattr(self):
256 265
257 266 print "Parameter[%s]: name = %s, value = %s, format = %s" %(self.id, self.name, self.value, self.format)
258 267
259 268 class OperationConf():
260 269
261 270 id = None
262 271 name = None
263 272 priority = None
264 273 type = None
265 274
266 275 parmConfObjList = []
267 276
268 277 ELEMENTNAME = 'Operation'
269 278
270 279 def __init__(self):
271 280
272 281 self.id = '0'
273 282 self.name = None
274 283 self.priority = None
275 284 self.type = 'self'
276 285
277 286
278 287 def __getNewId(self):
279 288
280 289 return int(self.id)*10 + len(self.parmConfObjList) + 1
281 290
282 291 def updateId(self, new_id):
283 292
284 293 self.id = str(new_id)
285 294
286 295 n = 1
287 296 for parmObj in self.parmConfObjList:
288 297
289 298 idParm = str(int(new_id)*10 + n)
290 299 parmObj.updateId(idParm)
291 300
292 301 n += 1
293 302
294 303 def getElementName(self):
295 304
296 305 return self.ELEMENTNAME
297 306
298 307 def getParameterObjList(self):
299 308
300 309 return self.parmConfObjList
301 310
302 311 def getParameterObj(self, parameterName):
303 312
304 313 for parmConfObj in self.parmConfObjList:
305 314
306 315 if parmConfObj.name != parameterName:
307 316 continue
308 317
309 318 return parmConfObj
310 319
311 320 return None
312 321
313 322 def getParameterObjfromValue(self, parameterValue):
314 323
315 324 for parmConfObj in self.parmConfObjList:
316 325
317 326 if parmConfObj.getValue() != parameterValue:
318 327 continue
319 328
320 329 return parmConfObj.getValue()
321 330
322 331 return None
323 332
324 333 def getParameterValue(self, parameterName):
325 334
326 335 parameterObj = self.getParameterObj(parameterName)
327 336
328 337 # if not parameterObj:
329 338 # return None
330 339
331 340 value = parameterObj.getValue()
332 341
333 342 return value
334 343
344
345 def getKwargs(self):
346
347 kwargs = {}
348
349 for parmConfObj in self.parmConfObjList:
350 if self.name == 'run' and parmConfObj.name == 'datatype':
351 continue
352
353 kwargs[parmConfObj.name] = parmConfObj.getValue()
354
355 return kwargs
356
335 357 def setup(self, id, name, priority, type):
336 358
337 359 self.id = str(id)
338 360 self.name = name
339 361 self.type = type
340 362 self.priority = priority
341 363
342 364 self.parmConfObjList = []
343 365
344 366 def removeParameters(self):
345 367
346 368 for obj in self.parmConfObjList:
347 369 del obj
348 370
349 371 self.parmConfObjList = []
350 372
351 373 def addParameter(self, name, value, format='str'):
352 374
353 375 id = self.__getNewId()
354 376
355 377 parmConfObj = ParameterConf()
356 378 if not parmConfObj.setup(id, name, value, format):
357 379 return None
358 380
359 381 self.parmConfObjList.append(parmConfObj)
360 382
361 383 return parmConfObj
362 384
363 385 def changeParameter(self, name, value, format='str'):
364 386
365 387 parmConfObj = self.getParameterObj(name)
366 388 parmConfObj.update(name, value, format)
367 389
368 390 return parmConfObj
369 391
370 392 def makeXml(self, procUnitElement):
371 393
372 394 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
373 395 opElement.set('id', str(self.id))
374 396 opElement.set('name', self.name)
375 397 opElement.set('type', self.type)
376 398 opElement.set('priority', str(self.priority))
377 399
378 400 for parmConfObj in self.parmConfObjList:
379 401 parmConfObj.makeXml(opElement)
380 402
381 403 def readXml(self, opElement):
382 404
383 405 self.id = opElement.get('id')
384 406 self.name = opElement.get('name')
385 407 self.type = opElement.get('type')
386 408 self.priority = opElement.get('priority')
387 409
388 410 #Compatible with old signal chain version
389 411 #Use of 'run' method instead 'init'
390 412 if self.type == 'self' and self.name == 'init':
391 413 self.name = 'run'
392 414
393 415 self.parmConfObjList = []
394 416
395 417 parmElementList = opElement.iter(ParameterConf().getElementName())
396 418
397 419 for parmElement in parmElementList:
398 420 parmConfObj = ParameterConf()
399 421 parmConfObj.readXml(parmElement)
400 422
401 423 #Compatible with old signal chain version
402 424 #If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
403 425 if self.type != 'self' and self.name == 'Plot':
404 426 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
405 427 self.name = parmConfObj.value
406 428 continue
407 429
408 430 self.parmConfObjList.append(parmConfObj)
409 431
410 432 def printattr(self):
411 433
412 434 print "%s[%s]: name = %s, type = %s, priority = %s" %(self.ELEMENTNAME,
413 435 self.id,
414 436 self.name,
415 437 self.type,
416 438 self.priority)
417 439
418 440 for parmConfObj in self.parmConfObjList:
419 441 parmConfObj.printattr()
420 442
421 443 def createObject(self, plotter_queue=None):
422 444
445
423 446 if self.type == 'self':
424 447 raise ValueError, "This operation type cannot be created"
425 448
426 449 if self.type == 'plotter':
427 450 #Plotter(plotter_name)
428 451 if not plotter_queue:
429 452 raise ValueError, "plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)"
430 453
431 454 opObj = Plotter(self.name, plotter_queue)
432 455
433 456 if self.type == 'external' or self.type == 'other':
457
434 458 className = eval(self.name)
435 opObj = className()
459 kwargs = self.getKwargs()
460
461 opObj = className(**kwargs)
436 462
437 463 return opObj
438 464
465
439 466 class ProcUnitConf():
440 467
441 468 id = None
442 469 name = None
443 470 datatype = None
444 471 inputId = None
445 472 parentId = None
446 473
447 474 opConfObjList = []
448 475
449 476 procUnitObj = None
450 477 opObjList = []
451 478
452 479 ELEMENTNAME = 'ProcUnit'
453 480
454 481 def __init__(self):
455 482
456 483 self.id = None
457 484 self.datatype = None
458 485 self.name = None
459 486 self.inputId = None
460 487
461 488 self.opConfObjList = []
462 489
463 490 self.procUnitObj = None
464 491 self.opObjDict = {}
465 492
466 493 def __getPriority(self):
467 494
468 495 return len(self.opConfObjList)+1
469 496
470 497 def __getNewId(self):
471 498
472 499 return int(self.id)*10 + len(self.opConfObjList) + 1
473 500
474 501 def getElementName(self):
475 502
476 503 return self.ELEMENTNAME
477 504
478 505 def getId(self):
479 506
480 507 return self.id
481 508
482 509 def updateId(self, new_id, parentId=parentId):
483 510
484 511
485 512 new_id = int(parentId)*10 + (int(self.id) % 10)
486 513 new_inputId = int(parentId)*10 + (int(self.inputId) % 10)
487 514
488 515 #If this proc unit has not inputs
489 516 if self.inputId == '0':
490 517 new_inputId = 0
491 518
492 519 n = 1
493 520 for opConfObj in self.opConfObjList:
494 521
495 522 idOp = str(int(new_id)*10 + n)
496 523 opConfObj.updateId(idOp)
497 524
498 525 n += 1
499 526
500 527 self.parentId = str(parentId)
501 528 self.id = str(new_id)
502 529 self.inputId = str(new_inputId)
503 530
504 531
505 532 def getInputId(self):
506 533
507 534 return self.inputId
508 535
509 536 def getOperationObjList(self):
510 537
511 538 return self.opConfObjList
512 539
513 540 def getOperationObj(self, name=None):
514 541
515 542 for opConfObj in self.opConfObjList:
516 543
517 544 if opConfObj.name != name:
518 545 continue
519 546
520 547 return opConfObj
521 548
522 549 return None
523 550
524 551 def getOpObjfromParamValue(self, value=None):
525 552
526 553 for opConfObj in self.opConfObjList:
527 554 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
528 555 continue
529 556 return opConfObj
530 557 return None
531 558
532 559 def getProcUnitObj(self):
533 560
534 561 return self.procUnitObj
535 562
536 563 def setup(self, id, name, datatype, inputId, parentId=None):
537 564
538 565 #Compatible with old signal chain version
539 566 if datatype==None and name==None:
540 567 raise ValueError, "datatype or name should be defined"
541 568
542 569 if name==None:
543 570 if 'Proc' in datatype:
544 571 name = datatype
545 572 else:
546 573 name = '%sProc' %(datatype)
547 574
548 575 if datatype==None:
549 576 datatype = name.replace('Proc','')
550 577
551 578 self.id = str(id)
552 579 self.name = name
553 580 self.datatype = datatype
554 581 self.inputId = inputId
555 582 self.parentId = parentId
556 583
557 584 self.opConfObjList = []
558 585
559 586 self.addOperation(name='run', optype='self')
560 587
561 588 def removeOperations(self):
562 589
563 590 for obj in self.opConfObjList:
564 591 del obj
565 592
566 593 self.opConfObjList = []
567 594 self.addOperation(name='run')
568 595
569 596 def addParameter(self, **kwargs):
570 597 '''
571 598 Add parameters to "run" operation
572 599 '''
573 600 opObj = self.opConfObjList[0]
574 601
575 602 opObj.addParameter(**kwargs)
576 603
577 604 return opObj
578 605
579 606 def addOperation(self, name, optype='self'):
580 607
581 608 id = self.__getNewId()
582 609 priority = self.__getPriority()
583 610
584 611 opConfObj = OperationConf()
585 612 opConfObj.setup(id, name=name, priority=priority, type=optype)
586 613
587 614 self.opConfObjList.append(opConfObj)
588 615
589 616 return opConfObj
590 617
591 618 def makeXml(self, projectElement):
592 619
593 620 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
594 621 procUnitElement.set('id', str(self.id))
595 622 procUnitElement.set('name', self.name)
596 623 procUnitElement.set('datatype', self.datatype)
597 624 procUnitElement.set('inputId', str(self.inputId))
598 625
599 626 for opConfObj in self.opConfObjList:
600 627 opConfObj.makeXml(procUnitElement)
601 628
602 629 def readXml(self, upElement):
603 630
604 631 self.id = upElement.get('id')
605 632 self.name = upElement.get('name')
606 633 self.datatype = upElement.get('datatype')
607 634 self.inputId = upElement.get('inputId')
608 635
609 636 if self.ELEMENTNAME == "ReadUnit":
610 637 self.datatype = self.datatype.replace("Reader", "")
611 638
612 639 if self.ELEMENTNAME == "ProcUnit":
613 640 self.datatype = self.datatype.replace("Proc", "")
614 641
615 642 if self.inputId == 'None':
616 643 self.inputId = '0'
617 644
618 645 self.opConfObjList = []
619 646
620 647 opElementList = upElement.iter(OperationConf().getElementName())
621 648
622 649 for opElement in opElementList:
623 650 opConfObj = OperationConf()
624 651 opConfObj.readXml(opElement)
625 652 self.opConfObjList.append(opConfObj)
626 653
627 654 def printattr(self):
628 655
629 656 print "%s[%s]: name = %s, datatype = %s, inputId = %s" %(self.ELEMENTNAME,
630 657 self.id,
631 658 self.name,
632 659 self.datatype,
633 660 self.inputId)
634 661
635 662 for opConfObj in self.opConfObjList:
636 663 opConfObj.printattr()
637 664
665
666 def getKwargs(self):
667
668 opObj = self.opConfObjList[0]
669 kwargs = opObj.getKwargs()
670
671 return kwargs
672
638 673 def createObjects(self, plotter_queue=None):
639 674
640 675 className = eval(self.name)
641 procUnitObj = className()
676 kwargs = self.getKwargs()
677 procUnitObj = className(**kwargs)
642 678
643 679 for opConfObj in self.opConfObjList:
644 680
645 if opConfObj.type == 'self':
681 if opConfObj.type=='self' and self.name=='run':
682 continue
683 elif opConfObj.type=='self':
684 procUnitObj.addOperationKwargs(opConfObj.id, **opConfObj.getKwargs())
646 685 continue
647 686
648 687 opObj = opConfObj.createObject(plotter_queue)
649 688
650 689 self.opObjDict[opConfObj.id] = opObj
690
651 691 procUnitObj.addOperation(opObj, opConfObj.id)
652 692
653 693 self.procUnitObj = procUnitObj
654 694
655 695 return procUnitObj
656 696
657 697 def run(self):
658 698
659 699 is_ok = False
660 700
661 701 for opConfObj in self.opConfObjList:
662 702
663 703 kwargs = {}
664 704 for parmConfObj in opConfObj.getParameterObjList():
665 705 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
666 706 continue
667 707
668 708 kwargs[parmConfObj.name] = parmConfObj.getValue()
669 709
670 710 #ini = time.time()
671 711
672 712 #print "\tRunning the '%s' operation with %s" %(opConfObj.name, opConfObj.id)
673 713 sts = self.procUnitObj.call(opType = opConfObj.type,
674 714 opName = opConfObj.name,
675 715 opId = opConfObj.id)
676 716
677 717 # total_time = time.time() - ini
678 718 #
679 719 # if total_time > 0.002:
680 720 # print "%s::%s took %f seconds" %(self.name, opConfObj.name, total_time)
681 721
682 722 is_ok = is_ok or sts
683 723
684 724 return is_ok
685 725
686 726 def close(self):
687 727
688 728 for opConfObj in self.opConfObjList:
689 729 if opConfObj.type == 'self':
690 730 continue
691 731
692 732 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
693 733 opObj.close()
694 734
695 735 self.procUnitObj.close()
696 736
697 737 return
698 738
699 739 class ReadUnitConf(ProcUnitConf):
700 740
701 741 path = None
702 742 startDate = None
703 743 endDate = None
704 744 startTime = None
705 745 endTime = None
706 746
707 747 ELEMENTNAME = 'ReadUnit'
708 748
709 749 def __init__(self):
710 750
711 751 self.id = None
712 752 self.datatype = None
713 753 self.name = None
714 754 self.inputId = None
715 755
716 756 self.parentId = None
717 757
718 758 self.opConfObjList = []
719 759 self.opObjList = []
720 760
721 761 def getElementName(self):
722 762
723 763 return self.ELEMENTNAME
724 764
725 765 def setup(self, id, name, datatype, path='', startDate="", endDate="", startTime="",
726 766 endTime="", parentId=None, queue=None, server=None, **kwargs):
727 767 #Compatible with old signal chain version
728 768 if datatype==None and name==None:
729 769 raise ValueError, "datatype or name should be defined"
730 770
731 771 if name==None:
732 772 if 'Reader' in datatype:
733 773 name = datatype
734 774 else:
735 775 name = '%sReader' %(datatype)
736
737 776 if datatype==None:
738 777 datatype = name.replace('Reader','')
739 778
740 779 self.id = id
741 780 self.name = name
742 781 self.datatype = datatype
743
782 if path != '':
744 783 self.path = os.path.abspath(path)
745 784 self.startDate = startDate
746 785 self.endDate = endDate
747 786 self.startTime = startTime
748 787 self.endTime = endTime
749 788
750 789 self.inputId = '0'
751 790 self.parentId = parentId
752
791 self.queue = queue
792 self.server = server
753 793 self.addRunOperation(**kwargs)
754 794
755 795 def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
756 796
757 797 #Compatible with old signal chain version
758 798 if datatype==None and name==None:
759 799 raise ValueError, "datatype or name should be defined"
760 800
761 801 if name==None:
762 802 if 'Reader' in datatype:
763 803 name = datatype
764 804 else:
765 805 name = '%sReader' %(datatype)
766 806
767 807 if datatype==None:
768 808 datatype = name.replace('Reader','')
769 809
770 810 self.datatype = datatype
771 811 self.name = name
772 812 self.path = path
773 813 self.startDate = startDate
774 814 self.endDate = endDate
775 815 self.startTime = startTime
776 816 self.endTime = endTime
777 817
778 818 self.inputId = '0'
779 819 self.parentId = parentId
780 820
781 821 self.updateRunOperation(**kwargs)
782 822
783 823 def removeOperations(self):
784 824
785 825 for obj in self.opConfObjList:
786 826 del obj
787 827
788 828 self.opConfObjList = []
789 829
790 830 def addRunOperation(self, **kwargs):
791 831
792 832 opObj = self.addOperation(name = 'run', optype = 'self')
793 833
834 if self.server is None:
794 835 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
795 836 opObj.addParameter(name='path' , value=self.path, format='str')
796 837 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
797 838 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
798 839 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
799 840 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
800
841 opObj.addParameter(name='queue' , value=self.queue, format='obj')
801 842 for key, value in kwargs.items():
802 843 opObj.addParameter(name=key, value=value, format=type(value).__name__)
844 else:
845 opObj.addParameter(name='server' , value=self.server, format='str')
846
803 847
804 848 return opObj
805 849
806 850 def updateRunOperation(self, **kwargs):
807 851
808 852 opObj = self.getOperationObj(name = 'run')
809 853 opObj.removeParameters()
810 854
811 855 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
812 856 opObj.addParameter(name='path' , value=self.path, format='str')
813 857 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
814 858 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
815 859 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
816 860 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
817 861
818 862 for key, value in kwargs.items():
819 863 opObj.addParameter(name=key, value=value, format=type(value).__name__)
820 864
821 865 return opObj
822 866
823 867 # def makeXml(self, projectElement):
824 868 #
825 869 # procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
826 870 # procUnitElement.set('id', str(self.id))
827 871 # procUnitElement.set('name', self.name)
828 872 # procUnitElement.set('datatype', self.datatype)
829 873 # procUnitElement.set('inputId', str(self.inputId))
830 874 #
831 875 # for opConfObj in self.opConfObjList:
832 876 # opConfObj.makeXml(procUnitElement)
833 877
834 878 def readXml(self, upElement):
835 879
836 880 self.id = upElement.get('id')
837 881 self.name = upElement.get('name')
838 882 self.datatype = upElement.get('datatype')
839 883 self.inputId = upElement.get('inputId')
840 884
841 885 if self.ELEMENTNAME == "ReadUnit":
842 886 self.datatype = self.datatype.replace("Reader", "")
843 887
844 888 if self.inputId == 'None':
845 889 self.inputId = '0'
846 890
847 891 self.opConfObjList = []
848 892
849 893 opElementList = upElement.iter(OperationConf().getElementName())
850 894
851 895 for opElement in opElementList:
852 896 opConfObj = OperationConf()
853 897 opConfObj.readXml(opElement)
854 898 self.opConfObjList.append(opConfObj)
855 899
856 900 if opConfObj.name == 'run':
857 901 self.path = opConfObj.getParameterValue('path')
858 902 self.startDate = opConfObj.getParameterValue('startDate')
859 903 self.endDate = opConfObj.getParameterValue('endDate')
860 904 self.startTime = opConfObj.getParameterValue('startTime')
861 905 self.endTime = opConfObj.getParameterValue('endTime')
862 906
863 907 class Project(Process):
864 908 id = None
865 909 name = None
866 910 description = None
867 911 filename = None
868 912
869 913 procUnitConfObjDict = None
870 914
871 915 ELEMENTNAME = 'Project'
872 916
873 917 plotterQueue = None
874 918
875 919 def __init__(self, plotter_queue=None, logfile=None):
876 920 Process.__init__(self)
877 921 self.id = None
878 922 self.name = None
879 923 self.description = None
880 924 if logfile is not None:
881 925 logToFile(logfile)
882 926 self.plotterQueue = plotter_queue
883 927
884 928 self.procUnitConfObjDict = {}
885 929
886 930 def __getNewId(self):
887 931
888 932 idList = self.procUnitConfObjDict.keys()
889 933
890 934 id = int(self.id)*10
891 935
892 936 while True:
893 937 id += 1
894 938
895 939 if str(id) in idList:
896 940 continue
897 941
898 942 break
899 943
900 944 return str(id)
901 945
902 946 def getElementName(self):
903 947
904 948 return self.ELEMENTNAME
905 949
906 950 def getId(self):
907 951
908 952 return self.id
909 953
910 954 def updateId(self, new_id):
911 955
912 956 self.id = str(new_id)
913 957
914 958 keyList = self.procUnitConfObjDict.keys()
915 959 keyList.sort()
916 960
917 961 n = 1
918 962 newProcUnitConfObjDict = {}
919 963
920 964 for procKey in keyList:
921 965
922 966 procUnitConfObj = self.procUnitConfObjDict[procKey]
923 967 idProcUnit = str(int(self.id)*10 + n)
924 968 procUnitConfObj.updateId(idProcUnit, parentId = self.id)
925 969
926 970 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
927 971 n += 1
928 972
929 973 self.procUnitConfObjDict = newProcUnitConfObjDict
930 974
931 975 def setup(self, id, name, description):
932 976
933 977 self.id = str(id)
934 978 self.name = name
935 979 self.description = description
936 980
937 981 def update(self, name, description):
938 982
939 983 self.name = name
940 984 self.description = description
941 985
942 986 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
943 987 if id is None:
944 988 idReadUnit = self.__getNewId()
945 989 else:
946 990 idReadUnit = str(id)
947 991
948 992 readUnitConfObj = ReadUnitConf()
949 993 readUnitConfObj.setup(idReadUnit, name, datatype, parentId=self.id, **kwargs)
950 994
951 995 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
952 996
953 997 return readUnitConfObj
954 998
955 999 def addProcUnit(self, inputId='0', datatype=None, name=None):
956 1000
957 1001 idProcUnit = self.__getNewId()
958 1002
959 1003 procUnitConfObj = ProcUnitConf()
960 1004 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, parentId=self.id)
961 1005
962 1006 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
963 1007
964 1008 return procUnitConfObj
965 1009
966 1010 def removeProcUnit(self, id):
967 1011
968 1012 if id in self.procUnitConfObjDict.keys():
969 1013 self.procUnitConfObjDict.pop(id)
970 1014
971 1015 def getReadUnitId(self):
972 1016
973 1017 readUnitConfObj = self.getReadUnitObj()
974 1018
975 1019 return readUnitConfObj.id
976 1020
977 1021 def getReadUnitObj(self):
978 1022
979 1023 for obj in self.procUnitConfObjDict.values():
980 1024 if obj.getElementName() == "ReadUnit":
981 1025 return obj
982 1026
983 1027 return None
984 1028
985 1029 def getProcUnitObj(self, id=None, name=None):
986 1030
987 1031 if id != None:
988 1032 return self.procUnitConfObjDict[id]
989 1033
990 1034 if name != None:
991 1035 return self.getProcUnitObjByName(name)
992 1036
993 1037 return None
994 1038
995 1039 def getProcUnitObjByName(self, name):
996 1040
997 1041 for obj in self.procUnitConfObjDict.values():
998 1042 if obj.name == name:
999 1043 return obj
1000 1044
1001 1045 return None
1002 1046
1003 1047 def procUnitItems(self):
1004 1048
1005 1049 return self.procUnitConfObjDict.items()
1006 1050
1007 1051 def makeXml(self):
1008 1052
1009 1053 projectElement = Element('Project')
1010 1054 projectElement.set('id', str(self.id))
1011 1055 projectElement.set('name', self.name)
1012 1056 projectElement.set('description', self.description)
1013 1057
1014 1058 for procUnitConfObj in self.procUnitConfObjDict.values():
1015 1059 procUnitConfObj.makeXml(projectElement)
1016 1060
1017 1061 self.projectElement = projectElement
1018 1062
1019 1063 def writeXml(self, filename=None):
1020 1064
1021 1065 if filename == None:
1022 1066 if self.filename:
1023 1067 filename = self.filename
1024 1068 else:
1025 1069 filename = "schain.xml"
1026 1070
1027 1071 if not filename:
1028 1072 print "filename has not been defined. Use setFilename(filename) for do it."
1029 1073 return 0
1030 1074
1031 1075 abs_file = os.path.abspath(filename)
1032 1076
1033 1077 if not os.access(os.path.dirname(abs_file), os.W_OK):
1034 1078 print "No write permission on %s" %os.path.dirname(abs_file)
1035 1079 return 0
1036 1080
1037 1081 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1038 1082 print "File %s already exists and it could not be overwriten" %abs_file
1039 1083 return 0
1040 1084
1041 1085 self.makeXml()
1042 1086
1043 1087 ElementTree(self.projectElement).write(abs_file, method='xml')
1044 1088
1045 1089 self.filename = abs_file
1046 1090
1047 1091 return 1
1048 1092
1049 1093 def readXml(self, filename = None):
1050 1094
1051 1095 if not filename:
1052 1096 print "filename is not defined"
1053 1097 return 0
1054 1098
1055 1099 abs_file = os.path.abspath(filename)
1056 1100
1057 1101 if not os.path.isfile(abs_file):
1058 1102 print "%s file does not exist" %abs_file
1059 1103 return 0
1060 1104
1061 1105 self.projectElement = None
1062 1106 self.procUnitConfObjDict = {}
1063 1107
1064 1108 try:
1065 1109 self.projectElement = ElementTree().parse(abs_file)
1066 1110 except:
1067 1111 print "Error reading %s, verify file format" %filename
1068 1112 return 0
1069 1113
1070 1114 self.project = self.projectElement.tag
1071 1115
1072 1116 self.id = self.projectElement.get('id')
1073 1117 self.name = self.projectElement.get('name')
1074 1118 self.description = self.projectElement.get('description')
1075 1119
1076 1120 readUnitElementList = self.projectElement.iter(ReadUnitConf().getElementName())
1077 1121
1078 1122 for readUnitElement in readUnitElementList:
1079 1123 readUnitConfObj = ReadUnitConf()
1080 1124 readUnitConfObj.readXml(readUnitElement)
1081 1125
1082 1126 if readUnitConfObj.parentId == None:
1083 1127 readUnitConfObj.parentId = self.id
1084 1128
1085 1129 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1086 1130
1087 1131 procUnitElementList = self.projectElement.iter(ProcUnitConf().getElementName())
1088 1132
1089 1133 for procUnitElement in procUnitElementList:
1090 1134 procUnitConfObj = ProcUnitConf()
1091 1135 procUnitConfObj.readXml(procUnitElement)
1092 1136
1093 1137 if procUnitConfObj.parentId == None:
1094 1138 procUnitConfObj.parentId = self.id
1095 1139
1096 1140 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1097 1141
1098 1142 self.filename = abs_file
1099 1143
1100 1144 return 1
1101 1145
1102 1146 def printattr(self):
1103 1147
1104 1148 print "Project[%s]: name = %s, description = %s" %(self.id,
1105 1149 self.name,
1106 1150 self.description)
1107 1151
1108 1152 for procUnitConfObj in self.procUnitConfObjDict.values():
1109 1153 procUnitConfObj.printattr()
1110 1154
1111 1155 def createObjects(self):
1112 1156
1113 1157 for procUnitConfObj in self.procUnitConfObjDict.values():
1114 1158 procUnitConfObj.createObjects(self.plotterQueue)
1115 1159
1116 1160 def __connect(self, objIN, thisObj):
1117 1161
1118 1162 thisObj.setInput(objIN.getOutputObj())
1119 1163
1120 1164 def connectObjects(self):
1121 1165
1122 1166 for thisPUConfObj in self.procUnitConfObjDict.values():
1123 1167
1124 1168 inputId = thisPUConfObj.getInputId()
1125 1169
1126 1170 if int(inputId) == 0:
1127 1171 continue
1128 1172
1129 1173 #Get input object
1130 1174 puConfINObj = self.procUnitConfObjDict[inputId]
1131 1175 puObjIN = puConfINObj.getProcUnitObj()
1132 1176
1133 1177 #Get current object
1134 1178 thisPUObj = thisPUConfObj.getProcUnitObj()
1135 1179
1136 1180 self.__connect(puObjIN, thisPUObj)
1137 1181
1138 1182 def __handleError(self, procUnitConfObj, send_email=True):
1139 1183
1140 1184 import socket
1141 1185
1142 1186 err = traceback.format_exception(sys.exc_info()[0],
1143 1187 sys.exc_info()[1],
1144 1188 sys.exc_info()[2])
1145 1189
1146 1190 print "***** Error occurred in %s *****" %(procUnitConfObj.name)
1147 1191 print "***** %s" %err[-1]
1148 1192
1149 1193 message = "".join(err)
1150 1194
1151 1195 sys.stderr.write(message)
1152 1196
1153 1197 if not send_email:
1154 1198 return
1155 1199
1156 1200 subject = "SChain v%s: Error running %s\n" %(schainpy.__version__, procUnitConfObj.name)
1157 1201
1158 1202 subtitle = "%s: %s\n" %(procUnitConfObj.getElementName() ,procUnitConfObj.name)
1159 1203 subtitle += "Hostname: %s\n" %socket.gethostbyname(socket.gethostname())
1160 1204 subtitle += "Working directory: %s\n" %os.path.abspath("./")
1161 1205 subtitle += "Configuration file: %s\n" %self.filename
1162 1206 subtitle += "Time: %s\n" %str(datetime.datetime.now())
1163 1207
1164 1208 readUnitConfObj = self.getReadUnitObj()
1165 1209 if readUnitConfObj:
1166 1210 subtitle += "\nInput parameters:\n"
1167 1211 subtitle += "[Data path = %s]\n" %readUnitConfObj.path
1168 1212 subtitle += "[Data type = %s]\n" %readUnitConfObj.datatype
1169 1213 subtitle += "[Start date = %s]\n" %readUnitConfObj.startDate
1170 1214 subtitle += "[End date = %s]\n" %readUnitConfObj.endDate
1171 1215 subtitle += "[Start time = %s]\n" %readUnitConfObj.startTime
1172 1216 subtitle += "[End time = %s]\n" %readUnitConfObj.endTime
1173 1217
1174 1218 adminObj = schainpy.admin.SchainNotify()
1175 1219 adminObj.sendAlert(message=message,
1176 1220 subject=subject,
1177 1221 subtitle=subtitle,
1178 1222 filename=self.filename)
1179 1223
1180 1224 def isPaused(self):
1181 1225 return 0
1182 1226
1183 1227 def isStopped(self):
1184 1228 return 0
1185 1229
1186 1230 def runController(self):
1187 1231 """
1188 1232 returns 0 when this process has been stopped, 1 otherwise
1189 1233 """
1190 1234
1191 1235 if self.isPaused():
1192 1236 print "Process suspended"
1193 1237
1194 1238 while True:
1195 1239 sleep(0.1)
1196 1240
1197 1241 if not self.isPaused():
1198 1242 break
1199 1243
1200 1244 if self.isStopped():
1201 1245 break
1202 1246
1203 1247 print "Process reinitialized"
1204 1248
1205 1249 if self.isStopped():
1206 1250 print "Process stopped"
1207 1251 return 0
1208 1252
1209 1253 return 1
1210 1254
1211 1255 def setFilename(self, filename):
1212 1256
1213 1257 self.filename = filename
1214 1258
1215 1259 def setPlotterQueue(self, plotter_queue):
1216 1260
1217 1261 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1218 1262
1219 1263 def getPlotterQueue(self):
1220 1264
1221 1265 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1222 1266
1223 1267 def useExternalPlotter(self):
1224 1268
1225 1269 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1226 1270
1227 1271
1228 1272 def run(self, filename=None):
1229 1273
1230 1274 # self.writeXml(filename)
1231 1275 self.createObjects()
1232 1276 self.connectObjects()
1233 1277
1234 1278 print
1235 1279 print "*"*60
1236 1280 print " Starting SIGNAL CHAIN PROCESSING v%s " %schainpy.__version__
1237 1281 print "*"*60
1238 1282 print
1239 1283
1240 1284 keyList = self.procUnitConfObjDict.keys()
1241 1285 keyList.sort()
1242 1286
1243 1287 while(True):
1244 1288
1245 1289 is_ok = False
1246 1290
1247 1291 for procKey in keyList:
1248 1292 # print "Running the '%s' process with %s" %(procUnitConfObj.name, procUnitConfObj.id)
1249 1293
1250 1294 procUnitConfObj = self.procUnitConfObjDict[procKey]
1251 1295
1252 1296 try:
1253 1297 sts = procUnitConfObj.run()
1254 1298 is_ok = is_ok or sts
1255 1299 except KeyboardInterrupt:
1256 1300 is_ok = False
1257 1301 break
1258 1302 except ValueError, e:
1259 1303 sleep(0.5)
1260 1304 self.__handleError(procUnitConfObj, send_email=True)
1261 1305 is_ok = False
1262 1306 break
1263 1307 except:
1264 1308 sleep(0.5)
1265 1309 self.__handleError(procUnitConfObj)
1266 1310 is_ok = False
1267 1311 break
1268 1312
1269 1313 #If every process unit finished so end process
1270 1314 if not(is_ok):
1271 1315 # print "Every process unit have finished"
1272 1316 break
1273 1317
1274 1318 if not self.runController():
1275 1319 break
1276 1320
1277 1321 #Closing every process
1278 1322 for procKey in keyList:
1279 1323 procUnitConfObj = self.procUnitConfObjDict[procKey]
1280 1324 procUnitConfObj.close()
General Comments 0
You need to be logged in to leave comments. Login now