##// END OF EJS Templates
# Update plot codes, add meta attribute to dataOut to send metadata to plots
# Update plot codes, add meta attribute to dataOut to send metadata to plots
#
# File last commit:
#
# r1086:828386118e15
# r1139:ade57c0ecace
# Show More
# jroIO_madrigal.py
# 642 lines | 21.3 KiB | text/x-python | PythonLexer
'''
Created on Aug 1, 2017
@author: Juan C. Espinoza
'''
import os
import sys
import time
import json
import glob
import datetime
import numpy
import h5py
from schainpy.model.io.jroIO_base import JRODataReader
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
from schainpy.model.data.jrodata import Parameters
from schainpy.utils import log
# The madrigal library is optional: only needed when actually reading or
# writing Madrigal data. Catch only ImportError so genuine errors inside
# the package (syntax errors, broken deps) are not silently swallowed.
try:
    import madrigal.cedar
except ImportError:
    log.warning(
        'You should install "madrigal library" module if you want to read/write Madrigal data'
    )
# Default experiment catalog metadata used by MADWriter when the user
# supplies no 'catalog' entry (consumed by CatalogHeaderCreator.createCatalog).
DEF_CATALOG = {
    'principleInvestigator': 'Marco Milla',
    'expPurpose': None,
    'cycleTime': None,
    'correlativeExp': None,
    'sciRemarks': None,
    'instRemarks': None
}
# Default file header metadata used by MADWriter when the user supplies
# no 'header' entry (consumed by CatalogHeaderCreator.createHeader).
DEF_HEADER = {
    'kindatDesc': None,
    'analyst': 'Jicamarca User',
    'comments': None,
    'history': None
}
# Madrigal instrument code (kinst) -> 3-letter filename mnemonic.
# TODO in setFile(): this mapping should eventually come from madrigal itself.
MNEMONICS = {
    10: 'jro',
    11: 'jbr',
    840: 'jul',
    13: 'jas',
    1000: 'pbr',
    1001: 'hbr',
    1002: 'obr',
}
# Unix epoch shifted by the machine's local UTC offset (time.timezone is
# seconds west of UTC), i.e. the epoch expressed in local wall-clock time.
UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
def load_json(obj):
    '''
    Parse JSON as plain str instead of unicode.

    Accepts either a JSON string or an already-decoded object and returns
    the same structure with every text key/value coerced to ``str``
    (on Python 2 ``json.loads`` yields ``unicode`` objects, which the rest
    of this module does not expect). Non-container, non-text values are
    returned unchanged.
    '''
    if isinstance(obj, str):
        iterable = json.loads(obj)
    else:
        iterable = obj
    # `unicode` only exists on Python 2; fall back to `str` on Python 3 so
    # the coercion below becomes a harmless no-op instead of a NameError.
    try:
        text_type = unicode
    except NameError:
        text_type = str
    if isinstance(iterable, dict):
        # Recurse into nested dicts; coerce unicode leaves to str.
        return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, text_type) else v
                for k, v in iterable.items()}
    elif isinstance(iterable, (list, tuple)):
        return [str(v) if isinstance(v, text_type) else v for v in iterable]
    return iterable
class MADReader(JRODataReader, ProcessingUnit):
    '''
    Read Madrigal-format files (plain text .txt, CEDAR .001 or .hdf5) record
    by record and publish each time block through a Parameters dataOut object.
    '''
    def __init__(self, **kwargs):
        ProcessingUnit.__init__(self, **kwargs)
        self.dataOut = Parameters()
        self.counter_records = 0  # index of the next record within the current file
        self.nrecords = None  # total number of records in the current file
        self.flagNoMoreFiles = 0  # set to 1 once the file list is exhausted
        self.isConfig = False
        self.filename = None  # full path of the currently opened file
        self.intervals = set()  # distinct inter-record intervals seen (seconds)
    def setup(self,
              path=None,
              startDate=None,
              endDate=None,
              format=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              **kwargs):
        '''
        Configure the reader: store the date/time window, parse the JSON
        parameter mappings, resolve the file extension from `format` and
        build the list of files to process, then open the first file.

        Raises ValueError when path/format is missing and Warning when no
        file matches the requested dates (Python 2 raise syntax).
        '''
        self.path = path
        self.startDate = startDate
        self.endDate = endDate
        self.startTime = startTime
        self.endTime = endTime
        self.datatime = datetime.datetime(1900,1,1)  # timestamp of last record read (sentinel)
        # One-dimensional Madrigal parameters -> dataOut attribute names
        self.oneDDict = load_json(kwargs.get('oneDDict',
                                             "{\"GDLATR\":\"lat\", \"GDLONR\":\"lon\"}"))
        # Two-dimensional Madrigal parameters -> dataOut attribute names
        self.twoDDict = load_json(kwargs.get('twoDDict',
                                             "{\"GDALT\": \"heightList\"}"))
        # Independent spatial parameter(s), e.g. altitude
        self.ind2DList = load_json(kwargs.get('ind2DList',
                                              "[\"GDALT\"]"))
        if self.path is None:
            raise ValueError, 'The path is not valid'
        if format is None:
            raise ValueError, 'The format is not valid choose simple or hdf5'
        elif format.lower() in ('simple', 'txt'):
            self.ext = '.txt'
        elif format.lower() in ('cedar',):
            self.ext = '.001'
        else:
            self.ext = '.hdf5'
        self.search_files(self.path)
        self.fileId = 0
        if not self.fileList:
            raise Warning, 'There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path)
        self.setNextFile()
    def search_files(self, path):
        '''
        Searching for madrigal files in path
        Creating a list of files to procces included in [startDate,endDate]

        Input:
            path - Path to find files
        '''
        log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader')
        foldercounter = 0
        fileList0 = glob.glob1(path, '*{}'.format(self.ext))
        fileList0.sort()
        self.fileList = []
        self.dateFileList = []
        # Widen the window one day on each side so border files survive this
        # coarse per-file filter; records are filtered precisely when read.
        startDate = self.startDate - datetime.timedelta(1)
        endDate = self.endDate + datetime.timedelta(1)
        for thisFile in fileList0:
            # Date is taken from fixed filename positions: assumes a 3-char
            # instrument mnemonic followed by YYYYMMDD (e.g. jro20170801...)
            # -- TODO confirm against actual Madrigal file naming.
            year = thisFile[3:7]
            if not year.isdigit():
                continue
            month = thisFile[7:9]
            if not month.isdigit():
                continue
            day = thisFile[9:11]
            if not day.isdigit():
                continue
            year, month, day = int(year), int(month), int(day)
            dateFile = datetime.date(year, month, day)
            if (startDate > dateFile) or (endDate < dateFile):
                continue
            self.fileList.append(thisFile)
            self.dateFileList.append(dateFile)
        return
    def parseHeader(self):
        '''
        Extract the parameter names (and, for HDF5, their dimensionality)
        from the current file and discard requested parameters that the
        file does not actually contain.
        '''
        self.output = {}
        self.version = '2'
        s_parameters = None
        if self.ext == '.txt':
            # First line of a text file holds the space-separated parameter names.
            self.parameters = [s.strip().lower() for s in self.fp.readline().strip().split(' ') if s]
        elif self.ext == '.hdf5':
            metadata = self.fp['Metadata']
            data = self.fp['Data']['Array Layout']
            if 'Independent Spatial Parameters' in metadata:
                s_parameters = [s[0].lower() for s in metadata['Independent Spatial Parameters']]
                self.version = '3'
            one = [s[0].lower() for s in data['1D Parameters']['Data Parameters']]
            one_d = [1 for s in one]
            two = [s[0].lower() for s in data['2D Parameters']['Data Parameters']]
            two_d = [2 for s in two]
            self.parameters = one + two
            # Dimensionality (1 or 2) of each entry of self.parameters, aligned by index
            self.parameters_d = one_d + two_d
        log.success('Parameters found: {}'.format(','.join(self.parameters)),
                    'MADReader')
        if s_parameters:
            log.success('Spatial parameters: {}'.format(','.join(s_parameters)),
                        'MADReader')
        # NOTE(review): popping while iterating over .keys()/.items() is only
        # safe on Python 2, where these return lists; would break on Python 3.
        for param in self.oneDDict.keys():
            if param.lower() not in self.parameters:
                log.warning(
                    'Parameter {} not found will be ignored'.format(
                        param),
                    'MADReader')
                self.oneDDict.pop(param, None)
        for param, value in self.twoDDict.items():
            if param.lower() not in self.parameters:
                log.warning(
                    'Parameter {} not found, it will be ignored'.format(
                        param),
                    'MADReader')
                self.twoDDict.pop(param, None)
                continue
            if isinstance(value, list):
                # Reserve a placeholder slot; filled by index in set_output()
                if value[0] not in self.output:
                    self.output[value[0]] = []
                self.output[value[0]].append(None)
    def parseData(self):
        '''
        Load the file payload and derive the record count and the unique
        range/height axis for the current file.
        '''
        if self.ext == '.txt':
            self.data = numpy.genfromtxt(self.fp, missing_values=('missing'))
            self.nrecords = self.data.shape[0]
            self.ranges = numpy.unique(self.data[:,self.parameters.index(self.ind2DList[0].lower())])
        elif self.ext == '.hdf5':
            self.data = self.fp['Data']['Array Layout']
            self.nrecords = len(self.data['timestamps'].value)
            self.ranges = self.data['range'].value
    def setNextFile(self):
        '''
        Open the next file in the list, parse its header/data and reset the
        per-file counters. Returns 1 on success, 0 when no files remain
        (also sets flagNoMoreFiles).
        '''
        file_id = self.fileId
        if file_id == len(self.fileList):
            log.success('No more files', 'MADReader')
            self.flagNoMoreFiles = 1
            return 0
        log.success(
            'Opening: {}'.format(self.fileList[file_id]),
            'MADReader'
        )
        filename = os.path.join(self.path, self.fileList[file_id])
        if self.filename is not None:
            self.fp.close()  # release the previously opened file handle
        self.filename = filename
        self.filedate = self.dateFileList[file_id]
        if self.ext=='.hdf5':
            self.fp = h5py.File(self.filename, 'r')
        else:
            self.fp = open(self.filename, 'rb')
        self.parseHeader()
        self.parseData()
        self.sizeOfFile = os.path.getsize(self.filename)
        self.counter_records = 0
        self.flagIsNewFile = 0
        self.fileId += 1
        return 1
    def readNextBlock(self):
        '''
        Read blocks until one falls inside the configured [startTime, endTime]
        window; advances to the next file as needed. Returns 1 when a valid
        block was read, 0 when no files remain.
        '''
        while True:
            self.flagDiscontinuousBlock = 0
            if self.flagIsNewFile:
                if not self.setNextFile():
                    return 0
            self.readBlock()
            # Skip records outside the requested datetime window
            if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
               (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
                log.warning(
                    'Reading Record No. {}/{} -> {} [Skipping]'.format(
                        self.counter_records,
                        self.nrecords,
                        self.datatime.ctime()),
                    'MADReader')
                continue
            break
        log.log(
            'Reading Record No. {}/{} -> {}'.format(
                self.counter_records,
                self.nrecords,
                self.datatime.ctime()),
            'MADReader')
        return 1
    def readBlock(self):
        '''
        Read one time block into self.buffer: for .txt, all consecutive rows
        sharing the same timestamp; for .hdf5, one row per parameter for the
        current record. Updates datatime, counters, the interval set and the
        discontinuity/new-file flags.
        '''
        dum = []
        if self.ext == '.txt':
            dt = self.data[self.counter_records][:6].astype(int)
            # A jump to a later calendar day marks a discontinuity
            if datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]).date() > self.datatime.date():
                self.flagDiscontinuousBlock = 1
            self.datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
            while True:
                dt = self.data[self.counter_records][:6].astype(int)
                datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
                if datatime == self.datatime:
                    # Same timestamp: row belongs to the current block
                    dum.append(self.data[self.counter_records])
                    self.counter_records += 1
                    if self.counter_records == self.nrecords:
                        self.flagIsNewFile = True
                        break
                    continue
                # First row of the next block: record the time step and stop
                self.intervals.add((datatime-self.datatime).seconds)
                break
        elif self.ext == '.hdf5':
            datatime = datetime.datetime.utcfromtimestamp(
                self.data['timestamps'][self.counter_records])
            nHeights = len(self.ranges)
            for n, param in enumerate(self.parameters):
                if self.parameters_d[n] == 1:
                    # Broadcast the scalar 1-D value across the height axis
                    dum.append(numpy.ones(nHeights)*self.data['1D Parameters'][param][self.counter_records])
                else:
                    if self.version == '2':
                        dum.append(self.data['2D Parameters'][param][self.counter_records])
                    else:
                        # Version-3 layout stores records along the last axis
                        tmp = self.data['2D Parameters'][param].value.T
                        dum.append(tmp[self.counter_records])
            self.intervals.add((datatime-self.datatime).seconds)
            if datatime.date()>self.datatime.date():
                self.flagDiscontinuousBlock = 1
            self.datatime = datatime
            self.counter_records += 1
            if self.counter_records == self.nrecords:
                self.flagIsNewFile = True
        self.buffer = numpy.array(dum)
        return
    def set_output(self):
        '''
        Storing data from buffer to dataOut object
        '''
        parameters = [None for __ in self.parameters]
        # 1-D parameters: single value per block, taken from the first row
        for param, attr in self.oneDDict.items():
            x = self.parameters.index(param.lower())
            setattr(self.dataOut, attr, self.buffer[0][x])
        for param, value in self.twoDDict.items():
            x = self.parameters.index(param.lower())
            if self.ext == '.txt':
                y = self.parameters.index(self.ind2DList[0].lower())
                ranges = self.buffer[:,y]
                # NOTE(review): when the record already covers every range this
                # `continue` skips the parameter entirely instead of assigning
                # it -- looks unintended (perhaps meant dummy = buffer[:,x]);
                # confirm against upstream history.
                if self.ranges.size == ranges.size:
                    continue
                # Pad heights missing from this record with NaN so every
                # block shares the full range axis
                index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
                dummy = numpy.zeros(self.ranges.shape) + numpy.nan
                dummy[index] = self.buffer[:,x]
            else:
                # HDF5 buffer is one row per parameter
                dummy = self.buffer[x]
            if isinstance(value, str):
                if value not in self.ind2DList:
                    setattr(self.dataOut, value, dummy.reshape(1,-1))
            elif isinstance(value, list):
                # Fill the placeholder slot reserved in parseHeader()
                self.output[value[0]][value[1]] = dummy
                parameters[value[1]] = param
        for key, value in self.output.items():
            setattr(self.dataOut, key, numpy.array(value))
        self.dataOut.parameters = [s for s in parameters if s]
        self.dataOut.heightList = self.ranges
        # Block timestamp as Unix seconds (datatime treated as UTC)
        self.dataOut.utctime = (self.datatime - datetime.datetime(1970, 1, 1)).total_seconds()
        self.dataOut.utctimeInit = self.dataOut.utctime
        self.dataOut.paramInterval = min(self.intervals)
        self.dataOut.useLocalTime = False
        self.dataOut.flagNoData = False
        self.dataOut.nrecords = self.nrecords
        self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
    def getData(self):
        '''
        Storing data from databuffer to dataOut object

        Returns 1 when a block was delivered, 0 on end of data (and sets
        dataOut.flagNoData).
        '''
        if self.flagNoMoreFiles:
            self.dataOut.flagNoData = True
            log.error('No file left to process', 'MADReader')
            return 0
        if not self.readNextBlock():
            self.dataOut.flagNoData = True
            return 0
        self.set_output()
        return 1
class MADWriter(Operation):
    '''
    Write dataOut blocks to Madrigal files (CEDAR .dat or .hdf5) using the
    madrigal.cedar API; a new file is started on each time discontinuity or
    every `blocks` records.
    '''
    missing = -32767  # Madrigal convention for a missing/invalid value
    def __init__(self, **kwargs):
        Operation.__init__(self, **kwargs)
        self.dataOut = Parameters()
        self.counter = 0  # records written to the current output file
        self.path = None
        self.fp = None  # current madrigal.cedar.MadrigalCedarFile
    def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}',
            metadata='{}', format='cedar', **kwargs):
        '''
        Inputs:
            path - path where files will be created
            oneDDict - json of one-dimensional parameters in record where keys
                are Madrigal codes (integers or mnemonics) and values the corresponding
                dataOut attribute e.g: {
                    'gdlatr': 'lat',
                    'gdlonr': 'lon',
                    'gdlat2':'lat',
                    'glon2':'lon'}
            ind2DList - list of independent spatial two-dimensional parameters e.g:
                ['heighList']
            twoDDict - json of two-dimensional parameters in record where keys
                are Madrigal codes (integers or mnemonics) and values the corresponding
                dataOut attribute if multidimensional array specify as tupple
                ('attr', pos) e.g: {
                    'gdalt': 'heightList',
                    'vn1p2': ('data_output', 0),
                    'vn2p2': ('data_output', 1),
                    'vn3': ('data_output', 2),
                    'snl': ('data_SNR', 'db')
                    }
            metadata - json of madrigal metadata (kinst, kindat, catalog and header)
        '''
        if not self.isConfig:
            self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs)
            self.isConfig = True
        self.dataOut = dataOut
        self.putData()
        return
    def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs):
        '''
        Configure Operation: parse the JSON mappings and metadata, pick the
        output extension from `format` and precompute the 2-D parameter keys.
        '''
        self.path = path
        # Records per output file; None means split only on discontinuities
        self.blocks = kwargs.get('blocks', None)
        self.counter = 0
        self.oneDDict = load_json(oneDDict)
        self.twoDDict = load_json(twoDDict)
        self.ind2DList = load_json(ind2DList)
        meta = load_json(metadata)
        self.kinst = meta.get('kinst')  # Madrigal instrument code
        self.kindat = meta.get('kindat')  # Madrigal kind-of-data code
        self.catalog = meta.get('catalog', DEF_CATALOG)
        self.header = meta.get('header', DEF_HEADER)
        if format == 'cedar':
            self.ext = '.dat'
            self.extra_args = {}
        elif format == 'hdf5':
            self.ext = '.hdf5'
            self.extra_args = {'ind2DList': self.ind2DList}
        # 2-D parameter mnemonics, excluding the spatial axis itself
        self.keys = [k.lower() for k in self.twoDDict]
        if 'range' in self.keys:
            self.keys.remove('range')
        if 'gdalt' in self.keys:
            self.keys.remove('gdalt')
    def setFile(self):
        '''
        Create new cedar file object

        The filename is built from the instrument mnemonic and the block's
        UTC timestamp; an existing file with the same name is removed first.
        Returns 1 on success, None when the cedar object cannot be created.
        '''
        self.mnemonic = MNEMONICS[self.kinst] #TODO get mnemonic from madrigal
        date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
        filename = '{}{}{}'.format(self.mnemonic,
                                   date.strftime('%Y%m%d_%H%M%S'),
                                   self.ext)
        self.fullname = os.path.join(self.path, filename)
        if os.path.isfile(self.fullname) :
            log.warning(
                'Destination file {} already exists, previous file deleted.'.format(
                    self.fullname),
                'MADWriter')
            os.remove(self.fullname)
        try:
            log.success(
                'Creating file: {}'.format(self.fullname),
                'MADWriter')
            self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
        except ValueError, e:
            log.error(
                'Impossible to create a cedar object with "madrigal.cedar.MadrigalCedarFile"',
                'MADWriter')
            return
        return 1
    def writeBlock(self):
        '''
        Add data records to cedar file taking data from oneDDict and twoDDict
        attributes.
        Allowed parameters in: parcodes.tab
        '''
        startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
        endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval)
        heights = self.dataOut.heightList
        if self.ext == '.dat':
            # .dat cannot represent NaN: replace NaNs with the Madrigal
            # missing value. NOTE(review): this mutates the dataOut arrays
            # in place, which may surprise downstream operations.
            for key, value in self.twoDDict.items():
                if isinstance(value, str):
                    data = getattr(self.dataOut, value)
                    invalid = numpy.isnan(data)
                    data[invalid] = self.missing
                elif isinstance(value, (tuple, list)):
                    # NOTE(review): rebinds the loop variable `key`; harmless
                    # here (only `data` is used afterwards) but confusing.
                    attr, key = value
                    data = getattr(self.dataOut, attr)
                    invalid = numpy.isnan(data)
                    data[invalid] = self.missing
        out = {}
        for key, value in self.twoDDict.items():
            key = key.lower()
            if isinstance(value, str):
                if 'db' in value.lower():
                    # Attribute name ending in '_db': average the linear values
                    # over the first axis and convert to decibels.
                    # Assumes the underlying attribute holds linear SNR --
                    # TODO confirm.
                    tmp = getattr(self.dataOut, value.replace('_db', ''))
                    SNRavg = numpy.average(tmp, axis=0)
                    tmp = 10*numpy.log10(SNRavg)
                else:
                    tmp = getattr(self.dataOut, value)
                out[key] = tmp.flatten()
            elif isinstance(value, (tuple, list)):
                # ('attr', pos): take one slice of a multidimensional array.
                # NOTE(review): the ('data_SNR', 'db') form shown in run()'s
                # docstring would fail here at int('db') -- confirm intent.
                attr, x = value
                data = getattr(self.dataOut, attr)
                out[key] = data[int(x)]
        a = numpy.array([out[k] for k in self.keys])
        # Mark height rows where every 2-D parameter is NaN; those rows are
        # dropped from the record.
        nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
        index = numpy.where(nrows == False)[0]
        rec = madrigal.cedar.MadrigalDataRecord(
            self.kinst,
            self.kindat,
            startTime.year,
            startTime.month,
            startTime.day,
            startTime.hour,
            startTime.minute,
            startTime.second,
            startTime.microsecond/10000,  # hundredths of a second (Py2 integer division)
            endTime.year,
            endTime.month,
            endTime.day,
            endTime.hour,
            endTime.minute,
            endTime.second,
            endTime.microsecond/10000,
            self.oneDDict.keys(),
            self.twoDDict.keys(),
            len(index),
            **self.extra_args
        )
        # Setting 1d values
        for key in self.oneDDict:
            rec.set1D(key, getattr(self.dataOut, self.oneDDict[key]))
        # Setting 2d values
        nrec = 0
        for n in index:
            for key in out:
                rec.set2D(key, nrec, out[key][n])
            nrec += 1
        self.fp.append(rec)
        # Flush HDF5 output periodically to bound memory usage
        if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0:
            self.fp.dump()
        if self.counter % 100 == 0 and self.counter > 0:
            log.log(
                'Writing {} records'.format(
                    self.counter),
                'MADWriter')
    def setHeader(self):
        '''
        Create an add catalog and header to cedar file

        Flushes and closes the current file, then appends the catalog and
        header records to it.
        '''
        log.success('Closing file {}'.format(self.fullname), 'MADWriter')
        if self.ext == '.dat':
            self.fp.write()
        else:
            self.fp.dump()
            self.fp.close()
        header = madrigal.cedar.CatalogHeaderCreator(self.fullname)
        header.createCatalog(**self.catalog)
        header.createHeader(**self.header)
        header.write()
    def putData(self):
        '''
        Write the current dataOut block, rotating to a new output file on a
        time discontinuity or after `blocks` records. Returns 0 (no write)
        when dataOut carries no data.
        '''
        if self.dataOut.flagNoData:
            return 0
        if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks:
            # Finalize the current file before starting a new one
            if self.counter > 0:
                self.setHeader()
            self.counter = 0
        if self.counter == 0:
            self.setFile()
        self.writeBlock()
        self.counter += 1
    def close(self):
        '''Finalize the last open file, if any records were written to it.'''
        if self.counter > 0:
            self.setHeader()