@@ -11,7 +11,7 import traceback | |||||
11 | import math |
|
11 | import math | |
12 | import time |
|
12 | import time | |
13 | import zmq |
|
13 | import zmq | |
14 | from multiprocessing import Process, Queue, Event, cpu_count |
|
14 | from multiprocessing import Process, Queue, Event, Value, cpu_count | |
15 | from threading import Thread |
|
15 | from threading import Thread | |
16 | from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring |
|
16 | from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring | |
17 | from xml.dom import minidom |
|
17 | from xml.dom import minidom | |
@@ -480,7 +480,6 class ProcUnitConf(): | |||||
480 | self.opConfObjList = [] |
|
480 | self.opConfObjList = [] | |
481 | self.procUnitObj = None |
|
481 | self.procUnitObj = None | |
482 | self.opObjDict = {} |
|
482 | self.opObjDict = {} | |
483 | self.mylock = Event() |
|
|||
484 |
|
483 | |||
485 | def __getPriority(self): |
|
484 | def __getPriority(self): | |
486 |
|
485 | |||
@@ -613,7 +612,7 class ProcUnitConf(): | |||||
613 | id = self.__getNewId() |
|
612 | id = self.__getNewId() | |
614 | priority = self.__getPriority() # Sin mucho sentido, pero puede usarse |
|
613 | priority = self.__getPriority() # Sin mucho sentido, pero puede usarse | |
615 | opConfObj = OperationConf() |
|
614 | opConfObj = OperationConf() | |
616 |
opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self. |
|
615 | opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock) | |
617 | self.opConfObjList.append(opConfObj) |
|
616 | self.opConfObjList.append(opConfObj) | |
618 |
|
617 | |||
619 | return opConfObj |
|
618 | return opConfObj | |
@@ -727,7 +726,9 class ReadUnitConf(ProcUnitConf): | |||||
727 | self.name = None |
|
726 | self.name = None | |
728 | self.inputId = None |
|
727 | self.inputId = None | |
729 | self.opConfObjList = [] |
|
728 | self.opConfObjList = [] | |
730 |
self. |
|
729 | self.lock = Event() | |
|
730 | self.lock.set() | |||
|
731 | self.lock.n = Value('d', 0) | |||
731 |
|
732 | |||
732 | def getElementName(self): |
|
733 | def getElementName(self): | |
733 |
|
734 | |||
@@ -773,7 +774,6 class ReadUnitConf(ProcUnitConf): | |||||
773 | self.endTime = endTime |
|
774 | self.endTime = endTime | |
774 | self.server = server |
|
775 | self.server = server | |
775 | self.err_queue = err_queue |
|
776 | self.err_queue = err_queue | |
776 | self.lock = self.mylock |
|
|||
777 | self.addRunOperation(**kwargs) |
|
777 | self.addRunOperation(**kwargs) | |
778 |
|
778 | |||
779 | def update(self, **kwargs): |
|
779 | def update(self, **kwargs): | |
@@ -995,7 +995,7 class Project(Process): | |||||
995 | idProcUnit = self.__getNewId() |
|
995 | idProcUnit = self.__getNewId() | |
996 | procUnitConfObj = ProcUnitConf() |
|
996 | procUnitConfObj = ProcUnitConf() | |
997 | input_proc = self.procUnitConfObjDict[inputId] |
|
997 | input_proc = self.procUnitConfObjDict[inputId] | |
998 |
procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc. |
|
998 | procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock) | |
999 | self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj |
|
999 | self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj | |
1000 |
|
1000 | |||
1001 | return procUnitConfObj |
|
1001 | return procUnitConfObj |
@@ -193,6 +193,7 class InputQueue(Thread): | |||||
193 | self.project_id = project_id |
|
193 | self.project_id = project_id | |
194 | self.inputId = inputId |
|
194 | self.inputId = inputId | |
195 | self.lock = lock |
|
195 | self.lock = lock | |
|
196 | self.islocked = False | |||
196 | self.size = 0 |
|
197 | self.size = 0 | |
197 |
|
198 | |||
198 | def run(self): |
|
199 | def run(self): | |
@@ -209,10 +210,17 class InputQueue(Thread): | |||||
209 | self.queue.put(obj) |
|
210 | self.queue.put(obj) | |
210 |
|
211 | |||
211 |
def get(self): |
|
212 | def get(self): | |
212 | if self.size/1000000 > 2048: |
|
213 | ||
|
214 | if not self.islocked and self.size/1000000 > 512: | |||
|
215 | self.lock.n.value += 1 | |||
|
216 | self.islocked = True | |||
213 | self.lock.clear() |
|
217 | self.lock.clear() | |
214 | else: |
|
218 | elif self.islocked and self.size/1000000 <= 512: | |
|
219 | self.islocked = False | |||
|
220 | self.lock.n.value -= 1 | |||
|
221 | if self.lock.n.value == 0: | |||
215 | self.lock.set() |
|
222 | self.lock.set() | |
|
223 | ||||
216 | obj = self.queue.get() |
|
224 | obj = self.queue.get() | |
217 |
self.size -= sys.getsizeof(obj) |
|
225 | self.size -= sys.getsizeof(obj) | |
218 | return pickle.loads(obj) |
|
226 | return pickle.loads(obj) | |
@@ -252,6 +260,7 def MPDecorator(BaseClass): | |||||
252 | self.lock = args[4] |
|
260 | self.lock = args[4] | |
253 | self.typeProc = args[5] |
|
261 | self.typeProc = args[5] | |
254 | self.err_queue.put('#_start_#') |
|
262 | self.err_queue.put('#_start_#') | |
|
263 | if self.inputId is not None: | |||
255 | self.queue = InputQueue(self.project_id, self.inputId, self.lock) |
|
264 | self.queue = InputQueue(self.project_id, self.inputId, self.lock) | |
256 |
|
265 | |||
257 | def subscribe(self): |
|
266 | def subscribe(self): | |
@@ -373,7 +382,11 def MPDecorator(BaseClass): | |||||
373 | dataOut = self.listen() |
|
382 | dataOut = self.listen() | |
374 |
|
383 | |||
375 | if not dataOut.error: |
|
384 | if not dataOut.error: | |
|
385 | try: | |||
376 | BaseClass.run(self, dataOut, **self.kwargs) |
|
386 | BaseClass.run(self, dataOut, **self.kwargs) | |
|
387 | except: | |||
|
388 | self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc())) | |||
|
389 | dataOut.error = True | |||
377 | else: |
|
390 | else: | |
378 | break |
|
391 | break | |
379 |
|
392 |
General Comments 0
You need to be logged in to leave comments.
Login now