##// END OF EJS Templates
manual header update
manual header update

File last commit:

r1783:14f11b916f42
r1788:c7146b87b3fa
Show More
jroIO_param.py
745 lines | 23.9 KiB | text/x-python | PythonLexer
Julio Valdez
Changed name from jroIO_HDF5 to jroIO_param
r848 import os
Juan C. Espinoza
Rewrite Param reader/writer as HDF reader/writer
r1326 import time
Julio Valdez
Changed name from jroIO_HDF5 to jroIO_param
r848 import datetime
Juan C. Espinoza
Rewrite Param reader/writer as HDF reader/writer
r1326 import numpy
import h5py
Errors handling and gracefully terminate main process
r1241 import schainpy.admin
Julio Valdez
Changed name from jroIO_HDF5 to jroIO_param
r848 from schainpy.model.data.jrodata import *
George Yong
Multiprocessing for writing Units(Spectral, Voltage and Parameters)
r1179 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
Julio Valdez
Changed name from jroIO_HDF5 to jroIO_param
r848 from schainpy.model.io.jroIO_base import *
George Yong
Multiprocessing for writing Units(Spectral, Voltage and Parameters)
r1179 from schainpy.utils import log
Julio Valdez
Changed name from jroIO_HDF5 to jroIO_param
r848
Juan C. Espinoza
Rewrite controller, remove MPDecorator to units (keep for plots an writers) use of queues for interproc comm instead of zmq, self operations are no longer supported
r1287
Juan C. Espinoza
Rewrite Param reader/writer as HDF reader/writer
class HDFReader(Reader, ProcessingUnit):
    """Processing unit to read HDF5 format files

    This unit reads HDF5 files created with the `HDFWriter` operation. By
    default a file contains two groups, Data and Metadata, and every variable
    found there is saved as a `dataOut` attribute.
    It is possible to read any HDF5 file by giving its structure in the
    `description` parameter; extra values can also be added to the metadata
    with the `extras` parameter.

    Parameters:
    -----------
    path : str
        Path where files are located.
    startDate : date
        Start date of the files
    endDate : date
        End date of the files
    startTime : time
        Start time of the files
    endTime : time
        End time of the files
    description : dict, optional
        Dictionary with the description of the HDF5 file
    extras : dict, optional
        Dictionary with extra metadata to be added to `dataOut`

    Examples
    --------

    desc = {
        'Data': {
            'data_output': ['u', 'v', 'w'],
            'utctime': 'timestamps',
        } ,
        'Metadata': {
            'heightList': 'heights'
        }
    }

    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    extras = {
        'timeZone': 300
    }

    reader = project.addReadUnit(
        name='HDFReader',
        path='/path/to/files',
        startDate='2019/01/01',
        endDate='2019/01/31',
        startTime='00:00:00',
        endTime='23:59:59',
        # description=json.dumps(desc),
        # extras=json.dumps(extras),
        )

    """

    __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']

    def __init__(self):
        """Initialise the unit with an empty `Parameters` output and defaults."""
        ProcessingUnit.__init__(self)
        self.dataOut = Parameters()
        self.ext = ".hdf5"
        self.optchar = "D"
        self.meta = {}
        self.data = {}
        self.open_file = h5py.File
        self.open_mode = 'r'
        self.description = {}
        self.extras = {}
        self.filefmt = "*%Y%j***"
        self.folderfmt = "*%Y%j"
        self.utcoffset = 0

    def setup(self, **kwargs):
        """Store the keyword arguments, locate the input files and open the first one.

        In online mode the search is retried up to `self.nTries` times,
        sleeping `self.delay` seconds between attempts.

        Raises:
            schainpy.admin.SchainError: online mode found no valid file.
        """
        self.set_kwargs(**kwargs)
        if not self.ext.startswith('.'):
            self.ext = '.{}'.format(self.ext)

        if self.online:
            log.log("Searching files in online mode...", self.name)

            # Pre-set so that nTries == 0 ends in the SchainError below
            # instead of an unbound local.
            fullpath = None
            for attempt in range(self.nTries):
                candidates = self.searchFilesOnLine(
                    self.path, self.startDate, self.endDate, self.expLabel,
                    self.ext, self.walk, self.filefmt, self.folderfmt)
                try:
                    fullpath = next(candidates)
                except Exception:
                    # Any search failure counts as "no file yet"; unlike a bare
                    # except this still lets KeyboardInterrupt/SystemExit through.
                    fullpath = None

                if fullpath:
                    break

                log.warning(
                    'Waiting {} sec for a valid file in {}: try {} ...'.format(
                        self.delay, self.path, attempt + 1),
                    self.name)
                time.sleep(self.delay)

            if not fullpath:
                raise schainpy.admin.SchainError(
                    'There isn\'t any valid file in {}'.format(self.path))

            # File name layout: x YYYY DDD SSS .ext
            pathname, filename = os.path.split(fullpath)
            self.year = int(filename[1:5])
            self.doy = int(filename[5:8])
            self.set = int(filename[8:11]) - 1
        else:
            log.log("Searching files in {}".format(self.path), self.name)
            self.filenameList = self.searchFilesOffLine(
                self.path, self.startDate, self.endDate, self.expLabel,
                self.ext, self.walk, self.filefmt, self.folderfmt)

        self.setNextFile()

        return

    def readFirstHeader(self):
        '''Read metadata and data'''

        self.__readMetadata()
        self.__readData()
        self.__setBlockList()

        if 'type' in self.meta:
            # NOTE(review): eval() executes text stored in the file itself;
            # only read files that come from a trusted source.
            self.dataOut = eval(self.meta['type'])()

        for attr in self.meta:
            setattr(self.dataOut, attr, self.meta[attr])

        self.blockIndex = 0

        return

    def __setBlockList(self):
        '''
        Selects the data within the times defined

        Reads:
            self.data['utctime'], self.utcoffset, self.startTime, self.endTime
        Sets:
            self.interval, self.blockList, self.blocksPerFile

        When no block falls inside the time window the next file is opened.
        '''

        startTime = self.startTime
        endTime = self.endTime
        thisUtcTime = self.data['utctime'] + self.utcoffset

        # A file holding a single block has no time step to measure; the
        # original numpy.min over an empty array raised ValueError there.
        if numpy.size(thisUtcTime) > 1:
            self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
        else:
            self.interval = 0

        thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
        thisDate = thisDatetime.date()

        epoch = datetime.datetime(1970, 1, 1)
        startUtcTime = (datetime.datetime.combine(thisDate, startTime) - epoch).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate, endTime) - epoch).total_seconds()

        ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]

        self.blockList = ind
        self.blocksPerFile = len(ind)

        # similar to master
        if len(ind) == 0:
            # blockIndex may not be assigned yet while the first file is read.
            print("[Reading] Block No. %d/%d -> %s [Skipping]" % (getattr(self, 'blockIndex', 0),
                                                                  self.blocksPerFile,
                                                                  thisDatetime))
            self.setNextFile()
        # similar to master

        return

    def __readMetadata(self):
        '''
        Reads Metadata

        Uses `description['Metadata']` as an attribute -> dataset-path map
        when a description is given, otherwise reads every entry of the
        `Metadata` group. `extras` entries are applied last and override
        values read from the file.
        '''

        meta = {}

        if self.description:
            for key, value in self.description['Metadata'].items():
                meta[key] = self.fp[value][()]
        else:
            grp = self.fp['Metadata']
            for name in grp:
                meta[name] = grp[name][()]

        if self.extras:
            for key, value in self.extras.items():
                meta[key] = value
        self.meta = meta

        return

    def __readData(self):
        '''
        Reads Data

        With a description, each value is either a dataset/group path (str)
        or a list of dataset paths that are stacked into one array. Without
        one, every dataset or group inside the `Data` group is read; groups
        are stacked member by member.
        '''

        data = {}

        if self.description:
            for key, value in self.description['Data'].items():
                if isinstance(value, str):
                    node = self.fp[value]
                    if isinstance(node, h5py.Dataset):
                        data[key] = node[()]
                    elif isinstance(node, h5py.Group):
                        data[key] = numpy.array([node[ch][()] for ch in node])
                elif isinstance(value, list):
                    data[key] = numpy.array([self.fp[ch][()] for ch in value])
        else:
            grp = self.fp['Data']
            for name in grp:
                node = grp[name]
                if isinstance(node, h5py.Dataset):
                    array = node[()]
                elif isinstance(node, h5py.Group):
                    array = numpy.array([node[ch][()] for ch in node])
                else:
                    log.warning('Unknown type: {}'.format(name))
                    # Nothing was read for this entry; without this the
                    # previous (or an undefined) array was stored under it.
                    continue

                # No description is available in this branch, so the dataset
                # name is used directly as the attribute name.
                data[name] = array

        self.data = data
        return

    def getData(self):
        """Copy the next selected block of every data array into `dataOut`."""

        # Map the running counter onto the indices selected by the time
        # window, so blocks outside [startTime, endTime) are really skipped.
        block = self.blockList[self.blockIndex]

        for attr in self.data:
            if self.data[attr].ndim == 1:
                setattr(self.dataOut, attr, self.data[attr][block])
            else:
                setattr(self.dataOut, attr, self.data[attr][:, block])

        self.dataOut.flagNoData = False
        self.blockIndex += 1

        log.log("Block No. {}/{} -> {}".format(
            self.blockIndex,
            self.blocksPerFile,
            self.dataOut.datatime.ctime()), self.name)

        return

    def run(self, **kwargs):
        """Configure on first call, move to the next file when needed, then read one block."""

        if not(self.isConfig):
            self.setup(**kwargs)
            self.isConfig = True

        if self.blockIndex == self.blocksPerFile:
            self.setNextFile()

        self.getData()

        return
fix TxLagRate, add RemoveDC mode, update HDFWriter
r1783
Juan C. Espinoza
ParameterReader unit and ParameterWriter operation added
@MPDecorator
class HDFWriter(Operation):
    # HDFWriter class from the most recent Signal Chain operation, taken from
    # the Sophy branch and added to incorporate new variables in the HDF
    # writing (March 2025).
    # C. Portilla
    """Operation to write HDF5 files.

    The HDF5 file contains by default two groups Data and Metadata where
    you can save any `dataOut` attribute specified by `dataList` and `metadataList`
    parameters, data attributes are normally time dependent where the metadata
    are not.
    It is possible to customize the structure of the HDF5 file with the
    optional description parameter see the examples.

    Parameters:
    -----------
    path : str
        Path where files will be saved.
    blocksPerFile : int
        Number of blocks per file
    metadataList : list
        List of the dataOut attributes that will be saved as metadata
    dataList : list
        List of the dataOut attributes that will be saved as data
    setType : bool
        If True the name of the files corresponds to the timestamp of the data
    description : dict, optional
        Dictionary with the desired description of the HDF5 file

    Examples
    --------

    desc = {
        'data_output': {'winds': ['z', 'w', 'v']},
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'data_output': ['z', 'w', 'v'],
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    writer = proc_unit.addOperation(name='HDFWriter')
    writer.addParameter(name='path', value='/path/to/file')
    writer.addParameter(name='blocksPerFile', value='32')
    writer.addParameter(name='metadataList', value='heightList,timeZone')
    writer.addParameter(name='dataList',value='data_output,utctime')
    # writer.addParameter(name='description',value=json.dumps(desc))

    """

    ext = ".hdf5"
    optchar = "D"
    filename = None
    path = None
    setFile = None
    fp = None
    firsttime = True
    #Configurations
    blocksPerFile = None
    blockIndex = None
    dataOut = None
    #Data Arrays
    dataList = None
    metadataList = None
    currentDay = None
    lastTime = None
    last_Azipos = None
    last_Elepos = None
    mode = None
    #-----------------------
    Typename = None
    mask = False
    setChannel = None

    def __init__(self):

        Operation.__init__(self)
        return

    def set_kwargs(self, **kwargs):
        """Set every keyword argument as an attribute of this writer."""

        for key, value in kwargs.items():
            setattr(self, key, value)

    def set_kwargs_obj(self, obj, **kwargs):
        """Set every keyword argument as an attribute of `obj`."""

        for key, value in kwargs.items():
            setattr(obj, key, value)

    def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None,type_data=None, localtime=True,setChannel=None, **kwargs):
        """Store the configuration and describe each dataset to be written.

        For every name in `dataList` found on `self.dataOut`, a dict with
        `variable`, `nDim` and (for arrays) `shape`, `dsNumber` and `dtype`
        is appended to `self.dsList`. When `setType == "weather"` the extra
        keyword arguments are copied onto both the writer and `dataOut`.
        """
        self.path = path
        self.blocksPerFile = blocksPerFile
        # Strip names in both lists so 'a, b' style parameters resolve.
        self.metadataList = None if metadataList is None else [s.strip() for s in metadataList]
        self.dataList = [s.strip() for s in (dataList or [])]
        self.setChannel = setChannel
        self.setType = setType
        if self.setType == "weather":
            self.set_kwargs(**kwargs)
            self.set_kwargs_obj(self.dataOut, **kwargs)
            # Index of each weather variable along axis 1 of `data_param`.
            self.weather_vars = {
                'S' : 0,
                'V' : 1,
                'W' : 2,
                'SNR' : 3,
                'Z' : 4,
                'D' : 5,
                'P' : 6,
                'R' : 7,
            }

        if localtime:
            self.getDateTime = datetime.datetime.fromtimestamp
        else:
            self.getDateTime = datetime.datetime.utcfromtimestamp

        self.description = {} if description is None else description
        self.type_data = type_data

        if self.metadataList is None:
            self.metadataList = self.dataOut.metadata_list

        dsList = []

        for name in self.dataList:
            if not hasattr(self.dataOut, name):
                log.warning('Attribute {} not found in dataOut'.format(name), self.name)
                continue

            dataAux = getattr(self.dataOut, name)
            if dataAux is None:
                # Checked before the weather slicing, which cannot index None.
                continue

            if self.setType == 'weather' and name == 'data_param':
                if self.setChannel is None:
                    dataAux = dataAux[:,self.weather_vars[self.weather_var],:]
                else:
                    dataAux = dataAux[self.setChannel,self.weather_vars[self.weather_var],:]
                    # Restore a leading axis of length one for the single channel.
                    dataAux = numpy.reshape(dataAux,(1,dataAux.shape[0],dataAux.shape[1]))

            dsDict = {'variable': name}

            # numpy.floating replaces numpy.float_ (removed in NumPy 2.0) and
            # also covers other float widths, which have no usable shape[0].
            if isinstance(dataAux, (int, float, numpy.integer, numpy.floating)):
                dsDict['nDim'] = 0
            else:
                dsDict['nDim'] = len(dataAux.shape)
                dsDict['shape'] = dataAux.shape
                dsDict['dsNumber'] = dataAux.shape[0]
                dsDict['dtype'] = dataAux.dtype

            dsList.append(dsDict)

        self.dsList = dsList
        # NOTE: timeFlag() replaces this date with an integer day of year on
        # its first call.
        self.currentDay = self.dataOut.datatime.date()

    def timeFlag(self):
        """Return True when the current block must start a new file.

        A new file is requested when the day of year changes or when the
        gap to the previous block is larger than three hours.
        """
        currentTime = self.dataOut.utctime
        dt = self.getDateTime(currentTime)
        dataDay = int(dt.strftime('%j'))

        if self.lastTime is None:
            self.lastTime = currentTime
            self.currentDay = dataDay
            return False

        timeDiff = currentTime - self.lastTime
        # Refresh on every path, including a day change, so the next gap is
        # measured from this block.
        self.lastTime = currentTime

        # If the day is different, or the gap between two samples exceeds 3 hours
        if dataDay != self.currentDay:
            self.currentDay = dataDay
            return True
        return bool(timeDiff > 3*60*60)

    def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
            dataList=None, setType=None, description=None, mode= None,
            type_data=None, Reset = False, localtime=True, **kwargs):
        """Write one block of `dataOut`, configuring the writer on first use.

        `Reset=True` closes the current file and forces a new setup.
        `dataList` and `description` default to None (treated as empty) to
        avoid mutable default arguments.
        """

        if Reset:
            self.isConfig = False
            self.closeFile()
            self.lastTime = None
            self.blockIndex = 0

        self.dataOut = dataOut
        self.mode = mode

        if not(self.isConfig):
            self.setup(path=path, blocksPerFile=blocksPerFile,
                       metadataList=metadataList,
                       dataList=[] if dataList is None else dataList,
                       setType=setType,
                       description={} if description is None else description,
                       type_data=type_data,
                       localtime=localtime, **kwargs)

            self.isConfig = True
            self.setNextFile()

        self.putData()
        return

    @staticmethod
    def _centralMean(values):
        """Mean of the central half of `values`, or of all of it when shorter than 4."""
        total = values.shape[0]
        quarter = int(total / 4)
        # Explicit end index: `values[0:-0]` would be empty when quarter == 0.
        return numpy.mean(values[quarter:total - quarter])

    def setNextFile(self):
        """Build the next file name, create the file and write its layout.

        Raises:
            ValueError: weather mode with a `flagMode` other than 0 or 1.
        """

        ext = self.ext
        path = self.path
        dt = self.getDateTime(self.dataOut.utctime)

        if self.setType == 'weather':
            subfolder = ''
        else:
            subfolder = dt.strftime('d%Y%j')

        fullpath = os.path.join(path, subfolder)

        # Start the set counter from the last file found in the folder
        setFile = -1
        if os.path.exists(fullpath):
            filesList = [k for k in os.listdir(fullpath) if k.startswith(self.optchar)]
            if filesList:
                filen = sorted(filesList, key=str.lower)[-1]
                # the file name must have the following format
                # 0 1234 567 89A BCDE (hex)
                # x YYYY DDD SSS .ext
                if isNumber(filen[8:11]):
                    setFile = int(filen[8:11])
        else:
            # exist_ok covers another writer creating the folder meanwhile.
            os.makedirs(fullpath, exist_ok=True)

        if self.setType is None:
            setFile += 1
            filename = '%s%4.4d%3.3d%03d%s' % (self.optchar,
                                               dt.year,
                                               int(dt.strftime('%j')),
                                               setFile,
                                               ext )
        elif self.setType == "weather":

            #SOPHY_20200505_140215_E10.0_Z.h5
            #SOPHY_20200505_140215_A40.0_Z.h5
            if self.dataOut.flagMode == 1: #'AZI' #PPI
                ang_type = 'EL'
                mode_type = 'PPI'
                angles = self.dataOut.data_ele
            elif self.dataOut.flagMode == 0: #'ELE' #RHI
                ang_type = 'AZ'
                mode_type = 'RHI'
                angles = self.dataOut.data_azi
            else:
                raise ValueError(
                    'Unsupported flagMode {!r}: expected 0 (RHI) or 1 (PPI)'.format(
                        self.dataOut.flagMode))
            ang_ = round(self._centralMean(angles), 1)

            filename = '%s_%2.2d%2.2d%2.2d_%2.2d%2.2d%2.2d_%s%2.1f_%s%s' % (
                'SOPHY',
                dt.year,
                dt.month,
                dt.day,
                dt.hour,
                dt.minute,
                dt.second,
                ang_type[0],
                ang_,
                self.weather_var,
                ext )
            subfolder = '{}_{}_{}_{:2.1f}'.format(self.weather_var, mode_type, ang_type, ang_)
            fullpath = os.path.join(path, subfolder)
            os.makedirs(fullpath, exist_ok=True)
        else:
            # Any other setType: the set number is the minute of the day.
            setFile = dt.hour*60+dt.minute
            filename = '%s%4.4d%3.3d%04d%s' % (self.optchar,
                                               dt.year,
                                               int(dt.strftime('%j')),
                                               setFile,
                                               ext )

        self.filename = os.path.join( path, subfolder, filename )

        self.fp = h5py.File(self.filename, 'w')
        #write metadata
        self.writeMetadata(self.fp)
        #Write data
        self.writeData(self.fp)

    def _labelMap(self):
        """Return the attribute -> label mapping without touching `description`.

        The 'Data' and 'Metadata' sections are merged into a new dict; the
        original code merged them in place into the caller's 'Data' dict.
        """
        description = self.description or {}
        if 'Data' in description:
            labels = dict(description['Data'])
            if 'Metadata' in description:
                labels.update(description['Metadata'])
            return labels
        return description

    def getLabel(self, name, x=None):
        """Return the HDF5 name for attribute `name`.

        With `x` None this is the group/dataset label: the mapped string,
        None for a list (datasets go directly in the parent group), the first
        key for a dict, otherwise `name` itself. With an index `x` it is the
        label of the x-th dataset, defaulting to 'pairNN' for names
        containing 'cspc' and 'channelNN' otherwise.
        """

        data = self._labelMap()
        entry = data[name] if name in data else None

        if x is None:
            if isinstance(entry, str):
                return entry
            elif isinstance(entry, list):
                return None
            elif isinstance(entry, dict):
                for key in entry:
                    return key
            return name

        if isinstance(entry, list):
            return entry[x]
        elif isinstance(entry, dict):
            for value in entry.values():
                return value[x]
        if 'cspc' in name:
            return 'pair{:02d}'.format(x)
        else:
            return 'channel{:02d}'.format(x)

    def writeMetadata(self, fp):
        """Write every attribute of `metadataList` found on `dataOut` into `fp`."""

        if self.description:
            if 'Metadata' in self.description:
                grp = fp.create_group('Metadata')
            else:
                grp = fp
        else:
            grp = fp.create_group('Metadata')

        for name in self.metadataList:
            if not hasattr(self.dataOut, name):
                log.warning('Metadata: `{}` not found'.format(name), self.name)
                continue
            value = getattr(self.dataOut, name)
            if value is None:
                # create_dataset() cannot build a dataset from None.
                log.warning('Metadata: `{}` is None, skipping'.format(name), self.name)
                continue
            if isinstance(value, bool):
                # Booleans are stored as the integers 1/0.
                value = 1 if value is True else 0
            grp.create_dataset(self.getLabel(name), data=value)
        return

    def writeData(self, fp):
        """Create the (empty) datasets described by `self.dsList` in `fp`.

        Stores the created datasets in `self.ds` and the matching
        (attribute, channel) pairs in `self.data`; channel is -1 for scalars.
        """

        if self.description:
            if 'Data' in self.description:
                grp = fp.create_group('Data')
            else:
                grp = fp
        else:
            grp = fp.create_group('Data')

        dtsets = []
        data = []

        for dsInfo in self.dsList:

            if dsInfo['nDim'] == 0:
                ds = grp.create_dataset(
                    self.getLabel(dsInfo['variable']),
                    (self.blocksPerFile, ),
                    chunks=True,
                    dtype=numpy.float64)
                dtsets.append(ds)
                data.append((dsInfo['variable'], -1))
            else:
                label = self.getLabel(dsInfo['variable'])
                if label is not None:
                    sgrp = grp.create_group(label)
                else:
                    sgrp = grp
                # With one block per file the datasets carry no block axis.
                if self.blocksPerFile == 1:
                    shape = dsInfo['shape'][1:]
                else:
                    shape = (self.blocksPerFile, ) + dsInfo['shape'][1:]
                for i in range(dsInfo['dsNumber']):
                    if dsInfo['dsNumber']==1:
                        if self.setChannel==1:
                            # Single selected channel: label and read it as channel 1.
                            i=1
                    ds = sgrp.create_dataset(
                        self.getLabel(dsInfo['variable'], i),
                        shape,
                        chunks=True,
                        dtype=dsInfo['dtype'],
                        compression='gzip',
                        )
                    dtsets.append(ds)
                    data.append((dsInfo['variable'], i))
        fp.flush()

        log.log('Creating file: {}'.format(fp.filename), self.name)

        self.ds = dtsets
        self.data = data
        self.firsttime = True
        self.blockIndex = 0
        return

    def putData(self):
        """Write the current `dataOut` block, rolling over to a new file when due."""

        if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
            self.closeFile()
            self.setNextFile()

        for ds, (attr, ch) in zip(self.ds, self.data):
            if ch == -1:
                ds[self.blockIndex] = getattr(self.dataOut, attr)
            elif self.blocksPerFile != 1:
                ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
            elif self.setType == 'weather':
                tmp = getattr(self.dataOut, attr)[:,self.weather_vars[self.weather_var],:][ch]
                if self.mask:
                    # Mask where variable 3 ('SNR') is below the threshold.
                    mask = self.dataOut.data_param[:,3,:][ch] < self.mask
                    # Work on a copy so dataOut is not modified in place.
                    tmp = numpy.array(tmp)
                    tmp[mask] = numpy.nan
                ds[:] = tmp
            else:
                # One block per file outside weather mode: the dataset has no
                # block axis, so the channel data fills it entirely.
                ds[...] = getattr(self.dataOut, attr)[ch]

        self.fp.flush()
        self.blockIndex += 1
        log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)

        return

    def closeFile(self):
        """Trim partially filled datasets and close the current file, if open."""

        if not self.fp:
            # No file yet, or already closed: nothing to resize or flush.
            return

        if self.blockIndex != self.blocksPerFile:
            for ds in self.ds:
                ds.resize(self.blockIndex, axis=0)

        self.fp.flush()
        self.fp.close()

    def close(self):

        self.closeFile()