From 6310ac0f91c8e17ed80d59611a3733165338e22c 2023-09-11 22:52:38 From: Alexander Valdez Date: 2023-09-11 22:52:38 Subject: [PATCH] Update reference ISR repository --- diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index fd35346..e8322e6 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -3,7 +3,7 @@ Base clases to create Processing units and operations, the MPDecorator must be used in plotting and writing operations to allow to run as an external process. ''' -# repositorio master +import os import inspect import zmq import time @@ -12,7 +12,8 @@ import traceback from threading import Thread from multiprocessing import Process, Queue from schainpy.utils import log - +import copy +QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '10')) class ProcessingUnit(object): ''' @@ -27,11 +28,20 @@ class ProcessingUnit(object): self.dataOut = None self.isConfig = False self.operations = [] - + self.name = 'Test' + self.inputs = [] + def setInput(self, unit): - self.dataIn = unit.dataOut - + attr = 'dataIn' + for i, u in enumerate(unit): + if i==0: + self.dataIn = u.dataOut#.copy() + self.inputs.append('dataIn') + else: + setattr(self, 'dataIn{}'.format(i), u.dataOut)#.copy()) + self.inputs.append('dataIn{}'.format(i)) + def getAllowedArgs(self): if hasattr(self, '__attrs__'): return self.__attrs__ @@ -41,7 +51,7 @@ class ProcessingUnit(object): def addOperation(self, conf, operation): ''' ''' - + self.operations.append((operation, conf.type, conf.getKwargs())) def getOperationObj(self, objId): @@ -57,14 +67,17 @@ class ProcessingUnit(object): try: if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error: - return self.dataIn.isReady() + if self.dataIn.runNextUnit: + return not self.dataIn.isReady() + else: + return self.dataIn.isReady() elif self.dataIn is None or not self.dataIn.error: self.run(**kwargs) elif self.dataIn.error: self.dataOut.error = self.dataIn.error self.dataOut.flagNoData = True except: - err = traceback.format_exc() + err = traceback.format_exc() if 'SchainWarning' in err: log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name) elif 'SchainError' in err: @@ -72,16 +85,22 @@ class ProcessingUnit(object): else: log.error(err, self.name) self.dataOut.error = True - for op, optype, opkwargs in self.operations: + aux = self.dataOut.copy() if optype == 'other' and not self.dataOut.flagNoData: self.dataOut = op.run(self.dataOut, **opkwargs) elif optype == 'external' and not self.dataOut.flagNoData: - op.queue.put(self.dataOut) - elif optype == 'external' and self.dataOut.error: - op.queue.put(self.dataOut) - - return 'Error' if self.dataOut.error else self.dataOut.isReady() + op.queue.put(aux) + elif optype == 'external' and self.dataOut.error: + op.queue.put(aux) + try: + if self.dataOut.runNextUnit: + runNextUnit = self.dataOut.runNextUnit + else: + runNextUnit = self.dataOut.isReady() + except: + runNextUnit = self.dataOut.isReady() + return 'Error' if self.dataOut.error else runNextUnit# self.dataOut.isReady() def setup(self): @@ -100,7 +119,7 @@ class Operation(object): ''' ''' - + proc_type = 'operation' def __init__(self): @@ -149,12 +168,12 @@ class Operation(object): return - + def MPDecorator(BaseClass): """ Multiprocessing class decorator - This function add multiprocessing features to a BaseClass. + This function add multiprocessing features to a BaseClass. """ class MPClass(BaseClass, Process): @@ -169,17 +188,17 @@ def MPDecorator(BaseClass): self.op_type = 'external' self.name = BaseClass.__name__ self.__doc__ = BaseClass.__doc__ - + if 'plot' in self.name.lower() and not self.name.endswith('_'): self.name = '{}{}'.format(self.CODE.upper(), 'Plot') - + self.start_time = time.time() self.err_queue = args[3] - self.queue = Queue(maxsize=1) + self.queue = Queue(maxsize=QUEUE_SIZE) self.myrun = BaseClass.run def run(self): - + while True: dataOut = self.queue.get() @@ -188,7 +207,7 @@ def MPDecorator(BaseClass): try: BaseClass.run(self, dataOut, **self.kwargs) except: - err = traceback.format_exc() + err = traceback.format_exc() log.error(err, self.name) else: break @@ -198,6 +217,6 @@ def MPDecorator(BaseClass): def close(self): BaseClass.close(self) - log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name) + log.success('Done...(Time:{:4.2f} secs)'.format(time.time() - self.start_time), self.name) - return MPClass + return MPClass \ No newline at end of file