##// END OF EJS Templates
Block publishing when input queues are full to avoid data loss
jespinoza -
r1256:7ed2e4983ab9
parent child
Show More
@@ -63,7 +63,7 def getArgs(op):
63 63 def getDoc(obj):
64 64 module = locate('schainpy.model.{}'.format(obj))
65 65 try:
66 obj = module(1,2,3,Queue(),5)
66 obj = module(1,2,3,Queue(),5,6)
67 67 except:
68 68 obj = module()
69 69 return obj.__doc__
@@ -148,8 +148,7 def search(nextcommand):
148 148 if nextcommand is None:
149 149 log.error('There is no Operation/ProcessingUnit to search', '')
150 150 else:
151 #try:
152 if True:
151 try:
153 152 args = getArgs(nextcommand)
154 153 doc = getDoc(nextcommand)
155 154 if len(args) == 0:
@@ -157,11 +156,11 def search(nextcommand):
157 156 else:
158 157 log.success('{}\n{}\n\narguments:\n {}'.format(
159 158 nextcommand, doc, ', '.join(args)), '')
160 # except Exception as e:
161 # log.error('Module `{}` does not exists'.format(nextcommand), '')
162 # allModules = getAll()
163 # similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1]>80]
164 # log.success('Possible modules are: {}'.format(', '.join(similar)), '')
159 except Exception as e:
160 log.error('Module `{}` does not exists'.format(nextcommand), '')
161 allModules = getAll()
162 similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1]>80]
163 log.success('Possible modules are: {}'.format(', '.join(similar)), '')
165 164
166 165 def runschain(nextcommand):
167 166 if nextcommand is None:
@@ -11,7 +11,7 import traceback
11 11 import math
12 12 import time
13 13 import zmq
14 from multiprocessing import Process, Queue, cpu_count
14 from multiprocessing import Process, Queue, Event, cpu_count
15 15 from threading import Thread
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
@@ -361,7 +361,7 class OperationConf():
361 361
362 362 return kwargs
363 363
364 def setup(self, id, name, priority, type, project_id, err_queue):
364 def setup(self, id, name, priority, type, project_id, err_queue, lock):
365 365
366 366 self.id = str(id)
367 367 self.project_id = project_id
@@ -369,6 +369,7 class OperationConf():
369 369 self.type = type
370 370 self.priority = priority
371 371 self.err_queue = err_queue
372 self.lock = lock
372 373 self.parmConfObjList = []
373 374
374 375 def removeParameters(self):
@@ -460,7 +461,7 class OperationConf():
460 461 opObj = className()
461 462 elif self.type == 'external':
462 463 kwargs = self.getKwargs()
463 opObj = className(self.id, self.id, self.project_id, self.err_queue, 'Operation', **kwargs)
464 opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs)
464 465 opObj.start()
465 466 self.opObj = opObj
466 467
@@ -479,6 +480,7 class ProcUnitConf():
479 480 self.opConfObjList = []
480 481 self.procUnitObj = None
481 482 self.opObjDict = {}
483 self.mylock = Event()
482 484
483 485 def __getPriority(self):
484 486
@@ -550,7 +552,7 class ProcUnitConf():
550 552
551 553 return self.procUnitObj
552 554
553 def setup(self, project_id, id, name, datatype, inputId, err_queue):
555 def setup(self, project_id, id, name, datatype, inputId, err_queue, lock):
554 556 '''
555 557 id sera el topico a publicar
556 558 inputId sera el topico a subscribirse
@@ -577,6 +579,7 class ProcUnitConf():
577 579 self.datatype = datatype
578 580 self.inputId = inputId
579 581 self.err_queue = err_queue
582 self.lock = lock
580 583 self.opConfObjList = []
581 584
582 585 self.addOperation(name='run', optype='self')
@@ -610,7 +613,7 class ProcUnitConf():
610 613 id = self.__getNewId()
611 614 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
612 615 opConfObj = OperationConf()
613 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue)
616 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.mylock)
614 617 self.opConfObjList.append(opConfObj)
615 618
616 619 return opConfObj
@@ -678,7 +681,7 class ProcUnitConf():
678 681
679 682 className = eval(self.name)
680 683 kwargs = self.getKwargs()
681 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, 'ProcUnit', **kwargs)
684 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs)
682 685 log.success('creating process...', self.name)
683 686
684 687 for opConfObj in self.opConfObjList:
@@ -724,6 +727,7 class ReadUnitConf(ProcUnitConf):
724 727 self.name = None
725 728 self.inputId = None
726 729 self.opConfObjList = []
730 self.mylock = Event()
727 731
728 732 def getElementName(self):
729 733
@@ -769,6 +773,7 class ReadUnitConf(ProcUnitConf):
769 773 self.endTime = endTime
770 774 self.server = server
771 775 self.err_queue = err_queue
776 self.lock = self.mylock
772 777 self.addRunOperation(**kwargs)
773 778
774 779 def update(self, **kwargs):
@@ -989,7 +994,8 class Project(Process):
989 994
990 995 idProcUnit = self.__getNewId()
991 996 procUnitConfObj = ProcUnitConf()
992 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
997 input_proc = self.procUnitConfObjDict[inputId]
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.mylock)
993 999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
994 1000
995 1001 return procUnitConfObj
@@ -13,6 +13,7 Based on:
13 13 '''
14 14
15 15 import os
16 import sys
16 17 import inspect
17 18 import zmq
18 19 import time
@@ -181,31 +182,40 class Operation(object):
181 182
182 183 class InputQueue(Thread):
183 184
184 '''
185 Class to hold input data for Proccessing Units and external Operations,
186 '''
187
188 def __init__(self, project_id, inputId):
189
190 Thread.__init__(self)
191 self.queue = Queue()
192 self.project_id = project_id
193 self.inputId = inputId
194
195 def run(self):
196
197 c = zmq.Context()
198 self.receiver = c.socket(zmq.SUB)
199 self.receiver.connect(
200 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
201 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
202
203 while True:
204 self.queue.put(self.receiver.recv_multipart()[1])
205
206 def get(self):
185 '''
186 Class to hold input data for Proccessing Units and external Operations,
187 '''
188
189 def __init__(self, project_id, inputId, lock=None):
207 190
208 return pickle.loads(self.queue.get())
191 Thread.__init__(self)
192 self.queue = Queue()
193 self.project_id = project_id
194 self.inputId = inputId
195 self.lock = lock
196 self.size = 0
197
198 def run(self):
199
200 c = zmq.Context()
201 self.receiver = c.socket(zmq.SUB)
202 self.receiver.connect(
203 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
204 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
205
206 while True:
207 obj = self.receiver.recv_multipart()[1]
208 self.size += sys.getsizeof(obj)
209 self.queue.put(obj)
210
211 def get(self):
212 if self.size/1000000 > 2048:
213 self.lock.clear()
214 else:
215 self.lock.set()
216 obj = self.queue.get()
217 self.size -= sys.getsizeof(obj)
218 return pickle.loads(obj)
209 219
210 220
211 221 def MPDecorator(BaseClass):
@@ -239,9 +249,10 def MPDecorator(BaseClass):
239 249 self.inputId = args[1]
240 250 self.project_id = args[2]
241 251 self.err_queue = args[3]
242 self.typeProc = args[4]
252 self.lock = args[4]
253 self.typeProc = args[5]
243 254 self.err_queue.put('#_start_#')
244 self.queue = InputQueue(self.project_id, self.inputId)
255 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
245 256
246 257 def subscribe(self):
247 258 '''
@@ -272,21 +283,11 def MPDecorator(BaseClass):
272 283 def publish(self, data, id):
273 284 '''
274 285 This function publish an object, to an specific topic.
275 For Read Units (inputId == None) adds a little delay
276 to avoid data loss
286 It blocks publishing when receiver queue is full to avoid data loss
277 287 '''
278 288
279 289 if self.inputId is None:
280 self.i += 1
281 if self.i % 40 == 0 and time.time()-self.t > 0.1:
282 self.i = 0
283 self.t = time.time()
284 time.sleep(0.05)
285 elif self.i % 40 == 0:
286 self.i = 0
287 self.t = time.time()
288 time.sleep(0.01)
289
290 self.lock.wait()
290 291 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
291 292
292 293 def runReader(self):
General Comments 0
You need to be logged in to leave comments. Login now