##// END OF EJS Templates
corregido reorganización de apuntes en kamisr
joabAM -
r1420:78c29438e197
parent child
Show More
@@ -1,659 +1,660
1 1 ''''
2 2 Created on Set 9, 2015
3 3
4 4 @author: roj-idl71 Karim Kuyeng
5 5
6 6 @update: 2021, Joab Apaza
7 7 '''
8 8
9 9 import os
10 10 import sys
11 11 import glob
12 12 import fnmatch
13 13 import datetime
14 14 import time
15 15 import re
16 16 import h5py
17 17 import numpy
18 18
19 19 try:
20 20 from gevent import sleep
21 21 except:
22 22 from time import sleep
23 23
24 24 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
25 25 from schainpy.model.data.jrodata import Voltage
26 26 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
27 27 from numpy import imag
28 28
29 29
30 30 class AMISRReader(ProcessingUnit):
31 31 '''
32 32 classdocs
33 33 '''
34 34
35 35 def __init__(self):
36 36 '''
37 37 Constructor
38 38 '''
39 39
40 40 ProcessingUnit.__init__(self)
41 41
42 42 self.set = None
43 43 self.subset = None
44 44 self.extension_file = '.h5'
45 45 self.dtc_str = 'dtc'
46 46 self.dtc_id = 0
47 47 self.status = True
48 48 self.isConfig = False
49 49 self.dirnameList = []
50 50 self.filenameList = []
51 51 self.fileIndex = None
52 52 self.flagNoMoreFiles = False
53 53 self.flagIsNewFile = 0
54 54 self.filename = ''
55 55 self.amisrFilePointer = None
56 56 self.realBeamCode = []
57 57 self.beamCodeMap = None
58 58 self.azimuthList = []
59 59 self.elevationList = []
60 60 self.dataShape = None
61
61 self.flag_old_beams = False
62 62
63 63
64 64 self.profileIndex = 0
65 65
66 66
67 67 self.beamCodeByFrame = None
68 68 self.radacTimeByFrame = None
69 69
70 70 self.dataset = None
71 71
72 72 self.__firstFile = True
73 73
74 74 self.buffer = None
75 75
76 76 self.timezone = 'ut'
77 77
78 78 self.__waitForNewFile = 20
79 79 self.__filename_online = None
80 80 #Is really necessary create the output object in the initializer
81 81 self.dataOut = Voltage()
82 82 self.dataOut.error=False
83 83
84 84
85 85 def setup(self,path=None,
86 86 startDate=None,
87 87 endDate=None,
88 88 startTime=None,
89 89 endTime=None,
90 90 walk=True,
91 91 timezone='ut',
92 92 all=0,
93 93 code = None,
94 94 nCode = 0,
95 95 nBaud = 0,
96 online=False):
96 online=False
97 old_beams=False):
97 98
98 99
99 100
100 101 self.timezone = timezone
101 102 self.all = all
102 103 self.online = online
103
104 self.flag_old_beams = old_beams
104 105 self.code = code
105 106 self.nCode = int(nCode)
106 107 self.nBaud = int(nBaud)
107 108
108 109
109 110
110 111 #self.findFiles()
111 112 if not(online):
112 113 #Busqueda de archivos offline
113 114 self.searchFilesOffLine(path, startDate, endDate, startTime, endTime, walk)
114 115 else:
115 116 self.searchFilesOnLine(path, startDate, endDate, startTime,endTime,walk)
116 117
117 118 if not(self.filenameList):
118 119 raise schainpy.admin.SchainWarning("There is no files into the folder: %s"%(path))
119 120 sys.exit()
120 121
121 122 self.fileIndex = 0
122 123
123 124 self.readNextFile(online)
124 125
125 126 '''
126 127 Add code
127 128 '''
128 129 self.isConfig = True
129 130 # print("Setup Done")
130 131 pass
131 132
132 133
133 134 def readAMISRHeader(self,fp):
134 135
135 136 if self.isConfig and (not self.flagNoMoreFiles):
136 137 newShape = fp.get('Raw11/Data/Samples/Data').shape[1:]
137 138 if self.dataShape != newShape and newShape != None:
138 139 raise schainpy.admin.SchainError("NEW FILE HAS A DIFFERENT SHAPE: ")
139 140 print(self.dataShape,newShape,"\n")
140 141 return 0
141 142 else:
142 143 self.dataShape = fp.get('Raw11/Data/Samples/Data').shape[1:]
143 144
144 145
145 146 header = 'Raw11/Data/RadacHeader'
146 147 self.beamCodeByPulse = fp.get(header+'/BeamCode') # LIST OF BEAMS PER PROFILE, TO BE USED ON REARRANGE
147 if (self.startDate> datetime.date(2021, 7, 15)): #Se cambió la forma de extracción de Apuntes el 17
148 if (self.startDate> datetime.date(2021, 7, 15)) or self.flag_old_beams: #Se cambió la forma de extracción de Apuntes el 17 o forzar con flag de reorganización
148 149 self.beamcodeFile = fp['Setup/Beamcodefile'][()].decode()
149 150 self.trueBeams = self.beamcodeFile.split("\n")
150 151 self.trueBeams.pop()#remove last
151 152 [self.realBeamCode.append(x) for x in self.trueBeams if x not in self.realBeamCode]
152 153 self.beamCode = [int(x, 16) for x in self.realBeamCode]
153 154 else:
154 155 _beamCode= fp.get('Raw11/Data/Beamcodes') #se usa la manera previa al cambio de apuntes
155 156 self.beamCode = _beamCode[0,:]
156 157
157 158 if self.beamCodeMap == None:
158 159 self.beamCodeMap = fp['Setup/BeamcodeMap']
159 160 for beam in self.beamCode:
160 161 beamAziElev = numpy.where(self.beamCodeMap[:,0]==beam)
161 162 beamAziElev = beamAziElev[0].squeeze()
162 163 self.azimuthList.append(self.beamCodeMap[beamAziElev,1])
163 164 self.elevationList.append(self.beamCodeMap[beamAziElev,2])
164 165 #print("Beamssss: ",self.beamCodeMap[beamAziElev,1],self.beamCodeMap[beamAziElev,2])
165 166 #print(self.beamCode)
166 167 #self.code = fp.get(header+'/Code') # NOT USE FOR THIS
167 168 self.frameCount = fp.get(header+'/FrameCount')# NOT USE FOR THIS
168 169 self.modeGroup = fp.get(header+'/ModeGroup')# NOT USE FOR THIS
169 170 self.nsamplesPulse = fp.get(header+'/NSamplesPulse')# TO GET NSA OR USING DATA FOR THAT
170 171 self.pulseCount = fp.get(header+'/PulseCount')# NOT USE FOR THIS
171 172 self.radacTime = fp.get(header+'/RadacTime')# 1st TIME ON FILE ANDE CALCULATE THE REST WITH IPP*nindexprofile
172 173 self.timeCount = fp.get(header+'/TimeCount')# NOT USE FOR THIS
173 174 self.timeStatus = fp.get(header+'/TimeStatus')# NOT USE FOR THIS
174 175 self.rangeFromFile = fp.get('Raw11/Data/Samples/Range')
175 176 self.frequency = fp.get('Rx/Frequency')
176 177 txAus = fp.get('Raw11/Data/Pulsewidth')
177 178
178 179
179 180 self.nblocks = self.pulseCount.shape[0] #nblocks
180 181
181 182 self.nprofiles = self.pulseCount.shape[1] #nprofile
182 183 self.nsa = self.nsamplesPulse[0,0] #ngates
183 184 self.nchannels = len(self.beamCode)
184 185 self.ippSeconds = (self.radacTime[0][1] -self.radacTime[0][0]) #Ipp in seconds
185 186 #self.__waitForNewFile = self.nblocks # wait depending on the number of blocks since each block is 1 sec
186 187 self.__waitForNewFile = self.nblocks * self.nprofiles * self.ippSeconds # wait until new file is created
187 188
188 189 #filling radar controller header parameters
189 190 self.__ippKm = self.ippSeconds *.15*1e6 # in km
190 191 self.__txA = (txAus.value)*.15 #(ipp[us]*.15km/1us) in km
191 192 self.__txB = 0
192 193 nWindows=1
193 194 self.__nSamples = self.nsa
194 195 self.__firstHeight = self.rangeFromFile[0][0]/1000 #in km
195 196 self.__deltaHeight = (self.rangeFromFile[0][1] - self.rangeFromFile[0][0])/1000
196 197
197 198 #for now until understand why the code saved is different (code included even though code not in tuf file)
198 199 #self.__codeType = 0
199 200 # self.__nCode = None
200 201 # self.__nBaud = None
201 202 self.__code = self.code
202 203 self.__codeType = 0
203 204 if self.code != None:
204 205 self.__codeType = 1
205 206 self.__nCode = self.nCode
206 207 self.__nBaud = self.nBaud
207 208 #self.__code = 0
208 209
209 210 #filling system header parameters
210 211 self.__nSamples = self.nsa
211 212 self.newProfiles = self.nprofiles/self.nchannels
212 213 self.__channelList = list(range(self.nchannels))
213 214
214 215 self.__frequency = self.frequency[0][0]
215 216
216 217
217 218 return 1
218 219
219 220
220 221 def createBuffers(self):
221 222
222 223 pass
223 224
224 225 def __setParameters(self,path='', startDate='',endDate='',startTime='', endTime='', walk=''):
225 226 self.path = path
226 227 self.startDate = startDate
227 228 self.endDate = endDate
228 229 self.startTime = startTime
229 230 self.endTime = endTime
230 231 self.walk = walk
231 232
232 233 def __checkPath(self):
233 234 if os.path.exists(self.path):
234 235 self.status = 1
235 236 else:
236 237 self.status = 0
237 238 print('Path:%s does not exists'%self.path)
238 239
239 240 return
240 241
241 242
242 243 def __selDates(self, amisr_dirname_format):
243 244 try:
244 245 year = int(amisr_dirname_format[0:4])
245 246 month = int(amisr_dirname_format[4:6])
246 247 dom = int(amisr_dirname_format[6:8])
247 248 thisDate = datetime.date(year,month,dom)
248 249 #margen de un día extra, igual luego se filtra for fecha y hora
249 250 if (thisDate>=(self.startDate - datetime.timedelta(days=1)) and thisDate <= (self.endDate)+ datetime.timedelta(days=1)):
250 251 return amisr_dirname_format
251 252 except:
252 253 return None
253 254
254 255
255 256 def __findDataForDates(self,online=False):
256 257
257 258 if not(self.status):
258 259 return None
259 260
260 261 pat = '\d+.\d+'
261 262 dirnameList = [re.search(pat,x) for x in os.listdir(self.path)]
262 263 dirnameList = [x for x in dirnameList if x!=None]
263 264 dirnameList = [x.string for x in dirnameList]
264 265 if not(online):
265 266 dirnameList = [self.__selDates(x) for x in dirnameList]
266 267 dirnameList = [x for x in dirnameList if x!=None]
267 268 if len(dirnameList)>0:
268 269 self.status = 1
269 270 self.dirnameList = dirnameList
270 271 self.dirnameList.sort()
271 272 else:
272 273 self.status = 0
273 274 return None
274 275
275 276 def __getTimeFromData(self):
276 277 startDateTime_Reader = datetime.datetime.combine(self.startDate,self.startTime)
277 278 endDateTime_Reader = datetime.datetime.combine(self.endDate,self.endTime)
278 279
279 280 print('Filtering Files from %s to %s'%(startDateTime_Reader, endDateTime_Reader))
280 281 print('........................................')
281 282 filter_filenameList = []
282 283 self.filenameList.sort()
283 284 #for i in range(len(self.filenameList)-1):
284 285 for i in range(len(self.filenameList)):
285 286 filename = self.filenameList[i]
286 287 fp = h5py.File(filename,'r')
287 288 time_str = fp.get('Time/RadacTimeString')
288 289
289 290 startDateTimeStr_File = time_str[0][0].decode('UTF-8').split('.')[0]
290 291 #startDateTimeStr_File = "2019-12-16 09:21:11"
291 292 junk = time.strptime(startDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
292 293 startDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
293 294
294 295 #endDateTimeStr_File = "2019-12-16 11:10:11"
295 296 endDateTimeStr_File = time_str[-1][-1].decode('UTF-8').split('.')[0]
296 297 junk = time.strptime(endDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
297 298 endDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
298 299
299 300 fp.close()
300 301
301 302 #print("check time", startDateTime_File)
302 303 if self.timezone == 'lt':
303 304 startDateTime_File = startDateTime_File - datetime.timedelta(minutes = 300)
304 305 endDateTime_File = endDateTime_File - datetime.timedelta(minutes = 300)
305 306 if (startDateTime_File >=startDateTime_Reader and endDateTime_File<=endDateTime_Reader):
306 307 filter_filenameList.append(filename)
307 308
308 309 if (startDateTime_File>endDateTime_Reader):
309 310 break
310 311
311 312
312 313 filter_filenameList.sort()
313 314 self.filenameList = filter_filenameList
314 315
315 316 return 1
316 317
317 318 def __filterByGlob1(self, dirName):
318 319 filter_files = glob.glob1(dirName, '*.*%s'%self.extension_file)
319 320 filter_files.sort()
320 321 filterDict = {}
321 322 filterDict.setdefault(dirName)
322 323 filterDict[dirName] = filter_files
323 324 return filterDict
324 325
325 326 def __getFilenameList(self, fileListInKeys, dirList):
326 327 for value in fileListInKeys:
327 328 dirName = list(value.keys())[0]
328 329 for file in value[dirName]:
329 330 filename = os.path.join(dirName, file)
330 331 self.filenameList.append(filename)
331 332
332 333
333 334 def __selectDataForTimes(self, online=False):
334 335 #aun no esta implementado el filtro for tiempo
335 336 if not(self.status):
336 337 return None
337 338
338 339 dirList = [os.path.join(self.path,x) for x in self.dirnameList]
339 340 fileListInKeys = [self.__filterByGlob1(x) for x in dirList]
340 341 self.__getFilenameList(fileListInKeys, dirList)
341 342 if not(online):
342 343 #filtro por tiempo
343 344 if not(self.all):
344 345 self.__getTimeFromData()
345 346
346 347 if len(self.filenameList)>0:
347 348 self.status = 1
348 349 self.filenameList.sort()
349 350 else:
350 351 self.status = 0
351 352 return None
352 353
353 354 else:
354 355 #get the last file - 1
355 356 self.filenameList = [self.filenameList[-2]]
356 357 new_dirnameList = []
357 358 for dirname in self.dirnameList:
358 359 junk = numpy.array([dirname in x for x in self.filenameList])
359 360 junk_sum = junk.sum()
360 361 if junk_sum > 0:
361 362 new_dirnameList.append(dirname)
362 363 self.dirnameList = new_dirnameList
363 364 return 1
364 365
365 366 def searchFilesOnLine(self, path, startDate, endDate, startTime=datetime.time(0,0,0),
366 367 endTime=datetime.time(23,59,59),walk=True):
367 368
368 369 if endDate ==None:
369 370 startDate = datetime.datetime.utcnow().date()
370 371 endDate = datetime.datetime.utcnow().date()
371 372
372 373 self.__setParameters(path=path, startDate=startDate, endDate=endDate,startTime = startTime,endTime=endTime, walk=walk)
373 374
374 375 self.__checkPath()
375 376
376 377 self.__findDataForDates(online=True)
377 378
378 379 self.dirnameList = [self.dirnameList[-1]]
379 380
380 381 self.__selectDataForTimes(online=True)
381 382
382 383 return
383 384
384 385
385 386 def searchFilesOffLine(self,
386 387 path,
387 388 startDate,
388 389 endDate,
389 390 startTime=datetime.time(0,0,0),
390 391 endTime=datetime.time(23,59,59),
391 392 walk=True):
392 393
393 394 self.__setParameters(path, startDate, endDate, startTime, endTime, walk)
394 395
395 396 self.__checkPath()
396 397
397 398 self.__findDataForDates()
398 399
399 400 self.__selectDataForTimes()
400 401
401 402 for i in range(len(self.filenameList)):
402 403 print("%s" %(self.filenameList[i]))
403 404
404 405 return
405 406
406 407 def __setNextFileOffline(self):
407 408
408 409 try:
409 410 self.filename = self.filenameList[self.fileIndex]
410 411 self.amisrFilePointer = h5py.File(self.filename,'r')
411 412 self.fileIndex += 1
412 413 except:
413 414 self.flagNoMoreFiles = 1
414 415 raise schainpy.admin.SchainError('No more files to read')
415 416 return 0
416 417
417 418 self.flagIsNewFile = 1
418 419 print("Setting the file: %s"%self.filename)
419 420
420 421 return 1
421 422
422 423
423 424 def __setNextFileOnline(self):
424 425 filename = self.filenameList[0]
425 426 if self.__filename_online != None:
426 427 self.__selectDataForTimes(online=True)
427 428 filename = self.filenameList[0]
428 429 wait = 0
429 430 self.__waitForNewFile=300 ## DEBUG:
430 431 while self.__filename_online == filename:
431 432 print('waiting %d seconds to get a new file...'%(self.__waitForNewFile))
432 433 if wait == 5:
433 434 self.flagNoMoreFiles = 1
434 435 return 0
435 436 sleep(self.__waitForNewFile)
436 437 self.__selectDataForTimes(online=True)
437 438 filename = self.filenameList[0]
438 439 wait += 1
439 440
440 441 self.__filename_online = filename
441 442
442 443 self.amisrFilePointer = h5py.File(filename,'r')
443 444 self.flagIsNewFile = 1
444 445 self.filename = filename
445 446 print("Setting the file: %s"%self.filename)
446 447 return 1
447 448
448 449
449 450 def readData(self):
450 451 buffer = self.amisrFilePointer.get('Raw11/Data/Samples/Data')
451 452 re = buffer[:,:,:,0]
452 453 im = buffer[:,:,:,1]
453 454 dataset = re + im*1j
454 455
455 456 self.radacTime = self.amisrFilePointer.get('Raw11/Data/RadacHeader/RadacTime')
456 457 timeset = self.radacTime[:,0]
457 458
458 459 return dataset,timeset
459 460
460 461 def reshapeData(self):
461 462 #self.beamCodeByPulse, self.beamCode, self.nblocks, self.nprofiles, self.nsa,
462 463 channels = self.beamCodeByPulse[0,:]
463 464 nchan = self.nchannels
464 465 #self.newProfiles = self.nprofiles/nchan #must be defined on filljroheader
465 466 nblocks = self.nblocks
466 467 nsamples = self.nsa
467 468
468 469 #Dimensions : nChannels, nProfiles, nSamples
469 470 new_block = numpy.empty((nblocks, nchan, numpy.int_(self.newProfiles), nsamples), dtype="complex64")
470 471 ############################################
471 472
472 473 for thisChannel in range(nchan):
473 474 new_block[:,thisChannel,:,:] = self.dataset[:,numpy.where(channels==self.beamCode[thisChannel])[0],:]
474 475
475 476
476 477 new_block = numpy.transpose(new_block, (1,0,2,3))
477 478 new_block = numpy.reshape(new_block, (nchan,-1, nsamples))
478 479
479 480 return new_block
480 481
481 482 def updateIndexes(self):
482 483
483 484 pass
484 485
485 486 def fillJROHeader(self):
486 487
487 488 #fill radar controller header
488 489 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(ipp=self.__ippKm,
489 490 txA=self.__txA,
490 491 txB=0,
491 492 nWindows=1,
492 493 nHeights=self.__nSamples,
493 494 firstHeight=self.__firstHeight,
494 495 deltaHeight=self.__deltaHeight,
495 496 codeType=self.__codeType,
496 497 nCode=self.__nCode, nBaud=self.__nBaud,
497 498 code = self.__code,
498 499 fClock=1)
499 500
500 501 #fill system header
501 502 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
502 503 nProfiles=self.newProfiles,
503 504 nChannels=len(self.__channelList),
504 505 adcResolution=14,
505 506 pciDioBusWidth=32)
506 507
507 508 self.dataOut.type = "Voltage"
508 509 self.dataOut.data = None
509 510 self.dataOut.dtype = numpy.dtype([('real','<i8'),('imag','<i8')])
510 511 # self.dataOut.nChannels = 0
511 512
512 513 # self.dataOut.nHeights = 0
513 514
514 515 self.dataOut.nProfiles = self.newProfiles*self.nblocks
515 516 #self.dataOut.heightList = self.__firstHeigth + numpy.arange(self.__nSamples, dtype = numpy.float)*self.__deltaHeigth
516 517 ranges = numpy.reshape(self.rangeFromFile.value,(-1))
517 518 self.dataOut.heightList = ranges/1000.0 #km
518 519 self.dataOut.channelList = self.__channelList
519 520 self.dataOut.blocksize = self.dataOut.nChannels * self.dataOut.nHeights
520 521
521 522 # self.dataOut.channelIndexList = None
522 523
523 524
524 525 self.dataOut.azimuthList = numpy.array(self.azimuthList)
525 526 self.dataOut.elevationList = numpy.array(self.elevationList)
526 527 self.dataOut.codeList = numpy.array(self.beamCode)
527 528 #print(self.dataOut.elevationList)
528 529 self.dataOut.flagNoData = True
529 530
530 531 #Set to TRUE if the data is discontinuous
531 532 self.dataOut.flagDiscontinuousBlock = False
532 533
533 534 self.dataOut.utctime = None
534 535
535 536 #self.dataOut.timeZone = -5 #self.__timezone/60 #timezone like jroheader, difference in minutes between UTC and localtime
536 537 if self.timezone == 'lt':
537 538 self.dataOut.timeZone = time.timezone / 60. #get the timezone in minutes
538 539 else:
539 540 self.dataOut.timeZone = 0 #by default time is UTC
540 541
541 542 self.dataOut.dstFlag = 0
542 543 self.dataOut.errorCount = 0
543 544 self.dataOut.nCohInt = 1
544 545 self.dataOut.flagDecodeData = False #asumo que la data esta decodificada
545 546 self.dataOut.flagDeflipData = False #asumo que la data esta sin flip
546 547 self.dataOut.flagShiftFFT = False
547 548 self.dataOut.ippSeconds = self.ippSeconds
548 549
549 550 #Time interval between profiles
550 551 #self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
551 552
552 553 self.dataOut.frequency = self.__frequency
553 554 self.dataOut.realtime = self.online
554 555 pass
555 556
556 557 def readNextFile(self,online=False):
557 558
558 559 if not(online):
559 560 newFile = self.__setNextFileOffline()
560 561 else:
561 562 newFile = self.__setNextFileOnline()
562 563
563 564 if not(newFile):
564 565 self.dataOut.error = True
565 566 return 0
566 567
567 568 if not self.readAMISRHeader(self.amisrFilePointer):
568 569 self.dataOut.error = True
569 570 return 0
570 571
571 572 self.createBuffers()
572 573 self.fillJROHeader()
573 574
574 575 #self.__firstFile = False
575 576
576 577
577 578
578 579 self.dataset,self.timeset = self.readData()
579 580
580 581 if self.endDate!=None:
581 582 endDateTime_Reader = datetime.datetime.combine(self.endDate,self.endTime)
582 583 time_str = self.amisrFilePointer.get('Time/RadacTimeString')
583 584 startDateTimeStr_File = time_str[0][0].decode('UTF-8').split('.')[0]
584 585 junk = time.strptime(startDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
585 586 startDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
586 587 if self.timezone == 'lt':
587 588 startDateTime_File = startDateTime_File - datetime.timedelta(minutes = 300)
588 589 if (startDateTime_File>endDateTime_Reader):
589 590 return 0
590 591
591 592 self.jrodataset = self.reshapeData()
592 593 #----self.updateIndexes()
593 594 self.profileIndex = 0
594 595
595 596 return 1
596 597
597 598
598 599 def __hasNotDataInBuffer(self):
599 600 if self.profileIndex >= (self.newProfiles*self.nblocks):
600 601 return 1
601 602 return 0
602 603
603 604
604 605 def getData(self):
605 606
606 607 if self.flagNoMoreFiles:
607 608 self.dataOut.flagNoData = True
608 609 return 0
609 610
610 611 if self.__hasNotDataInBuffer():
611 612 if not (self.readNextFile(self.online)):
612 613 return 0
613 614
614 615
615 616 if self.dataset is None: # setear esta condicion cuando no hayan datos por leer
616 617 self.dataOut.flagNoData = True
617 618 return 0
618 619
619 620 #self.dataOut.data = numpy.reshape(self.jrodataset[self.profileIndex,:],(1,-1))
620 621
621 622 self.dataOut.data = self.jrodataset[:,self.profileIndex,:]
622 623
623 624 #print("R_t",self.timeset)
624 625
625 626 #self.dataOut.utctime = self.jrotimeset[self.profileIndex]
626 627 #verificar basic header de jro data y ver si es compatible con este valor
627 628 #self.dataOut.utctime = self.timeset + (self.profileIndex * self.ippSeconds * self.nchannels)
628 629 indexprof = numpy.mod(self.profileIndex, self.newProfiles)
629 630 indexblock = self.profileIndex/self.newProfiles
630 631 #print (indexblock, indexprof)
631 632 diffUTC = 0
632 633 t_comp = (indexprof * self.ippSeconds * self.nchannels) + diffUTC #
633 634
634 635 #print("utc :",indexblock," __ ",t_comp)
635 636 #print(numpy.shape(self.timeset))
636 637 self.dataOut.utctime = self.timeset[numpy.int_(indexblock)] + t_comp
637 638 #self.dataOut.utctime = self.timeset[self.profileIndex] + t_comp
638 639
639 640 self.dataOut.profileIndex = self.profileIndex
640 641 #print("N profile:",self.profileIndex,self.newProfiles,self.nblocks,self.dataOut.utctime)
641 642 self.dataOut.flagNoData = False
642 643 # if indexprof == 0:
643 644 # print("kamisr: ",self.dataOut.utctime)
644 645
645 646 self.profileIndex += 1
646 647
647 648 return self.dataOut.data #retorno necesario??
648 649
649 650
650 651 def run(self, **kwargs):
651 652 '''
652 653 This method will be called many times so here you should put all your code
653 654 '''
654 655 #print("running kamisr")
655 656 if not self.isConfig:
656 657 self.setup(**kwargs)
657 658 self.isConfig = True
658 659
659 660 self.getData()
@@ -1,1113 +1,1112
1 1 '''
2 2 @author: Daniel Suarez
3 3 '''
4 4 import os
5 5 import glob
6 6 import ftplib
7 7
8 8 try:
9 9 import paramiko
10 10 import scp
11 11 except:
12 12 pass
13 13
14 14 import time
15 15
16 16 import threading
17 17 Thread = threading.Thread
18 18
19 19 # try:
20 20 # from gevent import sleep
21 21 # except:
22 22 from time import sleep
23 23 from schainpy.model.data.jrodata import *
24 24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
25 25 #@MPDecorator
26 26 class Remote(Thread):
27 27 """
28 28 Remote is a parent class used to define the behaviour of FTP and SSH class. These clases are
29 29 used to upload or download files remotely.
30 30
31 31 Non-standard Python modules used:
32 32 None
33 33
34 34 Written by:
35 35 "Miguel Urco":mailto:miguel.urco@jro.igp.gob.pe Jun. 03, 2015
36 36 Modified by:
37 37 -
38 38 """
39 39
40 40 server = None
41 41 username = None
42 42 password = None
43 43 remotefolder = None
44 44 key_filename=None
45 45
46 46 period = 60
47 47 fileList = []
48 48 bussy = False
49 49
50 50 def __init__(self, server, username, password, remotefolder, period=60,key_filename=None):
51 51
52 52 Thread.__init__(self)
53 53
54 54 self.setDaemon(True)
55 55
56 56 self.status = 0
57 57
58 58 self.__server = server
59 59 self.__username = username
60 60 self.__password = password
61 61 self.__remotefolder = remotefolder
62 62
63 63 self.period = period
64 64 self.key_filename = key_filename
65 65 self.fileList = []
66 66 self.bussy = False
67 67
68 68 self.stopFlag = False
69 69
70 70 print("[Remote Server] Opening server: %s" %self.__server)
71 71 if self.open(self.__server, self.__username, self.__password, self.__remotefolder,key_filename=self.key_filename):
72 72 print("[Remote Server] %s server was opened successfully" %self.__server)
73 73
74 74 #self.close()
75 75
76 76 self.mutex = threading.Lock()
77 77
78 78 def stop(self):
79 79
80 80 self.stopFlag = True
81 81 self.join(10)
82 82
83 83 def open(self):
84 84 """
85 85 Connect to server and create a connection class (FTP or SSH) to remote server.
86 86 """
87 87 raise NotImplementedError("Implement this method in child class")
88 88
89 89 def close(self):
90 90 """
91 91 Close connection to server
92 92 """
93 93 raise NotImplementedError("Implement this method in child class")
94 94
95 95 def mkdir(self, remotefolder):
96 96 """
97 97 Create a folder remotely
98 98 """
99 99 raise NotImplementedError("Implement this method in child class")
100 100
101 101 def cd(self, remotefolder):
102 102 """
103 103 Change working directory in remote server
104 104 """
105 105 raise NotImplementedError("Implement this method in child class")
106 106
107 107 def download(self, filename, localfolder=None):
108 108 """
109 109 Download a file from server to local host
110 110 """
111 111 raise NotImplementedError("Implement this method in child class")
112 112
113 113 def sendFile(self, fullfilename):
114 114 """
115 115 sendFile method is used to upload a local file to the current directory in remote server
116 116
117 117 Inputs:
118 118 fullfilename - full path name of local file to store in remote directory
119 119
120 120 Returns:
121 121 0 in error case else 1
122 122 """
123 123 raise NotImplementedError("Implement this method in child class")
124 124
125 125 def upload(self, fullfilename, remotefolder=None):
126 126 """
127 127 upload method is used to upload a local file to remote directory. This method changes
128 128 working directory before sending a file.
129 129
130 130 Inputs:
131 131 fullfilename - full path name of local file to store in remote directory
132 132
133 133 remotefolder - remote directory
134 134
135 135 Returns:
136 136 0 in error case else 1
137 137 """
138 138 print("[Remote Server] Uploading %s to %s:%s" %(fullfilename, self.server, self.remotefolder))
139 139
140 140 if not self.status:
141 141 return 0
142 142
143 143 if remotefolder == None:
144 144 remotefolder = self.remotefolder
145 145
146 146 if not self.cd(remotefolder):
147 147 return 0
148 148
149 149 if not self.sendFile(fullfilename):
150 150 print("[Remote Server] Error uploading file %s" %fullfilename)
151 151 return 0
152 152
153 153
154 154
155 155 return 1
156 156
157 157 def delete(self, filename):
158 158 """
159 159 Remove a file from remote server
160 160 """
161 161 pass
162 162
163 163 def updateFileList(self, fileList):
164 164 """
165 165 Remove a file from remote server
166 166 """
167 167
168 168 if fileList == self.fileList:
169 169 return 0
170 170
171 171 self.mutex.acquire()
172 172 # init = time.time()
173 173 #
174 174 # while(self.bussy):
175 175 # sleep(0.1)
176 176 # if time.time() - init > 2*self.period:
177 177 # return 0
178 178
179 179 self.fileList = fileList
180 180 self.mutex.release()
181 181 return 1
182 182
183 183 def run(self):
184 184
185 185 if not self.status:
186 186 print("Finishing FTP service")
187 187 return
188 188
189 189 if not self.cd(self.remotefolder):
190 190 raise ValueError("Could not access to the new remote directory: %s" %self.remotefolder)
191 191
192 192 while True:
193 193
194 194 for i in range(self.period):
195 195 if self.stopFlag:
196 196 break
197 197 sleep(1)
198 198
199 199 if self.stopFlag:
200 200 break
201 201
202 202 # self.bussy = True
203 203 self.mutex.acquire()
204 204
205 205 print("[Remote Server] Opening %s" %self.__server)
206 206 if not self.open(self.__server, self.__username, self.__password, self.__remotefolder):
207 207 self.mutex.release()
208 208 continue
209 209
210 210 for thisFile in self.fileList:
211 211 self.upload(thisFile, self.remotefolder)
212 212
213 213 print("[Remote Server] Closing %s" %self.__server)
214 214 self.close()
215 215
216 216 self.mutex.release()
217 217 # self.bussy = False
218 218
219 219 print("[Remote Server] Thread stopped successfully")
220 220
221 221 class FTPClient(Remote):
222 222
223 223 __ftpClientObj = None
224 224
225 225 def __init__(self, server, username, password, remotefolder, period=60):
226 226 """
227 227 """
228 228 Remote.__init__(self, server, username, password, remotefolder, period)
229 229
230 230 def open(self, server, username, password, remotefolder):
231 231
232 232 """
233 233 This method is used to set FTP parameters and establish a connection to remote server
234 234
235 235 Inputs:
236 236 server - remote server IP Address
237 237
238 238 username - remote server Username
239 239
240 240 password - remote server password
241 241
242 242 remotefolder - remote server current working directory
243 243
244 244 Return:
245 245 Boolean - Returns 1 if a connection has been established, 0 otherwise
246 246
247 247 Affects:
248 248 self.status - in case of error or fail connection this parameter is set to 0 else 1
249 249
250 250 """
251 251
252 252 if server == None:
253 253 raise ValueError("FTP server should be defined")
254 254
255 255 if username == None:
256 256 raise ValueError("FTP username should be defined")
257 257
258 258 if password == None:
259 259 raise ValueError("FTP password should be defined")
260 260
261 261 if remotefolder == None:
262 262 raise ValueError("FTP remote folder should be defined")
263 263
264 264 try:
265 265 ftpClientObj = ftplib.FTP(server)
266 266 except ftplib.all_errors as e:
267 267 print("[FTP Server]: FTP server connection fail: %s" %server)
268 268 print("[FTP Server]:", e)
269 269 self.status = 0
270 270 return 0
271 271
272 272 try:
273 273 ftpClientObj.login(username, password)
274 274 except ftplib.all_errors:
275 275 print("[FTP Server]: FTP username or password are incorrect")
276 276 self.status = 0
277 277 return 0
278 278
279 279 if remotefolder == None:
280 280 remotefolder = ftpClientObj.pwd()
281 281 else:
282 282 try:
283 283 ftpClientObj.cwd(remotefolder)
284 284 except ftplib.all_errors:
285 285 print("[FTP Server]: FTP remote folder is invalid: %s" %remotefolder)
286 286 remotefolder = ftpClientObj.pwd()
287 287
288 288 self.server = server
289 289 self.username = username
290 290 self.password = password
291 291 self.remotefolder = remotefolder
292 292 self.__ftpClientObj = ftpClientObj
293 293 self.status = 1
294 294
295 295 return 1
296 296
297 297 def close(self):
298 298 """
299 299 Close connection to remote server
300 300 """
301 301 if not self.status:
302 302 return 0
303 303
304 304 self.__ftpClientObj.close()
305 305
306 306 def mkdir(self, remotefolder):
307 307 """
308 308 mkdir is used to make a new directory in remote server
309 309
310 310 Input:
311 311 remotefolder - directory name
312 312
313 313 Return:
314 314 0 in error case else 1
315 315 """
316 316 if not self.status:
317 317 return 0
318 318
319 319 try:
320 320 self.__ftpClientObj.mkd(dirname)
321 321 except ftplib.all_errors:
322 322 print("[FTP Server]: Error creating remote folder: %s" %remotefolder)
323 323 return 0
324 324
325 325 return 1
326 326
327 327 def cd(self, remotefolder):
328 328 """
329 329 cd is used to change remote working directory on server
330 330
331 331 Input:
332 332 remotefolder - current working directory
333 333
334 334 Affects:
335 335 self.remotefolder
336 336
337 337 Return:
338 338 0 in case of error else 1
339 339 """
340 340 if not self.status:
341 341 return 0
342 342
343 343 if remotefolder == self.remotefolder:
344 344 return 1
345 345
346 346 try:
347 347 self.__ftpClientObj.cwd(remotefolder)
348 348 except ftplib.all_errors:
349 349 print('[FTP Server]: Error changing to %s' %remotefolder)
350 350 print('[FTP Server]: Trying to create remote folder')
351 351
352 352 if not self.mkdir(remotefolder):
353 353 print('[FTP Server]: Remote folder could not be created')
354 354 return 0
355 355
356 356 try:
357 357 self.__ftpClientObj.cwd(remotefolder)
358 358 except ftplib.all_errors:
359 359 return 0
360 360
361 361 self.remotefolder = remotefolder
362 362
363 363 return 1
364 364
365 365 def sendFile(self, fullfilename):
366 366
367 367 if not self.status:
368 368 return 0
369 369
370 370 fp = open(fullfilename, 'rb')
371 371
372 372 filename = os.path.basename(fullfilename)
373 373
374 374 command = "STOR %s" %filename
375 375
376 376 try:
377 377 self.__ftpClientObj.storbinary(command, fp)
378 378 except ftplib.all_errors as e:
379 379 print("[FTP Server]:", e)
380 380 return 0
381 381
382 382 try:
383 383 self.__ftpClientObj.sendcmd('SITE CHMOD 755 ' + filename)
384 384 except ftplib.all_errors as e:
385 385 print("[FTP Server]:", e)
386 386
387 387 fp.close()
388 388
389 389 return 1
390 390
391 391 class SSHClient(Remote):
392 392
393 393 __sshClientObj = None
394 394 __scpClientObj = None
395 395
396 396
397 397 def __init__(self, server, username, password, remotefolder, period=60,key_filename=None):
398 398 """
399 399 """
400 400 Remote.__init__(self, server, username, password, remotefolder, period, key_filename)
401 401
402 402 def open(self, server, username, password, remotefolder, port=22, key_filename=None):
403 403
404 404 """
405 405 This method is used to set SSH parameters and establish a connection to a remote server
406 406
407 407 Inputs:
408 408 server - remote server IP Address
409 409
410 410 username - remote server Username
411 411
412 412 password - remote server password
413 413
414 414 remotefolder - remote server current working directory
415 415
416 416 key_filename - filename of the private key/optional
417 417
418 418 Return: void
419 419
420 420 Affects:
421 421 self.status - in case of error or fail connection this parameter is set to 0 else 1
422 422
423 423 """
424 424 #import socket
425 425
426 426 if server == None:
427 427 raise ValueError("SSH server should be defined")
428 428
429 429 if username == None:
430 430 raise ValueError("SSH username should be defined")
431 431
432 432 if password == None:
433 433 raise ValueError("SSH password should be defined")
434 434
435 435 if remotefolder == None:
436 436 raise ValueError("SSH remote folder should be defined")
437 437
438 438 self.__sshClientObj = paramiko.SSHClient()
439 439
440 440 self.__sshClientObj.load_system_host_keys()
441 441 self.__sshClientObj.set_missing_host_key_policy(paramiko.WarningPolicy())
442 442
443 443 self.status = 0
444 444
445 445 try:
446 446 if key_filename != None:
447 447 self.__sshClientObj.connect(server, username=username, password=password, port=port, key_filename=key_filename)
448 448 else:
449 449 self.__sshClientObj.connect(server, username=username, password=password, port=port)
450 450 except paramiko.AuthenticationException as e:
451 451 # print "SSH username or password are incorrect: %s"
452 452 print("[SSH Server]:", e)
453 453 return 0
454 454 # except SSHException as e:
455 455 # print("[SSH Server]:", e)
456 456 # return 0
457 457 # except socket.error:
458 458 # self.status = 0
459 459 # print("[SSH Server]:", e)
460 460 # return 0
461 461
462 462 self.status = 1
463 463 #self.__scpClientObj = scp.SCPClient(self.__sshClientObj.get_transport(), socket_timeout=30)
464 464 self.__scpClientObj = self.__sshClientObj.open_sftp()
465 465 if remotefolder == None:
466 466 remotefolder = self.pwd()
467 467
468 468 self.server = server
469 469 self.username = username
470 470 self.password = password
471 471 # self.__sshClientObj = self.__sshClientObj
472 472 # self.__scpClientObj = self.__scpClientObj
473 473 self.status = 1
474 474
475 475 if not self.cd(remotefolder):
476 476 raise ValueError("[SSH Server]: Could not access to remote folder: %s" %remotefolder)
477 477 return 0
478 478
479 479 self.remotefolder = remotefolder
480 480
481 481 return 1
482 482
483 483 def close(self):
484 484 """
485 485 Close connection to remote server
486 486 """
487 487 if not self.status:
488 488 return 0
489 489
490 490 self.__scpClientObj.close()
491 491 self.__sshClientObj.close()
492 492
493 493 def __execute(self, command):
494 494 """
495 495 __execute a command on remote server
496 496
497 497 Input:
498 498 command - Exmaple 'ls -l'
499 499
500 500 Return:
501 501 0 in error case else 1
502 502 """
503 503 if not self.status:
504 504 return 0
505 505
506 506 stdin, stdout, stderr = self.__sshClientObj.exec_command(command)
507 507
508 508 result = stderr.readlines()
509 509 if len(result) > 1:
510 510 return 0
511 511
512 512 result = stdout.readlines()
513 513 if len(result) > 1:
514 514 return result[0][:-1]
515 515
516 516 return 1
517 517
518 518 def mkdir(self, remotefolder):
519 519 """
520 520 mkdir is used to make a new directory in remote server
521 521
522 522 Input:
523 523 remotefolder - directory name
524 524
525 525 Return:
526 526 0 in error case else 1
527 527 """
528 528
529 529 command = 'mkdir %s' %remotefolder
530 530
531 531 return self.__execute(command)
532 532
533 533 def pwd(self):
534 534
535 535 command = 'pwd'
536 536
537 537 return self.__execute(command)
538 538
539 539 def cd(self, remotefolder):
540 540 """
541 541 cd is used to change remote working directory on server
542 542
543 543 Input:
544 544 remotefolder - current working directory
545 545
546 546 Affects:
547 547 self.remotefolder
548 548
549 549 Return:
550 550 0 in case of error else 1
551 551 """
552 552 if not self.status:
553 553 return 0
554 554
555 555 if remotefolder == self.remotefolder:
556 556 return 1
557 557
558 558 chk_command = "cd %s; pwd" %remotefolder
559 559 mkdir_command = "mkdir %s" %remotefolder
560 560
561 561 if not self.__execute(chk_command):
562 562 if not self.__execute(mkdir_command):
563 563 self.remotefolder = None
564 564 return 0
565 565
566 566 self.remotefolder = remotefolder
567 567
568 568 return 1
569 569
570 570 def sendFile(self, fullfilename):
571 571
572 572 if not self.status:
573 573 return 0
574 574
575 575 remotefile = os.path.join(self.remotefolder, os.path.split(fullfilename)[-1])
576 576 print("remotefile",fullfilename, remotefile)
577 577
578 578 try:
579 579 self.__scpClientObj.put(fullfilename,remotefile)
580 580 except paramiko.SSHException as e:
581 581 print("[SSH Server]", str(e))
582 582 print(fullfilename," to ",remotefile)
583 583 return 0
584 584
585 585
586 586 #command = 'chmod 775 %s' %remotefile
587 587
588 588 return 1#self.__execute(command)
589 589 #@MPDecorator
590 590 class SendToServerProc(ProcessingUnit):
591 591
592 592 sendByTrigger = False
593 593
594 594 def __init__(self, **kwargs):
595 595
596 596 ProcessingUnit.__init__(self)
597 597
598 598 self.isConfig = False
599 599 self.clientObj = None
600 600 self.dataOut = Parameters()
601 601 self.dataOut.error=False
602 602 self.dataOut.flagNoData=True
603 603
604 604 def setup(self, server=None, username="", password="", remotefolder="", localfolder="",
605 ext='.png', period=60, protocol='ftp', sendByTrigger=False, key_filename=None):
605 ext='.png', period=60, protocol='ftp', sendByTrigger=False, key_filename=None):
606 606 self.server = server
607 607 self.username = username
608 608 self.password = password
609 609 self.remotefolder = remotefolder
610 610 self.clientObj = None
611 611 self.localfolder = localfolder
612 612 self.ext = ext
613 613 self.sendByTrigger = sendByTrigger
614 614 self.period = period
615 615 self.key_filename = key_filename
616 616 if self.sendByTrigger:
617 617 self.period = 1000000000000 #para que no se ejecute por tiempo
618 618
619 619 if str.lower(protocol) == 'ftp':
620 620 self.clientObj = FTPClient(server, username, password, remotefolder, period)
621 621
622 622 if str.lower(protocol) == 'ssh':
623 623 self.clientObj = SSHClient(self.server, self.username, self.password,
624 624 self.remotefolder, period=600000,key_filename=self.key_filename)
625 625
626 626 if not self.clientObj:
627 627 raise ValueError("%s has been chosen as remote access protocol but it is not valid" %protocol)
628 628
629 #self.clientObj.start()
630 629 print("Send to Server setup complete")
631 630
632 631
633 632 def findFiles(self):
634 633
635 634 if not type(self.localfolder) == list:
636 635 folderList = [self.localfolder]
637 636 else:
638 637 folderList = self.localfolder
639 638
640 639 #Remove duplicate items
641 640 folderList = list(set(folderList))
642 641
643 642 fullfilenameList = []
644 643
645 644 for thisFolder in folderList:
646 645
647 646 print("[Remote Server]: Searching files on %s" %thisFolder)
648 647
649 648 filenameList = glob.glob1(thisFolder, '*%s' %self.ext)
650 649
651 650 if len(filenameList) < 1:
652 651
653 652 continue
654 653
655 654 for thisFile in filenameList:
656 655 fullfilename = os.path.join(thisFolder, thisFile)
657 656
658 657 if fullfilename in fullfilenameList:
659 658 continue
660 659
661 660 #Only files modified in the last 30 minutes are considered
662 661 if os.path.getmtime(fullfilename) < time.time() - 30*60:
663 662 continue
664 663
665 664 fullfilenameList.append(fullfilename)
666 665 fullfilenameList.sort()
667 666
668 667 return fullfilenameList
669 668
670 669 def run(self, **kwargs):
671 670
672 671 if not self.isConfig:
673 672 self.init = time.time()
674 673 self.setup(**kwargs)
675 674 self.isConfig = True
676 675
677 676 if not self.clientObj.is_alive():
678 677 print("[Remote Server]: Restarting connection ")
679 678 self.setup( **kwargs)
680 679
681 680 if ((time.time() - self.init) >= self.period and not self.sendByTrigger) or (self.sendByTrigger and not self.dataIn.flagNoData):
682 681 fullfilenameList = self.findFiles()
683 682 if self.sendByTrigger:
684 683 if self.clientObj.upload(fullfilenameList[-1]): #last file to send
685 684 print("[Remote Server] upload finished successfully")
686 685 else:
687 686 for file in fullfilenameList:
688 687 self.clientObj.upload(file)
689 688
690 689 # if self.clientObj.updateFileList(fullfilenameList):
691 690 # print("[Remote Server]: Sending the next files ", str(fullfilenameList))
692 691
693 692 self.init = time.time()
694 693
695 694 def close(self):
696 695 print("[Remote Server] Stopping thread")
697 696 self.clientObj.stop()
698 697
699 698 class SendByRSYNCProc(ProcessingUnit):
700 699
701 700 sendByTrigger = False
702 701
703 702 def __init__(self, **kwargs):
704 703
705 704 ProcessingUnit.__init__(self)
706 705
707 706 self.isConfig = False
708 707 self.dataOut = Parameters()
709 708 self.dataOut.error=False
710 709 self.dataOut.flagNoData=True
711 710
712 711 def setup(self, server="", username="", remotefolder="", localfolder="",sendByTrigger=True,
713 712 period=60, key_filename=None, port=22 ,param1="", param2=""):
714 713 self.server = server
715 714 self.username = username
716 715 self.remotefolder = remotefolder
717 716 self.localfolder = localfolder
718 717 self.period = period
719 718 self.key_filename = key_filename
720 719 if type(param1)==str:
721 720 self.param1 = list(param1.split(","))
722 721 else:
723 722 self.param1 = param1
724 723 if type(param2)==str:
725 724 self.param2 = list(param2.split(","))
726 725 else:
727 726 self.param2 = param2
728 727 self.port = port
729 728 self.sendByTrigger = sendByTrigger
730 729 if self.sendByTrigger:
731 730 self.period = 1000000000000 #para que no se ejecute por tiempo
732 731 self.command ="rsync "
733 732
734 733 def syncFolders(self):
735 734 self.command ="rsync "
736 735 for p1 in self.param1:
737 736 self.command += " -"+str(p1)
738 737 for p2 in self.param2:
739 738 self.command += " --"+str(p2)
740 739 if self.key_filename != None:
741 740 self.command += """ "ssh -i {} -p {}" """.format(self.key_filename, self.port)
742 741 self.command += " {} ".format(self.localfolder)
743 742 self.command += " {}@{}:{}".format(self.username,self.server,self.remotefolder)
744 743 print("CMD: ",self.command)
745 744 #os.system(self.command)
746 745 return
747 746
748 747 def run(self, **kwargs):
749 748
750 749 if not self.isConfig:
751 750 self.init = time.time()
752 751 self.setup(**kwargs)
753 752 self.isConfig = True
754 753
755 754 if self.sendByTrigger and not self.dataIn.flagNoData:
756 755 self.syncFolders()
757 756 else:
758 757 if (time.time() - self.init) >= self.period:
759 758 self.syncFolders()
760 759 self.init = time.time()
761 760
762 761 return
763 762
764 763
765 764
766 765 class FTP(object):
767 766 """
768 767 Ftp is a public class used to define custom File Transfer Protocol from "ftplib" python module
769 768
770 769 Non-standard Python modules used: None
771 770
772 771 Written by "Daniel Suarez":mailto:daniel.suarez@jro.igp.gob.pe Oct. 26, 2010
773 772 """
774 773
775 774 def __init__(self,server = None, username=None, password=None, remotefolder=None):
776 775 """
777 776 This method is used to setting parameters for FTP and establishing connection to remote server
778 777
779 778 Inputs:
780 779 server - remote server IP Address
781 780
782 781 username - remote server Username
783 782
784 783 password - remote server password
785 784
786 785 remotefolder - remote server current working directory
787 786
788 787 Return: void
789 788
790 789 Affects:
791 790 self.status - in Error Case or Connection Failed this parameter is set to 1 else 0
792 791
793 792 self.folderList - sub-folder list of remote folder
794 793
795 794 self.fileList - file list of remote folder
796 795
797 796
798 797 """
799 798
800 799 if ((server == None) and (username==None) and (password==None) and (remotefolder==None)):
801 800 server, username, password, remotefolder = self.parmsByDefault()
802 801
803 802 self.server = server
804 803 self.username = username
805 804 self.password = password
806 805 self.remotefolder = remotefolder
807 806 self.file = None
808 807 self.ftp = None
809 808 self.status = 0
810 809
811 810 try:
812 811 self.ftp = ftplib.FTP(self.server)
813 812 self.ftp.login(self.username,self.password)
814 813 self.ftp.cwd(self.remotefolder)
815 814 # print 'Connect to FTP Server: Successfully'
816 815
817 816 except ftplib.all_errors:
818 817 print('Error FTP Service')
819 818 self.status = 1
820 819 return
821 820
822 821
823 822
824 823 self.dirList = []
825 824
826 825 try:
827 826 self.dirList = self.ftp.nlst()
828 827
829 828 except ftplib.error_perm as resp:
830 829 if str(resp) == "550 No files found":
831 830 print("no files in this directory")
832 831 self.status = 1
833 832 return
834 833
835 834 except ftplib.all_errors:
836 835 print('Error Displaying Dir-Files')
837 836 self.status = 1
838 837 return
839 838
840 839 self.fileList = []
841 840 self.folderList = []
842 841 #only for test
843 842 for f in self.dirList:
844 843 name, ext = os.path.splitext(f)
845 844 if ext != '':
846 845 self.fileList.append(f)
847 846 # print 'filename: %s - size: %d'%(f,self.ftp.size(f))
848 847
849 848 def parmsByDefault(self):
850 849 server = 'jro-app.igp.gob.pe'
851 850 username = 'wmaster'
852 851 password = 'mst2010vhf'
853 852 remotefolder = '/home/wmaster/graficos'
854 853
855 854 return server, username, password, remotefolder
856 855
857 856
858 857 def mkd(self,dirname):
859 858 """
860 859 mkd is used to make directory in remote server
861 860
862 861 Input:
863 862 dirname - directory name
864 863
865 864 Return:
866 865 1 in error case else 0
867 866 """
868 867 try:
869 868 self.ftp.mkd(dirname)
870 869 except:
871 870 print('Error creating remote folder:%s'%dirname)
872 871 return 1
873 872
874 873 return 0
875 874
876 875
877 876 def delete(self,filename):
878 877 """
879 878 delete is used to delete file in current working directory of remote server
880 879
881 880 Input:
882 881 filename - filename to delete in remote folder
883 882
884 883 Return:
885 884 1 in error case else 0
886 885 """
887 886
888 887 try:
889 888 self.ftp.delete(filename)
890 889 except:
891 890 print('Error deleting remote file:%s'%filename)
892 891 return 1
893 892
894 893 return 0
895 894
896 895 def download(self,filename,localfolder):
897 896 """
898 897 download is used to downloading file from remote folder into local folder
899 898
900 899 Inputs:
901 900 filename - filename to donwload
902 901
903 902 localfolder - directory local to store filename
904 903
905 904 Returns:
906 905 self.status - 1 in error case else 0
907 906 """
908 907
909 908 self.status = 0
910 909
911 910
912 911 if not(filename in self.fileList):
913 912 print('filename:%s not exists'%filename)
914 913 self.status = 1
915 914 return self.status
916 915
917 916 newfilename = os.path.join(localfolder,filename)
918 917
919 918 self.file = open(newfilename, 'wb')
920 919
921 920 try:
922 921 print('Download: ' + filename)
923 922 self.ftp.retrbinary('RETR ' + filename, self.__handleDownload)
924 923 print('Download Complete')
925 924 except ftplib.all_errors:
926 925 print('Error Downloading ' + filename)
927 926 self.status = 1
928 927 return self.status
929 928
930 929 self.file.close()
931 930
932 931 return self.status
933 932
934 933
935 934 def __handleDownload(self,block):
936 935 """
937 936 __handleDownload is used to handle writing file
938 937 """
939 938 self.file.write(block)
940 939
941 940
942 941 def upload(self,filename,remotefolder=None):
943 942 """
944 943 upload is used to uploading local file to remote directory
945 944
946 945 Inputs:
947 946 filename - full path name of local file to store in remote directory
948 947
949 948 remotefolder - remote directory
950 949
951 950 Returns:
952 951 self.status - 1 in error case else 0
953 952 """
954 953
955 954 if remotefolder == None:
956 955 remotefolder = self.remotefolder
957 956
958 957 self.status = 0
959 958
960 959 try:
961 960 self.ftp.cwd(remotefolder)
962 961
963 962 self.file = open(filename, 'rb')
964 963
965 964 (head, tail) = os.path.split(filename)
966 965
967 966 command = "STOR " + tail
968 967
969 968 print('Uploading: ' + tail)
970 969 self.ftp.storbinary(command, self.file)
971 970 print('Upload Completed')
972 971
973 972 except ftplib.all_errors:
974 973 print('Error Uploading ' + tail)
975 974 self.status = 1
976 975 return self.status
977 976
978 977 self.file.close()
979 978
980 979 #back to initial directory in __init__()
981 980 self.ftp.cwd(self.remotefolder)
982 981
983 982 return self.status
984 983
985 984
986 985 def dir(self,remotefolder):
987 986 """
988 987 dir is used to change working directory of remote server and get folder and file list
989 988
990 989 Input:
991 990 remotefolder - current working directory
992 991
993 992 Affects:
994 993 self.fileList - file list of working directory
995 994
996 995 Return:
997 996 infoList - list with filenames and size of file in bytes
998 997
999 998 self.folderList - folder list
1000 999 """
1001 1000
1002 1001 self.remotefolder = remotefolder
1003 1002 print('Change to ' + self.remotefolder)
1004 1003 try:
1005 1004 self.ftp.cwd(remotefolder)
1006 1005 except ftplib.all_errors:
1007 1006 print('Error Change to ' + self.remotefolder)
1008 1007 infoList = None
1009 1008 self.folderList = None
1010 1009 return infoList,self.folderList
1011 1010
1012 1011 self.dirList = []
1013 1012
1014 1013 try:
1015 1014 self.dirList = self.ftp.nlst()
1016 1015
1017 1016 except ftplib.error_perm as resp:
1018 1017 if str(resp) == "550 No files found":
1019 1018 print("no files in this directory")
1020 1019 infoList = None
1021 1020 self.folderList = None
1022 1021 return infoList,self.folderList
1023 1022 except ftplib.all_errors:
1024 1023 print('Error Displaying Dir-Files')
1025 1024 infoList = None
1026 1025 self.folderList = None
1027 1026 return infoList,self.folderList
1028 1027
1029 1028 infoList = []
1030 1029 self.fileList = []
1031 1030 self.folderList = []
1032 1031 for f in self.dirList:
1033 1032 name,ext = os.path.splitext(f)
1034 1033 if ext != '':
1035 1034 self.fileList.append(f)
1036 1035 value = (f,self.ftp.size(f))
1037 1036 infoList.append(value)
1038 1037
1039 1038 if ext == '':
1040 1039 self.folderList.append(f)
1041 1040
1042 1041 return infoList,self.folderList
1043 1042
1044 1043
1045 1044 def close(self):
1046 1045 """
1047 1046 close is used to close and end FTP connection
1048 1047
1049 1048 Inputs: None
1050 1049
1051 1050 Return: void
1052 1051
1053 1052 """
1054 1053 self.ftp.close()
1055 1054 @MPDecorator
1056 1055 class SendByFTP(Operation):
1057 1056
1058 1057 def __init__(self, **kwargs):
1059 1058 Operation.__init__(self, **kwargs)
1060 1059 self.status = 1
1061 1060 self.counter = 0
1062 1061
1063 1062 def error_print(self, ValueError):
1064 1063
1065 1064 print(ValueError, 'Error FTP')
1066 1065 print("don't worry the program is running...")
1067 1066
1068 1067 def worker_ftp(self, server, username, password, remotefolder, filenameList):
1069 1068
1070 1069 self.ftpClientObj = FTP(server, username, password, remotefolder)
1071 1070 for filename in filenameList:
1072 1071 self.ftpClientObj.upload(filename)
1073 1072 self.ftpClientObj.close()
1074 1073
1075 1074 def ftp_thread(self, server, username, password, remotefolder):
1076 1075 if not(self.status):
1077 1076 return
1078 1077
1079 1078 import multiprocessing
1080 1079
1081 1080 p = multiprocessing.Process(target=self.worker_ftp, args=(server, username, password, remotefolder, self.filenameList,))
1082 1081 p.start()
1083 1082
1084 1083 p.join(3)
1085 1084
1086 1085 if p.is_alive():
1087 1086 p.terminate()
1088 1087 p.join()
1089 1088 print('killing ftp process...')
1090 1089 self.status = 0
1091 1090 return
1092 1091
1093 1092 self.status = 1
1094 1093 return
1095 1094
1096 1095 def filterByExt(self, ext, localfolder):
1097 1096 fnameList = glob.glob1(localfolder,ext)
1098 1097 self.filenameList = [os.path.join(localfolder,x) for x in fnameList]
1099 1098
1100 1099 if len(self.filenameList) == 0:
1101 1100 self.status = 0
1102 1101
1103 1102 def run(self, dataOut, ext, localfolder, remotefolder, server, username, password, period=1):
1104 1103
1105 1104 self.counter += 1
1106 1105 if self.counter >= period:
1107 1106 self.filterByExt(ext, localfolder)
1108 1107
1109 1108 self.ftp_thread(server, username, password, remotefolder)
1110 1109
1111 1110 self.counter = 0
1112 1111
1113 1112 self.status = 1
General Comments 0
You need to be logged in to leave comments. Login now