diff --git a/schainpy/controller.py b/schainpy/controller.py index dc9c5d8..60b4179 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -39,7 +39,7 @@ class ConfBase(): def getId(self): return self.id - + def getNewId(self): return int(self.id) * 10 + len(self.operations) + 1 @@ -61,7 +61,7 @@ class ConfBase(): for key, value in self.parameters.items(): if value not in (None, '', ' '): params[key] = value - + return params def update(self, **kwargs): @@ -99,21 +99,21 @@ class ConfBase(): params[key] = str(value) return params - + def makeXml(self, element): xml = SubElement(element, self.ELEMENTNAME) for label in self.xml_labels: xml.set(label, str(getattr(self, label))) - + for key, value in self.getParameters().items(): xml_param = SubElement(xml, 'Parameter') xml_param.set('name', key) xml_param.set('value', value) - + for conf in self.operations: conf.makeXml(xml) - + def __str__(self): if self.ELEMENTNAME == 'Operation': @@ -126,7 +126,7 @@ class ConfBase(): s += ' {}: {}\n'.format(key, value) else: s += ' {}: {}\n'.format(key, value) - + for conf in self.operations: s += str(conf) @@ -179,7 +179,7 @@ class ProcUnitConf(ConfBase): def setup(self, project_id, id, name, datatype, inputId, err_queue): ''' ''' - + if datatype == None and name == None: raise ValueError('datatype or name should be defined') @@ -205,7 +205,7 @@ class ProcUnitConf(ConfBase): i = [1 if x.id==id else 0 for x in self.operations] self.operations.pop(i.index(1)) - + def getOperation(self, id): for conf in self.operations: @@ -233,7 +233,7 @@ class ProcUnitConf(ConfBase): self.err_queue = err_queue self.operations = [] self.parameters = {} - + for elm in element: if elm.tag == 'Parameter': self.addParameter(elm.get('name'), elm.get('value')) @@ -254,21 +254,21 @@ class ProcUnitConf(ConfBase): log.success('creating process...', self.name) for conf in self.operations: - + opObj = conf.createObject() - + log.success('adding operation: {}, type:{}'.format( conf.name, conf.type), self.name) - + procUnitObj.addOperation(conf, opObj) - + self.object = procUnitObj def run(self): ''' ''' - + return self.object.call(**self.getKwargs()) @@ -284,10 +284,10 @@ class ReadUnitConf(ProcUnitConf): self.inputId = None self.operations = [] self.parameters = {} - + def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='', startTime='', endTime='', server=None, **kwargs): - + if datatype == None and name == None: raise ValueError('datatype or name should be defined') if name == None: @@ -307,8 +307,8 @@ class ReadUnitConf(ProcUnitConf): self.project_id = project_id self.name = name self.datatype = datatype - self.err_queue = err_queue - + self.err_queue = err_queue + self.addParameter(name='path', value=path) self.addParameter(name='startDate', value=startDate) self.addParameter(name='endDate', value=endDate) @@ -377,7 +377,7 @@ class Project(Process): def setup(self, id=1, name='', description='', email=None, alarm=[]): self.id = str(id) - self.description = description + self.description = description self.email = email self.alarm = alarm if name: @@ -411,7 +411,7 @@ class Project(Process): conf = ReadUnitConf() conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs) self.configurations[conf.id] = conf - + return conf def addProcUnit(self, id=None, inputId='0', datatype=None, name=None): @@ -423,7 +423,7 @@ class Project(Process): idProcUnit = self.getNewId() else: idProcUnit = id - + conf = ProcUnitConf() conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue) self.configurations[conf.id] = conf @@ -458,7 +458,7 @@ class Project(Process): def updateUnit(self, id, **kwargs): conf = self.configurations[id].update(**kwargs) - + def makeXml(self): xml = Element('Project') @@ -529,7 +529,7 @@ class Project(Process): self.configurations[conf.id] = conf self.filename = abs_file - + return 1 def __str__(self): @@ -553,20 +553,23 @@ class Project(Process): conf = self.configurations[key] conf.createObjects() if conf.inputId is not None: - conf.object.setInput(self.configurations[conf.inputId].object) + if isinstance(conf.inputId, list): + conf.object.setInput([self.configurations[x].object for x in conf.inputId]) + else: + conf.object.setInput([self.configurations[conf.inputId].object]) def monitor(self): t = Thread(target=self._monitor, args=(self.err_queue, self.ctx)) t.start() - + def _monitor(self, queue, ctx): import socket - + procs = 0 err_msg = '' - + while True: msg = queue.get() if '#_start_#' in msg: @@ -575,11 +578,11 @@ class Project(Process): procs -=1 else: err_msg = msg - - if procs == 0 or 'Traceback' in err_msg: + + if procs == 0 or 'Traceback' in err_msg: break time.sleep(0.1) - + if '|' in err_msg: name, err = err_msg.split('|') if 'SchainWarning' in err: @@ -588,11 +591,11 @@ class Project(Process): log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name) else: log.error(err, name) - else: + else: name, err = self.name, err_msg - + time.sleep(1) - + ctx.term() message = ''.join(err) @@ -617,7 +620,7 @@ class Project(Process): subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime'] a = Alarm( - modes=self.alarm, + modes=self.alarm, email=self.email, message=message, subject=subject, @@ -635,10 +638,9 @@ class Project(Process): err = False n = len(self.configurations) - while not err: for conf in self.getUnits(): - ok = conf.run() + ok = conf.run() if ok == 'Error': n -= 1 continue @@ -646,14 +648,14 @@ class Project(Process): break if n == 0: err = True - + def run(self): log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='') self.started = True - self.start_time = time.time() + self.start_time = time.time() self.createObjects() self.runProcs() log.success('{} Done (Time: {:4.2f}s)'.format( self.name, - time.time()-self.start_time), '') + time.time()-self.start_time), '') \ No newline at end of file