##// END OF EJS Templates
Fix SendByFTP and add plot codes
jespinoza -
r1142:f05281601d69
parent child
Show More
@@ -1,832 +1,835
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import os
6 6 import glob
7 7 import time
8 8 import json
9 9 import numpy
10 10 import paho.mqtt.client as mqtt
11 11 import zmq
12 12 import datetime
13 13 import ftplib
14 14 from zmq.utils.monitor import recv_monitor_message
15 15 from functools import wraps
16 16 from threading import Thread
17 17 from multiprocessing import Process
18 18
19 19 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
20 20 from schainpy.model.data.jrodata import JROData
21 21 from schainpy.utils import log
22 22
23 23 MAXNUMX = 500
24 24 MAXNUMY = 500
25 25
26 26 PLOT_CODES = {
27 27 'rti': 0, # Range time intensity (RTI).
28 28 'spc': 1, # Spectra (and Cross-spectra) information.
29 29 'cspc': 2, # Cross-Correlation information.
30 30 'coh': 3, # Coherence map.
31 31 'base': 4, # Base lines graphic.
32 32 'row': 5, # Row Spectra.
33 'total' : 6, # Total Power.
34 'drift' : 7, # Drifts graphics.
35 'height' : 8, # Height profile.
36 'phase' : 9, # Signal Phase.
37 'power' : 16,
38 'noise' : 17,
39 'beacon' : 18,
40 'wind' : 22,
41 'skymap' : 23,
42 'V-E' : 25,
43 'Z-E' : 26,
44 'V-A' : 27,
45 'Z-A' : 28,
33 'total': 6, # Total Power.
34 'drift': 7, # Drifts graphics.
35 'height': 8, # Height profile.
36 'phase': 9, # Signal Phase.
37 'power': 16,
38 'noise': 17,
39 'beacon': 18,
40 'wind': 22,
41 'skymap': 23,
42 'Unknown': 24,
43 'V-E': 25, # PIP Velocity.
44 'Z-E': 26, # PIP Reflectivity.
45 'V-A': 27, # RHI Velocity.
46 'Z-A': 28, # RHI Reflectivity.
46 47 }
47 48
48 class PrettyFloat(float):
49 def __repr__(self):
50 return '%.2f' % self
49 def get_plot_code(s):
50 label = s.split('_')[0]
51 codes = [key for key in PLOT_CODES if key in label]
52 if codes:
53 return PLOT_CODES[codes[0]]
54 else:
55 return 24
51 56
52 57 def roundFloats(obj):
53 58 if isinstance(obj, list):
54 59 return map(roundFloats, obj)
55 60 elif isinstance(obj, float):
56 61 return round(obj, 2)
57 62
58 63 def decimate(z, MAXNUMY):
59 64 dy = int(len(z[0])/MAXNUMY) + 1
60 65
61 66 return z[::, ::dy]
62 67
63 68 class throttle(object):
64 69 '''
65 70 Decorator that prevents a function from being called more than once every
66 71 time period.
67 72 To create a function that cannot be called more than once a minute, but
68 73 will sleep until it can be called:
69 74 @throttle(minutes=1)
70 75 def foo():
71 76 pass
72 77
73 78 for i in range(10):
74 79 foo()
75 80 print "This function has run %s times." % i
76 81 '''
77 82
78 83 def __init__(self, seconds=0, minutes=0, hours=0):
79 84 self.throttle_period = datetime.timedelta(
80 85 seconds=seconds, minutes=minutes, hours=hours
81 86 )
82 87
83 88 self.time_of_last_call = datetime.datetime.min
84 89
85 90 def __call__(self, fn):
86 91 @wraps(fn)
87 92 def wrapper(*args, **kwargs):
88 93 coerce = kwargs.pop('coerce', None)
89 94 if coerce:
90 95 self.time_of_last_call = datetime.datetime.now()
91 96 return fn(*args, **kwargs)
92 97 else:
93 98 now = datetime.datetime.now()
94 99 time_since_last_call = now - self.time_of_last_call
95 100 time_left = self.throttle_period - time_since_last_call
96 101
97 102 if time_left > datetime.timedelta(seconds=0):
98 103 return
99 104
100 105 self.time_of_last_call = datetime.datetime.now()
101 106 return fn(*args, **kwargs)
102 107
103 108 return wrapper
104 109
105 110 class Data(object):
106 111 '''
107 112 Object to hold data to be plotted
108 113 '''
109 114
110 115 def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
111 116 self.plottypes = plottypes
112 117 self.throttle = throttle_value
113 118 self.exp_code = exp_code
114 119 self.buffering = buffering
115 120 self.ended = False
116 121 self.localtime = False
117 122 self.meta = {}
118 123 self.__times = []
119 124 self.__heights = []
120 125
121 126 def __str__(self):
122 127 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
123 128 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
124 129
125 130 def __len__(self):
126 131 return len(self.__times)
127 132
128 133 def __getitem__(self, key):
129 134 if key not in self.data:
130 135 raise KeyError(log.error('Missing key: {}'.format(key)))
131 136
132 137 if 'spc' in key or not self.buffering:
133 138 ret = self.data[key]
134 139 else:
135 140 ret = numpy.array([self.data[key][x] for x in self.times])
136 141 if ret.ndim > 1:
137 142 ret = numpy.swapaxes(ret, 0, 1)
138 143 return ret
139 144
140 145 def __contains__(self, key):
141 146 return key in self.data
142 147
143 148 def setup(self):
144 149 '''
145 150 Configure object
146 151 '''
147 152
148 153 self.type = ''
149 154 self.ended = False
150 155 self.data = {}
151 156 self.__times = []
152 157 self.__heights = []
153 158 self.__all_heights = set()
154 159 for plot in self.plottypes:
155 160 if 'snr' in plot:
156 161 plot = 'snr'
157 162 self.data[plot] = {}
158 163
159 164 def shape(self, key):
160 165 '''
161 166 Get the shape of the one-element data for the given key
162 167 '''
163 168
164 169 if len(self.data[key]):
165 170 if 'spc' in key or not self.buffering:
166 171 return self.data[key].shape
167 172 return self.data[key][self.__times[0]].shape
168 173 return (0,)
169 174
170 175 def update(self, dataOut, tm):
171 176 '''
172 177 Update data object with new dataOut
173 178 '''
174 179
175 180 if tm in self.__times:
176 181 return
177 182
178 183 self.type = dataOut.type
179 184 self.parameters = getattr(dataOut, 'parameters', [])
180 185 if hasattr(dataOut, 'pairsList'):
181 186 self.pairs = dataOut.pairsList
182 187 if hasattr(dataOut, 'meta'):
183 188 self.meta = dataOut.meta
184 189 self.channels = dataOut.channelList
185 190 self.interval = dataOut.getTimeInterval()
186 191 self.localtime = dataOut.useLocalTime
187 192 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
188 193 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
189 194 self.__heights.append(dataOut.heightList)
190 195 self.__all_heights.update(dataOut.heightList)
191 196 self.__times.append(tm)
192 197
193 198 for plot in self.plottypes:
194 199 if plot == 'spc':
195 200 z = dataOut.data_spc/dataOut.normFactor
196 201 self.data[plot] = 10*numpy.log10(z)
197 202 if plot == 'cspc':
198 203 self.data[plot] = dataOut.data_cspc
199 204 if plot == 'noise':
200 205 buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
201 206 if plot == 'rti':
202 207 buffer = dataOut.getPower()
203 208 if plot == 'snr_db':
204 209 self.data['snr'][tm] = dataOut.data_SNR
205 210 if plot == 'snr':
206 211 buffer = 10*numpy.log10(dataOut.data_SNR)
207 212 if plot == 'dop':
208 213 buffer = 10*numpy.log10(dataOut.data_DOP)
209 214 if plot == 'mean':
210 215 buffer = dataOut.data_MEAN
211 216 if plot == 'std':
212 217 buffer = dataOut.data_STD
213 218 if plot == 'coh':
214 219 buffer = dataOut.getCoherence()
215 220 if plot == 'phase':
216 221 buffer = dataOut.getCoherence(phase=True)
217 222 if plot == 'output':
218 223 buffer = dataOut.data_output
219 224 if plot == 'param':
220 225 buffer = dataOut.data_param
221 226
222 227 if self.buffering:
223 228 self.data[plot][tm] = buffer
224 229 else:
225 230 self.data[plot] = buffer
226 231
227 232 def normalize_heights(self):
228 233 '''
229 234 Ensure same-dimension of the data for different heighList
230 235 '''
231 236
232 237 H = numpy.array(list(self.__all_heights))
233 238 H.sort()
234 239 for key in self.data:
235 240 shape = self.shape(key)[:-1] + H.shape
236 241 for tm, obj in self.data[key].items():
237 242 h = self.__heights[self.__times.index(tm)]
238 243 if H.size == h.size:
239 244 continue
240 245 index = numpy.where(numpy.in1d(H, h))[0]
241 246 dummy = numpy.zeros(shape) + numpy.nan
242 247 if len(shape) == 2:
243 248 dummy[:, index] = obj
244 249 else:
245 250 dummy[index] = obj
246 251 self.data[key][tm] = dummy
247 252
248 253 self.__heights = [H for tm in self.__times]
249 254
250 255 def jsonify(self, decimate=False):
251 256 '''
252 257 Convert data to json
253 258 '''
254 259
255 260 data = {}
256 261 tm = self.times[-1]
257 262
258 263 for key in self.data:
259 264 if key in ('spc', 'cspc') or not self.buffering:
260 265 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
261 266 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
262 267 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
263 268 else:
264 269 data[key] = roundFloats(self.data[key][tm].tolist())
265 270
266 271 ret = {'data': data}
267 272 ret['exp_code'] = self.exp_code
268 273 ret['time'] = tm
269 274 ret['interval'] = self.interval
270 275 ret['localtime'] = self.localtime
271 276 ret['yrange'] = roundFloats(self.heights.tolist())
272 277 if key in ('spc', 'cspc'):
273 278 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
274 279 else:
275 280 ret['xrange'] = []
276 281 if hasattr(self, 'pairs'):
277 282 ret['pairs'] = self.pairs
278 283
279 284 for key, value in self.meta:
280 285 ret[key] = value
281 286
282 287 return json.dumps(ret)
283 288
284 289 @property
285 290 def times(self):
286 291 '''
287 292 Return the list of times of the current data
288 293 '''
289 294
290 295 ret = numpy.array(self.__times)
291 296 ret.sort()
292 297 return ret
293 298
294 299 @property
295 300 def heights(self):
296 301 '''
297 302 Return the list of heights of the current data
298 303 '''
299 304
300 305 return numpy.array(self.__heights[-1])
301 306
302 307 class PublishData(Operation):
303 308 '''
304 309 Operation to send data over zmq.
305 310 '''
306 311
307 312 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
308 313
309 314 def __init__(self, **kwargs):
310 315 """Inicio."""
311 316 Operation.__init__(self, **kwargs)
312 317 self.isConfig = False
313 318 self.client = None
314 319 self.zeromq = None
315 320 self.mqtt = None
316 321
317 322 def on_disconnect(self, client, userdata, rc):
318 323 if rc != 0:
319 324 log.warning('Unexpected disconnection.')
320 325 self.connect()
321 326
322 327 def connect(self):
323 328 log.warning('trying to connect')
324 329 try:
325 330 self.client.connect(
326 331 host=self.host,
327 332 port=self.port,
328 333 keepalive=60*10,
329 334 bind_address='')
330 335 self.client.loop_start()
331 336 # self.client.publish(
332 337 # self.topic + 'SETUP',
333 338 # json.dumps(setup),
334 339 # retain=True
335 340 # )
336 341 except:
337 342 log.error('MQTT Conection error.')
338 343 self.client = False
339 344
340 345 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
341 346 self.counter = 0
342 347 self.topic = kwargs.get('topic', 'schain')
343 348 self.delay = kwargs.get('delay', 0)
344 349 self.plottype = kwargs.get('plottype', 'spectra')
345 350 self.host = kwargs.get('host', "10.10.10.82")
346 351 self.port = kwargs.get('port', 3000)
347 352 self.clientId = clientId
348 353 self.cnt = 0
349 354 self.zeromq = zeromq
350 355 self.mqtt = kwargs.get('plottype', 0)
351 356 self.client = None
352 357 self.verbose = verbose
353 358 setup = []
354 359 if mqtt is 1:
355 360 self.client = mqtt.Client(
356 361 client_id=self.clientId + self.topic + 'SCHAIN',
357 362 clean_session=True)
358 363 self.client.on_disconnect = self.on_disconnect
359 364 self.connect()
360 365 for plot in self.plottype:
361 366 setup.append({
362 367 'plot': plot,
363 368 'topic': self.topic + plot,
364 369 'title': getattr(self, plot + '_' + 'title', False),
365 370 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
366 371 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
367 372 'xrange': getattr(self, plot + '_' + 'xrange', False),
368 373 'yrange': getattr(self, plot + '_' + 'yrange', False),
369 374 'zrange': getattr(self, plot + '_' + 'zrange', False),
370 375 })
371 376 if zeromq is 1:
372 377 context = zmq.Context()
373 378 self.zmq_socket = context.socket(zmq.PUSH)
374 379 server = kwargs.get('server', 'zmq.pipe')
375 380
376 381 if 'tcp://' in server:
377 382 address = server
378 383 else:
379 384 address = 'ipc:///tmp/%s' % server
380 385
381 386 self.zmq_socket.connect(address)
382 387 time.sleep(1)
383 388
384 389
385 390 def publish_data(self):
386 391 self.dataOut.finished = False
387 392 if self.mqtt is 1:
388 393 yData = self.dataOut.heightList[:2].tolist()
389 394 if self.plottype == 'spectra':
390 395 data = getattr(self.dataOut, 'data_spc')
391 396 z = data/self.dataOut.normFactor
392 397 zdB = 10*numpy.log10(z)
393 398 xlen, ylen = zdB[0].shape
394 399 dx = int(xlen/MAXNUMX) + 1
395 400 dy = int(ylen/MAXNUMY) + 1
396 401 Z = [0 for i in self.dataOut.channelList]
397 402 for i in self.dataOut.channelList:
398 403 Z[i] = zdB[i][::dx, ::dy].tolist()
399 404 payload = {
400 405 'timestamp': self.dataOut.utctime,
401 406 'data': roundFloats(Z),
402 407 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
403 408 'interval': self.dataOut.getTimeInterval(),
404 409 'type': self.plottype,
405 410 'yData': yData
406 411 }
407 412
408 413 elif self.plottype in ('rti', 'power'):
409 414 data = getattr(self.dataOut, 'data_spc')
410 415 z = data/self.dataOut.normFactor
411 416 avg = numpy.average(z, axis=1)
412 417 avgdB = 10*numpy.log10(avg)
413 418 xlen, ylen = z[0].shape
414 419 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
415 420 AVG = [0 for i in self.dataOut.channelList]
416 421 for i in self.dataOut.channelList:
417 422 AVG[i] = avgdB[i][::dy].tolist()
418 423 payload = {
419 424 'timestamp': self.dataOut.utctime,
420 425 'data': roundFloats(AVG),
421 426 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
422 427 'interval': self.dataOut.getTimeInterval(),
423 428 'type': self.plottype,
424 429 'yData': yData
425 430 }
426 431 elif self.plottype == 'noise':
427 432 noise = self.dataOut.getNoise()/self.dataOut.normFactor
428 433 noisedB = 10*numpy.log10(noise)
429 434 payload = {
430 435 'timestamp': self.dataOut.utctime,
431 436 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
432 437 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
433 438 'interval': self.dataOut.getTimeInterval(),
434 439 'type': self.plottype,
435 440 'yData': yData
436 441 }
437 442 elif self.plottype == 'snr':
438 443 data = getattr(self.dataOut, 'data_SNR')
439 444 avgdB = 10*numpy.log10(data)
440 445
441 446 ylen = data[0].size
442 447 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
443 448 AVG = [0 for i in self.dataOut.channelList]
444 449 for i in self.dataOut.channelList:
445 450 AVG[i] = avgdB[i][::dy].tolist()
446 451 payload = {
447 452 'timestamp': self.dataOut.utctime,
448 453 'data': roundFloats(AVG),
449 454 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
450 455 'type': self.plottype,
451 456 'yData': yData
452 457 }
453 458 else:
454 459 print "Tipo de grafico invalido"
455 460 payload = {
456 461 'data': 'None',
457 462 'timestamp': 'None',
458 463 'type': None
459 464 }
460 465
461 466 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
462 467
463 468 if self.zeromq is 1:
464 469 if self.verbose:
465 470 log.log(
466 471 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
467 472 self.name
468 473 )
469 474 self.zmq_socket.send_pyobj(self.dataOut)
470 475
471 476 def run(self, dataOut, **kwargs):
472 477 self.dataOut = dataOut
473 478 if not self.isConfig:
474 479 self.setup(**kwargs)
475 480 self.isConfig = True
476 481
477 482 self.publish_data()
478 483 time.sleep(self.delay)
479 484
480 485 def close(self):
481 486 if self.zeromq is 1:
482 487 self.dataOut.finished = True
483 488 self.zmq_socket.send_pyobj(self.dataOut)
484 489 time.sleep(0.1)
485 490 self.zmq_socket.close()
486 491 if self.client:
487 492 self.client.loop_stop()
488 493 self.client.disconnect()
489 494
490 495
491 496 class ReceiverData(ProcessingUnit):
492 497
493 498 __attrs__ = ['server']
494 499
495 500 def __init__(self, **kwargs):
496 501
497 502 ProcessingUnit.__init__(self, **kwargs)
498 503
499 504 self.isConfig = False
500 505 server = kwargs.get('server', 'zmq.pipe')
501 506 if 'tcp://' in server:
502 507 address = server
503 508 else:
504 509 address = 'ipc:///tmp/%s' % server
505 510
506 511 self.address = address
507 512 self.dataOut = JROData()
508 513
509 514 def setup(self):
510 515
511 516 self.context = zmq.Context()
512 517 self.receiver = self.context.socket(zmq.PULL)
513 518 self.receiver.bind(self.address)
514 519 time.sleep(0.5)
515 520 log.success('ReceiverData from {}'.format(self.address))
516 521
517 522
518 523 def run(self):
519 524
520 525 if not self.isConfig:
521 526 self.setup()
522 527 self.isConfig = True
523 528
524 529 self.dataOut = self.receiver.recv_pyobj()
525 530 log.log('{} - {}'.format(self.dataOut.type,
526 531 self.dataOut.datatime.ctime(),),
527 532 'Receiving')
528 533
529 534
530 535 class PlotterReceiver(ProcessingUnit, Process):
531 536
532 537 throttle_value = 5
533 538 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
534 539 'exp_code', 'web_server', 'buffering']
535 540
536 541 def __init__(self, **kwargs):
537 542
538 543 ProcessingUnit.__init__(self, **kwargs)
539 544 Process.__init__(self)
540 545 self.mp = False
541 546 self.isConfig = False
542 547 self.isWebConfig = False
543 548 self.connections = 0
544 549 server = kwargs.get('server', 'zmq.pipe')
545 550 web_server = kwargs.get('web_server', None)
546 551 if 'tcp://' in server:
547 552 address = server
548 553 else:
549 554 address = 'ipc:///tmp/%s' % server
550 555 self.address = address
551 556 self.web_address = web_server
552 557 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
553 558 self.realtime = kwargs.get('realtime', False)
554 559 self.localtime = kwargs.get('localtime', True)
555 560 self.buffering = kwargs.get('buffering', True)
556 561 self.throttle_value = kwargs.get('throttle', 5)
557 562 self.exp_code = kwargs.get('exp_code', None)
558 563 self.sendData = self.initThrottle(self.throttle_value)
559 564 self.dates = []
560 565 self.setup()
561 566
562 567 def setup(self):
563 568
564 569 self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
565 570 self.isConfig = True
566 571
567 572 def event_monitor(self, monitor):
568 573
569 574 events = {}
570 575
571 576 for name in dir(zmq):
572 577 if name.startswith('EVENT_'):
573 578 value = getattr(zmq, name)
574 579 events[value] = name
575 580
576 581 while monitor.poll():
577 582 evt = recv_monitor_message(monitor)
578 583 if evt['event'] == 32:
579 584 self.connections += 1
580 585 if evt['event'] == 512:
581 586 pass
582 587
583 588 evt.update({'description': events[evt['event']]})
584 589
585 590 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
586 591 break
587 592 monitor.close()
588 593 print('event monitor thread done!')
589 594
590 595 def initThrottle(self, throttle_value):
591 596
592 597 @throttle(seconds=throttle_value)
593 598 def sendDataThrottled(fn_sender, data):
594 599 fn_sender(data)
595 600
596 601 return sendDataThrottled
597 602
598 603 def send(self, data):
599 604 log.log('Sending {}'.format(data), self.name)
600 605 self.sender.send_pyobj(data)
601 606
602 607 def run(self):
603 608
604 609 log.log(
605 610 'Starting from {}'.format(self.address),
606 611 self.name
607 612 )
608 613
609 614 self.context = zmq.Context()
610 615 self.receiver = self.context.socket(zmq.PULL)
611 616 self.receiver.bind(self.address)
612 617 monitor = self.receiver.get_monitor_socket()
613 618 self.sender = self.context.socket(zmq.PUB)
614 619 if self.web_address:
615 620 log.success(
616 621 'Sending to web: {}'.format(self.web_address),
617 622 self.name
618 623 )
619 624 self.sender_web = self.context.socket(zmq.PUSH)
620 625 self.sender_web.connect(self.web_address)
621 626 time.sleep(1)
622 627
623 628 if 'server' in self.kwargs:
624 629 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
625 630 else:
626 631 self.sender.bind("ipc:///tmp/zmq.plots")
627 632
628 633 time.sleep(2)
629 634
630 635 t = Thread(target=self.event_monitor, args=(monitor,))
631 636 t.start()
632 637
633 638 while True:
634 639 dataOut = self.receiver.recv_pyobj()
635 640 if not dataOut.flagNoData:
636 641 if dataOut.type == 'Parameters':
637 642 tm = dataOut.utctimeInit
638 643 else:
639 644 tm = dataOut.utctime
640 645 if dataOut.useLocalTime:
641 646 if not self.localtime:
642 647 tm += time.timezone
643 648 dt = datetime.datetime.fromtimestamp(tm).date()
644 649 else:
645 650 if self.localtime:
646 651 tm -= time.timezone
647 652 dt = datetime.datetime.utcfromtimestamp(tm).date()
648 653 coerce = False
649 654 if dt not in self.dates:
650 655 if self.data:
651 656 self.data.ended = True
652 657 self.send(self.data)
653 658 coerce = True
654 659 self.data.setup()
655 660 self.dates.append(dt)
656 661
657 662 self.data.update(dataOut, tm)
658 663
659 664 if dataOut.finished is True:
660 665 self.connections -= 1
661 666 if self.connections == 0 and dt in self.dates:
662 667 self.data.ended = True
663 668 self.send(self.data)
664 669 self.data.setup()
665 670 else:
666 671 if self.realtime:
667 672 self.send(self.data)
668 673 if self.web_address:
669 674 payload = self.data.jsonify()
670 675 log.log('Sending to web... type:{}, size:{}'.format(dataOut.type, len(payload)), self.name)
671 676 self.sender_web.send(payload)
672 677 else:
673 678 self.sendData(self.send, self.data, coerce=coerce)
674 679 coerce = False
675 680
676 681 return
677 682
678 683
679 684 class SendToFTP(Operation, Process):
680 685
681 686 '''
682 687 Operation to send data over FTP.
683 688 '''
684 689
685 690 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
686 691
687 692 def __init__(self, **kwargs):
688 693 '''
689 694 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
690 695 '''
691 696 Operation.__init__(self, **kwargs)
692 697 Process.__init__(self)
693 698 self.server = kwargs.get('server')
694 699 self.username = kwargs.get('username')
695 700 self.password = kwargs.get('password')
696 701 self.patterns = kwargs.get('patterns')
697 702 self.timeout = kwargs.get('timeout', 30)
698 703 self.times = [time.time() for p in self.patterns]
699 704 self.latest = ['' for p in self.patterns]
700 705 self.mp = False
701 706 self.ftp = None
702 707
703 708 def setup(self):
704 709
705 710 log.log('Connecting to ftp://{}'.format(self.server), self.name)
706 711 try:
707 712 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
708 713 except ftplib.all_errors:
709 714 log.error('Server connection fail: {}'.format(self.server), self.name)
710 715 if self.ftp is not None:
711 716 self.ftp.close()
712 717 self.ftp = None
713 718 self.isConfig = False
714 719 return
715 720
716 721 try:
717 722 self.ftp.login(self.username, self.password)
718 723 except ftplib.all_errors:
719 724 log.error('The given username y/o password are incorrect', self.name)
720 725 if self.ftp is not None:
721 726 self.ftp.close()
722 727 self.ftp = None
723 728 self.isConfig = False
724 729 return
725 730
726 731 log.success('Connection success', self.name)
727 732 self.isConfig = True
728 733 return
729 734
730 735 def check(self):
731 736
732 737 try:
733 738 self.ftp.voidcmd("NOOP")
734 739 except:
735 740 log.warning('Connection lost... trying to reconnect', self.name)
736 741 if self.ftp is not None:
737 742 self.ftp.close()
738 743 self.ftp = None
739 744 self.setup()
740 745
741 746 def find_files(self, path, ext):
742 747
743 748 files = glob.glob1(path, '*{}'.format(ext))
744 749 files.sort()
745 750 if files:
746 751 return files[-1]
747 752 return None
748 753
749 754 def getftpname(self, filename, exp_code, sub_exp_code):
750 755
751 756 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
752 757 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
753 758 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
754 759 exp_code = '%3.3d'%exp_code
755 760 sub_exp_code = '%2.2d'%sub_exp_code
756 plot_code = '%2.2d'% PLOT_CODES[filename.split('_')[0].split('-')[1]]
761 plot_code = '%2.2d'% get_plot_code(filename)
757 762 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
758 763 return name
759 764
760 765 def upload(self, src, dst):
761 766
762 767 log.log('Uploading {} '.format(src), self.name, nl=False)
763 768
764 769 fp = open(src, 'rb')
765 770 command = 'STOR {}'.format(dst)
766 771
767 772 try:
768 773 self.ftp.storbinary(command, fp, blocksize=1024)
769 except ftplib.all_errors, e:
774 except Exception, e:
770 775 log.error('{}'.format(e), self.name)
771 776 if self.ftp is not None:
772 777 self.ftp.close()
773 778 self.ftp = None
774 return
779 return 0
775 780
776 781 try:
777 782 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
778 except ftplib.all_errors, e:
783 except Exception, e:
779 784 log.error('{}'.format(e), self.name)
780 785 if self.ftp is not None:
781 786 self.ftp.close()
782 787 self.ftp = None
788 return 0
783 789
784 790 fp.close()
785
786 791 log.success('OK', tag='')
792 return 1
787 793
788 794 def send_files(self):
789 795
790 796 for x, pattern in enumerate(self.patterns):
791 797 local, remote, ext, delay, exp_code, sub_exp_code = pattern
792 798 if time.time()-self.times[x] >= delay:
793 srcname = self.find_files(local, ext)
794
799 srcname = self.find_files(local, ext)
800 src = os.path.join(local, srcname)
801 if os.path.getmtime(src) < time.time() - 30*60:
802 continue
803
795 804 if srcname is None or srcname == self.latest[x]:
796 805 continue
797 806
798 807 if 'png' in ext:
799 808 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
800 809 else:
801 dstname = srcname
802
803 src = os.path.join(local, srcname)
804
805 if os.path.getmtime(src) < time.time() - 30*60:
806 continue
810 dstname = srcname
807 811
808 812 dst = os.path.join(remote, dstname)
809 813
810 if self.ftp is None:
811 continue
812
813 self.upload(src, dst)
814
815 self.times[x] = time.time()
816 self.latest[x] = srcname
814 if self.upload(src, dst):
815 self.times[x] = time.time()
816 self.latest[x] = srcname
817 else:
818 self.isConfig = False
819 break
817 820
818 821 def run(self):
819 822
820 823 while True:
821 824 if not self.isConfig:
822 825 self.setup()
823 826 if self.ftp is not None:
824 827 self.check()
825 self.send_files()
828 self.send_files()
826 829 time.sleep(10)
827 830
828 831 def close():
829 832
830 833 if self.ftp is not None:
831 834 self.ftp.close()
832 835 self.terminate()
General Comments 0
You need to be logged in to leave comments. Login now