class InputQueue(Thread):
    '''
    Background receiver that buffers input data for Processing Units and
    external Operations.

    The thread subscribes to the project's zmq PUB endpoint
    (``ipc:///tmp/schain/<project_id>_pub``), filtered by ``inputId``, and
    pushes the second frame of every multipart message into an internal
    queue. Consumers call :meth:`get` to obtain the unpickled object.

    Args:
        project_id: identifier used to build the ipc endpoint path.
        inputId (str): subscription topic; it is encoded to bytes before
            being passed to ``zmq.SUBSCRIBE``.
    '''

    def __init__(self, project_id, inputId):
        Thread.__init__(self)
        # run() blocks forever on recv_multipart(); a non-daemon thread
        # would be joined at interpreter/process shutdown and keep the
        # owning process from ever exiting.
        self.daemon = True
        self.queue = Queue()
        self.project_id = project_id
        self.inputId = inputId

    def run(self):
        '''
        Connect the SUB socket and forward every received payload to the
        internal queue until the socket raises.
        '''
        context = zmq.Context()
        self.receiver = context.socket(zmq.SUB)
        try:
            self.receiver.connect(
                'ipc:///tmp/schain/{}_pub'.format(self.project_id))
            self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())

            while True:
                # Frame 0 is the topic, frame 1 is the pickled payload.
                self.queue.put(self.receiver.recv_multipart()[1])
        finally:
            # Release the socket and context if the receive loop dies
            # (e.g. zmq error) instead of leaking them.
            self.receiver.close(linger=0)
            context.term()

    def get(self):
        '''
        Block until a payload is available and return the unpickled object.
        '''
        # NOTE(review): pickle.loads executes arbitrary code from its input.
        # This is only safe while the ipc endpoint is written exclusively by
        # trusted local schain processes -- confirm before exposing it.
        return pickle.loads(self.queue.get())