import os
import time
import datetime

import numpy
import h5py

import schainpy.admin
from schainpy.model.data.jrodata import *
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
from schainpy.model.io.jroIO_base import *
from schainpy.utils import log
class HDFReader(Reader, ProcessingUnit):
    """Processing unit to read HDF5 format files.

    This unit reads HDF5 files created with `HDFWriter` operation containing
    by default two groups, Data and Metadata; all variables are exposed as
    `dataOut` attributes.

    It is possible to read any HDF5 file by giving the structure in the
    `description` parameter; extra values can be added to the metadata with
    the `extras` parameter.

    Parameters:
    -----------
    path : str
        Path where files are located.
    startDate : date
        Start date of the files
    endDate : list
        End date of the files
    startTime : time
        Start time of the files
    endTime : time
        End time of the files
    description : dict, optional
        Dictionary with the description of the HDF5 file
    extras : dict, optional
        Dictionary with extra metadata to be added to `dataOut`

    Attention: add the attribute `utcoffset` in the last part of the reader
    in order to work in local time without time problems, e.g.
    utcoffset='-18000'

    Examples
    --------

    desc = {
        'Data': {
            'data_output': ['u', 'v', 'w'],
            'utctime': 'timestamps',
        } ,
        'Metadata': {
            'heightList': 'heights'
        }
    }

    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    extras = {
        'timeZone': 300
    }

    reader = project.addReadUnit(
        name='HDFReader',
        path='/path/to/files',
        startDate='2019/01/01',
        endDate='2019/01/31',
        startTime='00:00:00',
        endTime='23:59:59',
        utcoffset='-18000'
        # description=json.dumps(desc),
        # extras=json.dumps(extras),
        )

    """

    __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']

    def __init__(self):

        ProcessingUnit.__init__(self)
        self.ext = ".hdf5"
        self.optchar = "D"
        self.meta = {}
        self.data = {}
        self.open_file = h5py.File
        self.open_mode = 'r'
        self.description = {}
        self.extras = {}
        self.filefmt = "*%Y%j***"
        self.folderfmt = "*%Y%j"
        self.utcoffset = 0
        self.flagUpdateDataOut = False
        self.dataOut = Parameters()
        # NOTE: important to define these before reading starts
        self.dataOut.error = False
        self.dataOut.flagNoData = True

    def setup(self, **kwargs):
        """Search the files to read (online or offline) and open the first one."""

        self.set_kwargs(**kwargs)
        if not self.ext.startswith('.'):
            self.ext = '.{}'.format(self.ext)

        if self.online:
            log.log("Searching files in online mode...", self.name)

            for nTries in range(self.nTries):
                fullpath = self.searchFilesOnLine(self.path, self.startDate,
                    self.endDate, self.expLabel, self.ext, self.walk,
                    self.filefmt, self.folderfmt)
                try:
                    # searchFilesOnLine returns a generator; take the first match
                    fullpath = next(fullpath)
                except StopIteration:
                    fullpath = None

                if fullpath:
                    break

                log.warning(
                    'Waiting {} sec for a valid file in {}: try {} ...'.format(
                        self.delay, self.path, nTries + 1),
                    self.name)
                time.sleep(self.delay)

            if not fullpath:
                raise schainpy.admin.SchainError(
                    'There isn\'t any valid file in {}'.format(self.path))

            pathname, filename = os.path.split(fullpath)
            # filename format: xYYYYDDDSSS.ext
            self.year = int(filename[1:5])
            self.doy = int(filename[5:8])
            self.set = int(filename[8:11]) - 1
        else:
            log.log("Searching files in {}".format(self.path), self.name)
            self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
                self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)

        self.setNextFile()

        return

    def readFirstHeader(self):
        '''Read metadata and data of the current file.'''

        self.__readMetadata2()
        self.__readData()
        self.__setBlockList()

        if 'type' in self.meta:
            # NOTE(review): eval on a type name stored in the file; trusted input assumed
            self.dataOut = eval(self.meta['type'])()

        for attr in self.meta:
            if "processingHeaderObj" in attr:
                # header attributes present: dataOut must be refreshed from them
                self.flagUpdateDataOut = True
            at = attr.split('.')
            if len(at) > 1:
                # dotted name, e.g. 'processingHeaderObj.heightList'
                setattr(getattr(self.dataOut, at[0]), at[1], self.meta[attr])
            else:
                setattr(self.dataOut, attr, self.meta[attr])
        self.blockIndex = 0

        if self.flagUpdateDataOut:
            self.updateDataOut()
        return

    def updateDataOut(self):
        '''Propagate header metadata read from file into the dataOut attributes.'''
        self.dataOut.azimuthList = self.dataOut.processingHeaderObj.azimuthList
        self.dataOut.elevationList = self.dataOut.processingHeaderObj.elevationList
        self.dataOut.heightList = self.dataOut.processingHeaderObj.heightList
        self.dataOut.ippSeconds = self.dataOut.processingHeaderObj.ipp
        self.dataOut.channelList = self.dataOut.processingHeaderObj.channelList
        self.dataOut.nCohInt = self.dataOut.processingHeaderObj.nCohInt
        self.dataOut.nFFTPoints = self.dataOut.processingHeaderObj.nFFTPoints
        self.flagUpdateDataOut = False
        self.dataOut.frequency = self.dataOut.radarControllerHeaderObj.frequency

    def __setBlockList(self):
        '''
        Selects the data within the times defined

        self.fp
        self.startTime
        self.endTime
        self.blockList
        self.blocksPerFile
        '''

        startTime = self.startTime
        endTime = self.endTime
        thisUtcTime = self.data['utctime'] + self.utcoffset
        thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
        self.startFileDatetime = thisDatetime
        thisDate = thisDatetime.date()
        thisTime = thisDatetime.time()

        startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()

        ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]

        self.blockList = ind
        self.blocksPerFile = len(ind)
        if len(ind) == 0:
            # nothing in the requested time window, skip to the next file
            print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.blockIndex,
                                                                  self.blocksPerFile,
                                                                  thisDatetime))
            self.setNextFile()
        return

    def __readMetadata(self):
        '''
        Reads Metadata (flat Metadata group only).
        '''

        meta = {}

        if self.description:
            for key, value in self.description['Metadata'].items():
                meta[key] = self.fp[value][()]
        else:
            grp = self.fp['Metadata']
            for name in grp:
                meta[name] = grp[name][()]

        if self.extras:
            for key, value in self.extras.items():
                meta[key] = value
        self.meta = meta

        return

    def __readMetadata2(self):
        '''
        Reads Metadata, supporting one level of subgroups; subgroup datasets
        are stored under dotted keys like 'obj.attr'.
        '''
        meta = {}

        if self.description:
            for key, value in self.description['Metadata'].items():
                meta[key] = self.fp[value][()]
        else:
            grp = self.fp['Metadata']
            for item in grp.values():
                name = item.name
                if isinstance(item, h5py.Dataset):
                    name = name.split("/")[-1]
                    meta[name] = item[()]
                else:
                    grp2 = self.fp[name]
                    Obj = name.split("/")[-1]
                    for item2 in grp2.values():
                        name2 = Obj + "." + item2.name.split("/")[-1]
                        meta[name2] = item2[()]

        if self.extras:
            for key, value in self.extras.items():
                meta[key] = value
        self.meta = meta
        return

    def __readData(self):
        '''Read the Data group (or the datasets named in description) into self.data.'''

        data = {}

        if self.description:
            for key, value in self.description['Data'].items():
                if isinstance(value, str):
                    if isinstance(self.fp[value], h5py.Dataset):
                        data[key] = self.fp[value][()]
                    elif isinstance(self.fp[value], h5py.Group):
                        # per-channel datasets stacked on a new leading axis
                        array = []
                        for ch in self.fp[value]:
                            array.append(self.fp[value][ch][()])
                        data[key] = numpy.array(array)
                elif isinstance(value, list):
                    array = []
                    for ch in value:
                        array.append(self.fp[ch][()])
                    data[key] = numpy.array(array)
        else:
            grp = self.fp['Data']
            for name in grp:
                if isinstance(grp[name], h5py.Dataset):
                    array = grp[name][()]
                elif isinstance(grp[name], h5py.Group):
                    array = []
                    for ch in grp[name]:
                        array.append(grp[name][ch][()])
                    array = numpy.array(array)
                else:
                    log.warning('Unknown type: {}'.format(name))

                if name in self.description:
                    key = self.description[name]
                else:
                    key = name
                data[key] = array

        self.data = data
        return

    def getData(self):
        '''Copy the current block into dataOut attributes and advance the block index.'''
        if not self.isDateTimeInRange(self.startFileDatetime, self.startDate, self.endDate, self.startTime, self.endTime):
            self.dataOut.flagNoData = True
            self.blockIndex = self.blocksPerFile
            self.dataOut.error = True  # terminates the program
            return

        for attr in self.data:
            if self.data[attr].ndim == 1:
                setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
            else:
                setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])

        self.blockIndex += 1
        if self.blockIndex == 1:
            log.log("Block No. {}/{} -> {}".format(
                self.blockIndex,
                self.blocksPerFile,
                self.dataOut.datatime.ctime()), self.name)
        else:
            log.log("Block No. {}/{} ".format(
                self.blockIndex,
                self.blocksPerFile), self.name)

        if self.blockIndex == self.blocksPerFile:
            self.setNextFile()
        self.dataOut.flagNoData = False

        return

    def run(self, **kwargs):
        '''Entry point: configure on first call, then deliver one block per call.'''

        if not(self.isConfig):
            self.setup(**kwargs)
            self.isConfig = True

        if self.blockIndex == self.blocksPerFile:
            self.setNextFile()

        self.getData()

        return
@MPDecorator
class HDFWriter(Operation):
    """Operation to write HDF5 files.

    The HDF5 file contains by default two groups Data and Metadata where
    you can save any `dataOut` attribute specified by `dataList` and
    `metadataList` parameters, data attributes are normally time dependent
    where the metadata are not.
    It is possible to customize the structure of the HDF5 file with the
    optional description parameter see the examples.

    Parameters:
    -----------
    path : str
        Path where files will be saved.
    blocksPerFile : int
        Number of blocks per file
    metadataList : list
        List of the dataOut attributes that will be saved as metadata
    dataList : int
        List of the dataOut attributes that will be saved as data
    setType : bool
        If True the name of the files corresponds to the timestamp of the data
    description : dict, optional
        Dictionary with the desired description of the HDF5 file

    Examples
    --------

    desc = {
        'data_output': {'winds': ['z', 'w', 'v']},
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'data_output': ['z', 'w', 'v'],
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    writer = proc_unit.addOperation(name='HDFWriter')
    writer.addParameter(name='path', value='/path/to/file')
    writer.addParameter(name='blocksPerFile', value='32')
    writer.addParameter(name='metadataList', value='heightList,timeZone')
    writer.addParameter(name='dataList',value='data_output,utctime')
    # writer.addParameter(name='description',value=json.dumps(desc))

    """

    ext = ".hdf5"
    optchar = "D"
    filename = None
    path = None
    setFile = None
    fp = None
    firsttime = True
    # Configurations
    blocksPerFile = None
    blockIndex = None
    dataOut = None
    # Data Arrays
    dataList = None
    metadataList = None
    currentDay = None
    lastTime = None
    timeZone = "ut"
    hourLimit = 3
    breakDays = True

    def __init__(self):

        Operation.__init__(self)
        return

    def set_kwargs(self, **kwargs):
        # copy every keyword argument as an attribute of this operation
        for key, value in kwargs.items():
            setattr(self, key, value)

    def set_kwargs_obj(self, obj, **kwargs):
        # copy every keyword argument as an attribute of `obj`
        for key, value in kwargs.items():
            setattr(obj, key, value)

    def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None,
              description=None, timeZone="ut", hourLimit=3, breakDays=True, **kwargs):
        '''Store the configuration and build the dataset descriptors (dsList).'''

        self.path = path
        self.blocksPerFile = blocksPerFile
        self.metadataList = metadataList
        self.dataList = [s.strip() for s in dataList]
        self.setType = setType
        # avoid sharing a mutable default dict between instances
        self.description = description if description is not None else {}
        self.timeZone = timeZone
        self.hourLimit = hourLimit
        self.breakDays = breakDays
        self.set_kwargs(**kwargs)

        if self.metadataList is None:
            self.metadataList = self.dataOut.metadata_list

        dsList = []

        for variable in self.dataList:
            dsDict = {}
            if hasattr(self.dataOut, variable):
                dataAux = getattr(self.dataOut, variable)
                dsDict['variable'] = variable
            else:
                log.warning('Attribute {} not found in dataOut'.format(variable), self.name)
                continue

            if dataAux is None:
                continue
            elif isinstance(dataAux, (int, float, numpy.integer, numpy.floating)):
                # scalar: one value per block; numpy.float was removed in NumPy>=1.24
                dsDict['nDim'] = 0
            else:
                dsDict['nDim'] = len(dataAux.shape)
                dsDict['shape'] = dataAux.shape
                dsDict['dsNumber'] = dataAux.shape[0]
                dsDict['dtype'] = dataAux.dtype

            dsList.append(dsDict)

        self.dsList = dsList
        self.currentDay = self.dataOut.datatime.date()

    def timeFlag(self):
        '''Return True when a new file must be started: the day changed
        (with breakDays) or the gap to the last sample exceeds hourLimit.'''
        currentTime = self.dataOut.utctime
        if self.timeZone == "lt":
            timeTuple = time.localtime(currentTime)
        else:
            timeTuple = time.gmtime(currentTime)

        dataDay = timeTuple.tm_yday

        if self.lastTime is None:
            self.lastTime = currentTime
            self.currentDay = dataDay
            return False

        timeDiff = currentTime - self.lastTime

        # start a new file if the day changed or the gap exceeds hourLimit
        if (dataDay != self.currentDay) and self.breakDays:
            self.currentDay = dataDay
            return True
        elif timeDiff > self.hourLimit * 60 * 60:
            self.lastTime = currentTime
            return True
        else:
            self.lastTime = currentTime
            return False

    def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
            dataList=None, setType=None, description=None, **kwargs):
        '''Entry point: configure on first call, then write one block per call.'''

        self.dataOut = dataOut
        self.set_kwargs_obj(self.dataOut, **kwargs)
        if not(self.isConfig):
            self.setup(path=path, blocksPerFile=blocksPerFile,
                       metadataList=metadataList,
                       dataList=dataList if dataList is not None else [],
                       setType=setType,
                       description=description if description is not None else {},
                       **kwargs)

            self.isConfig = True
            self.setNextFile()

        self.putData()
        return

    def setNextFile(self):
        '''Compute the next output filename (D<YYYY><DDD><set>.hdf5) and
        create the day subfolder if needed.'''

        ext = self.ext
        path = self.path
        setFile = self.setFile
        if self.timeZone == "lt":
            timeTuple = time.localtime(self.dataOut.utctime)
        else:
            # default to UTC for "ut" or any unrecognized timeZone
            timeTuple = time.gmtime(self.dataOut.utctime)

        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
        fullpath = os.path.join(path, subfolder)

        if os.path.exists(fullpath):
            filesList = os.listdir(fullpath)
            filesList = [k for k in filesList if k.startswith(self.optchar)]
            if len(filesList) > 0:
                filesList = sorted(filesList, key=str.lower)
                filen = filesList[-1]
                # filename format:
                # 0 1234 567 89A BCDE (hex)
                # x YYYY DDD SSS .ext
                if isNumber(filen[8:11]):
                    # continue numbering from the last existing file
                    setFile = int(filen[8:11])
                else:
                    setFile = -1
            else:
                setFile = -1  # reset the set counter
        else:
            os.makedirs(fullpath)
            setFile = -1  # reset the set counter

        if self.setType is None:
            setFile += 1
            file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)
        else:
            # set number derived from the timestamp (minutes of day)
            setFile = timeTuple.tm_hour * 60 + timeTuple.tm_min
            file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)

        self.filename = os.path.join(path, subfolder, file)

    def getLabel(self, name, x=None):
        '''Resolve the HDF5 label for attribute `name` from the description;
        with `x` given, resolve the per-channel dataset label.'''
        if x is None:
            if 'Data' in self.description:
                # copy: do not mutate the user-supplied description
                data = dict(self.description['Data'])
                if 'Metadata' in self.description:
                    data.update(self.description['Metadata'])
            else:
                data = self.description
            if name in data:
                if isinstance(data[name], str):
                    return data[name]
                elif isinstance(data[name], list):
                    return None
                elif isinstance(data[name], dict):
                    # first key of the dict is the group label
                    for key, value in data[name].items():
                        return key
            return name
        else:
            if 'Metadata' in self.description:
                meta = self.description['Metadata']
            else:
                meta = self.description
            if name in meta:
                if isinstance(meta[name], list):
                    return meta[name][x]
                elif isinstance(meta[name], dict):
                    for key, value in meta[name].items():
                        return value[x]
            if 'cspc' in name:
                return 'pair{:02d}'.format(x)
            else:
                return 'channel{:02d}'.format(x)

    def writeMetadata(self, fp):
        '''Write the flat metadataList attributes into the Metadata group.'''

        if self.description:
            if 'Metadata' in self.description:
                grp = fp.create_group('Metadata')
            else:
                grp = fp
        else:
            grp = fp.create_group('Metadata')

        for attribute in self.metadataList:
            if not hasattr(self.dataOut, attribute):
                log.warning('Metadata: `{}` not found'.format(attribute), self.name)
                continue
            value = getattr(self.dataOut, attribute)
            if isinstance(value, bool):
                value = 1 if value else 0
            grp.create_dataset(self.getLabel(attribute), data=value)
        return

    def writeMetadata2(self, fp):
        '''Write metadataList attributes, supporting dotted names such as
        "processingHeaderObj.heightList" saved in a Metadata/<obj> subgroup.'''

        if self.description:
            if 'Metadata' in self.description:
                grp = fp.create_group('Metadata')
            else:
                grp = fp
        else:
            grp = fp.create_group('Metadata')

        for attribute in self.metadataList:
            attr = attribute.split('.')
            if len(attr) > 1:
                parent = getattr(self.dataOut, attr[0], None)
                if parent is None or not hasattr(parent, attr[1]):
                    log.warning('Metadata: {}.{} not found'.format(attr[0], attr[1]), self.name)
                    continue
                value = getattr(parent, attr[1])
                if isinstance(value, bool):
                    value = 1 if value else 0
                if value is None:
                    log.warning("Invalid value detected, {} is None".format(attribute), self.name)
                    value = 0
                if 'Metadata/' + attr[0] not in fp:
                    grp2 = fp.create_group('Metadata/' + attr[0])
                else:
                    grp2 = fp['Metadata/' + attr[0]]
                grp2.create_dataset(attr[1], data=value)
            else:
                if not hasattr(self.dataOut, attr[0]):
                    log.warning('Metadata: `{}` not found'.format(attribute), self.name)
                    continue
                value = getattr(self.dataOut, attr[0])
                if isinstance(value, bool):
                    value = 1 if value else 0
                if value is None:
                    log.error("Value {} is None".format(attribute), self.name)
                grp.create_dataset(self.getLabel(attribute), data=value)
        return

    def writeData(self, fp):
        '''Create the Data group datasets (one slot per block) and remember
        the (attribute, channel) pairs used by putData to fill them.'''

        if self.description:
            if 'Data' in self.description:
                grp = fp.create_group('Data')
            else:
                grp = fp
        else:
            grp = fp.create_group('Data')

        dtsets = []
        data = []

        for dsInfo in self.dsList:
            if dsInfo['nDim'] == 0:
                # scalar attribute: one float per block
                ds = grp.create_dataset(
                    self.getLabel(dsInfo['variable']),
                    (self.blocksPerFile,),
                    chunks=True,
                    dtype=numpy.float64)
                dtsets.append(ds)
                data.append((dsInfo['variable'], -1))
            else:
                label = self.getLabel(dsInfo['variable'])
                if label is not None:
                    sgrp = grp.create_group(label)
                else:
                    sgrp = grp
                # one dataset per channel (first axis of the array)
                for i in range(dsInfo['dsNumber']):
                    ds = sgrp.create_dataset(
                        self.getLabel(dsInfo['variable'], i),
                        (self.blocksPerFile,) + dsInfo['shape'][1:],
                        chunks=True,
                        dtype=dsInfo['dtype'])
                    dtsets.append(ds)
                    data.append((dsInfo['variable'], i))
        fp.flush()

        log.log('Creating file: {}'.format(fp.filename), self.name)

        self.ds = dtsets
        self.data = data
        self.firsttime = True
        self.blockIndex = 0
        return

    def putData(self):
        '''Write the current dataOut block; roll over to a new file when the
        current one is full or the time flag fires.'''
        if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
            self.closeFile()
            self.setNextFile()
            self.dataOut.flagNoData = False
            self.blockIndex = 0

        if self.blockIndex == 0:
            # setting HDF5 file
            self.fp = h5py.File(self.filename, 'w')
            # write metadata
            self.writeMetadata2(self.fp)
            # write data
            self.writeData(self.fp)
            log.log('Block No. {}/{} --> {}'.format(self.blockIndex + 1, self.blocksPerFile, self.dataOut.datatime.ctime()), self.name)
        elif (self.blockIndex % 10 == 0):
            log.log('Block No. {}/{} --> {}'.format(self.blockIndex + 1, self.blocksPerFile, self.dataOut.datatime.ctime()), self.name)
        else:
            log.log('Block No. {}/{}'.format(self.blockIndex + 1, self.blocksPerFile), self.name)

        for i, ds in enumerate(self.ds):
            attr, ch = self.data[i]
            if ch == -1:
                ds[self.blockIndex] = getattr(self.dataOut, attr)
            else:
                ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]

        self.blockIndex += 1
        self.fp.flush()
        self.dataOut.flagNoData = True

    def closeFile(self):
        '''Trim partially-filled datasets and close the HDF5 file.'''

        if self.blockIndex != self.blocksPerFile:
            for ds in self.ds:
                ds.resize(self.blockIndex, axis=0)

        if self.fp:
            self.fp.flush()
            self.fp.close()

    def close(self):

        self.closeFile()