##// END OF EJS Templates
Update reference ISR repository
Alexander Valdez -
r1674:6310ac0f91c8
parent child
Show More
@@ -3,7 +3,7 Base clases to create Processing units and operations, the MPDecorator
3 3 must be used in plotting and writing operations to allow to run as an
4 4 external process.
5 5 '''
6 # repositorio master
6 import os
7 7 import inspect
8 8 import zmq
9 9 import time
@@ -12,7 +12,8 import traceback
12 12 from threading import Thread
13 13 from multiprocessing import Process, Queue
14 14 from schainpy.utils import log
15
15 import copy
16 QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '10'))
16 17
17 18 class ProcessingUnit(object):
18 19 '''
@@ -27,11 +28,20 class ProcessingUnit(object):
27 28 self.dataOut = None
28 29 self.isConfig = False
29 30 self.operations = []
30
31 self.name = 'Test'
32 self.inputs = []
33
31 34 def setInput(self, unit):
32 35
33 self.dataIn = unit.dataOut
34
36 attr = 'dataIn'
37 for i, u in enumerate(unit):
38 if i==0:
39 self.dataIn = u.dataOut#.copy()
40 self.inputs.append('dataIn')
41 else:
42 setattr(self, 'dataIn{}'.format(i), u.dataOut)#.copy())
43 self.inputs.append('dataIn{}'.format(i))
44
35 45 def getAllowedArgs(self):
36 46 if hasattr(self, '__attrs__'):
37 47 return self.__attrs__
@@ -41,7 +51,7 class ProcessingUnit(object):
41 51 def addOperation(self, conf, operation):
42 52 '''
43 53 '''
44
54
45 55 self.operations.append((operation, conf.type, conf.getKwargs()))
46 56
47 57 def getOperationObj(self, objId):
@@ -57,14 +67,17 class ProcessingUnit(object):
57 67
58 68 try:
59 69 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
60 return self.dataIn.isReady()
70 if self.dataIn.runNextUnit:
71 return not self.dataIn.isReady()
72 else:
73 return self.dataIn.isReady()
61 74 elif self.dataIn is None or not self.dataIn.error:
62 75 self.run(**kwargs)
63 76 elif self.dataIn.error:
64 77 self.dataOut.error = self.dataIn.error
65 78 self.dataOut.flagNoData = True
66 79 except:
67 err = traceback.format_exc()
80 err = traceback.format_exc()
68 81 if 'SchainWarning' in err:
69 82 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
70 83 elif 'SchainError' in err:
@@ -72,16 +85,22 class ProcessingUnit(object):
72 85 else:
73 86 log.error(err, self.name)
74 87 self.dataOut.error = True
75
76 88 for op, optype, opkwargs in self.operations:
89 aux = self.dataOut.copy()
77 90 if optype == 'other' and not self.dataOut.flagNoData:
78 91 self.dataOut = op.run(self.dataOut, **opkwargs)
79 92 elif optype == 'external' and not self.dataOut.flagNoData:
80 op.queue.put(self.dataOut)
81 elif optype == 'external' and self.dataOut.error:
82 op.queue.put(self.dataOut)
83
84 return 'Error' if self.dataOut.error else self.dataOut.isReady()
93 op.queue.put(aux)
94 elif optype == 'external' and self.dataOut.error:
95 op.queue.put(aux)
96 try:
97 if self.dataOut.runNextUnit:
98 runNextUnit = self.dataOut.runNextUnit
99 else:
100 runNextUnit = self.dataOut.isReady()
101 except:
102 runNextUnit = self.dataOut.isReady()
103 return 'Error' if self.dataOut.error else runNextUnit# self.dataOut.isReady()
85 104
86 105 def setup(self):
87 106
@@ -100,7 +119,7 class Operation(object):
100 119
101 120 '''
102 121 '''
103
122
104 123 proc_type = 'operation'
105 124
106 125 def __init__(self):
@@ -149,12 +168,12 class Operation(object):
149 168
150 169 return
151 170
152
171
153 172 def MPDecorator(BaseClass):
154 173 """
155 174 Multiprocessing class decorator
156 175
157 This function add multiprocessing features to a BaseClass.
176 This function add multiprocessing features to a BaseClass.
158 177 """
159 178
160 179 class MPClass(BaseClass, Process):
@@ -169,17 +188,17 def MPDecorator(BaseClass):
169 188 self.op_type = 'external'
170 189 self.name = BaseClass.__name__
171 190 self.__doc__ = BaseClass.__doc__
172
191
173 192 if 'plot' in self.name.lower() and not self.name.endswith('_'):
174 193 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
175
194
176 195 self.start_time = time.time()
177 196 self.err_queue = args[3]
178 self.queue = Queue(maxsize=1)
197 self.queue = Queue(maxsize=QUEUE_SIZE)
179 198 self.myrun = BaseClass.run
180 199
181 200 def run(self):
182
201
183 202 while True:
184 203
185 204 dataOut = self.queue.get()
@@ -188,7 +207,7 def MPDecorator(BaseClass):
188 207 try:
189 208 BaseClass.run(self, dataOut, **self.kwargs)
190 209 except:
191 err = traceback.format_exc()
210 err = traceback.format_exc()
192 211 log.error(err, self.name)
193 212 else:
194 213 break
@@ -198,6 +217,6 def MPDecorator(BaseClass):
198 217 def close(self):
199 218
200 219 BaseClass.close(self)
201 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
220 log.success('Done...(Time:{:4.2f} secs)'.format(time.time() - self.start_time), self.name)
202 221
203 return MPClass
222 return MPClass No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now