jroproc_base.py
366 lines
| 10.3 KiB
| text/x-python
|
PythonLexer
|
'''
Updated for multiprocessing
Author : Sergio Cortez
Jan 2018
Abstract:
    Base classes for processing units and operations. A decorator provides multiprocessing features and interconnects the processes it creates.
    The arguments (kwargs) sent from the controller are parsed and filtered via the decorator for each processing unit or operation instantiated.
    The decorator also handles the methods inside the processing unit that are called from the main script (not as operations) (OPERATION -> type ='self').
Based on:
    $Author: murco $
    $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
'''
|
r1171 | from platform import python_version | |
|
r929 | import inspect | |
|
r1171 | import zmq | |
import time | |||
import pickle | |||
import os | |||
from multiprocessing import Process | |||
from schainpy.utils import log | |||
|
r929 | ||
|
r487 | ||
|
class ProcessingUnit(object):
    """
    Base class for processing units.

    Update - Jan 2018 - MULTIPROCESSING
    All the "call" methods present in the previous base were removed.
    Most operations are independent processes, so the decorator is in
    charge of communicating the operation processes with the processing
    unit via IPC.

    The constructor does not receive any argument. The remaining methods
    are related to the operations to execute.
    """

    METHODS = {}
    dataIn = None
    # NOTE(review): class-level mutable list; it is shared by every instance
    # that does not rebind it. Kept as-is for backward compatibility.
    dataInList = []
    id = None
    inputId = None
    dataOut = None
    dictProcs = None
    isConfig = False

    def __init__(self):
        self.dataIn = None
        self.dataOut = None
        self.isConfig = False
        # List of (operation, type, id, kwargs) tuples, filled by addOperation()
        self.operations = []

    def getAllowedArgs(self):
        """
        Return the argument names accepted by this unit.

        If the instance exposes `__attrs__` that value is returned as is;
        otherwise the positional argument names of `self.run` are returned
        (the bound `self` is included, as with the legacy getargspec).
        """
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        # inspect.getargspec was removed in Python 3.11; fall back to it only
        # on interpreters that lack getfullargspec.
        getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        return getspec(self.run).args

    def addOperation(self, conf, operation):
        """
        Register an operation to be executed by this unit.

        This method is used by the controller. The entry stored holds the id
        of the operation (needed for IPC purposes).

        Input:
            conf      : configuration object; its `type`, `id` and
                        `getKwargs()` are read
            operation : the operation object (or callable) to register

        Return:
            None. A tuple (operation, conf.type, conf.id, conf.getKwargs())
            is appended to `self.operations`.
        """
        self.operations.append((operation, conf.type, conf.id, conf.getKwargs()))

    def getOperationObj(self, objId):
        """
        Return the operation registered under `objId`, or None if there is none.

        `self.operations` is a list of tuples (see addOperation), so it is
        searched linearly by id instead of being indexed like a dict.
        """
        for operation, _optype, opId, _kwargs in self.operations:
            if opId == objId:
                return operation
        return None

    def operation(self, **kwargs):
        """
        Direct operation over the data (dataOut.data). The values of the
        attributes of the dataOut object must be updated.

        Input:
            **kwargs : dictionary with the arguments of the function to execute
        """
        raise NotImplementedError

    def setup(self):
        """Configure the unit; must be implemented by subclasses."""
        raise NotImplementedError

    def run(self):
        """Execute one processing step; must be implemented by subclasses."""
        raise NotImplementedError

    def close(self):
        """Close every thread, queue or any other object here if necessary."""
        return
class Operation(object):
    """
    Base class for operations.

    Update - Jan 2018 - MULTIPROCESSING

    Most of the methods remained the same. The decorator parses the
    arguments and executes the run() method for each process.
    The constructor does not receive any argument, neither does the base class.

    Base class used to define the additional operations that can be added
    to a ProcessingUnit and that need to accumulate previous information
    from the data being processed. Preferably use an accumulation buffer
    inside this class.

    Example: coherent integrations need the information of the previous
    n profiles (buffer).
    """

    id = None
    __buffer = None
    dest = None
    isConfig = False
    readyFlag = None

    def __init__(self):
        self.buffer = None
        self.dest = None
        self.isConfig = False
        self.readyFlag = False

        # Keep a name that was already provided (e.g. by a subclass)
        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def getAllowedArgs(self):
        """
        Return the argument names accepted by this operation.

        If the instance exposes `__attrs__` that value is returned as is;
        otherwise the positional argument names of `self.run` are returned
        (the bound `self` is included, as with the legacy getargspec).
        """
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        # inspect.getargspec was removed in Python 3.11; fall back to it only
        # on interpreters that lack getfullargspec.
        getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        return getspec(self.run).args

    def setup(self, **kwargs):
        """
        Configure the operation; must be implemented by subclasses.

        Accepts **kwargs because run() forwards its keyword arguments here;
        the base implementation marks the operation as configured and then
        raises NotImplementedError.
        """
        self.isConfig = True

        raise NotImplementedError

    def run(self, dataIn, **kwargs):
        """
        Perform the required operations over dataIn.data and update the
        attributes of the dataIn object.

        Input:
            dataIn : object of type JROData

        Return:
            None

        Affected:
            __buffer : data reception buffer.
        """
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):
        """Release resources held by the operation; nothing to do by default."""
        pass
|
r860 | ||
|
def MPDecorator(BaseClass):
    """
    Multiprocessing class decorator.

    This function adds multiprocessing features to a BaseClass. It also
    handles the communication between processes (readers, procUnits and
    operations).

    Input:
        BaseClass : class to decorate; its run() method is executed inside
                    the child process.

    Return:
        MPClass : subclass of BaseClass and multiprocessing.Process.
    """

    class MPClass(BaseClass, Process):

        def __init__(self, *args, **kwargs):
            """
            Positional arguments select the kind of process:
                (id, inputId, project_id) -> "ProcUnit"
                (id, project_id)          -> "Operation"
            Keyword arguments are stored and forwarded to BaseClass.run().
            """
            super(MPClass, self).__init__()
            Process.__init__(self)
            self.operationKwargs = {}
            self.args = args
            self.kwargs = kwargs
            self.sender = None
            self.receiver = None
            self.name = BaseClass.__name__

            # '==' instead of 'is': identity of int literals is an
            # implementation detail
            if len(self.args) == 3:
                self.typeProc = "ProcUnit"
                self.id = args[0]
                self.inputId = args[1]
                self.project_id = args[2]
            else:
                self.id = args[0]
                # An operation listens on the topic named after its own id
                self.inputId = args[0]
                self.project_id = args[1]
                self.typeProc = "Operation"

        def getAllowedArgs(self):
            """
            Return `__attrs__` if defined, otherwise the positional argument
            names of BaseClass.run.
            """
            if hasattr(self, '__attrs__'):
                return self.__attrs__
            # inspect.getargspec was removed in Python 3.11; fall back to it
            # only on interpreters that lack getfullargspec.
            getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
            return getspec(BaseClass.run).args

        def subscribe(self):
            '''
            This function creates a socket to receive objects from the
            topic `inputId`.
            '''
            c = zmq.Context()
            self.receiver = c.socket(zmq.SUB)
            self.receiver.connect('ipc:///tmp/schain/{}_pub'.format(self.project_id))
            # Coerce with str() so the topic matches what publish() sends
            self.receiver.setsockopt(zmq.SUBSCRIBE, str(self.inputId).encode())

        def listen(self):
            '''
            This function waits for objects and deserializes them using pickle.
            '''
            # NOTE(review): pickle.loads is only safe because the payload
            # comes from sibling processes over local IPC; never expose this
            # endpoint to untrusted peers.
            data = pickle.loads(self.receiver.recv_multipart()[1])
            return data

        def set_publisher(self):
            '''
            This function creates a socket for publishing purposes.
            '''
            # NOTE(review): presumably gives the other end time to be ready
            # before the first message is sent - confirm.
            time.sleep(1)
            c = zmq.Context()
            self.sender = c.socket(zmq.PUB)
            self.sender.connect('ipc:///tmp/schain/{}_sub'.format(self.project_id))

        def publish(self, data, id):
            '''
            This function publishes an object to a specific topic.
            '''
            self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])

        def _publishEnd(self):
            '''
            Send the 'end' marker on this unit's topic and on the topic of
            every external operation attached to it.
            '''
            self.publish('end', self.id)
            for op, optype, opId, kwargs in self.operations:
                if optype == 'external':
                    self.publish('end', opId)

        def _runOperations(self):
            '''
            Apply the attached operations to the current dataOut:
                'self'     -> call the stored callable with its kwargs
                'other'    -> run the operation in this process
                'external' -> publish dataOut to the operation's topic
            '''
            for op, optype, opId, kwargs in self.operations:
                if optype == 'self':
                    op(**kwargs)
                elif optype == 'other':
                    self.dataOut = op.run(self.dataOut, **kwargs)
                elif optype == 'external':
                    self.publish(self.dataOut, opId)

        def runReader(self):
            '''
            Run function for read units
            '''
            while True:
                BaseClass.run(self, **self.kwargs)

                if self.dataOut.error[0] == -1:
                    log.error(self.dataOut.error[1])
                    # Also notify external operations so they can terminate
                    self._publishEnd()
                    break

                self._runOperations()

                if self.dataOut.flagNoData:
                    continue

                self.publish(self.dataOut, self.id)

        def runProc(self):
            '''
            Run function for processing units
            '''
            while True:
                self.dataIn = self.listen()

                if self.dataIn == 'end':
                    self._publishEnd()
                    break

                if self.dataIn.flagNoData:
                    continue

                BaseClass.run(self, **self.kwargs)

                self._runOperations()

                if self.dataOut.flagNoData:
                    continue

                self.publish(self.dataOut, self.id)

        def runOp(self):
            '''
            Run function for operations
            '''
            while True:
                dataOut = self.listen()

                if dataOut == 'end':
                    break

                BaseClass.run(self, dataOut, **self.kwargs)

        def run(self):
            '''
            Process entry point: set up the sockets required by the kind of
            process and enter the matching loop. Sockets are closed on exit,
            even if the loop fails.
            '''
            try:
                if self.typeProc == "ProcUnit":
                    if self.inputId is not None:
                        self.subscribe()
                    self.set_publisher()

                    # Units whose class name contains 'Reader' produce data
                    # instead of listening for it
                    if 'Reader' not in BaseClass.__name__:
                        self.runProc()
                    else:
                        self.runReader()
                elif self.typeProc == "Operation":
                    self.subscribe()
                    self.runOp()
                else:
                    raise ValueError("Unknown type")

                print("%s done" % BaseClass.__name__)
            finally:
                self.close()

        def close(self):
            '''
            Close the publisher and subscriber sockets if they were created.
            '''
            # NOTE(review): BaseClass.close() is not invoked from here; confirm
            # whether units/operations rely on it to release their resources.
            if self.sender:
                self.sender.close()
            if self.receiver:
                self.receiver.close()

    return MPClass