@@ -4,6 +4,7 must be used in plotting and writing operations to allow to run as an | |||||
4 | external process. |
|
4 | external process. | |
5 | ''' |
|
5 | ''' | |
6 |
|
6 | |||
|
7 | import os | |||
7 | import inspect |
|
8 | import inspect | |
8 | import zmq |
|
9 | import zmq | |
9 | import time |
|
10 | import time | |
@@ -13,6 +14,7 from threading import Thread | |||||
13 | from multiprocessing import Process, Queue |
|
14 | from multiprocessing import Process, Queue | |
14 | from schainpy.utils import log |
|
15 | from schainpy.utils import log | |
15 |
|
16 | |||
|
17 | QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '10')) | |||
16 |
|
18 | |||
17 | class ProcessingUnit(object): |
|
19 | class ProcessingUnit(object): | |
18 | ''' |
|
20 | ''' | |
@@ -74,12 +76,13 class ProcessingUnit(object): | |||||
74 | self.dataOut.error = True |
|
76 | self.dataOut.error = True | |
75 |
|
77 | |||
76 | for op, optype, opkwargs in self.operations: |
|
78 | for op, optype, opkwargs in self.operations: | |
|
79 | aux = self.dataOut.copy() | |||
77 | if optype == 'other' and not self.dataOut.flagNoData: |
|
80 | if optype == 'other' and not self.dataOut.flagNoData: | |
78 | self.dataOut = op.run(self.dataOut, **opkwargs) |
|
81 | self.dataOut = op.run(self.dataOut, **opkwargs) | |
79 | elif optype == 'external' and not self.dataOut.flagNoData: |
|
82 | elif optype == 'external' and not self.dataOut.flagNoData: | |
80 |
op.queue.put( |
|
83 | op.queue.put(aux) | |
81 | elif optype == 'external' and self.dataOut.error: |
|
84 | elif optype == 'external' and self.dataOut.error: | |
82 |
op.queue.put( |
|
85 | op.queue.put(aux) | |
83 |
|
86 | |||
84 | return 'Error' if self.dataOut.error else self.dataOut.isReady() |
|
87 | return 'Error' if self.dataOut.error else self.dataOut.isReady() | |
85 |
|
88 | |||
@@ -175,7 +178,7 def MPDecorator(BaseClass): | |||||
175 |
|
178 | |||
176 | self.start_time = time.time() |
|
179 | self.start_time = time.time() | |
177 | self.err_queue = args[3] |
|
180 | self.err_queue = args[3] | |
178 |
self.queue = Queue(maxsize= |
|
181 | self.queue = Queue(maxsize=QUEUE_SIZE) | |
179 | self.myrun = BaseClass.run |
|
182 | self.myrun = BaseClass.run | |
180 |
|
183 | |||
181 | def run(self): |
|
184 | def run(self): |
General Comments 0
You need to be logged in to leave comments.
Login now