# jroIO_kamisr.py | 629 lines | 21.9 KiB | text/x-python | PythonLexer
'''
Created on Sep 9, 2015

@author: roj-idl71 Karim Kuyeng
'''
import os
import sys
import glob
import fnmatch
import datetime
import time
import re
import h5py
import numpy

# gevent is optional: use its cooperative sleep when it is installed and fall
# back to the blocking standard-library sleep otherwise. Only ImportError is
# caught so that unrelated failures are not silently hidden.
try:
    from gevent import sleep
except ImportError:
    from time import sleep

from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
from schainpy.model.data.jrodata import Voltage
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
from numpy import imag
class AMISRReader(ProcessingUnit):
    '''
    Reading unit that delivers AMISR raw voltages, one profile per call.

    The unit looks for ``*.*.h5`` files inside the sub-directories of a base
    path, opens them with h5py, rearranges the samples found under
    ``Raw11/Data/Samples/Data`` into (channel, profile, sample) order and
    publishes each profile through ``self.dataOut`` (a ``Voltage`` object).

    Two modes are supported:

    * offline: every file whose time stamps fall inside the requested
      date/time window is read in order;
    * online: the second-to-last file of the newest directory is read and the
      unit waits for a newer file to appear once it has been consumed.
    '''

    # Minutes subtracted from the time stamps stored in the files when the
    # reader is configured with timezone == 'lt'.
    _LT_OFFSET_MINUTES = 300

    def __init__(self):
        '''
        Initialise the reader state. No file is touched until setup() runs.
        '''
        ProcessingUnit.__init__(self)

        self.set = None
        self.subset = None
        self.extension_file = '.h5'
        self.dtc_str = 'dtc'
        self.dtc_id = 0
        self.status = True
        self.isConfig = False
        self.dirnameList = []
        self.filenameList = []
        self.fileIndex = None
        self.flagNoMoreFiles = False
        self.flagIsNewFile = 0
        self.filename = ''
        self.amisrFilePointer = None

        self.dataset = None
        self.profileIndex = 0

        self.beamCodeByFrame = None
        self.radacTimeByFrame = None

        self.__firstFile = True
        self.buffer = None

        self.timezone = 'ut'

        # Seconds to sleep between polls in online mode; recomputed from the
        # file contents every time a header is read.
        self.__waitForNewFile = 20
        self.__filename_online = None

        # The output object is created here so it exists before setup().
        self.dataOut = Voltage()

    def setup(self, path=None,
              startDate=None,
              endDate=None,
              startTime=None,
              endTime=None,
              walk=True,
              timezone='ut',
              all=0,
              code=None,
              nCode=0,
              nBaud=0,
              online=False):
        '''
        Configure the reader, locate the data files and load the first one.

        Inputs:
            path               : base directory holding the dated folders
            startDate, endDate : datetime.date limits of the search
            startTime, endTime : datetime.time limits of the search
            walk               : stored as-is; not used by the search itself
            timezone           : 'ut' or 'lt' ('lt' shifts file times by
                                 300 minutes)
            all                : when true the offline search skips the
                                 per-file time filter
            code, nCode, nBaud : code information copied into the radar
                                 controller header
            online             : select online (True) or offline (False) mode

        The process is terminated with sys.exit(-1) when no file is found.
        '''
        self.timezone = timezone
        self.all = all
        self.online = online

        self.code = code
        self.nCode = int(nCode)
        self.nBaud = int(nBaud)

        if not online:
            # Offline file search
            self.searchFilesOffLine(path, startDate, endDate, startTime, endTime, walk)
        else:
            self.searchFilesOnLine(path, startDate, endDate, startTime, endTime, walk)

        if not self.filenameList:
            print("There is no files into the folder: %s"%(path))
            sys.exit(-1)

        self.fileIndex = -1

        self.readNextFile(online)

        self.isConfig = True

    def readAMISRHeader(self, fp):
        '''
        Read the header datasets of an open AMISR file and derive the
        parameters needed to fill the JRO headers.

        Inputs:
            fp : open h5py.File object
        '''
        header = 'Raw11/Data/RadacHeader'

        # List of beams per profile, used when rearranging the data
        self.beamCodeByPulse = fp.get(header+'/BeamCode')
        # Its second dimension gives the number of channels
        self.beamCode = fp.get('Raw11/Data/Beamcodes')
        self.frameCount = fp.get(header+'/FrameCount')  # not used by this reader
        self.modeGroup = fp.get(header+'/ModeGroup')  # not used by this reader
        self.nsamplesPulse = fp.get(header+'/NSamplesPulse')  # number of samples
        self.pulseCount = fp.get(header+'/PulseCount')  # only its shape is used
        self.radacTime = fp.get(header+'/RadacTime')  # time of every pulse
        self.timeCount = fp.get(header+'/TimeCount')  # not used by this reader
        self.timeStatus = fp.get(header+'/TimeStatus')  # not used by this reader
        self.rangeFromFile = fp.get('Raw11/Data/Samples/Range')
        self.frequency = fp.get('Rx/Frequency')
        txAus = fp.get('Raw11/Data/Pulsewidth')

        self.nblocks = self.pulseCount.shape[0]  # nblocks
        self.nprofiles = self.pulseCount.shape[1]  # nprofile
        self.nsa = self.nsamplesPulse[0, 0]  # ngates
        self.nchannels = self.beamCode.shape[1]
        self.ippSeconds = (self.radacTime[0][1] - self.radacTime[0][0])  # Ipp in seconds

        # Wait the time spanned by one whole file before polling again
        self.__waitForNewFile = self.nblocks * self.nprofiles * self.ippSeconds

        # Radar controller header parameters
        self.__ippKm = self.ippSeconds * .15 * 1e6  # in km
        # Dataset.value no longer exists in h5py 3; [()] reads the whole dataset
        self.__txA = (txAus[()]) * .15  # (ipp[us]*.15km/1us) in km
        self.__txB = 0
        self.__nSamples = self.nsa
        self.__firstHeight = self.rangeFromFile[0][0]/1000  # in km
        self.__deltaHeight = (self.rangeFromFile[0][1] - self.rangeFromFile[0][0])/1000

        # The code is taken from the setup() arguments, not from the file
        self.__code = self.code
        self.__codeType = 0
        # Identity test: "!= None" is evaluated element-wise on arrays
        if self.code is not None:
            self.__codeType = 1
        self.__nCode = self.nCode
        self.__nBaud = self.nBaud

        # System header parameters.
        # Floor division keeps the count an integer, as required by the array
        # shapes and indexes built from it.
        self.newProfiles = self.nprofiles // self.nchannels
        self.__channelList = list(range(self.nchannels))

        self.__frequency = self.frequency[0][0]

    def createBuffers(self):
        '''
        Placeholder; this reader does not need extra buffers.
        '''
        pass

    def __setParameters(self, path='', startDate='', endDate='', startTime='', endTime='', walk=''):
        '''
        Store the search parameters on the instance.
        '''
        self.path = path
        self.startDate = startDate
        self.endDate = endDate
        self.startTime = startTime
        self.endTime = endTime
        self.walk = walk

    def __checkPath(self):
        '''
        Set self.status to 1 when self.path exists and to 0 otherwise.
        '''
        # A missing (None or empty) path is reported like a non-existing one
        # instead of letting os.path.exists() raise.
        if self.path and os.path.exists(self.path):
            self.status = 1
        else:
            self.status = 0
            print('Path:%s does not exists'%self.path)

        return

    def __selDates(self, amisr_dirname_format):
        '''
        Return the directory name when its leading YYYYMMDD date lies inside
        [self.startDate, self.endDate]; return None otherwise, including when
        the name does not start with a valid date.
        '''
        try:
            year = int(amisr_dirname_format[0:4])
            month = int(amisr_dirname_format[4:6])
            dom = int(amisr_dirname_format[6:8])
            thisDate = datetime.date(year, month, dom)

            if (thisDate >= self.startDate and thisDate <= self.endDate):
                return amisr_dirname_format
        except (ValueError, TypeError):
            # ValueError: not a date; TypeError: limits that cannot be compared
            return None

        return None

    def __findDataForDates(self, online=False):
        '''
        Fill self.dirnameList with the sorted data directories found in
        self.path. In offline mode only directories inside the date window
        are kept. self.status is set to 0 when nothing is found.
        '''
        if not self.status:
            return None

        pat = r'\d+.\d+'
        dirnameList = [x for x in os.listdir(self.path) if re.search(pat, x) is not None]

        if not online:
            dirnameList = [x for x in dirnameList if self.__selDates(x) is not None]

        if len(dirnameList) > 0:
            self.status = 1
            self.dirnameList = dirnameList
            self.dirnameList.sort()
        else:
            self.status = 0
            return None

    def _parseRadacTimeString(self, rawTime):
        '''
        Convert one entry of 'Time/RadacTimeString' into a datetime, dropping
        the fractional seconds.

        Entries may come back from h5py as bytes, so they are decoded before
        being parsed.
        '''
        if isinstance(rawTime, bytes):
            rawTime = rawTime.decode('utf-8')

        return datetime.datetime.strptime(str(rawTime).split('.')[0], '%Y-%m-%d %H:%M:%S')

    def _shiftToReaderTimezone(self, fileDateTime):
        '''
        Return the file time expressed in the timezone selected in setup():
        300 minutes are subtracted for 'lt', nothing is changed otherwise.
        '''
        if self.timezone == 'lt':
            return fileDateTime - datetime.timedelta(minutes=self._LT_OFFSET_MINUTES)

        return fileDateTime

    def __getTimeFromData(self):
        '''
        Keep in self.filenameList only the files whose last time stamp lies
        inside [start, end) of the requested window. The scan stops at the
        first file that ends at or after the end of the window.
        '''
        startDateTime_Reader = datetime.datetime.combine(self.startDate, self.startTime)
        endDateTime_Reader = datetime.datetime.combine(self.endDate, self.endTime)

        print('Filtering Files from %s to %s'%(startDateTime_Reader, endDateTime_Reader))
        print('........................................')

        filter_filenameList = []
        self.filenameList.sort()

        for filename in self.filenameList:
            # The context manager closes the file even if parsing fails
            with h5py.File(filename, 'r') as fp:
                time_str = fp.get('Time/RadacTimeString')
                endDateTime_File = self._parseRadacTimeString(time_str[-1][-1])

            endDateTime_File = self._shiftToReaderTimezone(endDateTime_File)

            if (endDateTime_File >= startDateTime_Reader and endDateTime_File < endDateTime_Reader):
                filter_filenameList.append(filename)

            if (endDateTime_File >= endDateTime_Reader):
                break

        filter_filenameList.sort()
        self.filenameList = filter_filenameList

        return 1

    def __filterByGlob1(self, dirName):
        '''
        Return {dirName: sorted list of the file names in dirName that match
        '*.*' + self.extension_file}.
        '''
        # glob.glob() is the documented replacement for the undocumented
        # glob.glob1(); escaping keeps special characters in dirName literal.
        pattern = os.path.join(glob.escape(dirName), '*.*%s'%self.extension_file)
        filter_files = [os.path.basename(x) for x in glob.glob(pattern)]
        filter_files.sort()

        return {dirName: filter_files}

    def __getFilenameList(self, fileListInKeys, dirList):
        '''
        Rebuild self.filenameList with the full path of every file listed in
        fileListInKeys (a list of {dirName: [file, ...]} dictionaries).
        '''
        # Start from an empty list so repeated scans do not pile up entries
        filenameList = []

        for value in fileListInKeys:
            dirName = next(iter(value))
            filenameList.extend(os.path.join(dirName, x) for x in value[dirName])

        self.filenameList = filenameList

    def __selectDataForTimes(self, online=False):
        '''
        Build self.filenameList from self.dirnameList.

        Offline: apply the time filter unless self.all is set.
        Online : keep only the second-to-last file found.

        Return 1 on success and None when nothing could be selected.
        '''
        if not self.status:
            return None

        previousList = self.filenameList

        dirList = [os.path.join(self.path, x) for x in self.dirnameList]
        fileListInKeys = [self.__filterByGlob1(x) for x in dirList]
        self.__getFilenameList(fileListInKeys, dirList)

        if not online:
            # Time filter
            if not self.all:
                self.__getTimeFromData()

            if len(self.filenameList) > 0:
                self.status = 1
                self.filenameList.sort()
            else:
                self.status = 0
                return None
        else:
            # Get the last file - 1
            if len(self.filenameList) < 2:
                # Not enough files yet: keep the previous selection, if any,
                # instead of failing on the [-2] lookup.
                self.filenameList = previousList[:1]
                return None

            self.filenameList = [self.filenameList[-2]]

        # Keep only the directories that still contribute a selected file
        self.dirnameList = [x for x in self.dirnameList
                            if any(x in y for y in self.filenameList)]

        return 1

    def searchFilesOnLine(self, path, startDate, endDate, startTime=datetime.time(0,0,0),
                          endTime=datetime.time(23,59,59), walk=True):
        '''
        Select the file to read in online mode: the second-to-last file of
        the newest data directory. When endDate is None both dates are set to
        the current UTC date.
        '''
        if endDate is None:
            today = datetime.datetime.utcnow().date()
            startDate = today
            endDate = today

        # setup() forwards None when the caller gives no time limits
        if startTime is None:
            startTime = datetime.time(0,0,0)
        if endTime is None:
            endTime = datetime.time(23,59,59)

        self.__setParameters(path=path, startDate=startDate, endDate=endDate,
                             startTime=startTime, endTime=endTime, walk=walk)

        self.__checkPath()

        self.__findDataForDates(online=True)

        if not self.dirnameList:
            # Nothing to select; the caller sees an empty filenameList
            return

        self.dirnameList = [self.dirnameList[-1]]

        self.__selectDataForTimes(online=True)

        return

    def searchFilesOffLine(self,
                           path,
                           startDate,
                           endDate,
                           startTime=datetime.time(0,0,0),
                           endTime=datetime.time(23,59,59),
                           walk=True):
        '''
        Select every file inside the requested date/time window and print
        the resulting list.
        '''
        # setup() forwards None when the caller gives no time limits
        if startTime is None:
            startTime = datetime.time(0,0,0)
        if endTime is None:
            endTime = datetime.time(23,59,59)

        self.__setParameters(path, startDate, endDate, startTime, endTime, walk)

        self.__checkPath()

        self.__findDataForDates()

        self.__selectDataForTimes()

        for filename in self.filenameList:
            print("%s" %(filename))

        return

    def __closeCurrentFile(self):
        '''
        Close the file that is currently open, if any.
        '''
        if self.amisrFilePointer is not None:
            self.amisrFilePointer.close()
            self.amisrFilePointer = None

    def __setNextFileOffline(self):
        '''
        Open the next file of self.filenameList.

        Return 1 on success, or 0 (and raise flagNoMoreFiles) when the list
        is exhausted.
        '''
        idFile = self.fileIndex + 1

        if not (idFile < len(self.filenameList)):
            self.flagNoMoreFiles = 1
            print("No more Files")
            return 0

        filename = self.filenameList[idFile]

        amisrFilePointer = h5py.File(filename, 'r')

        # Release the previous file only once the new one is open
        self.__closeCurrentFile()

        self.flagIsNewFile = 1
        self.fileIndex = idFile
        self.filename = filename

        self.amisrFilePointer = amisrFilePointer

        print("Setting the file: %s"%self.filename)

        return 1

    def __setNextFileOnline(self):
        '''
        Open the next file in online mode, polling up to five times for a
        file different from the last one read.

        Return 1 on success and 0 when no new file showed up.
        '''
        if not self.filenameList:
            return 0

        filename = self.filenameList[0]

        if self.__filename_online is not None:
            self.__selectDataForTimes(online=True)
            filename = self.filenameList[0]

        wait = 0
        while self.__filename_online == filename:
            print('waiting %d seconds to get a new file...'%(self.__waitForNewFile))
            if wait == 5:
                return 0
            sleep(self.__waitForNewFile)
            self.__selectDataForTimes(online=True)
            filename = self.filenameList[0]
            wait += 1

        self.__filename_online = filename

        amisrFilePointer = h5py.File(filename, 'r')

        # Release the previous file only once the new one is open
        self.__closeCurrentFile()

        self.amisrFilePointer = amisrFilePointer
        self.flagIsNewFile = 1
        self.filename = filename

        print("Setting the file: %s"%self.filename)

        return 1

    def readData(self):
        '''
        Read the samples of the current file.

        Return:
            dataset : complex array built from the two components stored in
                      the last axis of 'Raw11/Data/Samples/Data'
            timeset : first column of 'Raw11/Data/RadacHeader/RadacTime'
        '''
        buffer = self.amisrFilePointer.get('Raw11/Data/Samples/Data')
        realPart = buffer[:,:,:,0]
        imagPart = buffer[:,:,:,1]
        dataset = realPart + imagPart*1j

        self.radacTime = self.amisrFilePointer.get('Raw11/Data/RadacHeader/RadacTime')
        timeset = self.radacTime[:,0]

        return dataset, timeset

    def reshapeData(self):
        '''
        Rearrange self.dataset by channel.

        Return an array of shape (nchannels, nblocks*newProfiles, nsamples)
        where each channel holds the pulses whose beam code matches the
        corresponding entry of the first row of self.beamCode.
        '''
        channels = self.beamCodeByPulse[0,:]
        nchan = self.nchannels
        nblocks = self.nblocks
        nsamples = self.nsa

        # Read the beam codes once instead of once per channel
        beamCodes = self.beamCode[0]

        # Dimensions: nBlocks, nChannels, nProfiles, nSamples
        new_block = numpy.empty((nblocks, nchan, self.newProfiles, nsamples), dtype="complex64")

        for thisChannel in range(nchan):
            pulseIndexes = numpy.where(channels == beamCodes[thisChannel])[0]
            new_block[:,thisChannel,:,:] = self.dataset[:,pulseIndexes,:]

        new_block = numpy.transpose(new_block, (1,0,2,3))
        new_block = numpy.reshape(new_block, (nchan,-1, nsamples))

        return new_block

    def updateIndexes(self):
        '''
        Placeholder; nothing to update in this reader.
        '''
        pass

    def fillJROHeader(self):
        '''
        Fill the headers and the static attributes of self.dataOut with the
        values obtained by readAMISRHeader().
        '''
        # Radar controller header
        self.dataOut.radarControllerHeaderObj = RadarControllerHeader(ippKm=self.__ippKm,
                                                                      txA=self.__txA,
                                                                      txB=0,
                                                                      nWindows=1,
                                                                      nHeights=self.__nSamples,
                                                                      firstHeight=self.__firstHeight,
                                                                      deltaHeight=self.__deltaHeight,
                                                                      codeType=self.__codeType,
                                                                      nCode=self.__nCode, nBaud=self.__nBaud,
                                                                      code=self.__code,
                                                                      fClock=1)

        # System header
        self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
                                                    nProfiles=self.newProfiles,
                                                    nChannels=len(self.__channelList),
                                                    adcResolution=14,
                                                    pciDioBusWith=32)

        self.dataOut.type = "Voltage"

        self.dataOut.data = None

        self.dataOut.dtype = numpy.dtype([('real','<i8'),('imag','<i8')])

        self.dataOut.nProfiles = self.newProfiles*self.nblocks

        # Dataset.value no longer exists in h5py 3; [()] reads the whole dataset
        ranges = numpy.reshape(self.rangeFromFile[()], (-1))
        self.dataOut.heightList = ranges/1000.0  # km

        self.dataOut.channelList = self.__channelList

        self.dataOut.blocksize = self.dataOut.nChannels * self.dataOut.nHeights

        self.dataOut.flagNoData = True

        # Set to True if the data is discontinuous
        self.dataOut.flagDiscontinuousBlock = False

        self.dataOut.utctime = None

        if self.timezone == 'lt':
            self.dataOut.timeZone = time.timezone / 60.  # get the timezone in minutes
        else:
            self.dataOut.timeZone = 0  # by default time is UTC

        self.dataOut.dstFlag = 0

        self.dataOut.errorCount = 0

        self.dataOut.nCohInt = 1

        self.dataOut.flagDecodeData = False  # assuming the data is already decoded

        self.dataOut.flagDeflipData = False  # assuming the data is not flipped

        self.dataOut.flagShiftFFT = False

        self.dataOut.ippSeconds = self.ippSeconds

        self.dataOut.frequency = self.__frequency

        self.dataOut.realtime = self.online

    def readNextFile(self, online=False):
        '''
        Open the next file, read its header and samples and prepare the
        profile buffer.

        Return 1 when a new file is ready. Return 0 when there is no new file
        or when the file starts after the end of the requested window.
        '''
        if not online:
            newFile = self.__setNextFileOffline()
        else:
            newFile = self.__setNextFileOnline()

        if not newFile:
            return 0

        self.readAMISRHeader(self.amisrFilePointer)
        self.createBuffers()
        self.fillJROHeader()

        self.dataset, self.timeset = self.readData()

        if self.endDate is not None:
            endDateTime_Reader = datetime.datetime.combine(self.endDate, self.endTime)
            time_str = self.amisrFilePointer.get('Time/RadacTimeString')
            startDateTime_File = self._shiftToReaderTimezone(
                self._parseRadacTimeString(time_str[0][0]))

            if (startDateTime_File > endDateTime_Reader):
                return 0

        self.jrodataset = self.reshapeData()
        self.profileIndex = 0

        return 1

    def __hasNotDataInBuffer(self):
        '''
        Return 1 when every profile of the current file has been delivered.
        '''
        if self.profileIndex >= (self.newProfiles*self.nblocks):
            return 1
        return 0

    def getData(self):
        '''
        Publish the next profile in self.dataOut.

        Return the profile data, or 0 (with dataOut.flagNoData set) when
        there is nothing to deliver.
        '''
        if self.flagNoMoreFiles:
            self.dataOut.flagNoData = True
            return 0

        if self.__hasNotDataInBuffer():
            if not (self.readNextFile(self.online)):
                # Without this the previous profile would still be flagged
                # as valid data.
                self.dataOut.flagNoData = True
                return 0

        if self.dataset is None:  # set this condition when there is no data left to read
            self.dataOut.flagNoData = True
            return 0

        self.dataOut.data = self.jrodataset[:,self.profileIndex,:]

        # Integer block and in-block position of the current profile
        indexblock, indexprof = divmod(self.profileIndex, self.newProfiles)

        # TODO: check the JRO basic header to confirm this value is compatible
        self.dataOut.utctime = self.timeset[indexblock] + (indexprof * self.ippSeconds * self.nchannels)
        self.dataOut.profileIndex = self.profileIndex
        self.dataOut.flagNoData = False

        self.profileIndex += 1

        return self.dataOut.data

    def run(self, **kwargs):
        '''
        This method will be called many times so here you should put all your code
        '''
        if not self.isConfig:
            self.setup(**kwargs)
            self.isConfig = True

        self.getData()