##// END OF EJS Templates
Fix Madrigal HDF5 reader (v2 & v3)
Fix Madrigal HDF5 reader (v2 & v3)

File last commit:

r1208:05917071ce48
r1208:05917071ce48
Show More
jroIO_madrigal.py
636 lines | 21.0 KiB | text/x-python | PythonLexer
'''
Created on Aug 1, 2017
@author: Juan C. Espinoza
'''
import os
import sys
import time
import json
import glob
import datetime
import numpy
import h5py
from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
from schainpy.model.data.jrodata import Parameters
from schainpy.utils import log
# The madrigal library is optional at import time: only MADWriter calls into
# it, so a failure here is logged instead of aborting the import of this
# module. `except Exception` (rather than a bare `except`) keeps
# KeyboardInterrupt/SystemExit propagating.
try:
    import madrigal.cedar
except Exception:
    log.warning(
        'You should install "madrigal library" module if you want to read/write Madrigal data'
    )

# `basestring` only exists on Python 2; alias it to `str` elsewhere so that
# load_json can use a single name for "text".
try:
    basestring
except NameError:
    basestring = str

# Catalog values used by MADWriter when the metadata has no 'catalog' entry.
DEF_CATALOG = {
    'principleInvestigator': 'Marco Milla',
    'expPurpose': '',
    'cycleTime': '',
    'correlativeExp': '',
    'sciRemarks': '',
    'instRemarks': ''
}

# Header values used by MADWriter when the metadata has no 'header' entry.
DEF_HEADER = {
    'kindatDesc': '',
    'analyst': 'Jicamarca User',
    'comments': '',
    'history': ''
}

# File name prefix for each `kinst` value (looked up in MADWriter.setFile).
MNEMONICS = {
    10: 'jro',
    11: 'jbr',
    840: 'jul',
    13: 'jas',
    1000: 'pbr',
    1001: 'hbr',
    1002: 'obr',
    400: 'clr'
}

# 1970-01-01 00:00:00 minus `time.timezone` seconds (the non-DST offset of the
# local zone, in seconds west of UTC).
UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
def load_json(obj):
    '''
    Parse json as string instead of unicode

    Inputs:
        obj - JSON document (text or bytes) or an already decoded python
              object (dict, list, tuple or scalar)

    Return:
        The decoded object where every dictionary key and every text value
        is a `str`. Lists and tuples are returned as lists, and nested
        containers are normalised at any depth. Any other value is returned
        unchanged.

    Raises:
        ValueError if `obj` is text/bytes that is not valid JSON
        (raised by json.loads).
    '''

    def normalize(value):
        # Recursive so that lists inside dicts (and dicts inside lists) get
        # the same treatment as the top level container.
        if isinstance(value, dict):
            return {str(k): normalize(v) for k, v in value.items()}
        if isinstance(value, (list, tuple)):
            return [normalize(v) for v in value]
        if isinstance(value, basestring):
            return str(value)
        return value

    # `basestring` covers unicode on Python 2; `bytes` covers raw JSON
    # payloads on Python 3 (json.loads accepts both).
    if isinstance(obj, (basestring, bytes)):
        obj = json.loads(obj)

    return normalize(obj)
@MPDecorator
class MADReader(JRODataReader, ProcessingUnit):
    '''
    Reading unit for Madrigal files in simple text (.txt) or HDF5 (.hdf5)
    format. Every call to getData() loads the rows that share one timestamp
    and copies the configured parameters into `dataOut`.
    '''

    # Placeholder kept in `self.datatime` until the first record is read.
    NO_DATATIME = datetime.datetime(1900, 1, 1)

    def __init__(self):

        ProcessingUnit.__init__(self)

        self.dataOut = Parameters()
        self.counter_records = 0
        self.nrecords = None
        self.flagNoMoreFiles = 0
        self.isConfig = False
        self.filename = None
        # Time steps (in seconds) seen so far between consecutive records
        self.intervals = set()

    def setup(self,
              path=None,
              startDate=None,
              endDate=None,
              format=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              **kwargs):
        '''
        Configure the reader, build the list of files and open the first one

        Inputs:
            path      - folder with the madrigal files
            startDate - first date to read (datetime.date)
            endDate   - last date to read (datetime.date)
            format    - 'simple' or 'txt' for text files, 'cedar' for .001
                        files, anything else selects .hdf5
            startTime - records before startDate+startTime are skipped
            endTime   - records after endDate+endTime are skipped
            oneDDict  - (kwargs) json/dict {madrigal parameter: dataOut attribute}
            twoDDict  - (kwargs) json/dict {madrigal parameter: dataOut attribute
                        or [attribute, position]}

        Raises:
            ValueError if path, format, startDate or endDate is missing
            Warning if no file in `path` matches the date range
        '''

        self.path = path
        self.startDate = startDate
        self.endDate = endDate
        self.startTime = startTime
        self.endTime = endTime
        self.datatime = self.NO_DATATIME
        self.oneDDict = load_json(kwargs.get('oneDDict',
                                             '{"GDLATR":"lat", "GDLONR":"lon"}'))
        self.twoDDict = load_json(kwargs.get('twoDDict',
                                             '{"GDALT": "heightList"}'))
        self.independentParam = 'GDALT'

        if self.path is None:
            raise ValueError('The path is not valid')

        if format is None:
            raise ValueError('The format is not valid choose simple or hdf5')
        elif format.lower() in ('simple', 'txt'):
            self.ext = '.txt'
        elif format.lower() in ('cedar',):
            # NOTE(review): parseHeader/parseData only handle '.txt' and
            # '.hdf5'; '.001' files are listed but have no parser here.
            self.ext = '.001'
        else:
            self.ext = '.hdf5'

        # search_files does date arithmetic on both limits, fail early with
        # a clear message instead of a TypeError on None.
        if self.startDate is None or self.endDate is None:
            raise ValueError('startDate and endDate are required')

        self.search_files(self.path)
        self.fileId = 0

        if not self.fileList:
            raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path))

        self.setNextFile()

    def search_files(self, path):
        '''
        Searching for madrigal files in path
        Creating a list of files to procces included in [startDate,endDate]

        The date is taken from characters 3-10 of the file name (YYYYMMDD
        after a three letter prefix); the range is widened by one day on
        each side.

        Input:
            path - Path to find files
        '''

        log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader')

        # Same listing the undocumented (and deprecated) glob.glob1 gave for
        # the pattern '*<ext>': names ending with the extension, hidden
        # files excluded, empty list when the folder can not be read.
        try:
            names = os.listdir(path or os.curdir)
        except OSError:
            names = []
        ext = os.path.normcase(self.ext)
        fileList0 = sorted(
            name for name in names
            if os.path.normcase(name).endswith(ext) and not name.startswith('.')
        )

        self.fileList = []
        self.dateFileList = []

        startDate = self.startDate - datetime.timedelta(1)
        endDate = self.endDate + datetime.timedelta(1)

        for thisFile in fileList0:
            year, month, day = thisFile[3:7], thisFile[7:9], thisFile[9:11]
            if not (year.isdigit() and month.isdigit() and day.isdigit()):
                continue

            try:
                dateFile = datetime.date(int(year), int(month), int(day))
            except ValueError:
                # Digits that do not form a real date (e.g. month 13)
                continue

            if (startDate > dateFile) or (endDate < dateFile):
                continue

            self.fileList.append(thisFile)
            self.dateFileList.append(dateFile)

        return

    def parseHeader(self):
        '''
        Read the parameter names of the current file and drop from
        oneDDict/twoDDict the parameters that the file does not contain
        '''

        self.output = {}
        self.version = '2'
        s_parameters = None

        if self.ext == '.txt':
            # First line of the text file holds the column names
            self.parameters = [s.strip().lower() for s in self.fp.readline().decode().strip().split(' ') if s]
        elif self.ext == '.hdf5':
            self.metadata = self.fp['Metadata']
            # Files with a '_record_layout' entry are handled as version 3
            if '_record_layout' in self.metadata:
                s_parameters = [s[0].lower().decode() for s in self.metadata['Independent Spatial Parameters']]
                self.version = '3'
            self.parameters = [s[0].lower().decode() for s in self.metadata['Data Parameters']]

        log.success('Parameters found: {}'.format(self.parameters),
                    'MADReader')
        if s_parameters:
            log.success('Spatial parameters found: {}'.format(s_parameters),
                        'MADReader')

        # Iterate over copies because entries are removed inside the loops
        for param in list(self.oneDDict.keys()):
            if param.lower() not in self.parameters:
                log.warning(
                    'Parameter {} not found will be ignored'.format(
                        param),
                    'MADReader')
                self.oneDDict.pop(param, None)

        for param, value in list(self.twoDDict.items()):
            if param.lower() not in self.parameters:
                log.warning(
                    'Parameter {} not found, it will be ignored'.format(
                        param),
                    'MADReader')
                self.twoDDict.pop(param, None)
                continue
            if isinstance(value, list):
                # One empty slot per parameter stored in a shared attribute;
                # set_output fills it by position (value[1]).
                if value[0] not in self.output:
                    self.output[value[0]] = []
                self.output[value[0]].append([])

    def parseData(self):
        '''
        Load the data table of the current file and compute the list of
        ranges (unique values of the independent parameter)
        '''

        if self.ext == '.txt':
            self.data = numpy.genfromtxt(self.fp, missing_values=('missing'))
            self.nrecords = self.data.shape[0]
            self.ranges = numpy.unique(self.data[:, self.parameters.index(self.independentParam.lower())])
            self.counter_records = 0
        elif self.ext == '.hdf5':
            self.data = self.fp['Data']
            self.ranges = numpy.unique(self.data['Table Layout'][self.independentParam.lower()])
            self.times = numpy.unique(self.data['Table Layout']['ut1_unix'])
            self.counter_records = int(self.data['Table Layout']['recno'][0])
            # NOTE(review): nrecords is the last 'recno' value and readBlock
            # asks for a new file as soon as counter_records reaches it, so
            # the record with that recno does not appear to be emitted.
            # Confirm whether this is intended.
            self.nrecords = int(self.data['Table Layout']['recno'][-1])

    def setNextFile(self):
        '''
        Close the current file (if any) and open the next one of the list

        Return:
            1 if a file was opened, 0 if there are no more files
        '''

        file_id = self.fileId

        if file_id == len(self.fileList):
            log.success('No more files', 'MADReader')
            self.flagNoMoreFiles = 1
            return 0

        log.success(
            'Opening: {}'.format(self.fileList[file_id]),
            'MADReader'
        )

        filename = os.path.join(self.path, self.fileList[file_id])

        if self.filename is not None:
            self.fp.close()

        self.filename = filename
        self.filedate = self.dateFileList[file_id]

        if self.ext == '.hdf5':
            self.fp = h5py.File(self.filename, 'r')
        else:
            self.fp = open(self.filename, 'rb')

        self.parseHeader()
        self.parseData()
        self.sizeOfFile = os.path.getsize(self.filename)
        self.flagIsNewFile = 0
        self.fileId += 1

        return 1

    def readNextBlock(self):
        '''
        Read blocks until one falls inside [startDate+startTime,
        endDate+endTime]

        Return:
            1 if a block was read, 0 if there are no more files
        '''

        while True:
            self.flagDiscontinuousBlock = 0
            if self.flagIsNewFile:
                if not self.setNextFile():
                    return 0

            self.readBlock()

            if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
               (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
                log.warning(
                    'Reading Record No. {}/{} -> {} [Skipping]'.format(
                        self.counter_records,
                        self.nrecords,
                        self.datatime.ctime()),
                    'MADReader')
                continue
            break

        log.log(
            'Reading Record No. {}/{} -> {}'.format(
                self.counter_records,
                self.nrecords,
                self.datatime.ctime()),
            'MADReader')

        return 1

    def _row_datetime(self, row):
        '''
        Build a datetime from the first six columns of a text file row
        '''
        dt = row[:6].astype(int)
        return datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])

    def _add_interval(self, previous, current):
        '''
        Store the step in seconds between two consecutive record times

        The step from the initial placeholder and non positive steps are
        ignored because they are not real sampling intervals; whole days are
        kept (total_seconds) so that a gap of N days is not stored as 0.
        '''
        if previous == self.NO_DATATIME:
            return
        seconds = int((current - previous).total_seconds())
        if seconds > 0:
            self.intervals.add(seconds)

    def readBlock(self):
        '''
        Load in `self.buffer` the rows of the next record and update
        `self.datatime`, `self.counter_records` and the new file /
        discontinuity flags
        '''

        dum = []

        if self.ext == '.txt':
            first = self._row_datetime(self.data[self.counter_records])
            if first.date() > self.datatime.date():
                self.flagDiscontinuousBlock = 1
            self.datatime = first
            # Collect consecutive rows while the timestamp does not change
            while True:
                datatime = self._row_datetime(self.data[self.counter_records])
                if datatime != self.datatime:
                    self._add_interval(self.datatime, datatime)
                    break
                dum.append(self.data[self.counter_records])
                self.counter_records += 1
                if self.counter_records == self.nrecords:
                    self.flagIsNewFile = True
                    break
        elif self.ext == '.hdf5':
            datatime = datetime.datetime.utcfromtimestamp(
                self.times[self.counter_records])
            table = self.data['Table Layout']
            dum = table[table['recno'] == self.counter_records]
            self._add_interval(self.datatime, datatime)
            if datatime.date() > self.datatime.date():
                self.flagDiscontinuousBlock = 1
            self.datatime = datatime
            self.counter_records += 1
            if self.counter_records == self.nrecords:
                self.flagIsNewFile = True

        self.buffer = numpy.array(dum)
        return

    def set_output(self):
        '''
        Storing data from buffer to dataOut object
        '''

        parameters = [None for __ in self.parameters]

        for param, attr in list(self.oneDDict.items()):
            x = self.parameters.index(param.lower())
            setattr(self.dataOut, attr, self.buffer[0][x])

        for param, value in list(self.twoDDict.items()):
            # Profile over all known ranges, NaN where the record has no data
            dummy = numpy.full(self.ranges.shape, numpy.nan)
            if self.ext == '.txt':
                x = self.parameters.index(param.lower())
                y = self.parameters.index(self.independentParam.lower())
                ranges = self.buffer[:, y]
                index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
                dummy[index] = self.buffer[:, x]
            else:
                ranges = self.buffer[self.independentParam.lower()]
                index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
                dummy[index] = self.buffer[param.lower()]

            if isinstance(value, str):
                if value not in self.independentParam:
                    setattr(self.dataOut, value, dummy.reshape(1, -1))
            elif isinstance(value, list):
                self.output[value[0]][value[1]] = dummy
                parameters[value[1]] = param

        for key, value in list(self.output.items()):
            setattr(self.dataOut, key, numpy.array(value))

        self.dataOut.parameters = [s for s in parameters if s]
        self.dataOut.heightList = self.ranges
        self.dataOut.utctime = (self.datatime - datetime.datetime(1970, 1, 1)).total_seconds()
        self.dataOut.utctimeInit = self.dataOut.utctime
        # No interval is known until two different timestamps have been seen
        self.dataOut.paramInterval = min(self.intervals) if self.intervals else 0
        self.dataOut.useLocalTime = False
        self.dataOut.flagNoData = False
        self.dataOut.nrecords = self.nrecords
        self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock

    def getData(self):
        '''
        Storing data from databuffer to dataOut object

        Return:
            1 if dataOut was updated, 0 if there is no data left
        '''

        if self.flagNoMoreFiles:
            self.dataOut.flagNoData = True
            self.dataOut.error = 'No file left to process'
            return 0

        if not self.readNextBlock():
            self.dataOut.flagNoData = True
            return 0

        self.set_output()

        return 1
@MPDecorator
class MADWriter(Operation):
    '''
    Operation that writes `dataOut` records to Madrigal files (cedar .dat or
    .hdf5) using the madrigal.cedar library
    '''

    # Value written instead of NaN in cedar (.dat) files
    missing = -32767

    def __init__(self):

        Operation.__init__(self)
        self.dataOut = Parameters()
        self.counter = 0
        self.path = None
        self.fp = None

    def run(self, dataOut, path, oneDDict, independentParam='[]', twoDDict='{}',
            metadata='{}', format='cedar', **kwargs):
        '''
        Inputs:
            path - path where files will be created
            oneDDict - json of one-dimensional parameters in record where keys
            are Madrigal codes (integers or mnemonics) and values the corresponding
            dataOut attribute e.g: {
                'gdlatr': 'lat',
                'gdlonr': 'lon',
                'gdlat2':'lat',
                'glon2':'lon'}
            independentParam - list of independent spatial two-dimensional parameters e.g:
                ['heightList']
            twoDDict - json of two-dimensional parameters in record where keys
            are Madrigal codes (integers or mnemonics) and values the corresponding
            dataOut attribute if multidimensional array specify as tuple
            ('attr', pos) e.g: {
                'gdalt': 'heightList',
                'vn1p2': ('data_output', 0),
                'vn2p2': ('data_output', 1),
                'vn3': ('data_output', 2),
                'snl': ('data_SNR', 'db')
                }
            NOTE(review): writeBlock converts the second tuple element with
            int(), so the ('data_SNR', 'db') form above would fail there; dB
            conversion is applied to string values that contain 'db'.
            metadata - json of madrigal metadata (kinst, kindat, catalog and header)
            format - 'cedar' (.dat files) or 'hdf5' (.hdf5 files)
            blocks - (kwargs) number of records per file

        Return:
            1
        '''

        if not self.isConfig:
            self.setup(path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs)
            self.isConfig = True

        self.dataOut = dataOut
        self.putData()
        return 1

    def setup(self, path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs):
        '''
        Configure Operation

        Raises:
            ValueError if format is not 'cedar' or 'hdf5'
        '''

        self.path = path
        self.blocks = kwargs.get('blocks', None)
        self.counter = 0
        self.oneDDict = load_json(oneDDict)
        self.twoDDict = load_json(twoDDict)
        self.independentParam = load_json(independentParam)
        meta = load_json(metadata)
        self.kinst = meta.get('kinst')
        self.kindat = meta.get('kindat')
        self.catalog = meta.get('catalog', DEF_CATALOG)
        self.header = meta.get('header', DEF_HEADER)

        if format == 'cedar':
            self.ext = '.dat'
            self.extra_args = {}
        elif format == 'hdf5':
            self.ext = '.hdf5'
            self.extra_args = {'independentParam': self.independentParam}
        else:
            # Without an extension every later step fails with an
            # AttributeError, so reject the value here.
            raise ValueError('The format is not valid choose cedar or hdf5')

        # Parameters checked for NaN when selecting the rows to write; the
        # range/height columns are excluded.
        self.keys = [k.lower() for k in self.twoDDict]
        if 'range' in self.keys:
            self.keys.remove('range')
        if 'gdalt' in self.keys:
            self.keys.remove('gdalt')

    def setFile(self):
        '''
        Create new cedar file object

        The file name is <mnemonic><YYYYmmdd_HHMMSS><ext>, an existing file
        with the same name is deleted.

        Return:
            1 if the file object was created, 0 otherwise
        '''

        self.mnemonic = MNEMONICS[self.kinst]  # TODO get mnemonic from madrigal
        date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)

        filename = '{}{}{}'.format(self.mnemonic,
                                   date.strftime('%Y%m%d_%H%M%S'),
                                   self.ext)

        self.fullname = os.path.join(self.path, filename)

        if os.path.isfile(self.fullname):
            log.warning(
                'Destination file {} already exists, previous file deleted.'.format(
                    self.fullname),
                'MADWriter')
            os.remove(self.fullname)

        try:
            log.success(
                'Creating file: {}'.format(self.fullname),
                'MADWriter')
            self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
        except ValueError:
            log.error(
                'Impossible to create a cedar object with "madrigal.cedar.MadrigalCedarFile"',
                'MADWriter')
            return 0

        return 1

    def writeBlock(self):
        '''
        Add data records to cedar file taking data from oneDDict and twoDDict
        attributes.
        Allowed parameters in: parcodes.tab
        '''

        startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
        endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval)
        heights = self.dataOut.heightList

        if self.ext == '.dat':
            # Cedar files get the `missing` value instead of NaN; the
            # dataOut arrays are modified in place.
            for value in self.twoDDict.values():
                if isinstance(value, str):
                    attr = value
                elif isinstance(value, (tuple, list)):
                    attr, __ = value
                else:
                    continue
                data = getattr(self.dataOut, attr)
                data[numpy.isnan(data)] = self.missing

        out = {}
        for key, value in list(self.twoDDict.items()):
            key = key.lower()
            if isinstance(value, str):
                if 'db' in value.lower():
                    # '<attr>_db': average over the first axis, then to dB
                    tmp = getattr(self.dataOut, value.replace('_db', ''))
                    SNRavg = numpy.average(tmp, axis=0)
                    tmp = 10 * numpy.log10(SNRavg)
                else:
                    tmp = getattr(self.dataOut, value)
                out[key] = tmp.flatten()
            elif isinstance(value, (tuple, list)):
                attr, x = value
                data = getattr(self.dataOut, attr)
                out[key] = data[int(x)]

        # Keep only the heights where at least one parameter is not NaN
        a = numpy.array([out[k] for k in self.keys])
        nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
        index = numpy.where(numpy.logical_not(nrows))[0]

        rec = madrigal.cedar.MadrigalDataRecord(
            self.kinst,
            self.kindat,
            startTime.year,
            startTime.month,
            startTime.day,
            startTime.hour,
            startTime.minute,
            startTime.second,
            # Centiseconds: floor division keeps the value an integer
            startTime.microsecond // 10000,
            endTime.year,
            endTime.month,
            endTime.day,
            endTime.hour,
            endTime.minute,
            endTime.second,
            endTime.microsecond // 10000,
            list(self.oneDDict.keys()),
            list(self.twoDDict.keys()),
            len(index),
            **self.extra_args
        )

        # Setting 1d values
        for key in self.oneDDict:
            rec.set1D(key, getattr(self.dataOut, self.oneDDict[key]))

        # Setting 2d values
        for nrec, n in enumerate(index):
            for key in out:
                rec.set2D(key, nrec, out[key][n])

        self.fp.append(rec)
        # hdf5 files are dumped every 500 records
        if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0:
            self.fp.dump()
        if self.counter % 20 == 0 and self.counter > 0:
            log.log(
                'Writing {} records'.format(
                    self.counter),
                'MADWriter')

    def setHeader(self):
        '''
        Write the pending records, then create and add catalog and header
        to cedar file
        '''

        log.success('Closing file {}'.format(self.fullname), 'MADWriter')

        if self.ext == '.dat':
            self.fp.write()
        else:
            self.fp.dump()
            self.fp.close()

        header = madrigal.cedar.CatalogHeaderCreator(self.fullname)
        header.createCatalog(**self.catalog)
        header.createHeader(**self.header)
        header.write()

    def putData(self):
        '''
        Write the current dataOut record, starting a new file on the first
        record, on a discontinuity or after `blocks` records

        Return:
            0 if there is no data or the file could not be created
        '''

        if self.dataOut.flagNoData:
            return 0

        if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks:
            if self.counter > 0:
                self.setHeader()
            self.counter = 0

        if self.counter == 0:
            if not self.setFile():
                # setFile already logged the error; without a file object
                # writeBlock could only fail on `self.fp.append`.
                return 0

        self.writeBlock()
        self.counter += 1

    def close(self):
        '''
        Finish the file in progress, if any record was written to it
        '''

        if self.counter > 0:
            self.setHeader()