##// END OF EJS Templates
añadido FaradayIntegration para limpieza de datos, restringida la operación al funcionamiento con la pdata regular
añadido FaradayIntegration para limpieza de datos, restringida la operación al funcionamiento con la pdata regular

File last commit:

r1287:af11e4aac00c
r1399:101813202a42
Show More
jroIO_madrigal.py
594 lines | 19.9 KiB | text/x-python | PythonLexer
'''
Created on Aug 1, 2017
@author: Juan C. Espinoza
'''
import os
import sys
import time
import json
import glob
import datetime
import numpy
import h5py
import schainpy.admin
from schainpy.model.io.jroIO_base import LOCALTIME, Reader
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
from schainpy.model.data.jrodata import Parameters
from schainpy.utils import log
try:
import madrigal.cedar
except:
pass
try:
basestring
except:
basestring = str
DEF_CATALOG = {
'principleInvestigator': 'Marco Milla',
'expPurpose': '',
'cycleTime': '',
'correlativeExp': '',
'sciRemarks': '',
'instRemarks': ''
}
DEF_HEADER = {
'kindatDesc': '',
'analyst': 'Jicamarca User',
'comments': '',
'history': ''
}
MNEMONICS = {
10: 'jro',
11: 'jbr',
840: 'jul',
13: 'jas',
1000: 'pbr',
1001: 'hbr',
1002: 'obr',
400: 'clr'
}
UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
def load_json(obj):
'''
Parse json as string instead of unicode
'''
if isinstance(obj, str):
iterable = json.loads(obj)
else:
iterable = obj
if isinstance(iterable, dict):
return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, basestring) else v
for k, v in list(iterable.items())}
elif isinstance(iterable, (list, tuple)):
return [str(v) if isinstance(v, basestring) else v for v in iterable]
return iterable
class MADReader(Reader, ProcessingUnit):
def __init__(self):
ProcessingUnit.__init__(self)
self.dataOut = Parameters()
self.counter_records = 0
self.nrecords = None
self.flagNoMoreFiles = 0
self.filename = None
self.intervals = set()
self.datatime = datetime.datetime(1900,1,1)
self.format = None
self.filefmt = "***%Y%m%d*******"
def setup(self, **kwargs):
self.set_kwargs(**kwargs)
self.oneDDict = load_json(self.oneDDict)
self.twoDDict = load_json(self.twoDDict)
self.ind2DList = load_json(self.ind2DList)
self.independentParam = self.ind2DList[0]
if self.path is None:
raise ValueError('The path is not valid')
self.open_file = open
self.open_mode = 'rb'
if self.format is None:
raise ValueError('The format is not valid choose simple or hdf5')
elif self.format.lower() in ('simple', 'txt'):
self.ext = '.txt'
elif self.format.lower() in ('cedar',):
self.ext = '.001'
else:
self.ext = '.hdf5'
self.open_file = h5py.File
self.open_mode = 'r'
if self.online:
log.log("Searching files in online mode...", self.name)
for nTries in range(self.nTries):
fullpath = self.searchFilesOnLine(self.path, self.startDate,
self.endDate, self.expLabel, self.ext, self.walk,
self.filefmt, self.folderfmt)
try:
fullpath = next(fullpath)
except:
fullpath = None
if fullpath:
break
log.warning(
'Waiting {} sec for a valid file in {}: try {} ...'.format(
self.delay, self.path, nTries + 1),
self.name)
time.sleep(self.delay)
if not(fullpath):
raise schainpy.admin.SchainError(
'There isn\'t any valid file in {}'.format(self.path))
else:
log.log("Searching files in {}".format(self.path), self.name)
self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
self.setNextFile()
def readFirstHeader(self):
'''Read header and data'''
self.parseHeader()
self.parseData()
self.blockIndex = 0
return
def parseHeader(self):
'''
'''
self.output = {}
self.version = '2'
s_parameters = None
if self.ext == '.txt':
self.parameters = [s.strip().lower() for s in self.fp.readline().decode().strip().split(' ') if s]
elif self.ext == '.hdf5':
self.metadata = self.fp['Metadata']
if '_record_layout' in self.metadata:
s_parameters = [s[0].lower().decode() for s in self.metadata['Independent Spatial Parameters']]
self.version = '3'
self.parameters = [s[0].lower().decode() for s in self.metadata['Data Parameters']]
log.success('Parameters found: {}'.format(self.parameters),
'MADReader')
if s_parameters:
log.success('Spatial parameters found: {}'.format(s_parameters),
'MADReader')
for param in list(self.oneDDict.keys()):
if param.lower() not in self.parameters:
log.warning(
'Parameter {} not found will be ignored'.format(
param),
'MADReader')
self.oneDDict.pop(param, None)
for param, value in list(self.twoDDict.items()):
if param.lower() not in self.parameters:
log.warning(
'Parameter {} not found, it will be ignored'.format(
param),
'MADReader')
self.twoDDict.pop(param, None)
continue
if isinstance(value, list):
if value[0] not in self.output:
self.output[value[0]] = []
self.output[value[0]].append([])
def parseData(self):
'''
'''
if self.ext == '.txt':
self.data = numpy.genfromtxt(self.fp, missing_values=('missing'))
self.nrecords = self.data.shape[0]
self.ranges = numpy.unique(self.data[:,self.parameters.index(self.independentParam.lower())])
self.counter_records = 0
elif self.ext == '.hdf5':
self.data = self.fp['Data']
self.ranges = numpy.unique(self.data['Table Layout'][self.independentParam.lower()])
self.times = numpy.unique(self.data['Table Layout']['ut1_unix'])
self.counter_records = int(self.data['Table Layout']['recno'][0])
self.nrecords = int(self.data['Table Layout']['recno'][-1])
def readNextBlock(self):
while True:
self.flagDiscontinuousBlock = 0
if self.counter_records == self.nrecords:
self.setNextFile()
self.readBlock()
if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
(self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
log.warning(
'Reading Record No. {}/{} -> {} [Skipping]'.format(
self.counter_records,
self.nrecords,
self.datatime.ctime()),
'MADReader')
continue
break
log.log(
'Reading Record No. {}/{} -> {}'.format(
self.counter_records,
self.nrecords,
self.datatime.ctime()),
'MADReader')
return 1
def readBlock(self):
'''
'''
dum = []
if self.ext == '.txt':
dt = self.data[self.counter_records][:6].astype(int)
if datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]).date() > self.datatime.date():
self.flagDiscontinuousBlock = 1
self.datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
while True:
dt = self.data[self.counter_records][:6].astype(int)
datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
if datatime == self.datatime:
dum.append(self.data[self.counter_records])
self.counter_records += 1
if self.counter_records == self.nrecords:
break
continue
self.intervals.add((datatime-self.datatime).seconds)
break
elif self.ext == '.hdf5':
datatime = datetime.datetime.utcfromtimestamp(
self.times[self.counter_records])
dum = self.data['Table Layout'][self.data['Table Layout']['recno']==self.counter_records]
self.intervals.add((datatime-self.datatime).seconds)
if datatime.date()>self.datatime.date():
self.flagDiscontinuousBlock = 1
self.datatime = datatime
self.counter_records += 1
self.buffer = numpy.array(dum)
return
def set_output(self):
'''
Storing data from buffer to dataOut object
'''
parameters = [None for __ in self.parameters]
for param, attr in list(self.oneDDict.items()):
x = self.parameters.index(param.lower())
setattr(self.dataOut, attr, self.buffer[0][x])
for param, value in list(self.twoDDict.items()):
dummy = numpy.zeros(self.ranges.shape) + numpy.nan
if self.ext == '.txt':
x = self.parameters.index(param.lower())
y = self.parameters.index(self.independentParam.lower())
ranges = self.buffer[:,y]
#if self.ranges.size == ranges.size:
# continue
index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
dummy[index] = self.buffer[:,x]
else:
ranges = self.buffer[self.independentParam.lower()]
index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
dummy[index] = self.buffer[param.lower()]
if isinstance(value, str):
if value not in self.independentParam:
setattr(self.dataOut, value, dummy.reshape(1,-1))
elif isinstance(value, list):
self.output[value[0]][value[1]] = dummy
parameters[value[1]] = param
for key, value in list(self.output.items()):
setattr(self.dataOut, key, numpy.array(value))
self.dataOut.parameters = [s for s in parameters if s]
self.dataOut.heightList = self.ranges
self.dataOut.utctime = (self.datatime - datetime.datetime(1970, 1, 1)).total_seconds()
self.dataOut.utctimeInit = self.dataOut.utctime
self.dataOut.paramInterval = min(self.intervals)
self.dataOut.useLocalTime = False
self.dataOut.flagNoData = False
self.dataOut.nrecords = self.nrecords
self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
def getData(self):
'''
Storing data from databuffer to dataOut object
'''
if not self.readNextBlock():
self.dataOut.flagNoData = True
return 0
self.set_output()
return 1
def run(self, **kwargs):
if not(self.isConfig):
self.setup(**kwargs)
self.isConfig = True
self.getData()
return
@MPDecorator
class MADWriter(Operation):
'''Writing module for Madrigal files
type: external
Inputs:
path path where files will be created
oneDDict json of one-dimensional parameters in record where keys
are Madrigal codes (integers or mnemonics) and values the corresponding
dataOut attribute e.g: {
'gdlatr': 'lat',
'gdlonr': 'lon',
'gdlat2':'lat',
'glon2':'lon'}
ind2DList list of independent spatial two-dimensional parameters e.g:
['heigthList']
twoDDict json of two-dimensional parameters in record where keys
are Madrigal codes (integers or mnemonics) and values the corresponding
dataOut attribute if multidimensional array specify as tupple
('attr', pos) e.g: {
'gdalt': 'heightList',
'vn1p2': ('data_output', 0),
'vn2p2': ('data_output', 1),
'vn3': ('data_output', 2),
'snl': ('data_SNR', 'db')
}
metadata json of madrigal metadata (kinst, kindat, catalog and header)
format hdf5, cedar
blocks number of blocks per file'''
__attrs__ = ['path', 'oneDDict', 'ind2DList', 'twoDDict','metadata', 'format', 'blocks']
missing = -32767
def __init__(self):
Operation.__init__(self)
self.dataOut = Parameters()
self.counter = 0
self.path = None
self.fp = None
def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}',
metadata='{}', format='cedar', **kwargs):
if not self.isConfig:
self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs)
self.isConfig = True
self.dataOut = dataOut
self.putData()
return 1
def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs):
'''
Configure Operation
'''
self.path = path
self.blocks = kwargs.get('blocks', None)
self.counter = 0
self.oneDDict = load_json(oneDDict)
self.twoDDict = load_json(twoDDict)
self.ind2DList = load_json(ind2DList)
meta = load_json(metadata)
self.kinst = meta.get('kinst')
self.kindat = meta.get('kindat')
self.catalog = meta.get('catalog', DEF_CATALOG)
self.header = meta.get('header', DEF_HEADER)
if format == 'cedar':
self.ext = '.dat'
self.extra_args = {}
elif format == 'hdf5':
self.ext = '.hdf5'
self.extra_args = {'ind2DList': self.ind2DList}
self.keys = [k.lower() for k in self.twoDDict]
if 'range' in self.keys:
self.keys.remove('range')
if 'gdalt' in self.keys:
self.keys.remove('gdalt')
def setFile(self):
'''
Create new cedar file object
'''
self.mnemonic = MNEMONICS[self.kinst] #TODO get mnemonic from madrigal
date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
filename = '{}{}{}'.format(self.mnemonic,
date.strftime('%Y%m%d_%H%M%S'),
self.ext)
self.fullname = os.path.join(self.path, filename)
if os.path.isfile(self.fullname) :
log.warning(
'Destination file {} already exists, previous file deleted.'.format(
self.fullname),
'MADWriter')
os.remove(self.fullname)
try:
log.success(
'Creating file: {}'.format(self.fullname),
'MADWriter')
if not os.path.exists(self.path):
os.makedirs(self.path)
self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
except ValueError as e:
log.error(
'Impossible to create a cedar object with "madrigal.cedar.MadrigalCedarFile"',
'MADWriter')
return
return 1
def writeBlock(self):
'''
Add data records to cedar file taking data from oneDDict and twoDDict
attributes.
Allowed parameters in: parcodes.tab
'''
startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval)
heights = self.dataOut.heightList
if self.ext == '.dat':
for key, value in list(self.twoDDict.items()):
if isinstance(value, str):
data = getattr(self.dataOut, value)
invalid = numpy.isnan(data)
data[invalid] = self.missing
elif isinstance(value, (tuple, list)):
attr, key = value
data = getattr(self.dataOut, attr)
invalid = numpy.isnan(data)
data[invalid] = self.missing
out = {}
for key, value in list(self.twoDDict.items()):
key = key.lower()
if isinstance(value, str):
if 'db' in value.lower():
tmp = getattr(self.dataOut, value.replace('_db', ''))
SNRavg = numpy.average(tmp, axis=0)
tmp = 10*numpy.log10(SNRavg)
else:
tmp = getattr(self.dataOut, value)
out[key] = tmp.flatten()[:len(heights)]
elif isinstance(value, (tuple, list)):
attr, x = value
data = getattr(self.dataOut, attr)
out[key] = data[int(x)][:len(heights)]
a = numpy.array([out[k] for k in self.keys])
nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
index = numpy.where(nrows == False)[0]
rec = madrigal.cedar.MadrigalDataRecord(
self.kinst,
self.kindat,
startTime.year,
startTime.month,
startTime.day,
startTime.hour,
startTime.minute,
startTime.second,
startTime.microsecond/10000,
endTime.year,
endTime.month,
endTime.day,
endTime.hour,
endTime.minute,
endTime.second,
endTime.microsecond/10000,
list(self.oneDDict.keys()),
list(self.twoDDict.keys()),
len(index),
**self.extra_args
)
# Setting 1d values
for key in self.oneDDict:
rec.set1D(key, getattr(self.dataOut, self.oneDDict[key]))
# Setting 2d values
nrec = 0
for n in index:
for key in out:
rec.set2D(key, nrec, out[key][n])
nrec += 1
self.fp.append(rec)
if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0:
self.fp.dump()
if self.counter % 20 == 0 and self.counter > 0:
log.log(
'Writing {} records'.format(
self.counter),
'MADWriter')
def setHeader(self):
'''
Create an add catalog and header to cedar file
'''
log.success('Closing file {}'.format(self.fullname), 'MADWriter')
if self.ext == '.dat':
self.fp.write()
else:
self.fp.dump()
self.fp.close()
header = madrigal.cedar.CatalogHeaderCreator(self.fullname)
header.createCatalog(**self.catalog)
header.createHeader(**self.header)
header.write()
def putData(self):
if self.dataOut.flagNoData:
return 0
if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks:
if self.counter > 0:
self.setHeader()
self.counter = 0
if self.counter == 0:
self.setFile()
self.writeBlock()
self.counter += 1
def close(self):
if self.counter > 0:
self.setHeader()