'''
Updated for multiprocessing
Author : Sergio Cortez
Jan 2018
Abstract:
    Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the processes created.
    The arguments (kwargs) sent from the controller are parsed and filtered via the decorator for each processing unit or operation instantiated.
    The decorator also handles the methods inside the processing unit that are called from the main script (not as operations) (OPERATION -> type = 'self').
Based on:
    $Author: murco $
    $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
'''

from platform import python_version
import inspect
import zmq
import time
import pickle
import os
from multiprocessing import Process

from schainpy.utils import log


class ProcessingUnit(object):

    """
    Update - Jan 2018 - MULTIPROCESSING
    All the "call" methods present in the previous base class were removed.
    Most operations are now independent processes, so the decorator is in
    charge of communicating the operation processes with the processing
    unit via IPC.

    The constructor does not receive any argument. The remaining methods
    are related to the operations to execute.
    """
    # input data object (Voltage, Spectra or Correlation)
    dataIn = None
    dataInList = []

    # output data object (Voltage, Spectra or Correlation)
    id = None
    inputId = None
    dataOut = None

    dictProcs = None
    operations2RunDict = None

    isConfig = False

    def __init__(self):

        self.dataIn = None
        self.dataOut = None

        self.isConfig = False

    def getAllowedArgs(self):
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            return inspect.getargspec(self.run).args

    def addOperationKwargs(self, objId, **kwargs):
        '''
        '''

        self.operationKwargs[objId] = kwargs

    def addOperation(self, opObj, objId):

        """
        This method is used in the controller; it updates the dictionary containing the operations
        to execute. The dict holds the id of the operation process (for IPC purposes).

        Adds an object of the "Operation" class (opObj) to the list of objects "self.objectList"
        and returns the identifier associated with this object.

        Input:

            object : object of the "Operation" class

        Return:

            objId : identifier of the object, needed to communicate with the master (procUnit)
        """

        self.operations2RunDict[objId] = opObj

        return objId

    def getOperationObj(self, objId):

        if objId not in list(self.operations2RunDict.keys()):
            return None

        return self.operations2RunDict[objId]

    def operation(self, **kwargs):

        """
        Direct operation on the data (dataOut.data). The attribute values of the
        dataOut object need to be updated.

        Input:

            **kwargs : dictionary with the arguments of the function to execute
        """

        raise NotImplementedError

    def setup(self):

        raise NotImplementedError

    def run(self):

        raise NotImplementedError

    def close(self):
        # Close every thread, queue or any other object here if it is necessary.
        return

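# A minimal sketch (illustrative only, not part of the original module) of how a
# ProcessingUnit subclass is typically written; the class name "DummyProc" and the
# "factor" keyword argument are assumptions:
#
#     class DummyProc(ProcessingUnit):
#
#         def run(self, factor=1):
#             # pass the input through, scaling dataOut.data by an assumed factor
#             self.dataOut = self.dataIn
#             self.dataOut.data = self.dataOut.data * factor
#             self.dataOut.flagNoData = False
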

class Operation(object):

    """
    Update - Jan 2018 - MULTIPROCESSING

    Most of the methods remained the same. The decorator parses the arguments and executes the
    run() method for each process. The constructor does not receive any argument, nor does the
    base class.

    Base class used to define the additional operations that can be added to the ProcessingUnit
    class and that need to accumulate previous information about the data to process. Preferably,
    use an accumulation buffer inside this class.

    Example: coherent integrations, which need the previous information of the n preceding profiles (buffer)
    """
    id = None
    __buffer = None
    dest = None
    isConfig = False
    readyFlag = None

    def __init__(self):

        self.buffer = None
        self.dest = None
        self.isConfig = False
        self.readyFlag = False

        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def getAllowedArgs(self):
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            return inspect.getargspec(self.run).args

    def setup(self):

        self.isConfig = True

        raise NotImplementedError

    def run(self, dataIn, **kwargs):

        """
        Performs the necessary operations on dataIn.data and updates the
        attributes of the dataIn object.

        Input:

            dataIn : object of the JROData class

        Return:

            None

        Affected:
            __buffer : data reception buffer.

        """
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):

        pass

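# A minimal sketch (illustrative only, not part of the original module) of an Operation
# subclass that accumulates profiles, in the spirit of the coherent-integration example
# mentioned in the docstring above; the class name "DummyIntegrator" and the "n" keyword
# are assumptions:
#
#     class DummyIntegrator(Operation):
#
#         def setup(self, n=2):
#             self.n = n
#             self.buffer = []
#             self.isConfig = True
#
#         def run(self, dataIn, n=2):
#             if not self.isConfig:
#                 self.setup(n=n)
#             self.buffer.append(dataIn.data)
#             if len(self.buffer) == self.n:
#                 dataIn.data = sum(self.buffer) / self.n
#                 self.buffer = []
#             return dataIn
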

######### Decorator #########

def MPDecorator(BaseClass):
    """
    Multiprocessing class decorator.

    This function adds multiprocessing features to the base class. It also
    handles the communication between processes (readers, procUnits and operations).
    It receives the arguments at the moment of instantiation and, based on them,
    determines whether the instance is a procUnit or an operation.
    """
    class MPClass(BaseClass, Process):
        "This is the overwritten class"
        operations2RunDict = None
        socket_l = None
        socket_p = None
        socketOP = None
        socket_router = None
        dictProcs = None
        typeProc = None

        def __init__(self, *args, **kwargs):
            super(MPClass, self).__init__()
            Process.__init__(self)

            self.operationKwargs = {}
            self.args = args
            self.operations2RunDict = {}
            self.kwargs = kwargs

            # The number of arguments (args) determines the type of process
            if len(self.args) == 3:
                self.typeProc = "ProcUnit"
                self.id = args[0]          # publishing topic
                self.inputId = args[1]     # subscription topic
                self.dictProcs = args[2]   # dictionary of global processes
            else:
                self.id = args[0]
                self.typeProc = "Operation"

        def addOperationKwargs(self, objId, **kwargs):
            self.operationKwargs[objId] = kwargs

        def getAllowedArgs(self):

            if hasattr(self, '__attrs__'):
                return self.__attrs__
            else:
                return inspect.getargspec(self.run).args

        def sockListening(self, topic):
            """
            This function creates a socket to receive objects.
            The 'topic' argument identifies the publisher process this process is
            listening to (for data).
            When this process is listening to a Reader (procUnit),
            special conditions are introduced to maximize parallelism.
            """
            cont = zmq.Context()
            zmq_socket = cont.socket(zmq.SUB)
            if not os.path.exists('/tmp/socketTmp'):
                os.mkdir('/tmp/socketTmp')
            if 'Reader' in self.dictProcs[self.inputId].name:
                zmq_socket.connect('ipc:///tmp/socketTmp/b')
            else:
                zmq_socket.connect('ipc:///tmp/socketTmp/%s' % self.inputId)
            #log.error('RECEIVING FROM {} {}'.format(self.inputId, str(topic).encode()))
            zmq_socket.setsockopt(zmq.SUBSCRIBE, str(topic).encode())  #yong

            return zmq_socket

        def listenProc(self, sock):

            """
            This function listens on an IPC address until a message is received. Pickle is used
            to serialize the data (object).
            The 'sock' argument is a socket previously connected to an IPC address and subscribed to a topic.
            """
            a = sock.recv_multipart()
            a = pickle.loads(a[1])
            return a

        def sockPublishing(self):

            """
            This function creates a socket for publishing purposes.
            Depending on the type of process it is created from, it binds or connects
            to special IPC addresses.
            """
            time.sleep(4)  #yong
            context = zmq.Context()
            zmq_socket = context.socket(zmq.PUB)
            if not os.path.exists('/tmp/socketTmp'):
                os.mkdir('/tmp/socketTmp')
            if 'Reader' in self.dictProcs[self.id].name:
                zmq_socket.connect('ipc:///tmp/socketTmp/a')
            else:
                zmq_socket.bind('ipc:///tmp/socketTmp/%s' % self.id)

            return zmq_socket

        def publishProc(self, sock, data):

            """
            This function publishes a Python object (data) under a specific topic on a socket (sock).
            Usually, the topic is the id of the process itself.
            """

            sock.send_multipart([str(self.id).encode(), pickle.dumps(data)])  #yong
            return True

        def sockOp(self):

            """
            This function creates a socket for communication with operation processes.
            """

            cont = zmq.Context()
            zmq_socket = cont.socket(zmq.DEALER)
            if python_version()[0] == '2':
                zmq_socket.setsockopt(zmq.IDENTITY, self.id)
            if python_version()[0] == '3':
                zmq_socket.setsockopt_string(zmq.IDENTITY, self.id)

            return zmq_socket

        def execOp(self, socket, opId, dataObj):

            """
            This function 'executes' an operation's main routine by establishing a
            connection to it and sending it a Python object (dataOut).
            """
            if not os.path.exists('/tmp/socketTmp'):
                os.mkdir('/tmp/socketTmp')
            socket.connect('ipc:///tmp/socketTmp/%s' % opId)
            socket.send(pickle.dumps(dataObj))  #yong
            argument = socket.recv_multipart()[0]
            argument = pickle.loads(argument)
            return argument

        def sockIO(self):

            """
            Socket defined for an operation process. It can recover the object sent from another
            process as well as an identifier of the sender.
            """

            cont = zmq.Context()
            if not os.path.exists('/tmp/socketTmp'):
                os.mkdir('/tmp/socketTmp')
            socket = cont.socket(zmq.ROUTER)
            socket.bind('ipc:///tmp/socketTmp/%s' % self.id)

            return socket

        def funIOrec(self, socket):

            """
            Operation method; recovers the id of the process that sent a Python object.
            The 'socket' argument is the socket bound to a specific process IPC address.
            """

            #id_proc = socket.recv()
            #dataObj = socket.recv_pyobj()
            dataObj = socket.recv_multipart()
            dataObj[1] = pickle.loads(dataObj[1])
            return dataObj[0], dataObj[1]

        def funIOsen(self, socket, data, dest):
            """
            Operation method; sends a Python object to a specific destination.
            The 'dest' argument is the id of a processing unit.
            """
            socket.send_multipart([dest, pickle.dumps(data)])  #yong

            return True

        def runReader(self):

            # time.sleep(3)
            while True:

                BaseClass.run(self, **self.kwargs)

                keyList = list(self.operations2RunDict.keys())
                keyList.sort()

                for key in keyList:
                    self.socketOP = self.sockOp()
                    self.dataOut = self.execOp(self.socketOP, key, self.dataOut)

                if self.flagNoMoreFiles:  # Use an object with flags to know whether the process finished or an error occurred
                    self.publishProc(self.socket_p, "Finish")
                    break

                if self.dataOut.flagNoData:
                    continue

                #print("Publishing data...")
                self.publishProc(self.socket_p, self.dataOut)
                # time.sleep(2)

            print("%s done" % BaseClass.__name__)
            return 0

        def runProc(self):

            # All the procUnits with kwargs that require a setup initialization must be defined here.

            if self.setupReq:
                BaseClass.setup(self, **self.kwargs)

            while True:
                self.dataIn = self.listenProc(self.socket_l)
                #print("%s received data" % BaseClass.__name__)

                if self.dataIn == "Finish":
                    break

                # Keep only the kwargs that match the signature of BaseClass.run()
                m_arg = list(self.kwargs.keys())
                num_arg = list(range(1, int(BaseClass.run.__code__.co_argcount)))
                run_arg = {}
                for var in num_arg:
                    if BaseClass.run.__code__.co_varnames[var] in m_arg:
                        run_arg[BaseClass.run.__code__.co_varnames[var]] = self.kwargs[BaseClass.run.__code__.co_varnames[var]]
                #BaseClass.run(self, **self.kwargs)
                BaseClass.run(self, **run_arg)

                # Iterate over the additional methods that may be applied to the data
                for m_name in BaseClass.METHODS:
                    met_arg = {}
                    for arg in m_arg:
                        if arg in BaseClass.METHODS[m_name]:
                            for att in BaseClass.METHODS[m_name]:
                                met_arg[att] = self.kwargs[att]
                            method = getattr(BaseClass, m_name)
                            method(self, **met_arg)
                            break

                if self.dataOut.flagNoData:
                    continue

                keyList = list(self.operations2RunDict.keys())
                keyList.sort()

                for key in keyList:
                    self.socketOP = self.sockOp()
                    self.dataOut = self.execOp(self.socketOP, key, self.dataOut)

                self.publishProc(self.socket_p, self.dataOut)

            print("%s done" % BaseClass.__name__)
            return 0

        def runOp(self):

            while True:

                [self.dest, self.buffer] = self.funIOrec(self.socket_router)
                self.buffer = BaseClass.run(self, self.buffer, **self.kwargs)
                self.funIOsen(self.socket_router, self.buffer, self.dest)

            print("%s done" % BaseClass.__name__)
            return 0

        def run(self):

            if self.typeProc == "ProcUnit":

                self.socket_p = self.sockPublishing()

                if 'Reader' not in self.dictProcs[self.id].name:
                    self.socket_l = self.sockListening(self.inputId)
                    self.runProc()
                else:
                    self.runReader()

            elif self.typeProc == "Operation":

                self.socket_router = self.sockIO()
                self.runOp()

            else:
                raise ValueError("Unknown type")

            return 0

    return MPClass
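

# A minimal usage sketch (assumed, not part of the original module): the schainpy
# controller normally instantiates the decorated classes and wires them together.
# "DummyProc" and the argument values are hypothetical; run() is not called here
# because it would open IPC sockets and block.
if __name__ == '__main__':

    @MPDecorator
    class DummyProc(ProcessingUnit):

        def run(self):
            pass

    # Three positional arguments -> the decorator treats the instance as a processing unit
    proc = DummyProc('1', '0', {})      # (id, inputId, dictProcs)
    print(proc.typeProc, proc.id)       # -> ProcUnit 1

    # A single argument -> the decorator treats the instance as an operation
    op = DummyProc('5')
    print(op.typeProc, op.id)           # -> Operation 5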