From 23e53bebbd064fb5155013fa4972c42956fbe21d 2024-03-05 15:04:29 From: Roberto Flores Date: 2024-03-05 15:04:29 Subject: [PATCH] updated with changes in v3.0-devel --- diff --git a/schainpy/controller.py b/schainpy/controller.py index e02e4d1..51301d7 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -73,7 +73,9 @@ class ConfBase(): ''' ''' - if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value): + if format is not None: + self.parameters[name] = eval(format)(value) + elif isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value): self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')]) elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value): self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')]) @@ -287,7 +289,7 @@ class ReadUnitConf(ProcUnitConf): self.parameters = {} def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='', - startTime='', endTime='', server=None, **kwargs): + startTime='', endTime='', server=None, topic='', **kwargs): if datatype == None and name == None: raise ValueError('datatype or name should be defined') @@ -315,6 +317,8 @@ class ReadUnitConf(ProcUnitConf): self.addParameter(name='endDate', value=endDate) self.addParameter(name='startTime', value=startTime) self.addParameter(name='endTime', value=endTime) + self.addParameter(name='server', value=server) + self.addParameter(name='topic', value=topic) for key, value in kwargs.items(): self.addParameter(name=key, value=value) @@ -553,6 +557,10 @@ class Project(Process): for key in keys: conf = self.configurations[key] conf.createObjects() + if 'Reader' in str(conf): + reader = conf.object + else: + conf.object.reader = reader if conf.inputId is not None: if isinstance(conf.inputId, list): conf.object.setInput([self.configurations[x].object for x in conf.inputId]) diff --git a/schainpy/model/data/jrodata.py b/schainpy/model/data/jrodata.py index e9ec76c..eb82bf6 100644 --- 
a/schainpy/model/data/jrodata.py +++ b/schainpy/model/data/jrodata.py @@ -125,6 +125,7 @@ class Beam: class GenericData(object): flagNoData = True + blockReader = False def copy(self, inputObj=None): @@ -198,7 +199,11 @@ class JROData(GenericData): def __str__(self): - return '{} - {}'.format(self.type, self.datatime()) + try: + dt = self.datatime + except: + dt = 'None' + return '{} - {}'.format(self.type, dt) def getNoise(self): @@ -467,8 +472,9 @@ class Spectra(JROData): self.ippFactor = 1 self.beacon_heiIndexList = [] self.noise_estimation = None + self.spc_noise = None self.metadata_list = ['type', 'heightList', 'timeZone', 'pairsList', 'channelList', 'nCohInt', - 'code', 'nCode', 'nBaud', 'ippSeconds', 'ipp','nIncohInt', 'nFFTPoints', 'nProfiles'] + 'code', 'nCode', 'nBaud', 'ippSeconds', 'ipp', 'nIncohInt', 'nFFTPoints', 'nProfiles', 'flagDecodeData'] def getNoisebyHildebrand(self, xmin_index=None, xmax_index=None, ymin_index=None, ymax_index=None): """ @@ -491,7 +497,10 @@ class Spectra(JROData): def getNoise(self, xmin_index=None, xmax_index=None, ymin_index=None, ymax_index=None): - if self.noise_estimation is not None: + if self.spc_noise is not None: + # this was estimated by getNoise Operation defined in jroproc_parameters.py + return self.spc_noise + elif self.noise_estimation is not None: # this was estimated by getNoise Operation defined in jroproc_spectra.py return self.noise_estimation else: @@ -868,6 +877,7 @@ class Parameters(Spectra): nAvg = None noise_estimation = None GauSPC = None # Fit gaussian SPC + spc_noise = None def __init__(self): ''' @@ -877,6 +887,7 @@ class Parameters(Spectra): self.systemHeaderObj = SystemHeader() self.type = "Parameters" self.timeZone = 0 + self.ippFactor = 1 def getTimeRange1(self, interval): diff --git a/schainpy/model/graphics/jroplot_base.py b/schainpy/model/graphics/jroplot_base.py index a491605..9ff0764 100644 --- a/schainpy/model/graphics/jroplot_base.py +++ b/schainpy/model/graphics/jroplot_base.py @@ 
-304,6 +304,7 @@ class Plot(Operation): ax.firsttime = True ax.index = 0 ax.press = None + ax.cbar = None self.axes.append(ax) if self.showprofile: cax = self.__add_axes(ax, size=size, pad=pad) @@ -414,7 +415,7 @@ class Plot(Operation): self.pf_axes[n].grid(b=True, axis='x') [tick.set_visible(False) for tick in self.pf_axes[n].get_yticklabels()] - if self.colorbar: + if self.colorbar and ax.cbar == None: ax.cbar = plt.colorbar( ax.plt, ax=ax, fraction=0.05, pad=0.02, aspect=10) ax.cbar.ax.tick_params(labelsize=8) diff --git a/schainpy/model/graphics/jroplot_parameters.py b/schainpy/model/graphics/jroplot_parameters.py index df92584..38a9b3f 100644 --- a/schainpy/model/graphics/jroplot_parameters.py +++ b/schainpy/model/graphics/jroplot_parameters.py @@ -354,7 +354,7 @@ class GenericRTIPlot(Plot): else: if self.zlimits is not None: self.zmin, self.zmax = self.zlimits[n] - ax.collections.remove(ax.collections[0]) + ax.plt.remove() ax.plt = ax.pcolormesh(x, y, z[n].T * self.factors[n], vmin=self.zmin, vmax=self.zmax, @@ -430,7 +430,7 @@ class PolarMapPlot(Plot): else: if self.zlimits is not None: self.zmin, self.zmax = self.zlimits[n] - ax.collections.remove(ax.collections[0]) + ax.plt.remove() ax.plt = ax.pcolormesh( # r, theta, numpy.ma.array(data, mask=numpy.isnan(data)), x, y, numpy.ma.array(data, mask=numpy.isnan(data)), vmin=self.zmin, diff --git a/schainpy/model/graphics/jroplot_spectra.py b/schainpy/model/graphics/jroplot_spectra.py index aa4829e..a40fa41 100644 --- a/schainpy/model/graphics/jroplot_spectra.py +++ b/schainpy/model/graphics/jroplot_spectra.py @@ -762,7 +762,7 @@ class RTIPlot(Plot): else: if self.zlimits is not None: self.zmin, self.zmax = self.zlimits[n] - ax.collections.remove(ax.collections[0]) + ax.plt.remove() ax.plt = ax.pcolormesh(x, y, z[n].T, vmin=self.zmin, vmax=self.zmax, @@ -869,7 +869,7 @@ class SpectrogramPlot(Plot): cmap=plt.get_cmap(self.colormap) ) else: - ax.collections.remove(ax.collections[0]) + ax.plt.remove() ax.plt = 
ax.pcolormesh(x, y, z[n].T, vmin=self.zmin, vmax=self.zmax, diff --git a/schainpy/model/graphics/jroplot_voltage_lags.py b/schainpy/model/graphics/jroplot_voltage_lags.py index 5196da1..32709b5 100644 --- a/schainpy/model/graphics/jroplot_voltage_lags.py +++ b/schainpy/model/graphics/jroplot_voltage_lags.py @@ -90,7 +90,7 @@ class RTIDPPlot(RTIPlot): else: #if self.zlimits is not None: #self.zmin, self.zmax = self.zlimits[n] - ax.collections.remove(ax.collections[0]) + ax.plt.remove() ax.plt = ax.pcolormesh(x, y, z[n].T, vmin=self.zmin, vmax=self.zmax, @@ -178,7 +178,7 @@ class RTILPPlot(RTIPlot): else: if self.zlimits is not None: self.zmin, self.zmax = self.zlimits[n] - ax.collections.remove(ax.collections[0]) + ax.plt.remove() ax.plt = ax.pcolormesh(x, y, z[n].T, vmin=self.zmin, vmax=self.zmax, @@ -191,7 +191,7 @@ class DenRTIPlot(RTIPlot): Written by R. Flores ''' ''' - Plot for Den + RTI Plot for Electron Densities ''' CODE = 'denrti' @@ -250,21 +250,21 @@ class DenRTIPlot(RTIPlot): if numpy.log10(self.zmin)<0: self.zmin=1 ax.plt = ax.pcolormesh(x, y, z[n].T * self.factors[n], - vmin=self.zmin, - vmax=self.zmax, + #vmin=self.zmin, + #vmax=self.zmax, cmap=self.cmaps[n], - norm=colors.LogNorm() + norm=colors.LogNorm(vmin=self.zmin,vmax=self.zmax) ) else: if self.zlimits is not None: self.zmin, self.zmax = self.zlimits[n] - ax.collections.remove(ax.collections[0]) + ax.plt.remove() ax.plt = ax.pcolormesh(x, y, z[n].T * self.factors[n], - vmin=self.zmin, - vmax=self.zmax, + #vmin=self.zmin, + #vmax=self.zmax, cmap=self.cmaps[n], - norm=colors.LogNorm() + norm=colors.LogNorm(vmin=self.zmin,vmax=self.zmax) ) @@ -346,7 +346,7 @@ class ETempRTIPlot(RTIPlot): else: if self.zlimits is not None: self.zmin, self.zmax = self.zlimits[n] - ax.collections.remove(ax.collections[0]) + ax.plt.remove() ax.plt = ax.pcolormesh(x, y, z[n].T * self.factors[n], vmin=self.zmin, vmax=self.zmax, @@ -473,8 +473,8 @@ class TempsDPPlot(Plot): errTi = data['Ti_error'] if ax.firsttime: - 
ax.errorbar(Te, y, xerr=errTe, fmt='r^',elinewidth=1.0,color='b',linewidth=2.0, label='Te') - ax.errorbar(Ti, y, fmt='k^', xerr=errTi,elinewidth=1.0,color='b',linewidth=2.0, label='Ti') + ax.errorbar(Te, y, xerr=errTe, fmt='r^',elinewidth=1.0,color='r',linewidth=2.0, label='Te') + ax.errorbar(Ti, y, fmt='k^', xerr=errTi,elinewidth=1.0,color='k',linewidth=2.0, label='Ti') plt.legend(loc='lower right') self.ystep_given = 50 ax.yaxis.set_minor_locator(MultipleLocator(15)) @@ -482,8 +482,8 @@ class TempsDPPlot(Plot): else: self.clear_figures() - ax.errorbar(Te, y, xerr=errTe, fmt='r^',elinewidth=1.0,color='b',linewidth=2.0, label='Te') - ax.errorbar(Ti, y, fmt='k^', xerr=errTi,elinewidth=1.0,color='b',linewidth=2.0, label='Ti') + ax.errorbar(Te, y, xerr=errTe, fmt='r^',elinewidth=1.0,color='r',linewidth=2.0, label='Te') + ax.errorbar(Ti, y, fmt='k^', xerr=errTi,elinewidth=1.0,color='k',linewidth=2.0, label='Ti') plt.legend(loc='lower right') ax.yaxis.set_minor_locator(MultipleLocator(15)) @@ -545,8 +545,8 @@ class TempsHPPlot(Plot): if ax.firsttime: - ax.errorbar(Te, self.y, xerr=errTe, fmt='r^',elinewidth=1.0,color='b',linewidth=2.0, label='Te') - ax.errorbar(Ti, self.y, fmt='k^', xerr=errTi,elinewidth=1.0,color='b',linewidth=2.0, label='Ti') + ax.errorbar(Te, self.y, xerr=errTe, fmt='r^',elinewidth=1.0,color='r',linewidth=2.0, label='Te') + ax.errorbar(Ti, self.y, fmt='k^', xerr=errTi,elinewidth=1.0,color='k',linewidth=2.0, label='Ti') plt.legend(loc='lower right') self.ystep_given = 200 ax.yaxis.set_minor_locator(MultipleLocator(15)) @@ -554,8 +554,8 @@ class TempsHPPlot(Plot): else: self.clear_figures() - ax.errorbar(Te, self.y, xerr=errTe, fmt='r^',elinewidth=1.0,color='b',linewidth=2.0, label='Te') - ax.errorbar(Ti, self.y, fmt='k^', xerr=errTi,elinewidth=1.0,color='b',linewidth=2.0, label='Ti') + ax.errorbar(Te, self.y, xerr=errTe, fmt='r^',elinewidth=1.0,color='r',linewidth=2.0, label='Te') + ax.errorbar(Ti, self.y, fmt='k^',
xerr=errTi,elinewidth=1.0,color='k',linewidth=2.0, label='Ti') plt.legend(loc='lower right') ax.yaxis.set_minor_locator(MultipleLocator(15)) ax.grid(which='minor') @@ -624,8 +624,8 @@ class FracsHPPlot(Plot): if ax.firsttime: - ax.errorbar(ph, self.y[cut:], xerr=eph, fmt='r^',elinewidth=1.0,color='b',linewidth=2.0, label='H+') - ax.errorbar(phe, self.y[cut:], fmt='k^', xerr=ephe,elinewidth=1.0,color='b',linewidth=2.0, label='He+') + ax.errorbar(ph, self.y[cut:], xerr=eph, fmt='r^',elinewidth=1.0,color='r',linewidth=2.0, label='H+') + ax.errorbar(phe, self.y[cut:], fmt='k^', xerr=ephe,elinewidth=1.0,color='k',linewidth=2.0, label='He+') plt.legend(loc='lower right') self.xstep_given = 0.2 self.ystep_given = 200 @@ -634,8 +634,8 @@ class FracsHPPlot(Plot): else: self.clear_figures() - ax.errorbar(ph, self.y[cut:], xerr=eph, fmt='r^',elinewidth=1.0,color='b',linewidth=2.0, label='H+') - ax.errorbar(phe, self.y[cut:], fmt='k^', xerr=ephe,elinewidth=1.0,color='b',linewidth=2.0, label='He+') + ax.errorbar(ph, self.y[cut:], xerr=eph, fmt='r^',elinewidth=1.0,color='r',linewidth=2.0, label='H+') + ax.errorbar(phe, self.y[cut:], fmt='k^', xerr=ephe,elinewidth=1.0,color='k',linewidth=2.0, label='He+') plt.legend(loc='lower right') ax.yaxis.set_minor_locator(MultipleLocator(15)) ax.grid(which='minor') @@ -711,14 +711,14 @@ class EDensityPlot(Plot): #ax.errorbar(DenFar, y[:NSHTS], xerr=1, fmt='h-',elinewidth=1.0,color='g',linewidth=1.0, label='Faraday Profile',markersize=2) ax.errorbar(DenFar, y[:NSHTS], xerr=1, fmt='h-',elinewidth=1.0,color='g',linewidth=1.0, label='Faraday',markersize=2,linestyle='-') #ax.errorbar(DenPow, y[:NSHTS], fmt='k^-', xerr=errDenPow,elinewidth=1.0,color='b',linewidth=1.0, label='Power Profile',markersize=2) - ax.errorbar(DenPow, y[:NSHTS], fmt='k^-', xerr=errDenPow,elinewidth=1.0,color='b',linewidth=1.0, label='Power',markersize=2,linestyle='-') + ax.errorbar(DenPow, y[:NSHTS], fmt='k^-', xerr=errDenPow,elinewidth=1.0,color='k',linewidth=1.0, 
label='Power',markersize=2,linestyle='-') if self.CODE=='denLP': ax.errorbar(DenPowLP[cut:], y[cut:], xerr=errDenPowLP[cut:], fmt='r^-',elinewidth=1.0,color='r',linewidth=1.0, label='LP Profile',markersize=2) plt.legend(loc='upper left',fontsize=8.5) #plt.legend(loc='lower left',fontsize=8.5) - ax.set_xscale("log", nonposx='clip') + ax.set_xscale("log")#, nonposx='clip') grid_y_ticks=numpy.arange(numpy.nanmin(y),numpy.nanmax(y),50) self.ystep_given=100 if self.CODE=='denLP': @@ -738,13 +738,13 @@ class EDensityPlot(Plot): #ax.errorbar(DenFar, y[:NSHTS], xerr=1, fmt='h-',elinewidth=1.0,color='g',linewidth=1.0, label='Faraday Profile',markersize=2) ax.errorbar(DenFar, y[:NSHTS], xerr=1, fmt='h-',elinewidth=1.0,color='g',linewidth=1.0, label='Faraday',markersize=2,linestyle='-') #ax.errorbar(DenPow, y[:NSHTS], fmt='k^-', xerr=errDenPow,elinewidth=1.0,color='b',linewidth=1.0, label='Power Profile',markersize=2) - ax.errorbar(DenPow, y[:NSHTS], fmt='k^-', xerr=errDenPow,elinewidth=1.0,color='b',linewidth=1.0, label='Power',markersize=2,linestyle='-') + ax.errorbar(DenPow, y[:NSHTS], fmt='k^-', xerr=errDenPow,elinewidth=1.0,color='k',linewidth=1.0, label='Power',markersize=2,linestyle='-') ax.errorbar(DenPowBefore, y[:NSHTS], elinewidth=1.0,color='r',linewidth=0.5,linestyle="dashed") if self.CODE=='denLP': ax.errorbar(DenPowLP[cut:], y[cut:], fmt='r^-', xerr=errDenPowLP[cut:],elinewidth=1.0,color='r',linewidth=1.0, label='LP Profile',markersize=2) - ax.set_xscale("log", nonposx='clip') + ax.set_xscale("log")#, nonposx='clip') grid_y_ticks=numpy.arange(numpy.nanmin(y),numpy.nanmax(y),50) ax.set_yticks(grid_y_ticks,minor=True) locmaj = LogLocator(base=10,numticks=12) @@ -805,11 +805,11 @@ class RelativeDenPlot(Plot): if ax.firsttime: self.autoxticks=False - ax.errorbar(DenPow, y, fmt='k^-', xerr=errDenPow,elinewidth=1.0,color='b',linewidth=1.0, label='Power',markersize=2,linestyle='-') + ax.errorbar(DenPow, y, fmt='k^-', 
xerr=errDenPow,elinewidth=1.0,color='k',linewidth=1.0, label='Power',markersize=2,linestyle='-') plt.legend(loc='upper left',fontsize=8.5) #plt.legend(loc='lower left',fontsize=8.5) - ax.set_xscale("log", nonposx='clip') + ax.set_xscale("log")#, nonposx='clip') grid_y_ticks=numpy.arange(numpy.nanmin(y),numpy.nanmax(y),50) self.ystep_given=100 ax.set_yticks(grid_y_ticks,minor=True) @@ -824,10 +824,10 @@ class RelativeDenPlot(Plot): dataBefore = self.data[-2] DenPowBefore = dataBefore['den_power'] self.clear_figures() - ax.errorbar(DenPow, y, fmt='k^-', xerr=errDenPow,elinewidth=1.0,color='b',linewidth=1.0, label='Power',markersize=2,linestyle='-') + ax.errorbar(DenPow, y, fmt='k^-', xerr=errDenPow,elinewidth=1.0,color='k',linewidth=1.0, label='Power',markersize=2,linestyle='-') ax.errorbar(DenPowBefore, y, elinewidth=1.0,color='r',linewidth=0.5,linestyle="dashed") - ax.set_xscale("log", nonposx='clip') + ax.set_xscale("log")#, nonposx='clip') grid_y_ticks=numpy.arange(numpy.nanmin(y),numpy.nanmax(y),50) ax.set_yticks(grid_y_ticks,minor=True) locmaj = LogLocator(base=10,numticks=12) diff --git a/schainpy/model/io/bltrIO_param.py b/schainpy/model/io/bltrIO_param.py index 3adb361..38879ee 100644 --- a/schainpy/model/io/bltrIO_param.py +++ b/schainpy/model/io/bltrIO_param.py @@ -87,7 +87,7 @@ DATA_STRUCTURE = numpy.dtype([ class BLTRParamReader(Reader, ProcessingUnit): ''' - Boundary Layer and Tropospheric Radar (BLTR) reader, Wind velocities and SNR + Boundary Layer and Tropospheric Radar (BLTR) reader, Wind velocities and SNR from *.sswma files ''' @@ -108,9 +108,9 @@ class BLTRParamReader(Reader, ProcessingUnit): self.filefmt = "*********%Y%m%d******" def setup(self, **kwargs): - + self.set_kwargs(**kwargs) - + if self.path is None: raise ValueError("The path is not valid") @@ -119,13 +119,13 @@ class BLTRParamReader(Reader, ProcessingUnit): for nTries in range(self.nTries): fullpath = self.searchFilesOnLine(self.path, self.startDate, - self.endDate, self.expLabel, 
self.ext, self.walk, + self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt) try: fullpath = next(fullpath) except: fullpath = None - + if fullpath: self.fileSize = os.path.getsize(fullpath) self.filename = fullpath @@ -138,17 +138,17 @@ class BLTRParamReader(Reader, ProcessingUnit): log.warning( 'Waiting {} sec for a valid file in {}: try {} ...'.format( - self.delay, self.path, nTries + 1), + self.delay, self.path, nTries + 1), self.name) time.sleep(self.delay) if not(fullpath): raise schainpy.admin.SchainError( - 'There isn\'t any valid file in {}'.format(self.path)) + 'There isn\'t any valid file in {}'.format(self.path)) self.readFirstHeader() else: log.log("Searching files in {}".format(self.path), self.name) - self.filenameList = self.searchFilesOffLine(self.path, self.startDate, + self.filenameList = self.searchFilesOffLine(self.path, self.startDate, self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt) self.setNextFile() @@ -162,8 +162,8 @@ class BLTRParamReader(Reader, ProcessingUnit): if os.path.exists(fullfilename): return fullfilename, filename return None, filename - - + + def readFirstHeader(self): ''' ''' @@ -174,7 +174,7 @@ class BLTRParamReader(Reader, ProcessingUnit): self.nrecords = self.header_file['nrec'][0] self.counter_records = 0 self.flagIsNewFile = 0 - self.fileIndex += 1 + self.fileIndex += 1 def readNextBlock(self): @@ -184,7 +184,13 @@ class BLTRParamReader(Reader, ProcessingUnit): if not self.setNextFile(): return 0 try: - pointer = self.fp.tell() + if self.online and self.counter_records == 0: + pos = int(self.fileSize / (38512)) + self.counter_records = pos*2 - 2 + pointer = 38512 * (pos-1) + 48 + self.fp.seek(pointer) + else: + pointer = self.fp.tell() self.readBlock() except: if self.online and self.waitDataBlock(pointer, 38512) == 1: @@ -255,20 +261,20 @@ class BLTRParamReader(Reader, ProcessingUnit): self.correction = self.header_rec['dmode_rngcorr'][0] self.imode = 
self.header_rec['dmode_index'][0] self.antenna = self.header_rec['antenna_coord'] - self.rx_gains = self.header_rec['rx_gains'] - self.time = self.header_rec['time'][0] + self.rx_gains = self.header_rec['rx_gains'] + self.time = self.header_rec['time'][0] dt = datetime.datetime.utcfromtimestamp(self.time) if dt.date()>self.datatime.date(): self.flagDiscontinuousBlock = 1 self.datatime = dt - + def readData(self): ''' - Reading and filtering data block record of BLTR rawdata file, + Reading and filtering data block record of BLTR rawdata file, filtering is according to status_value. Input: - status_value - Array data is set to NAN for values that are not + status_value - Array data is set to NAN for values that are not equal to status_value ''' @@ -316,7 +322,7 @@ class BLTRParamReader(Reader, ProcessingUnit): self.dataOut.lat = self.lat self.dataOut.lon = self.lon self.dataOut.channelList = list(range(self.nchannels)) - self.dataOut.kchan = self.kchan + self.dataOut.kchan = self.kchan self.dataOut.delta = self.delta self.dataOut.correction = self.correction self.dataOut.nmodes = self.nmodes @@ -341,7 +347,7 @@ class BLTRParamReader(Reader, ProcessingUnit): self.set_output() return 1 - + def run(self, **kwargs): ''' ''' @@ -352,4 +358,4 @@ class BLTRParamReader(Reader, ProcessingUnit): self.getData() - return \ No newline at end of file + return diff --git a/schainpy/model/io/jroIO_base.py b/schainpy/model/io/jroIO_base.py index 5ba2b48..bb9c1c3 100644 --- a/schainpy/model/io/jroIO_base.py +++ b/schainpy/model/io/jroIO_base.py @@ -475,6 +475,7 @@ class Reader(object): warnings = True verbose = True server = None + topic = None format = None oneDDict = None twoDDict = None @@ -781,6 +782,7 @@ class JRODataReader(Reader): firstHeaderSize = 0 basicHeaderSize = 24 __isFirstTimeOnline = 1 + topic = '' filefmt = "*%Y%j***" folderfmt = "*%Y%j" __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'online', 'delay', 'walk'] @@ -1151,13 +1153,14 @@ class 
JRODataReader(Reader): if self.server is not None: if 'tcp://' in self.server: - address = server + address = self.server else: address = 'ipc:///tmp/%s' % self.server self.server = address self.context = zmq.Context() - self.receiver = self.context.socket(zmq.PULL) + self.receiver = self.context.socket(zmq.SUB) self.receiver.connect(self.server) + self.receiver.setsockopt(zmq.SUBSCRIBE, str.encode(str(self.topic))) time.sleep(0.5) print('[Starting] ReceiverData from {}'.format(self.server)) else: @@ -1286,7 +1289,11 @@ class JRODataReader(Reader): if self.server is None: self.getData() else: - self.getFromServer() + try: + self.getFromServer() + except Exception as e: + log.warning('Invalid block...') + self.dataOut.flagNoData = True class JRODataWriter(Reader): @@ -1470,9 +1477,6 @@ class JRODataWriter(Reader): if self.fp != None: self.fp.close() - if not os.path.exists(path): - os.mkdir(path) - timeTuple = time.localtime(self.dataOut.utctime) subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday) @@ -1480,7 +1484,7 @@ class JRODataWriter(Reader): setFile = self.setFile if not(os.path.exists(fullpath)): - os.mkdir(fullpath) + os.makedirs(fullpath) setFile = -1 # inicializo mi contador de seteo else: filesList = os.listdir(fullpath) diff --git a/schainpy/model/io/jroIO_madrigal.py b/schainpy/model/io/jroIO_madrigal.py index 58d82ab..27b016c 100644 --- a/schainpy/model/io/jroIO_madrigal.py +++ b/schainpy/model/io/jroIO_madrigal.py @@ -48,6 +48,7 @@ DEF_HEADER = { MNEMONICS = { 10: 'jro', + 12: 'jmp', 11: 'jbr', 14: 'jmp', #Added by R. Flores 840: 'jul', diff --git a/schainpy/model/io/jroIO_param.py b/schainpy/model/io/jroIO_param.py index 1b55843..e8ad20e 100644 --- a/schainpy/model/io/jroIO_param.py +++ b/schainpy/model/io/jroIO_param.py @@ -184,6 +184,13 @@ class HDFReader(Reader, ProcessingUnit): self.blockList = ind self.blocksPerFile = len(ind) + # similar to master + if len(ind)==0: + print("[Reading] Block No. 
%d/%d -> %s [Skipping]" % (self.blockIndex, + self.blocksPerFile, + thisDatetime)) + self.setNextFile() + # similar to master return def __readMetadata(self): @@ -360,14 +367,26 @@ class HDFWriter(Operation): Operation.__init__(self) return - def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None, uniqueChannel=False): + def set_kwargs(self, **kwargs): + + for key, value in kwargs.items(): + setattr(self, key, value) + + def set_kwargs_obj(self, obj, **kwargs): + + for key, value in kwargs.items(): + setattr(obj, key, value) + + def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None, **kwargs): self.path = path self.blocksPerFile = blocksPerFile self.metadataList = metadataList self.dataList = [s.strip() for s in dataList] self.setType = setType self.description = description - self.uniqueChannel = uniqueChannel + self.set_kwargs(**kwargs) + #print("self.uniqueChannel: ", self.uniqueChannel) + #self.uniqueChannel = uniqueChannel if self.metadataList is None: self.metadataList = self.dataOut.metadata_list @@ -389,7 +408,7 @@ class HDFWriter(Operation): elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)): dsDict['nDim'] = 0 else: - if uniqueChannel: #Creates extra dimension to avoid the creation of multiple channels + if self.uniqueChannel: #Creates extra dimension to avoid the creation of multiple channels dataAux = numpy.expand_dims(dataAux, axis=0) #setattr(self.dataOut, self.dataList[i], numpy.expand_dims(getattr(self.dataOut, self.dataList[i]), axis=0)) #dataAux = getattr(self.dataOut, self.dataList[i]) @@ -428,13 +447,14 @@ class HDFWriter(Operation): return False def run(self, dataOut, path, blocksPerFile=10, metadataList=None, - dataList=[], setType=None, description={}, uniqueChannel= False): + dataList=[], setType=None, description={}, **kwargs): self.dataOut = dataOut + self.set_kwargs_obj(self.dataOut, **kwargs) if 
not(self.isConfig): self.setup(path=path, blocksPerFile=blocksPerFile, metadataList=metadataList, dataList=dataList, - setType=setType, description=description, uniqueChannel=uniqueChannel) + setType=setType, description=description, **kwargs) self.isConfig = True self.setNextFile() @@ -515,15 +535,17 @@ class HDFWriter(Operation): return key return name else: - if 'Metadata' in self.description: - meta = self.description['Metadata'] + if 'Data' in self.description: + data = self.description['Data'] + if 'Metadata' in self.description: + data.update(self.description['Metadata']) else: - meta = self.description - if name in meta: - if isinstance(meta[name], list): - return meta[name][x] - elif isinstance(meta[name], dict): - for key, value in meta[name].items(): + data = self.description + if name in data: + if isinstance(data[name], list): + return data[name][x] + elif isinstance(data[name], dict): + for key, value in data[name].items(): return value[x] if 'cspc' in name: return 'pair{:02d}'.format(x) diff --git a/schainpy/model/io/jroIO_voltage.py b/schainpy/model/io/jroIO_voltage.py index 4dcff1c..662f678 100644 --- a/schainpy/model/io/jroIO_voltage.py +++ b/schainpy/model/io/jroIO_voltage.py @@ -296,7 +296,7 @@ class VoltageReader(JRODataReader, ProcessingUnit): self.nReadBlocks += 1 self.blockPointer = 0 - block = self.receiver.recv() + topic, block = self.receiver.recv_multipart() self.basicHeaderObj.read(block[self.blockPointer:]) self.blockPointer += self.basicHeaderObj.length @@ -309,7 +309,11 @@ class VoltageReader(JRODataReader, ProcessingUnit): self.readFirstHeaderFromServer() timestamp = self.basicHeaderObj.get_datatime() - print('[Reading] - Block {} - {}'.format(self.nTotalBlocks, timestamp)) + print('[Receiving] - Block {} - {} from {}'.format(self.nTotalBlocks, timestamp, topic.decode())) + if self.nTotalBlocks == self.processingHeaderObj.dataBlocksPerFile: + self.nTotalBlocks = 0 + self.nReadBlocks = 0 + print('Receiving the next stream...') 
current_pointer_location = self.blockPointer junk = numpy.fromstring( block[self.blockPointer:], self.dtype, self.blocksize) diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index d374f1f..15d0ee0 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -14,8 +14,6 @@ from threading import Thread from multiprocessing import Process, Queue from schainpy.utils import log -import copy - QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '10')) class ProcessingUnit(object): @@ -24,6 +22,7 @@ class ProcessingUnit(object): ''' proc_type = 'processing' + bypass = False def __init__(self): @@ -81,8 +80,9 @@ class ProcessingUnit(object): else: return self.dataIn.isReady() elif self.dataIn is None or not self.dataIn.error: - #print([getattr(self, at) for at in self.inputs]) - #print("Elif 1") + if 'Reader' in self.name and self.bypass: + print('Skipping...reader') + return self.dataOut.isReady() self.run(**kwargs) elif self.dataIn.error: #print("Elif 2") diff --git a/schainpy/model/proc/jroproc_spectra.py b/schainpy/model/proc/jroproc_spectra.py index bd01a88..2d4314b 100644 --- a/schainpy/model/proc/jroproc_spectra.py +++ b/schainpy/model/proc/jroproc_spectra.py @@ -186,6 +186,18 @@ class SpectraProc(ProcessingUnit): self.profIndex += nVoltProfiles self.id_min += nVoltProfiles self.id_max += nVoltProfiles + elif nVoltProfiles > nProfiles: + self.reader.bypass = True + if self.profIndex == 0: + self.id_min = 0 + self.id_max = nProfiles + + self.buffer = self.dataIn.data[:, self.id_min:self.id_max,:] + self.profIndex += nProfiles + self.id_min += nProfiles + self.id_max += nProfiles + if self.id_max == nVoltProfiles: + self.reader.bypass = False else: raise ValueError("The type object %s has %d profiles, it should just has %d profiles" % ( self.dataIn.type, self.dataIn.data.shape[1], nProfiles)) @@ -197,7 +209,7 @@ class SpectraProc(ProcessingUnit): if self.firstdatatime == None: self.firstdatatime = 
self.dataIn.utctime - if self.profIndex == nProfiles: + if self.profIndex % nProfiles == 0: self.__updateSpecFromVoltage() if pairsList == None: self.dataOut.pairsList = [pair for pair in itertools.combinations(self.dataOut.channelList, 2)] @@ -206,7 +218,8 @@ class SpectraProc(ProcessingUnit): self.__getFft() self.dataOut.flagNoData = False self.firstdatatime = None - self.profIndex = 0 + if not self.reader.bypass: + self.profIndex = 0 else: raise ValueError("The type of input object '%s' is not valid".format( self.dataIn.type)) diff --git a/schainpy/model/proc/jroproc_spectra_lags_faraday.py b/schainpy/model/proc/jroproc_spectra_lags_faraday.py index d1dfcfa..eefc035 100644 --- a/schainpy/model/proc/jroproc_spectra_lags_faraday.py +++ b/schainpy/model/proc/jroproc_spectra_lags_faraday.py @@ -3815,7 +3815,7 @@ class IncohInt(Operation): dataOut.flagNoData = False dataOut.VelRange = dataOut.getVelRange(0) - dataOut.FreqRange = dataOut.getFreqRange(0)/1000. + dataOut.FreqRange = dataOut.getFreqRange(0)/1000. #kHz #print("VelRange: ", dataOut.VelRange) #exit(1) diff --git a/schainpy/model/proc/jroproc_voltage.py b/schainpy/model/proc/jroproc_voltage.py index 2524845..7b64905 100644 --- a/schainpy/model/proc/jroproc_voltage.py +++ b/schainpy/model/proc/jroproc_voltage.py @@ -324,6 +324,18 @@ class filterByHeights(Operation): return dataOut +class setOffset(Operation): + + def run(self, dataOut, offset=None): + + if not offset: + offset = 0.0 + + newHeiRange = dataOut.heightList - offset + + dataOut.heightList = newHeiRange + + return dataOut class setH0(Operation): @@ -9944,9 +9956,11 @@ class SSheightProfiles(Operation): profileIndex = None #print(dataOut.getFreqRange(1)/1000.) #exit(1) + ''' if dataOut.flagDataAsBlock: dataOut.data = numpy.average(dataOut.data,axis=1) #print("jee") + ''' dataOut.flagDataAsBlock = False if not self.isConfig: self.setup(dataOut, step=step , nsamples=nsamples)