##// END OF EJS Templates
Fix excessive memory RAM consumption
jespinoza -
r1268:b2726fff6520
parent child
Show More
@@ -11,7 +11,7 import traceback
11 11 import math
12 12 import time
13 13 import zmq
14 from multiprocessing import Process, Queue, Event, cpu_count
14 from multiprocessing import Process, Queue, Event, Value, cpu_count
15 15 from threading import Thread
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
@@ -480,7 +480,6 class ProcUnitConf():
480 480 self.opConfObjList = []
481 481 self.procUnitObj = None
482 482 self.opObjDict = {}
483 self.mylock = Event()
484 483
485 484 def __getPriority(self):
486 485
@@ -613,7 +612,7 class ProcUnitConf():
613 612 id = self.__getNewId()
614 613 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
615 614 opConfObj = OperationConf()
616 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.mylock)
615 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock)
617 616 self.opConfObjList.append(opConfObj)
618 617
619 618 return opConfObj
@@ -727,7 +726,9 class ReadUnitConf(ProcUnitConf):
727 726 self.name = None
728 727 self.inputId = None
729 728 self.opConfObjList = []
730 self.mylock = Event()
729 self.lock = Event()
730 self.lock.set()
731 self.lock.n = Value('d', 0)
731 732
732 733 def getElementName(self):
733 734
@@ -772,8 +773,7 class ReadUnitConf(ProcUnitConf):
772 773 self.startTime = startTime
773 774 self.endTime = endTime
774 775 self.server = server
775 self.err_queue = err_queue
776 self.lock = self.mylock
776 self.err_queue = err_queue
777 777 self.addRunOperation(**kwargs)
778 778
779 779 def update(self, **kwargs):
@@ -802,7 +802,7 class ReadUnitConf(ProcUnitConf):
802 802
803 803 self.opConfObjList = []
804 804
805 def addRunOperation(self, **kwargs):
805 def addRunOperation(self, **kwargs):
806 806
807 807 opObj = self.addOperation(name='run', optype='self')
808 808
@@ -995,7 +995,7 class Project(Process):
995 995 idProcUnit = self.__getNewId()
996 996 procUnitConfObj = ProcUnitConf()
997 997 input_proc = self.procUnitConfObjDict[inputId]
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.mylock)
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock)
999 999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1000 1000
1001 1001 return procUnitConfObj
@@ -193,6 +193,7 class InputQueue(Thread):
193 193 self.project_id = project_id
194 194 self.inputId = inputId
195 195 self.lock = lock
196 self.islocked = False
196 197 self.size = 0
197 198
198 199 def run(self):
@@ -208,16 +209,23 class InputQueue(Thread):
208 209 self.size += sys.getsizeof(obj)
209 210 self.queue.put(obj)
210 211
211 def get(self):
212 if self.size/1000000 > 2048:
212 def get(self):
213
214 if not self.islocked and self.size/1000000 > 512:
215 self.lock.n.value += 1
216 self.islocked = True
213 217 self.lock.clear()
214 else:
215 self.lock.set()
218 elif self.islocked and self.size/1000000 <= 512:
219 self.islocked = False
220 self.lock.n.value -= 1
221 if self.lock.n.value == 0:
222 self.lock.set()
223
216 224 obj = self.queue.get()
217 self.size -= sys.getsizeof(obj)
225 self.size -= sys.getsizeof(obj)
218 226 return pickle.loads(obj)
219 227
220
228
221 229 def MPDecorator(BaseClass):
222 230 """
223 231 Multiprocessing class decorator
@@ -252,7 +260,8 def MPDecorator(BaseClass):
252 260 self.lock = args[4]
253 261 self.typeProc = args[5]
254 262 self.err_queue.put('#_start_#')
255 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
263 if self.inputId is not None:
264 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
256 265
257 266 def subscribe(self):
258 267 '''
@@ -284,8 +293,8 def MPDecorator(BaseClass):
284 293 '''
285 294 This function publish an object, to an specific topic.
286 295 It blocks publishing when receiver queue is full to avoid data loss
287 '''
288
296 '''
297
289 298 if self.inputId is None:
290 299 self.lock.wait()
291 300 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
@@ -373,7 +382,11 def MPDecorator(BaseClass):
373 382 dataOut = self.listen()
374 383
375 384 if not dataOut.error:
376 BaseClass.run(self, dataOut, **self.kwargs)
385 try:
386 BaseClass.run(self, dataOut, **self.kwargs)
387 except:
388 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
389 dataOut.error = True
377 390 else:
378 391 break
379 392
General Comments 0
You need to be logged in to leave comments. Login now