utilsIO.py
523 lines
| 17.6 KiB
| text/x-python
|
PythonLexer
r1553 | """ | |||
Utilities for IO modules | ||||
@modified: Joab Apaza | ||||
@email: roj-op01@igp.gob.pe, joab.apaza32@gmail.com | ||||
""" | ||||
################################################################################ | ||||
################################################################################ | ||||
import os | ||||
from datetime import datetime | ||||
import numpy | ||||
from schainpy.model.data.jrodata import Parameters | ||||
import itertools | ||||
import numpy | ||||
import h5py | ||||
import re | ||||
import time | ||||
################################################################################ | ||||
################################################################################ | ||||
################################################################################ | ||||
def folder_in_range(folder, start_date, end_date, pattern):
    """
    Check whether folder is between start_date and end_date
    Args:
        folder (str): Folder to check
        start_date (date): Initial date
        end_date (date): Final date
        pattern (str): Datetime format of the folder
    Returns:
        bool: True for success, False otherwise
    Raises:
        ValueError: when `folder` does not match `pattern`
    """
    try:
        dt = datetime.strptime(folder, pattern)
    except ValueError as err:
        # chain the original parse error instead of hiding it with a bare except
        raise ValueError('Folder {} does not match {} format'.format(folder, pattern)) from err
    return start_date <= dt.date() <= end_date
################################################################################ | ||||
################################################################################ | ||||
################################################################################ | ||||
def getHei_index(minHei, maxHei, heightList):
    """
    Return (minIndex, maxIndex) bounding [minHei, maxHei] within heightList.

    Args:
        minHei (float): Requested minimum height; clipped to heightList[0]
        maxHei (float): Requested maximum height; clipped to heightList[-1]
        heightList (sequence): Heights, assumed sorted ascending
    Returns:
        tuple: (minIndex, maxIndex) indices into heightList
    """
    if minHei < heightList[0]:
        minHei = heightList[0]
    if maxHei > heightList[-1]:
        maxHei = heightList[-1]
    heights = numpy.asarray(heightList)
    inda = numpy.where(heights >= minHei)
    indb = numpy.where(heights <= maxHei)
    try:
        minIndex = inda[0][0]
    except IndexError:  # no height >= minHei
        minIndex = 0
    try:
        maxIndex = indb[0][-1]
    except IndexError:
        # no height <= maxHei; NOTE(review): this is one past the last valid
        # index — valid as a slice end, preserved from the original behavior
        maxIndex = len(heightList)
    return minIndex, maxIndex
################################################################################ | ||||
################################################################################ | ||||
################################################################################ | ||||
class MergeH5(object):
    """Processing unit to read HDF5 format files.

    This unit reads HDF5 files created with the `HDFWriter` operation when
    channels are processed separately, then merges all channels into a
    single file per input file, computing the cross-spectra for every
    channel pair.

    Example:
        nChannels = 4
        pathOut = "/home/soporte/Data/OutTest/clean2D/perpAndObliq/byChannels/merged"
        p0 = "/home/soporte/Data/OutTest/clean2D/perpAndObliq/byChannels/d2022240_Ch0"
        p1 = "/home/soporte/Data/OutTest/clean2D/perpAndObliq/byChannels/d2022240_Ch1"
        p2 = "/home/soporte/Data/OutTest/clean2D/perpAndObliq/byChannels/d2022240_Ch2"
        p3 = "/home/soporte/Data/OutTest/clean2D/perpAndObliq/byChannels/d2022240_Ch3"
        dataList = ['data_spc', 'data_cspc', 'nIncohInt', 'utctime']
        merger = MergeH5(nChannels, pathOut, dataList, p0, p1, p2, p3)
        merger.run()
    """

    isConfig = False
    inPaths = None
    nChannels = None
    ch_dataIn = []
    channelList = []

    def __init__(self, nChannels, pOut, dataList, *args):
        """
        Args:
            nChannels (int): Number of channels; must equal len(args).
            pOut (str): Output directory for merged files.
            dataList (list): Dataset names to merge (e.g. 'data_spc').
            *args: One input path per channel.
        """
        self.inPaths = list(args)
        if len(self.inPaths) != nChannels:
            print("ERROR, number of channels different from iput paths {} != {}".format(nChannels, len(args)))
            return
        self.pathOut = pOut
        self.dataList = dataList
        self.nChannels = len(self.inPaths)
        self.ch_dataIn = [Parameters() for _ in args]
        self.dataOut = Parameters()
        self.channelList = list(range(nChannels))
        self.blocksPerFile = None
        self.date = None
        # NOTE: used as a regex suffix in getH5files, hence the trailing '$'
        self.ext = ".hdf5$"
        self.optchar = "D"
        self.meta = {}
        self.data = {}
        self.open_file = h5py.File
        self.open_mode = 'r'
        self.description = {}
        self.extras = {}
        self.filefmt = "*%Y%j***"
        self.folderfmt = "*%Y%j"

    def setup(self):
        """Build the per-channel file lists and the number of files."""
        self.filenameList = self.searchFiles(self.inPaths, None)
        self.nfiles = len(self.filenameList[0])

    def searchFiles(self, paths, date, walk=True):
        """Collect HDF5 files from each channel path.

        Args:
            paths (list): One input directory per channel.
            date: Unused; kept for interface compatibility.
            walk (bool): Recurse into sub-directories when True.
        Returns:
            list: Per-channel lists of file names/paths, or 0 when a path
                does not exist.
        """
        filenameList = [[] for _ in range(self.nChannels)]
        ch = 0
        for path in paths:
            if os.path.exists(path):
                print("Searching files in {}".format(path))
                filenameList[ch] = self.getH5files(path, walk)
                print("Found: ")
                for f in filenameList[ch]:
                    print(f)
            else:
                self.status = 0
                print('Path:%s does not exists' % path)
                return 0
            ch += 1
        return filenameList

    def getH5files(self, path, walk):
        """Return the sorted files under `path` matching digits + self.ext.

        NOTE(review): the '.' before the extension matches any single
        character, not a literal dot — presumably a literal '.' was
        intended; behavior preserved.
        """
        # raw string avoids the invalid-escape-sequence warning on \d
        pat = r'(\d)+.' + self.ext
        if walk:
            found = []
            for root, dirs, files in os.walk(path):
                for dname in dirs:
                    matches = [re.search(pat, x) for x in os.listdir(os.path.join(root, dname))]
                    names = [m.string for m in matches if m is not None]
                    paths_in_dir = [os.path.join(root, dname, x) for x in names]
                    paths_in_dir.sort()
                    found += paths_in_dir
            return found
        else:
            matches = [re.search(pat, x) for x in os.listdir(path)]
            names = [m.string for m in matches if m is not None]
            names.sort()
            return names

    def readFile(self, fp, ch):
        """Read metadata and data from an open HDF5 file into channel `ch`."""
        self.readMetadata(fp)
        data = self.readData(fp)
        for attr in self.meta:
            setattr(self.ch_dataIn[ch], attr, self.meta[attr])
        self.fill_dataIn(data, self.ch_dataIn[ch])
        return

    def readMetadata(self, fp):
        """Read the 'Metadata' group of an open file.

        The first call stores the metadata as the reference (and rewrites
        channelList for the merged output); later calls append non-channel,
        non-height '*List' entries and validate the key count matches.

        Returns:
            int: 1 on success, 0 when metadata keys differ between channels.
        """
        meta = {}
        self.metadataList = []
        grp = fp['Metadata']
        for name in grp:
            meta[name] = grp[name][()]
            self.metadataList.append(name)
        for k in meta:
            if 'List' in k:
                meta[k] = meta[k].tolist()
        if not self.meta:
            self.meta = meta
            self.meta["channelList"] = [n for n in range(self.nChannels)]
            return 1
        else:
            if len(self.meta) == len(meta):
                for k in meta:
                    if 'List' in k and 'channel' not in k and "height" not in k:
                        self.meta[k] += meta[k]
                return 1
            else:
                return 0

    def fill_dataIn(self, data, dataIn):
        """Copy the datasets read from file into a channel's Parameters object."""
        for attr in data:
            if data[attr].ndim == 1:
                setattr(dataIn, attr, data[attr][:])
            else:
                setattr(dataIn, attr, numpy.squeeze(data[attr][:, :]))
        print("shape in", dataIn.data_spc.shape, len(dataIn.data_spc))
        # collapse an extra leading axis so data_spc is (blocks, fft, hei)
        if dataIn.data_spc.ndim > 3:
            dataIn.data_spc = dataIn.data_spc[0]

    def getBlocksPerFile(self):
        """Set blocksPerFile to the minimum block count across channels and
        trim every longer channel down to that count so shapes agree."""
        b = numpy.zeros(self.nChannels)
        for i in range(self.nChannels):
            b[i] = self.ch_dataIn[i].data_spc.shape[0]  # number of blocks
        self.blocksPerFile = int(b.min())
        iresh_ch = numpy.where(b > self.blocksPerFile)[0]
        if len(iresh_ch) > 0:
            for ich in iresh_ch:
                for i in range(len(self.dataList)):
                    if hasattr(self.ch_dataIn[ich], self.dataList[i]):
                        dataAux = getattr(self.ch_dataIn[ich], self.dataList[i])
                        setattr(self.ch_dataIn[ich], self.dataList[i], None)
                        setattr(self.ch_dataIn[ich], self.dataList[i], dataAux[0:self.blocksPerFile])
                    else:
                        # preserved behavior: abort trimming on a missing attribute
                        return

    def getLabel(self, name, x=None):
        """Resolve the HDF5 dataset/group label for `name`.

        With x=None returns the dataset label; with an index x returns the
        per-channel/per-pair sub-dataset label, defaulting to
        'channelNN' / 'pairNN' when no description entry exists.
        """
        if x is None:
            if 'Data' in self.description:
                data = self.description['Data']
                if 'Metadata' in self.description:
                    data.update(self.description['Metadata'])
            else:
                data = self.description
            if name in data:
                if isinstance(data[name], str):
                    return data[name]
                elif isinstance(data[name], list):
                    return None
                elif isinstance(data[name], dict):
                    for key, value in data[name].items():
                        return key
            return name
        else:
            if 'Metadata' in self.description:
                meta = self.description['Metadata']
            else:
                meta = self.description
            if name in meta:
                if isinstance(meta[name], list):
                    return meta[name][x]
                elif isinstance(meta[name], dict):
                    for key, value in meta[name].items():
                        return value[x]
            if 'cspc' in name:
                return 'pair{:02d}'.format(x)
            else:
                return 'channel{:02d}'.format(x)

    def readData(self, fp):
        """Read every entry under 'Data'; a Group becomes one stacked array."""
        data = {}
        grp = fp['Data']
        for name in grp:
            if isinstance(grp[name], h5py.Dataset):
                array = grp[name][()]
            elif isinstance(grp[name], h5py.Group):
                array = []
                for ch in grp[name]:
                    array.append(grp[name][ch][()])
                array = numpy.array(array)
            else:
                # skip unknown entries instead of storing a stale/unbound `array`
                print('Unknown type: {}'.format(name))
                continue
            data[name] = array
        return data

    def getDataOut(self):
        """Merge the per-channel inputs into self.dataOut.

        Stacks spectra as (blocks, ch, fft, hei), averages the per-channel
        timestamps into one timeline, normalizes nIncohInt to
        (blocks, ch, hei) and computes the cross-spectra for every channel
        pair.

        Returns:
            int: 1 on success, 0 when input spectra are not at least 3-D.
        """
        self.dataOut = self.ch_dataIn[0].copy()  # blocks, fft, hei — also carries metadata
        if self.dataOut.data_spc.ndim < 3:
            return 0
        self.dataOut.data_spc = None
        self.dataOut.utctime = None  # fixed typo: was `utctim`
        self.dataOut.nIncohInt = None
        spc = [data.data_spc for data in self.ch_dataIn]
        self.dataOut.data_spc = numpy.stack(spc, axis=1)  # blocks, ch, fft, hei
        time = [data.utctime for data in self.ch_dataIn]
        time = numpy.asarray(time).mean(axis=0)  # one timeline: mean across channels
        time = numpy.squeeze(time)
        self.dataOut.utctime = time
        ints = [data.nIncohInt for data in self.ch_dataIn]
        self.dataOut.nIncohInt = numpy.stack(ints, axis=1)
        if self.dataOut.nIncohInt.ndim > 3:
            aux = self.dataOut.nIncohInt
            self.dataOut.nIncohInt = None
            self.dataOut.nIncohInt = aux[0]
        if self.dataOut.nIncohInt.ndim < 3:
            # expand per-block counts across heights
            nIncohInt = numpy.repeat(self.dataOut.nIncohInt, self.dataOut.nHeights).reshape(
                self.blocksPerFile, self.nChannels, self.dataOut.nHeights)
            self.dataOut.nIncohInt = None
            self.dataOut.nIncohInt = nIncohInt
        if (self.dataOut.nIncohInt.shape)[0] == self.nChannels:  # ch, blocks, hei
            self.dataOut.nIncohInt = numpy.swapaxes(self.dataOut.nIncohInt, 0, 1)  # blocks, ch, hei
        pairsList = [pair for pair in itertools.combinations(self.channelList, 2)]
        self.dataOut.pairsList = pairsList
        cspc = []
        for i, j in pairsList:
            # cross-spectrum of each pair: blocks, fft, hei
            cspc.append(self.ch_dataIn[i].data_spc * numpy.conjugate(self.ch_dataIn[j].data_spc))
        cspc = numpy.asarray(cspc)  # pairs, blocks, fft, hei
        self.dataOut.data_cspc = numpy.swapaxes(cspc, 0, 1)  # blocks, pairs, fft, hei
        return 1

    def writeMetadata(self, fp):
        """Write every metadataList attribute of dataOut into 'Metadata'."""
        grp = fp.create_group('Metadata')
        for i in range(len(self.metadataList)):
            if not hasattr(self.dataOut, self.metadataList[i]):
                print('Metadata: `{}` not found'.format(self.metadataList[i]))
                continue
            value = getattr(self.dataOut, self.metadataList[i])
            if isinstance(value, bool):
                # store booleans as 0/1 integers
                value = 1 if value else 0
            grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)
        return

    def getDsList(self):
        """Build self.dsList describing each output dataset: variable name,
        dimensionality, sub-dataset count and dtype, taken from dataOut."""
        dsList = []
        for i in range(len(self.dataList)):
            dsDict = {}
            if hasattr(self.dataOut, self.dataList[i]):
                dataAux = getattr(self.dataOut, self.dataList[i])
                dsDict['variable'] = self.dataList[i]
            else:
                print('Attribute {} not found in dataOut'.format(self.dataList[i]))
                continue
            if dataAux is None:
                continue
            # numpy.float was removed in NumPy 1.24 (it was an alias of the
            # builtin float); numpy.floating covers all numpy float scalars
            elif isinstance(dataAux, (int, float, numpy.integer, numpy.floating)):
                dsDict['nDim'] = 0
            else:
                dsDict['nDim'] = len(dataAux.shape) - 1  # first axis is blocks
                dsDict['shape'] = dataAux.shape
                if len(dsDict['shape']) >= 2:
                    dsDict['dsNumber'] = dataAux.shape[1]  # channel/pair count
                else:
                    dsDict['dsNumber'] = 1
                dsDict['dtype'] = dataAux.dtype
            dsList.append(dsDict)
        self.dsList = dsList

    def clean_dataIn(self):
        """Release per-channel buffers and reset state for the next file."""
        for ch in range(self.nChannels):
            self.ch_dataIn[ch].data_spc = None
            self.ch_dataIn[ch].utctime = None
            self.ch_dataIn[ch].nIncohInt = None
        self.meta = {}
        self.blocksPerFile = None

    def writeData(self, outFilename):
        """Create `outFilename` and write every dataset described in dsList."""
        self.getDsList()
        fp = h5py.File(outFilename, 'w')
        self.writeMetadata(fp)
        grp = fp.create_group('Data')
        dtsets = []
        data = []
        for dsInfo in self.dsList:
            if dsInfo['nDim'] == 0:
                # scalar per block -> one 1-D dataset
                ds = grp.create_dataset(
                    self.getLabel(dsInfo['variable']), (self.blocksPerFile,),
                    chunks=True, dtype=numpy.float64)
                dtsets.append(ds)
                data.append((dsInfo['variable'], -1))
            else:
                label = self.getLabel(dsInfo['variable'])
                if label is not None:
                    sgrp = grp.create_group(label)
                else:
                    sgrp = grp
                # trailing dims after the channel axis go into each sub-dataset
                k = -1 * (dsInfo['nDim'] - 1)
                for i in range(dsInfo['dsNumber']):
                    ds = sgrp.create_dataset(
                        self.getLabel(dsInfo['variable'], i),
                        (self.blocksPerFile,) + dsInfo['shape'][k:],
                        chunks=True,
                        dtype=dsInfo['dtype'])
                    dtsets.append(ds)
                    data.append((dsInfo['variable'], i))
        print('Creating merged file: {}'.format(fp.filename))
        for i, ds in enumerate(dtsets):
            attr, ch = data[i]
            if ch == -1:
                ds[:] = getattr(self.dataOut, attr)
            else:
                aux = getattr(self.dataOut, attr)  # blocks, ch, ...
                aux = numpy.swapaxes(aux, 0, 1)    # ch, blocks, ...
                ds[:] = aux[ch]
        fp.flush()
        fp.close()
        self.clean_dataIn()
        return

    def run(self):
        """Merge every input file set: read all channels of each file,
        align block counts, build dataOut and write one merged file."""
        if not self.isConfig:
            self.setup()
            self.isConfig = True
        for nf in range(self.nfiles):
            name = None
            for ch in range(self.nChannels):
                name = self.filenameList[ch][nf]
                filename = os.path.join(self.inPaths[ch], name)
                fp = h5py.File(filename, 'r')
                self.readFile(fp, ch)
                fp.close()
            if self.blocksPerFile is None:
                self.getBlocksPerFile()
                print("blocks per file: ", self.blocksPerFile)
            if not self.getDataOut():
                print("Error getting DataOut invalid number of blocks")
                return
            # keep only the file-name tail for the output name
            # NOTE(review): assumes a fixed-width name like dYYYYDDD_NNNNN.hdf5
            name = name[-16:]
            outFile = os.path.join(self.pathOut, name)
            self.writeData(outFile)