##// END OF EJS Templates
Fix searching files correctly in jroIO_base & Spectra shift
jespinoza -
r1270:8e736242aff9
parent child
Show More
@@ -1,1567 +1,1574
1 1 """
2 2 Created on Jul 2, 2014
3 3
4 4 @author: roj-idl71
5 5 """
6 6 import os
7 7 import sys
8 8 import glob
9 9 import time
10 10 import numpy
11 11 import fnmatch
12 12 import inspect
13 13 import time
14 14 import datetime
15 15 import zmq
16 16
17 17 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
18 18 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
19 19 from schainpy.utils import log
20 20 import schainpy.admin
21 21
# Interpret file timestamps as local time (passed to BasicHeader on read).
LOCALTIME = True

# Fixed character width of each strptime-style directive; used by
# parse_format() to slice fixed-width fields out of file/folder names.
DT_DIRECTIVES = {
    '%Y': 4,
    '%y': 2,
    '%m': 2,
    '%d': 2,
    '%j': 3,
    '%H': 2,
    '%M': 2,
    '%S': 2,
    '%f': 6
}
34 34
35 35
def isNumber(cad):
    """Check whether the given value can be converted to a float.

    Input:
        cad : value (typically a string) to analyze

    Return:
        True  : if float(cad) succeeds
        False : otherwise
    """
    try:
        float(cad)
        return True
    except (ValueError, TypeError):
        # Narrowed from a bare `except:` so that unrelated exceptions
        # (e.g. KeyboardInterrupt, SystemExit) are not silently swallowed.
        return False
54 54
55 55
def isFileInEpoch(filename, startUTSeconds, endUTSeconds):
    """Return 1 if the data file contains data inside the given epoch range.

    Inputs:
        filename       : full path of a Jicamarca-format data file (.r)
        startUTSeconds : start of the selected range, seconds since 01/01/1970
        endUTSeconds   : end of the selected range, seconds since 01/01/1970

    Return:
        1 if the first basic header timestamp falls inside
        [startUTSeconds, endUTSeconds); 0 when it does not, when the file
        cannot be opened, or when the header is not valid.
    """
    basicHeaderObj = BasicHeader(LOCALTIME)

    try:
        fp = open(filename, 'rb')
    except IOError:
        print("The file %s can't be opened" % (filename))
        return 0

    # Only the first basic header is inspected; the file is closed right after.
    sts = basicHeaderObj.read(fp)
    fp.close()

    if not(sts):
        print("Skipping the file %s because it has not a valid header" % (filename))
        return 0

    # Half-open comparison: start is inclusive, end is exclusive.
    if not ((startUTSeconds <= basicHeaderObj.utc) and (endUTSeconds > basicHeaderObj.utc)):
        return 0

    return 1
96 96
97 97
def isTimeInRange(thisTime, startTime, endTime):
    """Return 1 when `thisTime` lies inside the [startTime, endTime] window.

    When endTime < startTime the window is taken to wrap past midnight,
    i.e. it covers [startTime, 23:59:59] plus [00:00:00, endTime].
    """
    if endTime >= startTime:
        # Same-day window: inside means startTime <= thisTime <= endTime.
        inside = startTime <= thisTime <= endTime
    else:
        # Overnight window: outside only when strictly between the two ends.
        inside = not (endTime < thisTime < startTime)
    return 1 if inside else 0
107 107
108 108
def isFileInTimeRange(filename, startDate, endDate, startTime, endTime):
    """Return the file's first-block datetime when the data file overlaps the
    given time-of-day window, otherwise None.

    Inputs:
        filename  : full path of a Jicamarca-format data file (.r)
        startDate : start date of the selected range (datetime.date)
        endDate   : end date of the selected range (datetime.date)
        startTime : start time of the selected range (datetime.time)
        endTime   : end time of the selected range (datetime.time)

    Return:
        datetime of the first block when the file contains data inside the
        window; None when it does not, when the file cannot be opened, or
        when any header is invalid.
    """

    try:
        fp = open(filename, 'rb')
    except IOError:
        print("The file %s can't be opened" % (filename))
        return None

    firstBasicHeaderObj = BasicHeader(LOCALTIME)
    systemHeaderObj = SystemHeader()
    radarControllerHeaderObj = RadarControllerHeader()
    processingHeaderObj = ProcessingHeader()

    lastBasicHeaderObj = BasicHeader(LOCALTIME)

    # Read the full header chain at the start of the file.
    sts = firstBasicHeaderObj.read(fp)

    if not(sts):
        print("[Reading] Skipping the file %s because it has not a valid header" % (filename))
        return None

    if not systemHeaderObj.read(fp):
        return None

    if not radarControllerHeaderObj.read(fp):
        return None

    if not processingHeaderObj.read(fp):
        return None

    filesize = os.path.getsize(filename)

    # One block payload plus the 24-byte basic header that precedes it.
    offset = processingHeaderObj.blockSize + 24  # header size

    if filesize <= offset:
        print("[Reading] %s: This file has not enough data" % filename)
        return None

    # Seek to the last block's basic header (offset bytes before EOF).
    fp.seek(-offset, 2)

    sts = lastBasicHeaderObj.read(fp)

    fp.close()

    thisDatetime = lastBasicHeaderObj.datatime
    thisTime_last_block = thisDatetime.time()

    thisDatetime = firstBasicHeaderObj.datatime
    thisDate = thisDatetime.date()
    thisTime_first_block = thisDatetime.time()

    # General case
    # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
    #-----------o----------------------------o-----------
    #        startTime                     endTime

    if endTime >= startTime:
        # The file overlaps the window unless it ends before the window
        # starts or begins after the window ends.
        if (thisTime_last_block < startTime) or (thisTime_first_block > endTime):
            return None

        return thisDatetime

    # If endTime < startTime then endTime belongs to the next day

    #<<<<<<<<<<<o                            o>>>>>>>>>>>
    #-----------o----------------------------o-----------
    #         endTime                    startTime

    if (thisDate == startDate) and (thisTime_last_block < startTime):
        return None

    if (thisDate == endDate) and (thisTime_first_block > endTime):
        return None

    if (thisTime_last_block < startTime) and (thisTime_first_block > endTime):
        return None

    return thisDatetime
210 210
211 211
def isFolderInDateRange(folder, startDate=None, endDate=None):
    """Return 1 if the folder's date falls inside the selected date range.

    Inputs:
        folder    : full directory path; its basename is expected to follow
                    the "?YYYYDDD" layout, where:
                        YYYY : year         (e.g. 2015)
                        DDD  : day of year  (e.g. 305)
        startDate : start of the selected range (datetime.date), optional
        endDate   : end of the selected range (datetime.date), optional

    Return:
        1 when the folder name parses and its date is inside
        [startDate, endDate] (or no range was given); 0 otherwise.
    """

    basename = os.path.basename(folder)

    if not isRadarFolder(basename):
        # Fixed typo in the user-facing message ("rigth" -> "right").
        print("The folder %s has not the right format" % folder)
        return 0

    # Only filter by date when both bounds were provided.
    if startDate and endDate:
        thisDate = getDateFromRadarFolder(basename)

        if thisDate < startDate:
            return 0

        if thisDate > endDate:
            return 0

    return 1
251 251
252 252
def isFileInDateRange(filename, startDate=None, endDate=None):
    """Return 1 if the data file's date falls inside the selected date range.

    Inputs:
        filename  : full path of a Jicamarca-format data file; its basename
                    is expected to follow "?YYYYDDDsss", where:
                        YYYY : year         (e.g. 2015)
                        DDD  : day of year  (e.g. 305)
                        sss  : set number
        startDate : start of the selected range (datetime.date), optional
        endDate   : end of the selected range (datetime.date), optional

    Return:
        1 when the file name parses and its date is inside
        [startDate, endDate] (or no range was given); 0 otherwise.
    """

    basename = os.path.basename(filename)

    if not isRadarFile(basename):
        # Fixed typo in the user-facing message ("rigth" -> "right").
        print("The filename %s has not the right format" % filename)
        return 0

    # Only filter by date when both bounds were provided.
    if startDate and endDate:
        thisDate = getDateFromRadarFile(basename)

        if thisDate < startDate:
            return 0

        if thisDate > endDate:
            return 0

    return 1
294 294
295 295
def getFileFromSet(path, ext, set):
    """Return the filename in `path` matching the given set number.

    File names are expected to follow the layout:
        # 0 1234 567 89A BCDE
        # H YYYY DDD SSS .ext
    The year/day-of-year used for the match come from the last parseable
    file listed in `path`.  When no file matches the requested set, the
    alphabetically last valid file is returned instead; when no valid file
    exists at all, None is returned (the original raised NameError here
    because `year`/`doy` were never bound).

    Inputs:
        path : directory to scan
        ext  : file extension to accept (case-insensitive)
        set  : set number to look for

    Return:
        str filename (no path) or None.
    """
    validFilelist = []
    fileList = os.listdir(path)

    year = None
    doy = None

    for thisFile in fileList:
        try:
            year = int(thisFile[1:5])
            doy = int(thisFile[5:8])
        except (ValueError, TypeError):
            # Name does not follow the ?YYYYDDD... layout.
            continue

        if (os.path.splitext(thisFile)[-1].lower() != ext.lower()):
            continue

        validFilelist.append(thisFile)

    if year is None:
        # No file in `path` had the expected name layout.
        return None

    myfile = fnmatch.filter(
        validFilelist, '*%4.4d%3.3d%3.3d*' % (year, doy, set))

    if len(myfile) != 0:
        return myfile[0]
    else:
        filename = '*%4.4d%3.3d%3.3d%s' % (year, doy, set, ext.lower())
        print('the filename %s does not exist' % filename)
        print('...going to the last file: ')

        if validFilelist:
            validFilelist = sorted(validFilelist, key=str.lower)
            return validFilelist[-1]

    return None
330 330
331 331
def getlastFileFromPath(path, ext):
    """Return the last valid data file (case-insensitive sort) in a folder.

    Keeps only the files whose name follows "PYYYYDDDSSS.ext":
        # 0 1234 567 89A BCDE
        # H YYYY DDD SSS .ext

    Input:
        path : directory containing the files
        ext  : extension of the files to accept (case-insensitive)

    Return:
        The last file (no path) of the folder, or None when empty.
    """
    validFilelist = []

    for thisFile in os.listdir(path):

        # Positions 1-4 must be the year and 5-7 the day of year.  Require
        # plain digits: the previous isNumber() check accepted float-like
        # strings such as "12.4", which then crashed the int() conversion.
        if not (thisFile[1:5].isdigit() and thisFile[5:8].isdigit()):
            continue

        if (os.path.splitext(thisFile)[-1].lower() != ext.lower()):
            continue

        validFilelist.append(thisFile)

    if validFilelist:
        return sorted(validFilelist, key=str.lower)[-1]

    return None
373 373
374 374
def isRadarFolder(folder):
    """Return 1 if `folder` follows the "?YYYYDDD..." radar folder layout."""
    try:
        int(folder[1:5])  # year
        int(folder[5:8])  # day of year
    except (ValueError, TypeError):
        # Narrowed from a bare except: only the int() conversions can fail.
        return 0

    return 1
383 383
384 384
def isRadarFile(file):
    """Return 1 if `file` follows the "?YYYYDDDSSS..." radar file layout."""
    try:
        int(file[1:5])   # year
        int(file[5:8])   # day of year
        int(file[8:11])  # set number
    except (ValueError, TypeError):
        # Narrowed from a bare except: only the int() conversions can fail.
        return 0

    return 1
394 394
395 395
def getDateFromRadarFile(file):
    """Parse a "?YYYYDDDSSS..." file name into a datetime.date, or None.

    The set number is validated (it must be an integer) but not used in the
    returned date.
    """
    try:
        year = int(file[1:5])
        doy = int(file[5:8])
        int(file[8:11])  # set number: validated only
    except (ValueError, TypeError):
        # Narrowed from a bare except: only the int() conversions can fail.
        return None

    # Day-of-year 1 is January 1st.
    thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
    return thisDate
406 406
407 407
def getDateFromRadarFolder(folder):
    """Parse a "?YYYYDDD" folder name into a datetime.date, or None."""
    try:
        year = int(folder[1:5])
        doy = int(folder[5:8])
    except (ValueError, TypeError):
        # Narrowed from a bare except: only the int() conversions can fail.
        return None

    # Day-of-year 1 is January 1st.
    thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
    return thisDate
417 417
def parse_format(s, fmt):
    """Substitute each strptime directive in `fmt` with the same-position,
    fixed-width slice of `s`, using the widths in DT_DIRECTIVES.

    E.g. parse_format("d2015305", "*%Y%j") -> "*2015305".
    """
    n_directives = fmt.count('%')
    for _ in range(n_directives):
        pos = fmt.index('%')
        directive = fmt[pos:pos + 2]
        width = DT_DIRECTIVES[directive]
        fmt = fmt.replace(directive, s[pos:pos + width])
    return fmt
425 425
class Reader(object):
    """Base class for data readers (offline and online modes).

    Holds the configuration attributes shared by concrete readers and
    implements folder/file discovery plus the open-next-file loop.
    Subclasses must provide run(), checkForRealPath() and usually override
    verifyFile()/readFirstHeader().
    """

    c = 3E8                        # speed of light [m/s]
    isConfig = False
    # NOTE(review): the lists below are mutable class attributes shared by
    # all instances unless rebound -- confirm this is intended.
    dtype = None
    pathList = []
    filenameList = []
    datetimeList = []
    filename = None
    ext = None
    flagIsNewFile = 1
    flagDiscontinuousBlock = 0
    flagIsNewBlock = 0
    flagNoMoreFiles = 0
    fp = None                      # currently open file object
    firstHeaderSize = 0
    basicHeaderSize = 24           # size in bytes of the basic header
    versionFile = 1103
    fileSize = None
    fileSizeByHeader = None
    fileIndex = -1                 # -1 until the first file is opened
    profileIndex = None
    blockIndex = 0
    nTotalBlocks = 0
    maxTimeStep = 30
    lastUTTime = None
    datablock = None
    dataOut = None
    getByBlock = False
    path = None
    startDate = None
    endDate = None
    startTime = datetime.time(0, 0, 0)
    endTime = datetime.time(23, 59, 59)
    set = None
    expLabel = ""
    online = False
    delay = 60                     # seconds to wait between online retries
    nTries = 3  # quantity tries
    nFiles = 3  # number of files for searching
    walk = True
    getblock = False
    nTxs = 1
    realtime = False
    blocksize = 0
    blocktime = None
    warnings = True
    verbose = True
    server = None
    format = None
    oneDDict = None
    twoDDict = None
    independentParam = None
    filefmt = None
    folderfmt = None
    open_file = open               # callable used to open files (overridable)
    open_mode = 'rb'

    def run(self):
        """Entry point; must be implemented by subclasses."""

        raise NotImplementedError

    def getAllowedArgs(self):
        """Return the argument names accepted by run() (or __attrs__ if set)."""
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            # NOTE(review): inspect.getargspec is deprecated (removed in
            # Python 3.11); getfullargspec is the replacement -- confirm
            # the supported Python versions before changing.
            return inspect.getargspec(self.run).args

    def set_kwargs(self, **kwargs):
        """Copy every keyword argument onto the instance as an attribute."""

        for key, value in kwargs.items():
            setattr(self, key, value)

    def find_folders(self, path, startDate, endDate, folderfmt, last=False):
        """Yield folders under `path` whose name parses (via `folderfmt`)
        to a date inside [startDate, endDate].

        `path` may be a comma-separated list of directories; with last=True
        only the alphabetically last folder is considered.
        """

        folders = [x for f in path.split(',')
                   for x in os.listdir(f) if os.path.isdir(os.path.join(f, x))]
        folders.sort()

        if last:
            folders = [folders[-1]]

        for folder in folders:
            try:
                dt = datetime.datetime.strptime(parse_format(folder, folderfmt), folderfmt).date()
                if dt >= startDate and dt <= endDate:
                    # NOTE(review): joins with the whole `path` string, which
                    # looks wrong when `path` is a comma-separated list of
                    # several directories -- confirm against callers.
                    yield os.path.join(path, folder)
                else:
                    log.log('Skiping folder {}'.format(folder), self.name)
            except Exception as e:
                # Unparseable folder names are skipped, not fatal.
                log.log('Skiping folder {}'.format(folder), self.name)
                continue
        return

    def find_files(self, folders, ext, filefmt, startDate=None, endDate=None,
                   expLabel='', last=False):
        """Yield full paths of files in `folders` whose name parses (via
        `filefmt`) to a date inside [startDate, endDate].

        With last=True only the alphabetically last file of the first
        folder is considered and the generator stops afterwards.
        """

        for path in folders:
            files = glob.glob1(path, '*{}'.format(ext))
            files.sort()
            if last:
                if files:
                    fo = files[-1]
                    try:
                        dt = datetime.datetime.strptime(parse_format(fo, filefmt), filefmt).date()
                        yield os.path.join(path, expLabel, fo)
                    except Exception as e:
                        pass
                    return
                else:
                    return

            for fo in files:
                try:
                    dt = datetime.datetime.strptime(parse_format(fo, filefmt), filefmt).date()
                    if dt >= startDate and dt <= endDate:
                        yield os.path.join(path, expLabel, fo)
                    else:
                        log.log('Skiping file {}'.format(fo), self.name)
                except Exception as e:
                    # Unparseable file names are skipped, not fatal.
                    log.log('Skiping file {}'.format(fo), self.name)
                    continue

    def searchFilesOffLine(self, path, startDate, endDate,
                           expLabel, ext, walk,
                           filefmt, folderfmt):
        """Search files in offline mode for the given arguments

        Return:
            Generator of files
        """

        if walk:
            # Descend into the per-day subfolders first.
            folders = self.find_folders(
                path, startDate, endDate, folderfmt)
        else:
            folders = path.split(',')

        return self.find_files(
            folders, ext, filefmt, startDate, endDate, expLabel)

    def searchFilesOnLine(self, path, startDate, endDate,
                          expLabel, ext, walk,
                          filefmt, folderfmt):
        """Search for the last file of the last folder

        Arguments:
            path     : directory containing the data files
            expLabel : name of the sub-experiment (subfolder)
            ext      : file extension
            walk     : when enabled, search inside the day subdirectories
                       (doypath) instead of `path` itself

        Return:
            generator with the full path of last filename
        """

        if walk:
            folders = self.find_folders(
                path, startDate, endDate, folderfmt, last=True)
        else:
            folders = path.split(',')

        return self.find_files(
            folders, ext, filefmt, startDate, endDate, expLabel, last=True)

    def setNextFile(self):
        """Set the next file to be readed open it and parse de file header

        Loops until a file that passes verifyFile() is found; raises
        SchainError/SchainWarning when no (more) files are available.
        """

        while True:
            # Close the previous file before moving on.
            if self.fp != None:
                self.fp.close()

            if self.online:
                newFile = self.setNextFileOnline()
            else:
                newFile = self.setNextFileOffline()

            if not(newFile):
                if self.online:
                    raise schainpy.admin.SchainError('Time to wait for new files reach')
                else:
                    if self.fileIndex == -1:
                        raise schainpy.admin.SchainWarning('No files found in the given path')
                    else:
                        raise schainpy.admin.SchainWarning('No more files to read')

            # Invalid files are skipped and the loop tries the next one.
            if self.verifyFile(self.filename):
                break

        log.log('Opening file: %s' % self.filename, self.name)

        self.readFirstHeader()
        self.nReadBlocks = 0

    def setNextFileOnline(self):
        """Check for the next file to be readed in online mode.

        Set:
            self.filename
            self.fp
            self.filesize

        Return:
            boolean

        """
        nextFile = True
        nextDay = False

        # Try up to nFiles candidate files; for each, poll up to nTries
        # times (sleeping self.delay seconds) before giving up on it.
        for nFiles in range(self.nFiles+1):
            for nTries in range(self.nTries):
                fullfilename, filename = self.checkForRealPath(nextFile, nextDay)
                if fullfilename is not None:
                    break
                log.warning(
                    "Waiting %0.2f sec for the next file: \"%s\" , try %02d ..." % (self.delay, filename, nTries + 1),
                    self.name)
                time.sleep(self.delay)
                nextFile = False
                continue

            if fullfilename:
                break

            self.nTries = 1
            nextFile = True

            # On the last attempt, roll over to the following day.
            if nFiles == (self.nFiles - 1):
                log.log('Trying with next day...', self.name)
                nextDay = True

        if fullfilename:
            self.fileSize = os.path.getsize(fullfilename)
            self.filename = fullfilename
            self.flagIsNewFile = 1
            if self.fp != None:
                self.fp.close()
            self.fp = self.open_file(fullfilename, self.open_mode)
            self.flagNoMoreFiles = 0
            self.fileIndex += 1
            return 1
        else:
            return 0

    def setNextFileOffline(self):
        """Open the next file to be readed in offline mode"""

        try:
            # self.filenameList is consumed as an iterator/generator here.
            filename = next(self.filenameList)
            self.fileIndex +=1
        except StopIteration:
            self.flagNoMoreFiles = 1
            return 0

        self.filename = filename
        self.fileSize = os.path.getsize(filename)
        self.fp = self.open_file(filename, self.open_mode)
        self.flagIsNewFile = 1

        return 1

    @staticmethod
    def isDateTimeInRange(dt, startDate, endDate, startTime, endTime):
        """Check if the given datetime is in range"""

        if startDate <= dt.date() <= endDate:
            if startTime <= dt.time() <= endTime:
                return True
        return False

    def verifyFile(self, filename):
        """Check for a valid file

        Arguments:
            filename -- full path filename

        Return:
            boolean

        Base implementation accepts every file; subclasses override this.
        """

        return True

    def checkForRealPath(self, nextFile, nextDay):
        """Check if the next file to be readed exists"""

        raise NotImplementedError

    def readFirstHeader(self):
        """Parse the file header"""

        pass
707 717
class JRODataReader(Reader):
    """Reader base for JRO (Jicamarca) raw data files."""

    utc = 0
    nReadBlocks = 0              # blocks read so far from the current file
    foldercounter = 0
    firstHeaderSize = 0
    basicHeaderSize = 24         # bytes of the per-block basic header
    __isFirstTimeOnline = 1
    __printInfo = True
    filefmt = "*%Y%j***"         # file name pattern: year + day-of-year + set
    folderfmt = "*%Y%j"          # folder name pattern: year + day-of-year
719 729
720 730 def getDtypeWidth(self):
721 731
722 732 dtype_index = get_dtype_index(self.dtype)
723 733 dtype_width = get_dtype_width(dtype_index)
724 734
725 735 return dtype_width
726 736
    def checkForRealPath(self, nextFile, nextDay):
        """Check if the next file to be readed exists.

        Example :
            the correct file name is .../.../D2009307/P2009307367.ext

            The function then tries the following combinations:
                .../.../y2009307367.ext
                .../.../Y2009307367.ext
                .../.../x2009307/y2009307367.ext
                .../.../x2009307/Y2009307367.ext
                .../.../X2009307/y2009307367.ext
                .../.../X2009307/Y2009307367.ext
            in this case the last letter combination is identical to the
            file being searched.

        Arguments:
            nextFile -- advance to the next set number
            nextDay  -- reset the set and advance to the next day of year

        Return:
            (fullpath, filename) when the file exists, (None, filename)
            otherwise.
        """

        if nextFile:
            self.set += 1
        if nextDay:
            self.set = 0
            self.doy += 1
        foldercounter = 0
        prefixDirList = [None, 'd', 'D']
        if self.ext.lower() == ".r":  # voltage
            prefixFileList = ['d', 'D']
        elif self.ext.lower() == ".pdata":  # spectra
            prefixFileList = ['p', 'P']
        # NOTE(review): prefixFileList is unbound for any other extension,
        # which would raise NameError below -- confirm only .r/.pdata reach
        # this method.

        # sweep over all possible directory/file prefix combinations
        for prefixDir in prefixDirList:
            thispath = self.path
            if prefixDir != None:
                # build the directory name xYYYYDDD (x='d' or x='D')
                if foldercounter == 0:
                    thispath = os.path.join(self.path, "%s%04d%03d" %
                                            (prefixDir, self.year, self.doy))
                else:
                    thispath = os.path.join(self.path, "%s%04d%03d_%02d" % (
                        prefixDir, self.year, self.doy, foldercounter))
            for prefixFile in prefixFileList:  # sweep both file prefix cases
                # build the file name xYYYYDDDSSS.ext
                filename = "%s%04d%03d%03d%s" % (prefixFile, self.year, self.doy, self.set, self.ext)
                fullfilename = os.path.join(
                    thispath, filename)

                if os.path.exists(fullfilename):
                    return fullfilename, filename

        return None, filename
780 790
    def __waitNewBlock(self):
        """
        Return 1 if a new data block was found, 0 otherwise.

        Always returns 0 when reading offline.  In online mode the file is
        re-opened and polled up to self.nTries times (sleeping self.delay
        seconds between tries) until it has grown enough to hold the next
        block (basic header + block payload).
        """
        if not self.online:
            return 0

        # All announced blocks of this file already read.
        if (self.nReadBlocks >= self.processingHeaderObj.dataBlocksPerFile):
            return 0

        currentPointer = self.fp.tell()

        neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize

        for nTries in range(self.nTries):

            # Re-open to refresh the view of a file still being written.
            self.fp.close()
            self.fp = open(self.filename, 'rb')
            self.fp.seek(currentPointer)

            self.fileSize = os.path.getsize(self.filename)
            currentSize = self.fileSize - currentPointer

            if (currentSize >= neededSize):
                # Enough bytes arrived: consume the next basic header.
                self.basicHeaderObj.read(self.fp)
                return 1

            if self.fileSize == self.fileSizeByHeader:
                # File reached the size announced by its header: complete.
                # self.flagEoF = True
                return 0

            print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1))
            time.sleep(self.delay)

        return 0
818 828
    def waitDataBlock(self, pointer_location, blocksize=None):
        """Poll the current file until a whole block is available after
        `pointer_location`.

        Arguments:
            pointer_location -- byte offset the next block starts at
            blocksize        -- bytes to wait for; defaults to the block
                                size announced by the processing header

        Return:
            1 when enough bytes are available, 0 after exhausting
            self.nTries attempts (sleeping self.delay seconds between them).
        """

        currentPointer = pointer_location
        if blocksize is None:
            neededSize = self.processingHeaderObj.blockSize  # + self.basicHeaderSize
        else:
            neededSize = blocksize

        for nTries in range(self.nTries):
            # Re-open to refresh the view of a file still being written.
            self.fp.close()
            self.fp = open(self.filename, 'rb')
            self.fp.seek(currentPointer)

            self.fileSize = os.path.getsize(self.filename)
            currentSize = self.fileSize - currentPointer

            if (currentSize >= neededSize):
                return 1

            log.warning(
                "Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1),
                self.name
            )
            time.sleep(self.delay)

        return 0
845 855
    def __setNewBlock(self):
        """Advance to the next block, moving to the next file when needed.

        Return:
            1 when a block is ready to be read, 0 on failure.

        Side effects: updates self.lastUTTime and, when the gap between
        consecutive blocks exceeds self.maxTimeStep seconds, sets
        self.flagDiscontinuousBlock.
        """

        if self.fp == None:
            return 0

        if self.flagIsNewFile:
            # First block of a freshly opened file: header already read.
            self.lastUTTime = self.basicHeaderObj.utc
            return 1

        if self.realtime:
            # Realtime mode never waits on the current file.
            self.flagDiscontinuousBlock = 1
            if not(self.setNextFile()):
                return 0
            else:
                return 1

        currentSize = self.fileSize - self.fp.tell()
        neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize

        if (currentSize >= neededSize):
            # Next block is already in the file.
            self.basicHeaderObj.read(self.fp)
            self.lastUTTime = self.basicHeaderObj.utc
            return 1

        # Online mode: give the writer a chance to append the block.
        if self.__waitNewBlock():
            self.lastUTTime = self.basicHeaderObj.utc
            return 1

        if not(self.setNextFile()):
            return 0

        deltaTime = self.basicHeaderObj.utc - self.lastUTTime
        self.lastUTTime = self.basicHeaderObj.utc

        self.flagDiscontinuousBlock = 0

        if deltaTime > self.maxTimeStep:
            self.flagDiscontinuousBlock = 1

        return 1
886 896
    def readNextBlock(self):
        """Read blocks until one falls inside the configured date/time range.

        Return:
            1 when a block inside the range was read, 0 when reading failed
            (no more data/files).
        """

        while True:
            self.__setNewBlock()

            if not(self.readBlock()):
                return 0

            self.getBasicHeader()

            # Skip blocks outside the selected date/time window.
            if not self.isDateTimeInRange(self.dataOut.datatime, self.startDate, self.endDate, self.startTime, self.endTime):
                print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.nReadBlocks,
                                                                      self.processingHeaderObj.dataBlocksPerFile,
                                                                      self.dataOut.datatime.ctime()))
                continue

            break

        if self.verbose:
            print("[Reading] Block No. %d/%d -> %s" % (self.nReadBlocks,
                                                       self.processingHeaderObj.dataBlocksPerFile,
                                                       self.dataOut.datatime.ctime()))
        return 1
909 920
    def readFirstHeader(self):
        """Read the four headers at the start of the file and derive the
        sample dtype, the expected file size and the block dimensions.

        Raises:
            ValueError when the processing flags encode an unknown datatype.
        """

        self.basicHeaderObj.read(self.fp)
        self.systemHeaderObj.read(self.fp)
        self.radarControllerHeaderObj.read(self.fp)
        self.processingHeaderObj.read(self.fp)
        self.firstHeaderSize = self.basicHeaderObj.size

        # The datatype index is encoded as a power of two inside the
        # processing flags, relative to PROCFLAG.DATATYPE_CHAR.
        datatype = int(numpy.log2((self.processingHeaderObj.processFlags &
                                   PROCFLAG.DATATYPE_MASK)) - numpy.log2(PROCFLAG.DATATYPE_CHAR))
        if datatype == 0:
            datatype_str = numpy.dtype([('real', '<i1'), ('imag', '<i1')])
        elif datatype == 1:
            datatype_str = numpy.dtype([('real', '<i2'), ('imag', '<i2')])
        elif datatype == 2:
            datatype_str = numpy.dtype([('real', '<i4'), ('imag', '<i4')])
        elif datatype == 3:
            datatype_str = numpy.dtype([('real', '<i8'), ('imag', '<i8')])
        elif datatype == 4:
            datatype_str = numpy.dtype([('real', '<f4'), ('imag', '<f4')])
        elif datatype == 5:
            datatype_str = numpy.dtype([('real', '<f8'), ('imag', '<f8')])
        else:
            raise ValueError('Data type was not defined')

        self.dtype = datatype_str
        #self.ippSeconds = 2 * 1000 * self.radarControllerHeaderObj.ipp / self.c
        # Expected size: first header + all blocks, each preceded by a basic
        # header except the first (already counted in firstHeaderSize).
        self.fileSizeByHeader = self.processingHeaderObj.dataBlocksPerFile * self.processingHeaderObj.blockSize + \
            self.firstHeaderSize + self.basicHeaderSize * \
            (self.processingHeaderObj.dataBlocksPerFile - 1)
        # self.dataOut.channelList = numpy.arange(self.systemHeaderObj.numChannels)
        # self.dataOut.channelIndexList = numpy.arange(self.systemHeaderObj.numChannels)
        self.getBlockDimension()
    def verifyFile(self, filename, msgFlag=True):
        """Check that `filename` is a readable data file whose first/last
        blocks fall inside the configured date/time range (offline only).

        Arguments:
            filename -- full path of the candidate file
            msgFlag  -- print a message when the file cannot be opened

        Return:
            boolean
        """

        # NOTE(review): `msg` is never used after the refactor -- dead local.
        msg = None

        try:
            fp = open(filename, 'rb')
        except IOError:

            if msgFlag:
                print("[Reading] File %s can't be opened" % (filename))

            return False

        # Wait (online) until the file holds at least one block, then read
        # the full header chain from the start of the file.
        if self.waitDataBlock(0):
            basicHeaderObj = BasicHeader(LOCALTIME)
            systemHeaderObj = SystemHeader()
            radarControllerHeaderObj = RadarControllerHeader()
            processingHeaderObj = ProcessingHeader()

            if not(basicHeaderObj.read(fp)):
                fp.close()
                return False

            if not(systemHeaderObj.read(fp)):
                fp.close()
                return False

            if not(radarControllerHeaderObj.read(fp)):
                fp.close()
                return False

            if not(processingHeaderObj.read(fp)):
                fp.close()
                return False

        # NOTE(review): if waitDataBlock(0) returned falsy, the header
        # objects above were never bound and the offline branch below would
        # raise NameError -- confirm waitDataBlock cannot fail here.
        if not self.online:
            # Offline: accept the file when either its first or its last
            # block timestamp is inside the configured range.
            dt1 = basicHeaderObj.datatime
            fp.seek(self.fileSize-processingHeaderObj.blockSize-24)
            if not(basicHeaderObj.read(fp)):
                fp.close()
                return False
            dt2 = basicHeaderObj.datatime
            if not self.isDateTimeInRange(dt1, self.startDate, self.endDate, self.startTime, self.endTime) and not \
                self.isDateTimeInRange(dt2, self.startDate, self.endDate, self.startTime, self.endTime):
                return False

        fp.close()

        return True
997 1004
def findDatafiles(self, path, startDate=None, endDate=None, expLabel='', ext='.r', walk=True, include_path=False):
    """
    Scan `path` (a comma-separated list of directories) for radar data files
    with extension `ext` whose dates fall between `startDate` and `endDate`.

    When `walk` is True, files are expected inside 'dYYYYDDD' day folders
    (optionally under an `expLabel` subfolder); otherwise files are searched
    directly in each listed directory.

    Return:
        sorted dateList, plus pathList when `include_path` is True.

    Raises:
        schainpy.admin.SchainError when no files at all are found, or when
        files exist but none fall inside the requested date range.
    """

    path_empty = True

    dateList = []
    pathList = []

    # `path` may carry several search roots separated by commas.
    multi_path = path.split(',')

    if not walk:

        for single_path in multi_path:

            if not os.path.isdir(single_path):
                continue

            fileList = glob.glob1(single_path, "*" + ext)

            if not fileList:
                continue

            path_empty = False

            fileList.sort()

            for thisFile in fileList:

                if not os.path.isfile(os.path.join(single_path, thisFile)):
                    continue

                if not isRadarFile(thisFile):
                    continue

                if not isFileInDateRange(thisFile, startDate, endDate):
                    continue

                thisDate = getDateFromRadarFile(thisFile)

                # Keep only one entry per date and per search root.
                if thisDate in dateList or single_path in pathList:
                    continue

                dateList.append(thisDate)
                pathList.append(single_path)

    else:
        for single_path in multi_path:

            if not os.path.isdir(single_path):
                continue

            dirList = []

            # Collect day folders (dYYYYDDD) lying inside the date range.
            for thisPath in os.listdir(single_path):

                if not os.path.isdir(os.path.join(single_path, thisPath)):
                    continue

                if not isRadarFolder(thisPath):
                    continue

                if not isFolderInDateRange(thisPath, startDate, endDate):
                    continue

                dirList.append(thisPath)

            if not dirList:
                continue

            dirList.sort()

            for thisDir in dirList:

                datapath = os.path.join(single_path, thisDir, expLabel)
                fileList = glob.glob1(datapath, "*" + ext)

                if not fileList:
                    continue

                path_empty = False

                thisDate = getDateFromRadarFolder(thisDir)

                pathList.append(datapath)
                dateList.append(thisDate)

    dateList.sort()

    # Build a human-readable pattern only for the error messages below.
    if walk:
        pattern_path = os.path.join(multi_path[0], "[dYYYYDDD]", expLabel)
    else:
        pattern_path = multi_path[0]

    if path_empty:
        raise schainpy.admin.SchainError("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate))
    else:
        if not dateList:
            raise schainpy.admin.SchainError("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path))

    if include_path:
        return dateList, pathList

    return dateList
1100 1107
def setup(self, **kwargs):
    """
    Configure the reader from keyword arguments and locate the first file.

    Three mutually exclusive data sources are supported:
      * a zmq server (self.server set) -> connect a PULL socket,
      * online mode (self.online)      -> poll for files, retrying nTries times,
      * offline mode                   -> build the whole file list up front.

    Raises:
        ValueError when no path is given (non-server modes).
        schainpy.admin.SchainError when online polling finds no valid file.
    """

    self.set_kwargs(**kwargs)
    if not self.ext.startswith('.'):
        self.ext = '.{}'.format(self.ext)

    if self.server is not None:
        if 'tcp://' in self.server:
            # Fixed: the original read the undefined global `server`
            # (NameError); a full tcp:// address is used as-is.
            address = self.server
        else:
            address = 'ipc:///tmp/%s' % self.server
        self.server = address
        self.context = zmq.Context()
        self.receiver = self.context.socket(zmq.PULL)
        self.receiver.connect(self.server)
        time.sleep(0.5)
        print('[Starting] ReceiverData from {}'.format(self.server))
    else:
        self.server = None
        if self.path == None:
            raise ValueError("[Reading] The path is not valid")

        if self.online:
            log.log("[Reading] Searching files in online mode...", self.name)

            for nTries in range(self.nTries):
                fullpath = self.searchFilesOnLine(self.path, self.startDate,
                    self.endDate, self.expLabel, self.ext, self.walk,
                    self.filefmt, self.folderfmt)

                # searchFilesOnLine returns a generator; an empty one means
                # no file is available yet.
                try:
                    fullpath = next(fullpath)
                except Exception:
                    fullpath = None

                if fullpath:
                    break

                log.warning(
                    'Waiting {} sec for a valid file in {}: try {} ...'.format(
                        self.delay, self.path, nTries + 1),
                    self.name)
                time.sleep(self.delay)

            if not(fullpath):
                raise schainpy.admin.SchainError(
                    'There isn\'t any valid file in {}'.format(self.path))

            # Filename format: xYYYYDDDSSS.ext -> year, day-of-year, set.
            pathname, filename = os.path.split(fullpath)
            self.year = int(filename[1:5])
            self.doy = int(filename[5:8])
            self.set = int(filename[8:11]) - 1
        else:
            log.log("Searching files in {}".format(self.path), self.name)
            self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
                self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)

        self.setNextFile()

    return
1161 1168
def getBasicHeader(self):
    """Copy the per-block basic-header fields into self.dataOut."""
    out = self.dataOut
    basic = self.basicHeaderObj
    radar = self.radarControllerHeaderObj

    # Block timestamp (seconds + milliseconds) plus the offset of the
    # current profile within the block.
    out.utctime = basic.utc + basic.miliSecond / 1000. \
        + self.profileIndex * radar.ippSeconds

    out.flagDiscontinuousBlock = self.flagDiscontinuousBlock
    out.timeZone = basic.timeZone
    out.dstFlag = basic.dstFlag
    out.errorCount = basic.errorCount
    out.useLocalTime = basic.useLocalTime

    # Effective IPP when the data carries several transmitters per sweep.
    out.ippSeconds = radar.ippSeconds / self.nTxs

    # self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock*self.nTxs
1180 1187
def getFirstHeader(self):
    """Abstract: populate dataOut from the file's first headers; subclasses override."""
    raise NotImplementedError
1184 1191
def getData(self):
    """Abstract: fetch the next unit of data into dataOut; subclasses override."""
    raise NotImplementedError
1188 1195
def hasNotDataInBuffer(self):
    """Abstract: report whether the internal buffer is exhausted; subclasses override."""
    raise NotImplementedError
1192 1199
def readBlock(self):
    """Abstract: read one data block from the current file; subclasses override."""
    raise NotImplementedError
1196 1203
def isEndProcess(self):
    # True once there are no more files left to read.
    return self.flagNoMoreFiles
1200 1207
def printReadBlocks(self):
    """Print how many blocks have been read from the current file."""
    print("[Reading] Number of read blocks per file %04d" % self.nReadBlocks)
1204 1211
def printTotalBlocks(self):
    """Print the total number of blocks read across all files."""
    print("[Reading] Number of read blocks %04d" % self.nTotalBlocks)
1208 1215
def printNumberOfBlock(self):
    """Per-block progress reporting (currently disabled; see commented code)."""

    # if self.flagIsNewBlock:
    #     print "[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks,
    #     self.processingHeaderObj.dataBlocksPerFile,
    #     self.dataOut.datatime.ctime())
1216 1223
def printInfo(self):
    """Print every header of the current file once; later calls are no-ops."""

    # __printInfo is name-mangled to the enclosing reader class and acts as
    # a one-shot guard.
    if self.__printInfo == False:
        return

    self.basicHeaderObj.printInfo()
    self.systemHeaderObj.printInfo()
    self.radarControllerHeaderObj.printInfo()
    self.processingHeaderObj.printInfo()

    self.__printInfo = False
1228 1235
def run(self, **kwargs):
    """
    Reader entry point: configure on the first call, then fetch data.

    Arguments (all forwarded to setup):
        path, startDate, endDate, startTime, endTime, set, expLabel, ext,
        online, delay, walk, getblock, nTxs, realtime, blocksize, blocktime,
        skip, cursor, warnings, server, verbose, format, oneDDict, twoDDict,
        independentParam
    """

    if not(self.isConfig):
        self.setup(**kwargs)
        self.isConfig = True
    # Data comes either from a zmq server or from files on disk.
    if self.server is None:
        self.getData()
    else:
        self.getFromServer()
1267 1274
1268 1275
class JRODataWriter(Reader):

    """
    Writer for processed Jicamarca data files (.r or .pdata). Data is always
    written to disk one block at a time.
    """

    setFile = None            # current file set number (SSS field of the filename)
    profilesPerBlock = None   # profiles contained in each data block
    blocksPerFile = None      # blocks written before rolling to a new file
    nWriteBlocks = 0          # total number of blocks written
    fileDate = None           # date of the file currently open (day-rollover check)

    def __init__(self, dataOut=None):
        raise NotImplementedError

    def hasAllDataInBuffer(self):
        raise NotImplementedError

    def setBlockDimension(self):
        raise NotImplementedError

    def writeBlock(self):
        raise NotImplementedError

    def putData(self):
        raise NotImplementedError

    def getDtypeWidth(self):
        """Return the width in bytes of one sample of the configured dtype."""

        dtype_index = get_dtype_index(self.dtype)
        dtype_width = get_dtype_width(dtype_index)

        return dtype_width

    def getProcessFlags(self):
        """Build the PROCFLAG bitmask describing dtype and applied processing."""

        processFlags = 0

        dtype_index = get_dtype_index(self.dtype)
        procflag_dtype = get_procflag_dtype(dtype_index)

        processFlags += procflag_dtype

        if self.dataOut.flagDecodeData:
            processFlags += PROCFLAG.DECODE_DATA

        if self.dataOut.flagDeflipData:
            processFlags += PROCFLAG.DEFLIP_DATA

        if self.dataOut.code is not None:
            processFlags += PROCFLAG.DEFINE_PROCESS_CODE

        if self.dataOut.nCohInt > 1:
            processFlags += PROCFLAG.COHERENT_INTEGRATION

        # Spectra-only flags.
        if self.dataOut.type == "Spectra":
            if self.dataOut.nIncohInt > 1:
                processFlags += PROCFLAG.INCOHERENT_INTEGRATION

            if self.dataOut.data_dc is not None:
                processFlags += PROCFLAG.SAVE_CHANNELS_DC

            if self.dataOut.flagShiftFFT:
                processFlags += PROCFLAG.SHIFT_FFT_DATA

        return processFlags

    def setBasicHeader(self):
        """Refresh the basic header with the current block's timestamp and counters."""

        self.basicHeaderObj.size = self.basicHeaderSize  # bytes
        self.basicHeaderObj.version = self.versionFile
        self.basicHeaderObj.dataBlock = self.nTotalBlocks
        # Split the float timestamp into whole seconds + milliseconds.
        utc = numpy.floor(self.dataOut.utctime)
        milisecond = (self.dataOut.utctime - utc) * 1000.0
        self.basicHeaderObj.utc = utc
        self.basicHeaderObj.miliSecond = milisecond
        self.basicHeaderObj.timeZone = self.dataOut.timeZone
        self.basicHeaderObj.dstFlag = self.dataOut.dstFlag
        self.basicHeaderObj.errorCount = self.dataOut.errorCount

    def setFirstHeader(self):
        """
        Build a copy of the First Header.

        Affected:
            self.basicHeaderObj
            self.systemHeaderObj
            self.radarControllerHeaderObj
            self.processingHeaderObj

        Return:
            None
        """

        raise NotImplementedError

    def __writeFirstHeader(self):
        """
        Write the first header of the file: the Basic header followed by the
        Long header (SystemHeader, RadarControllerHeader, ProcessingHeader).

        Affected:
            __dataType

        Return:
            None
        """

        # The basic header size must account for the long header behind it.
        sizeLongHeader = self.systemHeaderObj.size + \
            self.radarControllerHeaderObj.size + self.processingHeaderObj.size
        self.basicHeaderObj.size = self.basicHeaderSize + sizeLongHeader

        self.basicHeaderObj.write(self.fp)
        self.systemHeaderObj.write(self.fp)
        self.radarControllerHeaderObj.write(self.fp)
        self.processingHeaderObj.write(self.fp)

    def __setNewBlock(self):
        """
        Write the First Header when a new file starts; otherwise write only
        the Basic Header for the next block.

        Return:
            0 : nothing could be written
            1 : the Basic or the First Header was written
        """
        if self.fp == None:
            self.setNextFile()

        if self.flagIsNewFile:
            return 1

        if self.blockIndex < self.processingHeaderObj.dataBlocksPerFile:
            self.basicHeaderObj.write(self.fp)
            return 1

        # Current file is full: roll over to the next one.
        if not(self.setNextFile()):
            return 0

        return 1

    def writeNextBlock(self):
        """
        Write the next data block to the current file.

        Return:
            0 : the data block could not be written
            1 : the data block was written
        """
        if not(self.__setNewBlock()):
            return 0

        self.writeBlock()

        print("[Writing] Block No. %d/%d" % (self.blockIndex,
                                             self.processingHeaderObj.dataBlocksPerFile))

        return 1

    def setNextFile(self):
        """Determine and open the next file to be written.

        Affected:
            self.filename
            self.subfolder
            self.fp
            self.setFile
            self.flagIsNewFile

        Return:
            0 : the file could not be opened for writing
            1 : the file is ready to be written
        """
        ext = self.ext
        path = self.path

        if self.fp != None:
            self.fp.close()

        if not os.path.exists(path):
            os.mkdir(path)

        # Day folder named after the data timestamp: dYYYYDDD.
        timeTuple = time.localtime(self.dataOut.utctime)
        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)

        fullpath = os.path.join(path, subfolder)
        setFile = self.setFile

        if not(os.path.exists(fullpath)):
            os.mkdir(fullpath)
            setFile = -1  # initialize the set counter
        else:
            filesList = os.listdir(fullpath)
            if len(filesList) > 0:
                filesList = sorted(filesList, key=str.lower)
                filen = filesList[-1]
                # Filenames follow the format:
                # 0 1234 567 89A BCDE (hex)
                # x YYYY DDD SSS .ext
                if isNumber(filen[8:11]):
                    # continue from the set number of the last existing file
                    setFile = int(filen[8:11])
                else:
                    setFile = -1
            else:
                setFile = -1  # initialize the set counter

        setFile += 1

        # If this is a new day it resets some values
        if self.dataOut.datatime.date() > self.fileDate:
            setFile = 0
            self.nTotalBlocks = 0

        filen = '{}{:04d}{:03d}{:03d}{}'.format(
            self.optchar, timeTuple.tm_year, timeTuple.tm_yday, setFile, ext)

        filename = os.path.join(path, subfolder, filen)

        fp = open(filename, 'wb')

        self.blockIndex = 0
        self.filename = filename
        self.subfolder = subfolder
        self.fp = fp
        self.setFile = setFile
        self.flagIsNewFile = 1
        self.fileDate = self.dataOut.datatime.date()
        self.setFirstHeader()

        print('[Writing] Opening file: %s' % self.filename)

        self.__writeFirstHeader()

        return 1

    def setup(self, dataOut, path, blocksPerFile, profilesPerBlock=64, set=None, ext=None, datatype=4):
        """
        Configure the output format and write the First Header.

        Inputs:
            path : directory where data will be saved
            profilesPerBlock : number of profiles per block
            set : initial file set
            datatype : An integer number that defines data type:
                0 : int8 (1 byte)
                1 : int16 (2 bytes)
                2 : int32 (4 bytes)
                3 : int64 (8 bytes)
                4 : float32 (4 bytes)
                5 : double64 (8 bytes)

        Return:
            0 : setup failed
            1 : setup succeeded
        """

        if ext == None:
            ext = self.ext

        self.ext = ext.lower()

        self.path = path

        if set is None:
            self.setFile = -1
        else:
            self.setFile = set - 1

        self.blocksPerFile = blocksPerFile
        self.profilesPerBlock = profilesPerBlock
        self.dataOut = dataOut
        self.fileDate = self.dataOut.datatime.date()
        self.dtype = self.dataOut.dtype

        if datatype is not None:
            self.dtype = get_numpy_dtype(datatype)

        if not(self.setNextFile()):
            print("[Writing] There isn't a next file")
            return 0

        self.setBlockDimension()

        return 1

    def run(self, dataOut, path, blocksPerFile=100, profilesPerBlock=64, set=None, ext=None, datatype=4, **kwargs):
        """Operation entry point: configure on the first call, then write dataOut."""

        if not(self.isConfig):

            self.setup(dataOut, path, blocksPerFile, profilesPerBlock=profilesPerBlock,
                       set=set, ext=ext, datatype=datatype, **kwargs)
            self.isConfig = True

        self.dataOut = dataOut
        self.putData()
        return self.dataOut
@@ -1,429 +1,429
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 15 import os
16 16 import sys
17 17 import inspect
18 18 import zmq
19 19 import time
20 20 import pickle
21 21 import traceback
22 22 try:
23 23 from queue import Queue
24 24 except:
25 25 from Queue import Queue
26 26 from threading import Thread
27 27 from multiprocessing import Process
28 28
29 29 from schainpy.utils import log
30 30
31 31
class ProcessingUnit(object):

    """
    Update - Jan 2018 - MULTIPROCESSING
    All the "call" methods present in the previous base were removed.
    The majority of operations are independant processes, thus
    the decorator is in charge of communicate the operation processes
    with the proccessing unit via IPC.

    The constructor does not receive any argument. The remaining methods
    are related with the operations to execute.
    """
    proc_type = 'processing'
    __attrs__ = []

    def __init__(self):

        self.dataIn = None    # input data object, assigned by the framework
        self.dataOut = None   # output data object produced by run()
        self.isConfig = False
        self.operations = []  # tuples of (operation, type, id, kwargs)
        self.plots = []

    def getAllowedArgs(self):
        """Return the declared __attrs__, or run()'s argument names as fallback."""
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            return inspect.getargspec(self.run).args

    def addOperation(self, conf, operation):
        """
        Register an Operation to be executed by this unit; called from the
        controller. The stored id is used for IPC with external operation
        processes.

        Input:

            conf : configuration object providing type, id and kwargs
            operation : an "Operation" instance

        Return:

            None (the tuple is appended to self.operations)
        """

        self.operations.append(
            (operation, conf.type, conf.id, conf.getKwargs()))

        if 'plot' in self.name.lower():
            self.plots.append(operation.CODE)

    def getOperationObj(self, objId):
        # NOTE(review): self.operations is a list of tuples, but this method
        # indexes it like a dict (.keys() / [objId]); calling it would raise
        # AttributeError. Looks stale after the dict->list refactor — confirm.
        if objId not in list(self.operations.keys()):
            return None

        return self.operations[objId]

    def operation(self, **kwargs):
        """
        Direct operation over dataOut.data; implementations must update the
        attributes of the dataOut object.

        Input:

            **kwargs : keyword arguments of the function to execute
        """

        raise NotImplementedError

    def setup(self):

        raise NotImplementedError

    def run(self):

        raise NotImplementedError

    def close(self):

        return
116 116
117 117
class Operation(object):

    """
    Update - Jan 2018 - MULTIPROCESSING

    Base class for extra operations that can be attached to a ProcessingUnit.
    The decorator parses the arguments and drives run() for each process; the
    constructor takes no arguments. Subclasses that need history across calls
    should keep their own accumulation buffer (e.g. coherent integration,
    which needs the previous n profiles).
    """
    proc_type = 'operation'
    __attrs__ = []

    def __init__(self):

        self.id = None
        self.isConfig = False

        # Default the display name to the concrete subclass name.
        if not hasattr(self, 'name'):
            self.name = type(self).__name__

    def getAllowedArgs(self):
        """Return the declared __attrs__, or run()'s argument names as fallback."""
        if not hasattr(self, '__attrs__'):
            return inspect.getargspec(self.run).args
        return self.__attrs__

    def setup(self):
        """Configure the operation; concrete subclasses must override."""
        self.isConfig = True

        raise NotImplementedError

    def run(self, dataIn, **kwargs):
        """
        Process dataIn in place and update its attributes; concrete
        subclasses must override.

        Input:

            dataIn : a JROData object

        Return:

            None

        Affected:
            __buffer : data reception buffer (in subclasses that keep one).
        """
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):
        """Release resources held by the operation (no-op by default)."""
        return
182 182
class InputQueue(Thread):

    '''
    Class to hold input data for Processing Units and external Operations.
    '''

    def __init__(self, project_id, inputId, lock=None):

        Thread.__init__(self)
        self.queue = Queue()
        self.project_id = project_id
        self.inputId = inputId
        self.lock = lock        # shared flow-control event; has an `n` counter
        self.islocked = False   # True while this queue is holding the lock
        self.size = 0           # approximate bytes currently buffered

    def run(self):
        # Subscribe to the project's publisher and buffer every incoming
        # message addressed to this inputId.
        c = zmq.Context()
        self.receiver = c.socket(zmq.SUB)
        self.receiver.connect(
            'ipc:///tmp/schain/{}_pub'.format(self.project_id))
        self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())

        while True:
            obj = self.receiver.recv_multipart()[1]
            self.size += sys.getsizeof(obj)
            self.queue.put(obj)

    def get(self):
        # Backpressure: pause upstream publishers while more than ~1 GB
        # (1024 MB) is buffered, resume once the backlog drains.
        if not self.islocked and self.size/1000000 > 1024:
            self.lock.n.value += 1
            self.islocked = True
            self.lock.clear()
        elif self.islocked and self.size/1000000 <= 1024:
            self.islocked = False
            self.lock.n.value -= 1
            if self.lock.n.value == 0:
                self.lock.set()

        obj = self.queue.get()
        self.size -= sys.getsizeof(obj)
        return pickle.loads(obj)
227 227
228 228
def MPDecorator(BaseClass):
    """
    Multiprocessing class decorator

    This function adds multiprocessing features to a BaseClass. Also, it
    handles the communication between processes (readers, procUnits and
    operations) over zmq.
    """

    class MPClass(BaseClass, Process):

        def __init__(self, *args, **kwargs):
            super(MPClass, self).__init__()
            Process.__init__(self)
            self.operationKwargs = {}
            self.args = args
            self.kwargs = kwargs
            self.sender = None
            self.receiver = None
            self.i = 0
            self.t = time.time()
            self.name = BaseClass.__name__
            self.__doc__ = BaseClass.__doc__

            if 'plot' in self.name.lower() and not self.name.endswith('_'):
                self.name = '{}{}'.format(self.CODE.upper(), 'Plot')

            self.start_time = time.time()
            # Positional args carry the IPC wiring set up by the controller.
            self.id = args[0]
            self.inputId = args[1]
            self.project_id = args[2]
            self.err_queue = args[3]
            self.lock = args[4]
            self.typeProc = args[5]
            self.err_queue.put('#_start_#')
            if self.inputId is not None:
                self.queue = InputQueue(self.project_id, self.inputId, self.lock)

        def subscribe(self):
            '''
            Start the zmq socket receiver and subcribe to input ID.
            '''

            self.queue.start()

        def listen(self):
            '''
            This function waits for objects
            '''

            return self.queue.get()

        def set_publisher(self):
            '''
            This function creates a zmq socket for publishing objects.
            '''

            time.sleep(0.5)

            c = zmq.Context()
            self.sender = c.socket(zmq.PUB)
            self.sender.connect(
                'ipc:///tmp/schain/{}_sub'.format(self.project_id))

        def publish(self, data, id):
            '''
            This function publishes an object to a specific topic.
            It blocks publishing when receiver queue is full to avoid data loss
            '''

            if self.inputId is None:
                self.lock.wait()
            self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])

        def runReader(self):
            '''
            Run function for read units
            '''
            while True:

                try:
                    BaseClass.run(self, **self.kwargs)
                except:
                    err = traceback.format_exc()
                    if 'No more files' in err:
                        log.warning('No more files to read', self.name)
                    else:
                        self.err_queue.put('{}|{}'.format(self.name, err))
                    self.dataOut.error = True

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self' and not self.dataOut.flagNoData:
                        op(**kwargs)
                    elif optype == 'other' and not self.dataOut.flagNoData:
                        # Fixed: pass the operation's own kwargs; the original
                        # forwarded the reader's self.kwargs, inconsistent
                        # with runProc() below.
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external':
                        self.publish(self.dataOut, opId)

                if self.dataOut.flagNoData and not self.dataOut.error:
                    continue

                self.publish(self.dataOut, self.id)

                if self.dataOut.error:
                    break

            time.sleep(0.5)

        def runProc(self):
            '''
            Run function for proccessing units
            '''

            while True:
                self.dataIn = self.listen()

                if self.dataIn.flagNoData and self.dataIn.error is None:
                    continue
                elif not self.dataIn.error:
                    try:
                        BaseClass.run(self, **self.kwargs)
                    except:
                        self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
                        self.dataOut.error = True
                elif self.dataIn.error:
                    self.dataOut.error = self.dataIn.error
                    self.dataOut.flagNoData = True

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self' and not self.dataOut.flagNoData:
                        op(**kwargs)
                    elif optype == 'other' and not self.dataOut.flagNoData:
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external' and not self.dataOut.flagNoData:
                        self.publish(self.dataOut, opId)

                self.publish(self.dataOut, self.id)
                # On error, still notify external operations so they can stop.
                for op, optype, opId, kwargs in self.operations:
                    if optype == 'external' and self.dataOut.error:
                        self.publish(self.dataOut, opId)

                if self.dataOut.error:
                    break

            time.sleep(0.5)

        def runOp(self):
            '''
            Run function for external operations (this operations just receive data
            ex: plots, writers, publishers)
            '''

            while True:

                dataOut = self.listen()

                if not dataOut.error:
                    try:
                        BaseClass.run(self, dataOut, **self.kwargs)
                    except:
                        self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
                        dataOut.error = True
                else:
                    break

        def run(self):
            # Fixed: compare strings with '==' (the original used 'is', which
            # only works through CPython string interning and raises a
            # SyntaxWarning on modern interpreters).
            if self.typeProc == "ProcUnit":

                if self.inputId is not None:
                    self.subscribe()

                self.set_publisher()

                if 'Reader' not in BaseClass.__name__:
                    self.runProc()
                else:
                    self.runReader()

            elif self.typeProc == "Operation":

                self.subscribe()
                self.runOp()

            else:
                raise ValueError("Unknown type")

            self.close()

        def close(self):

            BaseClass.close(self)
            self.err_queue.put('#_end_#')

            if self.sender:
                self.sender.close()

            if self.receiver:
                self.receiver.close()

            log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)

    return MPClass
@@ -1,1056 +1,1056
1 1 import itertools
2 2
3 3 import numpy
4 4
5 5 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation
6 6 from schainpy.model.data.jrodata import Spectra
7 7 from schainpy.model.data.jrodata import hildebrand_sekhon
8 8 from schainpy.utils import log
9 9
10 10 @MPDecorator
11 11 class SpectraProc(ProcessingUnit):
12 12
13 13
14 14 def __init__(self):
15 15
16 16 ProcessingUnit.__init__(self)
17 17
18 18 self.buffer = None
19 19 self.firstdatatime = None
20 20 self.profIndex = 0
21 21 self.dataOut = Spectra()
22 22 self.id_min = None
23 23 self.id_max = None
24 24 self.setupReq = False #Agregar a todas las unidades de proc
25 25
    def __updateSpecFromVoltage(self):
        """
        Copy metadata and headers from the input Voltage object (self.dataIn)
        into the output Spectra object (self.dataOut).

        Only metadata is handled here; the spectral arrays themselves are
        computed later in __getFft().
        """

        self.dataOut.timeZone = self.dataIn.timeZone
        self.dataOut.dstFlag = self.dataIn.dstFlag
        self.dataOut.errorCount = self.dataIn.errorCount
        self.dataOut.useLocalTime = self.dataIn.useLocalTime
        # processing header is copied best-effort; presumably it can be
        # missing on raw voltage data -- bare except kept as-is
        try:
            self.dataOut.processingHeaderObj = self.dataIn.processingHeaderObj.copy()
        except:
            pass
        self.dataOut.radarControllerHeaderObj = self.dataIn.radarControllerHeaderObj.copy()
        self.dataOut.systemHeaderObj = self.dataIn.systemHeaderObj.copy()
        self.dataOut.channelList = self.dataIn.channelList
        self.dataOut.heightList = self.dataIn.heightList
        self.dataOut.dtype = numpy.dtype([('real', '<f4'), ('imag', '<f4')])

        self.dataOut.nBaud = self.dataIn.nBaud
        self.dataOut.nCode = self.dataIn.nCode
        self.dataOut.code = self.dataIn.code
        # one spectra profile per FFT point
        self.dataOut.nProfiles = self.dataOut.nFFTPoints

        self.dataOut.flagDiscontinuousBlock = self.dataIn.flagDiscontinuousBlock
        self.dataOut.utctime = self.firstdatatime
        # assume the data is already decoded
        self.dataOut.flagDecodeData = self.dataIn.flagDecodeData
        # assume the data has not been flipped
        self.dataOut.flagDeflipData = self.dataIn.flagDeflipData
        self.dataOut.flagShiftFFT = False

        self.dataOut.nCohInt = self.dataIn.nCohInt
        self.dataOut.nIncohInt = 1

        self.dataOut.windowOfFilter = self.dataIn.windowOfFilter

        self.dataOut.frequency = self.dataIn.frequency
        self.dataOut.realtime = self.dataIn.realtime

        self.dataOut.azimuth = self.dataIn.azimuth
        self.dataOut.zenith = self.dataIn.zenith

        self.dataOut.beam.codeList = self.dataIn.beam.codeList
        self.dataOut.beam.azimuthList = self.dataIn.beam.azimuthList
        self.dataOut.beam.zenithList = self.dataIn.beam.zenithList
69 69
70 70 def __getFft(self):
71 71 """
72 72 Convierte valores de Voltaje a Spectra
73 73
74 74 Affected:
75 75 self.dataOut.data_spc
76 76 self.dataOut.data_cspc
77 77 self.dataOut.data_dc
78 78 self.dataOut.heightList
79 79 self.profIndex
80 80 self.buffer
81 81 self.dataOut.flagNoData
82 82 """
83 83 fft_volt = numpy.fft.fft(
84 84 self.buffer, n=self.dataOut.nFFTPoints, axis=1)
85 85 fft_volt = fft_volt.astype(numpy.dtype('complex'))
86 86 dc = fft_volt[:, 0, :]
87 87
88 88 # calculo de self-spectra
89 89 fft_volt = numpy.fft.fftshift(fft_volt, axes=(1,))
90 90 spc = fft_volt * numpy.conjugate(fft_volt)
91 91 spc = spc.real
92 92
93 93 blocksize = 0
94 94 blocksize += dc.size
95 95 blocksize += spc.size
96 96
97 97 cspc = None
98 98 pairIndex = 0
99 99 if self.dataOut.pairsList != None:
100 100 # calculo de cross-spectra
101 101 cspc = numpy.zeros(
102 102 (self.dataOut.nPairs, self.dataOut.nFFTPoints, self.dataOut.nHeights), dtype='complex')
103 103 for pair in self.dataOut.pairsList:
104 104 if pair[0] not in self.dataOut.channelList:
105 105 raise ValueError("Error getting CrossSpectra: pair 0 of %s is not in channelList = %s" % (
106 106 str(pair), str(self.dataOut.channelList)))
107 107 if pair[1] not in self.dataOut.channelList:
108 108 raise ValueError("Error getting CrossSpectra: pair 1 of %s is not in channelList = %s" % (
109 109 str(pair), str(self.dataOut.channelList)))
110 110
111 111 cspc[pairIndex, :, :] = fft_volt[pair[0], :, :] * \
112 112 numpy.conjugate(fft_volt[pair[1], :, :])
113 113 pairIndex += 1
114 114 blocksize += cspc.size
115 115
116 116 self.dataOut.data_spc = spc
117 117 self.dataOut.data_cspc = cspc
118 118 self.dataOut.data_dc = dc
119 119 self.dataOut.blockSize = blocksize
120 self.dataOut.flagShiftFFT = True
120 self.dataOut.flagShiftFFT = False
121 121
122 122 def run(self, nProfiles=None, nFFTPoints=None, pairsList=[], ippFactor=None, shift_fft=False):
123 123
124 124 if self.dataIn.type == "Spectra":
125 125 self.dataOut.copy(self.dataIn)
126 126 if shift_fft:
127 127 #desplaza a la derecha en el eje 2 determinadas posiciones
128 128 shift = int(self.dataOut.nFFTPoints/2)
129 129 self.dataOut.data_spc = numpy.roll(self.dataOut.data_spc, shift , axis=1)
130 130
131 131 if self.dataOut.data_cspc is not None:
132 132 #desplaza a la derecha en el eje 2 determinadas posiciones
133 133 self.dataOut.data_cspc = numpy.roll(self.dataOut.data_cspc, shift, axis=1)
134 134
135 135 return True
136 136
137 137 if self.dataIn.type == "Voltage":
138 138
139 139 self.dataOut.flagNoData = True
140 140
141 141 if nFFTPoints == None:
142 142 raise ValueError("This SpectraProc.run() need nFFTPoints input variable")
143 143
144 144 if nProfiles == None:
145 145 nProfiles = nFFTPoints
146 146
147 147 if ippFactor == None:
148 148 ippFactor = 1
149 149
150 150 self.dataOut.ippFactor = ippFactor
151 151
152 152 self.dataOut.nFFTPoints = nFFTPoints
153 153 self.dataOut.pairsList = pairsList
154 154
155 155 if self.buffer is None:
156 156 self.buffer = numpy.zeros((self.dataIn.nChannels,
157 157 nProfiles,
158 158 self.dataIn.nHeights),
159 159 dtype='complex')
160 160
161 161 if self.dataIn.flagDataAsBlock:
162 162 nVoltProfiles = self.dataIn.data.shape[1]
163 163
164 164 if nVoltProfiles == nProfiles:
165 165 self.buffer = self.dataIn.data.copy()
166 166 self.profIndex = nVoltProfiles
167 167
168 168 elif nVoltProfiles < nProfiles:
169 169
170 170 if self.profIndex == 0:
171 171 self.id_min = 0
172 172 self.id_max = nVoltProfiles
173 173
174 174 self.buffer[:, self.id_min:self.id_max,
175 175 :] = self.dataIn.data
176 176 self.profIndex += nVoltProfiles
177 177 self.id_min += nVoltProfiles
178 178 self.id_max += nVoltProfiles
179 179 else:
180 180 raise ValueError("The type object %s has %d profiles, it should just has %d profiles" % (
181 181 self.dataIn.type, self.dataIn.data.shape[1], nProfiles))
182 182 self.dataOut.flagNoData = True
183 183 return 0
184 184 else:
185 185 self.buffer[:, self.profIndex, :] = self.dataIn.data.copy()
186 186 self.profIndex += 1
187 187
188 188 if self.firstdatatime == None:
189 189 self.firstdatatime = self.dataIn.utctime
190 190
191 191 if self.profIndex == nProfiles:
192 192 self.__updateSpecFromVoltage()
193 193 self.__getFft()
194 194
195 195 self.dataOut.flagNoData = False
196 196 self.firstdatatime = None
197 197 self.profIndex = 0
198 198
199 199 return True
200 200
201 201 raise ValueError("The type of input object '%s' is not valid" % (
202 202 self.dataIn.type))
203 203
204 204 def __selectPairs(self, pairsList):
205 205
206 206 if not pairsList:
207 207 return
208 208
209 209 pairs = []
210 210 pairsIndex = []
211 211
212 212 for pair in pairsList:
213 213 if pair[0] not in self.dataOut.channelList or pair[1] not in self.dataOut.channelList:
214 214 continue
215 215 pairs.append(pair)
216 216 pairsIndex.append(pairs.index(pair))
217 217
218 218 self.dataOut.data_cspc = self.dataOut.data_cspc[pairsIndex]
219 219 self.dataOut.pairsList = pairs
220 220
221 221 return
222 222
223 223 def __selectPairsByChannel(self, channelList=None):
224 224
225 225 if channelList == None:
226 226 return
227 227
228 228 pairsIndexListSelected = []
229 229 for pairIndex in self.dataOut.pairsIndexList:
230 230 # First pair
231 231 if self.dataOut.pairsList[pairIndex][0] not in channelList:
232 232 continue
233 233 # Second pair
234 234 if self.dataOut.pairsList[pairIndex][1] not in channelList:
235 235 continue
236 236
237 237 pairsIndexListSelected.append(pairIndex)
238 238
239 239 if not pairsIndexListSelected:
240 240 self.dataOut.data_cspc = None
241 241 self.dataOut.pairsList = []
242 242 return
243 243
244 244 self.dataOut.data_cspc = self.dataOut.data_cspc[pairsIndexListSelected]
245 245 self.dataOut.pairsList = [self.dataOut.pairsList[i]
246 246 for i in pairsIndexListSelected]
247 247
248 248 return
249 249
250 250 def selectChannels(self, channelList):
251 251
252 252 channelIndexList = []
253 253
254 254 for channel in channelList:
255 255 if channel not in self.dataOut.channelList:
256 256 raise ValueError("Error selecting channels, Channel %d is not valid.\nAvailable channels = %s" % (
257 257 channel, str(self.dataOut.channelList)))
258 258
259 259 index = self.dataOut.channelList.index(channel)
260 260 channelIndexList.append(index)
261 261
262 262 self.selectChannelsByIndex(channelIndexList)
263 263
264 264 def selectChannelsByIndex(self, channelIndexList):
265 265 """
266 266 Selecciona un bloque de datos en base a canales segun el channelIndexList
267 267
268 268 Input:
269 269 channelIndexList : lista sencilla de canales a seleccionar por ej. [2,3,7]
270 270
271 271 Affected:
272 272 self.dataOut.data_spc
273 273 self.dataOut.channelIndexList
274 274 self.dataOut.nChannels
275 275
276 276 Return:
277 277 None
278 278 """
279 279
280 280 for channelIndex in channelIndexList:
281 281 if channelIndex not in self.dataOut.channelIndexList:
282 282 raise ValueError("Error selecting channels: The value %d in channelIndexList is not valid.\nAvailable channel indexes = " % (
283 283 channelIndex, self.dataOut.channelIndexList))
284 284
285 285 data_spc = self.dataOut.data_spc[channelIndexList, :]
286 286 data_dc = self.dataOut.data_dc[channelIndexList, :]
287 287
288 288 self.dataOut.data_spc = data_spc
289 289 self.dataOut.data_dc = data_dc
290 290
291 291 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
292 292 self.dataOut.channelList = range(len(channelIndexList))
293 293 self.__selectPairsByChannel(channelIndexList)
294 294
295 295 return 1
296 296
297 297
298 298 def selectFFTs(self, minFFT, maxFFT ):
299 299 """
300 300 Selecciona un bloque de datos en base a un grupo de valores de puntos FFTs segun el rango
301 301 minFFT<= FFT <= maxFFT
302 302 """
303 303
304 304 if (minFFT > maxFFT):
305 305 raise ValueError("Error selecting heights: Height range (%d,%d) is not valid" % (minFFT, maxFFT))
306 306
307 307 if (minFFT < self.dataOut.getFreqRange()[0]):
308 308 minFFT = self.dataOut.getFreqRange()[0]
309 309
310 310 if (maxFFT > self.dataOut.getFreqRange()[-1]):
311 311 maxFFT = self.dataOut.getFreqRange()[-1]
312 312
313 313 minIndex = 0
314 314 maxIndex = 0
315 315 FFTs = self.dataOut.getFreqRange()
316 316
317 317 inda = numpy.where(FFTs >= minFFT)
318 318 indb = numpy.where(FFTs <= maxFFT)
319 319
320 320 try:
321 321 minIndex = inda[0][0]
322 322 except:
323 323 minIndex = 0
324 324
325 325 try:
326 326 maxIndex = indb[0][-1]
327 327 except:
328 328 maxIndex = len(FFTs)
329 329
330 330 self.selectFFTsByIndex(minIndex, maxIndex)
331 331
332 332 return 1
333 333
334 334
335 335 def setH0(self, h0, deltaHeight = None):
336 336
337 337 if not deltaHeight:
338 338 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
339 339
340 340 nHeights = self.dataOut.nHeights
341 341
342 342 newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight
343 343
344 344 self.dataOut.heightList = newHeiRange
345 345
346 346
347 347 def selectHeights(self, minHei, maxHei):
348 348 """
349 349 Selecciona un bloque de datos en base a un grupo de valores de alturas segun el rango
350 350 minHei <= height <= maxHei
351 351
352 352 Input:
353 353 minHei : valor minimo de altura a considerar
354 354 maxHei : valor maximo de altura a considerar
355 355
356 356 Affected:
357 357 Indirectamente son cambiados varios valores a travez del metodo selectHeightsByIndex
358 358
359 359 Return:
360 360 1 si el metodo se ejecuto con exito caso contrario devuelve 0
361 361 """
362 362
363 363
364 364 if (minHei > maxHei):
365 365 raise ValueError("Error selecting heights: Height range (%d,%d) is not valid" % (minHei, maxHei))
366 366
367 367 if (minHei < self.dataOut.heightList[0]):
368 368 minHei = self.dataOut.heightList[0]
369 369
370 370 if (maxHei > self.dataOut.heightList[-1]):
371 371 maxHei = self.dataOut.heightList[-1]
372 372
373 373 minIndex = 0
374 374 maxIndex = 0
375 375 heights = self.dataOut.heightList
376 376
377 377 inda = numpy.where(heights >= minHei)
378 378 indb = numpy.where(heights <= maxHei)
379 379
380 380 try:
381 381 minIndex = inda[0][0]
382 382 except:
383 383 minIndex = 0
384 384
385 385 try:
386 386 maxIndex = indb[0][-1]
387 387 except:
388 388 maxIndex = len(heights)
389 389
390 390 self.selectHeightsByIndex(minIndex, maxIndex)
391 391
392 392
393 393 return 1
394 394
    def getBeaconSignal(self, tauindex=0, channelindex=0, hei_ref=None):
        """
        Crop the spectra to the heights above the Tau height (or above
        hei_ref when given) and record the candidate beacon height indexes.

        The beacon candidates are the heights whose average power (in dB)
        falls within the top `nheis` values, where nheis is derived from the
        pulse width txB and the height spacing.

        Affected:
            self.dataOut.data_spc / data_cspc / data_dc
            self.dataOut.heightList
            self.dataOut.beacon_heiIndexList

        Return:
            1 on success
        """
        newheis = numpy.where(
            self.dataOut.heightList > self.dataOut.radarControllerHeaderObj.Taus[tauindex])

        if hei_ref != None:
            newheis = numpy.where(self.dataOut.heightList > hei_ref)

        minIndex = min(newheis[0])
        maxIndex = max(newheis[0])
        data_spc = self.dataOut.data_spc[:, :, minIndex:maxIndex + 1]
        heightList = self.dataOut.heightList[minIndex:maxIndex + 1]

        # determine indices
        nheis = int(self.dataOut.radarControllerHeaderObj.txB /
                    (self.dataOut.heightList[1] - self.dataOut.heightList[0]))
        avg_dB = 10 * \
            numpy.log10(numpy.sum(data_spc[channelindex, :, :], axis=0))
        beacon_dB = numpy.sort(avg_dB)[-nheis:]
        beacon_heiIndexList = []
        for val in avg_dB.tolist():
            if val >= beacon_dB[0]:
                beacon_heiIndexList.append(avg_dB.tolist().index(val))

        #data_spc = data_spc[:,:,beacon_heiIndexList]
        data_cspc = None
        if self.dataOut.data_cspc is not None:
            data_cspc = self.dataOut.data_cspc[:, :, minIndex:maxIndex + 1]
            #data_cspc = data_cspc[:,:,beacon_heiIndexList]

        data_dc = None
        if self.dataOut.data_dc is not None:
            data_dc = self.dataOut.data_dc[:, minIndex:maxIndex + 1]
            #data_dc = data_dc[:,beacon_heiIndexList]

        self.dataOut.data_spc = data_spc
        self.dataOut.data_cspc = data_cspc
        self.dataOut.data_dc = data_dc
        self.dataOut.heightList = heightList
        self.dataOut.beacon_heiIndexList = beacon_heiIndexList

        return 1
436 436
    def selectFFTsByIndex(self, minIndex, maxIndex):
        """
        Keep only the FFT bins minIndex..maxIndex (inclusive) in the
        spectra/cross-spectra/DC arrays and rescale ippSeconds accordingly.

        Return:
            1 on success

        Raises:
            ValueError: if the index range is invalid
        """

        if (minIndex < 0) or (minIndex > maxIndex):
            raise ValueError("Error selecting heights: Index range (%d,%d) is not valid" % (minIndex, maxIndex))

        if (maxIndex >= self.dataOut.nProfiles):
            maxIndex = self.dataOut.nProfiles-1

        #Spectra
        data_spc = self.dataOut.data_spc[:,minIndex:maxIndex+1,:]

        data_cspc = None
        if self.dataOut.data_cspc is not None:
            data_cspc = self.dataOut.data_cspc[:,minIndex:maxIndex+1,:]

        data_dc = None
        if self.dataOut.data_dc is not None:
            data_dc = self.dataOut.data_dc[minIndex:maxIndex+1,:]

        self.dataOut.data_spc = data_spc
        self.dataOut.data_cspc = data_cspc
        self.dataOut.data_dc = data_dc

        # NOTE(review): when data_cspc is None, numpy.shape(data_cspc) is ()
        # and the indexing below raises IndexError -- this method appears to
        # assume cross-spectra are always present. TODO confirm.
        self.dataOut.ippSeconds = self.dataOut.ippSeconds*(self.dataOut.nFFTPoints / numpy.shape(data_cspc)[1])
        self.dataOut.nFFTPoints = numpy.shape(data_cspc)[1]
        self.dataOut.profilesPerBlock = numpy.shape(data_cspc)[1]

        return 1
468 468
469 469
470 470
471 471 def selectHeightsByIndex(self, minIndex, maxIndex):
472 472 """
473 473 Selecciona un bloque de datos en base a un grupo indices de alturas segun el rango
474 474 minIndex <= index <= maxIndex
475 475
476 476 Input:
477 477 minIndex : valor de indice minimo de altura a considerar
478 478 maxIndex : valor de indice maximo de altura a considerar
479 479
480 480 Affected:
481 481 self.dataOut.data_spc
482 482 self.dataOut.data_cspc
483 483 self.dataOut.data_dc
484 484 self.dataOut.heightList
485 485
486 486 Return:
487 487 1 si el metodo se ejecuto con exito caso contrario devuelve 0
488 488 """
489 489
490 490 if (minIndex < 0) or (minIndex > maxIndex):
491 491 raise ValueError("Error selecting heights: Index range (%d,%d) is not valid" % (
492 492 minIndex, maxIndex))
493 493
494 494 if (maxIndex >= self.dataOut.nHeights):
495 495 maxIndex = self.dataOut.nHeights - 1
496 496
497 497 # Spectra
498 498 data_spc = self.dataOut.data_spc[:, :, minIndex:maxIndex + 1]
499 499
500 500 data_cspc = None
501 501 if self.dataOut.data_cspc is not None:
502 502 data_cspc = self.dataOut.data_cspc[:, :, minIndex:maxIndex + 1]
503 503
504 504 data_dc = None
505 505 if self.dataOut.data_dc is not None:
506 506 data_dc = self.dataOut.data_dc[:, minIndex:maxIndex + 1]
507 507
508 508 self.dataOut.data_spc = data_spc
509 509 self.dataOut.data_cspc = data_cspc
510 510 self.dataOut.data_dc = data_dc
511 511
512 512 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex + 1]
513 513
514 514 return 1
515 515
    def removeDC(self, mode=2):
        """
        Remove the DC line (zero-frequency bin) from the spectra.

        mode 1: replace the DC bin with the average of its +-1 neighbours.
        mode 2: replace it with a polynomial fit through the +-1 and +-2
                neighbour bins (Vandermonde inverse, first row).

        Affected:
            self.dataOut.data_spc
            self.dataOut.data_cspc

        Return:
            1
        """
        jspectra = self.dataOut.data_spc
        jcspectra = self.dataOut.data_cspc

        num_chan = jspectra.shape[0]
        num_hei = jspectra.shape[2]

        if jcspectra is not None:
            jcspectraExist = True
            num_pairs = jcspectra.shape[0]
        else:
            jcspectraExist = False

        freq_dc = int(jspectra.shape[1] / 2)
        ind_vel = numpy.array([-2, -1, 1, 2]) + freq_dc
        ind_vel = ind_vel.astype(int)

        if ind_vel[0] < 0:
            # NOTE(review): self.num_prof is not defined anywhere in this
            # class; this branch (only reachable when the FFT length is < 4)
            # would raise AttributeError -- presumably it should use
            # jspectra.shape[1]. TODO confirm.
            ind_vel[list(range(0, 1))] = ind_vel[list(range(0, 1))] + self.num_prof

        if mode == 1:
            jspectra[:, freq_dc, :] = (
                jspectra[:, ind_vel[1], :] + jspectra[:, ind_vel[2], :]) / 2  # CORRECTION

            if jcspectraExist:
                jcspectra[:, freq_dc, :] = (
                    jcspectra[:, ind_vel[1], :] + jcspectra[:, ind_vel[2], :]) / 2

        if mode == 2:

            vel = numpy.array([-2, -1, 1, 2])
            xx = numpy.zeros([4, 4])

            # Vandermonde matrix of the neighbour offsets for the fit
            for fil in range(4):
                xx[fil, :] = vel[fil]**numpy.asarray(list(range(4)))

            xx_inv = numpy.linalg.inv(xx)
            xx_aux = xx_inv[0, :]

            for ich in range(num_chan):
                yy = jspectra[ich, ind_vel, :]
                jspectra[ich, freq_dc, :] = numpy.dot(xx_aux, yy)

                # heights where the fit produced a non-positive value fall
                # back to the mode-1 average
                junkid = jspectra[ich, freq_dc, :] <= 0
                cjunkid = sum(junkid)

                if cjunkid.any():
                    jspectra[ich, freq_dc, junkid.nonzero()] = (
                        jspectra[ich, ind_vel[1], junkid] + jspectra[ich, ind_vel[2], junkid]) / 2

            if jcspectraExist:
                for ip in range(num_pairs):
                    yy = jcspectra[ip, ind_vel, :]
                    jcspectra[ip, freq_dc, :] = numpy.dot(xx_aux, yy)

        self.dataOut.data_spc = jspectra
        self.dataOut.data_cspc = jcspectra

        return 1
575 575
    def removeInterference2(self):
        """
        Blank (set to NaN) cross-spectra profiles dominated by interference.

        For each pair, the low-power heights are used to sum an interference
        profile; profiles above the 98th percentile of that sum are NaN-ed
        when they are few enough.
        """

        cspc = self.dataOut.data_cspc
        spc = self.dataOut.data_spc
        Heights = numpy.arange(cspc.shape[2])
        realCspc = numpy.abs(cspc)

        for i in range(cspc.shape[0]):
            LinePower= numpy.sum(realCspc[i], axis=0)
            Threshold = numpy.amax(LinePower)-numpy.sort(LinePower)[len(Heights)-int(len(Heights)*0.1)]
            SelectedHeights = Heights[ numpy.where( LinePower < Threshold ) ]
            InterferenceSum = numpy.sum( realCspc[i,:,SelectedHeights], axis=0 )
            InterferenceThresholdMin = numpy.sort(InterferenceSum)[int(len(InterferenceSum)*0.98)]
            InterferenceThresholdMax = numpy.sort(InterferenceSum)[int(len(InterferenceSum)*0.99)]


            # NOTE(review): numpy.where on a 1-element list returns a tuple,
            # so len(InterferenceRange) is always 1 and the guard below is
            # effectively always true -- presumably the intent was to count
            # the flagged profiles. TODO confirm.
            InterferenceRange = numpy.where( ([InterferenceSum > InterferenceThresholdMin]))# , InterferenceSum < InterferenceThresholdMax]) )
            #InterferenceRange = numpy.where( ([InterferenceRange < InterferenceThresholdMax]))
            if len(InterferenceRange)<int(cspc.shape[1]*0.3):
                cspc[i,InterferenceRange,:] = numpy.NaN



        self.dataOut.data_cspc = cspc
600 600
    def removeInterference(self, interf = 2,hei_interf = None, nhei_interf = None, offhei_interf = None):
        """
        Estimate and subtract an average interference spectrum per channel
        (and per pair for the cross-spectra), using the highest-power
        heights within `hei_interf` as the interference reference.

        Parameters (defaults are derived from the data when omitted):
            interf : interference-removal mode (2 enables per-profile cleanup)
            hei_interf : height indexes used to estimate the interference
                         (default: upper half of the height range)
            nhei_interf : number of reference heights (clipped to [1, count_hei])
            offhei_interf : offset into the power-sorted reference heights

        Affected:
            self.dataOut.data_spc
            self.dataOut.data_cspc
        """

        jspectra = self.dataOut.data_spc
        jcspectra = self.dataOut.data_cspc
        jnoise = self.dataOut.getNoise()
        num_incoh = self.dataOut.nIncohInt

        num_channel = jspectra.shape[0]
        num_prof = jspectra.shape[1]
        num_hei = jspectra.shape[2]

        # hei_interf
        if hei_interf is None:
            count_hei = int(num_hei / 2)
            hei_interf = numpy.asmatrix(list(range(count_hei))) + num_hei - count_hei
            hei_interf = numpy.asarray(hei_interf)[0]
        # nhei_interf
        if (nhei_interf == None):
            nhei_interf = 5
        if (nhei_interf < 1):
            nhei_interf = 1
        if (nhei_interf > count_hei):
            nhei_interf = count_hei
        if (offhei_interf == None):
            offhei_interf = 0

        ind_hei = list(range(num_hei))
        # mask_prof = numpy.asarray(range(num_prof - 2)) + 1
        # mask_prof[range(num_prof/2 - 1,len(mask_prof))] += 1
        mask_prof = numpy.asarray(list(range(num_prof)))
        num_mask_prof = mask_prof.size
        comp_mask_prof = [0, num_prof / 2]

        # noise_exist: determines whether jnoise is defined and holds the noise level of each channel
        # NOTE(review): when the condition below is true jnoise becomes a
        # scalar NaN and `jnoise[0]` on the next line raises TypeError --
        # presumably it should be an array of NaNs. TODO confirm.
        if (jnoise.size < num_channel or numpy.isnan(jnoise).any()):
            jnoise = numpy.nan
        noise_exist = jnoise[0] < numpy.Inf

        # Interference-removal subroutine
        for ich in range(num_channel):
            # Sort the spectra by power (lowest to highest)
            power = jspectra[ich, mask_prof, :]
            power = power[:, hei_interf]
            power = power.sum(axis=0)
            psort = power.ravel().argsort()

            # Estimate the average interference in the power spectra using
            # the selected reference heights
            junkspc_interf = jspectra[ich, :, hei_interf[psort[list(range(
                offhei_interf, nhei_interf + offhei_interf))]]]

            if noise_exist:
                # tmp_noise = jnoise[ich] / num_prof
                tmp_noise = jnoise[ich]
                junkspc_interf = junkspc_interf - tmp_noise
                #junkspc_interf[:,comp_mask_prof] = 0

            jspc_interf = junkspc_interf.sum(axis=0) / nhei_interf
            jspc_interf = jspc_interf.transpose()
            # Computing the average interference spectrum
            noiseid = numpy.where(
                jspc_interf <= tmp_noise / numpy.sqrt(num_incoh))
            noiseid = noiseid[0]
            cnoiseid = noiseid.size
            interfid = numpy.where(
                jspc_interf > tmp_noise / numpy.sqrt(num_incoh))
            interfid = interfid[0]
            cinterfid = interfid.size

            if (cnoiseid > 0):
                jspc_interf[noiseid] = 0

            # Expanding the profiles to clean
            if (cinterfid > 0):
                new_interfid = (
                    numpy.r_[interfid - 1, interfid, interfid + 1] + num_prof) % num_prof
                new_interfid = numpy.asarray(new_interfid)
                new_interfid = {x for x in new_interfid}
                new_interfid = numpy.array(list(new_interfid))
                new_cinterfid = new_interfid.size
            else:
                new_cinterfid = 0

            for ip in range(new_cinterfid):
                ind = junkspc_interf[:, new_interfid[ip]].ravel().argsort()
                jspc_interf[new_interfid[ip]
                            ] = junkspc_interf[ind[nhei_interf // 2], new_interfid[ip]]

            jspectra[ich, :, ind_hei] = jspectra[ich, :,
                                                 ind_hei] - jspc_interf  # fix indices

            # Removing the interference at the point of strongest interference
            ListAux = jspc_interf[mask_prof].tolist()
            maxid = ListAux.index(max(ListAux))

            if cinterfid > 0:
                for ip in range(cinterfid * (interf == 2) - 1):
                    ind = (jspectra[ich, interfid[ip], :] < tmp_noise *
                           (1 + 1 / numpy.sqrt(num_incoh))).nonzero()
                    cind = len(ind)

                    if (cind > 0):
                        jspectra[ich, interfid[ip], ind] = tmp_noise * \
                            (1 + (numpy.random.uniform(cind) - 0.5) /
                             numpy.sqrt(num_incoh))

                ind = numpy.array([-2, -1, 1, 2])
                xx = numpy.zeros([4, 4])

                for id1 in range(4):
                    xx[:, id1] = ind[id1]**numpy.asarray(list(range(4)))

                xx_inv = numpy.linalg.inv(xx)
                xx = xx_inv[:, 0]
                ind = (ind + maxid + num_mask_prof) % num_mask_prof
                yy = jspectra[ich, mask_prof[ind], :]
                jspectra[ich, mask_prof[maxid], :] = numpy.dot(
                    yy.transpose(), xx)

            indAux = (jspectra[ich, :, :] < tmp_noise *
                      (1 - 1 / numpy.sqrt(num_incoh))).nonzero()
            jspectra[ich, indAux[0], indAux[1]] = tmp_noise * \
                (1 - 1 / numpy.sqrt(num_incoh))

        # Interference removal in the cross-spectra
        if jcspectra is None:
            return jspectra, jcspectra
        num_pairs = int(jcspectra.size / (num_prof * num_hei))
        jcspectra = jcspectra.reshape(num_pairs, num_prof, num_hei)

        for ip in range(num_pairs):

            #-------------------------------------------

            cspower = numpy.abs(jcspectra[ip, mask_prof, :])
            cspower = cspower[:, hei_interf]
            cspower = cspower.sum(axis=0)

            cspsort = cspower.ravel().argsort()
            junkcspc_interf = jcspectra[ip, :, hei_interf[cspsort[list(range(
                offhei_interf, nhei_interf + offhei_interf))]]]
            junkcspc_interf = junkcspc_interf.transpose()
            jcspc_interf = junkcspc_interf.sum(axis=1) / nhei_interf

            ind = numpy.abs(jcspc_interf[mask_prof]).ravel().argsort()

            median_real = int(numpy.median(numpy.real(
                junkcspc_interf[mask_prof[ind[list(range(3 * num_prof // 4))]], :])))
            median_imag = int(numpy.median(numpy.imag(
                junkcspc_interf[mask_prof[ind[list(range(3 * num_prof // 4))]], :])))
            comp_mask_prof = [int(e) for e in comp_mask_prof]
            junkcspc_interf[comp_mask_prof, :] = numpy.complex(
                median_real, median_imag)

            for iprof in range(num_prof):
                ind = numpy.abs(junkcspc_interf[iprof, :]).ravel().argsort()
                jcspc_interf[iprof] = junkcspc_interf[iprof, ind[nhei_interf // 2]]

            # Removing the interference
            jcspectra[ip, :, ind_hei] = jcspectra[ip,
                                                  :, ind_hei] - jcspc_interf

            ListAux = numpy.abs(jcspc_interf[mask_prof]).tolist()
            maxid = ListAux.index(max(ListAux))

            ind = numpy.array([-2, -1, 1, 2])
            xx = numpy.zeros([4, 4])

            for id1 in range(4):
                xx[:, id1] = ind[id1]**numpy.asarray(list(range(4)))

            xx_inv = numpy.linalg.inv(xx)
            xx = xx_inv[:, 0]

            ind = (ind + maxid + num_mask_prof) % num_mask_prof
            yy = jcspectra[ip, mask_prof[ind], :]
            jcspectra[ip, mask_prof[maxid], :] = numpy.dot(yy.transpose(), xx)

        # Save results
        self.dataOut.data_spc = jspectra
        self.dataOut.data_cspc = jcspectra

        return 1
783 783
784 784 def setRadarFrequency(self, frequency=None):
785 785
786 786 if frequency != None:
787 787 self.dataOut.frequency = frequency
788 788
789 789 return 1
790 790
    def getNoise(self, minHei=None, maxHei=None, minVel=None, maxVel=None):
        """
        Estimate the per-channel noise level (Hildebrand-Sekhon) over the
        height window [minHei, maxHei] and velocity window [minVel, maxVel]
        and store it in self.dataOut.noise_estimation.

        Out-of-range bounds are clipped to the available ranges with a
        printed warning. Each bound defaults to the full range when None.

        Return:
            1
        """
        # height-range validation
        if minHei == None:
            minHei = self.dataOut.heightList[0]

        if maxHei == None:
            maxHei = self.dataOut.heightList[-1]

        if (minHei < self.dataOut.heightList[0]) or (minHei > maxHei):
            print('minHei: %.2f is out of the heights range' % (minHei))
            print('minHei is setting to %.2f' % (self.dataOut.heightList[0]))
            minHei = self.dataOut.heightList[0]

        if (maxHei > self.dataOut.heightList[-1]) or (maxHei < minHei):
            print('maxHei: %.2f is out of the heights range' % (maxHei))
            print('maxHei is setting to %.2f' % (self.dataOut.heightList[-1]))
            maxHei = self.dataOut.heightList[-1]

        # velocity-range validation
        velrange = self.dataOut.getVelRange(1)

        if minVel == None:
            minVel = velrange[0]

        if maxVel == None:
            maxVel = velrange[-1]

        if (minVel < velrange[0]) or (minVel > maxVel):
            print('minVel: %.2f is out of the velocity range' % (minVel))
            print('minVel is setting to %.2f' % (velrange[0]))
            minVel = velrange[0]

        if (maxVel > velrange[-1]) or (maxVel < minVel):
            print('maxVel: %.2f is out of the velocity range' % (maxVel))
            print('maxVel is setting to %.2f' % (velrange[-1]))
            maxVel = velrange[-1]

        # index selection for the height range
        minIndex = 0
        maxIndex = 0
        heights = self.dataOut.heightList

        inda = numpy.where(heights >= minHei)
        indb = numpy.where(heights <= maxHei)

        try:
            minIndex = inda[0][0]
        except:
            minIndex = 0

        try:
            maxIndex = indb[0][-1]
        except:
            maxIndex = len(heights)

        if (minIndex < 0) or (minIndex > maxIndex):
            raise ValueError("some value in (%d,%d) is not valid" % (
                minIndex, maxIndex))

        if (maxIndex >= self.dataOut.nHeights):
            maxIndex = self.dataOut.nHeights - 1

        # index selection for the velocity range
        indminvel = numpy.where(velrange >= minVel)
        indmaxvel = numpy.where(velrange <= maxVel)
        try:
            minIndexVel = indminvel[0][0]
        except:
            minIndexVel = 0

        try:
            maxIndexVel = indmaxvel[0][-1]
        except:
            maxIndexVel = len(velrange)

        # spectrum window selection
        data_spc = self.dataOut.data_spc[:,
                                         minIndexVel:maxIndexVel + 1, minIndex:maxIndex + 1]
        # noise estimation
        noise = numpy.zeros(self.dataOut.nChannels)

        for channel in range(self.dataOut.nChannels):
            daux = data_spc[channel, :, :]
            noise[channel] = hildebrand_sekhon(daux, self.dataOut.nIncohInt)

        self.dataOut.noise_estimation = noise.copy()

        return 1
879 879
880 880
class IncohInt(Operation):
    """
    Incoherent integration of spectra: sums data_spc/data_cspc/data_dc over
    a fixed number of profiles or over a time interval (see setup()).
    """

    # class-level defaults; instances overwrite these in setup()
    __profIndex = 0
    __withOverapping = False  # (sic) not used in the methods visible here

    __byTime = False          # integrate by time instead of by profile count
    __initime = None
    __lastdatatime = None
    __integrationtime = None

    # running sums of the incoming spectra blocks
    __buffer_spc = None
    __buffer_cspc = None
    __buffer_dc = None

    __dataReady = False

    __timeInterval = None

    n = None  # number of integrations (set in setup or after pushData)
900 900
    def __init__(self):
        """Create the operation; integration parameters are set later in setup()."""

        Operation.__init__(self)
904 904
    def setup(self, n=None, timeInterval=None, overlapping=False):
        """
        Set the parameters of the integration class.

        Inputs:

            n : number of incoherent integrations (the original docstring
                said "coherent", which was inaccurate for this class)
            timeInterval : integration time in seconds; ignored when "n"
                is given
            overlapping : accepted but not used in this method

        Raises:
            ValueError: if neither n nor timeInterval is specified.
        """

        self.__initime = None
        self.__lastdatatime = 0

        self.__buffer_spc = 0
        self.__buffer_cspc = 0
        self.__buffer_dc = 0

        self.__profIndex = 0
        self.__dataReady = False
        self.__byTime = False

        if n is None and timeInterval is None:
            raise ValueError("n or timeInterval should be specified ...")

        if n is not None:
            self.n = int(n)
        else:

            self.__integrationtime = int(timeInterval)
            self.n = None
            self.__byTime = True
938 938
939 939 def putData(self, data_spc, data_cspc, data_dc):
940 940 """
941 941 Add a profile to the __buffer_spc and increase in one the __profileIndex
942 942
943 943 """
944 944
945 945 self.__buffer_spc += data_spc
946 946
947 947 if data_cspc is None:
948 948 self.__buffer_cspc = None
949 949 else:
950 950 self.__buffer_cspc += data_cspc
951 951
952 952 if data_dc is None:
953 953 self.__buffer_dc = None
954 954 else:
955 955 self.__buffer_dc += data_dc
956 956
957 957 self.__profIndex += 1
958 958
959 959 return
960 960
961 961 def pushData(self):
962 962 """
963 963 Return the sum of the last profiles and the profiles used in the sum.
964 964
965 965 Affected:
966 966
967 967 self.__profileIndex
968 968
969 969 """
970 970
971 971 data_spc = self.__buffer_spc
972 972 data_cspc = self.__buffer_cspc
973 973 data_dc = self.__buffer_dc
974 974 n = self.__profIndex
975 975
976 976 self.__buffer_spc = 0
977 977 self.__buffer_cspc = 0
978 978 self.__buffer_dc = 0
979 979 self.__profIndex = 0
980 980
981 981 return data_spc, data_cspc, data_dc, n
982 982
983 983 def byProfiles(self, *args):
984 984
985 985 self.__dataReady = False
986 986 avgdata_spc = None
987 987 avgdata_cspc = None
988 988 avgdata_dc = None
989 989
990 990 self.putData(*args)
991 991
992 992 if self.__profIndex == self.n:
993 993
994 994 avgdata_spc, avgdata_cspc, avgdata_dc, n = self.pushData()
995 995 self.n = n
996 996 self.__dataReady = True
997 997
998 998 return avgdata_spc, avgdata_cspc, avgdata_dc
999 999
1000 1000 def byTime(self, datatime, *args):
1001 1001
1002 1002 self.__dataReady = False
1003 1003 avgdata_spc = None
1004 1004 avgdata_cspc = None
1005 1005 avgdata_dc = None
1006 1006
1007 1007 self.putData(*args)
1008 1008
1009 1009 if (datatime - self.__initime) >= self.__integrationtime:
1010 1010 avgdata_spc, avgdata_cspc, avgdata_dc, n = self.pushData()
1011 1011 self.n = n
1012 1012 self.__dataReady = True
1013 1013
1014 1014 return avgdata_spc, avgdata_cspc, avgdata_dc
1015 1015
1016 1016 def integrate(self, datatime, *args):
1017 1017
1018 1018 if self.__profIndex == 0:
1019 1019 self.__initime = datatime
1020 1020
1021 1021 if self.__byTime:
1022 1022 avgdata_spc, avgdata_cspc, avgdata_dc = self.byTime(
1023 1023 datatime, *args)
1024 1024 else:
1025 1025 avgdata_spc, avgdata_cspc, avgdata_dc = self.byProfiles(*args)
1026 1026
1027 1027 if not self.__dataReady:
1028 1028 return None, None, None, None
1029 1029
1030 1030 return self.__initime, avgdata_spc, avgdata_cspc, avgdata_dc
1031 1031
1032 1032 def run(self, dataOut, n=None, timeInterval=None, overlapping=False):
1033 1033 if n == 1:
1034 1034 return
1035 1035
1036 1036 dataOut.flagNoData = True
1037 1037
1038 1038 if not self.isConfig:
1039 1039 self.setup(n, timeInterval, overlapping)
1040 1040 self.isConfig = True
1041 1041
1042 1042 avgdatatime, avgdata_spc, avgdata_cspc, avgdata_dc = self.integrate(dataOut.utctime,
1043 1043 dataOut.data_spc,
1044 1044 dataOut.data_cspc,
1045 1045 dataOut.data_dc)
1046 1046
1047 1047 if self.__dataReady:
1048 1048
1049 1049 dataOut.data_spc = avgdata_spc
1050 1050 dataOut.data_cspc = avgdata_cspc
1051 1051 dataOut.data_dc = avgdata_dc
1052 1052 dataOut.nIncohInt *= self.n
1053 1053 dataOut.utctime = avgdatatime
1054 1054 dataOut.flagNoData = False
1055 1055
1056 1056 return dataOut No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now