##// END OF EJS Templates
Fix publish to web_server
Juan C. Espinoza -
r1114:f58248fe43e5
parent child
Show More
@@ -1,636 +1,627
1 '''
1 '''
2 @author: Juan C. Espinoza
2 @author: Juan C. Espinoza
3 '''
3 '''
4
4
5 import time
5 import time
6 import json
6 import json
7 import numpy
7 import numpy
8 import paho.mqtt.client as mqtt
8 import paho.mqtt.client as mqtt
9 import zmq
9 import zmq
10 import datetime
10 import datetime
11 from zmq.utils.monitor import recv_monitor_message
11 from zmq.utils.monitor import recv_monitor_message
12 from functools import wraps
12 from functools import wraps
13 from threading import Thread
13 from threading import Thread
14 from multiprocessing import Process
14 from multiprocessing import Process
15
15
16 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
16 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 from schainpy.model.data.jrodata import JROData
17 from schainpy.model.data.jrodata import JROData
18 from schainpy.utils import log
18 from schainpy.utils import log
19
19
20 MAXNUMX = 100
20 MAXNUMX = 100
21 MAXNUMY = 100
21 MAXNUMY = 100
22
22
23 class PrettyFloat(float):
23 class PrettyFloat(float):
24 def __repr__(self):
24 def __repr__(self):
25 return '%.2f' % self
25 return '%.2f' % self
26
26
27 def roundFloats(obj):
27 def roundFloats(obj):
28 if isinstance(obj, list):
28 if isinstance(obj, list):
29 return map(roundFloats, obj)
29 return map(roundFloats, obj)
30 elif isinstance(obj, float):
30 elif isinstance(obj, float):
31 return round(obj, 2)
31 return round(obj, 2)
32
32
33 def decimate(z, MAXNUMY):
33 def decimate(z, MAXNUMY):
34 dy = int(len(z[0])/MAXNUMY) + 1
34 dy = int(len(z[0])/MAXNUMY) + 1
35
35
36 return z[::, ::dy]
36 return z[::, ::dy]
37
37
38 class throttle(object):
38 class throttle(object):
39 '''
39 '''
40 Decorator that prevents a function from being called more than once every
40 Decorator that prevents a function from being called more than once every
41 time period.
41 time period.
42 To create a function that cannot be called more than once a minute, but
42 To create a function that cannot be called more than once a minute, but
43 will sleep until it can be called:
43 will sleep until it can be called:
44 @throttle(minutes=1)
44 @throttle(minutes=1)
45 def foo():
45 def foo():
46 pass
46 pass
47
47
48 for i in range(10):
48 for i in range(10):
49 foo()
49 foo()
50 print "This function has run %s times." % i
50 print "This function has run %s times." % i
51 '''
51 '''
52
52
53 def __init__(self, seconds=0, minutes=0, hours=0):
53 def __init__(self, seconds=0, minutes=0, hours=0):
54 self.throttle_period = datetime.timedelta(
54 self.throttle_period = datetime.timedelta(
55 seconds=seconds, minutes=minutes, hours=hours
55 seconds=seconds, minutes=minutes, hours=hours
56 )
56 )
57
57
58 self.time_of_last_call = datetime.datetime.min
58 self.time_of_last_call = datetime.datetime.min
59
59
60 def __call__(self, fn):
60 def __call__(self, fn):
61 @wraps(fn)
61 @wraps(fn)
62 def wrapper(*args, **kwargs):
62 def wrapper(*args, **kwargs):
63 coerce = kwargs.pop('coerce', None)
63 coerce = kwargs.pop('coerce', None)
64 if coerce:
64 if coerce:
65 self.time_of_last_call = datetime.datetime.now()
65 self.time_of_last_call = datetime.datetime.now()
66 return fn(*args, **kwargs)
66 return fn(*args, **kwargs)
67 else:
67 else:
68 now = datetime.datetime.now()
68 now = datetime.datetime.now()
69 time_since_last_call = now - self.time_of_last_call
69 time_since_last_call = now - self.time_of_last_call
70 time_left = self.throttle_period - time_since_last_call
70 time_left = self.throttle_period - time_since_last_call
71
71
72 if time_left > datetime.timedelta(seconds=0):
72 if time_left > datetime.timedelta(seconds=0):
73 return
73 return
74
74
75 self.time_of_last_call = datetime.datetime.now()
75 self.time_of_last_call = datetime.datetime.now()
76 return fn(*args, **kwargs)
76 return fn(*args, **kwargs)
77
77
78 return wrapper
78 return wrapper
79
79
80 class Data(object):
80 class Data(object):
81 '''
81 '''
82 Object to hold data to be plotted
82 Object to hold data to be plotted
83 '''
83 '''
84
84
85 def __init__(self, plottypes, throttle_value):
85 def __init__(self, plottypes, throttle_value, exp_code):
86 self.plottypes = plottypes
86 self.plottypes = plottypes
87 self.throttle = throttle_value
87 self.throttle = throttle_value
88 self.exp_code = exp_code
88 self.ended = False
89 self.ended = False
89 self.localtime = False
90 self.localtime = False
90 self.__times = []
91 self.__times = []
91 self.__heights = []
92 self.__heights = []
92
93
93 def __str__(self):
94 def __str__(self):
94 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
95 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
95 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
96 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
96
97
97 def __len__(self):
98 def __len__(self):
98 return len(self.__times)
99 return len(self.__times)
99
100
100 def __getitem__(self, key):
101 def __getitem__(self, key):
101 if key not in self.data:
102 if key not in self.data:
102 raise KeyError(log.error('Missing key: {}'.format(key)))
103 raise KeyError(log.error('Missing key: {}'.format(key)))
103
104
104 if 'spc' in key:
105 if 'spc' in key:
105 ret = self.data[key]
106 ret = self.data[key]
106 else:
107 else:
107 ret = numpy.array([self.data[key][x] for x in self.times])
108 ret = numpy.array([self.data[key][x] for x in self.times])
108 if ret.ndim > 1:
109 if ret.ndim > 1:
109 ret = numpy.swapaxes(ret, 0, 1)
110 ret = numpy.swapaxes(ret, 0, 1)
110 return ret
111 return ret
111
112
112 def __contains__(self, key):
113 def __contains__(self, key):
113 return key in self.data
114 return key in self.data
114
115
115 def setup(self):
116 def setup(self):
116 '''
117 '''
117 Configure object
118 Configure object
118 '''
119 '''
119
120
120 self.ended = False
121 self.ended = False
121 self.data = {}
122 self.data = {}
122 self.__times = []
123 self.__times = []
123 self.__heights = []
124 self.__heights = []
124 self.__all_heights = set()
125 self.__all_heights = set()
125 for plot in self.plottypes:
126 for plot in self.plottypes:
126 if 'snr' in plot:
127 if 'snr' in plot:
127 plot = 'snr'
128 plot = 'snr'
128 self.data[plot] = {}
129 self.data[plot] = {}
129
130
130 def shape(self, key):
131 def shape(self, key):
131 '''
132 '''
132 Get the shape of the one-element data for the given key
133 Get the shape of the one-element data for the given key
133 '''
134 '''
134
135
135 if len(self.data[key]):
136 if len(self.data[key]):
136 if 'spc' in key:
137 if 'spc' in key:
137 return self.data[key].shape
138 return self.data[key].shape
138 return self.data[key][self.__times[0]].shape
139 return self.data[key][self.__times[0]].shape
139 return (0,)
140 return (0,)
140
141
141 def update(self, dataOut, tm):
142 def update(self, dataOut, tm):
142 '''
143 '''
143 Update data object with new dataOut
144 Update data object with new dataOut
144 '''
145 '''
145
146
146 if tm in self.__times:
147 if tm in self.__times:
147 return
148 return
148
149
149 self.parameters = getattr(dataOut, 'parameters', [])
150 self.parameters = getattr(dataOut, 'parameters', [])
150 if hasattr(dataOut, 'pairsList'):
151 if hasattr(dataOut, 'pairsList'):
151 self.pairs = dataOut.pairsList
152 self.pairs = dataOut.pairsList
152 self.channels = dataOut.channelList
153 self.channels = dataOut.channelList
153 self.interval = dataOut.getTimeInterval()
154 self.interval = dataOut.getTimeInterval()
154 self.localtime = dataOut.useLocalTime
155 self.localtime = dataOut.useLocalTime
155 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
156 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
156 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
157 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
157 self.__heights.append(dataOut.heightList)
158 self.__heights.append(dataOut.heightList)
158 self.__all_heights.update(dataOut.heightList)
159 self.__all_heights.update(dataOut.heightList)
159 self.__times.append(tm)
160 self.__times.append(tm)
160
161
161 for plot in self.plottypes:
162 for plot in self.plottypes:
162 if plot == 'spc':
163 if plot == 'spc':
163 z = dataOut.data_spc/dataOut.normFactor
164 z = dataOut.data_spc/dataOut.normFactor
164 self.data[plot] = 10*numpy.log10(z)
165 self.data[plot] = 10*numpy.log10(z)
165 if plot == 'cspc':
166 if plot == 'cspc':
166 self.data[plot] = dataOut.data_cspc
167 self.data[plot] = dataOut.data_cspc
167 if plot == 'noise':
168 if plot == 'noise':
168 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
169 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
169 if plot == 'rti':
170 if plot == 'rti':
170 self.data[plot][tm] = dataOut.getPower()
171 self.data[plot][tm] = dataOut.getPower()
171 if plot == 'snr_db':
172 if plot == 'snr_db':
172 self.data['snr'][tm] = dataOut.data_SNR
173 self.data['snr'][tm] = dataOut.data_SNR
173 if plot == 'snr':
174 if plot == 'snr':
174 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
175 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
175 if plot == 'dop':
176 if plot == 'dop':
176 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
177 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
177 if plot == 'mean':
178 if plot == 'mean':
178 self.data[plot][tm] = dataOut.data_MEAN
179 self.data[plot][tm] = dataOut.data_MEAN
179 if plot == 'std':
180 if plot == 'std':
180 self.data[plot][tm] = dataOut.data_STD
181 self.data[plot][tm] = dataOut.data_STD
181 if plot == 'coh':
182 if plot == 'coh':
182 self.data[plot][tm] = dataOut.getCoherence()
183 self.data[plot][tm] = dataOut.getCoherence()
183 if plot == 'phase':
184 if plot == 'phase':
184 self.data[plot][tm] = dataOut.getCoherence(phase=True)
185 self.data[plot][tm] = dataOut.getCoherence(phase=True)
185 if plot == 'output':
186 if plot == 'output':
186 self.data[plot][tm] = dataOut.data_output
187 self.data[plot][tm] = dataOut.data_output
187 if plot == 'param':
188 if plot == 'param':
188 self.data[plot][tm] = dataOut.data_param
189 self.data[plot][tm] = dataOut.data_param
189
190
190 def normalize_heights(self):
191 def normalize_heights(self):
191 '''
192 '''
192 Ensure same-dimension of the data for different heighList
193 Ensure same-dimension of the data for different heighList
193 '''
194 '''
194
195
195 H = numpy.array(list(self.__all_heights))
196 H = numpy.array(list(self.__all_heights))
196 H.sort()
197 H.sort()
197 for key in self.data:
198 for key in self.data:
198 shape = self.shape(key)[:-1] + H.shape
199 shape = self.shape(key)[:-1] + H.shape
199 for tm, obj in self.data[key].items():
200 for tm, obj in self.data[key].items():
200 h = self.__heights[self.__times.index(tm)]
201 h = self.__heights[self.__times.index(tm)]
201 if H.size == h.size:
202 if H.size == h.size:
202 continue
203 continue
203 index = numpy.where(numpy.in1d(H, h))[0]
204 index = numpy.where(numpy.in1d(H, h))[0]
204 dummy = numpy.zeros(shape) + numpy.nan
205 dummy = numpy.zeros(shape) + numpy.nan
205 if len(shape) == 2:
206 if len(shape) == 2:
206 dummy[:, index] = obj
207 dummy[:, index] = obj
207 else:
208 else:
208 dummy[index] = obj
209 dummy[index] = obj
209 self.data[key][tm] = dummy
210 self.data[key][tm] = dummy
210
211
211 self.__heights = [H for tm in self.__times]
212 self.__heights = [H for tm in self.__times]
212
213
213 def jsonify(self, decimate=False):
214 def jsonify(self, decimate=False):
214 '''
215 '''
215 Convert data to json
216 Convert data to json
216 '''
217 '''
217
218
218 ret = {}
219 data = {}
219 tm = self.times[-1]
220 tm = self.times[-1]
220
221
221 for key, value in self.data:
222 for key in self.data:
222 if key in ('spc', 'cspc'):
223 if key in ('spc', 'cspc'):
223 ret[key] = roundFloats(self.data[key].to_list())
224 data[key] = roundFloats(self.data[key].tolist())
224 else:
225 else:
225 ret[key] = roundFloats(self.data[key][tm].to_list())
226 data[key] = roundFloats(self.data[key][tm].tolist())
226
227
227 ret['timestamp'] = tm
228 ret = {'data': data}
229 ret['exp_code'] = self.exp_code
230 ret['time'] = tm
228 ret['interval'] = self.interval
231 ret['interval'] = self.interval
232 ret['ymin'] = self.heights[0]
233 ret['ymax'] = self.heights[-1]
234 ret['ystep'] = self.heights[1] - self.heights[0]
235
236 return json.dumps(ret)
229
237
230 @property
238 @property
231 def times(self):
239 def times(self):
232 '''
240 '''
233 Return the list of times of the current data
241 Return the list of times of the current data
234 '''
242 '''
235
243
236 ret = numpy.array(self.__times)
244 ret = numpy.array(self.__times)
237 ret.sort()
245 ret.sort()
238 return ret
246 return ret
239
247
240 @property
248 @property
241 def heights(self):
249 def heights(self):
242 '''
250 '''
243 Return the list of heights of the current data
251 Return the list of heights of the current data
244 '''
252 '''
245
253
246 return numpy.array(self.__heights[-1])
254 return numpy.array(self.__heights[-1])
247
255
248 class PublishData(Operation):
256 class PublishData(Operation):
249 '''
257 '''
250 Operation to send data over zmq.
258 Operation to send data over zmq.
251 '''
259 '''
252
260
253 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
261 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
254
262
255 def __init__(self, **kwargs):
263 def __init__(self, **kwargs):
256 """Inicio."""
264 """Inicio."""
257 Operation.__init__(self, **kwargs)
265 Operation.__init__(self, **kwargs)
258 self.isConfig = False
266 self.isConfig = False
259 self.client = None
267 self.client = None
260 self.zeromq = None
268 self.zeromq = None
261 self.mqtt = None
269 self.mqtt = None
262
270
263 def on_disconnect(self, client, userdata, rc):
271 def on_disconnect(self, client, userdata, rc):
264 if rc != 0:
272 if rc != 0:
265 log.warning('Unexpected disconnection.')
273 log.warning('Unexpected disconnection.')
266 self.connect()
274 self.connect()
267
275
268 def connect(self):
276 def connect(self):
269 log.warning('trying to connect')
277 log.warning('trying to connect')
270 try:
278 try:
271 self.client.connect(
279 self.client.connect(
272 host=self.host,
280 host=self.host,
273 port=self.port,
281 port=self.port,
274 keepalive=60*10,
282 keepalive=60*10,
275 bind_address='')
283 bind_address='')
276 self.client.loop_start()
284 self.client.loop_start()
277 # self.client.publish(
285 # self.client.publish(
278 # self.topic + 'SETUP',
286 # self.topic + 'SETUP',
279 # json.dumps(setup),
287 # json.dumps(setup),
280 # retain=True
288 # retain=True
281 # )
289 # )
282 except:
290 except:
283 log.error('MQTT Conection error.')
291 log.error('MQTT Conection error.')
284 self.client = False
292 self.client = False
285
293
286 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
294 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
287 self.counter = 0
295 self.counter = 0
288 self.topic = kwargs.get('topic', 'schain')
296 self.topic = kwargs.get('topic', 'schain')
289 self.delay = kwargs.get('delay', 0)
297 self.delay = kwargs.get('delay', 0)
290 self.plottype = kwargs.get('plottype', 'spectra')
298 self.plottype = kwargs.get('plottype', 'spectra')
291 self.host = kwargs.get('host', "10.10.10.82")
299 self.host = kwargs.get('host', "10.10.10.82")
292 self.port = kwargs.get('port', 3000)
300 self.port = kwargs.get('port', 3000)
293 self.clientId = clientId
301 self.clientId = clientId
294 self.cnt = 0
302 self.cnt = 0
295 self.zeromq = zeromq
303 self.zeromq = zeromq
296 self.mqtt = kwargs.get('plottype', 0)
304 self.mqtt = kwargs.get('plottype', 0)
297 self.client = None
305 self.client = None
298 self.verbose = verbose
306 self.verbose = verbose
299 setup = []
307 setup = []
300 if mqtt is 1:
308 if mqtt is 1:
301 self.client = mqtt.Client(
309 self.client = mqtt.Client(
302 client_id=self.clientId + self.topic + 'SCHAIN',
310 client_id=self.clientId + self.topic + 'SCHAIN',
303 clean_session=True)
311 clean_session=True)
304 self.client.on_disconnect = self.on_disconnect
312 self.client.on_disconnect = self.on_disconnect
305 self.connect()
313 self.connect()
306 for plot in self.plottype:
314 for plot in self.plottype:
307 setup.append({
315 setup.append({
308 'plot': plot,
316 'plot': plot,
309 'topic': self.topic + plot,
317 'topic': self.topic + plot,
310 'title': getattr(self, plot + '_' + 'title', False),
318 'title': getattr(self, plot + '_' + 'title', False),
311 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
319 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
312 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
320 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
313 'xrange': getattr(self, plot + '_' + 'xrange', False),
321 'xrange': getattr(self, plot + '_' + 'xrange', False),
314 'yrange': getattr(self, plot + '_' + 'yrange', False),
322 'yrange': getattr(self, plot + '_' + 'yrange', False),
315 'zrange': getattr(self, plot + '_' + 'zrange', False),
323 'zrange': getattr(self, plot + '_' + 'zrange', False),
316 })
324 })
317 if zeromq is 1:
325 if zeromq is 1:
318 context = zmq.Context()
326 context = zmq.Context()
319 self.zmq_socket = context.socket(zmq.PUSH)
327 self.zmq_socket = context.socket(zmq.PUSH)
320 server = kwargs.get('server', 'zmq.pipe')
328 server = kwargs.get('server', 'zmq.pipe')
321
329
322 if 'tcp://' in server:
330 if 'tcp://' in server:
323 address = server
331 address = server
324 else:
332 else:
325 address = 'ipc:///tmp/%s' % server
333 address = 'ipc:///tmp/%s' % server
326
334
327 self.zmq_socket.connect(address)
335 self.zmq_socket.connect(address)
328 time.sleep(1)
336 time.sleep(1)
329
337
330
338
331 def publish_data(self):
339 def publish_data(self):
332 self.dataOut.finished = False
340 self.dataOut.finished = False
333 if self.mqtt is 1:
341 if self.mqtt is 1:
334 yData = self.dataOut.heightList[:2].tolist()
342 yData = self.dataOut.heightList[:2].tolist()
335 if self.plottype == 'spectra':
343 if self.plottype == 'spectra':
336 data = getattr(self.dataOut, 'data_spc')
344 data = getattr(self.dataOut, 'data_spc')
337 z = data/self.dataOut.normFactor
345 z = data/self.dataOut.normFactor
338 zdB = 10*numpy.log10(z)
346 zdB = 10*numpy.log10(z)
339 xlen, ylen = zdB[0].shape
347 xlen, ylen = zdB[0].shape
340 dx = int(xlen/MAXNUMX) + 1
348 dx = int(xlen/MAXNUMX) + 1
341 dy = int(ylen/MAXNUMY) + 1
349 dy = int(ylen/MAXNUMY) + 1
342 Z = [0 for i in self.dataOut.channelList]
350 Z = [0 for i in self.dataOut.channelList]
343 for i in self.dataOut.channelList:
351 for i in self.dataOut.channelList:
344 Z[i] = zdB[i][::dx, ::dy].tolist()
352 Z[i] = zdB[i][::dx, ::dy].tolist()
345 payload = {
353 payload = {
346 'timestamp': self.dataOut.utctime,
354 'timestamp': self.dataOut.utctime,
347 'data': roundFloats(Z),
355 'data': roundFloats(Z),
348 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
356 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
349 'interval': self.dataOut.getTimeInterval(),
357 'interval': self.dataOut.getTimeInterval(),
350 'type': self.plottype,
358 'type': self.plottype,
351 'yData': yData
359 'yData': yData
352 }
360 }
353
361
354 elif self.plottype in ('rti', 'power'):
362 elif self.plottype in ('rti', 'power'):
355 data = getattr(self.dataOut, 'data_spc')
363 data = getattr(self.dataOut, 'data_spc')
356 z = data/self.dataOut.normFactor
364 z = data/self.dataOut.normFactor
357 avg = numpy.average(z, axis=1)
365 avg = numpy.average(z, axis=1)
358 avgdB = 10*numpy.log10(avg)
366 avgdB = 10*numpy.log10(avg)
359 xlen, ylen = z[0].shape
367 xlen, ylen = z[0].shape
360 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
368 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
361 AVG = [0 for i in self.dataOut.channelList]
369 AVG = [0 for i in self.dataOut.channelList]
362 for i in self.dataOut.channelList:
370 for i in self.dataOut.channelList:
363 AVG[i] = avgdB[i][::dy].tolist()
371 AVG[i] = avgdB[i][::dy].tolist()
364 payload = {
372 payload = {
365 'timestamp': self.dataOut.utctime,
373 'timestamp': self.dataOut.utctime,
366 'data': roundFloats(AVG),
374 'data': roundFloats(AVG),
367 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
375 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
368 'interval': self.dataOut.getTimeInterval(),
376 'interval': self.dataOut.getTimeInterval(),
369 'type': self.plottype,
377 'type': self.plottype,
370 'yData': yData
378 'yData': yData
371 }
379 }
372 elif self.plottype == 'noise':
380 elif self.plottype == 'noise':
373 noise = self.dataOut.getNoise()/self.dataOut.normFactor
381 noise = self.dataOut.getNoise()/self.dataOut.normFactor
374 noisedB = 10*numpy.log10(noise)
382 noisedB = 10*numpy.log10(noise)
375 payload = {
383 payload = {
376 'timestamp': self.dataOut.utctime,
384 'timestamp': self.dataOut.utctime,
377 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
385 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
378 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
386 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
379 'interval': self.dataOut.getTimeInterval(),
387 'interval': self.dataOut.getTimeInterval(),
380 'type': self.plottype,
388 'type': self.plottype,
381 'yData': yData
389 'yData': yData
382 }
390 }
383 elif self.plottype == 'snr':
391 elif self.plottype == 'snr':
384 data = getattr(self.dataOut, 'data_SNR')
392 data = getattr(self.dataOut, 'data_SNR')
385 avgdB = 10*numpy.log10(data)
393 avgdB = 10*numpy.log10(data)
386
394
387 ylen = data[0].size
395 ylen = data[0].size
388 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
396 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
389 AVG = [0 for i in self.dataOut.channelList]
397 AVG = [0 for i in self.dataOut.channelList]
390 for i in self.dataOut.channelList:
398 for i in self.dataOut.channelList:
391 AVG[i] = avgdB[i][::dy].tolist()
399 AVG[i] = avgdB[i][::dy].tolist()
392 payload = {
400 payload = {
393 'timestamp': self.dataOut.utctime,
401 'timestamp': self.dataOut.utctime,
394 'data': roundFloats(AVG),
402 'data': roundFloats(AVG),
395 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
403 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
396 'type': self.plottype,
404 'type': self.plottype,
397 'yData': yData
405 'yData': yData
398 }
406 }
399 else:
407 else:
400 print "Tipo de grafico invalido"
408 print "Tipo de grafico invalido"
401 payload = {
409 payload = {
402 'data': 'None',
410 'data': 'None',
403 'timestamp': 'None',
411 'timestamp': 'None',
404 'type': None
412 'type': None
405 }
413 }
406
414
407 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
415 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
408
416
409 if self.zeromq is 1:
417 if self.zeromq is 1:
410 if self.verbose:
418 if self.verbose:
411 log.log(
419 log.log(
412 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
420 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
413 self.name
421 self.name
414 )
422 )
415 self.zmq_socket.send_pyobj(self.dataOut)
423 self.zmq_socket.send_pyobj(self.dataOut)
416
424
417 def run(self, dataOut, **kwargs):
425 def run(self, dataOut, **kwargs):
418 self.dataOut = dataOut
426 self.dataOut = dataOut
419 if not self.isConfig:
427 if not self.isConfig:
420 self.setup(**kwargs)
428 self.setup(**kwargs)
421 self.isConfig = True
429 self.isConfig = True
422
430
423 self.publish_data()
431 self.publish_data()
424 time.sleep(self.delay)
432 time.sleep(self.delay)
425
433
426 def close(self):
434 def close(self):
427 if self.zeromq is 1:
435 if self.zeromq is 1:
428 self.dataOut.finished = True
436 self.dataOut.finished = True
429 self.zmq_socket.send_pyobj(self.dataOut)
437 self.zmq_socket.send_pyobj(self.dataOut)
430 time.sleep(0.1)
438 time.sleep(0.1)
431 self.zmq_socket.close()
439 self.zmq_socket.close()
432 if self.client:
440 if self.client:
433 self.client.loop_stop()
441 self.client.loop_stop()
434 self.client.disconnect()
442 self.client.disconnect()
435
443
436
444
437 class ReceiverData(ProcessingUnit):
445 class ReceiverData(ProcessingUnit):
438
446
439 __attrs__ = ['server']
447 __attrs__ = ['server']
440
448
441 def __init__(self, **kwargs):
449 def __init__(self, **kwargs):
442
450
443 ProcessingUnit.__init__(self, **kwargs)
451 ProcessingUnit.__init__(self, **kwargs)
444
452
445 self.isConfig = False
453 self.isConfig = False
446 server = kwargs.get('server', 'zmq.pipe')
454 server = kwargs.get('server', 'zmq.pipe')
447 if 'tcp://' in server:
455 if 'tcp://' in server:
448 address = server
456 address = server
449 else:
457 else:
450 address = 'ipc:///tmp/%s' % server
458 address = 'ipc:///tmp/%s' % server
451
459
452 self.address = address
460 self.address = address
453 self.dataOut = JROData()
461 self.dataOut = JROData()
454
462
455 def setup(self):
463 def setup(self):
456
464
457 self.context = zmq.Context()
465 self.context = zmq.Context()
458 self.receiver = self.context.socket(zmq.PULL)
466 self.receiver = self.context.socket(zmq.PULL)
459 self.receiver.bind(self.address)
467 self.receiver.bind(self.address)
460 time.sleep(0.5)
468 time.sleep(0.5)
461 log.success('ReceiverData from {}'.format(self.address))
469 log.success('ReceiverData from {}'.format(self.address))
462
470
463
471
464 def run(self):
472 def run(self):
465
473
466 if not self.isConfig:
474 if not self.isConfig:
467 self.setup()
475 self.setup()
468 self.isConfig = True
476 self.isConfig = True
469
477
470 self.dataOut = self.receiver.recv_pyobj()
478 self.dataOut = self.receiver.recv_pyobj()
471 log.log('{} - {}'.format(self.dataOut.type,
479 log.log('{} - {}'.format(self.dataOut.type,
472 self.dataOut.datatime.ctime(),),
480 self.dataOut.datatime.ctime(),),
473 'Receiving')
481 'Receiving')
474
482
475
483
476 class PlotterReceiver(ProcessingUnit, Process):
484 class PlotterReceiver(ProcessingUnit, Process):
477
485
478 throttle_value = 5
486 throttle_value = 5
479 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle']
487 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
488 'exp_code', 'web_server']
480
489
481 def __init__(self, **kwargs):
490 def __init__(self, **kwargs):
482
491
483 ProcessingUnit.__init__(self, **kwargs)
492 ProcessingUnit.__init__(self, **kwargs)
484 Process.__init__(self)
493 Process.__init__(self)
485 self.mp = False
494 self.mp = False
486 self.isConfig = False
495 self.isConfig = False
487 self.isWebConfig = False
496 self.isWebConfig = False
488 self.connections = 0
497 self.connections = 0
489 server = kwargs.get('server', 'zmq.pipe')
498 server = kwargs.get('server', 'zmq.pipe')
490 plot_server = kwargs.get('plot_server', 'zmq.web')
499 web_server = kwargs.get('web_server', None)
491 if 'tcp://' in server:
500 if 'tcp://' in server:
492 address = server
501 address = server
493 else:
502 else:
494 address = 'ipc:///tmp/%s' % server
503 address = 'ipc:///tmp/%s' % server
495
496 if 'tcp://' in plot_server:
497 plot_address = plot_server
498 else:
499 plot_address = 'ipc:///tmp/%s' % plot_server
500
501 self.address = address
504 self.address = address
502 self.plot_address = plot_address
505 self.web_address = web_server
503 self.plottypes = [s.strip() for s in kwargs.get('plottypes', '').split(',') if s]
506 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
504 self.realtime = kwargs.get('realtime', False)
507 self.realtime = kwargs.get('realtime', False)
505 self.localtime = kwargs.get('localtime', True)
508 self.localtime = kwargs.get('localtime', True)
506 self.throttle_value = kwargs.get('throttle', 5)
509 self.throttle_value = kwargs.get('throttle', 5)
510 self.exp_code = kwargs.get('exp_code', None)
507 self.sendData = self.initThrottle(self.throttle_value)
511 self.sendData = self.initThrottle(self.throttle_value)
508 self.dates = []
512 self.dates = []
509 self.setup()
513 self.setup()
510
514
511 def setup(self):
515 def setup(self):
512
516
513 self.data = Data(self.plottypes, self.throttle_value)
517 self.data = Data(self.plottypes, self.throttle_value, self.exp_code)
514 self.isConfig = True
518 self.isConfig = True
515
519
516 def event_monitor(self, monitor):
520 def event_monitor(self, monitor):
517
521
518 events = {}
522 events = {}
519
523
520 for name in dir(zmq):
524 for name in dir(zmq):
521 if name.startswith('EVENT_'):
525 if name.startswith('EVENT_'):
522 value = getattr(zmq, name)
526 value = getattr(zmq, name)
523 events[value] = name
527 events[value] = name
524
528
525 while monitor.poll():
529 while monitor.poll():
526 evt = recv_monitor_message(monitor)
530 evt = recv_monitor_message(monitor)
527 if evt['event'] == 32:
531 if evt['event'] == 32:
528 self.connections += 1
532 self.connections += 1
529 if evt['event'] == 512:
533 if evt['event'] == 512:
530 pass
534 pass
531
535
532 evt.update({'description': events[evt['event']]})
536 evt.update({'description': events[evt['event']]})
533
537
534 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
538 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
535 break
539 break
536 monitor.close()
540 monitor.close()
537 print('event monitor thread done!')
541 print('event monitor thread done!')
538
542
539 def initThrottle(self, throttle_value):
543 def initThrottle(self, throttle_value):
540
544
541 @throttle(seconds=throttle_value)
545 @throttle(seconds=throttle_value)
542 def sendDataThrottled(fn_sender, data):
546 def sendDataThrottled(fn_sender, data):
543 fn_sender(data)
547 fn_sender(data)
544
548
545 return sendDataThrottled
549 return sendDataThrottled
546
550
547 def send(self, data):
551 def send(self, data):
548 log.success('Sending {}'.format(data), self.name)
552 log.success('Sending {}'.format(data), self.name)
549 self.sender.send_pyobj(data)
553 self.sender.send_pyobj(data)
550
554
551 def run(self):
555 def run(self):
552
556
553 log.success(
557 log.success(
554 'Starting from {}'.format(self.address),
558 'Starting from {}'.format(self.address),
555 self.name
559 self.name
556 )
560 )
557
561
558 self.context = zmq.Context()
562 self.context = zmq.Context()
559 self.receiver = self.context.socket(zmq.PULL)
563 self.receiver = self.context.socket(zmq.PULL)
560 self.receiver.bind(self.address)
564 self.receiver.bind(self.address)
561 monitor = self.receiver.get_monitor_socket()
565 monitor = self.receiver.get_monitor_socket()
562 self.sender = self.context.socket(zmq.PUB)
566 self.sender = self.context.socket(zmq.PUB)
563 if self.realtime:
567 if self.web_address:
568 log.success(
569 'Sending to web: {}'.format(self.web_address),
570 self.name
571 )
564 self.sender_web = self.context.socket(zmq.PUB)
572 self.sender_web = self.context.socket(zmq.PUB)
565 self.sender_web.connect(self.plot_address)
573 self.sender_web.connect(self.web_address)
566 time.sleep(1)
574 time.sleep(1)
567
575
568 if 'server' in self.kwargs:
576 if 'server' in self.kwargs:
569 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
577 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
570 else:
578 else:
571 self.sender.bind("ipc:///tmp/zmq.plots")
579 self.sender.bind("ipc:///tmp/zmq.plots")
572
580
573 time.sleep(2)
581 time.sleep(2)
574
582
575 t = Thread(target=self.event_monitor, args=(monitor,))
583 t = Thread(target=self.event_monitor, args=(monitor,))
576 t.start()
584 t.start()
577
585
578 while True:
586 while True:
579 dataOut = self.receiver.recv_pyobj()
587 dataOut = self.receiver.recv_pyobj()
580 if not dataOut.flagNoData:
588 if not dataOut.flagNoData:
581 if dataOut.type == 'Parameters':
589 if dataOut.type == 'Parameters':
582 tm = dataOut.utctimeInit
590 tm = dataOut.utctimeInit
583 else:
591 else:
584 tm = dataOut.utctime
592 tm = dataOut.utctime
585 if dataOut.useLocalTime:
593 if dataOut.useLocalTime:
586 if not self.localtime:
594 if not self.localtime:
587 tm += time.timezone
595 tm += time.timezone
588 dt = datetime.datetime.fromtimestamp(tm).date()
596 dt = datetime.datetime.fromtimestamp(tm).date()
589 else:
597 else:
590 if self.localtime:
598 if self.localtime:
591 tm -= time.timezone
599 tm -= time.timezone
592 dt = datetime.datetime.utcfromtimestamp(tm).date()
600 dt = datetime.datetime.utcfromtimestamp(tm).date()
593 coerce = False
601 coerce = False
594 if dt not in self.dates:
602 if dt not in self.dates:
595 if self.data:
603 if self.data:
596 self.data.ended = True
604 self.data.ended = True
597 self.send(self.data)
605 self.send(self.data)
598 coerce = True
606 coerce = True
599 self.data.setup()
607 self.data.setup()
600 self.dates.append(dt)
608 self.dates.append(dt)
601
609
602 self.data.update(dataOut, tm)
610 self.data.update(dataOut, tm)
603
611
604 if dataOut.finished is True:
612 if dataOut.finished is True:
605 self.connections -= 1
613 self.connections -= 1
606 if self.connections == 0 and dt in self.dates:
614 if self.connections == 0 and dt in self.dates:
607 self.data.ended = True
615 self.data.ended = True
608 self.send(self.data)
616 self.send(self.data)
609 self.data.setup()
617 self.data.setup()
610 else:
618 else:
611 if self.realtime:
619 if self.realtime:
612 self.send(self.data)
620 self.send(self.data)
613 # self.sender_web.send_string(self.data.jsonify())
621 if self.web_address:
622 self.sender_web.send(self.data.jsonify())
614 else:
623 else:
615 self.sendData(self.send, self.data, coerce=coerce)
624 self.sendData(self.send, self.data, coerce=coerce)
616 coerce = False
625 coerce = False
617
626
618 return
627 return
619
620 def sendToWeb(self):
621
622 if not self.isWebConfig:
623 context = zmq.Context()
624 sender_web_config = context.socket(zmq.PUB)
625 if 'tcp://' in self.plot_address:
626 dum, address, port = self.plot_address.split(':')
627 conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
628 else:
629 conf_address = self.plot_address + '.config'
630 sender_web_config.bind(conf_address)
631 time.sleep(1)
632 for kwargs in self.operationKwargs.values():
633 if 'plot' in kwargs:
634 log.success('[Sending] Config data to web for {}'.format(kwargs['code'].upper()))
635 sender_web_config.send_string(json.dumps(kwargs))
636 self.isWebConfig = True
General Comments 0
You need to be logged in to leave comments. Login now