##// END OF EJS Templates
Update reference ISR repository
Alexander Valdez -
r1674:6310ac0f91c8
parent child
Show More
@@ -3,7 +3,7 Base clases to create Processing units and operations, the MPDecorator
3 must be used in plotting and writing operations to allow to run as an
3 must be used in plotting and writing operations to allow to run as an
4 external process.
4 external process.
5 '''
5 '''
6 # repositorio master
6 import os
7 import inspect
7 import inspect
8 import zmq
8 import zmq
9 import time
9 import time
@@ -12,7 +12,8 import traceback
12 from threading import Thread
12 from threading import Thread
13 from multiprocessing import Process, Queue
13 from multiprocessing import Process, Queue
14 from schainpy.utils import log
14 from schainpy.utils import log
15
15 import copy
16 QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '10'))
16
17
17 class ProcessingUnit(object):
18 class ProcessingUnit(object):
18 '''
19 '''
@@ -27,11 +28,20 class ProcessingUnit(object):
27 self.dataOut = None
28 self.dataOut = None
28 self.isConfig = False
29 self.isConfig = False
29 self.operations = []
30 self.operations = []
30
31 self.name = 'Test'
32 self.inputs = []
33
31 def setInput(self, unit):
34 def setInput(self, unit):
32
35
33 self.dataIn = unit.dataOut
36 attr = 'dataIn'
34
37 for i, u in enumerate(unit):
38 if i==0:
39 self.dataIn = u.dataOut#.copy()
40 self.inputs.append('dataIn')
41 else:
42 setattr(self, 'dataIn{}'.format(i), u.dataOut)#.copy())
43 self.inputs.append('dataIn{}'.format(i))
44
35 def getAllowedArgs(self):
45 def getAllowedArgs(self):
36 if hasattr(self, '__attrs__'):
46 if hasattr(self, '__attrs__'):
37 return self.__attrs__
47 return self.__attrs__
@@ -41,7 +51,7 class ProcessingUnit(object):
41 def addOperation(self, conf, operation):
51 def addOperation(self, conf, operation):
42 '''
52 '''
43 '''
53 '''
44
54
45 self.operations.append((operation, conf.type, conf.getKwargs()))
55 self.operations.append((operation, conf.type, conf.getKwargs()))
46
56
47 def getOperationObj(self, objId):
57 def getOperationObj(self, objId):
@@ -57,14 +67,17 class ProcessingUnit(object):
57
67
58 try:
68 try:
59 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
69 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
60 return self.dataIn.isReady()
70 if self.dataIn.runNextUnit:
71 return not self.dataIn.isReady()
72 else:
73 return self.dataIn.isReady()
61 elif self.dataIn is None or not self.dataIn.error:
74 elif self.dataIn is None or not self.dataIn.error:
62 self.run(**kwargs)
75 self.run(**kwargs)
63 elif self.dataIn.error:
76 elif self.dataIn.error:
64 self.dataOut.error = self.dataIn.error
77 self.dataOut.error = self.dataIn.error
65 self.dataOut.flagNoData = True
78 self.dataOut.flagNoData = True
66 except:
79 except:
67 err = traceback.format_exc()
80 err = traceback.format_exc()
68 if 'SchainWarning' in err:
81 if 'SchainWarning' in err:
69 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
82 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
70 elif 'SchainError' in err:
83 elif 'SchainError' in err:
@@ -72,16 +85,22 class ProcessingUnit(object):
72 else:
85 else:
73 log.error(err, self.name)
86 log.error(err, self.name)
74 self.dataOut.error = True
87 self.dataOut.error = True
75
76 for op, optype, opkwargs in self.operations:
88 for op, optype, opkwargs in self.operations:
89 aux = self.dataOut.copy()
77 if optype == 'other' and not self.dataOut.flagNoData:
90 if optype == 'other' and not self.dataOut.flagNoData:
78 self.dataOut = op.run(self.dataOut, **opkwargs)
91 self.dataOut = op.run(self.dataOut, **opkwargs)
79 elif optype == 'external' and not self.dataOut.flagNoData:
92 elif optype == 'external' and not self.dataOut.flagNoData:
80 op.queue.put(self.dataOut)
93 op.queue.put(aux)
81 elif optype == 'external' and self.dataOut.error:
94 elif optype == 'external' and self.dataOut.error:
82 op.queue.put(self.dataOut)
95 op.queue.put(aux)
83
96 try:
84 return 'Error' if self.dataOut.error else self.dataOut.isReady()
97 if self.dataOut.runNextUnit:
98 runNextUnit = self.dataOut.runNextUnit
99 else:
100 runNextUnit = self.dataOut.isReady()
101 except:
102 runNextUnit = self.dataOut.isReady()
103 return 'Error' if self.dataOut.error else runNextUnit# self.dataOut.isReady()
85
104
86 def setup(self):
105 def setup(self):
87
106
@@ -100,7 +119,7 class Operation(object):
100
119
101 '''
120 '''
102 '''
121 '''
103
122
104 proc_type = 'operation'
123 proc_type = 'operation'
105
124
106 def __init__(self):
125 def __init__(self):
@@ -149,12 +168,12 class Operation(object):
149
168
150 return
169 return
151
170
152
171
153 def MPDecorator(BaseClass):
172 def MPDecorator(BaseClass):
154 """
173 """
155 Multiprocessing class decorator
174 Multiprocessing class decorator
156
175
157 This function add multiprocessing features to a BaseClass.
176 This function add multiprocessing features to a BaseClass.
158 """
177 """
159
178
160 class MPClass(BaseClass, Process):
179 class MPClass(BaseClass, Process):
@@ -169,17 +188,17 def MPDecorator(BaseClass):
169 self.op_type = 'external'
188 self.op_type = 'external'
170 self.name = BaseClass.__name__
189 self.name = BaseClass.__name__
171 self.__doc__ = BaseClass.__doc__
190 self.__doc__ = BaseClass.__doc__
172
191
173 if 'plot' in self.name.lower() and not self.name.endswith('_'):
192 if 'plot' in self.name.lower() and not self.name.endswith('_'):
174 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
193 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
175
194
176 self.start_time = time.time()
195 self.start_time = time.time()
177 self.err_queue = args[3]
196 self.err_queue = args[3]
178 self.queue = Queue(maxsize=1)
197 self.queue = Queue(maxsize=QUEUE_SIZE)
179 self.myrun = BaseClass.run
198 self.myrun = BaseClass.run
180
199
181 def run(self):
200 def run(self):
182
201
183 while True:
202 while True:
184
203
185 dataOut = self.queue.get()
204 dataOut = self.queue.get()
@@ -188,7 +207,7 def MPDecorator(BaseClass):
188 try:
207 try:
189 BaseClass.run(self, dataOut, **self.kwargs)
208 BaseClass.run(self, dataOut, **self.kwargs)
190 except:
209 except:
191 err = traceback.format_exc()
210 err = traceback.format_exc()
192 log.error(err, self.name)
211 log.error(err, self.name)
193 else:
212 else:
194 break
213 break
@@ -198,6 +217,6 def MPDecorator(BaseClass):
198 def close(self):
217 def close(self):
199
218
200 BaseClass.close(self)
219 BaseClass.close(self)
201 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
220 log.success('Done...(Time:{:4.2f} secs)'.format(time.time() - self.start_time), self.name)
202
221
203 return MPClass
222 return MPClass No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now