From 1ee18bfa3eb69f2f40da16c3ca79f7d85f3a4408 2019-09-18 14:40:03 From: jespinoza Date: 2019-09-18 14:40:03 Subject: [PATCH] Use of delays instead of input queue to keep dataouts and avoid loose of them --- diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index 788379e..0f03c68 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -172,32 +172,6 @@ class Operation(object): return -class InputQueue(Thread): - ''' - Class to hold input data for Proccessing Units and external Operations, - ''' - - def __init__(self, project_id, inputId): - Thread.__init__(self) - self.queue = Queue() - self.project_id = project_id - self.inputId = inputId - - def run(self): - - c = zmq.Context() - self.receiver = c.socket(zmq.SUB) - self.receiver.connect( - 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) - self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) - - while True: - self.queue.put(self.receiver.recv_multipart()[1]) - - def get(self): - - return pickle.loads(self.queue.get()) - def MPDecorator(BaseClass): """ @@ -218,6 +192,7 @@ def MPDecorator(BaseClass): self.sender = None self.receiver = None self.i = 0 + self.t = time.time() self.name = BaseClass.__name__ if 'plot' in self.name.lower() and not self.name.endswith('_'): @@ -228,31 +203,37 @@ def MPDecorator(BaseClass): self.inputId = args[1] self.project_id = args[2] self.err_queue = args[3] - self.typeProc = args[4] - self.queue = InputQueue(self.project_id, self.inputId) + self.typeProc = args[4] self.err_queue.put('#_start_#') def subscribe(self): ''' - This function start the input queue. + Start the zmq socket receiver and subcribe to input ID. ''' - - self.queue.start() + + c = zmq.Context() + self.receiver = c.socket(zmq.SUB) + self.receiver.connect( + 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) + self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) def listen(self): ''' This function waits for objects ''' - - return self.queue.get() + + data = pickle.loads(self.receiver.recv_multipart()[1]) + + return data def set_publisher(self): ''' - This function create a socket for publishing purposes. + This function create a zmq socket for publishing objects. ''' - time.sleep(1) + time.sleep(0.5) + c = zmq.Context() self.sender = c.socket(zmq.PUB) self.sender.connect( @@ -260,16 +241,21 @@ def MPDecorator(BaseClass): def publish(self, data, id): ''' - This function publish an object, to a specific topic. + This function publish an object, to an specific topic. For Read Units (inputId == None) adds a little delay to avoid data loss ''' if self.inputId is None: self.i += 1 - if self.i % 80 == 0: + if self.i % 40 == 0 and time.time()-self.t > 0.1: + self.i = 0 + self.t = time.time() + time.sleep(0.05) + elif self.i % 40 == 0: self.i = 0 - time.sleep(0.01) + self.t = time.time() + time.sleep(0.01) self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])