jroproc_base.py
426 lines
| 12.3 KiB
| text/x-python
|
PythonLexer
|
'''
Updated for multiprocessing
Author : Sergio Cortez
Jan 2018
Abstract:
    Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the processes created.
    The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
    The decorator also handles the methods inside the processing unit that are called from the main script (not as operations) (OPERATION -> type ='self').

Based on:
    $Author: murco $
    $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
'''
|
r1187 | |||
|
r1212 | import os | ||
r1256 | import sys | |||
|
r929 | import inspect | ||
|
r1171 | import zmq | ||
import time | ||||
import pickle | ||||
r1241 | import traceback | |||
r1252 | try: | |||
from queue import Queue | ||||
except: | ||||
from Queue import Queue | ||||
|
r1235 | from threading import Thread | ||
|
r1171 | from multiprocessing import Process | ||
|
r1187 | |||
|
r1171 | from schainpy.utils import log | ||
|
r929 | |||
|
r487 | |||
|
class ProcessingUnit(object):
    """
    Base class for processing units.

    Update - Jan 2018 - MULTIPROCESSING
    All the "call" methods present in the previous base were removed.
    The majority of operations are independent processes, thus the
    decorator is in charge of communicating the operation processes with
    the processing unit via IPC.

    The constructor does not receive any argument. The remaining methods
    are related to the operations to execute.
    """

    proc_type = 'processing'
    __attrs__ = []

    def __init__(self):
        self.dataIn = None
        self.dataOut = None
        self.isConfig = False
        # Each entry is an (operation, type, id, kwargs) tuple, appended by
        # addOperation().
        self.operations = []
        # CODE attribute of every operation attached to a plot unit.
        self.plots = []

    def getAllowedArgs(self):
        """
        Return the argument names this unit accepts.

        Return:
            ``__attrs__`` when the attribute exists, otherwise the argument
            names taken from the signature of ``run``.
        """
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        # inspect.getargspec was removed in Python 3.11; fall back to it only
        # on interpreters that lack getfullargspec.
        argspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        return argspec(self.run).args

    def addOperation(self, conf, operation):
        """
        Register an operation to be executed by this unit.

        This method is used by the controller. The stored entry keeps the id
        of the operation (used for IPC purposes).

        Input:
            conf      : configuration object providing ``type``, ``id`` and
                        ``getKwargs()``
            operation : operation object (or callable) to execute
        """
        self.operations.append(
            (operation, conf.type, conf.id, conf.getKwargs()))

        # ``name`` is not set by this class itself, so fall back to the
        # class name when it is missing.
        name = getattr(self, 'name', self.__class__.__name__)
        if 'plot' in name.lower():
            self.plots.append(operation.CODE)

    def getOperationObj(self, objId):
        """
        Return the operation registered under ``objId``.

        Input:
            objId : id given to the operation in addOperation()

        Return:
            The operation object, or None when no operation has that id.
        """
        for operation, _optype, op_id, _kwargs in self.operations:
            if op_id == objId:
                return operation
        return None

    def operation(self, **kwargs):
        """
        Direct operation over the data (dataOut.data). The attribute values
        of the dataOut object must be updated by the implementation.

        Input:
            **kwargs : dictionary of arguments of the function to execute
        """
        raise NotImplementedError

    def setup(self):
        """Configure the unit; must be implemented by subclasses."""
        raise NotImplementedError

    def run(self):
        """Execute the unit; must be implemented by subclasses."""
        raise NotImplementedError

    def close(self):
        """Release resources held by the unit; nothing to do in the base class."""
        return
|
r1187 | |||
|
class Operation(object):
    """
    Update - Jan 2018 - MULTIPROCESSING

    Most of the methods remained the same. The decorator parses the
    arguments and executes the run() method for each process.
    The constructor does not receive any argument, neither does the base
    class.

    Base class to define the additional operations that can be added to a
    ProcessingUnit and that need to accumulate previous information of the
    data to process. Preferably use an accumulation buffer inside this class.

    Example: coherent integrations need the previous information of the
    last n profiles (buffer).
    """

    proc_type = 'operation'
    __attrs__ = []

    def __init__(self):
        self.id = None
        self.isConfig = False

        # Keep a name that was already provided (e.g. by a subclass).
        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def getAllowedArgs(self):
        """
        Return the argument names this operation accepts.

        Return:
            ``__attrs__`` when the attribute exists, otherwise the argument
            names taken from the signature of ``run``.
        """
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        # inspect.getargspec was removed in Python 3.11; fall back to it only
        # on interpreters that lack getfullargspec.
        argspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        return argspec(self.run).args

    def setup(self, **kwargs):
        """
        Configure the operation; must be implemented by subclasses.

        Input:
            **kwargs : configuration arguments. Accepted here because run()
                       forwards its keyword arguments to setup().
        """
        self.isConfig = True

        raise NotImplementedError

    def run(self, dataIn, **kwargs):
        """
        Perform the needed operations over dataIn.data and update the
        attributes of the dataIn object.

        Input:
            dataIn : object of type JROData

        Return:
            None

        Affected:
            __buffer : data reception buffer.
        """
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):
        """Release resources held by the operation; nothing to do here."""
        return
|
r860 | |||
class InputQueue(Thread):
    '''
    Class to hold input data for Processing Units and external Operations.

    A background thread subscribes to the project's publisher socket and
    stores every received payload (still pickled) in an in-memory queue.
    get() hands the payloads out one by one and, when a lock object was
    given, uses it to signal that the buffered data is above or below
    MAX_BUFFER_MB.
    '''

    # Buffered payload size, in units of 10**6 bytes, above which get()
    # clears the lock.
    MAX_BUFFER_MB = 512

    def __init__(self, project_id, inputId, lock=None):
        '''
        Input:
            project_id : id used to build the IPC endpoint name
            inputId    : topic to subscribe to
            lock       : optional object exposing ``n.value``, ``clear()``
                         and ``set()`` (presumably an Event carrying a shared
                         counter -- confirm against the controller). When
                         None no back-pressure signalling is done.
        '''
        Thread.__init__(self)
        # run() never returns, so the thread must not keep the interpreter
        # alive at shutdown.
        self.daemon = True
        self.queue = Queue()
        self.project_id = project_id
        self.inputId = inputId
        self.lock = lock
        self.islocked = False
        # Sum of sys.getsizeof() of the payloads currently queued.
        # NOTE(review): updated from both the receiver thread and the
        # consumer without synchronisation.
        self.size = 0

    def run(self):
        '''
        Receive messages for ``inputId`` forever and enqueue their payload.
        '''
        c = zmq.Context()
        self.receiver = c.socket(zmq.SUB)
        self.receiver.connect(
            'ipc:///tmp/schain/{}_pub'.format(self.project_id))
        self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
        while True:
            # Frame 0 is the topic, frame 1 the pickled object.
            obj = self.receiver.recv_multipart()[1]
            self.size += sys.getsizeof(obj)
            self.queue.put(obj)

    def get(self):
        '''
        Block until a payload is available and return the unpickled object.

        Before waiting, the lock (if any) is cleared when the buffered size
        exceeds MAX_BUFFER_MB and released again once it drops back.
        '''
        if self.lock is not None:
            over_limit = self.size/1000000 > self.MAX_BUFFER_MB
            if not self.islocked and over_limit:
                self.lock.n.value += 1
                self.islocked = True
                self.lock.clear()
            elif self.islocked and not over_limit:
                self.islocked = False
                self.lock.n.value -= 1
                # Only release when no other queue is still over the limit.
                if self.lock.n.value == 0:
                    self.lock.set()
        obj = self.queue.get()
        self.size -= sys.getsizeof(obj)
        # NOTE(review): pickle.loads is only safe because the data comes
        # from the project's own IPC socket; never feed it untrusted input.
        return pickle.loads(obj)
r1254 | ||||
r1279 | ||||
|
def MPDecorator(BaseClass):
    """
    Multiprocessing class decorator

    This function adds multiprocessing features to a BaseClass. It also
    handles the communication between processes (readers, procUnits and
    operations).

    Input:
        BaseClass : class to wrap; it must provide ``run`` and ``close``.

    Return:
        MPClass : subclass of both BaseClass and multiprocessing.Process
    """

    class MPClass(BaseClass, Process):

        def __init__(self, *args, **kwargs):
            """
            Input:
                *args    : positional arguments, in this order: id, inputId,
                           project_id, err_queue, lock, typeProc
                **kwargs : keyword arguments forwarded to BaseClass.run()
            """
            super(MPClass, self).__init__()
            Process.__init__(self)
            self.operationKwargs = {}
            self.args = args
            self.kwargs = kwargs
            self.sender = None
            self.receiver = None
            self.i = 0
            self.t = time.time()
            self.name = BaseClass.__name__
            self.__doc__ = BaseClass.__doc__

            if 'plot' in self.name.lower() and not self.name.endswith('_'):
                self.name = '{}{}'.format(self.CODE.upper(), 'Plot')

            self.start_time = time.time()
            self.id = args[0]
            self.inputId = args[1]
            self.project_id = args[2]
            self.err_queue = args[3]
            self.lock = args[4]
            self.typeProc = args[5]
            self.err_queue.put('#_start_#')
            # Units without an input (inputId is None) have nothing to
            # receive, so no input queue is created for them.
            if self.inputId is not None:
                self.queue = InputQueue(self.project_id, self.inputId, self.lock)

        def subscribe(self):
            '''
            Start the zmq socket receiver and subscribe to the input ID.
            '''
            self.queue.start()

        def listen(self):
            '''
            Wait for the next object of the input queue and return it.
            '''
            return self.queue.get()

        def set_publisher(self):
            '''
            Create a zmq socket for publishing objects.
            '''
            # NOTE(review): presumably gives the other endpoint time to be
            # ready before connecting -- confirm.
            time.sleep(0.5)

            c = zmq.Context()
            self.sender = c.socket(zmq.PUB)
            self.sender.connect(
                'ipc:///tmp/schain/{}_sub'.format(self.project_id))

        def publish(self, data, id):
            '''
            Publish an object to a specific topic.
            It blocks publishing when the receiver queue is full to avoid
            data loss.

            Input:
                data : object to send (it is pickled)
                id   : topic under which the object is published
            '''
            # Only units without input wait on the lock.
            if self.inputId is None:
                self.lock.wait()

            self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])

        def runReader(self):
            '''
            Run function for read units
            '''
            while True:
                # NOTE(review): the bare except also traps
                # KeyboardInterrupt/SystemExit; kept so that every failure
                # is still reported through err_queue.
                try:
                    BaseClass.run(self, **self.kwargs)
                except:
                    err = traceback.format_exc()
                    if 'No more files' in err:
                        log.warning('No more files to read', self.name)
                    else:
                        self.err_queue.put('{}|{}'.format(self.name, err))
                    self.dataOut.error = True

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self' and not self.dataOut.flagNoData:
                        op(**kwargs)
                    elif optype == 'other' and not self.dataOut.flagNoData:
                        # Pass the operation's own kwargs (as runProc does),
                        # not the reader's.
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external':
                        self.publish(self.dataOut, opId)

                if self.dataOut.flagNoData and not self.dataOut.error:
                    continue

                self.publish(self.dataOut, self.id)
                if self.dataOut.error:
                    break

            # NOTE(review): presumably lets the last message be delivered
            # before the socket is closed -- confirm.
            time.sleep(0.5)

        def runProc(self):
            '''
            Run function for processing units
            '''
            while True:
                self.dataIn = self.listen()

                if self.dataIn.flagNoData and self.dataIn.error is None:
                    continue
                elif not self.dataIn.error:
                    # Bare except kept on purpose, see runReader().
                    try:
                        BaseClass.run(self, **self.kwargs)
                    except:
                        self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
                        self.dataOut.error = True
                elif self.dataIn.error:
                    # Propagate the upstream error without running the unit.
                    self.dataOut.error = self.dataIn.error
                    self.dataOut.flagNoData = True

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self' and not self.dataOut.flagNoData:
                        op(**kwargs)
                    elif optype == 'other' and not self.dataOut.flagNoData:
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external' and not self.dataOut.flagNoData:
                        self.publish(self.dataOut, opId)

                self.publish(self.dataOut, self.id)
                # On error, external operations are notified as well so they
                # can leave their own loop.
                for op, optype, opId, kwargs in self.operations:
                    if optype == 'external' and self.dataOut.error:
                        self.publish(self.dataOut, opId)

                if self.dataOut.error:
                    break

            # NOTE(review): presumably lets the last message be delivered
            # before the socket is closed -- confirm.
            time.sleep(0.5)

        def runOp(self):
            '''
            Run function for external operations (these operations just
            receive data, e.g. plots, writers, publishers)
            '''
            while True:
                dataOut = self.listen()

                if not dataOut.error:
                    # Bare except kept on purpose, see runReader().
                    try:
                        BaseClass.run(self, dataOut, **self.kwargs)
                    except:
                        self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
                        # NOTE(review): only the local object is flagged;
                        # the loop keeps listening after a failure.
                        dataOut.error = True
                else:
                    break

        def run(self):
            '''
            Process entry point: dispatch on typeProc and close at the end.

            Raises:
                ValueError : when typeProc is neither "ProcUnit" nor
                             "Operation"
            '''
            # Compare by value: identity ("is") on strings depends on
            # interning and is not reliable.
            if self.typeProc == "ProcUnit":
                if self.inputId is not None:
                    self.subscribe()

                self.set_publisher()

                if 'Reader' not in BaseClass.__name__:
                    self.runProc()
                else:
                    self.runReader()
            elif self.typeProc == "Operation":
                self.subscribe()
                self.runOp()
            else:
                raise ValueError("Unknown type")

            self.close()

        def close(self):
            '''
            Close the wrapped unit, notify the controller and close sockets.
            '''
            BaseClass.close(self)
            self.err_queue.put('#_end_#')

            if self.sender:
                self.sender.close()

            if self.receiver:
                self.receiver.close()

            log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)

    return MPClass