diff --git a/schainpy/model/io/exampleForMultiprocessing_AMISR_merge.txt b/schainpy/model/io/exampleForMultiprocessing_AMISR_merge.txt
new file mode 100644
index 0000000..2f97b30
--- /dev/null
+++ b/schainpy/model/io/exampleForMultiprocessing_AMISR_merge.txt
@@ -0,0 +1,226 @@
+"""Example: process AMISR data one channel per process, then merge and plot.
+
+Each channel is handled by its own schainpy Project in a separate process and
+written as HDF5 to its own directory. The per-channel outputs are then merged
+with MergeH5, the merged power is plotted, and the per-channel files removed.
+"""
+import os, sys, json
+import time
+import datetime
+from multiprocessing import Process
+from shutil import rmtree
+from schainpy.controller import Project
+from multiprocessing.connection import wait
+from schainpy.model.io.utilsIO import MergeH5
+###########################################################################
+###########################################################################
+dty = datetime.date.today()
+str1 = dty + datetime.timedelta(days=1)
+str2 = dty - datetime.timedelta(days=1)
+today = dty.strftime("%Y/%m/%d")
+tomorrow = str1.strftime("%Y/%m/%d")
+yesterday = str2.strftime("%Y/%m/%d")
+###########################################################################
+###########################################################################
+
+
+path= '/media/soporte/Elements/DATA_AMISR/2022'
+#path= '/media/soporte/Elements/DATA_AMISR/2022/'
+outPath = '/home/soporte/Data/data2Fabiano/POWER_H5_AMISR/2022/'
+
+###########################################################################
+###########################################################################
+xmin = 7
+xmax = 18
+localtime=1
+
+startDate=today
+endDate=tomorrow
+
+# Fixed date range; overrides the today/tomorrow defaults assigned just above.
+startDate='2022/09/24' # perpendicular and oblique pointing
+endDate='2022/09/25'
+
+startTime = '18:00:00'
+endTime = '07:00:00'
+###############################################################################
+###############################################################################
+nChannels = 10
+IPPms =10
+code = [1,-1,-1,-1,1,1,1,1,-1,-1,-1,1,-1,-1,-1,1,-1,-1,-1,1,-1,-1,1,-1,1,1,-1,1]
+nCode = 1
+nBaud = 28
+nOsamp = 2 # oversample
+dataList = ['data_pow','utctime']
+###############################################################################
+###############################################################################
+nipp = (1000/IPPms)/nChannels
+ippP10sec = nipp*10
+print("{} profiles in 10 seconds".format(ippP10sec))
+###############################################################################
+###############################################################################
+l = startDate.split('/') #adding day of the year to outPath
+datelist = datetime.date(int(l[0]),int(l[1]),int(l[2]))
+DOY = datelist.timetuple().tm_yday
+year = l[0]
+month = l[1].zfill(2)
+day = l[2].zfill(2)
+doy = str(DOY).zfill(3)
+
+###########################################################################
+###########################################################################
+
+
+def ensureDir(dirPath, label):
+    """Create dirPath (and parents) if missing; otherwise print label and path."""
+    if os.path.exists(dirPath):
+        print(label, dirPath)
+    else:
+        # exist_ok: tolerate the directory appearing after the check above.
+        os.makedirs(dirPath, exist_ok=True)
+
+
+fpathOut = outPath+ "ESF"+l[0]+doy
+
+ensureDir(fpathOut, "outPath 1: ")
+
+outPaths = [fpathOut+"/CH{}".format(ch) for ch in range(nChannels)]
+###########################################################################
+###########################################################################
+
+
+def schainByChannels(channel, Outdata):
+    """Process one AMISR channel and write its result as HDF5 into Outdata.
+
+    Chain: AMISRReader -> VoltageProc (selectChannels, Decoder) ->
+    SpectraProc (IncohInt) -> ParametersProc (SpectralMoments, HDFWriter).
+
+    channel: index of the channel to select; also used as the Project id.
+    Outdata: output directory for this channel (created if missing).
+    """
+    ensureDir(Outdata, "Outdata {}: ".format(channel))
+
+    controllerObj = Project()
+    controllerObj.setup(id = channel, name='amisr_test', description='desc')
+    readUnitConfObj = controllerObj.addReadUnit(datatype='AMISRReader',
+                                                path=path,
+                                                startDate=startDate,#'2016/07/12',
+                                                endDate=endDate,#'2016/07/13',
+                                                startTime=startTime,#'18:00:00',
+                                                endTime=endTime,#'07:00:00',
+                                                walk='1',
+                                                code = code,
+                                                nCode = nCode,
+                                                nBaud = nBaud,
+                                                nOsamp = nOsamp,
+                                                nFFT = 10,
+                                                timezone='lt',
+                                                margin_days=5,
+                                                online=0)
+
+    proc_volts = controllerObj.addProcUnit(datatype='VoltageProc', inputId=readUnitConfObj.getId())
+
+    opObj03 = proc_volts.addOperation(name='selectChannels', optype='other')
+    opObj03.addParameter(name='channelList', value=[channel], format='list')
+
+    opObj01 = proc_volts.addOperation(name='Decoder', optype='other')
+    opObj01.addParameter(name='code', value=code, format='floatlist')
+    opObj01.addParameter(name='nCode', value=nCode, format='int')
+    opObj01.addParameter(name='nBaud', value=nBaud, format='int')
+    opObj01.addParameter(name='osamp', value=nOsamp, format='int')
+
+    proc_spc = controllerObj.addProcUnit(datatype='SpectraProc', inputId=proc_volts.getId())
+    proc_spc.addParameter(name='nFFTPoints', value=10, format='int')
+
+    opObj13 = proc_spc.addOperation(name='IncohInt', optype='other')
+    opObj13.addParameter(name='timeInterval', value='20', format='int')
+
+    procParam= controllerObj.addProcUnit(datatype='ParametersProc',inputId=proc_spc.getId())
+    moments = procParam.addOperation(name='SpectralMoments',optype='other')
+    opObj12 = procParam.addOperation(name='setAttribute')
+    opObj12.addParameter(name='type', value='Spectra')
+
+    writer = procParam.addOperation(name='HDFWriter',optype='other')
+    writer.addParameter(name='path', value=Outdata)
+    writer.addParameter(name='timeZone', value="ut")
+    writer.addParameter(name='hourLimit', value=14)
+    writer.addParameter(name='breakDays', value=False)
+    writer.addParameter(name='blocksPerFile', value='2340',format='int')
+    writer.addParameter(name='metadataList', value='timeZone,type,unitsDescription,\
+radarControllerHeaderObj.dtype,radarControllerHeaderObj.ipp,radarControllerHeaderObj.txA,radarControllerHeaderObj.frequency,\
+radarControllerHeaderObj.sampleRate,radarControllerHeaderObj.heightList,radarControllerHeaderObj.elevationList,\
+radarControllerHeaderObj.azimuthList,radarControllerHeaderObj.channelList,radarControllerHeaderObj.heightResolution,\
+radarControllerHeaderObj.code,radarControllerHeaderObj.nCode,radarControllerHeaderObj.nBaud,radarControllerHeaderObj.nOsamp,\
+processingHeaderObj.dtype,processingHeaderObj.ipp,processingHeaderObj.nCohInt,processingHeaderObj.nSamplesFFT,\
+processingHeaderObj.nFFTPoints,processingHeaderObj.timeIncohInt,processingHeaderObj.heightList,processingHeaderObj.channelList,processingHeaderObj.elevationList,\
+processingHeaderObj.azimuthList,processingHeaderObj.heightResolution',format='list')
+    writer.addParameter(name='dataList',value=','.join(dataList),format='list')
+
+    controllerObj.start()
+
+
+def plotMerged(mergedPath):
+    """Plot the merged power read from mergedPath, then delete its "pow" folder.
+
+    mergedPath: directory read by the HDFReader; it is also passed as the
+    PowerPlot 'save' parameter.
+    """
+    controllerObj = Project()
+    controllerObj.setup(id = '22164', name='esf_proc', description="plot merged power")
+
+    readerUnit = controllerObj.addReadUnit(datatype='HDFReader',
+                                           path=mergedPath,
+                                           startDate=startDate,#'2016/07/12',
+                                           endDate=endDate,#'2016/07/13',
+                                           startTime=startTime,#'07:00:00',
+                                           endTime=endTime,#'15:00:00',
+                                           walk=0,
+                                           timezone='lt',
+                                           online=0)
+
+    opObj21 = readerUnit.addOperation(name='PowerPlot', optype='external')
+    opObj21.addParameter(name='showprofile', value='1', format='int')
+    opObj21.addParameter(name='xmin', value=xmin, format='int')
+    opObj21.addParameter(name='xmax', value=xmax, format='int')
+    opObj21.addParameter(name='zmin', value=100, format='int')
+    opObj21.addParameter(name='zmax', value=120, format='int')
+    opObj21.addParameter(name='show', value=0, format='int')
+    opObj21.addParameter(name='localtime', value=localtime,format='int')
+    opObj21.addParameter(name='save', value=mergedPath, format='str')
+    opObj21.addParameter(name='t_units', value='h', format='str')
+
+    controllerObj.start()
+    controllerObj.join()
+
+    powerPath = mergedPath +"/pow"
+    print("Removing ",powerPath)
+    rmtree(powerPath)
+
+
+if __name__ == '__main__':
+    pool = []
+    for ch in range(nChannels):
+        p = Process(target=schainByChannels, args=(ch,outPaths[ch]))
+        pool.append(p)
+        p.start()
+
+    # Wait for every channel to finish before merging. connection.wait()
+    # returns as soon as any one process ends, so join each process instead.
+    for p in pool:
+        p.join()
+    # NOTE(review): pause kept from the original; confirm whether it is still needed.
+    time.sleep(10)
+
+    print("Starting merging proc...")
+    ensureDir(fpathOut, "final path Out: ")
+
+    merger = MergeH5(nChannels,fpathOut,dataList,*outPaths)
+    merger.run()
+    time.sleep(2)
+
+    plotMerged(fpathOut)
+
+    print("Removing hdf5 files from channels...")
+    for pch in outPaths:
+        rmtree(pch)
+    print("Proc finished ! :)")
diff --git a/schainpy/model/io/utilsIO.py b/schainpy/model/io/utilsIO.py
index 514aebf..2c139a7 100644
--- a/schainpy/model/io/utilsIO.py
+++ b/schainpy/model/io/utilsIO.py
@@ -85,6 +85,8 @@ class MergeH5(object):
     merger = MergeH5(nChannels,pathOut,list, p0, p1,p2,p3)
     merger.run()
 
+    The file exampleForMultiprocessing_AMISR_merge.txt shows an application for AMISR data
+
     """
 
     # #__attrs__ = ['paths', 'nChannels']