##// END OF EJS Templates
agregado para realtime, sin funcionar
José Chávez -
r902:135486d9969b
parent child
Show More
@@ -1,383 +1,388
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import cPickle as pickle
11 11 import datetime
12 12 from zmq.utils.monitor import recv_monitor_message
13 13 from functools import wraps
14 14 from threading import Thread
15 15 from multiprocessing import Process
16 16
17 17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18 18
19 19 MAXNUMX = 100
20 20 MAXNUMY = 100
21 21
22 22 class PrettyFloat(float):
23 23 def __repr__(self):
24 24 return '%.2f' % self
25 25
26 26 def roundFloats(obj):
27 27 if isinstance(obj, list):
28 28 return map(roundFloats, obj)
29 29 elif isinstance(obj, float):
30 30 return round(obj, 2)
31 31
32 32
33 33 class throttle(object):
34 34 """Decorator that prevents a function from being called more than once every
35 35 time period.
36 36 To create a function that cannot be called more than once a minute, but
37 37 will sleep until it can be called:
38 38 @throttle(minutes=1)
39 39 def foo():
40 40 pass
41 41
42 42 for i in range(10):
43 43 foo()
44 44 print "This function has run %s times." % i
45 45 """
46 46
47 47 def __init__(self, seconds=0, minutes=0, hours=0):
48 48 self.throttle_period = datetime.timedelta(
49 49 seconds=seconds, minutes=minutes, hours=hours
50 50 )
51 51
52 52 self.time_of_last_call = datetime.datetime.min
53 53
54 54 def __call__(self, fn):
55 55 @wraps(fn)
56 56 def wrapper(*args, **kwargs):
57 57 now = datetime.datetime.now()
58 58 time_since_last_call = now - self.time_of_last_call
59 59 time_left = self.throttle_period - time_since_last_call
60 60
61 61 if time_left > datetime.timedelta(seconds=0):
62 62 return
63 63
64 64 self.time_of_last_call = datetime.datetime.now()
65 65 return fn(*args, **kwargs)
66 66
67 67 return wrapper
68 68
69 69
70 70 class PublishData(Operation):
71 71 """Clase publish."""
72 72
73 73 def __init__(self, **kwargs):
74 74 """Inicio."""
75 75 Operation.__init__(self, **kwargs)
76 76 self.isConfig = False
77 77 self.client = None
78 78 self.zeromq = None
79 79 self.mqtt = None
80 80
81 81 def on_disconnect(self, client, userdata, rc):
82 82 if rc != 0:
83 83 print("Unexpected disconnection.")
84 84 self.connect()
85 85
86 86 def connect(self):
87 87 print 'trying to connect'
88 88 try:
89 89 self.client.connect(
90 90 host=self.host,
91 91 port=self.port,
92 92 keepalive=60*10,
93 93 bind_address='')
94 94 self.client.loop_start()
95 95 # self.client.publish(
96 96 # self.topic + 'SETUP',
97 97 # json.dumps(setup),
98 98 # retain=True
99 99 # )
100 100 except:
101 101 print "MQTT Conection error."
102 102 self.client = False
103 103
104 104 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
105 105 self.counter = 0
106 106 self.topic = kwargs.get('topic', 'schain')
107 107 self.delay = kwargs.get('delay', 0)
108 108 self.plottype = kwargs.get('plottype', 'spectra')
109 109 self.host = kwargs.get('host', "10.10.10.82")
110 110 self.port = kwargs.get('port', 3000)
111 111 self.clientId = clientId
112 112 self.cnt = 0
113 113 self.zeromq = zeromq
114 114 self.mqtt = kwargs.get('plottype', 0)
115 115 self.client = None
116 116 setup = []
117 117 if mqtt is 1:
118 118 self.client = mqtt.Client(
119 119 client_id=self.clientId + self.topic + 'SCHAIN',
120 120 clean_session=True)
121 121 self.client.on_disconnect = self.on_disconnect
122 122 self.connect()
123 123 for plot in self.plottype:
124 124 setup.append({
125 125 'plot': plot,
126 126 'topic': self.topic + plot,
127 127 'title': getattr(self, plot + '_' + 'title', False),
128 128 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
129 129 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
130 130 'xrange': getattr(self, plot + '_' + 'xrange', False),
131 131 'yrange': getattr(self, plot + '_' + 'yrange', False),
132 132 'zrange': getattr(self, plot + '_' + 'zrange', False),
133 133 })
134 134 if zeromq is 1:
135 135 context = zmq.Context()
136 136 self.zmq_socket = context.socket(zmq.PUSH)
137 137 server = kwargs.get('server', 'zmq.pipe')
138 138
139 139 if 'tcp://' in server:
140 140 address = server
141 141 else:
142 142 address = 'ipc:///tmp/%s' % server
143 143
144 144 self.zmq_socket.connect(address)
145 145 time.sleep(1)
146 146
147 147
148 148
149 149 def publish_data(self):
150 150 self.dataOut.finished = False
151 151 if self.mqtt is 1:
152 152 yData = self.dataOut.heightList[:2].tolist()
153 153 if self.plottype == 'spectra':
154 154 data = getattr(self.dataOut, 'data_spc')
155 155 z = data/self.dataOut.normFactor
156 156 zdB = 10*numpy.log10(z)
157 157 xlen, ylen = zdB[0].shape
158 158 dx = int(xlen/MAXNUMX) + 1
159 159 dy = int(ylen/MAXNUMY) + 1
160 160 Z = [0 for i in self.dataOut.channelList]
161 161 for i in self.dataOut.channelList:
162 162 Z[i] = zdB[i][::dx, ::dy].tolist()
163 163 payload = {
164 164 'timestamp': self.dataOut.utctime,
165 165 'data': roundFloats(Z),
166 166 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
167 167 'interval': self.dataOut.getTimeInterval(),
168 168 'type': self.plottype,
169 169 'yData': yData
170 170 }
171 171 # print payload
172 172
173 173 elif self.plottype in ('rti', 'power'):
174 174 data = getattr(self.dataOut, 'data_spc')
175 175 z = data/self.dataOut.normFactor
176 176 avg = numpy.average(z, axis=1)
177 177 avgdB = 10*numpy.log10(avg)
178 178 xlen, ylen = z[0].shape
179 179 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
180 180 AVG = [0 for i in self.dataOut.channelList]
181 181 for i in self.dataOut.channelList:
182 182 AVG[i] = avgdB[i][::dy].tolist()
183 183 payload = {
184 184 'timestamp': self.dataOut.utctime,
185 185 'data': roundFloats(AVG),
186 186 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
187 187 'interval': self.dataOut.getTimeInterval(),
188 188 'type': self.plottype,
189 189 'yData': yData
190 190 }
191 191 elif self.plottype == 'noise':
192 192 noise = self.dataOut.getNoise()/self.dataOut.normFactor
193 193 noisedB = 10*numpy.log10(noise)
194 194 payload = {
195 195 'timestamp': self.dataOut.utctime,
196 196 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
197 197 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
198 198 'interval': self.dataOut.getTimeInterval(),
199 199 'type': self.plottype,
200 200 'yData': yData
201 201 }
202 202 elif self.plottype == 'snr':
203 203 data = getattr(self.dataOut, 'data_SNR')
204 204 avgdB = 10*numpy.log10(data)
205 205
206 206 ylen = data[0].size
207 207 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
208 208 AVG = [0 for i in self.dataOut.channelList]
209 209 for i in self.dataOut.channelList:
210 210 AVG[i] = avgdB[i][::dy].tolist()
211 211 payload = {
212 212 'timestamp': self.dataOut.utctime,
213 213 'data': roundFloats(AVG),
214 214 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
215 215 'type': self.plottype,
216 216 'yData': yData
217 217 }
218 218 else:
219 219 print "Tipo de grafico invalido"
220 220 payload = {
221 221 'data': 'None',
222 222 'timestamp': 'None',
223 223 'type': None
224 224 }
225 225 # print 'Publishing data to {}'.format(self.host)
226 226 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
227 227
228 228 if self.zeromq is 1:
229 229 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
230 230 self.zmq_socket.send_pyobj(self.dataOut)
231 231
232 232 def run(self, dataOut, **kwargs):
233 233 self.dataOut = dataOut
234 234 if not self.isConfig:
235 235 self.setup(**kwargs)
236 236 self.isConfig = True
237 237
238 238 self.publish_data()
239 239 time.sleep(self.delay)
240 240
241 241 def close(self):
242 242 if self.zeromq is 1:
243 243 self.dataOut.finished = True
244 244 self.zmq_socket.send_pyobj(self.dataOut)
245 245
246 246 if self.client:
247 247 self.client.loop_stop()
248 248 self.client.disconnect()
249 249
250 250
251 251 class ReceiverData(ProcessingUnit, Process):
252 252
253 253 throttle_value = 5
254 254
255 255 def __init__(self, **kwargs):
256 256
257 257 ProcessingUnit.__init__(self, **kwargs)
258 258 Process.__init__(self)
259 259 self.mp = False
260 260 self.isConfig = False
261 261 self.plottypes =[]
262 262 self.connections = 0
263 263 server = kwargs.get('server', 'zmq.pipe')
264 264 if 'tcp://' in server:
265 265 address = server
266 266 else:
267 267 address = 'ipc:///tmp/%s' % server
268 268
269 269 self.address = address
270 270 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
271 271 self.realtime = kwargs.get('realtime', False)
272 272 self.throttle_value = kwargs.get('throttle', 10)
273 273 self.sendData = self.initThrottle(self.throttle_value)
274 274 self.setup()
275 275
276 276 def setup(self):
277 277
278 278 self.data = {}
279 279 self.data['times'] = []
280 280 for plottype in self.plottypes:
281 281 self.data[plottype] = {}
282 282 self.data['noise'] = {}
283 283 self.data['throttle'] = self.throttle_value
284 284 self.data['ENDED'] = False
285 285 self.isConfig = True
286 self.data_web = {}
286 287
287 288 def event_monitor(self, monitor):
288 289
289 290 events = {}
290 291
291 292 for name in dir(zmq):
292 293 if name.startswith('EVENT_'):
293 294 value = getattr(zmq, name)
294 295 events[value] = name
295 296
296 297 while monitor.poll():
297 298 evt = recv_monitor_message(monitor)
298 299 if evt['event'] == 32:
299 300 self.connections += 1
300 301 if evt['event'] == 512:
301 302 pass
302 303 if self.connections == 0 and self.started is True:
303 304 self.ended = True
304 305 # send('ENDED')
305 306 evt.update({'description': events[evt['event']]})
306 307
307 308 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
308 309 break
309 310 monitor.close()
310 311 print("event monitor thread done!")
311 312
312 313 def initThrottle(self, throttle_value):
313 314
314 315 @throttle(seconds=throttle_value)
315 316 def sendDataThrottled(fn_sender, data):
316 317 fn_sender(data)
317 318
318 319 return sendDataThrottled
319 320
320 321 def send(self, data):
321 322 print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
322 323 self.sender.send_pyobj(data)
323 324
324 325 def update(self):
325 326
326 327 t = self.dataOut.ltctime
327 328 self.data['times'].append(t)
328 329 self.data['dataOut'] = self.dataOut
329 330 for plottype in self.plottypes:
330
331 331 if plottype == 'spc':
332 332 z = self.dataOut.data_spc/self.dataOut.normFactor
333 333 self.data[plottype] = 10*numpy.log10(z)
334 334 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
335 335 if plottype == 'rti':
336 336 self.data[plottype][t] = self.dataOut.getPower()
337 337 if plottype == 'snr':
338 338 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
339 339 if plottype == 'dop':
340 340 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
341 341 if plottype == 'coh':
342 342 self.data[plottype][t] = self.dataOut.getCoherence()
343 343 if plottype == 'phase':
344 344 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
345
345 if self.realtime:
346 self.data_web[plottype] = self.data[plottype][t]
347 self.data_web['time'] = t
346 348 def run(self):
347 349
348 350 print '[Starting] {} from {}'.format(self.name, self.address)
349 351
350 352 self.context = zmq.Context()
351 353 self.receiver = self.context.socket(zmq.PULL)
352 354 self.receiver.bind(self.address)
353 355 monitor = self.receiver.get_monitor_socket()
354 356 self.sender = self.context.socket(zmq.PUB)
355
357 if self.realtime:
358 self.sender_web = self.context.socket(zmq.PUB)
359 self.sender.bind("ipc:///tmp/zmq.web")
356 360 self.sender.bind("ipc:///tmp/zmq.plots")
357 361
358 t = Thread(target=self.event_monitor, args=(monitor,))
362 t = Thread(target=self.event_monitor)
359 363 t.start()
360 364
361 365 while True:
362 366 self.dataOut = self.receiver.recv_pyobj()
363 367 # print '[Receiving] {} - {}'.format(self.dataOut.type,
364 368 # self.dataOut.datatime.ctime())
365 369
366 370 self.update()
367 371
368 372 if self.dataOut.finished is True:
369 373 self.send(self.data)
370 374 self.connections -= 1
371 375 if self.connections == 0 and self.started:
372 376 self.ended = True
373 377 self.data['ENDED'] = True
374 378 self.send(self.data)
375 379 self.setup()
376 380 else:
377 381 if self.realtime:
378 382 self.send(self.data)
383 self.sender_web.send_json(json.dumps(self.data_web))
379 384 else:
380 385 self.sendData(self.send, self.data)
381 386 self.started = True
382 387
383 388 return
@@ -1,48 +1,50
1 1 #!/usr/bin/env python
2 2 '''
3 3 Created on Jul 7, 2014
4 4
5 5 @author: roj-idl71
6 6 '''
7 7 import os, sys
8 8
9 9 from schainpy.controller import Project
10 10
11 11 if __name__ == '__main__':
12 12 desc = "Segundo Test"
13 13
14 14 controllerObj = Project()
15 15 controllerObj.setup(id='191', name='test01', description=desc)
16 16
17 17 proc1 = controllerObj.addProcUnit(name='ReceiverData')
18 proc1.addParameter(name='realtime', value='0', format='bool')
19 proc1.addParameter(name='plottypes', value='rti,coh,phase', format='str')
20 proc1.addParameter(name='throttle', value='10', format='int')
18 proc1.addParameter(name='realtime', value='1', format='bool')
21 19
22 op1 = proc1.addOperation(name='PlotRTIData', optype='other')
23 op1.addParameter(name='wintitle', value='Julia 150Km', format='str')
24 op1.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
20 proc1.addParameter(name='plottypes', value='rti', format='str')
21 proc1.addParameter(name='throttle', value='10', format='int')
22 ## TODO Agregar direccion de server de publicacion a graficos como variable
25 23
26 op2 = proc1.addOperation(name='PlotCOHData', optype='other')
27 op2.addParameter(name='wintitle', value='Julia 150Km', format='str')
28 op2.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
24 # op1 = proc1.addOperation(name='PlotRTIData', optype='other')
25 # op1.addParameter(name='wintitle', value='Julia 150Km', format='str')
26 # op1.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
29 27 #
30 op6 = proc1.addOperation(name='PlotPHASEData', optype='other')
31 op6.addParameter(name='wintitle', value='Julia 150Km', format='str')
32 op6.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
28 # op2 = proc1.addOperation(name='PlotCOHData', optype='other')
29 # op2.addParameter(name='wintitle', value='Julia 150Km', format='str')
30 # op2.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
31 # #
32 # op6 = proc1.addOperation(name='PlotPHASEData', optype='other')
33 # op6.addParameter(name='wintitle', value='Julia 150Km', format='str')
34 # op6.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
33 35 #
34 36 # proc2 = controllerObj.addProcUnit(name='ReceiverData')
35 37 # proc2.addParameter(name='server', value='juanca', format='str')
36 38 # proc2.addParameter(name='plottypes', value='snr,dop', format='str')
37 39 #
38 40 # op3 = proc2.addOperation(name='PlotSNRData', optype='other')
39 41 # op3.addParameter(name='wintitle', value='Julia 150Km', format='str')
40 42 # op3.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
41 43 #
42 44 # op4 = proc2.addOperation(name='PlotDOPData', optype='other')
43 45 # op4.addParameter(name='wintitle', value='Julia 150Km', format='str')
44 46 # op4.addParameter(name='save', value='/home/nanosat/Pictures', format='str')
45 47
46 48
47 49
48 50 controllerObj.start()
General Comments 0
You need to be logged in to leave comments. Login now