From b2726fff6520fc7d7fd09c25c3ec116c3a7341b6 2019-11-21 22:23:16 From: jespinoza Date: 2019-11-21 22:23:16 Subject: [PATCH] Fix excessive memory RAM consumption --- diff --git a/schainpy/controller.py b/schainpy/controller.py index 3fcb04a..8514d8e 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -11,7 +11,7 @@ import traceback import math import time import zmq -from multiprocessing import Process, Queue, Event, cpu_count +from multiprocessing import Process, Queue, Event, Value, cpu_count from threading import Thread from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring from xml.dom import minidom @@ -480,7 +480,6 @@ class ProcUnitConf(): self.opConfObjList = [] self.procUnitObj = None self.opObjDict = {} - self.mylock = Event() def __getPriority(self): @@ -613,7 +612,7 @@ class ProcUnitConf(): id = self.__getNewId() priority = self.__getPriority() # Sin mucho sentido, pero puede usarse opConfObj = OperationConf() - opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.mylock) + opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock) self.opConfObjList.append(opConfObj) return opConfObj @@ -727,7 +726,9 @@ class ReadUnitConf(ProcUnitConf): self.name = None self.inputId = None self.opConfObjList = [] - self.mylock = Event() + self.lock = Event() + self.lock.set() + self.lock.n = Value('d', 0) def getElementName(self): @@ -772,8 +773,7 @@ class ReadUnitConf(ProcUnitConf): self.startTime = startTime self.endTime = endTime self.server = server - self.err_queue = err_queue - self.lock = self.mylock + self.err_queue = err_queue self.addRunOperation(**kwargs) def update(self, **kwargs): @@ -802,7 +802,7 @@ class ReadUnitConf(ProcUnitConf): self.opConfObjList = [] - def addRunOperation(self, **kwargs): + def addRunOperation(self, **kwargs): opObj = self.addOperation(name='run', optype='self') @@ -995,7 +995,7 @@ class Project(Process): idProcUnit = self.__getNewId() procUnitConfObj = ProcUnitConf() input_proc = self.procUnitConfObjDict[inputId] - procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.mylock) + procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock) self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj return procUnitConfObj diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index a4b4d85..75e7ffe 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -193,6 +193,7 @@ class InputQueue(Thread): self.project_id = project_id self.inputId = inputId self.lock = lock + self.islocked = False self.size = 0 def run(self): @@ -208,16 +209,23 @@ class InputQueue(Thread): self.size += sys.getsizeof(obj) self.queue.put(obj) - def get(self): - if self.size/1000000 > 2048: + def get(self): + + if not self.islocked and self.size/1000000 > 512: + self.lock.n.value += 1 + self.islocked = True self.lock.clear() - else: - self.lock.set() + elif self.islocked and self.size/1000000 <= 512: + self.islocked = False + self.lock.n.value -= 1 + if self.lock.n.value == 0: + self.lock.set() + obj = self.queue.get() - self.size -= sys.getsizeof(obj) + self.size -= sys.getsizeof(obj) return pickle.loads(obj) - + def MPDecorator(BaseClass): """ Multiprocessing class decorator @@ -252,7 +260,8 @@ def MPDecorator(BaseClass): self.lock = args[4] self.typeProc = args[5] self.err_queue.put('#_start_#') - self.queue = InputQueue(self.project_id, self.inputId, self.lock) + if self.inputId is not None: + self.queue = InputQueue(self.project_id, self.inputId, self.lock) def subscribe(self): ''' @@ -284,8 +293,8 @@ def MPDecorator(BaseClass): ''' This function publish an object, to an specific topic. It blocks publishing when receiver queue is full to avoid data loss - ''' - + ''' + if self.inputId is None: self.lock.wait() self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) @@ -373,7 +382,11 @@ def MPDecorator(BaseClass): dataOut = self.listen() if not dataOut.error: - BaseClass.run(self, dataOut, **self.kwargs) + try: + BaseClass.run(self, dataOut, **self.kwargs) + except: + self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc())) + dataOut.error = True else: break