##// END OF EJS Templates
Update reference ISR repository
Alexander Valdez -
r1674:6310ac0f91c8
parent child
Show More
@@ -1,203 +1,222
1 1 '''
2 2 Base clases to create Processing units and operations, the MPDecorator
3 3 must be used in plotting and writing operations to allow to run as an
4 4 external process.
5 5 '''
6 # repositorio master
6 import os
7 7 import inspect
8 8 import zmq
9 9 import time
10 10 import pickle
11 11 import traceback
12 12 from threading import Thread
13 13 from multiprocessing import Process, Queue
14 14 from schainpy.utils import log
15
15 import copy
16 QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '10'))
16 17
17 18 class ProcessingUnit(object):
18 19 '''
19 20 Base class to create Signal Chain Units
20 21 '''
21 22
22 23 proc_type = 'processing'
23 24
24 25 def __init__(self):
25 26
26 27 self.dataIn = None
27 28 self.dataOut = None
28 29 self.isConfig = False
29 30 self.operations = []
30
31 self.name = 'Test'
32 self.inputs = []
33
31 34 def setInput(self, unit):
32 35
33 self.dataIn = unit.dataOut
34
36 attr = 'dataIn'
37 for i, u in enumerate(unit):
38 if i==0:
39 self.dataIn = u.dataOut#.copy()
40 self.inputs.append('dataIn')
41 else:
42 setattr(self, 'dataIn{}'.format(i), u.dataOut)#.copy())
43 self.inputs.append('dataIn{}'.format(i))
44
35 45 def getAllowedArgs(self):
36 46 if hasattr(self, '__attrs__'):
37 47 return self.__attrs__
38 48 else:
39 49 return inspect.getargspec(self.run).args
40 50
41 51 def addOperation(self, conf, operation):
42 52 '''
43 53 '''
44
54
45 55 self.operations.append((operation, conf.type, conf.getKwargs()))
46 56
47 57 def getOperationObj(self, objId):
48 58
49 59 if objId not in list(self.operations.keys()):
50 60 return None
51 61
52 62 return self.operations[objId]
53 63
54 64 def call(self, **kwargs):
55 65 '''
56 66 '''
57 67
58 68 try:
59 69 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
60 return self.dataIn.isReady()
70 if self.dataIn.runNextUnit:
71 return not self.dataIn.isReady()
72 else:
73 return self.dataIn.isReady()
61 74 elif self.dataIn is None or not self.dataIn.error:
62 75 self.run(**kwargs)
63 76 elif self.dataIn.error:
64 77 self.dataOut.error = self.dataIn.error
65 78 self.dataOut.flagNoData = True
66 79 except:
67 err = traceback.format_exc()
80 err = traceback.format_exc()
68 81 if 'SchainWarning' in err:
69 82 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
70 83 elif 'SchainError' in err:
71 84 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
72 85 else:
73 86 log.error(err, self.name)
74 87 self.dataOut.error = True
75
76 88 for op, optype, opkwargs in self.operations:
89 aux = self.dataOut.copy()
77 90 if optype == 'other' and not self.dataOut.flagNoData:
78 91 self.dataOut = op.run(self.dataOut, **opkwargs)
79 92 elif optype == 'external' and not self.dataOut.flagNoData:
80 op.queue.put(self.dataOut)
81 elif optype == 'external' and self.dataOut.error:
82 op.queue.put(self.dataOut)
83
84 return 'Error' if self.dataOut.error else self.dataOut.isReady()
93 op.queue.put(aux)
94 elif optype == 'external' and self.dataOut.error:
95 op.queue.put(aux)
96 try:
97 if self.dataOut.runNextUnit:
98 runNextUnit = self.dataOut.runNextUnit
99 else:
100 runNextUnit = self.dataOut.isReady()
101 except:
102 runNextUnit = self.dataOut.isReady()
103 return 'Error' if self.dataOut.error else runNextUnit# self.dataOut.isReady()
85 104
86 105 def setup(self):
87 106
88 107 raise NotImplementedError
89 108
90 109 def run(self):
91 110
92 111 raise NotImplementedError
93 112
94 113 def close(self):
95 114
96 115 return
97 116
98 117
99 118 class Operation(object):
100 119
101 120 '''
102 121 '''
103
122
104 123 proc_type = 'operation'
105 124
106 125 def __init__(self):
107 126
108 127 self.id = None
109 128 self.isConfig = False
110 129
111 130 if not hasattr(self, 'name'):
112 131 self.name = self.__class__.__name__
113 132
114 133 def getAllowedArgs(self):
115 134 if hasattr(self, '__attrs__'):
116 135 return self.__attrs__
117 136 else:
118 137 return inspect.getargspec(self.run).args
119 138
120 139 def setup(self):
121 140
122 141 self.isConfig = True
123 142
124 143 raise NotImplementedError
125 144
126 145 def run(self, dataIn, **kwargs):
127 146 """
128 147 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
129 148 atributos del objeto dataIn.
130 149
131 150 Input:
132 151
133 152 dataIn : objeto del tipo JROData
134 153
135 154 Return:
136 155
137 156 None
138 157
139 158 Affected:
140 159 __buffer : buffer de recepcion de datos.
141 160
142 161 """
143 162 if not self.isConfig:
144 163 self.setup(**kwargs)
145 164
146 165 raise NotImplementedError
147 166
148 167 def close(self):
149 168
150 169 return
151 170
152
171
153 172 def MPDecorator(BaseClass):
154 173 """
155 174 Multiprocessing class decorator
156 175
157 This function add multiprocessing features to a BaseClass.
176 This function add multiprocessing features to a BaseClass.
158 177 """
159 178
160 179 class MPClass(BaseClass, Process):
161 180
162 181 def __init__(self, *args, **kwargs):
163 182 super(MPClass, self).__init__()
164 183 Process.__init__(self)
165 184
166 185 self.args = args
167 186 self.kwargs = kwargs
168 187 self.t = time.time()
169 188 self.op_type = 'external'
170 189 self.name = BaseClass.__name__
171 190 self.__doc__ = BaseClass.__doc__
172
191
173 192 if 'plot' in self.name.lower() and not self.name.endswith('_'):
174 193 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
175
194
176 195 self.start_time = time.time()
177 196 self.err_queue = args[3]
178 self.queue = Queue(maxsize=1)
197 self.queue = Queue(maxsize=QUEUE_SIZE)
179 198 self.myrun = BaseClass.run
180 199
181 200 def run(self):
182
201
183 202 while True:
184 203
185 204 dataOut = self.queue.get()
186 205
187 206 if not dataOut.error:
188 207 try:
189 208 BaseClass.run(self, dataOut, **self.kwargs)
190 209 except:
191 err = traceback.format_exc()
210 err = traceback.format_exc()
192 211 log.error(err, self.name)
193 212 else:
194 213 break
195 214
196 215 self.close()
197 216
198 217 def close(self):
199 218
200 219 BaseClass.close(self)
201 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
220 log.success('Done...(Time:{:4.2f} secs)'.format(time.time() - self.start_time), self.name)
202 221
203 return MPClass
222 return MPClass No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now