''' Updated on January , 2018, for multiprocessing purposes Author: Sergio Cortez Created on September , 2012 ''' from platform import python_version import sys import ast import datetime import traceback import math import time import zmq from multiprocessing import Process, cpu_count from threading import Thread from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring from xml.dom import minidom from schainpy.admin import Alarm, SchainWarning ### Temporary imports!!! # from schainpy.model import * from schainpy.model.io import * from schainpy.model.graphics import * from schainpy.model.proc.jroproc_base import * from schainpy.model.proc.bltrproc_parameters import * from schainpy.model.proc.jroproc_spectra import * from schainpy.model.proc.jroproc_voltage import * from schainpy.model.proc.jroproc_parameters import * from schainpy.model.utils.jroutils_publish import * from schainpy.utils import log ### DTYPES = { 'Voltage': '.r', 'Spectra': '.pdata' } def MPProject(project, n=cpu_count()): ''' Project wrapper to run schain in n processes ''' rconf = project.getReadUnitObj() op = rconf.getOperationObj('run') dt1 = op.getParameterValue('startDate') dt2 = op.getParameterValue('endDate') tm1 = op.getParameterValue('startTime') tm2 = op.getParameterValue('endTime') days = (dt2 - dt1).days for day in range(days + 1): skip = 0 cursor = 0 processes = [] dt = dt1 + datetime.timedelta(day) dt_str = dt.strftime('%Y/%m/%d') reader = JRODataReader() paths, files = reader.searchFilesOffLine(path=rconf.path, startDate=dt, endDate=dt, startTime=tm1, endTime=tm2, ext=DTYPES[rconf.datatype]) nFiles = len(files) if nFiles == 0: continue skip = int(math.ceil(nFiles / n)) while nFiles > cursor * skip: rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor, skip=skip) p = project.clone() p.start() processes.append(p) cursor += 1 def beforeExit(exctype, value, trace): for process in processes: process.terminate() process.join() print(traceback.print_tb(trace)) sys.excepthook = beforeExit for process in processes: process.join() process.terminate() time.sleep(3) def wait(context): time.sleep(1) c = zmq.Context() receiver = c.socket(zmq.SUB) receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id)) receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode()) log.error('startinggg') msg = receiver.recv_multipart()[1] #log.error(msg) context.terminate() class ParameterConf(): id = None name = None value = None format = None __formated_value = None ELEMENTNAME = 'Parameter' def __init__(self): self.format = 'str' def getElementName(self): return self.ELEMENTNAME def getValue(self): value = self.value format = self.format if self.__formated_value != None: return self.__formated_value if format == 'obj': return value if format == 'str': self.__formated_value = str(value) return self.__formated_value if value == '': raise ValueError('%s: This parameter value is empty' % self.name) if format == 'list': strList = value.split(',') self.__formated_value = strList return self.__formated_value if format == 'intlist': ''' Example: value = (0,1,2) ''' new_value = ast.literal_eval(value) if type(new_value) not in (tuple, list): new_value = [int(new_value)] self.__formated_value = new_value return self.__formated_value if format == 'floatlist': ''' Example: value = (0.5, 1.4, 2.7) ''' new_value = ast.literal_eval(value) if type(new_value) not in (tuple, list): new_value = [float(new_value)] self.__formated_value = new_value return self.__formated_value if format == 'date': strList = value.split('/') intList = [int(x) for x in strList] date = datetime.date(intList[0], intList[1], intList[2]) self.__formated_value = date return self.__formated_value if format == 'time': strList = value.split(':') intList = [int(x) for x in strList] time = datetime.time(intList[0], intList[1], intList[2]) self.__formated_value = time return self.__formated_value if format == 'pairslist': ''' Example: value = (0,1),(1,2) ''' new_value = ast.literal_eval(value) if type(new_value) not in (tuple, list): raise ValueError('%s has to be a tuple or list of pairs' % value) if type(new_value[0]) not in (tuple, list): if len(new_value) != 2: raise ValueError('%s has to be a tuple or list of pairs' % value) new_value = [new_value] for thisPair in new_value: if len(thisPair) != 2: raise ValueError('%s has to be a tuple or list of pairs' % value) self.__formated_value = new_value return self.__formated_value if format == 'multilist': ''' Example: value = (0,1,2),(3,4,5) ''' multiList = ast.literal_eval(value) if type(multiList[0]) == int: multiList = ast.literal_eval('(' + value + ')') self.__formated_value = multiList return self.__formated_value if format == 'bool': value = int(value) if format == 'int': value = float(value) format_func = eval(format) self.__formated_value = format_func(value) return self.__formated_value def updateId(self, new_id): self.id = str(new_id) def setup(self, id, name, value, format='str'): self.id = str(id) self.name = name if format == 'obj': self.value = value else: self.value = str(value) self.format = str.lower(format) self.getValue() return 1 def update(self, name, value, format='str'): self.name = name self.value = str(value) self.format = format def makeXml(self, opElement): if self.name not in ('queue',): parmElement = SubElement(opElement, self.ELEMENTNAME) parmElement.set('id', str(self.id)) parmElement.set('name', self.name) parmElement.set('value', self.value) parmElement.set('format', self.format) def readXml(self, parmElement): self.id = parmElement.get('id') self.name = parmElement.get('name') self.value = parmElement.get('value') self.format = str.lower(parmElement.get('format')) # Compatible with old signal chain version if self.format == 'int' and self.name == 'idfigure': self.name = 'id' def printattr(self): print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id)) class OperationConf(): ELEMENTNAME = 'Operation' def __init__(self): self.id = '0' self.name = None self.priority = None self.topic = None def __getNewId(self): return int(self.id) * 10 + len(self.parmConfObjList) + 1 def getId(self): return self.id def updateId(self, new_id): self.id = str(new_id) n = 1 for parmObj in self.parmConfObjList: idParm = str(int(new_id) * 10 + n) parmObj.updateId(idParm) n += 1 def getElementName(self): return self.ELEMENTNAME def getParameterObjList(self): return self.parmConfObjList def getParameterObj(self, parameterName): for parmConfObj in self.parmConfObjList: if parmConfObj.name != parameterName: continue return parmConfObj return None def getParameterObjfromValue(self, parameterValue): for parmConfObj in self.parmConfObjList: if parmConfObj.getValue() != parameterValue: continue return parmConfObj.getValue() return None def getParameterValue(self, parameterName): parameterObj = self.getParameterObj(parameterName) # if not parameterObj: # return None value = parameterObj.getValue() return value def getKwargs(self): kwargs = {} for parmConfObj in self.parmConfObjList: if self.name == 'run' and parmConfObj.name == 'datatype': continue kwargs[parmConfObj.name] = parmConfObj.getValue() return kwargs def setup(self, id, name, priority, type, project_id): self.id = str(id) self.project_id = project_id self.name = name self.type = type self.priority = priority self.parmConfObjList = [] def removeParameters(self): for obj in self.parmConfObjList: del obj self.parmConfObjList = [] def addParameter(self, name, value, format='str'): if value is None: return None id = self.__getNewId() parmConfObj = ParameterConf() if not parmConfObj.setup(id, name, value, format): return None self.parmConfObjList.append(parmConfObj) return parmConfObj def changeParameter(self, name, value, format='str'): parmConfObj = self.getParameterObj(name) parmConfObj.update(name, value, format) return parmConfObj def makeXml(self, procUnitElement): opElement = SubElement(procUnitElement, self.ELEMENTNAME) opElement.set('id', str(self.id)) opElement.set('name', self.name) opElement.set('type', self.type) opElement.set('priority', str(self.priority)) for parmConfObj in self.parmConfObjList: parmConfObj.makeXml(opElement) def readXml(self, opElement, project_id): self.id = opElement.get('id') self.name = opElement.get('name') self.type = opElement.get('type') self.priority = opElement.get('priority') self.project_id = str(project_id) #yong # Compatible with old signal chain version # Use of 'run' method instead 'init' if self.type == 'self' and self.name == 'init': self.name = 'run' self.parmConfObjList = [] parmElementList = opElement.iter(ParameterConf().getElementName()) for parmElement in parmElementList: parmConfObj = ParameterConf() parmConfObj.readXml(parmElement) # Compatible with old signal chain version # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER if self.type != 'self' and self.name == 'Plot': if parmConfObj.format == 'str' and parmConfObj.name == 'type': self.name = parmConfObj.value continue self.parmConfObjList.append(parmConfObj) def printattr(self): print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME, self.id, self.name, self.type, self.priority, self.project_id)) for parmConfObj in self.parmConfObjList: parmConfObj.printattr() def createObject(self): className = eval(self.name) if self.type == 'other': opObj = className() elif self.type == 'external': kwargs = self.getKwargs() opObj = className(self.id, self.project_id, **kwargs) opObj.start() return opObj class ProcUnitConf(): ELEMENTNAME = 'ProcUnit' def __init__(self): self.id = None self.datatype = None self.name = None self.inputId = None self.opConfObjList = [] self.procUnitObj = None self.opObjDict = {} def __getPriority(self): return len(self.opConfObjList) + 1 def __getNewId(self): return int(self.id) * 10 + len(self.opConfObjList) + 1 def getElementName(self): return self.ELEMENTNAME def getId(self): return self.id def updateId(self, new_id): ''' new_id = int(parentId) * 10 + (int(self.id) % 10) new_inputId = int(parentId) * 10 + (int(self.inputId) % 10) # If this proc unit has not inputs #if self.inputId == '0': #new_inputId = 0 n = 1 for opConfObj in self.opConfObjList: idOp = str(int(new_id) * 10 + n) opConfObj.updateId(idOp) n += 1 self.parentId = str(parentId) self.id = str(new_id) #self.inputId = str(new_inputId) ''' n = 1 def getInputId(self): return self.inputId def getOperationObjList(self): return self.opConfObjList def getOperationObj(self, name=None): for opConfObj in self.opConfObjList: if opConfObj.name != name: continue return opConfObj return None def getOpObjfromParamValue(self, value=None): for opConfObj in self.opConfObjList: if opConfObj.getParameterObjfromValue(parameterValue=value) != value: continue return opConfObj return None def getProcUnitObj(self): return self.procUnitObj def setup(self, project_id, id, name, datatype, inputId): ''' id sera el topico a publicar inputId sera el topico a subscribirse ''' # Compatible with old signal chain version if datatype == None and name == None: raise ValueError('datatype or name should be defined') #Definir una condicion para inputId cuando sea 0 if name == None: if 'Proc' in datatype: name = datatype else: name = '%sProc' % (datatype) if datatype == None: datatype = name.replace('Proc', '') self.id = str(id) self.project_id = project_id self.name = name self.datatype = datatype self.inputId = inputId self.opConfObjList = [] self.addOperation(name='run', optype='self') def removeOperations(self): for obj in self.opConfObjList: del obj self.opConfObjList = [] self.addOperation(name='run') def addParameter(self, **kwargs): ''' Add parameters to 'run' operation ''' opObj = self.opConfObjList[0] opObj.addParameter(**kwargs) return opObj def addOperation(self, name, optype='self'): ''' Actualizacion - > proceso comunicacion En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic definir el tipoc de socket o comunicacion ipc++ ''' id = self.__getNewId() priority = self.__getPriority() # Sin mucho sentido, pero puede usarse opConfObj = OperationConf() opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id) self.opConfObjList.append(opConfObj) return opConfObj def makeXml(self, projectElement): procUnitElement = SubElement(projectElement, self.ELEMENTNAME) procUnitElement.set('id', str(self.id)) procUnitElement.set('name', self.name) procUnitElement.set('datatype', self.datatype) procUnitElement.set('inputId', str(self.inputId)) for opConfObj in self.opConfObjList: opConfObj.makeXml(procUnitElement) def readXml(self, upElement, project_id): self.id = upElement.get('id') self.name = upElement.get('name') self.datatype = upElement.get('datatype') self.inputId = upElement.get('inputId') self.project_id = str(project_id) if self.ELEMENTNAME == 'ReadUnit': self.datatype = self.datatype.replace('Reader', '') if self.ELEMENTNAME == 'ProcUnit': self.datatype = self.datatype.replace('Proc', '') if self.inputId == 'None': self.inputId = '0' self.opConfObjList = [] opElementList = upElement.iter(OperationConf().getElementName()) for opElement in opElementList: opConfObj = OperationConf() opConfObj.readXml(opElement, project_id) self.opConfObjList.append(opConfObj) def printattr(self): print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME, self.id, self.name, self.datatype, self.inputId, self.project_id)) for opConfObj in self.opConfObjList: opConfObj.printattr() def getKwargs(self): opObj = self.opConfObjList[0] kwargs = opObj.getKwargs() return kwargs def createObjects(self): ''' Instancia de unidades de procesamiento. ''' className = eval(self.name) kwargs = self.getKwargs() procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) # necesitan saber su id y su entrada por fines de ipc log.success('creating process...', self.name) for opConfObj in self.opConfObjList: if opConfObj.type == 'self' and opConfObj.name == 'run': continue elif opConfObj.type == 'self': opObj = getattr(procUnitObj, opConfObj.name) else: opObj = opConfObj.createObject() log.success('creating operation: {}, type:{}'.format( opConfObj.name, opConfObj.type), self.name) procUnitObj.addOperation(opConfObj, opObj) procUnitObj.start() self.procUnitObj = procUnitObj def close(self): for opConfObj in self.opConfObjList: if opConfObj.type == 'self': continue opObj = self.procUnitObj.getOperationObj(opConfObj.id) opObj.close() self.procUnitObj.close() return class ReadUnitConf(ProcUnitConf): ELEMENTNAME = 'ReadUnit' def __init__(self): self.id = None self.datatype = None self.name = None self.inputId = None self.opConfObjList = [] def getElementName(self): return self.ELEMENTNAME def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='', startTime='', endTime='', server=None, **kwargs): ''' *****el id del proceso sera el Topico Adicion de {topic}, si no esta presente -> error kwargs deben ser trasmitidos en la instanciacion ''' # Compatible with old signal chain version if datatype == None and name == None: raise ValueError('datatype or name should be defined') if name == None: if 'Reader' in datatype: name = datatype datatype = name.replace('Reader','') else: name = '{}Reader'.format(datatype) if datatype == None: if 'Reader' in name: datatype = name.replace('Reader','') else: datatype = name name = '{}Reader'.format(name) self.id = id self.project_id = project_id self.name = name self.datatype = datatype if path != '': self.path = os.path.abspath(path) self.startDate = startDate self.endDate = endDate self.startTime = startTime self.endTime = endTime self.server = server self.addRunOperation(**kwargs) def update(self, **kwargs): if 'datatype' in kwargs: datatype = kwargs.pop('datatype') if 'Reader' in datatype: self.name = datatype else: self.name = '%sReader' % (datatype) self.datatype = self.name.replace('Reader', '') attrs = ('path', 'startDate', 'endDate', 'startTime', 'endTime') for attr in attrs: if attr in kwargs: setattr(self, attr, kwargs.pop(attr)) self.updateRunOperation(**kwargs) def removeOperations(self): for obj in self.opConfObjList: del obj self.opConfObjList = [] def addRunOperation(self, **kwargs): opObj = self.addOperation(name='run', optype='self') if self.server is None: opObj.addParameter( name='datatype', value=self.datatype, format='str') opObj.addParameter(name='path', value=self.path, format='str') opObj.addParameter( name='startDate', value=self.startDate, format='date') opObj.addParameter( name='endDate', value=self.endDate, format='date') opObj.addParameter( name='startTime', value=self.startTime, format='time') opObj.addParameter( name='endTime', value=self.endTime, format='time') for key, value in list(kwargs.items()): opObj.addParameter(name=key, value=value, format=type(value).__name__) else: opObj.addParameter(name='server', value=self.server, format='str') return opObj def updateRunOperation(self, **kwargs): opObj = self.getOperationObj(name='run') opObj.removeParameters() opObj.addParameter(name='datatype', value=self.datatype, format='str') opObj.addParameter(name='path', value=self.path, format='str') opObj.addParameter( name='startDate', value=self.startDate, format='date') opObj.addParameter(name='endDate', value=self.endDate, format='date') opObj.addParameter( name='startTime', value=self.startTime, format='time') opObj.addParameter(name='endTime', value=self.endTime, format='time') for key, value in list(kwargs.items()): opObj.addParameter(name=key, value=value, format=type(value).__name__) return opObj def readXml(self, upElement, project_id): self.id = upElement.get('id') self.name = upElement.get('name') self.datatype = upElement.get('datatype') self.project_id = str(project_id) #yong if self.ELEMENTNAME == 'ReadUnit': self.datatype = self.datatype.replace('Reader', '') self.opConfObjList = [] opElementList = upElement.iter(OperationConf().getElementName()) for opElement in opElementList: opConfObj = OperationConf() opConfObj.readXml(opElement, project_id) self.opConfObjList.append(opConfObj) if opConfObj.name == 'run': self.path = opConfObj.getParameterValue('path') self.startDate = opConfObj.getParameterValue('startDate') self.endDate = opConfObj.getParameterValue('endDate') self.startTime = opConfObj.getParameterValue('startTime') self.endTime = opConfObj.getParameterValue('endTime') class Project(Process): ELEMENTNAME = 'Project' def __init__(self): Process.__init__(self) self.id = None self.filename = None self.description = None self.email = None self.alarm = None self.procUnitConfObjDict = {} def __getNewId(self): idList = list(self.procUnitConfObjDict.keys()) id = int(self.id) * 10 while True: id += 1 if str(id) in idList: continue break return str(id) def getElementName(self): return self.ELEMENTNAME def getId(self): return self.id def updateId(self, new_id): self.id = str(new_id) keyList = list(self.procUnitConfObjDict.keys()) keyList.sort() n = 1 newProcUnitConfObjDict = {} for procKey in keyList: procUnitConfObj = self.procUnitConfObjDict[procKey] idProcUnit = str(int(self.id) * 10 + n) procUnitConfObj.updateId(idProcUnit) newProcUnitConfObjDict[idProcUnit] = procUnitConfObj n += 1 self.procUnitConfObjDict = newProcUnitConfObjDict def setup(self, id=1, name='', description='', email=None, alarm=[]): print(' ') print('*' * 60) print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__) print('*' * 60) print("* Python " + python_version() + " *") print('*' * 19) print(' ') self.id = str(id) self.description = description self.email = email self.alarm = alarm def update(self, **kwargs): for key, value in list(kwargs.items()): setattr(self, key, value) def clone(self): p = Project() p.procUnitConfObjDict = self.procUnitConfObjDict return p def addReadUnit(self, id=None, datatype=None, name=None, **kwargs): ''' Actualizacion: Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data) ''' if id is None: idReadUnit = self.__getNewId() else: idReadUnit = str(id) readUnitConfObj = ReadUnitConf() readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs) self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj return readUnitConfObj def addProcUnit(self, inputId='0', datatype=None, name=None): ''' Actualizacion: Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit) Deberia reemplazar a "inputId" ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica. ''' idProcUnit = self.__getNewId() #Topico para subscripcion procUnitConfObj = ProcUnitConf() procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) #topic_read, topic_write, self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj return procUnitConfObj def removeProcUnit(self, id): if id in list(self.procUnitConfObjDict.keys()): self.procUnitConfObjDict.pop(id) def getReadUnitId(self): readUnitConfObj = self.getReadUnitObj() return readUnitConfObj.id def getReadUnitObj(self): for obj in list(self.procUnitConfObjDict.values()): if obj.getElementName() == 'ReadUnit': return obj return None def getProcUnitObj(self, id=None, name=None): if id != None: return self.procUnitConfObjDict[id] if name != None: return self.getProcUnitObjByName(name) return None def getProcUnitObjByName(self, name): for obj in list(self.procUnitConfObjDict.values()): if obj.name == name: return obj return None def procUnitItems(self): return list(self.procUnitConfObjDict.items()) def makeXml(self): projectElement = Element('Project') projectElement.set('id', str(self.id)) projectElement.set('name', self.name) projectElement.set('description', self.description) for procUnitConfObj in list(self.procUnitConfObjDict.values()): procUnitConfObj.makeXml(projectElement) self.projectElement = projectElement def writeXml(self, filename=None): if filename == None: if self.filename: filename = self.filename else: filename = 'schain.xml' if not filename: print('filename has not been defined. Use setFilename(filename) for do it.') return 0 abs_file = os.path.abspath(filename) if not os.access(os.path.dirname(abs_file), os.W_OK): print('No write permission on %s' % os.path.dirname(abs_file)) return 0 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)): print('File %s already exists and it could not be overwriten' % abs_file) return 0 self.makeXml() ElementTree(self.projectElement).write(abs_file, method='xml') self.filename = abs_file return 1 def readXml(self, filename=None): if not filename: print('filename is not defined') return 0 abs_file = os.path.abspath(filename) if not os.path.isfile(abs_file): print('%s file does not exist' % abs_file) return 0 self.projectElement = None self.procUnitConfObjDict = {} try: self.projectElement = ElementTree().parse(abs_file) except: print('Error reading %s, verify file format' % filename) return 0 self.project = self.projectElement.tag self.id = self.projectElement.get('id') self.name = self.projectElement.get('name') self.description = self.projectElement.get('description') readUnitElementList = self.projectElement.iter( ReadUnitConf().getElementName()) for readUnitElement in readUnitElementList: readUnitConfObj = ReadUnitConf() readUnitConfObj.readXml(readUnitElement, self.id) self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj procUnitElementList = self.projectElement.iter( ProcUnitConf().getElementName()) for procUnitElement in procUnitElementList: procUnitConfObj = ProcUnitConf() procUnitConfObj.readXml(procUnitElement, self.id) self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj self.filename = abs_file return 1 def __str__(self): print('Project[%s]: name = %s, description = %s, project_id = %s' % (self.id, self.name, self.description, self.project_id)) for procUnitConfObj in self.procUnitConfObjDict.values(): print(procUnitConfObj) def createObjects(self): for procUnitConfObj in self.procUnitConfObjDict.values(): procUnitConfObj.createObjects() def __handleError(self, procUnitConfObj, modes=None, stdout=True): import socket if modes is None: modes = self.alarm if not self.alarm: modes = [] err = traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2]) log.error('{}'.format(err[-1]), procUnitConfObj.name) message = ''.join(err) if stdout: sys.stderr.write(message) subject = 'SChain v%s: Error running %s\n' % ( schainpy.__version__, procUnitConfObj.name) subtitle = '%s: %s\n' % ( procUnitConfObj.getElementName(), procUnitConfObj.name) subtitle += 'Hostname: %s\n' % socket.gethostbyname( socket.gethostname()) subtitle += 'Working directory: %s\n' % os.path.abspath('./') subtitle += 'Configuration file: %s\n' % self.filename subtitle += 'Time: %s\n' % str(datetime.datetime.now()) readUnitConfObj = self.getReadUnitObj() if readUnitConfObj: subtitle += '\nInput parameters:\n' subtitle += '[Data path = %s]\n' % readUnitConfObj.path subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate subtitle += '[End date = %s]\n' % readUnitConfObj.endDate subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime subtitle += '[End time = %s]\n' % readUnitConfObj.endTime a = Alarm( modes=modes, email=self.email, message=message, subject=subject, subtitle=subtitle, filename=self.filename ) return a def isPaused(self): return 0 def isStopped(self): return 0 def runController(self): ''' returns 0 when this process has been stopped, 1 otherwise ''' if self.isPaused(): print('Process suspended') while True: time.sleep(0.1) if not self.isPaused(): break if self.isStopped(): break print('Process reinitialized') if self.isStopped(): print('Process stopped') return 0 return 1 def setFilename(self, filename): self.filename = filename def setProxyCom(self): if not os.path.exists('/tmp/schain'): os.mkdir('/tmp/schain') self.ctx = zmq.Context() xpub = self.ctx.socket(zmq.XPUB) xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id)) xsub = self.ctx.socket(zmq.XSUB) xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id)) try: zmq.proxy(xpub, xsub) except zmq.ContextTerminated: xpub.close() xsub.close() def run(self): log.success('Starting {}: {}'.format(self.name, self.id), tag='') self.start_time = time.time() self.createObjects() # t = Thread(target=wait, args=(self.ctx, )) # t.start() self.setProxyCom() # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo log.success('{} finished (time: {}s)'.format( self.name, time.time()-self.start_time))