pxIO_param.py
350 lines
| 10.9 KiB
| text/x-python
|
PythonLexer
'''
Reader for NetCDF parameter files ('.nc' files, or '.tgz' archives of them).

Created on Jan 15, 2018

@author: Juan C. Espinoza
'''
import os | ||||
import sys | ||||
import time | ||||
import glob | ||||
import datetime | ||||
import tarfile | ||||
import numpy | ||||
|
r1167 | from .utils import folder_in_range | ||
r1140 | ||||
r1137 | from schainpy.model.io.jroIO_base import JRODataReader | |||
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation | ||||
from schainpy.model.data.jrodata import Parameters | ||||
from schainpy.utils import log | ||||
|
try:
    from netCDF4 import Dataset
except ImportError:
    # netCDF4 is optional at import time: without it ``Dataset`` stays
    # undefined and opening a NetCDF file later fails with NameError.
    log.warning(
        'You should install "netCDF4" module if you want to read/write NCDF files'
    )

# Unix epoch as a naive datetime, shifted by the local standard-time offset
# (``time.timezone`` is the offset in seconds west of UTC, ignoring DST).
UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
class PXReader(JRODataReader, ProcessingUnit):
    '''
    Reader for NetCDF parameter files ('.nc' files, or '.tgz' archives of them).

    Files are grouped by the acquisition datetime encoded in their names, and
    each group is delivered as one record: ``dataOut.data`` maps the
    ``TypeName`` attribute of every file in the group to the array stored
    under that variable name.

    In online mode only the newest matching file is read from the initial
    listing. The folder is then polled (``nTries`` attempts, ``delay``
    seconds apart) for files newer than the last one read.
    '''

    def __init__(self, **kwargs):
        ProcessingUnit.__init__(self, **kwargs)
        self.dataOut = Parameters()
        self.counter_records = 0      # records opened so far (used in log messages)
        self.nrecords = None
        self.flagNoMoreFiles = 0
        self.isConfig = False
        self.filename = None          # path of the currently open NetCDF file
        self.intervals = set()
        self.ext = ('.nc', '.tgz')    # accepted file extensions
        self.online_mode = False      # True once offline files are exhausted and polling began

    def setup(self,
              path=None,
              startDate=None,
              endDate=None,
              format=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              walk=False,
              **kwargs):
        '''
        Configure the reader and build the initial file list.

        Args:
            path (str): folder containing the files or, with ``walk``, the
                day folders (named ``%Y%m%d``) that contain them.
            startDate, endDate (datetime.date): date range to read.
            format: accepted for interface compatibility; not used here.
            startTime, endTime (datetime.time): in offline mode, records
                outside [startDate startTime, endDate endTime] are skipped.
            walk (bool): search the subfolders of ``path`` instead of ``path``.
            **kwargs: ``online`` (bool, default False), ``nTries`` (default 10),
                ``delay`` (seconds between polls, default 60) and ``ext``
                (substring file names must contain, default '').

        Raises:
            ValueError: if ``path`` is None.
            Warning: if no file matches the requested dates.
        '''
        self.path = path
        self.startDate = startDate
        self.endDate = endDate
        self.startTime = startTime
        self.endTime = endTime
        self.datatime = datetime.datetime(1900, 1, 1)
        self.walk = walk
        self.nTries = kwargs.get('nTries', 10)
        self.online = kwargs.get('online', False)
        self.delay = kwargs.get('delay', 60)
        self.ele = kwargs.get('ext', '')  # name filter (substring), not an extension

        if self.path is None:
            raise ValueError('The path is not valid')

        self.search_files(path, startDate, endDate, startTime, endTime, walk)
        self.cursor = 0
        self.counter_records = 0
        if not self.files:
            raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path))

    @staticmethod
    def _parse_filename_datetime(filename):
        '''
        Return the datetime encoded in a file name, or None if it has none.

        The base name must hold YYYYMMDD at characters [3:11] and HHMMSS at
        [12:18] (0-based). Names that are too short, contain non-digits there,
        or encode an impossible date/time return None instead of raising.
        '''
        name = os.path.basename(filename)
        fields = (name[3:7], name[7:9], name[9:11],
                  name[12:14], name[14:16], name[16:18])
        if not all(field.isdigit() for field in fields):
            return None
        try:
            return datetime.datetime(*(int(field) for field in fields))
        except ValueError:
            # e.g. month 13, second 61, or digit characters int() rejects
            return None

    def _list_files(self, path):
        '''
        Return the sorted full paths of entries in ``path`` whose extension is
        in ``self.ext`` and whose name contains ``self.ele``.

        Hidden entries (names starting with '.') are ignored, and a missing or
        unreadable folder yields an empty list. This matches the previous
        ``glob.glob1(path, '*')`` listing; that helper is deprecated since
        Python 3.13.
        '''
        try:
            names = os.listdir(path or os.curdir)
        except OSError:
            return []
        tag = '{}'.format(self.ele)
        return sorted(
            os.path.join(path, name) for name in names
            if not name.startswith('.')
            and os.path.splitext(name)[-1] in self.ext
            and tag in name)

    def search_files(self, path, startDate, endDate, startTime, endTime, walk):
        '''
        Build ``self.files`` (datetime -> list of paths) and the sorted
        ``self.dates`` from the files found under ``path``.

        Only folders whose name passes ``folder_in_range`` with the ``%Y%m%d``
        format are scanned. Files are kept when the date in their name lies
        within one day of [startDate, endDate]; that one-day margin comes from
        the original implementation (NOTE(review): presumably it tolerates
        UTC/local-time offsets - confirm). In online mode only the newest
        matching file is kept.

        Args:
            path (str): folder to search.
            startDate, endDate (datetime.date): date range.
            startTime, endTime: unused here; kept for interface compatibility.
            walk (bool): scan the subfolders of ``path`` instead of ``path``.
        '''
        log.log('Searching files {} in {} '.format(self.ext, path), 'PXReader')
        if walk:
            paths = sorted(
                os.path.join(path, p) for p in os.listdir(path)
                if os.path.isdir(os.path.join(path, p)))
        else:
            paths = [path]

        fileList0 = []
        for subpath in paths:
            # normpath drops a trailing separator, which would otherwise leave
            # an empty folder name to check against the date range.
            folder = os.path.basename(os.path.normpath(subpath))
            if not folder_in_range(folder, startDate, endDate, '%Y%m%d'):
                continue
            fileList0 += self._list_files(subpath)

        fileList0.sort()
        if self.online:
            fileList0 = fileList0[-1:]

        self.files = {}
        startDate = startDate - datetime.timedelta(1)
        endDate = endDate + datetime.timedelta(1)
        for fullname in fileList0:
            dt = self._parse_filename_datetime(fullname)
            if dt is None or not startDate <= dt.date() <= endDate:
                continue
            self.files.setdefault(dt, []).append(fullname)

        self.dates = sorted(self.files)

    def search_files_online(self):
        '''
        Poll for the next file newer than ``self.dt``.

        Looks in ``self.path`` or, with ``walk``, in its last subfolder in
        sorted order. This is retried up to ``self.nTries`` times, sleeping
        ``self.delay`` seconds after each unsuccessful try. The first file (in
        sorted order) whose name encodes a datetime later than ``self.dt`` is
        stored in ``self.files``, and ``self.dt`` advances to that datetime.

        Returns:
            int: 1 if a new file was found (``self.dates`` and ``self.cursor``
            are reset to it), 0 otherwise.
        '''
        self.files = {}
        for n in range(self.nTries):
            if self.walk:
                folders = sorted(
                    os.path.join(self.path, p) for p in os.listdir(self.path)
                    if os.path.isdir(os.path.join(self.path, p)))
                # No subfolder yet: treat it as "no new file" and retry
                # instead of failing with IndexError.
                new_files = self._list_files(folders[-1]) if folders else []
            else:
                new_files = self._list_files(self.path)

            for fullname in new_files:
                dt = self._parse_filename_datetime(fullname)
                if dt is None or self.dt >= dt:
                    continue
                self.dt = dt
                self.files[dt] = [fullname]
                break

            if self.files:
                break
            log.warning('Waiting {} seconds for the next file, try {} ...'.format(self.delay, n + 1), 'PXReader')
            time.sleep(self.delay)

        if not self.files:
            return 0

        self.dates = sorted(self.files)
        self.cursor = 0
        return 1

    def parseFile(self):
        '''
        Read the global attributes and main variable of the open file.

        The attributes of ``self.fp`` are appended to ``self.header`` as a
        dict. The variable named by the ``TypeName`` attribute is stored in
        ``self.data`` under that name.
        '''
        header = {str(attr): getattr(self.fp, attr) for attr in self.fp.ncattrs()}
        self.header.append(header)
        self.data[header['TypeName']] = numpy.array(self.fp.variables[header['TypeName']])

    @staticmethod
    def _extract_tgz(fullname, destination):
        '''
        Extract the regular files of a gzip-compressed tar archive and return
        their paths under ``destination``.

        Members that are not regular files (directories, links, devices) are
        skipped because they cannot be opened as NetCDF files. Members whose
        path would resolve outside ``destination`` (absolute names or '..'
        components) are also skipped, so the archive cannot write elsewhere.
        '''
        root = os.path.realpath(destination)
        prefix = os.path.join(root, '')
        with tarfile.open(fullname, 'r:gz') as tar:
            members = [
                member for member in tar.getmembers()
                if member.isfile()
                and os.path.realpath(os.path.join(root, member.name)).startswith(prefix)
            ]
            tar.extractall(destination, members=members)
        return [os.path.join(destination, member.name) for member in members]

    def setNextFile(self):
        '''
        Open every file of the next record (files sharing one datetime).

        Offline, records are taken from ``self.dates`` in order. When they are
        exhausted, the reader either stops (setting ``flagNoMoreFiles``) or,
        if ``online`` is set, switches to polling with
        ``search_files_online``. '.tgz' archives are extracted to /tmp, and
        each extracted file is read.

        Returns:
            int: 1 if a record was opened, 0 if there is nothing to read.
        '''
        cursor = self.cursor
        if self.online_mode:
            if not self.search_files_online():
                return 0
            cursor = self.cursor
        elif cursor == len(self.dates):
            if not self.online:
                log.success('No more files', 'PXReader')
                self.flagNoMoreFiles = 1
                return 0
            # Offline files exhausted: start polling for newer files.
            cursor = 0
            self.dt = self.dates[cursor]
            self.online_mode = True
            if not self.search_files_online():
                log.success('No more files', 'PXReader')
                return 0

        self.data = {}
        self.header = []
        record_date = self.dates[cursor]

        for fullname in self.files[record_date]:
            log.log('Opening: {}'.format(fullname), 'PXReader')
            if os.path.splitext(fullname)[-1] == '.tgz':
                files = self._extract_tgz(fullname, '/tmp')
            else:
                files = [fullname]
            for filename in files:
                # Only one NetCDF handle is kept open at a time.
                if self.filename is not None:
                    self.fp.close()
                self.filename = filename
                self.filedate = record_date
                self.fp = Dataset(self.filename, 'r')
                self.parseFile()

        self.counter_records += 1
        self.cursor += 1
        return 1

    def readNextFile(self):
        '''
        Open the next record. In offline mode, skip records outside the
        [startDate startTime, endDate endTime] window.

        ``self.datatime`` is set from the ``Time`` attribute of the record's
        first file, converted with ``utcfromtimestamp`` (seconds since the
        Unix epoch, as a naive UTC datetime).

        Returns:
            int: 1 if a record is ready, 0 if there is nothing left to read.
        '''
        while True:
            self.flagDiscontinuousBlock = 0
            if not self.setNextFile():
                return 0
            if not self.header:
                # e.g. a '.tgz' archive without regular files: nothing to read.
                log.warning('Record No. {} has no readable files [Skipping]'.format(self.counter_records), 'PXReader')
                continue
            self.datatime = datetime.datetime.utcfromtimestamp(self.header[0]['Time'])

            if self.online:
                break
            start = datetime.datetime.combine(self.startDate, self.startTime)
            end = datetime.datetime.combine(self.endDate, self.endTime)
            if start <= self.datatime <= end:
                break
            log.warning(
                'Reading Record No. {}/{} -> {} [Skipping]'.format(
                    self.counter_records,
                    self.nrecords,
                    self.datatime.ctime()),
                'PXReader')

        log.log(
            'Reading Record No. {}/{} -> {}'.format(
                self.counter_records,
                self.nrecords,
                self.datatime.ctime()),
            'PXReader')
        return 1

    def set_output(self):
        '''
        Copy the current record's arrays and header attributes into
        ``self.dataOut``.

        ``Elevation``, ``Azimuth`` and ``GateWidth`` are read from the last
        file opened for the record. Scalar metadata comes from the first
        file's header.
        '''
        self.data['Elevation'] = numpy.array(self.fp.variables['Elevation'])
        self.data['Azimuth'] = numpy.array(self.fp.variables['Azimuth'])
        self.dataOut.range = numpy.array(self.fp.variables['GateWidth'])
        self.dataOut.data = self.data
        self.dataOut.units = [h['Unit-value'] for h in self.header]
        self.dataOut.parameters = [h['TypeName'] for h in self.header]
        self.dataOut.missing = self.header[0]['MissingData']
        self.dataOut.max_range = self.header[0]['MaximumRange-value']
        self.dataOut.elevation = self.header[0]['Elevation']
        self.dataOut.azimuth = self.header[0]['Azimuth']
        self.dataOut.latitude = self.header[0]['Latitude']
        self.dataOut.longitude = self.header[0]['Longitude']
        self.dataOut.utctime = self.header[0]['Time']
        self.dataOut.utctimeInit = self.dataOut.utctime
        self.dataOut.useLocalTime = True
        self.dataOut.flagNoData = False
        self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
        log.log('Parameters found: {}'.format(','.join(self.dataOut.parameters)),
                'PXReader')

    def getData(self):
        '''
        Read the next record into ``self.dataOut``.

        Returns:
            int: 1 on success. 0 when no record is available, in which case
            ``dataOut.flagNoData`` is set.
        '''
        if self.flagNoMoreFiles:
            self.dataOut.flagNoData = True
            log.error('No file left to process', 'PXReader')
            return 0
        if not self.readNextFile():
            self.dataOut.flagNoData = True
            return 0
        self.set_output()
        return 1