diff --git a/schainpy/controller.py b/schainpy/controller.py index affb256..ce97a95 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -1282,7 +1282,7 @@ class Project(Process): def run(self): - log.success('Starting {}'.format(self.name)) + log.success('Starting {}'.format(self.name), tag='') self.start_time = time.time() self.createObjects() self.connectObjects() diff --git a/schainpy/model/graphics/jroplot_data.py b/schainpy/model/graphics/jroplot_data.py index 21914ec..6253a0a 100644 --- a/schainpy/model/graphics/jroplot_data.py +++ b/schainpy/model/graphics/jroplot_data.py @@ -9,6 +9,7 @@ import zmq import numpy import matplotlib import matplotlib.pyplot as plt +from matplotlib.patches import Polygon from mpl_toolkits.axes_grid1 import make_axes_locatable from matplotlib.ticker import FuncFormatter, LinearLocator, MultipleLocator @@ -24,6 +25,23 @@ matplotlib.pyplot.register_cmap(cmap=ncmap) CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis', 'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm')] +EARTH_RADIUS = 6.3710e3 + +def ll2xy(lat1, lon1, lat2, lon2): + + p = 0.017453292519943295 + a = 0.5 - numpy.cos((lat2 - lat1) * p)/2 + numpy.cos(lat1 * p) * numpy.cos(lat2 * p) * (1 - numpy.cos((lon2 - lon1) * p)) / 2 + r = 12742 * numpy.arcsin(numpy.sqrt(a)) + theta = numpy.arctan2(numpy.sin((lon2-lon1)*p)*numpy.cos(lat2*p), numpy.cos(lat1*p)*numpy.sin(lat2*p)-numpy.sin(lat1*p)*numpy.cos(lat2*p)*numpy.cos((lon2-lon1)*p)) + theta = -theta + numpy.pi/2 + return r*numpy.cos(theta), r*numpy.sin(theta) + +def km2deg(km): + ''' + Convert distance in km to degrees + ''' + + return numpy.rad2deg(km/EARTH_RADIUS) def figpause(interval): backend = plt.rcParams['backend'] @@ -64,7 +82,7 @@ class PlotData(Operation, Process): __attrs__ = ['show', 'save', 'xmin', 'xmax', 'ymin', 'ymax', 'zmin', 'zmax', 'zlimits', 'xlabel', 'ylabel', 'xaxis','cb_label', 'title', 'colorbar', 'bgcolor', 'width', 'height', 'localtime', 'oneFigure', - 'showprofile', 'decimation'] + 'showprofile', 'decimation', 'ftp'] def __init__(self, **kwargs): @@ -81,6 +99,7 @@ class PlotData(Operation, Process): self.localtime = kwargs.pop('localtime', True) self.show = kwargs.get('show', True) self.save = kwargs.get('save', False) + self.ftp = kwargs.get('ftp', False) self.colormap = kwargs.get('colormap', self.colormap) self.colormap_coh = kwargs.get('colormap_coh', 'jet') self.colormap_phase = kwargs.get('colormap_phase', 'RdBu_r') @@ -90,6 +109,7 @@ class PlotData(Operation, Process): self.title = kwargs.get('wintitle', self.CODE.upper()) self.cb_label = kwargs.get('cb_label', None) self.cb_labels = kwargs.get('cb_labels', None) + self.labels = kwargs.get('labels', None) self.xaxis = kwargs.get('xaxis', 'frequency') self.zmin = kwargs.get('zmin', None) self.zmax = kwargs.get('zmax', None) @@ -97,8 +117,10 @@ class PlotData(Operation, Process): self.xmin = kwargs.get('xmin', None) self.xmax = kwargs.get('xmax', None) self.xrange = kwargs.get('xrange', 24) + self.xscale = kwargs.get('xscale', None) self.ymin = kwargs.get('ymin', None) self.ymax = kwargs.get('ymax', None) + self.yscale = kwargs.get('yscale', None) self.xlabel = kwargs.get('xlabel', None) self.decimation = kwargs.get('decimation', None) self.showSNR = kwargs.get('showSNR', False) @@ -107,8 +129,10 @@ class PlotData(Operation, Process): self.height = kwargs.get('height', None) self.colorbar = kwargs.get('colorbar', True) self.factors = kwargs.get('factors', [1, 1, 1, 1, 1, 1, 1, 1]) + self.channels = kwargs.get('channels', None) self.titles = kwargs.get('titles', []) self.polar = False + self.grid = kwargs.get('grid', False) def __fmtTime(self, x, pos): ''' @@ -381,14 +405,19 @@ class PlotData(Operation, Process): ymin = self.ymin if self.ymin else numpy.nanmin(self.y) ymax = self.ymax if self.ymax else numpy.nanmax(self.y) - Y = numpy.array([5, 10, 20, 50, 100, 200, 500, 1000, 2000]) - i = 1 if numpy.where(ymax-ymin < Y)[0][0] < 0 else numpy.where(ymax-ymin < Y)[0][0] - ystep = Y[i] / 5 + Y = numpy.array([1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000]) + i = 1 if numpy.where(abs(ymax-ymin) <= Y)[0][0] < 0 else numpy.where(abs(ymax-ymin) <= Y)[0][0] + ystep = Y[i] / 10. for n, ax in enumerate(self.axes): if ax.firsttime: ax.set_facecolor(self.bgcolor) ax.yaxis.set_major_locator(MultipleLocator(ystep)) + ax.xaxis.set_major_locator(MultipleLocator(ystep)) + if self.xscale: + ax.xaxis.set_major_formatter(FuncFormatter(lambda x, pos: '{0:g}'.format(x*self.xscale))) + if self.xscale: + ax.yaxis.set_major_formatter(FuncFormatter(lambda x, pos: '{0:g}'.format(x*self.yscale))) if self.xaxis is 'time': ax.xaxis.set_major_formatter(FuncFormatter(self.__fmtTime)) ax.xaxis.set_major_locator(LinearLocator(9)) @@ -414,13 +443,15 @@ class PlotData(Operation, Process): ax.cbar.set_label(self.cb_labels[n], size=8) else: ax.cbar = None + if self.grid: + ax.grid(True) if not self.polar: ax.set_xlim(xmin, xmax) ax.set_ylim(ymin, ymax) - ax.set_title('{} - {} {}'.format( + ax.set_title('{} {} {}'.format( self.titles[n], - self.getDateTime(self.max_time).strftime('%H:%M:%S'), + self.getDateTime(self.max_time).strftime('%Y-%m-%dT%H:%M:%S'), self.time_label), size=8) else: @@ -432,13 +463,15 @@ class PlotData(Operation, Process): def __plot(self): ''' ''' - log.success('Plotting', self.name) + log.log('Plotting', self.name) try: self.plot() self.format() - except: + except Exception as e: log.warning('{} Plot could not be updated... check data'.format(self.CODE), self.name) + log.error(str(e), '') + return for n, fig in enumerate(self.figures): if self.nrows == 0 or self.nplots == 0: @@ -452,14 +485,20 @@ class PlotData(Operation, Process): self.getDateTime(self.max_time).strftime('%Y/%m/%d'))) fig.canvas.draw() - if self.save and self.data.ended: - channels = range(self.nrows) + if self.save and (self.data.ended or not self.data.buffering): + + if self.save_labels: + labels = self.save_labels + else: + labels = range(self.nrows) + if self.oneFigure: label = '' else: - label = '_{}'.format(channels[n]) + label = '-{}'.format(labels[n]) figname = os.path.join( self.save, + self.CODE, '{}{}_{}.png'.format( self.CODE, label, @@ -468,6 +507,8 @@ class PlotData(Operation, Process): ) ) log.log('Saving figure: {}'.format(figname), self.name) + if not os.path.isdir(os.path.dirname(figname)): + os.makedirs(os.path.dirname(figname)) fig.savefig(figname) def plot(self): @@ -477,7 +518,7 @@ class PlotData(Operation, Process): def run(self): - log.success('Starting', self.name) + log.log('Starting', self.name) context = zmq.Context() receiver = context.socket(zmq.SUB) @@ -978,3 +1019,130 @@ class PlotOutputData(PlotParamData): CODE = 'output' colormap = 'seismic' + + +class PlotPolarMapData(PlotData): + ''' + Plot for meteors detection data + ''' + + CODE = 'param' + colormap = 'seismic' + + def setup(self): + self.ncols = 1 + self.nrows = 1 + self.width = 9 + self.height = 8 + self.mode = self.data.meta['mode'] + if self.channels is not None: + self.nplots = len(self.channels) + self.nrows = len(self.channels) + else: + self.nplots = self.data.shape(self.CODE)[0] + self.nrows = self.nplots + self.channels = range(self.nplots) + if self.mode == 'E': + self.xlabel = 'Longitude' + self.ylabel = 'Latitude' + else: + self.xlabel = 'Range (km)' + self.ylabel = 'Height (km)' + self.bgcolor = 'white' + self.cb_labels = self.data.meta['units'] + self.lat = self.data.meta['latitude'] + self.lon = self.data.meta['longitude'] + self.xmin, self.xmax = float(km2deg(self.xmin) + self.lon), float(km2deg(self.xmax) + self.lon) + self.ymin, self.ymax = float(km2deg(self.ymin) + self.lat), float(km2deg(self.ymax) + self.lat) + # self.polar = True + + def plot(self): + + for n, ax in enumerate(self.axes): + data = self.data['param'][self.channels[n]] + + zeniths = numpy.linspace(0, self.data.meta['max_range'], data.shape[1]) + if self.mode == 'E': + azimuths = -numpy.radians(self.data.heights)+numpy.pi/2 + r, theta = numpy.meshgrid(zeniths, azimuths) + x, y = r*numpy.cos(theta)*numpy.cos(numpy.radians(self.data.meta['elevation'])), r*numpy.sin(theta)*numpy.cos(numpy.radians(self.data.meta['elevation'])) + x = km2deg(x) + self.lon + y = km2deg(y) + self.lat + else: + azimuths = numpy.radians(self.data.heights) + r, theta = numpy.meshgrid(zeniths, azimuths) + x, y = r*numpy.cos(theta), r*numpy.sin(theta) + self.y = zeniths + + if ax.firsttime: + if self.zlimits is not None: + self.zmin, self.zmax = self.zlimits[n] + ax.plt = ax.pcolormesh(#r, theta, numpy.ma.array(data, mask=numpy.isnan(data)), + x, y, numpy.ma.array(data, mask=numpy.isnan(data)), + vmin=self.zmin, + vmax=self.zmax, + cmap=self.cmaps[n]) + else: + if self.zlimits is not None: + self.zmin, self.zmax = self.zlimits[n] + ax.collections.remove(ax.collections[0]) + ax.plt = ax.pcolormesh(# r, theta, numpy.ma.array(data, mask=numpy.isnan(data)), + x, y, numpy.ma.array(data, mask=numpy.isnan(data)), + vmin=self.zmin, + vmax=self.zmax, + cmap=self.cmaps[n]) + + if self.mode == 'A': + continue + + # plot district names + f = open('/data/workspace/schain_scripts/distrito.csv') + for line in f: + label, lon, lat = [s.strip() for s in line.split(',') if s] + lat = float(lat) + lon = float(lon) + # ax.plot(lon, lat, '.b', ms=2) + ax.text(lon, lat, label.decode('utf8'), ha='center', va='bottom', size='8', color='black') + + # plot limites + limites =[] + tmp = [] + for line in open('/data/workspace/schain_scripts/lima.csv'): + if '#' in line: + if tmp: + limites.append(tmp) + tmp = [] + continue + values = line.strip().split(',') + tmp.append((float(values[0]), float(values[1]))) + for points in limites: + ax.add_patch(Polygon(points, ec='k', fc='none', ls='--', lw=0.5)) + + # plot Cuencas + for cuenca in ('rimac', 'lurin', 'mala', 'chillon', 'chilca', 'chancay-huaral'): + f = open('/data/workspace/schain_scripts/{}.csv'.format(cuenca)) + values = [line.strip().split(',') for line in f] + points = [(float(s[0]), float(s[1])) for s in values] + ax.add_patch(Polygon(points, ec='b', fc='none')) + + # plot grid + for r in (15, 30, 45, 60): + ax.add_artist(plt.Circle((self.lon, self.lat), km2deg(r), color='0.6', fill=False, lw=0.2)) + ax.text( + self.lon + (km2deg(r))*numpy.cos(60*numpy.pi/180), + self.lat + (km2deg(r))*numpy.sin(60*numpy.pi/180), + '{}km'.format(r), + ha='center', va='bottom', size='8', color='0.6', weight='heavy') + + if self.mode == 'E': + title = 'El={}$^\circ$'.format(self.data.meta['elevation']) + label = 'E{:02d}'.format(int(self.data.meta['elevation'])) + else: + title = 'Az={}$^\circ$'.format(self.data.meta['azimuth']) + label = 'A{:02d}'.format(int(self.data.meta['azimuth'])) + + self.save_labels = ['{}-{}'.format(lbl, label) for lbl in self.labels] + self.titles = ['{} {}'.format(self.data.parameters[x], title) for x in self.channels] + self.saveTime = self.max_time + + diff --git a/schainpy/model/graphics/jroplot_spectra.py b/schainpy/model/graphics/jroplot_spectra.py index 99882c0..eafac9f 100644 --- a/schainpy/model/graphics/jroplot_spectra.py +++ b/schainpy/model/graphics/jroplot_spectra.py @@ -25,8 +25,8 @@ class SpectraPlot(Figure): self.isConfig = False self.__nsubplots = 1 - self.WIDTH = 300 - self.HEIGHT = 300 + self.WIDTH = 250 + self.HEIGHT = 250 self.WIDTHPROF = 120 self.HEIGHTPROF = 0 self.counter_imagwr = 0 diff --git a/schainpy/model/io/__init__.py b/schainpy/model/io/__init__.py index f125f45..e35b986 100644 --- a/schainpy/model/io/__init__.py +++ b/schainpy/model/io/__init__.py @@ -18,4 +18,6 @@ from jroIO_madrigal import * from bltrIO_param import * from jroIO_bltr import * from jroIO_mira35c import * -from julIO_param import * \ No newline at end of file +from julIO_param import * + +from pxIO_param import * \ No newline at end of file diff --git a/schainpy/model/io/pxIO_param.py b/schainpy/model/io/pxIO_param.py new file mode 100644 index 0000000..d7d4223 --- /dev/null +++ b/schainpy/model/io/pxIO_param.py @@ -0,0 +1,350 @@ +''' +Created on Jan 15, 2018 + +@author: Juan C. Espinoza +''' + +import os +import sys +import time +import glob +import datetime +import tarfile + +import numpy +try: + from netCDF4 import Dataset +except: + log.warning( + 'You should install "netCDF4" module if you want to read/write NCDF files' + ) + +from utils import folder_in_range + +from schainpy.model.io.jroIO_base import JRODataReader +from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation +from schainpy.model.data.jrodata import Parameters +from schainpy.utils import log + +UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone) + + +class PXReader(JRODataReader, ProcessingUnit): + + def __init__(self, **kwargs): + + ProcessingUnit.__init__(self, **kwargs) + + self.dataOut = Parameters() + self.counter_records = 0 + self.nrecords = None + self.flagNoMoreFiles = 0 + self.isConfig = False + self.filename = None + self.intervals = set() + self.ext = ('.nc', '.tgz') + self.online_mode = False + + def setup(self, + path=None, + startDate=None, + endDate=None, + format=None, + startTime=datetime.time(0, 0, 0), + endTime=datetime.time(23, 59, 59), + walk=False, + **kwargs): + + self.path = path + self.startDate = startDate + self.endDate = endDate + self.startTime = startTime + self.endTime = endTime + self.datatime = datetime.datetime(1900,1,1) + self.walk = walk + self.nTries = kwargs.get('nTries', 10) + self.online = kwargs.get('online', False) + self.delay = kwargs.get('delay', 60) + self.ele = kwargs.get('ext', '') + + if self.path is None: + raise ValueError, 'The path is not valid' + + self.search_files(path, startDate, endDate, startTime, endTime, walk) + self.cursor = 0 + self.counter_records = 0 + + if not self.files: + raise Warning, 'There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path) + + def search_files(self, path, startDate, endDate, startTime, endTime, walk): + ''' + Searching for NCDF files in path + Creating a list of files to procces included in [startDate,endDate] + + Input: + path - Path to find files + ''' + + log.log('Searching files {} in {} '.format(self.ext, path), 'PXReader') + if walk: + paths = [os.path.join(path, p) for p in os.listdir(path) if os.path.isdir(os.path.join(path, p))] + paths.sort() + else: + paths = [path] + + fileList0 = [] + + for subpath in paths: + if not folder_in_range(subpath.split('/')[-1], startDate, endDate, '%Y%m%d'): + continue + fileList0 += [os.path.join(subpath, s) for s in glob.glob1(subpath, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s] + + fileList0.sort() + if self.online: + fileList0 = fileList0[-1:] + + self.files = {} + + startDate = startDate - datetime.timedelta(1) + endDate = endDate + datetime.timedelta(1) + + for fullname in fileList0: + thisFile = fullname.split('/')[-1] + year = thisFile[3:7] + if not year.isdigit(): + continue + + month = thisFile[7:9] + if not month.isdigit(): + continue + + day = thisFile[9:11] + if not day.isdigit(): + continue + + year, month, day = int(year), int(month), int(day) + dateFile = datetime.date(year, month, day) + timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18])) + + if (startDate > dateFile) or (endDate < dateFile): + continue + + dt = datetime.datetime.combine(dateFile, timeFile) + if dt not in self.files: + self.files[dt] = [] + self.files[dt].append(fullname) + + self.dates = self.files.keys() + self.dates.sort() + + return + + def search_files_online(self): + ''' + Searching for NCDF files in online mode path + Creating a list of files to procces included in [startDate,endDate] + + Input: + path - Path to find files + ''' + + self.files = {} + + for n in range(self.nTries): + + if self.walk: + paths = [os.path.join(self.path, p) for p in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, p))] + paths.sort() + path = paths[-1] + else: + path = self.path + + new_files = [os.path.join(path, s) for s in glob.glob1(path, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s] + new_files.sort() + + for fullname in new_files: + thisFile = fullname.split('/')[-1] + year = thisFile[3:7] + if not year.isdigit(): + continue + + month = thisFile[7:9] + if not month.isdigit(): + continue + + day = thisFile[9:11] + if not day.isdigit(): + continue + + year, month, day = int(year), int(month), int(day) + dateFile = datetime.date(year, month, day) + timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18])) + + dt = datetime.datetime.combine(dateFile, timeFile) + + if self.dt >= dt: + continue + + if dt not in self.files: + self.dt = dt + self.files[dt] = [] + + self.files[dt].append(fullname) + break + + if self.files: + break + else: + log.warning('Waiting {} seconds for the next file, try {} ...'.format(self.delay, n + 1), 'PXReader') + time.sleep(self.delay) + + if not self.files: + return 0 + + self.dates = self.files.keys() + self.dates.sort() + self.cursor = 0 + + return 1 + + def parseFile(self): + ''' + ''' + + header = {} + + for attr in self.fp.ncattrs(): + header[str(attr)] = getattr(self.fp, attr) + + self.header.append(header) + + self.data[header['TypeName']] = numpy.array(self.fp.variables[header['TypeName']]) + + def setNextFile(self): + ''' + Open next files for the current datetime + ''' + + cursor = self.cursor + if not self.online_mode: + if cursor == len(self.dates): + if self.online: + cursor = 0 + self.dt = self.dates[cursor] + self.online_mode = True + if not self.search_files_online(): + log.success('No more files', 'PXReader') + return 0 + else: + log.success('No more files', 'PXReader') + self.flagNoMoreFiles = 1 + return 0 + else: + if not self.search_files_online(): + return 0 + cursor = self.cursor + + self.data = {} + self.header = [] + + for fullname in self.files[self.dates[cursor]]: + + log.log('Opening: {}'.format(fullname), 'PXReader') + + if os.path.splitext(fullname)[-1] == '.tgz': + tar = tarfile.open(fullname, 'r:gz') + tar.extractall('/tmp') + files = [os.path.join('/tmp', member.name) for member in tar.getmembers()] + else: + files = [fullname] + + for filename in files: + if self.filename is not None: + self.fp.close() + + self.filename = filename + self.filedate = self.dates[cursor] + self.fp = Dataset(self.filename, 'r') + self.parseFile() + + self.counter_records += 1 + self.cursor += 1 + return 1 + + def readNextFile(self): + + while True: + self.flagDiscontinuousBlock = 0 + if not self.setNextFile(): + return 0 + + self.datatime = datetime.datetime.utcfromtimestamp(self.header[0]['Time']) + + if self.online: + break + + if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \ + (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)): + log.warning( + 'Reading Record No. {}/{} -> {} [Skipping]'.format( + self.counter_records, + self.nrecords, + self.datatime.ctime()), + 'PXReader') + continue + break + + log.log( + 'Reading Record No. {}/{} -> {}'.format( + self.counter_records, + self.nrecords, + self.datatime.ctime()), + 'PXReader') + + return 1 + + + def set_output(self): + ''' + Storing data from buffer to dataOut object + ''' + + self.data['Elevation'] = numpy.array(self.fp.variables['Elevation']) + self.data['Azimuth'] = numpy.array(self.fp.variables['Azimuth']) + self.dataOut.range = numpy.array(self.fp.variables['GateWidth']) + self.dataOut.data = self.data + self.dataOut.units = [h['Unit-value'] for h in self.header] + self.dataOut.parameters = [h['TypeName'] for h in self.header] + self.dataOut.missing = self.header[0]['MissingData'] + self.dataOut.max_range = self.header[0]['MaximumRange-value'] + self.dataOut.elevation = self.header[0]['Elevation'] + self.dataOut.azimuth = self.header[0]['Azimuth'] + self.dataOut.latitude = self.header[0]['Latitude'] + self.dataOut.longitude = self.header[0]['Longitude'] + self.dataOut.utctime = self.header[0]['Time'] + self.dataOut.utctimeInit = self.dataOut.utctime + self.dataOut.useLocalTime = True + self.dataOut.flagNoData = False + self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock + + log.log('Parameters found: {}'.format(','.join(self.dataOut.parameters)), + 'PXReader') + + def getData(self): + ''' + Storing data from databuffer to dataOut object + ''' + if self.flagNoMoreFiles: + self.dataOut.flagNoData = True + log.error('No file left to process', 'PXReader') + return 0 + + if not self.readNextFile(): + self.dataOut.flagNoData = True + return 0 + + self.set_output() + + return 1 + diff --git a/schainpy/model/io/utils.py b/schainpy/model/io/utils.py new file mode 100644 index 0000000..2660a31 --- /dev/null +++ b/schainpy/model/io/utils.py @@ -0,0 +1,24 @@ +""" +Utilities for IO modules +""" + +import os +from datetime import datetime + +def folder_in_range(folder, start_date, end_date, pattern): + """ + Check whether folder is bettwen start_date and end_date + + Args: + folder (str): Folder to check + start_date (date): Initial date + end_date (date): Final date + pattern (str): Datetime format of the folder + Returns: + bool: True for success, False otherwise + """ + try: + dt = datetime.strptime(folder, pattern) + except: + raise ValueError('Folder {} does not match {} format'.format(folder, pattern)) + return start_date <= dt.date() <= end_date diff --git a/schainpy/model/proc/__init__.py b/schainpy/model/proc/__init__.py index af598f5..2f1588c 100644 --- a/schainpy/model/proc/__init__.py +++ b/schainpy/model/proc/__init__.py @@ -13,3 +13,4 @@ from jroproc_parameters import * from jroproc_spectra_lags import * from jroproc_spectra_acf import * from bltrproc_parameters import * +from pxproc_parameters import * diff --git a/schainpy/model/proc/pxproc_parameters.py b/schainpy/model/proc/pxproc_parameters.py new file mode 100644 index 0000000..5d5e9d8 --- /dev/null +++ b/schainpy/model/proc/pxproc_parameters.py @@ -0,0 +1,64 @@ +''' +Created on Oct 24, 2016 + +@author: roj- LouVD +''' + +import numpy +import datetime +import time +from time import gmtime + +from numpy import transpose + +from jroproc_base import ProcessingUnit, Operation +from schainpy.model.data.jrodata import Parameters + + +class PXParametersProc(ProcessingUnit): + ''' + Processing unit for PX parameters data + ''' + + def __init__(self, **kwargs): + """ + Inputs: None + """ + ProcessingUnit.__init__(self, **kwargs) + self.dataOut = Parameters() + self.isConfig = False + + def setup(self, mode): + """ + """ + self.dataOut.mode = mode + + def run(self, mode): + """ + Args: + mode (str): select independent variable 'E' for elevation or 'A' for azimuth + """ + + if not self.isConfig: + self.setup(mode) + self.isConfig = True + + if self.dataIn.type == 'Parameters': + self.dataOut.copy(self.dataIn) + + self.dataOut.data_param = numpy.array([self.dataOut.data[var] for var in self.dataOut.parameters]) + self.dataOut.data_param[self.dataOut.data_param == self.dataOut.missing] = numpy.nan + + if mode.upper()=='E': + self.dataOut.heightList = self.dataOut.data['Azimuth'] + else: + self.dataOut.heightList = self.dataOut.data['Elevation'] + + attrs = ['units', 'elevation', 'azimuth', 'max_range', 'latitude', 'longitude'] + meta = {} + + for attr in attrs: + meta[attr] = getattr(self.dataOut, attr) + + meta['mode'] = mode + self.dataOut.meta = meta \ No newline at end of file diff --git a/schainpy/model/utils/jroutils_publish.py b/schainpy/model/utils/jroutils_publish.py index 3e59563..fdd71ab 100644 --- a/schainpy/model/utils/jroutils_publish.py +++ b/schainpy/model/utils/jroutils_publish.py @@ -2,12 +2,15 @@ @author: Juan C. Espinoza ''' +import os +import glob import time import json import numpy import paho.mqtt.client as mqtt import zmq import datetime +import ftplib from zmq.utils.monitor import recv_monitor_message from functools import wraps from threading import Thread @@ -17,12 +20,39 @@ from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit from schainpy.model.data.jrodata import JROData from schainpy.utils import log -MAXNUMX = 100 -MAXNUMY = 100 - -class PrettyFloat(float): - def __repr__(self): - return '%.2f' % self +MAXNUMX = 500 +MAXNUMY = 500 + +PLOT_CODES = { + 'rti': 0, # Range time intensity (RTI). + 'spc': 1, # Spectra (and Cross-spectra) information. + 'cspc': 2, # Cross-Correlation information. + 'coh': 3, # Coherence map. + 'base': 4, # Base lines graphic. + 'row': 5, # Row Spectra. + 'total': 6, # Total Power. + 'drift': 7, # Drifts graphics. + 'height': 8, # Height profile. + 'phase': 9, # Signal Phase. + 'power': 16, + 'noise': 17, + 'beacon': 18, + 'wind': 22, + 'skymap': 23, + 'Unknown': 24, + 'V-E': 25, # PIP Velocity. + 'Z-E': 26, # PIP Reflectivity. + 'V-A': 27, # RHI Velocity. + 'Z-A': 28, # RHI Reflectivity. +} + +def get_plot_code(s): + label = s.split('_')[0] + codes = [key for key in PLOT_CODES if key in label] + if codes: + return PLOT_CODES[codes[0]] + else: + return 24 def roundFloats(obj): if isinstance(obj, list): @@ -82,12 +112,14 @@ class Data(object): Object to hold data to be plotted ''' - def __init__(self, plottypes, throttle_value, exp_code): + def __init__(self, plottypes, throttle_value, exp_code, buffering=True): self.plottypes = plottypes self.throttle = throttle_value self.exp_code = exp_code + self.buffering = buffering self.ended = False self.localtime = False + self.meta = {} self.__times = [] self.__heights = [] @@ -102,7 +134,7 @@ class Data(object): if key not in self.data: raise KeyError(log.error('Missing key: {}'.format(key))) - if 'spc' in key: + if 'spc' in key or not self.buffering: ret = self.data[key] else: ret = numpy.array([self.data[key][x] for x in self.times]) @@ -118,6 +150,7 @@ class Data(object): Configure object ''' + self.type = '' self.ended = False self.data = {} self.__times = [] @@ -134,7 +167,7 @@ class Data(object): ''' if len(self.data[key]): - if 'spc' in key: + if 'spc' in key or not self.buffering: return self.data[key].shape return self.data[key][self.__times[0]].shape return (0,) @@ -147,9 +180,12 @@ class Data(object): if tm in self.__times: return + self.type = dataOut.type self.parameters = getattr(dataOut, 'parameters', []) if hasattr(dataOut, 'pairsList'): self.pairs = dataOut.pairsList + if hasattr(dataOut, 'meta'): + self.meta = dataOut.meta self.channels = dataOut.channelList self.interval = dataOut.getTimeInterval() self.localtime = dataOut.useLocalTime @@ -162,31 +198,39 @@ class Data(object): for plot in self.plottypes: if plot == 'spc': z = dataOut.data_spc/dataOut.normFactor - self.data[plot] = 10*numpy.log10(z) + buffer = 10*numpy.log10(z) if plot == 'cspc': - self.data[plot] = dataOut.data_cspc + buffer = dataOut.data_cspc if plot == 'noise': - self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor) + buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor) if plot == 'rti': - self.data[plot][tm] = dataOut.getPower() + buffer = dataOut.getPower() if plot == 'snr_db': - self.data['snr'][tm] = dataOut.data_SNR + buffer = dataOut.data_SNR if plot == 'snr': - self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR) + buffer = 10*numpy.log10(dataOut.data_SNR) if plot == 'dop': - self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP) + buffer = 10*numpy.log10(dataOut.data_DOP) if plot == 'mean': - self.data[plot][tm] = dataOut.data_MEAN + buffer = dataOut.data_MEAN if plot == 'std': - self.data[plot][tm] = dataOut.data_STD + buffer = dataOut.data_STD if plot == 'coh': - self.data[plot][tm] = dataOut.getCoherence() + buffer = dataOut.getCoherence() if plot == 'phase': - self.data[plot][tm] = dataOut.getCoherence(phase=True) + buffer = dataOut.getCoherence(phase=True) if plot == 'output': - self.data[plot][tm] = dataOut.data_output + buffer = dataOut.data_output if plot == 'param': - self.data[plot][tm] = dataOut.data_param + buffer = dataOut.data_param + + if 'spc' in plot: + self.data[plot] = buffer + else: + if self.buffering: + self.data[plot][tm] = buffer + else: + self.data[plot] = buffer def normalize_heights(self): ''' @@ -220,7 +264,7 @@ class Data(object): tm = self.times[-1] dy = int(self.heights.size/MAXNUMY) + 1 for key in self.data: - if key in ('spc', 'cspc'): + if key in ('spc', 'cspc') or not self.buffering: dx = int(self.data[key].shape[1]/MAXNUMX) + 1 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist()) else: @@ -240,6 +284,10 @@ class Data(object): ret['pairs'] = self.pairs else: ret['pairs'] = [] + + for key, value in self.meta.items(): + ret[key] = value + return json.dumps(ret) @property @@ -492,7 +540,7 @@ class PlotterReceiver(ProcessingUnit, Process): throttle_value = 5 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle', - 'exp_code', 'web_server'] + 'exp_code', 'web_server', 'buffering'] def __init__(self, **kwargs): @@ -513,6 +561,7 @@ class PlotterReceiver(ProcessingUnit, Process): self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')] self.realtime = kwargs.get('realtime', False) self.localtime = kwargs.get('localtime', True) + self.buffering = kwargs.get('buffering', True) self.throttle_value = kwargs.get('throttle', 5) self.exp_code = kwargs.get('exp_code', None) self.sendData = self.initThrottle(self.throttle_value) @@ -521,8 +570,8 @@ class PlotterReceiver(ProcessingUnit, Process): def setup(self): - self.data = Data(self.plottypes, self.throttle_value, self.exp_code) - self.isConfig = True + self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering) + self.isConfig = True def event_monitor(self, monitor): @@ -556,12 +605,12 @@ class PlotterReceiver(ProcessingUnit, Process): return sendDataThrottled def send(self, data): - log.success('Sending {}'.format(data), self.name) + log.log('Sending {}'.format(data), self.name) self.sender.send_pyobj(data) def run(self): - log.success( + log.log( 'Starting from {}'.format(self.address), self.name ) @@ -659,3 +708,157 @@ class PlotterReceiver(ProcessingUnit, Process): coerce = False return + + +class SendToFTP(Operation, Process): + + ''' + Operation to send data over FTP. + ''' + + __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout'] + + def __init__(self, **kwargs): + ''' + patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...] + ''' + Operation.__init__(self, **kwargs) + Process.__init__(self) + self.server = kwargs.get('server') + self.username = kwargs.get('username') + self.password = kwargs.get('password') + self.patterns = kwargs.get('patterns') + self.timeout = kwargs.get('timeout', 30) + self.times = [time.time() for p in self.patterns] + self.latest = ['' for p in self.patterns] + self.mp = False + self.ftp = None + + def setup(self): + + log.log('Connecting to ftp://{}'.format(self.server), self.name) + try: + self.ftp = ftplib.FTP(self.server, timeout=self.timeout) + except ftplib.all_errors: + log.error('Server connection fail: {}'.format(self.server), self.name) + if self.ftp is not None: + self.ftp.close() + self.ftp = None + self.isConfig = False + return + + try: + self.ftp.login(self.username, self.password) + except ftplib.all_errors: + log.error('The given username y/o password are incorrect', self.name) + if self.ftp is not None: + self.ftp.close() + self.ftp = None + self.isConfig = False + return + + log.success('Connection success', self.name) + self.isConfig = True + return + + def check(self): + + try: + self.ftp.voidcmd("NOOP") + except: + log.warning('Connection lost... trying to reconnect', self.name) + if self.ftp is not None: + self.ftp.close() + self.ftp = None + self.setup() + + def find_files(self, path, ext): + + files = glob.glob1(path, '*{}'.format(ext)) + files.sort() + if files: + return files[-1] + return None + + def getftpname(self, filename, exp_code, sub_exp_code): + + thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d') + YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year + DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday + exp_code = '%3.3d'%exp_code + sub_exp_code = '%2.2d'%sub_exp_code + plot_code = '%2.2d'% get_plot_code(filename) + name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png' + return name + + def upload(self, src, dst): + + log.log('Uploading {} '.format(src), self.name, nl=False) + + fp = open(src, 'rb') + command = 'STOR {}'.format(dst) + + try: + self.ftp.storbinary(command, fp, blocksize=1024) + except Exception, e: + log.error('{}'.format(e), self.name) + if self.ftp is not None: + self.ftp.close() + self.ftp = None + return 0 + + try: + self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst)) + except Exception, e: + log.error('{}'.format(e), self.name) + if self.ftp is not None: + self.ftp.close() + self.ftp = None + return 0 + + fp.close() + log.success('OK', tag='') + return 1 + + def send_files(self): + + for x, pattern in enumerate(self.patterns): + local, remote, ext, delay, exp_code, sub_exp_code = pattern + if time.time()-self.times[x] >= delay: + srcname = self.find_files(local, ext) + src = os.path.join(local, srcname) + if os.path.getmtime(src) < time.time() - 30*60: + continue + + if srcname is None or srcname == self.latest[x]: + continue + + if 'png' in ext: + dstname = self.getftpname(srcname, exp_code, sub_exp_code) + else: + dstname = srcname + + dst = os.path.join(remote, dstname) + + if self.upload(src, dst): + self.times[x] = time.time() + self.latest[x] = srcname + else: + self.isConfig = False + break + + def run(self): + + while True: + if not self.isConfig: + self.setup() + if self.ftp is not None: + self.check() + self.send_files() + time.sleep(10) + + def close(): + + if self.ftp is not None: + self.ftp.close() + self.terminate() diff --git a/schainpy/utils/log.py b/schainpy/utils/log.py index 0a5b43d..d06396e 100644 --- a/schainpy/utils/log.py +++ b/schainpy/utils/log.py @@ -18,23 +18,35 @@ SCHAINPY - LOG import click -def warning(message, tag='Warning'): - click.echo(click.style('[{}] {}'.format(tag, message), fg='yellow')) +def warning(message, tag='Warning', nl=True): + if tag: + click.echo(click.style('[{}] {}'.format(tag, message), fg='yellow'), nl=nl) + else: + click.echo(click.style('{}'.format(message), fg='yellow'), nl=nl) pass -def error(message, tag='Error'): - click.echo(click.style('[{}] {}'.format(tag, message), fg='red')) +def error(message, tag='Error', nl=True): + if tag: + click.echo(click.style('[{}] {}'.format(tag, message), fg='red'), nl=nl) + else: + click.echo(click.style('{}'.format(message), fg='red'), nl=nl) pass -def success(message, tag='Info'): - click.echo(click.style('[{}] {}'.format(tag, message), fg='green')) +def success(message, tag='Success', nl=True): + if tag: + click.echo(click.style('[{}] {}'.format(tag, message), fg='green'), nl=nl) + else: + click.echo(click.style('{}'.format(message), fg='green'), nl=nl) pass -def log(message, tag='Info'): - click.echo('[{}] {}'.format(tag, message)) +def log(message, tag='Info', nl=True): + if tag: + click.echo('[{}] {}'.format(tag, message), nl=nl) + else: + click.echo('{}'.format(message), nl=nl) pass