##// END OF EJS Templates
Update plot codes, add meta attribute to dataOut to send metadata to plots
jespinoza -
r1139:ade57c0ecace
parent child
Show More
@@ -1,829 +1,832
1 '''
1 '''
2 @author: Juan C. Espinoza
2 @author: Juan C. Espinoza
3 '''
3 '''
4
4
5 import os
5 import os
6 import glob
6 import glob
7 import time
7 import time
8 import json
8 import json
9 import numpy
9 import numpy
10 import paho.mqtt.client as mqtt
10 import paho.mqtt.client as mqtt
11 import zmq
11 import zmq
12 import datetime
12 import datetime
13 import ftplib
13 import ftplib
14 from zmq.utils.monitor import recv_monitor_message
14 from zmq.utils.monitor import recv_monitor_message
15 from functools import wraps
15 from functools import wraps
16 from threading import Thread
16 from threading import Thread
17 from multiprocessing import Process
17 from multiprocessing import Process
18
18
19 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
19 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
20 from schainpy.model.data.jrodata import JROData
20 from schainpy.model.data.jrodata import JROData
21 from schainpy.utils import log
21 from schainpy.utils import log
22
22
23 MAXNUMX = 500
23 MAXNUMX = 500
24 MAXNUMY = 500
24 MAXNUMY = 500
25
25
26 PLOT_CODES = {
26 PLOT_CODES = {
27 'rti': 0, #Range time intensity (RTI).
27 'rti': 0, # Range time intensity (RTI).
28 'spc': 1, #Spectra (and Cross-spectra) information.
28 'spc': 1, # Spectra (and Cross-spectra) information.
29 'cspc': 2, #Cross-Correlation information.
29 'cspc': 2, # Cross-Correlation information.
30 'coh': 3, #Coherence map.
30 'coh': 3, # Coherence map.
31 'base': 4, #Base lines graphic.
31 'base': 4, # Base lines graphic.
32 'row': 5, #Row Spectra.
32 'row': 5, # Row Spectra.
33 'total' : 6, #Total Power.
33 'total' : 6, # Total Power.
34 'drift' : 7, #Drifts graphics.
34 'drift' : 7, # Drifts graphics.
35 'height' : 8, #Height profile.
35 'height' : 8, # Height profile.
36 'phase' : 9, #Signal Phase.
36 'phase' : 9, # Signal Phase.
37 'power' : 16,
37 'power' : 16,
38 'noise' : 17,
38 'noise' : 17,
39 'beacon' : 18,
39 'beacon' : 18,
40 #USED IN jroplot_parameters.py
41 'wind' : 22,
40 'wind' : 22,
42 'skymap' : 23,
41 'skymap' : 23,
43 # 'MPHASE_CODE' : 24,
42 'V-E' : 25,
44 'V' : 25,
43 'Z-E' : 26,
45 'Z' : 26,
44 'V-A' : 27,
46 'spc_fit' : 27,
45 'Z-A' : 28,
47 'ew_drifts' : 28,
48 'reflectivity': 30
49 }
46 }
50
47
51 class PrettyFloat(float):
48 class PrettyFloat(float):
52 def __repr__(self):
49 def __repr__(self):
53 return '%.2f' % self
50 return '%.2f' % self
54
51
55 def roundFloats(obj):
52 def roundFloats(obj):
56 if isinstance(obj, list):
53 if isinstance(obj, list):
57 return map(roundFloats, obj)
54 return map(roundFloats, obj)
58 elif isinstance(obj, float):
55 elif isinstance(obj, float):
59 return round(obj, 2)
56 return round(obj, 2)
60
57
61 def decimate(z, MAXNUMY):
58 def decimate(z, MAXNUMY):
62 dy = int(len(z[0])/MAXNUMY) + 1
59 dy = int(len(z[0])/MAXNUMY) + 1
63
60
64 return z[::, ::dy]
61 return z[::, ::dy]
65
62
66 class throttle(object):
63 class throttle(object):
67 '''
64 '''
68 Decorator that prevents a function from being called more than once every
65 Decorator that prevents a function from being called more than once every
69 time period.
66 time period.
70 To create a function that cannot be called more than once a minute, but
67 To create a function that cannot be called more than once a minute, but
71 will sleep until it can be called:
68 will sleep until it can be called:
72 @throttle(minutes=1)
69 @throttle(minutes=1)
73 def foo():
70 def foo():
74 pass
71 pass
75
72
76 for i in range(10):
73 for i in range(10):
77 foo()
74 foo()
78 print "This function has run %s times." % i
75 print "This function has run %s times." % i
79 '''
76 '''
80
77
81 def __init__(self, seconds=0, minutes=0, hours=0):
78 def __init__(self, seconds=0, minutes=0, hours=0):
82 self.throttle_period = datetime.timedelta(
79 self.throttle_period = datetime.timedelta(
83 seconds=seconds, minutes=minutes, hours=hours
80 seconds=seconds, minutes=minutes, hours=hours
84 )
81 )
85
82
86 self.time_of_last_call = datetime.datetime.min
83 self.time_of_last_call = datetime.datetime.min
87
84
88 def __call__(self, fn):
85 def __call__(self, fn):
89 @wraps(fn)
86 @wraps(fn)
90 def wrapper(*args, **kwargs):
87 def wrapper(*args, **kwargs):
91 coerce = kwargs.pop('coerce', None)
88 coerce = kwargs.pop('coerce', None)
92 if coerce:
89 if coerce:
93 self.time_of_last_call = datetime.datetime.now()
90 self.time_of_last_call = datetime.datetime.now()
94 return fn(*args, **kwargs)
91 return fn(*args, **kwargs)
95 else:
92 else:
96 now = datetime.datetime.now()
93 now = datetime.datetime.now()
97 time_since_last_call = now - self.time_of_last_call
94 time_since_last_call = now - self.time_of_last_call
98 time_left = self.throttle_period - time_since_last_call
95 time_left = self.throttle_period - time_since_last_call
99
96
100 if time_left > datetime.timedelta(seconds=0):
97 if time_left > datetime.timedelta(seconds=0):
101 return
98 return
102
99
103 self.time_of_last_call = datetime.datetime.now()
100 self.time_of_last_call = datetime.datetime.now()
104 return fn(*args, **kwargs)
101 return fn(*args, **kwargs)
105
102
106 return wrapper
103 return wrapper
107
104
108 class Data(object):
105 class Data(object):
109 '''
106 '''
110 Object to hold data to be plotted
107 Object to hold data to be plotted
111 '''
108 '''
112
109
113 def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
110 def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
114 self.plottypes = plottypes
111 self.plottypes = plottypes
115 self.throttle = throttle_value
112 self.throttle = throttle_value
116 self.exp_code = exp_code
113 self.exp_code = exp_code
117 self.buffering = buffering
114 self.buffering = buffering
118 self.ended = False
115 self.ended = False
119 self.localtime = False
116 self.localtime = False
117 self.meta = {}
120 self.__times = []
118 self.__times = []
121 self.__heights = []
119 self.__heights = []
122
120
123 def __str__(self):
121 def __str__(self):
124 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
122 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
125 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
123 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
126
124
127 def __len__(self):
125 def __len__(self):
128 return len(self.__times)
126 return len(self.__times)
129
127
130 def __getitem__(self, key):
128 def __getitem__(self, key):
131 if key not in self.data:
129 if key not in self.data:
132 raise KeyError(log.error('Missing key: {}'.format(key)))
130 raise KeyError(log.error('Missing key: {}'.format(key)))
133
131
134 if 'spc' in key or not self.buffering:
132 if 'spc' in key or not self.buffering:
135 ret = self.data[key]
133 ret = self.data[key]
136 else:
134 else:
137 ret = numpy.array([self.data[key][x] for x in self.times])
135 ret = numpy.array([self.data[key][x] for x in self.times])
138 if ret.ndim > 1:
136 if ret.ndim > 1:
139 ret = numpy.swapaxes(ret, 0, 1)
137 ret = numpy.swapaxes(ret, 0, 1)
140 return ret
138 return ret
141
139
142 def __contains__(self, key):
140 def __contains__(self, key):
143 return key in self.data
141 return key in self.data
144
142
145 def setup(self):
143 def setup(self):
146 '''
144 '''
147 Configure object
145 Configure object
148 '''
146 '''
149
147
150 self.type = ''
148 self.type = ''
151 self.ended = False
149 self.ended = False
152 self.data = {}
150 self.data = {}
153 self.__times = []
151 self.__times = []
154 self.__heights = []
152 self.__heights = []
155 self.__all_heights = set()
153 self.__all_heights = set()
156 for plot in self.plottypes:
154 for plot in self.plottypes:
157 if 'snr' in plot:
155 if 'snr' in plot:
158 plot = 'snr'
156 plot = 'snr'
159 self.data[plot] = {}
157 self.data[plot] = {}
160
158
161 def shape(self, key):
159 def shape(self, key):
162 '''
160 '''
163 Get the shape of the one-element data for the given key
161 Get the shape of the one-element data for the given key
164 '''
162 '''
165
163
166 if len(self.data[key]):
164 if len(self.data[key]):
167 if 'spc' in key or not self.buffering:
165 if 'spc' in key or not self.buffering:
168 return self.data[key].shape
166 return self.data[key].shape
169 return self.data[key][self.__times[0]].shape
167 return self.data[key][self.__times[0]].shape
170 return (0,)
168 return (0,)
171
169
172 def update(self, dataOut, tm):
170 def update(self, dataOut, tm):
173 '''
171 '''
174 Update data object with new dataOut
172 Update data object with new dataOut
175 '''
173 '''
176
174
177 if tm in self.__times:
175 if tm in self.__times:
178 return
176 return
179
177
180 self.type = dataOut.type
178 self.type = dataOut.type
181 self.parameters = getattr(dataOut, 'parameters', [])
179 self.parameters = getattr(dataOut, 'parameters', [])
182 if hasattr(dataOut, 'pairsList'):
180 if hasattr(dataOut, 'pairsList'):
183 self.pairs = dataOut.pairsList
181 self.pairs = dataOut.pairsList
182 if hasattr(dataOut, 'meta'):
183 self.meta = dataOut.meta
184 self.channels = dataOut.channelList
184 self.channels = dataOut.channelList
185 self.interval = dataOut.getTimeInterval()
185 self.interval = dataOut.getTimeInterval()
186 self.localtime = dataOut.useLocalTime
186 self.localtime = dataOut.useLocalTime
187 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
187 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
188 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
188 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
189 self.__heights.append(dataOut.heightList)
189 self.__heights.append(dataOut.heightList)
190 self.__all_heights.update(dataOut.heightList)
190 self.__all_heights.update(dataOut.heightList)
191 self.__times.append(tm)
191 self.__times.append(tm)
192
192
193 for plot in self.plottypes:
193 for plot in self.plottypes:
194 if plot == 'spc':
194 if plot == 'spc':
195 z = dataOut.data_spc/dataOut.normFactor
195 z = dataOut.data_spc/dataOut.normFactor
196 self.data[plot] = 10*numpy.log10(z)
196 self.data[plot] = 10*numpy.log10(z)
197 if plot == 'cspc':
197 if plot == 'cspc':
198 self.data[plot] = dataOut.data_cspc
198 self.data[plot] = dataOut.data_cspc
199 if plot == 'noise':
199 if plot == 'noise':
200 buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
200 buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
201 if plot == 'rti':
201 if plot == 'rti':
202 buffer = dataOut.getPower()
202 buffer = dataOut.getPower()
203 if plot == 'snr_db':
203 if plot == 'snr_db':
204 self.data['snr'][tm] = dataOut.data_SNR
204 self.data['snr'][tm] = dataOut.data_SNR
205 if plot == 'snr':
205 if plot == 'snr':
206 buffer = 10*numpy.log10(dataOut.data_SNR)
206 buffer = 10*numpy.log10(dataOut.data_SNR)
207 if plot == 'dop':
207 if plot == 'dop':
208 buffer = 10*numpy.log10(dataOut.data_DOP)
208 buffer = 10*numpy.log10(dataOut.data_DOP)
209 if plot == 'mean':
209 if plot == 'mean':
210 buffer = dataOut.data_MEAN
210 buffer = dataOut.data_MEAN
211 if plot == 'std':
211 if plot == 'std':
212 buffer = dataOut.data_STD
212 buffer = dataOut.data_STD
213 if plot == 'coh':
213 if plot == 'coh':
214 buffer = dataOut.getCoherence()
214 buffer = dataOut.getCoherence()
215 if plot == 'phase':
215 if plot == 'phase':
216 buffer = dataOut.getCoherence(phase=True)
216 buffer = dataOut.getCoherence(phase=True)
217 if plot == 'output':
217 if plot == 'output':
218 buffer = dataOut.data_output
218 buffer = dataOut.data_output
219 if plot == 'param':
219 if plot == 'param':
220 buffer = dataOut.data_param
220 buffer = dataOut.data_param
221
221
222 if self.buffering:
222 if self.buffering:
223 self.data[plot][tm] = buffer
223 self.data[plot][tm] = buffer
224 else:
224 else:
225 self.data[plot] = buffer
225 self.data[plot] = buffer
226
226
227 def normalize_heights(self):
227 def normalize_heights(self):
228 '''
228 '''
229 Ensure same-dimension of the data for different heighList
229 Ensure same-dimension of the data for different heighList
230 '''
230 '''
231
231
232 H = numpy.array(list(self.__all_heights))
232 H = numpy.array(list(self.__all_heights))
233 H.sort()
233 H.sort()
234 for key in self.data:
234 for key in self.data:
235 shape = self.shape(key)[:-1] + H.shape
235 shape = self.shape(key)[:-1] + H.shape
236 for tm, obj in self.data[key].items():
236 for tm, obj in self.data[key].items():
237 h = self.__heights[self.__times.index(tm)]
237 h = self.__heights[self.__times.index(tm)]
238 if H.size == h.size:
238 if H.size == h.size:
239 continue
239 continue
240 index = numpy.where(numpy.in1d(H, h))[0]
240 index = numpy.where(numpy.in1d(H, h))[0]
241 dummy = numpy.zeros(shape) + numpy.nan
241 dummy = numpy.zeros(shape) + numpy.nan
242 if len(shape) == 2:
242 if len(shape) == 2:
243 dummy[:, index] = obj
243 dummy[:, index] = obj
244 else:
244 else:
245 dummy[index] = obj
245 dummy[index] = obj
246 self.data[key][tm] = dummy
246 self.data[key][tm] = dummy
247
247
248 self.__heights = [H for tm in self.__times]
248 self.__heights = [H for tm in self.__times]
249
249
250 def jsonify(self, decimate=False):
250 def jsonify(self, decimate=False):
251 '''
251 '''
252 Convert data to json
252 Convert data to json
253 '''
253 '''
254
254
255 data = {}
255 data = {}
256 tm = self.times[-1]
256 tm = self.times[-1]
257
257
258 for key in self.data:
258 for key in self.data:
259 if key in ('spc', 'cspc') or not self.buffering:
259 if key in ('spc', 'cspc') or not self.buffering:
260 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
260 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
261 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
261 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
262 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
262 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
263 else:
263 else:
264 data[key] = roundFloats(self.data[key][tm].tolist())
264 data[key] = roundFloats(self.data[key][tm].tolist())
265
265
266 ret = {'data': data}
266 ret = {'data': data}
267 ret['exp_code'] = self.exp_code
267 ret['exp_code'] = self.exp_code
268 ret['time'] = tm
268 ret['time'] = tm
269 ret['interval'] = self.interval
269 ret['interval'] = self.interval
270 ret['localtime'] = self.localtime
270 ret['localtime'] = self.localtime
271 ret['yrange'] = roundFloats(self.heights.tolist())
271 ret['yrange'] = roundFloats(self.heights.tolist())
272 if key in ('spc', 'cspc'):
272 if key in ('spc', 'cspc'):
273 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
273 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
274 else:
274 else:
275 ret['xrange'] = []
275 ret['xrange'] = []
276 if hasattr(self, 'pairs'):
276 if hasattr(self, 'pairs'):
277 ret['pairs'] = self.pairs
277 ret['pairs'] = self.pairs
278
279 for key, value in self.meta:
280 ret[key] = value
281
278 return json.dumps(ret)
282 return json.dumps(ret)
279
283
280 @property
284 @property
281 def times(self):
285 def times(self):
282 '''
286 '''
283 Return the list of times of the current data
287 Return the list of times of the current data
284 '''
288 '''
285
289
286 ret = numpy.array(self.__times)
290 ret = numpy.array(self.__times)
287 ret.sort()
291 ret.sort()
288 return ret
292 return ret
289
293
290 @property
294 @property
291 def heights(self):
295 def heights(self):
292 '''
296 '''
293 Return the list of heights of the current data
297 Return the list of heights of the current data
294 '''
298 '''
295
299
296 return numpy.array(self.__heights[-1])
300 return numpy.array(self.__heights[-1])
297
301
298 class PublishData(Operation):
302 class PublishData(Operation):
299 '''
303 '''
300 Operation to send data over zmq.
304 Operation to send data over zmq.
301 '''
305 '''
302
306
303 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
307 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
304
308
305 def __init__(self, **kwargs):
309 def __init__(self, **kwargs):
306 """Inicio."""
310 """Inicio."""
307 Operation.__init__(self, **kwargs)
311 Operation.__init__(self, **kwargs)
308 self.isConfig = False
312 self.isConfig = False
309 self.client = None
313 self.client = None
310 self.zeromq = None
314 self.zeromq = None
311 self.mqtt = None
315 self.mqtt = None
312
316
313 def on_disconnect(self, client, userdata, rc):
317 def on_disconnect(self, client, userdata, rc):
314 if rc != 0:
318 if rc != 0:
315 log.warning('Unexpected disconnection.')
319 log.warning('Unexpected disconnection.')
316 self.connect()
320 self.connect()
317
321
318 def connect(self):
322 def connect(self):
319 log.warning('trying to connect')
323 log.warning('trying to connect')
320 try:
324 try:
321 self.client.connect(
325 self.client.connect(
322 host=self.host,
326 host=self.host,
323 port=self.port,
327 port=self.port,
324 keepalive=60*10,
328 keepalive=60*10,
325 bind_address='')
329 bind_address='')
326 self.client.loop_start()
330 self.client.loop_start()
327 # self.client.publish(
331 # self.client.publish(
328 # self.topic + 'SETUP',
332 # self.topic + 'SETUP',
329 # json.dumps(setup),
333 # json.dumps(setup),
330 # retain=True
334 # retain=True
331 # )
335 # )
332 except:
336 except:
333 log.error('MQTT Conection error.')
337 log.error('MQTT Conection error.')
334 self.client = False
338 self.client = False
335
339
336 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
340 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
337 self.counter = 0
341 self.counter = 0
338 self.topic = kwargs.get('topic', 'schain')
342 self.topic = kwargs.get('topic', 'schain')
339 self.delay = kwargs.get('delay', 0)
343 self.delay = kwargs.get('delay', 0)
340 self.plottype = kwargs.get('plottype', 'spectra')
344 self.plottype = kwargs.get('plottype', 'spectra')
341 self.host = kwargs.get('host', "10.10.10.82")
345 self.host = kwargs.get('host', "10.10.10.82")
342 self.port = kwargs.get('port', 3000)
346 self.port = kwargs.get('port', 3000)
343 self.clientId = clientId
347 self.clientId = clientId
344 self.cnt = 0
348 self.cnt = 0
345 self.zeromq = zeromq
349 self.zeromq = zeromq
346 self.mqtt = kwargs.get('plottype', 0)
350 self.mqtt = kwargs.get('plottype', 0)
347 self.client = None
351 self.client = None
348 self.verbose = verbose
352 self.verbose = verbose
349 setup = []
353 setup = []
350 if mqtt is 1:
354 if mqtt is 1:
351 self.client = mqtt.Client(
355 self.client = mqtt.Client(
352 client_id=self.clientId + self.topic + 'SCHAIN',
356 client_id=self.clientId + self.topic + 'SCHAIN',
353 clean_session=True)
357 clean_session=True)
354 self.client.on_disconnect = self.on_disconnect
358 self.client.on_disconnect = self.on_disconnect
355 self.connect()
359 self.connect()
356 for plot in self.plottype:
360 for plot in self.plottype:
357 setup.append({
361 setup.append({
358 'plot': plot,
362 'plot': plot,
359 'topic': self.topic + plot,
363 'topic': self.topic + plot,
360 'title': getattr(self, plot + '_' + 'title', False),
364 'title': getattr(self, plot + '_' + 'title', False),
361 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
365 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
362 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
366 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
363 'xrange': getattr(self, plot + '_' + 'xrange', False),
367 'xrange': getattr(self, plot + '_' + 'xrange', False),
364 'yrange': getattr(self, plot + '_' + 'yrange', False),
368 'yrange': getattr(self, plot + '_' + 'yrange', False),
365 'zrange': getattr(self, plot + '_' + 'zrange', False),
369 'zrange': getattr(self, plot + '_' + 'zrange', False),
366 })
370 })
367 if zeromq is 1:
371 if zeromq is 1:
368 context = zmq.Context()
372 context = zmq.Context()
369 self.zmq_socket = context.socket(zmq.PUSH)
373 self.zmq_socket = context.socket(zmq.PUSH)
370 server = kwargs.get('server', 'zmq.pipe')
374 server = kwargs.get('server', 'zmq.pipe')
371
375
372 if 'tcp://' in server:
376 if 'tcp://' in server:
373 address = server
377 address = server
374 else:
378 else:
375 address = 'ipc:///tmp/%s' % server
379 address = 'ipc:///tmp/%s' % server
376
380
377 self.zmq_socket.connect(address)
381 self.zmq_socket.connect(address)
378 time.sleep(1)
382 time.sleep(1)
379
383
380
384
381 def publish_data(self):
385 def publish_data(self):
382 self.dataOut.finished = False
386 self.dataOut.finished = False
383 if self.mqtt is 1:
387 if self.mqtt is 1:
384 yData = self.dataOut.heightList[:2].tolist()
388 yData = self.dataOut.heightList[:2].tolist()
385 if self.plottype == 'spectra':
389 if self.plottype == 'spectra':
386 data = getattr(self.dataOut, 'data_spc')
390 data = getattr(self.dataOut, 'data_spc')
387 z = data/self.dataOut.normFactor
391 z = data/self.dataOut.normFactor
388 zdB = 10*numpy.log10(z)
392 zdB = 10*numpy.log10(z)
389 xlen, ylen = zdB[0].shape
393 xlen, ylen = zdB[0].shape
390 dx = int(xlen/MAXNUMX) + 1
394 dx = int(xlen/MAXNUMX) + 1
391 dy = int(ylen/MAXNUMY) + 1
395 dy = int(ylen/MAXNUMY) + 1
392 Z = [0 for i in self.dataOut.channelList]
396 Z = [0 for i in self.dataOut.channelList]
393 for i in self.dataOut.channelList:
397 for i in self.dataOut.channelList:
394 Z[i] = zdB[i][::dx, ::dy].tolist()
398 Z[i] = zdB[i][::dx, ::dy].tolist()
395 payload = {
399 payload = {
396 'timestamp': self.dataOut.utctime,
400 'timestamp': self.dataOut.utctime,
397 'data': roundFloats(Z),
401 'data': roundFloats(Z),
398 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
402 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
399 'interval': self.dataOut.getTimeInterval(),
403 'interval': self.dataOut.getTimeInterval(),
400 'type': self.plottype,
404 'type': self.plottype,
401 'yData': yData
405 'yData': yData
402 }
406 }
403
407
404 elif self.plottype in ('rti', 'power'):
408 elif self.plottype in ('rti', 'power'):
405 data = getattr(self.dataOut, 'data_spc')
409 data = getattr(self.dataOut, 'data_spc')
406 z = data/self.dataOut.normFactor
410 z = data/self.dataOut.normFactor
407 avg = numpy.average(z, axis=1)
411 avg = numpy.average(z, axis=1)
408 avgdB = 10*numpy.log10(avg)
412 avgdB = 10*numpy.log10(avg)
409 xlen, ylen = z[0].shape
413 xlen, ylen = z[0].shape
410 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
414 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
411 AVG = [0 for i in self.dataOut.channelList]
415 AVG = [0 for i in self.dataOut.channelList]
412 for i in self.dataOut.channelList:
416 for i in self.dataOut.channelList:
413 AVG[i] = avgdB[i][::dy].tolist()
417 AVG[i] = avgdB[i][::dy].tolist()
414 payload = {
418 payload = {
415 'timestamp': self.dataOut.utctime,
419 'timestamp': self.dataOut.utctime,
416 'data': roundFloats(AVG),
420 'data': roundFloats(AVG),
417 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
421 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
418 'interval': self.dataOut.getTimeInterval(),
422 'interval': self.dataOut.getTimeInterval(),
419 'type': self.plottype,
423 'type': self.plottype,
420 'yData': yData
424 'yData': yData
421 }
425 }
422 elif self.plottype == 'noise':
426 elif self.plottype == 'noise':
423 noise = self.dataOut.getNoise()/self.dataOut.normFactor
427 noise = self.dataOut.getNoise()/self.dataOut.normFactor
424 noisedB = 10*numpy.log10(noise)
428 noisedB = 10*numpy.log10(noise)
425 payload = {
429 payload = {
426 'timestamp': self.dataOut.utctime,
430 'timestamp': self.dataOut.utctime,
427 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
431 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
428 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
432 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
429 'interval': self.dataOut.getTimeInterval(),
433 'interval': self.dataOut.getTimeInterval(),
430 'type': self.plottype,
434 'type': self.plottype,
431 'yData': yData
435 'yData': yData
432 }
436 }
433 elif self.plottype == 'snr':
437 elif self.plottype == 'snr':
434 data = getattr(self.dataOut, 'data_SNR')
438 data = getattr(self.dataOut, 'data_SNR')
435 avgdB = 10*numpy.log10(data)
439 avgdB = 10*numpy.log10(data)
436
440
437 ylen = data[0].size
441 ylen = data[0].size
438 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
442 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
439 AVG = [0 for i in self.dataOut.channelList]
443 AVG = [0 for i in self.dataOut.channelList]
440 for i in self.dataOut.channelList:
444 for i in self.dataOut.channelList:
441 AVG[i] = avgdB[i][::dy].tolist()
445 AVG[i] = avgdB[i][::dy].tolist()
442 payload = {
446 payload = {
443 'timestamp': self.dataOut.utctime,
447 'timestamp': self.dataOut.utctime,
444 'data': roundFloats(AVG),
448 'data': roundFloats(AVG),
445 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
449 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
446 'type': self.plottype,
450 'type': self.plottype,
447 'yData': yData
451 'yData': yData
448 }
452 }
449 else:
453 else:
450 print "Tipo de grafico invalido"
454 print "Tipo de grafico invalido"
451 payload = {
455 payload = {
452 'data': 'None',
456 'data': 'None',
453 'timestamp': 'None',
457 'timestamp': 'None',
454 'type': None
458 'type': None
455 }
459 }
456
460
457 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
461 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
458
462
459 if self.zeromq is 1:
463 if self.zeromq is 1:
460 if self.verbose:
464 if self.verbose:
461 log.log(
465 log.log(
462 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
466 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
463 self.name
467 self.name
464 )
468 )
465 self.zmq_socket.send_pyobj(self.dataOut)
469 self.zmq_socket.send_pyobj(self.dataOut)
466
470
467 def run(self, dataOut, **kwargs):
471 def run(self, dataOut, **kwargs):
468 self.dataOut = dataOut
472 self.dataOut = dataOut
469 if not self.isConfig:
473 if not self.isConfig:
470 self.setup(**kwargs)
474 self.setup(**kwargs)
471 self.isConfig = True
475 self.isConfig = True
472
476
473 self.publish_data()
477 self.publish_data()
474 time.sleep(self.delay)
478 time.sleep(self.delay)
475
479
476 def close(self):
480 def close(self):
477 if self.zeromq is 1:
481 if self.zeromq is 1:
478 self.dataOut.finished = True
482 self.dataOut.finished = True
479 self.zmq_socket.send_pyobj(self.dataOut)
483 self.zmq_socket.send_pyobj(self.dataOut)
480 time.sleep(0.1)
484 time.sleep(0.1)
481 self.zmq_socket.close()
485 self.zmq_socket.close()
482 if self.client:
486 if self.client:
483 self.client.loop_stop()
487 self.client.loop_stop()
484 self.client.disconnect()
488 self.client.disconnect()
485
489
486
490
487 class ReceiverData(ProcessingUnit):
491 class ReceiverData(ProcessingUnit):
488
492
489 __attrs__ = ['server']
493 __attrs__ = ['server']
490
494
491 def __init__(self, **kwargs):
495 def __init__(self, **kwargs):
492
496
493 ProcessingUnit.__init__(self, **kwargs)
497 ProcessingUnit.__init__(self, **kwargs)
494
498
495 self.isConfig = False
499 self.isConfig = False
496 server = kwargs.get('server', 'zmq.pipe')
500 server = kwargs.get('server', 'zmq.pipe')
497 if 'tcp://' in server:
501 if 'tcp://' in server:
498 address = server
502 address = server
499 else:
503 else:
500 address = 'ipc:///tmp/%s' % server
504 address = 'ipc:///tmp/%s' % server
501
505
502 self.address = address
506 self.address = address
503 self.dataOut = JROData()
507 self.dataOut = JROData()
504
508
505 def setup(self):
509 def setup(self):
506
510
507 self.context = zmq.Context()
511 self.context = zmq.Context()
508 self.receiver = self.context.socket(zmq.PULL)
512 self.receiver = self.context.socket(zmq.PULL)
509 self.receiver.bind(self.address)
513 self.receiver.bind(self.address)
510 time.sleep(0.5)
514 time.sleep(0.5)
511 log.success('ReceiverData from {}'.format(self.address))
515 log.success('ReceiverData from {}'.format(self.address))
512
516
513
517
514 def run(self):
518 def run(self):
515
519
516 if not self.isConfig:
520 if not self.isConfig:
517 self.setup()
521 self.setup()
518 self.isConfig = True
522 self.isConfig = True
519
523
520 self.dataOut = self.receiver.recv_pyobj()
524 self.dataOut = self.receiver.recv_pyobj()
521 log.log('{} - {}'.format(self.dataOut.type,
525 log.log('{} - {}'.format(self.dataOut.type,
522 self.dataOut.datatime.ctime(),),
526 self.dataOut.datatime.ctime(),),
523 'Receiving')
527 'Receiving')
524
528
525
529
526 class PlotterReceiver(ProcessingUnit, Process):
530 class PlotterReceiver(ProcessingUnit, Process):
527
531
528 throttle_value = 5
532 throttle_value = 5
529 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
533 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
530 'exp_code', 'web_server', 'buffering']
534 'exp_code', 'web_server', 'buffering']
531
535
532 def __init__(self, **kwargs):
536 def __init__(self, **kwargs):
533
537
534 ProcessingUnit.__init__(self, **kwargs)
538 ProcessingUnit.__init__(self, **kwargs)
535 Process.__init__(self)
539 Process.__init__(self)
536 self.mp = False
540 self.mp = False
537 self.isConfig = False
541 self.isConfig = False
538 self.isWebConfig = False
542 self.isWebConfig = False
539 self.connections = 0
543 self.connections = 0
540 server = kwargs.get('server', 'zmq.pipe')
544 server = kwargs.get('server', 'zmq.pipe')
541 web_server = kwargs.get('web_server', None)
545 web_server = kwargs.get('web_server', None)
542 if 'tcp://' in server:
546 if 'tcp://' in server:
543 address = server
547 address = server
544 else:
548 else:
545 address = 'ipc:///tmp/%s' % server
549 address = 'ipc:///tmp/%s' % server
546 self.address = address
550 self.address = address
547 self.web_address = web_server
551 self.web_address = web_server
548 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
552 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
549 self.realtime = kwargs.get('realtime', False)
553 self.realtime = kwargs.get('realtime', False)
550 self.localtime = kwargs.get('localtime', True)
554 self.localtime = kwargs.get('localtime', True)
551 self.buffering = kwargs.get('buffering', True)
555 self.buffering = kwargs.get('buffering', True)
552 self.throttle_value = kwargs.get('throttle', 5)
556 self.throttle_value = kwargs.get('throttle', 5)
553 self.exp_code = kwargs.get('exp_code', None)
557 self.exp_code = kwargs.get('exp_code', None)
554 self.sendData = self.initThrottle(self.throttle_value)
558 self.sendData = self.initThrottle(self.throttle_value)
555 self.dates = []
559 self.dates = []
556 self.setup()
560 self.setup()
557
561
558 def setup(self):
562 def setup(self):
559
563
560 self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
564 self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
561 self.isConfig = True
565 self.isConfig = True
562
566
563 def event_monitor(self, monitor):
567 def event_monitor(self, monitor):
564
568
565 events = {}
569 events = {}
566
570
567 for name in dir(zmq):
571 for name in dir(zmq):
568 if name.startswith('EVENT_'):
572 if name.startswith('EVENT_'):
569 value = getattr(zmq, name)
573 value = getattr(zmq, name)
570 events[value] = name
574 events[value] = name
571
575
572 while monitor.poll():
576 while monitor.poll():
573 evt = recv_monitor_message(monitor)
577 evt = recv_monitor_message(monitor)
574 if evt['event'] == 32:
578 if evt['event'] == 32:
575 self.connections += 1
579 self.connections += 1
576 if evt['event'] == 512:
580 if evt['event'] == 512:
577 pass
581 pass
578
582
579 evt.update({'description': events[evt['event']]})
583 evt.update({'description': events[evt['event']]})
580
584
581 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
585 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
582 break
586 break
583 monitor.close()
587 monitor.close()
584 print('event monitor thread done!')
588 print('event monitor thread done!')
585
589
586 def initThrottle(self, throttle_value):
590 def initThrottle(self, throttle_value):
587
591
588 @throttle(seconds=throttle_value)
592 @throttle(seconds=throttle_value)
589 def sendDataThrottled(fn_sender, data):
593 def sendDataThrottled(fn_sender, data):
590 fn_sender(data)
594 fn_sender(data)
591
595
592 return sendDataThrottled
596 return sendDataThrottled
593
597
594 def send(self, data):
598 def send(self, data):
595 log.log('Sending {}'.format(data), self.name)
599 log.log('Sending {}'.format(data), self.name)
596 self.sender.send_pyobj(data)
600 self.sender.send_pyobj(data)
597
601
598 def run(self):
602 def run(self):
599
603
600 log.log(
604 log.log(
601 'Starting from {}'.format(self.address),
605 'Starting from {}'.format(self.address),
602 self.name
606 self.name
603 )
607 )
604
608
605 self.context = zmq.Context()
609 self.context = zmq.Context()
606 self.receiver = self.context.socket(zmq.PULL)
610 self.receiver = self.context.socket(zmq.PULL)
607 self.receiver.bind(self.address)
611 self.receiver.bind(self.address)
608 monitor = self.receiver.get_monitor_socket()
612 monitor = self.receiver.get_monitor_socket()
609 self.sender = self.context.socket(zmq.PUB)
613 self.sender = self.context.socket(zmq.PUB)
610 if self.web_address:
614 if self.web_address:
611 log.success(
615 log.success(
612 'Sending to web: {}'.format(self.web_address),
616 'Sending to web: {}'.format(self.web_address),
613 self.name
617 self.name
614 )
618 )
615 self.sender_web = self.context.socket(zmq.PUSH)
619 self.sender_web = self.context.socket(zmq.PUSH)
616 self.sender_web.connect(self.web_address)
620 self.sender_web.connect(self.web_address)
617 time.sleep(1)
621 time.sleep(1)
618
622
619 if 'server' in self.kwargs:
623 if 'server' in self.kwargs:
620 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
624 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
621 else:
625 else:
622 self.sender.bind("ipc:///tmp/zmq.plots")
626 self.sender.bind("ipc:///tmp/zmq.plots")
623
627
624 time.sleep(2)
628 time.sleep(2)
625
629
626 t = Thread(target=self.event_monitor, args=(monitor,))
630 t = Thread(target=self.event_monitor, args=(monitor,))
627 t.start()
631 t.start()
628
632
629 while True:
633 while True:
630 dataOut = self.receiver.recv_pyobj()
634 dataOut = self.receiver.recv_pyobj()
631 if not dataOut.flagNoData:
635 if not dataOut.flagNoData:
632 if dataOut.type == 'Parameters':
636 if dataOut.type == 'Parameters':
633 tm = dataOut.utctimeInit
637 tm = dataOut.utctimeInit
634 else:
638 else:
635 tm = dataOut.utctime
639 tm = dataOut.utctime
636 if dataOut.useLocalTime:
640 if dataOut.useLocalTime:
637 if not self.localtime:
641 if not self.localtime:
638 tm += time.timezone
642 tm += time.timezone
639 dt = datetime.datetime.fromtimestamp(tm).date()
643 dt = datetime.datetime.fromtimestamp(tm).date()
640 else:
644 else:
641 if self.localtime:
645 if self.localtime:
642 tm -= time.timezone
646 tm -= time.timezone
643 dt = datetime.datetime.utcfromtimestamp(tm).date()
647 dt = datetime.datetime.utcfromtimestamp(tm).date()
644 coerce = False
648 coerce = False
645 if dt not in self.dates:
649 if dt not in self.dates:
646 if self.data:
650 if self.data:
647 self.data.ended = True
651 self.data.ended = True
648 self.send(self.data)
652 self.send(self.data)
649 coerce = True
653 coerce = True
650 self.data.setup()
654 self.data.setup()
651 self.dates.append(dt)
655 self.dates.append(dt)
652
656
653 self.data.update(dataOut, tm)
657 self.data.update(dataOut, tm)
654
658
655 if dataOut.finished is True:
659 if dataOut.finished is True:
656 self.connections -= 1
660 self.connections -= 1
657 if self.connections == 0 and dt in self.dates:
661 if self.connections == 0 and dt in self.dates:
658 self.data.ended = True
662 self.data.ended = True
659 self.send(self.data)
663 self.send(self.data)
660 self.data.setup()
664 self.data.setup()
661 else:
665 else:
662 if self.realtime:
666 if self.realtime:
663 self.send(self.data)
667 self.send(self.data)
664 if self.web_address:
668 if self.web_address:
665 payload = self.data.jsonify()
669 payload = self.data.jsonify()
666 log.log('Sending to web... type:{}, size:{}'.format(dataOut.type, len(payload)), self.name)
670 log.log('Sending to web... type:{}, size:{}'.format(dataOut.type, len(payload)), self.name)
667 self.sender_web.send(payload)
671 self.sender_web.send(payload)
668 else:
672 else:
669 self.sendData(self.send, self.data, coerce=coerce)
673 self.sendData(self.send, self.data, coerce=coerce)
670 coerce = False
674 coerce = False
671
675
672 return
676 return
673
677
674
678
675 class SendToFTP(Operation, Process):
679 class SendToFTP(Operation, Process):
676
680
677 '''
681 '''
678 Operation to send data over FTP.
682 Operation to send data over FTP.
679 '''
683 '''
680
684
681 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
685 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
682
686
683 def __init__(self, **kwargs):
687 def __init__(self, **kwargs):
684 '''
688 '''
685 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
689 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
686 '''
690 '''
687 Operation.__init__(self, **kwargs)
691 Operation.__init__(self, **kwargs)
688 Process.__init__(self)
692 Process.__init__(self)
689 self.server = kwargs.get('server')
693 self.server = kwargs.get('server')
690 self.username = kwargs.get('username')
694 self.username = kwargs.get('username')
691 self.password = kwargs.get('password')
695 self.password = kwargs.get('password')
692 self.patterns = kwargs.get('patterns')
696 self.patterns = kwargs.get('patterns')
693 self.timeout = kwargs.get('timeout', 10)
697 self.timeout = kwargs.get('timeout', 30)
694 self.times = [time.time() for p in self.patterns]
698 self.times = [time.time() for p in self.patterns]
695 self.latest = ['' for p in self.patterns]
699 self.latest = ['' for p in self.patterns]
696 self.mp = False
700 self.mp = False
697 self.ftp = None
701 self.ftp = None
698
702
699 def setup(self):
703 def setup(self):
700
704
701 log.log('Connecting to ftp://{}'.format(self.server), self.name)
705 log.log('Connecting to ftp://{}'.format(self.server), self.name)
702 try:
706 try:
703 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
707 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
704 except ftplib.all_errors:
708 except ftplib.all_errors:
705 log.error('Server connection fail: {}'.format(self.server), self.name)
709 log.error('Server connection fail: {}'.format(self.server), self.name)
706 if self.ftp is not None:
710 if self.ftp is not None:
707 self.ftp.close()
711 self.ftp.close()
708 self.ftp = None
712 self.ftp = None
709 self.isConfig = False
713 self.isConfig = False
710 return
714 return
711
715
712 try:
716 try:
713 self.ftp.login(self.username, self.password)
717 self.ftp.login(self.username, self.password)
714 except ftplib.all_errors:
718 except ftplib.all_errors:
715 log.error('The given username y/o password are incorrect', self.name)
719 log.error('The given username y/o password are incorrect', self.name)
716 if self.ftp is not None:
720 if self.ftp is not None:
717 self.ftp.close()
721 self.ftp.close()
718 self.ftp = None
722 self.ftp = None
719 self.isConfig = False
723 self.isConfig = False
720 return
724 return
721
725
722 log.success('Connection success', self.name)
726 log.success('Connection success', self.name)
723 self.isConfig = True
727 self.isConfig = True
724 return
728 return
725
729
726 def check(self):
730 def check(self):
727
731
728 try:
732 try:
729 self.ftp.voidcmd("NOOP")
733 self.ftp.voidcmd("NOOP")
730 except:
734 except:
731 log.warning('Connection lost... trying to reconnect', self.name)
735 log.warning('Connection lost... trying to reconnect', self.name)
732 if self.ftp is not None:
736 if self.ftp is not None:
733 self.ftp.close()
737 self.ftp.close()
734 self.ftp = None
738 self.ftp = None
735 self.setup()
739 self.setup()
736
740
737 def find_files(self, path, ext):
741 def find_files(self, path, ext):
738
742
739 files = glob.glob1(path, '*{}'.format(ext))
743 files = glob.glob1(path, '*{}'.format(ext))
740 files.sort()
744 files.sort()
741 if files:
745 if files:
742 return files[-1]
746 return files[-1]
743 return None
747 return None
744
748
745 def getftpname(self, filename, exp_code, sub_exp_code):
749 def getftpname(self, filename, exp_code, sub_exp_code):
746
750
747 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
751 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
748 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
752 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
749 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
753 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
750 exp_code = '%3.3d'%exp_code
754 exp_code = '%3.3d'%exp_code
751 sub_exp_code = '%2.2d'%sub_exp_code
755 sub_exp_code = '%2.2d'%sub_exp_code
752 plot_code = '%2.2d'% PLOT_CODES[filename.split('_')[0].split('-')[1]]
756 plot_code = '%2.2d'% PLOT_CODES[filename.split('_')[0].split('-')[1]]
753 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
757 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
754 return name
758 return name
755
759
756 def upload(self, src, dst):
760 def upload(self, src, dst):
757
761
758 log.log('Uploading {} '.format(src), self.name, nl=False)
762 log.log('Uploading {} '.format(src), self.name, nl=False)
759
763
760 fp = open(src, 'rb')
764 fp = open(src, 'rb')
761 command = 'STOR {}'.format(dst)
765 command = 'STOR {}'.format(dst)
762
766
763 try:
767 try:
764 self.ftp.storbinary(command, fp, blocksize=1024)
768 self.ftp.storbinary(command, fp, blocksize=1024)
765 except ftplib.all_errors, e:
769 except ftplib.all_errors, e:
766 log.error('{}'.format(e), self.name)
770 log.error('{}'.format(e), self.name)
767 if self.ftp is not None:
771 if self.ftp is not None:
768 self.ftp.close()
772 self.ftp.close()
769 self.ftp = None
773 self.ftp = None
770 return
774 return
771
775
772 try:
776 try:
773 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
777 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
774 except ftplib.all_errors, e:
778 except ftplib.all_errors, e:
775 log.error('{}'.format(e), self.name)
779 log.error('{}'.format(e), self.name)
776 if self.ftp is not None:
780 if self.ftp is not None:
777 self.ftp.close()
781 self.ftp.close()
778 self.ftp = None
782 self.ftp = None
779
783
780 fp.close()
784 fp.close()
781
785
782 log.success('OK', tag='')
786 log.success('OK', tag='')
783
787
784 def send_files(self):
788 def send_files(self):
785
789
786 for x, pattern in enumerate(self.patterns):
790 for x, pattern in enumerate(self.patterns):
787 local, remote, ext, delay, exp_code, sub_exp_code = pattern
791 local, remote, ext, delay, exp_code, sub_exp_code = pattern
788 if time.time()-self.times[x] >= delay:
792 if time.time()-self.times[x] >= delay:
789 srcname = self.find_files(local, ext)
793 srcname = self.find_files(local, ext)
790
794
791 if srcname is None or srcname == self.latest[x]:
795 if srcname is None or srcname == self.latest[x]:
792 continue
796 continue
793
797
794 if 'png' in ext:
798 if 'png' in ext:
795 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
799 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
796 else:
800 else:
797 dstname = srcname
801 dstname = srcname
798
802
799 src = os.path.join(local, srcname)
803 src = os.path.join(local, srcname)
800
804
801 if os.path.getmtime(src) < time.time() - 30*60:
805 if os.path.getmtime(src) < time.time() - 30*60:
802 continue
806 continue
803
807
804 dst = os.path.join(remote, dstname)
808 dst = os.path.join(remote, dstname)
805
809
806 if self.ftp is None:
810 if self.ftp is None:
807 continue
811 continue
808
812
809 self.upload(src, dst)
813 self.upload(src, dst)
810
814
811 self.times[x] = time.time()
815 self.times[x] = time.time()
812 self.latest[x] = srcname
816 self.latest[x] = srcname
813
817
814 def run(self):
818 def run(self):
815
819
816 while True:
820 while True:
817 if not self.isConfig:
821 if not self.isConfig:
818 self.setup()
822 self.setup()
819 if self.ftp is not None:
823 if self.ftp is not None:
820 self.check()
824 self.check()
821 self.send_files()
825 self.send_files()
822 time.sleep(2)
826 time.sleep(10)
823
827
824 def close():
828 def close():
825
829
826 if self.ftp is not None:
830 if self.ftp is not None:
827 if self.ftp is not None:
831 self.ftp.close()
828 self.ftp.close()
829 self.terminate()
832 self.terminate()
General Comments 0
You need to be logged in to leave comments. Login now