##// END OF EJS Templates
Update reference ISR repository
Alexander Valdez -
r1674:6310ac0f91c8
parent child
Show More
@@ -3,7 +3,7 Base clases to create Processing units and operations, the MPDecorator
3 must be used in plotting and writing operations to allow to run as an
3 must be used in plotting and writing operations to allow to run as an
4 external process.
4 external process.
5 '''
5 '''
6 # repositorio master
6 import os
7 import inspect
7 import inspect
8 import zmq
8 import zmq
9 import time
9 import time
@@ -12,7 +12,8 import traceback
12 from threading import Thread
12 from threading import Thread
13 from multiprocessing import Process, Queue
13 from multiprocessing import Process, Queue
14 from schainpy.utils import log
14 from schainpy.utils import log
15
15 import copy
16 QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '10'))
16
17
17 class ProcessingUnit(object):
18 class ProcessingUnit(object):
18 '''
19 '''
@@ -27,10 +28,19 class ProcessingUnit(object):
27 self.dataOut = None
28 self.dataOut = None
28 self.isConfig = False
29 self.isConfig = False
29 self.operations = []
30 self.operations = []
31 self.name = 'Test'
32 self.inputs = []
30
33
31 def setInput(self, unit):
34 def setInput(self, unit):
32
35
33 self.dataIn = unit.dataOut
36 attr = 'dataIn'
37 for i, u in enumerate(unit):
38 if i==0:
39 self.dataIn = u.dataOut#.copy()
40 self.inputs.append('dataIn')
41 else:
42 setattr(self, 'dataIn{}'.format(i), u.dataOut)#.copy())
43 self.inputs.append('dataIn{}'.format(i))
34
44
35 def getAllowedArgs(self):
45 def getAllowedArgs(self):
36 if hasattr(self, '__attrs__'):
46 if hasattr(self, '__attrs__'):
@@ -57,6 +67,9 class ProcessingUnit(object):
57
67
58 try:
68 try:
59 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
69 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
70 if self.dataIn.runNextUnit:
71 return not self.dataIn.isReady()
72 else:
60 return self.dataIn.isReady()
73 return self.dataIn.isReady()
61 elif self.dataIn is None or not self.dataIn.error:
74 elif self.dataIn is None or not self.dataIn.error:
62 self.run(**kwargs)
75 self.run(**kwargs)
@@ -72,16 +85,22 class ProcessingUnit(object):
72 else:
85 else:
73 log.error(err, self.name)
86 log.error(err, self.name)
74 self.dataOut.error = True
87 self.dataOut.error = True
75
76 for op, optype, opkwargs in self.operations:
88 for op, optype, opkwargs in self.operations:
89 aux = self.dataOut.copy()
77 if optype == 'other' and not self.dataOut.flagNoData:
90 if optype == 'other' and not self.dataOut.flagNoData:
78 self.dataOut = op.run(self.dataOut, **opkwargs)
91 self.dataOut = op.run(self.dataOut, **opkwargs)
79 elif optype == 'external' and not self.dataOut.flagNoData:
92 elif optype == 'external' and not self.dataOut.flagNoData:
80 op.queue.put(self.dataOut)
93 op.queue.put(aux)
81 elif optype == 'external' and self.dataOut.error:
94 elif optype == 'external' and self.dataOut.error:
82 op.queue.put(self.dataOut)
95 op.queue.put(aux)
83
96 try:
84 return 'Error' if self.dataOut.error else self.dataOut.isReady()
97 if self.dataOut.runNextUnit:
98 runNextUnit = self.dataOut.runNextUnit
99 else:
100 runNextUnit = self.dataOut.isReady()
101 except:
102 runNextUnit = self.dataOut.isReady()
103 return 'Error' if self.dataOut.error else runNextUnit# self.dataOut.isReady()
85
104
86 def setup(self):
105 def setup(self):
87
106
@@ -175,7 +194,7 def MPDecorator(BaseClass):
175
194
176 self.start_time = time.time()
195 self.start_time = time.time()
177 self.err_queue = args[3]
196 self.err_queue = args[3]
178 self.queue = Queue(maxsize=1)
197 self.queue = Queue(maxsize=QUEUE_SIZE)
179 self.myrun = BaseClass.run
198 self.myrun = BaseClass.run
180
199
181 def run(self):
200 def run(self):
General Comments 0
You need to be logged in to leave comments. Login now