##// END OF EJS Templates
Add default values in jsonify Data
jespinoza -
r1127:be11c71d5d2b
parent child
Show More
@@ -1,631 +1,635
1 '''
1 '''
2 @author: Juan C. Espinoza
2 @author: Juan C. Espinoza
3 '''
3 '''
4
4
5 import time
5 import time
6 import json
6 import json
7 import numpy
7 import numpy
8 import paho.mqtt.client as mqtt
8 import paho.mqtt.client as mqtt
9 import zmq
9 import zmq
10 import datetime
10 import datetime
11 from zmq.utils.monitor import recv_monitor_message
11 from zmq.utils.monitor import recv_monitor_message
12 from functools import wraps
12 from functools import wraps
13 from threading import Thread
13 from threading import Thread
14 from multiprocessing import Process
14 from multiprocessing import Process
15
15
16 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
16 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 from schainpy.model.data.jrodata import JROData
17 from schainpy.model.data.jrodata import JROData
18 from schainpy.utils import log
18 from schainpy.utils import log
19
19
20 MAXNUMX = 100
20 MAXNUMX = 100
21 MAXNUMY = 100
21 MAXNUMY = 100
22
22
23 class PrettyFloat(float):
23 class PrettyFloat(float):
24 def __repr__(self):
24 def __repr__(self):
25 return '%.2f' % self
25 return '%.2f' % self
26
26
27 def roundFloats(obj):
27 def roundFloats(obj):
28 if isinstance(obj, list):
28 if isinstance(obj, list):
29 return map(roundFloats, obj)
29 return map(roundFloats, obj)
30 elif isinstance(obj, float):
30 elif isinstance(obj, float):
31 return round(obj, 2)
31 return round(obj, 2)
32
32
33 def decimate(z, MAXNUMY):
33 def decimate(z, MAXNUMY):
34 dy = int(len(z[0])/MAXNUMY) + 1
34 dy = int(len(z[0])/MAXNUMY) + 1
35
35
36 return z[::, ::dy]
36 return z[::, ::dy]
37
37
38 class throttle(object):
38 class throttle(object):
39 '''
39 '''
40 Decorator that prevents a function from being called more than once every
40 Decorator that prevents a function from being called more than once every
41 time period.
41 time period.
42 To create a function that cannot be called more than once a minute, but
42 To create a function that cannot be called more than once a minute, but
43 will sleep until it can be called:
43 will sleep until it can be called:
44 @throttle(minutes=1)
44 @throttle(minutes=1)
45 def foo():
45 def foo():
46 pass
46 pass
47
47
48 for i in range(10):
48 for i in range(10):
49 foo()
49 foo()
50 print "This function has run %s times." % i
50 print "This function has run %s times." % i
51 '''
51 '''
52
52
53 def __init__(self, seconds=0, minutes=0, hours=0):
53 def __init__(self, seconds=0, minutes=0, hours=0):
54 self.throttle_period = datetime.timedelta(
54 self.throttle_period = datetime.timedelta(
55 seconds=seconds, minutes=minutes, hours=hours
55 seconds=seconds, minutes=minutes, hours=hours
56 )
56 )
57
57
58 self.time_of_last_call = datetime.datetime.min
58 self.time_of_last_call = datetime.datetime.min
59
59
60 def __call__(self, fn):
60 def __call__(self, fn):
61 @wraps(fn)
61 @wraps(fn)
62 def wrapper(*args, **kwargs):
62 def wrapper(*args, **kwargs):
63 coerce = kwargs.pop('coerce', None)
63 coerce = kwargs.pop('coerce', None)
64 if coerce:
64 if coerce:
65 self.time_of_last_call = datetime.datetime.now()
65 self.time_of_last_call = datetime.datetime.now()
66 return fn(*args, **kwargs)
66 return fn(*args, **kwargs)
67 else:
67 else:
68 now = datetime.datetime.now()
68 now = datetime.datetime.now()
69 time_since_last_call = now - self.time_of_last_call
69 time_since_last_call = now - self.time_of_last_call
70 time_left = self.throttle_period - time_since_last_call
70 time_left = self.throttle_period - time_since_last_call
71
71
72 if time_left > datetime.timedelta(seconds=0):
72 if time_left > datetime.timedelta(seconds=0):
73 return
73 return
74
74
75 self.time_of_last_call = datetime.datetime.now()
75 self.time_of_last_call = datetime.datetime.now()
76 return fn(*args, **kwargs)
76 return fn(*args, **kwargs)
77
77
78 return wrapper
78 return wrapper
79
79
80 class Data(object):
80 class Data(object):
81 '''
81 '''
82 Object to hold data to be plotted
82 Object to hold data to be plotted
83 '''
83 '''
84
84
85 def __init__(self, plottypes, throttle_value, exp_code):
85 def __init__(self, plottypes, throttle_value, exp_code):
86 self.plottypes = plottypes
86 self.plottypes = plottypes
87 self.throttle = throttle_value
87 self.throttle = throttle_value
88 self.exp_code = exp_code
88 self.exp_code = exp_code
89 self.ended = False
89 self.ended = False
90 self.localtime = False
90 self.localtime = False
91 self.__times = []
91 self.__times = []
92 self.__heights = []
92 self.__heights = []
93
93
94 def __str__(self):
94 def __str__(self):
95 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
95 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
96 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
96 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
97
97
98 def __len__(self):
98 def __len__(self):
99 return len(self.__times)
99 return len(self.__times)
100
100
101 def __getitem__(self, key):
101 def __getitem__(self, key):
102 if key not in self.data:
102 if key not in self.data:
103 raise KeyError(log.error('Missing key: {}'.format(key)))
103 raise KeyError(log.error('Missing key: {}'.format(key)))
104
104
105 if 'spc' in key:
105 if 'spc' in key:
106 ret = self.data[key]
106 ret = self.data[key]
107 else:
107 else:
108 ret = numpy.array([self.data[key][x] for x in self.times])
108 ret = numpy.array([self.data[key][x] for x in self.times])
109 if ret.ndim > 1:
109 if ret.ndim > 1:
110 ret = numpy.swapaxes(ret, 0, 1)
110 ret = numpy.swapaxes(ret, 0, 1)
111 return ret
111 return ret
112
112
113 def __contains__(self, key):
113 def __contains__(self, key):
114 return key in self.data
114 return key in self.data
115
115
116 def setup(self):
116 def setup(self):
117 '''
117 '''
118 Configure object
118 Configure object
119 '''
119 '''
120
120
121 self.ended = False
121 self.ended = False
122 self.data = {}
122 self.data = {}
123 self.__times = []
123 self.__times = []
124 self.__heights = []
124 self.__heights = []
125 self.__all_heights = set()
125 self.__all_heights = set()
126 for plot in self.plottypes:
126 for plot in self.plottypes:
127 if 'snr' in plot:
127 if 'snr' in plot:
128 plot = 'snr'
128 plot = 'snr'
129 self.data[plot] = {}
129 self.data[plot] = {}
130
130
131 def shape(self, key):
131 def shape(self, key):
132 '''
132 '''
133 Get the shape of the one-element data for the given key
133 Get the shape of the one-element data for the given key
134 '''
134 '''
135
135
136 if len(self.data[key]):
136 if len(self.data[key]):
137 if 'spc' in key:
137 if 'spc' in key:
138 return self.data[key].shape
138 return self.data[key].shape
139 return self.data[key][self.__times[0]].shape
139 return self.data[key][self.__times[0]].shape
140 return (0,)
140 return (0,)
141
141
142 def update(self, dataOut, tm):
142 def update(self, dataOut, tm):
143 '''
143 '''
144 Update data object with new dataOut
144 Update data object with new dataOut
145 '''
145 '''
146
146
147 if tm in self.__times:
147 if tm in self.__times:
148 return
148 return
149
149
150 self.parameters = getattr(dataOut, 'parameters', [])
150 self.parameters = getattr(dataOut, 'parameters', [])
151 if hasattr(dataOut, 'pairsList'):
151 if hasattr(dataOut, 'pairsList'):
152 self.pairs = dataOut.pairsList
152 self.pairs = dataOut.pairsList
153 self.channels = dataOut.channelList
153 self.channels = dataOut.channelList
154 self.interval = dataOut.getTimeInterval()
154 self.interval = dataOut.getTimeInterval()
155 self.localtime = dataOut.useLocalTime
155 self.localtime = dataOut.useLocalTime
156 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
156 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
157 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
157 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
158 self.__heights.append(dataOut.heightList)
158 self.__heights.append(dataOut.heightList)
159 self.__all_heights.update(dataOut.heightList)
159 self.__all_heights.update(dataOut.heightList)
160 self.__times.append(tm)
160 self.__times.append(tm)
161
161
162 for plot in self.plottypes:
162 for plot in self.plottypes:
163 if plot == 'spc':
163 if plot == 'spc':
164 z = dataOut.data_spc/dataOut.normFactor
164 z = dataOut.data_spc/dataOut.normFactor
165 self.data[plot] = 10*numpy.log10(z)
165 self.data[plot] = 10*numpy.log10(z)
166 if plot == 'cspc':
166 if plot == 'cspc':
167 self.data[plot] = dataOut.data_cspc
167 self.data[plot] = dataOut.data_cspc
168 if plot == 'noise':
168 if plot == 'noise':
169 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
169 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
170 if plot == 'rti':
170 if plot == 'rti':
171 self.data[plot][tm] = dataOut.getPower()
171 self.data[plot][tm] = dataOut.getPower()
172 if plot == 'snr_db':
172 if plot == 'snr_db':
173 self.data['snr'][tm] = dataOut.data_SNR
173 self.data['snr'][tm] = dataOut.data_SNR
174 if plot == 'snr':
174 if plot == 'snr':
175 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
175 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
176 if plot == 'dop':
176 if plot == 'dop':
177 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
177 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
178 if plot == 'mean':
178 if plot == 'mean':
179 self.data[plot][tm] = dataOut.data_MEAN
179 self.data[plot][tm] = dataOut.data_MEAN
180 if plot == 'std':
180 if plot == 'std':
181 self.data[plot][tm] = dataOut.data_STD
181 self.data[plot][tm] = dataOut.data_STD
182 if plot == 'coh':
182 if plot == 'coh':
183 self.data[plot][tm] = dataOut.getCoherence()
183 self.data[plot][tm] = dataOut.getCoherence()
184 if plot == 'phase':
184 if plot == 'phase':
185 self.data[plot][tm] = dataOut.getCoherence(phase=True)
185 self.data[plot][tm] = dataOut.getCoherence(phase=True)
186 if plot == 'output':
186 if plot == 'output':
187 self.data[plot][tm] = dataOut.data_output
187 self.data[plot][tm] = dataOut.data_output
188 if plot == 'param':
188 if plot == 'param':
189 self.data[plot][tm] = dataOut.data_param
189 self.data[plot][tm] = dataOut.data_param
190
190
191 def normalize_heights(self):
191 def normalize_heights(self):
192 '''
192 '''
193 Ensure same-dimension of the data for different heighList
193 Ensure same-dimension of the data for different heighList
194 '''
194 '''
195
195
196 H = numpy.array(list(self.__all_heights))
196 H = numpy.array(list(self.__all_heights))
197 H.sort()
197 H.sort()
198 for key in self.data:
198 for key in self.data:
199 shape = self.shape(key)[:-1] + H.shape
199 shape = self.shape(key)[:-1] + H.shape
200 for tm, obj in self.data[key].items():
200 for tm, obj in self.data[key].items():
201 h = self.__heights[self.__times.index(tm)]
201 h = self.__heights[self.__times.index(tm)]
202 if H.size == h.size:
202 if H.size == h.size:
203 continue
203 continue
204 index = numpy.where(numpy.in1d(H, h))[0]
204 index = numpy.where(numpy.in1d(H, h))[0]
205 dummy = numpy.zeros(shape) + numpy.nan
205 dummy = numpy.zeros(shape) + numpy.nan
206 if len(shape) == 2:
206 if len(shape) == 2:
207 dummy[:, index] = obj
207 dummy[:, index] = obj
208 else:
208 else:
209 dummy[index] = obj
209 dummy[index] = obj
210 self.data[key][tm] = dummy
210 self.data[key][tm] = dummy
211
211
212 self.__heights = [H for tm in self.__times]
212 self.__heights = [H for tm in self.__times]
213
213
214 def jsonify(self, decimate=False):
214 def jsonify(self, decimate=False):
215 '''
215 '''
216 Convert data to json
216 Convert data to json
217 '''
217 '''
218
218
219 data = {}
219 data = {}
220 tm = self.times[-1]
220 tm = self.times[-1]
221
221
222 for key in self.data:
222 for key in self.data:
223 if key in ('spc', 'cspc'):
223 if key in ('spc', 'cspc'):
224 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
224 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
225 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
225 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
226 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
226 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
227 else:
227 else:
228 data[key] = roundFloats(self.data[key][tm].tolist())
228 data[key] = roundFloats(self.data[key][tm].tolist())
229
229
230 ret = {'data': data}
230 ret = {'data': data}
231 ret['exp_code'] = self.exp_code
231 ret['exp_code'] = self.exp_code
232 ret['time'] = tm
232 ret['time'] = tm
233 ret['interval'] = self.interval
233 ret['interval'] = self.interval
234 ret['localtime'] = self.localtime
234 ret['localtime'] = self.localtime
235 ret['yrange'] = roundFloats(self.heights.tolist())
235 ret['yrange'] = roundFloats(self.heights.tolist())
236 if key in ('spc', 'cspc'):
236 if key in ('spc', 'cspc'):
237 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
237 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
238 else:
239 ret['xrange'] = []
238 if hasattr(self, 'pairs'):
240 if hasattr(self, 'pairs'):
239 ret['pairs'] = self.pairs
241 ret['pairs'] = self.pairs
242 else:
243 ret['pairs'] = []
240 return json.dumps(ret)
244 return json.dumps(ret)
241
245
242 @property
246 @property
243 def times(self):
247 def times(self):
244 '''
248 '''
245 Return the list of times of the current data
249 Return the list of times of the current data
246 '''
250 '''
247
251
248 ret = numpy.array(self.__times)
252 ret = numpy.array(self.__times)
249 ret.sort()
253 ret.sort()
250 return ret
254 return ret
251
255
252 @property
256 @property
253 def heights(self):
257 def heights(self):
254 '''
258 '''
255 Return the list of heights of the current data
259 Return the list of heights of the current data
256 '''
260 '''
257
261
258 return numpy.array(self.__heights[-1])
262 return numpy.array(self.__heights[-1])
259
263
260 class PublishData(Operation):
264 class PublishData(Operation):
261 '''
265 '''
262 Operation to send data over zmq.
266 Operation to send data over zmq.
263 '''
267 '''
264
268
265 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
269 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
266
270
267 def __init__(self, **kwargs):
271 def __init__(self, **kwargs):
268 """Inicio."""
272 """Inicio."""
269 Operation.__init__(self, **kwargs)
273 Operation.__init__(self, **kwargs)
270 self.isConfig = False
274 self.isConfig = False
271 self.client = None
275 self.client = None
272 self.zeromq = None
276 self.zeromq = None
273 self.mqtt = None
277 self.mqtt = None
274
278
275 def on_disconnect(self, client, userdata, rc):
279 def on_disconnect(self, client, userdata, rc):
276 if rc != 0:
280 if rc != 0:
277 log.warning('Unexpected disconnection.')
281 log.warning('Unexpected disconnection.')
278 self.connect()
282 self.connect()
279
283
280 def connect(self):
284 def connect(self):
281 log.warning('trying to connect')
285 log.warning('trying to connect')
282 try:
286 try:
283 self.client.connect(
287 self.client.connect(
284 host=self.host,
288 host=self.host,
285 port=self.port,
289 port=self.port,
286 keepalive=60*10,
290 keepalive=60*10,
287 bind_address='')
291 bind_address='')
288 self.client.loop_start()
292 self.client.loop_start()
289 # self.client.publish(
293 # self.client.publish(
290 # self.topic + 'SETUP',
294 # self.topic + 'SETUP',
291 # json.dumps(setup),
295 # json.dumps(setup),
292 # retain=True
296 # retain=True
293 # )
297 # )
294 except:
298 except:
295 log.error('MQTT Conection error.')
299 log.error('MQTT Conection error.')
296 self.client = False
300 self.client = False
297
301
298 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
302 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
299 self.counter = 0
303 self.counter = 0
300 self.topic = kwargs.get('topic', 'schain')
304 self.topic = kwargs.get('topic', 'schain')
301 self.delay = kwargs.get('delay', 0)
305 self.delay = kwargs.get('delay', 0)
302 self.plottype = kwargs.get('plottype', 'spectra')
306 self.plottype = kwargs.get('plottype', 'spectra')
303 self.host = kwargs.get('host', "10.10.10.82")
307 self.host = kwargs.get('host', "10.10.10.82")
304 self.port = kwargs.get('port', 3000)
308 self.port = kwargs.get('port', 3000)
305 self.clientId = clientId
309 self.clientId = clientId
306 self.cnt = 0
310 self.cnt = 0
307 self.zeromq = zeromq
311 self.zeromq = zeromq
308 self.mqtt = kwargs.get('plottype', 0)
312 self.mqtt = kwargs.get('plottype', 0)
309 self.client = None
313 self.client = None
310 self.verbose = verbose
314 self.verbose = verbose
311 setup = []
315 setup = []
312 if mqtt is 1:
316 if mqtt is 1:
313 self.client = mqtt.Client(
317 self.client = mqtt.Client(
314 client_id=self.clientId + self.topic + 'SCHAIN',
318 client_id=self.clientId + self.topic + 'SCHAIN',
315 clean_session=True)
319 clean_session=True)
316 self.client.on_disconnect = self.on_disconnect
320 self.client.on_disconnect = self.on_disconnect
317 self.connect()
321 self.connect()
318 for plot in self.plottype:
322 for plot in self.plottype:
319 setup.append({
323 setup.append({
320 'plot': plot,
324 'plot': plot,
321 'topic': self.topic + plot,
325 'topic': self.topic + plot,
322 'title': getattr(self, plot + '_' + 'title', False),
326 'title': getattr(self, plot + '_' + 'title', False),
323 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
327 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
324 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
328 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
325 'xrange': getattr(self, plot + '_' + 'xrange', False),
329 'xrange': getattr(self, plot + '_' + 'xrange', False),
326 'yrange': getattr(self, plot + '_' + 'yrange', False),
330 'yrange': getattr(self, plot + '_' + 'yrange', False),
327 'zrange': getattr(self, plot + '_' + 'zrange', False),
331 'zrange': getattr(self, plot + '_' + 'zrange', False),
328 })
332 })
329 if zeromq is 1:
333 if zeromq is 1:
330 context = zmq.Context()
334 context = zmq.Context()
331 self.zmq_socket = context.socket(zmq.PUSH)
335 self.zmq_socket = context.socket(zmq.PUSH)
332 server = kwargs.get('server', 'zmq.pipe')
336 server = kwargs.get('server', 'zmq.pipe')
333
337
334 if 'tcp://' in server:
338 if 'tcp://' in server:
335 address = server
339 address = server
336 else:
340 else:
337 address = 'ipc:///tmp/%s' % server
341 address = 'ipc:///tmp/%s' % server
338
342
339 self.zmq_socket.connect(address)
343 self.zmq_socket.connect(address)
340 time.sleep(1)
344 time.sleep(1)
341
345
342
346
343 def publish_data(self):
347 def publish_data(self):
344 self.dataOut.finished = False
348 self.dataOut.finished = False
345 if self.mqtt is 1:
349 if self.mqtt is 1:
346 yData = self.dataOut.heightList[:2].tolist()
350 yData = self.dataOut.heightList[:2].tolist()
347 if self.plottype == 'spectra':
351 if self.plottype == 'spectra':
348 data = getattr(self.dataOut, 'data_spc')
352 data = getattr(self.dataOut, 'data_spc')
349 z = data/self.dataOut.normFactor
353 z = data/self.dataOut.normFactor
350 zdB = 10*numpy.log10(z)
354 zdB = 10*numpy.log10(z)
351 xlen, ylen = zdB[0].shape
355 xlen, ylen = zdB[0].shape
352 dx = int(xlen/MAXNUMX) + 1
356 dx = int(xlen/MAXNUMX) + 1
353 dy = int(ylen/MAXNUMY) + 1
357 dy = int(ylen/MAXNUMY) + 1
354 Z = [0 for i in self.dataOut.channelList]
358 Z = [0 for i in self.dataOut.channelList]
355 for i in self.dataOut.channelList:
359 for i in self.dataOut.channelList:
356 Z[i] = zdB[i][::dx, ::dy].tolist()
360 Z[i] = zdB[i][::dx, ::dy].tolist()
357 payload = {
361 payload = {
358 'timestamp': self.dataOut.utctime,
362 'timestamp': self.dataOut.utctime,
359 'data': roundFloats(Z),
363 'data': roundFloats(Z),
360 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
364 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
361 'interval': self.dataOut.getTimeInterval(),
365 'interval': self.dataOut.getTimeInterval(),
362 'type': self.plottype,
366 'type': self.plottype,
363 'yData': yData
367 'yData': yData
364 }
368 }
365
369
366 elif self.plottype in ('rti', 'power'):
370 elif self.plottype in ('rti', 'power'):
367 data = getattr(self.dataOut, 'data_spc')
371 data = getattr(self.dataOut, 'data_spc')
368 z = data/self.dataOut.normFactor
372 z = data/self.dataOut.normFactor
369 avg = numpy.average(z, axis=1)
373 avg = numpy.average(z, axis=1)
370 avgdB = 10*numpy.log10(avg)
374 avgdB = 10*numpy.log10(avg)
371 xlen, ylen = z[0].shape
375 xlen, ylen = z[0].shape
372 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
376 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
373 AVG = [0 for i in self.dataOut.channelList]
377 AVG = [0 for i in self.dataOut.channelList]
374 for i in self.dataOut.channelList:
378 for i in self.dataOut.channelList:
375 AVG[i] = avgdB[i][::dy].tolist()
379 AVG[i] = avgdB[i][::dy].tolist()
376 payload = {
380 payload = {
377 'timestamp': self.dataOut.utctime,
381 'timestamp': self.dataOut.utctime,
378 'data': roundFloats(AVG),
382 'data': roundFloats(AVG),
379 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
383 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
380 'interval': self.dataOut.getTimeInterval(),
384 'interval': self.dataOut.getTimeInterval(),
381 'type': self.plottype,
385 'type': self.plottype,
382 'yData': yData
386 'yData': yData
383 }
387 }
384 elif self.plottype == 'noise':
388 elif self.plottype == 'noise':
385 noise = self.dataOut.getNoise()/self.dataOut.normFactor
389 noise = self.dataOut.getNoise()/self.dataOut.normFactor
386 noisedB = 10*numpy.log10(noise)
390 noisedB = 10*numpy.log10(noise)
387 payload = {
391 payload = {
388 'timestamp': self.dataOut.utctime,
392 'timestamp': self.dataOut.utctime,
389 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
393 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
390 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
394 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
391 'interval': self.dataOut.getTimeInterval(),
395 'interval': self.dataOut.getTimeInterval(),
392 'type': self.plottype,
396 'type': self.plottype,
393 'yData': yData
397 'yData': yData
394 }
398 }
395 elif self.plottype == 'snr':
399 elif self.plottype == 'snr':
396 data = getattr(self.dataOut, 'data_SNR')
400 data = getattr(self.dataOut, 'data_SNR')
397 avgdB = 10*numpy.log10(data)
401 avgdB = 10*numpy.log10(data)
398
402
399 ylen = data[0].size
403 ylen = data[0].size
400 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
404 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
401 AVG = [0 for i in self.dataOut.channelList]
405 AVG = [0 for i in self.dataOut.channelList]
402 for i in self.dataOut.channelList:
406 for i in self.dataOut.channelList:
403 AVG[i] = avgdB[i][::dy].tolist()
407 AVG[i] = avgdB[i][::dy].tolist()
404 payload = {
408 payload = {
405 'timestamp': self.dataOut.utctime,
409 'timestamp': self.dataOut.utctime,
406 'data': roundFloats(AVG),
410 'data': roundFloats(AVG),
407 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
411 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
408 'type': self.plottype,
412 'type': self.plottype,
409 'yData': yData
413 'yData': yData
410 }
414 }
411 else:
415 else:
412 print "Tipo de grafico invalido"
416 print "Tipo de grafico invalido"
413 payload = {
417 payload = {
414 'data': 'None',
418 'data': 'None',
415 'timestamp': 'None',
419 'timestamp': 'None',
416 'type': None
420 'type': None
417 }
421 }
418
422
419 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
423 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
420
424
421 if self.zeromq is 1:
425 if self.zeromq is 1:
422 if self.verbose:
426 if self.verbose:
423 log.log(
427 log.log(
424 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
428 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
425 self.name
429 self.name
426 )
430 )
427 self.zmq_socket.send_pyobj(self.dataOut)
431 self.zmq_socket.send_pyobj(self.dataOut)
428
432
429 def run(self, dataOut, **kwargs):
433 def run(self, dataOut, **kwargs):
430 self.dataOut = dataOut
434 self.dataOut = dataOut
431 if not self.isConfig:
435 if not self.isConfig:
432 self.setup(**kwargs)
436 self.setup(**kwargs)
433 self.isConfig = True
437 self.isConfig = True
434
438
435 self.publish_data()
439 self.publish_data()
436 time.sleep(self.delay)
440 time.sleep(self.delay)
437
441
438 def close(self):
442 def close(self):
439 if self.zeromq is 1:
443 if self.zeromq is 1:
440 self.dataOut.finished = True
444 self.dataOut.finished = True
441 self.zmq_socket.send_pyobj(self.dataOut)
445 self.zmq_socket.send_pyobj(self.dataOut)
442 time.sleep(0.1)
446 time.sleep(0.1)
443 self.zmq_socket.close()
447 self.zmq_socket.close()
444 if self.client:
448 if self.client:
445 self.client.loop_stop()
449 self.client.loop_stop()
446 self.client.disconnect()
450 self.client.disconnect()
447
451
448
452
449 class ReceiverData(ProcessingUnit):
453 class ReceiverData(ProcessingUnit):
450
454
451 __attrs__ = ['server']
455 __attrs__ = ['server']
452
456
453 def __init__(self, **kwargs):
457 def __init__(self, **kwargs):
454
458
455 ProcessingUnit.__init__(self, **kwargs)
459 ProcessingUnit.__init__(self, **kwargs)
456
460
457 self.isConfig = False
461 self.isConfig = False
458 server = kwargs.get('server', 'zmq.pipe')
462 server = kwargs.get('server', 'zmq.pipe')
459 if 'tcp://' in server:
463 if 'tcp://' in server:
460 address = server
464 address = server
461 else:
465 else:
462 address = 'ipc:///tmp/%s' % server
466 address = 'ipc:///tmp/%s' % server
463
467
464 self.address = address
468 self.address = address
465 self.dataOut = JROData()
469 self.dataOut = JROData()
466
470
467 def setup(self):
471 def setup(self):
468
472
469 self.context = zmq.Context()
473 self.context = zmq.Context()
470 self.receiver = self.context.socket(zmq.PULL)
474 self.receiver = self.context.socket(zmq.PULL)
471 self.receiver.bind(self.address)
475 self.receiver.bind(self.address)
472 time.sleep(0.5)
476 time.sleep(0.5)
473 log.success('ReceiverData from {}'.format(self.address))
477 log.success('ReceiverData from {}'.format(self.address))
474
478
475
479
476 def run(self):
480 def run(self):
477
481
478 if not self.isConfig:
482 if not self.isConfig:
479 self.setup()
483 self.setup()
480 self.isConfig = True
484 self.isConfig = True
481
485
482 self.dataOut = self.receiver.recv_pyobj()
486 self.dataOut = self.receiver.recv_pyobj()
483 log.log('{} - {}'.format(self.dataOut.type,
487 log.log('{} - {}'.format(self.dataOut.type,
484 self.dataOut.datatime.ctime(),),
488 self.dataOut.datatime.ctime(),),
485 'Receiving')
489 'Receiving')
486
490
487
491
488 class PlotterReceiver(ProcessingUnit, Process):
492 class PlotterReceiver(ProcessingUnit, Process):
489
493
490 throttle_value = 5
494 throttle_value = 5
491 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
495 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
492 'exp_code', 'web_server']
496 'exp_code', 'web_server']
493
497
494 def __init__(self, **kwargs):
498 def __init__(self, **kwargs):
495
499
496 ProcessingUnit.__init__(self, **kwargs)
500 ProcessingUnit.__init__(self, **kwargs)
497 Process.__init__(self)
501 Process.__init__(self)
498 self.mp = False
502 self.mp = False
499 self.isConfig = False
503 self.isConfig = False
500 self.isWebConfig = False
504 self.isWebConfig = False
501 self.connections = 0
505 self.connections = 0
502 server = kwargs.get('server', 'zmq.pipe')
506 server = kwargs.get('server', 'zmq.pipe')
503 web_server = kwargs.get('web_server', None)
507 web_server = kwargs.get('web_server', None)
504 if 'tcp://' in server:
508 if 'tcp://' in server:
505 address = server
509 address = server
506 else:
510 else:
507 address = 'ipc:///tmp/%s' % server
511 address = 'ipc:///tmp/%s' % server
508 self.address = address
512 self.address = address
509 self.web_address = web_server
513 self.web_address = web_server
510 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
514 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
511 self.realtime = kwargs.get('realtime', False)
515 self.realtime = kwargs.get('realtime', False)
512 self.localtime = kwargs.get('localtime', True)
516 self.localtime = kwargs.get('localtime', True)
513 self.throttle_value = kwargs.get('throttle', 5)
517 self.throttle_value = kwargs.get('throttle', 5)
514 self.exp_code = kwargs.get('exp_code', None)
518 self.exp_code = kwargs.get('exp_code', None)
515 self.sendData = self.initThrottle(self.throttle_value)
519 self.sendData = self.initThrottle(self.throttle_value)
516 self.dates = []
520 self.dates = []
517 self.setup()
521 self.setup()
518
522
519 def setup(self):
523 def setup(self):
520
524
521 self.data = Data(self.plottypes, self.throttle_value, self.exp_code)
525 self.data = Data(self.plottypes, self.throttle_value, self.exp_code)
522 self.isConfig = True
526 self.isConfig = True
523
527
524 def event_monitor(self, monitor):
528 def event_monitor(self, monitor):
525
529
526 events = {}
530 events = {}
527
531
528 for name in dir(zmq):
532 for name in dir(zmq):
529 if name.startswith('EVENT_'):
533 if name.startswith('EVENT_'):
530 value = getattr(zmq, name)
534 value = getattr(zmq, name)
531 events[value] = name
535 events[value] = name
532
536
533 while monitor.poll():
537 while monitor.poll():
534 evt = recv_monitor_message(monitor)
538 evt = recv_monitor_message(monitor)
535 if evt['event'] == 32:
539 if evt['event'] == 32:
536 self.connections += 1
540 self.connections += 1
537 if evt['event'] == 512:
541 if evt['event'] == 512:
538 pass
542 pass
539
543
540 evt.update({'description': events[evt['event']]})
544 evt.update({'description': events[evt['event']]})
541
545
542 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
546 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
543 break
547 break
544 monitor.close()
548 monitor.close()
545 print('event monitor thread done!')
549 print('event monitor thread done!')
546
550
547 def initThrottle(self, throttle_value):
551 def initThrottle(self, throttle_value):
548
552
549 @throttle(seconds=throttle_value)
553 @throttle(seconds=throttle_value)
550 def sendDataThrottled(fn_sender, data):
554 def sendDataThrottled(fn_sender, data):
551 fn_sender(data)
555 fn_sender(data)
552
556
553 return sendDataThrottled
557 return sendDataThrottled
554
558
555 def send(self, data):
559 def send(self, data):
556 log.success('Sending {}'.format(data), self.name)
560 log.success('Sending {}'.format(data), self.name)
557 self.sender.send_pyobj(data)
561 self.sender.send_pyobj(data)
558
562
559 def run(self):
563 def run(self):
560
564
561 log.success(
565 log.success(
562 'Starting from {}'.format(self.address),
566 'Starting from {}'.format(self.address),
563 self.name
567 self.name
564 )
568 )
565
569
566 self.context = zmq.Context()
570 self.context = zmq.Context()
567 self.receiver = self.context.socket(zmq.PULL)
571 self.receiver = self.context.socket(zmq.PULL)
568 self.receiver.bind(self.address)
572 self.receiver.bind(self.address)
569 monitor = self.receiver.get_monitor_socket()
573 monitor = self.receiver.get_monitor_socket()
570 self.sender = self.context.socket(zmq.PUB)
574 self.sender = self.context.socket(zmq.PUB)
571 if self.web_address:
575 if self.web_address:
572 log.success(
576 log.success(
573 'Sending to web: {}'.format(self.web_address),
577 'Sending to web: {}'.format(self.web_address),
574 self.name
578 self.name
575 )
579 )
576 self.sender_web = self.context.socket(zmq.PUB)
580 self.sender_web = self.context.socket(zmq.PUB)
577 self.sender_web.connect(self.web_address)
581 self.sender_web.connect(self.web_address)
578 time.sleep(1)
582 time.sleep(1)
579
583
580 if 'server' in self.kwargs:
584 if 'server' in self.kwargs:
581 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
585 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
582 else:
586 else:
583 self.sender.bind("ipc:///tmp/zmq.plots")
587 self.sender.bind("ipc:///tmp/zmq.plots")
584
588
585 time.sleep(2)
589 time.sleep(2)
586
590
587 t = Thread(target=self.event_monitor, args=(monitor,))
591 t = Thread(target=self.event_monitor, args=(monitor,))
588 t.start()
592 t.start()
589
593
590 while True:
594 while True:
591 dataOut = self.receiver.recv_pyobj()
595 dataOut = self.receiver.recv_pyobj()
592 if not dataOut.flagNoData:
596 if not dataOut.flagNoData:
593 if dataOut.type == 'Parameters':
597 if dataOut.type == 'Parameters':
594 tm = dataOut.utctimeInit
598 tm = dataOut.utctimeInit
595 else:
599 else:
596 tm = dataOut.utctime
600 tm = dataOut.utctime
597 if dataOut.useLocalTime:
601 if dataOut.useLocalTime:
598 if not self.localtime:
602 if not self.localtime:
599 tm += time.timezone
603 tm += time.timezone
600 dt = datetime.datetime.fromtimestamp(tm).date()
604 dt = datetime.datetime.fromtimestamp(tm).date()
601 else:
605 else:
602 if self.localtime:
606 if self.localtime:
603 tm -= time.timezone
607 tm -= time.timezone
604 dt = datetime.datetime.utcfromtimestamp(tm).date()
608 dt = datetime.datetime.utcfromtimestamp(tm).date()
605 coerce = False
609 coerce = False
606 if dt not in self.dates:
610 if dt not in self.dates:
607 if self.data:
611 if self.data:
608 self.data.ended = True
612 self.data.ended = True
609 self.send(self.data)
613 self.send(self.data)
610 coerce = True
614 coerce = True
611 self.data.setup()
615 self.data.setup()
612 self.dates.append(dt)
616 self.dates.append(dt)
613
617
614 self.data.update(dataOut, tm)
618 self.data.update(dataOut, tm)
615
619
616 if dataOut.finished is True:
620 if dataOut.finished is True:
617 self.connections -= 1
621 self.connections -= 1
618 if self.connections == 0 and dt in self.dates:
622 if self.connections == 0 and dt in self.dates:
619 self.data.ended = True
623 self.data.ended = True
620 self.send(self.data)
624 self.send(self.data)
621 self.data.setup()
625 self.data.setup()
622 else:
626 else:
623 if self.realtime:
627 if self.realtime:
624 self.send(self.data)
628 self.send(self.data)
625 if self.web_address:
629 if self.web_address:
626 self.sender_web.send(self.data.jsonify())
630 self.sender_web.send(self.data.jsonify())
627 else:
631 else:
628 self.sendData(self.send, self.data, coerce=coerce)
632 self.sendData(self.send, self.data, coerce=coerce)
629 coerce = False
633 coerce = False
630
634
631 return
635 return
General Comments 0
You need to be logged in to leave comments. Login now