@@ -172,32 +172,6 class Operation(object): | |||
|
172 | 172 | |
|
173 | 173 | return |
|
174 | 174 | |
|
175 | class InputQueue(Thread): | |
|
176 | ''' | |
|
177 | Class to hold input data for Proccessing Units and external Operations, | |
|
178 | ''' | |
|
179 | ||
|
180 | def __init__(self, project_id, inputId): | |
|
181 | Thread.__init__(self) | |
|
182 | self.queue = Queue() | |
|
183 | self.project_id = project_id | |
|
184 | self.inputId = inputId | |
|
185 | ||
|
186 | def run(self): | |
|
187 | ||
|
188 | c = zmq.Context() | |
|
189 | self.receiver = c.socket(zmq.SUB) | |
|
190 | self.receiver.connect( | |
|
191 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) | |
|
192 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) | |
|
193 | ||
|
194 | while True: | |
|
195 | self.queue.put(self.receiver.recv_multipart()[1]) | |
|
196 | ||
|
197 | def get(self): | |
|
198 | ||
|
199 | return pickle.loads(self.queue.get()) | |
|
200 | ||
|
201 | 175 | |
|
202 | 176 | def MPDecorator(BaseClass): |
|
203 | 177 | """ |
@@ -218,6 +192,7 def MPDecorator(BaseClass): | |||
|
218 | 192 | self.sender = None |
|
219 | 193 | self.receiver = None |
|
220 | 194 | self.i = 0 |
|
195 | self.t = time.time() | |
|
221 | 196 | self.name = BaseClass.__name__ |
|
222 | 197 | |
|
223 | 198 | if 'plot' in self.name.lower() and not self.name.endswith('_'): |
@@ -229,15 +204,18 def MPDecorator(BaseClass): | |||
|
229 | 204 | self.project_id = args[2] |
|
230 | 205 | self.err_queue = args[3] |
|
231 | 206 | self.typeProc = args[4] |
|
232 | self.queue = InputQueue(self.project_id, self.inputId) | |
|
233 | 207 | self.err_queue.put('#_start_#') |
|
234 | 208 | |
|
235 | 209 | def subscribe(self): |
|
236 | 210 | ''' |
|
237 | This function start the input queue. | |
|
211 | Start the zmq socket receiver and subcribe to input ID. | |
|
238 | 212 | ''' |
|
239 | 213 | |
|
240 | self.queue.start() | |
|
214 | c = zmq.Context() | |
|
215 | self.receiver = c.socket(zmq.SUB) | |
|
216 | self.receiver.connect( | |
|
217 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) | |
|
218 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) | |
|
241 | 219 | |
|
242 | 220 | |
|
243 | 221 | def listen(self): |
@@ -245,14 +223,17 def MPDecorator(BaseClass): | |||
|
245 | 223 | This function waits for objects |
|
246 | 224 | ''' |
|
247 | 225 | |
|
248 | return self.queue.get() | |
|
226 | data = pickle.loads(self.receiver.recv_multipart()[1]) | |
|
227 | ||
|
228 | return data | |
|
249 | 229 | |
|
250 | 230 | def set_publisher(self): |
|
251 | 231 | ''' |
|
252 |
This function create a socket for publishing |
|
|
232 | This function create a zmq socket for publishing objects. | |
|
253 | 233 | ''' |
|
254 | 234 | |
|
255 |
time.sleep( |
|
|
235 | time.sleep(0.5) | |
|
236 | ||
|
256 | 237 | c = zmq.Context() |
|
257 | 238 | self.sender = c.socket(zmq.PUB) |
|
258 | 239 | self.sender.connect( |
@@ -260,15 +241,20 def MPDecorator(BaseClass): | |||
|
260 | 241 | |
|
261 | 242 | def publish(self, data, id): |
|
262 | 243 | ''' |
|
263 | This function publish an object, to a specific topic. | |
|
244 | This function publish an object, to an specific topic. | |
|
264 | 245 | For Read Units (inputId == None) adds a little delay |
|
265 | 246 | to avoid data loss |
|
266 | 247 | ''' |
|
267 | 248 | |
|
268 | 249 | if self.inputId is None: |
|
269 | 250 | self.i += 1 |
|
270 | if self.i % 80 == 0: | |
|
251 | if self.i % 40 == 0 and time.time()-self.t > 0.1: | |
|
252 | self.i = 0 | |
|
253 | self.t = time.time() | |
|
254 | time.sleep(0.05) | |
|
255 | elif self.i % 40 == 0: | |
|
271 | 256 | self.i = 0 |
|
257 | self.t = time.time() | |
|
272 | 258 |
time.sleep(0.01) |
|
273 | 259 | |
|
274 | 260 | self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) |
General Comments 0
You need to be logged in to leave comments.
Login now