@@ -172,6 +172,57 class Operation(object): | |||
|
172 | 172 | |
|
173 | 173 | return |
|
174 | 174 | |
|
175 | class InputQueue(Thread): | |
|
176 | ||
|
177 | ''' | |
|
178 | ||
|
179 | Class to hold input data for Proccessing Units and external Operations, | |
|
180 | ||
|
181 | ''' | |
|
182 | ||
|
183 | ||
|
184 | ||
|
185 | def __init__(self, project_id, inputId): | |
|
186 | ||
|
187 | Thread.__init__(self) | |
|
188 | ||
|
189 | self.queue = Queue() | |
|
190 | ||
|
191 | self.project_id = project_id | |
|
192 | ||
|
193 | self.inputId = inputId | |
|
194 | ||
|
195 | ||
|
196 | ||
|
197 | def run(self): | |
|
198 | ||
|
199 | ||
|
200 | ||
|
201 | c = zmq.Context() | |
|
202 | ||
|
203 | self.receiver = c.socket(zmq.SUB) | |
|
204 | ||
|
205 | self.receiver.connect( | |
|
206 | ||
|
207 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) | |
|
208 | ||
|
209 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) | |
|
210 | ||
|
211 | ||
|
212 | ||
|
213 | while True: | |
|
214 | ||
|
215 | self.queue.put(self.receiver.recv_multipart()[1]) | |
|
216 | ||
|
217 | ||
|
218 | ||
|
219 | def get(self): | |
|
220 | ||
|
221 | ||
|
222 | ||
|
223 | return pickle.loads(self.queue.get()) | |
|
224 | ||
|
225 | ||
|
175 | 226 | |
|
176 | 227 | def MPDecorator(BaseClass): |
|
177 | 228 | """ |
@@ -205,27 +256,21 def MPDecorator(BaseClass): | |||
|
205 | 256 | self.err_queue = args[3] |
|
206 | 257 | self.typeProc = args[4] |
|
207 | 258 | self.err_queue.put('#_start_#') |
|
259 | self.queue = InputQueue(self.project_id, self.inputId) | |
|
208 | 260 | |
|
209 | 261 | def subscribe(self): |
|
210 | 262 | ''' |
|
211 | 263 | Start the zmq socket receiver and subcribe to input ID. |
|
212 | 264 | ''' |
|
213 | ||
|
214 |
|
|
|
215 | self.receiver = c.socket(zmq.SUB) | |
|
216 | self.receiver.connect( | |
|
217 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) | |
|
218 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) | |
|
219 | ||
|
265 | ||
|
266 | self.queue.start() | |
|
220 | 267 | |
|
221 | 268 | def listen(self): |
|
222 | 269 | ''' |
|
223 | 270 | This function waits for objects |
|
224 | 271 | ''' |
|
225 | 272 | |
|
226 | data = pickle.loads(self.receiver.recv_multipart()[1]) | |
|
227 | ||
|
228 | return data | |
|
273 | return self.queue.get() | |
|
229 | 274 | |
|
230 | 275 | def set_publisher(self): |
|
231 | 276 | ''' |
General Comments 0
You need to be logged in to leave comments.
Login now