##// END OF EJS Templates
Update reference ISR repository
Alexander Valdez -
r1674:6310ac0f91c8
parent child
Show More
@@ -3,7 +3,7 Base clases to create Processing units and operations, the MPDecorator
3 3 must be used in plotting and writing operations to allow to run as an
4 4 external process.
5 5 '''
6 # repositorio master
6 import os
7 7 import inspect
8 8 import zmq
9 9 import time
@@ -12,7 +12,8 import traceback
12 12 from threading import Thread
13 13 from multiprocessing import Process, Queue
14 14 from schainpy.utils import log
15
15 import copy
16 QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '10'))
16 17
17 18 class ProcessingUnit(object):
18 19 '''
@@ -27,10 +28,19 class ProcessingUnit(object):
27 28 self.dataOut = None
28 29 self.isConfig = False
29 30 self.operations = []
31 self.name = 'Test'
32 self.inputs = []
30 33
31 34 def setInput(self, unit):
32 35
33 self.dataIn = unit.dataOut
36 attr = 'dataIn'
37 for i, u in enumerate(unit):
38 if i==0:
39 self.dataIn = u.dataOut#.copy()
40 self.inputs.append('dataIn')
41 else:
42 setattr(self, 'dataIn{}'.format(i), u.dataOut)#.copy())
43 self.inputs.append('dataIn{}'.format(i))
34 44
35 45 def getAllowedArgs(self):
36 46 if hasattr(self, '__attrs__'):
@@ -57,6 +67,9 class ProcessingUnit(object):
57 67
58 68 try:
59 69 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
70 if self.dataIn.runNextUnit:
71 return not self.dataIn.isReady()
72 else:
60 73 return self.dataIn.isReady()
61 74 elif self.dataIn is None or not self.dataIn.error:
62 75 self.run(**kwargs)
@@ -72,16 +85,22 class ProcessingUnit(object):
72 85 else:
73 86 log.error(err, self.name)
74 87 self.dataOut.error = True
75
76 88 for op, optype, opkwargs in self.operations:
89 aux = self.dataOut.copy()
77 90 if optype == 'other' and not self.dataOut.flagNoData:
78 91 self.dataOut = op.run(self.dataOut, **opkwargs)
79 92 elif optype == 'external' and not self.dataOut.flagNoData:
80 op.queue.put(self.dataOut)
93 op.queue.put(aux)
81 94 elif optype == 'external' and self.dataOut.error:
82 op.queue.put(self.dataOut)
83
84 return 'Error' if self.dataOut.error else self.dataOut.isReady()
95 op.queue.put(aux)
96 try:
97 if self.dataOut.runNextUnit:
98 runNextUnit = self.dataOut.runNextUnit
99 else:
100 runNextUnit = self.dataOut.isReady()
101 except:
102 runNextUnit = self.dataOut.isReady()
103 return 'Error' if self.dataOut.error else runNextUnit# self.dataOut.isReady()
85 104
86 105 def setup(self):
87 106
@@ -175,7 +194,7 def MPDecorator(BaseClass):
175 194
176 195 self.start_time = time.time()
177 196 self.err_queue = args[3]
178 self.queue = Queue(maxsize=1)
197 self.queue = Queue(maxsize=QUEUE_SIZE)
179 198 self.myrun = BaseClass.run
180 199
181 200 def run(self):
General Comments 0
You need to be logged in to leave comments. Login now