##// END OF EJS Templates
nada
José Chávez -
r925:8616baf196ba
parent child
Show More
@@ -1,1321 +1,1323
1 1 '''
2 2 Created on September , 2012
3 3 @author:
4 4 '''
5 5
6 6 import sys
7 7 import ast
8 8 import datetime
9 9 import traceback
10 10 import math
11 11 from multiprocessing import Process, Queue, cpu_count
12 12
13 13 import schainpy
14 14 import schainpy.admin
15 15
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
18 18
19 19 from schainpy.model import *
20 20 from time import sleep
21 21
22 22 def prettify(elem):
23 23 """Return a pretty-printed XML string for the Element.
24 24 """
25 25 rough_string = tostring(elem, 'utf-8')
26 26 reparsed = minidom.parseString(rough_string)
27 27 return reparsed.toprettyxml(indent=" ")
28 28
29 29 def multiSchain(child, nProcess=cpu_count(), startDate=None, endDate=None, receiver=None):
30 30 skip = 0
31 31 cursor = 0
32 32 nFiles = None
33 33 processes = []
34 34
35 35
36 36
37 37
38 38 dt1 = datetime.datetime.strptime(startDate, '%Y/%m/%d')
39 39 dt2 = datetime.datetime.strptime(endDate, '%Y/%m/%d')
40 40 days = (dt2 - dt1).days
41 41 print days
42 42 for day in range(days+1):
43 43 skip = 0
44 44 cursor = 0
45 45 q = Queue()
46 46 processes = []
47 47 dt = (dt1 + datetime.timedelta(day)).strftime('%Y/%m/%d')
48 48 firstProcess = Process(target=child, args=(cursor, skip, q, dt))
49 49 firstProcess.start()
50 50 nFiles = q.get()
51 51 firstProcess.terminate()
52 52 skip = int(math.ceil(nFiles/nProcess))
53 53 while True:
54 54 processes.append(Process(target=child, args=(cursor, skip, q, dt)))
55 55 processes[cursor].start()
56 56 if nFiles < cursor*skip:
57 57 break
58 58 cursor += 1
59 59
60 60 def beforeExit(exctype, value, trace):
61 61 for process in processes:
62 62 process.terminate()
63 63 process.join()
64 print traceback.print_tb(trace)
65
64 66 sys.excepthook = beforeExit
65 67
66 68 for process in processes:
67 69 process.join()
68 70 process.terminate()
69
71 sys.exit()
70 72
71 73
72 74 class ParameterConf():
73 75
74 76 id = None
75 77 name = None
76 78 value = None
77 79 format = None
78 80
79 81 __formated_value = None
80 82
81 83 ELEMENTNAME = 'Parameter'
82 84
83 85 def __init__(self):
84 86
85 87 self.format = 'str'
86 88
87 89 def getElementName(self):
88 90
89 91 return self.ELEMENTNAME
90 92
91 93 def getValue(self):
92 94
93 95 value = self.value
94 96 format = self.format
95 97
96 98 if self.__formated_value != None:
97 99
98 100 return self.__formated_value
99 101
100 102 if format == 'obj':
101 103 return value
102 104
103 105 if format == 'str':
104 106 self.__formated_value = str(value)
105 107 return self.__formated_value
106 108
107 109 if value == '':
108 110 raise ValueError, "%s: This parameter value is empty" %self.name
109 111
110 112 if format == 'list':
111 113 strList = value.split(',')
112 114
113 115 self.__formated_value = strList
114 116
115 117 return self.__formated_value
116 118
117 119 if format == 'intlist':
118 120 """
119 121 Example:
120 122 value = (0,1,2)
121 123 """
122 124
123 125 new_value = ast.literal_eval(value)
124 126
125 127 if type(new_value) not in (tuple, list):
126 128 new_value = [int(new_value)]
127 129
128 130 self.__formated_value = new_value
129 131
130 132 return self.__formated_value
131 133
132 134 if format == 'floatlist':
133 135 """
134 136 Example:
135 137 value = (0.5, 1.4, 2.7)
136 138 """
137 139
138 140 new_value = ast.literal_eval(value)
139 141
140 142 if type(new_value) not in (tuple, list):
141 143 new_value = [float(new_value)]
142 144
143 145 self.__formated_value = new_value
144 146
145 147 return self.__formated_value
146 148
147 149 if format == 'date':
148 150 strList = value.split('/')
149 151 intList = [int(x) for x in strList]
150 152 date = datetime.date(intList[0], intList[1], intList[2])
151 153
152 154 self.__formated_value = date
153 155
154 156 return self.__formated_value
155 157
156 158 if format == 'time':
157 159 strList = value.split(':')
158 160 intList = [int(x) for x in strList]
159 161 time = datetime.time(intList[0], intList[1], intList[2])
160 162
161 163 self.__formated_value = time
162 164
163 165 return self.__formated_value
164 166
165 167 if format == 'pairslist':
166 168 """
167 169 Example:
168 170 value = (0,1),(1,2)
169 171 """
170 172
171 173 new_value = ast.literal_eval(value)
172 174
173 175 if type(new_value) not in (tuple, list):
174 176 raise ValueError, "%s has to be a tuple or list of pairs" %value
175 177
176 178 if type(new_value[0]) not in (tuple, list):
177 179 if len(new_value) != 2:
178 180 raise ValueError, "%s has to be a tuple or list of pairs" %value
179 181 new_value = [new_value]
180 182
181 183 for thisPair in new_value:
182 184 if len(thisPair) != 2:
183 185 raise ValueError, "%s has to be a tuple or list of pairs" %value
184 186
185 187 self.__formated_value = new_value
186 188
187 189 return self.__formated_value
188 190
189 191 if format == 'multilist':
190 192 """
191 193 Example:
192 194 value = (0,1,2),(3,4,5)
193 195 """
194 196 multiList = ast.literal_eval(value)
195 197
196 198 if type(multiList[0]) == int:
197 199 multiList = ast.literal_eval("(" + value + ")")
198 200
199 201 self.__formated_value = multiList
200 202
201 203 return self.__formated_value
202 204
203 205 if format == 'bool':
204 206 value = int(value)
205 207
206 208 if format == 'int':
207 209 value = float(value)
208 210
209 211 format_func = eval(format)
210 212
211 213 self.__formated_value = format_func(value)
212 214
213 215 return self.__formated_value
214 216
215 217 def updateId(self, new_id):
216 218
217 219 self.id = str(new_id)
218 220
219 221 def setup(self, id, name, value, format='str'):
220 222
221 223 self.id = str(id)
222 224 self.name = name
223 225 if format == 'obj':
224 226 self.value = value
225 227 else:
226 228 self.value = str(value)
227 229 self.format = str.lower(format)
228 230
229 231 self.getValue()
230 232
231 233 return 1
232 234
233 235 def update(self, name, value, format='str'):
234 236
235 237 self.name = name
236 238 self.value = str(value)
237 239 self.format = format
238 240
239 241 def makeXml(self, opElement):
240 242 if self.name not in ('queue',):
241 243 parmElement = SubElement(opElement, self.ELEMENTNAME)
242 244 parmElement.set('id', str(self.id))
243 245 parmElement.set('name', self.name)
244 246 parmElement.set('value', self.value)
245 247 parmElement.set('format', self.format)
246 248
247 249 def readXml(self, parmElement):
248 250
249 251 self.id = parmElement.get('id')
250 252 self.name = parmElement.get('name')
251 253 self.value = parmElement.get('value')
252 254 self.format = str.lower(parmElement.get('format'))
253 255
254 256 #Compatible with old signal chain version
255 257 if self.format == 'int' and self.name == 'idfigure':
256 258 self.name = 'id'
257 259
258 260 def printattr(self):
259 261
260 262 print "Parameter[%s]: name = %s, value = %s, format = %s" %(self.id, self.name, self.value, self.format)
261 263
262 264 class OperationConf():
263 265
264 266 id = None
265 267 name = None
266 268 priority = None
267 269 type = None
268 270
269 271 parmConfObjList = []
270 272
271 273 ELEMENTNAME = 'Operation'
272 274
273 275 def __init__(self):
274 276
275 277 self.id = '0'
276 278 self.name = None
277 279 self.priority = None
278 280 self.type = 'self'
279 281
280 282
281 283 def __getNewId(self):
282 284
283 285 return int(self.id)*10 + len(self.parmConfObjList) + 1
284 286
285 287 def updateId(self, new_id):
286 288
287 289 self.id = str(new_id)
288 290
289 291 n = 1
290 292 for parmObj in self.parmConfObjList:
291 293
292 294 idParm = str(int(new_id)*10 + n)
293 295 parmObj.updateId(idParm)
294 296
295 297 n += 1
296 298
297 299 def getElementName(self):
298 300
299 301 return self.ELEMENTNAME
300 302
301 303 def getParameterObjList(self):
302 304
303 305 return self.parmConfObjList
304 306
305 307 def getParameterObj(self, parameterName):
306 308
307 309 for parmConfObj in self.parmConfObjList:
308 310
309 311 if parmConfObj.name != parameterName:
310 312 continue
311 313
312 314 return parmConfObj
313 315
314 316 return None
315 317
316 318 def getParameterObjfromValue(self, parameterValue):
317 319
318 320 for parmConfObj in self.parmConfObjList:
319 321
320 322 if parmConfObj.getValue() != parameterValue:
321 323 continue
322 324
323 325 return parmConfObj.getValue()
324 326
325 327 return None
326 328
327 329 def getParameterValue(self, parameterName):
328 330
329 331 parameterObj = self.getParameterObj(parameterName)
330 332
331 333 # if not parameterObj:
332 334 # return None
333 335
334 336 value = parameterObj.getValue()
335 337
336 338 return value
337 339
338 340
339 341 def getKwargs(self):
340 342
341 343 kwargs = {}
342 344
343 345 for parmConfObj in self.parmConfObjList:
344 346 if self.name == 'run' and parmConfObj.name == 'datatype':
345 347 continue
346 348
347 349 kwargs[parmConfObj.name] = parmConfObj.getValue()
348 350
349 351 return kwargs
350 352
351 353 def setup(self, id, name, priority, type):
352 354
353 355 self.id = str(id)
354 356 self.name = name
355 357 self.type = type
356 358 self.priority = priority
357 359
358 360 self.parmConfObjList = []
359 361
360 362 def removeParameters(self):
361 363
362 364 for obj in self.parmConfObjList:
363 365 del obj
364 366
365 367 self.parmConfObjList = []
366 368
367 369 def addParameter(self, name, value, format='str'):
368 370
369 371 id = self.__getNewId()
370 372
371 373 parmConfObj = ParameterConf()
372 374 if not parmConfObj.setup(id, name, value, format):
373 375 return None
374 376
375 377 self.parmConfObjList.append(parmConfObj)
376 378
377 379 return parmConfObj
378 380
379 381 def changeParameter(self, name, value, format='str'):
380 382
381 383 parmConfObj = self.getParameterObj(name)
382 384 parmConfObj.update(name, value, format)
383 385
384 386 return parmConfObj
385 387
386 388 def makeXml(self, procUnitElement):
387 389
388 390 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
389 391 opElement.set('id', str(self.id))
390 392 opElement.set('name', self.name)
391 393 opElement.set('type', self.type)
392 394 opElement.set('priority', str(self.priority))
393 395
394 396 for parmConfObj in self.parmConfObjList:
395 397 parmConfObj.makeXml(opElement)
396 398
397 399 def readXml(self, opElement):
398 400
399 401 self.id = opElement.get('id')
400 402 self.name = opElement.get('name')
401 403 self.type = opElement.get('type')
402 404 self.priority = opElement.get('priority')
403 405
404 406 #Compatible with old signal chain version
405 407 #Use of 'run' method instead 'init'
406 408 if self.type == 'self' and self.name == 'init':
407 409 self.name = 'run'
408 410
409 411 self.parmConfObjList = []
410 412
411 413 parmElementList = opElement.iter(ParameterConf().getElementName())
412 414
413 415 for parmElement in parmElementList:
414 416 parmConfObj = ParameterConf()
415 417 parmConfObj.readXml(parmElement)
416 418
417 419 #Compatible with old signal chain version
418 420 #If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
419 421 if self.type != 'self' and self.name == 'Plot':
420 422 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
421 423 self.name = parmConfObj.value
422 424 continue
423 425
424 426 self.parmConfObjList.append(parmConfObj)
425 427
426 428 def printattr(self):
427 429
428 430 print "%s[%s]: name = %s, type = %s, priority = %s" %(self.ELEMENTNAME,
429 431 self.id,
430 432 self.name,
431 433 self.type,
432 434 self.priority)
433 435
434 436 for parmConfObj in self.parmConfObjList:
435 437 parmConfObj.printattr()
436 438
437 439 def createObject(self, plotter_queue=None):
438 440
439 441
440 442 if self.type == 'self':
441 443 raise ValueError, "This operation type cannot be created"
442 444
443 445 if self.type == 'plotter':
444 446 #Plotter(plotter_name)
445 447 if not plotter_queue:
446 448 raise ValueError, "plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)"
447 449
448 450 opObj = Plotter(self.name, plotter_queue)
449 451
450 452 if self.type == 'external' or self.type == 'other':
451 453
452 454 className = eval(self.name)
453 455 kwargs = self.getKwargs()
454 456
455 457 opObj = className(**kwargs)
456 458
457 459 return opObj
458 460
459 461
460 462 class ProcUnitConf():
461 463
462 464 id = None
463 465 name = None
464 466 datatype = None
465 467 inputId = None
466 468 parentId = None
467 469
468 470 opConfObjList = []
469 471
470 472 procUnitObj = None
471 473 opObjList = []
472 474
473 475 ELEMENTNAME = 'ProcUnit'
474 476
475 477 def __init__(self):
476 478
477 479 self.id = None
478 480 self.datatype = None
479 481 self.name = None
480 482 self.inputId = None
481 483
482 484 self.opConfObjList = []
483 485
484 486 self.procUnitObj = None
485 487 self.opObjDict = {}
486 488
487 489 def __getPriority(self):
488 490
489 491 return len(self.opConfObjList)+1
490 492
491 493 def __getNewId(self):
492 494
493 495 return int(self.id)*10 + len(self.opConfObjList) + 1
494 496
495 497 def getElementName(self):
496 498
497 499 return self.ELEMENTNAME
498 500
499 501 def getId(self):
500 502
501 503 return self.id
502 504
503 505 def updateId(self, new_id, parentId=parentId):
504 506
505 507
506 508 new_id = int(parentId)*10 + (int(self.id) % 10)
507 509 new_inputId = int(parentId)*10 + (int(self.inputId) % 10)
508 510
509 511 #If this proc unit has not inputs
510 512 if self.inputId == '0':
511 513 new_inputId = 0
512 514
513 515 n = 1
514 516 for opConfObj in self.opConfObjList:
515 517
516 518 idOp = str(int(new_id)*10 + n)
517 519 opConfObj.updateId(idOp)
518 520
519 521 n += 1
520 522
521 523 self.parentId = str(parentId)
522 524 self.id = str(new_id)
523 525 self.inputId = str(new_inputId)
524 526
525 527
526 528 def getInputId(self):
527 529
528 530 return self.inputId
529 531
530 532 def getOperationObjList(self):
531 533
532 534 return self.opConfObjList
533 535
534 536 def getOperationObj(self, name=None):
535 537
536 538 for opConfObj in self.opConfObjList:
537 539
538 540 if opConfObj.name != name:
539 541 continue
540 542
541 543 return opConfObj
542 544
543 545 return None
544 546
545 547 def getOpObjfromParamValue(self, value=None):
546 548
547 549 for opConfObj in self.opConfObjList:
548 550 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
549 551 continue
550 552 return opConfObj
551 553 return None
552 554
553 555 def getProcUnitObj(self):
554 556
555 557 return self.procUnitObj
556 558
557 559 def setup(self, id, name, datatype, inputId, parentId=None):
558 560
559 561 #Compatible with old signal chain version
560 562 if datatype==None and name==None:
561 563 raise ValueError, "datatype or name should be defined"
562 564
563 565 if name==None:
564 566 if 'Proc' in datatype:
565 567 name = datatype
566 568 else:
567 569 name = '%sProc' %(datatype)
568 570
569 571 if datatype==None:
570 572 datatype = name.replace('Proc','')
571 573
572 574 self.id = str(id)
573 575 self.name = name
574 576 self.datatype = datatype
575 577 self.inputId = inputId
576 578 self.parentId = parentId
577 579
578 580 self.opConfObjList = []
579 581
580 582 self.addOperation(name='run', optype='self')
581 583
582 584 def removeOperations(self):
583 585
584 586 for obj in self.opConfObjList:
585 587 del obj
586 588
587 589 self.opConfObjList = []
588 590 self.addOperation(name='run')
589 591
590 592 def addParameter(self, **kwargs):
591 593 '''
592 594 Add parameters to "run" operation
593 595 '''
594 596 opObj = self.opConfObjList[0]
595 597
596 598 opObj.addParameter(**kwargs)
597 599
598 600 return opObj
599 601
600 602 def addOperation(self, name, optype='self'):
601 603
602 604 id = self.__getNewId()
603 605 priority = self.__getPriority()
604 606
605 607 opConfObj = OperationConf()
606 608 opConfObj.setup(id, name=name, priority=priority, type=optype)
607 609
608 610 self.opConfObjList.append(opConfObj)
609 611
610 612 return opConfObj
611 613
612 614 def makeXml(self, projectElement):
613 615
614 616 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
615 617 procUnitElement.set('id', str(self.id))
616 618 procUnitElement.set('name', self.name)
617 619 procUnitElement.set('datatype', self.datatype)
618 620 procUnitElement.set('inputId', str(self.inputId))
619 621
620 622 for opConfObj in self.opConfObjList:
621 623 opConfObj.makeXml(procUnitElement)
622 624
623 625 def readXml(self, upElement):
624 626
625 627 self.id = upElement.get('id')
626 628 self.name = upElement.get('name')
627 629 self.datatype = upElement.get('datatype')
628 630 self.inputId = upElement.get('inputId')
629 631
630 632 if self.ELEMENTNAME == "ReadUnit":
631 633 self.datatype = self.datatype.replace("Reader", "")
632 634
633 635 if self.ELEMENTNAME == "ProcUnit":
634 636 self.datatype = self.datatype.replace("Proc", "")
635 637
636 638 if self.inputId == 'None':
637 639 self.inputId = '0'
638 640
639 641 self.opConfObjList = []
640 642
641 643 opElementList = upElement.iter(OperationConf().getElementName())
642 644
643 645 for opElement in opElementList:
644 646 opConfObj = OperationConf()
645 647 opConfObj.readXml(opElement)
646 648 self.opConfObjList.append(opConfObj)
647 649
648 650 def printattr(self):
649 651
650 652 print "%s[%s]: name = %s, datatype = %s, inputId = %s" %(self.ELEMENTNAME,
651 653 self.id,
652 654 self.name,
653 655 self.datatype,
654 656 self.inputId)
655 657
656 658 for opConfObj in self.opConfObjList:
657 659 opConfObj.printattr()
658 660
659 661
660 662 def getKwargs(self):
661 663
662 664 opObj = self.opConfObjList[0]
663 665 kwargs = opObj.getKwargs()
664 666
665 667 return kwargs
666 668
667 669 def createObjects(self, plotter_queue=None):
668 670
669 671 className = eval(self.name)
670 672 kwargs = self.getKwargs()
671 673 procUnitObj = className(**kwargs)
672 674
673 675 for opConfObj in self.opConfObjList:
674 676
675 677 if opConfObj.type=='self' and self.name=='run':
676 678 continue
677 679 elif opConfObj.type=='self':
678 680 procUnitObj.addOperationKwargs(opConfObj.id, **opConfObj.getKwargs())
679 681 continue
680 682
681 683 opObj = opConfObj.createObject(plotter_queue)
682 684
683 685 self.opObjDict[opConfObj.id] = opObj
684 686
685 687 procUnitObj.addOperation(opObj, opConfObj.id)
686 688
687 689 self.procUnitObj = procUnitObj
688 690
689 691 return procUnitObj
690 692
691 693 def run(self):
692 694
693 695 is_ok = False
694 696
695 697 for opConfObj in self.opConfObjList:
696 698
697 699 kwargs = {}
698 700 for parmConfObj in opConfObj.getParameterObjList():
699 701 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
700 702 continue
701 703
702 704 kwargs[parmConfObj.name] = parmConfObj.getValue()
703 705
704 706 #ini = time.time()
705 707
706 708 #print "\tRunning the '%s' operation with %s" %(opConfObj.name, opConfObj.id)
707 709 sts = self.procUnitObj.call(opType = opConfObj.type,
708 710 opName = opConfObj.name,
709 711 opId = opConfObj.id,
710 712 )
711 713
712 714 # total_time = time.time() - ini
713 715 #
714 716 # if total_time > 0.002:
715 717 # print "%s::%s took %f seconds" %(self.name, opConfObj.name, total_time)
716 718
717 719 is_ok = is_ok or sts
718 720
719 721 return is_ok
720 722
721 723 def close(self):
722 724
723 725 for opConfObj in self.opConfObjList:
724 726 if opConfObj.type == 'self':
725 727 continue
726 728
727 729 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
728 730 opObj.close()
729 731
730 732 self.procUnitObj.close()
731 733
732 734 return
733 735
734 736 class ReadUnitConf(ProcUnitConf):
735 737
736 738 path = None
737 739 startDate = None
738 740 endDate = None
739 741 startTime = None
740 742 endTime = None
741 743
742 744 ELEMENTNAME = 'ReadUnit'
743 745
744 746 def __init__(self):
745 747
746 748 self.id = None
747 749 self.datatype = None
748 750 self.name = None
749 751 self.inputId = None
750 752
751 753 self.parentId = None
752 754
753 755 self.opConfObjList = []
754 756 self.opObjList = []
755 757
756 758 def getElementName(self):
757 759
758 760 return self.ELEMENTNAME
759 761
760 762 def setup(self, id, name, datatype, path, startDate="", endDate="", startTime="", endTime="", parentId=None, queue=None, **kwargs):
761 763
762 764 #Compatible with old signal chain version
763 765 if datatype==None and name==None:
764 766 raise ValueError, "datatype or name should be defined"
765 767
766 768 if name==None:
767 769 if 'Reader' in datatype:
768 770 name = datatype
769 771 else:
770 772 name = '%sReader' %(datatype)
771 773
772 774 if datatype==None:
773 775 datatype = name.replace('Reader','')
774 776
775 777 self.id = id
776 778 self.name = name
777 779 self.datatype = datatype
778 780
779 781 self.path = os.path.abspath(path)
780 782 self.startDate = startDate
781 783 self.endDate = endDate
782 784 self.startTime = startTime
783 785 self.endTime = endTime
784 786
785 787 self.inputId = '0'
786 788 self.parentId = parentId
787 789 self.queue = queue
788 790 self.addRunOperation(**kwargs)
789 791
790 792 def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
791 793
792 794 #Compatible with old signal chain version
793 795 if datatype==None and name==None:
794 796 raise ValueError, "datatype or name should be defined"
795 797
796 798 if name==None:
797 799 if 'Reader' in datatype:
798 800 name = datatype
799 801 else:
800 802 name = '%sReader' %(datatype)
801 803
802 804 if datatype==None:
803 805 datatype = name.replace('Reader','')
804 806
805 807 self.datatype = datatype
806 808 self.name = name
807 809 self.path = path
808 810 self.startDate = startDate
809 811 self.endDate = endDate
810 812 self.startTime = startTime
811 813 self.endTime = endTime
812 814
813 815 self.inputId = '0'
814 816 self.parentId = parentId
815 817
816 818 self.updateRunOperation(**kwargs)
817 819
818 820 def removeOperations(self):
819 821
820 822 for obj in self.opConfObjList:
821 823 del obj
822 824
823 825 self.opConfObjList = []
824 826
825 827 def addRunOperation(self, **kwargs):
826 828
827 829 opObj = self.addOperation(name = 'run', optype = 'self')
828 830
829 831 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
830 832 opObj.addParameter(name='path' , value=self.path, format='str')
831 833 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
832 834 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
833 835 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
834 836 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
835 837 opObj.addParameter(name='queue' , value=self.queue, format='obj')
836 838
837 839 for key, value in kwargs.items():
838 840 opObj.addParameter(name=key, value=value, format=type(value).__name__)
839 841
840 842 return opObj
841 843
842 844 def updateRunOperation(self, **kwargs):
843 845
844 846 opObj = self.getOperationObj(name = 'run')
845 847 opObj.removeParameters()
846 848
847 849 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
848 850 opObj.addParameter(name='path' , value=self.path, format='str')
849 851 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
850 852 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
851 853 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
852 854 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
853 855
854 856 for key, value in kwargs.items():
855 857 opObj.addParameter(name=key, value=value, format=type(value).__name__)
856 858
857 859 return opObj
858 860
859 861 # def makeXml(self, projectElement):
860 862 #
861 863 # procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
862 864 # procUnitElement.set('id', str(self.id))
863 865 # procUnitElement.set('name', self.name)
864 866 # procUnitElement.set('datatype', self.datatype)
865 867 # procUnitElement.set('inputId', str(self.inputId))
866 868 #
867 869 # for opConfObj in self.opConfObjList:
868 870 # opConfObj.makeXml(procUnitElement)
869 871
870 872 def readXml(self, upElement):
871 873
872 874 self.id = upElement.get('id')
873 875 self.name = upElement.get('name')
874 876 self.datatype = upElement.get('datatype')
875 877 self.inputId = upElement.get('inputId')
876 878
877 879 if self.ELEMENTNAME == "ReadUnit":
878 880 self.datatype = self.datatype.replace("Reader", "")
879 881
880 882 if self.inputId == 'None':
881 883 self.inputId = '0'
882 884
883 885 self.opConfObjList = []
884 886
885 887 opElementList = upElement.iter(OperationConf().getElementName())
886 888
887 889 for opElement in opElementList:
888 890 opConfObj = OperationConf()
889 891 opConfObj.readXml(opElement)
890 892 self.opConfObjList.append(opConfObj)
891 893
892 894 if opConfObj.name == 'run':
893 895 self.path = opConfObj.getParameterValue('path')
894 896 self.startDate = opConfObj.getParameterValue('startDate')
895 897 self.endDate = opConfObj.getParameterValue('endDate')
896 898 self.startTime = opConfObj.getParameterValue('startTime')
897 899 self.endTime = opConfObj.getParameterValue('endTime')
898 900
899 901 class Project():
900 902
901 903 id = None
902 904 name = None
903 905 description = None
904 906 filename = None
905 907
906 908 procUnitConfObjDict = None
907 909
908 910 ELEMENTNAME = 'Project'
909 911
910 912 plotterQueue = None
911 913
912 914 def __init__(self, plotter_queue=None):
913 915
914 916 self.id = None
915 917 self.name = None
916 918 self.description = None
917 919
918 920 self.plotterQueue = plotter_queue
919 921
920 922 self.procUnitConfObjDict = {}
921 923
922 924 def __getNewId(self):
923 925
924 926 idList = self.procUnitConfObjDict.keys()
925 927
926 928 id = int(self.id)*10
927 929
928 930 while True:
929 931 id += 1
930 932
931 933 if str(id) in idList:
932 934 continue
933 935
934 936 break
935 937
936 938 return str(id)
937 939
938 940 def getElementName(self):
939 941
940 942 return self.ELEMENTNAME
941 943
942 944 def getId(self):
943 945
944 946 return self.id
945 947
946 948 def updateId(self, new_id):
947 949
948 950 self.id = str(new_id)
949 951
950 952 keyList = self.procUnitConfObjDict.keys()
951 953 keyList.sort()
952 954
953 955 n = 1
954 956 newProcUnitConfObjDict = {}
955 957
956 958 for procKey in keyList:
957 959
958 960 procUnitConfObj = self.procUnitConfObjDict[procKey]
959 961 idProcUnit = str(int(self.id)*10 + n)
960 962 procUnitConfObj.updateId(idProcUnit, parentId = self.id)
961 963
962 964 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
963 965 n += 1
964 966
965 967 self.procUnitConfObjDict = newProcUnitConfObjDict
966 968
967 969 def setup(self, id, name, description):
968 970
969 971 self.id = str(id)
970 972 self.name = name
971 973 self.description = description
972 974
973 975 def update(self, name, description):
974 976
975 977 self.name = name
976 978 self.description = description
977 979
978 980 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
979 981
980 982 if id is None:
981 983 idReadUnit = self.__getNewId()
982 984 else:
983 985 idReadUnit = str(id)
984 986
985 987 readUnitConfObj = ReadUnitConf()
986 988 readUnitConfObj.setup(idReadUnit, name, datatype, parentId=self.id, **kwargs)
987 989
988 990 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
989 991
990 992 return readUnitConfObj
991 993
992 994 def addProcUnit(self, inputId='0', datatype=None, name=None):
993 995
994 996 idProcUnit = self.__getNewId()
995 997
996 998 procUnitConfObj = ProcUnitConf()
997 999 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, parentId=self.id)
998 1000
999 1001 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1000 1002
1001 1003 return procUnitConfObj
1002 1004
1003 1005 def removeProcUnit(self, id):
1004 1006
1005 1007 if id in self.procUnitConfObjDict.keys():
1006 1008 self.procUnitConfObjDict.pop(id)
1007 1009
1008 1010 def getReadUnitId(self):
1009 1011
1010 1012 readUnitConfObj = self.getReadUnitObj()
1011 1013
1012 1014 return readUnitConfObj.id
1013 1015
1014 1016 def getReadUnitObj(self):
1015 1017
1016 1018 for obj in self.procUnitConfObjDict.values():
1017 1019 if obj.getElementName() == "ReadUnit":
1018 1020 return obj
1019 1021
1020 1022 return None
1021 1023
1022 1024 def getProcUnitObj(self, id=None, name=None):
1023 1025
1024 1026 if id != None:
1025 1027 return self.procUnitConfObjDict[id]
1026 1028
1027 1029 if name != None:
1028 1030 return self.getProcUnitObjByName(name)
1029 1031
1030 1032 return None
1031 1033
1032 1034 def getProcUnitObjByName(self, name):
1033 1035
1034 1036 for obj in self.procUnitConfObjDict.values():
1035 1037 if obj.name == name:
1036 1038 return obj
1037 1039
1038 1040 return None
1039 1041
1040 1042 def procUnitItems(self):
1041 1043
1042 1044 return self.procUnitConfObjDict.items()
1043 1045
1044 1046 def makeXml(self):
1045 1047
1046 1048 projectElement = Element('Project')
1047 1049 projectElement.set('id', str(self.id))
1048 1050 projectElement.set('name', self.name)
1049 1051 projectElement.set('description', self.description)
1050 1052
1051 1053 for procUnitConfObj in self.procUnitConfObjDict.values():
1052 1054 procUnitConfObj.makeXml(projectElement)
1053 1055
1054 1056 self.projectElement = projectElement
1055 1057
1056 1058 def writeXml(self, filename=None):
1057 1059
1058 1060 if filename == None:
1059 1061 if self.filename:
1060 1062 filename = self.filename
1061 1063 else:
1062 1064 filename = "schain.xml"
1063 1065
1064 1066 if not filename:
1065 1067 print "filename has not been defined. Use setFilename(filename) for do it."
1066 1068 return 0
1067 1069
1068 1070 abs_file = os.path.abspath(filename)
1069 1071
1070 1072 if not os.access(os.path.dirname(abs_file), os.W_OK):
1071 1073 print "No write permission on %s" %os.path.dirname(abs_file)
1072 1074 return 0
1073 1075
1074 1076 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1075 1077 print "File %s already exists and it could not be overwriten" %abs_file
1076 1078 return 0
1077 1079
1078 1080 self.makeXml()
1079 1081
1080 1082 ElementTree(self.projectElement).write(abs_file, method='xml')
1081 1083
1082 1084 self.filename = abs_file
1083 1085
1084 1086 return 1
1085 1087
1086 1088 def readXml(self, filename = None):
1087 1089
1088 1090 if not filename:
1089 1091 print "filename is not defined"
1090 1092 return 0
1091 1093
1092 1094 abs_file = os.path.abspath(filename)
1093 1095
1094 1096 if not os.path.isfile(abs_file):
1095 1097 print "%s file does not exist" %abs_file
1096 1098 return 0
1097 1099
1098 1100 self.projectElement = None
1099 1101 self.procUnitConfObjDict = {}
1100 1102
1101 1103 try:
1102 1104 self.projectElement = ElementTree().parse(abs_file)
1103 1105 except:
1104 1106 print "Error reading %s, verify file format" %filename
1105 1107 return 0
1106 1108
1107 1109 self.project = self.projectElement.tag
1108 1110
1109 1111 self.id = self.projectElement.get('id')
1110 1112 self.name = self.projectElement.get('name')
1111 1113 self.description = self.projectElement.get('description')
1112 1114
1113 1115 readUnitElementList = self.projectElement.iter(ReadUnitConf().getElementName())
1114 1116
1115 1117 for readUnitElement in readUnitElementList:
1116 1118 readUnitConfObj = ReadUnitConf()
1117 1119 readUnitConfObj.readXml(readUnitElement)
1118 1120
1119 1121 if readUnitConfObj.parentId == None:
1120 1122 readUnitConfObj.parentId = self.id
1121 1123
1122 1124 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1123 1125
1124 1126 procUnitElementList = self.projectElement.iter(ProcUnitConf().getElementName())
1125 1127
1126 1128 for procUnitElement in procUnitElementList:
1127 1129 procUnitConfObj = ProcUnitConf()
1128 1130 procUnitConfObj.readXml(procUnitElement)
1129 1131
1130 1132 if procUnitConfObj.parentId == None:
1131 1133 procUnitConfObj.parentId = self.id
1132 1134
1133 1135 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1134 1136
1135 1137 self.filename = abs_file
1136 1138
1137 1139 return 1
1138 1140
1139 1141 def printattr(self):
1140 1142
1141 1143 print "Project[%s]: name = %s, description = %s" %(self.id,
1142 1144 self.name,
1143 1145 self.description)
1144 1146
1145 1147 for procUnitConfObj in self.procUnitConfObjDict.values():
1146 1148 procUnitConfObj.printattr()
1147 1149
1148 1150 def createObjects(self):
1149 1151
1150 1152 for procUnitConfObj in self.procUnitConfObjDict.values():
1151 1153 procUnitConfObj.createObjects(self.plotterQueue)
1152 1154
1153 1155 def __connect(self, objIN, thisObj):
1154 1156
1155 1157 thisObj.setInput(objIN.getOutputObj())
1156 1158
1157 1159 def connectObjects(self):
1158 1160
1159 1161 for thisPUConfObj in self.procUnitConfObjDict.values():
1160 1162
1161 1163 inputId = thisPUConfObj.getInputId()
1162 1164
1163 1165 if int(inputId) == 0:
1164 1166 continue
1165 1167
1166 1168 #Get input object
1167 1169 puConfINObj = self.procUnitConfObjDict[inputId]
1168 1170 puObjIN = puConfINObj.getProcUnitObj()
1169 1171
1170 1172 #Get current object
1171 1173 thisPUObj = thisPUConfObj.getProcUnitObj()
1172 1174
1173 1175 self.__connect(puObjIN, thisPUObj)
1174 1176
1175 1177 def __handleError(self, procUnitConfObj, send_email=True):
1176 1178
1177 1179 import socket
1178 1180
1179 1181 err = traceback.format_exception(sys.exc_info()[0],
1180 1182 sys.exc_info()[1],
1181 1183 sys.exc_info()[2])
1182 1184
1183 1185 print "***** Error occurred in %s *****" %(procUnitConfObj.name)
1184 1186 print "***** %s" %err[-1]
1185 1187
1186 1188 message = "".join(err)
1187 1189
1188 1190 sys.stderr.write(message)
1189 1191
1190 1192 if not send_email:
1191 1193 return
1192 1194
1193 1195 subject = "SChain v%s: Error running %s\n" %(schainpy.__version__, procUnitConfObj.name)
1194 1196
1195 1197 subtitle = "%s: %s\n" %(procUnitConfObj.getElementName() ,procUnitConfObj.name)
1196 1198 subtitle += "Hostname: %s\n" %socket.gethostbyname(socket.gethostname())
1197 1199 subtitle += "Working directory: %s\n" %os.path.abspath("./")
1198 1200 subtitle += "Configuration file: %s\n" %self.filename
1199 1201 subtitle += "Time: %s\n" %str(datetime.datetime.now())
1200 1202
1201 1203 readUnitConfObj = self.getReadUnitObj()
1202 1204 if readUnitConfObj:
1203 1205 subtitle += "\nInput parameters:\n"
1204 1206 subtitle += "[Data path = %s]\n" %readUnitConfObj.path
1205 1207 subtitle += "[Data type = %s]\n" %readUnitConfObj.datatype
1206 1208 subtitle += "[Start date = %s]\n" %readUnitConfObj.startDate
1207 1209 subtitle += "[End date = %s]\n" %readUnitConfObj.endDate
1208 1210 subtitle += "[Start time = %s]\n" %readUnitConfObj.startTime
1209 1211 subtitle += "[End time = %s]\n" %readUnitConfObj.endTime
1210 1212
1211 1213 adminObj = schainpy.admin.SchainNotify()
1212 1214 adminObj.sendAlert(message=message,
1213 1215 subject=subject,
1214 1216 subtitle=subtitle,
1215 1217 filename=self.filename)
1216 1218
1217 1219 def isPaused(self):
1218 1220 return 0
1219 1221
1220 1222 def isStopped(self):
1221 1223 return 0
1222 1224
1223 1225 def runController(self):
1224 1226 """
1225 1227 returns 0 when this process has been stopped, 1 otherwise
1226 1228 """
1227 1229
1228 1230 if self.isPaused():
1229 1231 print "Process suspended"
1230 1232
1231 1233 while True:
1232 1234 sleep(0.1)
1233 1235
1234 1236 if not self.isPaused():
1235 1237 break
1236 1238
1237 1239 if self.isStopped():
1238 1240 break
1239 1241
1240 1242 print "Process reinitialized"
1241 1243
1242 1244 if self.isStopped():
1243 1245 print "Process stopped"
1244 1246 return 0
1245 1247
1246 1248 return 1
1247 1249
1248 1250 def setFilename(self, filename):
1249 1251
1250 1252 self.filename = filename
1251 1253
1252 1254 def setPlotterQueue(self, plotter_queue):
1253 1255
1254 1256 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1255 1257
1256 1258 def getPlotterQueue(self):
1257 1259
1258 1260 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1259 1261
1260 1262 def useExternalPlotter(self):
1261 1263
1262 1264 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1263 1265
1264 1266 def run(self):
1265 1267
1266 1268 print
1267 1269 print "*"*60
1268 1270 print " Starting SIGNAL CHAIN PROCESSING v%s " %schainpy.__version__
1269 1271 print "*"*60
1270 1272 print
1271 1273
1272 1274 keyList = self.procUnitConfObjDict.keys()
1273 1275 keyList.sort()
1274 1276
1275 1277 while(True):
1276 1278
1277 1279 is_ok = False
1278 1280
1279 1281 for procKey in keyList:
1280 1282 # print "Running the '%s' process with %s" %(procUnitConfObj.name, procUnitConfObj.id)
1281 1283
1282 1284 procUnitConfObj = self.procUnitConfObjDict[procKey]
1283 1285
1284 1286 try:
1285 1287 sts = procUnitConfObj.run()
1286 1288 is_ok = is_ok or sts
1287 1289 except KeyboardInterrupt:
1288 1290 is_ok = False
1289 1291 break
1290 1292 except ValueError, e:
1291 1293 sleep(0.5)
1292 1294 self.__handleError(procUnitConfObj, send_email=True)
1293 1295 is_ok = False
1294 1296 break
1295 1297 except:
1296 1298 sleep(0.5)
1297 1299 self.__handleError(procUnitConfObj)
1298 1300 is_ok = False
1299 1301 break
1300 1302
1301 1303 #If every process unit finished so end process
1302 1304 if not(is_ok):
1303 1305 # print "Every process unit have finished"
1304 1306 break
1305 1307
1306 1308 if not self.runController():
1307 1309 break
1308 1310
1309 1311 #Closing every process
1310 1312 for procKey in keyList:
1311 1313 procUnitConfObj = self.procUnitConfObjDict[procKey]
1312 1314 procUnitConfObj.close()
1313 1315
1314 1316 print "Process finished"
1315 1317
1316 1318 def start(self):
1317 1319
1318 1320 self.writeXml()
1319 1321 self.createObjects()
1320 1322 self.connectObjects()
1321 1323 self.run()
@@ -1,50 +1,50
1 1 #!/usr/bin/env python
2 2 '''
3 3 Created on Jul 7, 2014
4 4
5 5 @author: roj-idl71
6 6 '''
7 7 import os, sys
8 8
9 9 from schainpy.controller import Project
10 10
11 11 if __name__ == '__main__':
12 12 desc = "Segundo Test"
13 13
14 14 controllerObj = Project()
15 15 controllerObj.setup(id='191', name='test01', description=desc)
16 16
17 17 proc1 = controllerObj.addProcUnit(name='ReceiverData')
18 proc1.addParameter(name='realtime', value='1', format='bool')
18 proc1.addParameter(name='realtime', value='0', format='bool')
19 19 proc1.addParameter(name='plottypes', value='rti', format='str')
20 20 # proc1.addParameter(name='throttle', value='10', format='int')
21 21 proc1.addParameter(name='plot_server', value='tcp://10.10.10.82:7000', format='str')
22 22 ## TODO Agregar direccion de server de publicacion a graficos como variable
23 23
24 # op1 = proc1.addOperation(name='PlotRTIData', optype='other')
25 # op1.addParameter(name='wintitle', value='Julia 150Km', format='str')
26 # op1.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
24 op1 = proc1.addOperation(name='PlotRTIData', optype='other')
25 op1.addParameter(name='wintitle', value='Julia 150Km', format='str')
26 op1.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
27 27 #
28 28 # op2 = proc1.addOperation(name='PlotCOHData', optype='other')
29 29 # op2.addParameter(name='wintitle', value='Julia 150Km', format='str')
30 30 # op2.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
31 31 # #
32 32 # op6 = proc1.addOperation(name='PlotPHASEData', optype='other')
33 33 # op6.addParameter(name='wintitle', value='Julia 150Km', format='str')
34 34 # op6.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
35 35 #
36 36 # proc2 = controllerObj.addProcUnit(name='ReceiverData')
37 37 # proc2.addParameter(name='server', value='juanca', format='str')
38 38 # proc2.addParameter(name='plottypes', value='snr,dop', format='str')
39 39 #
40 40 # op3 = proc2.addOperation(name='PlotSNRData', optype='other')
41 41 # op3.addParameter(name='wintitle', value='Julia 150Km', format='str')
42 42 # op3.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
43 43 #
44 44 # op4 = proc2.addOperation(name='PlotDOPData', optype='other')
45 45 # op4.addParameter(name='wintitle', value='Julia 150Km', format='str')
46 46 # op4.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
47 47
48 48
49 49
50 50 controllerObj.start()
@@ -1,1 +1,1
1 <Project description="HF_EXAMPLE" id="191" name="test01"><ReadUnit datatype="SpectraReader" id="1911" inputId="0" name="SpectraReader"><Operation id="19111" name="run" priority="1" type="self"><Parameter format="str" id="191111" name="datatype" value="SpectraReader" /><Parameter format="str" id="191112" name="path" value="/home/nanosat/data/hysell_data20/pdata" /><Parameter format="date" id="191113" name="startDate" value="2015/09/26" /><Parameter format="date" id="191114" name="endDate" value="2015/09/26" /><Parameter format="time" id="191115" name="startTime" value="00:00:00" /><Parameter format="time" id="191116" name="endTime" value="23:59:59" /><Parameter format="int" id="191118" name="cursor" value="33" /><Parameter format="int" id="191119" name="skip" value="22" /><Parameter format="int" id="191120" name="delay" value="10" /><Parameter format="int" id="191121" name="walk" value="1" /><Parameter format="int" id="191122" name="online" value="0" /></Operation></ReadUnit><ProcUnit datatype="Spectra" id="1912" inputId="1911" name="SpectraProc"><Operation id="19121" name="run" priority="1" type="self" /><Operation id="19122" name="PublishData" priority="2" type="other"><Parameter format="int" id="191221" name="zeromq" value="1" /><Parameter format="int" id="191222" name="delay" value="1" /></Operation></ProcUnit></Project> No newline at end of file
1 <Project description="Segundo Test" id="191" name="test01"><ProcUnit datatype="ReceiverData" id="1911" inputId="0" name="ReceiverData"><Operation id="19111" name="run" priority="1" type="self"><Parameter format="bool" id="191111" name="realtime" value="1" /><Parameter format="str" id="191112" name="plottypes" value="rti" /><Parameter format="str" id="191113" name="plot_server" value="tcp://10.10.10.82:7000" /></Operation><Operation id="19112" name="PlotRTIData" priority="2" type="other"><Parameter format="str" id="191121" name="wintitle" value="Julia 150Km" /><Parameter format="str" id="191122" name="save" value="/home/nanosat/Pictures" /></Operation></ProcUnit></Project> No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now