##// END OF EJS Templates
Add input queue again :(
jespinoza -
r1250:3296460acfb3
parent child
Show More
@@ -172,6 +172,57 class Operation(object):
172 172
173 173 return
174 174
175 class InputQueue(Thread):
176
177 '''
178
179 Class to hold input data for Proccessing Units and external Operations,
180
181 '''
182
183
184
185 def __init__(self, project_id, inputId):
186
187 Thread.__init__(self)
188
189 self.queue = Queue()
190
191 self.project_id = project_id
192
193 self.inputId = inputId
194
195
196
197 def run(self):
198
199
200
201 c = zmq.Context()
202
203 self.receiver = c.socket(zmq.SUB)
204
205 self.receiver.connect(
206
207 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
208
209 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
210
211
212
213 while True:
214
215 self.queue.put(self.receiver.recv_multipart()[1])
216
217
218
219 def get(self):
220
221
222
223 return pickle.loads(self.queue.get())
224
225
175 226
176 227 def MPDecorator(BaseClass):
177 228 """
@@ -205,27 +256,21 def MPDecorator(BaseClass):
205 256 self.err_queue = args[3]
206 257 self.typeProc = args[4]
207 258 self.err_queue.put('#_start_#')
259 self.queue = InputQueue(self.project_id, self.inputId)
208 260
209 261 def subscribe(self):
210 262 '''
211 263 Start the zmq socket receiver and subcribe to input ID.
212 264 '''
213
214 c = zmq.Context()
215 self.receiver = c.socket(zmq.SUB)
216 self.receiver.connect(
217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
219
265
266 self.queue.start()
220 267
221 268 def listen(self):
222 269 '''
223 270 This function waits for objects
224 271 '''
225 272
226 data = pickle.loads(self.receiver.recv_multipart()[1])
227
228 return data
273 return self.queue.get()
229 274
230 275 def set_publisher(self):
231 276 '''
General Comments 0
You need to be logged in to leave comments. Login now