From 124e67b869ed1f81e991a6609e2bae5b2295b9a6 2018-01-24 19:51:26 From: jespinoza Date: 2018-01-24 19:51:26 Subject: [PATCH] Add NCDFReader for PX1000 radar, new PlotPolarMap, bugs in jroplot_data --- diff --git a/schainpy/model/graphics/jroplot_data.py b/schainpy/model/graphics/jroplot_data.py index 261792b..4bd04bb 100644 --- a/schainpy/model/graphics/jroplot_data.py +++ b/schainpy/model/graphics/jroplot_data.py @@ -22,7 +22,7 @@ ncmap = matplotlib.colors.LinearSegmentedColormap.from_list( 'jro', numpy.vstack((blu_values, jet_values))) matplotlib.pyplot.register_cmap(cmap=ncmap) -CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis', 'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm')] +CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis', 'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm', 'spectral')] def figpause(interval): @@ -51,7 +51,7 @@ class PlotData(Operation, Process): __attrs__ = ['show', 'save', 'xmin', 'xmax', 'ymin', 'ymax', 'zmin', 'zmax', 'zlimits', 'xlabel', 'ylabel', 'xaxis','cb_label', 'title', 'colorbar', 'bgcolor', 'width', 'height', 'localtime', 'oneFigure', - 'showprofile', 'decimation'] + 'showprofile', 'decimation', 'ftp'] def __init__(self, **kwargs): @@ -68,6 +68,7 @@ class PlotData(Operation, Process): self.localtime = kwargs.pop('localtime', True) self.show = kwargs.get('show', True) self.save = kwargs.get('save', False) + self.ftp = kwargs.get('ftp', False) self.colormap = kwargs.get('colormap', self.colormap) self.colormap_coh = kwargs.get('colormap_coh', 'jet') self.colormap_phase = kwargs.get('colormap_phase', 'RdBu_r') @@ -77,6 +78,7 @@ class PlotData(Operation, Process): self.title = kwargs.get('wintitle', self.CODE.upper()) self.cb_label = kwargs.get('cb_label', None) self.cb_labels = kwargs.get('cb_labels', None) + self.labels = kwargs.get('labels', None) self.xaxis = kwargs.get('xaxis', 'frequency') self.zmin = kwargs.get('zmin', None) self.zmax = kwargs.get('zmax', None) @@ -84,8 +86,10 @@ class 
PlotData(Operation, Process): self.xmin = kwargs.get('xmin', None) self.xmax = kwargs.get('xmax', None) self.xrange = kwargs.get('xrange', 24) + self.xscale = kwargs.get('xscale', None) self.ymin = kwargs.get('ymin', None) self.ymax = kwargs.get('ymax', None) + self.yscale = kwargs.get('yscale', None) self.xlabel = kwargs.get('xlabel', None) self.decimation = kwargs.get('decimation', None) self.showSNR = kwargs.get('showSNR', False) @@ -94,6 +98,7 @@ class PlotData(Operation, Process): self.height = kwargs.get('height', None) self.colorbar = kwargs.get('colorbar', True) self.factors = kwargs.get('factors', [1, 1, 1, 1, 1, 1, 1, 1]) + self.channels = kwargs.get('channels', None) self.titles = kwargs.get('titles', []) self.polar = False @@ -368,14 +373,19 @@ class PlotData(Operation, Process): ymin = self.ymin if self.ymin else numpy.nanmin(self.y) ymax = self.ymax if self.ymax else numpy.nanmax(self.y) - Y = numpy.array([5, 10, 20, 50, 100, 200, 500, 1000, 2000]) - i = 1 if numpy.where(ymax-ymin < Y)[0][0] < 0 else numpy.where(ymax-ymin < Y)[0][0] - ystep = Y[i] / 5 + Y = numpy.array([5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000]) + i = 1 if numpy.where(abs(ymax-ymin) <= Y)[0][0] < 0 else numpy.where(abs(ymax-ymin) <= Y)[0][0] + ystep = Y[i] / 5. 
for n, ax in enumerate(self.axes): if ax.firsttime: ax.set_facecolor(self.bgcolor) ax.yaxis.set_major_locator(MultipleLocator(ystep)) + ax.xaxis.set_major_locator(MultipleLocator(ystep)) + if self.xscale: + ax.xaxis.set_major_formatter(FuncFormatter(lambda x, pos: '{0:g}'.format(x*self.xscale))) + if self.xscale: + ax.yaxis.set_major_formatter(FuncFormatter(lambda x, pos: '{0:g}'.format(x*self.yscale))) if self.xaxis is 'time': ax.xaxis.set_major_formatter(FuncFormatter(self.__fmtTime)) ax.xaxis.set_major_locator(LinearLocator(9)) @@ -419,7 +429,7 @@ class PlotData(Operation, Process): def __plot(self): ''' ''' - log.success('Plotting', self.name) + log.log('Plotting', self.name) try: self.plot() @@ -439,14 +449,20 @@ class PlotData(Operation, Process): self.getDateTime(self.max_time).strftime('%Y/%m/%d'))) fig.canvas.draw() - if self.save and self.data.ended: - channels = range(self.nrows) + if self.save and (self.data.ended or not self.data.buffering): + + if self.labels: + labels = self.labels + else: + labels = range(self.nrows) + if self.oneFigure: label = '' else: - label = '_{}'.format(channels[n]) + label = '-{}'.format(labels[n]) figname = os.path.join( self.save, + self.CODE, '{}{}_{}.png'.format( self.CODE, label, @@ -455,6 +471,8 @@ class PlotData(Operation, Process): ) ) log.log('Saving figure: {}'.format(figname), self.name) + if not os.path.isdir(os.path.dirname(figname)): + os.makedirs(os.path.dirname(figname)) fig.savefig(figname) def plot(self): @@ -464,7 +482,7 @@ class PlotData(Operation, Process): def run(self): - log.success('Starting', self.name) + log.log('Starting', self.name) context = zmq.Context() receiver = context.socket(zmq.SUB) @@ -499,7 +517,7 @@ class PlotData(Operation, Process): self.__plot() except zmq.Again as e: - log.log('Waiting for data...') + log.log('.', tag='', nl=False) if self.data: figpause(self.data.throttle) else: @@ -963,3 +981,62 @@ class PlotOutputData(PlotParamData): CODE = 'output' colormap = 'seismic' + + 
class PlotPolarMapData(PlotData):
    '''
    Polar map plot: one pseudocolor panel per selected channel.

    Each panel shows one 2-D slice of ``self.data[CODE]`` drawn with
    ``pcolormesh`` on a Cartesian grid. The radius of a cell is its index
    along the second axis of the slice; its angle comes from
    ``self.data.heights``, which is passed through ``numpy.radians`` (so it
    is treated as degrees) and mapped to ``pi/2 - angle``.

    NOTE(review): ``self.data.heights`` presumably holds the azimuth of every
    ray, measured clockwise from north -- confirm against the reader that
    fills it.
    '''

    CODE = 'param'
    colormap = 'seismic'

    def setup(self):
        '''
        Set figure geometry, the channels to draw and the axis labels.

        When no ``channels`` option was given, one panel is created for each
        of the ``self.data.shape(CODE)[0]`` entries of the data.
        '''
        self.ncols = 1
        self.width = 9
        self.height = 8

        if self.channels is None:
            self.channels = list(range(self.data.shape(self.CODE)[0]))

        # One row (and one plot) per channel, whichever way they were chosen.
        self.nplots = len(self.channels)
        self.nrows = self.nplots

        self.xlabel = 'Zonal Distance (km)'
        self.ylabel = 'Meridional Distance (km)'
        self.bgcolor = 'white'

    def plot(self):
        '''
        Draw (or redraw) every panel from the current data.

        Updates ``self.y``, ``self.zmin``/``self.zmax`` (when ``zlimits`` is
        set), ``self.titles`` and ``self.saveTime`` as side effects.
        '''
        for n, ax in enumerate(self.axes):
            # Same key that setup() used to size the figure.
            data = self.data[self.CODE][self.channels[n]]

            zeniths = numpy.arange(data.shape[1])
            azimuths = -numpy.radians(self.data.heights) + numpy.pi / 2
            self.y = zeniths

            # Polar (r, theta) grid converted to Cartesian coordinates.
            r, theta = numpy.meshgrid(zeniths, azimuths)
            x, y = r * numpy.cos(theta), r * numpy.sin(theta)

            if self.zlimits is not None:
                self.zmin, self.zmax = self.zlimits[n]

            if not ax.firsttime:
                # Drop the previous mesh so redraws do not pile up.
                ax.collections.remove(ax.collections[0])

            ax.plt = ax.pcolormesh(
                x, y, numpy.ma.array(data, mask=numpy.isnan(data)),
                vmin=self.zmin,
                vmax=self.zmax,
                cmap=self.cmaps[n])

        self.titles = [self.data.parameters[ch] for ch in self.channels]
        self.saveTime = self.max_time
'''
Created on Dec 27, 2017

@author: Juan C. Espinoza
'''

import os
import sys
import time
import json
import glob
import datetime
import tarfile

import numpy
from netCDF4 import Dataset

from schainpy.model.io.jroIO_base import JRODataReader
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
from schainpy.model.data.jrodata import Parameters
from schainpy.utils import log

# 1970-01-01 shifted back by ``time.timezone`` seconds.
UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)


def load_json(obj):
    '''
    Parse json as string instead of unicode

    ``obj`` is either a JSON document (``str``) or an already decoded
    object. For a dictionary the keys become ``str``, ``unicode`` values
    become ``str`` and nested dictionaries are processed recursively. For a
    list or tuple the ``unicode`` items become ``str`` (one level only) and a
    list is returned. Any other object is returned unchanged.
    '''

    if isinstance(obj, str):
        iterable = json.loads(obj)
    else:
        iterable = obj

    if isinstance(iterable, dict):
        return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, unicode) else v
                for k, v in iterable.items()}
    elif isinstance(iterable, (list, tuple)):
        return [str(v) if isinstance(v, unicode) else v for v in iterable]

    return iterable


class NCDFReader(JRODataReader, ProcessingUnit):
    '''
    Reader of NetCDF parameter files (``.nc``) and gzipped tar archives of
    them (``.tgz``).

    Files are grouped by the date/time encoded in their names; every call to
    ``getData`` loads all the files of the next date/time into
    ``self.dataOut``. With ``online=True`` the reader keeps polling the
    folder for newer files once the initial list is exhausted.
    '''

    def __init__(self, **kwargs):

        ProcessingUnit.__init__(self, **kwargs)

        self.dataOut = Parameters()
        self.counter_records = 0
        self.nrecords = None
        self.flagNoMoreFiles = 0
        self.isConfig = False
        self.filename = None
        self.intervals = set()
        self.ext = ('.nc', '.tgz')
        self.online_mode = False

    def setup(self,
              path=None,
              startDate=None,
              endDate=None,
              format=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              walk=False,
              **kwargs):
        '''
        Store the reading options and build the initial list of files.

        Inputs:
            path                - folder to read from (required)
            startDate, endDate  - first and last date to process
            format              - not used by this reader
            startTime, endTime  - time limits applied to every record
            walk                - look inside the sub-folders of path
            nTries (kwarg)      - online mode: searches before giving up (3)
            online (kwarg)      - keep waiting for new files (False)
            delay (kwarg)       - online mode: seconds between searches (30)

        Raises ValueError when path is None and Warning when no file matches.
        '''

        self.path = path
        self.startDate = startDate
        self.endDate = endDate
        self.startTime = startTime
        self.endTime = endTime
        self.datatime = datetime.datetime(1900, 1, 1)
        self.walk = walk
        self.nTries = kwargs.get('nTries', 3)
        self.online = kwargs.get('online', False)
        self.delay = kwargs.get('delay', 30)

        if self.path is None:
            raise ValueError('The path is not valid')

        self.search_files(path, startDate, endDate, startTime, endTime, walk)
        self.cursor = 0
        self.counter_records = 0

        if not self.files:
            raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path))

    @staticmethod
    def _file_datetime(fullname):
        '''
        Return the datetime encoded in a file name, or None if there is none.

        The date is read from characters 3-10 (YYYYMMDD) and the time from
        characters 12-17 (HHMMSS) of the base name.
        '''

        name = os.path.basename(fullname)
        stamp = name[3:11] + name[12:18]
        if len(stamp) != 14 or not stamp.isdigit():
            return None
        try:
            return datetime.datetime.strptime(stamp, '%Y%m%d%H%M%S')
        except ValueError:
            # Digits that do not form a real date/time (e.g. month 13).
            return None

    @staticmethod
    def _subfolders(path):
        '''
        Return the sorted list of directories directly inside path.
        '''

        folders = [os.path.join(path, p) for p in os.listdir(path)]
        return sorted(p for p in folders if os.path.isdir(p))

    def _list_files(self, paths):
        '''
        Return the sorted full names of the files in paths whose extension is
        one of self.ext.
        '''

        found = []
        for folder in paths:
            found.extend(
                os.path.join(folder, s) for s in glob.glob1(folder, '*')
                if os.path.splitext(s)[-1] in self.ext)
        found.sort()
        return found

    def search_files(self, path, startDate, endDate, startTime, endTime, walk):
        '''
        Searching for NCDF files in path
        Creating a list of files to process included in [startDate,endDate]

        The date range is widened by one day on each side; the exact time
        limits are applied later, when each record is read. Fills
        ``self.files`` (datetime -> list of file names) and ``self.dates``
        (sorted datetimes).

        Input:
            path                - Path to find files
            startDate, endDate  - dates compared with the date of each file
            startTime, endTime  - not used here
            walk                - search in the sub-folders of path
        '''

        log.log('Searching files {} in {} '.format(self.ext, path), 'NCDFReader')

        paths = self._subfolders(path) if walk else [path]

        first_day = startDate - datetime.timedelta(1)
        last_day = endDate + datetime.timedelta(1)

        self.files = {}

        for fullname in self._list_files(paths):
            file_dt = self._file_datetime(fullname)
            if file_dt is None:
                continue
            if not first_day <= file_dt.date() <= last_day:
                continue
            self.files.setdefault(file_dt, []).append(fullname)

        self.dates = sorted(self.files)

        return

    def search_files_online(self):
        '''
        Searching for new NCDF files in online mode

        Looks for files that were not part of the last date/time read
        (``self.dt``) and are not older than it (a margin of one second is
        allowed). The search is repeated up to ``self.nTries`` times, waiting
        ``self.delay`` seconds after every empty attempt.

        ``self.files``, ``self.dates`` and ``self.cursor`` are replaced only
        when something is found, so a failed search can be retried later.

        Return:
            1 if new files were found, 0 otherwise
        '''

        old_files = set(self.files.get(self.dt, []))
        start = self.dt - datetime.timedelta(seconds=1)
        found = {}

        for n in range(self.nTries):

            if self.walk:
                # Only the two last sub-folders (in name order) are scanned.
                paths = self._subfolders(self.path)[-2:]
            else:
                paths = [self.path]

            for fullname in self._list_files(paths):
                if fullname in old_files:
                    continue
                file_dt = self._file_datetime(fullname)
                if file_dt is None or file_dt < start:
                    continue
                found.setdefault(file_dt, []).append(fullname)

            if found:
                break

            log.warning('Waiting {} seconds for the next file, try {} ...'.format(self.delay, n + 1), 'NCDFReader')
            time.sleep(self.delay)

        if not found:
            log.error('No more files found', 'NCDFReader')
            return 0

        self.files = found
        self.dates = sorted(found)
        self.cursor = 0

        return 1

    def parseFile(self):
        '''
        Load the global attributes and the variables of the open file.

        Every global attribute goes to ``self.header``. The variable named by
        the ``TypeName`` attribute is copied to ``self.data``, plus the
        ``Azimuth`` variable if it has not been stored yet.
        '''

        self.header = {}

        for attr in self.fp.ncattrs():
            self.header[str(attr)] = getattr(self.fp, attr)

        name = self.header['TypeName']
        self.data[name] = numpy.array(self.fp.variables[name])

        if 'Azimuth' not in self.data:
            self.data['Azimuth'] = numpy.array(self.fp.variables['Azimuth'])

    def setNextFile(self):
        '''
        Open next files for the current datetime

        Return:
            1 when the files of the next date/time were loaded, 0 when there
            is nothing left to read (or nothing new yet in online mode)
        '''

        # Check the bounds before indexing self.dates.
        if not self.online_mode and self.cursor >= len(self.dates):
            if not self.online:
                log.success('No more files', 'NCDFReader')
                self.flagNoMoreFiles = 1
                return 0
            self.online_mode = True

        if self.online_mode and not self.search_files_online():
            return 0

        # search_files_online() resets self.cursor, so read it afterwards.
        self.dt = self.dates[self.cursor]

        log.log(
            'Opening: {}\'s files'.format(self.dt),
            'NCDFReader'
        )

        self.data = {}

        for fullname in self.files[self.dt]:

            if os.path.splitext(fullname)[-1] == '.tgz':
                # NOTE(review): extractall() trusts the member names of the
                # archive; only use it with archives from a trusted source.
                with tarfile.open(fullname, 'r:gz') as tar:
                    tar.extractall('/tmp')
                    files = [os.path.join('/tmp', member.name)
                             for member in tar.getmembers() if member.isfile()]
            else:
                files = [fullname]

            for filename in files:
                if self.filename is not None:
                    self.fp.close()

                self.filename = filename
                self.filedate = self.dt
                self.fp = Dataset(self.filename, 'r')
                self.parseFile()

        self.counter_records += 1
        self.cursor += 1
        return 1

    def readNextFile(self):
        '''
        Load the next record whose time is inside the requested range.

        Records outside [startDate startTime, endDate endTime] are skipped
        with a warning.

        Return:
            1 when a record was loaded, 0 when there are no more files
        '''

        first = datetime.datetime.combine(self.startDate, self.startTime)
        last = datetime.datetime.combine(self.endDate, self.endTime)

        while True:
            self.flagDiscontinuousBlock = 0
            if not self.setNextFile():
                return 0

            self.datatime = datetime.datetime.utcfromtimestamp(self.header['Time'])

            if first <= self.datatime <= last:
                break

            log.warning(
                'Reading Record No. {}/{} -> {} [Skipping]'.format(
                    self.counter_records,
                    self.nrecords,
                    self.datatime.ctime()),
                'NCDFReader')

        log.log(
            'Reading Record No. {}/{} -> {}'.format(
                self.counter_records,
                self.nrecords,
                self.datatime.ctime()),
            'NCDFReader')

        return 1

    def set_output(self):
        '''
        Storing data from buffer to dataOut object

        ``Azimuth`` is removed from the buffer and stored as the height list;
        the remaining variables are stacked in ``data_param`` in the same
        order as ``parameters``. Values equal to -99900 are replaced by NaN.
        '''

        self.dataOut.heightList = self.data.pop('Azimuth')

        # One explicit list keeps names and values aligned.
        names = list(self.data.keys())

        log.log('Parameters found: {}'.format(','.join(names)),
                'NCDFReader')

        self.dataOut.data_param = numpy.array([self.data[name] for name in names])
        self.dataOut.data_param[self.dataOut.data_param == -99900.] = numpy.nan
        self.dataOut.parameters = names
        self.dataOut.utctime = self.header['Time']
        self.dataOut.utctimeInit = self.dataOut.utctime
        self.dataOut.useLocalTime = False
        self.dataOut.flagNoData = False
        self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock

    def getData(self):
        '''
        Storing data from databuffer to dataOut object

        Return:
            1 when dataOut holds a new record, 0 otherwise (dataOut.flagNoData
            is set to True in that case)
        '''
        if self.flagNoMoreFiles:
            self.dataOut.flagNoData = True
            log.error('No file left to process', 'NCDFReader')
            return 0

        if not self.readNextFile():
            self.dataOut.flagNoData = True
            return 0

        self.set_output()

        return 1