jroproc_base.py
221 lines
| 5.8 KiB
| text/x-python
|
PythonLexer
|
r487 | ''' | |
|
r1287 | Base clases to create Processing units and operations, the MPDecorator | |
must be used in plotting and writing operations to allow to run as an | |||
external process. | |||
|
r487 | ''' | |
|
r1674 | import os | |
|
r929 | import inspect | |
|
r1171 | import zmq | |
import time | |||
import pickle | |||
r1241 | import traceback | ||
|
r1235 | from threading import Thread | |
|
r1287 | from multiprocessing import Process, Queue | |
|
r1171 | from schainpy.utils import log | |
|
r1674 | import copy | |
QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '10')) | |||
|
r487 | ||
|
r568 | class ProcessingUnit(object): | |
|
r1287 | ''' | |
Base class to create Signal Chain Units | |||
''' | |||
|
r860 | ||
r1254 | proc_type = 'processing' | ||
|
r860 | ||
|
r1171 | def __init__(self): | |
|
r860 | ||
|
r487 | self.dataIn = None | |
|
r573 | self.dataOut = None | |
|
r487 | self.isConfig = False | |
|
r1177 | self.operations = [] | |
|
r1674 | self.name = 'Test' | |
self.inputs = [] | |||
|
r1287 | def setInput(self, unit): | |
|
r860 | ||
|
r1674 | attr = 'dataIn' | |
for i, u in enumerate(unit): | |||
if i==0: | |||
self.dataIn = u.dataOut#.copy() | |||
self.inputs.append('dataIn') | |||
else: | |||
setattr(self, 'dataIn{}'.format(i), u.dataOut)#.copy()) | |||
self.inputs.append('dataIn{}'.format(i)) | |||
|
r929 | def getAllowedArgs(self): | |
|
r1097 | if hasattr(self, '__attrs__'): | |
return self.__attrs__ | |||
else: | |||
return inspect.getargspec(self.run).args | |||
|
r860 | ||
|
r1187 | def addOperation(self, conf, operation): | |
|
r1287 | ''' | |
''' | |||
|
r1674 | ||
|
r1287 | self.operations.append((operation, conf.type, conf.getKwargs())) | |
|
r1171 | ||
|
r577 | def getOperationObj(self, objId): | |
|
r860 | ||
|
r1177 | if objId not in list(self.operations.keys()): | |
|
r577 | return None | |
|
r860 | ||
|
r1177 | return self.operations[objId] | |
|
r860 | ||
|
r1287 | def call(self, **kwargs): | |
''' | |||
''' | |||
try: | |||
if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error: | |||
|
r1674 | if self.dataIn.runNextUnit: | |
return not self.dataIn.isReady() | |||
else: | |||
return self.dataIn.isReady() | |||
|
r1287 | elif self.dataIn is None or not self.dataIn.error: | |
self.run(**kwargs) | |||
elif self.dataIn.error: | |||
self.dataOut.error = self.dataIn.error | |||
self.dataOut.flagNoData = True | |||
except: | |||
|
r1674 | err = traceback.format_exc() | |
|
r1287 | if 'SchainWarning' in err: | |
log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name) | |||
elif 'SchainError' in err: | |||
log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name) | |||
else: | |||
|
r1310 | log.error(err, self.name) | |
|
r1287 | self.dataOut.error = True | |
for op, optype, opkwargs in self.operations: | |||
|
r1674 | aux = self.dataOut.copy() | |
|
r1287 | if optype == 'other' and not self.dataOut.flagNoData: | |
self.dataOut = op.run(self.dataOut, **opkwargs) | |||
elif optype == 'external' and not self.dataOut.flagNoData: | |||
|
r1674 | op.queue.put(aux) | |
elif optype == 'external' and self.dataOut.error: | |||
op.queue.put(aux) | |||
try: | |||
if self.dataOut.runNextUnit: | |||
runNextUnit = self.dataOut.runNextUnit | |||
else: | |||
runNextUnit = self.dataOut.isReady() | |||
except: | |||
runNextUnit = self.dataOut.isReady() | |||
return 'Error' if self.dataOut.error else runNextUnit# self.dataOut.isReady() | |||
|
r860 | ||
|
r1171 | def setup(self): | |
|
r860 | ||
|
r1171 | raise NotImplementedError | |
|
r860 | ||
|
r1171 | def run(self): | |
|
r860 | ||
|
r1171 | raise NotImplementedError | |
|
r860 | ||
|
r1171 | def close(self): | |
|
r1187 | ||
|
r1171 | return | |
|
r1187 | ||
|
r1171 | class Operation(object): | |
|
r860 | ||
|
r1287 | ''' | |
''' | |||
|
r1674 | ||
r1254 | proc_type = 'operation' | ||
|
r860 | ||
|
r1171 | def __init__(self): | |
|
r860 | ||
|
r1187 | self.id = None | |
|
r1171 | self.isConfig = False | |
|
r860 | ||
|
r1171 | if not hasattr(self, 'name'): | |
self.name = self.__class__.__name__ | |||
|
r1187 | ||
|
r1171 | def getAllowedArgs(self): | |
if hasattr(self, '__attrs__'): | |||
return self.__attrs__ | |||
else: | |||
return inspect.getargspec(self.run).args | |||
|
r860 | ||
|
r1171 | def setup(self): | |
|
r860 | ||
|
r1171 | self.isConfig = True | |
|
r860 | ||
|
r1171 | raise NotImplementedError | |
|
r860 | ||
|
r1171 | def run(self, dataIn, **kwargs): | |
|
r487 | """ | |
|
r1171 | Realiza las operaciones necesarias sobre la dataIn.data y actualiza los | |
atributos del objeto dataIn. | |||
|
r860 | ||
|
r568 | Input: | |
|
r860 | ||
|
r1171 | dataIn : objeto del tipo JROData | |
|
r860 | ||
|
r1171 | Return: | |
|
r860 | ||
|
r1171 | None | |
|
r860 | ||
|
r1171 | Affected: | |
__buffer : buffer de recepcion de datos. | |||
|
r860 | ||
|
r1171 | """ | |
if not self.isConfig: | |||
self.setup(**kwargs) | |||
|
r860 | ||
|
r1171 | raise NotImplementedError | |
|
r860 | ||
|
r1171 | def close(self): | |
|
r860 | ||
|
r1187 | return | |
|
r860 | ||
|
r1674 | ||
|
r1171 | def MPDecorator(BaseClass): | |
""" | |||
|
r1177 | Multiprocessing class decorator | |
|
r860 | ||
|
r1674 | This function add multiprocessing features to a BaseClass. | |
|
r1171 | """ | |
|
r1187 | ||
|
r1171 | class MPClass(BaseClass, Process): | |
|
r1177 | ||
|
r1171 | def __init__(self, *args, **kwargs): | |
super(MPClass, self).__init__() | |||
Process.__init__(self) | |||
|
r1287 | ||
|
r1171 | self.args = args | |
self.kwargs = kwargs | |||
r1245 | self.t = time.time() | ||
|
r1287 | self.op_type = 'external' | |
|
r1177 | self.name = BaseClass.__name__ | |
r1254 | self.__doc__ = BaseClass.__doc__ | ||
|
r1674 | ||
|
r1201 | if 'plot' in self.name.lower() and not self.name.endswith('_'): | |
self.name = '{}{}'.format(self.CODE.upper(), 'Plot') | |||
|
r1674 | ||
|
r1287 | self.start_time = time.time() | |
r1241 | self.err_queue = args[3] | ||
|
r1674 | self.queue = Queue(maxsize=QUEUE_SIZE) | |
|
r1287 | self.myrun = BaseClass.run | |
|
r1171 | ||
|
r1287 | def run(self): | |
|
r1674 | ||
|
r1171 | while True: | |
|
r1287 | dataOut = self.queue.get() | |
|
r1179 | ||
r1241 | if not dataOut.error: | ||
r1268 | try: | ||
BaseClass.run(self, dataOut, **self.kwargs) | |||
except: | |||
|
r1674 | err = traceback.format_exc() | |
|
r1310 | log.error(err, self.name) | |
|
r1171 | else: | |
|
r1287 | break | |
|
r860 | ||
|
r1177 | self.close() | |
def close(self): | |||
|
r1187 | ||
BaseClass.close(self) | |||
|
r1674 | log.success('Done...(Time:{:4.2f} secs)'.format(time.time() - self.start_time), self.name) | |
|
r1187 | ||
|
r1674 | return MPClass |