##// END OF EJS Templates
Use of delays instead of input queue to keep dataouts and avoid loose of them
jespinoza -
r1245:1ee18bfa3eb6
parent child
Show More
@@ -172,32 +172,6 class Operation(object):
172 172
173 173 return
174 174
175 class InputQueue(Thread):
176 '''
177 Class to hold input data for Proccessing Units and external Operations,
178 '''
179
180 def __init__(self, project_id, inputId):
181 Thread.__init__(self)
182 self.queue = Queue()
183 self.project_id = project_id
184 self.inputId = inputId
185
186 def run(self):
187
188 c = zmq.Context()
189 self.receiver = c.socket(zmq.SUB)
190 self.receiver.connect(
191 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
192 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
193
194 while True:
195 self.queue.put(self.receiver.recv_multipart()[1])
196
197 def get(self):
198
199 return pickle.loads(self.queue.get())
200
201 175
202 176 def MPDecorator(BaseClass):
203 177 """
@@ -218,6 +192,7 def MPDecorator(BaseClass):
218 192 self.sender = None
219 193 self.receiver = None
220 194 self.i = 0
195 self.t = time.time()
221 196 self.name = BaseClass.__name__
222 197
223 198 if 'plot' in self.name.lower() and not self.name.endswith('_'):
@@ -229,15 +204,18 def MPDecorator(BaseClass):
229 204 self.project_id = args[2]
230 205 self.err_queue = args[3]
231 206 self.typeProc = args[4]
232 self.queue = InputQueue(self.project_id, self.inputId)
233 207 self.err_queue.put('#_start_#')
234 208
235 209 def subscribe(self):
236 210 '''
237 This function start the input queue.
211 Start the zmq socket receiver and subcribe to input ID.
238 212 '''
239 213
240 self.queue.start()
214 c = zmq.Context()
215 self.receiver = c.socket(zmq.SUB)
216 self.receiver.connect(
217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
241 219
242 220
243 221 def listen(self):
@@ -245,14 +223,17 def MPDecorator(BaseClass):
245 223 This function waits for objects
246 224 '''
247 225
248 return self.queue.get()
226 data = pickle.loads(self.receiver.recv_multipart()[1])
227
228 return data
249 229
250 230 def set_publisher(self):
251 231 '''
252 This function create a socket for publishing purposes.
232 This function create a zmq socket for publishing objects.
253 233 '''
254 234
255 time.sleep(1)
235 time.sleep(0.5)
236
256 237 c = zmq.Context()
257 238 self.sender = c.socket(zmq.PUB)
258 239 self.sender.connect(
@@ -260,15 +241,20 def MPDecorator(BaseClass):
260 241
261 242 def publish(self, data, id):
262 243 '''
263 This function publish an object, to a specific topic.
244 This function publish an object, to an specific topic.
264 245 For Read Units (inputId == None) adds a little delay
265 246 to avoid data loss
266 247 '''
267 248
268 249 if self.inputId is None:
269 250 self.i += 1
270 if self.i % 80 == 0:
251 if self.i % 40 == 0 and time.time()-self.t > 0.1:
252 self.i = 0
253 self.t = time.time()
254 time.sleep(0.05)
255 elif self.i % 40 == 0:
271 256 self.i = 0
257 self.t = time.time()
272 258 time.sleep(0.01)
273 259
274 260 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
General Comments 0
You need to be logged in to leave comments. Login now