##// END OF EJS Templates
Merge remote into local...
jespinoza -
r912:f65929d2cf32 merge
parent child
Show More
@@ -1,422 +1,424
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import cPickle as pickle
11 11 import datetime
12 12 from zmq.utils.monitor import recv_monitor_message
13 13 from functools import wraps
14 14 from threading import Thread
15 15 from multiprocessing import Process
16 16
17 17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18 18
19 19 MAXNUMX = 100
20 20 MAXNUMY = 100
21 21
22 22 class PrettyFloat(float):
23 23 def __repr__(self):
24 24 return '%.2f' % self
25 25
26 26 def roundFloats(obj):
27 27 if isinstance(obj, list):
28 28 return map(roundFloats, obj)
29 29 elif isinstance(obj, float):
30 30 return round(obj, 2)
31 31
32 32 def decimate(z):
33 33 # dx = int(len(self.x)/self.__MAXNUMX) + 1
34 34
35 35 dy = int(len(z[0])/MAXNUMY) + 1
36 36
37 37 return z[::, ::dy]
38 38
39 39 class throttle(object):
40 40 """Decorator that prevents a function from being called more than once every
41 41 time period.
42 42 To create a function that cannot be called more than once a minute, but
43 43 will sleep until it can be called:
44 44 @throttle(minutes=1)
45 45 def foo():
46 46 pass
47 47
48 48 for i in range(10):
49 49 foo()
50 50 print "This function has run %s times." % i
51 51 """
52 52
53 53 def __init__(self, seconds=0, minutes=0, hours=0):
54 54 self.throttle_period = datetime.timedelta(
55 55 seconds=seconds, minutes=minutes, hours=hours
56 56 )
57 57
58 58 self.time_of_last_call = datetime.datetime.min
59 59
60 60 def __call__(self, fn):
61 61 @wraps(fn)
62 62 def wrapper(*args, **kwargs):
63 63 now = datetime.datetime.now()
64 64 time_since_last_call = now - self.time_of_last_call
65 65 time_left = self.throttle_period - time_since_last_call
66 66
67 67 if time_left > datetime.timedelta(seconds=0):
68 68 return
69 69
70 70 self.time_of_last_call = datetime.datetime.now()
71 71 return fn(*args, **kwargs)
72 72
73 73 return wrapper
74 74
75 75
76 76 class PublishData(Operation):
77 77 """Clase publish."""
78 78
79 79 def __init__(self, **kwargs):
80 80 """Inicio."""
81 81 Operation.__init__(self, **kwargs)
82 82 self.isConfig = False
83 83 self.client = None
84 84 self.zeromq = None
85 85 self.mqtt = None
86 86
87 87 def on_disconnect(self, client, userdata, rc):
88 88 if rc != 0:
89 89 print("Unexpected disconnection.")
90 90 self.connect()
91 91
92 92 def connect(self):
93 93 print 'trying to connect'
94 94 try:
95 95 self.client.connect(
96 96 host=self.host,
97 97 port=self.port,
98 98 keepalive=60*10,
99 99 bind_address='')
100 100 self.client.loop_start()
101 101 # self.client.publish(
102 102 # self.topic + 'SETUP',
103 103 # json.dumps(setup),
104 104 # retain=True
105 105 # )
106 106 except:
107 107 print "MQTT Conection error."
108 108 self.client = False
109 109
110 110 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
111 111 self.counter = 0
112 112 self.topic = kwargs.get('topic', 'schain')
113 113 self.delay = kwargs.get('delay', 0)
114 114 self.plottype = kwargs.get('plottype', 'spectra')
115 115 self.host = kwargs.get('host', "10.10.10.82")
116 116 self.port = kwargs.get('port', 3000)
117 117 self.clientId = clientId
118 118 self.cnt = 0
119 119 self.zeromq = zeromq
120 120 self.mqtt = kwargs.get('plottype', 0)
121 121 self.client = None
122 122 setup = []
123 123 if mqtt is 1:
124 124 self.client = mqtt.Client(
125 125 client_id=self.clientId + self.topic + 'SCHAIN',
126 126 clean_session=True)
127 127 self.client.on_disconnect = self.on_disconnect
128 128 self.connect()
129 129 for plot in self.plottype:
130 130 setup.append({
131 131 'plot': plot,
132 132 'topic': self.topic + plot,
133 133 'title': getattr(self, plot + '_' + 'title', False),
134 134 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
135 135 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
136 136 'xrange': getattr(self, plot + '_' + 'xrange', False),
137 137 'yrange': getattr(self, plot + '_' + 'yrange', False),
138 138 'zrange': getattr(self, plot + '_' + 'zrange', False),
139 139 })
140 140 if zeromq is 1:
141 141 context = zmq.Context()
142 142 self.zmq_socket = context.socket(zmq.PUSH)
143 143 server = kwargs.get('server', 'zmq.pipe')
144 144
145 145 if 'tcp://' in server:
146 146 address = server
147 147 else:
148 148 address = 'ipc:///tmp/%s' % server
149 149
150 150 self.zmq_socket.connect(address)
151 151 time.sleep(1)
152 152
153 153 def publish_data(self):
154 154 self.dataOut.finished = False
155 155 if self.mqtt is 1:
156 156 yData = self.dataOut.heightList[:2].tolist()
157 157 if self.plottype == 'spectra':
158 158 data = getattr(self.dataOut, 'data_spc')
159 159 z = data/self.dataOut.normFactor
160 160 zdB = 10*numpy.log10(z)
161 161 xlen, ylen = zdB[0].shape
162 162 dx = int(xlen/MAXNUMX) + 1
163 163 dy = int(ylen/MAXNUMY) + 1
164 164 Z = [0 for i in self.dataOut.channelList]
165 165 for i in self.dataOut.channelList:
166 166 Z[i] = zdB[i][::dx, ::dy].tolist()
167 167 payload = {
168 168 'timestamp': self.dataOut.utctime,
169 169 'data': roundFloats(Z),
170 170 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
171 171 'interval': self.dataOut.getTimeInterval(),
172 172 'type': self.plottype,
173 173 'yData': yData
174 174 }
175 175 # print payload
176 176
177 177 elif self.plottype in ('rti', 'power'):
178 178 data = getattr(self.dataOut, 'data_spc')
179 179 z = data/self.dataOut.normFactor
180 180 avg = numpy.average(z, axis=1)
181 181 avgdB = 10*numpy.log10(avg)
182 182 xlen, ylen = z[0].shape
183 183 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
184 184 AVG = [0 for i in self.dataOut.channelList]
185 185 for i in self.dataOut.channelList:
186 186 AVG[i] = avgdB[i][::dy].tolist()
187 187 payload = {
188 188 'timestamp': self.dataOut.utctime,
189 189 'data': roundFloats(AVG),
190 190 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
191 191 'interval': self.dataOut.getTimeInterval(),
192 192 'type': self.plottype,
193 193 'yData': yData
194 194 }
195 195 elif self.plottype == 'noise':
196 196 noise = self.dataOut.getNoise()/self.dataOut.normFactor
197 197 noisedB = 10*numpy.log10(noise)
198 198 payload = {
199 199 'timestamp': self.dataOut.utctime,
200 200 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
201 201 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
202 202 'interval': self.dataOut.getTimeInterval(),
203 203 'type': self.plottype,
204 204 'yData': yData
205 205 }
206 206 elif self.plottype == 'snr':
207 207 data = getattr(self.dataOut, 'data_SNR')
208 208 avgdB = 10*numpy.log10(data)
209 209
210 210 ylen = data[0].size
211 211 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
212 212 AVG = [0 for i in self.dataOut.channelList]
213 213 for i in self.dataOut.channelList:
214 214 AVG[i] = avgdB[i][::dy].tolist()
215 215 payload = {
216 216 'timestamp': self.dataOut.utctime,
217 217 'data': roundFloats(AVG),
218 218 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
219 219 'type': self.plottype,
220 220 'yData': yData
221 221 }
222 222 else:
223 223 print "Tipo de grafico invalido"
224 224 payload = {
225 225 'data': 'None',
226 226 'timestamp': 'None',
227 227 'type': None
228 228 }
229 229 # print 'Publishing data to {}'.format(self.host)
230 230 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
231 231
232 232 if self.zeromq is 1:
233 233 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
234 234 self.zmq_socket.send_pyobj(self.dataOut)
235 235
236 236 def run(self, dataOut, **kwargs):
237 237 self.dataOut = dataOut
238 238 if not self.isConfig:
239 239 self.setup(**kwargs)
240 240 self.isConfig = True
241 241
242 242 self.publish_data()
243 243 time.sleep(self.delay)
244 244
245 245 def close(self):
246 246 if self.zeromq is 1:
247 247 self.dataOut.finished = True
248 248 self.zmq_socket.send_pyobj(self.dataOut)
249 249
250 250 if self.client:
251 251 self.client.loop_stop()
252 252 self.client.disconnect()
253 253
254 254
255 255 class ReceiverData(ProcessingUnit, Process):
256 256
257 257 throttle_value = 5
258 258
259 259 def __init__(self, **kwargs):
260 260
261 261 ProcessingUnit.__init__(self, **kwargs)
262 262 Process.__init__(self)
263 263 self.mp = False
264 264 self.isConfig = False
265 265 self.isWebConfig = False
266 266 self.plottypes =[]
267 267 self.connections = 0
268 268 server = kwargs.get('server', 'zmq.pipe')
269 269 plot_server = kwargs.get('plot_server', 'zmq.web')
270 270 if 'tcp://' in server:
271 271 address = server
272 272 else:
273 273 address = 'ipc:///tmp/%s' % server
274 274
275 275 if 'tcp://' in plot_server:
276 276 plot_address = plot_server
277 277 else:
278 278 plot_address = 'ipc:///tmp/%s' % plot_server
279 279
280 280 self.address = address
281 281 self.plot_address = plot_address
282 282 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
283 283 self.realtime = kwargs.get('realtime', False)
284 284 self.throttle_value = kwargs.get('throttle', 10)
285 285 self.sendData = self.initThrottle(self.throttle_value)
286 286 self.setup()
287 287
288 288 def setup(self):
289 289
290 290 self.data = {}
291 291 self.data['times'] = []
292 292 for plottype in self.plottypes:
293 293 self.data[plottype] = {}
294 294 self.data['noise'] = {}
295 295 self.data['throttle'] = self.throttle_value
296 296 self.data['ENDED'] = False
297 297 self.isConfig = True
298 298 self.data_web = {}
299 299
300 300 def event_monitor(self, monitor):
301 301
302 302 events = {}
303 303
304 304 for name in dir(zmq):
305 305 if name.startswith('EVENT_'):
306 306 value = getattr(zmq, name)
307 307 events[value] = name
308 308
309 309 while monitor.poll():
310 310 evt = recv_monitor_message(monitor)
311 311 if evt['event'] == 32:
312 312 self.connections += 1
313 313 if evt['event'] == 512:
314 314 pass
315 315 if self.connections == 0 and self.started is True:
316 316 self.ended = True
317 317 # send('ENDED')
318 318 evt.update({'description': events[evt['event']]})
319 319
320 320 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
321 321 break
322 322 monitor.close()
323 323 print("event monitor thread done!")
324 324
325 325 def initThrottle(self, throttle_value):
326 326
327 327 @throttle(seconds=throttle_value)
328 328 def sendDataThrottled(fn_sender, data):
329 329 fn_sender(data)
330 330
331 331 return sendDataThrottled
332 332
333 333 def send(self, data):
334 334 # print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
335 335 self.sender.send_pyobj(data)
336 336
337 337 def update(self):
338 338
339 339 t = self.dataOut.ltctime
340 340 self.data['times'].append(t)
341 341 self.data['dataOut'] = self.dataOut
342 342 for plottype in self.plottypes:
343 343 if plottype == 'spc':
344 344 z = self.dataOut.data_spc/self.dataOut.normFactor
345 345 self.data[plottype] = 10*numpy.log10(z)
346 346 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
347 347 if plottype == 'rti':
348 348 self.data[plottype][t] = self.dataOut.getPower()
349 349 if plottype == 'snr':
350 350 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
351 351 if plottype == 'dop':
352 352 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
353 353 if plottype == 'coh':
354 354 self.data[plottype][t] = self.dataOut.getCoherence()
355 355 if plottype == 'phase':
356 356 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
357 357 if self.realtime:
358 358 if plottype == 'spc':
359 359 self.data_web[plottype] = roundFloats(decimate(self.data[plottype]).tolist())
360 360 else:
361 361 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
362 362 self.data_web['time'] = t
363
363 self.data_web['interval'] = self.dataOut.getTimeInterval()
364 self.data_web['type'] = plottype
365
364 366 def run(self):
365 367
366 368 print '[Starting] {} from {}'.format(self.name, self.address)
367 369
368 370 self.context = zmq.Context()
369 371 self.receiver = self.context.socket(zmq.PULL)
370 372 self.receiver.bind(self.address)
371 373 monitor = self.receiver.get_monitor_socket()
372 374 self.sender = self.context.socket(zmq.PUB)
373 375 if self.realtime:
374 376 self.sender_web = self.context.socket(zmq.PUB)
375 377 self.sender_web.bind(self.plot_address)
376 378 self.sender.bind("ipc:///tmp/zmq.plots")
377 379
378 380 t = Thread(target=self.event_monitor, args=(monitor,))
379 381 t.start()
380 382
381 383 while True:
382 384 self.dataOut = self.receiver.recv_pyobj()
383 385 # print '[Receiving] {} - {}'.format(self.dataOut.type,
384 386 # self.dataOut.datatime.ctime())
385 387
386 388 self.update()
387 389
388 390 if self.dataOut.finished is True:
389 391 self.send(self.data)
390 392 self.connections -= 1
391 393 if self.connections == 0 and self.started:
392 394 self.ended = True
393 395 self.data['ENDED'] = True
394 396 self.send(self.data)
395 397 self.setup()
396 398 else:
397 399 if self.realtime:
398 400 self.send(self.data)
399 401 self.sender_web.send_string(json.dumps(self.data_web))
400 402 else:
401 403 self.sendData(self.send, self.data)
402 404 self.started = True
403 405
404 406 return
405 407
406 408 def sendToWeb(self):
407 409
408 410 if not self.isWebConfig:
409 411 context = zmq.Context()
410 412 sender_web_config = context.socket(zmq.PUB)
411 413 if 'tcp://' in self.plot_address:
412 414 dum, address, port = self.plot_address.split(':')
413 415 conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
414 416 else:
415 417 conf_address = self.plot_address + '.config'
416 418 sender_web_config.bind(conf_address)
417 419 time.sleep(1)
418 420 for kwargs in self.operationKwargs.values():
419 421 if 'plot' in kwargs:
420 422 print '[Sending] Config data to web for {}'.format(kwargs['code'].upper())
421 423 sender_web_config.send_string(json.dumps(kwargs))
422 424 self.isWebConfig = True
General Comments 0
You need to be logged in to leave comments. Login now