##// END OF EJS Templates
Update reference ISR repository
Alexander Valdez -
r1674:6310ac0f91c8
parent child
Show More
@@ -1,203 +1,222
1 '''
1 '''
2 Base clases to create Processing units and operations, the MPDecorator
2 Base clases to create Processing units and operations, the MPDecorator
3 must be used in plotting and writing operations to allow to run as an
3 must be used in plotting and writing operations to allow to run as an
4 external process.
4 external process.
5 '''
5 '''
6 # repositorio master
6 import os
7 import inspect
7 import inspect
8 import zmq
8 import zmq
9 import time
9 import time
10 import pickle
10 import pickle
11 import traceback
11 import traceback
12 from threading import Thread
12 from threading import Thread
13 from multiprocessing import Process, Queue
13 from multiprocessing import Process, Queue
14 from schainpy.utils import log
14 from schainpy.utils import log
15
15 import copy
16 QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '10'))
16
17
17 class ProcessingUnit(object):
18 class ProcessingUnit(object):
18 '''
19 '''
19 Base class to create Signal Chain Units
20 Base class to create Signal Chain Units
20 '''
21 '''
21
22
22 proc_type = 'processing'
23 proc_type = 'processing'
23
24
24 def __init__(self):
25 def __init__(self):
25
26
26 self.dataIn = None
27 self.dataIn = None
27 self.dataOut = None
28 self.dataOut = None
28 self.isConfig = False
29 self.isConfig = False
29 self.operations = []
30 self.operations = []
30
31 self.name = 'Test'
32 self.inputs = []
33
31 def setInput(self, unit):
34 def setInput(self, unit):
32
35
33 self.dataIn = unit.dataOut
36 attr = 'dataIn'
34
37 for i, u in enumerate(unit):
38 if i==0:
39 self.dataIn = u.dataOut#.copy()
40 self.inputs.append('dataIn')
41 else:
42 setattr(self, 'dataIn{}'.format(i), u.dataOut)#.copy())
43 self.inputs.append('dataIn{}'.format(i))
44
35 def getAllowedArgs(self):
45 def getAllowedArgs(self):
36 if hasattr(self, '__attrs__'):
46 if hasattr(self, '__attrs__'):
37 return self.__attrs__
47 return self.__attrs__
38 else:
48 else:
39 return inspect.getargspec(self.run).args
49 return inspect.getargspec(self.run).args
40
50
41 def addOperation(self, conf, operation):
51 def addOperation(self, conf, operation):
42 '''
52 '''
43 '''
53 '''
44
54
45 self.operations.append((operation, conf.type, conf.getKwargs()))
55 self.operations.append((operation, conf.type, conf.getKwargs()))
46
56
47 def getOperationObj(self, objId):
57 def getOperationObj(self, objId):
48
58
49 if objId not in list(self.operations.keys()):
59 if objId not in list(self.operations.keys()):
50 return None
60 return None
51
61
52 return self.operations[objId]
62 return self.operations[objId]
53
63
54 def call(self, **kwargs):
64 def call(self, **kwargs):
55 '''
65 '''
56 '''
66 '''
57
67
58 try:
68 try:
59 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
69 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
60 return self.dataIn.isReady()
70 if self.dataIn.runNextUnit:
71 return not self.dataIn.isReady()
72 else:
73 return self.dataIn.isReady()
61 elif self.dataIn is None or not self.dataIn.error:
74 elif self.dataIn is None or not self.dataIn.error:
62 self.run(**kwargs)
75 self.run(**kwargs)
63 elif self.dataIn.error:
76 elif self.dataIn.error:
64 self.dataOut.error = self.dataIn.error
77 self.dataOut.error = self.dataIn.error
65 self.dataOut.flagNoData = True
78 self.dataOut.flagNoData = True
66 except:
79 except:
67 err = traceback.format_exc()
80 err = traceback.format_exc()
68 if 'SchainWarning' in err:
81 if 'SchainWarning' in err:
69 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
82 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
70 elif 'SchainError' in err:
83 elif 'SchainError' in err:
71 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
84 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
72 else:
85 else:
73 log.error(err, self.name)
86 log.error(err, self.name)
74 self.dataOut.error = True
87 self.dataOut.error = True
75
76 for op, optype, opkwargs in self.operations:
88 for op, optype, opkwargs in self.operations:
89 aux = self.dataOut.copy()
77 if optype == 'other' and not self.dataOut.flagNoData:
90 if optype == 'other' and not self.dataOut.flagNoData:
78 self.dataOut = op.run(self.dataOut, **opkwargs)
91 self.dataOut = op.run(self.dataOut, **opkwargs)
79 elif optype == 'external' and not self.dataOut.flagNoData:
92 elif optype == 'external' and not self.dataOut.flagNoData:
80 op.queue.put(self.dataOut)
93 op.queue.put(aux)
81 elif optype == 'external' and self.dataOut.error:
94 elif optype == 'external' and self.dataOut.error:
82 op.queue.put(self.dataOut)
95 op.queue.put(aux)
83
96 try:
84 return 'Error' if self.dataOut.error else self.dataOut.isReady()
97 if self.dataOut.runNextUnit:
98 runNextUnit = self.dataOut.runNextUnit
99 else:
100 runNextUnit = self.dataOut.isReady()
101 except:
102 runNextUnit = self.dataOut.isReady()
103 return 'Error' if self.dataOut.error else runNextUnit# self.dataOut.isReady()
85
104
86 def setup(self):
105 def setup(self):
87
106
88 raise NotImplementedError
107 raise NotImplementedError
89
108
90 def run(self):
109 def run(self):
91
110
92 raise NotImplementedError
111 raise NotImplementedError
93
112
94 def close(self):
113 def close(self):
95
114
96 return
115 return
97
116
98
117
99 class Operation(object):
118 class Operation(object):
100
119
101 '''
120 '''
102 '''
121 '''
103
122
104 proc_type = 'operation'
123 proc_type = 'operation'
105
124
106 def __init__(self):
125 def __init__(self):
107
126
108 self.id = None
127 self.id = None
109 self.isConfig = False
128 self.isConfig = False
110
129
111 if not hasattr(self, 'name'):
130 if not hasattr(self, 'name'):
112 self.name = self.__class__.__name__
131 self.name = self.__class__.__name__
113
132
114 def getAllowedArgs(self):
133 def getAllowedArgs(self):
115 if hasattr(self, '__attrs__'):
134 if hasattr(self, '__attrs__'):
116 return self.__attrs__
135 return self.__attrs__
117 else:
136 else:
118 return inspect.getargspec(self.run).args
137 return inspect.getargspec(self.run).args
119
138
120 def setup(self):
139 def setup(self):
121
140
122 self.isConfig = True
141 self.isConfig = True
123
142
124 raise NotImplementedError
143 raise NotImplementedError
125
144
126 def run(self, dataIn, **kwargs):
145 def run(self, dataIn, **kwargs):
127 """
146 """
128 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
147 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
129 atributos del objeto dataIn.
148 atributos del objeto dataIn.
130
149
131 Input:
150 Input:
132
151
133 dataIn : objeto del tipo JROData
152 dataIn : objeto del tipo JROData
134
153
135 Return:
154 Return:
136
155
137 None
156 None
138
157
139 Affected:
158 Affected:
140 __buffer : buffer de recepcion de datos.
159 __buffer : buffer de recepcion de datos.
141
160
142 """
161 """
143 if not self.isConfig:
162 if not self.isConfig:
144 self.setup(**kwargs)
163 self.setup(**kwargs)
145
164
146 raise NotImplementedError
165 raise NotImplementedError
147
166
148 def close(self):
167 def close(self):
149
168
150 return
169 return
151
170
152
171
153 def MPDecorator(BaseClass):
172 def MPDecorator(BaseClass):
154 """
173 """
155 Multiprocessing class decorator
174 Multiprocessing class decorator
156
175
157 This function add multiprocessing features to a BaseClass.
176 This function add multiprocessing features to a BaseClass.
158 """
177 """
159
178
160 class MPClass(BaseClass, Process):
179 class MPClass(BaseClass, Process):
161
180
162 def __init__(self, *args, **kwargs):
181 def __init__(self, *args, **kwargs):
163 super(MPClass, self).__init__()
182 super(MPClass, self).__init__()
164 Process.__init__(self)
183 Process.__init__(self)
165
184
166 self.args = args
185 self.args = args
167 self.kwargs = kwargs
186 self.kwargs = kwargs
168 self.t = time.time()
187 self.t = time.time()
169 self.op_type = 'external'
188 self.op_type = 'external'
170 self.name = BaseClass.__name__
189 self.name = BaseClass.__name__
171 self.__doc__ = BaseClass.__doc__
190 self.__doc__ = BaseClass.__doc__
172
191
173 if 'plot' in self.name.lower() and not self.name.endswith('_'):
192 if 'plot' in self.name.lower() and not self.name.endswith('_'):
174 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
193 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
175
194
176 self.start_time = time.time()
195 self.start_time = time.time()
177 self.err_queue = args[3]
196 self.err_queue = args[3]
178 self.queue = Queue(maxsize=1)
197 self.queue = Queue(maxsize=QUEUE_SIZE)
179 self.myrun = BaseClass.run
198 self.myrun = BaseClass.run
180
199
181 def run(self):
200 def run(self):
182
201
183 while True:
202 while True:
184
203
185 dataOut = self.queue.get()
204 dataOut = self.queue.get()
186
205
187 if not dataOut.error:
206 if not dataOut.error:
188 try:
207 try:
189 BaseClass.run(self, dataOut, **self.kwargs)
208 BaseClass.run(self, dataOut, **self.kwargs)
190 except:
209 except:
191 err = traceback.format_exc()
210 err = traceback.format_exc()
192 log.error(err, self.name)
211 log.error(err, self.name)
193 else:
212 else:
194 break
213 break
195
214
196 self.close()
215 self.close()
197
216
198 def close(self):
217 def close(self):
199
218
200 BaseClass.close(self)
219 BaseClass.close(self)
201 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
220 log.success('Done...(Time:{:4.2f} secs)'.format(time.time() - self.start_time), self.name)
202
221
203 return MPClass
222 return MPClass No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now