jroIO_param.py
651 lines
| 20.0 KiB
| text/x-python
|
PythonLexer
|
r848 | import os | ||
|
r1326 | import time | ||
|
r848 | import datetime | ||
|
r1326 | import numpy | ||
import h5py | ||||
r1241 | import schainpy.admin | |||
|
r848 | from schainpy.model.data.jrodata import * | ||
|
r1179 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator | ||
|
r848 | from schainpy.model.io.jroIO_base import * | ||
|
r1179 | from schainpy.utils import log | ||
|
r848 | |||
|
r897 | |||
|
r1326 | class HDFReader(Reader, ProcessingUnit): | ||
"""Processing unit to read HDF5 format files | ||||
This unit reads HDF5 files created with `HDFWriter` operation contains | ||||
by default two groups Data and Metadata all variables would be saved as `dataOut` | ||||
r1370 | attributes. | |||
|
r1326 | It is possible to read any HDF5 file by given the structure in the `description` | ||
parameter, also you can add extra values to metadata with the parameter `extras`. | ||||
Parameters: | ||||
----------- | ||||
path : str | ||||
Path where files are located. | ||||
startDate : date | ||||
Start date of the files | ||||
endDate : list | ||||
End date of the files | ||||
startTime : time | ||||
Start time of the files | ||||
endTime : time | ||||
End time of the files | ||||
description : dict, optional | ||||
Dictionary with the description of the HDF5 file | ||||
extras : dict, optional | ||||
Dictionary with extra metadata to be be added to `dataOut` | ||||
r1370 | ||||
|
r1326 | Examples | ||
-------- | ||||
r1370 | ||||
|
r1326 | desc = { | ||
'Data': { | ||||
'data_output': ['u', 'v', 'w'], | ||||
'utctime': 'timestamps', | ||||
} , | ||||
'Metadata': { | ||||
'heightList': 'heights' | ||||
} | ||||
} | ||||
desc = { | ||||
'Data': { | ||||
'data_output': 'winds', | ||||
'utctime': 'timestamps' | ||||
}, | ||||
'Metadata': { | ||||
'heightList': 'heights' | ||||
} | ||||
} | ||||
extras = { | ||||
'timeZone': 300 | ||||
} | ||||
r1370 | ||||
|
r1326 | reader = project.addReadUnit( | ||
name='HDFReader', | ||||
path='/path/to/files', | ||||
startDate='2019/01/01', | ||||
endDate='2019/01/31', | ||||
startTime='00:00:00', | ||||
endTime='23:59:59', | ||||
# description=json.dumps(desc), | ||||
# extras=json.dumps(extras), | ||||
) | ||||
|
r1232 | |||
|
r1326 | """ | ||
|
r1287 | |||
|
r1326 | __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras'] | ||
|
r1232 | |||
def __init__(self): | ||||
ProcessingUnit.__init__(self) | ||||
self.dataOut = Parameters() | ||||
r1254 | self.ext = ".hdf5" | |||
self.optchar = "D" | ||||
|
r1326 | self.meta = {} | ||
self.data = {} | ||||
r1254 | self.open_file = h5py.File | |||
self.open_mode = 'r' | ||||
|
r1326 | self.description = {} | ||
self.extras = {} | ||||
r1254 | self.filefmt = "*%Y%j***" | |||
self.folderfmt = "*%Y%j" | ||||
r1363 | self.utcoffset = 0 | |||
|
r1232 | |||
def setup(self, **kwargs): | ||||
r1254 | self.set_kwargs(**kwargs) | |||
if not self.ext.startswith('.'): | ||||
r1279 | self.ext = '.{}'.format(self.ext) | |||
|
r1232 | |||
r1254 | if self.online: | |||
log.log("Searching files in online mode...", self.name) | ||||
|
r1232 | |||
r1254 | for nTries in range(self.nTries): | |||
fullpath = self.searchFilesOnLine(self.path, self.startDate, | ||||
r1279 | self.endDate, self.expLabel, self.ext, self.walk, | |||
r1254 | self.filefmt, self.folderfmt) | |||
r1391 | pathname, filename = os.path.split(fullpath) | |||
print(pathname,filename) | ||||
r1254 | try: | |||
fullpath = next(fullpath) | ||||
r1391 | ||||
r1254 | except: | |||
fullpath = None | ||||
r1279 | ||||
r1254 | if fullpath: | |||
break | ||||
log.warning( | ||||
'Waiting {} sec for a valid file in {}: try {} ...'.format( | ||||
r1279 | self.delay, self.path, nTries + 1), | |||
r1254 | self.name) | |||
time.sleep(self.delay) | ||||
if not(fullpath): | ||||
raise schainpy.admin.SchainError( | ||||
r1279 | 'There isn\'t any valid file in {}'.format(self.path)) | |||
r1254 | ||||
pathname, filename = os.path.split(fullpath) | ||||
self.year = int(filename[1:5]) | ||||
self.doy = int(filename[5:8]) | ||||
r1279 | self.set = int(filename[8:11]) - 1 | |||
|
r1232 | else: | ||
r1254 | log.log("Searching files in {}".format(self.path), self.name) | |||
r1279 | self.filenameList = self.searchFilesOffLine(self.path, self.startDate, | |||
r1254 | self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt) | |||
r1279 | ||||
r1254 | self.setNextFile() | |||
|
r1232 | |||
r1254 | return | |||
|
r1232 | |||
r1391 | ||||
r1254 | def readFirstHeader(self): | |||
'''Read metadata and data''' | ||||
|
r1232 | |||
r1279 | self.__readMetadata() | |||
|
r1232 | self.__readData() | ||
r1254 | self.__setBlockList() | |||
r1370 | ||||
|
r1326 | if 'type' in self.meta: | ||
self.dataOut = eval(self.meta['type'])() | ||||
r1370 | ||||
|
r1326 | for attr in self.meta: | ||
r1391 | print("attr: ", attr) | |||
|
r1326 | setattr(self.dataOut, attr, self.meta[attr]) | ||
r1370 | ||||
r1391 | ||||
|
r1232 | self.blockIndex = 0 | ||
r1279 | ||||
r1254 | return | |||
|
r1232 | |||
def __setBlockList(self): | ||||
''' | ||||
Selects the data within the times defined | ||||
self.fp | ||||
self.startTime | ||||
self.endTime | ||||
self.blockList | ||||
self.blocksPerFile | ||||
''' | ||||
r1254 | ||||
|
r1232 | startTime = self.startTime | ||
endTime = self.endTime | ||||
r1363 | thisUtcTime = self.data['utctime'] + self.utcoffset | |||
r1254 | self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1]) | |||
|
r1326 | thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0]) | ||
r1391 | self.startFileDatetime = thisDatetime | |||
print("datee ",self.startFileDatetime) | ||||
|
r1232 | thisDate = thisDatetime.date() | ||
thisTime = thisDatetime.time() | ||||
|
r1326 | startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds() | ||
endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds() | ||||
|
r1232 | |||
ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0] | ||||
self.blockList = ind | ||||
self.blocksPerFile = len(ind) | ||||
r1391 | self.blocksPerFile = len(thisUtcTime) | |||
|
r1232 | return | ||
def __readMetadata(self): | ||||
''' | ||||
Reads Metadata | ||||
''' | ||||
|
r1326 | meta = {} | ||
if self.description: | ||||
for key, value in self.description['Metadata'].items(): | ||||
r1360 | meta[key] = self.fp[value][()] | |||
r1254 | else: | |||
|
r1326 | grp = self.fp['Metadata'] | ||
for name in grp: | ||||
r1360 | meta[name] = grp[name][()] | |||
|
r1232 | |||
|
r1326 | if self.extras: | ||
for key, value in self.extras.items(): | ||||
meta[key] = value | ||||
self.meta = meta | ||||
|
r1232 | |||
return | ||||
r1391 | ||||
def checkForRealPath(self, nextFile, nextDay): | ||||
# print("check FRP") | ||||
# dt = self.startFileDatetime + datetime.timedelta(1) | ||||
# filename = '{}.{}{}'.format(self.path, dt.strftime('%Y%m%d'), self.ext) | ||||
# fullfilename = os.path.join(self.path, filename) | ||||
# print("check Path ",fullfilename,filename) | ||||
# if os.path.exists(fullfilename): | ||||
# return fullfilename, filename | ||||
# return None, filename | ||||
return None,None | ||||
|
r1232 | def __readData(self): | ||
|
r1326 | data = {} | ||
r1370 | ||||
|
r1326 | if self.description: | ||
for key, value in self.description['Data'].items(): | ||||
if isinstance(value, str): | ||||
if isinstance(self.fp[value], h5py.Dataset): | ||||
r1360 | data[key] = self.fp[value][()] | |||
|
r1326 | elif isinstance(self.fp[value], h5py.Group): | ||
array = [] | ||||
for ch in self.fp[value]: | ||||
r1360 | array.append(self.fp[value][ch][()]) | |||
|
r1326 | data[key] = numpy.array(array) | ||
elif isinstance(value, list): | ||||
array = [] | ||||
for ch in value: | ||||
r1360 | array.append(self.fp[ch][()]) | |||
|
r1326 | data[key] = numpy.array(array) | ||
else: | ||||
r1254 | grp = self.fp['Data'] | |||
|
r1326 | for name in grp: | ||
if isinstance(grp[name], h5py.Dataset): | ||||
r1360 | array = grp[name][()] | |||
|
r1326 | elif isinstance(grp[name], h5py.Group): | ||
r1254 | array = [] | |||
|
r1326 | for ch in grp[name]: | ||
r1360 | array.append(grp[name][ch][()]) | |||
r1254 | array = numpy.array(array) | |||
else: | ||||
|
r1326 | log.warning('Unknown type: {}'.format(name)) | ||
r1279 | ||||
|
r1326 | if name in self.description: | ||
key = self.description[name] | ||||
r1254 | else: | |||
|
r1326 | key = name | ||
data[key] = array | ||||
|
r1232 | |||
|
r1326 | self.data = data | ||
|
r1232 | return | ||
r1279 | ||||
|
r1232 | def getData(self): | ||
r1391 | if not self.isDateTimeInRange(self.startFileDatetime, self.startDate, self.endDate, self.startTime, self.endTime): | |||
self.dataOut.flagNoData = True | ||||
self.dataOut.error = True | ||||
return | ||||
|
r1326 | for attr in self.data: | ||
if self.data[attr].ndim == 1: | ||||
setattr(self.dataOut, attr, self.data[attr][self.blockIndex]) | ||||
|
r1232 | else: | ||
|
r1326 | setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex]) | ||
|
r1232 | |||
self.dataOut.flagNoData = False | ||||
self.blockIndex += 1 | ||||
|
r1326 | log.log("Block No. {}/{} -> {}".format( | ||
self.blockIndex, | ||||
self.blocksPerFile, | ||||
self.dataOut.datatime.ctime()), self.name) | ||||
|
r1232 | return | ||
def run(self, **kwargs): | ||||
if not(self.isConfig): | ||||
self.setup(**kwargs) | ||||
self.isConfig = True | ||||
if self.blockIndex == self.blocksPerFile: | ||||
r1254 | self.setNextFile() | |||
|
r1232 | |||
self.getData() | ||||
return | ||||
@MPDecorator
class HDFWriter(Operation):
    """Operation to write HDF5 files.

    The HDF5 file contains by default two groups, Data and Metadata, where
    you can save any `dataOut` attribute specified by `dataList` and
    `metadataList` parameters; data attributes are normally time dependent
    where the metadata are not.
    It is possible to customize the structure of the HDF5 file with the
    optional description parameter, see the examples.

    Parameters:
    -----------
    path : str
        Path where files will be saved.
    blocksPerFile : int
        Number of blocks per file
    metadataList : list
        List of the dataOut attributes that will be saved as metadata
    dataList : int
        List of the dataOut attributes that will be saved as data
    setType : bool
        If True the name of the files corresponds to the timestamp of the data
    description : dict, optional
        Dictionary with the desired description of the HDF5 file

    Examples
    --------

    desc = {
        'data_output': {'winds': ['z', 'w', 'v']},
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'data_output': ['z', 'w', 'v'],
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    writer = proc_unit.addOperation(name='HDFWriter')
    writer.addParameter(name='path', value='/path/to/file')
    writer.addParameter(name='blocksPerFile', value='32')
    writer.addParameter(name='metadataList', value='heightList,timeZone')
    writer.addParameter(name='dataList',value='data_output,utctime')
    # writer.addParameter(name='description',value=json.dumps(desc))

    """

    ext = ".hdf5"
    optchar = "D"
    filename = None
    path = None
    setFile = None
    fp = None
    firsttime = True
    # Configurations
    blocksPerFile = None
    blockIndex = None
    dataOut = None
    # Data Arrays
    dataList = None
    metadataList = None
    currentDay = None
    lastTime = None

    def __init__(self):
        Operation.__init__(self)
        return

    def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None):
        """Store configuration and inspect dataOut to plan the dataset layout."""

        self.path = path
        self.blocksPerFile = blocksPerFile
        self.metadataList = metadataList
        self.dataList = [s.strip() for s in dataList]
        self.setType = setType
        self.description = description

        if self.metadataList is None:
            self.metadataList = self.dataOut.metadata_list

        dsList = []
        for i in range(len(self.dataList)):
            dsDict = {}
            if hasattr(self.dataOut, self.dataList[i]):
                dataAux = getattr(self.dataOut, self.dataList[i])
                dsDict['variable'] = self.dataList[i]
            else:
                # fix: the attribute name was never substituted into the message
                log.warning('Attribute {} not found in dataOut'.format(self.dataList[i]), self.name)
                continue

            if dataAux is None:
                continue
            elif isinstance(dataAux, (int, float, numpy.integer, numpy.floating)):
                # numpy.float was removed in NumPy 1.24; numpy.floating
                # covers all NumPy float scalar types
                dsDict['nDim'] = 0
            else:
                dsDict['nDim'] = len(dataAux.shape)
                dsDict['shape'] = dataAux.shape
                dsDict['dsNumber'] = dataAux.shape[0]
                dsDict['dtype'] = dataAux.dtype

            dsList.append(dsDict)

        self.dsList = dsList
        self.currentDay = self.dataOut.datatime.date()

    def timeFlag(self):
        """Return True when a new file should be started (new day, or a >3 h data gap)."""
        currentTime = self.dataOut.utctime
        timeTuple = time.localtime(currentTime)
        dataDay = timeTuple.tm_yday

        if self.lastTime is None:
            self.lastTime = currentTime
            self.currentDay = dataDay
            return False

        timeDiff = currentTime - self.lastTime

        # If the day changed, or the gap between samples exceeds three hours
        if dataDay != self.currentDay:
            self.currentDay = dataDay
            return True
        elif timeDiff > 3 * 60 * 60:
            self.lastTime = currentTime
            return True
        else:
            self.lastTime = currentTime
            return False

    def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
            dataList=None, setType=None, description=None):
        """Operation entry point: configure on first call, then write one block."""

        self.dataOut = dataOut
        if not self.isConfig:
            # avoid mutable default arguments: normalize None to the
            # empty containers the original defaults provided
            self.setup(path=path, blocksPerFile=blocksPerFile,
                       metadataList=metadataList,
                       dataList=[] if dataList is None else dataList,
                       setType=setType,
                       description={} if description is None else description)

            self.isConfig = True
            self.setNextFile()

        self.putData()
        return

    def setNextFile(self):
        """Open the next output HDF5 file, numbering sets per day."""

        ext = self.ext
        path = self.path
        setFile = self.setFile
        timeTuple = time.localtime(self.dataOut.utctime)
        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
        fullpath = os.path.join(path, subfolder)

        if os.path.exists(fullpath):
            filesList = os.listdir(fullpath)
            filesList = [k for k in filesList if k.startswith(self.optchar)]
            if len(filesList) > 0:
                filesList = sorted(filesList, key=str.lower)
                filen = filesList[-1]
                # expected filename layout:
                # 0 1234 567 89A BCDE (hex offsets)
                # x YYYY DDD SSS .ext
                if isNumber(filen[8:11]):
                    # resume the set counter from the last file on disk
                    setFile = int(filen[8:11])
                else:
                    setFile = -1
            else:
                setFile = -1  # initialize the set counter
        else:
            os.makedirs(fullpath)
            setFile = -1  # initialize the set counter

        if self.setType is None:
            setFile += 1
            file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)
        else:
            # set number encodes the time of day (minutes since midnight)
            setFile = timeTuple.tm_hour * 60 + timeTuple.tm_min
            file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)

        self.filename = os.path.join(path, subfolder, file)
        # Setting HDF5 File
        self.fp = h5py.File(self.filename, 'w')
        # write metadata first, then pre-allocate the data datasets
        self.writeMetadata(self.fp)
        self.writeData(self.fp)

    def getLabel(self, name, x=None):
        """Resolve the HDF5 dataset/group label for a dataOut attribute.

        With x=None return the label for the whole variable; with an
        integer x return the label of the x-th channel/sub-dataset.
        Falls back to the attribute name / 'channelNN' ('pairNN' for
        cross-spectra) when no description entry exists.
        """
        if x is None:
            if 'Data' in self.description:
                # copy before merging so self.description is not mutated
                # (the original updated the 'Data' dict in place)
                data = dict(self.description['Data'])
                if 'Metadata' in self.description:
                    data.update(self.description['Metadata'])
            else:
                data = self.description
            if name in data:
                if isinstance(data[name], str):
                    return data[name]
                elif isinstance(data[name], list):
                    return None
                elif isinstance(data[name], dict):
                    # a dict maps one group label to its channel labels;
                    # return that single group label
                    for key, value in data[name].items():
                        return key
            return name
        else:
            if 'Metadata' in self.description:
                meta = self.description['Metadata']
            else:
                meta = self.description
            if name in meta:
                if isinstance(meta[name], list):
                    return meta[name][x]
                elif isinstance(meta[name], dict):
                    for key, value in meta[name].items():
                        return value[x]
            if 'cspc' in name:
                return 'pair{:02d}'.format(x)
            else:
                return 'channel{:02d}'.format(x)

    def writeMetadata(self, fp):
        """Create the Metadata group and write the metadataList attributes."""

        if self.description:
            if 'Metadata' in self.description:
                grp = fp.create_group('Metadata')
            else:
                grp = fp
        else:
            grp = fp.create_group('Metadata')

        for i in range(len(self.metadataList)):
            if not hasattr(self.dataOut, self.metadataList[i]):
                log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
                continue
            value = getattr(self.dataOut, self.metadataList[i])
            # store booleans as 0/1 for HDF5 portability
            if isinstance(value, bool):
                value = int(value)
            grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)

        return

    def writeData(self, fp):
        """Create the Data group and pre-allocate one dataset per variable/channel."""

        if self.description:
            if 'Data' in self.description:
                grp = fp.create_group('Data')
            else:
                grp = fp
        else:
            grp = fp.create_group('Data')

        dtsets = []
        data = []

        for dsInfo in self.dsList:
            if dsInfo['nDim'] == 0:
                # scalar per block -> 1-D dataset of length blocksPerFile
                ds = grp.create_dataset(
                    self.getLabel(dsInfo['variable']),
                    (self.blocksPerFile, ),
                    chunks=True,
                    dtype=numpy.float64)
                dtsets.append(ds)
                data.append((dsInfo['variable'], -1))
            else:
                # array per block -> one dataset per channel, first axis = block
                label = self.getLabel(dsInfo['variable'])
                if label is not None:
                    sgrp = grp.create_group(label)
                else:
                    sgrp = grp
                for i in range(dsInfo['dsNumber']):
                    ds = sgrp.create_dataset(
                        self.getLabel(dsInfo['variable'], i),
                        (self.blocksPerFile, ) + dsInfo['shape'][1:],
                        chunks=True,
                        dtype=dsInfo['dtype'])
                    dtsets.append(ds)
                    data.append((dsInfo['variable'], i))
        fp.flush()

        log.log('Creating file: {}'.format(fp.filename), self.name)

        self.ds = dtsets
        self.data = data
        self.firsttime = True
        self.blockIndex = 0
        return

    def putData(self):
        """Write one block; roll over to a new file when full or the day/time changes."""

        if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
            self.closeFile()
            self.setNextFile()

        for i, ds in enumerate(self.ds):
            attr, ch = self.data[i]
            if ch == -1:
                ds[self.blockIndex] = getattr(self.dataOut, attr)
            else:
                ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]

        self.fp.flush()
        self.blockIndex += 1
        log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)

        return

    def closeFile(self):
        """Shrink datasets to the blocks actually written and close the file."""

        if self.blockIndex != self.blocksPerFile:
            for ds in self.ds:
                ds.resize(self.blockIndex, axis=0)

        if self.fp:
            self.fp.flush()
            self.fp.close()

    def close(self):
        self.closeFile()