jroproc_base.py
207 lines
| 5.2 KiB
| text/x-python
|
PythonLexer
|
'''
Base classes to create Processing units and operations. The MPDecorator
must be used in plotting and writing operations to allow them to run as
external processes.
'''
|
r1187 | |||
|
r929 | import inspect | ||
|
r1171 | import zmq | ||
import time | ||||
import pickle | ||||
r1241 | import traceback | |||
r1252 | try: | |||
from queue import Queue | ||||
except: | ||||
from Queue import Queue | ||||
|
r1235 | from threading import Thread | ||
|
r1287 | from multiprocessing import Process, Queue | ||
|
r1171 | from schainpy.utils import log | ||
|
r929 | |||
|
r487 | |||
|
class ProcessingUnit(object):
    '''
    Base class to create Signal Chain Units.

    A unit takes its input (``dataIn``) from the ``dataOut`` of another
    unit, fills its own ``dataOut`` in ``run`` and then passes ``dataOut``
    to every operation registered through ``addOperation``. ``setup`` and
    ``run`` raise NotImplementedError in this base class.
    '''

    proc_type = 'processing'

    def __init__(self):

        self.dataIn = None
        self.dataOut = None
        self.isConfig = False
        # List of (operation, operation type, kwargs) tuples, see addOperation
        self.operations = []

        # call() reports problems under self.name, so make sure it exists
        # even when nobody assigned a name to the unit
        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def setInput(self, unit):
        '''
        Use the ``dataOut`` object of ``unit`` as the input of this unit.
        '''
        self.dataIn = unit.dataOut

    def getAllowedArgs(self):
        '''
        Return the argument names accepted by this unit: ``__attrs__`` when
        it is defined, otherwise the argument names of ``run``.
        '''
        if hasattr(self, '__attrs__'):
            return self.__attrs__

        # inspect.getargspec was removed in Python 3.11; use its replacement
        # when available and fall back to getargspec on Python 2 only
        getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        return getspec(self.run).args

    def addOperation(self, conf, operation):
        '''
        Register ``operation`` to be executed after ``run``.

        ``conf`` provides the operation type (``conf.type``) and the keyword
        arguments (``conf.getKwargs()``) stored along with the operation.
        '''
        self.operations.append((operation, conf.type, conf.getKwargs()))

    def getOperationObj(self, objId):
        '''
        Return the registered operation whose ``id`` attribute equals
        ``objId`` or None when there is no such operation.
        '''
        # self.operations is a list of tuples (not a dict), so it has to be
        # searched instead of indexed by id
        for operation, _optype, _opkwargs in self.operations:
            if getattr(operation, 'id', None) == objId:
                return operation

        return None

    def call(self, **kwargs):
        '''
        Execute ``run`` and then the registered operations.

        Return ``dataIn.isReady()`` when the input has no data (and no
        error), 'Error' when ``dataOut.error`` is set, otherwise
        ``dataOut.isReady()``.
        '''
        try:
            if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
                # Nothing to process yet
                return self.dataIn.isReady()
            elif self.dataIn is None or not self.dataIn.error:
                self.run(**kwargs)
            elif self.dataIn.error:
                # Propagate the error of the input to the output
                self.dataOut.error = self.dataIn.error
                self.dataOut.flagNoData = True
        except BaseException:
            # Deliberately as broad as the original bare except: whatever
            # stops run(), the failure is logged and flagged in dataOut so
            # the loop below still forwards it to the external operations
            err = traceback.format_exc()
            if 'SchainWarning' in err:
                log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
            elif 'SchainError' in err:
                log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
            else:
                # Last line of the traceback: exception type and message
                log.error(err.split('\n')[-2], self.name)
            self.dataOut.error = True

        for op, optype, opkwargs in self.operations:
            if optype == 'other' and not self.dataOut.flagNoData:
                self.dataOut = op.run(self.dataOut, **opkwargs)
            elif optype == 'external' and not self.dataOut.flagNoData:
                op.queue.put(self.dataOut)
            elif optype == 'external' and self.dataOut.error:
                # External operations also receive the errored dataOut
                op.queue.put(self.dataOut)

        return 'Error' if self.dataOut.error else self.dataOut.isReady()

    def setup(self):
        '''
        Not implemented in the base class.
        '''
        raise NotImplementedError

    def run(self):
        '''
        Not implemented in the base class.
        '''
        raise NotImplementedError

    def close(self):
        '''
        Nothing to release in the base class.
        '''
        return
|
r1187 | |||
|
class Operation(object):
    '''
    Base class for the operations that are attached to a processing unit.

    ``setup`` and ``run`` raise NotImplementedError in this base class.
    '''

    proc_type = 'operation'

    def __init__(self):

        self.id = None
        self.isConfig = False

        # Keep a name that was already set, default to the class name
        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def getAllowedArgs(self):
        '''
        Return the argument names accepted by this operation: ``__attrs__``
        when it is defined, otherwise the argument names of ``run``.
        '''
        if hasattr(self, '__attrs__'):
            return self.__attrs__

        # inspect.getargspec was removed in Python 3.11; use its replacement
        # when available and fall back to getargspec on Python 2 only
        getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        return getspec(self.run).args

    def setup(self, **kwargs):
        '''
        Flag the operation as configured.

        The keyword arguments are accepted because ``run`` forwards its own
        keyword arguments here; the base class does not use them and raises
        NotImplementedError.
        '''
        self.isConfig = True

        raise NotImplementedError

    def run(self, dataIn, **kwargs):
        """
        Perform the required operations on dataIn.data and update the
        attributes of the dataIn object.

        Input:

            dataIn : object of type JROData

        Return:

            None

        Affected:
            __buffer : data reception buffer.

        """
        # Configure once, on the first call
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):
        '''
        Nothing to release in the base class.
        '''
        return
|
r860 | |||
r1268 | ||||
|
def MPDecorator(BaseClass):
    """
    Multiprocessing class decorator.

    Return a class that inherits from both ``BaseClass`` and ``Process``.
    Once started, an instance takes data objects from ``self.queue`` and
    passes each one to ``BaseClass.run`` until it receives an object whose
    ``error`` attribute is set; then it calls ``close`` and finishes.
    """

    class MPClass(BaseClass, Process):

        def __init__(self, *args, **kwargs):
            super(MPClass, self).__init__()
            Process.__init__(self)

            self.args = args
            # Keyword arguments forwarded to BaseClass.run for every item
            self.kwargs = kwargs
            self.t = time.time()
            self.op_type = 'external'
            self.name = BaseClass.__name__
            self.__doc__ = BaseClass.__doc__

            # Plot classes are renamed after their CODE attribute unless
            # the class name ends with an underscore
            if 'plot' in self.name.lower() and not self.name.endswith('_'):
                self.name = '{}{}'.format(self.CODE.upper(), 'Plot')

            self.start_time = time.time()
            # The error queue is the fourth positional argument; do not
            # fail when the class is created with fewer arguments
            self.err_queue = args[3] if len(args) > 3 else None
            # One slot only, at most one data object can be waiting
            self.queue = Queue(maxsize=1)
            self.myrun = BaseClass.run

        def run(self):
            '''
            Process the objects received through the queue until one of
            them has its ``error`` attribute set, then close.
            '''
            while True:

                dataOut = self.queue.get()

                if dataOut.error:
                    break

                try:
                    BaseClass.run(self, dataOut, **self.kwargs)
                except Exception:
                    # A failure on one item is logged and the loop goes on.
                    # SystemExit and KeyboardInterrupt are not caught so
                    # the process can still be stopped
                    err = traceback.format_exc()
                    log.error(err.split('\n')[-2], self.name)

            self.close()

        def close(self):
            '''
            Close the wrapped class and log the time elapsed since the
            instance was created.
            '''
            BaseClass.close(self)
            log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)

    return MPClass