From 7ed2e4983ab981720219d1f30cca2662cd4c396a 2019-10-03 14:45:18 From: jespinoza Date: 2019-10-03 14:45:18 Subject: [PATCH] Block publishing when input queues are full to avoid data loss --- diff --git a/schainpy/cli/cli.py b/schainpy/cli/cli.py index c1546ce..8c04d8d 100644 --- a/schainpy/cli/cli.py +++ b/schainpy/cli/cli.py @@ -63,7 +63,7 @@ def getArgs(op): def getDoc(obj): module = locate('schainpy.model.{}'.format(obj)) try: - obj = module(1,2,3,Queue(),5) + obj = module(1,2,3,Queue(),5,6) except: obj = module() return obj.__doc__ @@ -148,8 +148,7 @@ def search(nextcommand): if nextcommand is None: log.error('There is no Operation/ProcessingUnit to search', '') else: - #try: - if True: + try: args = getArgs(nextcommand) doc = getDoc(nextcommand) if len(args) == 0: @@ -157,11 +156,11 @@ def search(nextcommand): else: log.success('{}\n{}\n\narguments:\n {}'.format( nextcommand, doc, ', '.join(args)), '') - # except Exception as e: - # log.error('Module `{}` does not exists'.format(nextcommand), '') - # allModules = getAll() - # similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1]>80] - # log.success('Possible modules are: {}'.format(', '.join(similar)), '') + except Exception as e: + log.error('Module `{}` does not exists'.format(nextcommand), '') + allModules = getAll() + similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1]>80] + log.success('Possible modules are: {}'.format(', '.join(similar)), '') def runschain(nextcommand): if nextcommand is None: diff --git a/schainpy/controller.py b/schainpy/controller.py index 0a1d0d9..3fcb04a 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -11,7 +11,7 @@ import traceback import math import time import zmq -from multiprocessing import Process, Queue, cpu_count +from multiprocessing import Process, Queue, Event, cpu_count from threading import Thread from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring from xml.dom import minidom @@ -361,7 +361,7 @@ class OperationConf(): return kwargs - def setup(self, id, name, priority, type, project_id, err_queue): + def setup(self, id, name, priority, type, project_id, err_queue, lock): self.id = str(id) self.project_id = project_id @@ -369,6 +369,7 @@ class OperationConf(): self.type = type self.priority = priority self.err_queue = err_queue + self.lock = lock self.parmConfObjList = [] def removeParameters(self): @@ -460,7 +461,7 @@ class OperationConf(): opObj = className() elif self.type == 'external': kwargs = self.getKwargs() - opObj = className(self.id, self.id, self.project_id, self.err_queue, 'Operation', **kwargs) + opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs) opObj.start() self.opObj = opObj @@ -479,6 +480,7 @@ class ProcUnitConf(): self.opConfObjList = [] self.procUnitObj = None self.opObjDict = {} + self.mylock = Event() def __getPriority(self): @@ -550,7 +552,7 @@ class ProcUnitConf(): return self.procUnitObj - def setup(self, project_id, id, name, datatype, inputId, err_queue): + def setup(self, project_id, id, name, datatype, inputId, err_queue, lock): ''' id sera el topico a publicar inputId sera el topico a subscribirse @@ -577,6 +579,7 @@ class ProcUnitConf(): self.datatype = datatype self.inputId = inputId self.err_queue = err_queue + self.lock = lock self.opConfObjList = [] self.addOperation(name='run', optype='self') @@ -610,7 +613,7 @@ class ProcUnitConf(): id = self.__getNewId() priority = self.__getPriority() # Sin mucho sentido, pero puede usarse opConfObj = OperationConf() - opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue) + opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.mylock) self.opConfObjList.append(opConfObj) return opConfObj @@ -678,7 +681,7 @@ class ProcUnitConf(): className = eval(self.name) kwargs = self.getKwargs() - procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, 'ProcUnit', **kwargs) + procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs) log.success('creating process...', self.name) for opConfObj in self.opConfObjList: @@ -724,6 +727,7 @@ class ReadUnitConf(ProcUnitConf): self.name = None self.inputId = None self.opConfObjList = [] + self.mylock = Event() def getElementName(self): @@ -769,6 +773,7 @@ class ReadUnitConf(ProcUnitConf): self.endTime = endTime self.server = server self.err_queue = err_queue + self.lock = self.mylock self.addRunOperation(**kwargs) def update(self, **kwargs): @@ -989,7 +994,8 @@ class Project(Process): idProcUnit = self.__getNewId() procUnitConfObj = ProcUnitConf() - procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue) + input_proc = self.procUnitConfObjDict[inputId] + procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.mylock) self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj return procUnitConfObj diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index 156c926..a4b4d85 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -13,6 +13,7 @@ Based on: ''' import os +import sys import inspect import zmq import time @@ -181,31 +182,40 @@ class Operation(object): class InputQueue(Thread): - ''' - Class to hold input data for Proccessing Units and external Operations, - ''' - - def __init__(self, project_id, inputId): - - Thread.__init__(self) - self.queue = Queue() - self.project_id = project_id - self.inputId = inputId - - def run(self): - - c = zmq.Context() - self.receiver = c.socket(zmq.SUB) - self.receiver.connect( - 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) - self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) - - while True: - self.queue.put(self.receiver.recv_multipart()[1]) - - def get(self): + ''' + Class to hold input data for Proccessing Units and external Operations, + ''' + + def __init__(self, project_id, inputId, lock=None): - return pickle.loads(self.queue.get()) + Thread.__init__(self) + self.queue = Queue() + self.project_id = project_id + self.inputId = inputId + self.lock = lock + self.size = 0 + + def run(self): + + c = zmq.Context() + self.receiver = c.socket(zmq.SUB) + self.receiver.connect( + 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) + self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) + + while True: + obj = self.receiver.recv_multipart()[1] + self.size += sys.getsizeof(obj) + self.queue.put(obj) + + def get(self): + if self.size/1000000 > 2048: + self.lock.clear() + else: + self.lock.set() + obj = self.queue.get() + self.size -= sys.getsizeof(obj) + return pickle.loads(obj) def MPDecorator(BaseClass): @@ -239,9 +249,10 @@ def MPDecorator(BaseClass): self.inputId = args[1] self.project_id = args[2] self.err_queue = args[3] - self.typeProc = args[4] + self.lock = args[4] + self.typeProc = args[5] self.err_queue.put('#_start_#') - self.queue = InputQueue(self.project_id, self.inputId) + self.queue = InputQueue(self.project_id, self.inputId, self.lock) def subscribe(self): ''' @@ -272,21 +283,11 @@ def MPDecorator(BaseClass): def publish(self, data, id): ''' This function publish an object, to an specific topic. - For Read Units (inputId == None) adds a little delay - to avoid data loss + It blocks publishing when receiver queue is full to avoid data loss ''' if self.inputId is None: - self.i += 1 - if self.i % 40 == 0 and time.time()-self.t > 0.1: - self.i = 0 - self.t = time.time() - time.sleep(0.05) - elif self.i % 40 == 0: - self.i = 0 - self.t = time.time() - time.sleep(0.01) - + self.lock.wait() self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) def runReader(self):