##// END OF EJS Templates
Block publishing when input queues are full to avoid data loss
jespinoza -
r1256:7ed2e4983ab9
parent child
Show More
@@ -63,7 +63,7 def getArgs(op):
63 def getDoc(obj):
63 def getDoc(obj):
64 module = locate('schainpy.model.{}'.format(obj))
64 module = locate('schainpy.model.{}'.format(obj))
65 try:
65 try:
66 obj = module(1,2,3,Queue(),5)
66 obj = module(1,2,3,Queue(),5,6)
67 except:
67 except:
68 obj = module()
68 obj = module()
69 return obj.__doc__
69 return obj.__doc__
@@ -148,8 +148,7 def search(nextcommand):
148 if nextcommand is None:
148 if nextcommand is None:
149 log.error('There is no Operation/ProcessingUnit to search', '')
149 log.error('There is no Operation/ProcessingUnit to search', '')
150 else:
150 else:
151 #try:
151 try:
152 if True:
153 args = getArgs(nextcommand)
152 args = getArgs(nextcommand)
154 doc = getDoc(nextcommand)
153 doc = getDoc(nextcommand)
155 if len(args) == 0:
154 if len(args) == 0:
@@ -157,11 +156,11 def search(nextcommand):
157 else:
156 else:
158 log.success('{}\n{}\n\narguments:\n {}'.format(
157 log.success('{}\n{}\n\narguments:\n {}'.format(
159 nextcommand, doc, ', '.join(args)), '')
158 nextcommand, doc, ', '.join(args)), '')
160 # except Exception as e:
159 except Exception as e:
161 # log.error('Module `{}` does not exists'.format(nextcommand), '')
160 log.error('Module `{}` does not exists'.format(nextcommand), '')
162 # allModules = getAll()
161 allModules = getAll()
163 # similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1]>80]
162 similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1]>80]
164 # log.success('Possible modules are: {}'.format(', '.join(similar)), '')
163 log.success('Possible modules are: {}'.format(', '.join(similar)), '')
165
164
166 def runschain(nextcommand):
165 def runschain(nextcommand):
167 if nextcommand is None:
166 if nextcommand is None:
@@ -11,7 +11,7 import traceback
11 import math
11 import math
12 import time
12 import time
13 import zmq
13 import zmq
14 from multiprocessing import Process, Queue, cpu_count
14 from multiprocessing import Process, Queue, Event, cpu_count
15 from threading import Thread
15 from threading import Thread
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 from xml.dom import minidom
17 from xml.dom import minidom
@@ -361,7 +361,7 class OperationConf():
361
361
362 return kwargs
362 return kwargs
363
363
364 def setup(self, id, name, priority, type, project_id, err_queue):
364 def setup(self, id, name, priority, type, project_id, err_queue, lock):
365
365
366 self.id = str(id)
366 self.id = str(id)
367 self.project_id = project_id
367 self.project_id = project_id
@@ -369,6 +369,7 class OperationConf():
369 self.type = type
369 self.type = type
370 self.priority = priority
370 self.priority = priority
371 self.err_queue = err_queue
371 self.err_queue = err_queue
372 self.lock = lock
372 self.parmConfObjList = []
373 self.parmConfObjList = []
373
374
374 def removeParameters(self):
375 def removeParameters(self):
@@ -460,7 +461,7 class OperationConf():
460 opObj = className()
461 opObj = className()
461 elif self.type == 'external':
462 elif self.type == 'external':
462 kwargs = self.getKwargs()
463 kwargs = self.getKwargs()
463 opObj = className(self.id, self.id, self.project_id, self.err_queue, 'Operation', **kwargs)
464 opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs)
464 opObj.start()
465 opObj.start()
465 self.opObj = opObj
466 self.opObj = opObj
466
467
@@ -479,6 +480,7 class ProcUnitConf():
479 self.opConfObjList = []
480 self.opConfObjList = []
480 self.procUnitObj = None
481 self.procUnitObj = None
481 self.opObjDict = {}
482 self.opObjDict = {}
483 self.mylock = Event()
482
484
483 def __getPriority(self):
485 def __getPriority(self):
484
486
@@ -550,7 +552,7 class ProcUnitConf():
550
552
551 return self.procUnitObj
553 return self.procUnitObj
552
554
553 def setup(self, project_id, id, name, datatype, inputId, err_queue):
555 def setup(self, project_id, id, name, datatype, inputId, err_queue, lock):
554 '''
556 '''
555 id sera el topico a publicar
557 id sera el topico a publicar
556 inputId sera el topico a subscribirse
558 inputId sera el topico a subscribirse
@@ -577,6 +579,7 class ProcUnitConf():
577 self.datatype = datatype
579 self.datatype = datatype
578 self.inputId = inputId
580 self.inputId = inputId
579 self.err_queue = err_queue
581 self.err_queue = err_queue
582 self.lock = lock
580 self.opConfObjList = []
583 self.opConfObjList = []
581
584
582 self.addOperation(name='run', optype='self')
585 self.addOperation(name='run', optype='self')
@@ -610,7 +613,7 class ProcUnitConf():
610 id = self.__getNewId()
613 id = self.__getNewId()
611 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
614 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
612 opConfObj = OperationConf()
615 opConfObj = OperationConf()
613 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue)
616 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.mylock)
614 self.opConfObjList.append(opConfObj)
617 self.opConfObjList.append(opConfObj)
615
618
616 return opConfObj
619 return opConfObj
@@ -678,7 +681,7 class ProcUnitConf():
678
681
679 className = eval(self.name)
682 className = eval(self.name)
680 kwargs = self.getKwargs()
683 kwargs = self.getKwargs()
681 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, 'ProcUnit', **kwargs)
684 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs)
682 log.success('creating process...', self.name)
685 log.success('creating process...', self.name)
683
686
684 for opConfObj in self.opConfObjList:
687 for opConfObj in self.opConfObjList:
@@ -724,6 +727,7 class ReadUnitConf(ProcUnitConf):
724 self.name = None
727 self.name = None
725 self.inputId = None
728 self.inputId = None
726 self.opConfObjList = []
729 self.opConfObjList = []
730 self.mylock = Event()
727
731
728 def getElementName(self):
732 def getElementName(self):
729
733
@@ -769,6 +773,7 class ReadUnitConf(ProcUnitConf):
769 self.endTime = endTime
773 self.endTime = endTime
770 self.server = server
774 self.server = server
771 self.err_queue = err_queue
775 self.err_queue = err_queue
776 self.lock = self.mylock
772 self.addRunOperation(**kwargs)
777 self.addRunOperation(**kwargs)
773
778
774 def update(self, **kwargs):
779 def update(self, **kwargs):
@@ -989,7 +994,8 class Project(Process):
989
994
990 idProcUnit = self.__getNewId()
995 idProcUnit = self.__getNewId()
991 procUnitConfObj = ProcUnitConf()
996 procUnitConfObj = ProcUnitConf()
992 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
997 input_proc = self.procUnitConfObjDict[inputId]
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.mylock)
993 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
994
1000
995 return procUnitConfObj
1001 return procUnitConfObj
@@ -13,6 +13,7 Based on:
13 '''
13 '''
14
14
15 import os
15 import os
16 import sys
16 import inspect
17 import inspect
17 import zmq
18 import zmq
18 import time
19 import time
@@ -181,31 +182,40 class Operation(object):
181
182
182 class InputQueue(Thread):
183 class InputQueue(Thread):
183
184
184 '''
185 '''
185 Class to hold input data for Proccessing Units and external Operations,
186 Class to hold input data for Proccessing Units and external Operations,
186 '''
187 '''
187
188
188 def __init__(self, project_id, inputId):
189 def __init__(self, project_id, inputId, lock=None):
189
190 Thread.__init__(self)
191 self.queue = Queue()
192 self.project_id = project_id
193 self.inputId = inputId
194
195 def run(self):
196
197 c = zmq.Context()
198 self.receiver = c.socket(zmq.SUB)
199 self.receiver.connect(
200 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
201 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
202
203 while True:
204 self.queue.put(self.receiver.recv_multipart()[1])
205
206 def get(self):
207
190
208 return pickle.loads(self.queue.get())
191 Thread.__init__(self)
192 self.queue = Queue()
193 self.project_id = project_id
194 self.inputId = inputId
195 self.lock = lock
196 self.size = 0
197
198 def run(self):
199
200 c = zmq.Context()
201 self.receiver = c.socket(zmq.SUB)
202 self.receiver.connect(
203 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
204 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
205
206 while True:
207 obj = self.receiver.recv_multipart()[1]
208 self.size += sys.getsizeof(obj)
209 self.queue.put(obj)
210
211 def get(self):
212 if self.size/1000000 > 2048:
213 self.lock.clear()
214 else:
215 self.lock.set()
216 obj = self.queue.get()
217 self.size -= sys.getsizeof(obj)
218 return pickle.loads(obj)
209
219
210
220
211 def MPDecorator(BaseClass):
221 def MPDecorator(BaseClass):
@@ -239,9 +249,10 def MPDecorator(BaseClass):
239 self.inputId = args[1]
249 self.inputId = args[1]
240 self.project_id = args[2]
250 self.project_id = args[2]
241 self.err_queue = args[3]
251 self.err_queue = args[3]
242 self.typeProc = args[4]
252 self.lock = args[4]
253 self.typeProc = args[5]
243 self.err_queue.put('#_start_#')
254 self.err_queue.put('#_start_#')
244 self.queue = InputQueue(self.project_id, self.inputId)
255 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
245
256
246 def subscribe(self):
257 def subscribe(self):
247 '''
258 '''
@@ -272,21 +283,11 def MPDecorator(BaseClass):
272 def publish(self, data, id):
283 def publish(self, data, id):
273 '''
284 '''
274 This function publish an object, to an specific topic.
285 This function publish an object, to an specific topic.
275 For Read Units (inputId == None) adds a little delay
286 It blocks publishing when receiver queue is full to avoid data loss
276 to avoid data loss
277 '''
287 '''
278
288
279 if self.inputId is None:
289 if self.inputId is None:
280 self.i += 1
290 self.lock.wait()
281 if self.i % 40 == 0 and time.time()-self.t > 0.1:
282 self.i = 0
283 self.t = time.time()
284 time.sleep(0.05)
285 elif self.i % 40 == 0:
286 self.i = 0
287 self.t = time.time()
288 time.sleep(0.01)
289
290 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
291 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
291
292
292 def runReader(self):
293 def runReader(self):
General Comments 0
You need to be logged in to leave comments. Login now