From b013e28003fc667b878150d692b146e6e9f7cd28 2018-07-09 06:18:52 From: Juan C. Espinoza Date: 2018-07-09 06:18:52 Subject: [PATCH] Review MP changes, three types of operations: self, other and external --- diff --git a/schainpy/admin.py b/schainpy/admin.py index 48bfe7e..7dc2984 100644 --- a/schainpy/admin.py +++ b/schainpy/admin.py @@ -11,7 +11,10 @@ import sys import time import traceback import smtplib -import configparser +if sys.version[0] == '3': + from configparser import ConfigParser +else: + from ConfigParser import ConfigParser import io from threading import Thread from multiprocessing import Process @@ -144,7 +147,7 @@ class SchainConfigure(): return # create Parser using standard module ConfigParser - self.__parser = configparser.ConfigParser() + self.__parser = ConfigParser() # read conf file into a StringIO with "[madrigal]\n" section heading prepended strConfFile = io.StringIO("[schain]\n" + self.__confFile.read()) diff --git a/schainpy/controller.py b/schainpy/controller.py index 750fe56..a9371b2 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -12,7 +12,7 @@ import math import time import zmq from multiprocessing import Process, cpu_count - +from threading import Thread from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring from xml.dom import minidom @@ -90,6 +90,18 @@ def MPProject(project, n=cpu_count()): time.sleep(3) +def wait(context): + + time.sleep(1) + c = zmq.Context() + receiver = c.socket(zmq.SUB) + receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id)) + receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode()) + log.error('startinggg') + msg = receiver.recv_multipart()[1] + #log.error(msg) + context.terminate() + class ParameterConf(): id = None @@ -281,13 +293,6 @@ class ParameterConf(): class OperationConf(): - id = None - name = None - priority = None - type = None - - parmConfObjList = [] - ELEMENTNAME = 'Operation' def __init__(self): @@ -369,9 +374,10 @@ class OperationConf(): return kwargs - def setup(self, id, name, priority, type): + def setup(self, id, name, priority, type, project_id): self.id = str(id) + self.project_id = project_id self.name = name self.type = type self.priority = priority @@ -459,29 +465,18 @@ class OperationConf(): def createObject(self): className = eval(self.name) - kwargs = self.getKwargs() - - opObj = className(self.id, **kwargs) - - opObj.start() - - print(' Operation created') + + if self.type == 'other': + opObj = className() + elif self.type == 'external': + kwargs = self.getKwargs() + opObj = className(self.id, self.project_id, **kwargs) + opObj.start() return opObj class ProcUnitConf(): - id = None - name = None - datatype = None - inputId = None - parentId = None - - opConfObjList = [] - - procUnitObj = None - opObjList = [] - ELEMENTNAME = 'ProcUnit' def __init__(self): @@ -490,9 +485,7 @@ class ProcUnitConf(): self.datatype = None self.name = None self.inputId = None - self.opConfObjList = [] - self.procUnitObj = None self.opObjDict = {} @@ -512,7 +505,7 @@ class ProcUnitConf(): return self.id - def updateId(self, new_id, parentId=parentId): + def updateId(self, new_id): ''' new_id = int(parentId) * 10 + (int(self.id) % 10) new_inputId = int(parentId) * 10 + (int(self.inputId) % 10) @@ -534,6 +527,7 @@ class ProcUnitConf(): #self.inputId = str(new_inputId) ''' n = 1 + def getInputId(self): return self.inputId @@ -565,7 +559,7 @@ class ProcUnitConf(): return self.procUnitObj - def setup(self, id, name, datatype, inputId, parentId=None): + def setup(self, project_id, id, name, datatype, inputId): ''' id sera el topico a publicar inputId sera el topico a subscribirse @@ -587,10 +581,10 @@ class ProcUnitConf(): datatype = name.replace('Proc', '') self.id = str(id) + self.project_id = project_id self.name = name self.datatype = datatype self.inputId = inputId - self.parentId = parentId self.opConfObjList = [] self.addOperation(name='run', optype='self') @@ -613,7 +607,7 @@ class ProcUnitConf(): return opObj - def addOperation(self, name, optype = 'self'): + def addOperation(self, name, optype='self'): ''' Actualizacion - > proceso comunicacion En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic @@ -623,10 +617,8 @@ class ProcUnitConf(): id = self.__getNewId() priority = self.__getPriority() # Sin mucho sentido, pero puede usarse - opConfObj = OperationConf() - opConfObj.setup(id, name=name, priority=priority, type=optype) - + opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id) self.opConfObjList.append(opConfObj) return opConfObj @@ -685,62 +677,33 @@ class ProcUnitConf(): return kwargs - def createObjects(self, dictUnits): + def createObjects(self): ''' - Instancia de unidades de procesamiento. - + Instancia de unidades de procesamiento. ''' className = eval(self.name) kwargs = self.getKwargs() - procUnitObj = className(self.id, self.inputId, dictUnits, **kwargs) # necesitan saber su id y su entrada por fines de ipc - + procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) # necesitan saber su id y su entrada por fines de ipc + log.success('creating process...', self.name) for opConfObj in self.opConfObjList: - - if opConfObj.type == 'self' and self.name == 'run': + + if opConfObj.type == 'self' and opConfObj.name == 'run': continue elif opConfObj.type == 'self': - procUnitObj.addOperationKwargs( - opConfObj.id, **opConfObj.getKwargs()) - continue - print("Creating operation process:", opConfObj.name, "for", self.name) - opObj = opConfObj.createObject() - + opObj = getattr(procUnitObj, opConfObj.name) + else: + opObj = opConfObj.createObject() - #self.opObjDict[opConfObj.id] = opObj.name + log.success('creating operation: {}, type:{}'.format( + opConfObj.name, + opConfObj.type), self.name) - procUnitObj.addOperation(opConfObj.name, opConfObj.id) + procUnitObj.addOperation(opConfObj, opObj) procUnitObj.start() - self.procUnitObj = procUnitObj - - return procUnitObj - - def run(self): - - is_ok = True - """ - for opConfObj in self.opConfObjList: - - kwargs = {} - for parmConfObj in opConfObj.getParameterObjList(): - if opConfObj.name == 'run' and parmConfObj.name == 'datatype': - continue - - kwargs[parmConfObj.name] = parmConfObj.getValue() - - sts = self.procUnitObj.call(opType=opConfObj.type, - opName=opConfObj.name, - opId=opConfObj.id) - - is_ok = is_ok or sts - - """ - return is_ok - - def close(self): for opConfObj in self.opConfObjList: @@ -757,12 +720,6 @@ class ProcUnitConf(): class ReadUnitConf(ProcUnitConf): - path = None - startDate = None - endDate = None - startTime = None - endTime = None - ELEMENTNAME = 'ReadUnit' def __init__(self): @@ -771,18 +728,14 @@ class ReadUnitConf(ProcUnitConf): self.datatype = None self.name = None self.inputId = None - - self.parentId = None - self.opConfObjList = [] - self.opObjList = [] def getElementName(self): return self.ELEMENTNAME - def setup(self, id, name, datatype, path='', startDate='', endDate='', - startTime='', endTime='', parentId=None, server=None, **kwargs): + def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='', + startTime='', endTime='', server=None, **kwargs): ''' @@ -810,6 +763,7 @@ class ReadUnitConf(ProcUnitConf): name = '{}Reader'.format(name) self.id = id + self.project_id = project_id self.name = name self.datatype = datatype if path != '': @@ -818,8 +772,6 @@ class ReadUnitConf(ProcUnitConf): self.endDate = endDate self.startTime = startTime self.endTime = endTime - self.inputId = '0' - self.parentId = parentId self.server = server self.addRunOperation(**kwargs) @@ -834,13 +786,12 @@ class ReadUnitConf(ProcUnitConf): self.datatype = self.name.replace('Reader', '') attrs = ('path', 'startDate', 'endDate', - 'startTime', 'endTime', 'parentId') + 'startTime', 'endTime') for attr in attrs: if attr in kwargs: setattr(self, attr, kwargs.pop(attr)) - self.inputId = '0' self.updateRunOperation(**kwargs) def removeOperations(self): @@ -900,14 +851,10 @@ class ReadUnitConf(ProcUnitConf): self.id = upElement.get('id') self.name = upElement.get('name') self.datatype = upElement.get('datatype') - self.inputId = upElement.get('inputId') if self.ELEMENTNAME == 'ReadUnit': self.datatype = self.datatype.replace('Reader', '') - if self.inputId == 'None': - self.inputId = '0' - self.opConfObjList = [] opElementList = upElement.iter(OperationConf().getElementName()) @@ -927,20 +874,13 @@ class ReadUnitConf(ProcUnitConf): class Project(Process): - id = None - description = None - filename = None - - procUnitConfObjDict = None - ELEMENTNAME = 'Project' - - def __init__(self): Process.__init__(self) self.id = None + self.filename = None self.description = None self.email = None self.alarm = None @@ -949,7 +889,6 @@ class Project(Process): def __getNewId(self): idList = list(self.procUnitConfObjDict.keys()) - id = int(self.id) * 10 while True: @@ -984,13 +923,13 @@ class Project(Process): procUnitConfObj = self.procUnitConfObjDict[procKey] idProcUnit = str(int(self.id) * 10 + n) - procUnitConfObj.updateId(idProcUnit, parentId=self.id) + procUnitConfObj.updateId(idProcUnit) newProcUnitConfObjDict[idProcUnit] = procUnitConfObj n += 1 self.procUnitConfObjDict = newProcUnitConfObjDict - def setup(self, id, name='', description='', email=None, alarm=[]): + def setup(self, id=1, name='', description='', email=None, alarm=[]): print(' ') print('*' * 60) @@ -1031,9 +970,7 @@ class Project(Process): idReadUnit = str(id) readUnitConfObj = ReadUnitConf() - readUnitConfObj.setup(idReadUnit, name, datatype, - parentId=self.id, **kwargs) - + readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs) self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj return readUnitConfObj @@ -1051,11 +988,8 @@ class Project(Process): ''' idProcUnit = self.__getNewId() #Topico para subscripcion - procUnitConfObj = ProcUnitConf() - procUnitConfObj.setup(idProcUnit, name, datatype, inputId, #topic_read, topic_write, - parentId=self.id) - + procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) #topic_read, topic_write, self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj return procUnitConfObj @@ -1176,10 +1110,6 @@ class Project(Process): for readUnitElement in readUnitElementList: readUnitConfObj = ReadUnitConf() readUnitConfObj.readXml(readUnitElement) - - if readUnitConfObj.parentId == None: - readUnitConfObj.parentId = self.id - self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj procUnitElementList = self.projectElement.iter( @@ -1188,33 +1118,25 @@ class Project(Process): for procUnitElement in procUnitElementList: procUnitConfObj = ProcUnitConf() procUnitConfObj.readXml(procUnitElement) - - if procUnitConfObj.parentId == None: - procUnitConfObj.parentId = self.id - self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj self.filename = abs_file return 1 - def printattr(self): + def __str__(self): print('Project[%s]: name = %s, description = %s' % (self.id, self.name, self.description)) - for procUnitConfObj in list(self.procUnitConfObjDict.values()): - procUnitConfObj.printattr() + for procUnitConfObj in self.procUnitConfObjDict.values(): + print(procUnitConfObj) def createObjects(self): - for procUnitConfObj in list(self.procUnitConfObjDict.values()): - print("Creating process:", procUnitConfObj.name) - procUnitConfObj.createObjects(self.procUnitConfObjDict) - - - print('All processes were created') + for procUnitConfObj in self.procUnitConfObjDict.values(): + procUnitConfObj.createObjects() def __handleError(self, procUnitConfObj, modes=None, stdout=True): @@ -1305,29 +1227,33 @@ class Project(Process): self.filename = filename def setProxyCom(self): + + if not os.path.exists('/tmp/schain'): + os.mkdir('/tmp/schain') - ctx = zmq.Context() - if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp') - xsub = ctx.socket(zmq.XSUB) - xsub.bind('ipc:///tmp/socketTmp/a') - xpub = ctx.socket(zmq.XPUB) - xpub.bind('ipc:///tmp/socketTmp/b') + self.ctx = zmq.Context() + xpub = self.ctx.socket(zmq.XPUB) + xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id)) + xsub = self.ctx.socket(zmq.XSUB) + xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id)) - print("Controller Ready: Processes and proxy created") - zmq.proxy(xsub, xpub) - - + try: + zmq.proxy(xpub, xsub) + except zmq.ContextTerminated: + xpub.close() + xsub.close() def run(self): - log.success('Starting {}'.format(self.name), tag='') + log.success('Starting {}: {}'.format(self.name, self.id), tag='') self.start_time = time.time() - self.createObjects() + self.createObjects() + # t = Thread(target=wait, args=(self.ctx, )) + # t.start() self.setProxyCom() - + # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo - # Closing every process log.success('{} finished (time: {}s)'.format( self.name, - time.time()-self.start_time)) \ No newline at end of file + time.time()-self.start_time)) diff --git a/schainpy/model/io/jroIO_base.py b/schainpy/model/io/jroIO_base.py index 835a7bd..c250f82 100644 --- a/schainpy/model/io/jroIO_base.py +++ b/schainpy/model/io/jroIO_base.py @@ -898,7 +898,7 @@ class JRODataReader(JRODataIO): newFile = self.__setNextFileOffline() if not(newFile): - raise(schainpy.admin.SchainWarning('No more files to read')) + self.dataOut.error = (-1, 'No more files to read') return 0 if self.verbose: @@ -1052,7 +1052,7 @@ class JRODataReader(JRODataIO): # Skip block out of startTime and endTime while True: if not(self.__setNewBlock()): - raise(schainpy.admin.SchainWarning('No more files')) + self.dataOut.error = (-1, 'No more files to read') return 0 if not(self.readBlock()): @@ -1324,7 +1324,7 @@ class JRODataReader(JRODataIO): sleep(self.delay) if not(fullpath): - raise(schainpy.admin.SchainWarning('There isn\'t any valid file in {}'.format(path))) + self.dataOut.error = (-1, 'There isn\'t any valid file in {}'.format(path)) return self.year = year diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index eb9d967..78e4451 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -18,7 +18,6 @@ import time import pickle import os from multiprocessing import Process - from schainpy.utils import log @@ -36,43 +35,30 @@ class ProcessingUnit(object): """ - # objeto de datos de entrada (Voltage, Spectra o Correlation) + + METHODS = {} dataIn = None dataInList = [] - - # objeto de datos de entrada (Voltage, Spectra o Correlation) - id = None inputId = None - dataOut = None - dictProcs = None - - operations2RunDict = None - isConfig = False def __init__(self): self.dataIn = None self.dataOut = None - self.isConfig = False + self.operations = [] def getAllowedArgs(self): if hasattr(self, '__attrs__'): return self.__attrs__ else: return inspect.getargspec(self.run).args - - def addOperationKwargs(self, objId, **kwargs): - ''' - ''' - - self.operationKwargs[objId] = kwargs - def addOperation(self, opObj, objId): + def addOperation(self, conf, operation): """ This method is used in the controller, and update the dictionary containing the operations to execute. The dict @@ -90,17 +76,14 @@ class ProcessingUnit(object): objId : identificador del objeto, necesario para comunicar con master(procUnit) """ - self.operations2RunDict[objId] = opObj - - return objId - + self.operations.append((operation, conf.type, conf.id, conf.getKwargs())) def getOperationObj(self, objId): - if objId not in list(self.operations2RunDict.keys()): + if objId not in list(self.operations.keys()): return None - return self.operations2RunDict[objId] + return self.operations[objId] def operation(self, **kwargs): @@ -200,339 +183,185 @@ class Operation(object): pass -######### Decorator ######### - - def MPDecorator(BaseClass): """ - "Multiprocessing class decorator" + Multiprocessing class decorator - This function add multiprocessing features to the base class. Also, - it handle the communication beetween processes (readers, procUnits and operations). - Receive the arguments at the moment of instantiation. According to that, discriminates if it - is a procUnit or an operation + This function add multiprocessing features to a BaseClass. Also, it handle + the communication beetween processes (readers, procUnits and operations). """ class MPClass(BaseClass, Process): - - "This is the overwritten class" - operations2RunDict = None - socket_l = None - socket_p = None - socketOP = None - socket_router = None - dictProcs = None - typeProc = None + def __init__(self, *args, **kwargs): super(MPClass, self).__init__() Process.__init__(self) - - self.operationKwargs = {} self.args = args - - - self.operations2RunDict = {} self.kwargs = kwargs - - # The number of arguments (args) determine the type of process + self.sender = None + self.receiver = None + self.name = BaseClass.__name__ if len(self.args) is 3: self.typeProc = "ProcUnit" - self.id = args[0] #topico de publicacion - self.inputId = args[1] #topico de subcripcion - self.dictProcs = args[2] #diccionario de procesos globales + self.id = args[0] + self.inputId = args[1] + self.project_id = args[2] else: self.id = args[0] + self.inputId = args[0] + self.project_id = args[1] self.typeProc = "Operation" - - def addOperationKwargs(self, objId, **kwargs): - - self.operationKwargs[objId] = kwargs def getAllowedArgs(self): if hasattr(self, '__attrs__'): return self.__attrs__ else: - return inspect.getargspec(self.run).args - - - def sockListening(self, topic): - - """ - This function create a socket to receive objects. - The 'topic' argument is related to the publisher process from which the self process is - listening (data). - In the case were the self process is listening to a Reader (proc Unit), - special conditions are introduced to maximize parallelism. - """ - - cont = zmq.Context() - zmq_socket = cont.socket(zmq.SUB) - if not os.path.exists('/tmp/socketTmp'): - os.mkdir('/tmp/socketTmp') - - if 'Reader' in self.dictProcs[self.inputId].name: - zmq_socket.connect('ipc:///tmp/socketTmp/b') - - else: - zmq_socket.connect('ipc:///tmp/socketTmp/%s' % self.inputId) - - #log.error('RECEIVING FROM {} {}'.format(self.inputId, str(topic).encode())) - zmq_socket.setsockopt(zmq.SUBSCRIBE, str(topic).encode()) #yong - - return zmq_socket - - - def listenProc(self, sock): - - """ - This function listen to a ipc addres until a message is recovered. To serialize the - data (object), pickle has been use. - The 'sock' argument is the socket previously connect to an ipc address and with a topic subscription. - """ - - a = sock.recv_multipart() - a = pickle.loads(a[1]) - return a - - def sockPublishing(self): - - """ - This function create a socket for publishing purposes. - Depending on the process type from where is created, it binds or connect - to special IPC addresses. - """ - time.sleep(4) #yong - context = zmq.Context() - zmq_socket = context.socket(zmq.PUB) - if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp') - if 'Reader' in self.dictProcs[self.id].name: - zmq_socket.connect('ipc:///tmp/socketTmp/a') - else: - zmq_socket.bind('ipc:///tmp/socketTmp/%s' % self.id) - - return zmq_socket - - def publishProc(self, sock, data): - - """ - This function publish a python object (data) under a specific topic in a socket (sock). - Usually, the topic is the self id of the process. - """ - - sock.send_multipart([str(self.id).encode(), pickle.dumps(data)]) #yong - - return True - - def sockOp(self): - - """ - This function create a socket for communication purposes with operation processes. - """ - - cont = zmq.Context() - zmq_socket = cont.socket(zmq.DEALER) - - if python_version()[0] == '2': - zmq_socket.setsockopt(zmq.IDENTITY, self.id) - if python_version()[0] == '3': - zmq_socket.setsockopt_string(zmq.IDENTITY, self.id) - - - return zmq_socket - - - def execOp(self, socket, opId, dataObj): - - """ - This function 'execute' an operation main routine by establishing a - connection with it and sending a python object (dataOut). - """ - if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp') - socket.connect('ipc:///tmp/socketTmp/%s' %opId) - - - socket.send(pickle.dumps(dataObj)) #yong + return inspect.getargspec(BaseClass.run).args - argument = socket.recv_multipart()[0] - - argument = pickle.loads(argument) - - return argument - - def sockIO(self): - - """ - Socket defined for an operation process. It is able to recover the object sent from another process as well as a - identifier of who sent it. - """ - - cont = zmq.Context() - if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp') - socket = cont.socket(zmq.ROUTER) - socket.bind('ipc:///tmp/socketTmp/%s' % self.id) - - return socket - - def funIOrec(self, socket): - - """ - Operation method, recover the id of the process who sent a python object. - The 'socket' argument is the socket binded to a specific process ipc. - """ - - #id_proc = socket.recv() - - #dataObj = socket.recv_pyobj() + def subscribe(self): + ''' + This function create a socket to receive objects from the + topic `inputId`. + ''' - dataObj = socket.recv_multipart() + c = zmq.Context() + self.receiver = c.socket(zmq.SUB) + self.receiver.connect('ipc:///tmp/schain/{}_pub'.format(self.project_id)) + self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) + + def listen(self): + ''' + This function waits for objects and deserialize using pickle + ''' - dataObj[1] = pickle.loads(dataObj[1]) - return dataObj[0], dataObj[1] + data = pickle.loads(self.receiver.recv_multipart()[1]) + return data - def funIOsen(self, socket, data, dest): + def set_publisher(self): + ''' + This function create a socket for publishing purposes. + ''' - """ - Operation method, send a python object to a specific destination. - The 'dest' argument is the id of a proccesinf unit. - """ - - socket.send_multipart([dest, pickle.dumps(data)]) #yong + time.sleep(1) + c = zmq.Context() + self.sender = c.socket(zmq.PUB) + self.sender.connect('ipc:///tmp/schain/{}_sub'.format(self.project_id)) - return True + def publish(self, data, id): + ''' + This function publish an object, to a specific topic. + ''' + self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) def runReader(self): - - # time.sleep(3) + ''' + Run fuction for read units + ''' while True: BaseClass.run(self, **self.kwargs) - - keyList = list(self.operations2RunDict.keys()) - keyList.sort() - - for key in keyList: - self.socketOP = self.sockOp() - self.dataOut = self.execOp(self.socketOP, key, self.dataOut) - - - if self.flagNoMoreFiles: #Usar un objeto con flags para saber si termino el proc o hubo un error - self.publishProc(self.socket_p, "Finish") + if self.dataOut.error[0] == -1: + log.error(self.dataOut.error[1]) + self.publish('end', self.id) + #self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()]) break + for op, optype, id, kwargs in self.operations: + if optype=='self': + op(**kwargs) + elif optype=='other': + self.dataOut = op.run(self.dataOut, **self.kwargs) + elif optype=='external': + self.publish(self.dataOut, opId) + if self.dataOut.flagNoData: continue - #print("Publishing data...") - self.publishProc(self.socket_p, self.dataOut) - # time.sleep(2) - - - print("%s done" %BaseClass.__name__) - return 0 - + self.publish(self.dataOut, self.id) + def runProc(self): - - # All the procUnits with kwargs that require a setup initialization must be defined here. - - if self.setupReq: - BaseClass.setup(self, **self.kwargs) + ''' + Run function for proccessing units + ''' while True: - self.dataIn = self.listenProc(self.socket_l) - #print("%s received data" %BaseClass.__name__) - - if self.dataIn == "Finish": - break + self.dataIn = self.listen() - m_arg = list(self.kwargs.keys()) - num_arg = list(range(1,int(BaseClass.run.__code__.co_argcount))) - - run_arg = {} - - for var in num_arg: - if BaseClass.run.__code__.co_varnames[var] in m_arg: - run_arg[BaseClass.run.__code__.co_varnames[var]] = self.kwargs[BaseClass.run.__code__.co_varnames[var]] - - #BaseClass.run(self, **self.kwargs) - BaseClass.run(self, **run_arg) - - ## Iterar sobre una serie de data que podrias aplicarse - - for m_name in BaseClass.METHODS: + if self.dataIn == 'end': + self.publish('end', self.id) + for op, optype, opId, kwargs in self.operations: + if optype == 'external': + self.publish('end', opId) + break - met_arg = {} + if self.dataIn.flagNoData: + continue - for arg in m_arg: - if arg in BaseClass.METHODS[m_name]: - for att in BaseClass.METHODS[m_name]: - met_arg[att] = self.kwargs[att] + BaseClass.run(self, **self.kwargs) - method = getattr(BaseClass, m_name) - method(self, **met_arg) - break + for op, optype, opId, kwargs in self.operations: + if optype=='self': + op(**kwargs) + elif optype=='other': + self.dataOut = op.run(self.dataOut, **kwargs) + elif optype=='external': + self.publish(self.dataOut, opId) if self.dataOut.flagNoData: continue - keyList = list(self.operations2RunDict.keys()) - keyList.sort() - - for key in keyList: - - self.socketOP = self.sockOp() - self.dataOut = self.execOp(self.socketOP, key, self.dataOut) - - - self.publishProc(self.socket_p, self.dataOut) - - - print("%s done" %BaseClass.__name__) - - return 0 + self.publish(self.dataOut, self.id) def runOp(self): + ''' + Run function for operations + ''' while True: - [self.dest ,self.buffer] = self.funIOrec(self.socket_router) - - self.buffer = BaseClass.run(self, self.buffer, **self.kwargs) + dataOut = self.listen() - self.funIOsen(self.socket_router, self.buffer, self.dest) - - print("%s done" %BaseClass.__name__) - return 0 - - + if dataOut == 'end': + break + + BaseClass.run(self, dataOut, **self.kwargs) + def run(self): - - if self.typeProc is "ProcUnit": - - self.socket_p = self.sockPublishing() - if 'Reader' not in self.dictProcs[self.id].name: - self.socket_l = self.sockListening(self.inputId) - self.runProc() + if self.typeProc is "ProcUnit": + + if self.inputId is not None: + self.subscribe() + self.set_publisher() + if 'Reader' not in BaseClass.__name__: + self.runProc() else: - self.runReader() elif self.typeProc is "Operation": - self.socket_router = self.sockIO() - + self.subscribe() self.runOp() else: raise ValueError("Unknown type") - return 0 - + print("%s done" % BaseClass.__name__) + self.close() + + def close(self): + + if self.sender: + self.sender.close() + + if self.receiver: + self.receiver.close() + return MPClass \ No newline at end of file diff --git a/schainpy/model/proc/jroproc_spectra.py b/schainpy/model/proc/jroproc_spectra.py index c02f770..5a7246c 100644 --- a/schainpy/model/proc/jroproc_spectra.py +++ b/schainpy/model/proc/jroproc_spectra.py @@ -127,8 +127,6 @@ class SpectraProc(ProcessingUnit): def run(self, nProfiles=None, nFFTPoints=None, pairsList=[], ippFactor=None, shift_fft=False): - self.dataOut.flagNoData = True - if self.dataIn.type == "Spectra": self.dataOut.copy(self.dataIn) # if not pairsList: @@ -783,7 +781,7 @@ class SpectraProc(ProcessingUnit): return 1 -@MPDecorator + class IncohInt(Operation): __profIndex = 0 @@ -962,5 +960,5 @@ class IncohInt(Operation): dataOut.nIncohInt *= self.n dataOut.utctime = avgdatatime dataOut.flagNoData = False - + return dataOut \ No newline at end of file diff --git a/schainpy/model/proc/jroproc_voltage.py b/schainpy/model/proc/jroproc_voltage.py index 9b61a70..765322d 100644 --- a/schainpy/model/proc/jroproc_voltage.py +++ b/schainpy/model/proc/jroproc_voltage.py @@ -1,7 +1,7 @@ import sys import numpy from scipy import interpolate -from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation +from schainpy.model.proc.jroproc_base import ProcessingUnit,, Operation from schainpy.model.data.jrodata import Voltage from schainpy.utils import log from time import time @@ -10,16 +10,13 @@ from time import time @MPDecorator class VoltageProc(ProcessingUnit): - METHODS = {} #yong + def __init__(self): - def __init__(self):#, **kwargs): #yong + ProcessingUnit.__init__(self) - ProcessingUnit.__init__(self)#, **kwargs) - - # self.objectDict = {} self.dataOut = Voltage() self.flip = 1 - self.setupReq = False #yong + self.setupReq = False def run(self): @@ -319,7 +316,7 @@ class VoltageProc(ProcessingUnit): self.dataOut.data[:,:,botLim:topLim+1] = ynew # import collections -@MPDecorator + class CohInt(Operation): isConfig = False @@ -581,7 +578,7 @@ class CohInt(Operation): # dataOut.timeInterval = dataOut.ippSeconds * dataOut.nCohInt dataOut.flagNoData = False return dataOut -@MPDecorator + class Decoder(Operation): isConfig = False @@ -774,7 +771,7 @@ class Decoder(Operation): return dataOut # dataOut.flagDeflipData = True #asumo q la data no esta sin flip -@MPDecorator + class ProfileConcat(Operation): isConfig = False @@ -825,7 +822,7 @@ class ProfileConcat(Operation): dataOut.heightList = numpy.arange(dataOut.heightList[0], xf, deltaHeight) dataOut.ippSeconds *= m return dataOut -@MPDecorator + class ProfileSelector(Operation): profileIndex = None @@ -986,7 +983,7 @@ class ProfileSelector(Operation): #return False return dataOut -@MPDecorator + class Reshaper(Operation): def __init__(self):#, **kwargs): @@ -1091,7 +1088,7 @@ class Reshaper(Operation): dataOut.ippSeconds /= self.__nTxs return dataOut -@MPDecorator + class SplitProfiles(Operation): def __init__(self):#, **kwargs): @@ -1133,7 +1130,7 @@ class SplitProfiles(Operation): dataOut.ippSeconds /= n return dataOut -@MPDecorator + class CombineProfiles(Operation): def __init__(self):#, **kwargs):