jroproc_base.py
384 lines
| 11.1 KiB
| text/x-python
|
PythonLexer
|
r487 | ''' | ||
|
r1171 | Updated for multiprocessing | ||
Author : Sergio Cortez | ||||
Jan 2018 | ||||
Abstract: | ||||
Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the processes it creates. | ||||
The arguments (kwargs) sent from the controller are parsed and filtered by the decorator for each instantiated processing unit or operation. | ||||
The decorator also handles the methods of the processing unit that are called from the main script rather than as operations (OPERATION -> type='self'). | ||||
Based on: | ||||
$Author: murco $ | ||||
$Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $ | ||||
|
r487 | ''' | ||
|
r1187 | |||
|
r929 | import inspect | ||
|
r1171 | import zmq | ||
import time | ||||
import pickle | ||||
import os | ||||
from multiprocessing import Process | ||||
|
r1187 | from zmq.utils.monitor import recv_monitor_message | ||
|
r1171 | from schainpy.utils import log | ||
|
r929 | |||
|
r487 | |||
|
class ProcessingUnit(object):
    """
    Base class for processing units.

    Update - Jan 2018 - MULTIPROCESSING
    All the "call" methods present in the previous base were removed.
    Most operations are independent processes, so the decorator is in
    charge of connecting the operation processes with the processing unit
    via IPC.

    The constructor does not receive any argument. The remaining methods
    are related to the operations to execute.
    """

    def __init__(self):

        self.dataIn = None
        self.dataOut = None
        self.isConfig = False
        # List of (operation, type, id, kwargs) tuples filled by addOperation.
        self.operations = []
        # CODE values of plot operations registered through addOperation.
        self.plots = []

    def getAllowedArgs(self):
        """
        Return the argument names accepted by this unit.

        If the class defines ``__attrs__`` that value is returned; otherwise
        the positional argument names of ``self.run`` (including ``self``).
        """
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        # inspect.getargspec was removed in Python 3.11; getfullargspec
        # provides the same ``args`` list.
        return inspect.getfullargspec(self.run).args

    def addOperation(self, conf, operation):
        """
        Register an operation to be executed by this unit (used by the
        controller).

        The operation is stored together with its type, its id (used as the
        IPC topic when the operation is published to) and its keyword
        arguments.

        Input:
            conf      : operation configuration; ``conf.type``, ``conf.id``
                        and ``conf.getKwargs()`` are read.
            operation : the operation object (or callable for type 'self').

        Return:
            None
        """
        self.operations.append(
            (operation, conf.type, conf.id, conf.getKwargs()))

        # ``name`` is normally assigned by the multiprocessing wrapper; fall
        # back to the class name so an undecorated unit does not raise
        # AttributeError here.
        name = getattr(self, 'name', self.__class__.__name__)
        if 'plot' in name.lower():
            self.plots.append(operation.CODE)

    def getOperationObj(self, objId):
        """
        Return the operation registered with id ``objId``, or None if no
        operation has that id.

        ``self.operations`` is a list of (operation, type, id, kwargs)
        tuples, so it is searched linearly.
        """
        for operation, _optype, opId, _kwargs in self.operations:
            if opId == objId:
                return operation
        return None

    def operation(self, **kwargs):
        """
        Direct operation on the data (dataOut.data). The attributes of the
        dataOut object must be updated accordingly.

        Input:
            **kwargs : dictionary of arguments of the function to execute

        Raises:
            NotImplementedError : always; subclasses must override.
        """
        raise NotImplementedError

    def setup(self):
        """Configure the unit. Subclasses must override."""
        raise NotImplementedError

    def run(self):
        """Process one chunk of data. Subclasses must override."""
        raise NotImplementedError

    def close(self):
        """Release resources. The base implementation does nothing."""
        return
|
r1187 | |||
|
class Operation(object):
    """
    Base class for additional operations that can be added to a
    ProcessingUnit and that need to accumulate previous information about
    the data being processed. Preferably use an accumulation buffer inside
    the class.

    Example: coherent integrations, which need the information of the
    previous n profiles (buffer).

    Update - Jan 2018 - MULTIPROCESSING
    Most of the methods remained the same. The decorator parses the
    arguments and executes the run() method for each process. Neither the
    constructor nor the base class receives any argument.
    """

    def __init__(self):

        self.id = None
        self.isConfig = False

        # Subclasses may define ``name`` as a class attribute; otherwise the
        # class name is used.
        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def getAllowedArgs(self):
        """
        Return the argument names accepted by this operation.

        If the class defines ``__attrs__`` that value is returned; otherwise
        the positional argument names of ``self.run`` (including ``self``).
        """
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        # inspect.getargspec was removed in Python 3.11; getfullargspec
        # provides the same ``args`` list.
        return inspect.getfullargspec(self.run).args

    def setup(self):
        """
        Configure the operation. Marks it as configured, then raises
        NotImplementedError; subclasses must override.
        """
        self.isConfig = True

        raise NotImplementedError

    def run(self, dataIn, **kwargs):
        """
        Perform the necessary operations on dataIn.data and update the
        attributes of the dataIn object.

        Input:
            dataIn : object of type JROData

        Return:
            None

        Affected:
            __buffer : data reception buffer.

        The base implementation calls ``setup(**kwargs)`` on first use and
        then raises NotImplementedError; subclasses must override.
        """
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):
        """Release resources. The base implementation does nothing."""
        return
|
r860 | |||
|
def MPDecorator(BaseClass):
    """
    Multiprocessing class decorator.

    This function adds multiprocessing features to a BaseClass and handles
    the communication between processes (readers, procUnits and operations)
    through ZeroMQ PUB/SUB sockets over IPC endpoints under /tmp/schain.

    The returned class is constructed as:
        * ``MPClass(id, inputId, project_id, **kwargs)`` -> processing unit
          (``typeProc == "ProcUnit"``);
        * ``MPClass(id, project_id, **kwargs)`` -> external operation
          (``typeProc == "Operation"``), subscribed to its own ``id``.
    Any other number of positional arguments makes ``run`` raise ValueError.
    """

    class MPClass(BaseClass, Process):

        def __init__(self, *args, **kwargs):
            super(MPClass, self).__init__()
            # BaseClass.__init__ does not chain to Process.__init__, so it
            # is called explicitly.
            Process.__init__(self)
            self.operationKwargs = {}
            self.args = args
            self.kwargs = kwargs
            self.sender = None
            self.receiver = None
            # Incremented by event_monitor on accepted connections.
            self.connections = 0
            self.name = BaseClass.__name__

            if 'plot' in self.name.lower() and not self.name.endswith('_'):
                self.name = '{}{}'.format(self.CODE.upper(), 'Plot')

            self.start_time = time.time()

            # Stays None for unexpected argument counts; run() then raises
            # ValueError("Unknown type").
            self.typeProc = None
            if len(self.args) == 3:
                self.typeProc = "ProcUnit"
                self.id = args[0]
                self.inputId = args[1]
                self.project_id = args[2]
            elif len(self.args) == 2:
                self.id = args[0]
                # External operations listen on the topic of their own id,
                # which is the topic the processing unit publishes them to.
                self.inputId = args[0]
                self.project_id = args[1]
                self.typeProc = "Operation"

        def subscribe(self):
            '''
            Create a SUB socket connected to the project's ``_pub`` endpoint
            and subscribe it to the topic ``inputId``.
            '''
            c = zmq.Context()
            self.receiver = c.socket(zmq.SUB)
            self.receiver.connect(
                'ipc:///tmp/schain/{}_pub'.format(self.project_id))
            # NOTE(review): ZeroMQ subscriptions are prefix matches, so topic
            # '1' also receives topics '10', '11', ... -- confirm ids can
            # never be prefixes of one another.
            self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())

        def listen(self):
            '''
            Block until a message arrives and return the unpickled payload
            (the second frame of the multipart message).
            '''
            # pickle.loads can execute code for crafted input; this relies on
            # the IPC endpoint being reachable only by trusted processes.
            data = pickle.loads(self.receiver.recv_multipart()[1])
            return data

        def set_publisher(self):
            '''
            Create a PUB socket connected to the project's ``_sub`` endpoint.
            '''
            # Presumably gives the peer endpoint time to come up (ZeroMQ
            # "slow joiner"); TODO confirm the 1 s delay is still required.
            time.sleep(1)
            c = zmq.Context()
            self.sender = c.socket(zmq.PUB)
            self.sender.connect(
                'ipc:///tmp/schain/{}_sub'.format(self.project_id))

        def publish(self, data, id):
            '''
            Publish ``data`` (pickled) on the topic ``str(id)``.
            '''
            self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])

        def runReader(self):
            '''
            Run function for reading units: read, apply the operations and
            publish the output until the reader reports an error.
            '''
            while True:
                BaseClass.run(self, **self.kwargs)

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self' and not self.dataOut.flagNoData:
                        op(**kwargs)
                    elif optype == 'other' and not self.dataOut.flagNoData:
                        # Each operation receives its own configured kwargs
                        # (as in runProc), not the reader's.
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external':
                        self.publish(self.dataOut, opId)

                if self.dataOut.flagNoData and not self.dataOut.error:
                    continue

                self.publish(self.dataOut, self.id)
                if self.dataOut.error:
                    log.error(self.dataOut.error, self.name)
                    break
                time.sleep(1)

        def runProc(self):
            '''
            Run function for processing units: receive input, process it,
            apply the operations and publish the output until an error is
            received from the input.
            '''
            while True:
                self.dataIn = self.listen()

                if self.dataIn.flagNoData and self.dataIn.error is None:
                    continue

                BaseClass.run(self, **self.kwargs)

                # Propagate an upstream error downstream as a no-data block.
                if self.dataIn.error:
                    self.dataOut.error = self.dataIn.error
                    self.dataOut.flagNoData = True

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self' and not self.dataOut.flagNoData:
                        op(**kwargs)
                    elif optype == 'other' and not self.dataOut.flagNoData:
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external' and (
                            not self.dataOut.flagNoData or self.dataOut.error):
                        # Errors are forwarded as well so the external
                        # operation's runOp loop receives them and stops.
                        self.publish(self.dataOut, opId)

                if not self.dataOut.flagNoData or self.dataOut.error:
                    self.publish(self.dataOut, self.id)
                if self.dataIn.error:
                    break
                time.sleep(1)

        def runOp(self):
            '''
            Run function for external operations (these operations only
            receive data, e.g. plots, writers, publishers). Stops after
            handling a block that carries an error.
            '''
            while True:
                dataOut = self.listen()

                BaseClass.run(self, dataOut, **self.kwargs)

                if dataOut.error:
                    break

                time.sleep(1)

        def run(self):
            '''
            Process entry point: set up the sockets for this process type,
            run the matching loop and close the unit afterwards.

            Raises:
                ValueError : if the instance was built with an unsupported
                             number of positional arguments.
            '''
            if self.typeProc == "ProcUnit":
                # Readers have no input unit (inputId is None).
                if self.inputId is not None:
                    self.subscribe()

                self.set_publisher()

                if 'Reader' not in BaseClass.__name__:
                    self.runProc()
                else:
                    self.runReader()
            elif self.typeProc == "Operation":
                self.subscribe()
                self.runOp()
            else:
                raise ValueError("Unknown type")

            self.close()

        def event_monitor(self, monitor):
            '''
            Consume events from a ZeroMQ monitor socket until it reports
            EVENT_MONITOR_STOPPED, counting accepted connections in
            ``self.connections``. The monitor socket is closed on exit.
            '''
            while monitor.poll():
                evt = recv_monitor_message(monitor)
                if evt['event'] == zmq.EVENT_ACCEPTED:
                    self.connections += 1
                if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
                    break
            monitor.close()
            print('event monitor thread done!')

        def close(self):
            '''
            Close the base unit and any open sockets, then log the elapsed
            time since construction.
            '''
            BaseClass.close(self)

            if self.sender:
                self.sender.close()

            if self.receiver:
                self.receiver.close()

            log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)

    return MPClass