@@ -172,6 +172,57 class Operation(object): | |||||
172 |
|
172 | |||
173 | return |
|
173 | return | |
174 |
|
174 | |||
|
175 | class InputQueue(Thread): | |||
|
176 | ||||
|
177 | ''' | |||
|
178 | ||||
|
179 | Class to hold input data for Proccessing Units and external Operations, | |||
|
180 | ||||
|
181 | ''' | |||
|
182 | ||||
|
183 | ||||
|
184 | ||||
|
185 | def __init__(self, project_id, inputId): | |||
|
186 | ||||
|
187 | Thread.__init__(self) | |||
|
188 | ||||
|
189 | self.queue = Queue() | |||
|
190 | ||||
|
191 | self.project_id = project_id | |||
|
192 | ||||
|
193 | self.inputId = inputId | |||
|
194 | ||||
|
195 | ||||
|
196 | ||||
|
197 | def run(self): | |||
|
198 | ||||
|
199 | ||||
|
200 | ||||
|
201 | c = zmq.Context() | |||
|
202 | ||||
|
203 | self.receiver = c.socket(zmq.SUB) | |||
|
204 | ||||
|
205 | self.receiver.connect( | |||
|
206 | ||||
|
207 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) | |||
|
208 | ||||
|
209 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) | |||
|
210 | ||||
|
211 | ||||
|
212 | ||||
|
213 | while True: | |||
|
214 | ||||
|
215 | self.queue.put(self.receiver.recv_multipart()[1]) | |||
|
216 | ||||
|
217 | ||||
|
218 | ||||
|
219 | def get(self): | |||
|
220 | ||||
|
221 | ||||
|
222 | ||||
|
223 | return pickle.loads(self.queue.get()) | |||
|
224 | ||||
|
225 | ||||
175 |
|
226 | |||
176 | def MPDecorator(BaseClass): |
|
227 | def MPDecorator(BaseClass): | |
177 | """ |
|
228 | """ | |
@@ -205,27 +256,21 def MPDecorator(BaseClass): | |||||
205 | self.err_queue = args[3] |
|
256 | self.err_queue = args[3] | |
206 | self.typeProc = args[4] |
|
257 | self.typeProc = args[4] | |
207 | self.err_queue.put('#_start_#') |
|
258 | self.err_queue.put('#_start_#') | |
|
259 | self.queue = InputQueue(self.project_id, self.inputId) | |||
208 |
|
260 | |||
209 | def subscribe(self): |
|
261 | def subscribe(self): | |
210 | ''' |
|
262 | ''' | |
211 | Start the zmq socket receiver and subcribe to input ID. |
|
263 | Start the zmq socket receiver and subcribe to input ID. | |
212 | ''' |
|
264 | ''' | |
213 |
|
265 | |||
214 |
|
|
266 | self.queue.start() | |
215 | self.receiver = c.socket(zmq.SUB) |
|
|||
216 | self.receiver.connect( |
|
|||
217 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) |
|
|||
218 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) |
|
|||
219 |
|
||||
220 |
|
267 | |||
221 | def listen(self): |
|
268 | def listen(self): | |
222 | ''' |
|
269 | ''' | |
223 | This function waits for objects |
|
270 | This function waits for objects | |
224 | ''' |
|
271 | ''' | |
225 |
|
272 | |||
226 | data = pickle.loads(self.receiver.recv_multipart()[1]) |
|
273 | return self.queue.get() | |
227 |
|
||||
228 | return data |
|
|||
229 |
|
274 | |||
230 | def set_publisher(self): |
|
275 | def set_publisher(self): | |
231 | ''' |
|
276 | ''' |
General Comments 0
You need to be logged in to leave comments.
Login now