##// END OF EJS Templates
cambios en como se envia la data a la web
José Chávez -
r909:ec5b39ef16e6
parent child
Show More
@@ -1,393 +1,395
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import cPickle as pickle
11 11 import datetime
12 12 from zmq.utils.monitor import recv_monitor_message
13 13 from functools import wraps
14 14 from threading import Thread
15 15 from multiprocessing import Process
16 16
17 17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18 18
19 19 MAXNUMX = 100
20 20 MAXNUMY = 100
21 21
22 22 class PrettyFloat(float):
23 23 def __repr__(self):
24 24 return '%.2f' % self
25 25
26 26 def roundFloats(obj):
27 27 if isinstance(obj, list):
28 28 return map(roundFloats, obj)
29 29 elif isinstance(obj, float):
30 30 return round(obj, 2)
31 31
32 32 def decimate(z):
33 33 # dx = int(len(self.x)/self.__MAXNUMX) + 1
34 34 dy = int(len(z[0])/MAXNUMY) + 1
35 35 return z[::, ::dy]
36 36
37 37 class throttle(object):
38 38 """Decorator that prevents a function from being called more than once every
39 39 time period.
40 40 To create a function that cannot be called more than once a minute, but
41 41 will sleep until it can be called:
42 42 @throttle(minutes=1)
43 43 def foo():
44 44 pass
45 45
46 46 for i in range(10):
47 47 foo()
48 48 print "This function has run %s times." % i
49 49 """
50 50
51 51 def __init__(self, seconds=0, minutes=0, hours=0):
52 52 self.throttle_period = datetime.timedelta(
53 53 seconds=seconds, minutes=minutes, hours=hours
54 54 )
55 55
56 56 self.time_of_last_call = datetime.datetime.min
57 57
58 58 def __call__(self, fn):
59 59 @wraps(fn)
60 60 def wrapper(*args, **kwargs):
61 61 now = datetime.datetime.now()
62 62 time_since_last_call = now - self.time_of_last_call
63 63 time_left = self.throttle_period - time_since_last_call
64 64
65 65 if time_left > datetime.timedelta(seconds=0):
66 66 return
67 67
68 68 self.time_of_last_call = datetime.datetime.now()
69 69 return fn(*args, **kwargs)
70 70
71 71 return wrapper
72 72
73 73
74 74 class PublishData(Operation):
75 75 """Clase publish."""
76 76
77 77 def __init__(self, **kwargs):
78 78 """Inicio."""
79 79 Operation.__init__(self, **kwargs)
80 80 self.isConfig = False
81 81 self.client = None
82 82 self.zeromq = None
83 83 self.mqtt = None
84 84
85 85 def on_disconnect(self, client, userdata, rc):
86 86 if rc != 0:
87 87 print("Unexpected disconnection.")
88 88 self.connect()
89 89
90 90 def connect(self):
91 91 print 'trying to connect'
92 92 try:
93 93 self.client.connect(
94 94 host=self.host,
95 95 port=self.port,
96 96 keepalive=60*10,
97 97 bind_address='')
98 98 self.client.loop_start()
99 99 # self.client.publish(
100 100 # self.topic + 'SETUP',
101 101 # json.dumps(setup),
102 102 # retain=True
103 103 # )
104 104 except:
105 105 print "MQTT Conection error."
106 106 self.client = False
107 107
108 108 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
109 109 self.counter = 0
110 110 self.topic = kwargs.get('topic', 'schain')
111 111 self.delay = kwargs.get('delay', 0)
112 112 self.plottype = kwargs.get('plottype', 'spectra')
113 113 self.host = kwargs.get('host', "10.10.10.82")
114 114 self.port = kwargs.get('port', 3000)
115 115 self.clientId = clientId
116 116 self.cnt = 0
117 117 self.zeromq = zeromq
118 118 self.mqtt = kwargs.get('plottype', 0)
119 119 self.client = None
120 120 setup = []
121 121 if mqtt is 1:
122 122 self.client = mqtt.Client(
123 123 client_id=self.clientId + self.topic + 'SCHAIN',
124 124 clean_session=True)
125 125 self.client.on_disconnect = self.on_disconnect
126 126 self.connect()
127 127 for plot in self.plottype:
128 128 setup.append({
129 129 'plot': plot,
130 130 'topic': self.topic + plot,
131 131 'title': getattr(self, plot + '_' + 'title', False),
132 132 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
133 133 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
134 134 'xrange': getattr(self, plot + '_' + 'xrange', False),
135 135 'yrange': getattr(self, plot + '_' + 'yrange', False),
136 136 'zrange': getattr(self, plot + '_' + 'zrange', False),
137 137 })
138 138 if zeromq is 1:
139 139 context = zmq.Context()
140 140 self.zmq_socket = context.socket(zmq.PUSH)
141 141 server = kwargs.get('server', 'zmq.pipe')
142 142
143 143 if 'tcp://' in server:
144 144 address = server
145 145 else:
146 146 address = 'ipc:///tmp/%s' % server
147 147
148 148 self.zmq_socket.connect(address)
149 149 time.sleep(1)
150 150
151 151
152 152
153 153 def publish_data(self):
154 154 self.dataOut.finished = False
155 155 if self.mqtt is 1:
156 156 yData = self.dataOut.heightList[:2].tolist()
157 157 if self.plottype == 'spectra':
158 158 data = getattr(self.dataOut, 'data_spc')
159 159 z = data/self.dataOut.normFactor
160 160 zdB = 10*numpy.log10(z)
161 161 xlen, ylen = zdB[0].shape
162 162 dx = int(xlen/MAXNUMX) + 1
163 163 dy = int(ylen/MAXNUMY) + 1
164 164 Z = [0 for i in self.dataOut.channelList]
165 165 for i in self.dataOut.channelList:
166 166 Z[i] = zdB[i][::dx, ::dy].tolist()
167 167 payload = {
168 168 'timestamp': self.dataOut.utctime,
169 169 'data': roundFloats(Z),
170 170 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
171 171 'interval': self.dataOut.getTimeInterval(),
172 172 'type': self.plottype,
173 173 'yData': yData
174 174 }
175 175 # print payload
176 176
177 177 elif self.plottype in ('rti', 'power'):
178 178 data = getattr(self.dataOut, 'data_spc')
179 179 z = data/self.dataOut.normFactor
180 180 avg = numpy.average(z, axis=1)
181 181 avgdB = 10*numpy.log10(avg)
182 182 xlen, ylen = z[0].shape
183 183 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
184 184 AVG = [0 for i in self.dataOut.channelList]
185 185 for i in self.dataOut.channelList:
186 186 AVG[i] = avgdB[i][::dy].tolist()
187 187 payload = {
188 188 'timestamp': self.dataOut.utctime,
189 189 'data': roundFloats(AVG),
190 190 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
191 191 'interval': self.dataOut.getTimeInterval(),
192 192 'type': self.plottype,
193 193 'yData': yData
194 194 }
195 195 elif self.plottype == 'noise':
196 196 noise = self.dataOut.getNoise()/self.dataOut.normFactor
197 197 noisedB = 10*numpy.log10(noise)
198 198 payload = {
199 199 'timestamp': self.dataOut.utctime,
200 200 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
201 201 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
202 202 'interval': self.dataOut.getTimeInterval(),
203 203 'type': self.plottype,
204 204 'yData': yData
205 205 }
206 206 elif self.plottype == 'snr':
207 207 data = getattr(self.dataOut, 'data_SNR')
208 208 avgdB = 10*numpy.log10(data)
209 209
210 210 ylen = data[0].size
211 211 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
212 212 AVG = [0 for i in self.dataOut.channelList]
213 213 for i in self.dataOut.channelList:
214 214 AVG[i] = avgdB[i][::dy].tolist()
215 215 payload = {
216 216 'timestamp': self.dataOut.utctime,
217 217 'data': roundFloats(AVG),
218 218 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
219 219 'type': self.plottype,
220 220 'yData': yData
221 221 }
222 222 else:
223 223 print "Tipo de grafico invalido"
224 224 payload = {
225 225 'data': 'None',
226 226 'timestamp': 'None',
227 227 'type': None
228 228 }
229 229 # print 'Publishing data to {}'.format(self.host)
230 230 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
231 231
232 232 if self.zeromq is 1:
233 233 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
234 234 self.zmq_socket.send_pyobj(self.dataOut)
235 235
236 236 def run(self, dataOut, **kwargs):
237 237 self.dataOut = dataOut
238 238 if not self.isConfig:
239 239 self.setup(**kwargs)
240 240 self.isConfig = True
241 241
242 242 self.publish_data()
243 243 time.sleep(self.delay)
244 244
245 245 def close(self):
246 246 if self.zeromq is 1:
247 247 self.dataOut.finished = True
248 248 self.zmq_socket.send_pyobj(self.dataOut)
249 249
250 250 if self.client:
251 251 self.client.loop_stop()
252 252 self.client.disconnect()
253 253
254 254
255 255 class ReceiverData(ProcessingUnit, Process):
256 256
257 257 throttle_value = 5
258 258
259 259 def __init__(self, **kwargs):
260 260
261 261 ProcessingUnit.__init__(self, **kwargs)
262 262 Process.__init__(self)
263 263 self.mp = False
264 264 self.isConfig = False
265 265 self.plottypes =[]
266 266 self.connections = 0
267 267 server = kwargs.get('server', 'zmq.pipe')
268 268 if 'tcp://' in server:
269 269 address = server
270 270 else:
271 271 address = 'ipc:///tmp/%s' % server
272 272
273 273 self.address = address
274 274 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
275 275 self.realtime = kwargs.get('realtime', False)
276 276 self.throttle_value = kwargs.get('throttle', 10)
277 277 self.sendData = self.initThrottle(self.throttle_value)
278 278 self.setup()
279 279
280 280 def setup(self):
281 281
282 282 self.data = {}
283 283 self.data['times'] = []
284 284 for plottype in self.plottypes:
285 285 self.data[plottype] = {}
286 286 self.data['noise'] = {}
287 287 self.data['throttle'] = self.throttle_value
288 288 self.data['ENDED'] = False
289 289 self.isConfig = True
290 290 self.data_web = {}
291 291
292 292 def event_monitor(self, monitor):
293 293
294 294 events = {}
295 295
296 296 for name in dir(zmq):
297 297 if name.startswith('EVENT_'):
298 298 value = getattr(zmq, name)
299 299 events[value] = name
300 300
301 301 while monitor.poll():
302 302 evt = recv_monitor_message(monitor)
303 303 if evt['event'] == 32:
304 304 self.connections += 1
305 305 if evt['event'] == 512:
306 306 pass
307 307 if self.connections == 0 and self.started is True:
308 308 self.ended = True
309 309 # send('ENDED')
310 310 evt.update({'description': events[evt['event']]})
311 311
312 312 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
313 313 break
314 314 monitor.close()
315 315 print("event monitor thread done!")
316 316
317 317 def initThrottle(self, throttle_value):
318 318
319 319 @throttle(seconds=throttle_value)
320 320 def sendDataThrottled(fn_sender, data):
321 321 fn_sender(data)
322 322
323 323 return sendDataThrottled
324 324
325 325 def send(self, data):
326 326 # print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
327 327 self.sender.send_pyobj(data)
328 328
329 329 def update(self):
330 330
331 331 t = self.dataOut.ltctime
332 332 self.data['times'].append(t)
333 333 self.data['dataOut'] = self.dataOut
334 334 for plottype in self.plottypes:
335 335 if plottype == 'spc':
336 336 z = self.dataOut.data_spc/self.dataOut.normFactor
337 337 self.data[plottype] = 10*numpy.log10(z)
338 338 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
339 339 if plottype == 'rti':
340 340 self.data[plottype][t] = self.dataOut.getPower()
341 341 if plottype == 'snr':
342 342 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
343 343 if plottype == 'dop':
344 344 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
345 345 if plottype == 'coh':
346 346 self.data[plottype][t] = self.dataOut.getCoherence()
347 347 if plottype == 'phase':
348 348 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
349 349 if self.realtime:
350 350 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
351 351 self.data_web['time'] = t
352 self.data_web['interval'] = self.dataOut.getTimeInterval()
353 self.data_web['type'] = plottype
352 354 def run(self):
353 355
354 356 print '[Starting] {} from {}'.format(self.name, self.address)
355 357
356 358 self.context = zmq.Context()
357 359 self.receiver = self.context.socket(zmq.PULL)
358 360 self.receiver.bind(self.address)
359 361 monitor = self.receiver.get_monitor_socket()
360 362 self.sender = self.context.socket(zmq.PUB)
361 363 if self.realtime:
362 364 self.sender_web = self.context.socket(zmq.PUB)
363 365 # self.sender_web.setsockopt(zmq.PUBLISH, 'realtime')
364 366 self.sender_web.bind("ipc:///tmp/zmq.web")
365 367 self.sender.bind("ipc:///tmp/zmq.plots")
366 368
367 369 t = Thread(target=self.event_monitor, args=(monitor,))
368 370 t.start()
369 371
370 372 while True:
371 373 self.dataOut = self.receiver.recv_pyobj()
372 374 # print '[Receiving] {} - {}'.format(self.dataOut.type,
373 375 # self.dataOut.datatime.ctime())
374 376
375 377 self.update()
376 378
377 379 if self.dataOut.finished is True:
378 380 self.send(self.data)
379 381 self.connections -= 1
380 382 if self.connections == 0 and self.started:
381 383 self.ended = True
382 384 self.data['ENDED'] = True
383 385 self.send(self.data)
384 386 self.setup()
385 387 else:
386 388 if self.realtime:
387 389 self.send(self.data)
388 390 self.sender_web.send_string(json.dumps(self.data_web))
389 391 else:
390 392 self.sendData(self.send, self.data)
391 393 self.started = True
392 394
393 395 return
General Comments 0
You need to be logged in to leave comments. Login now