##// END OF EJS Templates
FTP modificado, para usarse libremente (no schain) y subir rti de AMISR a jro-app, ventaja de usar esta clase antigua solo se sube un archivo una vez finalizado el procesamiento offline.
joabAM -
r1422:161fd8f163b6
parent child
Show More
@@ -1,659 +1,660
1 1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 2 # All rights reserved.
3 3 #
4 4 # Distributed under the terms of the BSD 3-clause license.
5 5 """API to create signal chain projects
6 6
7 7 The API is provide through class: Project
8 8 """
9 9
10 10 import re
11 11 import sys
12 12 import ast
13 13 import datetime
14 14 import traceback
15 15 import time
16 16 import multiprocessing
17 17 from multiprocessing import Process, Queue
18 18 from threading import Thread
19 19 from xml.etree.ElementTree import ElementTree, Element, SubElement
20 20
21 21 from schainpy.admin import Alarm, SchainWarning
22 22 from schainpy.model import *
23 23 from schainpy.utils import log
24 24
25 25 if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
26 26 multiprocessing.set_start_method('fork')
27 27
28 28 class ConfBase():
29 29
30 30 def __init__(self):
31 31
32 32 self.id = '0'
33 33 self.name = None
34 34 self.priority = None
35 35 self.parameters = {}
36 36 self.object = None
37 37 self.operations = []
38 38
39 39 def getId(self):
40 40
41 41 return self.id
42 42
43 43 def getNewId(self):
44 44
45 45 return int(self.id) * 10 + len(self.operations) + 1
46 46
47 47 def updateId(self, new_id):
48 48
49 49 self.id = str(new_id)
50 50
51 51 n = 1
52 52 for conf in self.operations:
53 53 conf_id = str(int(new_id) * 10 + n)
54 54 conf.updateId(conf_id)
55 55 n += 1
56 56
57 57 def getKwargs(self):
58 58
59 59 params = {}
60 60
61 61 for key, value in self.parameters.items():
62 62 if value not in (None, '', ' '):
63 63 params[key] = value
64 64
65 65 return params
66 66
67 67 def update(self, **kwargs):
68 68
69 69 for key, value in kwargs.items():
70 70 self.addParameter(name=key, value=value)
71 71
72 72 def addParameter(self, name, value, format=None):
73 73 '''
74 74 '''
75 75
76 76 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
77 77 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
78 78 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
79 79 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
80 80 else:
81 81 try:
82 82 self.parameters[name] = ast.literal_eval(value)
83 83 except:
84 84 if isinstance(value, str) and ',' in value:
85 85 self.parameters[name] = value.split(',')
86 86 else:
87 87 self.parameters[name] = value
88 88
89 89 def getParameters(self):
90 90
91 91 params = {}
92 92 for key, value in self.parameters.items():
93 93 s = type(value).__name__
94 94 if s == 'date':
95 95 params[key] = value.strftime('%Y/%m/%d')
96 96 elif s == 'time':
97 97 params[key] = value.strftime('%H:%M:%S')
98 98 else:
99 99 params[key] = str(value)
100 100
101 101 return params
102 102
103 103 def makeXml(self, element):
104 104
105 105 xml = SubElement(element, self.ELEMENTNAME)
106 106 for label in self.xml_labels:
107 107 xml.set(label, str(getattr(self, label)))
108 108
109 109 for key, value in self.getParameters().items():
110 110 xml_param = SubElement(xml, 'Parameter')
111 111 xml_param.set('name', key)
112 112 xml_param.set('value', value)
113 113
114 114 for conf in self.operations:
115 115 conf.makeXml(xml)
116 116
117 117 def __str__(self):
118 118
119 119 if self.ELEMENTNAME == 'Operation':
120 120 s = ' {}[id={}]\n'.format(self.name, self.id)
121 121 else:
122 122 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
123 123
124 124 for key, value in self.parameters.items():
125 125 if self.ELEMENTNAME == 'Operation':
126 126 s += ' {}: {}\n'.format(key, value)
127 127 else:
128 128 s += ' {}: {}\n'.format(key, value)
129 129
130 130 for conf in self.operations:
131 131 s += str(conf)
132 132
133 133 return s
134 134
135 135 class OperationConf(ConfBase):
136 136
137 137 ELEMENTNAME = 'Operation'
138 138 xml_labels = ['id', 'name']
139 139
140 140 def setup(self, id, name, priority, project_id, err_queue):
141 141
142 142 self.id = str(id)
143 143 self.project_id = project_id
144 144 self.name = name
145 145 self.type = 'other'
146 146 self.err_queue = err_queue
147 147
148 148 def readXml(self, element, project_id, err_queue):
149 149
150 150 self.id = element.get('id')
151 151 self.name = element.get('name')
152 152 self.type = 'other'
153 153 self.project_id = str(project_id)
154 154 self.err_queue = err_queue
155 155
156 156 for elm in element.iter('Parameter'):
157 157 self.addParameter(elm.get('name'), elm.get('value'))
158 158
159 159 def createObject(self):
160 160
161 161 className = eval(self.name)
162 162
163 163 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
164 164 kwargs = self.getKwargs()
165 165 opObj = className(self.name, self.id, self.project_id, self.err_queue, **kwargs)
166 166 opObj.start()
167 167 self.type = 'external'
168 168 else:
169 169 opObj = className()
170 170
171 171 self.object = opObj
172 172 return opObj
173 173
174 174 class ProcUnitConf(ConfBase):
175 175
176 176 ELEMENTNAME = 'ProcUnit'
177 177 xml_labels = ['id', 'inputId', 'name']
178 178
179 179 def setup(self, project_id, id, name, datatype, inputId, err_queue):
180 180 '''
181 181 '''
182 182
183 183 if datatype == None and name == None:
184 184 raise ValueError('datatype or name should be defined')
185 185
186 186 if name == None:
187 187 if 'Proc' in datatype:
188 188 name = datatype
189 189 else:
190 190 name = '%sProc' % (datatype)
191 191
192 192 if datatype == None:
193 193 datatype = name.replace('Proc', '')
194 194
195 195 self.id = str(id)
196 196 self.project_id = project_id
197 197 self.name = name
198 198 self.datatype = datatype
199 199 self.inputId = inputId
200 200 self.err_queue = err_queue
201 201 self.operations = []
202 202 self.parameters = {}
203 203
204 204 def removeOperation(self, id):
205 205
206 206 i = [1 if x.id==id else 0 for x in self.operations]
207 207 self.operations.pop(i.index(1))
208 208
209 209 def getOperation(self, id):
210 210
211 211 for conf in self.operations:
212 212 if conf.id == id:
213 213 return conf
214 214
215 215 def addOperation(self, name, optype='self'):
216 216 '''
217 217 '''
218 218
219 219 id = self.getNewId()
220 220 conf = OperationConf()
221 221 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
222 222 self.operations.append(conf)
223 223
224 224 return conf
225 225
226 226 def readXml(self, element, project_id, err_queue):
227 227
228 228 self.id = element.get('id')
229 229 self.name = element.get('name')
230 230 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
231 231 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
232 232 self.project_id = str(project_id)
233 233 self.err_queue = err_queue
234 234 self.operations = []
235 235 self.parameters = {}
236 236
237 237 for elm in element:
238 238 if elm.tag == 'Parameter':
239 239 self.addParameter(elm.get('name'), elm.get('value'))
240 240 elif elm.tag == 'Operation':
241 241 conf = OperationConf()
242 242 conf.readXml(elm, project_id, err_queue)
243 243 self.operations.append(conf)
244 244
245 245 def createObjects(self):
246 246 '''
247 247 Instancia de unidades de procesamiento.
248 248 '''
249 249
250 250 className = eval(self.name)
251 251 kwargs = self.getKwargs()
252 252 procUnitObj = className()
253 253 procUnitObj.name = self.name
254 254 log.success('creating process... id: {}, inputId: {}'.format(self.id,self.inputId), self.name)
255 255
256 256 for conf in self.operations:
257 257
258 258 opObj = conf.createObject()
259 259
260 260 log.success('adding operation: {}, type:{}'.format(conf.name,conf.type), self.name)
261 261
262 262 procUnitObj.addOperation(conf, opObj)
263 263
264 264 self.object = procUnitObj
265 265
266 266 def run(self):
267 267 '''
268 268 '''
269 269
270 270 return self.object.call(**self.getKwargs())
271 271
272 272
273 273 class ReadUnitConf(ProcUnitConf):
274 274
275 275 ELEMENTNAME = 'ReadUnit'
276 276
277 277 def __init__(self):
278 278
279 279 self.id = None
280 280 self.datatype = None
281 281 self.name = None
282 282 self.inputId = None
283 283 self.operations = []
284 284 self.parameters = {}
285 285
286 286 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
287 287 startTime='', endTime='', server=None, **kwargs):
288 288
289 289 if datatype == None and name == None:
290 290 raise ValueError('datatype or name should be defined')
291 291 if name == None:
292 292 if 'Reader' in datatype:
293 293 name = datatype
294 294 datatype = name.replace('Reader','')
295 295 else:
296 296 name = '{}Reader'.format(datatype)
297 297 if datatype == None:
298 298 if 'Reader' in name:
299 299 datatype = name.replace('Reader','')
300 300 else:
301 301 datatype = name
302 302 name = '{}Reader'.format(name)
303 303
304 304 self.id = id
305 305 self.project_id = project_id
306 306 self.name = name
307 307 self.datatype = datatype
308 308 self.err_queue = err_queue
309 309
310 310 self.addParameter(name='path', value=path)
311 311 self.addParameter(name='startDate', value=startDate)
312 312 self.addParameter(name='endDate', value=endDate)
313 313 self.addParameter(name='startTime', value=startTime)
314 314 self.addParameter(name='endTime', value=endTime)
315 315
316 316 for key, value in kwargs.items():
317 317 self.addParameter(name=key, value=value)
318 318
319 319
320 320 class Project(Process):
321 321 """API to create signal chain projects"""
322 322
323 323 ELEMENTNAME = 'Project'
324 324
325 325 def __init__(self, name=''):
326 326
327 327 Process.__init__(self)
328 328 self.id = '1'
329 329 if name:
330 330 self.name = '{} ({})'.format(Process.__name__, name)
331 331 self.filename = None
332 332 self.description = None
333 333 self.email = None
334 334 self.alarm = []
335 335 self.configurations = {}
336 336 # self.err_queue = Queue()
337 337 self.err_queue = None
338 338 self.started = False
339 339
340 340 def getNewId(self):
341 341
342 342 idList = list(self.configurations.keys())
343 343 id = int(self.id) * 10
344 344
345 345 while True:
346 346 id += 1
347 347
348 348 if str(id) in idList:
349 349 continue
350 350
351 351 break
352 352
353 353 return str(id)
354 354
355 355 def updateId(self, new_id):
356 356
357 357 self.id = str(new_id)
358 358
359 359 keyList = list(self.configurations.keys())
360 360 keyList.sort()
361 361
362 362 n = 1
363 363 new_confs = {}
364 364
365 365 for procKey in keyList:
366 366
367 367 conf = self.configurations[procKey]
368 368 idProcUnit = str(int(self.id) * 10 + n)
369 369 conf.updateId(idProcUnit)
370 370 new_confs[idProcUnit] = conf
371 371 n += 1
372 372
373 373 self.configurations = new_confs
374 374
375 375 def setup(self, id=1, name='', description='', email=None, alarm=[]):
376 376
377 377 self.id = str(id)
378 378 self.description = description
379 379 self.email = email
380 380 self.alarm = alarm
381 381 if name:
382 382 self.name = '{} ({})'.format(Process.__name__, name)
383 383
384 384 def update(self, **kwargs):
385 385
386 386 for key, value in kwargs.items():
387 387 setattr(self, key, value)
388 388
389 389 def clone(self):
390 390
391 391 p = Project()
392 392 p.id = self.id
393 393 p.name = self.name
394 394 p.description = self.description
395 395 p.configurations = self.configurations.copy()
396 396
397 397 return p
398 398
399 399 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
400 400
401 401 '''
402 402 '''
403 403
404 404 if id is None:
405 405 idReadUnit = self.getNewId()
406 406 else:
407 407 idReadUnit = str(id)
408 408
409 409 conf = ReadUnitConf()
410 410 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
411 411 self.configurations[conf.id] = conf
412 412
413 413 return conf
414 414
415 415 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
416 416
417 417 '''
418 418 '''
419 419
420 420 if id is None:
421 421 idProcUnit = self.getNewId()
422 422 else:
423 423 idProcUnit = id
424 424
425 425 conf = ProcUnitConf()
426 426 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
427 427 self.configurations[conf.id] = conf
428 428
429 429 return conf
430 430
431 431 def removeProcUnit(self, id):
432 432
433 433 if id in self.configurations:
434 434 self.configurations.pop(id)
435 435
436 436 def getReadUnit(self):
437 437
438 438 for obj in list(self.configurations.values()):
439 439 if obj.ELEMENTNAME == 'ReadUnit':
440 440 return obj
441 441
442 442 return None
443 443
444 444 def getProcUnit(self, id):
445 445
446 446 return self.configurations[id]
447 447
448 448 def getUnits(self):
449 449
450 450 keys = list(self.configurations)
451 451 keys.sort()
452 452
453 453 for key in keys:
454 454 yield self.configurations[key]
455 455
456 456 def updateUnit(self, id, **kwargs):
457 457
458 458 conf = self.configurations[id].update(**kwargs)
459 459
460 460 def makeXml(self):
461 461
462 462 xml = Element('Project')
463 463 xml.set('id', str(self.id))
464 464 xml.set('name', self.name)
465 465 xml.set('description', self.description)
466 466
467 467 for conf in self.configurations.values():
468 468 conf.makeXml(xml)
469 469
470 470 self.xml = xml
471 471
472 472 def writeXml(self, filename=None):
473 473
474 474 if filename == None:
475 475 if self.filename:
476 476 filename = self.filename
477 477 else:
478 478 filename = 'schain.xml'
479 479
480 480 if not filename:
481 481 print('filename has not been defined. Use setFilename(filename) for do it.')
482 482 return 0
483 483
484 484 abs_file = os.path.abspath(filename)
485 485
486 486 if not os.access(os.path.dirname(abs_file), os.W_OK):
487 487 print('No write permission on %s' % os.path.dirname(abs_file))
488 488 return 0
489 489
490 490 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
491 491 print('File %s already exists and it could not be overwriten' % abs_file)
492 492 return 0
493 493
494 494 self.makeXml()
495 495
496 496 ElementTree(self.xml).write(abs_file, method='xml')
497 497
498 498 self.filename = abs_file
499 499
500 500 return 1
501 501
502 502 def readXml(self, filename):
503 503
504 504 abs_file = os.path.abspath(filename)
505 505
506 506 self.configurations = {}
507 507
508 508 try:
509 509 self.xml = ElementTree().parse(abs_file)
510 510 except:
511 511 log.error('Error reading %s, verify file format' % filename)
512 512 return 0
513 513
514 514 self.id = self.xml.get('id')
515 515 self.name = self.xml.get('name')
516 516 self.description = self.xml.get('description')
517 517
518 518 for element in self.xml:
519 519 if element.tag == 'ReadUnit':
520 520 conf = ReadUnitConf()
521 521 conf.readXml(element, self.id, self.err_queue)
522 522 self.configurations[conf.id] = conf
523 523 elif element.tag == 'ProcUnit':
524 524 conf = ProcUnitConf()
525 525 input_proc = self.configurations[element.get('inputId')]
526 526 conf.readXml(element, self.id, self.err_queue)
527 527 self.configurations[conf.id] = conf
528 528
529 529 self.filename = abs_file
530 530
531 531 return 1
532 532
533 533 def __str__(self):
534 534
535 535 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
536 536 self.id,
537 537 self.name,
538 538 self.description,
539 539 )
540 540
541 541 for conf in self.configurations.values():
542 542 text += '{}'.format(conf)
543 543
544 544 return text
545 545
546 546 def createObjects(self):
547 547
548 548 keys = list(self.configurations.keys())
549 549 keys.sort()
550 550 for key in keys:
551 551 conf = self.configurations[key]
552 552 conf.createObjects()
553 553 if conf.inputId is not None:
554 554 conf.object.setInput(self.configurations[conf.inputId].object)
555 555
556 556 def monitor(self):
557 557
558 558 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
559 559 t.start()
560 560
561 561 def _monitor(self, queue, ctx):
562 562
563 563 import socket
564 564
565 565 procs = 0
566 566 err_msg = ''
567 567
568 568 while True:
569 569 msg = queue.get()
570 570 if '#_start_#' in msg:
571 571 procs += 1
572 572 elif '#_end_#' in msg:
573 573 procs -=1
574 574 else:
575 575 err_msg = msg
576 576
577 577 if procs == 0 or 'Traceback' in err_msg:
578 578 break
579 579 time.sleep(0.1)
580 580
581 581 if '|' in err_msg:
582 582 name, err = err_msg.split('|')
583 583 if 'SchainWarning' in err:
584 584 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
585 585 elif 'SchainError' in err:
586 586 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
587 587 else:
588 588 log.error(err, name)
589 589 else:
590 590 name, err = self.name, err_msg
591 591
592 592 time.sleep(1)
593 593
594 594 ctx.term()
595 595
596 596 message = ''.join(err)
597 597
598 598 if err_msg:
599 599 subject = 'SChain v%s: Error running %s\n' % (
600 600 schainpy.__version__, self.name)
601 601
602 602 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
603 603 socket.gethostname())
604 604 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
605 605 subtitle += 'Configuration file: %s\n' % self.filename
606 606 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
607 607
608 608 readUnitConfObj = self.getReadUnit()
609 609 if readUnitConfObj:
610 610 subtitle += '\nInput parameters:\n'
611 611 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
612 612 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
613 613 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
614 614 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
615 615 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
616 616
617 617 a = Alarm(
618 618 modes=self.alarm,
619 619 email=self.email,
620 620 message=message,
621 621 subject=subject,
622 622 subtitle=subtitle,
623 623 filename=self.filename
624 624 )
625 625
626 626 a.start()
627 627
628 628 def setFilename(self, filename):
629 629
630 630 self.filename = filename
631 631
632 632 def runProcs(self):
633 633
634 634 err = False
635 635 n = len(self.configurations)
636 636
637 637 while not err:
638 638 #print("STAR")
639 639 for conf in self.getUnits():
640 640 #print("CONF: ",conf)
641 641 ok = conf.run()
642 642 if ok == 'Error':
643 #self.removeProcUnit(conf.id) #remove proc Unit
643 644 n -= 1
644 645 continue
645 646 elif not ok:
646 647 break
647 648 if n == 0:
648 649 err = True
649 650
650 651 def run(self):
651 652
652 653 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
653 654 self.started = True
654 655 self.start_time = time.time()
655 656 self.createObjects()
656 657 self.runProcs()
657 658 log.success('{} Done (Time: {:4.2f}s)'.format(
658 659 self.name,
659 660 time.time()-self.start_time), '')
@@ -1,683 +1,685
1 1 import os
2 2 import time
3 3 import datetime
4 4
5 5 import numpy
6 6 import h5py
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14
15 15 class HDFReader(Reader, ProcessingUnit):
16 16 """Processing unit to read HDF5 format files
17 17
18 18 This unit reads HDF5 files created with `HDFWriter` operation contains
19 19 by default two groups Data and Metadata all variables would be saved as `dataOut`
20 20 attributes.
21 21 It is possible to read any HDF5 file by given the structure in the `description`
22 22 parameter, also you can add extra values to metadata with the parameter `extras`.
23 23
24 24 Parameters:
25 25 -----------
26 26 path : str
27 27 Path where files are located.
28 28 startDate : date
29 29 Start date of the files
30 30 endDate : list
31 31 End date of the files
32 32 startTime : time
33 33 Start time of the files
34 34 endTime : time
35 35 End time of the files
36 36 description : dict, optional
37 37 Dictionary with the description of the HDF5 file
38 38 extras : dict, optional
39 39 Dictionary with extra metadata to be be added to `dataOut`
40 40
41 41 Examples
42 42 --------
43 43
44 44 desc = {
45 45 'Data': {
46 46 'data_output': ['u', 'v', 'w'],
47 47 'utctime': 'timestamps',
48 48 } ,
49 49 'Metadata': {
50 50 'heightList': 'heights'
51 51 }
52 52 }
53 53
54 54 desc = {
55 55 'Data': {
56 56 'data_output': 'winds',
57 57 'utctime': 'timestamps'
58 58 },
59 59 'Metadata': {
60 60 'heightList': 'heights'
61 61 }
62 62 }
63 63
64 64 extras = {
65 65 'timeZone': 300
66 66 }
67 67
68 68 reader = project.addReadUnit(
69 69 name='HDFReader',
70 70 path='/path/to/files',
71 71 startDate='2019/01/01',
72 72 endDate='2019/01/31',
73 73 startTime='00:00:00',
74 74 endTime='23:59:59',
75 75 # description=json.dumps(desc),
76 76 # extras=json.dumps(extras),
77 77 )
78 78
79 79 """
80 80
81 81 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']
82 82
83 83 def __init__(self):
84 84 ProcessingUnit.__init__(self)
85 85
86 86 self.ext = ".hdf5"
87 87 self.optchar = "D"
88 88 self.meta = {}
89 89 self.data = {}
90 90 self.open_file = h5py.File
91 91 self.open_mode = 'r'
92 92 self.description = {}
93 93 self.extras = {}
94 94 self.filefmt = "*%Y%j***"
95 95 self.folderfmt = "*%Y%j"
96 96 self.utcoffset = 0
97 97
98 98 self.dataOut = Parameters()
99 99 self.dataOut.error=False ## NOTE: Importante definir esto antes inicio
100 100 self.dataOut.flagNoData = True
101 101
102 102 def setup(self, **kwargs):
103 103
104 104 self.set_kwargs(**kwargs)
105 105 if not self.ext.startswith('.'):
106 106 self.ext = '.{}'.format(self.ext)
107 107
108 108 if self.online:
109 109 log.log("Searching files in online mode...", self.name)
110 110
111 111 for nTries in range(self.nTries):
112 112 fullpath = self.searchFilesOnLine(self.path, self.startDate,
113 113 self.endDate, self.expLabel, self.ext, self.walk,
114 114 self.filefmt, self.folderfmt)
115 115 pathname, filename = os.path.split(fullpath)
116 116
117 117 try:
118 118 fullpath = next(fullpath)
119 119
120 120 except:
121 121 fullpath = None
122 122
123 123 if fullpath:
124 124 break
125 125
126 126 log.warning(
127 127 'Waiting {} sec for a valid file in {}: try {} ...'.format(
128 128 self.delay, self.path, nTries + 1),
129 129 self.name)
130 130 time.sleep(self.delay)
131 131
132 132 if not(fullpath):
133 133 raise schainpy.admin.SchainError(
134 134 'There isn\'t any valid file in {}'.format(self.path))
135 135
136 136 pathname, filename = os.path.split(fullpath)
137 137 self.year = int(filename[1:5])
138 138 self.doy = int(filename[5:8])
139 139 self.set = int(filename[8:11]) - 1
140 140 else:
141 141 log.log("Searching files in {}".format(self.path), self.name)
142 142 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
143 143 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
144 144
145 145 self.setNextFile()
146 146
147 147
148 148
149 149
150 150 def readFirstHeader(self):
151 151 '''Read metadata and data'''
152 152
153 153 self.__readMetadata()
154 154 self.__readData()
155 155 self.__setBlockList()
156 156
157 157 for attr in self.meta:
158 158 setattr(self.dataOut, attr, self.meta[attr])
159 159 self.blockIndex = 0
160 160
161 161 return
162 162
163 163 def __setBlockList(self):
164 164 '''
165 165 Selects the data within the times defined
166 166
167 167 self.fp
168 168 self.startTime
169 169 self.endTime
170 170 self.blockList
171 171 self.blocksPerFile
172 172
173 173 '''
174 174
175 175 startTime = self.startTime
176 176 endTime = self.endTime
177 177 thisUtcTime = self.data['utctime'] + self.utcoffset
178 178 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
179 179 thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
180 180 self.startFileDatetime = thisDatetime
181 181 thisDate = thisDatetime.date()
182 182 thisTime = thisDatetime.time()
183 183
184 184 startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
185 185 endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
186 186
187 187 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
188 188
189 189 self.blockList = ind
190 190 self.blocksPerFile = len(ind)
191 191 self.blocksPerFile = len(thisUtcTime)
192 192 return
193 193
194 194 def __readMetadata(self):
195 195 '''
196 196 Reads Metadata
197 197 '''
198 198
199 199 meta = {}
200 200
201 201 if self.description:
202 202 for key, value in self.description['Metadata'].items():
203 203 meta[key] = self.fp[value][()]
204 204 else:
205 205 grp = self.fp['Metadata']
206 206 for name in grp:
207 207 meta[name] = grp[name][()]
208 208
209 209 if self.extras:
210 210 for key, value in self.extras.items():
211 211 meta[key] = value
212 212 self.meta = meta
213 213
214 214 return
215 215
216 216
217 217
218 218 def checkForRealPath(self, nextFile, nextDay):
219 219
220 220 # print("check FRP")
221 221 # dt = self.startFileDatetime + datetime.timedelta(1)
222 222 # filename = '{}.{}{}'.format(self.path, dt.strftime('%Y%m%d'), self.ext)
223 223 # fullfilename = os.path.join(self.path, filename)
224 224 # print("check Path ",fullfilename,filename)
225 225 # if os.path.exists(fullfilename):
226 226 # return fullfilename, filename
227 227 # return None, filename
228 228 return None,None
229 229
230 230 def __readData(self):
231 231
232 232 data = {}
233 233
234 234 if self.description:
235 235 for key, value in self.description['Data'].items():
236 236 if isinstance(value, str):
237 237 if isinstance(self.fp[value], h5py.Dataset):
238 238 data[key] = self.fp[value][()]
239 239 elif isinstance(self.fp[value], h5py.Group):
240 240 array = []
241 241 for ch in self.fp[value]:
242 242 array.append(self.fp[value][ch][()])
243 243 data[key] = numpy.array(array)
244 244 elif isinstance(value, list):
245 245 array = []
246 246 for ch in value:
247 247 array.append(self.fp[ch][()])
248 248 data[key] = numpy.array(array)
249 249 else:
250 250 grp = self.fp['Data']
251 251 for name in grp:
252 252 if isinstance(grp[name], h5py.Dataset):
253 253 array = grp[name][()]
254 254 elif isinstance(grp[name], h5py.Group):
255 255 array = []
256 256 for ch in grp[name]:
257 257 array.append(grp[name][ch][()])
258 258 array = numpy.array(array)
259 259 else:
260 260 log.warning('Unknown type: {}'.format(name))
261 261
262 262 if name in self.description:
263 263 key = self.description[name]
264 264 else:
265 265 key = name
266 266 data[key] = array
267 267
268 268 self.data = data
269 269 return
270 270
271 271 def getData(self):
272 272 if not self.isDateTimeInRange(self.startFileDatetime, self.startDate, self.endDate, self.startTime, self.endTime):
273 273 self.dataOut.flagNoData = True
274 274 self.blockIndex = self.blocksPerFile
275 275 self.dataOut.error = True # TERMINA EL PROGRAMA
276 276 return
277 277 for attr in self.data:
278 278
279 279 if self.data[attr].ndim == 1:
280 280 setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
281 281 else:
282 282 setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])
283 283
284 284
285 285 self.blockIndex += 1
286 286
287 287 if self.blockIndex == 1:
288 288 log.log("Block No. {}/{} -> {}".format(
289 289 self.blockIndex,
290 290 self.blocksPerFile,
291 291 self.dataOut.datatime.ctime()), self.name)
292 292 else:
293 293 log.log("Block No. {}/{} ".format(
294 294 self.blockIndex,
295 295 self.blocksPerFile),self.name)
296 296
297 297 if self.blockIndex == self.blocksPerFile:
298 298 self.setNextFile()
299 299
300 300 self.dataOut.flagNoData = False
301 301
302 302
303 303 def run(self, **kwargs):
304 304
305 305 if not(self.isConfig):
306 306 self.setup(**kwargs)
307 307 self.isConfig = True
308 308
309 309 self.getData()
310 310
311 311 #@MPDecorator
312 312 class HDFWrite(Operation):
313 313 """Operation to write HDF5 files.
314 314
315 315 The HDF5 file contains by default two groups Data and Metadata where
316 316 you can save any `dataOut` attribute specified by `dataList` and `metadataList`
317 317 parameters, data attributes are normaly time dependent where the metadata
318 318 are not.
319 319 It is possible to customize the structure of the HDF5 file with the
320 320 optional description parameter see the examples.
321 321
322 322 Parameters:
323 323 -----------
324 324 path : str
325 325 Path where files will be saved.
326 326 blocksPerFile : int
327 327 Number of blocks per file
328 328 metadataList : list
329 329 List of the dataOut attributes that will be saved as metadata
330 330 dataList : int
331 331 List of the dataOut attributes that will be saved as data
332 332 setType : bool
333 333 If True the name of the files corresponds to the timestamp of the data
334 334 description : dict, optional
335 335 Dictionary with the desired description of the HDF5 file
336 336
337 337 Examples
338 338 --------
339 339
340 340 desc = {
341 341 'data_output': {'winds': ['z', 'w', 'v']},
342 342 'utctime': 'timestamps',
343 343 'heightList': 'heights'
344 344 }
345 345 desc = {
346 346 'data_output': ['z', 'w', 'v'],
347 347 'utctime': 'timestamps',
348 348 'heightList': 'heights'
349 349 }
350 350 desc = {
351 351 'Data': {
352 352 'data_output': 'winds',
353 353 'utctime': 'timestamps'
354 354 },
355 355 'Metadata': {
356 356 'heightList': 'heights'
357 357 }
358 358 }
359 359
360 360 writer = proc_unit.addOperation(name='HDFWriter')
361 361 writer.addParameter(name='path', value='/path/to/file')
362 362 writer.addParameter(name='blocksPerFile', value='32')
363 363 writer.addParameter(name='metadataList', value='heightList,timeZone')
364 364 writer.addParameter(name='dataList',value='data_output,utctime')
365 365 # writer.addParameter(name='description',value=json.dumps(desc))
366 366
367 367 """
368 368
369 369 ext = ".hdf5"
370 370 optchar = "D"
371 371 filename = None
372 372 path = None
373 373 setFile = None
374 374 fp = None
375 375 firsttime = True
376 376 #Configurations
377 377 blocksPerFile = None
378 378 blockIndex = None
379 379 dataOut = None #eval ??????
380 380 #Data Arrays
381 381 dataList = None
382 382 metadataList = None
383 383 currentDay = None
384 384 lastTime = None
385 385 timeZone = "ut"
386 386 hourLimit = 3
387 387 breakDays = True
388 388
389 389 def __init__(self):
390 390
391 391 Operation.__init__(self)
392 392
393 393
394 394 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None,
395 395 description={},timeZone = "ut",hourLimit = 3, breakDays=True):
396 396 self.path = path
397 397 self.blocksPerFile = blocksPerFile
398 398 self.metadataList = metadataList
399 399 self.dataList = [s.strip() for s in dataList]
400 400 self.setType = setType
401 401 self.description = description
402 402 self.timeZone = timeZone
403 403 self.hourLimit = hourLimit
404 404 self.breakDays = breakDays
405 405
406 406 if self.metadataList is None:
407 407 self.metadataList = self.dataOut.metadata_list
408 408
409 409 tableList = []
410 410 dsList = []
411 411
412 412 for i in range(len(self.dataList)):
413 413 dsDict = {}
414 414 if hasattr(self.dataOut, self.dataList[i]):
415 415 dataAux = getattr(self.dataOut, self.dataList[i])
416 416 dsDict['variable'] = self.dataList[i]
417 417 else:
418 418 log.warning('Attribute {} not found in dataOut', self.name)
419 419 continue
420 420
421 421 if dataAux is None:
422 422 continue
423 423 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
424 424 dsDict['nDim'] = 0
425 425 else:
426 426 dsDict['nDim'] = len(dataAux.shape)
427 427 dsDict['shape'] = dataAux.shape
428 428 dsDict['dsNumber'] = dataAux.shape[0]
429 429 dsDict['dtype'] = dataAux.dtype
430 430
431 431 dsList.append(dsDict)
432 432
433 433 self.blockIndex = 0
434 434 self.dsList = dsList
435 435 self.currentDay = self.dataOut.datatime.date()
436 436
437 437
438 438 def timeFlag(self):
439 439 currentTime = self.dataOut.utctime
440 440 timeTuple = None
441 441 if self.timeZone == "lt":
442 442 timeTuple = time.localtime(currentTime)
443 443 else :
444 444 timeTuple = time.gmtime(currentTime)
445 445
446 446 dataDay = timeTuple.tm_yday
447 447
448 448 if self.lastTime is None:
449 449 self.lastTime = currentTime
450 450 self.currentDay = dataDay
451 451 return False
452 452
453 453 timeDiff = currentTime - self.lastTime
454 454
455 455 #Si el dia es diferente o si la diferencia entre un dato y otro supera self.hourLimit
456 456 if (dataDay != self.currentDay) and self.breakDays:
457 457 self.currentDay = dataDay
458 458 return True
459 459 elif timeDiff > self.hourLimit*60*60:
460 460 self.lastTime = currentTime
461 461 return True
462 462 else:
463 463 self.lastTime = currentTime
464 464 return False
465 465
466 466 def run(self, dataOut,**kwargs):
467 467
468 468 self.dataOut = dataOut
469 469 if not(self.isConfig):
470 470 self.setup(**kwargs)
471 471
472 472 self.isConfig = True
473 473 self.setNextFile()
474 474
475 475 self.putData()
476 476
477 477 return self.dataOut
478 478
479 479 def setNextFile(self):
480 480
481 481 ext = self.ext
482 482 path = self.path
483 483 setFile = self.setFile
484 484 timeTuple = None
485 485 if self.timeZone == "lt":
486 486 timeTuple = time.localtime(self.dataOut.utctime)
487 487 elif self.timeZone == "ut":
488 488 timeTuple = time.gmtime(self.dataOut.utctime)
489 489 #print("path: ",timeTuple)
490 490 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
491 491 fullpath = os.path.join(path, subfolder)
492 492
493 493 if os.path.exists(fullpath):
494 494 filesList = os.listdir(fullpath)
495 495 filesList = [k for k in filesList if k.startswith(self.optchar)]
496 496 if len( filesList ) > 0:
497 497 filesList = sorted(filesList, key=str.lower)
498 498 filen = filesList[-1]
499 499 # el filename debera tener el siguiente formato
500 500 # 0 1234 567 89A BCDE (hex)
501 501 # x YYYY DDD SSS .ext
502 502 if isNumber(filen[8:11]):
503 503 setFile = int(filen[8:11]) #inicializo mi contador de seteo al seteo del ultimo file
504 504 else:
505 505 setFile = -1
506 506 else:
507 507 setFile = -1 #inicializo mi contador de seteo
508 508 else:
509 509 os.makedirs(fullpath)
510 510 setFile = -1 #inicializo mi contador de seteo
511 511
512 512 if self.setType is None:
513 513 setFile += 1
514 514 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
515 515 timeTuple.tm_year,
516 516 timeTuple.tm_yday,
517 517 setFile,
518 518 ext )
519 519 else:
520 520 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
521 521 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
522 522 timeTuple.tm_year,
523 523 timeTuple.tm_yday,
524 524 setFile,
525 525 ext )
526 526
527 527 self.filename = os.path.join( path, subfolder, file )
528 528
529 529
530 530
531 531 def getLabel(self, name, x=None):
532 532
533 533 if x is None:
534 534 if 'Data' in self.description:
535 535 data = self.description['Data']
536 536 if 'Metadata' in self.description:
537 537 data.update(self.description['Metadata'])
538 538 else:
539 539 data = self.description
540 540 if name in data:
541 541 if isinstance(data[name], str):
542 542 return data[name]
543 543 elif isinstance(data[name], list):
544 544 return None
545 545 elif isinstance(data[name], dict):
546 546 for key, value in data[name].items():
547 547 return key
548 548 return name
549 549 else:
550 550 if 'Metadata' in self.description:
551 551 meta = self.description['Metadata']
552 552 else:
553 553 meta = self.description
554 554 if name in meta:
555 555 if isinstance(meta[name], list):
556 556 return meta[name][x]
557 557 elif isinstance(meta[name], dict):
558 558 for key, value in meta[name].items():
559 559 return value[x]
560 560 if 'cspc' in name:
561 561 return 'pair{:02d}'.format(x)
562 562 else:
563 563 return 'channel{:02d}'.format(x)
564 564
565 565 def writeMetadata(self, fp):
566 566
567 567 if self.description:
568 568 if 'Metadata' in self.description:
569 569 grp = fp.create_group('Metadata')
570 570 else:
571 571 grp = fp
572 572 else:
573 573 grp = fp.create_group('Metadata')
574 574
575 575 for i in range(len(self.metadataList)):
576 576 if not hasattr(self.dataOut, self.metadataList[i]):
577 577 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
578 578 continue
579 579 value = getattr(self.dataOut, self.metadataList[i])
580 580 if isinstance(value, bool):
581 581 if value is True:
582 582 value = 1
583 583 else:
584 584 value = 0
585 585 grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)
586 586 return
587 587
588 588 def writeData(self, fp):
589 589
590 590 if self.description:
591 591 if 'Data' in self.description:
592 592 grp = fp.create_group('Data')
593 593 else:
594 594 grp = fp
595 595 else:
596 596 grp = fp.create_group('Data')
597 597
598 598 dtsets = []
599 599 data = []
600 600
601 601 for dsInfo in self.dsList:
602 602 if dsInfo['nDim'] == 0:
603 603 ds = grp.create_dataset(
604 604 self.getLabel(dsInfo['variable']),
605 605 (self.blocksPerFile, ),
606 606 chunks=True,
607 607 dtype=numpy.float64)
608 608 dtsets.append(ds)
609 609 data.append((dsInfo['variable'], -1))
610 610 else:
611 611 label = self.getLabel(dsInfo['variable'])
612 612 if label is not None:
613 613 sgrp = grp.create_group(label)
614 614 else:
615 615 sgrp = grp
616 616 for i in range(dsInfo['dsNumber']):
617 617 ds = sgrp.create_dataset(
618 618 self.getLabel(dsInfo['variable'], i),
619 619 (self.blocksPerFile, ) + dsInfo['shape'][1:],
620 620 chunks=True,
621 621 dtype=dsInfo['dtype'])
622 622 dtsets.append(ds)
623 623 data.append((dsInfo['variable'], i))
624 624 fp.flush()
625 625
626 626 log.log('Creating file: {}'.format(fp.filename), self.name)
627 627
628 628 self.ds = dtsets
629 629 self.data = data
630 630 self.firsttime = True
631 631
632 632 return
633 633
634 634 def putData(self):
635 635
636 636 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
637 637 self.closeFile()
638 638 self.setNextFile()
639 639 self.dataOut.flagNoData = False
640 640 self.blockIndex = 0
641 641 return
642 642
643 643
644 644
645 645 if self.blockIndex == 0:
646 646 #Escribir metadata Aqui???
647 647 #Setting HDF5 File
648 648 self.fp = h5py.File(self.filename, 'w')
649 649 #write metadata
650 650 self.writeMetadata(self.fp)
651 651 #Write data
652 652 self.writeData(self.fp)
653 653 log.log('Block No. {}/{} --> {}'.format(self.blockIndex+1, self.blocksPerFile,self.dataOut.datatime.ctime()), self.name)
654 elif (self.blockIndex % 10 ==0):
655 log.log('Block No. {}/{} --> {}'.format(self.blockIndex+1, self.blocksPerFile,self.dataOut.datatime.ctime()), self.name)
654 656 else:
655 657
656 658 log.log('Block No. {}/{}'.format(self.blockIndex+1, self.blocksPerFile), self.name)
657 659
658 660 for i, ds in enumerate(self.ds):
659 661 attr, ch = self.data[i]
660 662 if ch == -1:
661 663 ds[self.blockIndex] = getattr(self.dataOut, attr)
662 664 else:
663 665 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
664 666
665 667 self.blockIndex += 1
666 668
667 669 self.fp.flush()
668 670 self.dataOut.flagNoData = True
669 671
670 672
671 673 def closeFile(self):
672 674
673 675 if self.blockIndex != self.blocksPerFile:
674 676 for ds in self.ds:
675 677 ds.resize(self.blockIndex, axis=0)
676 678
677 679 if self.fp:
678 680 self.fp.flush()
679 681 self.fp.close()
680 682
681 683 def close(self):
682 684
683 685 self.closeFile()
@@ -1,1112 +1,1201
1 1 '''
2 2 @author: Daniel Suarez
3 3 '''
4 4 import os
5 5 import glob
6 6 import ftplib
7 7
8 8 try:
9 9 import paramiko
10 10 import scp
11 11 except:
12 12 pass
13 13
14 14 import time
15 15
16 16 import threading
17 17 Thread = threading.Thread
18 18
19 19 # try:
20 20 # from gevent import sleep
21 21 # except:
22 22 from time import sleep
23 23 from schainpy.model.data.jrodata import *
24 24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
25 25 #@MPDecorator
26 26 class Remote(Thread):
27 27 """
28 28 Remote is a parent class used to define the behaviour of FTP and SSH class. These clases are
29 29 used to upload or download files remotely.
30 30
31 31 Non-standard Python modules used:
32 32 None
33 33
34 34 Written by:
35 35 "Miguel Urco":mailto:miguel.urco@jro.igp.gob.pe Jun. 03, 2015
36 36 Modified by:
37 37 -
38 38 """
39 39
40 40 server = None
41 41 username = None
42 42 password = None
43 43 remotefolder = None
44 44 key_filename=None
45 45
46 46 period = 60
47 47 fileList = []
48 48 bussy = False
49 49
50 50 def __init__(self, server, username, password, remotefolder, period=60,key_filename=None):
51 51
52 52 Thread.__init__(self)
53 53
54 54 self.setDaemon(True)
55 55
56 56 self.status = 0
57 57
58 58 self.__server = server
59 59 self.__username = username
60 60 self.__password = password
61 61 self.__remotefolder = remotefolder
62 62
63 63 self.period = period
64 64 self.key_filename = key_filename
65 65 self.fileList = []
66 66 self.bussy = False
67 67
68 68 self.stopFlag = False
69 69
70 70 print("[Remote Server] Opening server: %s" %self.__server)
71 71 if self.open(self.__server, self.__username, self.__password, self.__remotefolder,key_filename=self.key_filename):
72 72 print("[Remote Server] %s server was opened successfully" %self.__server)
73 73
74 74 #self.close()
75 75
76 76 self.mutex = threading.Lock()
77 77
78 78 def stop(self):
79 79
80 80 self.stopFlag = True
81 81 self.join(10)
82 82
83 83 def open(self):
84 84 """
85 85 Connect to server and create a connection class (FTP or SSH) to remote server.
86 86 """
87 87 raise NotImplementedError("Implement this method in child class")
88 88
89 89 def close(self):
90 90 """
91 91 Close connection to server
92 92 """
93 93 raise NotImplementedError("Implement this method in child class")
94 94
95 95 def mkdir(self, remotefolder):
96 96 """
97 97 Create a folder remotely
98 98 """
99 99 raise NotImplementedError("Implement this method in child class")
100 100
101 101 def cd(self, remotefolder):
102 102 """
103 103 Change working directory in remote server
104 104 """
105 105 raise NotImplementedError("Implement this method in child class")
106 106
107 107 def download(self, filename, localfolder=None):
108 108 """
109 109 Download a file from server to local host
110 110 """
111 111 raise NotImplementedError("Implement this method in child class")
112 112
113 113 def sendFile(self, fullfilename):
114 114 """
115 115 sendFile method is used to upload a local file to the current directory in remote server
116 116
117 117 Inputs:
118 118 fullfilename - full path name of local file to store in remote directory
119 119
120 120 Returns:
121 121 0 in error case else 1
122 122 """
123 123 raise NotImplementedError("Implement this method in child class")
124 124
125 125 def upload(self, fullfilename, remotefolder=None):
126 126 """
127 127 upload method is used to upload a local file to remote directory. This method changes
128 128 working directory before sending a file.
129 129
130 130 Inputs:
131 131 fullfilename - full path name of local file to store in remote directory
132 132
133 133 remotefolder - remote directory
134 134
135 135 Returns:
136 136 0 in error case else 1
137 137 """
138 138 print("[Remote Server] Uploading %s to %s:%s" %(fullfilename, self.server, self.remotefolder))
139 139
140 140 if not self.status:
141 141 return 0
142 142
143 143 if remotefolder == None:
144 144 remotefolder = self.remotefolder
145 145
146 146 if not self.cd(remotefolder):
147 147 return 0
148 148
149 149 if not self.sendFile(fullfilename):
150 150 print("[Remote Server] Error uploading file %s" %fullfilename)
151 151 return 0
152 152
153 153
154 154
155 155 return 1
156 156
157 157 def delete(self, filename):
158 158 """
159 159 Remove a file from remote server
160 160 """
161 161 pass
162 162
163 163 def updateFileList(self, fileList):
164 164 """
165 165 Remove a file from remote server
166 166 """
167 167
168 168 if fileList == self.fileList:
169 169 return 0
170 170
171 171 self.mutex.acquire()
172 172 # init = time.time()
173 173 #
174 174 # while(self.bussy):
175 175 # sleep(0.1)
176 176 # if time.time() - init > 2*self.period:
177 177 # return 0
178 178
179 179 self.fileList = fileList
180 180 self.mutex.release()
181 181 return 1
182 182
183 183 def run(self):
184 184
185 185 if not self.status:
186 186 print("Finishing FTP service")
187 187 return
188 188
189 189 if not self.cd(self.remotefolder):
190 190 raise ValueError("Could not access to the new remote directory: %s" %self.remotefolder)
191 191
192 192 while True:
193 193
194 194 for i in range(self.period):
195 195 if self.stopFlag:
196 196 break
197 197 sleep(1)
198 198
199 199 if self.stopFlag:
200 200 break
201 201
202 202 # self.bussy = True
203 203 self.mutex.acquire()
204 204
205 205 print("[Remote Server] Opening %s" %self.__server)
206 206 if not self.open(self.__server, self.__username, self.__password, self.__remotefolder):
207 207 self.mutex.release()
208 208 continue
209 209
210 210 for thisFile in self.fileList:
211 211 self.upload(thisFile, self.remotefolder)
212 212
213 213 print("[Remote Server] Closing %s" %self.__server)
214 214 self.close()
215 215
216 216 self.mutex.release()
217 217 # self.bussy = False
218 218
219 219 print("[Remote Server] Thread stopped successfully")
220 220
221 221 class FTPClient(Remote):
222 222
223 223 __ftpClientObj = None
224 224
225 225 def __init__(self, server, username, password, remotefolder, period=60):
226 226 """
227 227 """
228 228 Remote.__init__(self, server, username, password, remotefolder, period)
229 229
230 230 def open(self, server, username, password, remotefolder):
231 231
232 232 """
233 233 This method is used to set FTP parameters and establish a connection to remote server
234 234
235 235 Inputs:
236 236 server - remote server IP Address
237 237
238 238 username - remote server Username
239 239
240 240 password - remote server password
241 241
242 242 remotefolder - remote server current working directory
243 243
244 244 Return:
245 245 Boolean - Returns 1 if a connection has been established, 0 otherwise
246 246
247 247 Affects:
248 248 self.status - in case of error or fail connection this parameter is set to 0 else 1
249 249
250 250 """
251 251
252 252 if server == None:
253 253 raise ValueError("FTP server should be defined")
254 254
255 255 if username == None:
256 256 raise ValueError("FTP username should be defined")
257 257
258 258 if password == None:
259 259 raise ValueError("FTP password should be defined")
260 260
261 261 if remotefolder == None:
262 262 raise ValueError("FTP remote folder should be defined")
263 263
264 264 try:
265 265 ftpClientObj = ftplib.FTP(server)
266 266 except ftplib.all_errors as e:
267 267 print("[FTP Server]: FTP server connection fail: %s" %server)
268 268 print("[FTP Server]:", e)
269 269 self.status = 0
270 270 return 0
271 271
272 272 try:
273 273 ftpClientObj.login(username, password)
274 274 except ftplib.all_errors:
275 275 print("[FTP Server]: FTP username or password are incorrect")
276 276 self.status = 0
277 277 return 0
278 278
279 279 if remotefolder == None:
280 280 remotefolder = ftpClientObj.pwd()
281 281 else:
282 282 try:
283 283 ftpClientObj.cwd(remotefolder)
284 284 except ftplib.all_errors:
285 285 print("[FTP Server]: FTP remote folder is invalid: %s" %remotefolder)
286 286 remotefolder = ftpClientObj.pwd()
287 287
288 288 self.server = server
289 289 self.username = username
290 290 self.password = password
291 291 self.remotefolder = remotefolder
292 292 self.__ftpClientObj = ftpClientObj
293 293 self.status = 1
294 294
295 295 return 1
296 296
297 297 def close(self):
298 298 """
299 299 Close connection to remote server
300 300 """
301 301 if not self.status:
302 302 return 0
303 303
304 304 self.__ftpClientObj.close()
305 305
306 306 def mkdir(self, remotefolder):
307 307 """
308 308 mkdir is used to make a new directory in remote server
309 309
310 310 Input:
311 311 remotefolder - directory name
312 312
313 313 Return:
314 314 0 in error case else 1
315 315 """
316 316 if not self.status:
317 317 return 0
318 318
319 319 try:
320 320 self.__ftpClientObj.mkd(dirname)
321 321 except ftplib.all_errors:
322 322 print("[FTP Server]: Error creating remote folder: %s" %remotefolder)
323 323 return 0
324 324
325 325 return 1
326 326
327 327 def cd(self, remotefolder):
328 328 """
329 329 cd is used to change remote working directory on server
330 330
331 331 Input:
332 332 remotefolder - current working directory
333 333
334 334 Affects:
335 335 self.remotefolder
336 336
337 337 Return:
338 338 0 in case of error else 1
339 339 """
340 340 if not self.status:
341 341 return 0
342 342
343 343 if remotefolder == self.remotefolder:
344 344 return 1
345 345
346 346 try:
347 347 self.__ftpClientObj.cwd(remotefolder)
348 348 except ftplib.all_errors:
349 349 print('[FTP Server]: Error changing to %s' %remotefolder)
350 350 print('[FTP Server]: Trying to create remote folder')
351 351
352 352 if not self.mkdir(remotefolder):
353 353 print('[FTP Server]: Remote folder could not be created')
354 354 return 0
355 355
356 356 try:
357 357 self.__ftpClientObj.cwd(remotefolder)
358 358 except ftplib.all_errors:
359 359 return 0
360 360
361 361 self.remotefolder = remotefolder
362 362
363 363 return 1
364 364
365 365 def sendFile(self, fullfilename):
366 366
367 367 if not self.status:
368 368 return 0
369 369
370 370 fp = open(fullfilename, 'rb')
371 371
372 372 filename = os.path.basename(fullfilename)
373 373
374 374 command = "STOR %s" %filename
375 375
376 376 try:
377 377 self.__ftpClientObj.storbinary(command, fp)
378 378 except ftplib.all_errors as e:
379 379 print("[FTP Server]:", e)
380 380 return 0
381 381
382 382 try:
383 383 self.__ftpClientObj.sendcmd('SITE CHMOD 755 ' + filename)
384 384 except ftplib.all_errors as e:
385 385 print("[FTP Server]:", e)
386 386
387 387 fp.close()
388 388
389 389 return 1
390 390
391 391 class SSHClient(Remote):
392 392
393 393 __sshClientObj = None
394 394 __scpClientObj = None
395 395
396 396
397 397 def __init__(self, server, username, password, remotefolder, period=60,key_filename=None):
398 398 """
399 399 """
400 400 Remote.__init__(self, server, username, password, remotefolder, period, key_filename)
401 401
402 402 def open(self, server, username, password, remotefolder, port=22, key_filename=None):
403 403
404 404 """
405 405 This method is used to set SSH parameters and establish a connection to a remote server
406 406
407 407 Inputs:
408 408 server - remote server IP Address
409 409
410 410 username - remote server Username
411 411
412 412 password - remote server password
413 413
414 414 remotefolder - remote server current working directory
415 415
416 416 key_filename - filename of the private key/optional
417 417
418 418 Return: void
419 419
420 420 Affects:
421 421 self.status - in case of error or fail connection this parameter is set to 0 else 1
422 422
423 423 """
424 424 #import socket
425 425
426 426 if server == None:
427 427 raise ValueError("SSH server should be defined")
428 428
429 429 if username == None:
430 430 raise ValueError("SSH username should be defined")
431 431
432 432 if password == None:
433 433 raise ValueError("SSH password should be defined")
434 434
435 435 if remotefolder == None:
436 436 raise ValueError("SSH remote folder should be defined")
437 437
438 438 self.__sshClientObj = paramiko.SSHClient()
439 439
440 440 self.__sshClientObj.load_system_host_keys()
441 441 self.__sshClientObj.set_missing_host_key_policy(paramiko.WarningPolicy())
442 442
443 443 self.status = 0
444 444
445 445 try:
446 446 if key_filename != None:
447 447 self.__sshClientObj.connect(server, username=username, password=password, port=port, key_filename=key_filename)
448 448 else:
449 449 self.__sshClientObj.connect(server, username=username, password=password, port=port)
450 450 except paramiko.AuthenticationException as e:
451 451 # print "SSH username or password are incorrect: %s"
452 452 print("[SSH Server]:", e)
453 453 return 0
454 454 # except SSHException as e:
455 455 # print("[SSH Server]:", e)
456 456 # return 0
457 457 # except socket.error:
458 458 # self.status = 0
459 459 # print("[SSH Server]:", e)
460 460 # return 0
461 461
462 462 self.status = 1
463 463 #self.__scpClientObj = scp.SCPClient(self.__sshClientObj.get_transport(), socket_timeout=30)
464 464 self.__scpClientObj = self.__sshClientObj.open_sftp()
465 465 if remotefolder == None:
466 466 remotefolder = self.pwd()
467 467
468 468 self.server = server
469 469 self.username = username
470 470 self.password = password
471 471 # self.__sshClientObj = self.__sshClientObj
472 472 # self.__scpClientObj = self.__scpClientObj
473 473 self.status = 1
474 474
475 475 if not self.cd(remotefolder):
476 476 raise ValueError("[SSH Server]: Could not access to remote folder: %s" %remotefolder)
477 477 return 0
478 478
479 479 self.remotefolder = remotefolder
480 480
481 481 return 1
482 482
483 483 def close(self):
484 484 """
485 485 Close connection to remote server
486 486 """
487 487 if not self.status:
488 488 return 0
489 489
490 490 self.__scpClientObj.close()
491 491 self.__sshClientObj.close()
492 492
493 493 def __execute(self, command):
494 494 """
495 495 __execute a command on remote server
496 496
497 497 Input:
498 498 command - Exmaple 'ls -l'
499 499
500 500 Return:
501 501 0 in error case else 1
502 502 """
503 503 if not self.status:
504 504 return 0
505 505
506 506 stdin, stdout, stderr = self.__sshClientObj.exec_command(command)
507 507
508 508 result = stderr.readlines()
509 509 if len(result) > 1:
510 510 return 0
511 511
512 512 result = stdout.readlines()
513 513 if len(result) > 1:
514 514 return result[0][:-1]
515 515
516 516 return 1
517 517
518 518 def mkdir(self, remotefolder):
519 519 """
520 520 mkdir is used to make a new directory in remote server
521 521
522 522 Input:
523 523 remotefolder - directory name
524 524
525 525 Return:
526 526 0 in error case else 1
527 527 """
528 528
529 529 command = 'mkdir %s' %remotefolder
530 530
531 531 return self.__execute(command)
532 532
533 533 def pwd(self):
534 534
535 535 command = 'pwd'
536 536
537 537 return self.__execute(command)
538 538
539 539 def cd(self, remotefolder):
540 540 """
541 541 cd is used to change remote working directory on server
542 542
543 543 Input:
544 544 remotefolder - current working directory
545 545
546 546 Affects:
547 547 self.remotefolder
548 548
549 549 Return:
550 550 0 in case of error else 1
551 551 """
552 552 if not self.status:
553 553 return 0
554 554
555 555 if remotefolder == self.remotefolder:
556 556 return 1
557 557
558 558 chk_command = "cd %s; pwd" %remotefolder
559 559 mkdir_command = "mkdir %s" %remotefolder
560 560
561 561 if not self.__execute(chk_command):
562 562 if not self.__execute(mkdir_command):
563 563 self.remotefolder = None
564 564 return 0
565 565
566 566 self.remotefolder = remotefolder
567 567
568 568 return 1
569 569
570 570 def sendFile(self, fullfilename):
571 571
572 572 if not self.status:
573 573 return 0
574 574
575 575 remotefile = os.path.join(self.remotefolder, os.path.split(fullfilename)[-1])
576 576 print("remotefile",fullfilename, remotefile)
577 577
578 578 try:
579 579 self.__scpClientObj.put(fullfilename,remotefile)
580 580 except paramiko.SSHException as e:
581 581 print("[SSH Server]", str(e))
582 582 print(fullfilename," to ",remotefile)
583 583 return 0
584 584
585 585
586 586 #command = 'chmod 775 %s' %remotefile
587 587
588 588 return 1#self.__execute(command)
589 589 #@MPDecorator
590 590 class SendToServerProc(ProcessingUnit):
591 591
592 592 sendByTrigger = False
593 593
594 594 def __init__(self, **kwargs):
595 595
596 596 ProcessingUnit.__init__(self)
597 597
598 598 self.isConfig = False
599 599 self.clientObj = None
600 600 self.dataOut = Parameters()
601 601 self.dataOut.error=False
602 602 self.dataOut.flagNoData=True
603 603
604 604 def setup(self, server=None, username="", password="", remotefolder="", localfolder="",
605 605 ext='.png', period=60, protocol='ftp', sendByTrigger=False, key_filename=None):
606 606 self.server = server
607 607 self.username = username
608 608 self.password = password
609 609 self.remotefolder = remotefolder
610 610 self.clientObj = None
611 611 self.localfolder = localfolder
612 612 self.ext = ext
613 613 self.sendByTrigger = sendByTrigger
614 614 self.period = period
615 615 self.key_filename = key_filename
616 616 if self.sendByTrigger:
617 617 self.period = 1000000000000 #para que no se ejecute por tiempo
618 618
619 619 if str.lower(protocol) == 'ftp':
620 620 self.clientObj = FTPClient(server, username, password, remotefolder, period)
621 621
622 622 if str.lower(protocol) == 'ssh':
623 623 self.clientObj = SSHClient(self.server, self.username, self.password,
624 624 self.remotefolder, period=600000,key_filename=self.key_filename)
625 625
626 626 if not self.clientObj:
627 627 raise ValueError("%s has been chosen as remote access protocol but it is not valid" %protocol)
628 628
629 629 print("Send to Server setup complete")
630 630
631 631
632 632 def findFiles(self):
633 633
634 634 if not type(self.localfolder) == list:
635 635 folderList = [self.localfolder]
636 636 else:
637 637 folderList = self.localfolder
638 638
639 639 #Remove duplicate items
640 640 folderList = list(set(folderList))
641 641
642 642 fullfilenameList = []
643 643
644 644 for thisFolder in folderList:
645 645
646 646 print("[Remote Server]: Searching files on %s" %thisFolder)
647 647
648 648 filenameList = glob.glob1(thisFolder, '*%s' %self.ext)
649 649
650 650 if len(filenameList) < 1:
651 651
652 652 continue
653 653
654 654 for thisFile in filenameList:
655 655 fullfilename = os.path.join(thisFolder, thisFile)
656 656
657 657 if fullfilename in fullfilenameList:
658 658 continue
659 659
660 660 #Only files modified in the last 30 minutes are considered
661 661 if os.path.getmtime(fullfilename) < time.time() - 30*60:
662 662 continue
663 663
664 664 fullfilenameList.append(fullfilename)
665 665 fullfilenameList.sort()
666 666
667 667 return fullfilenameList
668 668
669 669 def run(self, **kwargs):
670 670
671 671 if not self.isConfig:
672 672 self.init = time.time()
673 673 self.setup(**kwargs)
674 674 self.isConfig = True
675 675
676 676 if not self.clientObj.is_alive():
677 677 print("[Remote Server]: Restarting connection ")
678 678 self.setup( **kwargs)
679 679
680 680 if ((time.time() - self.init) >= self.period and not self.sendByTrigger) or (self.sendByTrigger and not self.dataIn.flagNoData):
681 681 fullfilenameList = self.findFiles()
682 682 if self.sendByTrigger:
683 683 if self.clientObj.upload(fullfilenameList[-1]): #last file to send
684 684 print("[Remote Server] upload finished successfully")
685 685 else:
686 686 for file in fullfilenameList:
687 687 self.clientObj.upload(file)
688 688
689 689 # if self.clientObj.updateFileList(fullfilenameList):
690 690 # print("[Remote Server]: Sending the next files ", str(fullfilenameList))
691 691
692 692 self.init = time.time()
693 693
694 694 def close(self):
695 695 print("[Remote Server] Stopping thread")
696 696 self.clientObj.stop()
697 697
698 698 class SendByRSYNCProc(ProcessingUnit):
699 699
700 700 sendByTrigger = False
701 701
702 702 def __init__(self, **kwargs):
703 703
704 704 ProcessingUnit.__init__(self)
705 705
706 706 self.isConfig = False
707 707 self.dataOut = Parameters()
708 708 self.dataOut.error=False
709 709 self.dataOut.flagNoData=True
710 710
711 711 def setup(self, server="", username="", remotefolder="", localfolder="",sendByTrigger=True,
712 712 period=60, key_filename=None, port=22 ,param1="", param2=""):
713 713 self.server = server
714 714 self.username = username
715 715 self.remotefolder = remotefolder
716 716 self.localfolder = localfolder
717 717 self.period = period
718 718 self.key_filename = key_filename
719 719 if type(param1)==str:
720 720 self.param1 = list(param1.split(","))
721 721 else:
722 722 self.param1 = param1
723 723 if type(param2)==str:
724 724 self.param2 = list(param2.split(","))
725 725 else:
726 726 self.param2 = param2
727 727 self.port = port
728 728 self.sendByTrigger = sendByTrigger
729 729 if self.sendByTrigger:
730 730 self.period = 1000000000000 #para que no se ejecute por tiempo
731 731 self.command ="rsync "
732 732
733 733 def syncFolders(self):
734 734 self.command ="rsync "
735 735 for p1 in self.param1:
736 736 self.command += " -"+str(p1)
737 737 for p2 in self.param2:
738 738 self.command += " --"+str(p2)
739 739 if self.key_filename != None:
740 740 self.command += """ "ssh -i {} -p {}" """.format(self.key_filename, self.port)
741 741 self.command += " {} ".format(self.localfolder)
742 742 self.command += " {}@{}:{}".format(self.username,self.server,self.remotefolder)
743 743 print("CMD: ",self.command)
744 744 #os.system(self.command)
745 745 return
746 746
747 747 def run(self, **kwargs):
748 748
749 749 if not self.isConfig:
750 750 self.init = time.time()
751 751 self.setup(**kwargs)
752 752 self.isConfig = True
753 753
754 754 if self.sendByTrigger and not self.dataIn.flagNoData:
755 755 self.syncFolders()
756 756 else:
757 757 if (time.time() - self.init) >= self.period:
758 758 self.syncFolders()
759 759 self.init = time.time()
760 760
761 761 return
762 762
763 763
764 764
765 765 class FTP(object):
766 766 """
767 767 Ftp is a public class used to define custom File Transfer Protocol from "ftplib" python module
768 768
769 769 Non-standard Python modules used: None
770 770
771 771 Written by "Daniel Suarez":mailto:daniel.suarez@jro.igp.gob.pe Oct. 26, 2010
772
773 Modified:
774 Joab Apaza Feb. 2022
772 775 """
773 776
774 777 def __init__(self,server = None, username=None, password=None, remotefolder=None):
775 778 """
776 779 This method is used to setting parameters for FTP and establishing connection to remote server
777 780
778 781 Inputs:
779 782 server - remote server IP Address
780 783
781 784 username - remote server Username
782 785
783 786 password - remote server password
784 787
785 788 remotefolder - remote server current working directory
786 789
787 790 Return: void
788 791
789 792 Affects:
790 793 self.status - in Error Case or Connection Failed this parameter is set to 1 else 0
791 794
792 795 self.folderList - sub-folder list of remote folder
793 796
794 797 self.fileList - file list of remote folder
795 798
796 799
797 800 """
798 801
799 802 if ((server == None) and (username==None) and (password==None) and (remotefolder==None)):
800 803 server, username, password, remotefolder = self.parmsByDefault()
801 804
802 805 self.server = server
803 806 self.username = username
804 807 self.password = password
805 808 self.remotefolder = remotefolder
806 809 self.file = None
807 810 self.ftp = None
808 811 self.status = 0
809 812
810 813 try:
811 814 self.ftp = ftplib.FTP(self.server)
812 815 self.ftp.login(self.username,self.password)
813 816 self.ftp.cwd(self.remotefolder)
814 817 # print 'Connect to FTP Server: Successfully'
815 818
816 819 except ftplib.all_errors:
817 820 print('Error FTP Service')
818 821 self.status = 1
819 822 return
820 823
821 824
822 825
823 826 self.dirList = []
824 827
825 828 try:
826 829 self.dirList = self.ftp.nlst()
827 830
828 831 except ftplib.error_perm as resp:
829 832 if str(resp) == "550 No files found":
830 833 print("no files in this directory")
831 834 self.status = 1
832 835 return
833 836
834 837 except ftplib.all_errors:
835 838 print('Error Displaying Dir-Files')
836 839 self.status = 1
837 840 return
838 841
839 842 self.fileList = []
840 843 self.folderList = []
841 844 #only for test
842 845 for f in self.dirList:
843 846 name, ext = os.path.splitext(f)
844 847 if ext != '':
845 848 self.fileList.append(f)
846 849 # print 'filename: %s - size: %d'%(f,self.ftp.size(f))
847 850
848 851 def parmsByDefault(self):
849 852 server = 'jro-app.igp.gob.pe'
850 853 username = 'wmaster'
851 854 password = 'mst2010vhf'
852 855 remotefolder = '/home/wmaster/graficos'
853 856
854 857 return server, username, password, remotefolder
855 858
856 859
857 860 def mkd(self,dirname):
858 861 """
859 862 mkd is used to make directory in remote server
860 863
861 864 Input:
862 865 dirname - directory name
863 866
864 867 Return:
865 868 1 in error case else 0
866 869 """
867 870 try:
868 871 self.ftp.mkd(dirname)
869 872 except:
870 873 print('Error creating remote folder:%s'%dirname)
871 return 1
874 return False
872 875
873 return 0
876 return True
874 877
875 878
876 879 def delete(self,filename):
877 880 """
878 881 delete is used to delete file in current working directory of remote server
879 882
880 883 Input:
881 884 filename - filename to delete in remote folder
882 885
883 886 Return:
884 887 1 in error case else 0
885 888 """
886 889
887 890 try:
888 891 self.ftp.delete(filename)
889 892 except:
890 893 print('Error deleting remote file:%s'%filename)
891 return 1
894 return False
892 895
893 return 0
896 return True
894 897
895 898 def download(self,filename,localfolder):
896 899 """
897 900 download is used to downloading file from remote folder into local folder
898 901
899 902 Inputs:
900 903 filename - filename to donwload
901 904
902 905 localfolder - directory local to store filename
903 906
904 907 Returns:
905 908 self.status - 1 in error case else 0
906 909 """
907 910
908 911 self.status = 0
909 912
910 913
911 914 if not(filename in self.fileList):
912 915 print('filename:%s not exists'%filename)
913 916 self.status = 1
914 917 return self.status
915 918
916 919 newfilename = os.path.join(localfolder,filename)
917 920
918 921 self.file = open(newfilename, 'wb')
919 922
920 923 try:
921 924 print('Download: ' + filename)
922 925 self.ftp.retrbinary('RETR ' + filename, self.__handleDownload)
923 926 print('Download Complete')
924 927 except ftplib.all_errors:
925 928 print('Error Downloading ' + filename)
926 929 self.status = 1
927 930 return self.status
928 931
929 932 self.file.close()
930 933
931 934 return self.status
932 935
933 936
934 937 def __handleDownload(self,block):
935 938 """
936 939 __handleDownload is used to handle writing file
937 940 """
938 941 self.file.write(block)
939 942
940 943
941 def upload(self,filename,remotefolder=None):
944 def upload(self,filename,remotefolder=None, mkdir=False):
942 945 """
943 upload is used to uploading local file to remote directory
946 upload is used to uploading local file to remote directory, and change the permission of the remote file
944 947
945 948 Inputs:
946 949 filename - full path name of local file to store in remote directory
947 950
948 951 remotefolder - remote directory
949 952
953 mkdir - if the remote folder doesn't exist, it will created
954
950 955 Returns:
951 956 self.status - 1 in error case else 0
952 957 """
953 958
954 959 if remotefolder == None:
955 960 remotefolder = self.remotefolder
956 961
962 if mkdir:
963 if self.if_dir_exist(remotefolder):
964 pass
965 else:
966 self.mkdir_r(remotefolder)
967
957 968 self.status = 0
958 969
959 970 try:
960 971 self.ftp.cwd(remotefolder)
961 972
962 973 self.file = open(filename, 'rb')
963 974
964 975 (head, tail) = os.path.split(filename)
965 976
966 977 command = "STOR " + tail
967 978
968 979 print('Uploading: ' + tail)
969 980 self.ftp.storbinary(command, self.file)
981 print(self.cmd('SITE CHMOD 755 {}'.format(tail)))
970 982 print('Upload Completed')
971 983
972 984 except ftplib.all_errors:
973 985 print('Error Uploading ' + tail)
974 986 self.status = 1
975 987 return self.status
976 988
977 989 self.file.close()
978 990
979 991 #back to initial directory in __init__()
980 992 self.ftp.cwd(self.remotefolder)
981 993
982 994 return self.status
983 995
984 996
985 def dir(self,remotefolder):
997 def ch_dir(self,remotefolder):
986 998 """
987 dir is used to change working directory of remote server and get folder and file list
999 ch_dir is used to change working directory of remote server and get folder and file list
988 1000
989 1001 Input:
990 1002 remotefolder - current working directory
991 1003
992 1004 Affects:
993 1005 self.fileList - file list of working directory
994 1006
995 1007 Return:
996 1008 infoList - list with filenames and size of file in bytes
997 1009
998 1010 self.folderList - folder list
999 1011 """
1000 1012
1001 1013 self.remotefolder = remotefolder
1002 1014 print('Change to ' + self.remotefolder)
1003 1015 try:
1004 1016 self.ftp.cwd(remotefolder)
1005 1017 except ftplib.all_errors:
1006 1018 print('Error Change to ' + self.remotefolder)
1007 1019 infoList = None
1008 1020 self.folderList = None
1009 1021 return infoList,self.folderList
1010 1022
1011 1023 self.dirList = []
1012 1024
1013 1025 try:
1014 1026 self.dirList = self.ftp.nlst()
1015 1027
1016 1028 except ftplib.error_perm as resp:
1017 1029 if str(resp) == "550 No files found":
1018 1030 print("no files in this directory")
1019 1031 infoList = None
1020 1032 self.folderList = None
1021 1033 return infoList,self.folderList
1022 1034 except ftplib.all_errors:
1023 1035 print('Error Displaying Dir-Files')
1024 1036 infoList = None
1025 1037 self.folderList = None
1026 1038 return infoList,self.folderList
1027 1039
1028 1040 infoList = []
1029 1041 self.fileList = []
1030 1042 self.folderList = []
1031 1043 for f in self.dirList:
1032 1044 name,ext = os.path.splitext(f)
1033 1045 if ext != '':
1034 1046 self.fileList.append(f)
1035 1047 value = (f,self.ftp.size(f))
1036 1048 infoList.append(value)
1037 1049
1038 1050 if ext == '':
1039 1051 self.folderList.append(f)
1040 1052
1041 1053 return infoList,self.folderList
1042
1043
1044 1054 def close(self):
1045 1055 """
1046 1056 close is used to close and end FTP connection
1047 1057
1048 1058 Inputs: None
1049 1059
1050 1060 Return: void
1051 1061
1052 1062 """
1053 1063 self.ftp.close()
1064
1065 def get_sub_dirs(self, path):
1066 """
1067 used internal
1068
1069 Inputs:
1070 path - path to split in sub folders
1071
1072 Returns:
1073 sub_dirs - list of sub folders
1074 """
1075 sub_dirs = path.split("/")
1076 if sub_dirs[0]=="/":
1077 sub_dirs.pop(0)
1078 if sub_dirs[-1]=="/":
1079 sub_dirs.pop(-1)
1080 return sub_dirs
1081
1082 def if_dir_exist(self,path):
1083 """
1084 check if a the path folder exists in the ftp server
1085
1086 Inputs:
1087 path - path to check
1088
1089 Returns:
1090 status - True if exists and False if it doesn't
1091 """
1092 sub_dirs = self.get_sub_dirs(path)
1093 main = self.ftp.pwd()
1094 #print(main)
1095 for subdir in sub_dirs:
1096 folders = self.ftp.nlst(main)
1097 #print(folders)
1098 if (os.path.join(main,subdir) in folders):
1099 main = os.path.join(main,subdir)
1100 #print(main)
1101 continue
1102 else:
1103 return False
1104 return True
1105
1106 def cmd(self,command):
1107 """
1108 excecute a command in the FTP server
1109 """
1110 return self.ftp.sendcmd(command)
1111
1112 def mkdir_r(self,path):
1113 """
1114 create a remote folder and create sub folders if it is necessary
1115
1116 Inputs:
1117 path - path to create
1118
1119 Returns:
1120 status - True if succesfull else False
1121 """
1122 sub_dirs = self.get_sub_dirs(path)
1123 main = self.ftp.pwd()
1124 st = False
1125 #print(main)
1126 for subdir in sub_dirs:
1127 folders = self.ftp.nlst(main)
1128 #print(folders)
1129 folder = (os.path.join(main,subdir))
1130
1131 if (folder in folders):
1132 main = folder
1133 #print("new_main",main)
1134 continue
1135 else:
1136 print("creating...",folder)
1137 st = self.mkd(folder)
1138 print(self.cmd('SITE CHMOD 755 {}'.format(folder)))
1139 main = folder
1140
1141 return st
1142
1054 1143 @MPDecorator
1055 1144 class SendByFTP(Operation):
1056 1145
1057 1146 def __init__(self, **kwargs):
1058 1147 Operation.__init__(self, **kwargs)
1059 1148 self.status = 1
1060 1149 self.counter = 0
1061 1150
1062 1151 def error_print(self, ValueError):
1063 1152
1064 1153 print(ValueError, 'Error FTP')
1065 1154 print("don't worry the program is running...")
1066 1155
1067 1156 def worker_ftp(self, server, username, password, remotefolder, filenameList):
1068 1157
1069 1158 self.ftpClientObj = FTP(server, username, password, remotefolder)
1070 1159 for filename in filenameList:
1071 1160 self.ftpClientObj.upload(filename)
1072 1161 self.ftpClientObj.close()
1073 1162
1074 1163 def ftp_thread(self, server, username, password, remotefolder):
1075 1164 if not(self.status):
1076 1165 return
1077 1166
1078 1167 import multiprocessing
1079 1168
1080 1169 p = multiprocessing.Process(target=self.worker_ftp, args=(server, username, password, remotefolder, self.filenameList,))
1081 1170 p.start()
1082 1171
1083 1172 p.join(3)
1084 1173
1085 1174 if p.is_alive():
1086 1175 p.terminate()
1087 1176 p.join()
1088 1177 print('killing ftp process...')
1089 1178 self.status = 0
1090 1179 return
1091 1180
1092 1181 self.status = 1
1093 1182 return
1094 1183
1095 1184 def filterByExt(self, ext, localfolder):
1096 1185 fnameList = glob.glob1(localfolder,ext)
1097 1186 self.filenameList = [os.path.join(localfolder,x) for x in fnameList]
1098 1187
1099 1188 if len(self.filenameList) == 0:
1100 1189 self.status = 0
1101 1190
1102 1191 def run(self, dataOut, ext, localfolder, remotefolder, server, username, password, period=1):
1103 1192
1104 1193 self.counter += 1
1105 1194 if self.counter >= period:
1106 1195 self.filterByExt(ext, localfolder)
1107 1196
1108 1197 self.ftp_thread(server, username, password, remotefolder)
1109 1198
1110 1199 self.counter = 0
1111 1200
1112 1201 self.status = 1
General Comments 0
You need to be logged in to leave comments. Login now