##// END OF EJS Templates
New Operation SendToFTP
jespinoza -
r1135:eed97f9f9bf5
parent child
Show More
@@ -1,631 +1,829
1 '''
1 '''
2 @author: Juan C. Espinoza
2 @author: Juan C. Espinoza
3 '''
3 '''
4
4
5 import os
6 import glob
5 import time
7 import time
6 import json
8 import json
7 import numpy
9 import numpy
8 import paho.mqtt.client as mqtt
10 import paho.mqtt.client as mqtt
9 import zmq
11 import zmq
10 import datetime
12 import datetime
13 import ftplib
11 from zmq.utils.monitor import recv_monitor_message
14 from zmq.utils.monitor import recv_monitor_message
12 from functools import wraps
15 from functools import wraps
13 from threading import Thread
16 from threading import Thread
14 from multiprocessing import Process
17 from multiprocessing import Process
15
18
16 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
19 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 from schainpy.model.data.jrodata import JROData
20 from schainpy.model.data.jrodata import JROData
18 from schainpy.utils import log
21 from schainpy.utils import log
19
22
20 MAXNUMX = 100
23 MAXNUMX = 500
21 MAXNUMY = 100
24 MAXNUMY = 500
25
26 PLOT_CODES = {
27 'rti': 0, #Range time intensity (RTI).
28 'spc': 1, #Spectra (and Cross-spectra) information.
29 'cspc': 2, #Cross-Correlation information.
30 'coh': 3, #Coherence map.
31 'base': 4, #Base lines graphic.
32 'row': 5, #Row Spectra.
33 'total' : 6, #Total Power.
34 'drift' : 7, #Drifts graphics.
35 'height' : 8, #Height profile.
36 'phase' : 9, #Signal Phase.
37 'power' : 16,
38 'noise' : 17,
39 'beacon' : 18,
40 #USED IN jroplot_parameters.py
41 'wind' : 22,
42 'skymap' : 23,
43 # 'MPHASE_CODE' : 24,
44 'moments' : 25,
45 'param' : 26,
46 'spc_fit' : 27,
47 'ew_drifts' : 28,
48 'reflectivity': 30
49 }
22
50
23 class PrettyFloat(float):
51 class PrettyFloat(float):
24 def __repr__(self):
52 def __repr__(self):
25 return '%.2f' % self
53 return '%.2f' % self
26
54
27 def roundFloats(obj):
55 def roundFloats(obj):
28 if isinstance(obj, list):
56 if isinstance(obj, list):
29 return map(roundFloats, obj)
57 return map(roundFloats, obj)
30 elif isinstance(obj, float):
58 elif isinstance(obj, float):
31 return round(obj, 2)
59 return round(obj, 2)
32
60
33 def decimate(z, MAXNUMY):
61 def decimate(z, MAXNUMY):
34 dy = int(len(z[0])/MAXNUMY) + 1
62 dy = int(len(z[0])/MAXNUMY) + 1
35
63
36 return z[::, ::dy]
64 return z[::, ::dy]
37
65
38 class throttle(object):
66 class throttle(object):
39 '''
67 '''
40 Decorator that prevents a function from being called more than once every
68 Decorator that prevents a function from being called more than once every
41 time period.
69 time period.
42 To create a function that cannot be called more than once a minute, but
70 To create a function that cannot be called more than once a minute, but
43 will sleep until it can be called:
71 will sleep until it can be called:
44 @throttle(minutes=1)
72 @throttle(minutes=1)
45 def foo():
73 def foo():
46 pass
74 pass
47
75
48 for i in range(10):
76 for i in range(10):
49 foo()
77 foo()
50 print "This function has run %s times." % i
78 print "This function has run %s times." % i
51 '''
79 '''
52
80
53 def __init__(self, seconds=0, minutes=0, hours=0):
81 def __init__(self, seconds=0, minutes=0, hours=0):
54 self.throttle_period = datetime.timedelta(
82 self.throttle_period = datetime.timedelta(
55 seconds=seconds, minutes=minutes, hours=hours
83 seconds=seconds, minutes=minutes, hours=hours
56 )
84 )
57
85
58 self.time_of_last_call = datetime.datetime.min
86 self.time_of_last_call = datetime.datetime.min
59
87
60 def __call__(self, fn):
88 def __call__(self, fn):
61 @wraps(fn)
89 @wraps(fn)
62 def wrapper(*args, **kwargs):
90 def wrapper(*args, **kwargs):
63 coerce = kwargs.pop('coerce', None)
91 coerce = kwargs.pop('coerce', None)
64 if coerce:
92 if coerce:
65 self.time_of_last_call = datetime.datetime.now()
93 self.time_of_last_call = datetime.datetime.now()
66 return fn(*args, **kwargs)
94 return fn(*args, **kwargs)
67 else:
95 else:
68 now = datetime.datetime.now()
96 now = datetime.datetime.now()
69 time_since_last_call = now - self.time_of_last_call
97 time_since_last_call = now - self.time_of_last_call
70 time_left = self.throttle_period - time_since_last_call
98 time_left = self.throttle_period - time_since_last_call
71
99
72 if time_left > datetime.timedelta(seconds=0):
100 if time_left > datetime.timedelta(seconds=0):
73 return
101 return
74
102
75 self.time_of_last_call = datetime.datetime.now()
103 self.time_of_last_call = datetime.datetime.now()
76 return fn(*args, **kwargs)
104 return fn(*args, **kwargs)
77
105
78 return wrapper
106 return wrapper
79
107
80 class Data(object):
108 class Data(object):
81 '''
109 '''
82 Object to hold data to be plotted
110 Object to hold data to be plotted
83 '''
111 '''
84
112
85 def __init__(self, plottypes, throttle_value, exp_code):
113 def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
86 self.plottypes = plottypes
114 self.plottypes = plottypes
87 self.throttle = throttle_value
115 self.throttle = throttle_value
88 self.exp_code = exp_code
116 self.exp_code = exp_code
117 self.buffering = buffering
89 self.ended = False
118 self.ended = False
90 self.localtime = False
119 self.localtime = False
91 self.__times = []
120 self.__times = []
92 self.__heights = []
121 self.__heights = []
93
122
94 def __str__(self):
123 def __str__(self):
95 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
124 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
96 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
125 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
97
126
98 def __len__(self):
127 def __len__(self):
99 return len(self.__times)
128 return len(self.__times)
100
129
101 def __getitem__(self, key):
130 def __getitem__(self, key):
102 if key not in self.data:
131 if key not in self.data:
103 raise KeyError(log.error('Missing key: {}'.format(key)))
132 raise KeyError(log.error('Missing key: {}'.format(key)))
104
133
105 if 'spc' in key:
134 if 'spc' in key or not self.buffering:
106 ret = self.data[key]
135 ret = self.data[key]
107 else:
136 else:
108 ret = numpy.array([self.data[key][x] for x in self.times])
137 ret = numpy.array([self.data[key][x] for x in self.times])
109 if ret.ndim > 1:
138 if ret.ndim > 1:
110 ret = numpy.swapaxes(ret, 0, 1)
139 ret = numpy.swapaxes(ret, 0, 1)
111 return ret
140 return ret
112
141
113 def __contains__(self, key):
142 def __contains__(self, key):
114 return key in self.data
143 return key in self.data
115
144
116 def setup(self):
145 def setup(self):
117 '''
146 '''
118 Configure object
147 Configure object
119 '''
148 '''
120
149
150 self.type = ''
121 self.ended = False
151 self.ended = False
122 self.data = {}
152 self.data = {}
123 self.__times = []
153 self.__times = []
124 self.__heights = []
154 self.__heights = []
125 self.__all_heights = set()
155 self.__all_heights = set()
126 for plot in self.plottypes:
156 for plot in self.plottypes:
127 if 'snr' in plot:
157 if 'snr' in plot:
128 plot = 'snr'
158 plot = 'snr'
129 self.data[plot] = {}
159 self.data[plot] = {}
130
160
131 def shape(self, key):
161 def shape(self, key):
132 '''
162 '''
133 Get the shape of the one-element data for the given key
163 Get the shape of the one-element data for the given key
134 '''
164 '''
135
165
136 if len(self.data[key]):
166 if len(self.data[key]):
137 if 'spc' in key:
167 if 'spc' in key or not self.buffering:
138 return self.data[key].shape
168 return self.data[key].shape
139 return self.data[key][self.__times[0]].shape
169 return self.data[key][self.__times[0]].shape
140 return (0,)
170 return (0,)
141
171
142 def update(self, dataOut, tm):
172 def update(self, dataOut, tm):
143 '''
173 '''
144 Update data object with new dataOut
174 Update data object with new dataOut
145 '''
175 '''
146
176
147 if tm in self.__times:
177 if tm in self.__times:
148 return
178 return
149
179
180 self.type = dataOut.type
150 self.parameters = getattr(dataOut, 'parameters', [])
181 self.parameters = getattr(dataOut, 'parameters', [])
151 if hasattr(dataOut, 'pairsList'):
182 if hasattr(dataOut, 'pairsList'):
152 self.pairs = dataOut.pairsList
183 self.pairs = dataOut.pairsList
153 self.channels = dataOut.channelList
184 self.channels = dataOut.channelList
154 self.interval = dataOut.getTimeInterval()
185 self.interval = dataOut.getTimeInterval()
155 self.localtime = dataOut.useLocalTime
186 self.localtime = dataOut.useLocalTime
156 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
187 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
157 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
188 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
158 self.__heights.append(dataOut.heightList)
189 self.__heights.append(dataOut.heightList)
159 self.__all_heights.update(dataOut.heightList)
190 self.__all_heights.update(dataOut.heightList)
160 self.__times.append(tm)
191 self.__times.append(tm)
161
192
162 for plot in self.plottypes:
193 for plot in self.plottypes:
163 if plot == 'spc':
194 if plot == 'spc':
164 z = dataOut.data_spc/dataOut.normFactor
195 z = dataOut.data_spc/dataOut.normFactor
165 self.data[plot] = 10*numpy.log10(z)
196 self.data[plot] = 10*numpy.log10(z)
166 if plot == 'cspc':
197 if plot == 'cspc':
167 self.data[plot] = dataOut.data_cspc
198 self.data[plot] = dataOut.data_cspc
168 if plot == 'noise':
199 if plot == 'noise':
169 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
200 buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
170 if plot == 'rti':
201 if plot == 'rti':
171 self.data[plot][tm] = dataOut.getPower()
202 buffer = dataOut.getPower()
172 if plot == 'snr_db':
203 if plot == 'snr_db':
173 self.data['snr'][tm] = dataOut.data_SNR
204 self.data['snr'][tm] = dataOut.data_SNR
174 if plot == 'snr':
205 if plot == 'snr':
175 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
206 buffer = 10*numpy.log10(dataOut.data_SNR)
176 if plot == 'dop':
207 if plot == 'dop':
177 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
208 buffer = 10*numpy.log10(dataOut.data_DOP)
178 if plot == 'mean':
209 if plot == 'mean':
179 self.data[plot][tm] = dataOut.data_MEAN
210 buffer = dataOut.data_MEAN
180 if plot == 'std':
211 if plot == 'std':
181 self.data[plot][tm] = dataOut.data_STD
212 buffer = dataOut.data_STD
182 if plot == 'coh':
213 if plot == 'coh':
183 self.data[plot][tm] = dataOut.getCoherence()
214 buffer = dataOut.getCoherence()
184 if plot == 'phase':
215 if plot == 'phase':
185 self.data[plot][tm] = dataOut.getCoherence(phase=True)
216 buffer = dataOut.getCoherence(phase=True)
186 if plot == 'output':
217 if plot == 'output':
187 self.data[plot][tm] = dataOut.data_output
218 buffer = dataOut.data_output
188 if plot == 'param':
219 if plot == 'param':
189 self.data[plot][tm] = dataOut.data_param
220 buffer = dataOut.data_param
221
222 if self.buffering:
223 self.data[plot][tm] = buffer
224 else:
225 self.data[plot] = buffer
190
226
191 def normalize_heights(self):
227 def normalize_heights(self):
192 '''
228 '''
193 Ensure same-dimension of the data for different heighList
229 Ensure same-dimension of the data for different heighList
194 '''
230 '''
195
231
196 H = numpy.array(list(self.__all_heights))
232 H = numpy.array(list(self.__all_heights))
197 H.sort()
233 H.sort()
198 for key in self.data:
234 for key in self.data:
199 shape = self.shape(key)[:-1] + H.shape
235 shape = self.shape(key)[:-1] + H.shape
200 for tm, obj in self.data[key].items():
236 for tm, obj in self.data[key].items():
201 h = self.__heights[self.__times.index(tm)]
237 h = self.__heights[self.__times.index(tm)]
202 if H.size == h.size:
238 if H.size == h.size:
203 continue
239 continue
204 index = numpy.where(numpy.in1d(H, h))[0]
240 index = numpy.where(numpy.in1d(H, h))[0]
205 dummy = numpy.zeros(shape) + numpy.nan
241 dummy = numpy.zeros(shape) + numpy.nan
206 if len(shape) == 2:
242 if len(shape) == 2:
207 dummy[:, index] = obj
243 dummy[:, index] = obj
208 else:
244 else:
209 dummy[index] = obj
245 dummy[index] = obj
210 self.data[key][tm] = dummy
246 self.data[key][tm] = dummy
211
247
212 self.__heights = [H for tm in self.__times]
248 self.__heights = [H for tm in self.__times]
213
249
214 def jsonify(self, decimate=False):
250 def jsonify(self, decimate=False):
215 '''
251 '''
216 Convert data to json
252 Convert data to json
217 '''
253 '''
218
254
219 data = {}
255 data = {}
220 tm = self.times[-1]
256 tm = self.times[-1]
221
257
222 for key in self.data:
258 for key in self.data:
223 if key in ('spc', 'cspc'):
259 if key in ('spc', 'cspc') or not self.buffering:
224 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
260 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
225 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
261 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
226 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
262 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
227 else:
263 else:
228 data[key] = roundFloats(self.data[key][tm].tolist())
264 data[key] = roundFloats(self.data[key][tm].tolist())
229
265
230 ret = {'data': data}
266 ret = {'data': data}
231 ret['exp_code'] = self.exp_code
267 ret['exp_code'] = self.exp_code
232 ret['time'] = tm
268 ret['time'] = tm
233 ret['interval'] = self.interval
269 ret['interval'] = self.interval
234 ret['localtime'] = self.localtime
270 ret['localtime'] = self.localtime
235 ret['yrange'] = roundFloats(self.heights.tolist())
271 ret['yrange'] = roundFloats(self.heights.tolist())
236 if key in ('spc', 'cspc'):
272 if key in ('spc', 'cspc'):
237 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
273 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
274 else:
275 ret['xrange'] = []
238 if hasattr(self, 'pairs'):
276 if hasattr(self, 'pairs'):
239 ret['pairs'] = self.pairs
277 ret['pairs'] = self.pairs
240 return json.dumps(ret)
278 return json.dumps(ret)
241
279
242 @property
280 @property
243 def times(self):
281 def times(self):
244 '''
282 '''
245 Return the list of times of the current data
283 Return the list of times of the current data
246 '''
284 '''
247
285
248 ret = numpy.array(self.__times)
286 ret = numpy.array(self.__times)
249 ret.sort()
287 ret.sort()
250 return ret
288 return ret
251
289
252 @property
290 @property
253 def heights(self):
291 def heights(self):
254 '''
292 '''
255 Return the list of heights of the current data
293 Return the list of heights of the current data
256 '''
294 '''
257
295
258 return numpy.array(self.__heights[-1])
296 return numpy.array(self.__heights[-1])
259
297
260 class PublishData(Operation):
298 class PublishData(Operation):
261 '''
299 '''
262 Operation to send data over zmq.
300 Operation to send data over zmq.
263 '''
301 '''
264
302
265 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
303 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
266
304
267 def __init__(self, **kwargs):
305 def __init__(self, **kwargs):
268 """Inicio."""
306 """Inicio."""
269 Operation.__init__(self, **kwargs)
307 Operation.__init__(self, **kwargs)
270 self.isConfig = False
308 self.isConfig = False
271 self.client = None
309 self.client = None
272 self.zeromq = None
310 self.zeromq = None
273 self.mqtt = None
311 self.mqtt = None
274
312
275 def on_disconnect(self, client, userdata, rc):
313 def on_disconnect(self, client, userdata, rc):
276 if rc != 0:
314 if rc != 0:
277 log.warning('Unexpected disconnection.')
315 log.warning('Unexpected disconnection.')
278 self.connect()
316 self.connect()
279
317
280 def connect(self):
318 def connect(self):
281 log.warning('trying to connect')
319 log.warning('trying to connect')
282 try:
320 try:
283 self.client.connect(
321 self.client.connect(
284 host=self.host,
322 host=self.host,
285 port=self.port,
323 port=self.port,
286 keepalive=60*10,
324 keepalive=60*10,
287 bind_address='')
325 bind_address='')
288 self.client.loop_start()
326 self.client.loop_start()
289 # self.client.publish(
327 # self.client.publish(
290 # self.topic + 'SETUP',
328 # self.topic + 'SETUP',
291 # json.dumps(setup),
329 # json.dumps(setup),
292 # retain=True
330 # retain=True
293 # )
331 # )
294 except:
332 except:
295 log.error('MQTT Conection error.')
333 log.error('MQTT Conection error.')
296 self.client = False
334 self.client = False
297
335
298 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
336 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
299 self.counter = 0
337 self.counter = 0
300 self.topic = kwargs.get('topic', 'schain')
338 self.topic = kwargs.get('topic', 'schain')
301 self.delay = kwargs.get('delay', 0)
339 self.delay = kwargs.get('delay', 0)
302 self.plottype = kwargs.get('plottype', 'spectra')
340 self.plottype = kwargs.get('plottype', 'spectra')
303 self.host = kwargs.get('host', "10.10.10.82")
341 self.host = kwargs.get('host', "10.10.10.82")
304 self.port = kwargs.get('port', 3000)
342 self.port = kwargs.get('port', 3000)
305 self.clientId = clientId
343 self.clientId = clientId
306 self.cnt = 0
344 self.cnt = 0
307 self.zeromq = zeromq
345 self.zeromq = zeromq
308 self.mqtt = kwargs.get('plottype', 0)
346 self.mqtt = kwargs.get('plottype', 0)
309 self.client = None
347 self.client = None
310 self.verbose = verbose
348 self.verbose = verbose
311 setup = []
349 setup = []
312 if mqtt is 1:
350 if mqtt is 1:
313 self.client = mqtt.Client(
351 self.client = mqtt.Client(
314 client_id=self.clientId + self.topic + 'SCHAIN',
352 client_id=self.clientId + self.topic + 'SCHAIN',
315 clean_session=True)
353 clean_session=True)
316 self.client.on_disconnect = self.on_disconnect
354 self.client.on_disconnect = self.on_disconnect
317 self.connect()
355 self.connect()
318 for plot in self.plottype:
356 for plot in self.plottype:
319 setup.append({
357 setup.append({
320 'plot': plot,
358 'plot': plot,
321 'topic': self.topic + plot,
359 'topic': self.topic + plot,
322 'title': getattr(self, plot + '_' + 'title', False),
360 'title': getattr(self, plot + '_' + 'title', False),
323 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
361 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
324 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
362 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
325 'xrange': getattr(self, plot + '_' + 'xrange', False),
363 'xrange': getattr(self, plot + '_' + 'xrange', False),
326 'yrange': getattr(self, plot + '_' + 'yrange', False),
364 'yrange': getattr(self, plot + '_' + 'yrange', False),
327 'zrange': getattr(self, plot + '_' + 'zrange', False),
365 'zrange': getattr(self, plot + '_' + 'zrange', False),
328 })
366 })
329 if zeromq is 1:
367 if zeromq is 1:
330 context = zmq.Context()
368 context = zmq.Context()
331 self.zmq_socket = context.socket(zmq.PUSH)
369 self.zmq_socket = context.socket(zmq.PUSH)
332 server = kwargs.get('server', 'zmq.pipe')
370 server = kwargs.get('server', 'zmq.pipe')
333
371
334 if 'tcp://' in server:
372 if 'tcp://' in server:
335 address = server
373 address = server
336 else:
374 else:
337 address = 'ipc:///tmp/%s' % server
375 address = 'ipc:///tmp/%s' % server
338
376
339 self.zmq_socket.connect(address)
377 self.zmq_socket.connect(address)
340 time.sleep(1)
378 time.sleep(1)
341
379
342
380
343 def publish_data(self):
381 def publish_data(self):
344 self.dataOut.finished = False
382 self.dataOut.finished = False
345 if self.mqtt is 1:
383 if self.mqtt is 1:
346 yData = self.dataOut.heightList[:2].tolist()
384 yData = self.dataOut.heightList[:2].tolist()
347 if self.plottype == 'spectra':
385 if self.plottype == 'spectra':
348 data = getattr(self.dataOut, 'data_spc')
386 data = getattr(self.dataOut, 'data_spc')
349 z = data/self.dataOut.normFactor
387 z = data/self.dataOut.normFactor
350 zdB = 10*numpy.log10(z)
388 zdB = 10*numpy.log10(z)
351 xlen, ylen = zdB[0].shape
389 xlen, ylen = zdB[0].shape
352 dx = int(xlen/MAXNUMX) + 1
390 dx = int(xlen/MAXNUMX) + 1
353 dy = int(ylen/MAXNUMY) + 1
391 dy = int(ylen/MAXNUMY) + 1
354 Z = [0 for i in self.dataOut.channelList]
392 Z = [0 for i in self.dataOut.channelList]
355 for i in self.dataOut.channelList:
393 for i in self.dataOut.channelList:
356 Z[i] = zdB[i][::dx, ::dy].tolist()
394 Z[i] = zdB[i][::dx, ::dy].tolist()
357 payload = {
395 payload = {
358 'timestamp': self.dataOut.utctime,
396 'timestamp': self.dataOut.utctime,
359 'data': roundFloats(Z),
397 'data': roundFloats(Z),
360 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
398 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
361 'interval': self.dataOut.getTimeInterval(),
399 'interval': self.dataOut.getTimeInterval(),
362 'type': self.plottype,
400 'type': self.plottype,
363 'yData': yData
401 'yData': yData
364 }
402 }
365
403
366 elif self.plottype in ('rti', 'power'):
404 elif self.plottype in ('rti', 'power'):
367 data = getattr(self.dataOut, 'data_spc')
405 data = getattr(self.dataOut, 'data_spc')
368 z = data/self.dataOut.normFactor
406 z = data/self.dataOut.normFactor
369 avg = numpy.average(z, axis=1)
407 avg = numpy.average(z, axis=1)
370 avgdB = 10*numpy.log10(avg)
408 avgdB = 10*numpy.log10(avg)
371 xlen, ylen = z[0].shape
409 xlen, ylen = z[0].shape
372 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
410 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
373 AVG = [0 for i in self.dataOut.channelList]
411 AVG = [0 for i in self.dataOut.channelList]
374 for i in self.dataOut.channelList:
412 for i in self.dataOut.channelList:
375 AVG[i] = avgdB[i][::dy].tolist()
413 AVG[i] = avgdB[i][::dy].tolist()
376 payload = {
414 payload = {
377 'timestamp': self.dataOut.utctime,
415 'timestamp': self.dataOut.utctime,
378 'data': roundFloats(AVG),
416 'data': roundFloats(AVG),
379 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
417 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
380 'interval': self.dataOut.getTimeInterval(),
418 'interval': self.dataOut.getTimeInterval(),
381 'type': self.plottype,
419 'type': self.plottype,
382 'yData': yData
420 'yData': yData
383 }
421 }
384 elif self.plottype == 'noise':
422 elif self.plottype == 'noise':
385 noise = self.dataOut.getNoise()/self.dataOut.normFactor
423 noise = self.dataOut.getNoise()/self.dataOut.normFactor
386 noisedB = 10*numpy.log10(noise)
424 noisedB = 10*numpy.log10(noise)
387 payload = {
425 payload = {
388 'timestamp': self.dataOut.utctime,
426 'timestamp': self.dataOut.utctime,
389 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
427 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
390 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
428 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
391 'interval': self.dataOut.getTimeInterval(),
429 'interval': self.dataOut.getTimeInterval(),
392 'type': self.plottype,
430 'type': self.plottype,
393 'yData': yData
431 'yData': yData
394 }
432 }
395 elif self.plottype == 'snr':
433 elif self.plottype == 'snr':
396 data = getattr(self.dataOut, 'data_SNR')
434 data = getattr(self.dataOut, 'data_SNR')
397 avgdB = 10*numpy.log10(data)
435 avgdB = 10*numpy.log10(data)
398
436
399 ylen = data[0].size
437 ylen = data[0].size
400 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
438 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
401 AVG = [0 for i in self.dataOut.channelList]
439 AVG = [0 for i in self.dataOut.channelList]
402 for i in self.dataOut.channelList:
440 for i in self.dataOut.channelList:
403 AVG[i] = avgdB[i][::dy].tolist()
441 AVG[i] = avgdB[i][::dy].tolist()
404 payload = {
442 payload = {
405 'timestamp': self.dataOut.utctime,
443 'timestamp': self.dataOut.utctime,
406 'data': roundFloats(AVG),
444 'data': roundFloats(AVG),
407 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
445 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
408 'type': self.plottype,
446 'type': self.plottype,
409 'yData': yData
447 'yData': yData
410 }
448 }
411 else:
449 else:
412 print "Tipo de grafico invalido"
450 print "Tipo de grafico invalido"
413 payload = {
451 payload = {
414 'data': 'None',
452 'data': 'None',
415 'timestamp': 'None',
453 'timestamp': 'None',
416 'type': None
454 'type': None
417 }
455 }
418
456
419 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
457 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
420
458
421 if self.zeromq is 1:
459 if self.zeromq is 1:
422 if self.verbose:
460 if self.verbose:
423 log.log(
461 log.log(
424 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
462 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
425 self.name
463 self.name
426 )
464 )
427 self.zmq_socket.send_pyobj(self.dataOut)
465 self.zmq_socket.send_pyobj(self.dataOut)
428
466
429 def run(self, dataOut, **kwargs):
467 def run(self, dataOut, **kwargs):
430 self.dataOut = dataOut
468 self.dataOut = dataOut
431 if not self.isConfig:
469 if not self.isConfig:
432 self.setup(**kwargs)
470 self.setup(**kwargs)
433 self.isConfig = True
471 self.isConfig = True
434
472
435 self.publish_data()
473 self.publish_data()
436 time.sleep(self.delay)
474 time.sleep(self.delay)
437
475
438 def close(self):
476 def close(self):
439 if self.zeromq is 1:
477 if self.zeromq is 1:
440 self.dataOut.finished = True
478 self.dataOut.finished = True
441 self.zmq_socket.send_pyobj(self.dataOut)
479 self.zmq_socket.send_pyobj(self.dataOut)
442 time.sleep(0.1)
480 time.sleep(0.1)
443 self.zmq_socket.close()
481 self.zmq_socket.close()
444 if self.client:
482 if self.client:
445 self.client.loop_stop()
483 self.client.loop_stop()
446 self.client.disconnect()
484 self.client.disconnect()
447
485
448
486
449 class ReceiverData(ProcessingUnit):
487 class ReceiverData(ProcessingUnit):
450
488
451 __attrs__ = ['server']
489 __attrs__ = ['server']
452
490
453 def __init__(self, **kwargs):
491 def __init__(self, **kwargs):
454
492
455 ProcessingUnit.__init__(self, **kwargs)
493 ProcessingUnit.__init__(self, **kwargs)
456
494
457 self.isConfig = False
495 self.isConfig = False
458 server = kwargs.get('server', 'zmq.pipe')
496 server = kwargs.get('server', 'zmq.pipe')
459 if 'tcp://' in server:
497 if 'tcp://' in server:
460 address = server
498 address = server
461 else:
499 else:
462 address = 'ipc:///tmp/%s' % server
500 address = 'ipc:///tmp/%s' % server
463
501
464 self.address = address
502 self.address = address
465 self.dataOut = JROData()
503 self.dataOut = JROData()
466
504
467 def setup(self):
505 def setup(self):
468
506
469 self.context = zmq.Context()
507 self.context = zmq.Context()
470 self.receiver = self.context.socket(zmq.PULL)
508 self.receiver = self.context.socket(zmq.PULL)
471 self.receiver.bind(self.address)
509 self.receiver.bind(self.address)
472 time.sleep(0.5)
510 time.sleep(0.5)
473 log.success('ReceiverData from {}'.format(self.address))
511 log.success('ReceiverData from {}'.format(self.address))
474
512
475
513
476 def run(self):
514 def run(self):
477
515
478 if not self.isConfig:
516 if not self.isConfig:
479 self.setup()
517 self.setup()
480 self.isConfig = True
518 self.isConfig = True
481
519
482 self.dataOut = self.receiver.recv_pyobj()
520 self.dataOut = self.receiver.recv_pyobj()
483 log.log('{} - {}'.format(self.dataOut.type,
521 log.log('{} - {}'.format(self.dataOut.type,
484 self.dataOut.datatime.ctime(),),
522 self.dataOut.datatime.ctime(),),
485 'Receiving')
523 'Receiving')
486
524
487
525
488 class PlotterReceiver(ProcessingUnit, Process):
526 class PlotterReceiver(ProcessingUnit, Process):
489
527
490 throttle_value = 5
528 throttle_value = 5
491 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
529 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
492 'exp_code', 'web_server']
530 'exp_code', 'web_server', 'buffering']
493
531
494 def __init__(self, **kwargs):
532 def __init__(self, **kwargs):
495
533
496 ProcessingUnit.__init__(self, **kwargs)
534 ProcessingUnit.__init__(self, **kwargs)
497 Process.__init__(self)
535 Process.__init__(self)
498 self.mp = False
536 self.mp = False
499 self.isConfig = False
537 self.isConfig = False
500 self.isWebConfig = False
538 self.isWebConfig = False
501 self.connections = 0
539 self.connections = 0
502 server = kwargs.get('server', 'zmq.pipe')
540 server = kwargs.get('server', 'zmq.pipe')
503 web_server = kwargs.get('web_server', None)
541 web_server = kwargs.get('web_server', None)
504 if 'tcp://' in server:
542 if 'tcp://' in server:
505 address = server
543 address = server
506 else:
544 else:
507 address = 'ipc:///tmp/%s' % server
545 address = 'ipc:///tmp/%s' % server
508 self.address = address
546 self.address = address
509 self.web_address = web_server
547 self.web_address = web_server
510 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
548 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
511 self.realtime = kwargs.get('realtime', False)
549 self.realtime = kwargs.get('realtime', False)
512 self.localtime = kwargs.get('localtime', True)
550 self.localtime = kwargs.get('localtime', True)
551 self.buffering = kwargs.get('buffering', True)
513 self.throttle_value = kwargs.get('throttle', 5)
552 self.throttle_value = kwargs.get('throttle', 5)
514 self.exp_code = kwargs.get('exp_code', None)
553 self.exp_code = kwargs.get('exp_code', None)
515 self.sendData = self.initThrottle(self.throttle_value)
554 self.sendData = self.initThrottle(self.throttle_value)
516 self.dates = []
555 self.dates = []
517 self.setup()
556 self.setup()
518
557
519 def setup(self):
558 def setup(self):
520
559
521 self.data = Data(self.plottypes, self.throttle_value, self.exp_code)
560 self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
522 self.isConfig = True
561 self.isConfig = True
523
562
524 def event_monitor(self, monitor):
563 def event_monitor(self, monitor):
525
564
526 events = {}
565 events = {}
527
566
528 for name in dir(zmq):
567 for name in dir(zmq):
529 if name.startswith('EVENT_'):
568 if name.startswith('EVENT_'):
530 value = getattr(zmq, name)
569 value = getattr(zmq, name)
531 events[value] = name
570 events[value] = name
532
571
533 while monitor.poll():
572 while monitor.poll():
534 evt = recv_monitor_message(monitor)
573 evt = recv_monitor_message(monitor)
535 if evt['event'] == 32:
574 if evt['event'] == 32:
536 self.connections += 1
575 self.connections += 1
537 if evt['event'] == 512:
576 if evt['event'] == 512:
538 pass
577 pass
539
578
540 evt.update({'description': events[evt['event']]})
579 evt.update({'description': events[evt['event']]})
541
580
542 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
581 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
543 break
582 break
544 monitor.close()
583 monitor.close()
545 print('event monitor thread done!')
584 print('event monitor thread done!')
546
585
547 def initThrottle(self, throttle_value):
586 def initThrottle(self, throttle_value):
548
587
549 @throttle(seconds=throttle_value)
588 @throttle(seconds=throttle_value)
550 def sendDataThrottled(fn_sender, data):
589 def sendDataThrottled(fn_sender, data):
551 fn_sender(data)
590 fn_sender(data)
552
591
553 return sendDataThrottled
592 return sendDataThrottled
554
593
555 def send(self, data):
594 def send(self, data):
556 log.success('Sending {}'.format(data), self.name)
595 log.log('Sending {}'.format(data), self.name)
557 self.sender.send_pyobj(data)
596 self.sender.send_pyobj(data)
558
597
559 def run(self):
598 def run(self):
560
599
561 log.success(
600 log.log(
562 'Starting from {}'.format(self.address),
601 'Starting from {}'.format(self.address),
563 self.name
602 self.name
564 )
603 )
565
604
566 self.context = zmq.Context()
605 self.context = zmq.Context()
567 self.receiver = self.context.socket(zmq.PULL)
606 self.receiver = self.context.socket(zmq.PULL)
568 self.receiver.bind(self.address)
607 self.receiver.bind(self.address)
569 monitor = self.receiver.get_monitor_socket()
608 monitor = self.receiver.get_monitor_socket()
570 self.sender = self.context.socket(zmq.PUB)
609 self.sender = self.context.socket(zmq.PUB)
571 if self.web_address:
610 if self.web_address:
572 log.success(
611 log.success(
573 'Sending to web: {}'.format(self.web_address),
612 'Sending to web: {}'.format(self.web_address),
574 self.name
613 self.name
575 )
614 )
576 self.sender_web = self.context.socket(zmq.PUB)
615 self.sender_web = self.context.socket(zmq.PUSH)
577 self.sender_web.connect(self.web_address)
616 self.sender_web.connect(self.web_address)
578 time.sleep(1)
617 time.sleep(1)
579
618
580 if 'server' in self.kwargs:
619 if 'server' in self.kwargs:
581 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
620 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
582 else:
621 else:
583 self.sender.bind("ipc:///tmp/zmq.plots")
622 self.sender.bind("ipc:///tmp/zmq.plots")
584
623
585 time.sleep(2)
624 time.sleep(2)
586
625
587 t = Thread(target=self.event_monitor, args=(monitor,))
626 t = Thread(target=self.event_monitor, args=(monitor,))
588 t.start()
627 t.start()
589
628
590 while True:
629 while True:
591 dataOut = self.receiver.recv_pyobj()
630 dataOut = self.receiver.recv_pyobj()
592 if not dataOut.flagNoData:
631 if not dataOut.flagNoData:
593 if dataOut.type == 'Parameters':
632 if dataOut.type == 'Parameters':
594 tm = dataOut.utctimeInit
633 tm = dataOut.utctimeInit
595 else:
634 else:
596 tm = dataOut.utctime
635 tm = dataOut.utctime
597 if dataOut.useLocalTime:
636 if dataOut.useLocalTime:
598 if not self.localtime:
637 if not self.localtime:
599 tm += time.timezone
638 tm += time.timezone
600 dt = datetime.datetime.fromtimestamp(tm).date()
639 dt = datetime.datetime.fromtimestamp(tm).date()
601 else:
640 else:
602 if self.localtime:
641 if self.localtime:
603 tm -= time.timezone
642 tm -= time.timezone
604 dt = datetime.datetime.utcfromtimestamp(tm).date()
643 dt = datetime.datetime.utcfromtimestamp(tm).date()
605 coerce = False
644 coerce = False
606 if dt not in self.dates:
645 if dt not in self.dates:
607 if self.data:
646 if self.data:
608 self.data.ended = True
647 self.data.ended = True
609 self.send(self.data)
648 self.send(self.data)
610 coerce = True
649 coerce = True
611 self.data.setup()
650 self.data.setup()
612 self.dates.append(dt)
651 self.dates.append(dt)
613
652
614 self.data.update(dataOut, tm)
653 self.data.update(dataOut, tm)
615
654
616 if dataOut.finished is True:
655 if dataOut.finished is True:
617 self.connections -= 1
656 self.connections -= 1
618 if self.connections == 0 and dt in self.dates:
657 if self.connections == 0 and dt in self.dates:
619 self.data.ended = True
658 self.data.ended = True
620 self.send(self.data)
659 self.send(self.data)
621 self.data.setup()
660 self.data.setup()
622 else:
661 else:
623 if self.realtime:
662 if self.realtime:
624 self.send(self.data)
663 self.send(self.data)
625 if self.web_address:
664 if self.web_address:
626 self.sender_web.send(self.data.jsonify())
665 payload = self.data.jsonify()
666 log.log('Sending to web... type:{}, size:{}'.format(dataOut.type, len(payload)), self.name)
667 self.sender_web.send(payload)
627 else:
668 else:
628 self.sendData(self.send, self.data, coerce=coerce)
669 self.sendData(self.send, self.data, coerce=coerce)
629 coerce = False
670 coerce = False
630
671
631 return
672 return
673
674
675 class SendToFTP(Operation, Process):
676
677 '''
678 Operation to send data over FTP.
679 '''
680
681 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
682
683 def __init__(self, **kwargs):
684 '''
685 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
686 '''
687 Operation.__init__(self, **kwargs)
688 Process.__init__(self)
689 self.server = kwargs.get('server')
690 self.username = kwargs.get('username')
691 self.password = kwargs.get('password')
692 self.patterns = kwargs.get('patterns')
693 self.timeout = kwargs.get('timeout', 10)
694 self.times = [time.time() for p in self.patterns]
695 self.latest = ['' for p in self.patterns]
696 self.mp = False
697 self.ftp = None
698
699 def setup(self):
700
701 log.log('Connecting to ftp://{}'.format(self.server), self.name)
702 try:
703 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
704 except ftplib.all_errors:
705 log.error('Server connection fail: {}'.format(self.server), self.name)
706 if self.ftp is not None:
707 self.ftp.close()
708 self.ftp = None
709 self.isConfig = False
710 return
711
712 try:
713 self.ftp.login(self.username, self.password)
714 except ftplib.all_errors:
715 log.error('The given username y/o password are incorrect', self.name)
716 if self.ftp is not None:
717 self.ftp.close()
718 self.ftp = None
719 self.isConfig = False
720 return
721
722 log.success('Connection success', self.name)
723 self.isConfig = True
724 return
725
726 def check(self):
727
728 try:
729 self.ftp.voidcmd("NOOP")
730 except:
731 log.warning('Connection lost... trying to reconnect', self.name)
732 if self.ftp is not None:
733 self.ftp.close()
734 self.ftp = None
735 self.setup()
736
737 def find_files(self, path, ext):
738
739 files = glob.glob1(path, '*{}'.format(ext))
740 files.sort()
741 if files:
742 return files[-1]
743 return None
744
745 def getftpname(self, filename, exp_code, sub_exp_code):
746
747 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
748 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
749 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
750 exp_code = '%3.3d'%exp_code
751 sub_exp_code = '%2.2d'%sub_exp_code
752 plot_code = '%2.2d'% PLOT_CODES[filename.split('_')[0].split('-')[0]]
753 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
754 return name
755
756 def upload(self, src, dst):
757
758 log.log('Uploading {} '.format(src), self.name, nl=False)
759
760 fp = open(src, 'rb')
761 command = 'STOR {}'.format(dst)
762
763 try:
764 self.ftp.storbinary(command, fp, blocksize=1024)
765 except ftplib.all_errors, e:
766 log.error('{}'.format(e), self.name)
767 if self.ftp is not None:
768 self.ftp.close()
769 self.ftp = None
770 return
771
772 try:
773 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
774 except ftplib.all_errors, e:
775 log.error('{}'.format(e), self.name)
776 if self.ftp is not None:
777 self.ftp.close()
778 self.ftp = None
779
780 fp.close()
781
782 log.success('OK', tag='')
783
784 def send_files(self):
785
786 for x, pattern in enumerate(self.patterns):
787 local, remote, ext, delay, exp_code, sub_exp_code = pattern
788 if time.time()-self.times[x] >= delay:
789 srcname = self.find_files(local, ext)
790
791 if srcname is None or srcname == self.latest[x]:
792 continue
793
794 if 'png' in ext:
795 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
796 else:
797 dstname = srcname
798
799 src = os.path.join(local, srcname)
800
801 if os.path.getmtime(src) < time.time() - 30*60:
802 continue
803
804 dst = os.path.join(remote, dstname)
805
806 if self.ftp is None:
807 continue
808
809 self.upload(src, dst)
810
811 self.times[x] = time.time()
812 self.latest[x] = srcname
813
814 def run(self):
815
816 while True:
817 if not self.isConfig:
818 self.setup()
819 if self.ftp is not None:
820 self.check()
821 self.send_files()
822 time.sleep(2)
823
824 def close():
825
826 if self.ftp is not None:
827 if self.ftp is not None:
828 self.ftp.close()
829 self.terminate()
General Comments 0
You need to be logged in to leave comments. Login now