##// END OF EJS Templates
Use of delays instead of input queue to keep dataouts and avoid loose of them
jespinoza -
r1245:1ee18bfa3eb6
parent child
Show More
@@ -172,32 +172,6 class Operation(object):
172
172
173 return
173 return
174
174
175 class InputQueue(Thread):
176 '''
177 Class to hold input data for Proccessing Units and external Operations,
178 '''
179
180 def __init__(self, project_id, inputId):
181 Thread.__init__(self)
182 self.queue = Queue()
183 self.project_id = project_id
184 self.inputId = inputId
185
186 def run(self):
187
188 c = zmq.Context()
189 self.receiver = c.socket(zmq.SUB)
190 self.receiver.connect(
191 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
192 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
193
194 while True:
195 self.queue.put(self.receiver.recv_multipart()[1])
196
197 def get(self):
198
199 return pickle.loads(self.queue.get())
200
201
175
202 def MPDecorator(BaseClass):
176 def MPDecorator(BaseClass):
203 """
177 """
@@ -218,6 +192,7 def MPDecorator(BaseClass):
218 self.sender = None
192 self.sender = None
219 self.receiver = None
193 self.receiver = None
220 self.i = 0
194 self.i = 0
195 self.t = time.time()
221 self.name = BaseClass.__name__
196 self.name = BaseClass.__name__
222
197
223 if 'plot' in self.name.lower() and not self.name.endswith('_'):
198 if 'plot' in self.name.lower() and not self.name.endswith('_'):
@@ -228,31 +203,37 def MPDecorator(BaseClass):
228 self.inputId = args[1]
203 self.inputId = args[1]
229 self.project_id = args[2]
204 self.project_id = args[2]
230 self.err_queue = args[3]
205 self.err_queue = args[3]
231 self.typeProc = args[4]
206 self.typeProc = args[4]
232 self.queue = InputQueue(self.project_id, self.inputId)
233 self.err_queue.put('#_start_#')
207 self.err_queue.put('#_start_#')
234
208
235 def subscribe(self):
209 def subscribe(self):
236 '''
210 '''
237 This function start the input queue.
211 Start the zmq socket receiver and subcribe to input ID.
238 '''
212 '''
239
213
240 self.queue.start()
214 c = zmq.Context()
215 self.receiver = c.socket(zmq.SUB)
216 self.receiver.connect(
217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
241
219
242
220
243 def listen(self):
221 def listen(self):
244 '''
222 '''
245 This function waits for objects
223 This function waits for objects
246 '''
224 '''
247
225
248 return self.queue.get()
226 data = pickle.loads(self.receiver.recv_multipart()[1])
227
228 return data
249
229
250 def set_publisher(self):
230 def set_publisher(self):
251 '''
231 '''
252 This function create a socket for publishing purposes.
232 This function create a zmq socket for publishing objects.
253 '''
233 '''
254
234
255 time.sleep(1)
235 time.sleep(0.5)
236
256 c = zmq.Context()
237 c = zmq.Context()
257 self.sender = c.socket(zmq.PUB)
238 self.sender = c.socket(zmq.PUB)
258 self.sender.connect(
239 self.sender.connect(
@@ -260,16 +241,21 def MPDecorator(BaseClass):
260
241
261 def publish(self, data, id):
242 def publish(self, data, id):
262 '''
243 '''
263 This function publish an object, to a specific topic.
244 This function publish an object, to an specific topic.
264 For Read Units (inputId == None) adds a little delay
245 For Read Units (inputId == None) adds a little delay
265 to avoid data loss
246 to avoid data loss
266 '''
247 '''
267
248
268 if self.inputId is None:
249 if self.inputId is None:
269 self.i += 1
250 self.i += 1
270 if self.i % 80 == 0:
251 if self.i % 40 == 0 and time.time()-self.t > 0.1:
252 self.i = 0
253 self.t = time.time()
254 time.sleep(0.05)
255 elif self.i % 40 == 0:
271 self.i = 0
256 self.i = 0
272 time.sleep(0.01)
257 self.t = time.time()
258 time.sleep(0.01)
273
259
274 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
260 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
275
261
General Comments 0
You need to be logged in to leave comments. Login now