##// END OF EJS Templates
Fix bug in external operations queues
jespinoza -
r1379:a64007518dae
parent child
Show More
@@ -1,203 +1,206
1 '''
1 '''
2 Base clases to create Processing units and operations, the MPDecorator
2 Base clases to create Processing units and operations, the MPDecorator
3 must be used in plotting and writing operations to allow to run as an
3 must be used in plotting and writing operations to allow to run as an
4 external process.
4 external process.
5 '''
5 '''
6
6
7 import os
7 import inspect
8 import inspect
8 import zmq
9 import zmq
9 import time
10 import time
10 import pickle
11 import pickle
11 import traceback
12 import traceback
12 from threading import Thread
13 from threading import Thread
13 from multiprocessing import Process, Queue
14 from multiprocessing import Process, Queue
14 from schainpy.utils import log
15 from schainpy.utils import log
15
16
17 QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '10'))
16
18
17 class ProcessingUnit(object):
19 class ProcessingUnit(object):
18 '''
20 '''
19 Base class to create Signal Chain Units
21 Base class to create Signal Chain Units
20 '''
22 '''
21
23
22 proc_type = 'processing'
24 proc_type = 'processing'
23
25
24 def __init__(self):
26 def __init__(self):
25
27
26 self.dataIn = None
28 self.dataIn = None
27 self.dataOut = None
29 self.dataOut = None
28 self.isConfig = False
30 self.isConfig = False
29 self.operations = []
31 self.operations = []
30
32
31 def setInput(self, unit):
33 def setInput(self, unit):
32
34
33 self.dataIn = unit.dataOut
35 self.dataIn = unit.dataOut
34
36
35 def getAllowedArgs(self):
37 def getAllowedArgs(self):
36 if hasattr(self, '__attrs__'):
38 if hasattr(self, '__attrs__'):
37 return self.__attrs__
39 return self.__attrs__
38 else:
40 else:
39 return inspect.getargspec(self.run).args
41 return inspect.getargspec(self.run).args
40
42
41 def addOperation(self, conf, operation):
43 def addOperation(self, conf, operation):
42 '''
44 '''
43 '''
45 '''
44
46
45 self.operations.append((operation, conf.type, conf.getKwargs()))
47 self.operations.append((operation, conf.type, conf.getKwargs()))
46
48
47 def getOperationObj(self, objId):
49 def getOperationObj(self, objId):
48
50
49 if objId not in list(self.operations.keys()):
51 if objId not in list(self.operations.keys()):
50 return None
52 return None
51
53
52 return self.operations[objId]
54 return self.operations[objId]
53
55
54 def call(self, **kwargs):
56 def call(self, **kwargs):
55 '''
57 '''
56 '''
58 '''
57
59
58 try:
60 try:
59 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
61 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
60 return self.dataIn.isReady()
62 return self.dataIn.isReady()
61 elif self.dataIn is None or not self.dataIn.error:
63 elif self.dataIn is None or not self.dataIn.error:
62 self.run(**kwargs)
64 self.run(**kwargs)
63 elif self.dataIn.error:
65 elif self.dataIn.error:
64 self.dataOut.error = self.dataIn.error
66 self.dataOut.error = self.dataIn.error
65 self.dataOut.flagNoData = True
67 self.dataOut.flagNoData = True
66 except:
68 except:
67 err = traceback.format_exc()
69 err = traceback.format_exc()
68 if 'SchainWarning' in err:
70 if 'SchainWarning' in err:
69 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
71 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
70 elif 'SchainError' in err:
72 elif 'SchainError' in err:
71 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
73 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
72 else:
74 else:
73 log.error(err, self.name)
75 log.error(err, self.name)
74 self.dataOut.error = True
76 self.dataOut.error = True
75
77
76 for op, optype, opkwargs in self.operations:
78 for op, optype, opkwargs in self.operations:
79 aux = self.dataOut.copy()
77 if optype == 'other' and not self.dataOut.flagNoData:
80 if optype == 'other' and not self.dataOut.flagNoData:
78 self.dataOut = op.run(self.dataOut, **opkwargs)
81 self.dataOut = op.run(self.dataOut, **opkwargs)
79 elif optype == 'external' and not self.dataOut.flagNoData:
82 elif optype == 'external' and not self.dataOut.flagNoData:
80 op.queue.put(self.dataOut)
83 op.queue.put(aux)
81 elif optype == 'external' and self.dataOut.error:
84 elif optype == 'external' and self.dataOut.error:
82 op.queue.put(self.dataOut)
85 op.queue.put(aux)
83
86
84 return 'Error' if self.dataOut.error else self.dataOut.isReady()
87 return 'Error' if self.dataOut.error else self.dataOut.isReady()
85
88
86 def setup(self):
89 def setup(self):
87
90
88 raise NotImplementedError
91 raise NotImplementedError
89
92
90 def run(self):
93 def run(self):
91
94
92 raise NotImplementedError
95 raise NotImplementedError
93
96
94 def close(self):
97 def close(self):
95
98
96 return
99 return
97
100
98
101
99 class Operation(object):
102 class Operation(object):
100
103
101 '''
104 '''
102 '''
105 '''
103
106
104 proc_type = 'operation'
107 proc_type = 'operation'
105
108
106 def __init__(self):
109 def __init__(self):
107
110
108 self.id = None
111 self.id = None
109 self.isConfig = False
112 self.isConfig = False
110
113
111 if not hasattr(self, 'name'):
114 if not hasattr(self, 'name'):
112 self.name = self.__class__.__name__
115 self.name = self.__class__.__name__
113
116
114 def getAllowedArgs(self):
117 def getAllowedArgs(self):
115 if hasattr(self, '__attrs__'):
118 if hasattr(self, '__attrs__'):
116 return self.__attrs__
119 return self.__attrs__
117 else:
120 else:
118 return inspect.getargspec(self.run).args
121 return inspect.getargspec(self.run).args
119
122
120 def setup(self):
123 def setup(self):
121
124
122 self.isConfig = True
125 self.isConfig = True
123
126
124 raise NotImplementedError
127 raise NotImplementedError
125
128
126 def run(self, dataIn, **kwargs):
129 def run(self, dataIn, **kwargs):
127 """
130 """
128 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
131 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
129 atributos del objeto dataIn.
132 atributos del objeto dataIn.
130
133
131 Input:
134 Input:
132
135
133 dataIn : objeto del tipo JROData
136 dataIn : objeto del tipo JROData
134
137
135 Return:
138 Return:
136
139
137 None
140 None
138
141
139 Affected:
142 Affected:
140 __buffer : buffer de recepcion de datos.
143 __buffer : buffer de recepcion de datos.
141
144
142 """
145 """
143 if not self.isConfig:
146 if not self.isConfig:
144 self.setup(**kwargs)
147 self.setup(**kwargs)
145
148
146 raise NotImplementedError
149 raise NotImplementedError
147
150
148 def close(self):
151 def close(self):
149
152
150 return
153 return
151
154
152
155
153 def MPDecorator(BaseClass):
156 def MPDecorator(BaseClass):
154 """
157 """
155 Multiprocessing class decorator
158 Multiprocessing class decorator
156
159
157 This function add multiprocessing features to a BaseClass.
160 This function add multiprocessing features to a BaseClass.
158 """
161 """
159
162
160 class MPClass(BaseClass, Process):
163 class MPClass(BaseClass, Process):
161
164
162 def __init__(self, *args, **kwargs):
165 def __init__(self, *args, **kwargs):
163 super(MPClass, self).__init__()
166 super(MPClass, self).__init__()
164 Process.__init__(self)
167 Process.__init__(self)
165
168
166 self.args = args
169 self.args = args
167 self.kwargs = kwargs
170 self.kwargs = kwargs
168 self.t = time.time()
171 self.t = time.time()
169 self.op_type = 'external'
172 self.op_type = 'external'
170 self.name = BaseClass.__name__
173 self.name = BaseClass.__name__
171 self.__doc__ = BaseClass.__doc__
174 self.__doc__ = BaseClass.__doc__
172
175
173 if 'plot' in self.name.lower() and not self.name.endswith('_'):
176 if 'plot' in self.name.lower() and not self.name.endswith('_'):
174 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
177 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
175
178
176 self.start_time = time.time()
179 self.start_time = time.time()
177 self.err_queue = args[3]
180 self.err_queue = args[3]
178 self.queue = Queue(maxsize=1)
181 self.queue = Queue(maxsize=QUEUE_SIZE)
179 self.myrun = BaseClass.run
182 self.myrun = BaseClass.run
180
183
181 def run(self):
184 def run(self):
182
185
183 while True:
186 while True:
184
187
185 dataOut = self.queue.get()
188 dataOut = self.queue.get()
186
189
187 if not dataOut.error:
190 if not dataOut.error:
188 try:
191 try:
189 BaseClass.run(self, dataOut, **self.kwargs)
192 BaseClass.run(self, dataOut, **self.kwargs)
190 except:
193 except:
191 err = traceback.format_exc()
194 err = traceback.format_exc()
192 log.error(err, self.name)
195 log.error(err, self.name)
193 else:
196 else:
194 break
197 break
195
198
196 self.close()
199 self.close()
197
200
198 def close(self):
201 def close(self):
199
202
200 BaseClass.close(self)
203 BaseClass.close(self)
201 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
204 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
202
205
203 return MPClass
206 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now