##// END OF EJS Templates
cambios de prueba
José Chávez -
r890:0078ae766a30
parent child
Show More
@@ -0,0 +1,11
1 ## ROADMAP
2 ## SCHAIN BRANCHES
3
4 ### BRANCH - SCHAIN_MP
5
6 * Revisar si funciona con varios publishers.
7 * Revisar xRange y reinicialización de gráfico.
8 * Grabar cada spectra independientemente.
9 * Agregar kwargs al init
10 * Agregar gráficos restantes
11 * Presentación
1 NO CONTENT: modified file
@@ -1,299 +1,301
1 1 '''
2 2
3 3 $Author: murco $
4 4 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
5 5 '''
6 6
7 7 class ProcessingUnit(object):
8 8
9 9 """
10 10 Esta es la clase base para el procesamiento de datos.
11 11
12 12 Contiene el metodo "call" para llamar operaciones. Las operaciones pueden ser:
13 13 - Metodos internos (callMethod)
14 14 - Objetos del tipo Operation (callObject). Antes de ser llamados, estos objetos
15 15 tienen que ser agreagados con el metodo "add".
16 16
17 17 """
18 18 # objeto de datos de entrada (Voltage, Spectra o Correlation)
19 19 dataIn = None
20 20 dataInList = []
21 21
22 22 # objeto de datos de entrada (Voltage, Spectra o Correlation)
23 23 dataOut = None
24 24
25 25 operations2RunDict = None
26 26
27 27 isConfig = False
28 28
29 29
30 30 def __init__(self, *args, **kwargs):
31 31
32 32 self.dataIn = None
33 33 self.dataInList = []
34 34
35 35 self.dataOut = None
36 36
37 37 self.operations2RunDict = {}
38 38
39 39 self.isConfig = False
40 40
41 41 self.args = args
42 # if (kwargs):
43 # self.kwargs = kwargs
42 44 self.kwargs = kwargs
43 45
44 46 def addOperation(self, opObj, objId):
45 47
46 48 """
47 49 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
48 50 identificador asociado a este objeto.
49 51
50 52 Input:
51 53
52 54 object : objeto de la clase "Operation"
53 55
54 56 Return:
55 57
56 58 objId : identificador del objeto, necesario para ejecutar la operacion
57 59 """
58 60
59 61 self.operations2RunDict[objId] = opObj
60 62
61 63 return objId
62 64
63 65 def getOperationObj(self, objId):
64 66
65 67 if objId not in self.operations2RunDict.keys():
66 68 return None
67 69
68 70 return self.operations2RunDict[objId]
69 71
70 72 def operation(self, **kwargs):
71 73
72 74 """
73 75 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
74 76 atributos del objeto dataOut
75 77
76 78 Input:
77 79
78 80 **kwargs : Diccionario de argumentos de la funcion a ejecutar
79 81 """
80 82
81 83 raise NotImplementedError
82 84
83 85 def callMethod(self, name, **kwargs):
84 86
85 87 """
86 88 Ejecuta el metodo con el nombre "name" y con argumentos **kwargs de la propia clase.
87 89
88 90 Input:
89 91 name : nombre del metodo a ejecutar
90 92
91 93 **kwargs : diccionario con los nombres y valores de la funcion a ejecutar.
92 94
93 95 """
94 96
95 97 #Checking the inputs
96 98 if name == 'run':
97 99
98 100 if not self.checkInputs():
99 101 self.dataOut.flagNoData = True
100 102 return False
101 103 else:
102 104 #Si no es un metodo RUN la entrada es la misma dataOut (interna)
103 105 if self.dataOut.isEmpty():
104 106 return False
105 107
106 108 #Getting the pointer to method
107 109 methodToCall = getattr(self, name)
108 110
109 111 #Executing the self method
110 112
111 113 if hasattr(self, 'mp'):
112 114 if self.mp is False:
113 115 self.mp = True
114 116 self.start()
115 117 else:
116 118 methodToCall(**kwargs)
117 119
118 120 if self.dataOut is None:
119 121 return False
120 122
121 123 if self.dataOut.isEmpty():
122 124 return False
123 125
124 126 return True
125 127
126 128 def callObject(self, objId):
127 129
128 130 """
129 131 Ejecuta la operacion asociada al identificador del objeto "objId"
130 132
131 133 Input:
132 134
133 135 objId : identificador del objeto a ejecutar
134 136
135 137 **kwargs : diccionario con los nombres y valores de la funcion a ejecutar.
136 138
137 139 Return:
138 140
139 141 None
140 142 """
141 143
142 144 if self.dataOut is not None and self.dataOut.isEmpty():
143 145 return False
144 146
145 147 externalProcObj = self.operations2RunDict[objId]
146 148
147 149 if hasattr(externalProcObj, 'mp'):
148 150 if externalProcObj.mp is False:
149 151 externalProcObj.mp = True
150 152 externalProcObj.start()
151 153 else:
152 154 externalProcObj.run(self.dataOut, **externalProcObj.kwargs)
153 155
154 156 return True
155 157
156 158 def call(self, opType, opName=None, opId=None):
157 159
158 160 """
159 161 Return True si ejecuta la operacion interna nombrada "opName" o la operacion externa
160 162 identificada con el id "opId"; con los argumentos "**kwargs".
161 163
162 164 False si la operacion no se ha ejecutado.
163 165
164 166 Input:
165 167
166 168 opType : Puede ser "self" o "external"
167 169
168 170 Depende del tipo de operacion para llamar a:callMethod or callObject:
169 171
170 172 1. If opType = "self": Llama a un metodo propio de esta clase:
171 173
172 174 name_method = getattr(self, name)
173 175 name_method(**kwargs)
174 176
175 177
176 178 2. If opType = "other" o"external": Llama al metodo "run()" de una instancia de la
177 179 clase "Operation" o de un derivado de ella:
178 180
179 181 instanceName = self.operationList[opId]
180 182 instanceName.run(**kwargs)
181 183
182 184 opName : Si la operacion es interna (opType = 'self'), entonces el "opName" sera
183 185 usada para llamar a un metodo interno de la clase Processing
184 186
185 187 opId : Si la operacion es externa (opType = 'other' o 'external), entonces el
186 188 "opId" sera usada para llamar al metodo "run" de la clase Operation
187 189 registrada anteriormente con ese Id
188 190
189 191 Exception:
190 192 Este objeto de tipo Operation debe de haber sido agregado antes con el metodo:
191 193 "addOperation" e identificado con el valor "opId" = el id de la operacion.
192 194 De lo contrario retornara un error del tipo ValueError
193 195
194 196 """
195 197
196 198 if opType == 'self':
197 199
198 200 if not opName:
199 201 raise ValueError, "opName parameter should be defined"
200 202
201 203 sts = self.callMethod(opName, **self.kwargs)
202 204
203 205 elif opType == 'other' or opType == 'external' or opType == 'plotter':
204 206
205 207 if not opId:
206 208 raise ValueError, "opId parameter should be defined"
207 209
208 210 if opId not in self.operations2RunDict.keys():
209 211 raise ValueError, "Any operation with id=%s has been added" %str(opId)
210 212
211 213 sts = self.callObject(opId)
212 214
213 215 else:
214 216 raise ValueError, "opType should be 'self', 'external' or 'plotter'; and not '%s'" %opType
215 217
216 218 return sts
217 219
218 220 def setInput(self, dataIn):
219 221
220 222 self.dataIn = dataIn
221 223 self.dataInList.append(dataIn)
222 224
223 225 def getOutputObj(self):
224 226
225 227 return self.dataOut
226 228
227 229 def checkInputs(self):
228 230
229 231 for thisDataIn in self.dataInList:
230 232
231 233 if thisDataIn.isEmpty():
232 234 return False
233 235
234 236 return True
235 237
236 238 def setup(self):
237 239
238 240 raise NotImplementedError
239 241
240 242 def run(self):
241 243
242 244 raise NotImplementedError
243 245
244 246 def close(self):
245 247 #Close every thread, queue or any other object here is it is neccesary.
246 248 return
247 249
248 250 class Operation(object):
249 251
250 252 """
251 253 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
252 254 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
253 255 acumulacion dentro de esta clase
254 256
255 257 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
256 258
257 259 """
258 260
259 261 __buffer = None
260 262 isConfig = False
261 263
262 264 def __init__(self, **kwargs):
263 265
264 266 self.__buffer = None
265 267 self.isConfig = False
266 268 self.kwargs = kwargs
267 269
268 270 def setup(self):
269 271
270 272 self.isConfig = True
271 273
272 274 raise NotImplementedError
273 275
274 276 def run(self, dataIn, **kwargs):
275 277
276 278 """
277 279 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
278 280 atributos del objeto dataIn.
279 281
280 282 Input:
281 283
282 284 dataIn : objeto del tipo JROData
283 285
284 286 Return:
285 287
286 288 None
287 289
288 290 Affected:
289 291 __buffer : buffer de recepcion de datos.
290 292
291 293 """
292 294 if not self.isConfig:
293 295 self.setup(**kwargs)
294 296
295 297 raise NotImplementedError
296 298
297 299 def close(self):
298 300
299 301 pass
1 NO CONTENT: modified file
@@ -1,77 +1,75
1 1 #!/usr/bin/env python
2 2 '''
3 3 Created on Jul 7, 2014
4 4
5 5 @author: roj-idl71
6 6 '''
7 7 import os, sys
8 8 from datetime import datetime, timedelta
9 9 import multiprocessing
10 10 from schainpy.controller import Project
11 11
12 12 def main(date):
13 13
14 14 controllerObj = Project()
15 15
16 16 controllerObj.setup(id = '191', name='test01', description='')
17 17
18 18 readUnitConfObj = controllerObj.addReadUnit(datatype='Spectra',
19 path='/data/workspace/data/zeus/',
19 path='/home/nanosat/data/zeus',
20 20 startDate=date,
21 21 endDate=date,
22 22 startTime='00:00:00',
23 23 endTime='23:59:59',
24 24 online=0,
25 25 walk=1,
26 26 expLabel='')
27 27
28 28 procUnitConfObj1 = controllerObj.addProcUnit(datatype='Spectra', inputId=readUnitConfObj.getId())
29 29 #opObj11 = procUnitConfObj1.addOperation(name='removeDC')
30 30 #opObj11.addParameter(name='mode', value='1', format='int')
31 31
32 32 #opObj11 = procUnitConfObj1.addOperation(name='removeInterference')
33 33
34 34
35 35 # opObj11 = procUnitConfObj1.addOperation(name='RTIPlot', optype='other')
36 36 # opObj11.addParameter(name='id', value='10', format='int')
37 37 # opObj11.addParameter(name='wintitle', value='150Km', format='str')
38 38 # opObj11.addParameter(name='colormap', value='jro', format='str')
39 39 # opObj11.addParameter(name='xaxis', value='time', format='str')
40 40 # opObj11.addParameter(name='xmin', value='0', format='int')
41 41 # opObj11.addParameter(name='xmax', value='23', format='int')
42 42 # #opObj11.addParameter(name='ymin', value='100', format='int')
43 43 # #opObj11.addParameter(name='ymax', value='150', format='int')
44 44 # opObj11.addParameter(name='zmin', value='10', format='int')
45 45 # opObj11.addParameter(name='zmax', value='35', format='int')
46 46
47 47
48 48
49 49
50 50 opObj11 = procUnitConfObj1.addOperation(name='PlotRTIData', optype='other')
51 51 opObj11.addParameter(name='id', value='12', format='int')
52 52 opObj11.addParameter(name='wintitle', value='150Km', format='str')
53 53 opObj11.addParameter(name='colormap', value='jro', format='str')
54 54 opObj11.addParameter(name='xaxis', value='time', format='str')
55 55 opObj11.addParameter(name='xmin', value='0', format='int')
56 56 opObj11.addParameter(name='xmax', value='23', format='int')
57 57 #opObj11.addParameter(name='ymin', value='100', format='int')
58 58 #opObj11.addParameter(name='ymax', value='150', format='int')
59 59 opObj11.addParameter(name='zmin', value='10', format='int')
60 60 opObj11.addParameter(name='zmax', value='35', format='int')
61 61 #opObj11.addParameter(name='pause', value='1', format='bool')
62 62 opObj11.addParameter(name='show', value='0', format='bool')
63 63 opObj11.addParameter(name='save', value='/tmp', format='str')
64 64
65 65
66 66 controllerObj.start()
67 67
68 68 if __name__=='__main__':
69 69
70 70 dt = datetime(2017, 1, 12)
71 71
72 72 dates = [(dt+timedelta(x)).strftime('%Y/%m/%d') for x in range(20)]
73 73
74 74 p = multiprocessing.Pool(4)
75 75 p.map(main, dates)
76
77 No newline at end of file
1 NO CONTENT: modified file
General Comments 0
You need to be logged in to leave comments. Login now