From fff8599e034647a32fa22b24c1771716c167b568 2019-08-12 16:54:13 From: Juan C. Espinoza Date: 2019-08-12 16:54:13 Subject: [PATCH] Add input queues for processing units and external operations --- diff --git a/schainpy/model/io/bltrIO_param.py b/schainpy/model/io/bltrIO_param.py index cd839ce..07cd39c 100644 --- a/schainpy/model/io/bltrIO_param.py +++ b/schainpy/model/io/bltrIO_param.py @@ -230,8 +230,7 @@ class BLTRParamReader(JRODataReader, ProcessingUnit): self.sizeOfFile = os.path.getsize(self.filename) self.counter_records = 0 self.flagIsNewFile = 0 - self.fileIndex += 1 - time.sleep(2) + self.fileIndex += 1 return 1 diff --git a/schainpy/model/io/jroIO_base.py b/schainpy/model/io/jroIO_base.py index 69529ad..32c730f 100644 --- a/schainpy/model/io/jroIO_base.py +++ b/schainpy/model/io/jroIO_base.py @@ -1598,11 +1598,9 @@ class JRODataWriter(JRODataIO): self.basicHeaderObj.size = self.basicHeaderSize # bytes self.basicHeaderObj.version = self.versionFile - self.basicHeaderObj.dataBlock = self.nTotalBlocks - log.warning(datetime.datetime.fromtimestamp(self.dataOut.utctime)) + self.basicHeaderObj.dataBlock = self.nTotalBlocks utc = numpy.floor(self.dataOut.utctime) - milisecond = (self.dataOut.utctime - utc) * 1000.0 - log.warning(milisecond) + milisecond = (self.dataOut.utctime - utc) * 1000.0 self.basicHeaderObj.utc = utc self.basicHeaderObj.miliSecond = milisecond self.basicHeaderObj.timeZone = self.dataOut.timeZone diff --git a/schainpy/model/io/jroIO_param.py b/schainpy/model/io/jroIO_param.py index 7bf51a0..efbd32b 100644 --- a/schainpy/model/io/jroIO_param.py +++ b/schainpy/model/io/jroIO_param.py @@ -1503,7 +1503,7 @@ class ParameterWriter(Operation): data.append((dsInfo['variable'], i)) fp.flush() - log.log('creating file: {}'.format(fp.filename), 'Writing') + log.log('Creating file: {}'.format(fp.filename), self.name) self.ds = dtsets self.data = data @@ -1526,6 +1526,7 @@ class ParameterWriter(Operation): self.fp.flush() self.blockIndex += 1 + log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name) return diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index 902fde2..f82e1e3 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -17,6 +17,8 @@ import inspect import zmq import time import pickle +from queue import Queue +from threading import Thread from multiprocessing import Process from zmq.utils.monitor import recv_monitor_message @@ -170,6 +172,32 @@ class Operation(object): return +class InputQueue(Thread): + ''' + Class to hold input data for Proccessing Units and external Operations, + ''' + + def __init__(self, project_id, inputId): + Thread.__init__(self) + self.queue = Queue() + self.project_id = project_id + self.inputId = inputId + + def run(self): + + c = zmq.Context() + self.receiver = c.socket(zmq.SUB) + self.receiver.connect( + 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) + self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) + + while True: + self.queue.put(self.receiver.recv_multipart()[1]) + + def get(self): + + return pickle.loads(self.queue.get()) + def MPDecorator(BaseClass): """ @@ -189,7 +217,7 @@ def MPDecorator(BaseClass): self.kwargs = kwargs self.sender = None self.receiver = None - self.i = 0 + self.i = 0 self.name = BaseClass.__name__ if 'plot' in self.name.lower() and not self.name.endswith('_'): self.name = '{}{}'.format(self.CODE.upper(), 'Plot') @@ -205,33 +233,23 @@ def MPDecorator(BaseClass): self.inputId = args[0] self.project_id = args[1] self.typeProc = "Operation" - - def fix_publish(self,valor,multiple1): - return True if valor%multiple1 ==0 else False + + self.queue = InputQueue(self.project_id, self.inputId) def subscribe(self): ''' - This function create a socket to receive objects from the - topic `inputId`. + This function start the input queue. ''' - - c = zmq.Context() - self.receiver = c.socket(zmq.SUB) - self.receiver.connect( - 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) - self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) + + self.queue.start() + def listen(self): ''' - This function waits for objects and deserialize using pickle + This function waits for objects ''' - try: - data = pickle.loads(self.receiver.recv_multipart()[1]) - except zmq.ZMQError as e: - if e.errno == zmq.ETERM: - print (e.errno) - - return data + + return self.queue.get() def set_publisher(self): ''' @@ -247,13 +265,15 @@ def MPDecorator(BaseClass): def publish(self, data, id): ''' This function publish an object, to a specific topic. - The fix method only affect inputId None which is Read Unit - Use value between 64 80, you should notice a little retard in processing + For Read Units (inputId == None) adds a little delay + to avoid data loss ''' + if self.inputId is None: - self.i+=1 - if self.fix_publish(self.i,80) == True:# value n - time.sleep(0.01) + self.i += 1 + if self.i % 100 == 0: + self.i = 0 + time.sleep(0.01) self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])