From df48418de9d9eae6dcf628d26c348fc64315a385 2019-07-31 20:05:20 From: Juan C. Espinoza Date: 2019-07-31 20:05:20 Subject: [PATCH] Merge branch 'v3.0-devel' of http://jro-dev.igp.gob.pe/rhodecode/schain into v3.0-devel --- diff --git a/schainpy/model/data/jrodata.py b/schainpy/model/data/jrodata.py index 6ca9ad3..5a3bb3e 100644 --- a/schainpy/model/data/jrodata.py +++ b/schainpy/model/data/jrodata.py @@ -1103,8 +1103,9 @@ class PlotterData(object): MAXNUMX = 100 MAXNUMY = 100 - def __init__(self, code, throttle_value, exp_code, buffering=True): + def __init__(self, code, throttle_value, exp_code, buffering=True, snr=False): + self.key = code self.throttle = throttle_value self.exp_code = exp_code self.buffering = buffering @@ -1123,7 +1124,10 @@ class PlotterData(object): self.plottypes = ['noise', 'rti'] else: self.plottypes = [code] - + + if 'snr' not in self.plottypes and snr: + self.plottypes.append('snr') + for plot in self.plottypes: self.data[plot] = {} @@ -1199,10 +1203,8 @@ class PlotterData(object): self.tm = tm self.type = dataOut.type self.parameters = getattr(dataOut, 'parameters', []) - if hasattr(dataOut, 'pairsList'): - self.pairs = dataOut.pairsList if hasattr(dataOut, 'meta'): - self.meta = dataOut.meta + self.meta.update(dataOut.meta) self.channels = dataOut.channelList self.interval = dataOut.getTimeInterval() self.localtime = dataOut.useLocalTime @@ -1285,40 +1287,40 @@ class PlotterData(object): self.__heights = [H for tm in self.__times] - def jsonify(self, decimate=False): + def jsonify(self, plot_name, plot_type, decimate=False): ''' Convert data to json ''' - data = {} tm = self.times[-1] dy = int(self.heights.size/self.MAXNUMY) + 1 - for key in self.data: - if key in ('spc', 'cspc') or not self.buffering: - dx = int(self.data[key].shape[1]/self.MAXNUMX) + 1 - data[key] = self.roundFloats( - self.data[key][::, ::dx, ::dy].tolist()) - else: - data[key] = self.roundFloats(self.data[key][tm].tolist()) - - ret = {'data': data} - ret['exp_code'] = self.exp_code - ret['time'] = float(tm) - ret['interval'] = float(self.interval) - ret['localtime'] = self.localtime - ret['yrange'] = self.roundFloats(self.heights[::dy].tolist()) - if 'spc' in self.data or 'cspc' in self.data: - ret['xrange'] = self.roundFloats(self.xrange[2][::dx].tolist()) + if self.key in ('spc', 'cspc') or not self.buffering: + dx = int(self.data[self.key].shape[1]/self.MAXNUMX) + 1 + data = self.roundFloats( + self.data[self.key][::, ::dx, ::dy].tolist()) else: - ret['xrange'] = [] - if hasattr(self, 'pairs'): - ret['pairs'] = [(int(p[0]), int(p[1])) for p in self.pairs] + data = self.roundFloats(self.data[self.key][tm].tolist()) + if self.key is 'noise': + data = [[x] for x in data] + + meta = {} + ret = { + 'plot': plot_name, + 'code': self.exp_code, + 'time': float(tm), + 'data': data, + } + meta['type'] = plot_type + meta['interval'] = float(self.interval) + meta['localtime'] = self.localtime + meta['yrange'] = self.roundFloats(self.heights[::dy].tolist()) + if 'spc' in self.data or 'cspc' in self.data: + meta['xrange'] = self.roundFloats(self.xrange[2][::dx].tolist()) else: - ret['pairs'] = [] - - for key, value in list(self.meta.items()): - ret[key] = value + meta['xrange'] = [] + meta.update(self.meta) + ret['metadata'] = meta return json.dumps(ret) @property diff --git a/schainpy/model/graphics/jroplot_base.py b/schainpy/model/graphics/jroplot_base.py index 54d09d3..ec57ba2 100644 --- a/schainpy/model/graphics/jroplot_base.py +++ b/schainpy/model/graphics/jroplot_base.py @@ -162,6 +162,7 @@ class Plot(Operation): self.isPlotConfig = False self.save_counter = 1 self.sender_counter = 1 + self.data = None def __fmtTime(self, x, pos): ''' @@ -226,7 +227,7 @@ class Plot(Operation): self.sender_period = kwargs.get('sender_period', 2) self.__throttle_plot = apply_throttle(self.throttle) self.data = PlotterData( - self.CODE, self.throttle, self.exp_code, self.buffering) + self.CODE, self.throttle, self.exp_code, self.buffering, snr=self.showSNR) if self.plot_server: if not self.plot_server.startswith('tcp://'): @@ -235,6 +236,8 @@ class Plot(Operation): 'Sending to server: {}'.format(self.plot_server), self.name ) + if 'plot_name' in kwargs: + self.plot_name = kwargs['plot_name'] def __setup_plot(self): ''' @@ -243,11 +246,7 @@ class Plot(Operation): self.setup() - self.time_label = 'LT' if self.localtime else 'UTC' - if self.data.localtime: - self.getDateTime = datetime.datetime.fromtimestamp - else: - self.getDateTime = datetime.datetime.utcfromtimestamp + self.time_label = 'LT' if self.localtime else 'UTC' if self.width is None: self.width = 8 @@ -306,15 +305,13 @@ class Plot(Operation): cmap = plt.get_cmap(self.colormap) cmap.set_bad(self.bgcolor, 1.) self.cmaps.append(cmap) - + for fig in self.figures: fig.canvas.mpl_connect('key_press_event', self.OnKeyPress) fig.canvas.mpl_connect('scroll_event', self.OnBtnScroll) fig.canvas.mpl_connect('button_press_event', self.onBtnPress) fig.canvas.mpl_connect('motion_notify_event', self.onMotion) fig.canvas.mpl_connect('button_release_event', self.onBtnRelease) - if self.show: - fig.show() def OnKeyPress(self, event): ''' @@ -463,7 +460,6 @@ class Plot(Operation): datetime.datetime(1970, 1, 1)).total_seconds() if self.data.localtime: xmin += time.timezone - self.tmin = xmin else: xmin = self.xmin @@ -563,7 +559,7 @@ class Plot(Operation): ax.set_title('{} {} {}'.format( self.titles[n], self.getDateTime(self.data.max_time).strftime( - '%H:%M:%S'), + '%Y-%m-%d %H:%M:%S'), self.time_label), size=8) else: @@ -608,6 +604,9 @@ class Plot(Operation): fig.canvas.manager.set_window_title('{} - {}'.format(self.title, self.getDateTime(self.data.max_time).strftime('%Y/%m/%d'))) fig.canvas.draw() + if self.show: + fig.show() + figpause(0.1) if self.save: self.save_figure(n) @@ -675,10 +674,10 @@ class Plot(Operation): self.sender_counter += 1 self.sender_counter = 1 - + self.data.meta['titles'] = self.titles retries = 2 while True: - self.socket.send_string(self.data.jsonify()) + self.socket.send_string(self.data.jsonify(self.plot_name, self.plot_type)) socks = dict(self.poll.poll(5000)) if socks.get(self.socket) == zmq.POLLIN: reply = self.socket.recv_string() @@ -730,14 +729,35 @@ class Plot(Operation): raise NotImplementedError def run(self, dataOut, **kwargs): - - if dataOut.error: - coerce = True - else: - coerce = False + ''' + Main plotting routine + ''' if self.isConfig is False: self.__setup(**kwargs) + if dataOut.type == 'Parameters': + t = dataOut.utctimeInit + else: + t = dataOut.utctime + + if dataOut.useLocalTime: + self.getDateTime = datetime.datetime.fromtimestamp + if not self.localtime: + t += time.timezone + else: + self.getDateTime = datetime.datetime.utcfromtimestamp + if self.localtime: + t -= time.timezone + + if self.xmin is None: + self.tmin = t + else: + self.tmin = ( + self.getDateTime(t).replace( + hour=self.xmin, + minute=0, + second=0) - self.getDateTime(0)).total_seconds() + self.data.setup() self.isConfig = True if self.plot_server: @@ -751,16 +771,19 @@ class Plot(Operation): tm = dataOut.utctimeInit else: tm = dataOut.utctime - - if dataOut.useLocalTime: - if not self.localtime: - tm += time.timezone - else: - if self.localtime: - tm -= time.timezone - if self.xaxis is 'time' and self.data and (tm - self.tmin) >= self.xrange*60*60: + if not dataOut.useLocalTime and self.localtime: + tm -= time.timezone + if dataOut.useLocalTime and not self.localtime: + tm += time.timezone + + if self.xaxis is 'time' and self.data and (tm - self.tmin) >= self.xrange*60*60: + self.save_counter = self.save_period self.__plot() + self.xmin += self.xrange + if self.xmin >= 24: + self.xmin -= 24 + self.tmin += self.xrange*60*60 self.data.setup() self.clear_figures() @@ -773,12 +796,13 @@ class Plot(Operation): if self.realtime: self.__plot() else: - self.__throttle_plot(self.__plot, coerce=coerce) - - figpause(0.001) + self.__throttle_plot(self.__plot)#, coerce=coerce) def close(self): + if self.data: + self.save_counter = self.save_period + self.__plot() if self.data and self.pause: figpause(10) diff --git a/schainpy/model/graphics/jroplot_data.py b/schainpy/model/graphics/jroplot_data.py index f2ac422..688db09 100644 --- a/schainpy/model/graphics/jroplot_data.py +++ b/schainpy/model/graphics/jroplot_data.py @@ -42,6 +42,8 @@ class SpectraPlot(Plot): CODE = 'spc' colormap = 'jro' + plot_name = 'Spectra' + plot_type = 'pcolor' def setup(self): self.nplots = len(self.data.channels) @@ -112,6 +114,8 @@ class CrossSpectraPlot(Plot): CODE = 'cspc' colormap = 'jet' + plot_name = 'CrossSpectra' + plot_type = 'pcolor' zmin_coh = None zmax_coh = None zmin_phase = None @@ -211,6 +215,8 @@ class SpectralMomentsPlot(SpectraPlot): ''' CODE = 'spc_moments' colormap = 'jro' + plot_name = 'SpectralMoments' + plot_type = 'pcolor' class RTIPlot(Plot): @@ -220,6 +226,8 @@ class RTIPlot(Plot): CODE = 'rti' colormap = 'jro' + plot_name = 'RTI' + plot_type = 'pcolorbuffer' def setup(self): self.xaxis = 'time' @@ -275,6 +283,7 @@ class CoherencePlot(RTIPlot): ''' CODE = 'coh' + plot_name = 'Coherence' def setup(self): self.xaxis = 'time' @@ -299,6 +308,7 @@ class PhasePlot(CoherencePlot): CODE = 'phase' colormap = 'seismic' + plot_name = 'Phase' class NoisePlot(Plot): @@ -307,6 +317,9 @@ class NoisePlot(Plot): ''' CODE = 'noise' + plot_name = 'Noise' + plot_type = 'scatterbuffer' + def setup(self): self.xaxis = 'time' @@ -345,6 +358,7 @@ class SnrPlot(RTIPlot): CODE = 'snr' colormap = 'jet' + plot_name = 'SNR' class DopplerPlot(RTIPlot): @@ -354,6 +368,7 @@ class DopplerPlot(RTIPlot): CODE = 'dop' colormap = 'jet' + plot_name = 'Doppler' class SkyMapPlot(Plot): @@ -411,6 +426,7 @@ class ParametersPlot(RTIPlot): CODE = 'param' colormap = 'seismic' + plot_name = 'Parameters' def setup(self): self.xaxis = 'time' @@ -480,6 +496,7 @@ class OutputPlot(ParametersPlot): CODE = 'output' colormap = 'seismic' + plot_name = 'Output' class PolarMapPlot(Plot): @@ -621,6 +638,8 @@ class ScopePlot(Plot): ''' CODE = 'scope' + plot_name = 'Scope' + plot_type = 'scatter' def setup(self): @@ -720,11 +739,6 @@ class ScopePlot(Plot): thisDatetime, wintitle1 ) - - - - - else: wintitle = " [Profile = %d] " %self.data.profileIndex @@ -743,6 +757,3 @@ class ScopePlot(Plot): thisDatetime, wintitle ) - - - \ No newline at end of file diff --git a/schainpy/model/io/bltrIO_param.py b/schainpy/model/io/bltrIO_param.py index 8043296..cd839ce 100644 --- a/schainpy/model/io/bltrIO_param.py +++ b/schainpy/model/io/bltrIO_param.py @@ -86,7 +86,8 @@ DATA_STRUCTURE = numpy.dtype([ @MPDecorator class BLTRParamReader(JRODataReader, ProcessingUnit): ''' - Boundary Layer and Tropospheric Radar (BLTR) reader, Wind velocities and SNR from *.sswma files + Boundary Layer and Tropospheric Radar (BLTR) reader, Wind velocities and SNR + from *.sswma files ''' ext = '.sswma' @@ -118,6 +119,9 @@ class BLTRParamReader(JRODataReader, ProcessingUnit): self.endTime = endTime self.status_value = status_value self.datatime = datetime.datetime(1900,1,1) + self.delay = kwargs.get('delay', 10) + self.online = kwargs.get('online', False) + self.nTries = kwargs.get('nTries', 3) if self.path is None: raise ValueError("The path is not valid") @@ -125,7 +129,7 @@ class BLTRParamReader(JRODataReader, ProcessingUnit): if ext is None: ext = self.ext - self.search_files(self.path, startDate, endDate, ext) + self.fileList = self.search_files(self.path, startDate, endDate, ext) self.timezone = timezone self.fileIndex = 0 @@ -135,6 +139,29 @@ class BLTRParamReader(JRODataReader, ProcessingUnit): self.setNextFile() + def search_last_file(self): + ''' + Get last file and add it to the list + ''' + + for n in range(self.nTries+1): + if n>0: + log.warning( + "Waiting %0.2f seconds for the next file, try %03d ..." % (self.delay, n+1), + self.name + ) + time.sleep(self.delay) + file_list = os.listdir(self.path) + file_list.sort() + if file_list: + if self.filename: + if file_list[-1] not in self.filename: + return file_list[-1] + else: + continue + return file_list[-1] + return 0 + def search_files(self, path, startDate, endDate, ext): ''' Searching for BLTR rawdata file in path @@ -152,9 +179,6 @@ class BLTRParamReader(JRODataReader, ProcessingUnit): fileList0 = glob.glob1(path, "*%s" % ext) fileList0.sort() - self.fileList = [] - self.dateFileList = [] - for thisFile in fileList0: year = thisFile[-14:-10] if not isNumber(year): @@ -174,28 +198,32 @@ class BLTRParamReader(JRODataReader, ProcessingUnit): if (startDate > dateFile) or (endDate < dateFile): continue - self.fileList.append(thisFile) - self.dateFileList.append(dateFile) + yield thisFile return def setNextFile(self): - file_id = self.fileIndex - - if file_id == len(self.fileList): - self.flagNoMoreFiles = 1 - return 0 - - log.success('Opening {}'.format(self.fileList[file_id]), 'BLTRParamReader') - filename = os.path.join(self.path, self.fileList[file_id]) + if self.online: + filename = self.search_last_file() + if not filename: + self.flagNoMoreFiles = 1 + return 0 + else: + try: + filename = next(self.fileList) + except StopIteration: + self.flagNoMoreFiles = 1 + return 0 + + log.success('Opening {}'.format(filename), 'BLTRParamReader') dirname, name = os.path.split(filename) # 'peru2' ---> Piura - 'peru1' ---> Huancayo or Porcuya - self.siteFile = name.split('.')[0] + self.siteFile = filename.split('.')[0] if self.filename is not None: self.fp.close() - self.filename = filename + self.filename = os.path.join(self.path, filename) self.fp = open(self.filename, 'rb') self.header_file = numpy.fromfile(self.fp, FILE_HEADER_STRUCTURE, 1) self.nrecords = self.header_file['nrec'][0] @@ -203,18 +231,27 @@ class BLTRParamReader(JRODataReader, ProcessingUnit): self.counter_records = 0 self.flagIsNewFile = 0 self.fileIndex += 1 + time.sleep(2) return 1 def readNextBlock(self): while True: - if self.counter_records == self.nrecords: + if not self.online and self.counter_records == self.nrecords: self.flagIsNewFile = 1 if not self.setNextFile(): return 0 - self.readBlock() + try: + pointer = self.fp.tell() + self.readBlock() + except: + if self.online and self.waitDataBlock(pointer, 38512) == 1: + continue + else: + if not self.setNextFile(): + return 0 if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \ (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)): @@ -227,9 +264,9 @@ class BLTRParamReader(JRODataReader, ProcessingUnit): continue break - log.log('Reading Record No. {}/{} -> {}'.format( + log.log('Reading Record No. {} -> {}'.format( self.counter_records, - self.nrecords, + # self.nrecords, self.datatime.ctime()), 'BLTRParamReader') return 1 @@ -288,10 +325,12 @@ class BLTRParamReader(JRODataReader, ProcessingUnit): def readData(self): ''' - Reading and filtering data block record of BLTR rawdata file, filtering is according to status_value. + Reading and filtering data block record of BLTR rawdata file, + filtering is according to status_value. Input: - status_value - Array data is set to NAN for values that are not equal to status_value + status_value - Array data is set to NAN for values that are not + equal to status_value ''' self.nchannels = int(self.nchannels) @@ -357,10 +396,11 @@ class BLTRParamReader(JRODataReader, ProcessingUnit): if self.flagNoMoreFiles: self.dataOut.flagNoData = True self.dataOut.error = 'No More files to read' + return if not self.readNextBlock(): self.dataOut.flagNoData = True - return 0 + self.dataOut.error = 'Time for wait new file reach!!!' self.set_output() diff --git a/schainpy/model/io/jroIO_base.py b/schainpy/model/io/jroIO_base.py index b213a09..1df7a64 100644 --- a/schainpy/model/io/jroIO_base.py +++ b/schainpy/model/io/jroIO_base.py @@ -946,11 +946,13 @@ class JRODataReader(JRODataIO): return 0 - def waitDataBlock(self, pointer_location): + def waitDataBlock(self, pointer_location, blocksize=None): currentPointer = pointer_location - - neededSize = self.processingHeaderObj.blockSize # + self.basicHeaderSize + if blocksize is None: + neededSize = self.processingHeaderObj.blockSize # + self.basicHeaderSize + else: + neededSize = blocksize for nTries in range(self.nTries): self.fp.close() @@ -963,7 +965,10 @@ class JRODataReader(JRODataIO): if (currentSize >= neededSize): return 1 - print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1)) + log.warning( + "Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1), + self.name + ) sleep(self.delay) return 0 diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index eed9f4b..902fde2 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -189,6 +189,7 @@ def MPDecorator(BaseClass): self.kwargs = kwargs self.sender = None self.receiver = None + self.i = 0 self.name = BaseClass.__name__ if 'plot' in self.name.lower() and not self.name.endswith('_'): self.name = '{}{}'.format(self.CODE.upper(), 'Plot') @@ -204,6 +205,9 @@ def MPDecorator(BaseClass): self.inputId = args[0] self.project_id = args[1] self.typeProc = "Operation" + + def fix_publish(self,valor,multiple1): + return True if valor%multiple1 ==0 else False def subscribe(self): ''' @@ -221,8 +225,11 @@ def MPDecorator(BaseClass): ''' This function waits for objects and deserialize using pickle ''' - - data = pickle.loads(self.receiver.recv_multipart()[1]) + try: + data = pickle.loads(self.receiver.recv_multipart()[1]) + except zmq.ZMQError as e: + if e.errno == zmq.ETERM: + print (e.errno) return data @@ -240,7 +247,14 @@ def MPDecorator(BaseClass): def publish(self, data, id): ''' This function publish an object, to a specific topic. + The fix method only affect inputId None which is Read Unit + Use value between 64 80, you should notice a little retard in processing ''' + if self.inputId is None: + self.i+=1 + if self.fix_publish(self.i,80) == True:# value n + time.sleep(0.01) + self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) def runReader(self):