''' Created on Aug 1, 2017 @author: Juan C. Espinoza ''' import os import sys import time import json import glob import datetime import numpy import h5py from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator from schainpy.model.data.jrodata import Parameters from schainpy.utils import log try: import madrigal.cedar except: log.warning( 'You should install "madrigal library" module if you want to read/write Madrigal data' ) try: basestring except: basestring = str DEF_CATALOG = { 'principleInvestigator': 'Marco Milla', 'expPurpose': '', 'cycleTime': '', 'correlativeExp': '', 'sciRemarks': '', 'instRemarks': '' } DEF_HEADER = { 'kindatDesc': '', 'analyst': 'Jicamarca User', 'comments': '', 'history': '' } MNEMONICS = { 10: 'jro', 11: 'jbr', 840: 'jul', 13: 'jas', 1000: 'pbr', 1001: 'hbr', 1002: 'obr', 400: 'clr' } UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone) def load_json(obj): ''' Parse json as string instead of unicode ''' if isinstance(obj, str): iterable = json.loads(obj) else: iterable = obj if isinstance(iterable, dict): return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, basestring) else v for k, v in list(iterable.items())} elif isinstance(iterable, (list, tuple)): return [str(v) if isinstance(v, basestring) else v for v in iterable] return iterable @MPDecorator class MADReader(JRODataReader, ProcessingUnit): def __init__(self): ProcessingUnit.__init__(self) self.dataOut = Parameters() self.counter_records = 0 self.nrecords = None self.flagNoMoreFiles = 0 self.isConfig = False self.filename = None self.intervals = set() def setup(self, path=None, startDate=None, endDate=None, format=None, startTime=datetime.time(0, 0, 0), endTime=datetime.time(23, 59, 59), **kwargs): self.path = path self.startDate = startDate self.endDate = endDate self.startTime = startTime self.endTime = endTime self.datatime = datetime.datetime(1900,1,1) self.oneDDict = load_json(kwargs.get('oneDDict', "{\"GDLATR\":\"lat\", \"GDLONR\":\"lon\"}")) self.twoDDict = load_json(kwargs.get('twoDDict', "{\"GDALT\": \"heightList\"}")) self.independentParam = 'GDALT' if self.path is None: raise ValueError('The path is not valid') if format is None: raise ValueError('The format is not valid choose simple or hdf5') elif format.lower() in ('simple', 'txt'): self.ext = '.txt' elif format.lower() in ('cedar',): self.ext = '.001' else: self.ext = '.hdf5' self.search_files(self.path) self.fileId = 0 if not self.fileList: raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path)) self.setNextFile() def search_files(self, path): ''' Searching for madrigal files in path Creating a list of files to procces included in [startDate,endDate] Input: path - Path to find files ''' log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader') fileList0 = glob.glob1(path, '*{}'.format(self.ext)) fileList0.sort() self.fileList = [] self.dateFileList = [] startDate = self.startDate - datetime.timedelta(1) endDate = self.endDate + datetime.timedelta(1) for thisFile in fileList0: year = thisFile[3:7] if not year.isdigit(): continue month = thisFile[7:9] if not month.isdigit(): continue day = thisFile[9:11] if not day.isdigit(): continue year, month, day = int(year), int(month), int(day) dateFile = datetime.date(year, month, day) if (startDate > dateFile) or (endDate < dateFile): continue self.fileList.append(thisFile) self.dateFileList.append(dateFile) return def parseHeader(self): ''' ''' self.output = {} self.version = '2' s_parameters = None if self.ext == '.txt': self.parameters = [s.strip().lower() for s in self.fp.readline().decode().strip().split(' ') if s] elif self.ext == '.hdf5': self.metadata = self.fp['Metadata'] if '_record_layout' in self.metadata: s_parameters = [s[0].lower().decode() for s in self.metadata['Independent Spatial Parameters']] self.version = '3' self.parameters = [s[0].lower().decode() for s in self.metadata['Data Parameters']] log.success('Parameters found: {}'.format(self.parameters), 'MADReader') if s_parameters: log.success('Spatial parameters found: {}'.format(s_parameters), 'MADReader') for param in list(self.oneDDict.keys()): if param.lower() not in self.parameters: log.warning( 'Parameter {} not found will be ignored'.format( param), 'MADReader') self.oneDDict.pop(param, None) for param, value in list(self.twoDDict.items()): if param.lower() not in self.parameters: log.warning( 'Parameter {} not found, it will be ignored'.format( param), 'MADReader') self.twoDDict.pop(param, None) continue if isinstance(value, list): if value[0] not in self.output: self.output[value[0]] = [] self.output[value[0]].append([]) def parseData(self): ''' ''' if self.ext == '.txt': self.data = numpy.genfromtxt(self.fp, missing_values=('missing')) self.nrecords = self.data.shape[0] self.ranges = numpy.unique(self.data[:,self.parameters.index(self.independentParam.lower())]) self.counter_records = 0 elif self.ext == '.hdf5': self.data = self.fp['Data'] self.ranges = numpy.unique(self.data['Table Layout'][self.independentParam.lower()]) self.times = numpy.unique(self.data['Table Layout']['ut1_unix']) self.counter_records = int(self.data['Table Layout']['recno'][0]) self.nrecords = int(self.data['Table Layout']['recno'][-1]) def setNextFile(self): ''' ''' file_id = self.fileId if file_id == len(self.fileList): log.success('No more files', 'MADReader') self.flagNoMoreFiles = 1 return 0 log.success( 'Opening: {}'.format(self.fileList[file_id]), 'MADReader' ) filename = os.path.join(self.path, self.fileList[file_id]) if self.filename is not None: self.fp.close() self.filename = filename self.filedate = self.dateFileList[file_id] if self.ext=='.hdf5': self.fp = h5py.File(self.filename, 'r') else: self.fp = open(self.filename, 'rb') self.parseHeader() self.parseData() self.sizeOfFile = os.path.getsize(self.filename) self.flagIsNewFile = 0 self.fileId += 1 return 1 def readNextBlock(self): while True: self.flagDiscontinuousBlock = 0 if self.flagIsNewFile: if not self.setNextFile(): return 0 self.readBlock() if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \ (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)): log.warning( 'Reading Record No. {}/{} -> {} [Skipping]'.format( self.counter_records, self.nrecords, self.datatime.ctime()), 'MADReader') continue break log.log( 'Reading Record No. {}/{} -> {}'.format( self.counter_records, self.nrecords, self.datatime.ctime()), 'MADReader') return 1 def readBlock(self): ''' ''' dum = [] if self.ext == '.txt': dt = self.data[self.counter_records][:6].astype(int) if datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]).date() > self.datatime.date(): self.flagDiscontinuousBlock = 1 self.datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]) while True: dt = self.data[self.counter_records][:6].astype(int) datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]) if datatime == self.datatime: dum.append(self.data[self.counter_records]) self.counter_records += 1 if self.counter_records == self.nrecords: self.flagIsNewFile = True break continue self.intervals.add((datatime-self.datatime).seconds) break elif self.ext == '.hdf5': datatime = datetime.datetime.utcfromtimestamp( self.times[self.counter_records]) dum = self.data['Table Layout'][self.data['Table Layout']['recno']==self.counter_records] self.intervals.add((datatime-self.datatime).seconds) if datatime.date()>self.datatime.date(): self.flagDiscontinuousBlock = 1 self.datatime = datatime self.counter_records += 1 if self.counter_records == self.nrecords: self.flagIsNewFile = True self.buffer = numpy.array(dum) return def set_output(self): ''' Storing data from buffer to dataOut object ''' parameters = [None for __ in self.parameters] for param, attr in list(self.oneDDict.items()): x = self.parameters.index(param.lower()) setattr(self.dataOut, attr, self.buffer[0][x]) for param, value in list(self.twoDDict.items()): dummy = numpy.zeros(self.ranges.shape) + numpy.nan if self.ext == '.txt': x = self.parameters.index(param.lower()) y = self.parameters.index(self.independentParam.lower()) ranges = self.buffer[:,y] #if self.ranges.size == ranges.size: # continue index = numpy.where(numpy.in1d(self.ranges, ranges))[0] dummy[index] = self.buffer[:,x] else: ranges = self.buffer[self.independentParam.lower()] index = numpy.where(numpy.in1d(self.ranges, ranges))[0] dummy[index] = self.buffer[param.lower()] if isinstance(value, str): if value not in self.independentParam: setattr(self.dataOut, value, dummy.reshape(1,-1)) elif isinstance(value, list): self.output[value[0]][value[1]] = dummy parameters[value[1]] = param for key, value in list(self.output.items()): setattr(self.dataOut, key, numpy.array(value)) self.dataOut.parameters = [s for s in parameters if s] self.dataOut.heightList = self.ranges self.dataOut.utctime = (self.datatime - datetime.datetime(1970, 1, 1)).total_seconds() self.dataOut.utctimeInit = self.dataOut.utctime self.dataOut.paramInterval = min(self.intervals) self.dataOut.useLocalTime = False self.dataOut.flagNoData = False self.dataOut.nrecords = self.nrecords self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock def getData(self): ''' Storing data from databuffer to dataOut object ''' if self.flagNoMoreFiles: self.dataOut.flagNoData = True self.dataOut.error = 'No file left to process' return 0 if not self.readNextBlock(): self.dataOut.flagNoData = True return 0 self.set_output() return 1 @MPDecorator class MADWriter(Operation): missing = -32767 def __init__(self): Operation.__init__(self) self.dataOut = Parameters() self.counter = 0 self.path = None self.fp = None def run(self, dataOut, path, oneDDict, independentParam='[]', twoDDict='{}', metadata='{}', format='cedar', **kwargs): ''' Inputs: path - path where files will be created oneDDict - json of one-dimensional parameters in record where keys are Madrigal codes (integers or mnemonics) and values the corresponding dataOut attribute e.g: { 'gdlatr': 'lat', 'gdlonr': 'lon', 'gdlat2':'lat', 'glon2':'lon'} independentParam - list of independent spatial two-dimensional parameters e.g: ['heigthList'] twoDDict - json of two-dimensional parameters in record where keys are Madrigal codes (integers or mnemonics) and values the corresponding dataOut attribute if multidimensional array specify as tupple ('attr', pos) e.g: { 'gdalt': 'heightList', 'vn1p2': ('data_output', 0), 'vn2p2': ('data_output', 1), 'vn3': ('data_output', 2), 'snl': ('data_SNR', 'db') } metadata - json of madrigal metadata (kinst, kindat, catalog and header) ''' if not self.isConfig: self.setup(path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs) self.isConfig = True self.dataOut = dataOut self.putData() return 1 def setup(self, path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs): ''' Configure Operation ''' self.path = path self.blocks = kwargs.get('blocks', None) self.counter = 0 self.oneDDict = load_json(oneDDict) self.twoDDict = load_json(twoDDict) self.independentParam = load_json(independentParam) meta = load_json(metadata) self.kinst = meta.get('kinst') self.kindat = meta.get('kindat') self.catalog = meta.get('catalog', DEF_CATALOG) self.header = meta.get('header', DEF_HEADER) if format == 'cedar': self.ext = '.dat' self.extra_args = {} elif format == 'hdf5': self.ext = '.hdf5' self.extra_args = {'independentParam': self.independentParam} self.keys = [k.lower() for k in self.twoDDict] if 'range' in self.keys: self.keys.remove('range') if 'gdalt' in self.keys: self.keys.remove('gdalt') def setFile(self): ''' Create new cedar file object ''' self.mnemonic = MNEMONICS[self.kinst] #TODO get mnemonic from madrigal date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime) filename = '{}{}{}'.format(self.mnemonic, date.strftime('%Y%m%d_%H%M%S'), self.ext) self.fullname = os.path.join(self.path, filename) if os.path.isfile(self.fullname) : log.warning( 'Destination file {} already exists, previous file deleted.'.format( self.fullname), 'MADWriter') os.remove(self.fullname) try: log.success( 'Creating file: {}'.format(self.fullname), 'MADWriter') self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True) except ValueError as e: log.error( 'Impossible to create a cedar object with "madrigal.cedar.MadrigalCedarFile"', 'MADWriter') return return 1 def writeBlock(self): ''' Add data records to cedar file taking data from oneDDict and twoDDict attributes. Allowed parameters in: parcodes.tab ''' startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime) endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval) heights = self.dataOut.heightList if self.ext == '.dat': for key, value in list(self.twoDDict.items()): if isinstance(value, str): data = getattr(self.dataOut, value) invalid = numpy.isnan(data) data[invalid] = self.missing elif isinstance(value, (tuple, list)): attr, key = value data = getattr(self.dataOut, attr) invalid = numpy.isnan(data) data[invalid] = self.missing out = {} for key, value in list(self.twoDDict.items()): key = key.lower() if isinstance(value, str): if 'db' in value.lower(): tmp = getattr(self.dataOut, value.replace('_db', '')) SNRavg = numpy.average(tmp, axis=0) tmp = 10*numpy.log10(SNRavg) else: tmp = getattr(self.dataOut, value) out[key] = tmp.flatten() elif isinstance(value, (tuple, list)): attr, x = value data = getattr(self.dataOut, attr) out[key] = data[int(x)] a = numpy.array([out[k] for k in self.keys]) nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))]) index = numpy.where(nrows == False)[0] rec = madrigal.cedar.MadrigalDataRecord( self.kinst, self.kindat, startTime.year, startTime.month, startTime.day, startTime.hour, startTime.minute, startTime.second, startTime.microsecond/10000, endTime.year, endTime.month, endTime.day, endTime.hour, endTime.minute, endTime.second, endTime.microsecond/10000, list(self.oneDDict.keys()), list(self.twoDDict.keys()), len(index), **self.extra_args ) # Setting 1d values for key in self.oneDDict: rec.set1D(key, getattr(self.dataOut, self.oneDDict[key])) # Setting 2d values nrec = 0 for n in index: for key in out: rec.set2D(key, nrec, out[key][n]) nrec += 1 self.fp.append(rec) if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0: self.fp.dump() if self.counter % 20 == 0 and self.counter > 0: log.log( 'Writing {} records'.format( self.counter), 'MADWriter') def setHeader(self): ''' Create an add catalog and header to cedar file ''' log.success('Closing file {}'.format(self.fullname), 'MADWriter') if self.ext == '.dat': self.fp.write() else: self.fp.dump() self.fp.close() header = madrigal.cedar.CatalogHeaderCreator(self.fullname) header.createCatalog(**self.catalog) header.createHeader(**self.header) header.write() def putData(self): if self.dataOut.flagNoData: return 0 if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks: if self.counter > 0: self.setHeader() self.counter = 0 if self.counter == 0: self.setFile() self.writeBlock() self.counter += 1 def close(self): if self.counter > 0: self.setHeader()