jroIO_param.py
745 lines
| 23.9 KiB
| text/x-python
|
PythonLexer
|
r848 | import os | |
|
r1326 | import time | |
|
r848 | import datetime | |
|
r1326 | import numpy | |
import h5py | |||
r1241 | import schainpy.admin | ||
|
r848 | from schainpy.model.data.jrodata import * | |
|
r1179 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator | |
|
r848 | from schainpy.model.io.jroIO_base import * | |
|
r1179 | from schainpy.utils import log | |
|
r848 | ||
|
r1287 | ||
|
class HDFReader(Reader, ProcessingUnit):
    """Processing unit to read HDF5 format files.

    This unit reads HDF5 files created with the `HDFWriter` operation. By
    default such files contain two groups, `Data` and `Metadata`, and every
    variable found in them is exposed as a `dataOut` attribute.

    It is possible to read any HDF5 file by giving its structure in the
    `description` parameter; extra values can be added to the metadata with
    the `extras` parameter.

    Parameters:
    -----------
    path : str
        Path where files are located.
    startDate : date
        Start date of the files
    endDate : date
        End date of the files
    startTime : time
        Start time of the files
    endTime : time
        End time of the files
    description : dict, optional
        Dictionary with the description of the HDF5 file
    extras : dict, optional
        Dictionary with extra metadata to be added to `dataOut`

    Examples
    --------

    desc = {
        'Data': {
            'data_output': ['u', 'v', 'w'],
            'utctime': 'timestamps',
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    extras = {
        'timeZone': 300
    }

    reader = project.addReadUnit(
        name='HDFReader',
        path='/path/to/files',
        startDate='2019/01/01',
        endDate='2019/01/31',
        startTime='00:00:00',
        endTime='23:59:59',
        # description=json.dumps(desc),
        # extras=json.dumps(extras),
        )

    """

    __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']

    def __init__(self):
        ProcessingUnit.__init__(self)
        self.dataOut = Parameters()
        self.ext = ".hdf5"
        self.optchar = "D"
        self.meta = {}
        self.data = {}
        self.open_file = h5py.File
        self.open_mode = 'r'
        self.description = {}
        self.extras = {}
        self.filefmt = "*%Y%j***"
        self.folderfmt = "*%Y%j"
        self.utcoffset = 0

    def setup(self, **kwargs):
        """Store the configuration, locate the input files and open the first one.

        In online mode the search is retried up to `self.nTries` times,
        sleeping `self.delay` seconds after every failed attempt.

        Raises:
            schainpy.admin.SchainError: online mode and no valid file found.
        """
        self.set_kwargs(**kwargs)
        if not self.ext.startswith('.'):
            self.ext = '.{}'.format(self.ext)

        if self.online:
            log.log("Searching files in online mode...", self.name)

            # Pre-bind so the check after the loop works even when nTries == 0.
            fullpath = None
            for nTries in range(self.nTries):
                candidates = self.searchFilesOnLine(
                    self.path, self.startDate, self.endDate, self.expLabel,
                    self.ext, self.walk, self.filefmt, self.folderfmt)
                try:
                    fullpath = next(candidates)
                except Exception:
                    # Best effort: any failure while searching counts as
                    # "no file yet". `Exception` (not a bare except) keeps
                    # Ctrl-C / SystemExit able to interrupt the wait loop.
                    fullpath = None

                if fullpath:
                    break

                log.warning(
                    'Waiting {} sec for a valid file in {}: try {} ...'.format(
                        self.delay, self.path, nTries + 1),
                    self.name)
                time.sleep(self.delay)

            if not fullpath:
                raise schainpy.admin.SchainError(
                    'There isn\'t any valid file in {}'.format(self.path))

            # File name layout: x YYYY DDD SSS .ext
            filename = os.path.basename(fullpath)
            self.year = int(filename[1:5])
            self.doy = int(filename[5:8])
            self.set = int(filename[8:11]) - 1
        else:
            log.log("Searching files in {}".format(self.path), self.name)
            self.filenameList = self.searchFilesOffLine(
                self.path, self.startDate, self.endDate, self.expLabel,
                self.ext, self.walk, self.filefmt, self.folderfmt)

        self.setNextFile()

        return

    def readFirstHeader(self):
        '''Read metadata and data'''

        self.__readMetadata()
        self.__readData()
        self.__setBlockList()

        if 'type' in self.meta:
            # NOTE(security): eval() executes text stored in the file's
            # metadata to pick the dataOut class. Only read trusted files.
            self.dataOut = eval(self.meta['type'])()

        for attr in self.meta:
            setattr(self.dataOut, attr, self.meta[attr])

        self.blockIndex = 0

        return

    def __setBlockList(self):
        '''
        Selects the blocks of the current file that fall inside
        [startTime, endTime) on the date of the file's first block.

        Reads:
            self.data['utctime'], self.utcoffset, self.startTime, self.endTime
        Sets:
            self.interval, self.blockList, self.blocksPerFile

        When no block is selected the next file is opened.
        '''

        startTime = self.startTime
        endTime = self.endTime
        thisUtcTime = self.data['utctime'] + self.utcoffset

        if len(thisUtcTime) > 1:
            self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
        else:
            # A single timestamp has no spacing; numpy.min on the empty
            # difference array would raise ValueError.
            # NOTE(review): previous interval is kept (0 if never set) -
            # confirm this suits the users of `interval`.
            self.interval = getattr(self, 'interval', 0)

        thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
        thisDate = thisDatetime.date()

        epoch = datetime.datetime(1970, 1, 1)
        startUtcTime = (datetime.datetime.combine(thisDate, startTime) - epoch).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate, endTime) - epoch).total_seconds()

        ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]

        self.blockList = ind
        self.blocksPerFile = len(ind)

        # similar to master
        if len(ind) == 0:
            print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.blockIndex,
                                                                   self.blocksPerFile,
                                                                   thisDatetime))
            self.setNextFile()
        # similar to master

        return

    def __readMetadata(self):
        '''
        Reads Metadata into self.meta.

        With a `description`, each entry of description['Metadata'] maps a
        metadata name to the path of its dataset in the file; otherwise every
        member of the `Metadata` group is read. Entries of `extras` are
        applied last and override values read from the file.
        '''

        meta = {}

        if self.description:
            for key, value in self.description['Metadata'].items():
                meta[key] = self.fp[value][()]
        else:
            grp = self.fp['Metadata']
            for name in grp:
                meta[name] = grp[name][()]

        if self.extras:
            for key, value in self.extras.items():
                meta[key] = value

        self.meta = meta

        return

    def __readData(self):
        '''
        Reads Data into self.data.

        With a `description`, each entry of description['Data'] is either a
        path (a dataset, or a group whose members are stacked into one
        array) or a list of dataset paths that are stacked into one array.
        Without it, every member of the `Data` group is read under its own
        name.
        '''

        data = {}

        if self.description:
            for key, value in self.description['Data'].items():
                if isinstance(value, str):
                    node = self.fp[value]
                    if isinstance(node, h5py.Dataset):
                        data[key] = node[()]
                    elif isinstance(node, h5py.Group):
                        data[key] = numpy.array([node[ch][()] for ch in node])
                elif isinstance(value, list):
                    data[key] = numpy.array([self.fp[ch][()] for ch in value])
        else:
            grp = self.fp['Data']
            for name in grp:
                node = grp[name]
                if isinstance(node, h5py.Dataset):
                    array = node[()]
                elif isinstance(node, h5py.Group):
                    array = numpy.array([node[ch][()] for ch in node])
                else:
                    log.warning('Unknown type: {}'.format(name))
                    # Nothing was read for this member: skip it instead of
                    # storing the previous member's array under its name.
                    continue
                # `description` is empty in this branch, so names are never
                # remapped here.
                data[name] = array

        self.data = data

        return

    def getData(self):
        '''
        Copies the next selected block of every data variable to dataOut.

        1-D variables are indexed along their only axis; all others along
        their second axis.
        '''

        # Translate the running counter into the file index chosen by
        # __setBlockList, so the startTime/endTime selection is honoured.
        index = self.blockList[self.blockIndex]

        for attr, values in self.data.items():
            if values.ndim == 1:
                setattr(self.dataOut, attr, values[index])
            else:
                setattr(self.dataOut, attr, values[:, index])

        self.dataOut.flagNoData = False
        self.blockIndex += 1

        log.log("Block No. {}/{} -> {}".format(
            self.blockIndex,
            self.blocksPerFile,
            self.dataOut.datatime.ctime()), self.name)

        return

    def run(self, **kwargs):
        '''
        Configures the unit on the first call, then delivers one block per
        call, moving to the next file when the current one is exhausted.
        '''

        if not(self.isConfig):
            self.setup(**kwargs)
            self.isConfig = True

        if self.blockIndex == self.blocksPerFile:
            self.setNextFile()

        self.getData()

        return
r1783 | |||
|
@MPDecorator
class HDFWriter(Operation):
    # HDFWriter class of the most recent Signal Chain operation, obtained from
    # the Sophy branch; added to incorporate new variables in the HDF writing,
    # March 2025
    # C. Portilla

    """Operation to write HDF5 files.

    The HDF5 file contains by default two groups, Data and Metadata, where
    you can save any `dataOut` attribute specified by the `dataList` and
    `metadataList` parameters. Data attributes are normally time dependent
    whereas the metadata are not.
    It is possible to customize the structure of the HDF5 file with the
    optional description parameter, see the examples.

    Parameters:
    -----------
    path : str
        Path where files will be saved.
    blocksPerFile : int
        Number of blocks per file
    metadataList : list
        List of the dataOut attributes that will be saved as metadata
    dataList : list
        List of the dataOut attributes that will be saved as data
    setType : str, optional
        Selects the file naming scheme: None numbers the files with a
        consecutive set counter, "weather" builds SOPHY style names from the
        timestamp, scan angle and variable, and any other value uses the
        minute of the day of the data as set number.
    description : dict, optional
        Dictionary with the desired description of the HDF5 file

    Examples
    --------

    desc = {
        'data_output': {'winds': ['z', 'w', 'v']},
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'data_output': ['z', 'w', 'v'],
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    writer = proc_unit.addOperation(name='HDFWriter')
    writer.addParameter(name='path', value='/path/to/file')
    writer.addParameter(name='blocksPerFile', value='32')
    writer.addParameter(name='metadataList', value='heightList,timeZone')
    writer.addParameter(name='dataList',value='data_output,utctime')
    # writer.addParameter(name='description',value=json.dumps(desc))

    """

    ext = ".hdf5"
    optchar = "D"
    filename = None
    path = None
    setFile = None
    fp = None
    firsttime = True
    # Configurations
    blocksPerFile = None
    blockIndex = None
    dataOut = None
    # Data Arrays
    dataList = None
    metadataList = None
    currentDay = None
    lastTime = None
    last_Azipos = None
    last_Elepos = None
    mode = None
    # -----------------------
    Typename = None
    mask = False
    setChannel = None

    def __init__(self):
        Operation.__init__(self)
        return

    def set_kwargs(self, **kwargs):
        """Set every keyword argument as an attribute of this operation."""
        for key, value in kwargs.items():
            setattr(self, key, value)

    def set_kwargs_obj(self, obj, **kwargs):
        """Set every keyword argument as an attribute of `obj`."""
        for key, value in kwargs.items():
            setattr(obj, key, value)

    def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None, type_data=None, localtime=True, setChannel=None, **kwargs):
        """Store the configuration and describe the datasets to be created.

        For every name in `dataList` present in `self.dataOut` (and not
        None) an entry is appended to `self.dsList`: scalars get nDim 0,
        arrays get their shape, dtype and number of datasets (size of the
        first axis). In "weather" mode the extra keyword arguments are set
        on both this operation and `dataOut`, and `data_param` is reduced
        to the variable named by `weather_var`.

        `localtime` selects local (True) or UTC (False) conversion of
        `utctime` for file naming and day-change detection.
        """
        self.path = path
        self.blocksPerFile = blocksPerFile
        self.metadataList = metadataList
        # `dataList` defaults to None; treat that as "no data variables".
        self.dataList = [s.strip() for s in (dataList or [])]
        self.setChannel = setChannel
        self.setType = setType

        if self.setType == "weather":
            self.set_kwargs(**kwargs)
            self.set_kwargs_obj(self.dataOut, **kwargs)
            # Position of each weather variable along the second axis of
            # dataOut.data_param.
            self.weather_vars = {
                'S': 0,
                'V': 1,
                'W': 2,
                'SNR': 3,
                'Z': 4,
                'D': 5,
                'P': 6,
                'R': 7,
            }

        if localtime:
            self.getDateTime = datetime.datetime.fromtimestamp
        else:
            self.getDateTime = datetime.datetime.utcfromtimestamp

        # getLabel() does membership tests on the description, so None is
        # normalised to an empty dict.
        self.description = description or {}
        self.type_data = type_data

        if self.metadataList is None:
            self.metadataList = self.dataOut.metadata_list

        dsList = []

        for name in self.dataList:
            if not hasattr(self.dataOut, name):
                log.warning('Attribute {} not found in dataOut'.format(name), self.name)
                continue

            dataAux = getattr(self.dataOut, name)
            if self.setType == 'weather' and name == 'data_param':
                varIndex = self.weather_vars[self.weather_var]
                if self.setChannel is None:
                    dataAux = dataAux[:, varIndex, :]
                else:
                    # Keep a leading axis of length 1 for the single channel.
                    dataAux = dataAux[self.setChannel, varIndex, :]
                    dataAux = numpy.reshape(dataAux, (1, dataAux.shape[0], dataAux.shape[1]))

            if dataAux is None:
                continue

            dsDict = {'variable': name}
            # numpy.floating covers every NumPy float scalar; the
            # numpy.float_ alias used before was removed in NumPy 2.0.
            if isinstance(dataAux, (int, float, numpy.integer, numpy.floating)):
                dsDict['nDim'] = 0
            else:
                dsDict['nDim'] = len(dataAux.shape)
                dsDict['shape'] = dataAux.shape
                dsDict['dsNumber'] = dataAux.shape[0]
                dsDict['dtype'] = dataAux.dtype

            dsList.append(dsDict)

        self.dsList = dsList
        self.currentDay = self.dataOut.datatime.date()

    def timeFlag(self):
        """Return True when the current block must start a new file.

        A new file is needed when the day of the year changed or when more
        than 3 hours passed since the previous block. The first call only
        records the reference time and day and returns False.
        """
        currentTime = self.dataOut.utctime
        dt = self.getDateTime(currentTime)
        dataDay = int(dt.strftime('%j'))

        if self.lastTime is None:
            self.lastTime = currentTime
            self.currentDay = dataDay
            return False

        timeDiff = currentTime - self.lastTime
        dayChanged = dataDay != self.currentDay

        # Always refresh the references. Leaving lastTime stale on a day
        # change made the next block look like a gap and split files twice.
        self.lastTime = currentTime
        self.currentDay = dataDay

        # New file if the day is different or the gap between two blocks
        # exceeds 3 hours.
        return dayChanged or timeDiff > 3 * 60 * 60

    def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
            dataList=None, setType=None, description=None, mode=None,
            type_data=None, Reset=False, localtime=True, **kwargs):
        """Write one block of `dataOut`, configuring the writer when needed.

        `dataList` and `description` default to None (treated as empty) to
        avoid sharing mutable defaults between calls. `Reset=True` closes
        the current file and forces a new configuration; `mode` and
        `type_data` are stored on the instance. Extra keyword arguments are
        forwarded to `setup`.
        """
        if Reset:
            self.isConfig = False
            self.closeFile()
            self.lastTime = None
            self.blockIndex = 0

        self.dataOut = dataOut
        self.mode = mode

        if not(self.isConfig):
            self.setup(path=path, blocksPerFile=blocksPerFile,
                       metadataList=metadataList, dataList=dataList,
                       setType=setType, description=description, type_data=type_data,
                       localtime=localtime, **kwargs)

            self.isConfig = True
            self.setNextFile()

        self.putData()
        return

    def _lastSetNumber(self, folder):
        """Return the set number of the last data file in `folder`, or -1."""
        filesList = [k for k in os.listdir(folder) if k.startswith(self.optchar)]
        if not filesList:
            return -1

        filen = sorted(filesList, key=str.lower)[-1]
        # The filename must have the following format
        # 0 1234 567 89A BCDE (hex)
        # x YYYY DDD SSS .ext
        if isNumber(filen[8:11]):
            return int(filen[8:11])
        return -1

    def _centralMean(self, angles):
        """Mean of the central half of `angles`, rounded to one decimal."""
        total = angles.shape[0]
        len_aux = int(total / 4)
        # Explicit end index: `[k:-k]` is an empty slice when k == 0, which
        # produced a NaN angle for fewer than 4 samples.
        return round(numpy.mean(angles[len_aux:total - len_aux]), 1)

    def setNextFile(self):
        """Create the next output file and its (still empty) datasets.

        Raises:
            ValueError: "weather" mode and `dataOut.flagMode` is not 0 or 1.
        """
        ext = self.ext
        path = self.path
        dt = self.getDateTime(self.dataOut.utctime)

        # Weather files get their own per-variable folder further below.
        if self.setType == 'weather':
            subfolder = ''
        else:
            subfolder = dt.strftime('d%Y%j')

        fullpath = os.path.join(path, subfolder)
        os.makedirs(fullpath, exist_ok=True)

        if self.setType is None:
            # Continue the set counter after the last file of the folder.
            setFile = self._lastSetNumber(fullpath) + 1
            fileName = '%s%4.4d%3.3d%03d%s' % (self.optchar,
                                               dt.year,
                                               int(dt.strftime('%j')),
                                               setFile,
                                               ext)
        elif self.setType == "weather":
            # Name examples (kept from the original notes):
            # SOPHY_20200505_140215_E10.0_Z.h5
            # SOPHY_20200505_140215_A40.0_Z.h5
            if self.dataOut.flagMode == 1:  # 'AZI' #PPI
                ang_type = 'EL'
                mode_type = 'PPI'
                ang_ = self._centralMean(self.dataOut.data_ele)
            elif self.dataOut.flagMode == 0:  # 'ELE' #RHI
                ang_type = 'AZ'
                mode_type = 'RHI'
                ang_ = self._centralMean(self.dataOut.data_azi)
            else:
                raise ValueError(
                    'Unsupported flagMode {!r}: expected 0 (RHI) or 1 (PPI)'.format(
                        self.dataOut.flagMode))

            fileName = '%s_%2.2d%2.2d%2.2d_%2.2d%2.2d%2.2d_%s%2.1f_%s%s' % (
                'SOPHY',
                dt.year,
                dt.month,
                dt.day,
                dt.hour,
                dt.minute,
                dt.second,
                ang_type[0],
                ang_,
                self.weather_var,
                ext)
            subfolder = '{}_{}_{}_{:2.1f}'.format(self.weather_var, mode_type, ang_type, ang_)
            os.makedirs(os.path.join(path, subfolder), exist_ok=True)
        else:
            # Any other setType: the set number is the minute of the day.
            setFile = dt.hour * 60 + dt.minute
            fileName = '%s%4.4d%3.3d%04d%s' % (self.optchar,
                                               dt.year,
                                               int(dt.strftime('%j')),
                                               setFile,
                                               ext)

        self.filename = os.path.join(path, subfolder, fileName)
        self.fp = h5py.File(self.filename, 'w')
        # write metadata
        self.writeMetadata(self.fp)
        # Write data
        self.writeData(self.fp)

    def _labels(self):
        """Return the name -> label mapping defined by the description.

        When the description has a 'Data' section its entries are merged
        with those of 'Metadata' into a new dict, so the description given
        by the caller is never modified.
        """
        description = self.description or {}
        if 'Data' not in description:
            return description
        labels = dict(description['Data'])
        labels.update(description.get('Metadata', {}))
        return labels

    def getLabel(self, name, x=None):
        """Return the HDF5 name for variable `name` or for its channel `x`.

        With `x` None: a string entry gives the label, a list entry gives
        None (no sub-group is wanted), a dict entry gives its first key,
        anything else gives `name`.
        With `x` given: a list entry gives its element `x`, a dict entry
        gives element `x` of its first value, otherwise 'pairXX' is used
        for names containing 'cspc' and 'channelXX' for the rest.
        """
        entry = self._labels().get(name)

        if x is None:
            if isinstance(entry, str):
                return entry
            if isinstance(entry, list):
                return None
            if isinstance(entry, dict) and entry:
                return next(iter(entry))
            return name

        if isinstance(entry, list):
            return entry[x]
        if isinstance(entry, dict) and entry:
            return next(iter(entry.values()))[x]
        if 'cspc' in name:
            return 'pair{:02d}'.format(x)
        return 'channel{:02d}'.format(x)

    def writeMetadata(self, fp):
        """Write every attribute of `metadataList` found in dataOut.

        Values go to the 'Metadata' group, or to the root of the file when
        a description without a 'Metadata' section is used. Booleans are
        stored as 1/0.
        """
        if self.description and 'Metadata' not in self.description:
            grp = fp
        else:
            grp = fp.create_group('Metadata')

        for name in self.metadataList:
            if not hasattr(self.dataOut, name):
                log.warning('Metadata: `{}` not found'.format(name), self.name)
                continue
            value = getattr(self.dataOut, name)
            if isinstance(value, bool):
                value = 1 if value else 0
            grp.create_dataset(self.getLabel(name), data=value)

        return

    def writeData(self, fp):
        """Create the datasets described by `self.dsList` and reset counters.

        Scalars get one float64 dataset with `blocksPerFile` entries.
        Arrays get one gzip-compressed dataset per element of their first
        axis; each has a leading block axis unless `blocksPerFile` is 1.
        `self.ds` keeps the datasets and `self.data` the matching
        (attribute, channel) pairs, channel -1 meaning scalar.
        """
        if self.description and 'Data' not in self.description:
            grp = fp
        else:
            grp = fp.create_group('Data')

        dtsets = []
        data = []

        for dsInfo in self.dsList:
            variable = dsInfo['variable']

            if dsInfo['nDim'] == 0:
                ds = grp.create_dataset(
                    self.getLabel(variable),
                    (self.blocksPerFile, ),
                    chunks=True,
                    dtype=numpy.float64)
                dtsets.append(ds)
                data.append((variable, -1))
                continue

            label = self.getLabel(variable)
            sgrp = grp.create_group(label) if label is not None else grp

            if self.blocksPerFile == 1:
                shape = dsInfo['shape'][1:]
            else:
                shape = (self.blocksPerFile, ) + dsInfo['shape'][1:]

            for i in range(dsInfo['dsNumber']):
                # A single dataset written with setChannel == 1 is labelled
                # and read as channel 1.
                ch = 1 if (dsInfo['dsNumber'] == 1 and self.setChannel == 1) else i
                ds = sgrp.create_dataset(
                    self.getLabel(variable, ch),
                    shape,
                    chunks=True,
                    dtype=dsInfo['dtype'],
                    compression='gzip',
                )
                dtsets.append(ds)
                data.append((variable, ch))

        fp.flush()

        log.log('Creating file: {}'.format(fp.filename), self.name)

        self.ds = dtsets
        self.data = data
        self.firsttime = True
        self.blockIndex = 0
        return

    def _channelBlock(self, attr, ch):
        """Return the values of channel `ch` of `attr` for the current block.

        In "weather" mode `data_param` is reduced to the variable selected
        by `weather_var`, the same slice used in `setup` to size the
        dataset. When `self.mask` is set, points whose SNR is below it are
        replaced by NaN in a copy, so `dataOut` is left untouched.
        """
        values = getattr(self.dataOut, attr)
        if self.setType != 'weather' or attr != 'data_param':
            return values[ch]

        block = values[:, self.weather_vars[self.weather_var], :][ch]
        if self.mask:
            snr = self.dataOut.data_param[:, self.weather_vars['SNR'], :][ch]
            block = block.copy()
            block[snr < self.mask] = numpy.nan
        return block

    def putData(self):
        """Write the current block, opening a new file first when needed."""
        if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
            self.closeFile()
            self.setNextFile()

        for ds, (attr, ch) in zip(self.ds, self.data):
            if ch == -1:
                ds[self.blockIndex] = getattr(self.dataOut, attr)
            elif self.blocksPerFile == 1:
                # Single-block files have no block axis: fill the dataset.
                ds[...] = self._channelBlock(attr, ch)
            else:
                ds[self.blockIndex] = self._channelBlock(attr, ch)

        self.fp.flush()
        self.blockIndex += 1

        log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)

        return

    def closeFile(self):
        """Trim the datasets to the blocks written and close the file.

        Does nothing when no file is open, so it is safe to call before the
        first file is created or more than once.
        """
        if not self.fp:
            return

        if self.blockIndex != self.blocksPerFile:
            for ds in self.ds:
                ds.resize(self.blockIndex, axis=0)

        self.fp.flush()
        self.fp.close()

    def close(self):
        """Close the current file."""
        self.closeFile()