##// END OF EJS Templates
Add input queue again :(
jespinoza -
r1250:3296460acfb3
parent child
Show More
@@ -172,6 +172,57 class Operation(object):
172
172
173 return
173 return
174
174
175 class InputQueue(Thread):
176
177 '''
178
179 Class to hold input data for Proccessing Units and external Operations,
180
181 '''
182
183
184
185 def __init__(self, project_id, inputId):
186
187 Thread.__init__(self)
188
189 self.queue = Queue()
190
191 self.project_id = project_id
192
193 self.inputId = inputId
194
195
196
197 def run(self):
198
199
200
201 c = zmq.Context()
202
203 self.receiver = c.socket(zmq.SUB)
204
205 self.receiver.connect(
206
207 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
208
209 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
210
211
212
213 while True:
214
215 self.queue.put(self.receiver.recv_multipart()[1])
216
217
218
219 def get(self):
220
221
222
223 return pickle.loads(self.queue.get())
224
225
175
226
176 def MPDecorator(BaseClass):
227 def MPDecorator(BaseClass):
177 """
228 """
@@ -205,27 +256,21 def MPDecorator(BaseClass):
205 self.err_queue = args[3]
256 self.err_queue = args[3]
206 self.typeProc = args[4]
257 self.typeProc = args[4]
207 self.err_queue.put('#_start_#')
258 self.err_queue.put('#_start_#')
259 self.queue = InputQueue(self.project_id, self.inputId)
208
260
209 def subscribe(self):
261 def subscribe(self):
210 '''
262 '''
211 Start the zmq socket receiver and subcribe to input ID.
263 Start the zmq socket receiver and subcribe to input ID.
212 '''
264 '''
213
265
214 c = zmq.Context()
266 self.queue.start()
215 self.receiver = c.socket(zmq.SUB)
216 self.receiver.connect(
217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
219
220
267
221 def listen(self):
268 def listen(self):
222 '''
269 '''
223 This function waits for objects
270 This function waits for objects
224 '''
271 '''
225
272
226 data = pickle.loads(self.receiver.recv_multipart()[1])
273 return self.queue.get()
227
228 return data
229
274
230 def set_publisher(self):
275 def set_publisher(self):
231 '''
276 '''
General Comments 0
You need to be logged in to leave comments. Login now