##// END OF EJS Templates
Fix excessive memory RAM consumption
jespinoza -
r1268:b2726fff6520
parent child
Show More
@@ -11,7 +11,7 import traceback
11 import math
11 import math
12 import time
12 import time
13 import zmq
13 import zmq
14 from multiprocessing import Process, Queue, Event, cpu_count
14 from multiprocessing import Process, Queue, Event, Value, cpu_count
15 from threading import Thread
15 from threading import Thread
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 from xml.dom import minidom
17 from xml.dom import minidom
@@ -480,7 +480,6 class ProcUnitConf():
480 self.opConfObjList = []
480 self.opConfObjList = []
481 self.procUnitObj = None
481 self.procUnitObj = None
482 self.opObjDict = {}
482 self.opObjDict = {}
483 self.mylock = Event()
484
483
485 def __getPriority(self):
484 def __getPriority(self):
486
485
@@ -613,7 +612,7 class ProcUnitConf():
613 id = self.__getNewId()
612 id = self.__getNewId()
614 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
613 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
615 opConfObj = OperationConf()
614 opConfObj = OperationConf()
616 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.mylock)
615 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock)
617 self.opConfObjList.append(opConfObj)
616 self.opConfObjList.append(opConfObj)
618
617
619 return opConfObj
618 return opConfObj
@@ -727,7 +726,9 class ReadUnitConf(ProcUnitConf):
727 self.name = None
726 self.name = None
728 self.inputId = None
727 self.inputId = None
729 self.opConfObjList = []
728 self.opConfObjList = []
730 self.mylock = Event()
729 self.lock = Event()
730 self.lock.set()
731 self.lock.n = Value('d', 0)
731
732
732 def getElementName(self):
733 def getElementName(self):
733
734
@@ -772,8 +773,7 class ReadUnitConf(ProcUnitConf):
772 self.startTime = startTime
773 self.startTime = startTime
773 self.endTime = endTime
774 self.endTime = endTime
774 self.server = server
775 self.server = server
775 self.err_queue = err_queue
776 self.err_queue = err_queue
776 self.lock = self.mylock
777 self.addRunOperation(**kwargs)
777 self.addRunOperation(**kwargs)
778
778
779 def update(self, **kwargs):
779 def update(self, **kwargs):
@@ -802,7 +802,7 class ReadUnitConf(ProcUnitConf):
802
802
803 self.opConfObjList = []
803 self.opConfObjList = []
804
804
805 def addRunOperation(self, **kwargs):
805 def addRunOperation(self, **kwargs):
806
806
807 opObj = self.addOperation(name='run', optype='self')
807 opObj = self.addOperation(name='run', optype='self')
808
808
@@ -995,7 +995,7 class Project(Process):
995 idProcUnit = self.__getNewId()
995 idProcUnit = self.__getNewId()
996 procUnitConfObj = ProcUnitConf()
996 procUnitConfObj = ProcUnitConf()
997 input_proc = self.procUnitConfObjDict[inputId]
997 input_proc = self.procUnitConfObjDict[inputId]
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.mylock)
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock)
999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1000
1000
1001 return procUnitConfObj
1001 return procUnitConfObj
@@ -193,6 +193,7 class InputQueue(Thread):
193 self.project_id = project_id
193 self.project_id = project_id
194 self.inputId = inputId
194 self.inputId = inputId
195 self.lock = lock
195 self.lock = lock
196 self.islocked = False
196 self.size = 0
197 self.size = 0
197
198
198 def run(self):
199 def run(self):
@@ -208,16 +209,23 class InputQueue(Thread):
208 self.size += sys.getsizeof(obj)
209 self.size += sys.getsizeof(obj)
209 self.queue.put(obj)
210 self.queue.put(obj)
210
211
211 def get(self):
212 def get(self):
212 if self.size/1000000 > 2048:
213
214 if not self.islocked and self.size/1000000 > 512:
215 self.lock.n.value += 1
216 self.islocked = True
213 self.lock.clear()
217 self.lock.clear()
214 else:
218 elif self.islocked and self.size/1000000 <= 512:
215 self.lock.set()
219 self.islocked = False
220 self.lock.n.value -= 1
221 if self.lock.n.value == 0:
222 self.lock.set()
223
216 obj = self.queue.get()
224 obj = self.queue.get()
217 self.size -= sys.getsizeof(obj)
225 self.size -= sys.getsizeof(obj)
218 return pickle.loads(obj)
226 return pickle.loads(obj)
219
227
220
228
221 def MPDecorator(BaseClass):
229 def MPDecorator(BaseClass):
222 """
230 """
223 Multiprocessing class decorator
231 Multiprocessing class decorator
@@ -252,7 +260,8 def MPDecorator(BaseClass):
252 self.lock = args[4]
260 self.lock = args[4]
253 self.typeProc = args[5]
261 self.typeProc = args[5]
254 self.err_queue.put('#_start_#')
262 self.err_queue.put('#_start_#')
255 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
263 if self.inputId is not None:
264 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
256
265
257 def subscribe(self):
266 def subscribe(self):
258 '''
267 '''
@@ -284,8 +293,8 def MPDecorator(BaseClass):
284 '''
293 '''
285 This function publish an object, to an specific topic.
294 This function publish an object, to an specific topic.
286 It blocks publishing when receiver queue is full to avoid data loss
295 It blocks publishing when receiver queue is full to avoid data loss
287 '''
296 '''
288
297
289 if self.inputId is None:
298 if self.inputId is None:
290 self.lock.wait()
299 self.lock.wait()
291 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
300 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
@@ -373,7 +382,11 def MPDecorator(BaseClass):
373 dataOut = self.listen()
382 dataOut = self.listen()
374
383
375 if not dataOut.error:
384 if not dataOut.error:
376 BaseClass.run(self, dataOut, **self.kwargs)
385 try:
386 BaseClass.run(self, dataOut, **self.kwargs)
387 except:
388 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
389 dataOut.error = True
377 else:
390 else:
378 break
391 break
379
392
General Comments 0
You need to be logged in to leave comments. Login now