##// END OF EJS Templates
Fix name and datatype in ReadUnitConf
Juan C. Espinoza -
r1088:f797261b7b45
parent child
Show More
@@ -1,1297 +1,1302
1 1 '''
2 2 Created on September , 2012
3 3 @author:
4 4 '''
5 5
6 6 import sys
7 7 import ast
8 8 import datetime
9 9 import traceback
10 10 import math
11 11 import time
12 12 from multiprocessing import Process, cpu_count
13 13
14 14 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
15 15 from xml.dom import minidom
16 16
17 17 import schainpy
18 18 import schainpy.admin
19 19 from schainpy.model import *
20 20 from schainpy.utils import log
21 21
22 22 DTYPES = {
23 23 'Voltage': '.r',
24 24 'Spectra': '.pdata'
25 25 }
26 26
27 27 def MPProject(project, n=cpu_count()):
28 28 '''
29 29 Project wrapper to run schain in n processes
30 30 '''
31 31
32 32 rconf = project.getReadUnitObj()
33 33 op = rconf.getOperationObj('run')
34 34 dt1 = op.getParameterValue('startDate')
35 35 dt2 = op.getParameterValue('endDate')
36 36 days = (dt2 - dt1).days
37 37
38 38 for day in range(days+1):
39 39 skip = 0
40 40 cursor = 0
41 41 processes = []
42 42 dt = dt1 + datetime.timedelta(day)
43 43 dt_str = dt.strftime('%Y/%m/%d')
44 44 reader = JRODataReader()
45 45 paths, files = reader.searchFilesOffLine(path=rconf.path,
46 46 startDate=dt,
47 47 endDate=dt,
48 48 ext=DTYPES[rconf.datatype])
49 49 nFiles = len(files)
50 50 if nFiles == 0:
51 51 continue
52 52 skip = int(math.ceil(nFiles/n))
53 53 while nFiles > cursor*skip:
54 54 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
55 55 skip=skip)
56 56 p = project.clone()
57 57 p.start()
58 58 processes.append(p)
59 59 cursor += 1
60 60
61 61 def beforeExit(exctype, value, trace):
62 62 for process in processes:
63 63 process.terminate()
64 64 process.join()
65 65 print traceback.print_tb(trace)
66 66
67 67 sys.excepthook = beforeExit
68 68
69 69 for process in processes:
70 70 process.join()
71 71 process.terminate()
72 72
73 73 time.sleep(3)
74 74
75 75 class ParameterConf():
76 76
77 77 id = None
78 78 name = None
79 79 value = None
80 80 format = None
81 81
82 82 __formated_value = None
83 83
84 84 ELEMENTNAME = 'Parameter'
85 85
86 86 def __init__(self):
87 87
88 88 self.format = 'str'
89 89
90 90 def getElementName(self):
91 91
92 92 return self.ELEMENTNAME
93 93
94 94 def getValue(self):
95 95
96 96 value = self.value
97 97 format = self.format
98 98
99 99 if self.__formated_value != None:
100 100
101 101 return self.__formated_value
102 102
103 103 if format == 'obj':
104 104 return value
105 105
106 106 if format == 'str':
107 107 self.__formated_value = str(value)
108 108 return self.__formated_value
109 109
110 110 if value == '':
111 111 raise ValueError, '%s: This parameter value is empty' %self.name
112 112
113 113 if format == 'list':
114 114 strList = value.split(',')
115 115
116 116 self.__formated_value = strList
117 117
118 118 return self.__formated_value
119 119
120 120 if format == 'intlist':
121 121 '''
122 122 Example:
123 123 value = (0,1,2)
124 124 '''
125 125
126 126 new_value = ast.literal_eval(value)
127 127
128 128 if type(new_value) not in (tuple, list):
129 129 new_value = [int(new_value)]
130 130
131 131 self.__formated_value = new_value
132 132
133 133 return self.__formated_value
134 134
135 135 if format == 'floatlist':
136 136 '''
137 137 Example:
138 138 value = (0.5, 1.4, 2.7)
139 139 '''
140 140
141 141 new_value = ast.literal_eval(value)
142 142
143 143 if type(new_value) not in (tuple, list):
144 144 new_value = [float(new_value)]
145 145
146 146 self.__formated_value = new_value
147 147
148 148 return self.__formated_value
149 149
150 150 if format == 'date':
151 151 strList = value.split('/')
152 152 intList = [int(x) for x in strList]
153 153 date = datetime.date(intList[0], intList[1], intList[2])
154 154
155 155 self.__formated_value = date
156 156
157 157 return self.__formated_value
158 158
159 159 if format == 'time':
160 160 strList = value.split(':')
161 161 intList = [int(x) for x in strList]
162 162 time = datetime.time(intList[0], intList[1], intList[2])
163 163
164 164 self.__formated_value = time
165 165
166 166 return self.__formated_value
167 167
168 168 if format == 'pairslist':
169 169 '''
170 170 Example:
171 171 value = (0,1),(1,2)
172 172 '''
173 173
174 174 new_value = ast.literal_eval(value)
175 175
176 176 if type(new_value) not in (tuple, list):
177 177 raise ValueError, '%s has to be a tuple or list of pairs' %value
178 178
179 179 if type(new_value[0]) not in (tuple, list):
180 180 if len(new_value) != 2:
181 181 raise ValueError, '%s has to be a tuple or list of pairs' %value
182 182 new_value = [new_value]
183 183
184 184 for thisPair in new_value:
185 185 if len(thisPair) != 2:
186 186 raise ValueError, '%s has to be a tuple or list of pairs' %value
187 187
188 188 self.__formated_value = new_value
189 189
190 190 return self.__formated_value
191 191
192 192 if format == 'multilist':
193 193 '''
194 194 Example:
195 195 value = (0,1,2),(3,4,5)
196 196 '''
197 197 multiList = ast.literal_eval(value)
198 198
199 199 if type(multiList[0]) == int:
200 200 multiList = ast.literal_eval('(' + value + ')')
201 201
202 202 self.__formated_value = multiList
203 203
204 204 return self.__formated_value
205 205
206 206 if format == 'bool':
207 207 value = int(value)
208 208
209 209 if format == 'int':
210 210 value = float(value)
211 211
212 212 format_func = eval(format)
213 213
214 214 self.__formated_value = format_func(value)
215 215
216 216 return self.__formated_value
217 217
218 218 def updateId(self, new_id):
219 219
220 220 self.id = str(new_id)
221 221
222 222 def setup(self, id, name, value, format='str'):
223 223 self.id = str(id)
224 224 self.name = name
225 225 if format == 'obj':
226 226 self.value = value
227 227 else:
228 228 self.value = str(value)
229 229 self.format = str.lower(format)
230 230
231 231 self.getValue()
232 232
233 233 return 1
234 234
235 235 def update(self, name, value, format='str'):
236 236
237 237 self.name = name
238 238 self.value = str(value)
239 239 self.format = format
240 240
241 241 def makeXml(self, opElement):
242 242 if self.name not in ('queue',):
243 243 parmElement = SubElement(opElement, self.ELEMENTNAME)
244 244 parmElement.set('id', str(self.id))
245 245 parmElement.set('name', self.name)
246 246 parmElement.set('value', self.value)
247 247 parmElement.set('format', self.format)
248 248
249 249 def readXml(self, parmElement):
250 250
251 251 self.id = parmElement.get('id')
252 252 self.name = parmElement.get('name')
253 253 self.value = parmElement.get('value')
254 254 self.format = str.lower(parmElement.get('format'))
255 255
256 256 #Compatible with old signal chain version
257 257 if self.format == 'int' and self.name == 'idfigure':
258 258 self.name = 'id'
259 259
260 260 def printattr(self):
261 261
262 262 print 'Parameter[%s]: name = %s, value = %s, format = %s' %(self.id, self.name, self.value, self.format)
263 263
264 264 class OperationConf():
265 265
266 266 id = None
267 267 name = None
268 268 priority = None
269 269 type = None
270 270
271 271 parmConfObjList = []
272 272
273 273 ELEMENTNAME = 'Operation'
274 274
275 275 def __init__(self):
276 276
277 277 self.id = '0'
278 278 self.name = None
279 279 self.priority = None
280 280 self.type = 'self'
281 281
282 282
283 283 def __getNewId(self):
284 284
285 285 return int(self.id)*10 + len(self.parmConfObjList) + 1
286 286
287 287 def updateId(self, new_id):
288 288
289 289 self.id = str(new_id)
290 290
291 291 n = 1
292 292 for parmObj in self.parmConfObjList:
293 293
294 294 idParm = str(int(new_id)*10 + n)
295 295 parmObj.updateId(idParm)
296 296
297 297 n += 1
298 298
299 299 def getElementName(self):
300 300
301 301 return self.ELEMENTNAME
302 302
303 303 def getParameterObjList(self):
304 304
305 305 return self.parmConfObjList
306 306
307 307 def getParameterObj(self, parameterName):
308 308
309 309 for parmConfObj in self.parmConfObjList:
310 310
311 311 if parmConfObj.name != parameterName:
312 312 continue
313 313
314 314 return parmConfObj
315 315
316 316 return None
317 317
318 318 def getParameterObjfromValue(self, parameterValue):
319 319
320 320 for parmConfObj in self.parmConfObjList:
321 321
322 322 if parmConfObj.getValue() != parameterValue:
323 323 continue
324 324
325 325 return parmConfObj.getValue()
326 326
327 327 return None
328 328
329 329 def getParameterValue(self, parameterName):
330 330
331 331 parameterObj = self.getParameterObj(parameterName)
332 332
333 333 # if not parameterObj:
334 334 # return None
335 335
336 336 value = parameterObj.getValue()
337 337
338 338 return value
339 339
340 340
341 341 def getKwargs(self):
342 342
343 343 kwargs = {}
344 344
345 345 for parmConfObj in self.parmConfObjList:
346 346 if self.name == 'run' and parmConfObj.name == 'datatype':
347 347 continue
348 348
349 349 kwargs[parmConfObj.name] = parmConfObj.getValue()
350 350
351 351 return kwargs
352 352
353 353 def setup(self, id, name, priority, type):
354 354
355 355 self.id = str(id)
356 356 self.name = name
357 357 self.type = type
358 358 self.priority = priority
359 359
360 360 self.parmConfObjList = []
361 361
362 362 def removeParameters(self):
363 363
364 364 for obj in self.parmConfObjList:
365 365 del obj
366 366
367 367 self.parmConfObjList = []
368 368
369 369 def addParameter(self, name, value, format='str'):
370 370
371 371 if value is None:
372 372 return None
373 373 id = self.__getNewId()
374 374
375 375 parmConfObj = ParameterConf()
376 376 if not parmConfObj.setup(id, name, value, format):
377 377 return None
378 378
379 379 self.parmConfObjList.append(parmConfObj)
380 380
381 381 return parmConfObj
382 382
383 383 def changeParameter(self, name, value, format='str'):
384 384
385 385 parmConfObj = self.getParameterObj(name)
386 386 parmConfObj.update(name, value, format)
387 387
388 388 return parmConfObj
389 389
390 390 def makeXml(self, procUnitElement):
391 391
392 392 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
393 393 opElement.set('id', str(self.id))
394 394 opElement.set('name', self.name)
395 395 opElement.set('type', self.type)
396 396 opElement.set('priority', str(self.priority))
397 397
398 398 for parmConfObj in self.parmConfObjList:
399 399 parmConfObj.makeXml(opElement)
400 400
401 401 def readXml(self, opElement):
402 402
403 403 self.id = opElement.get('id')
404 404 self.name = opElement.get('name')
405 405 self.type = opElement.get('type')
406 406 self.priority = opElement.get('priority')
407 407
408 408 #Compatible with old signal chain version
409 409 #Use of 'run' method instead 'init'
410 410 if self.type == 'self' and self.name == 'init':
411 411 self.name = 'run'
412 412
413 413 self.parmConfObjList = []
414 414
415 415 parmElementList = opElement.iter(ParameterConf().getElementName())
416 416
417 417 for parmElement in parmElementList:
418 418 parmConfObj = ParameterConf()
419 419 parmConfObj.readXml(parmElement)
420 420
421 421 #Compatible with old signal chain version
422 422 #If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
423 423 if self.type != 'self' and self.name == 'Plot':
424 424 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
425 425 self.name = parmConfObj.value
426 426 continue
427 427
428 428 self.parmConfObjList.append(parmConfObj)
429 429
430 430 def printattr(self):
431 431
432 432 print '%s[%s]: name = %s, type = %s, priority = %s' %(self.ELEMENTNAME,
433 433 self.id,
434 434 self.name,
435 435 self.type,
436 436 self.priority)
437 437
438 438 for parmConfObj in self.parmConfObjList:
439 439 parmConfObj.printattr()
440 440
441 441 def createObject(self, plotter_queue=None):
442 442
443 443
444 444 if self.type == 'self':
445 445 raise ValueError, 'This operation type cannot be created'
446 446
447 447 if self.type == 'plotter':
448 448 if not plotter_queue:
449 449 raise ValueError, 'plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)'
450 450
451 451 opObj = Plotter(self.name, plotter_queue)
452 452
453 453 if self.type == 'external' or self.type == 'other':
454 454
455 455 className = eval(self.name)
456 456 kwargs = self.getKwargs()
457 457
458 458 opObj = className(**kwargs)
459 459
460 460 return opObj
461 461
462 462
463 463 class ProcUnitConf():
464 464
465 465 id = None
466 466 name = None
467 467 datatype = None
468 468 inputId = None
469 469 parentId = None
470 470
471 471 opConfObjList = []
472 472
473 473 procUnitObj = None
474 474 opObjList = []
475 475
476 476 ELEMENTNAME = 'ProcUnit'
477 477
478 478 def __init__(self):
479 479
480 480 self.id = None
481 481 self.datatype = None
482 482 self.name = None
483 483 self.inputId = None
484 484
485 485 self.opConfObjList = []
486 486
487 487 self.procUnitObj = None
488 488 self.opObjDict = {}
489 489
490 490 def __getPriority(self):
491 491
492 492 return len(self.opConfObjList)+1
493 493
494 494 def __getNewId(self):
495 495
496 496 return int(self.id)*10 + len(self.opConfObjList) + 1
497 497
498 498 def getElementName(self):
499 499
500 500 return self.ELEMENTNAME
501 501
502 502 def getId(self):
503 503
504 504 return self.id
505 505
506 506 def updateId(self, new_id, parentId=parentId):
507 507
508 508
509 509 new_id = int(parentId)*10 + (int(self.id) % 10)
510 510 new_inputId = int(parentId)*10 + (int(self.inputId) % 10)
511 511
512 512 #If this proc unit has not inputs
513 513 if self.inputId == '0':
514 514 new_inputId = 0
515 515
516 516 n = 1
517 517 for opConfObj in self.opConfObjList:
518 518
519 519 idOp = str(int(new_id)*10 + n)
520 520 opConfObj.updateId(idOp)
521 521
522 522 n += 1
523 523
524 524 self.parentId = str(parentId)
525 525 self.id = str(new_id)
526 526 self.inputId = str(new_inputId)
527 527
528 528
529 529 def getInputId(self):
530 530
531 531 return self.inputId
532 532
533 533 def getOperationObjList(self):
534 534
535 535 return self.opConfObjList
536 536
537 537 def getOperationObj(self, name=None):
538 538
539 539 for opConfObj in self.opConfObjList:
540 540
541 541 if opConfObj.name != name:
542 542 continue
543 543
544 544 return opConfObj
545 545
546 546 return None
547 547
548 548 def getOpObjfromParamValue(self, value=None):
549 549
550 550 for opConfObj in self.opConfObjList:
551 551 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
552 552 continue
553 553 return opConfObj
554 554 return None
555 555
556 556 def getProcUnitObj(self):
557 557
558 558 return self.procUnitObj
559 559
560 560 def setup(self, id, name, datatype, inputId, parentId=None):
561 561
562 562 #Compatible with old signal chain version
563 563 if datatype==None and name==None:
564 564 raise ValueError, 'datatype or name should be defined'
565 565
566 566 if name==None:
567 567 if 'Proc' in datatype:
568 568 name = datatype
569 569 else:
570 570 name = '%sProc' %(datatype)
571 571
572 572 if datatype==None:
573 573 datatype = name.replace('Proc','')
574 574
575 575 self.id = str(id)
576 576 self.name = name
577 577 self.datatype = datatype
578 578 self.inputId = inputId
579 579 self.parentId = parentId
580 580
581 581 self.opConfObjList = []
582 582
583 583 self.addOperation(name='run', optype='self')
584 584
585 585 def removeOperations(self):
586 586
587 587 for obj in self.opConfObjList:
588 588 del obj
589 589
590 590 self.opConfObjList = []
591 591 self.addOperation(name='run')
592 592
593 593 def addParameter(self, **kwargs):
594 594 '''
595 595 Add parameters to 'run' operation
596 596 '''
597 597 opObj = self.opConfObjList[0]
598 598
599 599 opObj.addParameter(**kwargs)
600 600
601 601 return opObj
602 602
603 603 def addOperation(self, name, optype='self'):
604 604
605 605 id = self.__getNewId()
606 606 priority = self.__getPriority()
607 607
608 608 opConfObj = OperationConf()
609 609 opConfObj.setup(id, name=name, priority=priority, type=optype)
610 610
611 611 self.opConfObjList.append(opConfObj)
612 612
613 613 return opConfObj
614 614
615 615 def makeXml(self, projectElement):
616 616
617 617 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
618 618 procUnitElement.set('id', str(self.id))
619 619 procUnitElement.set('name', self.name)
620 620 procUnitElement.set('datatype', self.datatype)
621 621 procUnitElement.set('inputId', str(self.inputId))
622 622
623 623 for opConfObj in self.opConfObjList:
624 624 opConfObj.makeXml(procUnitElement)
625 625
626 626 def readXml(self, upElement):
627 627
628 628 self.id = upElement.get('id')
629 629 self.name = upElement.get('name')
630 630 self.datatype = upElement.get('datatype')
631 631 self.inputId = upElement.get('inputId')
632 632
633 633 if self.ELEMENTNAME == 'ReadUnit':
634 634 self.datatype = self.datatype.replace('Reader', '')
635 635
636 636 if self.ELEMENTNAME == 'ProcUnit':
637 637 self.datatype = self.datatype.replace('Proc', '')
638 638
639 639 if self.inputId == 'None':
640 640 self.inputId = '0'
641 641
642 642 self.opConfObjList = []
643 643
644 644 opElementList = upElement.iter(OperationConf().getElementName())
645 645
646 646 for opElement in opElementList:
647 647 opConfObj = OperationConf()
648 648 opConfObj.readXml(opElement)
649 649 self.opConfObjList.append(opConfObj)
650 650
651 651 def printattr(self):
652 652
653 653 print '%s[%s]: name = %s, datatype = %s, inputId = %s' %(self.ELEMENTNAME,
654 654 self.id,
655 655 self.name,
656 656 self.datatype,
657 657 self.inputId)
658 658
659 659 for opConfObj in self.opConfObjList:
660 660 opConfObj.printattr()
661 661
662 662
663 663 def getKwargs(self):
664 664
665 665 opObj = self.opConfObjList[0]
666 666 kwargs = opObj.getKwargs()
667 667
668 668 return kwargs
669 669
670 670 def createObjects(self, plotter_queue=None):
671 671
672 672 className = eval(self.name)
673 673 kwargs = self.getKwargs()
674 674 procUnitObj = className(**kwargs)
675 675
676 676 for opConfObj in self.opConfObjList:
677 677
678 678 if opConfObj.type=='self' and self.name=='run':
679 679 continue
680 680 elif opConfObj.type=='self':
681 681 procUnitObj.addOperationKwargs(opConfObj.id, **opConfObj.getKwargs())
682 682 continue
683 683
684 684 opObj = opConfObj.createObject(plotter_queue)
685 685
686 686 self.opObjDict[opConfObj.id] = opObj
687 687
688 688 procUnitObj.addOperation(opObj, opConfObj.id)
689 689
690 690 self.procUnitObj = procUnitObj
691 691
692 692 return procUnitObj
693 693
694 694 def run(self):
695 695
696 696 is_ok = False
697 697
698 698 for opConfObj in self.opConfObjList:
699 699
700 700 kwargs = {}
701 701 for parmConfObj in opConfObj.getParameterObjList():
702 702 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
703 703 continue
704 704
705 705 kwargs[parmConfObj.name] = parmConfObj.getValue()
706 706
707 707 sts = self.procUnitObj.call(opType = opConfObj.type,
708 708 opName = opConfObj.name,
709 709 opId = opConfObj.id)
710 710
711 711 is_ok = is_ok or sts
712 712
713 713 return is_ok
714 714
715 715 def close(self):
716 716
717 717 for opConfObj in self.opConfObjList:
718 718 if opConfObj.type == 'self':
719 719 continue
720 720
721 721 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
722 722 opObj.close()
723 723
724 724 self.procUnitObj.close()
725 725
726 726 return
727 727
728 728 class ReadUnitConf(ProcUnitConf):
729 729
730 730 path = None
731 731 startDate = None
732 732 endDate = None
733 733 startTime = None
734 734 endTime = None
735 735
736 736 ELEMENTNAME = 'ReadUnit'
737 737
738 738 def __init__(self):
739 739
740 740 self.id = None
741 741 self.datatype = None
742 742 self.name = None
743 743 self.inputId = None
744 744
745 745 self.parentId = None
746 746
747 747 self.opConfObjList = []
748 748 self.opObjList = []
749 749
750 750 def getElementName(self):
751 751
752 752 return self.ELEMENTNAME
753 753
754 754 def setup(self, id, name, datatype, path='', startDate='', endDate='',
755 755 startTime='', endTime='', parentId=None, server=None, **kwargs):
756 756
757 757 #Compatible with old signal chain version
758 758 if datatype==None and name==None:
759 759 raise ValueError, 'datatype or name should be defined'
760 760
761 761 if name==None:
762 762 if 'Reader' in datatype:
763 763 name = datatype
764 datatype = name.replace('Reader','')
764 765 else:
765 name = '%sReader' %(datatype)
766 name = '{}Reader'.format(datatype)
766 767 if datatype==None:
768 if 'Reader' in name:
767 769 datatype = name.replace('Reader','')
770 else:
771 datatype = name
772 name = '{}Reader'.format(name)
768 773
769 774 self.id = id
770 775 self.name = name
771 776 self.datatype = datatype
772 777 if path != '':
773 778 self.path = os.path.abspath(path)
774 779 self.startDate = startDate
775 780 self.endDate = endDate
776 781 self.startTime = startTime
777 782 self.endTime = endTime
778 783 self.inputId = '0'
779 784 self.parentId = parentId
780 785 self.server = server
781 786 self.addRunOperation(**kwargs)
782 787
783 788 def update(self, **kwargs):
784 789
785 790 if 'datatype' in kwargs:
786 791 datatype = kwargs.pop('datatype')
787 792 if 'Reader' in datatype:
788 793 self.name = datatype
789 794 else:
790 795 self.name = '%sReader' %(datatype)
791 796 self.datatype = self.name.replace('Reader', '')
792 797
793 798 attrs = ('path', 'startDate', 'endDate', 'startTime', 'endTime', 'parentId')
794 799
795 800 for attr in attrs:
796 801 if attr in kwargs:
797 802 setattr(self, attr, kwargs.pop(attr))
798 803
799 804 self.inputId = '0'
800 805 self.updateRunOperation(**kwargs)
801 806
802 807 def removeOperations(self):
803 808
804 809 for obj in self.opConfObjList:
805 810 del obj
806 811
807 812 self.opConfObjList = []
808 813
809 814 def addRunOperation(self, **kwargs):
810 815
811 816 opObj = self.addOperation(name = 'run', optype = 'self')
812 817
813 818 if self.server is None:
814 819 opObj.addParameter(name='datatype', value=self.datatype, format='str')
815 820 opObj.addParameter(name='path', value=self.path, format='str')
816 821 opObj.addParameter(name='startDate', value=self.startDate, format='date')
817 822 opObj.addParameter(name='endDate', value=self.endDate, format='date')
818 823 opObj.addParameter(name='startTime', value=self.startTime, format='time')
819 824 opObj.addParameter(name='endTime', value=self.endTime, format='time')
820 825
821 826 for key, value in kwargs.items():
822 827 opObj.addParameter(name=key, value=value, format=type(value).__name__)
823 828 else:
824 829 opObj.addParameter(name='server' , value=self.server, format='str')
825 830
826 831
827 832 return opObj
828 833
829 834 def updateRunOperation(self, **kwargs):
830 835
831 836 opObj = self.getOperationObj(name='run')
832 837 opObj.removeParameters()
833 838
834 839 opObj.addParameter(name='datatype', value=self.datatype, format='str')
835 840 opObj.addParameter(name='path', value=self.path, format='str')
836 841 opObj.addParameter(name='startDate', value=self.startDate, format='date')
837 842 opObj.addParameter(name='endDate', value=self.endDate, format='date')
838 843 opObj.addParameter(name='startTime', value=self.startTime, format='time')
839 844 opObj.addParameter(name='endTime', value=self.endTime, format='time')
840 845
841 846 for key, value in kwargs.items():
842 847 opObj.addParameter(name=key, value=value, format=type(value).__name__)
843 848
844 849 return opObj
845 850
846 851 def readXml(self, upElement):
847 852
848 853 self.id = upElement.get('id')
849 854 self.name = upElement.get('name')
850 855 self.datatype = upElement.get('datatype')
851 856 self.inputId = upElement.get('inputId')
852 857
853 858 if self.ELEMENTNAME == 'ReadUnit':
854 859 self.datatype = self.datatype.replace('Reader', '')
855 860
856 861 if self.inputId == 'None':
857 862 self.inputId = '0'
858 863
859 864 self.opConfObjList = []
860 865
861 866 opElementList = upElement.iter(OperationConf().getElementName())
862 867
863 868 for opElement in opElementList:
864 869 opConfObj = OperationConf()
865 870 opConfObj.readXml(opElement)
866 871 self.opConfObjList.append(opConfObj)
867 872
868 873 if opConfObj.name == 'run':
869 874 self.path = opConfObj.getParameterValue('path')
870 875 self.startDate = opConfObj.getParameterValue('startDate')
871 876 self.endDate = opConfObj.getParameterValue('endDate')
872 877 self.startTime = opConfObj.getParameterValue('startTime')
873 878 self.endTime = opConfObj.getParameterValue('endTime')
874 879
875 880 class Project(Process):
876 881
877 882 id = None
878 883 # name = None
879 884 description = None
880 885 filename = None
881 886
882 887 procUnitConfObjDict = None
883 888
884 889 ELEMENTNAME = 'Project'
885 890
886 891 plotterQueue = None
887 892
888 893 def __init__(self, plotter_queue=None):
889 894
890 895 Process.__init__(self)
891 896 self.id = None
892 897 # self.name = None
893 898 self.description = None
894 899
895 900 self.plotterQueue = plotter_queue
896 901
897 902 self.procUnitConfObjDict = {}
898 903
899 904 def __getNewId(self):
900 905
901 906 idList = self.procUnitConfObjDict.keys()
902 907
903 908 id = int(self.id)*10
904 909
905 910 while True:
906 911 id += 1
907 912
908 913 if str(id) in idList:
909 914 continue
910 915
911 916 break
912 917
913 918 return str(id)
914 919
915 920 def getElementName(self):
916 921
917 922 return self.ELEMENTNAME
918 923
919 924 def getId(self):
920 925
921 926 return self.id
922 927
923 928 def updateId(self, new_id):
924 929
925 930 self.id = str(new_id)
926 931
927 932 keyList = self.procUnitConfObjDict.keys()
928 933 keyList.sort()
929 934
930 935 n = 1
931 936 newProcUnitConfObjDict = {}
932 937
933 938 for procKey in keyList:
934 939
935 940 procUnitConfObj = self.procUnitConfObjDict[procKey]
936 941 idProcUnit = str(int(self.id)*10 + n)
937 942 procUnitConfObj.updateId(idProcUnit, parentId = self.id)
938 943
939 944 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
940 945 n += 1
941 946
942 947 self.procUnitConfObjDict = newProcUnitConfObjDict
943 948
944 949 def setup(self, id, name='', description=''):
945 950
946 951 print
947 952 print '*'*60
948 953 print ' Starting SIGNAL CHAIN PROCESSING v%s ' % schainpy.__version__
949 954 print '*'*60
950 955 print
951 956 self.id = str(id)
952 957 self.description = description
953 958
954 959 def update(self, name, description):
955 960
956 961 self.description = description
957 962
958 963 def clone(self):
959 964
960 965 p = Project()
961 966 p.procUnitConfObjDict = self.procUnitConfObjDict
962 967 return p
963 968
964 969 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
965 970
966 971 if id is None:
967 972 idReadUnit = self.__getNewId()
968 973 else:
969 974 idReadUnit = str(id)
970 975
971 976 readUnitConfObj = ReadUnitConf()
972 977 readUnitConfObj.setup(idReadUnit, name, datatype, parentId=self.id, **kwargs)
973 978
974 979 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
975 980
976 981 return readUnitConfObj
977 982
978 983 def addProcUnit(self, inputId='0', datatype=None, name=None):
979 984
980 985 idProcUnit = self.__getNewId()
981 986
982 987 procUnitConfObj = ProcUnitConf()
983 988 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, parentId=self.id)
984 989
985 990 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
986 991
987 992 return procUnitConfObj
988 993
989 994 def removeProcUnit(self, id):
990 995
991 996 if id in self.procUnitConfObjDict.keys():
992 997 self.procUnitConfObjDict.pop(id)
993 998
994 999 def getReadUnitId(self):
995 1000
996 1001 readUnitConfObj = self.getReadUnitObj()
997 1002
998 1003 return readUnitConfObj.id
999 1004
1000 1005 def getReadUnitObj(self):
1001 1006
1002 1007 for obj in self.procUnitConfObjDict.values():
1003 1008 if obj.getElementName() == 'ReadUnit':
1004 1009 return obj
1005 1010
1006 1011 return None
1007 1012
1008 1013 def getProcUnitObj(self, id=None, name=None):
1009 1014
1010 1015 if id != None:
1011 1016 return self.procUnitConfObjDict[id]
1012 1017
1013 1018 if name != None:
1014 1019 return self.getProcUnitObjByName(name)
1015 1020
1016 1021 return None
1017 1022
1018 1023 def getProcUnitObjByName(self, name):
1019 1024
1020 1025 for obj in self.procUnitConfObjDict.values():
1021 1026 if obj.name == name:
1022 1027 return obj
1023 1028
1024 1029 return None
1025 1030
1026 1031 def procUnitItems(self):
1027 1032
1028 1033 return self.procUnitConfObjDict.items()
1029 1034
1030 1035 def makeXml(self):
1031 1036
1032 1037 projectElement = Element('Project')
1033 1038 projectElement.set('id', str(self.id))
1034 1039 projectElement.set('name', self.name)
1035 1040 projectElement.set('description', self.description)
1036 1041
1037 1042 for procUnitConfObj in self.procUnitConfObjDict.values():
1038 1043 procUnitConfObj.makeXml(projectElement)
1039 1044
1040 1045 self.projectElement = projectElement
1041 1046
1042 1047 def writeXml(self, filename=None):
1043 1048
1044 1049 if filename == None:
1045 1050 if self.filename:
1046 1051 filename = self.filename
1047 1052 else:
1048 1053 filename = 'schain.xml'
1049 1054
1050 1055 if not filename:
1051 1056 print 'filename has not been defined. Use setFilename(filename) for do it.'
1052 1057 return 0
1053 1058
1054 1059 abs_file = os.path.abspath(filename)
1055 1060
1056 1061 if not os.access(os.path.dirname(abs_file), os.W_OK):
1057 1062 print 'No write permission on %s' %os.path.dirname(abs_file)
1058 1063 return 0
1059 1064
1060 1065 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1061 1066 print 'File %s already exists and it could not be overwriten' %abs_file
1062 1067 return 0
1063 1068
1064 1069 self.makeXml()
1065 1070
1066 1071 ElementTree(self.projectElement).write(abs_file, method='xml')
1067 1072
1068 1073 self.filename = abs_file
1069 1074
1070 1075 return 1
1071 1076
1072 1077 def readXml(self, filename = None):
1073 1078
1074 1079 if not filename:
1075 1080 print 'filename is not defined'
1076 1081 return 0
1077 1082
1078 1083 abs_file = os.path.abspath(filename)
1079 1084
1080 1085 if not os.path.isfile(abs_file):
1081 1086 print '%s file does not exist' %abs_file
1082 1087 return 0
1083 1088
1084 1089 self.projectElement = None
1085 1090 self.procUnitConfObjDict = {}
1086 1091
1087 1092 try:
1088 1093 self.projectElement = ElementTree().parse(abs_file)
1089 1094 except:
1090 1095 print 'Error reading %s, verify file format' %filename
1091 1096 return 0
1092 1097
1093 1098 self.project = self.projectElement.tag
1094 1099
1095 1100 self.id = self.projectElement.get('id')
1096 1101 self.name = self.projectElement.get('name')
1097 1102 self.description = self.projectElement.get('description')
1098 1103
1099 1104 readUnitElementList = self.projectElement.iter(ReadUnitConf().getElementName())
1100 1105
1101 1106 for readUnitElement in readUnitElementList:
1102 1107 readUnitConfObj = ReadUnitConf()
1103 1108 readUnitConfObj.readXml(readUnitElement)
1104 1109
1105 1110 if readUnitConfObj.parentId == None:
1106 1111 readUnitConfObj.parentId = self.id
1107 1112
1108 1113 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1109 1114
1110 1115 procUnitElementList = self.projectElement.iter(ProcUnitConf().getElementName())
1111 1116
1112 1117 for procUnitElement in procUnitElementList:
1113 1118 procUnitConfObj = ProcUnitConf()
1114 1119 procUnitConfObj.readXml(procUnitElement)
1115 1120
1116 1121 if procUnitConfObj.parentId == None:
1117 1122 procUnitConfObj.parentId = self.id
1118 1123
1119 1124 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1120 1125
1121 1126 self.filename = abs_file
1122 1127
1123 1128 return 1
1124 1129
1125 1130 def printattr(self):
1126 1131
1127 1132 print 'Project[%s]: name = %s, description = %s' %(self.id,
1128 1133 self.name,
1129 1134 self.description)
1130 1135
1131 1136 for procUnitConfObj in self.procUnitConfObjDict.values():
1132 1137 procUnitConfObj.printattr()
1133 1138
1134 1139 def createObjects(self):
1135 1140
1136 1141 for procUnitConfObj in self.procUnitConfObjDict.values():
1137 1142 procUnitConfObj.createObjects(self.plotterQueue)
1138 1143
1139 1144 def __connect(self, objIN, thisObj):
1140 1145
1141 1146 thisObj.setInput(objIN.getOutputObj())
1142 1147
1143 1148 def connectObjects(self):
1144 1149
1145 1150 for thisPUConfObj in self.procUnitConfObjDict.values():
1146 1151
1147 1152 inputId = thisPUConfObj.getInputId()
1148 1153
1149 1154 if int(inputId) == 0:
1150 1155 continue
1151 1156
1152 1157 #Get input object
1153 1158 puConfINObj = self.procUnitConfObjDict[inputId]
1154 1159 puObjIN = puConfINObj.getProcUnitObj()
1155 1160
1156 1161 #Get current object
1157 1162 thisPUObj = thisPUConfObj.getProcUnitObj()
1158 1163
1159 1164 self.__connect(puObjIN, thisPUObj)
1160 1165
1161 1166 def __handleError(self, procUnitConfObj, send_email=False):
1162 1167
1163 1168 import socket
1164 1169
1165 1170 err = traceback.format_exception(sys.exc_info()[0],
1166 1171 sys.exc_info()[1],
1167 1172 sys.exc_info()[2])
1168 1173
1169 1174 print '***** Error occurred in %s *****' %(procUnitConfObj.name)
1170 1175 print '***** %s' %err[-1]
1171 1176
1172 1177 message = ''.join(err)
1173 1178
1174 1179 sys.stderr.write(message)
1175 1180
1176 1181 if not send_email:
1177 1182 return
1178 1183
1179 1184 subject = 'SChain v%s: Error running %s\n' %(schainpy.__version__, procUnitConfObj.name)
1180 1185
1181 1186 subtitle = '%s: %s\n' %(procUnitConfObj.getElementName() ,procUnitConfObj.name)
1182 1187 subtitle += 'Hostname: %s\n' %socket.gethostbyname(socket.gethostname())
1183 1188 subtitle += 'Working directory: %s\n' %os.path.abspath('./')
1184 1189 subtitle += 'Configuration file: %s\n' %self.filename
1185 1190 subtitle += 'Time: %s\n' %str(datetime.datetime.now())
1186 1191
1187 1192 readUnitConfObj = self.getReadUnitObj()
1188 1193 if readUnitConfObj:
1189 1194 subtitle += '\nInput parameters:\n'
1190 1195 subtitle += '[Data path = %s]\n' %readUnitConfObj.path
1191 1196 subtitle += '[Data type = %s]\n' %readUnitConfObj.datatype
1192 1197 subtitle += '[Start date = %s]\n' %readUnitConfObj.startDate
1193 1198 subtitle += '[End date = %s]\n' %readUnitConfObj.endDate
1194 1199 subtitle += '[Start time = %s]\n' %readUnitConfObj.startTime
1195 1200 subtitle += '[End time = %s]\n' %readUnitConfObj.endTime
1196 1201
1197 1202 adminObj = schainpy.admin.SchainNotify()
1198 1203 adminObj.sendAlert(message=message,
1199 1204 subject=subject,
1200 1205 subtitle=subtitle,
1201 1206 filename=self.filename)
1202 1207
1203 1208 def isPaused(self):
1204 1209 return 0
1205 1210
1206 1211 def isStopped(self):
1207 1212 return 0
1208 1213
1209 1214 def runController(self):
1210 1215 '''
1211 1216 returns 0 when this process has been stopped, 1 otherwise
1212 1217 '''
1213 1218
1214 1219 if self.isPaused():
1215 1220 print 'Process suspended'
1216 1221
1217 1222 while True:
1218 1223 time.sleep(0.1)
1219 1224
1220 1225 if not self.isPaused():
1221 1226 break
1222 1227
1223 1228 if self.isStopped():
1224 1229 break
1225 1230
1226 1231 print 'Process reinitialized'
1227 1232
1228 1233 if self.isStopped():
1229 1234 print 'Process stopped'
1230 1235 return 0
1231 1236
1232 1237 return 1
1233 1238
1234 1239 def setFilename(self, filename):
1235 1240
1236 1241 self.filename = filename
1237 1242
1238 1243 def setPlotterQueue(self, plotter_queue):
1239 1244
1240 1245 raise NotImplementedError, 'Use schainpy.controller_api.ControllerThread instead Project class'
1241 1246
1242 1247 def getPlotterQueue(self):
1243 1248
1244 1249 raise NotImplementedError, 'Use schainpy.controller_api.ControllerThread instead Project class'
1245 1250
1246 1251 def useExternalPlotter(self):
1247 1252
1248 1253 raise NotImplementedError, 'Use schainpy.controller_api.ControllerThread instead Project class'
1249 1254
1250 1255 def run(self):
1251 1256
1252 1257 log.success('Starting {}'.format(self.name))
1253 1258
1254 1259 self.createObjects()
1255 1260 self.connectObjects()
1256 1261
1257 1262 keyList = self.procUnitConfObjDict.keys()
1258 1263 keyList.sort()
1259 1264
1260 1265 while(True):
1261 1266
1262 1267 is_ok = False
1263 1268
1264 1269 for procKey in keyList:
1265 1270
1266 1271 procUnitConfObj = self.procUnitConfObjDict[procKey]
1267 1272
1268 1273 try:
1269 1274 sts = procUnitConfObj.run()
1270 1275 is_ok = is_ok or sts
1271 1276 except KeyboardInterrupt:
1272 1277 is_ok = False
1273 1278 break
1274 1279 except ValueError, e:
1275 1280 time.sleep(0.5)
1276 1281 self.__handleError(procUnitConfObj, send_email=True)
1277 1282 is_ok = False
1278 1283 break
1279 1284 except:
1280 1285 time.sleep(0.5)
1281 1286 self.__handleError(procUnitConfObj)
1282 1287 is_ok = False
1283 1288 break
1284 1289
1285 1290 #If every process unit finished so end process
1286 1291 if not(is_ok):
1287 1292 break
1288 1293
1289 1294 if not self.runController():
1290 1295 break
1291 1296
1292 1297 #Closing every process
1293 1298 for procKey in keyList:
1294 1299 procUnitConfObj = self.procUnitConfObjDict[procKey]
1295 1300 procUnitConfObj.close()
1296 1301
1297 1302 log.success('{} finished'.format(self.name))
General Comments 0
You need to be logged in to leave comments. Login now