##// END OF EJS Templates
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()

File last commit:

r1167:1f521b07c958
r1279:c53fe2a4a291
Show More
pxIO_param.py
350 lines | 10.9 KiB | text/x-python | PythonLexer
'''
Created on Jan 15, 2018
@author: Juan C. Espinoza
'''
import os
import sys
import time
import glob
import datetime
import tarfile
import numpy
from .utils import folder_in_range
from schainpy.model.io.jroIO_base import JRODataReader
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
from schainpy.model.data.jrodata import Parameters
from schainpy.utils import log
try:
from netCDF4 import Dataset
except:
log.warning(
'You should install "netCDF4" module if you want to read/write NCDF files'
)
UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
class PXReader(JRODataReader, ProcessingUnit):
def __init__(self, **kwargs):
ProcessingUnit.__init__(self, **kwargs)
self.dataOut = Parameters()
self.counter_records = 0
self.nrecords = None
self.flagNoMoreFiles = 0
self.isConfig = False
self.filename = None
self.intervals = set()
self.ext = ('.nc', '.tgz')
self.online_mode = False
def setup(self,
path=None,
startDate=None,
endDate=None,
format=None,
startTime=datetime.time(0, 0, 0),
endTime=datetime.time(23, 59, 59),
walk=False,
**kwargs):
self.path = path
self.startDate = startDate
self.endDate = endDate
self.startTime = startTime
self.endTime = endTime
self.datatime = datetime.datetime(1900,1,1)
self.walk = walk
self.nTries = kwargs.get('nTries', 10)
self.online = kwargs.get('online', False)
self.delay = kwargs.get('delay', 60)
self.ele = kwargs.get('ext', '')
if self.path is None:
raise ValueError('The path is not valid')
self.search_files(path, startDate, endDate, startTime, endTime, walk)
self.cursor = 0
self.counter_records = 0
if not self.files:
raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path))
def search_files(self, path, startDate, endDate, startTime, endTime, walk):
'''
Searching for NCDF files in path
Creating a list of files to procces included in [startDate,endDate]
Input:
path - Path to find files
'''
log.log('Searching files {} in {} '.format(self.ext, path), 'PXReader')
if walk:
paths = [os.path.join(path, p) for p in os.listdir(path) if os.path.isdir(os.path.join(path, p))]
paths.sort()
else:
paths = [path]
fileList0 = []
for subpath in paths:
if not folder_in_range(subpath.split('/')[-1], startDate, endDate, '%Y%m%d'):
continue
fileList0 += [os.path.join(subpath, s) for s in glob.glob1(subpath, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]
fileList0.sort()
if self.online:
fileList0 = fileList0[-1:]
self.files = {}
startDate = startDate - datetime.timedelta(1)
endDate = endDate + datetime.timedelta(1)
for fullname in fileList0:
thisFile = fullname.split('/')[-1]
year = thisFile[3:7]
if not year.isdigit():
continue
month = thisFile[7:9]
if not month.isdigit():
continue
day = thisFile[9:11]
if not day.isdigit():
continue
year, month, day = int(year), int(month), int(day)
dateFile = datetime.date(year, month, day)
timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
if (startDate > dateFile) or (endDate < dateFile):
continue
dt = datetime.datetime.combine(dateFile, timeFile)
if dt not in self.files:
self.files[dt] = []
self.files[dt].append(fullname)
self.dates = list(self.files.keys())
self.dates.sort()
return
def search_files_online(self):
'''
Searching for NCDF files in online mode path
Creating a list of files to procces included in [startDate,endDate]
Input:
path - Path to find files
'''
self.files = {}
for n in range(self.nTries):
if self.walk:
paths = [os.path.join(self.path, p) for p in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, p))]
paths.sort()
path = paths[-1]
else:
path = self.path
new_files = [os.path.join(path, s) for s in glob.glob1(path, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]
new_files.sort()
for fullname in new_files:
thisFile = fullname.split('/')[-1]
year = thisFile[3:7]
if not year.isdigit():
continue
month = thisFile[7:9]
if not month.isdigit():
continue
day = thisFile[9:11]
if not day.isdigit():
continue
year, month, day = int(year), int(month), int(day)
dateFile = datetime.date(year, month, day)
timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
dt = datetime.datetime.combine(dateFile, timeFile)
if self.dt >= dt:
continue
if dt not in self.files:
self.dt = dt
self.files[dt] = []
self.files[dt].append(fullname)
break
if self.files:
break
else:
log.warning('Waiting {} seconds for the next file, try {} ...'.format(self.delay, n + 1), 'PXReader')
time.sleep(self.delay)
if not self.files:
return 0
self.dates = list(self.files.keys())
self.dates.sort()
self.cursor = 0
return 1
def parseFile(self):
'''
'''
header = {}
for attr in self.fp.ncattrs():
header[str(attr)] = getattr(self.fp, attr)
self.header.append(header)
self.data[header['TypeName']] = numpy.array(self.fp.variables[header['TypeName']])
def setNextFile(self):
'''
Open next files for the current datetime
'''
cursor = self.cursor
if not self.online_mode:
if cursor == len(self.dates):
if self.online:
cursor = 0
self.dt = self.dates[cursor]
self.online_mode = True
if not self.search_files_online():
log.success('No more files', 'PXReader')
return 0
else:
log.success('No more files', 'PXReader')
self.flagNoMoreFiles = 1
return 0
else:
if not self.search_files_online():
return 0
cursor = self.cursor
self.data = {}
self.header = []
for fullname in self.files[self.dates[cursor]]:
log.log('Opening: {}'.format(fullname), 'PXReader')
if os.path.splitext(fullname)[-1] == '.tgz':
tar = tarfile.open(fullname, 'r:gz')
tar.extractall('/tmp')
files = [os.path.join('/tmp', member.name) for member in tar.getmembers()]
else:
files = [fullname]
for filename in files:
if self.filename is not None:
self.fp.close()
self.filename = filename
self.filedate = self.dates[cursor]
self.fp = Dataset(self.filename, 'r')
self.parseFile()
self.counter_records += 1
self.cursor += 1
return 1
def readNextFile(self):
while True:
self.flagDiscontinuousBlock = 0
if not self.setNextFile():
return 0
self.datatime = datetime.datetime.utcfromtimestamp(self.header[0]['Time'])
if self.online:
break
if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
(self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
log.warning(
'Reading Record No. {}/{} -> {} [Skipping]'.format(
self.counter_records,
self.nrecords,
self.datatime.ctime()),
'PXReader')
continue
break
log.log(
'Reading Record No. {}/{} -> {}'.format(
self.counter_records,
self.nrecords,
self.datatime.ctime()),
'PXReader')
return 1
def set_output(self):
'''
Storing data from buffer to dataOut object
'''
self.data['Elevation'] = numpy.array(self.fp.variables['Elevation'])
self.data['Azimuth'] = numpy.array(self.fp.variables['Azimuth'])
self.dataOut.range = numpy.array(self.fp.variables['GateWidth'])
self.dataOut.data = self.data
self.dataOut.units = [h['Unit-value'] for h in self.header]
self.dataOut.parameters = [h['TypeName'] for h in self.header]
self.dataOut.missing = self.header[0]['MissingData']
self.dataOut.max_range = self.header[0]['MaximumRange-value']
self.dataOut.elevation = self.header[0]['Elevation']
self.dataOut.azimuth = self.header[0]['Azimuth']
self.dataOut.latitude = self.header[0]['Latitude']
self.dataOut.longitude = self.header[0]['Longitude']
self.dataOut.utctime = self.header[0]['Time']
self.dataOut.utctimeInit = self.dataOut.utctime
self.dataOut.useLocalTime = True
self.dataOut.flagNoData = False
self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
log.log('Parameters found: {}'.format(','.join(self.dataOut.parameters)),
'PXReader')
def getData(self):
'''
Storing data from databuffer to dataOut object
'''
if self.flagNoMoreFiles:
self.dataOut.flagNoData = True
log.error('No file left to process', 'PXReader')
return 0
if not self.readNextFile():
self.dataOut.flagNoData = True
return 0
self.set_output()
return 1