##// END OF EJS Templates
update controller change class Project update createObjects
update controller change class Project update createObjects

File last commit:

r1668:8880a814f9ea
r1670:1003ac744c06
Show More
jroproc_base.py
203 lines | 5.1 KiB | text/x-python | PythonLexer
'''
Base clases to create Processing units and operations, the MPDecorator
must be used in plotting and writing operations to allow to run as an
external process.
'''
# repositorio master
import inspect
import zmq
import time
import pickle
import traceback
from threading import Thread
from multiprocessing import Process, Queue
from schainpy.utils import log
class ProcessingUnit(object):
'''
Base class to create Signal Chain Units
'''
proc_type = 'processing'
def __init__(self):
self.dataIn = None
self.dataOut = None
self.isConfig = False
self.operations = []
def setInput(self, unit):
self.dataIn = unit.dataOut
def getAllowedArgs(self):
if hasattr(self, '__attrs__'):
return self.__attrs__
else:
return inspect.getargspec(self.run).args
def addOperation(self, conf, operation):
'''
'''
self.operations.append((operation, conf.type, conf.getKwargs()))
def getOperationObj(self, objId):
if objId not in list(self.operations.keys()):
return None
return self.operations[objId]
def call(self, **kwargs):
'''
'''
try:
if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
return self.dataIn.isReady()
elif self.dataIn is None or not self.dataIn.error:
self.run(**kwargs)
elif self.dataIn.error:
self.dataOut.error = self.dataIn.error
self.dataOut.flagNoData = True
except:
err = traceback.format_exc()
if 'SchainWarning' in err:
log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
elif 'SchainError' in err:
log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
else:
log.error(err, self.name)
self.dataOut.error = True
for op, optype, opkwargs in self.operations:
if optype == 'other' and not self.dataOut.flagNoData:
self.dataOut = op.run(self.dataOut, **opkwargs)
elif optype == 'external' and not self.dataOut.flagNoData:
op.queue.put(self.dataOut)
elif optype == 'external' and self.dataOut.error:
op.queue.put(self.dataOut)
return 'Error' if self.dataOut.error else self.dataOut.isReady()
def setup(self):
raise NotImplementedError
def run(self):
raise NotImplementedError
def close(self):
return
class Operation(object):
'''
'''
proc_type = 'operation'
def __init__(self):
self.id = None
self.isConfig = False
if not hasattr(self, 'name'):
self.name = self.__class__.__name__
def getAllowedArgs(self):
if hasattr(self, '__attrs__'):
return self.__attrs__
else:
return inspect.getargspec(self.run).args
def setup(self):
self.isConfig = True
raise NotImplementedError
def run(self, dataIn, **kwargs):
"""
Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
atributos del objeto dataIn.
Input:
dataIn : objeto del tipo JROData
Return:
None
Affected:
__buffer : buffer de recepcion de datos.
"""
if not self.isConfig:
self.setup(**kwargs)
raise NotImplementedError
def close(self):
return
def MPDecorator(BaseClass):
"""
Multiprocessing class decorator
This function add multiprocessing features to a BaseClass.
"""
class MPClass(BaseClass, Process):
def __init__(self, *args, **kwargs):
super(MPClass, self).__init__()
Process.__init__(self)
self.args = args
self.kwargs = kwargs
self.t = time.time()
self.op_type = 'external'
self.name = BaseClass.__name__
self.__doc__ = BaseClass.__doc__
if 'plot' in self.name.lower() and not self.name.endswith('_'):
self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
self.start_time = time.time()
self.err_queue = args[3]
self.queue = Queue(maxsize=1)
self.myrun = BaseClass.run
def run(self):
while True:
dataOut = self.queue.get()
if not dataOut.error:
try:
BaseClass.run(self, dataOut, **self.kwargs)
except:
err = traceback.format_exc()
log.error(err, self.name)
else:
break
self.close()
def close(self):
BaseClass.close(self)
log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
return MPClass