exampleForMultiprocessing_AMISR_merge.txt
222 lines
| 9.5 KiB
| text/plain
|
TextLexer
r1580 | import os, sys, json | |||
import time | ||||
import datetime | ||||
from multiprocessing import Process | ||||
from shutil import rmtree | ||||
from schainpy.controller import Project | ||||
from multiprocessing.connection import wait | ||||
from schainpy.model.io.utilsIO import MergeH5 | ||||
########################################################################### | ||||
########################################################################### | ||||
# ---------------------------------------------------------------------------
# Reference dates, formatted as YYYY/MM/DD like startDate/endDate below.
# ---------------------------------------------------------------------------
dty = datetime.date.today()
str1 = dty + datetime.timedelta(days=1)
str2 = dty - datetime.timedelta(days=1)
today = dty.strftime("%Y/%m/%d")
tomorrow = str1.strftime("%Y/%m/%d")
yesterday = str2.strftime("%Y/%m/%d")

# ---------------------------------------------------------------------------
# Input (raw AMISR data) and output (HDF5 power) locations.
# ---------------------------------------------------------------------------
path = '/media/soporte/Elements/DATA_AMISR/2022'
outPath = '/home/soporte/Data/data2Fabiano/POWER_H5_AMISR/2022/'

# ---------------------------------------------------------------------------
# Plot limits and processing time window.
# ---------------------------------------------------------------------------
xmin = 7
xmax = 18
localtime = 1
# Default window is today -> tomorrow; the fixed dates right below override
# it. Remove the two hard-coded assignments to process the current night.
startDate = today
endDate = tomorrow
startDate = '2022/09/24'  # perpendicular and oblique pointing
endDate = '2022/09/25'
startTime = '18:00:00'
endTime = '07:00:00'

# ---------------------------------------------------------------------------
# Radar / decoding parameters.
# ---------------------------------------------------------------------------
nChannels = 10
IPPms = 10
code = [1, -1, -1, -1, 1, 1, 1, 1, -1, -1, -1, 1, -1, -1,
        -1, 1, -1, -1, -1, 1, -1, -1, 1, -1, 1, 1, -1, 1]
nCode = 1
nBaud = 28
nOsamp = 2  # oversample
dataList = ['data_pow', 'utctime']

# ---------------------------------------------------------------------------
# Profiles per channel: (1000 / IPPms) profiles per second shared among
# nChannels channels, reported over a 10 second span.
# ---------------------------------------------------------------------------
nipp = (1000 / IPPms) / nChannels
ippP10sec = nipp * 10
print("{} profiles in 10 seconds".format(ippP10sec))

# ---------------------------------------------------------------------------
# Day of year of startDate; it is appended to the output folder name.
# ---------------------------------------------------------------------------
l = startDate.split('/')
datelist = datetime.date(int(l[0]), int(l[1]), int(l[2]))
DOY = datelist.timetuple().tm_yday
year = l[0]
month = l[1].zfill(2)
day = l[2].zfill(2)
doy = str(DOY).zfill(3)
########################################################################### | ||||
########################################################################### | ||||
# Merged-output folder: <outPath>/ESF<year><day-of-year>.
fpathOut = os.path.join(outPath, "ESF" + l[0] + doy)
try:
    # makedirs also creates a missing parent folder, and try/except avoids
    # the gap between checking for the folder and creating it.
    os.makedirs(fpathOut)
except FileExistsError:
    print("outPath 1: ", fpathOut)

# One scratch sub-folder per channel (CH0 .. CH<nChannels-1>).
outPaths = [os.path.join(fpathOut, "CH{}".format(ch)) for ch in range(nChannels)]
########################################################################### | ||||
########################################################################### | ||||
def schainByChannels(channel, Outdata):
    '''
    Process a single channel and write its power data to HDF5 files.

    Builds a schainpy Project made of:
      * an AMISRReader read unit over the module-level path / date / time
        window and code settings,
      * a VoltageProc unit that selects ``channel`` and applies the Decoder,
      * a SpectraProc unit (nFFTPoints=10) with IncohInt (timeInterval=20),
      * a ParametersProc unit with SpectralMoments, setAttribute
        (type='Spectra') and an HDFWriter that saves into ``Outdata``.

    The project is started and then joined, so the call returns once the
    pipeline has finished.

    Parameters:
        channel: channel index; used as the project id and as the single
                 entry of the selectChannels list.
        Outdata: folder that receives this channel's HDF5 files; it is
                 created when it does not exist yet.
    '''
    try:
        # EAFP: no window between "exists?" and "create", parents included.
        os.makedirs(Outdata)
    except FileExistsError:
        print("Outdata {}: ".format(channel), Outdata)

    controllerObj = Project()
    controllerObj.setup(id=channel, name='amisr_test', description='desc')

    readUnitConfObj = controllerObj.addReadUnit(datatype='AMISRReader',
                                                path=path,
                                                startDate=startDate,
                                                endDate=endDate,
                                                startTime=startTime,
                                                endTime=endTime,
                                                walk='1',
                                                code=code,
                                                nCode=nCode,
                                                nBaud=nBaud,
                                                nOsamp=nOsamp,
                                                nFFT=10,
                                                timezone='lt',
                                                margin_days=5,
                                                online=0)

    # Voltage stage: keep only this channel, then decode.
    proc_volts = controllerObj.addProcUnit(datatype='VoltageProc', inputId=readUnitConfObj.getId())

    select_op = proc_volts.addOperation(name='selectChannels', optype='other')
    select_op.addParameter(name='channelList', value=[channel], format='list')

    decoder_op = proc_volts.addOperation(name='Decoder', optype='other')
    decoder_op.addParameter(name='code', value=code, format='floatlist')
    decoder_op.addParameter(name='nCode', value=nCode, format='int')
    decoder_op.addParameter(name='nBaud', value=nBaud, format='int')
    decoder_op.addParameter(name='osamp', value=nOsamp, format='int')

    # Spectra stage with incoherent integration.
    proc_spc = controllerObj.addProcUnit(datatype='SpectraProc', inputId=proc_volts.getId())
    proc_spc.addParameter(name='nFFTPoints', value=10, format='int')

    incoh_op = proc_spc.addOperation(name='IncohInt', optype='other')
    incoh_op.addParameter(name='timeInterval', value='20', format='int')

    # Parameters stage: moments, attribute tagging and the HDF5 writer.
    procParam = controllerObj.addProcUnit(datatype='ParametersProc', inputId=proc_spc.getId())
    procParam.addOperation(name='SpectralMoments', optype='other')

    attr_op = procParam.addOperation(name='setAttribute')
    attr_op.addParameter(name='type', value='Spectra')

    writer = procParam.addOperation(name='HDFWriter', optype='other')
    writer.addParameter(name='path', value=Outdata)
    writer.addParameter(name='timeZone', value="ut")
    writer.addParameter(name='hourLimit', value=14)
    writer.addParameter(name='breakDays', value=False)
    writer.addParameter(name='blocksPerFile', value='2340', format='int')

    # Adjacent string literals instead of backslash continuations inside one
    # literal, so source indentation can never end up inside an attribute name.
    metadata = ('timeZone,type,unitsDescription,'
                'radarControllerHeaderObj.dtype,radarControllerHeaderObj.ipp,radarControllerHeaderObj.txA,radarControllerHeaderObj.frequency,'
                'radarControllerHeaderObj.sampleRate,radarControllerHeaderObj.heightList,radarControllerHeaderObj.elevationList,'
                'radarControllerHeaderObj.azimuthList,radarControllerHeaderObj.channelList,radarControllerHeaderObj.heightResolution,'
                'radarControllerHeaderObj.code,radarControllerHeaderObj.nCode,radarControllerHeaderObj.nBaud,radarControllerHeaderObj.nOsamp,'
                'processingHeaderObj.dtype,processingHeaderObj.ipp,processingHeaderObj.nCohInt,processingHeaderObj.nSamplesFFT,'
                'processingHeaderObj.nFFTPoints,processingHeaderObj.timeIncohInt,processingHeaderObj.heightList,processingHeaderObj.channelList,processingHeaderObj.elevationList,'
                'processingHeaderObj.azimuthList,processingHeaderObj.heightResolution')
    writer.addParameter(name='metadataList', value=metadata, format='list')
    writer.addParameter(name='dataList', value='data_pow,utctime', format='list')

    controllerObj.start()
    # Wait for the pipeline explicitly, as plotMerged() does, instead of
    # relying on the interpreter to wait for it when this process exits.
    controllerObj.join()
def plotMerged(mergedPath):
    '''
    Plot the merged power files found in ``mergedPath``.

    An HDFReader read unit is configured over the module-level date/time
    window and a PowerPlot operation is attached to it with ``save`` set to
    ``mergedPath``. After the project has been started and joined, the
    ``<mergedPath>/pow`` folder is removed.

    Parameters:
        mergedPath: folder holding the merged HDF5 files.
    '''
    project = Project()
    project.setup(id='22164', name='esf_proc', description="plot merged power")

    reader = project.addReadUnit(
        datatype='HDFReader',
        path=mergedPath,
        startDate=startDate,
        endDate=endDate,
        startTime=startTime,
        endTime=endTime,
        walk=0,
        timezone='lt',
        online=0,
    )

    power_plot = reader.addOperation(name='PowerPlot', optype='external')
    # (name, value, format) for every PowerPlot parameter, in call order.
    plot_settings = (
        ('showprofile', '1', 'int'),
        ('xmin', xmin, 'int'),
        ('xmax', xmax, 'int'),
        ('zmin', 100, 'int'),
        ('zmax', 120, 'int'),
        ('show', 0, 'int'),
        ('localtime', 1, 'int'),
        ('save', mergedPath, 'str'),
        ('t_units', 'h', 'str'),
    )
    for param_name, param_value, param_format in plot_settings:
        power_plot.addParameter(name=param_name, value=param_value, format=param_format)

    project.start()
    project.join()

    # Figure folders to delete once plotting is done.
    for fig_dir in (mergedPath + "/pow",):
        print("Removing ", fig_dir)
        rmtree(fig_dir)
if __name__ == '__main__':
    # One worker process per channel, each writing into its own CH<n> folder.
    pool = []
    for ch in range(nChannels):
        p = Process(target=schainByChannels, args=(ch, outPaths[ch]))
        pool.append(p)
        p.start()

    # Join every worker. multiprocessing.connection.wait() returns as soon
    # as ANY sentinel is ready, so with it the merge could start while other
    # channels were still writing their files.
    for p in pool:
        p.join()
    failed = [ch for ch, p in enumerate(pool) if p.exitcode != 0]
    if failed:
        print("Warning: channel processes ended with errors: {}".format(failed))
    # Pause kept from the original script; presumably lets the output settle
    # before merging -- TODO confirm whether it is still needed.
    time.sleep(10)

    print("Starting merging proc...")
    try:
        os.makedirs(fpathOut)
    except FileExistsError:
        print("final path Out: {}".format(fpathOut))

    # Merge the per-channel files into fpathOut.
    merger = MergeH5(nChannels, fpathOut, dataList, *outPaths)
    merger.run()
    time.sleep(2)

    plotMerged(fpathOut)

    print("Removing hdf5 files from channels...")
    for pch in outPaths:
        rmtree(pch)
    print("Proc finished ! :)")