##// END OF EJS Templates
Change PUB/SUB to REQ/REP for send data to web server
Juan C. Espinoza -
r1161:9b21f1781b2e
parent child
Show More
@@ -1,635 +1,661
1 '''
1 '''
2 @author: Juan C. Espinoza
2 @author: Juan C. Espinoza
3 '''
3 '''
4
4
5 import time
5 import time
6 import json
6 import json
7 import numpy
7 import numpy
8 import paho.mqtt.client as mqtt
8 import paho.mqtt.client as mqtt
9 import zmq
9 import zmq
10 import datetime
10 import datetime
11 from zmq.utils.monitor import recv_monitor_message
11 from zmq.utils.monitor import recv_monitor_message
12 from functools import wraps
12 from functools import wraps
13 from threading import Thread
13 from threading import Thread
14 from multiprocessing import Process
14 from multiprocessing import Process
15
15
16 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
16 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 from schainpy.model.data.jrodata import JROData
17 from schainpy.model.data.jrodata import JROData
18 from schainpy.utils import log
18 from schainpy.utils import log
19
19
20 MAXNUMX = 100
20 MAXNUMX = 100
21 MAXNUMY = 100
21 MAXNUMY = 100
22
22
23 class PrettyFloat(float):
23 class PrettyFloat(float):
24 def __repr__(self):
24 def __repr__(self):
25 return '%.2f' % self
25 return '%.2f' % self
26
26
27 def roundFloats(obj):
27 def roundFloats(obj):
28 if isinstance(obj, list):
28 if isinstance(obj, list):
29 return map(roundFloats, obj)
29 return map(roundFloats, obj)
30 elif isinstance(obj, float):
30 elif isinstance(obj, float):
31 return round(obj, 2)
31 return round(obj, 2)
32
32
33 def decimate(z, MAXNUMY):
33 def decimate(z, MAXNUMY):
34 dy = int(len(z[0])/MAXNUMY) + 1
34 dy = int(len(z[0])/MAXNUMY) + 1
35
35
36 return z[::, ::dy]
36 return z[::, ::dy]
37
37
38 class throttle(object):
38 class throttle(object):
39 '''
39 '''
40 Decorator that prevents a function from being called more than once every
40 Decorator that prevents a function from being called more than once every
41 time period.
41 time period.
42 To create a function that cannot be called more than once a minute, but
42 To create a function that cannot be called more than once a minute, but
43 will sleep until it can be called:
43 will sleep until it can be called:
44 @throttle(minutes=1)
44 @throttle(minutes=1)
45 def foo():
45 def foo():
46 pass
46 pass
47
47
48 for i in range(10):
48 for i in range(10):
49 foo()
49 foo()
50 print "This function has run %s times." % i
50 print "This function has run %s times." % i
51 '''
51 '''
52
52
53 def __init__(self, seconds=0, minutes=0, hours=0):
53 def __init__(self, seconds=0, minutes=0, hours=0):
54 self.throttle_period = datetime.timedelta(
54 self.throttle_period = datetime.timedelta(
55 seconds=seconds, minutes=minutes, hours=hours
55 seconds=seconds, minutes=minutes, hours=hours
56 )
56 )
57
57
58 self.time_of_last_call = datetime.datetime.min
58 self.time_of_last_call = datetime.datetime.min
59
59
60 def __call__(self, fn):
60 def __call__(self, fn):
61 @wraps(fn)
61 @wraps(fn)
62 def wrapper(*args, **kwargs):
62 def wrapper(*args, **kwargs):
63 coerce = kwargs.pop('coerce', None)
63 coerce = kwargs.pop('coerce', None)
64 if coerce:
64 if coerce:
65 self.time_of_last_call = datetime.datetime.now()
65 self.time_of_last_call = datetime.datetime.now()
66 return fn(*args, **kwargs)
66 return fn(*args, **kwargs)
67 else:
67 else:
68 now = datetime.datetime.now()
68 now = datetime.datetime.now()
69 time_since_last_call = now - self.time_of_last_call
69 time_since_last_call = now - self.time_of_last_call
70 time_left = self.throttle_period - time_since_last_call
70 time_left = self.throttle_period - time_since_last_call
71
71
72 if time_left > datetime.timedelta(seconds=0):
72 if time_left > datetime.timedelta(seconds=0):
73 return
73 return
74
74
75 self.time_of_last_call = datetime.datetime.now()
75 self.time_of_last_call = datetime.datetime.now()
76 return fn(*args, **kwargs)
76 return fn(*args, **kwargs)
77
77
78 return wrapper
78 return wrapper
79
79
80 class Data(object):
80 class Data(object):
81 '''
81 '''
82 Object to hold data to be plotted
82 Object to hold data to be plotted
83 '''
83 '''
84
84
85 def __init__(self, plottypes, throttle_value, exp_code):
85 def __init__(self, plottypes, throttle_value, exp_code):
86 self.plottypes = plottypes
86 self.plottypes = plottypes
87 self.throttle = throttle_value
87 self.throttle = throttle_value
88 self.exp_code = exp_code
88 self.exp_code = exp_code
89 self.ended = False
89 self.ended = False
90 self.localtime = False
90 self.localtime = False
91 self.__times = []
91 self.__times = []
92 self.__heights = []
92 self.__heights = []
93
93
94 def __str__(self):
94 def __str__(self):
95 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
95 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
96 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
96 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
97
97
98 def __len__(self):
98 def __len__(self):
99 return len(self.__times)
99 return len(self.__times)
100
100
101 def __getitem__(self, key):
101 def __getitem__(self, key):
102 if key not in self.data:
102 if key not in self.data:
103 raise KeyError(log.error('Missing key: {}'.format(key)))
103 raise KeyError(log.error('Missing key: {}'.format(key)))
104
104
105 if 'spc' in key:
105 if 'spc' in key:
106 ret = self.data[key]
106 ret = self.data[key]
107 else:
107 else:
108 ret = numpy.array([self.data[key][x] for x in self.times])
108 ret = numpy.array([self.data[key][x] for x in self.times])
109 if ret.ndim > 1:
109 if ret.ndim > 1:
110 ret = numpy.swapaxes(ret, 0, 1)
110 ret = numpy.swapaxes(ret, 0, 1)
111 return ret
111 return ret
112
112
113 def __contains__(self, key):
113 def __contains__(self, key):
114 return key in self.data
114 return key in self.data
115
115
116 def setup(self):
116 def setup(self):
117 '''
117 '''
118 Configure object
118 Configure object
119 '''
119 '''
120
120
121 self.ended = False
121 self.ended = False
122 self.data = {}
122 self.data = {}
123 self.__times = []
123 self.__times = []
124 self.__heights = []
124 self.__heights = []
125 self.__all_heights = set()
125 self.__all_heights = set()
126 for plot in self.plottypes:
126 for plot in self.plottypes:
127 if 'snr' in plot:
127 if 'snr' in plot:
128 plot = 'snr'
128 plot = 'snr'
129 self.data[plot] = {}
129 self.data[plot] = {}
130
130
131 def shape(self, key):
131 def shape(self, key):
132 '''
132 '''
133 Get the shape of the one-element data for the given key
133 Get the shape of the one-element data for the given key
134 '''
134 '''
135
135
136 if len(self.data[key]):
136 if len(self.data[key]):
137 if 'spc' in key:
137 if 'spc' in key:
138 return self.data[key].shape
138 return self.data[key].shape
139 return self.data[key][self.__times[0]].shape
139 return self.data[key][self.__times[0]].shape
140 return (0,)
140 return (0,)
141
141
142 def update(self, dataOut, tm):
142 def update(self, dataOut, tm):
143 '''
143 '''
144 Update data object with new dataOut
144 Update data object with new dataOut
145 '''
145 '''
146
146
147 if tm in self.__times:
147 if tm in self.__times:
148 return
148 return
149
149
150 self.parameters = getattr(dataOut, 'parameters', [])
150 self.parameters = getattr(dataOut, 'parameters', [])
151 if hasattr(dataOut, 'pairsList'):
151 if hasattr(dataOut, 'pairsList'):
152 self.pairs = dataOut.pairsList
152 self.pairs = dataOut.pairsList
153 self.channels = dataOut.channelList
153 self.channels = dataOut.channelList
154 self.interval = dataOut.getTimeInterval()
154 self.interval = dataOut.getTimeInterval()
155 self.localtime = dataOut.useLocalTime
155 self.localtime = dataOut.useLocalTime
156 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
156 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
157 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
157 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
158 self.__heights.append(dataOut.heightList)
158 self.__heights.append(dataOut.heightList)
159 self.__all_heights.update(dataOut.heightList)
159 self.__all_heights.update(dataOut.heightList)
160 self.__times.append(tm)
160 self.__times.append(tm)
161
161
162 for plot in self.plottypes:
162 for plot in self.plottypes:
163 if plot == 'spc':
163 if plot == 'spc':
164 z = dataOut.data_spc/dataOut.normFactor
164 z = dataOut.data_spc/dataOut.normFactor
165 self.data[plot] = 10*numpy.log10(z)
165 self.data[plot] = 10*numpy.log10(z)
166 if plot == 'cspc':
166 if plot == 'cspc':
167 self.data[plot] = dataOut.data_cspc
167 self.data[plot] = dataOut.data_cspc
168 if plot == 'noise':
168 if plot == 'noise':
169 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
169 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
170 if plot == 'rti':
170 if plot == 'rti':
171 self.data[plot][tm] = dataOut.getPower()
171 self.data[plot][tm] = dataOut.getPower()
172 if plot == 'snr_db':
172 if plot == 'snr_db':
173 self.data['snr'][tm] = dataOut.data_SNR
173 self.data['snr'][tm] = dataOut.data_SNR
174 if plot == 'snr':
174 if plot == 'snr':
175 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
175 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
176 if plot == 'dop':
176 if plot == 'dop':
177 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
177 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
178 if plot == 'mean':
178 if plot == 'mean':
179 self.data[plot][tm] = dataOut.data_MEAN
179 self.data[plot][tm] = dataOut.data_MEAN
180 if plot == 'std':
180 if plot == 'std':
181 self.data[plot][tm] = dataOut.data_STD
181 self.data[plot][tm] = dataOut.data_STD
182 if plot == 'coh':
182 if plot == 'coh':
183 self.data[plot][tm] = dataOut.getCoherence()
183 self.data[plot][tm] = dataOut.getCoherence()
184 if plot == 'phase':
184 if plot == 'phase':
185 self.data[plot][tm] = dataOut.getCoherence(phase=True)
185 self.data[plot][tm] = dataOut.getCoherence(phase=True)
186 if plot == 'output':
186 if plot == 'output':
187 self.data[plot][tm] = dataOut.data_output
187 self.data[plot][tm] = dataOut.data_output
188 if plot == 'param':
188 if plot == 'param':
189 self.data[plot][tm] = dataOut.data_param
189 self.data[plot][tm] = dataOut.data_param
190
190
191 def normalize_heights(self):
191 def normalize_heights(self):
192 '''
192 '''
193 Ensure same-dimension of the data for different heighList
193 Ensure same-dimension of the data for different heighList
194 '''
194 '''
195
195
196 H = numpy.array(list(self.__all_heights))
196 H = numpy.array(list(self.__all_heights))
197 H.sort()
197 H.sort()
198 for key in self.data:
198 for key in self.data:
199 shape = self.shape(key)[:-1] + H.shape
199 shape = self.shape(key)[:-1] + H.shape
200 for tm, obj in self.data[key].items():
200 for tm, obj in self.data[key].items():
201 h = self.__heights[self.__times.index(tm)]
201 h = self.__heights[self.__times.index(tm)]
202 if H.size == h.size:
202 if H.size == h.size:
203 continue
203 continue
204 index = numpy.where(numpy.in1d(H, h))[0]
204 index = numpy.where(numpy.in1d(H, h))[0]
205 dummy = numpy.zeros(shape) + numpy.nan
205 dummy = numpy.zeros(shape) + numpy.nan
206 if len(shape) == 2:
206 if len(shape) == 2:
207 dummy[:, index] = obj
207 dummy[:, index] = obj
208 else:
208 else:
209 dummy[index] = obj
209 dummy[index] = obj
210 self.data[key][tm] = dummy
210 self.data[key][tm] = dummy
211
211
212 self.__heights = [H for tm in self.__times]
212 self.__heights = [H for tm in self.__times]
213
213
214 def jsonify(self, decimate=False):
214 def jsonify(self, decimate=False):
215 '''
215 '''
216 Convert data to json
216 Convert data to json
217 '''
217 '''
218
218
219 data = {}
219 data = {}
220 tm = self.times[-1]
220 tm = self.times[-1]
221
221 dy = int(self.heights.size/MAXNUMY) + 1
222 for key in self.data:
222 for key in self.data:
223 if key in ('spc', 'cspc'):
223 if key in ('spc', 'cspc'):
224 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
224 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
225 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
226 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
225 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
227 else:
226 else:
228 data[key] = roundFloats(self.data[key][tm].tolist())
227 data[key] = roundFloats(self.data[key][tm].tolist())
229
228
230 ret = {'data': data}
229 ret = {'data': data}
231 ret['exp_code'] = self.exp_code
230 ret['exp_code'] = self.exp_code
232 ret['time'] = tm
231 ret['time'] = tm
233 ret['interval'] = self.interval
232 ret['interval'] = self.interval
234 ret['localtime'] = self.localtime
233 ret['localtime'] = self.localtime
235 ret['yrange'] = roundFloats(self.heights.tolist())
234 ret['yrange'] = roundFloats(self.heights[::dy].tolist())
236 if key in ('spc', 'cspc'):
235 if 'spc' in self.data or 'cspc' in self.data:
237 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
236 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
238 else:
237 else:
239 ret['xrange'] = []
238 ret['xrange'] = []
240 if hasattr(self, 'pairs'):
239 if hasattr(self, 'pairs'):
241 ret['pairs'] = self.pairs
240 ret['pairs'] = self.pairs
242 else:
241 else:
243 ret['pairs'] = []
242 ret['pairs'] = []
244 return json.dumps(ret)
243 return json.dumps(ret)
245
244
246 @property
245 @property
247 def times(self):
246 def times(self):
248 '''
247 '''
249 Return the list of times of the current data
248 Return the list of times of the current data
250 '''
249 '''
251
250
252 ret = numpy.array(self.__times)
251 ret = numpy.array(self.__times)
253 ret.sort()
252 ret.sort()
254 return ret
253 return ret
255
254
256 @property
255 @property
257 def heights(self):
256 def heights(self):
258 '''
257 '''
259 Return the list of heights of the current data
258 Return the list of heights of the current data
260 '''
259 '''
261
260
262 return numpy.array(self.__heights[-1])
261 return numpy.array(self.__heights[-1])
263
262
264 class PublishData(Operation):
263 class PublishData(Operation):
265 '''
264 '''
266 Operation to send data over zmq.
265 Operation to send data over zmq.
267 '''
266 '''
268
267
269 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
268 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
270
269
271 def __init__(self, **kwargs):
270 def __init__(self, **kwargs):
272 """Inicio."""
271 """Inicio."""
273 Operation.__init__(self, **kwargs)
272 Operation.__init__(self, **kwargs)
274 self.isConfig = False
273 self.isConfig = False
275 self.client = None
274 self.client = None
276 self.zeromq = None
275 self.zeromq = None
277 self.mqtt = None
276 self.mqtt = None
278
277
279 def on_disconnect(self, client, userdata, rc):
278 def on_disconnect(self, client, userdata, rc):
280 if rc != 0:
279 if rc != 0:
281 log.warning('Unexpected disconnection.')
280 log.warning('Unexpected disconnection.')
282 self.connect()
281 self.connect()
283
282
284 def connect(self):
283 def connect(self):
285 log.warning('trying to connect')
284 log.warning('trying to connect')
286 try:
285 try:
287 self.client.connect(
286 self.client.connect(
288 host=self.host,
287 host=self.host,
289 port=self.port,
288 port=self.port,
290 keepalive=60*10,
289 keepalive=60*10,
291 bind_address='')
290 bind_address='')
292 self.client.loop_start()
291 self.client.loop_start()
293 # self.client.publish(
292 # self.client.publish(
294 # self.topic + 'SETUP',
293 # self.topic + 'SETUP',
295 # json.dumps(setup),
294 # json.dumps(setup),
296 # retain=True
295 # retain=True
297 # )
296 # )
298 except:
297 except:
299 log.error('MQTT Conection error.')
298 log.error('MQTT Conection error.')
300 self.client = False
299 self.client = False
301
300
302 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
301 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
303 self.counter = 0
302 self.counter = 0
304 self.topic = kwargs.get('topic', 'schain')
303 self.topic = kwargs.get('topic', 'schain')
305 self.delay = kwargs.get('delay', 0)
304 self.delay = kwargs.get('delay', 0)
306 self.plottype = kwargs.get('plottype', 'spectra')
305 self.plottype = kwargs.get('plottype', 'spectra')
307 self.host = kwargs.get('host', "10.10.10.82")
306 self.host = kwargs.get('host', "10.10.10.82")
308 self.port = kwargs.get('port', 3000)
307 self.port = kwargs.get('port', 3000)
309 self.clientId = clientId
308 self.clientId = clientId
310 self.cnt = 0
309 self.cnt = 0
311 self.zeromq = zeromq
310 self.zeromq = zeromq
312 self.mqtt = kwargs.get('plottype', 0)
311 self.mqtt = kwargs.get('plottype', 0)
313 self.client = None
312 self.client = None
314 self.verbose = verbose
313 self.verbose = verbose
315 setup = []
314 setup = []
316 if mqtt is 1:
315 if mqtt is 1:
317 self.client = mqtt.Client(
316 self.client = mqtt.Client(
318 client_id=self.clientId + self.topic + 'SCHAIN',
317 client_id=self.clientId + self.topic + 'SCHAIN',
319 clean_session=True)
318 clean_session=True)
320 self.client.on_disconnect = self.on_disconnect
319 self.client.on_disconnect = self.on_disconnect
321 self.connect()
320 self.connect()
322 for plot in self.plottype:
321 for plot in self.plottype:
323 setup.append({
322 setup.append({
324 'plot': plot,
323 'plot': plot,
325 'topic': self.topic + plot,
324 'topic': self.topic + plot,
326 'title': getattr(self, plot + '_' + 'title', False),
325 'title': getattr(self, plot + '_' + 'title', False),
327 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
326 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
328 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
327 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
329 'xrange': getattr(self, plot + '_' + 'xrange', False),
328 'xrange': getattr(self, plot + '_' + 'xrange', False),
330 'yrange': getattr(self, plot + '_' + 'yrange', False),
329 'yrange': getattr(self, plot + '_' + 'yrange', False),
331 'zrange': getattr(self, plot + '_' + 'zrange', False),
330 'zrange': getattr(self, plot + '_' + 'zrange', False),
332 })
331 })
333 if zeromq is 1:
332 if zeromq is 1:
334 context = zmq.Context()
333 context = zmq.Context()
335 self.zmq_socket = context.socket(zmq.PUSH)
334 self.zmq_socket = context.socket(zmq.PUSH)
336 server = kwargs.get('server', 'zmq.pipe')
335 server = kwargs.get('server', 'zmq.pipe')
337
336
338 if 'tcp://' in server:
337 if 'tcp://' in server:
339 address = server
338 address = server
340 else:
339 else:
341 address = 'ipc:///tmp/%s' % server
340 address = 'ipc:///tmp/%s' % server
342
341
343 self.zmq_socket.connect(address)
342 self.zmq_socket.connect(address)
344 time.sleep(1)
343 time.sleep(1)
345
344
346
345
347 def publish_data(self):
346 def publish_data(self):
348 self.dataOut.finished = False
347 self.dataOut.finished = False
349 if self.mqtt is 1:
348 if self.mqtt is 1:
350 yData = self.dataOut.heightList[:2].tolist()
349 yData = self.dataOut.heightList[:2].tolist()
351 if self.plottype == 'spectra':
350 if self.plottype == 'spectra':
352 data = getattr(self.dataOut, 'data_spc')
351 data = getattr(self.dataOut, 'data_spc')
353 z = data/self.dataOut.normFactor
352 z = data/self.dataOut.normFactor
354 zdB = 10*numpy.log10(z)
353 zdB = 10*numpy.log10(z)
355 xlen, ylen = zdB[0].shape
354 xlen, ylen = zdB[0].shape
356 dx = int(xlen/MAXNUMX) + 1
355 dx = int(xlen/MAXNUMX) + 1
357 dy = int(ylen/MAXNUMY) + 1
356 dy = int(ylen/MAXNUMY) + 1
358 Z = [0 for i in self.dataOut.channelList]
357 Z = [0 for i in self.dataOut.channelList]
359 for i in self.dataOut.channelList:
358 for i in self.dataOut.channelList:
360 Z[i] = zdB[i][::dx, ::dy].tolist()
359 Z[i] = zdB[i][::dx, ::dy].tolist()
361 payload = {
360 payload = {
362 'timestamp': self.dataOut.utctime,
361 'timestamp': self.dataOut.utctime,
363 'data': roundFloats(Z),
362 'data': roundFloats(Z),
364 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
363 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
365 'interval': self.dataOut.getTimeInterval(),
364 'interval': self.dataOut.getTimeInterval(),
366 'type': self.plottype,
365 'type': self.plottype,
367 'yData': yData
366 'yData': yData
368 }
367 }
369
368
370 elif self.plottype in ('rti', 'power'):
369 elif self.plottype in ('rti', 'power'):
371 data = getattr(self.dataOut, 'data_spc')
370 data = getattr(self.dataOut, 'data_spc')
372 z = data/self.dataOut.normFactor
371 z = data/self.dataOut.normFactor
373 avg = numpy.average(z, axis=1)
372 avg = numpy.average(z, axis=1)
374 avgdB = 10*numpy.log10(avg)
373 avgdB = 10*numpy.log10(avg)
375 xlen, ylen = z[0].shape
374 xlen, ylen = z[0].shape
376 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
375 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
377 AVG = [0 for i in self.dataOut.channelList]
376 AVG = [0 for i in self.dataOut.channelList]
378 for i in self.dataOut.channelList:
377 for i in self.dataOut.channelList:
379 AVG[i] = avgdB[i][::dy].tolist()
378 AVG[i] = avgdB[i][::dy].tolist()
380 payload = {
379 payload = {
381 'timestamp': self.dataOut.utctime,
380 'timestamp': self.dataOut.utctime,
382 'data': roundFloats(AVG),
381 'data': roundFloats(AVG),
383 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
382 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
384 'interval': self.dataOut.getTimeInterval(),
383 'interval': self.dataOut.getTimeInterval(),
385 'type': self.plottype,
384 'type': self.plottype,
386 'yData': yData
385 'yData': yData
387 }
386 }
388 elif self.plottype == 'noise':
387 elif self.plottype == 'noise':
389 noise = self.dataOut.getNoise()/self.dataOut.normFactor
388 noise = self.dataOut.getNoise()/self.dataOut.normFactor
390 noisedB = 10*numpy.log10(noise)
389 noisedB = 10*numpy.log10(noise)
391 payload = {
390 payload = {
392 'timestamp': self.dataOut.utctime,
391 'timestamp': self.dataOut.utctime,
393 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
392 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
394 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
393 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
395 'interval': self.dataOut.getTimeInterval(),
394 'interval': self.dataOut.getTimeInterval(),
396 'type': self.plottype,
395 'type': self.plottype,
397 'yData': yData
396 'yData': yData
398 }
397 }
399 elif self.plottype == 'snr':
398 elif self.plottype == 'snr':
400 data = getattr(self.dataOut, 'data_SNR')
399 data = getattr(self.dataOut, 'data_SNR')
401 avgdB = 10*numpy.log10(data)
400 avgdB = 10*numpy.log10(data)
402
401
403 ylen = data[0].size
402 ylen = data[0].size
404 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
403 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
405 AVG = [0 for i in self.dataOut.channelList]
404 AVG = [0 for i in self.dataOut.channelList]
406 for i in self.dataOut.channelList:
405 for i in self.dataOut.channelList:
407 AVG[i] = avgdB[i][::dy].tolist()
406 AVG[i] = avgdB[i][::dy].tolist()
408 payload = {
407 payload = {
409 'timestamp': self.dataOut.utctime,
408 'timestamp': self.dataOut.utctime,
410 'data': roundFloats(AVG),
409 'data': roundFloats(AVG),
411 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
410 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
412 'type': self.plottype,
411 'type': self.plottype,
413 'yData': yData
412 'yData': yData
414 }
413 }
415 else:
414 else:
416 print "Tipo de grafico invalido"
415 print "Tipo de grafico invalido"
417 payload = {
416 payload = {
418 'data': 'None',
417 'data': 'None',
419 'timestamp': 'None',
418 'timestamp': 'None',
420 'type': None
419 'type': None
421 }
420 }
422
421
423 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
422 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
424
423
425 if self.zeromq is 1:
424 if self.zeromq is 1:
426 if self.verbose:
425 if self.verbose:
427 log.log(
426 log.log(
428 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
427 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
429 self.name
428 self.name
430 )
429 )
431 self.zmq_socket.send_pyobj(self.dataOut)
430 self.zmq_socket.send_pyobj(self.dataOut)
432
431
433 def run(self, dataOut, **kwargs):
432 def run(self, dataOut, **kwargs):
434 self.dataOut = dataOut
433 self.dataOut = dataOut
435 if not self.isConfig:
434 if not self.isConfig:
436 self.setup(**kwargs)
435 self.setup(**kwargs)
437 self.isConfig = True
436 self.isConfig = True
438
437
439 self.publish_data()
438 self.publish_data()
440 time.sleep(self.delay)
439 time.sleep(self.delay)
441
440
442 def close(self):
441 def close(self):
443 if self.zeromq is 1:
442 if self.zeromq is 1:
444 self.dataOut.finished = True
443 self.dataOut.finished = True
445 self.zmq_socket.send_pyobj(self.dataOut)
444 self.zmq_socket.send_pyobj(self.dataOut)
446 time.sleep(0.1)
445 time.sleep(0.1)
447 self.zmq_socket.close()
446 self.zmq_socket.close()
448 if self.client:
447 if self.client:
449 self.client.loop_stop()
448 self.client.loop_stop()
450 self.client.disconnect()
449 self.client.disconnect()
451
450
452
451
453 class ReceiverData(ProcessingUnit):
452 class ReceiverData(ProcessingUnit):
454
453
455 __attrs__ = ['server']
454 __attrs__ = ['server']
456
455
457 def __init__(self, **kwargs):
456 def __init__(self, **kwargs):
458
457
459 ProcessingUnit.__init__(self, **kwargs)
458 ProcessingUnit.__init__(self, **kwargs)
460
459
461 self.isConfig = False
460 self.isConfig = False
462 server = kwargs.get('server', 'zmq.pipe')
461 server = kwargs.get('server', 'zmq.pipe')
463 if 'tcp://' in server:
462 if 'tcp://' in server:
464 address = server
463 address = server
465 else:
464 else:
466 address = 'ipc:///tmp/%s' % server
465 address = 'ipc:///tmp/%s' % server
467
466
468 self.address = address
467 self.address = address
469 self.dataOut = JROData()
468 self.dataOut = JROData()
470
469
471 def setup(self):
470 def setup(self):
472
471
473 self.context = zmq.Context()
472 self.context = zmq.Context()
474 self.receiver = self.context.socket(zmq.PULL)
473 self.receiver = self.context.socket(zmq.PULL)
475 self.receiver.bind(self.address)
474 self.receiver.bind(self.address)
476 time.sleep(0.5)
475 time.sleep(0.5)
477 log.success('ReceiverData from {}'.format(self.address))
476 log.success('ReceiverData from {}'.format(self.address))
478
477
479
478
480 def run(self):
479 def run(self):
481
480
482 if not self.isConfig:
481 if not self.isConfig:
483 self.setup()
482 self.setup()
484 self.isConfig = True
483 self.isConfig = True
485
484
486 self.dataOut = self.receiver.recv_pyobj()
485 self.dataOut = self.receiver.recv_pyobj()
487 log.log('{} - {}'.format(self.dataOut.type,
486 log.log('{} - {}'.format(self.dataOut.type,
488 self.dataOut.datatime.ctime(),),
487 self.dataOut.datatime.ctime(),),
489 'Receiving')
488 'Receiving')
490
489
491
490
492 class PlotterReceiver(ProcessingUnit, Process):
491 class PlotterReceiver(ProcessingUnit, Process):
493
492
494 throttle_value = 5
493 throttle_value = 5
495 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
494 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
496 'exp_code', 'web_server']
495 'exp_code', 'web_server']
497
496
498 def __init__(self, **kwargs):
497 def __init__(self, **kwargs):
499
498
500 ProcessingUnit.__init__(self, **kwargs)
499 ProcessingUnit.__init__(self, **kwargs)
501 Process.__init__(self)
500 Process.__init__(self)
502 self.mp = False
501 self.mp = False
503 self.isConfig = False
502 self.isConfig = False
504 self.isWebConfig = False
503 self.isWebConfig = False
505 self.connections = 0
504 self.connections = 0
506 server = kwargs.get('server', 'zmq.pipe')
505 server = kwargs.get('server', 'zmq.pipe')
507 web_server = kwargs.get('web_server', None)
506 web_server = kwargs.get('web_server', None)
508 if 'tcp://' in server:
507 if 'tcp://' in server:
509 address = server
508 address = server
510 else:
509 else:
511 address = 'ipc:///tmp/%s' % server
510 address = 'ipc:///tmp/%s' % server
512 self.address = address
511 self.address = address
513 self.web_address = web_server
512 self.web_address = web_server
514 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
513 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
515 self.realtime = kwargs.get('realtime', False)
514 self.realtime = kwargs.get('realtime', False)
516 self.localtime = kwargs.get('localtime', True)
515 self.localtime = kwargs.get('localtime', True)
517 self.throttle_value = kwargs.get('throttle', 5)
516 self.throttle_value = kwargs.get('throttle', 5)
518 self.exp_code = kwargs.get('exp_code', None)
517 self.exp_code = kwargs.get('exp_code', None)
519 self.sendData = self.initThrottle(self.throttle_value)
518 self.sendData = self.initThrottle(self.throttle_value)
520 self.dates = []
519 self.dates = []
521 self.setup()
520 self.setup()
522
521
523 def setup(self):
522 def setup(self):
524
523
525 self.data = Data(self.plottypes, self.throttle_value, self.exp_code)
524 self.data = Data(self.plottypes, self.throttle_value, self.exp_code)
526 self.isConfig = True
525 self.isConfig = True
527
526
528 def event_monitor(self, monitor):
527 def event_monitor(self, monitor):
529
528
530 events = {}
529 events = {}
531
530
532 for name in dir(zmq):
531 for name in dir(zmq):
533 if name.startswith('EVENT_'):
532 if name.startswith('EVENT_'):
534 value = getattr(zmq, name)
533 value = getattr(zmq, name)
535 events[value] = name
534 events[value] = name
536
535
537 while monitor.poll():
536 while monitor.poll():
538 evt = recv_monitor_message(monitor)
537 evt = recv_monitor_message(monitor)
539 if evt['event'] == 32:
538 if evt['event'] == 32:
540 self.connections += 1
539 self.connections += 1
541 if evt['event'] == 512:
540 if evt['event'] == 512:
542 pass
541 pass
543
542
544 evt.update({'description': events[evt['event']]})
543 evt.update({'description': events[evt['event']]})
545
544
546 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
545 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
547 break
546 break
548 monitor.close()
547 monitor.close()
549 print('event monitor thread done!')
548 print('event monitor thread done!')
550
549
551 def initThrottle(self, throttle_value):
550 def initThrottle(self, throttle_value):
552
551
553 @throttle(seconds=throttle_value)
552 @throttle(seconds=throttle_value)
554 def sendDataThrottled(fn_sender, data):
553 def sendDataThrottled(fn_sender, data):
555 fn_sender(data)
554 fn_sender(data)
556
555
557 return sendDataThrottled
556 return sendDataThrottled
558
557
559 def send(self, data):
558 def send(self, data):
560 log.success('Sending {}'.format(data), self.name)
559 log.success('Sending {}'.format(data), self.name)
561 self.sender.send_pyobj(data)
560 self.sender.send_pyobj(data)
562
561
563 def run(self):
562 def run(self):
564
563
565 log.success(
564 log.success(
566 'Starting from {}'.format(self.address),
565 'Starting from {}'.format(self.address),
567 self.name
566 self.name
568 )
567 )
569
568
570 self.context = zmq.Context()
569 self.context = zmq.Context()
571 self.receiver = self.context.socket(zmq.PULL)
570 self.receiver = self.context.socket(zmq.PULL)
572 self.receiver.bind(self.address)
571 self.receiver.bind(self.address)
573 monitor = self.receiver.get_monitor_socket()
572 monitor = self.receiver.get_monitor_socket()
574 self.sender = self.context.socket(zmq.PUB)
573 self.sender = self.context.socket(zmq.PUB)
575 if self.web_address:
574 if self.web_address:
576 log.success(
575 log.success(
577 'Sending to web: {}'.format(self.web_address),
576 'Sending to web: {}'.format(self.web_address),
578 self.name
577 self.name
579 )
578 )
580 self.sender_web = self.context.socket(zmq.PUB)
579 self.sender_web = self.context.socket(zmq.REQ)
581 self.sender_web.connect(self.web_address)
580 self.sender_web.connect(self.web_address)
581 self.poll = zmq.Poller()
582 self.poll.register(self.sender_web, zmq.POLLIN)
582 time.sleep(1)
583 time.sleep(1)
583
584
584 if 'server' in self.kwargs:
585 if 'server' in self.kwargs:
585 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
586 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
586 else:
587 else:
587 self.sender.bind("ipc:///tmp/zmq.plots")
588 self.sender.bind("ipc:///tmp/zmq.plots")
588
589
589 time.sleep(2)
590 time.sleep(2)
590
591
591 t = Thread(target=self.event_monitor, args=(monitor,))
592 t = Thread(target=self.event_monitor, args=(monitor,))
592 t.start()
593 t.start()
593
594
594 while True:
595 while True:
595 dataOut = self.receiver.recv_pyobj()
596 dataOut = self.receiver.recv_pyobj()
596 if not dataOut.flagNoData:
597 if not dataOut.flagNoData:
597 if dataOut.type == 'Parameters':
598 if dataOut.type == 'Parameters':
598 tm = dataOut.utctimeInit
599 tm = dataOut.utctimeInit
599 else:
600 else:
600 tm = dataOut.utctime
601 tm = dataOut.utctime
601 if dataOut.useLocalTime:
602 if dataOut.useLocalTime:
602 if not self.localtime:
603 if not self.localtime:
603 tm += time.timezone
604 tm += time.timezone
604 dt = datetime.datetime.fromtimestamp(tm).date()
605 dt = datetime.datetime.fromtimestamp(tm).date()
605 else:
606 else:
606 if self.localtime:
607 if self.localtime:
607 tm -= time.timezone
608 tm -= time.timezone
608 dt = datetime.datetime.utcfromtimestamp(tm).date()
609 dt = datetime.datetime.utcfromtimestamp(tm).date()
609 coerce = False
610 coerce = False
610 if dt not in self.dates:
611 if dt not in self.dates:
611 if self.data:
612 if self.data:
612 self.data.ended = True
613 self.data.ended = True
613 self.send(self.data)
614 self.send(self.data)
614 coerce = True
615 coerce = True
615 self.data.setup()
616 self.data.setup()
616 self.dates.append(dt)
617 self.dates.append(dt)
617
618
618 self.data.update(dataOut, tm)
619 self.data.update(dataOut, tm)
619
620
620 if dataOut.finished is True:
621 if dataOut.finished is True:
621 self.connections -= 1
622 self.connections -= 1
622 if self.connections == 0 and dt in self.dates:
623 if self.connections == 0 and dt in self.dates:
623 self.data.ended = True
624 self.data.ended = True
624 self.send(self.data)
625 self.send(self.data)
625 self.data.setup()
626 # self.data.setup()
627 time.sleep(1)
628 break
626 else:
629 else:
627 if self.realtime:
630 if self.realtime:
628 self.send(self.data)
631 self.send(self.data)
629 if self.web_address:
632 if self.web_address:
630 self.sender_web.send(self.data.jsonify())
633 retries = 5
634 while True:
635 self.sender_web.send(self.data.jsonify())
636 socks = dict(self.poll.poll(5000))
637 if socks.get(self.sender_web) == zmq.POLLIN:
638 reply = self.sender_web.recv_string()
639 if reply == 'ok':
640 break
641 else:
642 print("Malformed reply from server: %s" % reply)
643
644 else:
645 print("No response from server, retrying...")
646 self.sender_web.setsockopt(zmq.LINGER, 0)
647 self.sender_web.close()
648 self.poll.unregister(self.sender_web)
649 retries -= 1
650 if retries == 0:
651 print("Server seems to be offline, abandoning")
652 break
653 self.sender_web = self.context.socket(zmq.REQ)
654 self.sender_web.connect(self.web_address)
655 self.poll.register(self.sender_web, zmq.POLLIN)
656 time.sleep(1)
631 else:
657 else:
632 self.sendData(self.send, self.data, coerce=coerce)
658 self.sendData(self.send, self.data, coerce=coerce)
633 coerce = False
659 coerce = False
634
660
635 return
661 return
General Comments 0
You need to be logged in to leave comments. Login now