@@ -39,7 +39,7 class ConfBase(): | |||
|
39 | 39 | def getId(self): |
|
40 | 40 | |
|
41 | 41 | return self.id |
|
42 | ||
|
42 | ||
|
43 | 43 | def getNewId(self): |
|
44 | 44 | |
|
45 | 45 | return int(self.id) * 10 + len(self.operations) + 1 |
@@ -61,7 +61,7 class ConfBase(): | |||
|
61 | 61 | for key, value in self.parameters.items(): |
|
62 | 62 | if value not in (None, '', ' '): |
|
63 | 63 | params[key] = value |
|
64 | ||
|
64 | ||
|
65 | 65 | return params |
|
66 | 66 | |
|
67 | 67 | def update(self, **kwargs): |
@@ -99,21 +99,21 class ConfBase(): | |||
|
99 | 99 | params[key] = str(value) |
|
100 | 100 | |
|
101 | 101 | return params |
|
102 | ||
|
102 | ||
|
103 | 103 | def makeXml(self, element): |
|
104 | 104 | |
|
105 | 105 | xml = SubElement(element, self.ELEMENTNAME) |
|
106 | 106 | for label in self.xml_labels: |
|
107 | 107 | xml.set(label, str(getattr(self, label))) |
|
108 | ||
|
108 | ||
|
109 | 109 | for key, value in self.getParameters().items(): |
|
110 | 110 | xml_param = SubElement(xml, 'Parameter') |
|
111 | 111 | xml_param.set('name', key) |
|
112 | 112 | xml_param.set('value', value) |
|
113 | ||
|
113 | ||
|
114 | 114 | for conf in self.operations: |
|
115 | 115 | conf.makeXml(xml) |
|
116 | ||
|
116 | ||
|
117 | 117 | def __str__(self): |
|
118 | 118 | |
|
119 | 119 | if self.ELEMENTNAME == 'Operation': |
@@ -126,7 +126,7 class ConfBase(): | |||
|
126 | 126 | s += ' {}: {}\n'.format(key, value) |
|
127 | 127 | else: |
|
128 | 128 | s += ' {}: {}\n'.format(key, value) |
|
129 | ||
|
129 | ||
|
130 | 130 | for conf in self.operations: |
|
131 | 131 | s += str(conf) |
|
132 | 132 | |
@@ -179,7 +179,7 class ProcUnitConf(ConfBase): | |||
|
179 | 179 | def setup(self, project_id, id, name, datatype, inputId, err_queue): |
|
180 | 180 | ''' |
|
181 | 181 | ''' |
|
182 | ||
|
182 | ||
|
183 | 183 | if datatype == None and name == None: |
|
184 | 184 | raise ValueError('datatype or name should be defined') |
|
185 | 185 | |
@@ -205,7 +205,7 class ProcUnitConf(ConfBase): | |||
|
205 | 205 | |
|
206 | 206 | i = [1 if x.id==id else 0 for x in self.operations] |
|
207 | 207 | self.operations.pop(i.index(1)) |
|
208 | ||
|
208 | ||
|
209 | 209 | def getOperation(self, id): |
|
210 | 210 | |
|
211 | 211 | for conf in self.operations: |
@@ -233,7 +233,7 class ProcUnitConf(ConfBase): | |||
|
233 | 233 | self.err_queue = err_queue |
|
234 | 234 | self.operations = [] |
|
235 | 235 | self.parameters = {} |
|
236 | ||
|
236 | ||
|
237 | 237 | for elm in element: |
|
238 | 238 | if elm.tag == 'Parameter': |
|
239 | 239 | self.addParameter(elm.get('name'), elm.get('value')) |
@@ -254,21 +254,21 class ProcUnitConf(ConfBase): | |||
|
254 | 254 | log.success('creating process...', self.name) |
|
255 | 255 | |
|
256 | 256 | for conf in self.operations: |
|
257 | ||
|
257 | ||
|
258 | 258 | opObj = conf.createObject() |
|
259 | ||
|
259 | ||
|
260 | 260 | log.success('adding operation: {}, type:{}'.format( |
|
261 | 261 | conf.name, |
|
262 | 262 | conf.type), self.name) |
|
263 | ||
|
263 | ||
|
264 | 264 | procUnitObj.addOperation(conf, opObj) |
|
265 | ||
|
265 | ||
|
266 | 266 | self.object = procUnitObj |
|
267 | 267 | |
|
268 | 268 | def run(self): |
|
269 | 269 | ''' |
|
270 | 270 | ''' |
|
271 | ||
|
271 | ||
|
272 | 272 | return self.object.call(**self.getKwargs()) |
|
273 | 273 | |
|
274 | 274 | |
@@ -284,10 +284,10 class ReadUnitConf(ProcUnitConf): | |||
|
284 | 284 | self.inputId = None |
|
285 | 285 | self.operations = [] |
|
286 | 286 | self.parameters = {} |
|
287 | ||
|
287 | ||
|
288 | 288 | def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='', |
|
289 | 289 | startTime='', endTime='', server=None, **kwargs): |
|
290 | ||
|
290 | ||
|
291 | 291 | if datatype == None and name == None: |
|
292 | 292 | raise ValueError('datatype or name should be defined') |
|
293 | 293 | if name == None: |
@@ -307,8 +307,8 class ReadUnitConf(ProcUnitConf): | |||
|
307 | 307 | self.project_id = project_id |
|
308 | 308 | self.name = name |
|
309 | 309 | self.datatype = datatype |
|
310 |
self.err_queue = err_queue |
|
|
311 | ||
|
310 | self.err_queue = err_queue | |
|
311 | ||
|
312 | 312 | self.addParameter(name='path', value=path) |
|
313 | 313 | self.addParameter(name='startDate', value=startDate) |
|
314 | 314 | self.addParameter(name='endDate', value=endDate) |
@@ -377,7 +377,7 class Project(Process): | |||
|
377 | 377 | def setup(self, id=1, name='', description='', email=None, alarm=[]): |
|
378 | 378 | |
|
379 | 379 | self.id = str(id) |
|
380 |
self.description = description |
|
|
380 | self.description = description | |
|
381 | 381 | self.email = email |
|
382 | 382 | self.alarm = alarm |
|
383 | 383 | if name: |
@@ -411,7 +411,7 class Project(Process): | |||
|
411 | 411 | conf = ReadUnitConf() |
|
412 | 412 | conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs) |
|
413 | 413 | self.configurations[conf.id] = conf |
|
414 | ||
|
414 | ||
|
415 | 415 | return conf |
|
416 | 416 | |
|
417 | 417 | def addProcUnit(self, id=None, inputId='0', datatype=None, name=None): |
@@ -423,7 +423,7 class Project(Process): | |||
|
423 | 423 | idProcUnit = self.getNewId() |
|
424 | 424 | else: |
|
425 | 425 | idProcUnit = id |
|
426 | ||
|
426 | ||
|
427 | 427 | conf = ProcUnitConf() |
|
428 | 428 | conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue) |
|
429 | 429 | self.configurations[conf.id] = conf |
@@ -458,7 +458,7 class Project(Process): | |||
|
458 | 458 | def updateUnit(self, id, **kwargs): |
|
459 | 459 | |
|
460 | 460 | conf = self.configurations[id].update(**kwargs) |
|
461 | ||
|
461 | ||
|
462 | 462 | def makeXml(self): |
|
463 | 463 | |
|
464 | 464 | xml = Element('Project') |
@@ -529,7 +529,7 class Project(Process): | |||
|
529 | 529 | self.configurations[conf.id] = conf |
|
530 | 530 | |
|
531 | 531 | self.filename = abs_file |
|
532 | ||
|
532 | ||
|
533 | 533 | return 1 |
|
534 | 534 | |
|
535 | 535 | def __str__(self): |
@@ -553,20 +553,23 class Project(Process): | |||
|
553 | 553 | conf = self.configurations[key] |
|
554 | 554 | conf.createObjects() |
|
555 | 555 | if conf.inputId is not None: |
|
556 | conf.object.setInput(self.configurations[conf.inputId].object) | |
|
556 | if isinstance(conf.inputId, list): | |
|
557 | conf.object.setInput([self.configurations[x].object for x in conf.inputId]) | |
|
558 | else: | |
|
559 | conf.object.setInput([self.configurations[conf.inputId].object]) | |
|
557 | 560 | |
|
558 | 561 | def monitor(self): |
|
559 | 562 | |
|
560 | 563 | t = Thread(target=self._monitor, args=(self.err_queue, self.ctx)) |
|
561 | 564 | t.start() |
|
562 | ||
|
565 | ||
|
563 | 566 | def _monitor(self, queue, ctx): |
|
564 | 567 | |
|
565 | 568 | import socket |
|
566 | ||
|
569 | ||
|
567 | 570 | procs = 0 |
|
568 | 571 | err_msg = '' |
|
569 | ||
|
572 | ||
|
570 | 573 | while True: |
|
571 | 574 | msg = queue.get() |
|
572 | 575 | if '#_start_#' in msg: |
@@ -575,11 +578,11 class Project(Process): | |||
|
575 | 578 | procs -=1 |
|
576 | 579 | else: |
|
577 | 580 | err_msg = msg |
|
578 | ||
|
579 |
if procs == 0 or 'Traceback' in err_msg: |
|
|
581 | ||
|
582 | if procs == 0 or 'Traceback' in err_msg: | |
|
580 | 583 | break |
|
581 | 584 | time.sleep(0.1) |
|
582 | ||
|
585 | ||
|
583 | 586 | if '|' in err_msg: |
|
584 | 587 | name, err = err_msg.split('|') |
|
585 | 588 | if 'SchainWarning' in err: |
@@ -588,11 +591,11 class Project(Process): | |||
|
588 | 591 | log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name) |
|
589 | 592 | else: |
|
590 | 593 | log.error(err, name) |
|
591 |
else: |
|
|
594 | else: | |
|
592 | 595 | name, err = self.name, err_msg |
|
593 | ||
|
596 | ||
|
594 | 597 | time.sleep(1) |
|
595 | ||
|
598 | ||
|
596 | 599 | ctx.term() |
|
597 | 600 | |
|
598 | 601 | message = ''.join(err) |
@@ -617,7 +620,7 class Project(Process): | |||
|
617 | 620 | subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime'] |
|
618 | 621 | |
|
619 | 622 | a = Alarm( |
|
620 |
modes=self.alarm, |
|
|
623 | modes=self.alarm, | |
|
621 | 624 | email=self.email, |
|
622 | 625 | message=message, |
|
623 | 626 | subject=subject, |
@@ -635,10 +638,9 class Project(Process): | |||
|
635 | 638 | |
|
636 | 639 | err = False |
|
637 | 640 | n = len(self.configurations) |
|
638 | ||
|
639 | 641 | while not err: |
|
640 | 642 | for conf in self.getUnits(): |
|
641 |
ok = conf.run() |
|
|
643 | ok = conf.run() | |
|
642 | 644 | if ok == 'Error': |
|
643 | 645 | n -= 1 |
|
644 | 646 | continue |
@@ -646,14 +648,14 class Project(Process): | |||
|
646 | 648 | break |
|
647 | 649 | if n == 0: |
|
648 | 650 | err = True |
|
649 | ||
|
651 | ||
|
650 | 652 | def run(self): |
|
651 | 653 | |
|
652 | 654 | log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='') |
|
653 | 655 | self.started = True |
|
654 |
self.start_time = time.time() |
|
|
656 | self.start_time = time.time() | |
|
655 | 657 | self.createObjects() |
|
656 | 658 | self.runProcs() |
|
657 | 659 | log.success('{} Done (Time: {:4.2f}s)'.format( |
|
658 | 660 | self.name, |
|
659 |
time.time()-self.start_time), '') |
|
|
661 | time.time()-self.start_time), '') No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now