##// END OF EJS Templates
Update plot codes, add meta attribute to dataOut to send metadata to plots
jespinoza -
r1139:ade57c0ecace
parent child
Show More
@@ -1,829 +1,832
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import os
6 6 import glob
7 7 import time
8 8 import json
9 9 import numpy
10 10 import paho.mqtt.client as mqtt
11 11 import zmq
12 12 import datetime
13 13 import ftplib
14 14 from zmq.utils.monitor import recv_monitor_message
15 15 from functools import wraps
16 16 from threading import Thread
17 17 from multiprocessing import Process
18 18
19 19 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
20 20 from schainpy.model.data.jrodata import JROData
21 21 from schainpy.utils import log
22 22
23 23 MAXNUMX = 500
24 24 MAXNUMY = 500
25 25
26 26 PLOT_CODES = {
27 'rti': 0, #Range time intensity (RTI).
28 'spc': 1, #Spectra (and Cross-spectra) information.
29 'cspc': 2, #Cross-Correlation information.
30 'coh': 3, #Coherence map.
31 'base': 4, #Base lines graphic.
32 'row': 5, #Row Spectra.
33 'total' : 6, #Total Power.
34 'drift' : 7, #Drifts graphics.
35 'height' : 8, #Height profile.
36 'phase' : 9, #Signal Phase.
27 'rti': 0, # Range time intensity (RTI).
28 'spc': 1, # Spectra (and Cross-spectra) information.
29 'cspc': 2, # Cross-Correlation information.
30 'coh': 3, # Coherence map.
31 'base': 4, # Base lines graphic.
32 'row': 5, # Row Spectra.
33 'total' : 6, # Total Power.
34 'drift' : 7, # Drifts graphics.
35 'height' : 8, # Height profile.
36 'phase' : 9, # Signal Phase.
37 37 'power' : 16,
38 38 'noise' : 17,
39 39 'beacon' : 18,
40 #USED IN jroplot_parameters.py
41 40 'wind' : 22,
42 41 'skymap' : 23,
43 # 'MPHASE_CODE' : 24,
44 'V' : 25,
45 'Z' : 26,
46 'spc_fit' : 27,
47 'ew_drifts' : 28,
48 'reflectivity': 30
42 'V-E' : 25,
43 'Z-E' : 26,
44 'V-A' : 27,
45 'Z-A' : 28,
49 46 }
50 47
51 48 class PrettyFloat(float):
52 49 def __repr__(self):
53 50 return '%.2f' % self
54 51
55 52 def roundFloats(obj):
56 53 if isinstance(obj, list):
57 54 return map(roundFloats, obj)
58 55 elif isinstance(obj, float):
59 56 return round(obj, 2)
60 57
61 58 def decimate(z, MAXNUMY):
62 59 dy = int(len(z[0])/MAXNUMY) + 1
63 60
64 61 return z[::, ::dy]
65 62
66 63 class throttle(object):
67 64 '''
68 65 Decorator that prevents a function from being called more than once every
69 66 time period.
70 67 To create a function that cannot be called more than once a minute, but
71 68 will sleep until it can be called:
72 69 @throttle(minutes=1)
73 70 def foo():
74 71 pass
75 72
76 73 for i in range(10):
77 74 foo()
78 75 print "This function has run %s times." % i
79 76 '''
80 77
81 78 def __init__(self, seconds=0, minutes=0, hours=0):
82 79 self.throttle_period = datetime.timedelta(
83 80 seconds=seconds, minutes=minutes, hours=hours
84 81 )
85 82
86 83 self.time_of_last_call = datetime.datetime.min
87 84
88 85 def __call__(self, fn):
89 86 @wraps(fn)
90 87 def wrapper(*args, **kwargs):
91 88 coerce = kwargs.pop('coerce', None)
92 89 if coerce:
93 90 self.time_of_last_call = datetime.datetime.now()
94 91 return fn(*args, **kwargs)
95 92 else:
96 93 now = datetime.datetime.now()
97 94 time_since_last_call = now - self.time_of_last_call
98 95 time_left = self.throttle_period - time_since_last_call
99 96
100 97 if time_left > datetime.timedelta(seconds=0):
101 98 return
102 99
103 100 self.time_of_last_call = datetime.datetime.now()
104 101 return fn(*args, **kwargs)
105 102
106 103 return wrapper
107 104
108 105 class Data(object):
109 106 '''
110 107 Object to hold data to be plotted
111 108 '''
112 109
113 110 def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
114 111 self.plottypes = plottypes
115 112 self.throttle = throttle_value
116 113 self.exp_code = exp_code
117 114 self.buffering = buffering
118 115 self.ended = False
119 116 self.localtime = False
117 self.meta = {}
120 118 self.__times = []
121 119 self.__heights = []
122 120
123 121 def __str__(self):
124 122 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
125 123 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
126 124
127 125 def __len__(self):
128 126 return len(self.__times)
129 127
130 128 def __getitem__(self, key):
131 129 if key not in self.data:
132 130 raise KeyError(log.error('Missing key: {}'.format(key)))
133 131
134 132 if 'spc' in key or not self.buffering:
135 133 ret = self.data[key]
136 134 else:
137 135 ret = numpy.array([self.data[key][x] for x in self.times])
138 136 if ret.ndim > 1:
139 137 ret = numpy.swapaxes(ret, 0, 1)
140 138 return ret
141 139
142 140 def __contains__(self, key):
143 141 return key in self.data
144 142
145 143 def setup(self):
146 144 '''
147 145 Configure object
148 146 '''
149 147
150 148 self.type = ''
151 149 self.ended = False
152 150 self.data = {}
153 151 self.__times = []
154 152 self.__heights = []
155 153 self.__all_heights = set()
156 154 for plot in self.plottypes:
157 155 if 'snr' in plot:
158 156 plot = 'snr'
159 157 self.data[plot] = {}
160 158
161 159 def shape(self, key):
162 160 '''
163 161 Get the shape of the one-element data for the given key
164 162 '''
165 163
166 164 if len(self.data[key]):
167 165 if 'spc' in key or not self.buffering:
168 166 return self.data[key].shape
169 167 return self.data[key][self.__times[0]].shape
170 168 return (0,)
171 169
172 170 def update(self, dataOut, tm):
173 171 '''
174 172 Update data object with new dataOut
175 173 '''
176 174
177 175 if tm in self.__times:
178 176 return
179 177
180 178 self.type = dataOut.type
181 179 self.parameters = getattr(dataOut, 'parameters', [])
182 180 if hasattr(dataOut, 'pairsList'):
183 181 self.pairs = dataOut.pairsList
182 if hasattr(dataOut, 'meta'):
183 self.meta = dataOut.meta
184 184 self.channels = dataOut.channelList
185 185 self.interval = dataOut.getTimeInterval()
186 186 self.localtime = dataOut.useLocalTime
187 187 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
188 188 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
189 189 self.__heights.append(dataOut.heightList)
190 190 self.__all_heights.update(dataOut.heightList)
191 191 self.__times.append(tm)
192 192
193 193 for plot in self.plottypes:
194 194 if plot == 'spc':
195 195 z = dataOut.data_spc/dataOut.normFactor
196 196 self.data[plot] = 10*numpy.log10(z)
197 197 if plot == 'cspc':
198 198 self.data[plot] = dataOut.data_cspc
199 199 if plot == 'noise':
200 200 buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
201 201 if plot == 'rti':
202 202 buffer = dataOut.getPower()
203 203 if plot == 'snr_db':
204 204 self.data['snr'][tm] = dataOut.data_SNR
205 205 if plot == 'snr':
206 206 buffer = 10*numpy.log10(dataOut.data_SNR)
207 207 if plot == 'dop':
208 208 buffer = 10*numpy.log10(dataOut.data_DOP)
209 209 if plot == 'mean':
210 210 buffer = dataOut.data_MEAN
211 211 if plot == 'std':
212 212 buffer = dataOut.data_STD
213 213 if plot == 'coh':
214 214 buffer = dataOut.getCoherence()
215 215 if plot == 'phase':
216 216 buffer = dataOut.getCoherence(phase=True)
217 217 if plot == 'output':
218 218 buffer = dataOut.data_output
219 219 if plot == 'param':
220 220 buffer = dataOut.data_param
221 221
222 222 if self.buffering:
223 223 self.data[plot][tm] = buffer
224 224 else:
225 225 self.data[plot] = buffer
226 226
227 227 def normalize_heights(self):
228 228 '''
229 229 Ensure same-dimension of the data for different heighList
230 230 '''
231 231
232 232 H = numpy.array(list(self.__all_heights))
233 233 H.sort()
234 234 for key in self.data:
235 235 shape = self.shape(key)[:-1] + H.shape
236 236 for tm, obj in self.data[key].items():
237 237 h = self.__heights[self.__times.index(tm)]
238 238 if H.size == h.size:
239 239 continue
240 240 index = numpy.where(numpy.in1d(H, h))[0]
241 241 dummy = numpy.zeros(shape) + numpy.nan
242 242 if len(shape) == 2:
243 243 dummy[:, index] = obj
244 244 else:
245 245 dummy[index] = obj
246 246 self.data[key][tm] = dummy
247 247
248 248 self.__heights = [H for tm in self.__times]
249 249
250 250 def jsonify(self, decimate=False):
251 251 '''
252 252 Convert data to json
253 253 '''
254 254
255 255 data = {}
256 256 tm = self.times[-1]
257 257
258 258 for key in self.data:
259 259 if key in ('spc', 'cspc') or not self.buffering:
260 260 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
261 261 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
262 262 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
263 263 else:
264 264 data[key] = roundFloats(self.data[key][tm].tolist())
265 265
266 266 ret = {'data': data}
267 267 ret['exp_code'] = self.exp_code
268 268 ret['time'] = tm
269 269 ret['interval'] = self.interval
270 270 ret['localtime'] = self.localtime
271 271 ret['yrange'] = roundFloats(self.heights.tolist())
272 272 if key in ('spc', 'cspc'):
273 273 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
274 274 else:
275 275 ret['xrange'] = []
276 276 if hasattr(self, 'pairs'):
277 277 ret['pairs'] = self.pairs
278
279 for key, value in self.meta:
280 ret[key] = value
281
278 282 return json.dumps(ret)
279 283
280 284 @property
281 285 def times(self):
282 286 '''
283 287 Return the list of times of the current data
284 288 '''
285 289
286 290 ret = numpy.array(self.__times)
287 291 ret.sort()
288 292 return ret
289 293
290 294 @property
291 295 def heights(self):
292 296 '''
293 297 Return the list of heights of the current data
294 298 '''
295 299
296 300 return numpy.array(self.__heights[-1])
297 301
298 302 class PublishData(Operation):
299 303 '''
300 304 Operation to send data over zmq.
301 305 '''
302 306
303 307 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
304 308
305 309 def __init__(self, **kwargs):
306 310 """Inicio."""
307 311 Operation.__init__(self, **kwargs)
308 312 self.isConfig = False
309 313 self.client = None
310 314 self.zeromq = None
311 315 self.mqtt = None
312 316
313 317 def on_disconnect(self, client, userdata, rc):
314 318 if rc != 0:
315 319 log.warning('Unexpected disconnection.')
316 320 self.connect()
317 321
318 322 def connect(self):
319 323 log.warning('trying to connect')
320 324 try:
321 325 self.client.connect(
322 326 host=self.host,
323 327 port=self.port,
324 328 keepalive=60*10,
325 329 bind_address='')
326 330 self.client.loop_start()
327 331 # self.client.publish(
328 332 # self.topic + 'SETUP',
329 333 # json.dumps(setup),
330 334 # retain=True
331 335 # )
332 336 except:
333 337 log.error('MQTT Conection error.')
334 338 self.client = False
335 339
336 340 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
337 341 self.counter = 0
338 342 self.topic = kwargs.get('topic', 'schain')
339 343 self.delay = kwargs.get('delay', 0)
340 344 self.plottype = kwargs.get('plottype', 'spectra')
341 345 self.host = kwargs.get('host', "10.10.10.82")
342 346 self.port = kwargs.get('port', 3000)
343 347 self.clientId = clientId
344 348 self.cnt = 0
345 349 self.zeromq = zeromq
346 350 self.mqtt = kwargs.get('plottype', 0)
347 351 self.client = None
348 352 self.verbose = verbose
349 353 setup = []
350 354 if mqtt is 1:
351 355 self.client = mqtt.Client(
352 356 client_id=self.clientId + self.topic + 'SCHAIN',
353 357 clean_session=True)
354 358 self.client.on_disconnect = self.on_disconnect
355 359 self.connect()
356 360 for plot in self.plottype:
357 361 setup.append({
358 362 'plot': plot,
359 363 'topic': self.topic + plot,
360 364 'title': getattr(self, plot + '_' + 'title', False),
361 365 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
362 366 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
363 367 'xrange': getattr(self, plot + '_' + 'xrange', False),
364 368 'yrange': getattr(self, plot + '_' + 'yrange', False),
365 369 'zrange': getattr(self, plot + '_' + 'zrange', False),
366 370 })
367 371 if zeromq is 1:
368 372 context = zmq.Context()
369 373 self.zmq_socket = context.socket(zmq.PUSH)
370 374 server = kwargs.get('server', 'zmq.pipe')
371 375
372 376 if 'tcp://' in server:
373 377 address = server
374 378 else:
375 379 address = 'ipc:///tmp/%s' % server
376 380
377 381 self.zmq_socket.connect(address)
378 382 time.sleep(1)
379 383
380 384
381 385 def publish_data(self):
382 386 self.dataOut.finished = False
383 387 if self.mqtt is 1:
384 388 yData = self.dataOut.heightList[:2].tolist()
385 389 if self.plottype == 'spectra':
386 390 data = getattr(self.dataOut, 'data_spc')
387 391 z = data/self.dataOut.normFactor
388 392 zdB = 10*numpy.log10(z)
389 393 xlen, ylen = zdB[0].shape
390 394 dx = int(xlen/MAXNUMX) + 1
391 395 dy = int(ylen/MAXNUMY) + 1
392 396 Z = [0 for i in self.dataOut.channelList]
393 397 for i in self.dataOut.channelList:
394 398 Z[i] = zdB[i][::dx, ::dy].tolist()
395 399 payload = {
396 400 'timestamp': self.dataOut.utctime,
397 401 'data': roundFloats(Z),
398 402 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
399 403 'interval': self.dataOut.getTimeInterval(),
400 404 'type': self.plottype,
401 405 'yData': yData
402 406 }
403 407
404 408 elif self.plottype in ('rti', 'power'):
405 409 data = getattr(self.dataOut, 'data_spc')
406 410 z = data/self.dataOut.normFactor
407 411 avg = numpy.average(z, axis=1)
408 412 avgdB = 10*numpy.log10(avg)
409 413 xlen, ylen = z[0].shape
410 414 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
411 415 AVG = [0 for i in self.dataOut.channelList]
412 416 for i in self.dataOut.channelList:
413 417 AVG[i] = avgdB[i][::dy].tolist()
414 418 payload = {
415 419 'timestamp': self.dataOut.utctime,
416 420 'data': roundFloats(AVG),
417 421 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
418 422 'interval': self.dataOut.getTimeInterval(),
419 423 'type': self.plottype,
420 424 'yData': yData
421 425 }
422 426 elif self.plottype == 'noise':
423 427 noise = self.dataOut.getNoise()/self.dataOut.normFactor
424 428 noisedB = 10*numpy.log10(noise)
425 429 payload = {
426 430 'timestamp': self.dataOut.utctime,
427 431 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
428 432 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
429 433 'interval': self.dataOut.getTimeInterval(),
430 434 'type': self.plottype,
431 435 'yData': yData
432 436 }
433 437 elif self.plottype == 'snr':
434 438 data = getattr(self.dataOut, 'data_SNR')
435 439 avgdB = 10*numpy.log10(data)
436 440
437 441 ylen = data[0].size
438 442 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
439 443 AVG = [0 for i in self.dataOut.channelList]
440 444 for i in self.dataOut.channelList:
441 445 AVG[i] = avgdB[i][::dy].tolist()
442 446 payload = {
443 447 'timestamp': self.dataOut.utctime,
444 448 'data': roundFloats(AVG),
445 449 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
446 450 'type': self.plottype,
447 451 'yData': yData
448 452 }
449 453 else:
450 454 print "Tipo de grafico invalido"
451 455 payload = {
452 456 'data': 'None',
453 457 'timestamp': 'None',
454 458 'type': None
455 459 }
456 460
457 461 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
458 462
459 463 if self.zeromq is 1:
460 464 if self.verbose:
461 465 log.log(
462 466 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
463 467 self.name
464 468 )
465 469 self.zmq_socket.send_pyobj(self.dataOut)
466 470
467 471 def run(self, dataOut, **kwargs):
468 472 self.dataOut = dataOut
469 473 if not self.isConfig:
470 474 self.setup(**kwargs)
471 475 self.isConfig = True
472 476
473 477 self.publish_data()
474 478 time.sleep(self.delay)
475 479
476 480 def close(self):
477 481 if self.zeromq is 1:
478 482 self.dataOut.finished = True
479 483 self.zmq_socket.send_pyobj(self.dataOut)
480 484 time.sleep(0.1)
481 485 self.zmq_socket.close()
482 486 if self.client:
483 487 self.client.loop_stop()
484 488 self.client.disconnect()
485 489
486 490
487 491 class ReceiverData(ProcessingUnit):
488 492
489 493 __attrs__ = ['server']
490 494
491 495 def __init__(self, **kwargs):
492 496
493 497 ProcessingUnit.__init__(self, **kwargs)
494 498
495 499 self.isConfig = False
496 500 server = kwargs.get('server', 'zmq.pipe')
497 501 if 'tcp://' in server:
498 502 address = server
499 503 else:
500 504 address = 'ipc:///tmp/%s' % server
501 505
502 506 self.address = address
503 507 self.dataOut = JROData()
504 508
505 509 def setup(self):
506 510
507 511 self.context = zmq.Context()
508 512 self.receiver = self.context.socket(zmq.PULL)
509 513 self.receiver.bind(self.address)
510 514 time.sleep(0.5)
511 515 log.success('ReceiverData from {}'.format(self.address))
512 516
513 517
514 518 def run(self):
515 519
516 520 if not self.isConfig:
517 521 self.setup()
518 522 self.isConfig = True
519 523
520 524 self.dataOut = self.receiver.recv_pyobj()
521 525 log.log('{} - {}'.format(self.dataOut.type,
522 526 self.dataOut.datatime.ctime(),),
523 527 'Receiving')
524 528
525 529
526 530 class PlotterReceiver(ProcessingUnit, Process):
527 531
528 532 throttle_value = 5
529 533 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
530 534 'exp_code', 'web_server', 'buffering']
531 535
532 536 def __init__(self, **kwargs):
533 537
534 538 ProcessingUnit.__init__(self, **kwargs)
535 539 Process.__init__(self)
536 540 self.mp = False
537 541 self.isConfig = False
538 542 self.isWebConfig = False
539 543 self.connections = 0
540 544 server = kwargs.get('server', 'zmq.pipe')
541 545 web_server = kwargs.get('web_server', None)
542 546 if 'tcp://' in server:
543 547 address = server
544 548 else:
545 549 address = 'ipc:///tmp/%s' % server
546 550 self.address = address
547 551 self.web_address = web_server
548 552 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
549 553 self.realtime = kwargs.get('realtime', False)
550 554 self.localtime = kwargs.get('localtime', True)
551 555 self.buffering = kwargs.get('buffering', True)
552 556 self.throttle_value = kwargs.get('throttle', 5)
553 557 self.exp_code = kwargs.get('exp_code', None)
554 558 self.sendData = self.initThrottle(self.throttle_value)
555 559 self.dates = []
556 560 self.setup()
557 561
558 562 def setup(self):
559 563
560 564 self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
561 565 self.isConfig = True
562 566
563 567 def event_monitor(self, monitor):
564 568
565 569 events = {}
566 570
567 571 for name in dir(zmq):
568 572 if name.startswith('EVENT_'):
569 573 value = getattr(zmq, name)
570 574 events[value] = name
571 575
572 576 while monitor.poll():
573 577 evt = recv_monitor_message(monitor)
574 578 if evt['event'] == 32:
575 579 self.connections += 1
576 580 if evt['event'] == 512:
577 581 pass
578 582
579 583 evt.update({'description': events[evt['event']]})
580 584
581 585 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
582 586 break
583 587 monitor.close()
584 588 print('event monitor thread done!')
585 589
586 590 def initThrottle(self, throttle_value):
587 591
588 592 @throttle(seconds=throttle_value)
589 593 def sendDataThrottled(fn_sender, data):
590 594 fn_sender(data)
591 595
592 596 return sendDataThrottled
593 597
594 598 def send(self, data):
595 599 log.log('Sending {}'.format(data), self.name)
596 600 self.sender.send_pyobj(data)
597 601
598 602 def run(self):
599 603
600 604 log.log(
601 605 'Starting from {}'.format(self.address),
602 606 self.name
603 607 )
604 608
605 609 self.context = zmq.Context()
606 610 self.receiver = self.context.socket(zmq.PULL)
607 611 self.receiver.bind(self.address)
608 612 monitor = self.receiver.get_monitor_socket()
609 613 self.sender = self.context.socket(zmq.PUB)
610 614 if self.web_address:
611 615 log.success(
612 616 'Sending to web: {}'.format(self.web_address),
613 617 self.name
614 618 )
615 619 self.sender_web = self.context.socket(zmq.PUSH)
616 620 self.sender_web.connect(self.web_address)
617 621 time.sleep(1)
618 622
619 623 if 'server' in self.kwargs:
620 624 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
621 625 else:
622 626 self.sender.bind("ipc:///tmp/zmq.plots")
623 627
624 628 time.sleep(2)
625 629
626 630 t = Thread(target=self.event_monitor, args=(monitor,))
627 631 t.start()
628 632
629 633 while True:
630 634 dataOut = self.receiver.recv_pyobj()
631 635 if not dataOut.flagNoData:
632 636 if dataOut.type == 'Parameters':
633 637 tm = dataOut.utctimeInit
634 638 else:
635 639 tm = dataOut.utctime
636 640 if dataOut.useLocalTime:
637 641 if not self.localtime:
638 642 tm += time.timezone
639 643 dt = datetime.datetime.fromtimestamp(tm).date()
640 644 else:
641 645 if self.localtime:
642 646 tm -= time.timezone
643 647 dt = datetime.datetime.utcfromtimestamp(tm).date()
644 648 coerce = False
645 649 if dt not in self.dates:
646 650 if self.data:
647 651 self.data.ended = True
648 652 self.send(self.data)
649 653 coerce = True
650 654 self.data.setup()
651 655 self.dates.append(dt)
652 656
653 657 self.data.update(dataOut, tm)
654 658
655 659 if dataOut.finished is True:
656 660 self.connections -= 1
657 661 if self.connections == 0 and dt in self.dates:
658 662 self.data.ended = True
659 663 self.send(self.data)
660 664 self.data.setup()
661 665 else:
662 666 if self.realtime:
663 667 self.send(self.data)
664 668 if self.web_address:
665 669 payload = self.data.jsonify()
666 670 log.log('Sending to web... type:{}, size:{}'.format(dataOut.type, len(payload)), self.name)
667 671 self.sender_web.send(payload)
668 672 else:
669 673 self.sendData(self.send, self.data, coerce=coerce)
670 674 coerce = False
671 675
672 676 return
673 677
674 678
675 679 class SendToFTP(Operation, Process):
676 680
677 681 '''
678 682 Operation to send data over FTP.
679 683 '''
680 684
681 685 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
682 686
683 687 def __init__(self, **kwargs):
684 688 '''
685 689 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
686 690 '''
687 691 Operation.__init__(self, **kwargs)
688 692 Process.__init__(self)
689 693 self.server = kwargs.get('server')
690 694 self.username = kwargs.get('username')
691 695 self.password = kwargs.get('password')
692 696 self.patterns = kwargs.get('patterns')
693 self.timeout = kwargs.get('timeout', 10)
697 self.timeout = kwargs.get('timeout', 30)
694 698 self.times = [time.time() for p in self.patterns]
695 699 self.latest = ['' for p in self.patterns]
696 700 self.mp = False
697 701 self.ftp = None
698 702
699 703 def setup(self):
700 704
701 705 log.log('Connecting to ftp://{}'.format(self.server), self.name)
702 706 try:
703 707 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
704 708 except ftplib.all_errors:
705 709 log.error('Server connection fail: {}'.format(self.server), self.name)
706 710 if self.ftp is not None:
707 711 self.ftp.close()
708 712 self.ftp = None
709 713 self.isConfig = False
710 714 return
711 715
712 716 try:
713 717 self.ftp.login(self.username, self.password)
714 718 except ftplib.all_errors:
715 719 log.error('The given username y/o password are incorrect', self.name)
716 720 if self.ftp is not None:
717 721 self.ftp.close()
718 722 self.ftp = None
719 723 self.isConfig = False
720 724 return
721 725
722 726 log.success('Connection success', self.name)
723 727 self.isConfig = True
724 728 return
725 729
726 730 def check(self):
727 731
728 732 try:
729 733 self.ftp.voidcmd("NOOP")
730 734 except:
731 735 log.warning('Connection lost... trying to reconnect', self.name)
732 736 if self.ftp is not None:
733 737 self.ftp.close()
734 738 self.ftp = None
735 739 self.setup()
736 740
737 741 def find_files(self, path, ext):
738 742
739 743 files = glob.glob1(path, '*{}'.format(ext))
740 744 files.sort()
741 745 if files:
742 746 return files[-1]
743 747 return None
744 748
745 749 def getftpname(self, filename, exp_code, sub_exp_code):
746 750
747 751 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
748 752 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
749 753 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
750 754 exp_code = '%3.3d'%exp_code
751 755 sub_exp_code = '%2.2d'%sub_exp_code
752 756 plot_code = '%2.2d'% PLOT_CODES[filename.split('_')[0].split('-')[1]]
753 757 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
754 758 return name
755 759
756 760 def upload(self, src, dst):
757 761
758 762 log.log('Uploading {} '.format(src), self.name, nl=False)
759 763
760 764 fp = open(src, 'rb')
761 765 command = 'STOR {}'.format(dst)
762 766
763 767 try:
764 768 self.ftp.storbinary(command, fp, blocksize=1024)
765 769 except ftplib.all_errors, e:
766 770 log.error('{}'.format(e), self.name)
767 771 if self.ftp is not None:
768 772 self.ftp.close()
769 773 self.ftp = None
770 774 return
771 775
772 776 try:
773 777 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
774 778 except ftplib.all_errors, e:
775 779 log.error('{}'.format(e), self.name)
776 780 if self.ftp is not None:
777 781 self.ftp.close()
778 782 self.ftp = None
779 783
780 784 fp.close()
781 785
782 786 log.success('OK', tag='')
783 787
784 788 def send_files(self):
785 789
786 790 for x, pattern in enumerate(self.patterns):
787 791 local, remote, ext, delay, exp_code, sub_exp_code = pattern
788 792 if time.time()-self.times[x] >= delay:
789 793 srcname = self.find_files(local, ext)
790 794
791 795 if srcname is None or srcname == self.latest[x]:
792 796 continue
793 797
794 798 if 'png' in ext:
795 799 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
796 800 else:
797 801 dstname = srcname
798 802
799 803 src = os.path.join(local, srcname)
800 804
801 805 if os.path.getmtime(src) < time.time() - 30*60:
802 806 continue
803 807
804 808 dst = os.path.join(remote, dstname)
805 809
806 810 if self.ftp is None:
807 811 continue
808 812
809 813 self.upload(src, dst)
810 814
811 815 self.times[x] = time.time()
812 816 self.latest[x] = srcname
813 817
814 818 def run(self):
815 819
816 820 while True:
817 821 if not self.isConfig:
818 822 self.setup()
819 823 if self.ftp is not None:
820 824 self.check()
821 825 self.send_files()
822 time.sleep(2)
826 time.sleep(10)
823 827
824 828 def close():
825 829
826 830 if self.ftp is not None:
827 if self.ftp is not None:
828 self.ftp.close()
831 self.ftp.close()
829 832 self.terminate()
General Comments 0
You need to be logged in to leave comments. Login now