##// END OF EJS Templates
Add default values in jsonify Data
jespinoza -
r1127:be11c71d5d2b
parent child
Show More
@@ -1,631 +1,635
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import datetime
11 11 from zmq.utils.monitor import recv_monitor_message
12 12 from functools import wraps
13 13 from threading import Thread
14 14 from multiprocessing import Process
15 15
16 16 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 17 from schainpy.model.data.jrodata import JROData
18 18 from schainpy.utils import log
19 19
20 20 MAXNUMX = 100
21 21 MAXNUMY = 100
22 22
23 23 class PrettyFloat(float):
24 24 def __repr__(self):
25 25 return '%.2f' % self
26 26
27 27 def roundFloats(obj):
28 28 if isinstance(obj, list):
29 29 return map(roundFloats, obj)
30 30 elif isinstance(obj, float):
31 31 return round(obj, 2)
32 32
33 33 def decimate(z, MAXNUMY):
34 34 dy = int(len(z[0])/MAXNUMY) + 1
35 35
36 36 return z[::, ::dy]
37 37
38 38 class throttle(object):
39 39 '''
40 40 Decorator that prevents a function from being called more than once every
41 41 time period.
42 42 To create a function that cannot be called more than once a minute, but
43 43 will sleep until it can be called:
44 44 @throttle(minutes=1)
45 45 def foo():
46 46 pass
47 47
48 48 for i in range(10):
49 49 foo()
50 50 print "This function has run %s times." % i
51 51 '''
52 52
53 53 def __init__(self, seconds=0, minutes=0, hours=0):
54 54 self.throttle_period = datetime.timedelta(
55 55 seconds=seconds, minutes=minutes, hours=hours
56 56 )
57 57
58 58 self.time_of_last_call = datetime.datetime.min
59 59
60 60 def __call__(self, fn):
61 61 @wraps(fn)
62 62 def wrapper(*args, **kwargs):
63 63 coerce = kwargs.pop('coerce', None)
64 64 if coerce:
65 65 self.time_of_last_call = datetime.datetime.now()
66 66 return fn(*args, **kwargs)
67 67 else:
68 68 now = datetime.datetime.now()
69 69 time_since_last_call = now - self.time_of_last_call
70 70 time_left = self.throttle_period - time_since_last_call
71 71
72 72 if time_left > datetime.timedelta(seconds=0):
73 73 return
74 74
75 75 self.time_of_last_call = datetime.datetime.now()
76 76 return fn(*args, **kwargs)
77 77
78 78 return wrapper
79 79
80 80 class Data(object):
81 81 '''
82 82 Object to hold data to be plotted
83 83 '''
84 84
85 85 def __init__(self, plottypes, throttle_value, exp_code):
86 86 self.plottypes = plottypes
87 87 self.throttle = throttle_value
88 88 self.exp_code = exp_code
89 89 self.ended = False
90 90 self.localtime = False
91 91 self.__times = []
92 92 self.__heights = []
93 93
94 94 def __str__(self):
95 95 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
96 96 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
97 97
98 98 def __len__(self):
99 99 return len(self.__times)
100 100
101 101 def __getitem__(self, key):
102 102 if key not in self.data:
103 103 raise KeyError(log.error('Missing key: {}'.format(key)))
104 104
105 105 if 'spc' in key:
106 106 ret = self.data[key]
107 107 else:
108 108 ret = numpy.array([self.data[key][x] for x in self.times])
109 109 if ret.ndim > 1:
110 110 ret = numpy.swapaxes(ret, 0, 1)
111 111 return ret
112 112
113 113 def __contains__(self, key):
114 114 return key in self.data
115 115
116 116 def setup(self):
117 117 '''
118 118 Configure object
119 119 '''
120 120
121 121 self.ended = False
122 122 self.data = {}
123 123 self.__times = []
124 124 self.__heights = []
125 125 self.__all_heights = set()
126 126 for plot in self.plottypes:
127 127 if 'snr' in plot:
128 128 plot = 'snr'
129 129 self.data[plot] = {}
130 130
131 131 def shape(self, key):
132 132 '''
133 133 Get the shape of the one-element data for the given key
134 134 '''
135 135
136 136 if len(self.data[key]):
137 137 if 'spc' in key:
138 138 return self.data[key].shape
139 139 return self.data[key][self.__times[0]].shape
140 140 return (0,)
141 141
142 142 def update(self, dataOut, tm):
143 143 '''
144 144 Update data object with new dataOut
145 145 '''
146 146
147 147 if tm in self.__times:
148 148 return
149 149
150 150 self.parameters = getattr(dataOut, 'parameters', [])
151 151 if hasattr(dataOut, 'pairsList'):
152 152 self.pairs = dataOut.pairsList
153 153 self.channels = dataOut.channelList
154 154 self.interval = dataOut.getTimeInterval()
155 155 self.localtime = dataOut.useLocalTime
156 156 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
157 157 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
158 158 self.__heights.append(dataOut.heightList)
159 159 self.__all_heights.update(dataOut.heightList)
160 160 self.__times.append(tm)
161 161
162 162 for plot in self.plottypes:
163 163 if plot == 'spc':
164 164 z = dataOut.data_spc/dataOut.normFactor
165 165 self.data[plot] = 10*numpy.log10(z)
166 166 if plot == 'cspc':
167 167 self.data[plot] = dataOut.data_cspc
168 168 if plot == 'noise':
169 169 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
170 170 if plot == 'rti':
171 171 self.data[plot][tm] = dataOut.getPower()
172 172 if plot == 'snr_db':
173 173 self.data['snr'][tm] = dataOut.data_SNR
174 174 if plot == 'snr':
175 175 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
176 176 if plot == 'dop':
177 177 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
178 178 if plot == 'mean':
179 179 self.data[plot][tm] = dataOut.data_MEAN
180 180 if plot == 'std':
181 181 self.data[plot][tm] = dataOut.data_STD
182 182 if plot == 'coh':
183 183 self.data[plot][tm] = dataOut.getCoherence()
184 184 if plot == 'phase':
185 185 self.data[plot][tm] = dataOut.getCoherence(phase=True)
186 186 if plot == 'output':
187 187 self.data[plot][tm] = dataOut.data_output
188 188 if plot == 'param':
189 189 self.data[plot][tm] = dataOut.data_param
190 190
191 191 def normalize_heights(self):
192 192 '''
193 193 Ensure same-dimension of the data for different heighList
194 194 '''
195 195
196 196 H = numpy.array(list(self.__all_heights))
197 197 H.sort()
198 198 for key in self.data:
199 199 shape = self.shape(key)[:-1] + H.shape
200 200 for tm, obj in self.data[key].items():
201 201 h = self.__heights[self.__times.index(tm)]
202 202 if H.size == h.size:
203 203 continue
204 204 index = numpy.where(numpy.in1d(H, h))[0]
205 205 dummy = numpy.zeros(shape) + numpy.nan
206 206 if len(shape) == 2:
207 207 dummy[:, index] = obj
208 208 else:
209 209 dummy[index] = obj
210 210 self.data[key][tm] = dummy
211 211
212 212 self.__heights = [H for tm in self.__times]
213 213
214 214 def jsonify(self, decimate=False):
215 215 '''
216 216 Convert data to json
217 217 '''
218 218
219 219 data = {}
220 220 tm = self.times[-1]
221 221
222 222 for key in self.data:
223 223 if key in ('spc', 'cspc'):
224 224 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
225 225 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
226 226 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
227 227 else:
228 228 data[key] = roundFloats(self.data[key][tm].tolist())
229 229
230 230 ret = {'data': data}
231 231 ret['exp_code'] = self.exp_code
232 232 ret['time'] = tm
233 233 ret['interval'] = self.interval
234 234 ret['localtime'] = self.localtime
235 235 ret['yrange'] = roundFloats(self.heights.tolist())
236 236 if key in ('spc', 'cspc'):
237 237 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
238 else:
239 ret['xrange'] = []
238 240 if hasattr(self, 'pairs'):
239 241 ret['pairs'] = self.pairs
242 else:
243 ret['pairs'] = []
240 244 return json.dumps(ret)
241 245
242 246 @property
243 247 def times(self):
244 248 '''
245 249 Return the list of times of the current data
246 250 '''
247 251
248 252 ret = numpy.array(self.__times)
249 253 ret.sort()
250 254 return ret
251 255
252 256 @property
253 257 def heights(self):
254 258 '''
255 259 Return the list of heights of the current data
256 260 '''
257 261
258 262 return numpy.array(self.__heights[-1])
259 263
260 264 class PublishData(Operation):
261 265 '''
262 266 Operation to send data over zmq.
263 267 '''
264 268
265 269 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
266 270
267 271 def __init__(self, **kwargs):
268 272 """Inicio."""
269 273 Operation.__init__(self, **kwargs)
270 274 self.isConfig = False
271 275 self.client = None
272 276 self.zeromq = None
273 277 self.mqtt = None
274 278
275 279 def on_disconnect(self, client, userdata, rc):
276 280 if rc != 0:
277 281 log.warning('Unexpected disconnection.')
278 282 self.connect()
279 283
280 284 def connect(self):
281 285 log.warning('trying to connect')
282 286 try:
283 287 self.client.connect(
284 288 host=self.host,
285 289 port=self.port,
286 290 keepalive=60*10,
287 291 bind_address='')
288 292 self.client.loop_start()
289 293 # self.client.publish(
290 294 # self.topic + 'SETUP',
291 295 # json.dumps(setup),
292 296 # retain=True
293 297 # )
294 298 except:
295 299 log.error('MQTT Conection error.')
296 300 self.client = False
297 301
298 302 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
299 303 self.counter = 0
300 304 self.topic = kwargs.get('topic', 'schain')
301 305 self.delay = kwargs.get('delay', 0)
302 306 self.plottype = kwargs.get('plottype', 'spectra')
303 307 self.host = kwargs.get('host', "10.10.10.82")
304 308 self.port = kwargs.get('port', 3000)
305 309 self.clientId = clientId
306 310 self.cnt = 0
307 311 self.zeromq = zeromq
308 312 self.mqtt = kwargs.get('plottype', 0)
309 313 self.client = None
310 314 self.verbose = verbose
311 315 setup = []
312 316 if mqtt is 1:
313 317 self.client = mqtt.Client(
314 318 client_id=self.clientId + self.topic + 'SCHAIN',
315 319 clean_session=True)
316 320 self.client.on_disconnect = self.on_disconnect
317 321 self.connect()
318 322 for plot in self.plottype:
319 323 setup.append({
320 324 'plot': plot,
321 325 'topic': self.topic + plot,
322 326 'title': getattr(self, plot + '_' + 'title', False),
323 327 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
324 328 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
325 329 'xrange': getattr(self, plot + '_' + 'xrange', False),
326 330 'yrange': getattr(self, plot + '_' + 'yrange', False),
327 331 'zrange': getattr(self, plot + '_' + 'zrange', False),
328 332 })
329 333 if zeromq is 1:
330 334 context = zmq.Context()
331 335 self.zmq_socket = context.socket(zmq.PUSH)
332 336 server = kwargs.get('server', 'zmq.pipe')
333 337
334 338 if 'tcp://' in server:
335 339 address = server
336 340 else:
337 341 address = 'ipc:///tmp/%s' % server
338 342
339 343 self.zmq_socket.connect(address)
340 344 time.sleep(1)
341 345
342 346
343 347 def publish_data(self):
344 348 self.dataOut.finished = False
345 349 if self.mqtt is 1:
346 350 yData = self.dataOut.heightList[:2].tolist()
347 351 if self.plottype == 'spectra':
348 352 data = getattr(self.dataOut, 'data_spc')
349 353 z = data/self.dataOut.normFactor
350 354 zdB = 10*numpy.log10(z)
351 355 xlen, ylen = zdB[0].shape
352 356 dx = int(xlen/MAXNUMX) + 1
353 357 dy = int(ylen/MAXNUMY) + 1
354 358 Z = [0 for i in self.dataOut.channelList]
355 359 for i in self.dataOut.channelList:
356 360 Z[i] = zdB[i][::dx, ::dy].tolist()
357 361 payload = {
358 362 'timestamp': self.dataOut.utctime,
359 363 'data': roundFloats(Z),
360 364 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
361 365 'interval': self.dataOut.getTimeInterval(),
362 366 'type': self.plottype,
363 367 'yData': yData
364 368 }
365 369
366 370 elif self.plottype in ('rti', 'power'):
367 371 data = getattr(self.dataOut, 'data_spc')
368 372 z = data/self.dataOut.normFactor
369 373 avg = numpy.average(z, axis=1)
370 374 avgdB = 10*numpy.log10(avg)
371 375 xlen, ylen = z[0].shape
372 376 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
373 377 AVG = [0 for i in self.dataOut.channelList]
374 378 for i in self.dataOut.channelList:
375 379 AVG[i] = avgdB[i][::dy].tolist()
376 380 payload = {
377 381 'timestamp': self.dataOut.utctime,
378 382 'data': roundFloats(AVG),
379 383 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
380 384 'interval': self.dataOut.getTimeInterval(),
381 385 'type': self.plottype,
382 386 'yData': yData
383 387 }
384 388 elif self.plottype == 'noise':
385 389 noise = self.dataOut.getNoise()/self.dataOut.normFactor
386 390 noisedB = 10*numpy.log10(noise)
387 391 payload = {
388 392 'timestamp': self.dataOut.utctime,
389 393 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
390 394 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
391 395 'interval': self.dataOut.getTimeInterval(),
392 396 'type': self.plottype,
393 397 'yData': yData
394 398 }
395 399 elif self.plottype == 'snr':
396 400 data = getattr(self.dataOut, 'data_SNR')
397 401 avgdB = 10*numpy.log10(data)
398 402
399 403 ylen = data[0].size
400 404 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
401 405 AVG = [0 for i in self.dataOut.channelList]
402 406 for i in self.dataOut.channelList:
403 407 AVG[i] = avgdB[i][::dy].tolist()
404 408 payload = {
405 409 'timestamp': self.dataOut.utctime,
406 410 'data': roundFloats(AVG),
407 411 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
408 412 'type': self.plottype,
409 413 'yData': yData
410 414 }
411 415 else:
412 416 print "Tipo de grafico invalido"
413 417 payload = {
414 418 'data': 'None',
415 419 'timestamp': 'None',
416 420 'type': None
417 421 }
418 422
419 423 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
420 424
421 425 if self.zeromq is 1:
422 426 if self.verbose:
423 427 log.log(
424 428 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
425 429 self.name
426 430 )
427 431 self.zmq_socket.send_pyobj(self.dataOut)
428 432
429 433 def run(self, dataOut, **kwargs):
430 434 self.dataOut = dataOut
431 435 if not self.isConfig:
432 436 self.setup(**kwargs)
433 437 self.isConfig = True
434 438
435 439 self.publish_data()
436 440 time.sleep(self.delay)
437 441
438 442 def close(self):
439 443 if self.zeromq is 1:
440 444 self.dataOut.finished = True
441 445 self.zmq_socket.send_pyobj(self.dataOut)
442 446 time.sleep(0.1)
443 447 self.zmq_socket.close()
444 448 if self.client:
445 449 self.client.loop_stop()
446 450 self.client.disconnect()
447 451
448 452
449 453 class ReceiverData(ProcessingUnit):
450 454
451 455 __attrs__ = ['server']
452 456
453 457 def __init__(self, **kwargs):
454 458
455 459 ProcessingUnit.__init__(self, **kwargs)
456 460
457 461 self.isConfig = False
458 462 server = kwargs.get('server', 'zmq.pipe')
459 463 if 'tcp://' in server:
460 464 address = server
461 465 else:
462 466 address = 'ipc:///tmp/%s' % server
463 467
464 468 self.address = address
465 469 self.dataOut = JROData()
466 470
467 471 def setup(self):
468 472
469 473 self.context = zmq.Context()
470 474 self.receiver = self.context.socket(zmq.PULL)
471 475 self.receiver.bind(self.address)
472 476 time.sleep(0.5)
473 477 log.success('ReceiverData from {}'.format(self.address))
474 478
475 479
476 480 def run(self):
477 481
478 482 if not self.isConfig:
479 483 self.setup()
480 484 self.isConfig = True
481 485
482 486 self.dataOut = self.receiver.recv_pyobj()
483 487 log.log('{} - {}'.format(self.dataOut.type,
484 488 self.dataOut.datatime.ctime(),),
485 489 'Receiving')
486 490
487 491
488 492 class PlotterReceiver(ProcessingUnit, Process):
489 493
490 494 throttle_value = 5
491 495 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
492 496 'exp_code', 'web_server']
493 497
494 498 def __init__(self, **kwargs):
495 499
496 500 ProcessingUnit.__init__(self, **kwargs)
497 501 Process.__init__(self)
498 502 self.mp = False
499 503 self.isConfig = False
500 504 self.isWebConfig = False
501 505 self.connections = 0
502 506 server = kwargs.get('server', 'zmq.pipe')
503 507 web_server = kwargs.get('web_server', None)
504 508 if 'tcp://' in server:
505 509 address = server
506 510 else:
507 511 address = 'ipc:///tmp/%s' % server
508 512 self.address = address
509 513 self.web_address = web_server
510 514 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
511 515 self.realtime = kwargs.get('realtime', False)
512 516 self.localtime = kwargs.get('localtime', True)
513 517 self.throttle_value = kwargs.get('throttle', 5)
514 518 self.exp_code = kwargs.get('exp_code', None)
515 519 self.sendData = self.initThrottle(self.throttle_value)
516 520 self.dates = []
517 521 self.setup()
518 522
519 523 def setup(self):
520 524
521 525 self.data = Data(self.plottypes, self.throttle_value, self.exp_code)
522 526 self.isConfig = True
523 527
524 528 def event_monitor(self, monitor):
525 529
526 530 events = {}
527 531
528 532 for name in dir(zmq):
529 533 if name.startswith('EVENT_'):
530 534 value = getattr(zmq, name)
531 535 events[value] = name
532 536
533 537 while monitor.poll():
534 538 evt = recv_monitor_message(monitor)
535 539 if evt['event'] == 32:
536 540 self.connections += 1
537 541 if evt['event'] == 512:
538 542 pass
539 543
540 544 evt.update({'description': events[evt['event']]})
541 545
542 546 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
543 547 break
544 548 monitor.close()
545 549 print('event monitor thread done!')
546 550
547 551 def initThrottle(self, throttle_value):
548 552
549 553 @throttle(seconds=throttle_value)
550 554 def sendDataThrottled(fn_sender, data):
551 555 fn_sender(data)
552 556
553 557 return sendDataThrottled
554 558
555 559 def send(self, data):
556 560 log.success('Sending {}'.format(data), self.name)
557 561 self.sender.send_pyobj(data)
558 562
559 563 def run(self):
560 564
561 565 log.success(
562 566 'Starting from {}'.format(self.address),
563 567 self.name
564 568 )
565 569
566 570 self.context = zmq.Context()
567 571 self.receiver = self.context.socket(zmq.PULL)
568 572 self.receiver.bind(self.address)
569 573 monitor = self.receiver.get_monitor_socket()
570 574 self.sender = self.context.socket(zmq.PUB)
571 575 if self.web_address:
572 576 log.success(
573 577 'Sending to web: {}'.format(self.web_address),
574 578 self.name
575 579 )
576 580 self.sender_web = self.context.socket(zmq.PUB)
577 581 self.sender_web.connect(self.web_address)
578 582 time.sleep(1)
579 583
580 584 if 'server' in self.kwargs:
581 585 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
582 586 else:
583 587 self.sender.bind("ipc:///tmp/zmq.plots")
584 588
585 589 time.sleep(2)
586 590
587 591 t = Thread(target=self.event_monitor, args=(monitor,))
588 592 t.start()
589 593
590 594 while True:
591 595 dataOut = self.receiver.recv_pyobj()
592 596 if not dataOut.flagNoData:
593 597 if dataOut.type == 'Parameters':
594 598 tm = dataOut.utctimeInit
595 599 else:
596 600 tm = dataOut.utctime
597 601 if dataOut.useLocalTime:
598 602 if not self.localtime:
599 603 tm += time.timezone
600 604 dt = datetime.datetime.fromtimestamp(tm).date()
601 605 else:
602 606 if self.localtime:
603 607 tm -= time.timezone
604 608 dt = datetime.datetime.utcfromtimestamp(tm).date()
605 609 coerce = False
606 610 if dt not in self.dates:
607 611 if self.data:
608 612 self.data.ended = True
609 613 self.send(self.data)
610 614 coerce = True
611 615 self.data.setup()
612 616 self.dates.append(dt)
613 617
614 618 self.data.update(dataOut, tm)
615 619
616 620 if dataOut.finished is True:
617 621 self.connections -= 1
618 622 if self.connections == 0 and dt in self.dates:
619 623 self.data.ended = True
620 624 self.send(self.data)
621 625 self.data.setup()
622 626 else:
623 627 if self.realtime:
624 628 self.send(self.data)
625 629 if self.web_address:
626 630 self.sender_web.send(self.data.jsonify())
627 631 else:
628 632 self.sendData(self.send, self.data, coerce=coerce)
629 633 coerce = False
630 634
631 635 return
General Comments 0
You need to be logged in to leave comments. Login now