##// END OF EJS Templates
New Operation SendToFTP
jespinoza -
r1135:eed97f9f9bf5
parent child
Show More
@@ -1,631 +1,829
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 import os
6 import glob
5 7 import time
6 8 import json
7 9 import numpy
8 10 import paho.mqtt.client as mqtt
9 11 import zmq
10 12 import datetime
13 import ftplib
11 14 from zmq.utils.monitor import recv_monitor_message
12 15 from functools import wraps
13 16 from threading import Thread
14 17 from multiprocessing import Process
15 18
16 19 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 20 from schainpy.model.data.jrodata import JROData
18 21 from schainpy.utils import log
19 22
20 MAXNUMX = 100
21 MAXNUMY = 100
23 MAXNUMX = 500
24 MAXNUMY = 500
25
26 PLOT_CODES = {
27 'rti': 0, #Range time intensity (RTI).
28 'spc': 1, #Spectra (and Cross-spectra) information.
29 'cspc': 2, #Cross-Correlation information.
30 'coh': 3, #Coherence map.
31 'base': 4, #Base lines graphic.
32 'row': 5, #Row Spectra.
33 'total' : 6, #Total Power.
34 'drift' : 7, #Drifts graphics.
35 'height' : 8, #Height profile.
36 'phase' : 9, #Signal Phase.
37 'power' : 16,
38 'noise' : 17,
39 'beacon' : 18,
40 #USED IN jroplot_parameters.py
41 'wind' : 22,
42 'skymap' : 23,
43 # 'MPHASE_CODE' : 24,
44 'moments' : 25,
45 'param' : 26,
46 'spc_fit' : 27,
47 'ew_drifts' : 28,
48 'reflectivity': 30
49 }
22 50
23 51 class PrettyFloat(float):
24 52 def __repr__(self):
25 53 return '%.2f' % self
26 54
27 55 def roundFloats(obj):
28 56 if isinstance(obj, list):
29 57 return map(roundFloats, obj)
30 58 elif isinstance(obj, float):
31 59 return round(obj, 2)
32 60
33 61 def decimate(z, MAXNUMY):
34 62 dy = int(len(z[0])/MAXNUMY) + 1
35 63
36 64 return z[::, ::dy]
37 65
38 66 class throttle(object):
39 67 '''
40 68 Decorator that prevents a function from being called more than once every
41 69 time period.
42 70 To create a function that cannot be called more than once a minute, but
43 71 will sleep until it can be called:
44 72 @throttle(minutes=1)
45 73 def foo():
46 74 pass
47 75
48 76 for i in range(10):
49 77 foo()
50 78 print "This function has run %s times." % i
51 79 '''
52 80
53 81 def __init__(self, seconds=0, minutes=0, hours=0):
54 82 self.throttle_period = datetime.timedelta(
55 83 seconds=seconds, minutes=minutes, hours=hours
56 84 )
57 85
58 86 self.time_of_last_call = datetime.datetime.min
59 87
60 88 def __call__(self, fn):
61 89 @wraps(fn)
62 90 def wrapper(*args, **kwargs):
63 91 coerce = kwargs.pop('coerce', None)
64 92 if coerce:
65 93 self.time_of_last_call = datetime.datetime.now()
66 94 return fn(*args, **kwargs)
67 95 else:
68 96 now = datetime.datetime.now()
69 97 time_since_last_call = now - self.time_of_last_call
70 98 time_left = self.throttle_period - time_since_last_call
71 99
72 100 if time_left > datetime.timedelta(seconds=0):
73 101 return
74 102
75 103 self.time_of_last_call = datetime.datetime.now()
76 104 return fn(*args, **kwargs)
77 105
78 106 return wrapper
79 107
80 108 class Data(object):
81 109 '''
82 110 Object to hold data to be plotted
83 111 '''
84 112
85 def __init__(self, plottypes, throttle_value, exp_code):
113 def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
86 114 self.plottypes = plottypes
87 115 self.throttle = throttle_value
88 116 self.exp_code = exp_code
117 self.buffering = buffering
89 118 self.ended = False
90 119 self.localtime = False
91 120 self.__times = []
92 121 self.__heights = []
93 122
94 123 def __str__(self):
95 124 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
96 125 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
97 126
98 127 def __len__(self):
99 128 return len(self.__times)
100 129
101 130 def __getitem__(self, key):
102 131 if key not in self.data:
103 132 raise KeyError(log.error('Missing key: {}'.format(key)))
104 133
105 if 'spc' in key:
134 if 'spc' in key or not self.buffering:
106 135 ret = self.data[key]
107 136 else:
108 137 ret = numpy.array([self.data[key][x] for x in self.times])
109 138 if ret.ndim > 1:
110 139 ret = numpy.swapaxes(ret, 0, 1)
111 140 return ret
112 141
113 142 def __contains__(self, key):
114 143 return key in self.data
115 144
116 145 def setup(self):
117 146 '''
118 147 Configure object
119 148 '''
120 149
150 self.type = ''
121 151 self.ended = False
122 152 self.data = {}
123 153 self.__times = []
124 154 self.__heights = []
125 155 self.__all_heights = set()
126 156 for plot in self.plottypes:
127 157 if 'snr' in plot:
128 158 plot = 'snr'
129 159 self.data[plot] = {}
130 160
131 161 def shape(self, key):
132 162 '''
133 163 Get the shape of the one-element data for the given key
134 164 '''
135 165
136 166 if len(self.data[key]):
137 if 'spc' in key:
167 if 'spc' in key or not self.buffering:
138 168 return self.data[key].shape
139 169 return self.data[key][self.__times[0]].shape
140 170 return (0,)
141 171
142 172 def update(self, dataOut, tm):
143 173 '''
144 174 Update data object with new dataOut
145 175 '''
146 176
147 177 if tm in self.__times:
148 178 return
149 179
180 self.type = dataOut.type
150 181 self.parameters = getattr(dataOut, 'parameters', [])
151 182 if hasattr(dataOut, 'pairsList'):
152 183 self.pairs = dataOut.pairsList
153 184 self.channels = dataOut.channelList
154 185 self.interval = dataOut.getTimeInterval()
155 186 self.localtime = dataOut.useLocalTime
156 187 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
157 188 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
158 189 self.__heights.append(dataOut.heightList)
159 190 self.__all_heights.update(dataOut.heightList)
160 191 self.__times.append(tm)
161 192
162 193 for plot in self.plottypes:
163 194 if plot == 'spc':
164 195 z = dataOut.data_spc/dataOut.normFactor
165 196 self.data[plot] = 10*numpy.log10(z)
166 197 if plot == 'cspc':
167 198 self.data[plot] = dataOut.data_cspc
168 199 if plot == 'noise':
169 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
200 buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
170 201 if plot == 'rti':
171 self.data[plot][tm] = dataOut.getPower()
202 buffer = dataOut.getPower()
172 203 if plot == 'snr_db':
173 204 self.data['snr'][tm] = dataOut.data_SNR
174 205 if plot == 'snr':
175 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
206 buffer = 10*numpy.log10(dataOut.data_SNR)
176 207 if plot == 'dop':
177 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
208 buffer = 10*numpy.log10(dataOut.data_DOP)
178 209 if plot == 'mean':
179 self.data[plot][tm] = dataOut.data_MEAN
210 buffer = dataOut.data_MEAN
180 211 if plot == 'std':
181 self.data[plot][tm] = dataOut.data_STD
212 buffer = dataOut.data_STD
182 213 if plot == 'coh':
183 self.data[plot][tm] = dataOut.getCoherence()
214 buffer = dataOut.getCoherence()
184 215 if plot == 'phase':
185 self.data[plot][tm] = dataOut.getCoherence(phase=True)
216 buffer = dataOut.getCoherence(phase=True)
186 217 if plot == 'output':
187 self.data[plot][tm] = dataOut.data_output
218 buffer = dataOut.data_output
188 219 if plot == 'param':
189 self.data[plot][tm] = dataOut.data_param
220 buffer = dataOut.data_param
221
222 if self.buffering:
223 self.data[plot][tm] = buffer
224 else:
225 self.data[plot] = buffer
190 226
191 227 def normalize_heights(self):
192 228 '''
193 229 Ensure same-dimension of the data for different heighList
194 230 '''
195 231
196 232 H = numpy.array(list(self.__all_heights))
197 233 H.sort()
198 234 for key in self.data:
199 235 shape = self.shape(key)[:-1] + H.shape
200 236 for tm, obj in self.data[key].items():
201 237 h = self.__heights[self.__times.index(tm)]
202 238 if H.size == h.size:
203 239 continue
204 240 index = numpy.where(numpy.in1d(H, h))[0]
205 241 dummy = numpy.zeros(shape) + numpy.nan
206 242 if len(shape) == 2:
207 243 dummy[:, index] = obj
208 244 else:
209 245 dummy[index] = obj
210 246 self.data[key][tm] = dummy
211 247
212 248 self.__heights = [H for tm in self.__times]
213 249
214 250 def jsonify(self, decimate=False):
215 251 '''
216 252 Convert data to json
217 253 '''
218 254
219 255 data = {}
220 256 tm = self.times[-1]
221 257
222 258 for key in self.data:
223 if key in ('spc', 'cspc'):
259 if key in ('spc', 'cspc') or not self.buffering:
224 260 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
225 261 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
226 262 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
227 263 else:
228 264 data[key] = roundFloats(self.data[key][tm].tolist())
229 265
230 266 ret = {'data': data}
231 267 ret['exp_code'] = self.exp_code
232 268 ret['time'] = tm
233 269 ret['interval'] = self.interval
234 270 ret['localtime'] = self.localtime
235 271 ret['yrange'] = roundFloats(self.heights.tolist())
236 272 if key in ('spc', 'cspc'):
237 273 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
274 else:
275 ret['xrange'] = []
238 276 if hasattr(self, 'pairs'):
239 277 ret['pairs'] = self.pairs
240 278 return json.dumps(ret)
241 279
242 280 @property
243 281 def times(self):
244 282 '''
245 283 Return the list of times of the current data
246 284 '''
247 285
248 286 ret = numpy.array(self.__times)
249 287 ret.sort()
250 288 return ret
251 289
252 290 @property
253 291 def heights(self):
254 292 '''
255 293 Return the list of heights of the current data
256 294 '''
257 295
258 296 return numpy.array(self.__heights[-1])
259 297
260 298 class PublishData(Operation):
261 299 '''
262 300 Operation to send data over zmq.
263 301 '''
264 302
265 303 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
266 304
267 305 def __init__(self, **kwargs):
268 306 """Inicio."""
269 307 Operation.__init__(self, **kwargs)
270 308 self.isConfig = False
271 309 self.client = None
272 310 self.zeromq = None
273 311 self.mqtt = None
274 312
275 313 def on_disconnect(self, client, userdata, rc):
276 314 if rc != 0:
277 315 log.warning('Unexpected disconnection.')
278 316 self.connect()
279 317
280 318 def connect(self):
281 319 log.warning('trying to connect')
282 320 try:
283 321 self.client.connect(
284 322 host=self.host,
285 323 port=self.port,
286 324 keepalive=60*10,
287 325 bind_address='')
288 326 self.client.loop_start()
289 327 # self.client.publish(
290 328 # self.topic + 'SETUP',
291 329 # json.dumps(setup),
292 330 # retain=True
293 331 # )
294 332 except:
295 333 log.error('MQTT Conection error.')
296 334 self.client = False
297 335
298 336 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
299 337 self.counter = 0
300 338 self.topic = kwargs.get('topic', 'schain')
301 339 self.delay = kwargs.get('delay', 0)
302 340 self.plottype = kwargs.get('plottype', 'spectra')
303 341 self.host = kwargs.get('host', "10.10.10.82")
304 342 self.port = kwargs.get('port', 3000)
305 343 self.clientId = clientId
306 344 self.cnt = 0
307 345 self.zeromq = zeromq
308 346 self.mqtt = kwargs.get('plottype', 0)
309 347 self.client = None
310 348 self.verbose = verbose
311 349 setup = []
312 350 if mqtt is 1:
313 351 self.client = mqtt.Client(
314 352 client_id=self.clientId + self.topic + 'SCHAIN',
315 353 clean_session=True)
316 354 self.client.on_disconnect = self.on_disconnect
317 355 self.connect()
318 356 for plot in self.plottype:
319 357 setup.append({
320 358 'plot': plot,
321 359 'topic': self.topic + plot,
322 360 'title': getattr(self, plot + '_' + 'title', False),
323 361 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
324 362 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
325 363 'xrange': getattr(self, plot + '_' + 'xrange', False),
326 364 'yrange': getattr(self, plot + '_' + 'yrange', False),
327 365 'zrange': getattr(self, plot + '_' + 'zrange', False),
328 366 })
329 367 if zeromq is 1:
330 368 context = zmq.Context()
331 369 self.zmq_socket = context.socket(zmq.PUSH)
332 370 server = kwargs.get('server', 'zmq.pipe')
333 371
334 372 if 'tcp://' in server:
335 373 address = server
336 374 else:
337 375 address = 'ipc:///tmp/%s' % server
338 376
339 377 self.zmq_socket.connect(address)
340 378 time.sleep(1)
341 379
342 380
343 381 def publish_data(self):
344 382 self.dataOut.finished = False
345 383 if self.mqtt is 1:
346 384 yData = self.dataOut.heightList[:2].tolist()
347 385 if self.plottype == 'spectra':
348 386 data = getattr(self.dataOut, 'data_spc')
349 387 z = data/self.dataOut.normFactor
350 388 zdB = 10*numpy.log10(z)
351 389 xlen, ylen = zdB[0].shape
352 390 dx = int(xlen/MAXNUMX) + 1
353 391 dy = int(ylen/MAXNUMY) + 1
354 392 Z = [0 for i in self.dataOut.channelList]
355 393 for i in self.dataOut.channelList:
356 394 Z[i] = zdB[i][::dx, ::dy].tolist()
357 395 payload = {
358 396 'timestamp': self.dataOut.utctime,
359 397 'data': roundFloats(Z),
360 398 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
361 399 'interval': self.dataOut.getTimeInterval(),
362 400 'type': self.plottype,
363 401 'yData': yData
364 402 }
365 403
366 404 elif self.plottype in ('rti', 'power'):
367 405 data = getattr(self.dataOut, 'data_spc')
368 406 z = data/self.dataOut.normFactor
369 407 avg = numpy.average(z, axis=1)
370 408 avgdB = 10*numpy.log10(avg)
371 409 xlen, ylen = z[0].shape
372 410 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
373 411 AVG = [0 for i in self.dataOut.channelList]
374 412 for i in self.dataOut.channelList:
375 413 AVG[i] = avgdB[i][::dy].tolist()
376 414 payload = {
377 415 'timestamp': self.dataOut.utctime,
378 416 'data': roundFloats(AVG),
379 417 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
380 418 'interval': self.dataOut.getTimeInterval(),
381 419 'type': self.plottype,
382 420 'yData': yData
383 421 }
384 422 elif self.plottype == 'noise':
385 423 noise = self.dataOut.getNoise()/self.dataOut.normFactor
386 424 noisedB = 10*numpy.log10(noise)
387 425 payload = {
388 426 'timestamp': self.dataOut.utctime,
389 427 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
390 428 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
391 429 'interval': self.dataOut.getTimeInterval(),
392 430 'type': self.plottype,
393 431 'yData': yData
394 432 }
395 433 elif self.plottype == 'snr':
396 434 data = getattr(self.dataOut, 'data_SNR')
397 435 avgdB = 10*numpy.log10(data)
398 436
399 437 ylen = data[0].size
400 438 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
401 439 AVG = [0 for i in self.dataOut.channelList]
402 440 for i in self.dataOut.channelList:
403 441 AVG[i] = avgdB[i][::dy].tolist()
404 442 payload = {
405 443 'timestamp': self.dataOut.utctime,
406 444 'data': roundFloats(AVG),
407 445 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
408 446 'type': self.plottype,
409 447 'yData': yData
410 448 }
411 449 else:
412 450 print "Tipo de grafico invalido"
413 451 payload = {
414 452 'data': 'None',
415 453 'timestamp': 'None',
416 454 'type': None
417 455 }
418 456
419 457 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
420 458
421 459 if self.zeromq is 1:
422 460 if self.verbose:
423 461 log.log(
424 462 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
425 463 self.name
426 464 )
427 465 self.zmq_socket.send_pyobj(self.dataOut)
428 466
429 467 def run(self, dataOut, **kwargs):
430 468 self.dataOut = dataOut
431 469 if not self.isConfig:
432 470 self.setup(**kwargs)
433 471 self.isConfig = True
434 472
435 473 self.publish_data()
436 474 time.sleep(self.delay)
437 475
438 476 def close(self):
439 477 if self.zeromq is 1:
440 478 self.dataOut.finished = True
441 479 self.zmq_socket.send_pyobj(self.dataOut)
442 480 time.sleep(0.1)
443 481 self.zmq_socket.close()
444 482 if self.client:
445 483 self.client.loop_stop()
446 484 self.client.disconnect()
447 485
448 486
449 487 class ReceiverData(ProcessingUnit):
450 488
451 489 __attrs__ = ['server']
452 490
453 491 def __init__(self, **kwargs):
454 492
455 493 ProcessingUnit.__init__(self, **kwargs)
456 494
457 495 self.isConfig = False
458 496 server = kwargs.get('server', 'zmq.pipe')
459 497 if 'tcp://' in server:
460 498 address = server
461 499 else:
462 500 address = 'ipc:///tmp/%s' % server
463 501
464 502 self.address = address
465 503 self.dataOut = JROData()
466 504
467 505 def setup(self):
468 506
469 507 self.context = zmq.Context()
470 508 self.receiver = self.context.socket(zmq.PULL)
471 509 self.receiver.bind(self.address)
472 510 time.sleep(0.5)
473 511 log.success('ReceiverData from {}'.format(self.address))
474 512
475 513
476 514 def run(self):
477 515
478 516 if not self.isConfig:
479 517 self.setup()
480 518 self.isConfig = True
481 519
482 520 self.dataOut = self.receiver.recv_pyobj()
483 521 log.log('{} - {}'.format(self.dataOut.type,
484 522 self.dataOut.datatime.ctime(),),
485 523 'Receiving')
486 524
487 525
488 526 class PlotterReceiver(ProcessingUnit, Process):
489 527
490 528 throttle_value = 5
491 529 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
492 'exp_code', 'web_server']
530 'exp_code', 'web_server', 'buffering']
493 531
494 532 def __init__(self, **kwargs):
495 533
496 534 ProcessingUnit.__init__(self, **kwargs)
497 535 Process.__init__(self)
498 536 self.mp = False
499 537 self.isConfig = False
500 538 self.isWebConfig = False
501 539 self.connections = 0
502 540 server = kwargs.get('server', 'zmq.pipe')
503 541 web_server = kwargs.get('web_server', None)
504 542 if 'tcp://' in server:
505 543 address = server
506 544 else:
507 545 address = 'ipc:///tmp/%s' % server
508 546 self.address = address
509 547 self.web_address = web_server
510 548 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
511 549 self.realtime = kwargs.get('realtime', False)
512 550 self.localtime = kwargs.get('localtime', True)
551 self.buffering = kwargs.get('buffering', True)
513 552 self.throttle_value = kwargs.get('throttle', 5)
514 553 self.exp_code = kwargs.get('exp_code', None)
515 554 self.sendData = self.initThrottle(self.throttle_value)
516 555 self.dates = []
517 556 self.setup()
518 557
519 558 def setup(self):
520 559
521 self.data = Data(self.plottypes, self.throttle_value, self.exp_code)
522 self.isConfig = True
560 self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
561 self.isConfig = True
523 562
524 563 def event_monitor(self, monitor):
525 564
526 565 events = {}
527 566
528 567 for name in dir(zmq):
529 568 if name.startswith('EVENT_'):
530 569 value = getattr(zmq, name)
531 570 events[value] = name
532 571
533 572 while monitor.poll():
534 573 evt = recv_monitor_message(monitor)
535 574 if evt['event'] == 32:
536 575 self.connections += 1
537 576 if evt['event'] == 512:
538 577 pass
539 578
540 579 evt.update({'description': events[evt['event']]})
541 580
542 581 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
543 582 break
544 583 monitor.close()
545 584 print('event monitor thread done!')
546 585
547 586 def initThrottle(self, throttle_value):
548 587
549 588 @throttle(seconds=throttle_value)
550 589 def sendDataThrottled(fn_sender, data):
551 590 fn_sender(data)
552 591
553 592 return sendDataThrottled
554 593
555 594 def send(self, data):
556 log.success('Sending {}'.format(data), self.name)
595 log.log('Sending {}'.format(data), self.name)
557 596 self.sender.send_pyobj(data)
558 597
559 598 def run(self):
560 599
561 log.success(
600 log.log(
562 601 'Starting from {}'.format(self.address),
563 602 self.name
564 603 )
565 604
566 605 self.context = zmq.Context()
567 606 self.receiver = self.context.socket(zmq.PULL)
568 607 self.receiver.bind(self.address)
569 608 monitor = self.receiver.get_monitor_socket()
570 609 self.sender = self.context.socket(zmq.PUB)
571 610 if self.web_address:
572 611 log.success(
573 612 'Sending to web: {}'.format(self.web_address),
574 613 self.name
575 614 )
576 self.sender_web = self.context.socket(zmq.PUB)
615 self.sender_web = self.context.socket(zmq.PUSH)
577 616 self.sender_web.connect(self.web_address)
578 617 time.sleep(1)
579 618
580 619 if 'server' in self.kwargs:
581 620 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
582 621 else:
583 622 self.sender.bind("ipc:///tmp/zmq.plots")
584 623
585 624 time.sleep(2)
586 625
587 626 t = Thread(target=self.event_monitor, args=(monitor,))
588 627 t.start()
589 628
590 629 while True:
591 630 dataOut = self.receiver.recv_pyobj()
592 631 if not dataOut.flagNoData:
593 632 if dataOut.type == 'Parameters':
594 633 tm = dataOut.utctimeInit
595 634 else:
596 635 tm = dataOut.utctime
597 636 if dataOut.useLocalTime:
598 637 if not self.localtime:
599 638 tm += time.timezone
600 639 dt = datetime.datetime.fromtimestamp(tm).date()
601 640 else:
602 641 if self.localtime:
603 642 tm -= time.timezone
604 643 dt = datetime.datetime.utcfromtimestamp(tm).date()
605 644 coerce = False
606 645 if dt not in self.dates:
607 646 if self.data:
608 647 self.data.ended = True
609 648 self.send(self.data)
610 649 coerce = True
611 650 self.data.setup()
612 651 self.dates.append(dt)
613 652
614 653 self.data.update(dataOut, tm)
615 654
616 655 if dataOut.finished is True:
617 656 self.connections -= 1
618 657 if self.connections == 0 and dt in self.dates:
619 658 self.data.ended = True
620 659 self.send(self.data)
621 660 self.data.setup()
622 661 else:
623 662 if self.realtime:
624 663 self.send(self.data)
625 664 if self.web_address:
626 self.sender_web.send(self.data.jsonify())
665 payload = self.data.jsonify()
666 log.log('Sending to web... type:{}, size:{}'.format(dataOut.type, len(payload)), self.name)
667 self.sender_web.send(payload)
627 668 else:
628 669 self.sendData(self.send, self.data, coerce=coerce)
629 670 coerce = False
630 671
631 672 return
673
674
675 class SendToFTP(Operation, Process):
676
677 '''
678 Operation to send data over FTP.
679 '''
680
681 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
682
683 def __init__(self, **kwargs):
684 '''
685 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
686 '''
687 Operation.__init__(self, **kwargs)
688 Process.__init__(self)
689 self.server = kwargs.get('server')
690 self.username = kwargs.get('username')
691 self.password = kwargs.get('password')
692 self.patterns = kwargs.get('patterns')
693 self.timeout = kwargs.get('timeout', 10)
694 self.times = [time.time() for p in self.patterns]
695 self.latest = ['' for p in self.patterns]
696 self.mp = False
697 self.ftp = None
698
699 def setup(self):
700
701 log.log('Connecting to ftp://{}'.format(self.server), self.name)
702 try:
703 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
704 except ftplib.all_errors:
705 log.error('Server connection fail: {}'.format(self.server), self.name)
706 if self.ftp is not None:
707 self.ftp.close()
708 self.ftp = None
709 self.isConfig = False
710 return
711
712 try:
713 self.ftp.login(self.username, self.password)
714 except ftplib.all_errors:
715 log.error('The given username y/o password are incorrect', self.name)
716 if self.ftp is not None:
717 self.ftp.close()
718 self.ftp = None
719 self.isConfig = False
720 return
721
722 log.success('Connection success', self.name)
723 self.isConfig = True
724 return
725
726 def check(self):
727
728 try:
729 self.ftp.voidcmd("NOOP")
730 except:
731 log.warning('Connection lost... trying to reconnect', self.name)
732 if self.ftp is not None:
733 self.ftp.close()
734 self.ftp = None
735 self.setup()
736
737 def find_files(self, path, ext):
738
739 files = glob.glob1(path, '*{}'.format(ext))
740 files.sort()
741 if files:
742 return files[-1]
743 return None
744
745 def getftpname(self, filename, exp_code, sub_exp_code):
746
747 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
748 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
749 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
750 exp_code = '%3.3d'%exp_code
751 sub_exp_code = '%2.2d'%sub_exp_code
752 plot_code = '%2.2d'% PLOT_CODES[filename.split('_')[0].split('-')[0]]
753 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
754 return name
755
756 def upload(self, src, dst):
757
758 log.log('Uploading {} '.format(src), self.name, nl=False)
759
760 fp = open(src, 'rb')
761 command = 'STOR {}'.format(dst)
762
763 try:
764 self.ftp.storbinary(command, fp, blocksize=1024)
765 except ftplib.all_errors, e:
766 log.error('{}'.format(e), self.name)
767 if self.ftp is not None:
768 self.ftp.close()
769 self.ftp = None
770 return
771
772 try:
773 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
774 except ftplib.all_errors, e:
775 log.error('{}'.format(e), self.name)
776 if self.ftp is not None:
777 self.ftp.close()
778 self.ftp = None
779
780 fp.close()
781
782 log.success('OK', tag='')
783
784 def send_files(self):
785
786 for x, pattern in enumerate(self.patterns):
787 local, remote, ext, delay, exp_code, sub_exp_code = pattern
788 if time.time()-self.times[x] >= delay:
789 srcname = self.find_files(local, ext)
790
791 if srcname is None or srcname == self.latest[x]:
792 continue
793
794 if 'png' in ext:
795 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
796 else:
797 dstname = srcname
798
799 src = os.path.join(local, srcname)
800
801 if os.path.getmtime(src) < time.time() - 30*60:
802 continue
803
804 dst = os.path.join(remote, dstname)
805
806 if self.ftp is None:
807 continue
808
809 self.upload(src, dst)
810
811 self.times[x] = time.time()
812 self.latest[x] = srcname
813
814 def run(self):
815
816 while True:
817 if not self.isConfig:
818 self.setup()
819 if self.ftp is not None:
820 self.check()
821 self.send_files()
822 time.sleep(2)
823
824 def close():
825
826 if self.ftp is not None:
827 if self.ftp is not None:
828 self.ftp.close()
829 self.terminate()
General Comments 0
You need to be logged in to leave comments. Login now