##// END OF EJS Templates
cambios de prueba
José Chávez -
r890:0078ae766a30
parent child
Show More
@@ -0,0 +1,11
1 ## ROADMAP
2 ## SCHAIN BRANCHES
3
4 ### BRANCH - SCHAIN_MP
5
6 * Revisar si funciona con varios publishers.
7 * Revisar xRange y reinicialización de gráfico.
8 * Grabar cada spectra independientemente.
9 * Agregar kwargs al init
10 * Agregar gráficos restantes
11 * Presentación
@@ -1,376 +1,376
1 1
2 2 import os
3 3 import zmq
4 4 import time
5 5 import numpy
6 6 import datetime
7 7 import numpy as np
8 8 import matplotlib.pyplot as plt
9 9 from mpl_toolkits.axes_grid1 import make_axes_locatable
10 10 from matplotlib.ticker import FuncFormatter, LinearLocator
11 11 from multiprocessing import Process
12 12
13 13 from schainpy.model.proc.jroproc_base import Operation
14 14
15 15 #plt.ion()
16 16
17 17 func = lambda x, pos: ('%s') %(datetime.datetime.utcfromtimestamp(x).strftime('%H:%M'))
18 18
19 19 d1970 = datetime.datetime(1970,1,1)
20 20
21 21 class PlotData(Operation, Process):
22 22
23 23 CODE = 'Figure'
24 24 colormap = 'jet'
25 25 __MAXNUMX = 80
26 26 __MAXNUMY = 80
27 27 __missing = 1E30
28 28
29 29 def __init__(self, **kwargs):
30 30
31 31 Operation.__init__(self)
32 32 Process.__init__(self)
33 33 self.mp = False
34 34 self.dataOut = None
35 35 self.isConfig = False
36 36 self.figure = None
37 37 self.axes = []
38 38 self.localtime = kwargs.pop('localtime', True)
39 39 self.show = kwargs.get('show', True)
40 40 self.save = kwargs.get('save', False)
41 41 self.colormap = kwargs.get('colormap', self.colormap)
42 42 self.showprofile = kwargs.get('showprofile', False)
43 43 self.title = kwargs.get('wintitle', '')
44 44 self.xaxis = kwargs.get('xaxis', 'time')
45 45 self.zmin = kwargs.get('zmin', None)
46 46 self.zmax = kwargs.get('zmax', None)
47 47 self.xmin = kwargs.get('xmin', None)
48 48 self.xmax = kwargs.get('xmax', None)
49 49 self.xrange = kwargs.get('xrange', 24)
50 50 self.ymin = kwargs.get('ymin', None)
51 51 self.ymax = kwargs.get('ymax', None)
52 52
53 53 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
54 54
55 55 if x_buffer.shape[0] < 2:
56 56 return x_buffer, y_buffer, z_buffer
57 57
58 58 deltas = x_buffer[1:] - x_buffer[0:-1]
59 59 x_median = np.median(deltas)
60 60
61 61 index = np.where(deltas > 5*x_median)
62 62
63 63 if len(index[0]) != 0:
64 64 z_buffer[::,index[0],::] = self.__missing
65 65 z_buffer = np.ma.masked_inside(z_buffer,
66 66 0.99*self.__missing,
67 67 1.01*self.__missing)
68 68
69 69 return x_buffer, y_buffer, z_buffer
70 70
71 71 def decimate(self):
72 72
73 73 dx = int(len(self.x)/self.__MAXNUMX) + 1
74 74 dy = int(len(self.y)/self.__MAXNUMY) + 1
75 75
76 76 x = self.x[::dx]
77 77 y = self.y[::dy]
78 78 z = self.z[::, ::dx, ::dy]
79 79
80 80 return x, y, z
81 81
82 82 def __plot(self):
83 83
84 84 print 'plotting...{}'.format(self.CODE)
85 85
86 86 self.plot()
87 87 self.figure.suptitle('{} {}'.format(self.title, self.CODE.upper()))
88 88
89 89 if self.save:
90 90 figname = os.path.join(self.save, '{}_{}.png'.format(self.CODE,
91 91 datetime.datetime.utcfromtimestamp(self.times[-1]).strftime('%y%m%d_%H%M%S')))
92 92 print 'Saving figure: {}'.format(figname)
93 93 self.figure.savefig(figname)
94 94
95 95 self.figure.canvas.draw()
96 96
97 97 def plot(self):
98 98
99 99 print 'plotting...{}'.format(self.CODE.upper())
100 100 return
101 101
102 102 def run(self):
103 103
104 104 print '[Starting] {}'.format(self.name)
105 105 context = zmq.Context()
106 106 receiver = context.socket(zmq.SUB)
107 107 receiver.setsockopt(zmq.SUBSCRIBE, '')
108 108 receiver.setsockopt(zmq.CONFLATE, True)
109 109 receiver.connect("ipc:///tmp/zmq.plots")
110 110
111 111 while True:
112 112 try:
113 113 #if True:
114 114 self.data = receiver.recv_pyobj(flags=zmq.NOBLOCK)
115 115 self.dataOut = self.data['dataOut']
116 116 self.times = self.data['times']
117 117 self.times.sort()
118 118 self.min_time = self.times[0]
119 119 self.max_time = self.times[-1]
120 120
121 121 if self.isConfig is False:
122 122 self.setup()
123 123 self.isConfig = True
124 124
125 125 self.__plot()
126 126
127 127 if 'ENDED' in self.data:
128 #self.setup()
129 #self.__plot()
128 # self.setup()
129 # self.__plot()
130 130 pass
131 131
132 132 except zmq.Again as e:
133 133 print 'Waiting for data...'
134 134 plt.pause(5)
135 135 #time.sleep(3)
136 136
137 137 def close(self):
138 138 if self.dataOut:
139 139 self._plot()
140 140
141 141
142 142 class PlotSpectraData(PlotData):
143 143
144 144 CODE = 'spc'
145 145 colormap = 'jro'
146 146
147 147 def setup(self):
148 148
149 149 ncolspan = 1
150 150 colspan = 1
151 151 self.ncols = int(numpy.sqrt(self.dataOut.nChannels)+0.9)
152 152 self.nrows = int(self.dataOut.nChannels*1./self.ncols + 0.9)
153 153 self.width = 3.6*self.ncols
154 154 self.height = 3.2*self.nrows
155 155 if self.showprofile:
156 156 ncolspan = 3
157 157 colspan = 2
158 158 self.width += 1.2*self.ncols
159 159
160 160 self.ylabel = 'Range [Km]'
161 161 self.titles = ['Channel {}'.format(x) for x in self.dataOut.channelList]
162 162
163 163 if self.figure is None:
164 164 self.figure = plt.figure(figsize=(self.width, self.height),
165 165 edgecolor='k',
166 166 facecolor='w')
167 167 else:
168 168 self.figure.clf()
169 169
170 170 n = 0
171 171 for y in range(self.nrows):
172 172 for x in range(self.ncols):
173 173 if n>=self.dataOut.nChannels:
174 174 break
175 175 ax = plt.subplot2grid((self.nrows, self.ncols*ncolspan), (y, x*ncolspan), 1, colspan)
176 176 if self.showprofile:
177 177 ax.ax_profile = plt.subplot2grid((self.nrows, self.ncols*ncolspan), (y, x*ncolspan+colspan), 1, 1)
178 178
179 179 ax.firsttime = True
180 180 self.axes.append(ax)
181 181 n += 1
182 182
183 183 self.figure.subplots_adjust(wspace=0.9, hspace=0.5)
184 184 self.figure.show()
185 185
186 186 def plot(self):
187 187
188 188 if self.xaxis == "frequency":
189 189 x = self.dataOut.getFreqRange(1)/1000.
190 190 xlabel = "Frequency (kHz)"
191 191 elif self.xaxis == "time":
192 192 x = self.dataOut.getAcfRange(1)
193 193 xlabel = "Time (ms)"
194 194 else:
195 195 x = self.dataOut.getVelRange(1)
196 196 xlabel = "Velocity (m/s)"
197 197
198 198 y = self.dataOut.getHeiRange()
199 199 z = self.data[self.CODE]
200 200
201 201 for n, ax in enumerate(self.axes):
202 202
203 203 if ax.firsttime:
204 204 self.xmax = self.xmax if self.xmax else np.nanmax(x)
205 205 self.xmin = self.xmin if self.xmin else -self.xmax
206 206 self.ymin = self.ymin if self.ymin else np.nanmin(y)
207 207 self.ymax = self.ymax if self.ymax else np.nanmax(y)
208 208 self.zmin = self.zmin if self.zmin else np.nanmin(z)
209 209 self.zmax = self.zmax if self.zmax else np.nanmax(z)
210 210 ax.plot = ax.pcolormesh(x, y, z[n].T,
211 211 vmin=self.zmin,
212 212 vmax=self.zmax,
213 213 cmap=plt.get_cmap(self.colormap)
214 214 )
215 215 divider = make_axes_locatable(ax)
216 216 cax = divider.new_horizontal(size='3%', pad=0.05)
217 217 self.figure.add_axes(cax)
218 218 plt.colorbar(ax.plot, cax)
219 219
220 220 ax.set_xlim(self.xmin, self.xmax)
221 221 ax.set_ylim(self.ymin, self.ymax)
222 222
223 223 ax.xaxis.set_major_locator(LinearLocator(5))
224 224 #ax.yaxis.set_major_locator(LinearLocator(4))
225 225
226 226 ax.set_ylabel(self.ylabel)
227 227 ax.set_xlabel(xlabel)
228 228
229 229 ax.firsttime = False
230 230
231 231 if self.showprofile:
232 232 ax.plot_profile= ax.ax_profile.plot(self.data['rti'][self.max_time][n], y)[0]
233 233 ax.ax_profile.set_xlim(self.zmin, self.zmax)
234 234 ax.ax_profile.set_ylim(self.ymin, self.ymax)
235 235 ax.ax_profile.set_xlabel('dB')
236 236 ax.ax_profile.grid(b=True, axis='x')
237 237 [tick.set_visible(False) for tick in ax.ax_profile.get_yticklabels()]
238 238 noise = 10*numpy.log10(self.data['rti'][self.max_time][n]/self.dataOut.normFactor)
239 239 ax.ax_profile.vlines(noise, self.ymin, self.ymax, colors="k", linestyle="dashed", lw=2)
240 240 else:
241 241 ax.plot.set_array(z[n].T.ravel())
242 242 ax.set_title('{} {}'.format(self.titles[n],
243 243 datetime.datetime.utcfromtimestamp(self.max_time).strftime('%y/%m/%d %H:%M:%S')),
244 244 size=8)
245 245 if self.showprofile:
246 246 ax.plot_profile.set_data(self.data['rti'][self.max_time][n], y)
247 247
248 248
249 249 class PlotRTIData(PlotData):
250 250
251 251 CODE = 'rti'
252 252 colormap = 'jro'
253 253
254 254 def setup(self):
255 255
256 256 self.ncols = 1
257 257 self.nrows = self.dataOut.nChannels
258 258 self.width = 10
259 259 self.height = 2.2*self.nrows
260 260 self.ylabel = 'Range [Km]'
261 261 self.titles = ['Channel {}'.format(x) for x in self.dataOut.channelList]
262 262
263 263 if self.figure is None:
264 264 self.figure = plt.figure(figsize=(self.width, self.height),
265 265 edgecolor='k',
266 266 facecolor='w')
267 267 else:
268 268 self.figure.clf()
269 269
270 270 for n in range(self.nrows):
271 271 ax = self.figure.add_subplot(self.nrows, self.ncols, n+1)
272 272 ax.firsttime = True
273 273 self.axes.append(ax)
274 274
275 275 self.figure.subplots_adjust(hspace=0.5)
276 276 self.figure.show()
277 277
278 278 def plot(self):
279 279
280 280 self.x = np.array(self.times)
281 281 self.y = self.dataOut.getHeiRange()
282 282 self.z = []
283 283
284 284 for ch in range(self.nrows):
285 285 self.z.append([self.data[self.CODE][t][ch] for t in self.times])
286 286
287 287 self.z = np.array(self.z)
288 288
289 289 for n, ax in enumerate(self.axes):
290 290
291 291 x, y, z = self.fill_gaps(*self.decimate())
292 292
293 293 if ax.firsttime:
294 294 self.ymin = self.ymin if self.ymin else np.nanmin(self.y)
295 295 self.ymax = self.ymax if self.ymax else np.nanmax(self.y)
296 296 self.zmin = self.zmin if self.zmin else np.nanmin(self.z)
297 297 zmax = self.zmax if self.zmax else np.nanmax(self.z)
298 298 plot = ax.pcolormesh(x, y, z[n].T,
299 299 vmin=self.zmin,
300 300 vmax=self.zmax,
301 301 cmap=plt.get_cmap(self.colormap)
302 302 )
303 303 divider = make_axes_locatable(ax)
304 304 cax = divider.new_horizontal(size='2%', pad=0.05)
305 305 self.figure.add_axes(cax)
306 306 plt.colorbar(plot, cax)
307 307 ax.set_ylim(self.ymin, self.ymax)
308 308 if self.xaxis=='time':
309 309 ax.xaxis.set_major_formatter(FuncFormatter(func))
310 310 ax.xaxis.set_major_locator(LinearLocator(6))
311 311
312 312 ax.yaxis.set_major_locator(LinearLocator(4))
313 313
314 314 ax.set_ylabel(self.ylabel)
315 315
316 316 if self.xmin is None:
317 317 print 'is none'
318 318 xmin = self.min_time
319 319 else:
320 320
321 321 xmin = (datetime.datetime.combine(self.dataOut.datatime.date(),
322 322 datetime.time(self.xmin, 0, 0))-d1970).total_seconds()
323 323
324 324 xmax = xmin+self.xrange*60*60
325 325
326 326 ax.set_xlim(xmin, xmax)
327 327 ax.firsttime = False
328 328 else:
329 329 ax.collections.remove(ax.collections[0])
330 330 plot = ax.pcolormesh(x, y, z[n].T,
331 331 vmin=self.zmin,
332 332 vmax=self.zmax,
333 333 cmap=plt.get_cmap(self.colormap)
334 334 )
335 335 ax.set_title('{} {}'.format(self.titles[n],
336 336 datetime.datetime.utcfromtimestamp(self.max_time).strftime('%y/%m/%d %H:%M:%S')),
337 337 size=8)
338 338
339 339
340 340 class PlotCOHData(PlotRTIData):
341 341
342 342 CODE = 'coh'
343 343
344 344 def setup(self):
345 345
346 346 self.ncols = 1
347 347 self.nrows = self.dataOut.nPairs
348 348 self.width = 10
349 349 self.height = 2.2*self.nrows
350 350 self.ylabel = 'Range [Km]'
351 351 self.titles = ['Channels {}'.format(x) for x in self.dataOut.pairsList]
352 352
353 353 if self.figure is None:
354 354 self.figure = plt.figure(figsize=(self.width, self.height),
355 355 edgecolor='k',
356 356 facecolor='w')
357 357 else:
358 358 self.figure.clf()
359 359
360 360 for n in range(self.nrows):
361 361 ax = self.figure.add_subplot(self.nrows, self.ncols, n+1)
362 362 ax.firsttime = True
363 363 self.axes.append(ax)
364 364
365 365 self.figure.subplots_adjust(hspace=0.5)
366 366 self.figure.show()
367 367
368 368 class PlotSNRData(PlotRTIData):
369 369
370 370 CODE = 'coh'
371 371
372 372
373 373 class PlotPHASEData(PlotCOHData):
374 374
375 375 CODE = 'phase'
376 376 colormap = 'seismic'
@@ -1,299 +1,301
1 1 '''
2 2
3 3 $Author: murco $
4 4 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
5 5 '''
6 6
7 7 class ProcessingUnit(object):
8 8
9 9 """
10 10 Esta es la clase base para el procesamiento de datos.
11 11
12 12 Contiene el metodo "call" para llamar operaciones. Las operaciones pueden ser:
13 13 - Metodos internos (callMethod)
14 14 - Objetos del tipo Operation (callObject). Antes de ser llamados, estos objetos
15 15 tienen que ser agreagados con el metodo "add".
16 16
17 17 """
18 18 # objeto de datos de entrada (Voltage, Spectra o Correlation)
19 19 dataIn = None
20 20 dataInList = []
21 21
22 22 # objeto de datos de entrada (Voltage, Spectra o Correlation)
23 23 dataOut = None
24 24
25 25 operations2RunDict = None
26 26
27 27 isConfig = False
28 28
29 29
30 30 def __init__(self, *args, **kwargs):
31 31
32 32 self.dataIn = None
33 33 self.dataInList = []
34 34
35 35 self.dataOut = None
36 36
37 37 self.operations2RunDict = {}
38 38
39 39 self.isConfig = False
40 40
41 41 self.args = args
42 # if (kwargs):
43 # self.kwargs = kwargs
42 44 self.kwargs = kwargs
43 45
44 46 def addOperation(self, opObj, objId):
45 47
46 48 """
47 49 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
48 50 identificador asociado a este objeto.
49 51
50 52 Input:
51 53
52 54 object : objeto de la clase "Operation"
53 55
54 56 Return:
55 57
56 58 objId : identificador del objeto, necesario para ejecutar la operacion
57 59 """
58 60
59 61 self.operations2RunDict[objId] = opObj
60 62
61 63 return objId
62 64
63 65 def getOperationObj(self, objId):
64 66
65 67 if objId not in self.operations2RunDict.keys():
66 68 return None
67 69
68 70 return self.operations2RunDict[objId]
69 71
70 72 def operation(self, **kwargs):
71 73
72 74 """
73 75 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
74 76 atributos del objeto dataOut
75 77
76 78 Input:
77 79
78 80 **kwargs : Diccionario de argumentos de la funcion a ejecutar
79 81 """
80 82
81 83 raise NotImplementedError
82 84
83 85 def callMethod(self, name, **kwargs):
84 86
85 87 """
86 88 Ejecuta el metodo con el nombre "name" y con argumentos **kwargs de la propia clase.
87 89
88 90 Input:
89 91 name : nombre del metodo a ejecutar
90 92
91 93 **kwargs : diccionario con los nombres y valores de la funcion a ejecutar.
92 94
93 95 """
94 96
95 97 #Checking the inputs
96 98 if name == 'run':
97 99
98 100 if not self.checkInputs():
99 101 self.dataOut.flagNoData = True
100 102 return False
101 103 else:
102 104 #Si no es un metodo RUN la entrada es la misma dataOut (interna)
103 105 if self.dataOut.isEmpty():
104 106 return False
105 107
106 108 #Getting the pointer to method
107 109 methodToCall = getattr(self, name)
108 110
109 111 #Executing the self method
110 112
111 113 if hasattr(self, 'mp'):
112 114 if self.mp is False:
113 115 self.mp = True
114 116 self.start()
115 117 else:
116 118 methodToCall(**kwargs)
117 119
118 120 if self.dataOut is None:
119 121 return False
120 122
121 123 if self.dataOut.isEmpty():
122 124 return False
123 125
124 126 return True
125 127
126 128 def callObject(self, objId):
127 129
128 130 """
129 131 Ejecuta la operacion asociada al identificador del objeto "objId"
130 132
131 133 Input:
132 134
133 135 objId : identificador del objeto a ejecutar
134 136
135 137 **kwargs : diccionario con los nombres y valores de la funcion a ejecutar.
136 138
137 139 Return:
138 140
139 141 None
140 142 """
141 143
142 144 if self.dataOut is not None and self.dataOut.isEmpty():
143 145 return False
144 146
145 147 externalProcObj = self.operations2RunDict[objId]
146 148
147 149 if hasattr(externalProcObj, 'mp'):
148 150 if externalProcObj.mp is False:
149 151 externalProcObj.mp = True
150 152 externalProcObj.start()
151 153 else:
152 154 externalProcObj.run(self.dataOut, **externalProcObj.kwargs)
153 155
154 156 return True
155 157
156 158 def call(self, opType, opName=None, opId=None):
157 159
158 160 """
159 161 Return True si ejecuta la operacion interna nombrada "opName" o la operacion externa
160 162 identificada con el id "opId"; con los argumentos "**kwargs".
161 163
162 164 False si la operacion no se ha ejecutado.
163 165
164 166 Input:
165 167
166 168 opType : Puede ser "self" o "external"
167 169
168 170 Depende del tipo de operacion para llamar a:callMethod or callObject:
169 171
170 172 1. If opType = "self": Llama a un metodo propio de esta clase:
171 173
172 174 name_method = getattr(self, name)
173 175 name_method(**kwargs)
174 176
175 177
176 178 2. If opType = "other" o"external": Llama al metodo "run()" de una instancia de la
177 179 clase "Operation" o de un derivado de ella:
178 180
179 181 instanceName = self.operationList[opId]
180 182 instanceName.run(**kwargs)
181 183
182 184 opName : Si la operacion es interna (opType = 'self'), entonces el "opName" sera
183 185 usada para llamar a un metodo interno de la clase Processing
184 186
185 187 opId : Si la operacion es externa (opType = 'other' o 'external), entonces el
186 188 "opId" sera usada para llamar al metodo "run" de la clase Operation
187 189 registrada anteriormente con ese Id
188 190
189 191 Exception:
190 192 Este objeto de tipo Operation debe de haber sido agregado antes con el metodo:
191 193 "addOperation" e identificado con el valor "opId" = el id de la operacion.
192 194 De lo contrario retornara un error del tipo ValueError
193 195
194 196 """
195 197
196 198 if opType == 'self':
197 199
198 200 if not opName:
199 201 raise ValueError, "opName parameter should be defined"
200 202
201 203 sts = self.callMethod(opName, **self.kwargs)
202 204
203 205 elif opType == 'other' or opType == 'external' or opType == 'plotter':
204 206
205 207 if not opId:
206 208 raise ValueError, "opId parameter should be defined"
207 209
208 210 if opId not in self.operations2RunDict.keys():
209 211 raise ValueError, "Any operation with id=%s has been added" %str(opId)
210 212
211 213 sts = self.callObject(opId)
212 214
213 215 else:
214 216 raise ValueError, "opType should be 'self', 'external' or 'plotter'; and not '%s'" %opType
215 217
216 218 return sts
217 219
218 220 def setInput(self, dataIn):
219 221
220 222 self.dataIn = dataIn
221 223 self.dataInList.append(dataIn)
222 224
223 225 def getOutputObj(self):
224 226
225 227 return self.dataOut
226 228
227 229 def checkInputs(self):
228 230
229 231 for thisDataIn in self.dataInList:
230 232
231 233 if thisDataIn.isEmpty():
232 234 return False
233 235
234 236 return True
235 237
236 238 def setup(self):
237 239
238 240 raise NotImplementedError
239 241
240 242 def run(self):
241 243
242 244 raise NotImplementedError
243 245
244 246 def close(self):
245 247 #Close every thread, queue or any other object here is it is neccesary.
246 248 return
247 249
248 250 class Operation(object):
249 251
250 252 """
251 253 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
252 254 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
253 255 acumulacion dentro de esta clase
254 256
255 257 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
256 258
257 259 """
258 260
259 261 __buffer = None
260 262 isConfig = False
261 263
262 264 def __init__(self, **kwargs):
263 265
264 266 self.__buffer = None
265 267 self.isConfig = False
266 268 self.kwargs = kwargs
267 269
268 270 def setup(self):
269 271
270 272 self.isConfig = True
271 273
272 274 raise NotImplementedError
273 275
274 276 def run(self, dataIn, **kwargs):
275 277
276 278 """
277 279 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
278 280 atributos del objeto dataIn.
279 281
280 282 Input:
281 283
282 284 dataIn : objeto del tipo JROData
283 285
284 286 Return:
285 287
286 288 None
287 289
288 290 Affected:
289 291 __buffer : buffer de recepcion de datos.
290 292
291 293 """
292 294 if not self.isConfig:
293 295 self.setup(**kwargs)
294 296
295 297 raise NotImplementedError
296 298
297 299 def close(self):
298 300
299 301 pass
@@ -1,378 +1,378
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import cPickle as pickle
11 11 import datetime
12 12 from zmq.utils.monitor import recv_monitor_message
13 13 from functools import wraps
14 14 from threading import Thread
15 15 from multiprocessing import Process
16 16
17 17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18 18
19 19 MAXNUMX = 100
20 20 MAXNUMY = 100
21 21 throttle_value = 5
22 22
23 23 class PrettyFloat(float):
24 24 def __repr__(self):
25 25 return '%.2f' % self
26 26
27 27 def roundFloats(obj):
28 28 if isinstance(obj, list):
29 29 return map(roundFloats, obj)
30 30 elif isinstance(obj, float):
31 31 return round(obj, 2)
32 32
33 33
34 34 class throttle(object):
35 35 """Decorator that prevents a function from being called more than once every
36 36 time period.
37 37 To create a function that cannot be called more than once a minute, but
38 38 will sleep until it can be called:
39 39 @throttle(minutes=1)
40 40 def foo():
41 41 pass
42 42
43 43 for i in range(10):
44 44 foo()
45 45 print "This function has run %s times." % i
46 46 """
47 47
48 48 def __init__(self, seconds=0, minutes=0, hours=0):
49 49 self.throttle_period = datetime.timedelta(
50 50 seconds=seconds, minutes=minutes, hours=hours
51 51 )
52 52 self.time_of_last_call = datetime.datetime.min
53 53
54 54 def __call__(self, fn):
55 55 @wraps(fn)
56 56 def wrapper(*args, **kwargs):
57 57 now = datetime.datetime.now()
58 58 time_since_last_call = now - self.time_of_last_call
59 59 time_left = self.throttle_period - time_since_last_call
60 60
61 61 if time_left > datetime.timedelta(seconds=0):
62 62 return
63 63
64 64 self.time_of_last_call = datetime.datetime.now()
65 65 return fn(*args, **kwargs)
66 66
67 67 return wrapper
68 68
69 69
70 70 class PublishData(Operation):
71 71 """Clase publish."""
72 72
73 73 def __init__(self, **kwargs):
74 74 """Inicio."""
75 75 Operation.__init__(self, **kwargs)
76 76 self.isConfig = False
77 77 self.client = None
78 78 self.zeromq = None
79 79 self.mqtt = None
80 80
81 81 def on_disconnect(self, client, userdata, rc):
82 82 if rc != 0:
83 83 print("Unexpected disconnection.")
84 84 self.connect()
85 85
86 86 def connect(self):
87 87 print 'trying to connect'
88 88 try:
89 89 self.client.connect(
90 90 host=self.host,
91 91 port=self.port,
92 92 keepalive=60*10,
93 93 bind_address='')
94 94 print "connected"
95 95 self.client.loop_start()
96 96 # self.client.publish(
97 97 # self.topic + 'SETUP',
98 98 # json.dumps(setup),
99 99 # retain=True
100 100 # )
101 101 except:
102 102 print "MQTT Conection error."
103 103 self.client = False
104 104
105 105 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
106 106 self.counter = 0
107 107 self.topic = kwargs.get('topic', 'schain')
108 108 self.delay = kwargs.get('delay', 0)
109 109 self.plottype = kwargs.get('plottype', 'spectra')
110 110 self.host = kwargs.get('host', "10.10.10.82")
111 111 self.port = kwargs.get('port', 3000)
112 112 self.clientId = clientId
113 113 self.cnt = 0
114 114 self.zeromq = zeromq
115 115 self.mqtt = kwargs.get('plottype', 0)
116 116 self.client = None
117 117 setup = []
118 118 if mqtt is 1:
119 119 print 'mqqt es 1'
120 120 self.client = mqtt.Client(
121 121 client_id=self.clientId + self.topic + 'SCHAIN',
122 122 clean_session=True)
123 123 self.client.on_disconnect = self.on_disconnect
124 124 self.connect()
125 125 for plot in self.plottype:
126 126 setup.append({
127 127 'plot': plot,
128 128 'topic': self.topic + plot,
129 129 'title': getattr(self, plot + '_' + 'title', False),
130 130 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
131 131 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
132 132 'xrange': getattr(self, plot + '_' + 'xrange', False),
133 133 'yrange': getattr(self, plot + '_' + 'yrange', False),
134 134 'zrange': getattr(self, plot + '_' + 'zrange', False),
135 135 })
136 136 if zeromq is 1:
137 137 context = zmq.Context()
138 138 self.zmq_socket = context.socket(zmq.PUSH)
139 139 server = kwargs.get('server', 'zmq.pipe')
140 140
141 141 if 'tcp://' in server:
142 142 address = server
143 143 else:
144 144 address = 'ipc:///tmp/%s' % server
145 145
146 146 self.zmq_socket.connect(address)
147 147 time.sleep(1)
148 148 print 'zeromq configured'
149 149
150 150
151 151 def publish_data(self):
152 152 self.dataOut.finished = False
153 153 if self.mqtt is 1:
154 154 yData = self.dataOut.heightList[:2].tolist()
155 155 if self.plottype == 'spectra':
156 156 data = getattr(self.dataOut, 'data_spc')
157 157 z = data/self.dataOut.normFactor
158 158 zdB = 10*numpy.log10(z)
159 159 xlen, ylen = zdB[0].shape
160 160 dx = int(xlen/MAXNUMX) + 1
161 161 dy = int(ylen/MAXNUMY) + 1
162 162 Z = [0 for i in self.dataOut.channelList]
163 163 for i in self.dataOut.channelList:
164 164 Z[i] = zdB[i][::dx, ::dy].tolist()
165 165 payload = {
166 166 'timestamp': self.dataOut.utctime,
167 167 'data': roundFloats(Z),
168 168 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
169 169 'interval': self.dataOut.getTimeInterval(),
170 170 'type': self.plottype,
171 171 'yData': yData
172 172 }
173 173 # print payload
174 174
175 175 elif self.plottype in ('rti', 'power'):
176 176 data = getattr(self.dataOut, 'data_spc')
177 177 z = data/self.dataOut.normFactor
178 178 avg = numpy.average(z, axis=1)
179 179 avgdB = 10*numpy.log10(avg)
180 180 xlen, ylen = z[0].shape
181 181 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
182 182 AVG = [0 for i in self.dataOut.channelList]
183 183 for i in self.dataOut.channelList:
184 184 AVG[i] = avgdB[i][::dy].tolist()
185 185 payload = {
186 186 'timestamp': self.dataOut.utctime,
187 187 'data': roundFloats(AVG),
188 188 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
189 189 'interval': self.dataOut.getTimeInterval(),
190 190 'type': self.plottype,
191 191 'yData': yData
192 192 }
193 193 elif self.plottype == 'noise':
194 194 noise = self.dataOut.getNoise()/self.dataOut.normFactor
195 195 noisedB = 10*numpy.log10(noise)
196 196 payload = {
197 197 'timestamp': self.dataOut.utctime,
198 198 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
199 199 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
200 200 'interval': self.dataOut.getTimeInterval(),
201 201 'type': self.plottype,
202 202 'yData': yData
203 203 }
204 204 elif self.plottype == 'snr':
205 205 data = getattr(self.dataOut, 'data_SNR')
206 206 avgdB = 10*numpy.log10(data)
207 207
208 208 ylen = data[0].size
209 209 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
210 210 AVG = [0 for i in self.dataOut.channelList]
211 211 for i in self.dataOut.channelList:
212 212 AVG[i] = avgdB[i][::dy].tolist()
213 213 payload = {
214 214 'timestamp': self.dataOut.utctime,
215 215 'data': roundFloats(AVG),
216 216 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
217 217 'type': self.plottype,
218 218 'yData': yData
219 219 }
220 220 else:
221 221 print "Tipo de grafico invalido"
222 222 payload = {
223 223 'data': 'None',
224 224 'timestamp': 'None',
225 225 'type': None
226 226 }
227 227 # print 'Publishing data to {}'.format(self.host)
228 228 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
229 229
230 230 if self.zeromq is 1:
231 231 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
232 232 self.zmq_socket.send_pyobj(self.dataOut)
233 233
234 234 def run(self, dataOut, **kwargs):
235 235 self.dataOut = dataOut
236 236 if not self.isConfig:
237 237 self.setup(**kwargs)
238 238 self.isConfig = True
239 239
240 240 self.publish_data()
241 241 time.sleep(self.delay)
242 242
243 243 def close(self):
244 244 if self.zeromq is 1:
245 245 self.dataOut.finished = True
246 246 self.zmq_socket.send_pyobj(self.dataOut)
247 247
248 248 if self.client:
249 249 self.client.loop_stop()
250 250 self.client.disconnect()
251 251
252 252
253 253 class ReceiverData(ProcessingUnit, Process):
254 254
255 255 def __init__(self, **kwargs):
256 256
257 257 ProcessingUnit.__init__(self, **kwargs)
258 258 Process.__init__(self)
259 259 self.mp = False
260 260 self.isConfig = False
261 261 self.plottypes =[]
262 262 self.connections = 0
263 263 server = kwargs.get('server', 'zmq.pipe')
264 264 if 'tcp://' in server:
265 265 address = server
266 266 else:
267 267 address = 'ipc:///tmp/%s' % server
268 268
269 269 self.address = address
270 270 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
271 271 self.realtime = kwargs.get('realtime', False)
272 272 global throttle_value
273 273 throttle_value = kwargs.get('throttle', 10)
274 274 self.setup()
275 275
276 276 def setup(self):
277 277
278 278 self.data = {}
279 279 self.data['times'] = []
280 280 for plottype in self.plottypes:
281 281 self.data[plottype] = {}
282 282
283 283 self.isConfig = True
284 284
285 285 def event_monitor(self, monitor):
286 286
287 287 events = {}
288 288
289 289 for name in dir(zmq):
290 290 if name.startswith('EVENT_'):
291 291 value = getattr(zmq, name)
292 292 events[value] = name
293 293
294 294 while monitor.poll():
295 295 evt = recv_monitor_message(monitor)
296 296 if evt['event'] == 32:
297 297 self.connections += 1
298 298 if evt['event'] == 512:
299 299 pass
300 300 if self.connections == 0 and self.started is True:
301 301 self.ended = True
302 302 # send('ENDED')
303 303 evt.update({'description': events[evt['event']]})
304 304
305 305 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
306 306 break
307 307 monitor.close()
308 308 print("event monitor thread done!")
309 309
310 310 @throttle(seconds=throttle_value)
311 311 def sendData(self, data):
312 312 self.send(data)
313 313
314 314 def send(self, data):
315 315 print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
316 316 self.sender.send_pyobj(data)
317 317
318 318 def update(self):
319 319
320 320 t = self.dataOut.ltctime
321 321 self.data['times'].append(t)
322 322 self.data['dataOut'] = self.dataOut
323 323
324 324 for plottype in self.plottypes:
325 325
326 326 if plottype == 'spc':
327 327 z = self.dataOut.data_spc/self.dataOut.normFactor
328 328 zdB = 10*numpy.log10(z)
329 329 self.data[plottype] = zdB
330 330 if plottype == 'rti':
331 331 self.data[plottype][t] = self.dataOut.getPower()
332 332 if plottype == 'snr':
333 333 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
334 334 if plottype == 'dop':
335 335 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
336 336 if plottype == 'coh':
337 337 self.data[plottype][t] = self.dataOut.getCoherence()
338 338 if plottype == 'phase':
339 339 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
340 340
341 341 def run(self):
342 342
343 343 print '[Starting] {} from {}'.format(self.name, self.address)
344 344
345 345 self.context = zmq.Context()
346 346 self.receiver = self.context.socket(zmq.PULL)
347 347 self.receiver.bind(self.address)
348 348 monitor = self.receiver.get_monitor_socket()
349 349 self.sender = self.context.socket(zmq.PUB)
350 350
351 351 self.sender.bind("ipc:///tmp/zmq.plots")
352 352
353 353 t = Thread(target=self.event_monitor, args=(monitor,))
354 354 t.start()
355 355
356 356 while True:
357 357 self.dataOut = self.receiver.recv_pyobj()
358 358 print '[Receiving] {} - {}'.format(self.dataOut.type,
359 359 self.dataOut.datatime.ctime())
360 360
361 361 self.update()
362 362
363 363 if self.dataOut.finished is True:
364 364 self.send(self.data)
365 365 self.connections -= 1
366 if self.connections==0 and self.started:
366 if self.connections == 0 and self.started:
367 367 self.ended = True
368 368 self.data['ENDED'] = True
369 369 self.send(self.data)
370 370 self.setup()
371 371 else:
372 372 if self.realtime:
373 373 self.send(self.data)
374 374 else:
375 375 self.sendData(self.data)
376 376 self.started = True
377 377
378 378 return
@@ -1,77 +1,75
1 1 #!/usr/bin/env python
2 2 '''
3 3 Created on Jul 7, 2014
4 4
5 5 @author: roj-idl71
6 6 '''
7 7 import os, sys
8 8 from datetime import datetime, timedelta
9 9 import multiprocessing
10 10 from schainpy.controller import Project
11 11
12 12 def main(date):
13 13
14 14 controllerObj = Project()
15 15
16 controllerObj.setup(id = '191', name='test01', description='')
16 controllerObj.setup(id='191', name='test01', description='')
17 17
18 18 readUnitConfObj = controllerObj.addReadUnit(datatype='Spectra',
19 path='/data/workspace/data/zeus/',
19 path='/home/nanosat/data/zeus',
20 20 startDate=date,
21 21 endDate=date,
22 22 startTime='00:00:00',
23 23 endTime='23:59:59',
24 24 online=0,
25 25 walk=1,
26 26 expLabel='')
27 27
28 28 procUnitConfObj1 = controllerObj.addProcUnit(datatype='Spectra', inputId=readUnitConfObj.getId())
29 29 #opObj11 = procUnitConfObj1.addOperation(name='removeDC')
30 30 #opObj11.addParameter(name='mode', value='1', format='int')
31 31
32 32 #opObj11 = procUnitConfObj1.addOperation(name='removeInterference')
33 33
34 34
35 35 # opObj11 = procUnitConfObj1.addOperation(name='RTIPlot', optype='other')
36 36 # opObj11.addParameter(name='id', value='10', format='int')
37 37 # opObj11.addParameter(name='wintitle', value='150Km', format='str')
38 38 # opObj11.addParameter(name='colormap', value='jro', format='str')
39 39 # opObj11.addParameter(name='xaxis', value='time', format='str')
40 40 # opObj11.addParameter(name='xmin', value='0', format='int')
41 41 # opObj11.addParameter(name='xmax', value='23', format='int')
42 42 # #opObj11.addParameter(name='ymin', value='100', format='int')
43 43 # #opObj11.addParameter(name='ymax', value='150', format='int')
44 44 # opObj11.addParameter(name='zmin', value='10', format='int')
45 45 # opObj11.addParameter(name='zmax', value='35', format='int')
46 46
47
47
48 48
49 49
50 50 opObj11 = procUnitConfObj1.addOperation(name='PlotRTIData', optype='other')
51 51 opObj11.addParameter(name='id', value='12', format='int')
52 52 opObj11.addParameter(name='wintitle', value='150Km', format='str')
53 53 opObj11.addParameter(name='colormap', value='jro', format='str')
54 54 opObj11.addParameter(name='xaxis', value='time', format='str')
55 55 opObj11.addParameter(name='xmin', value='0', format='int')
56 56 opObj11.addParameter(name='xmax', value='23', format='int')
57 57 #opObj11.addParameter(name='ymin', value='100', format='int')
58 58 #opObj11.addParameter(name='ymax', value='150', format='int')
59 59 opObj11.addParameter(name='zmin', value='10', format='int')
60 60 opObj11.addParameter(name='zmax', value='35', format='int')
61 61 #opObj11.addParameter(name='pause', value='1', format='bool')
62 62 opObj11.addParameter(name='show', value='0', format='bool')
63 63 opObj11.addParameter(name='save', value='/tmp', format='str')
64 64
65 65
66 66 controllerObj.start()
67 67
68 68 if __name__=='__main__':
69
69
70 70 dt = datetime(2017, 1, 12)
71
72 dates = [(dt+timedelta(x)).strftime('%Y/%m/%d') for x in range(20)]
73
71
72 dates = [(dt+timedelta(x)).strftime('%Y/%m/%d') for x in range(20)]
73
74 74 p = multiprocessing.Pool(4)
75 75 p.map(main, dates)
76
77 No newline at end of file
@@ -1,42 +1,42
1 1 #!/usr/bin/env python
2 2 '''
3 3 Created on Jul 7, 2014
4 4
5 5 @author: roj-idl71
6 6 '''
7 7 import os, sys
8 8
9 9 from schainpy.controller import Project
10 10
11 11 if __name__ == '__main__':
12 12 desc = "Segundo Test"
13 13
14 14 controllerObj = Project()
15 controllerObj.setup(id = '191', name='test01', description=desc)
15 controllerObj.setup(id='191', name='test01', description=desc)
16 16
17 17 proc1 = controllerObj.addProcUnit(name='ReceiverData')
18 18 # proc1.addParameter(name='server', value='tcp://10.10.10.87:3000', format='str')
19 19 proc1.addParameter(name='realtime', value='1', format='bool')
20 20 proc1.addParameter(name='plottypes', value='rti,spc', format='str')
21 21
22 22 op1 = proc1.addOperation(name='PlotRTIData', optype='other')
23 23 op1.addParameter(name='wintitle', value='Julia 150Km', format='str')
24 24
25 25 op2 = proc1.addOperation(name='PlotSpectraData', optype='other')
26 26 op2.addParameter(name='wintitle', value='Julia 150Km', format='str')
27 27 op2.addParameter(name='xaxis', value='velocity', format='str')
28 28 op2.addParameter(name='showprofile', value='1', format='bool')
29 29 #op2.addParameter(name='xmin', value='-0.1', format='float')
30 30 #op2.addParameter(name='xmax', value='0.1', format='float')
31 31
32 32 # op1 = proc1.addOperation(name='PlotPHASEData', optype='other')
33 33 # op1.addParameter(name='wintitle', value='Julia 150Km', format='str')
34 34
35 35
36 36 # proc1 = controllerObj.addProcUnit(name='ReceiverData')
37 37 # proc1.addParameter(name='server', value='pipe2', format='str')
38 38 # proc1.addParameter(name='mode', value='buffer', format='str')
39 39 # proc1.addParameter(name='plottypes', value='snr', format='str')
40 40
41 41
42 42 controllerObj.start()
General Comments 0
You need to be logged in to leave comments. Login now