@@ -172,32 +172,6 class Operation(object): | |||||
172 |
|
172 | |||
173 | return |
|
173 | return | |
174 |
|
174 | |||
175 | class InputQueue(Thread): |
|
|||
176 | ''' |
|
|||
177 | Class to hold input data for Proccessing Units and external Operations, |
|
|||
178 | ''' |
|
|||
179 |
|
||||
180 | def __init__(self, project_id, inputId): |
|
|||
181 | Thread.__init__(self) |
|
|||
182 | self.queue = Queue() |
|
|||
183 | self.project_id = project_id |
|
|||
184 | self.inputId = inputId |
|
|||
185 |
|
||||
186 | def run(self): |
|
|||
187 |
|
||||
188 | c = zmq.Context() |
|
|||
189 | self.receiver = c.socket(zmq.SUB) |
|
|||
190 | self.receiver.connect( |
|
|||
191 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) |
|
|||
192 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) |
|
|||
193 |
|
||||
194 | while True: |
|
|||
195 | self.queue.put(self.receiver.recv_multipart()[1]) |
|
|||
196 |
|
||||
197 | def get(self): |
|
|||
198 |
|
||||
199 | return pickle.loads(self.queue.get()) |
|
|||
200 |
|
||||
201 |
|
175 | |||
202 | def MPDecorator(BaseClass): |
|
176 | def MPDecorator(BaseClass): | |
203 | """ |
|
177 | """ | |
@@ -218,6 +192,7 def MPDecorator(BaseClass): | |||||
218 | self.sender = None |
|
192 | self.sender = None | |
219 | self.receiver = None |
|
193 | self.receiver = None | |
220 | self.i = 0 |
|
194 | self.i = 0 | |
|
195 | self.t = time.time() | |||
221 | self.name = BaseClass.__name__ |
|
196 | self.name = BaseClass.__name__ | |
222 |
|
197 | |||
223 | if 'plot' in self.name.lower() and not self.name.endswith('_'): |
|
198 | if 'plot' in self.name.lower() and not self.name.endswith('_'): | |
@@ -228,31 +203,37 def MPDecorator(BaseClass): | |||||
228 | self.inputId = args[1] |
|
203 | self.inputId = args[1] | |
229 | self.project_id = args[2] |
|
204 | self.project_id = args[2] | |
230 | self.err_queue = args[3] |
|
205 | self.err_queue = args[3] | |
231 | self.typeProc = args[4] |
|
206 | self.typeProc = args[4] | |
232 | self.queue = InputQueue(self.project_id, self.inputId) |
|
|||
233 | self.err_queue.put('#_start_#') |
|
207 | self.err_queue.put('#_start_#') | |
234 |
|
208 | |||
235 | def subscribe(self): |
|
209 | def subscribe(self): | |
236 | ''' |
|
210 | ''' | |
237 | This function start the input queue. |
|
211 | Start the zmq socket receiver and subcribe to input ID. | |
238 | ''' |
|
212 | ''' | |
239 |
|
213 | |||
240 | self.queue.start() |
|
214 | c = zmq.Context() | |
|
215 | self.receiver = c.socket(zmq.SUB) | |||
|
216 | self.receiver.connect( | |||
|
217 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) | |||
|
218 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) | |||
241 |
|
219 | |||
242 |
|
220 | |||
243 | def listen(self): |
|
221 | def listen(self): | |
244 | ''' |
|
222 | ''' | |
245 | This function waits for objects |
|
223 | This function waits for objects | |
246 | ''' |
|
224 | ''' | |
247 |
|
225 | |||
248 | return self.queue.get() |
|
226 | data = pickle.loads(self.receiver.recv_multipart()[1]) | |
|
227 | ||||
|
228 | return data | |||
249 |
|
229 | |||
250 | def set_publisher(self): |
|
230 | def set_publisher(self): | |
251 | ''' |
|
231 | ''' | |
252 |
This function create a socket for publishing |
|
232 | This function create a zmq socket for publishing objects. | |
253 | ''' |
|
233 | ''' | |
254 |
|
234 | |||
255 |
time.sleep( |
|
235 | time.sleep(0.5) | |
|
236 | ||||
256 | c = zmq.Context() |
|
237 | c = zmq.Context() | |
257 | self.sender = c.socket(zmq.PUB) |
|
238 | self.sender = c.socket(zmq.PUB) | |
258 | self.sender.connect( |
|
239 | self.sender.connect( | |
@@ -260,16 +241,21 def MPDecorator(BaseClass): | |||||
260 |
|
241 | |||
261 | def publish(self, data, id): |
|
242 | def publish(self, data, id): | |
262 | ''' |
|
243 | ''' | |
263 | This function publish an object, to a specific topic. |
|
244 | This function publish an object, to an specific topic. | |
264 | For Read Units (inputId == None) adds a little delay |
|
245 | For Read Units (inputId == None) adds a little delay | |
265 | to avoid data loss |
|
246 | to avoid data loss | |
266 | ''' |
|
247 | ''' | |
267 |
|
248 | |||
268 | if self.inputId is None: |
|
249 | if self.inputId is None: | |
269 | self.i += 1 |
|
250 | self.i += 1 | |
270 | if self.i % 80 == 0: |
|
251 | if self.i % 40 == 0 and time.time()-self.t > 0.1: | |
|
252 | self.i = 0 | |||
|
253 | self.t = time.time() | |||
|
254 | time.sleep(0.05) | |||
|
255 | elif self.i % 40 == 0: | |||
271 | self.i = 0 |
|
256 | self.i = 0 | |
272 |
time. |
|
257 | self.t = time.time() | |
|
258 | time.sleep(0.01) | |||
273 |
|
259 | |||
274 | self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) |
|
260 | self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) | |
275 |
|
261 |
General Comments 0
You need to be logged in to leave comments.
Login now