Multiprocessing for writing Units(Spectral, Voltage and Parameters)

File last commit:

r1179:6414333e2ace
jroproc_base.py
366 lines | 10.3 KiB | text/x-python | PythonLexer
'''
Updated for multiprocessing
Author : Sergio Cortez
Jan 2018
Abstract:
    Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the processes created.
    The arguments (kwargs) sent from the controller are parsed and filtered via the decorator for each processing unit or operation instantiated.
    The decorator also handles the methods inside the processing unit that are called from the main script (not as operations) (OPERATION -> type = 'self').

Based on:
    $Author: murco $
    $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
'''

from platform import python_version
import inspect
import zmq
import time
import pickle
import os
from multiprocessing import Process
from schainpy.utils import log


class ProcessingUnit(object):

    """
    Update - Jan 2018 - MULTIPROCESSING
    All the "call" methods present in the previous base were removed.
    Most operations are independent processes, so the decorator is in
    charge of connecting the operation processes with the processing
    unit via IPC.

    The constructor does not receive any arguments. The remaining methods
    are related to the operations to execute.
    """

    METHODS = {}
    dataIn = None
    dataInList = []
    id = None
    inputId = None
    dataOut = None
    dictProcs = None
    isConfig = False

    def __init__(self):

        self.dataIn = None
        self.dataOut = None
        self.isConfig = False
        self.operations = []

    def getAllowedArgs(self):
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            # getfullargspec replaces getargspec, which was removed in Python 3.11
            return inspect.getfullargspec(self.run).args

    def addOperation(self, conf, operation):
        """
        This method is used by the controller to update the list of operations
        to execute. Each entry holds the id of the operation process (for IPC
        purposes), which is needed to communicate with the master (procUnit).

        Appends an entry for the "Operation" object to "self.operations".

        Input:

            conf      : operation configuration; provides type, id and getKwargs()
            operation : object of the class "Operation"

        Return:

            None
        """

        self.operations.append((operation, conf.type, conf.id, conf.getKwargs()))

    def getOperationObj(self, objId):

        # self.operations is a list of (operation, type, id, kwargs) tuples
        for operation, optype, opId, kwargs in self.operations:
            if opId == objId:
                return operation

        return None

    def operation(self, **kwargs):
        """
        Direct operation on the data (dataOut.data). The attribute values of
        the dataOut object must be updated.

        Input:

            **kwargs : dictionary of arguments for the function to execute
        """

        raise NotImplementedError

    def setup(self):

        raise NotImplementedError

    def run(self):

        raise NotImplementedError

    def close(self):
        # Close every thread, queue or any other object here if necessary.
        return

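
A minimal sketch of how the list returned by getAllowedArgs() could be used to catch misspelled kwargs before a unit runs. `UnitSketch`, its `run()` parameters and the helper `find_unknown_kwargs` are hypothetical names introduced only for this illustration; they are not part of schainpy.

```python
import inspect


class UnitSketch(object):
    """Hypothetical stand-in for a ProcessingUnit subclass."""

    def run(self, nProfiles=None, ippFactor=1):
        pass

    def getAllowedArgs(self):
        # Same rule as above: an explicit __attrs__ list wins,
        # otherwise fall back to the parameter names of run().
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        return inspect.getfullargspec(self.run).args


def find_unknown_kwargs(unit, kwargs):
    """Return the kwargs names that the unit does not list as allowed."""
    allowed = unit.getAllowedArgs()
    return sorted(name for name in kwargs if name not in allowed)


unit = UnitSketch()
allowed = unit.getAllowedArgs()
# 'ipFactor' is a deliberate misspelling of 'ippFactor'
unknown = find_unknown_kwargs(unit, {'nProfiles': 128, 'ipFactor': 2})
print(allowed)
print(unknown)
```

A controller could run a check like this on each configured unit and report the unknown names instead of failing later inside run().
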
class Operation(object):

    """
    Update - Jan 2018 - MULTIPROCESSING

    Most of the methods remain the same. The decorator parses the arguments and executes the run() method for each process.
    The constructor does not receive any arguments, and neither does the base class.

    Base class for defining additional operations that can be added to the ProcessingUnit class
    and that need to accumulate previous information from the data being processed. Preferably,
    use an accumulation buffer inside this class.

    Example: coherent integration needs the information from the previous n profiles (buffer)

    """
    id = None
    __buffer = None
    dest = None
    isConfig = False
    readyFlag = None

    def __init__(self):

        self.buffer = None
        self.dest = None
        self.isConfig = False
        self.readyFlag = False

        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def getAllowedArgs(self):
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            # getfullargspec replaces getargspec, which was removed in Python 3.11
            return inspect.getfullargspec(self.run).args

    def setup(self):

        self.isConfig = True

        raise NotImplementedError

    def run(self, dataIn, **kwargs):
        """
        Performs the necessary operations on dataIn.data and updates the
        attributes of the dataIn object.

        Input:

            dataIn : object of type JROData

        Return:

            None

        Affected:
            __buffer : data reception buffer.

        """
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):

        pass

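
A minimal sketch of the pattern the Operation docstring describes: configure once on the first call to run(), then accumulate incoming data in a buffer until enough profiles are available. `OperationSketch` and `SumProfiles` are hypothetical names, profiles are plain lists of numbers rather than JROData objects, and the element-wise sum only stands in for a real coherent integration.

```python
class OperationSketch(object):
    """Minimal stand-in for the Operation base class (assumption)."""

    def __init__(self):
        self.buffer = None
        self.isConfig = False


class SumProfiles(OperationSketch):
    """Hypothetical operation that sums every n consecutive profiles."""

    def setup(self, n=2):
        self.n = n
        self.buffer = []
        self.isConfig = True

    def run(self, profile, **kwargs):
        # Configure lazily on the first call, as Operation.run() does
        if not self.isConfig:
            self.setup(**kwargs)

        self.buffer.append(profile)
        if len(self.buffer) < self.n:
            return None  # not enough profiles buffered yet

        # Element-wise sum over the buffered profiles, then reset the buffer
        total = [sum(samples) for samples in zip(*self.buffer)]
        self.buffer = []
        return total


op = SumProfiles()
results = [op.run(p, n=2) for p in ([1, 2], [3, 4], [5, 6], [7, 8])]
print(results)
```

Keeping the buffer on the instance lets the operation hold state between calls, which is why the docstring recommends an accumulation buffer inside the class.
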

def MPDecorator(BaseClass):
    """
    Multiprocessing class decorator

    This function adds multiprocessing features to a BaseClass. It also handles
    the communication between processes (readers, procUnits and operations).
    """

    class MPClass(BaseClass, Process):

        def __init__(self, *args, **kwargs):
            super(MPClass, self).__init__()
            Process.__init__(self)
            self.operationKwargs = {}
            self.args = args
            self.kwargs = kwargs
            self.sender = None
            self.receiver = None
            self.name = BaseClass.__name__

            # Compare by value: "is" tests identity and is unreliable for ints
            if len(self.args) == 3:
                self.typeProc = "ProcUnit"
                self.id = args[0]
                self.inputId = args[1]
                self.project_id = args[2]
            else:
                self.id = args[0]
                self.inputId = args[0]
                self.project_id = args[1]
                self.typeProc = "Operation"

        def getAllowedArgs(self):

            if hasattr(self, '__attrs__'):
                return self.__attrs__
            else:
                # getfullargspec replaces getargspec, which was removed in Python 3.11
                return inspect.getfullargspec(BaseClass.run).args

        def subscribe(self):
            '''
            This function creates a socket to receive objects from the
            topic `inputId`.
            '''

            c = zmq.Context()
            self.receiver = c.socket(zmq.SUB)
            self.receiver.connect('ipc:///tmp/schain/{}_pub'.format(self.project_id))
            self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())

        def listen(self):
            '''
            This function waits for objects and deserializes them using pickle.
            '''

            data = pickle.loads(self.receiver.recv_multipart()[1])
            return data

        def set_publisher(self):
            '''
            This function creates a socket for publishing purposes.
            '''

            time.sleep(1)
            c = zmq.Context()
            self.sender = c.socket(zmq.PUB)
            self.sender.connect('ipc:///tmp/schain/{}_sub'.format(self.project_id))

        def publish(self, data, id):
            '''
            This function publishes an object to a specific topic.
            '''

            self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])

        def runReader(self):
            '''
            Run function for read units
            '''
            while True:

                BaseClass.run(self, **self.kwargs)

                if self.dataOut.error[0] == -1:
                    log.error(self.dataOut.error[1])
                    self.publish('end', self.id)
                    #self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
                    break

                # The loop variable must be opId: it is used by the 'external' branch.
                # Each operation receives its own kwargs, as in runProc().
                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self':
                        op(**kwargs)
                    elif optype == 'other':
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external':
                        self.publish(self.dataOut, opId)

                if self.dataOut.flagNoData:
                    continue

                self.publish(self.dataOut, self.id)

        def runProc(self):
            '''
            Run function for processing units
            '''

            while True:
                self.dataIn = self.listen()

                if self.dataIn == 'end':
                    self.publish('end', self.id)
                    for op, optype, opId, kwargs in self.operations:
                        if optype == 'external':
                            self.publish('end', opId)
                    break

                if self.dataIn.flagNoData:
                    continue

                BaseClass.run(self, **self.kwargs)

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self':
                        op(**kwargs)
                    elif optype == 'other':
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external':
                        self.publish(self.dataOut, opId)

                if self.dataOut.flagNoData:
                    continue

                self.publish(self.dataOut, self.id)

        def runOp(self):
            '''
            Run function for operations
            '''

            while True:

                dataOut = self.listen()

                if dataOut == 'end':
                    break

                BaseClass.run(self, dataOut, **self.kwargs)

        def run(self):

            # Compare strings by value; "is" only tests object identity
            if self.typeProc == "ProcUnit":

                if self.inputId is not None:
                    self.subscribe()
                self.set_publisher()

                if 'Reader' not in BaseClass.__name__:
                    self.runProc()
                else:
                    self.runReader()

            elif self.typeProc == "Operation":

                self.subscribe()
                self.runOp()

            else:
                raise ValueError("Unknown type")

            print("%s done" % BaseClass.__name__)
            self.close()

        def close(self):
            if self.sender:
                self.sender.close()
            if self.receiver:
                self.receiver.close()

    return MPClass
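
A minimal sketch of the two mechanisms used by MPClass, written without ZeroMQ so it runs on its own: the two-frame message layout built by publish() and read by listen(), and the per-type dispatch over 'self', 'other' and 'external' operations. A plain list (`outbox`) stands in for the PUB socket, data objects are plain dicts, and `Scale`, `make_frames`, `read_frames` and `dispatch` are hypothetical names introduced for this illustration.

```python
import pickle


class Scale(object):
    """Hypothetical 'other' operation: runs inside the unit's own process."""

    def run(self, data, factor=1):
        data['value'] *= factor
        return data


def make_frames(data, topic):
    # Same layout as publish(): [topic as bytes, pickled payload]
    return [str(topic).encode(), pickle.dumps(data)]


def read_frames(frames):
    # Same decoding as listen(): the payload is the second frame
    return pickle.loads(frames[1])


def dispatch(data_out, operations, outbox):
    """One pass of the operations loop; outbox stands in for the PUB socket."""
    for op, optype, op_id, kwargs in operations:
        if optype == 'self':
            op(**kwargs)                                  # method of the unit itself
        elif optype == 'other':
            data_out = op.run(data_out, **kwargs)         # in-process operation
        elif optype == 'external':
            outbox.append(make_frames(data_out, op_id))   # separate process
    return data_out


calls = []
operations = [
    (lambda **kw: calls.append(kw), 'self', 'op1', {'channel': 0}),
    (Scale(), 'other', 'op2', {'factor': 3}),
    (None, 'external', 'op3', {}),
]
outbox = []
result = dispatch({'value': 2, 'flagNoData': False}, operations, outbox)
topic = outbox[0][0]
received = read_frames(outbox[0])
print(result, topic, received)
```

The first frame carries the topic, so a SUB socket subscribed to an operation id only receives the messages addressed to it; the external operation here sees the data as modified by the operations that ran before it.
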