##// END OF EJS Templates
Fix excessive memory RAM consumption
jespinoza -
r1268:b2726fff6520
parent child
Show More
@@ -11,7 +11,7 import traceback
11 11 import math
12 12 import time
13 13 import zmq
14 from multiprocessing import Process, Queue, Event, cpu_count
14 from multiprocessing import Process, Queue, Event, Value, cpu_count
15 15 from threading import Thread
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
@@ -480,7 +480,6 class ProcUnitConf():
480 480 self.opConfObjList = []
481 481 self.procUnitObj = None
482 482 self.opObjDict = {}
483 self.mylock = Event()
484 483
485 484 def __getPriority(self):
486 485
@@ -613,7 +612,7 class ProcUnitConf():
613 612 id = self.__getNewId()
614 613 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
615 614 opConfObj = OperationConf()
616 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.mylock)
615 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock)
617 616 self.opConfObjList.append(opConfObj)
618 617
619 618 return opConfObj
@@ -727,7 +726,9 class ReadUnitConf(ProcUnitConf):
727 726 self.name = None
728 727 self.inputId = None
729 728 self.opConfObjList = []
730 self.mylock = Event()
729 self.lock = Event()
730 self.lock.set()
731 self.lock.n = Value('d', 0)
731 732
732 733 def getElementName(self):
733 734
@@ -773,7 +774,6 class ReadUnitConf(ProcUnitConf):
773 774 self.endTime = endTime
774 775 self.server = server
775 776 self.err_queue = err_queue
776 self.lock = self.mylock
777 777 self.addRunOperation(**kwargs)
778 778
779 779 def update(self, **kwargs):
@@ -995,7 +995,7 class Project(Process):
995 995 idProcUnit = self.__getNewId()
996 996 procUnitConfObj = ProcUnitConf()
997 997 input_proc = self.procUnitConfObjDict[inputId]
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.mylock)
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock)
999 999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1000 1000
1001 1001 return procUnitConfObj
@@ -193,6 +193,7 class InputQueue(Thread):
193 193 self.project_id = project_id
194 194 self.inputId = inputId
195 195 self.lock = lock
196 self.islocked = False
196 197 self.size = 0
197 198
198 199 def run(self):
@@ -209,10 +210,17 class InputQueue(Thread):
209 210 self.queue.put(obj)
210 211
211 212 def get(self):
212 if self.size/1000000 > 2048:
213
214 if not self.islocked and self.size/1000000 > 512:
215 self.lock.n.value += 1
216 self.islocked = True
213 217 self.lock.clear()
214 else:
218 elif self.islocked and self.size/1000000 <= 512:
219 self.islocked = False
220 self.lock.n.value -= 1
221 if self.lock.n.value == 0:
215 222 self.lock.set()
223
216 224 obj = self.queue.get()
217 225 self.size -= sys.getsizeof(obj)
218 226 return pickle.loads(obj)
@@ -252,6 +260,7 def MPDecorator(BaseClass):
252 260 self.lock = args[4]
253 261 self.typeProc = args[5]
254 262 self.err_queue.put('#_start_#')
263 if self.inputId is not None:
255 264 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
256 265
257 266 def subscribe(self):
@@ -373,7 +382,11 def MPDecorator(BaseClass):
373 382 dataOut = self.listen()
374 383
375 384 if not dataOut.error:
385 try:
376 386 BaseClass.run(self, dataOut, **self.kwargs)
387 except:
388 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
389 dataOut.error = True
377 390 else:
378 391 break
379 392
General Comments 0
You need to be logged in to leave comments. Login now