##// END OF EJS Templates
merge voltage ftp
José Chávez -
r1037:2fe9a199ea60
parent child
Show More
@@ -1,824 +1,737
1 1 '''
2 2 Created on Jul 2, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6
7 7 import numpy
8 8
9 9 from jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
11 11 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
12 12 from schainpy.model.data.jrodata import Voltage
13 13 import zmq
14 14 import tempfile
15 15 from StringIO import StringIO
16 16 # from _sha import blocksize
17 17
18 18 class VoltageReader(JRODataReader, ProcessingUnit):
19 19 """
20 20 Esta clase permite leer datos de voltage desde archivos en formato rawdata (.r). La lectura
21 21 de los datos siempre se realiza por bloques. Los datos leidos (array de 3 dimensiones:
22 22 perfiles*alturas*canales) son almacenados en la variable "buffer".
23 23
24 24 perfiles * alturas * canales
25 25
26 26 Esta clase contiene instancias (objetos) de las clases BasicHeader, SystemHeader,
27 27 RadarControllerHeader y Voltage. Los tres primeros se usan para almacenar informacion de la
28 28 cabecera de datos (metadata), y el cuarto (Voltage) para obtener y almacenar un perfil de
29 29 datos desde el "buffer" cada vez que se ejecute el metodo "getData".
30 30
31 31 Example:
32 32
33 33 dpath = "/home/myuser/data"
34 34
35 35 startTime = datetime.datetime(2010,1,20,0,0,0,0,0,0)
36 36
37 37 endTime = datetime.datetime(2010,1,21,23,59,59,0,0,0)
38 38
39 39 readerObj = VoltageReader()
40 40
41 41 readerObj.setup(dpath, startTime, endTime)
42 42
43 43 while(True):
44 44
45 45 #to get one profile
46 46 profile = readerObj.getData()
47 47
48 48 #print the profile
49 49 print profile
50 50
51 51 #If you want to see all datablock
52 52 print readerObj.datablock
53 53
54 54 if readerObj.flagNoMoreFiles:
55 55 break
56 56
57 57 """
58 58
59 59 ext = ".r"
60 60
61 61 optchar = "D"
62 62 dataOut = None
63 63
64 64 def __init__(self, **kwargs):
65 65 """
66 66 Inicializador de la clase VoltageReader para la lectura de datos de voltage.
67 67
68 68 Input:
69 69 dataOut : Objeto de la clase Voltage. Este objeto sera utilizado para
70 70 almacenar un perfil de datos cada vez que se haga un requerimiento
71 71 (getData). El perfil sera obtenido a partir del buffer de datos,
72 72 si el buffer esta vacio se hara un nuevo proceso de lectura de un
73 73 bloque de datos.
74 74 Si este parametro no es pasado se creara uno internamente.
75 75
76 76 Variables afectadas:
77 77 self.dataOut
78 78
79 79 Return:
80 80 None
81 81 """
82 82
83 83 ProcessingUnit.__init__(self, **kwargs)
84 84
85 85 self.isConfig = False
86 86
87 87 self.datablock = None
88 88
89 89 self.utc = 0
90 90
91 91 self.ext = ".r"
92 92
93 93 self.optchar = "D"
94 94
95 95 self.basicHeaderObj = BasicHeader(LOCALTIME)
96 96
97 97 self.systemHeaderObj = SystemHeader()
98 98
99 99 self.radarControllerHeaderObj = RadarControllerHeader()
100 100
101 101 self.processingHeaderObj = ProcessingHeader()
102 102
103 103 self.online = 0
104 104
105 105 self.fp = None
106 106
107 107 self.idFile = None
108 108
109 109 self.dtype = None
110 110
111 111 self.fileSizeByHeader = None
112 112
113 113 self.filenameList = []
114 114
115 115 self.filename = None
116 116
117 117 self.fileSize = None
118 118
119 119 self.firstHeaderSize = 0
120 120
121 121 self.basicHeaderSize = 24
122 122
123 123 self.pathList = []
124 124
125 125 self.filenameList = []
126 126
127 127 self.lastUTTime = 0
128 128
129 129 self.maxTimeStep = 30
130 130
131 131 self.flagNoMoreFiles = 0
132 132
133 133 self.set = 0
134 134
135 135 self.path = None
136 136
137 137 self.profileIndex = 2**32-1
138 138
139 139 self.delay = 3 #seconds
140 140
141 141 self.nTries = 3 #quantity tries
142 142
143 143 self.nFiles = 3 #number of files for searching
144 144
145 145 self.nReadBlocks = 0
146 146
147 147 self.flagIsNewFile = 1
148 148
149 149 self.__isFirstTimeOnline = 1
150 150
151 151 # self.ippSeconds = 0
152 152
153 153 self.flagDiscontinuousBlock = 0
154 154
155 155 self.flagIsNewBlock = 0
156 156
157 157 self.nTotalBlocks = 0
158 158
159 159 self.blocksize = 0
160 160
161 161 self.dataOut = self.createObjByDefault()
162 162
163 163 self.nTxs = 1
164 164
165 165 self.txIndex = 0
166 166
167 167 def createObjByDefault(self):
168 168
169 169 dataObj = Voltage()
170 170
171 171 return dataObj
172 172
173 173 def __hasNotDataInBuffer(self):
174 174
175 175 if self.profileIndex >= self.processingHeaderObj.profilesPerBlock*self.nTxs:
176 176 return 1
177 177
178 178 return 0
179 179
180 180
181 181 def getBlockDimension(self):
182 182 """
183 <<<<<<< HEAD
184 Obtiene la cantidad de puntos a leer por cada bloque de datos
185
186 Affected:
187 self.blocksize
188 =======
189 183 Obtiene la cantidad de puntos a leer por cada bloque de datos
190 184
191 185 Affected:
192 186 self.blocksize
193 >>>>>>> online_data_hour
194 187
195 188 Return:
196 189 None
197 190 """
198 191 pts2read = self.processingHeaderObj.profilesPerBlock * self.processingHeaderObj.nHeights * self.systemHeaderObj.nChannels
199 192 self.blocksize = pts2read
200 193
201 194
202 195
203 196 def readBlock(self):
204 197 """
205 <<<<<<< HEAD
206 readBlock lee el bloque de datos desde la posicion actual del puntero del archivo
207 (self.fp) y actualiza todos los parametros relacionados al bloque de datos
208 (metadata + data). La data leida es almacenada en el buffer y el contador del buffer
209 es seteado a 0
210
211 Inputs:
212 None
213
214 Return:
215 None
216
217 Affected:
218 self.profileIndex
219 self.datablock
220 self.flagIsNewFile
221 self.flagIsNewBlock
222 self.nTotalBlocks
223
224 Exceptions:
225 Si un bloque leido no es un bloque valido
226 =======
227 198 readBlock lee el bloque de datos desde la posicion actual del puntero del archivo
228 199 (self.fp) y actualiza todos los parametros relacionados al bloque de datos
229 200 (metadata + data). La data leida es almacenada en el buffer y el contador del buffer
230 201 es seteado a 0
231 202
232 203 Inputs:
233 204 None
234 205
235 206 Return:
236 207 None
237 208
238 209 Affected:
239 210 self.profileIndex
240 211 self.datablock
241 212 self.flagIsNewFile
242 213 self.flagIsNewBlock
243 214 self.nTotalBlocks
244 215
245 216 Exceptions:
246 217 Si un bloque leido no es un bloque valido
247 >>>>>>> online_data_hour
248 218 """
249 219
250 220 # if self.server is not None:
251 221 # self.zBlock = self.receiver.recv()
252 222 # self.zHeader = self.zBlock[:24]
253 223 # self.zDataBlock = self.zBlock[24:]
254 224 # junk = numpy.fromstring(self.zDataBlock, numpy.dtype([('real','<i4'),('imag','<i4')]))
255 225 # self.processingHeaderObj.profilesPerBlock = 240
256 226 # self.processingHeaderObj.nHeights = 248
257 227 # self.systemHeaderObj.nChannels
258 228 # else:
259 229 current_pointer_location = self.fp.tell()
260 230 junk = numpy.fromfile( self.fp, self.dtype, self.blocksize )
261 231
262 232 try:
263 233 junk = junk.reshape( (self.processingHeaderObj.profilesPerBlock, self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels) )
264 234 except:
265 235 #print "The read block (%3d) has not enough data" %self.nReadBlocks
266 236
267 237 if self.waitDataBlock(pointer_location=current_pointer_location):
268 238 junk = numpy.fromfile( self.fp, self.dtype, self.blocksize )
269 239 junk = junk.reshape( (self.processingHeaderObj.profilesPerBlock, self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels) )
270 240 # return 0
271 241
272 242 #Dimensions : nChannels, nProfiles, nSamples
273 243
274 244 junk = numpy.transpose(junk, (2,0,1))
275 245 self.datablock = junk['real'] + junk['imag']*1j
276 246
277 247 self.profileIndex = 0
278 248
279 249 self.flagIsNewFile = 0
280 250 self.flagIsNewBlock = 1
281 251
282 252 self.nTotalBlocks += 1
283 253 self.nReadBlocks += 1
284 254
285 255 return 1
286 256
287 257 def getFirstHeader(self):
288 258
289 259 self.getBasicHeader()
290 260
291 261 self.dataOut.systemHeaderObj = self.systemHeaderObj.copy()
292 262
293 263 self.dataOut.radarControllerHeaderObj = self.radarControllerHeaderObj.copy()
294 264
295 265 if self.nTxs > 1:
296 266 self.dataOut.radarControllerHeaderObj.ippSeconds = self.radarControllerHeaderObj.ippSeconds/self.nTxs
297 267
298 268 #Time interval and code are propierties of dataOut. Its value depends of radarControllerHeaderObj.
299 269
300 270 # self.dataOut.timeInterval = self.radarControllerHeaderObj.ippSeconds * self.processingHeaderObj.nCohInt
301 271 #
302 272 # if self.radarControllerHeaderObj.code is not None:
303 273 #
304 274 # self.dataOut.nCode = self.radarControllerHeaderObj.nCode
305 275 #
306 276 # self.dataOut.nBaud = self.radarControllerHeaderObj.nBaud
307 277 #
308 278 # self.dataOut.code = self.radarControllerHeaderObj.code
309 279
310 280 self.dataOut.dtype = self.dtype
311 281
312 282 self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock
313 283
314 284 self.dataOut.heightList = numpy.arange(self.processingHeaderObj.nHeights) *self.processingHeaderObj.deltaHeight + self.processingHeaderObj.firstHeight
315 285
316 286 self.dataOut.channelList = range(self.systemHeaderObj.nChannels)
317 287
318 288 self.dataOut.nCohInt = self.processingHeaderObj.nCohInt
319 289
320 290 self.dataOut.flagDecodeData = self.processingHeaderObj.flag_decode #asumo q la data no esta decodificada
321 291
322 292 self.dataOut.flagDeflipData = self.processingHeaderObj.flag_deflip #asumo q la data no esta sin flip
323 293
324 294 self.dataOut.flagShiftFFT = self.processingHeaderObj.shif_fft
325 295
326 296 def reshapeData(self):
327 297
328 298 if self.nTxs < 0:
329 299 return
330 300
331 301 if self.nTxs == 1:
332 302 return
333 303
334 304 if self.nTxs < 1 and self.processingHeaderObj.profilesPerBlock % (1./self.nTxs) != 0:
335 305 raise ValueError, "1./nTxs (=%f), should be a multiple of nProfiles (=%d)" %(1./self.nTxs, self.processingHeaderObj.profilesPerBlock)
336 306
337 307 if self.nTxs > 1 and self.processingHeaderObj.nHeights % self.nTxs != 0:
338 308 raise ValueError, "nTxs (=%d), should be a multiple of nHeights (=%d)" %(self.nTxs, self.processingHeaderObj.nHeights)
339 309
340 310 self.datablock = self.datablock.reshape((self.systemHeaderObj.nChannels, self.processingHeaderObj.profilesPerBlock*self.nTxs, self.processingHeaderObj.nHeights/self.nTxs))
341 311
342 312 self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock*self.nTxs
343 313 self.dataOut.heightList = numpy.arange(self.processingHeaderObj.nHeights/self.nTxs) *self.processingHeaderObj.deltaHeight + self.processingHeaderObj.firstHeight
344 314 self.dataOut.radarControllerHeaderObj.ippSeconds = self.radarControllerHeaderObj.ippSeconds/self.nTxs
345 315
346 316 return
347 317
348 318 def readFirstHeaderFromServer(self):
349 319
350 320 self.getFirstHeader()
351 321
352 322 self.firstHeaderSize = self.basicHeaderObj.size
353 323
354 324 datatype = int(numpy.log2((self.processingHeaderObj.processFlags & PROCFLAG.DATATYPE_MASK))-numpy.log2(PROCFLAG.DATATYPE_CHAR))
355 325 if datatype == 0:
356 326 datatype_str = numpy.dtype([('real','<i1'),('imag','<i1')])
357 327 elif datatype == 1:
358 328 datatype_str = numpy.dtype([('real','<i2'),('imag','<i2')])
359 329 elif datatype == 2:
360 330 datatype_str = numpy.dtype([('real','<i4'),('imag','<i4')])
361 331 elif datatype == 3:
362 332 datatype_str = numpy.dtype([('real','<i8'),('imag','<i8')])
363 333 elif datatype == 4:
364 334 datatype_str = numpy.dtype([('real','<f4'),('imag','<f4')])
365 335 elif datatype == 5:
366 336 datatype_str = numpy.dtype([('real','<f8'),('imag','<f8')])
367 337 else:
368 338 raise ValueError, 'Data type was not defined'
369 339
370 340 self.dtype = datatype_str
371 341 #self.ippSeconds = 2 * 1000 * self.radarControllerHeaderObj.ipp / self.c
372 342 self.fileSizeByHeader = self.processingHeaderObj.dataBlocksPerFile * self.processingHeaderObj.blockSize + self.firstHeaderSize + self.basicHeaderSize*(self.processingHeaderObj.dataBlocksPerFile - 1)
373 343 # self.dataOut.channelList = numpy.arange(self.systemHeaderObj.numChannels)
374 344 # self.dataOut.channelIndexList = numpy.arange(self.systemHeaderObj.numChannels)
375 345 self.getBlockDimension()
376 346
377 347
378 348 def getFromServer(self):
379 349 self.flagDiscontinuousBlock = 0
380 350 self.profileIndex = 0
381 351 self.flagIsNewBlock = 1
382 352 self.dataOut.flagNoData = False
383 353 self.nTotalBlocks += 1
384 354 self.nReadBlocks += 1
385 355 self.blockPointer = 0
386 356
387 357 block = self.receiver.recv()
388 358
389 359 self.basicHeaderObj.read(block[self.blockPointer:])
390 360 self.blockPointer += self.basicHeaderObj.length
391 361 self.systemHeaderObj.read(block[self.blockPointer:])
392 362 self.blockPointer += self.systemHeaderObj.length
393 363 self.radarControllerHeaderObj.read(block[self.blockPointer:])
394 364 self.blockPointer += self.radarControllerHeaderObj.length
395 365 self.processingHeaderObj.read(block[self.blockPointer:])
396 366 self.blockPointer += self.processingHeaderObj.length
397 367 self.readFirstHeaderFromServer()
398 368
399 369 timestamp = self.basicHeaderObj.get_datatime()
400 370 print '[Reading] - Block {} - {}'.format(self.nTotalBlocks, timestamp)
401 371 current_pointer_location = self.blockPointer
402 372 junk = numpy.fromstring( block[self.blockPointer:], self.dtype, self.blocksize )
403 373
404 374 try:
405 375 junk = junk.reshape( (self.processingHeaderObj.profilesPerBlock, self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels) )
406 376 except:
407 377 #print "The read block (%3d) has not enough data" %self.nReadBlocks
408 378 if self.waitDataBlock(pointer_location=current_pointer_location):
409 379 junk = numpy.fromstring( block[self.blockPointer:], self.dtype, self.blocksize )
410 380 junk = junk.reshape( (self.processingHeaderObj.profilesPerBlock, self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels) )
411 381 # return 0
412 382
413 383 #Dimensions : nChannels, nProfiles, nSamples
414 384
415 385 junk = numpy.transpose(junk, (2,0,1))
416 386 self.datablock = junk['real'] + junk['imag'] * 1j
417 387 self.profileIndex = 0
418 388 if self.selBlocksize == None: self.selBlocksize = self.dataOut.nProfiles
419 389 if self.selBlocktime != None:
420 390 if self.dataOut.nCohInt is not None:
421 391 nCohInt = self.dataOut.nCohInt
422 392 else:
423 393 nCohInt = 1
424 394 self.selBlocksize = int(self.dataOut.nProfiles*round(self.selBlocktime/(nCohInt*self.dataOut.ippSeconds*self.dataOut.nProfiles)))
425 395 self.dataOut.data = self.datablock[:,self.profileIndex:self.profileIndex+self.selBlocksize,:]
426 396 datasize = self.dataOut.data.shape[1]
427 397 if datasize < self.selBlocksize:
428 398 buffer = numpy.zeros((self.dataOut.data.shape[0], self.selBlocksize, self.dataOut.data.shape[2]), dtype = 'complex')
429 399 buffer[:,:datasize,:] = self.dataOut.data
430 400 self.dataOut.data = buffer
431 401 self.profileIndex = blockIndex
432 402
433 403 self.dataOut.flagDataAsBlock = True
434 404 self.flagIsNewBlock = 1
435 405 self.dataOut.realtime = self.online
436 406
437 407 return self.dataOut.data
438 408
439 409 def getData(self):
440 410 """
441 <<<<<<< HEAD
442 getData obtiene una unidad de datos del buffer de lectura, un perfil, y la copia al objeto self.dataOut
443 del tipo "Voltage" con todos los parametros asociados a este (metadata). cuando no hay datos
444 en el buffer de lectura es necesario hacer una nueva lectura de los bloques de datos usando
445 "readNextBlock"
446
447 Ademas incrementa el contador del buffer "self.profileIndex" en 1.
448
449 Return:
450
451 Si el flag self.getByBlock ha sido seteado el bloque completo es copiado a self.dataOut y el self.profileIndex
452 es igual al total de perfiles leidos desde el archivo.
453
454 Si self.getByBlock == False:
455
456 self.dataOut.data = buffer[:, thisProfile, :]
457
458 shape = [nChannels, nHeis]
459
460 Si self.getByBlock == True:
461
462 self.dataOut.data = buffer[:, :, :]
463
464 shape = [nChannels, nProfiles, nHeis]
465
466 Variables afectadas:
467 self.dataOut
468 self.profileIndex
469
470 Affected:
471 self.dataOut
472 self.profileIndex
473 self.flagDiscontinuousBlock
474 self.flagIsNewBlock
475 =======
476 411 getData obtiene una unidad de datos del buffer de lectura, un perfil, y la copia al objeto self.dataOut
477 412 del tipo "Voltage" con todos los parametros asociados a este (metadata). cuando no hay datos
478 413 en el buffer de lectura es necesario hacer una nueva lectura de los bloques de datos usando
479 414 "readNextBlock"
480 415
481 416 Ademas incrementa el contador del buffer "self.profileIndex" en 1.
482 417
483 418 Return:
484 419
485 420 Si el flag self.getByBlock ha sido seteado el bloque completo es copiado a self.dataOut y el self.profileIndex
486 421 es igual al total de perfiles leidos desde el archivo.
487 422
488 423 Si self.getByBlock == False:
489 424
490 425 self.dataOut.data = buffer[:, thisProfile, :]
491 426
492 427 shape = [nChannels, nHeis]
493 428
494 429 Si self.getByBlock == True:
495 430
496 431 self.dataOut.data = buffer[:, :, :]
497 432
498 433 shape = [nChannels, nProfiles, nHeis]
499 434
500 435 Variables afectadas:
501 436 self.dataOut
502 437 self.profileIndex
503 438
504 439 Affected:
505 440 self.dataOut
506 441 self.profileIndex
507 442 self.flagDiscontinuousBlock
508 443 self.flagIsNewBlock
509 >>>>>>> online_data_hour
510 444 """
511 445 if self.flagNoMoreFiles:
512 446 self.dataOut.flagNoData = True
513 447 print 'Process finished'
514 448 return 0
515 449 self.flagDiscontinuousBlock = 0
516 450 self.flagIsNewBlock = 0
517 451 if self.__hasNotDataInBuffer():
518 452 if not( self.readNextBlock() ):
519 453 return 0
520 454
521 455 self.getFirstHeader()
522 456
523 457 self.reshapeData()
524 458 if self.datablock is None:
525 459 self.dataOut.flagNoData = True
526 460 return 0
527 461
528 462 if not self.getByBlock:
529 463
530 464 """
531 <<<<<<< HEAD
532 Return profile by profile
533
534 If nTxs > 1 then one profile is divided by nTxs and number of total
535 blocks is increased by nTxs (nProfiles *= nTxs)
536 =======
537 465 Return profile by profile
538 466
539 467 If nTxs > 1 then one profile is divided by nTxs and number of total
540 468 blocks is increased by nTxs (nProfiles *= nTxs)
541 >>>>>>> online_data_hour
542 469 """
543 470 self.dataOut.flagDataAsBlock = False
544 471 self.dataOut.data = self.datablock[:,self.profileIndex,:]
545 472 self.dataOut.profileIndex = self.profileIndex
546 473
547 474 self.profileIndex += 1
548 <<<<<<< HEAD
549
550 # elif self.selBlocksize==None or self.selBlocksize==self.dataOut.nProfiles:
551 # """
552 # Return all block
553 # """
554 # self.dataOut.flagDataAsBlock = True
555 # self.dataOut.data = self.datablock
556 # self.dataOut.profileIndex = self.dataOut.nProfiles - 1
557 #
558 # self.profileIndex = self.dataOut.nProfiles
559
560 =======
561 475
562 476 # elif self.selBlocksize==None or self.selBlocksize==self.dataOut.nProfiles:
563 477 # """
564 478 # Return all block
565 479 # """
566 480 # self.dataOut.flagDataAsBlock = True
567 481 # self.dataOut.data = self.datablock
568 482 # self.dataOut.profileIndex = self.dataOut.nProfiles - 1
569 483 #
570 484 # self.profileIndex = self.dataOut.nProfiles
571 485
572 >>>>>>> online_data_hour
573 486 else:
574 487 """
575 488 Return a block
576 489 """
577 490 if self.selBlocksize == None: self.selBlocksize = self.dataOut.nProfiles
578 491 if self.selBlocktime != None:
579 492 if self.dataOut.nCohInt is not None:
580 493 nCohInt = self.dataOut.nCohInt
581 494 else:
582 495 nCohInt = 1
583 496 self.selBlocksize = int(self.dataOut.nProfiles*round(self.selBlocktime/(nCohInt*self.dataOut.ippSeconds*self.dataOut.nProfiles)))
584 497
585 498 self.dataOut.data = self.datablock[:,self.profileIndex:self.profileIndex+self.selBlocksize,:]
586 499 self.profileIndex += self.selBlocksize
587 500 datasize = self.dataOut.data.shape[1]
588 501
589 502 if datasize < self.selBlocksize:
590 503 buffer = numpy.zeros((self.dataOut.data.shape[0],self.selBlocksize,self.dataOut.data.shape[2]), dtype = 'complex')
591 504 buffer[:,:datasize,:] = self.dataOut.data
592 505
593 506 while datasize < self.selBlocksize: #Not enough profiles to fill the block
594 507 if not( self.readNextBlock() ):
595 508 return 0
596 509 self.getFirstHeader()
597 510 self.reshapeData()
598 511 if self.datablock is None:
599 512 self.dataOut.flagNoData = True
600 513 return 0
601 514 #stack data
602 515 blockIndex = self.selBlocksize - datasize
603 516 datablock1 = self.datablock[:,:blockIndex,:]
604 517
605 518 buffer[:,datasize:datasize+datablock1.shape[1],:] = datablock1
606 519 datasize += datablock1.shape[1]
607 520
608 521 self.dataOut.data = buffer
609 522 self.profileIndex = blockIndex
610 523
611 524 self.dataOut.flagDataAsBlock = True
612 525 self.dataOut.nProfiles = self.dataOut.data.shape[1]
613 526
614 527 self.dataOut.flagNoData = False
615 528
616 529 self.getBasicHeader()
617 530
618 531 self.dataOut.realtime = self.online
619 532
620 533 return self.dataOut.data
621 534
622 535 class VoltageWriter(JRODataWriter, Operation):
623 536 """
624 537 Esta clase permite escribir datos de voltajes a archivos procesados (.r). La escritura
625 538 de los datos siempre se realiza por bloques.
626 539 """
627 540
628 541 ext = ".r"
629 542
630 543 optchar = "D"
631 544
632 545 shapeBuffer = None
633 546
634 547
635 548 def __init__(self, **kwargs):
636 549 """
637 550 Inicializador de la clase VoltageWriter para la escritura de datos de espectros.
638 551
639 552 Affected:
640 553 self.dataOut
641 554
642 555 Return: None
643 556 """
644 557 Operation.__init__(self, **kwargs)
645 558
646 559 self.nTotalBlocks = 0
647 560
648 561 self.profileIndex = 0
649 562
650 563 self.isConfig = False
651 564
652 565 self.fp = None
653 566
654 567 self.flagIsNewFile = 1
655 568
656 569 self.blockIndex = 0
657 570
658 571 self.flagIsNewBlock = 0
659 572
660 573 self.setFile = None
661 574
662 575 self.dtype = None
663 576
664 577 self.path = None
665 578
666 579 self.filename = None
667 580
668 581 self.basicHeaderObj = BasicHeader(LOCALTIME)
669 582
670 583 self.systemHeaderObj = SystemHeader()
671 584
672 585 self.radarControllerHeaderObj = RadarControllerHeader()
673 586
674 587 self.processingHeaderObj = ProcessingHeader()
675 588
676 589 def hasAllDataInBuffer(self):
677 590 if self.profileIndex >= self.processingHeaderObj.profilesPerBlock:
678 591 return 1
679 592 return 0
680 593
681 594
682 595 def setBlockDimension(self):
683 596 """
684 597 Obtiene las formas dimensionales del los subbloques de datos que componen un bloque
685 598
686 599 Affected:
687 600 self.shape_spc_Buffer
688 601 self.shape_cspc_Buffer
689 602 self.shape_dc_Buffer
690 603
691 604 Return: None
692 605 """
693 606 self.shapeBuffer = (self.processingHeaderObj.profilesPerBlock,
694 607 self.processingHeaderObj.nHeights,
695 608 self.systemHeaderObj.nChannels)
696 609
697 610 self.datablock = numpy.zeros((self.systemHeaderObj.nChannels,
698 611 self.processingHeaderObj.profilesPerBlock,
699 612 self.processingHeaderObj.nHeights),
700 613 dtype=numpy.dtype('complex64'))
701 614
702 615 def writeBlock(self):
703 616 """
704 617 Escribe el buffer en el file designado
705 618
706 619 Affected:
707 620 self.profileIndex
708 621 self.flagIsNewFile
709 622 self.flagIsNewBlock
710 623 self.nTotalBlocks
711 624 self.blockIndex
712 625
713 626 Return: None
714 627 """
715 628 data = numpy.zeros( self.shapeBuffer, self.dtype )
716 629
717 630 junk = numpy.transpose(self.datablock, (1,2,0))
718 631
719 632 data['real'] = junk.real
720 633 data['imag'] = junk.imag
721 634
722 635 data = data.reshape( (-1) )
723 636
724 637 data.tofile( self.fp )
725 638
726 639 self.datablock.fill(0)
727 640
728 641 self.profileIndex = 0
729 642 self.flagIsNewFile = 0
730 643 self.flagIsNewBlock = 1
731 644
732 645 self.blockIndex += 1
733 646 self.nTotalBlocks += 1
734 647
735 648 # print "[Writing] Block = %04d" %self.blockIndex
736 649
737 650 def putData(self):
738 651 """
739 652 Setea un bloque de datos y luego los escribe en un file
740 653
741 654 Affected:
742 655 self.flagIsNewBlock
743 656 self.profileIndex
744 657
745 658 Return:
746 659 0 : Si no hay data o no hay mas files que puedan escribirse
747 660 1 : Si se escribio la data de un bloque en un file
748 661 """
749 662 if self.dataOut.flagNoData:
750 663 return 0
751 664
752 665 self.flagIsNewBlock = 0
753 666
754 667 if self.dataOut.flagDiscontinuousBlock:
755 668 self.datablock.fill(0)
756 669 self.profileIndex = 0
757 670 self.setNextFile()
758 671
759 672 if self.profileIndex == 0:
760 673 self.setBasicHeader()
761 674
762 675 self.datablock[:,self.profileIndex,:] = self.dataOut.data
763 676
764 677 self.profileIndex += 1
765 678
766 679 if self.hasAllDataInBuffer():
767 680 #if self.flagIsNewFile:
768 681 self.writeNextBlock()
769 682 # self.setFirstHeader()
770 683
771 684 return 1
772 685
773 686 def __getBlockSize(self):
774 687 '''
775 688 Este metodos determina el cantidad de bytes para un bloque de datos de tipo Voltage
776 689 '''
777 690
778 691 dtype_width = self.getDtypeWidth()
779 692
780 693 blocksize = int(self.dataOut.nHeights * self.dataOut.nChannels * self.profilesPerBlock * dtype_width * 2)
781 694
782 695 return blocksize
783 696
784 697 def setFirstHeader(self):
785 698
786 699 """
787 700 Obtiene una copia del First Header
788 701
789 702 Affected:
790 703 self.systemHeaderObj
791 704 self.radarControllerHeaderObj
792 705 self.dtype
793 706
794 707 Return:
795 708 None
796 709 """
797 710
798 711 self.systemHeaderObj = self.dataOut.systemHeaderObj.copy()
799 712 self.systemHeaderObj.nChannels = self.dataOut.nChannels
800 713 self.radarControllerHeaderObj = self.dataOut.radarControllerHeaderObj.copy()
801 714
802 715 self.processingHeaderObj.dtype = 0 # Voltage
803 716 self.processingHeaderObj.blockSize = self.__getBlockSize()
804 717 self.processingHeaderObj.profilesPerBlock = self.profilesPerBlock
805 718 self.processingHeaderObj.dataBlocksPerFile = self.blocksPerFile
806 719 self.processingHeaderObj.nWindows = 1 #podria ser 1 o self.dataOut.processingHeaderObj.nWindows
807 720 self.processingHeaderObj.nCohInt = self.dataOut.nCohInt
808 721 self.processingHeaderObj.nIncohInt = 1 # Cuando la data de origen es de tipo Voltage
809 722 self.processingHeaderObj.totalSpectra = 0 # Cuando la data de origen es de tipo Voltage
810 723
811 724 if self.dataOut.code is not None:
812 725 self.processingHeaderObj.code = self.dataOut.code
813 726 self.processingHeaderObj.nCode = self.dataOut.nCode
814 727 self.processingHeaderObj.nBaud = self.dataOut.nBaud
815 728
816 729 if self.processingHeaderObj.nWindows != 0:
817 730 self.processingHeaderObj.firstHeight = self.dataOut.heightList[0]
818 731 self.processingHeaderObj.deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
819 732 self.processingHeaderObj.nHeights = self.dataOut.nHeights
820 733 self.processingHeaderObj.samplesWin = self.dataOut.nHeights
821 734
822 735 self.processingHeaderObj.processFlags = self.getProcessFlags()
823 736
824 737 self.setBasicHeader()
@@ -1,1021 +1,1008
1 1 '''
2 2 @author: Daniel Suarez
3 3 '''
4 4 import os
5 5 import glob
6 6 import ftplib
7 7
8 8 try:
9 9 import paramiko
10 10 import scp
11 11 except:
12 12 print "You should install paramiko and scp libraries \nif you want to use SSH protocol to upload files to the server"
13 13
14 14 import time
15 15
16 16 import threading
17 17 Thread = threading.Thread
18 18
19 19 # try:
20 20 # from gevent import sleep
21 21 # except:
22 22 from time import sleep
23 23
24 24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
25 25
26 26 class Remote(Thread):
27 27 """
28 28 Remote is a parent class used to define the behaviour of FTP and SSH class. These clases are
29 29 used to upload or download files remotely.
30 30
31 31 Non-standard Python modules used:
32 32 None
33 33
34 34 Written by:
35 35 "Miguel Urco":mailto:miguel.urco@jro.igp.gob.pe Jun. 03, 2015
36 36 """
37 37
38 38 server = None
39 39 username = None
40 40 password = None
41 41 remotefolder = None
42 42
43 43 period = 60
44 44 fileList = []
45 45 bussy = False
46 46
47 47 def __init__(self, server, username, password, remotefolder, period=60):
48 48
49 49 Thread.__init__(self)
50 50
51 51 self.setDaemon(True)
52 52
53 53 self.status = 0
54 54
55 55 self.__server = server
56 56 self.__username = username
57 57 self.__password = password
58 58 self.__remotefolder = remotefolder
59 59
60 60 self.period = period
61 61
62 62 self.fileList = []
63 63 self.bussy = False
64 64
65 65 self.stopFlag = False
66 66
67 67 print "[Remote Server] Opening server: %s" %self.__server
68 68 if self.open(self.__server, self.__username, self.__password, self.__remotefolder):
69 69 print "[Remote Server] %s server was opened successfully" %self.__server
70 70
71 71 self.close()
72 72
73 73 self.mutex = threading.Lock()
74 74
75 75 def stop(self):
76 76
77 77 self.stopFlag = True
78 78 self.join(10)
79 79
80 80 def open(self):
81 81 """
82 82 Connect to server and create a connection class (FTP or SSH) to remote server.
83 83 """
84 84 raise NotImplementedError, "Implement this method in child class"
85 85
86 86 def close(self):
87 87 """
88 88 Close connection to server
89 89 """
90 90 raise NotImplementedError, "Implement this method in child class"
91 91
92 92 def mkdir(self, remotefolder):
93 93 """
94 94 Create a folder remotely
95 95 """
96 96 raise NotImplementedError, "Implement this method in child class"
97 97
98 98 def cd(self, remotefolder):
99 99 """
100 100 Change working directory in remote server
101 101 """
102 102 raise NotImplementedError, "Implement this method in child class"
103 103
104 104 def download(self, filename, localfolder=None):
105 105 """
106 106 Download a file from server to local host
107 107 """
108 108 raise NotImplementedError, "Implement this method in child class"
109 109
110 110 def sendFile(self, fullfilename):
111 111 """
112 112 sendFile method is used to upload a local file to the current directory in remote server
113 113
114 114 Inputs:
115 115 fullfilename - full path name of local file to store in remote directory
116 116
117 117 Returns:
118 118 0 in error case else 1
119 119 """
120 120 raise NotImplementedError, "Implement this method in child class"
121 121
122 122 def upload(self, fullfilename, remotefolder=None):
123 123 """
124 124 upload method is used to upload a local file to remote directory. This method changes
125 125 working directory before sending a file.
126 126
127 127 Inputs:
128 128 fullfilename - full path name of local file to store in remote directory
129 129
130 130 remotefolder - remote directory
131 131
132 132 Returns:
133 133 0 in error case else 1
134 134 """
135 135 print "[Remote Server] Uploading %s to %s:%s" %(fullfilename, self.server, self.remotefolder)
136 136
137 137 if not self.status:
138 138 return 0
139 139
140 140 if remotefolder == None:
141 141 remotefolder = self.remotefolder
142 142
143 143 if not self.cd(remotefolder):
144 144 return 0
145 145
146 146 if not self.sendFile(fullfilename):
147 147 print "[Remote Server] Error uploading file %s" %fullfilename
148 148 return 0
149 149
150 150 print "[Remote Server] upload finished successfully"
151 151
152 152 return 1
153 153
154 154 def delete(self, filename):
155 155 """
156 156 Remove a file from remote server
157 157 """
158 158 pass
159 159
160 160 def updateFileList(self, fileList):
161 161 """
162 162 Remove a file from remote server
163 163 """
164 164
165 165 if fileList == self.fileList:
166 166 return 0
167 167
168 168 self.mutex.acquire()
169 <<<<<<< HEAD
170 169 # init = time.time()
171 170 #
172 171 # while(self.bussy):
173 172 # sleep(0.1)
174 173 # if time.time() - init > 2*self.period:
175 174 # return 0
176 =======
177 # init = time.time()
178 #
179 # while(self.bussy):
180 # sleep(0.1)
181 # if time.time() - init > 2*self.period:
182 # return 0
183 >>>>>>> online_data_hour
184 175
185 176 self.fileList = fileList
186 177 self.mutex.release()
187 178 return 1
188 179
189 180 def run(self):
190 181
191 182 if not self.status:
192 183 print "Finishing FTP service"
193 184 return
194 185
195 186 if not self.cd(self.remotefolder):
196 187 raise ValueError, "Could not access to the new remote directory: %s" %self.remotefolder
197 188
198 189 while True:
199 190
200 191 for i in range(self.period):
201 192 if self.stopFlag:
202 193 break
203 194 sleep(1)
204 195
205 196 if self.stopFlag:
206 197 break
207 198
208 <<<<<<< HEAD
209 199 # self.bussy = True
210 =======
211 # self.bussy = True
212 >>>>>>> online_data_hour
213 200 self.mutex.acquire()
214 201
215 202 print "[Remote Server] Opening %s" %self.__server
216 203 if not self.open(self.__server, self.__username, self.__password, self.__remotefolder):
217 204 self.mutex.release()
218 205 continue
219 206
220 207 for thisFile in self.fileList:
221 208 self.upload(thisFile, self.remotefolder)
222 209
223 210 print "[Remote Server] Closing %s" %self.__server
224 211 self.close()
225 212
226 213 self.mutex.release()
227 214 # self.bussy = False
228 215
229 216 print "[Remote Server] Thread stopped successfully"
230 217
231 218 class FTPClient(Remote):
232 219
233 220 __ftpClientObj = None
234 221
235 222 def __init__(self, server, username, password, remotefolder, period=60):
236 223 """
237 224 """
238 225 Remote.__init__(self, server, username, password, remotefolder, period)
239 226
240 227 def open(self, server, username, password, remotefolder):
241 228
242 229 """
243 230 This method is used to set FTP parameters and establish a connection to remote server
244 231
245 232 Inputs:
246 233 server - remote server IP Address
247 234
248 235 username - remote server Username
249 236
250 237 password - remote server password
251 238
252 239 remotefolder - remote server current working directory
253 240
254 241 Return:
255 242 Boolean - Returns 1 if a connection has been established, 0 otherwise
256 243
257 244 Affects:
258 245 self.status - in case of error or fail connection this parameter is set to 0 else 1
259 246
260 247 """
261 248
262 249 if server == None:
263 250 raise ValueError, "FTP server should be defined"
264 251
265 252 if username == None:
266 253 raise ValueError, "FTP username should be defined"
267 254
268 255 if password == None:
269 256 raise ValueError, "FTP password should be defined"
270 257
271 258 if remotefolder == None:
272 259 raise ValueError, "FTP remote folder should be defined"
273 260
274 261 try:
275 262 ftpClientObj = ftplib.FTP(server)
276 263 except ftplib.all_errors, e:
277 264 print "[FTP Server]: FTP server connection fail: %s" %server
278 265 print "[FTP Server]:", e
279 266 self.status = 0
280 267 return 0
281 268
282 269 try:
283 270 ftpClientObj.login(username, password)
284 271 except ftplib.all_errors:
285 272 print "[FTP Server]: FTP username or password are incorrect"
286 273 self.status = 0
287 274 return 0
288 275
289 276 if remotefolder == None:
290 277 remotefolder = ftpClientObj.pwd()
291 278 else:
292 279 try:
293 280 ftpClientObj.cwd(remotefolder)
294 281 except ftplib.all_errors:
295 282 print "[FTP Server]: FTP remote folder is invalid: %s" %remotefolder
296 283 remotefolder = ftpClientObj.pwd()
297 284
298 285 self.server = server
299 286 self.username = username
300 287 self.password = password
301 288 self.remotefolder = remotefolder
302 289 self.__ftpClientObj = ftpClientObj
303 290 self.status = 1
304 291
305 292 return 1
306 293
307 294 def close(self):
308 295 """
309 296 Close connection to remote server
310 297 """
311 298 if not self.status:
312 299 return 0
313 300
314 301 self.__ftpClientObj.close()
315 302
316 303 def mkdir(self, remotefolder):
317 304 """
318 305 mkdir is used to make a new directory in remote server
319 306
320 307 Input:
321 308 remotefolder - directory name
322 309
323 310 Return:
324 311 0 in error case else 1
325 312 """
326 313 if not self.status:
327 314 return 0
328 315
329 316 try:
330 317 self.__ftpClientObj.mkd(dirname)
331 318 except ftplib.all_errors:
332 319 print "[FTP Server]: Error creating remote folder: %s" %remotefolder
333 320 return 0
334 321
335 322 return 1
336 323
337 324 def cd(self, remotefolder):
338 325 """
339 326 cd is used to change remote working directory on server
340 327
341 328 Input:
342 329 remotefolder - current working directory
343 330
344 331 Affects:
345 332 self.remotefolder
346 333
347 334 Return:
348 335 0 in case of error else 1
349 336 """
350 337 if not self.status:
351 338 return 0
352 339
353 340 if remotefolder == self.remotefolder:
354 341 return 1
355 342
356 343 try:
357 344 self.__ftpClientObj.cwd(remotefolder)
358 345 except ftplib.all_errors:
359 346 print '[FTP Server]: Error changing to %s' %remotefolder
360 347 print '[FTP Server]: Trying to create remote folder'
361 348
362 349 if not self.mkdir(remotefolder):
363 350 print '[FTP Server]: Remote folder could not be created'
364 351 return 0
365 352
366 353 try:
367 354 self.__ftpClientObj.cwd(remotefolder)
368 355 except ftplib.all_errors:
369 356 return 0
370 357
371 358 self.remotefolder = remotefolder
372 359
373 360 return 1
374 361
375 362 def sendFile(self, fullfilename):
376 363
377 364 if not self.status:
378 365 return 0
379 366
380 367 fp = open(fullfilename, 'rb')
381 368
382 369 filename = os.path.basename(fullfilename)
383 370
384 371 command = "STOR %s" %filename
385 372
386 373 try:
387 374 self.__ftpClientObj.storbinary(command, fp)
388 375 except ftplib.all_errors, e:
389 376 print "[FTP Server]:", e
390 377 return 0
391 378
392 379 try:
393 380 self.__ftpClientObj.sendcmd('SITE CHMOD 755 ' + filename)
394 381 except ftplib.all_errors, e:
395 382 print "[FTP Server]:", e
396 383
397 384 fp.close()
398 385
399 386 return 1
400 387
401 388 class SSHClient(Remote):
402 389
403 390 __sshClientObj = None
404 391 __scpClientObj = None
405 392
406 393 def __init__(self, server, username, password, remotefolder, period=60):
407 394 """
408 395 """
409 396 Remote.__init__(self, server, username, password, remotefolder, period)
410 397
411 398 def open(self, server, username, password, remotefolder, port=22):
412 399
413 400 """
414 401 This method is used to set SSH parameters and establish a connection to a remote server
415 402
416 403 Inputs:
417 404 server - remote server IP Address
418 405
419 406 username - remote server Username
420 407
421 408 password - remote server password
422 409
423 410 remotefolder - remote server current working directory
424 411
425 412 Return: void
426 413
427 414 Affects:
428 415 self.status - in case of error or fail connection this parameter is set to 0 else 1
429 416
430 417 """
431 418 import socket
432 419
433 420 if server == None:
434 421 raise ValueError, "SSH server should be defined"
435 422
436 423 if username == None:
437 424 raise ValueError, "SSH username should be defined"
438 425
439 426 if password == None:
440 427 raise ValueError, "SSH password should be defined"
441 428
442 429 if remotefolder == None:
443 430 raise ValueError, "SSH remote folder should be defined"
444 431
445 432 sshClientObj = paramiko.SSHClient()
446 433
447 434 sshClientObj.load_system_host_keys()
448 435 sshClientObj.set_missing_host_key_policy(paramiko.WarningPolicy())
449 436
450 437 self.status = 0
451 438 try:
452 439 sshClientObj.connect(server, username=username, password=password, port=port)
453 440 except paramiko.AuthenticationException, e:
454 441 # print "SSH username or password are incorrect: %s"
455 442 print "[SSH Server]:", e
456 443 return 0
457 444 except SSHException, e:
458 445 print "[SSH Server]:", e
459 446 return 0
460 447 except socket.error:
461 448 self.status = 0
462 449 print "[SSH Server]:", e
463 450 return 0
464 451
465 452 self.status = 1
466 453 scpClientObj = scp.SCPClient(sshClientObj.get_transport(), socket_timeout=30)
467 454
468 455 if remotefolder == None:
469 456 remotefolder = self.pwd()
470 457
471 458 self.server = server
472 459 self.username = username
473 460 self.password = password
474 461 self.__sshClientObj = sshClientObj
475 462 self.__scpClientObj = scpClientObj
476 463 self.status = 1
477 464
478 465 if not self.cd(remotefolder):
479 466 raise ValueError, "[SSH Server]: Could not access to remote folder: %s" %remotefolder
480 467 return 0
481 468
482 469 self.remotefolder = remotefolder
483 470
484 471 return 1
485 472
486 473 def close(self):
487 474 """
488 475 Close connection to remote server
489 476 """
490 477 if not self.status:
491 478 return 0
492 479
493 480 self.__scpClientObj.close()
494 481 self.__sshClientObj.close()
495 482
496 483 def __execute(self, command):
497 484 """
498 485 __execute a command on remote server
499 486
500 487 Input:
501 488 command - Exmaple 'ls -l'
502 489
503 490 Return:
504 491 0 in error case else 1
505 492 """
506 493 if not self.status:
507 494 return 0
508 495
509 496 stdin, stdout, stderr = self.__sshClientObj.exec_command(command)
510 497
511 498 result = stderr.readlines()
512 499 if len(result) > 1:
513 500 return 0
514 501
515 502 result = stdout.readlines()
516 503 if len(result) > 1:
517 504 return result[0][:-1]
518 505
519 506 return 1
520 507
521 508 def mkdir(self, remotefolder):
522 509 """
523 510 mkdir is used to make a new directory in remote server
524 511
525 512 Input:
526 513 remotefolder - directory name
527 514
528 515 Return:
529 516 0 in error case else 1
530 517 """
531 518
532 519 command = 'mkdir %s' %remotefolder
533 520
534 521 return self.__execute(command)
535 522
536 523 def pwd(self):
537 524
538 525 command = 'pwd'
539 526
540 527 return self.__execute(command)
541 528
542 529 def cd(self, remotefolder):
543 530 """
544 531 cd is used to change remote working directory on server
545 532
546 533 Input:
547 534 remotefolder - current working directory
548 535
549 536 Affects:
550 537 self.remotefolder
551 538
552 539 Return:
553 540 0 in case of error else 1
554 541 """
555 542 if not self.status:
556 543 return 0
557 544
558 545 if remotefolder == self.remotefolder:
559 546 return 1
560 547
561 548 chk_command = "cd %s; pwd" %remotefolder
562 549 mkdir_command = "mkdir %s" %remotefolder
563 550
564 551 if not self.__execute(chk_command):
565 552 if not self.__execute(mkdir_command):
566 553 self.remotefolder = None
567 554 return 0
568 555
569 556 self.remotefolder = remotefolder
570 557
571 558 return 1
572 559
573 560 def sendFile(self, fullfilename):
574 561
575 562 if not self.status:
576 563 return 0
577 564
578 565 try:
579 566 self.__scpClientObj.put(fullfilename, remote_path=self.remotefolder)
580 567 except scp.ScpError, e:
581 568 print "[SSH Server]", str(e)
582 569 return 0
583 570
584 571 remotefile = os.path.join(self.remotefolder, os.path.split(fullfilename)[-1])
585 572 command = 'chmod 775 %s' %remotefile
586 573
587 574 return self.__execute(command)
588 575
589 576 class SendToServer(ProcessingUnit):
590 577
591 578 def __init__(self, **kwargs):
592 579
593 580 ProcessingUnit.__init__(self, **kwargs)
594 581
595 582 self.isConfig = False
596 583 self.clientObj = None
597 584
598 585 def setup(self, server, username, password, remotefolder, localfolder, ext='.png', period=60, protocol='ftp', **kwargs):
599 586
600 587 self.clientObj = None
601 588 self.localfolder = localfolder
602 589 self.ext = ext
603 590 self.period = period
604 591
605 592 if str.lower(protocol) == 'ftp':
606 593 self.clientObj = FTPClient(server, username, password, remotefolder, period)
607 594
608 595 if str.lower(protocol) == 'ssh':
609 596 self.clientObj = SSHClient(server, username, password, remotefolder, period)
610 597
611 598 if not self.clientObj:
612 599 raise ValueError, "%s has been chosen as remote access protocol but it is not valid" %protocol
613 600
614 601 self.clientObj.start()
615 602
616 603 def findFiles(self):
617 604
618 605 if not type(self.localfolder) == list:
619 606 folderList = [self.localfolder]
620 607 else:
621 608 folderList = self.localfolder
622 609
623 610 #Remove duplicate items
624 611 folderList = list(set(folderList))
625 612
626 613 fullfilenameList = []
627 614
628 615 for thisFolder in folderList:
629 616
630 617 print "[Remote Server]: Searching files on %s" %thisFolder
631 618
632 619 filenameList = glob.glob1(thisFolder, '*%s' %self.ext)
633 620
634 621 if len(filenameList) < 1:
635 622
636 623 continue
637 624
638 625 for thisFile in filenameList:
639 626 fullfilename = os.path.join(thisFolder, thisFile)
640 627
641 628 if fullfilename in fullfilenameList:
642 629 continue
643 630
644 631 #Only files modified in the last 30 minutes are considered
645 632 if os.path.getmtime(fullfilename) < time.time() - 30*60:
646 633 continue
647 634
648 635 fullfilenameList.append(fullfilename)
649 636
650 637 return fullfilenameList
651 638
652 639 def run(self, **kwargs):
653 640 if not self.isConfig:
654 641 self.init = time.time()
655 642 self.setup(**kwargs)
656 643 self.isConfig = True
657 644
658 645 if not self.clientObj.is_alive():
659 646 print "[Remote Server]: Restarting connection "
660 647 self.setup(**kwargs)
661 648
662 649 if time.time() - self.init >= self.period:
663 650 fullfilenameList = self.findFiles()
664 651
665 652 if self.clientObj.updateFileList(fullfilenameList):
666 653 print "[Remote Server]: Sending the next files ", str(fullfilenameList)
667 654 self.init = time.time()
668 655
669 656 def close(self):
670 657 print "[Remote Server] Stopping thread"
671 658 self.clientObj.stop()
672 659
673 660
674 661 class FTP(object):
675 662 """
676 663 Ftp is a public class used to define custom File Transfer Protocol from "ftplib" python module
677 664
678 665 Non-standard Python modules used: None
679 666
680 667 Written by "Daniel Suarez":mailto:daniel.suarez@jro.igp.gob.pe Oct. 26, 2010
681 668 """
682 669
683 670 def __init__(self,server = None, username=None, password=None, remotefolder=None):
684 671 """
685 672 This method is used to setting parameters for FTP and establishing connection to remote server
686 673
687 674 Inputs:
688 675 server - remote server IP Address
689 676
690 677 username - remote server Username
691 678
692 679 password - remote server password
693 680
694 681 remotefolder - remote server current working directory
695 682
696 683 Return: void
697 684
698 685 Affects:
699 686 self.status - in Error Case or Connection Failed this parameter is set to 1 else 0
700 687
701 688 self.folderList - sub-folder list of remote folder
702 689
703 690 self.fileList - file list of remote folder
704 691
705 692
706 693 """
707 694
708 695 if ((server == None) and (username==None) and (password==None) and (remotefolder==None)):
709 696 server, username, password, remotefolder = self.parmsByDefault()
710 697
711 698 self.server = server
712 699 self.username = username
713 700 self.password = password
714 701 self.remotefolder = remotefolder
715 702 self.file = None
716 703 self.ftp = None
717 704 self.status = 0
718 705
719 706 try:
720 707 self.ftp = ftplib.FTP(self.server)
721 708 self.ftp.login(self.username,self.password)
722 709 self.ftp.cwd(self.remotefolder)
723 710 # print 'Connect to FTP Server: Successfully'
724 711
725 712 except ftplib.all_errors:
726 713 print 'Error FTP Service'
727 714 self.status = 1
728 715 return
729 716
730 717
731 718
732 719 self.dirList = []
733 720
734 721 try:
735 722 self.dirList = self.ftp.nlst()
736 723
737 724 except ftplib.error_perm, resp:
738 725 if str(resp) == "550 No files found":
739 726 print "no files in this directory"
740 727 self.status = 1
741 728 return
742 729
743 730 except ftplib.all_errors:
744 731 print 'Error Displaying Dir-Files'
745 732 self.status = 1
746 733 return
747 734
748 735 self.fileList = []
749 736 self.folderList = []
750 737 #only for test
751 738 for f in self.dirList:
752 739 name, ext = os.path.splitext(f)
753 740 if ext != '':
754 741 self.fileList.append(f)
755 742 # print 'filename: %s - size: %d'%(f,self.ftp.size(f))
756 743
757 744 def parmsByDefault(self):
758 745 server = 'jro-app.igp.gob.pe'
759 746 username = 'wmaster'
760 747 password = 'mst2010vhf'
761 748 remotefolder = '/home/wmaster/graficos'
762 749
763 750 return server, username, password, remotefolder
764 751
765 752
766 753 def mkd(self,dirname):
767 754 """
768 755 mkd is used to make directory in remote server
769 756
770 757 Input:
771 758 dirname - directory name
772 759
773 760 Return:
774 761 1 in error case else 0
775 762 """
776 763 try:
777 764 self.ftp.mkd(dirname)
778 765 except:
779 766 print 'Error creating remote folder:%s'%dirname
780 767 return 1
781 768
782 769 return 0
783 770
784 771
785 772 def delete(self,filename):
786 773 """
787 774 delete is used to delete file in current working directory of remote server
788 775
789 776 Input:
790 777 filename - filename to delete in remote folder
791 778
792 779 Return:
793 780 1 in error case else 0
794 781 """
795 782
796 783 try:
797 784 self.ftp.delete(filename)
798 785 except:
799 786 print 'Error deleting remote file:%s'%filename
800 787 return 1
801 788
802 789 return 0
803 790
804 791 def download(self,filename,localfolder):
805 792 """
806 793 download is used to downloading file from remote folder into local folder
807 794
808 795 Inputs:
809 796 filename - filename to donwload
810 797
811 798 localfolder - directory local to store filename
812 799
813 800 Returns:
814 801 self.status - 1 in error case else 0
815 802 """
816 803
817 804 self.status = 0
818 805
819 806
820 807 if not(filename in self.fileList):
821 808 print 'filename:%s not exists'%filename
822 809 self.status = 1
823 810 return self.status
824 811
825 812 newfilename = os.path.join(localfolder,filename)
826 813
827 814 self.file = open(newfilename, 'wb')
828 815
829 816 try:
830 817 print 'Download: ' + filename
831 818 self.ftp.retrbinary('RETR ' + filename, self.__handleDownload)
832 819 print 'Download Complete'
833 820 except ftplib.all_errors:
834 821 print 'Error Downloading ' + filename
835 822 self.status = 1
836 823 return self.status
837 824
838 825 self.file.close()
839 826
840 827 return self.status
841 828
842 829
843 830 def __handleDownload(self,block):
844 831 """
845 832 __handleDownload is used to handle writing file
846 833 """
847 834 self.file.write(block)
848 835
849 836
850 837 def upload(self,filename,remotefolder=None):
851 838 """
852 839 upload is used to uploading local file to remote directory
853 840
854 841 Inputs:
855 842 filename - full path name of local file to store in remote directory
856 843
857 844 remotefolder - remote directory
858 845
859 846 Returns:
860 847 self.status - 1 in error case else 0
861 848 """
862 849
863 850 if remotefolder == None:
864 851 remotefolder = self.remotefolder
865 852
866 853 self.status = 0
867 854
868 855 try:
869 856 self.ftp.cwd(remotefolder)
870 857
871 858 self.file = open(filename, 'rb')
872 859
873 860 (head, tail) = os.path.split(filename)
874 861
875 862 command = "STOR " + tail
876 863
877 864 print 'Uploading: ' + tail
878 865 self.ftp.storbinary(command, self.file)
879 866 print 'Upload Completed'
880 867
881 868 except ftplib.all_errors:
882 869 print 'Error Uploading ' + tail
883 870 self.status = 1
884 871 return self.status
885 872
886 873 self.file.close()
887 874
888 875 #back to initial directory in __init__()
889 876 self.ftp.cwd(self.remotefolder)
890 877
891 878 return self.status
892 879
893 880
894 881 def dir(self,remotefolder):
895 882 """
896 883 dir is used to change working directory of remote server and get folder and file list
897 884
898 885 Input:
899 886 remotefolder - current working directory
900 887
901 888 Affects:
902 889 self.fileList - file list of working directory
903 890
904 891 Return:
905 892 infoList - list with filenames and size of file in bytes
906 893
907 894 self.folderList - folder list
908 895 """
909 896
910 897 self.remotefolder = remotefolder
911 898 print 'Change to ' + self.remotefolder
912 899 try:
913 900 self.ftp.cwd(remotefolder)
914 901 except ftplib.all_errors:
915 902 print 'Error Change to ' + self.remotefolder
916 903 infoList = None
917 904 self.folderList = None
918 905 return infoList,self.folderList
919 906
920 907 self.dirList = []
921 908
922 909 try:
923 910 self.dirList = self.ftp.nlst()
924 911
925 912 except ftplib.error_perm, resp:
926 913 if str(resp) == "550 No files found":
927 914 print "no files in this directory"
928 915 infoList = None
929 916 self.folderList = None
930 917 return infoList,self.folderList
931 918 except ftplib.all_errors:
932 919 print 'Error Displaying Dir-Files'
933 920 infoList = None
934 921 self.folderList = None
935 922 return infoList,self.folderList
936 923
937 924 infoList = []
938 925 self.fileList = []
939 926 self.folderList = []
940 927 for f in self.dirList:
941 928 name,ext = os.path.splitext(f)
942 929 if ext != '':
943 930 self.fileList.append(f)
944 931 value = (f,self.ftp.size(f))
945 932 infoList.append(value)
946 933
947 934 if ext == '':
948 935 self.folderList.append(f)
949 936
950 937 return infoList,self.folderList
951 938
952 939
953 940 def close(self):
954 941 """
955 942 close is used to close and end FTP connection
956 943
957 944 Inputs: None
958 945
959 946 Return: void
960 947
961 948 """
962 949 self.ftp.close()
963 950
964 951 class SendByFTP(Operation):
965 952
966 953 def __init__(self, **kwargs):
967 954 Operation.__init__(self, **kwargs)
968 955 self.status = 1
969 956 self.counter = 0
970 957
971 958 def error_print(self, ValueError):
972 959
973 960 print ValueError, 'Error FTP'
974 961 print "don't worry the program is running..."
975 962
976 963 def worker_ftp(self, server, username, password, remotefolder, filenameList):
977 964
978 965 self.ftpClientObj = FTP(server, username, password, remotefolder)
979 966 for filename in filenameList:
980 967 self.ftpClientObj.upload(filename)
981 968 self.ftpClientObj.close()
982 969
983 970 def ftp_thread(self, server, username, password, remotefolder):
984 971 if not(self.status):
985 972 return
986 973
987 974 import multiprocessing
988 975
989 976 p = multiprocessing.Process(target=self.worker_ftp, args=(server, username, password, remotefolder, self.filenameList,))
990 977 p.start()
991 978
992 979 p.join(3)
993 980
994 981 if p.is_alive():
995 982 p.terminate()
996 983 p.join()
997 984 print 'killing ftp process...'
998 985 self.status = 0
999 986 return
1000 987
1001 988 self.status = 1
1002 989 return
1003 990
1004 991 def filterByExt(self, ext, localfolder):
1005 992 fnameList = glob.glob1(localfolder,ext)
1006 993 self.filenameList = [os.path.join(localfolder,x) for x in fnameList]
1007 994
1008 995 if len(self.filenameList) == 0:
1009 996 self.status = 0
1010 997
1011 998 def run(self, dataOut, ext, localfolder, remotefolder, server, username, password, period=1):
1012 999
1013 1000 self.counter += 1
1014 1001 if self.counter >= period:
1015 1002 self.filterByExt(ext, localfolder)
1016 1003
1017 1004 self.ftp_thread(server, username, password, remotefolder)
1018 1005
1019 1006 self.counter = 0
1020 1007
1021 1008 self.status = 1
General Comments 0
You need to be logged in to leave comments. Login now