##// END OF EJS Templates
Fix bug in external operations queues
jespinoza -
r1379:a64007518dae
parent child
Show More
@@ -1,203 +1,206
1 1 '''
2 2 Base clases to create Processing units and operations, the MPDecorator
3 3 must be used in plotting and writing operations to allow to run as an
4 4 external process.
5 5 '''
6 6
7 import os
7 8 import inspect
8 9 import zmq
9 10 import time
10 11 import pickle
11 12 import traceback
12 13 from threading import Thread
13 14 from multiprocessing import Process, Queue
14 15 from schainpy.utils import log
15 16
17 QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '10'))
16 18
17 19 class ProcessingUnit(object):
18 20 '''
19 21 Base class to create Signal Chain Units
20 22 '''
21 23
22 24 proc_type = 'processing'
23 25
24 26 def __init__(self):
25 27
26 28 self.dataIn = None
27 29 self.dataOut = None
28 30 self.isConfig = False
29 31 self.operations = []
30 32
31 33 def setInput(self, unit):
32 34
33 35 self.dataIn = unit.dataOut
34 36
35 37 def getAllowedArgs(self):
36 38 if hasattr(self, '__attrs__'):
37 39 return self.__attrs__
38 40 else:
39 41 return inspect.getargspec(self.run).args
40 42
41 43 def addOperation(self, conf, operation):
42 44 '''
43 45 '''
44 46
45 47 self.operations.append((operation, conf.type, conf.getKwargs()))
46 48
47 49 def getOperationObj(self, objId):
48 50
49 51 if objId not in list(self.operations.keys()):
50 52 return None
51 53
52 54 return self.operations[objId]
53 55
54 56 def call(self, **kwargs):
55 57 '''
56 58 '''
57 59
58 60 try:
59 61 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
60 62 return self.dataIn.isReady()
61 63 elif self.dataIn is None or not self.dataIn.error:
62 64 self.run(**kwargs)
63 65 elif self.dataIn.error:
64 66 self.dataOut.error = self.dataIn.error
65 67 self.dataOut.flagNoData = True
66 68 except:
67 69 err = traceback.format_exc()
68 70 if 'SchainWarning' in err:
69 71 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
70 72 elif 'SchainError' in err:
71 73 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
72 74 else:
73 75 log.error(err, self.name)
74 76 self.dataOut.error = True
75 77
76 78 for op, optype, opkwargs in self.operations:
79 aux = self.dataOut.copy()
77 80 if optype == 'other' and not self.dataOut.flagNoData:
78 81 self.dataOut = op.run(self.dataOut, **opkwargs)
79 82 elif optype == 'external' and not self.dataOut.flagNoData:
80 op.queue.put(self.dataOut)
83 op.queue.put(aux)
81 84 elif optype == 'external' and self.dataOut.error:
82 op.queue.put(self.dataOut)
85 op.queue.put(aux)
83 86
84 87 return 'Error' if self.dataOut.error else self.dataOut.isReady()
85 88
86 89 def setup(self):
87 90
88 91 raise NotImplementedError
89 92
90 93 def run(self):
91 94
92 95 raise NotImplementedError
93 96
94 97 def close(self):
95 98
96 99 return
97 100
98 101
99 102 class Operation(object):
100 103
101 104 '''
102 105 '''
103 106
104 107 proc_type = 'operation'
105 108
106 109 def __init__(self):
107 110
108 111 self.id = None
109 112 self.isConfig = False
110 113
111 114 if not hasattr(self, 'name'):
112 115 self.name = self.__class__.__name__
113 116
114 117 def getAllowedArgs(self):
115 118 if hasattr(self, '__attrs__'):
116 119 return self.__attrs__
117 120 else:
118 121 return inspect.getargspec(self.run).args
119 122
120 123 def setup(self):
121 124
122 125 self.isConfig = True
123 126
124 127 raise NotImplementedError
125 128
126 129 def run(self, dataIn, **kwargs):
127 130 """
128 131 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
129 132 atributos del objeto dataIn.
130 133
131 134 Input:
132 135
133 136 dataIn : objeto del tipo JROData
134 137
135 138 Return:
136 139
137 140 None
138 141
139 142 Affected:
140 143 __buffer : buffer de recepcion de datos.
141 144
142 145 """
143 146 if not self.isConfig:
144 147 self.setup(**kwargs)
145 148
146 149 raise NotImplementedError
147 150
148 151 def close(self):
149 152
150 153 return
151 154
152 155
153 156 def MPDecorator(BaseClass):
154 157 """
155 158 Multiprocessing class decorator
156 159
157 160 This function add multiprocessing features to a BaseClass.
158 161 """
159 162
160 163 class MPClass(BaseClass, Process):
161 164
162 165 def __init__(self, *args, **kwargs):
163 166 super(MPClass, self).__init__()
164 167 Process.__init__(self)
165 168
166 169 self.args = args
167 170 self.kwargs = kwargs
168 171 self.t = time.time()
169 172 self.op_type = 'external'
170 173 self.name = BaseClass.__name__
171 174 self.__doc__ = BaseClass.__doc__
172 175
173 176 if 'plot' in self.name.lower() and not self.name.endswith('_'):
174 177 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
175 178
176 179 self.start_time = time.time()
177 180 self.err_queue = args[3]
178 self.queue = Queue(maxsize=1)
181 self.queue = Queue(maxsize=QUEUE_SIZE)
179 182 self.myrun = BaseClass.run
180 183
181 184 def run(self):
182 185
183 186 while True:
184 187
185 188 dataOut = self.queue.get()
186 189
187 190 if not dataOut.error:
188 191 try:
189 192 BaseClass.run(self, dataOut, **self.kwargs)
190 193 except:
191 194 err = traceback.format_exc()
192 195 log.error(err, self.name)
193 196 else:
194 197 break
195 198
196 199 self.close()
197 200
198 201 def close(self):
199 202
200 203 BaseClass.close(self)
201 204 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
202 205
203 206 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now