##// END OF EJS Templates
Use delays instead of an input queue to keep dataOut objects and avoid losing them

File last commit:

r1241:c3044f867269
r1245:1ee18bfa3eb6
Show More
jroIO_madrigal.py
638 lines | 21.0 KiB | text/x-python | PythonLexer
'''
Created on Aug 1, 2017
@author: Juan C. Espinoza
'''
import os
import sys
import time
import json
import glob
import datetime
import numpy
import h5py
import schainpy.admin
from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
from schainpy.model.data.jrodata import Parameters
from schainpy.utils import log
# Optional dependency: writing (and some reading of) Madrigal files needs the
# "madrigal" package; catch only a failed import, not unrelated errors.
try:
    import madrigal.cedar
except ImportError:
    log.warning(
        'You should install "madrigal library" module if you want to read/write Madrigal data'
    )

# Python 2/3 compatibility shim: "basestring" does not exist in Python 3,
# so alias it to str. Catch only the NameError that probe can raise.
try:
    basestring
except NameError:
    basestring = str
# Default Madrigal catalog metadata, used when the caller supplies none.
DEF_CATALOG = {
    'principleInvestigator': 'Marco Milla',
    'expPurpose': '',
    'cycleTime': '',
    'correlativeExp': '',
    'sciRemarks': '',
    'instRemarks': ''
}

# Default Madrigal header metadata, used when the caller supplies none.
DEF_HEADER = {
    'kindatDesc': '',
    'analyst': 'Jicamarca User',
    'comments': '',
    'history': ''
}

# Madrigal instrument code (kinst) -> 3-letter mnemonic used in file names.
MNEMONICS = {
    10: 'jro',
    11: 'jbr',
    840: 'jul',
    13: 'jas',
    1000: 'pbr',
    1001: 'hbr',
    1002: 'obr',
    400: 'clr'
}

# Unix epoch shifted by the local UTC offset (time.timezone is seconds
# west of UTC), i.e. the epoch expressed in local time.
UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
def load_json(obj):
    '''
    Parse JSON text (or an already-decoded object) normalizing keys and
    string values to plain ``str``.

    Input:
        obj - JSON text, or an already-parsed dict/list/tuple/scalar.

    Return:
        dict with ``str`` keys (nested dicts converted recursively),
        list with string items coerced to ``str``, or the value unchanged.
    '''
    if isinstance(obj, str):
        iterable = json.loads(obj)
    else:
        iterable = obj

    if isinstance(iterable, dict):
        # Nested dicts are converted recursively; nested lists inside a dict
        # are kept as-is (matches the original behavior).
        return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, str) else v
                for k, v in list(iterable.items())}
    elif isinstance(iterable, (list, tuple)):
        # Tuples are returned as lists, string items coerced to str.
        return [str(v) if isinstance(v, str) else v for v in iterable]

    return iterable
@MPDecorator
class MADReader(JRODataReader, ProcessingUnit):
    """
    Read Madrigal-format data files (simple text or HDF5) and emit one
    Parameters object per time record through ``dataOut``.
    """

    def __init__(self):

        ProcessingUnit.__init__(self)

        self.dataOut = Parameters()
        self.counter_records = 0   # index of the next record in the open file
        self.nrecords = None       # total records in the open file
        self.flagNoMoreFiles = 0
        self.isConfig = False
        self.filename = None
        self.intervals = set()     # observed gaps (seconds) between consecutive records

    def setup(self,
              path=None,
              startDate=None,
              endDate=None,
              format=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              **kwargs):
        """
        Configure the reader: file search window, file format and the
        Madrigal-parameter -> dataOut-attribute mappings.

        Raises ValueError when path/format are missing and Warning when no
        file matches the requested dates.
        """

        self.path = path
        self.startDate = startDate
        self.endDate = endDate
        self.startTime = startTime
        self.endTime = endTime
        # Sentinel earlier than any real data time; updated as records are read.
        self.datatime = datetime.datetime(1900,1,1)
        # 1D (per-record scalar) parameters: Madrigal name -> dataOut attribute.
        self.oneDDict = load_json(kwargs.get('oneDDict',
                                             "{\"GDLATR\":\"lat\", \"GDLONR\":\"lon\"}"))
        # 2D (per-height) parameters: Madrigal name -> dataOut attribute
        # (or [attribute, index] for multidimensional targets).
        self.twoDDict = load_json(kwargs.get('twoDDict',
                                             "{\"GDALT\": \"heightList\"}"))
        self.independentParam = 'GDALT'

        if self.path is None:
            raise ValueError('The path is not valid')

        if format is None:
            raise ValueError('The format is not valid choose simple or hdf5')
        elif format.lower() in ('simple', 'txt'):
            self.ext = '.txt'
        elif format.lower() in ('cedar',):
            self.ext = '.001'
        else:
            self.ext = '.hdf5'

        self.search_files(self.path)
        self.fileId = 0

        if not self.fileList:
            raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path))

        self.setNextFile()

    def search_files(self, path):
        '''
        Searching for madrigal files in path

        Creating a list of files to process included in [startDate, endDate]

        Input:
            path - Path to find files
        '''

        log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader')
        fileList0 = glob.glob1(path, '*{}'.format(self.ext))
        fileList0.sort()

        self.fileList = []
        self.dateFileList = []

        # Widen the window one day on each side so boundary files are kept;
        # individual records are filtered by time later in readNextBlock().
        startDate = self.startDate - datetime.timedelta(1)
        endDate = self.endDate + datetime.timedelta(1)

        for thisFile in fileList0:
            # Date is parsed from the file name, assuming the pattern
            # <3-char mnemonic><YYYY><MM><DD>... (e.g. jro20170801...)
            # — TODO confirm against the instrument's naming convention.
            year = thisFile[3:7]
            if not year.isdigit():
                continue

            month = thisFile[7:9]
            if not month.isdigit():
                continue

            day = thisFile[9:11]
            if not day.isdigit():
                continue

            year, month, day = int(year), int(month), int(day)
            dateFile = datetime.date(year, month, day)

            if (startDate > dateFile) or (endDate < dateFile):
                continue

            self.fileList.append(thisFile)
            self.dateFileList.append(dateFile)

        return

    def parseHeader(self):
        '''
        Extract the list of parameters available in the open file and drop
        requested (oneDDict/twoDDict) parameters that are not present.
        '''

        self.output = {}
        self.version = '2'
        s_parameters = None
        if self.ext == '.txt':
            # First line of a simple text file lists the parameter names.
            self.parameters = [s.strip().lower() for s in self.fp.readline().decode().strip().split(' ') if s]
        elif self.ext == '.hdf5':
            self.metadata = self.fp['Metadata']
            # '_record_layout' marks Madrigal 3 layout with explicit
            # independent spatial parameters.
            if '_record_layout' in self.metadata:
                s_parameters = [s[0].lower().decode() for s in self.metadata['Independent Spatial Parameters']]
                self.version = '3'
            self.parameters = [s[0].lower().decode() for s in self.metadata['Data Parameters']]

        log.success('Parameters found: {}'.format(self.parameters),
                    'MADReader')
        if s_parameters:
            log.success('Spatial parameters found: {}'.format(s_parameters),
                        'MADReader')

        # Drop requested 1D parameters that the file does not provide.
        for param in list(self.oneDDict.keys()):
            if param.lower() not in self.parameters:
                log.warning(
                    'Parameter {} not found will be ignored'.format(
                        param),
                    'MADReader')
                self.oneDDict.pop(param, None)

        # Drop missing 2D parameters; for [attr, index] targets pre-allocate
        # one slot per parameter in self.output.
        for param, value in list(self.twoDDict.items()):
            if param.lower() not in self.parameters:
                log.warning(
                    'Parameter {} not found, it will be ignored'.format(
                        param),
                    'MADReader')
                self.twoDDict.pop(param, None)
                continue
            if isinstance(value, list):
                if value[0] not in self.output:
                    self.output[value[0]] = []
                self.output[value[0]].append([])

    def parseData(self):
        '''
        Load the data table of the open file and derive the height/range
        grid, record counters and (HDF5 only) the record timestamps.
        '''

        if self.ext == '.txt':
            self.data = numpy.genfromtxt(self.fp, missing_values=('missing'))
            self.nrecords = self.data.shape[0]
            self.ranges = numpy.unique(self.data[:,self.parameters.index(self.independentParam.lower())])
            self.counter_records = 0
        elif self.ext == '.hdf5':
            self.data = self.fp['Data']
            self.ranges = numpy.unique(self.data['Table Layout'][self.independentParam.lower()])
            self.times = numpy.unique(self.data['Table Layout']['ut1_unix'])
            # recno in the table drives the record counters directly.
            self.counter_records = int(self.data['Table Layout']['recno'][0])
            self.nrecords = int(self.data['Table Layout']['recno'][-1])

    def setNextFile(self):
        '''
        Open the next file in self.fileList; parse its header and data.

        Return 0 when the list is exhausted (sets flagNoMoreFiles), 1 otherwise.
        '''

        file_id = self.fileId

        if file_id == len(self.fileList):
            log.success('No more files', 'MADReader')
            self.flagNoMoreFiles = 1
            return 0

        log.success(
            'Opening: {}'.format(self.fileList[file_id]),
            'MADReader'
        )

        filename = os.path.join(self.path, self.fileList[file_id])

        # Close the previously open file handle before switching.
        if self.filename is not None:
            self.fp.close()

        self.filename = filename
        self.filedate = self.dateFileList[file_id]

        if self.ext=='.hdf5':
            self.fp = h5py.File(self.filename, 'r')
        else:
            self.fp = open(self.filename, 'rb')

        self.parseHeader()
        self.parseData()
        self.sizeOfFile = os.path.getsize(self.filename)
        self.flagIsNewFile = 0
        self.fileId += 1

        return 1

    def readNextBlock(self):
        """
        Advance to the next record whose time falls inside
        [startDate+startTime, endDate+endTime], opening new files as needed.

        Return 1 on success, 0 when no files are left.
        """

        while True:
            self.flagDiscontinuousBlock = 0
            if self.flagIsNewFile:
                if not self.setNextFile():
                    return 0

            self.readBlock()

            # Skip records outside the requested absolute time window.
            if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
               (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
                log.warning(
                    'Reading Record No. {}/{} -> {} [Skipping]'.format(
                        self.counter_records,
                        self.nrecords,
                        self.datatime.ctime()),
                    'MADReader')
                continue
            break

        log.log(
            'Reading Record No. {}/{} -> {}'.format(
                self.counter_records,
                self.nrecords,
                self.datatime.ctime()),
            'MADReader')

        return 1

    def readBlock(self):
        '''
        Gather all rows sharing the next timestamp into self.buffer and
        update self.datatime / self.counter_records / self.intervals.
        Sets flagDiscontinuousBlock when the date jumps to a new day and
        flagIsNewFile when the file is exhausted.
        '''

        dum = []
        if self.ext == '.txt':
            # Columns 0-5 are year, month, day, hour, minute, second.
            dt = self.data[self.counter_records][:6].astype(int)
            if datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]).date() > self.datatime.date():
                self.flagDiscontinuousBlock = 1
            self.datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
            # Accumulate consecutive rows with the same timestamp.
            while True:
                dt = self.data[self.counter_records][:6].astype(int)
                datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
                if datatime == self.datatime:
                    dum.append(self.data[self.counter_records])
                    self.counter_records += 1
                    if self.counter_records == self.nrecords:
                        self.flagIsNewFile = True
                        break
                    continue
                # First row of the next record: remember the time gap.
                self.intervals.add((datatime-self.datatime).seconds)
                break
        elif self.ext == '.hdf5':
            datatime = datetime.datetime.utcfromtimestamp(
                self.times[self.counter_records])
            # Select every table row belonging to the current record number.
            dum = self.data['Table Layout'][self.data['Table Layout']['recno']==self.counter_records]
            self.intervals.add((datatime-self.datatime).seconds)
            if datatime.date()>self.datatime.date():
                self.flagDiscontinuousBlock = 1
            self.datatime = datatime
            self.counter_records += 1
            if self.counter_records == self.nrecords:
                self.flagIsNewFile = True

        self.buffer = numpy.array(dum)
        return

    def set_output(self):
        '''
        Storing data from buffer to dataOut object
        '''

        parameters = [None for __ in self.parameters]

        # 1D parameters: one scalar per record, taken from the first row.
        for param, attr in list(self.oneDDict.items()):
            x = self.parameters.index(param.lower())
            setattr(self.dataOut, attr, self.buffer[0][x])

        # 2D parameters: build a full-grid profile, NaN where the record
        # has no sample for a given height.
        for param, value in list(self.twoDDict.items()):
            dummy = numpy.zeros(self.ranges.shape) + numpy.nan
            if self.ext == '.txt':
                x = self.parameters.index(param.lower())
                y = self.parameters.index(self.independentParam.lower())
                ranges = self.buffer[:,y]
                #if self.ranges.size == ranges.size:
                #    continue
                index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
                dummy[index] = self.buffer[:,x]
            else:
                ranges = self.buffer[self.independentParam.lower()]
                index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
                dummy[index] = self.buffer[param.lower()]

            if isinstance(value, str):
                # Do not overwrite the independent (height) parameter itself.
                if value not in self.independentParam:
                    setattr(self.dataOut, value, dummy.reshape(1,-1))
            elif isinstance(value, list):
                self.output[value[0]][value[1]] = dummy
                parameters[value[1]] = param
        for key, value in list(self.output.items()):
            setattr(self.dataOut, key, numpy.array(value))

        self.dataOut.parameters = [s for s in parameters if s]
        self.dataOut.heightList = self.ranges
        # self.datatime is UTC here, so subtract the plain epoch.
        self.dataOut.utctime = (self.datatime - datetime.datetime(1970, 1, 1)).total_seconds()
        self.dataOut.utctimeInit = self.dataOut.utctime
        self.dataOut.paramInterval = min(self.intervals)
        self.dataOut.useLocalTime = False
        self.dataOut.flagNoData = False
        self.dataOut.nrecords = self.nrecords
        self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock

    def getData(self):
        '''
        Storing data from databuffer to dataOut object
        '''
        if self.flagNoMoreFiles:
            self.dataOut.flagNoData = True
            raise schainpy.admin.SchainError('No file left to process')
            return 0   # NOTE(review): unreachable — the raise above always fires

        if not self.readNextBlock():
            self.dataOut.flagNoData = True
            return 0

        self.set_output()

        return 1
@MPDecorator
class MADWriter(Operation):
    """
    Write Parameters data blocks to Madrigal files (CEDAR .dat or HDF5)
    using the madrigal.cedar API.
    """

    # Madrigal fill value used in place of NaN samples.
    missing = -32767

    def __init__(self):

        Operation.__init__(self)
        self.dataOut = Parameters()
        self.counter = 0     # records written into the current output file
        self.path = None
        self.fp = None

    def run(self, dataOut, path, oneDDict, independentParam='[]', twoDDict='{}',
            metadata='{}', format='cedar', **kwargs):
        '''
        Inputs:
            path - path where files will be created
            oneDDict - json of one-dimensional parameters in record where keys
                are Madrigal codes (integers or mnemonics) and values the corresponding
                dataOut attribute e.g: {
                    'gdlatr': 'lat',
                    'gdlonr': 'lon',
                    'gdlat2':'lat',
                    'glon2':'lon'}
            independentParam - list of independent spatial two-dimensional parameters e.g:
                ['heightList']
            twoDDict - json of two-dimensional parameters in record where keys
                are Madrigal codes (integers or mnemonics) and values the corresponding
                dataOut attribute if multidimensional array specify as tuple
                ('attr', pos) e.g: {
                    'gdalt': 'heightList',
                    'vn1p2': ('data_output', 0),
                    'vn2p2': ('data_output', 1),
                    'vn3': ('data_output', 2),
                    'snl': ('data_SNR', 'db')
                    }
            metadata - json of madrigal metadata (kinst, kindat, catalog and header)
        '''
        if not self.isConfig:
            self.setup(path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs)
            self.isConfig = True

        self.dataOut = dataOut
        self.putData()
        return 1

    def setup(self, path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs):
        '''
        Configure Operation: parse the JSON mappings and metadata and select
        the output format (.dat for cedar, .hdf5 for hdf5).
        '''

        self.path = path
        # Optional number of records per output file; None means a new file
        # only on a time discontinuity.
        self.blocks = kwargs.get('blocks', None)
        self.counter = 0
        self.oneDDict = load_json(oneDDict)
        self.twoDDict = load_json(twoDDict)
        self.independentParam = load_json(independentParam)
        meta = load_json(metadata)
        self.kinst = meta.get('kinst')
        self.kindat = meta.get('kindat')
        self.catalog = meta.get('catalog', DEF_CATALOG)
        self.header = meta.get('header', DEF_HEADER)
        if format == 'cedar':
            self.ext = '.dat'
            self.extra_args = {}
        elif format == 'hdf5':
            self.ext = '.hdf5'
            self.extra_args = {'independentParam': self.independentParam}

        # Keys used when testing rows for all-NaN; the independent/height
        # coordinates themselves are excluded.
        self.keys = [k.lower() for k in self.twoDDict]
        if 'range' in self.keys:
            self.keys.remove('range')
        if 'gdalt' in self.keys:
            self.keys.remove('gdalt')

    def setFile(self):
        '''
        Create new cedar file object
        '''

        self.mnemonic = MNEMONICS[self.kinst]   #TODO get mnemonic from madrigal
        date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)

        # File name: <mnemonic><YYYYmmdd_HHMMSS><ext>
        filename = '{}{}{}'.format(self.mnemonic,
                                   date.strftime('%Y%m%d_%H%M%S'),
                                   self.ext)

        self.fullname = os.path.join(self.path, filename)

        if os.path.isfile(self.fullname):
            log.warning(
                'Destination file {} already exists, previous file deleted.'.format(
                    self.fullname),
                'MADWriter')
            os.remove(self.fullname)

        try:
            log.success(
                'Creating file: {}'.format(self.fullname),
                'MADWriter')
            self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
        except ValueError as e:
            log.error(
                'Impossible to create a cedar object with "madrigal.cedar.MadrigalCedarFile"',
                'MADWriter')
            return
            # NOTE(review): on failure this returns None while success
            # returns 1; callers do not currently check the return value.

        return 1

    def writeBlock(self):
        '''
        Add data records to cedar file taking data from oneDDict and twoDDict
        attributes.
        Allowed parameters in: parcodes.tab
        '''

        startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
        endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval)
        heights = self.dataOut.heightList

        if self.ext == '.dat':
            # Pre-pass for .dat: replace NaNs in-place (mutates dataOut
            # arrays) with the Madrigal fill value.
            for key, value in list(self.twoDDict.items()):
                if isinstance(value, str):
                    data = getattr(self.dataOut, value)
                    invalid = numpy.isnan(data)
                    data[invalid] = self.missing
                elif isinstance(value, (tuple, list)):
                    # NOTE(review): this rebinds `key`, shadowing the loop
                    # variable; harmless here since `key` is not used again
                    # in this iteration.
                    attr, key = value
                    data = getattr(self.dataOut, attr)
                    invalid = numpy.isnan(data)
                    data[invalid] = self.missing

        out = {}
        for key, value in list(self.twoDDict.items()):
            key = key.lower()
            if isinstance(value, str):
                if 'db' in value.lower():
                    # '_db' suffix: average over channels and convert to dB.
                    tmp = getattr(self.dataOut, value.replace('_db', ''))
                    SNRavg = numpy.average(tmp, axis=0)
                    tmp = 10*numpy.log10(SNRavg)
                else:
                    tmp = getattr(self.dataOut, value)
                out[key] = tmp.flatten()
            elif isinstance(value, (tuple, list)):
                attr, x = value
                data = getattr(self.dataOut, attr)
                out[key] = data[int(x)]

        # Keep only heights where at least one parameter has valid data.
        a = numpy.array([out[k] for k in self.keys])
        nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
        index = numpy.where(nrows == False)[0]

        rec = madrigal.cedar.MadrigalDataRecord(
            self.kinst,
            self.kindat,
            startTime.year,
            startTime.month,
            startTime.day,
            startTime.hour,
            startTime.minute,
            startTime.second,
            startTime.microsecond/10000,
            endTime.year,
            endTime.month,
            endTime.day,
            endTime.hour,
            endTime.minute,
            endTime.second,
            endTime.microsecond/10000,
            list(self.oneDDict.keys()),
            list(self.twoDDict.keys()),
            len(index),
            **self.extra_args
        )

        # Setting 1d values
        for key in self.oneDDict:
            rec.set1D(key, getattr(self.dataOut, self.oneDDict[key]))

        # Setting 2d values
        nrec = 0
        for n in index:
            for key in out:
                rec.set2D(key, nrec, out[key][n])
            nrec += 1

        self.fp.append(rec)
        # Flush HDF5 output periodically to bound memory usage.
        if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0:
            self.fp.dump()
        if self.counter % 20 == 0 and self.counter > 0:
            log.log(
                'Writing {} records'.format(
                    self.counter),
                'MADWriter')

    def setHeader(self):
        '''
        Create and add catalog and header to cedar file, then close it.
        '''

        log.success('Closing file {}'.format(self.fullname), 'MADWriter')

        if self.ext == '.dat':
            self.fp.write()
        else:
            self.fp.dump()
            self.fp.close()

        header = madrigal.cedar.CatalogHeaderCreator(self.fullname)
        header.createCatalog(**self.catalog)
        header.createHeader(**self.header)
        header.write()

    def putData(self):
        """
        Write the current dataOut block, starting a new output file on the
        first block, on a time discontinuity, or every self.blocks records.
        """

        if self.dataOut.flagNoData:
            return 0

        if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks:
            # Finish the current file (if any) before starting a new one.
            if self.counter > 0:
                self.setHeader()
            self.counter = 0

        if self.counter == 0:
            self.setFile()

        self.writeBlock()
        self.counter += 1

    def close(self):
        # Finalize the last open file, if any records were written.
        if self.counter > 0:
            self.setHeader()