##// END OF EJS Templates
Fix time when publishing Parameters
Juan C. Espinoza -
r1094:4c0b45256b7d
parent child
Show More
@@ -1,619 +1,631
1 '''
1 '''
2 @author: Juan C. Espinoza
2 @author: Juan C. Espinoza
3 '''
3 '''
4
4
5 import time
5 import time
6 import json
6 import json
7 import numpy
7 import numpy
8 import paho.mqtt.client as mqtt
8 import paho.mqtt.client as mqtt
9 import zmq
9 import zmq
10 import datetime
10 import datetime
11 from zmq.utils.monitor import recv_monitor_message
11 from zmq.utils.monitor import recv_monitor_message
12 from functools import wraps
12 from functools import wraps
13 from threading import Thread
13 from threading import Thread
14 from multiprocessing import Process
14 from multiprocessing import Process
15
15
16 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
16 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 from schainpy.model.data.jrodata import JROData
17 from schainpy.model.data.jrodata import JROData
18 from schainpy.utils import log
18 from schainpy.utils import log
19
19
20 MAXNUMX = 100
20 MAXNUMX = 100
21 MAXNUMY = 100
21 MAXNUMY = 100
22
22
23 class PrettyFloat(float):
23 class PrettyFloat(float):
24 def __repr__(self):
24 def __repr__(self):
25 return '%.2f' % self
25 return '%.2f' % self
26
26
27 def roundFloats(obj):
27 def roundFloats(obj):
28 if isinstance(obj, list):
28 if isinstance(obj, list):
29 return map(roundFloats, obj)
29 return map(roundFloats, obj)
30 elif isinstance(obj, float):
30 elif isinstance(obj, float):
31 return round(obj, 2)
31 return round(obj, 2)
32
32
33 def decimate(z, MAXNUMY):
33 def decimate(z, MAXNUMY):
34 dy = int(len(z[0])/MAXNUMY) + 1
34 dy = int(len(z[0])/MAXNUMY) + 1
35
35
36 return z[::, ::dy]
36 return z[::, ::dy]
37
37
38 class throttle(object):
38 class throttle(object):
39 '''
39 '''
40 Decorator that prevents a function from being called more than once every
40 Decorator that prevents a function from being called more than once every
41 time period.
41 time period.
42 To create a function that cannot be called more than once a minute, but
42 To create a function that cannot be called more than once a minute, but
43 will sleep until it can be called:
43 will sleep until it can be called:
44 @throttle(minutes=1)
44 @throttle(minutes=1)
45 def foo():
45 def foo():
46 pass
46 pass
47
47
48 for i in range(10):
48 for i in range(10):
49 foo()
49 foo()
50 print "This function has run %s times." % i
50 print "This function has run %s times." % i
51 '''
51 '''
52
52
53 def __init__(self, seconds=0, minutes=0, hours=0):
53 def __init__(self, seconds=0, minutes=0, hours=0):
54 self.throttle_period = datetime.timedelta(
54 self.throttle_period = datetime.timedelta(
55 seconds=seconds, minutes=minutes, hours=hours
55 seconds=seconds, minutes=minutes, hours=hours
56 )
56 )
57
57
58 self.time_of_last_call = datetime.datetime.min
58 self.time_of_last_call = datetime.datetime.min
59
59
60 def __call__(self, fn):
60 def __call__(self, fn):
61 @wraps(fn)
61 @wraps(fn)
62 def wrapper(*args, **kwargs):
62 def wrapper(*args, **kwargs):
63 now = datetime.datetime.now()
63 coerce = kwargs.pop('coerce', None)
64 time_since_last_call = now - self.time_of_last_call
64 if coerce:
65 time_left = self.throttle_period - time_since_last_call
65 self.time_of_last_call = datetime.datetime.now()
66 return fn(*args, **kwargs)
67 else:
68 now = datetime.datetime.now()
69 time_since_last_call = now - self.time_of_last_call
70 time_left = self.throttle_period - time_since_last_call
66
71
67 if time_left > datetime.timedelta(seconds=0):
72 if time_left > datetime.timedelta(seconds=0):
68 return
73 return
69
74
70 self.time_of_last_call = datetime.datetime.now()
75 self.time_of_last_call = datetime.datetime.now()
71 return fn(*args, **kwargs)
76 return fn(*args, **kwargs)
72
77
73 return wrapper
78 return wrapper
74
79
75 class Data(object):
80 class Data(object):
76 '''
81 '''
77 Object to hold data to be plotted
82 Object to hold data to be plotted
78 '''
83 '''
79
84
80 def __init__(self, plottypes, throttle_value):
85 def __init__(self, plottypes, throttle_value):
81 self.plottypes = plottypes
86 self.plottypes = plottypes
82 self.throttle = throttle_value
87 self.throttle = throttle_value
83 self.ended = False
88 self.ended = False
84 self.localtime = False
89 self.localtime = False
85 self.__times = []
90 self.__times = []
86 self.__heights = []
91 self.__heights = []
87
92
88 def __str__(self):
93 def __str__(self):
89 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
94 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
90 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
95 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
91
96
92 def __len__(self):
97 def __len__(self):
93 return len(self.__times)
98 return len(self.__times)
94
99
95 def __getitem__(self, key):
100 def __getitem__(self, key):
96 if key not in self.data:
101 if key not in self.data:
97 raise KeyError(log.error('Missing key: {}'.format(key)))
102 raise KeyError(log.error('Missing key: {}'.format(key)))
98
103
99 if 'spc' in key:
104 if 'spc' in key:
100 ret = self.data[key]
105 ret = self.data[key]
101 else:
106 else:
102 ret = numpy.array([self.data[key][x] for x in self.times])
107 ret = numpy.array([self.data[key][x] for x in self.times])
103 if ret.ndim > 1:
108 if ret.ndim > 1:
104 ret = numpy.swapaxes(ret, 0, 1)
109 ret = numpy.swapaxes(ret, 0, 1)
105 return ret
110 return ret
106
111
112 def __contains__(self, key):
113 return key in self.data
114
107 def setup(self):
115 def setup(self):
108 '''
116 '''
109 Configure object
117 Configure object
110 '''
118 '''
111
119
112 self.ended = False
120 self.ended = False
113 self.data = {}
121 self.data = {}
114 self.__times = []
122 self.__times = []
115 self.__heights = []
123 self.__heights = []
116 self.__all_heights = set()
124 self.__all_heights = set()
117 for plot in self.plottypes:
125 for plot in self.plottypes:
118 if 'snr' in plot:
126 if 'snr' in plot:
119 plot = 'snr'
127 plot = 'snr'
120 self.data[plot] = {}
128 self.data[plot] = {}
121
129
122 def shape(self, key):
130 def shape(self, key):
123 '''
131 '''
124 Get the shape of the one-element data for the given key
132 Get the shape of the one-element data for the given key
125 '''
133 '''
126
134
127 if len(self.data[key]):
135 if len(self.data[key]):
128 if 'spc' in key:
136 if 'spc' in key:
129 return self.data[key].shape
137 return self.data[key].shape
130 return self.data[key][self.__times[0]].shape
138 return self.data[key][self.__times[0]].shape
131 return (0,)
139 return (0,)
132
140
133 def update(self, dataOut):
141 def update(self, dataOut):
134 '''
142 '''
135 Update data object with new dataOut
143 Update data object with new dataOut
136 '''
144 '''
137
145
138 tm = dataOut.utctime
146 tm = dataOut.utctime
139 if tm in self.__times:
147 if tm in self.__times:
140 return
148 return
141
149
142 self.parameters = getattr(dataOut, 'parameters', [])
150 self.parameters = getattr(dataOut, 'parameters', [])
143 self.pairs = dataOut.pairsList
151 self.pairs = dataOut.pairsList
144 self.channels = dataOut.channelList
152 self.channels = dataOut.channelList
145 self.interval = dataOut.getTimeInterval()
153 self.interval = dataOut.getTimeInterval()
146 self.localtime = dataOut.useLocalTime
154 self.localtime = dataOut.useLocalTime
147 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
155 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
148 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
156 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
149 self.__heights.append(dataOut.heightList)
157 self.__heights.append(dataOut.heightList)
150 self.__all_heights.update(dataOut.heightList)
158 self.__all_heights.update(dataOut.heightList)
151 self.__times.append(tm)
159 self.__times.append(tm)
152
160
153 for plot in self.plottypes:
161 for plot in self.plottypes:
154 if plot == 'spc':
162 if plot == 'spc':
155 z = dataOut.data_spc/dataOut.normFactor
163 z = dataOut.data_spc/dataOut.normFactor
156 self.data[plot] = 10*numpy.log10(z)
164 self.data[plot] = 10*numpy.log10(z)
157 if plot == 'cspc':
165 if plot == 'cspc':
158 self.data[plot] = dataOut.data_cspc
166 self.data[plot] = dataOut.data_cspc
159 if plot == 'noise':
167 if plot == 'noise':
160 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
168 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
161 if plot == 'rti':
169 if plot == 'rti':
162 self.data[plot][tm] = dataOut.getPower()
170 self.data[plot][tm] = dataOut.getPower()
163 if plot == 'snr_db':
171 if plot == 'snr_db':
164 self.data['snr'][tm] = dataOut.data_SNR
172 self.data['snr'][tm] = dataOut.data_SNR
165 if plot == 'snr':
173 if plot == 'snr':
166 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
174 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
167 if plot == 'dop':
175 if plot == 'dop':
168 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
176 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
169 if plot == 'mean':
177 if plot == 'mean':
170 self.data[plot][tm] = dataOut.data_MEAN
178 self.data[plot][tm] = dataOut.data_MEAN
171 if plot == 'std':
179 if plot == 'std':
172 self.data[plot][tm] = dataOut.data_STD
180 self.data[plot][tm] = dataOut.data_STD
173 if plot == 'coh':
181 if plot == 'coh':
174 self.data[plot][tm] = dataOut.getCoherence()
182 self.data[plot][tm] = dataOut.getCoherence()
175 if plot == 'phase':
183 if plot == 'phase':
176 self.data[plot][tm] = dataOut.getCoherence(phase=True)
184 self.data[plot][tm] = dataOut.getCoherence(phase=True)
177 if plot == 'output':
185 if plot == 'output':
178 self.data[plot][tm] = dataOut.data_output
186 self.data[plot][tm] = dataOut.data_output
179 if plot == 'param':
187 if plot == 'param':
180 self.data[plot][tm] = dataOut.data_param
188 self.data[plot][tm] = dataOut.data_param
181
189
182 def normalize_heights(self):
190 def normalize_heights(self):
183 '''
191 '''
184 Ensure same-dimension of the data for different heighList
192 Ensure same-dimension of the data for different heighList
185 '''
193 '''
186
194
187 H = numpy.array(list(self.__all_heights))
195 H = numpy.array(list(self.__all_heights))
188 H.sort()
196 H.sort()
189 for key in self.data:
197 for key in self.data:
190 shape = self.shape(key)[:-1] + H.shape
198 shape = self.shape(key)[:-1] + H.shape
191 for tm, obj in self.data[key].items():
199 for tm, obj in self.data[key].items():
192 h = self.__heights[self.__times.index(tm)]
200 h = self.__heights[self.__times.index(tm)]
193 if H.size == h.size:
201 if H.size == h.size:
194 continue
202 continue
195 index = numpy.where(numpy.in1d(H, h))[0]
203 index = numpy.where(numpy.in1d(H, h))[0]
196 dummy = numpy.zeros(shape) + numpy.nan
204 dummy = numpy.zeros(shape) + numpy.nan
197 if len(shape) == 2:
205 if len(shape) == 2:
198 dummy[:, index] = obj
206 dummy[:, index] = obj
199 else:
207 else:
200 dummy[index] = obj
208 dummy[index] = obj
201 self.data[key][tm] = dummy
209 self.data[key][tm] = dummy
202
210
203 self.__heights = [H for tm in self.__times]
211 self.__heights = [H for tm in self.__times]
204
212
205 def jsonify(self, decimate=False):
213 def jsonify(self, decimate=False):
206 '''
214 '''
207 Convert data to json
215 Convert data to json
208 '''
216 '''
209
217
210 ret = {}
218 ret = {}
211 tm = self.times[-1]
219 tm = self.times[-1]
212
220
213 for key, value in self.data:
221 for key, value in self.data:
214 if key in ('spc', 'cspc'):
222 if key in ('spc', 'cspc'):
215 ret[key] = roundFloats(self.data[key].to_list())
223 ret[key] = roundFloats(self.data[key].to_list())
216 else:
224 else:
217 ret[key] = roundFloats(self.data[key][tm].to_list())
225 ret[key] = roundFloats(self.data[key][tm].to_list())
218
226
219 ret['timestamp'] = tm
227 ret['timestamp'] = tm
220 ret['interval'] = self.interval
228 ret['interval'] = self.interval
221
229
222 @property
230 @property
223 def times(self):
231 def times(self):
224 '''
232 '''
225 Return the list of times of the current data
233 Return the list of times of the current data
226 '''
234 '''
227
235
228 ret = numpy.array(self.__times)
236 ret = numpy.array(self.__times)
229 ret.sort()
237 ret.sort()
230 return ret
238 return ret
231
239
232 @property
240 @property
233 def heights(self):
241 def heights(self):
234 '''
242 '''
235 Return the list of heights of the current data
243 Return the list of heights of the current data
236 '''
244 '''
237
245
238 return numpy.array(self.__heights[-1])
246 return numpy.array(self.__heights[-1])
239
247
240 class PublishData(Operation):
248 class PublishData(Operation):
241 '''
249 '''
242 Operation to send data over zmq.
250 Operation to send data over zmq.
243 '''
251 '''
244
252
245 def __init__(self, **kwargs):
253 def __init__(self, **kwargs):
246 """Inicio."""
254 """Inicio."""
247 Operation.__init__(self, **kwargs)
255 Operation.__init__(self, **kwargs)
248 self.isConfig = False
256 self.isConfig = False
249 self.client = None
257 self.client = None
250 self.zeromq = None
258 self.zeromq = None
251 self.mqtt = None
259 self.mqtt = None
252
260
253 def on_disconnect(self, client, userdata, rc):
261 def on_disconnect(self, client, userdata, rc):
254 if rc != 0:
262 if rc != 0:
255 log.warning('Unexpected disconnection.')
263 log.warning('Unexpected disconnection.')
256 self.connect()
264 self.connect()
257
265
258 def connect(self):
266 def connect(self):
259 log.warning('trying to connect')
267 log.warning('trying to connect')
260 try:
268 try:
261 self.client.connect(
269 self.client.connect(
262 host=self.host,
270 host=self.host,
263 port=self.port,
271 port=self.port,
264 keepalive=60*10,
272 keepalive=60*10,
265 bind_address='')
273 bind_address='')
266 self.client.loop_start()
274 self.client.loop_start()
267 # self.client.publish(
275 # self.client.publish(
268 # self.topic + 'SETUP',
276 # self.topic + 'SETUP',
269 # json.dumps(setup),
277 # json.dumps(setup),
270 # retain=True
278 # retain=True
271 # )
279 # )
272 except:
280 except:
273 log.error('MQTT Conection error.')
281 log.error('MQTT Conection error.')
274 self.client = False
282 self.client = False
275
283
276 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
284 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
277 self.counter = 0
285 self.counter = 0
278 self.topic = kwargs.get('topic', 'schain')
286 self.topic = kwargs.get('topic', 'schain')
279 self.delay = kwargs.get('delay', 0)
287 self.delay = kwargs.get('delay', 0)
280 self.plottype = kwargs.get('plottype', 'spectra')
288 self.plottype = kwargs.get('plottype', 'spectra')
281 self.host = kwargs.get('host', "10.10.10.82")
289 self.host = kwargs.get('host', "10.10.10.82")
282 self.port = kwargs.get('port', 3000)
290 self.port = kwargs.get('port', 3000)
283 self.clientId = clientId
291 self.clientId = clientId
284 self.cnt = 0
292 self.cnt = 0
285 self.zeromq = zeromq
293 self.zeromq = zeromq
286 self.mqtt = kwargs.get('plottype', 0)
294 self.mqtt = kwargs.get('plottype', 0)
287 self.client = None
295 self.client = None
288 self.verbose = verbose
296 self.verbose = verbose
289 setup = []
297 setup = []
290 if mqtt is 1:
298 if mqtt is 1:
291 self.client = mqtt.Client(
299 self.client = mqtt.Client(
292 client_id=self.clientId + self.topic + 'SCHAIN',
300 client_id=self.clientId + self.topic + 'SCHAIN',
293 clean_session=True)
301 clean_session=True)
294 self.client.on_disconnect = self.on_disconnect
302 self.client.on_disconnect = self.on_disconnect
295 self.connect()
303 self.connect()
296 for plot in self.plottype:
304 for plot in self.plottype:
297 setup.append({
305 setup.append({
298 'plot': plot,
306 'plot': plot,
299 'topic': self.topic + plot,
307 'topic': self.topic + plot,
300 'title': getattr(self, plot + '_' + 'title', False),
308 'title': getattr(self, plot + '_' + 'title', False),
301 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
309 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
302 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
310 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
303 'xrange': getattr(self, plot + '_' + 'xrange', False),
311 'xrange': getattr(self, plot + '_' + 'xrange', False),
304 'yrange': getattr(self, plot + '_' + 'yrange', False),
312 'yrange': getattr(self, plot + '_' + 'yrange', False),
305 'zrange': getattr(self, plot + '_' + 'zrange', False),
313 'zrange': getattr(self, plot + '_' + 'zrange', False),
306 })
314 })
307 if zeromq is 1:
315 if zeromq is 1:
308 context = zmq.Context()
316 context = zmq.Context()
309 self.zmq_socket = context.socket(zmq.PUSH)
317 self.zmq_socket = context.socket(zmq.PUSH)
310 server = kwargs.get('server', 'zmq.pipe')
318 server = kwargs.get('server', 'zmq.pipe')
311
319
312 if 'tcp://' in server:
320 if 'tcp://' in server:
313 address = server
321 address = server
314 else:
322 else:
315 address = 'ipc:///tmp/%s' % server
323 address = 'ipc:///tmp/%s' % server
316
324
317 self.zmq_socket.connect(address)
325 self.zmq_socket.connect(address)
318 time.sleep(1)
326 time.sleep(1)
319
327
320
328
321 def publish_data(self):
329 def publish_data(self):
322 self.dataOut.finished = False
330 self.dataOut.finished = False
323 if self.mqtt is 1:
331 if self.mqtt is 1:
324 yData = self.dataOut.heightList[:2].tolist()
332 yData = self.dataOut.heightList[:2].tolist()
325 if self.plottype == 'spectra':
333 if self.plottype == 'spectra':
326 data = getattr(self.dataOut, 'data_spc')
334 data = getattr(self.dataOut, 'data_spc')
327 z = data/self.dataOut.normFactor
335 z = data/self.dataOut.normFactor
328 zdB = 10*numpy.log10(z)
336 zdB = 10*numpy.log10(z)
329 xlen, ylen = zdB[0].shape
337 xlen, ylen = zdB[0].shape
330 dx = int(xlen/MAXNUMX) + 1
338 dx = int(xlen/MAXNUMX) + 1
331 dy = int(ylen/MAXNUMY) + 1
339 dy = int(ylen/MAXNUMY) + 1
332 Z = [0 for i in self.dataOut.channelList]
340 Z = [0 for i in self.dataOut.channelList]
333 for i in self.dataOut.channelList:
341 for i in self.dataOut.channelList:
334 Z[i] = zdB[i][::dx, ::dy].tolist()
342 Z[i] = zdB[i][::dx, ::dy].tolist()
335 payload = {
343 payload = {
336 'timestamp': self.dataOut.utctime,
344 'timestamp': self.dataOut.utctime,
337 'data': roundFloats(Z),
345 'data': roundFloats(Z),
338 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
346 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
339 'interval': self.dataOut.getTimeInterval(),
347 'interval': self.dataOut.getTimeInterval(),
340 'type': self.plottype,
348 'type': self.plottype,
341 'yData': yData
349 'yData': yData
342 }
350 }
343
351
344 elif self.plottype in ('rti', 'power'):
352 elif self.plottype in ('rti', 'power'):
345 data = getattr(self.dataOut, 'data_spc')
353 data = getattr(self.dataOut, 'data_spc')
346 z = data/self.dataOut.normFactor
354 z = data/self.dataOut.normFactor
347 avg = numpy.average(z, axis=1)
355 avg = numpy.average(z, axis=1)
348 avgdB = 10*numpy.log10(avg)
356 avgdB = 10*numpy.log10(avg)
349 xlen, ylen = z[0].shape
357 xlen, ylen = z[0].shape
350 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
358 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
351 AVG = [0 for i in self.dataOut.channelList]
359 AVG = [0 for i in self.dataOut.channelList]
352 for i in self.dataOut.channelList:
360 for i in self.dataOut.channelList:
353 AVG[i] = avgdB[i][::dy].tolist()
361 AVG[i] = avgdB[i][::dy].tolist()
354 payload = {
362 payload = {
355 'timestamp': self.dataOut.utctime,
363 'timestamp': self.dataOut.utctime,
356 'data': roundFloats(AVG),
364 'data': roundFloats(AVG),
357 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
365 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
358 'interval': self.dataOut.getTimeInterval(),
366 'interval': self.dataOut.getTimeInterval(),
359 'type': self.plottype,
367 'type': self.plottype,
360 'yData': yData
368 'yData': yData
361 }
369 }
362 elif self.plottype == 'noise':
370 elif self.plottype == 'noise':
363 noise = self.dataOut.getNoise()/self.dataOut.normFactor
371 noise = self.dataOut.getNoise()/self.dataOut.normFactor
364 noisedB = 10*numpy.log10(noise)
372 noisedB = 10*numpy.log10(noise)
365 payload = {
373 payload = {
366 'timestamp': self.dataOut.utctime,
374 'timestamp': self.dataOut.utctime,
367 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
375 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
368 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
376 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
369 'interval': self.dataOut.getTimeInterval(),
377 'interval': self.dataOut.getTimeInterval(),
370 'type': self.plottype,
378 'type': self.plottype,
371 'yData': yData
379 'yData': yData
372 }
380 }
373 elif self.plottype == 'snr':
381 elif self.plottype == 'snr':
374 data = getattr(self.dataOut, 'data_SNR')
382 data = getattr(self.dataOut, 'data_SNR')
375 avgdB = 10*numpy.log10(data)
383 avgdB = 10*numpy.log10(data)
376
384
377 ylen = data[0].size
385 ylen = data[0].size
378 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
386 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
379 AVG = [0 for i in self.dataOut.channelList]
387 AVG = [0 for i in self.dataOut.channelList]
380 for i in self.dataOut.channelList:
388 for i in self.dataOut.channelList:
381 AVG[i] = avgdB[i][::dy].tolist()
389 AVG[i] = avgdB[i][::dy].tolist()
382 payload = {
390 payload = {
383 'timestamp': self.dataOut.utctime,
391 'timestamp': self.dataOut.utctime,
384 'data': roundFloats(AVG),
392 'data': roundFloats(AVG),
385 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
393 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
386 'type': self.plottype,
394 'type': self.plottype,
387 'yData': yData
395 'yData': yData
388 }
396 }
389 else:
397 else:
390 print "Tipo de grafico invalido"
398 print "Tipo de grafico invalido"
391 payload = {
399 payload = {
392 'data': 'None',
400 'data': 'None',
393 'timestamp': 'None',
401 'timestamp': 'None',
394 'type': None
402 'type': None
395 }
403 }
396
404
397 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
405 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
398
406
399 if self.zeromq is 1:
407 if self.zeromq is 1:
400 if self.verbose:
408 if self.verbose:
401 log.log(
409 log.log(
402 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
410 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
403 self.name
411 self.name
404 )
412 )
405 self.zmq_socket.send_pyobj(self.dataOut)
413 self.zmq_socket.send_pyobj(self.dataOut)
406
414
407 def run(self, dataOut, **kwargs):
415 def run(self, dataOut, **kwargs):
408 self.dataOut = dataOut
416 self.dataOut = dataOut
409 if not self.isConfig:
417 if not self.isConfig:
410 self.setup(**kwargs)
418 self.setup(**kwargs)
411 self.isConfig = True
419 self.isConfig = True
412
420
413 self.publish_data()
421 self.publish_data()
414 time.sleep(self.delay)
422 time.sleep(self.delay)
415
423
416 def close(self):
424 def close(self):
417 if self.zeromq is 1:
425 if self.zeromq is 1:
418 self.dataOut.finished = True
426 self.dataOut.finished = True
419 self.zmq_socket.send_pyobj(self.dataOut)
427 self.zmq_socket.send_pyobj(self.dataOut)
420 time.sleep(0.1)
428 time.sleep(0.1)
421 self.zmq_socket.close()
429 self.zmq_socket.close()
422 if self.client:
430 if self.client:
423 self.client.loop_stop()
431 self.client.loop_stop()
424 self.client.disconnect()
432 self.client.disconnect()
425
433
426
434
427 class ReceiverData(ProcessingUnit):
435 class ReceiverData(ProcessingUnit):
428
436
429 def __init__(self, **kwargs):
437 def __init__(self, **kwargs):
430
438
431 ProcessingUnit.__init__(self, **kwargs)
439 ProcessingUnit.__init__(self, **kwargs)
432
440
433 self.isConfig = False
441 self.isConfig = False
434 server = kwargs.get('server', 'zmq.pipe')
442 server = kwargs.get('server', 'zmq.pipe')
435 if 'tcp://' in server:
443 if 'tcp://' in server:
436 address = server
444 address = server
437 else:
445 else:
438 address = 'ipc:///tmp/%s' % server
446 address = 'ipc:///tmp/%s' % server
439
447
440 self.address = address
448 self.address = address
441 self.dataOut = JROData()
449 self.dataOut = JROData()
442
450
443 def setup(self):
451 def setup(self):
444
452
445 self.context = zmq.Context()
453 self.context = zmq.Context()
446 self.receiver = self.context.socket(zmq.PULL)
454 self.receiver = self.context.socket(zmq.PULL)
447 self.receiver.bind(self.address)
455 self.receiver.bind(self.address)
448 time.sleep(0.5)
456 time.sleep(0.5)
449 log.success('ReceiverData from {}'.format(self.address))
457 log.success('ReceiverData from {}'.format(self.address))
450
458
451
459
452 def run(self):
460 def run(self):
453
461
454 if not self.isConfig:
462 if not self.isConfig:
455 self.setup()
463 self.setup()
456 self.isConfig = True
464 self.isConfig = True
457
465
458 self.dataOut = self.receiver.recv_pyobj()
466 self.dataOut = self.receiver.recv_pyobj()
459 log.log('{} - {}'.format(self.dataOut.type,
467 log.log('{} - {}'.format(self.dataOut.type,
460 self.dataOut.datatime.ctime(),),
468 self.dataOut.datatime.ctime(),),
461 'Receiving')
469 'Receiving')
462
470
463
471
464 class PlotterReceiver(ProcessingUnit, Process):
472 class PlotterReceiver(ProcessingUnit, Process):
465
473
466 throttle_value = 5
474 throttle_value = 5
467
475
468 def __init__(self, **kwargs):
476 def __init__(self, **kwargs):
469
477
470 ProcessingUnit.__init__(self, **kwargs)
478 ProcessingUnit.__init__(self, **kwargs)
471 Process.__init__(self)
479 Process.__init__(self)
472 self.mp = False
480 self.mp = False
473 self.isConfig = False
481 self.isConfig = False
474 self.isWebConfig = False
482 self.isWebConfig = False
475 self.connections = 0
483 self.connections = 0
476 server = kwargs.get('server', 'zmq.pipe')
484 server = kwargs.get('server', 'zmq.pipe')
477 plot_server = kwargs.get('plot_server', 'zmq.web')
485 plot_server = kwargs.get('plot_server', 'zmq.web')
478 if 'tcp://' in server:
486 if 'tcp://' in server:
479 address = server
487 address = server
480 else:
488 else:
481 address = 'ipc:///tmp/%s' % server
489 address = 'ipc:///tmp/%s' % server
482
490
483 if 'tcp://' in plot_server:
491 if 'tcp://' in plot_server:
484 plot_address = plot_server
492 plot_address = plot_server
485 else:
493 else:
486 plot_address = 'ipc:///tmp/%s' % plot_server
494 plot_address = 'ipc:///tmp/%s' % plot_server
487
495
488 self.address = address
496 self.address = address
489 self.plot_address = plot_address
497 self.plot_address = plot_address
490 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
498 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
491 self.realtime = kwargs.get('realtime', False)
499 self.realtime = kwargs.get('realtime', False)
492 self.localtime = kwargs.get('localtime', True)
500 self.localtime = kwargs.get('localtime', True)
493 self.throttle_value = kwargs.get('throttle', 5)
501 self.throttle_value = kwargs.get('throttle', 5)
494 self.sendData = self.initThrottle(self.throttle_value)
502 self.sendData = self.initThrottle(self.throttle_value)
495 self.dates = []
503 self.dates = []
496 self.setup()
504 self.setup()
497
505
498 def setup(self):
506 def setup(self):
499
507
500 self.data = Data(self.plottypes, self.throttle_value)
508 self.data = Data(self.plottypes, self.throttle_value)
501 self.isConfig = True
509 self.isConfig = True
502
510
503 def event_monitor(self, monitor):
511 def event_monitor(self, monitor):
504
512
505 events = {}
513 events = {}
506
514
507 for name in dir(zmq):
515 for name in dir(zmq):
508 if name.startswith('EVENT_'):
516 if name.startswith('EVENT_'):
509 value = getattr(zmq, name)
517 value = getattr(zmq, name)
510 events[value] = name
518 events[value] = name
511
519
512 while monitor.poll():
520 while monitor.poll():
513 evt = recv_monitor_message(monitor)
521 evt = recv_monitor_message(monitor)
514 if evt['event'] == 32:
522 if evt['event'] == 32:
515 self.connections += 1
523 self.connections += 1
516 if evt['event'] == 512:
524 if evt['event'] == 512:
517 pass
525 pass
518
526
519 evt.update({'description': events[evt['event']]})
527 evt.update({'description': events[evt['event']]})
520
528
521 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
529 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
522 break
530 break
523 monitor.close()
531 monitor.close()
524 print('event monitor thread done!')
532 print('event monitor thread done!')
525
533
526 def initThrottle(self, throttle_value):
534 def initThrottle(self, throttle_value):
527
535
528 @throttle(seconds=throttle_value)
536 @throttle(seconds=throttle_value)
529 def sendDataThrottled(fn_sender, data):
537 def sendDataThrottled(fn_sender, data):
530 fn_sender(data)
538 fn_sender(data)
531
539
532 return sendDataThrottled
540 return sendDataThrottled
533
541
534 def send(self, data):
542 def send(self, data):
535 log.success('Sending {}'.format(data), self.name)
543 log.success('Sending {}'.format(data), self.name)
536 self.sender.send_pyobj(data)
544 self.sender.send_pyobj(data)
537
545
538 def run(self):
546 def run(self):
539
547
540 log.success(
548 log.success(
541 'Starting from {}'.format(self.address),
549 'Starting from {}'.format(self.address),
542 self.name
550 self.name
543 )
551 )
544
552
545 self.context = zmq.Context()
553 self.context = zmq.Context()
546 self.receiver = self.context.socket(zmq.PULL)
554 self.receiver = self.context.socket(zmq.PULL)
547 self.receiver.bind(self.address)
555 self.receiver.bind(self.address)
548 monitor = self.receiver.get_monitor_socket()
556 monitor = self.receiver.get_monitor_socket()
549 self.sender = self.context.socket(zmq.PUB)
557 self.sender = self.context.socket(zmq.PUB)
550 if self.realtime:
558 if self.realtime:
551 self.sender_web = self.context.socket(zmq.PUB)
559 self.sender_web = self.context.socket(zmq.PUB)
552 self.sender_web.connect(self.plot_address)
560 self.sender_web.connect(self.plot_address)
553 time.sleep(1)
561 time.sleep(1)
554
562
555 if 'server' in self.kwargs:
563 if 'server' in self.kwargs:
556 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
564 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
557 else:
565 else:
558 self.sender.bind("ipc:///tmp/zmq.plots")
566 self.sender.bind("ipc:///tmp/zmq.plots")
559
567
560 time.sleep(2)
568 time.sleep(2)
561
569
562 t = Thread(target=self.event_monitor, args=(monitor,))
570 t = Thread(target=self.event_monitor, args=(monitor,))
563 t.start()
571 t.start()
564
572
565 while True:
573 while True:
566 dataOut = self.receiver.recv_pyobj()
574 dataOut = self.receiver.recv_pyobj()
567 tm = dataOut.utctime
575 if not dataOut.flagNoData:
568 if dataOut.useLocalTime:
576 if dataOut.type == 'Parameters':
569 if not self.localtime:
577 tm = dataOut.utctimeInit
570 tm += time.timezone
578 else:
571 dt = datetime.datetime.fromtimestamp(tm).date()
579 tm = dataOut.utctime
572 else:
580 if dataOut.useLocalTime:
573 if self.localtime:
581 if not self.localtime:
574 tm -= time.timezone
582 tm += time.timezone
575 dt = datetime.datetime.utcfromtimestamp(tm).date()
583 dt = datetime.datetime.fromtimestamp(tm).date()
576 sended = False
584 else:
577 if dt not in self.dates:
585 if self.localtime:
578 if self.data:
586 tm -= time.timezone
579 self.data.ended = True
587 dt = datetime.datetime.utcfromtimestamp(tm).date()
580 self.send(self.data)
588 coerce = False
581 sended = True
589 if dt not in self.dates:
582 self.data.setup()
590 if self.data:
583 self.dates.append(dt)
591 self.data.ended = True
584
592 self.send(self.data)
585 self.data.update(dataOut)
593 coerce = True
594 self.data.setup()
595 self.dates.append(dt)
586
596
597 self.data.update(dataOut)
598
587 if dataOut.finished is True:
599 if dataOut.finished is True:
588 self.connections -= 1
600 self.connections -= 1
589 if self.connections == 0 and dt in self.dates:
601 if self.connections == 0 and dt in self.dates:
590 self.data.ended = True
602 self.data.ended = True
591 self.send(self.data)
603 self.send(self.data)
592 self.data.setup()
604 self.data.setup()
593 else:
605 else:
594 if self.realtime:
606 if self.realtime:
595 self.send(self.data)
607 self.send(self.data)
596 # self.sender_web.send_string(self.data.jsonify())
608 # self.sender_web.send_string(self.data.jsonify())
597 else:
609 else:
598 if not sended:
610 self.sendData(self.send, self.data, coerce=coerce)
599 self.sendData(self.send, self.data)
611 coerce = False
600
612
601 return
613 return
602
614
603 def sendToWeb(self):
615 def sendToWeb(self):
604
616
605 if not self.isWebConfig:
617 if not self.isWebConfig:
606 context = zmq.Context()
618 context = zmq.Context()
607 sender_web_config = context.socket(zmq.PUB)
619 sender_web_config = context.socket(zmq.PUB)
608 if 'tcp://' in self.plot_address:
620 if 'tcp://' in self.plot_address:
609 dum, address, port = self.plot_address.split(':')
621 dum, address, port = self.plot_address.split(':')
610 conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
622 conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
611 else:
623 else:
612 conf_address = self.plot_address + '.config'
624 conf_address = self.plot_address + '.config'
613 sender_web_config.bind(conf_address)
625 sender_web_config.bind(conf_address)
614 time.sleep(1)
626 time.sleep(1)
615 for kwargs in self.operationKwargs.values():
627 for kwargs in self.operationKwargs.values():
616 if 'plot' in kwargs:
628 if 'plot' in kwargs:
617 log.success('[Sending] Config data to web for {}'.format(kwargs['code'].upper()))
629 log.success('[Sending] Config data to web for {}'.format(kwargs['code'].upper()))
618 sender_web_config.send_string(json.dumps(kwargs))
630 sender_web_config.send_string(json.dumps(kwargs))
619 self.isWebConfig = True
631 self.isWebConfig = True
General Comments 0
You need to be logged in to leave comments. Login now