##// END OF EJS Templates
exiting processes properly
José Chávez -
r924:8925c3feabcd
parent child
Show More
@@ -1,1323 +1,1321
1 1 '''
2 2 Created on September , 2012
3 3 @author:
4 4 '''
5 5
6 6 import sys
7 7 import ast
8 8 import datetime
9 9 import traceback
10 10 import math
11 11 from multiprocessing import Process, Queue, cpu_count
12 12
13 13 import schainpy
14 14 import schainpy.admin
15 15
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
18 18
19 19 from schainpy.model import *
20 20 from time import sleep
21 21
22 22 def prettify(elem):
23 23 """Return a pretty-printed XML string for the Element.
24 24 """
25 25 rough_string = tostring(elem, 'utf-8')
26 26 reparsed = minidom.parseString(rough_string)
27 27 return reparsed.toprettyxml(indent=" ")
28 28
29 29 def multiSchain(child, nProcess=cpu_count(), startDate=None, endDate=None, receiver=None):
30 30 skip = 0
31 31 cursor = 0
32 32 nFiles = None
33 33 processes = []
34 34
35
36
37
35 38 dt1 = datetime.datetime.strptime(startDate, '%Y/%m/%d')
36 39 dt2 = datetime.datetime.strptime(endDate, '%Y/%m/%d')
37 40 days = (dt2 - dt1).days
38 41 print days
39 42 for day in range(days+1):
40 43 skip = 0
41 44 cursor = 0
42 45 q = Queue()
43 46 processes = []
44 47 dt = (dt1 + datetime.timedelta(day)).strftime('%Y/%m/%d')
45 48 firstProcess = Process(target=child, args=(cursor, skip, q, dt))
46 49 firstProcess.start()
47 50 nFiles = q.get()
48 51 firstProcess.terminate()
49 52 skip = int(math.ceil(nFiles/nProcess))
50 try:
51 while True:
52 processes.append(Process(target=child, args=(cursor, skip, q, dt)))
53 processes[cursor].start()
54 if nFiles < cursor*skip:
55 break
56 cursor += 1
57 except KeyboardInterrupt:
53 while True:
54 processes.append(Process(target=child, args=(cursor, skip, q, dt)))
55 processes[cursor].start()
56 if nFiles < cursor*skip:
57 break
58 cursor += 1
59
60 def beforeExit(exctype, value, trace):
58 61 for process in processes:
59 62 process.terminate()
60 63 process.join()
61 for process in processes:
62 process.join()
63 # process.terminate()
64 sleep(3)
64 sys.excepthook = beforeExit
65 65
66 try:
67 while True:
68 pass
69 except KeyboardInterrupt:
70 66 for process in processes:
71 process.terminate()
72 67 process.join()
68 process.terminate()
69
70
73 71
74 72 class ParameterConf():
75 73
76 74 id = None
77 75 name = None
78 76 value = None
79 77 format = None
80 78
81 79 __formated_value = None
82 80
83 81 ELEMENTNAME = 'Parameter'
84 82
85 83 def __init__(self):
86 84
87 85 self.format = 'str'
88 86
89 87 def getElementName(self):
90 88
91 89 return self.ELEMENTNAME
92 90
93 91 def getValue(self):
94 92
95 93 value = self.value
96 94 format = self.format
97 95
98 96 if self.__formated_value != None:
99 97
100 98 return self.__formated_value
101 99
102 100 if format == 'obj':
103 101 return value
104 102
105 103 if format == 'str':
106 104 self.__formated_value = str(value)
107 105 return self.__formated_value
108 106
109 107 if value == '':
110 108 raise ValueError, "%s: This parameter value is empty" %self.name
111 109
112 110 if format == 'list':
113 111 strList = value.split(',')
114 112
115 113 self.__formated_value = strList
116 114
117 115 return self.__formated_value
118 116
119 117 if format == 'intlist':
120 118 """
121 119 Example:
122 120 value = (0,1,2)
123 121 """
124 122
125 123 new_value = ast.literal_eval(value)
126 124
127 125 if type(new_value) not in (tuple, list):
128 126 new_value = [int(new_value)]
129 127
130 128 self.__formated_value = new_value
131 129
132 130 return self.__formated_value
133 131
134 132 if format == 'floatlist':
135 133 """
136 134 Example:
137 135 value = (0.5, 1.4, 2.7)
138 136 """
139 137
140 138 new_value = ast.literal_eval(value)
141 139
142 140 if type(new_value) not in (tuple, list):
143 141 new_value = [float(new_value)]
144 142
145 143 self.__formated_value = new_value
146 144
147 145 return self.__formated_value
148 146
149 147 if format == 'date':
150 148 strList = value.split('/')
151 149 intList = [int(x) for x in strList]
152 150 date = datetime.date(intList[0], intList[1], intList[2])
153 151
154 152 self.__formated_value = date
155 153
156 154 return self.__formated_value
157 155
158 156 if format == 'time':
159 157 strList = value.split(':')
160 158 intList = [int(x) for x in strList]
161 159 time = datetime.time(intList[0], intList[1], intList[2])
162 160
163 161 self.__formated_value = time
164 162
165 163 return self.__formated_value
166 164
167 165 if format == 'pairslist':
168 166 """
169 167 Example:
170 168 value = (0,1),(1,2)
171 169 """
172 170
173 171 new_value = ast.literal_eval(value)
174 172
175 173 if type(new_value) not in (tuple, list):
176 174 raise ValueError, "%s has to be a tuple or list of pairs" %value
177 175
178 176 if type(new_value[0]) not in (tuple, list):
179 177 if len(new_value) != 2:
180 178 raise ValueError, "%s has to be a tuple or list of pairs" %value
181 179 new_value = [new_value]
182 180
183 181 for thisPair in new_value:
184 182 if len(thisPair) != 2:
185 183 raise ValueError, "%s has to be a tuple or list of pairs" %value
186 184
187 185 self.__formated_value = new_value
188 186
189 187 return self.__formated_value
190 188
191 189 if format == 'multilist':
192 190 """
193 191 Example:
194 192 value = (0,1,2),(3,4,5)
195 193 """
196 194 multiList = ast.literal_eval(value)
197 195
198 196 if type(multiList[0]) == int:
199 197 multiList = ast.literal_eval("(" + value + ")")
200 198
201 199 self.__formated_value = multiList
202 200
203 201 return self.__formated_value
204 202
205 203 if format == 'bool':
206 204 value = int(value)
207 205
208 206 if format == 'int':
209 207 value = float(value)
210 208
211 209 format_func = eval(format)
212 210
213 211 self.__formated_value = format_func(value)
214 212
215 213 return self.__formated_value
216 214
217 215 def updateId(self, new_id):
218 216
219 217 self.id = str(new_id)
220 218
221 219 def setup(self, id, name, value, format='str'):
222 220
223 221 self.id = str(id)
224 222 self.name = name
225 223 if format == 'obj':
226 224 self.value = value
227 225 else:
228 226 self.value = str(value)
229 227 self.format = str.lower(format)
230 228
231 229 self.getValue()
232 230
233 231 return 1
234 232
235 233 def update(self, name, value, format='str'):
236 234
237 235 self.name = name
238 236 self.value = str(value)
239 237 self.format = format
240 238
241 239 def makeXml(self, opElement):
242 240 if self.name not in ('queue',):
243 241 parmElement = SubElement(opElement, self.ELEMENTNAME)
244 242 parmElement.set('id', str(self.id))
245 243 parmElement.set('name', self.name)
246 244 parmElement.set('value', self.value)
247 245 parmElement.set('format', self.format)
248 246
249 247 def readXml(self, parmElement):
250 248
251 249 self.id = parmElement.get('id')
252 250 self.name = parmElement.get('name')
253 251 self.value = parmElement.get('value')
254 252 self.format = str.lower(parmElement.get('format'))
255 253
256 254 #Compatible with old signal chain version
257 255 if self.format == 'int' and self.name == 'idfigure':
258 256 self.name = 'id'
259 257
260 258 def printattr(self):
261 259
262 260 print "Parameter[%s]: name = %s, value = %s, format = %s" %(self.id, self.name, self.value, self.format)
263 261
264 262 class OperationConf():
265 263
266 264 id = None
267 265 name = None
268 266 priority = None
269 267 type = None
270 268
271 269 parmConfObjList = []
272 270
273 271 ELEMENTNAME = 'Operation'
274 272
275 273 def __init__(self):
276 274
277 275 self.id = '0'
278 276 self.name = None
279 277 self.priority = None
280 278 self.type = 'self'
281 279
282 280
283 281 def __getNewId(self):
284 282
285 283 return int(self.id)*10 + len(self.parmConfObjList) + 1
286 284
287 285 def updateId(self, new_id):
288 286
289 287 self.id = str(new_id)
290 288
291 289 n = 1
292 290 for parmObj in self.parmConfObjList:
293 291
294 292 idParm = str(int(new_id)*10 + n)
295 293 parmObj.updateId(idParm)
296 294
297 295 n += 1
298 296
299 297 def getElementName(self):
300 298
301 299 return self.ELEMENTNAME
302 300
303 301 def getParameterObjList(self):
304 302
305 303 return self.parmConfObjList
306 304
307 305 def getParameterObj(self, parameterName):
308 306
309 307 for parmConfObj in self.parmConfObjList:
310 308
311 309 if parmConfObj.name != parameterName:
312 310 continue
313 311
314 312 return parmConfObj
315 313
316 314 return None
317 315
318 316 def getParameterObjfromValue(self, parameterValue):
319 317
320 318 for parmConfObj in self.parmConfObjList:
321 319
322 320 if parmConfObj.getValue() != parameterValue:
323 321 continue
324 322
325 323 return parmConfObj.getValue()
326 324
327 325 return None
328 326
329 327 def getParameterValue(self, parameterName):
330 328
331 329 parameterObj = self.getParameterObj(parameterName)
332 330
333 331 # if not parameterObj:
334 332 # return None
335 333
336 334 value = parameterObj.getValue()
337 335
338 336 return value
339 337
340 338
341 339 def getKwargs(self):
342 340
343 341 kwargs = {}
344 342
345 343 for parmConfObj in self.parmConfObjList:
346 344 if self.name == 'run' and parmConfObj.name == 'datatype':
347 345 continue
348 346
349 347 kwargs[parmConfObj.name] = parmConfObj.getValue()
350 348
351 349 return kwargs
352 350
353 351 def setup(self, id, name, priority, type):
354 352
355 353 self.id = str(id)
356 354 self.name = name
357 355 self.type = type
358 356 self.priority = priority
359 357
360 358 self.parmConfObjList = []
361 359
362 360 def removeParameters(self):
363 361
364 362 for obj in self.parmConfObjList:
365 363 del obj
366 364
367 365 self.parmConfObjList = []
368 366
369 367 def addParameter(self, name, value, format='str'):
370 368
371 369 id = self.__getNewId()
372 370
373 371 parmConfObj = ParameterConf()
374 372 if not parmConfObj.setup(id, name, value, format):
375 373 return None
376 374
377 375 self.parmConfObjList.append(parmConfObj)
378 376
379 377 return parmConfObj
380 378
381 379 def changeParameter(self, name, value, format='str'):
382 380
383 381 parmConfObj = self.getParameterObj(name)
384 382 parmConfObj.update(name, value, format)
385 383
386 384 return parmConfObj
387 385
388 386 def makeXml(self, procUnitElement):
389 387
390 388 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
391 389 opElement.set('id', str(self.id))
392 390 opElement.set('name', self.name)
393 391 opElement.set('type', self.type)
394 392 opElement.set('priority', str(self.priority))
395 393
396 394 for parmConfObj in self.parmConfObjList:
397 395 parmConfObj.makeXml(opElement)
398 396
399 397 def readXml(self, opElement):
400 398
401 399 self.id = opElement.get('id')
402 400 self.name = opElement.get('name')
403 401 self.type = opElement.get('type')
404 402 self.priority = opElement.get('priority')
405 403
406 404 #Compatible with old signal chain version
407 405 #Use of 'run' method instead 'init'
408 406 if self.type == 'self' and self.name == 'init':
409 407 self.name = 'run'
410 408
411 409 self.parmConfObjList = []
412 410
413 411 parmElementList = opElement.iter(ParameterConf().getElementName())
414 412
415 413 for parmElement in parmElementList:
416 414 parmConfObj = ParameterConf()
417 415 parmConfObj.readXml(parmElement)
418 416
419 417 #Compatible with old signal chain version
420 418 #If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
421 419 if self.type != 'self' and self.name == 'Plot':
422 420 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
423 421 self.name = parmConfObj.value
424 422 continue
425 423
426 424 self.parmConfObjList.append(parmConfObj)
427 425
428 426 def printattr(self):
429 427
430 428 print "%s[%s]: name = %s, type = %s, priority = %s" %(self.ELEMENTNAME,
431 429 self.id,
432 430 self.name,
433 431 self.type,
434 432 self.priority)
435 433
436 434 for parmConfObj in self.parmConfObjList:
437 435 parmConfObj.printattr()
438 436
439 437 def createObject(self, plotter_queue=None):
440 438
441
442 if self.type == 'self':
439
440 if self.type == 'self':
443 441 raise ValueError, "This operation type cannot be created"
444 442
445 443 if self.type == 'plotter':
446 444 #Plotter(plotter_name)
447 445 if not plotter_queue:
448 446 raise ValueError, "plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)"
449 447
450 448 opObj = Plotter(self.name, plotter_queue)
451 449
452 450 if self.type == 'external' or self.type == 'other':
453
451
454 452 className = eval(self.name)
455 453 kwargs = self.getKwargs()
456
454
457 455 opObj = className(**kwargs)
458 456
459 457 return opObj
460 458
461 459
462 460 class ProcUnitConf():
463 461
464 462 id = None
465 463 name = None
466 464 datatype = None
467 465 inputId = None
468 466 parentId = None
469 467
470 468 opConfObjList = []
471 469
472 470 procUnitObj = None
473 471 opObjList = []
474 472
475 473 ELEMENTNAME = 'ProcUnit'
476 474
477 475 def __init__(self):
478 476
479 477 self.id = None
480 478 self.datatype = None
481 479 self.name = None
482 480 self.inputId = None
483 481
484 482 self.opConfObjList = []
485 483
486 484 self.procUnitObj = None
487 485 self.opObjDict = {}
488 486
489 487 def __getPriority(self):
490 488
491 489 return len(self.opConfObjList)+1
492 490
493 491 def __getNewId(self):
494 492
495 493 return int(self.id)*10 + len(self.opConfObjList) + 1
496 494
497 495 def getElementName(self):
498 496
499 497 return self.ELEMENTNAME
500 498
501 499 def getId(self):
502 500
503 501 return self.id
504 502
505 503 def updateId(self, new_id, parentId=parentId):
506 504
507 505
508 506 new_id = int(parentId)*10 + (int(self.id) % 10)
509 507 new_inputId = int(parentId)*10 + (int(self.inputId) % 10)
510 508
511 509 #If this proc unit has not inputs
512 510 if self.inputId == '0':
513 511 new_inputId = 0
514 512
515 513 n = 1
516 514 for opConfObj in self.opConfObjList:
517 515
518 516 idOp = str(int(new_id)*10 + n)
519 517 opConfObj.updateId(idOp)
520 518
521 519 n += 1
522 520
523 521 self.parentId = str(parentId)
524 522 self.id = str(new_id)
525 523 self.inputId = str(new_inputId)
526 524
527 525
528 526 def getInputId(self):
529 527
530 528 return self.inputId
531 529
532 530 def getOperationObjList(self):
533 531
534 532 return self.opConfObjList
535 533
536 534 def getOperationObj(self, name=None):
537 535
538 536 for opConfObj in self.opConfObjList:
539 537
540 538 if opConfObj.name != name:
541 539 continue
542 540
543 541 return opConfObj
544 542
545 543 return None
546 544
547 545 def getOpObjfromParamValue(self, value=None):
548 546
549 547 for opConfObj in self.opConfObjList:
550 548 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
551 549 continue
552 550 return opConfObj
553 551 return None
554 552
555 553 def getProcUnitObj(self):
556 554
557 555 return self.procUnitObj
558 556
559 557 def setup(self, id, name, datatype, inputId, parentId=None):
560 558
561 559 #Compatible with old signal chain version
562 560 if datatype==None and name==None:
563 561 raise ValueError, "datatype or name should be defined"
564 562
565 563 if name==None:
566 564 if 'Proc' in datatype:
567 565 name = datatype
568 566 else:
569 567 name = '%sProc' %(datatype)
570 568
571 569 if datatype==None:
572 570 datatype = name.replace('Proc','')
573 571
574 572 self.id = str(id)
575 573 self.name = name
576 574 self.datatype = datatype
577 575 self.inputId = inputId
578 576 self.parentId = parentId
579 577
580 578 self.opConfObjList = []
581 579
582 580 self.addOperation(name='run', optype='self')
583 581
584 582 def removeOperations(self):
585 583
586 584 for obj in self.opConfObjList:
587 585 del obj
588 586
589 587 self.opConfObjList = []
590 588 self.addOperation(name='run')
591 589
592 590 def addParameter(self, **kwargs):
593 591 '''
594 592 Add parameters to "run" operation
595 593 '''
596 594 opObj = self.opConfObjList[0]
597 595
598 596 opObj.addParameter(**kwargs)
599 597
600 598 return opObj
601 599
602 600 def addOperation(self, name, optype='self'):
603 601
604 602 id = self.__getNewId()
605 603 priority = self.__getPriority()
606 604
607 605 opConfObj = OperationConf()
608 606 opConfObj.setup(id, name=name, priority=priority, type=optype)
609 607
610 608 self.opConfObjList.append(opConfObj)
611 609
612 610 return opConfObj
613 611
614 612 def makeXml(self, projectElement):
615 613
616 614 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
617 615 procUnitElement.set('id', str(self.id))
618 616 procUnitElement.set('name', self.name)
619 617 procUnitElement.set('datatype', self.datatype)
620 618 procUnitElement.set('inputId', str(self.inputId))
621 619
622 620 for opConfObj in self.opConfObjList:
623 621 opConfObj.makeXml(procUnitElement)
624 622
625 623 def readXml(self, upElement):
626 624
627 625 self.id = upElement.get('id')
628 626 self.name = upElement.get('name')
629 627 self.datatype = upElement.get('datatype')
630 628 self.inputId = upElement.get('inputId')
631 629
632 630 if self.ELEMENTNAME == "ReadUnit":
633 631 self.datatype = self.datatype.replace("Reader", "")
634 632
635 633 if self.ELEMENTNAME == "ProcUnit":
636 634 self.datatype = self.datatype.replace("Proc", "")
637 635
638 636 if self.inputId == 'None':
639 637 self.inputId = '0'
640 638
641 639 self.opConfObjList = []
642 640
643 641 opElementList = upElement.iter(OperationConf().getElementName())
644 642
645 643 for opElement in opElementList:
646 644 opConfObj = OperationConf()
647 645 opConfObj.readXml(opElement)
648 646 self.opConfObjList.append(opConfObj)
649 647
650 648 def printattr(self):
651 649
652 650 print "%s[%s]: name = %s, datatype = %s, inputId = %s" %(self.ELEMENTNAME,
653 651 self.id,
654 652 self.name,
655 653 self.datatype,
656 654 self.inputId)
657 655
658 656 for opConfObj in self.opConfObjList:
659 657 opConfObj.printattr()
660 658
661 659
662 660 def getKwargs(self):
663 661
664 662 opObj = self.opConfObjList[0]
665 663 kwargs = opObj.getKwargs()
666 664
667 665 return kwargs
668 666
669 667 def createObjects(self, plotter_queue=None):
670 668
671 669 className = eval(self.name)
672 670 kwargs = self.getKwargs()
673 671 procUnitObj = className(**kwargs)
674 672
675 for opConfObj in self.opConfObjList:
676
673 for opConfObj in self.opConfObjList:
674
677 675 if opConfObj.type=='self' and self.name=='run':
678 676 continue
679 elif opConfObj.type=='self':
677 elif opConfObj.type=='self':
680 678 procUnitObj.addOperationKwargs(opConfObj.id, **opConfObj.getKwargs())
681 679 continue
682 680
683 681 opObj = opConfObj.createObject(plotter_queue)
684 682
685 683 self.opObjDict[opConfObj.id] = opObj
686
684
687 685 procUnitObj.addOperation(opObj, opConfObj.id)
688 686
689 687 self.procUnitObj = procUnitObj
690 688
691 689 return procUnitObj
692 690
693 691 def run(self):
694 692
695 693 is_ok = False
696 694
697 695 for opConfObj in self.opConfObjList:
698 696
699 697 kwargs = {}
700 698 for parmConfObj in opConfObj.getParameterObjList():
701 699 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
702 700 continue
703 701
704 702 kwargs[parmConfObj.name] = parmConfObj.getValue()
705 703
706 704 #ini = time.time()
707 705
708 706 #print "\tRunning the '%s' operation with %s" %(opConfObj.name, opConfObj.id)
709 707 sts = self.procUnitObj.call(opType = opConfObj.type,
710 708 opName = opConfObj.name,
711 709 opId = opConfObj.id,
712 710 )
713 711
714 712 # total_time = time.time() - ini
715 713 #
716 714 # if total_time > 0.002:
717 715 # print "%s::%s took %f seconds" %(self.name, opConfObj.name, total_time)
718 716
719 717 is_ok = is_ok or sts
720 718
721 719 return is_ok
722 720
723 721 def close(self):
724 722
725 723 for opConfObj in self.opConfObjList:
726 724 if opConfObj.type == 'self':
727 725 continue
728 726
729 727 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
730 728 opObj.close()
731 729
732 730 self.procUnitObj.close()
733 731
734 732 return
735 733
736 734 class ReadUnitConf(ProcUnitConf):
737 735
738 736 path = None
739 737 startDate = None
740 738 endDate = None
741 739 startTime = None
742 740 endTime = None
743 741
744 742 ELEMENTNAME = 'ReadUnit'
745 743
746 744 def __init__(self):
747 745
748 746 self.id = None
749 747 self.datatype = None
750 748 self.name = None
751 749 self.inputId = None
752 750
753 751 self.parentId = None
754 752
755 753 self.opConfObjList = []
756 754 self.opObjList = []
757 755
758 756 def getElementName(self):
759 757
760 758 return self.ELEMENTNAME
761 759
762 760 def setup(self, id, name, datatype, path, startDate="", endDate="", startTime="", endTime="", parentId=None, queue=None, **kwargs):
763 761
764 762 #Compatible with old signal chain version
765 763 if datatype==None and name==None:
766 764 raise ValueError, "datatype or name should be defined"
767 765
768 766 if name==None:
769 767 if 'Reader' in datatype:
770 768 name = datatype
771 769 else:
772 770 name = '%sReader' %(datatype)
773 771
774 772 if datatype==None:
775 773 datatype = name.replace('Reader','')
776 774
777 775 self.id = id
778 776 self.name = name
779 777 self.datatype = datatype
780 778
781 779 self.path = os.path.abspath(path)
782 780 self.startDate = startDate
783 781 self.endDate = endDate
784 782 self.startTime = startTime
785 783 self.endTime = endTime
786 784
787 785 self.inputId = '0'
788 786 self.parentId = parentId
789 787 self.queue = queue
790 788 self.addRunOperation(**kwargs)
791 789
792 790 def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
793 791
794 792 #Compatible with old signal chain version
795 793 if datatype==None and name==None:
796 794 raise ValueError, "datatype or name should be defined"
797 795
798 796 if name==None:
799 797 if 'Reader' in datatype:
800 798 name = datatype
801 799 else:
802 800 name = '%sReader' %(datatype)
803 801
804 802 if datatype==None:
805 803 datatype = name.replace('Reader','')
806 804
807 805 self.datatype = datatype
808 806 self.name = name
809 807 self.path = path
810 808 self.startDate = startDate
811 809 self.endDate = endDate
812 810 self.startTime = startTime
813 811 self.endTime = endTime
814 812
815 813 self.inputId = '0'
816 814 self.parentId = parentId
817 815
818 816 self.updateRunOperation(**kwargs)
819 817
820 818 def removeOperations(self):
821 819
822 820 for obj in self.opConfObjList:
823 821 del obj
824 822
825 823 self.opConfObjList = []
826 824
827 825 def addRunOperation(self, **kwargs):
828 826
829 827 opObj = self.addOperation(name = 'run', optype = 'self')
830 828
831 829 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
832 830 opObj.addParameter(name='path' , value=self.path, format='str')
833 831 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
834 832 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
835 833 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
836 834 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
837 835 opObj.addParameter(name='queue' , value=self.queue, format='obj')
838 836
839 837 for key, value in kwargs.items():
840 838 opObj.addParameter(name=key, value=value, format=type(value).__name__)
841 839
842 840 return opObj
843 841
844 842 def updateRunOperation(self, **kwargs):
845 843
846 844 opObj = self.getOperationObj(name = 'run')
847 845 opObj.removeParameters()
848 846
849 847 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
850 848 opObj.addParameter(name='path' , value=self.path, format='str')
851 849 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
852 850 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
853 851 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
854 852 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
855 853
856 854 for key, value in kwargs.items():
857 855 opObj.addParameter(name=key, value=value, format=type(value).__name__)
858 856
859 857 return opObj
860 858
861 859 # def makeXml(self, projectElement):
862 860 #
863 861 # procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
864 862 # procUnitElement.set('id', str(self.id))
865 863 # procUnitElement.set('name', self.name)
866 864 # procUnitElement.set('datatype', self.datatype)
867 865 # procUnitElement.set('inputId', str(self.inputId))
868 866 #
869 867 # for opConfObj in self.opConfObjList:
870 868 # opConfObj.makeXml(procUnitElement)
871 869
872 870 def readXml(self, upElement):
873 871
874 872 self.id = upElement.get('id')
875 873 self.name = upElement.get('name')
876 874 self.datatype = upElement.get('datatype')
877 875 self.inputId = upElement.get('inputId')
878 876
879 877 if self.ELEMENTNAME == "ReadUnit":
880 878 self.datatype = self.datatype.replace("Reader", "")
881 879
882 880 if self.inputId == 'None':
883 881 self.inputId = '0'
884 882
885 883 self.opConfObjList = []
886 884
887 885 opElementList = upElement.iter(OperationConf().getElementName())
888 886
889 887 for opElement in opElementList:
890 888 opConfObj = OperationConf()
891 889 opConfObj.readXml(opElement)
892 890 self.opConfObjList.append(opConfObj)
893 891
894 892 if opConfObj.name == 'run':
895 893 self.path = opConfObj.getParameterValue('path')
896 894 self.startDate = opConfObj.getParameterValue('startDate')
897 895 self.endDate = opConfObj.getParameterValue('endDate')
898 896 self.startTime = opConfObj.getParameterValue('startTime')
899 897 self.endTime = opConfObj.getParameterValue('endTime')
900 898
901 899 class Project():
902 900
903 901 id = None
904 902 name = None
905 903 description = None
906 904 filename = None
907 905
908 906 procUnitConfObjDict = None
909 907
910 908 ELEMENTNAME = 'Project'
911 909
912 910 plotterQueue = None
913 911
914 912 def __init__(self, plotter_queue=None):
915 913
916 914 self.id = None
917 915 self.name = None
918 916 self.description = None
919 917
920 918 self.plotterQueue = plotter_queue
921 919
922 920 self.procUnitConfObjDict = {}
923 921
924 922 def __getNewId(self):
925 923
926 924 idList = self.procUnitConfObjDict.keys()
927 925
928 926 id = int(self.id)*10
929 927
930 928 while True:
931 929 id += 1
932 930
933 931 if str(id) in idList:
934 932 continue
935 933
936 934 break
937 935
938 936 return str(id)
939 937
940 938 def getElementName(self):
941 939
942 940 return self.ELEMENTNAME
943 941
944 942 def getId(self):
945 943
946 944 return self.id
947 945
948 946 def updateId(self, new_id):
949 947
950 948 self.id = str(new_id)
951 949
952 950 keyList = self.procUnitConfObjDict.keys()
953 951 keyList.sort()
954 952
955 953 n = 1
956 954 newProcUnitConfObjDict = {}
957 955
958 956 for procKey in keyList:
959 957
960 958 procUnitConfObj = self.procUnitConfObjDict[procKey]
961 959 idProcUnit = str(int(self.id)*10 + n)
962 960 procUnitConfObj.updateId(idProcUnit, parentId = self.id)
963 961
964 962 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
965 963 n += 1
966 964
967 965 self.procUnitConfObjDict = newProcUnitConfObjDict
968 966
969 967 def setup(self, id, name, description):
970 968
971 969 self.id = str(id)
972 970 self.name = name
973 971 self.description = description
974 972
975 973 def update(self, name, description):
976 974
977 975 self.name = name
978 976 self.description = description
979 977
980 978 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
981 979
982 980 if id is None:
983 981 idReadUnit = self.__getNewId()
984 982 else:
985 983 idReadUnit = str(id)
986 984
987 985 readUnitConfObj = ReadUnitConf()
988 986 readUnitConfObj.setup(idReadUnit, name, datatype, parentId=self.id, **kwargs)
989 987
990 988 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
991 989
992 990 return readUnitConfObj
993 991
994 992 def addProcUnit(self, inputId='0', datatype=None, name=None):
995 993
996 994 idProcUnit = self.__getNewId()
997 995
998 996 procUnitConfObj = ProcUnitConf()
999 997 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, parentId=self.id)
1000 998
1001 999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1002 1000
1003 1001 return procUnitConfObj
1004 1002
1005 1003 def removeProcUnit(self, id):
1006 1004
1007 1005 if id in self.procUnitConfObjDict.keys():
1008 1006 self.procUnitConfObjDict.pop(id)
1009 1007
1010 1008 def getReadUnitId(self):
1011 1009
1012 1010 readUnitConfObj = self.getReadUnitObj()
1013 1011
1014 1012 return readUnitConfObj.id
1015 1013
1016 1014 def getReadUnitObj(self):
1017 1015
1018 1016 for obj in self.procUnitConfObjDict.values():
1019 1017 if obj.getElementName() == "ReadUnit":
1020 1018 return obj
1021 1019
1022 1020 return None
1023 1021
1024 1022 def getProcUnitObj(self, id=None, name=None):
1025 1023
1026 1024 if id != None:
1027 1025 return self.procUnitConfObjDict[id]
1028 1026
1029 1027 if name != None:
1030 1028 return self.getProcUnitObjByName(name)
1031 1029
1032 1030 return None
1033 1031
1034 1032 def getProcUnitObjByName(self, name):
1035 1033
1036 1034 for obj in self.procUnitConfObjDict.values():
1037 1035 if obj.name == name:
1038 1036 return obj
1039 1037
1040 1038 return None
1041 1039
1042 1040 def procUnitItems(self):
1043 1041
1044 1042 return self.procUnitConfObjDict.items()
1045 1043
1046 1044 def makeXml(self):
1047 1045
1048 1046 projectElement = Element('Project')
1049 1047 projectElement.set('id', str(self.id))
1050 1048 projectElement.set('name', self.name)
1051 1049 projectElement.set('description', self.description)
1052 1050
1053 1051 for procUnitConfObj in self.procUnitConfObjDict.values():
1054 1052 procUnitConfObj.makeXml(projectElement)
1055 1053
1056 1054 self.projectElement = projectElement
1057 1055
1058 1056 def writeXml(self, filename=None):
1059 1057
1060 1058 if filename == None:
1061 1059 if self.filename:
1062 1060 filename = self.filename
1063 1061 else:
1064 1062 filename = "schain.xml"
1065 1063
1066 1064 if not filename:
1067 1065 print "filename has not been defined. Use setFilename(filename) for do it."
1068 1066 return 0
1069 1067
1070 1068 abs_file = os.path.abspath(filename)
1071 1069
1072 1070 if not os.access(os.path.dirname(abs_file), os.W_OK):
1073 1071 print "No write permission on %s" %os.path.dirname(abs_file)
1074 1072 return 0
1075 1073
1076 1074 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1077 1075 print "File %s already exists and it could not be overwriten" %abs_file
1078 1076 return 0
1079 1077
1080 1078 self.makeXml()
1081 1079
1082 1080 ElementTree(self.projectElement).write(abs_file, method='xml')
1083 1081
1084 1082 self.filename = abs_file
1085 1083
1086 1084 return 1
1087 1085
1088 1086 def readXml(self, filename = None):
1089 1087
1090 1088 if not filename:
1091 1089 print "filename is not defined"
1092 1090 return 0
1093 1091
1094 1092 abs_file = os.path.abspath(filename)
1095 1093
1096 1094 if not os.path.isfile(abs_file):
1097 1095 print "%s file does not exist" %abs_file
1098 1096 return 0
1099 1097
1100 1098 self.projectElement = None
1101 1099 self.procUnitConfObjDict = {}
1102 1100
1103 1101 try:
1104 1102 self.projectElement = ElementTree().parse(abs_file)
1105 1103 except:
1106 1104 print "Error reading %s, verify file format" %filename
1107 1105 return 0
1108 1106
1109 1107 self.project = self.projectElement.tag
1110 1108
1111 1109 self.id = self.projectElement.get('id')
1112 1110 self.name = self.projectElement.get('name')
1113 1111 self.description = self.projectElement.get('description')
1114 1112
1115 1113 readUnitElementList = self.projectElement.iter(ReadUnitConf().getElementName())
1116 1114
1117 1115 for readUnitElement in readUnitElementList:
1118 1116 readUnitConfObj = ReadUnitConf()
1119 1117 readUnitConfObj.readXml(readUnitElement)
1120 1118
1121 1119 if readUnitConfObj.parentId == None:
1122 1120 readUnitConfObj.parentId = self.id
1123 1121
1124 1122 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1125 1123
1126 1124 procUnitElementList = self.projectElement.iter(ProcUnitConf().getElementName())
1127 1125
1128 1126 for procUnitElement in procUnitElementList:
1129 1127 procUnitConfObj = ProcUnitConf()
1130 1128 procUnitConfObj.readXml(procUnitElement)
1131 1129
1132 1130 if procUnitConfObj.parentId == None:
1133 1131 procUnitConfObj.parentId = self.id
1134 1132
1135 1133 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1136 1134
1137 1135 self.filename = abs_file
1138 1136
1139 1137 return 1
1140 1138
1141 1139 def printattr(self):
1142 1140
1143 1141 print "Project[%s]: name = %s, description = %s" %(self.id,
1144 1142 self.name,
1145 1143 self.description)
1146 1144
1147 1145 for procUnitConfObj in self.procUnitConfObjDict.values():
1148 1146 procUnitConfObj.printattr()
1149 1147
1150 1148 def createObjects(self):
1151 1149
1152 1150 for procUnitConfObj in self.procUnitConfObjDict.values():
1153 1151 procUnitConfObj.createObjects(self.plotterQueue)
1154 1152
1155 1153 def __connect(self, objIN, thisObj):
1156 1154
1157 1155 thisObj.setInput(objIN.getOutputObj())
1158 1156
1159 1157 def connectObjects(self):
1160 1158
1161 1159 for thisPUConfObj in self.procUnitConfObjDict.values():
1162 1160
1163 1161 inputId = thisPUConfObj.getInputId()
1164 1162
1165 1163 if int(inputId) == 0:
1166 1164 continue
1167 1165
1168 1166 #Get input object
1169 1167 puConfINObj = self.procUnitConfObjDict[inputId]
1170 1168 puObjIN = puConfINObj.getProcUnitObj()
1171 1169
1172 1170 #Get current object
1173 1171 thisPUObj = thisPUConfObj.getProcUnitObj()
1174 1172
1175 1173 self.__connect(puObjIN, thisPUObj)
1176 1174
1177 1175 def __handleError(self, procUnitConfObj, send_email=True):
1178 1176
1179 1177 import socket
1180 1178
1181 1179 err = traceback.format_exception(sys.exc_info()[0],
1182 1180 sys.exc_info()[1],
1183 1181 sys.exc_info()[2])
1184 1182
1185 1183 print "***** Error occurred in %s *****" %(procUnitConfObj.name)
1186 1184 print "***** %s" %err[-1]
1187 1185
1188 1186 message = "".join(err)
1189 1187
1190 1188 sys.stderr.write(message)
1191 1189
1192 1190 if not send_email:
1193 1191 return
1194 1192
1195 1193 subject = "SChain v%s: Error running %s\n" %(schainpy.__version__, procUnitConfObj.name)
1196 1194
1197 1195 subtitle = "%s: %s\n" %(procUnitConfObj.getElementName() ,procUnitConfObj.name)
1198 1196 subtitle += "Hostname: %s\n" %socket.gethostbyname(socket.gethostname())
1199 1197 subtitle += "Working directory: %s\n" %os.path.abspath("./")
1200 1198 subtitle += "Configuration file: %s\n" %self.filename
1201 1199 subtitle += "Time: %s\n" %str(datetime.datetime.now())
1202 1200
1203 1201 readUnitConfObj = self.getReadUnitObj()
1204 1202 if readUnitConfObj:
1205 1203 subtitle += "\nInput parameters:\n"
1206 1204 subtitle += "[Data path = %s]\n" %readUnitConfObj.path
1207 1205 subtitle += "[Data type = %s]\n" %readUnitConfObj.datatype
1208 1206 subtitle += "[Start date = %s]\n" %readUnitConfObj.startDate
1209 1207 subtitle += "[End date = %s]\n" %readUnitConfObj.endDate
1210 1208 subtitle += "[Start time = %s]\n" %readUnitConfObj.startTime
1211 1209 subtitle += "[End time = %s]\n" %readUnitConfObj.endTime
1212 1210
1213 1211 adminObj = schainpy.admin.SchainNotify()
1214 1212 adminObj.sendAlert(message=message,
1215 1213 subject=subject,
1216 1214 subtitle=subtitle,
1217 1215 filename=self.filename)
1218 1216
1219 1217 def isPaused(self):
1220 1218 return 0
1221 1219
1222 1220 def isStopped(self):
1223 1221 return 0
1224 1222
1225 1223 def runController(self):
1226 1224 """
1227 1225 returns 0 when this process has been stopped, 1 otherwise
1228 1226 """
1229 1227
1230 1228 if self.isPaused():
1231 1229 print "Process suspended"
1232 1230
1233 1231 while True:
1234 1232 sleep(0.1)
1235 1233
1236 1234 if not self.isPaused():
1237 1235 break
1238 1236
1239 1237 if self.isStopped():
1240 1238 break
1241 1239
1242 1240 print "Process reinitialized"
1243 1241
1244 1242 if self.isStopped():
1245 1243 print "Process stopped"
1246 1244 return 0
1247 1245
1248 1246 return 1
1249 1247
1250 1248 def setFilename(self, filename):
1251 1249
1252 1250 self.filename = filename
1253 1251
1254 1252 def setPlotterQueue(self, plotter_queue):
1255 1253
1256 1254 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1257 1255
1258 1256 def getPlotterQueue(self):
1259 1257
1260 1258 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1261 1259
1262 1260 def useExternalPlotter(self):
1263 1261
1264 1262 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1265 1263
1266 1264 def run(self):
1267 1265
1268 1266 print
1269 1267 print "*"*60
1270 1268 print " Starting SIGNAL CHAIN PROCESSING v%s " %schainpy.__version__
1271 1269 print "*"*60
1272 1270 print
1273 1271
1274 1272 keyList = self.procUnitConfObjDict.keys()
1275 1273 keyList.sort()
1276 1274
1277 1275 while(True):
1278 1276
1279 1277 is_ok = False
1280 1278
1281 1279 for procKey in keyList:
1282 1280 # print "Running the '%s' process with %s" %(procUnitConfObj.name, procUnitConfObj.id)
1283 1281
1284 1282 procUnitConfObj = self.procUnitConfObjDict[procKey]
1285 1283
1286 1284 try:
1287 1285 sts = procUnitConfObj.run()
1288 1286 is_ok = is_ok or sts
1289 1287 except KeyboardInterrupt:
1290 1288 is_ok = False
1291 1289 break
1292 1290 except ValueError, e:
1293 1291 sleep(0.5)
1294 1292 self.__handleError(procUnitConfObj, send_email=True)
1295 1293 is_ok = False
1296 1294 break
1297 1295 except:
1298 1296 sleep(0.5)
1299 1297 self.__handleError(procUnitConfObj)
1300 1298 is_ok = False
1301 1299 break
1302 1300
1303 1301 #If every process unit finished so end process
1304 1302 if not(is_ok):
1305 1303 # print "Every process unit have finished"
1306 1304 break
1307 1305
1308 1306 if not self.runController():
1309 1307 break
1310 1308
1311 1309 #Closing every process
1312 1310 for procKey in keyList:
1313 1311 procUnitConfObj = self.procUnitConfObjDict[procKey]
1314 1312 procUnitConfObj.close()
1315 1313
1316 1314 print "Process finished"
1317 1315
1318 1316 def start(self):
1319 1317
1320 1318 self.writeXml()
1321 1319 self.createObjects()
1322 1320 self.connectObjects()
1323 1321 self.run()
@@ -1,1 +1,1
1 <Project description="Segundo Test" id="191" name="test01"><ProcUnit datatype="ReceiverData" id="1911" inputId="0" name="ReceiverData"><Operation id="19111" name="run" priority="1" type="self"><Parameter format="bool" id="191111" name="realtime" value="1" /><Parameter format="str" id="191112" name="plottypes" value="rti" /><Parameter format="str" id="191113" name="plot_server" value="tcp://10.10.10.82:7000" /></Operation></ProcUnit></Project> No newline at end of file
1 <Project description="HF_EXAMPLE" id="191" name="test01"><ReadUnit datatype="SpectraReader" id="1911" inputId="0" name="SpectraReader"><Operation id="19111" name="run" priority="1" type="self"><Parameter format="str" id="191111" name="datatype" value="SpectraReader" /><Parameter format="str" id="191112" name="path" value="/home/nanosat/data/hysell_data20/pdata" /><Parameter format="date" id="191113" name="startDate" value="2015/09/26" /><Parameter format="date" id="191114" name="endDate" value="2015/09/26" /><Parameter format="time" id="191115" name="startTime" value="00:00:00" /><Parameter format="time" id="191116" name="endTime" value="23:59:59" /><Parameter format="int" id="191118" name="cursor" value="33" /><Parameter format="int" id="191119" name="skip" value="22" /><Parameter format="int" id="191120" name="delay" value="10" /><Parameter format="int" id="191121" name="walk" value="1" /><Parameter format="int" id="191122" name="online" value="0" /></Operation></ReadUnit><ProcUnit datatype="Spectra" id="1912" inputId="1911" name="SpectraProc"><Operation id="19121" name="run" priority="1" type="self" /><Operation id="19122" name="PublishData" priority="2" type="other"><Parameter format="int" id="191221" name="zeromq" value="1" /><Parameter format="int" id="191222" name="delay" value="1" /></Operation></ProcUnit></Project> No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now