@@ -11,7 +11,7 import traceback | |||
|
11 | 11 | import math |
|
12 | 12 | import time |
|
13 | 13 | import zmq |
|
14 | from multiprocessing import Process, Queue, Event, cpu_count | |
|
14 | from multiprocessing import Process, Queue, Event, Value, cpu_count | |
|
15 | 15 | from threading import Thread |
|
16 | 16 | from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring |
|
17 | 17 | from xml.dom import minidom |
@@ -480,7 +480,6 class ProcUnitConf(): | |||
|
480 | 480 | self.opConfObjList = [] |
|
481 | 481 | self.procUnitObj = None |
|
482 | 482 | self.opObjDict = {} |
|
483 | self.mylock = Event() | |
|
484 | 483 | |
|
485 | 484 | def __getPriority(self): |
|
486 | 485 | |
@@ -613,7 +612,7 class ProcUnitConf(): | |||
|
613 | 612 | id = self.__getNewId() |
|
614 | 613 | priority = self.__getPriority() # Sin mucho sentido, pero puede usarse |
|
615 | 614 | opConfObj = OperationConf() |
|
616 |
opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self. |
|
|
615 | opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock) | |
|
617 | 616 | self.opConfObjList.append(opConfObj) |
|
618 | 617 | |
|
619 | 618 | return opConfObj |
@@ -727,7 +726,9 class ReadUnitConf(ProcUnitConf): | |||
|
727 | 726 | self.name = None |
|
728 | 727 | self.inputId = None |
|
729 | 728 | self.opConfObjList = [] |
|
730 |
self. |
|
|
729 | self.lock = Event() | |
|
730 | self.lock.set() | |
|
731 | self.lock.n = Value('d', 0) | |
|
731 | 732 | |
|
732 | 733 | def getElementName(self): |
|
733 | 734 | |
@@ -772,8 +773,7 class ReadUnitConf(ProcUnitConf): | |||
|
772 | 773 | self.startTime = startTime |
|
773 | 774 | self.endTime = endTime |
|
774 | 775 | self.server = server |
|
775 | self.err_queue = err_queue | |
|
776 | self.lock = self.mylock | |
|
776 | self.err_queue = err_queue | |
|
777 | 777 | self.addRunOperation(**kwargs) |
|
778 | 778 | |
|
779 | 779 | def update(self, **kwargs): |
@@ -802,7 +802,7 class ReadUnitConf(ProcUnitConf): | |||
|
802 | 802 | |
|
803 | 803 | self.opConfObjList = [] |
|
804 | 804 | |
|
805 |
def addRunOperation(self, **kwargs): |
|
|
805 | def addRunOperation(self, **kwargs): | |
|
806 | 806 | |
|
807 | 807 | opObj = self.addOperation(name='run', optype='self') |
|
808 | 808 | |
@@ -995,7 +995,7 class Project(Process): | |||
|
995 | 995 | idProcUnit = self.__getNewId() |
|
996 | 996 | procUnitConfObj = ProcUnitConf() |
|
997 | 997 | input_proc = self.procUnitConfObjDict[inputId] |
|
998 |
procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc. |
|
|
998 | procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock) | |
|
999 | 999 | self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj |
|
1000 | 1000 | |
|
1001 | 1001 | return procUnitConfObj |
@@ -193,6 +193,7 class InputQueue(Thread): | |||
|
193 | 193 | self.project_id = project_id |
|
194 | 194 | self.inputId = inputId |
|
195 | 195 | self.lock = lock |
|
196 | self.islocked = False | |
|
196 | 197 | self.size = 0 |
|
197 | 198 | |
|
198 | 199 | def run(self): |
@@ -208,16 +209,23 class InputQueue(Thread): | |||
|
208 | 209 | self.size += sys.getsizeof(obj) |
|
209 | 210 | self.queue.put(obj) |
|
210 | 211 | |
|
211 |
def get(self): |
|
|
212 | if self.size/1000000 > 2048: | |
|
212 | def get(self): | |
|
213 | ||
|
214 | if not self.islocked and self.size/1000000 > 512: | |
|
215 | self.lock.n.value += 1 | |
|
216 | self.islocked = True | |
|
213 | 217 | self.lock.clear() |
|
214 | else: | |
|
215 |
self. |
|
|
218 | elif self.islocked and self.size/1000000 <= 512: | |
|
219 | self.islocked = False | |
|
220 | self.lock.n.value -= 1 | |
|
221 | if self.lock.n.value == 0: | |
|
222 | self.lock.set() | |
|
223 | ||
|
216 | 224 | obj = self.queue.get() |
|
217 |
self.size -= sys.getsizeof(obj) |
|
|
225 | self.size -= sys.getsizeof(obj) | |
|
218 | 226 | return pickle.loads(obj) |
|
219 | 227 | |
|
220 | ||
|
228 | ||
|
221 | 229 | def MPDecorator(BaseClass): |
|
222 | 230 | """ |
|
223 | 231 | Multiprocessing class decorator |
@@ -252,7 +260,8 def MPDecorator(BaseClass): | |||
|
252 | 260 | self.lock = args[4] |
|
253 | 261 | self.typeProc = args[5] |
|
254 | 262 | self.err_queue.put('#_start_#') |
|
255 | self.queue = InputQueue(self.project_id, self.inputId, self.lock) | |
|
263 | if self.inputId is not None: | |
|
264 | self.queue = InputQueue(self.project_id, self.inputId, self.lock) | |
|
256 | 265 | |
|
257 | 266 | def subscribe(self): |
|
258 | 267 | ''' |
@@ -284,8 +293,8 def MPDecorator(BaseClass): | |||
|
284 | 293 | ''' |
|
285 | 294 | This function publish an object, to an specific topic. |
|
286 | 295 | It blocks publishing when receiver queue is full to avoid data loss |
|
287 | ''' | |
|
288 | ||
|
296 | ''' | |
|
297 | ||
|
289 | 298 | if self.inputId is None: |
|
290 | 299 | self.lock.wait() |
|
291 | 300 | self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) |
@@ -373,7 +382,11 def MPDecorator(BaseClass): | |||
|
373 | 382 | dataOut = self.listen() |
|
374 | 383 | |
|
375 | 384 | if not dataOut.error: |
|
376 | BaseClass.run(self, dataOut, **self.kwargs) | |
|
385 | try: | |
|
386 | BaseClass.run(self, dataOut, **self.kwargs) | |
|
387 | except: | |
|
388 | self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc())) | |
|
389 | dataOut.error = True | |
|
377 | 390 | else: |
|
378 | 391 | break |
|
379 | 392 |
General Comments 0
You need to be logged in to leave comments.
Login now