import os
import sys
import glob
import time
import re
import json
import traceback
import datetime

import numpy
import h5py

import schainpy.admin
from schainpy.model.data.jrodata import *
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
from schainpy.model.io.jroIO_base import *
from schainpy.utils import log


@MPDecorator
class ParamReader(JRODataReader, ProcessingUnit):
    '''
    Reads HDF5 format files

    path
    startDate
    endDate
    startTime
    endTime
    '''
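
    # Usage sketch (illustrative, not part of the original file; the controller
    # API names below are assumed from typical schainpy experiment scripts).
    # The unit is configured through setup(**kwargs) with the parameters listed
    # in the docstring above, e.g.:
    #
    #   readUnit = project.addReadUnit(datatype='ParamReader',
    #                                  path='/data/experiment/hdf5',
    #                                  startDate='2017/01/01', endDate='2017/01/02',
    #                                  startTime='00:00:00', endTime='23:59:59',
    #                                  walk=1, timezone='lt', ext='.hdf5')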

    ext = ".hdf5"
    optchar = "D"
    timezone = None
    startTime = None
    endTime = None
    fileIndex = None
    utcList = None          #To select data in the utctime list
    blockList = None        #List of blocks to be read from the file
    blocksPerFile = None    #Number of blocks to be read
    blockIndex = None
    path = None
    #List of Files
    filenameList = None
    datetimeList = None
    #Hdf5 File
    listMetaname = None
    listMeta = None
    listDataname = None
    listData = None
    listShapes = None
    fp = None
    #dataOut reconstruction
    dataOut = None

    def __init__(self):
        ProcessingUnit.__init__(self)
        self.dataOut = Parameters()
        return
r897 | |||
def setup(self, **kwargs): | ||||
|
r848 | path = kwargs['path'] | ||
startDate = kwargs['startDate'] | ||||
endDate = kwargs['endDate'] | ||||
startTime = kwargs['startTime'] | ||||
endTime = kwargs['endTime'] | ||||
walk = kwargs['walk'] | ||||
|
r1167 | if 'ext' in kwargs: | ||
|
r848 | ext = kwargs['ext'] | ||
else: | ||||
ext = '.hdf5' | ||||
|
r1167 | if 'timezone' in kwargs: | ||
|
r848 | self.timezone = kwargs['timezone'] | ||
else: | ||||
self.timezone = 'lt' | ||||
|
r897 | |||
|
r1167 | print("[Reading] Searching files in offline mode ...") | ||
|
r1052 | pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate, | ||
|
r848 | startTime=startTime, endTime=endTime, | ||
|
r897 | ext=ext, walk=walk) | ||
|
r848 | if not(filenameList): | ||
|
r1167 | print("There is no files into the folder: %s"%(path)) | ||
|
r848 | sys.exit(-1) | ||
self.fileIndex = -1 | ||||
self.startTime = startTime | ||||
self.endTime = endTime | ||||
|
r897 | |||
|
r848 | self.__readMetadata() | ||
|
r897 | |||
|
r848 | self.__setNextFileOffline() | ||
|
r897 | |||
|
r848 | return | ||
|
r897 | |||
|

    def searchFilesOffLine(self,
                           path,
                           startDate=None,
                           endDate=None,
                           startTime=datetime.time(0, 0, 0),
                           endTime=datetime.time(23, 59, 59),
                           ext='.hdf5',
                           walk=True):

        expLabel = ''
        self.filenameList = []
        self.datetimeList = []

        pathList = []

        JRODataObj = JRODataReader()
        dateList, pathList = JRODataObj.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)

        if dateList == []:
            print("[Reading] No *%s files in %s from %s to %s" % (ext, path,
                  datetime.datetime.combine(startDate, startTime).ctime(),
                  datetime.datetime.combine(endDate, endTime).ctime()))

            return None, None

        if len(dateList) > 1:
            print("[Reading] %d days were found in date range: %s - %s" % (len(dateList), startDate, endDate))
        else:
            print("[Reading] data was found for the date %s" % (dateList[0]))

        filenameList = []
        datetimeList = []

        #----------------------------------------------------------------------------------

        for thisPath in pathList:

            fileList = glob.glob1(thisPath, "*%s" % ext)
            fileList.sort()

            for file in fileList:

                filename = os.path.join(thisPath, file)

                if not isFileInDateRange(filename, startDate, endDate):
                    continue

                thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime)

                if not(thisDatetime):
                    continue

                filenameList.append(filename)
                datetimeList.append(thisDatetime)

        if not(filenameList):
            print("[Reading] No file was found in time range %s - %s" % (datetime.datetime.combine(startDate, startTime).ctime(), datetime.datetime.combine(endDate, endTime).ctime()))
            return None, None

        print("[Reading] %d file(s) found in time range: %s - %s" % (len(filenameList), startTime, endTime))
        print()

        self.filenameList = filenameList
        self.datetimeList = datetimeList

        return pathList, filenameList

    def __isFileInTimeRange(self, filename, startDate, endDate, startTime, endTime):
        """
        Returns the datetime of the data file if it contains data within the specified
        time range, otherwise returns None.

        Inputs:
            filename  : full path of the data file
            startDate : start date of the selected range (datetime.date)
            endDate   : end date of the selected range (datetime.date)
            startTime : start time of the selected range (datetime.time)
            endTime   : end time of the selected range (datetime.time)

        Return:
            The datetime of the first block if the file contains data within the
            specified range, otherwise None.

        Exceptions:
            If the file does not exist or cannot be opened.
            If the header cannot be read.
        """

        try:
            fp = h5py.File(filename, 'r')
            grp1 = fp['Data']
        except IOError:
            traceback.print_exc()
            raise IOError("The file %s can't be opened" % (filename))

        #In case it has a utctime attribute
        grp2 = grp1['utctime']
        # thisUtcTime = grp2.value[0] - 5*3600    #To convert to local time
        thisUtcTime = grp2.value[0]
        fp.close()

        if self.timezone == 'lt':
            thisUtcTime -= 5*3600

        thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
        thisDate = thisDatetime.date()
        thisTime = thisDatetime.time()

        startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()

        #General case
        #           o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
        #-----------o----------------------------o-----------
        #       startTime                      endTime

        if endTime >= startTime:
            thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime)
            if numpy.any(thisUtcLog):    #If there is at least one block between the given hours
                return thisDatetime
            return None

        #If endTime < startTime then endTime belongs to the next day
        #<<<<<<<<<<<o                             o>>>>>>>>>>>
        #-----------o----------------------------o-----------
        #         endTime                    startTime

        if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime):
            return None

        if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime):
            return None

        if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime):
            return None

        return thisDatetime

    def __setNextFileOffline(self):

        self.fileIndex += 1
        idFile = self.fileIndex

        if not(idFile < len(self.filenameList)):
            raise schainpy.admin.SchainError("No more Files")
            return 0

        filename = self.filenameList[idFile]
        filePointer = h5py.File(filename, 'r')
        self.filename = filename
        self.fp = filePointer
        print("Setting the file: %s" % self.filename)

        self.__setBlockList()
        self.__readData()
        self.blockIndex = 0
        return 1

    def __setBlockList(self):
        '''
        Selects the data within the times defined

        self.fp
        self.startTime
        self.endTime

        self.blockList
        self.blocksPerFile
        '''
        fp = self.fp
        startTime = self.startTime
        endTime = self.endTime

        grp = fp['Data']
        thisUtcTime = grp['utctime'].value.astype(numpy.float)[0]

        #ERROOOOR
        if self.timezone == 'lt':
            thisUtcTime -= 5*3600

        thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)

        thisDate = thisDatetime.date()
        thisTime = thisDatetime.time()

        startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()

        ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]

        self.blockList = ind
        self.blocksPerFile = len(ind)

        return

    def __readMetadata(self):
        '''
        Reads Metadata

        self.pathMeta
        self.listShapes
        self.listMetaname
        self.listMeta
        '''

        filename = self.filenameList[0]
        fp = h5py.File(filename, 'r')
        gp = fp['Metadata']

        listMetaname = []
        listMetadata = []
        for item in list(gp.items()):
            name = item[0]

            if name == 'array dimensions':
                table = gp[name][:]
                listShapes = {}
                for shapes in table:
                    listShapes[shapes[0]] = numpy.array([shapes[1], shapes[2], shapes[3], shapes[4], shapes[5]])
            else:
                data = gp[name].value
                listMetaname.append(name)
                listMetadata.append(data)

        self.listShapes = listShapes
        self.listMetaname = listMetaname
        self.listMeta = listMetadata

        fp.close()
        return

    def __readData(self):

        grp = self.fp['Data']
        listdataname = []
        listdata = []

        for item in list(grp.items()):
            name = item[0]
            listdataname.append(name)

            array = self.__setDataArray(grp[name], self.listShapes[name])
            listdata.append(array)

        self.listDataname = listdataname
        self.listData = listdata
        return

    def __setDataArray(self, dataset, shapes):

        nDims = shapes[0]
        nDim2 = shapes[1]    #Dimension 0
        nDim1 = shapes[2]    #Dimension 1, number of Points or Parameters
        nDim0 = shapes[3]    #Dimension 2, number of samples or ranges
        mode = shapes[4]     #Mode of storing
        blockList = self.blockList
        blocksPerFile = self.blocksPerFile

        #Depending on the mode the data was stored in
        if mode == 0:        #Divided in channels
            arrayData = dataset.value.astype(numpy.float)[0][blockList]
        if mode == 1:        #Divided in parameters
            strds = 'table'
            nDatas = nDim1
            newShapes = (blocksPerFile, nDim2, nDim0)
        elif mode == 2:      #Concatenated in a table
            strds = 'table0'
            arrayData = dataset[strds].value
            #Selecting part of the dataset
            utctime = arrayData[:, 0]
            u, indices = numpy.unique(utctime, return_index=True)

            if blockList.size != indices.size:
                indMin = indices[blockList[0]]
                if blockList[1] + 1 >= indices.size:
                    arrayData = arrayData[indMin:, :]
                else:
                    indMax = indices[blockList[1] + 1]
                    arrayData = arrayData[indMin:indMax, :]
            return arrayData

        # One dimension
        if nDims == 0:
            arrayData = dataset.value.astype(numpy.float)[0][blockList]

        # Two dimensions
        elif nDims == 2:
            arrayData = numpy.zeros((blocksPerFile, nDim1, nDim0))
            newShapes = (blocksPerFile, nDim0)
            nDatas = nDim1

            for i in range(nDatas):
                data = dataset[strds + str(i)].value
                arrayData[:, i, :] = data[blockList, :]

        # Three dimensions
        else:
            arrayData = numpy.zeros((blocksPerFile, nDim2, nDim1, nDim0))

            for i in range(nDatas):

                data = dataset[strds + str(i)].value

                for b in range(blockList.size):
                    arrayData[b, :, i, :] = data[:, :, blockList[b]]

        return arrayData

    def __setDataOut(self):
        listMeta = self.listMeta
        listMetaname = self.listMetaname
        listDataname = self.listDataname
        listData = self.listData
        listShapes = self.listShapes

        blockIndex = self.blockIndex
        # blockList = self.blockList

        for i in range(len(listMeta)):
            setattr(self.dataOut, listMetaname[i], listMeta[i])

        for j in range(len(listData)):
            nShapes = listShapes[listDataname[j]][0]
            mode = listShapes[listDataname[j]][4]
            if nShapes == 1:
                setattr(self.dataOut, listDataname[j], listData[j][blockIndex])
            elif nShapes > 1:
                setattr(self.dataOut, listDataname[j], listData[j][blockIndex, :])
            elif mode == 0:
                setattr(self.dataOut, listDataname[j], listData[j][blockIndex])
            #Mode Meteors
            elif mode == 2:
                selectedData = self.__selectDataMode2(listData[j], blockIndex)
                setattr(self.dataOut, listDataname[j], selectedData)
        return

    def __selectDataMode2(self, data, blockIndex):
        utctime = data[:, 0]
        aux, indices = numpy.unique(utctime, return_inverse=True)
        selInd = numpy.where(indices == blockIndex)[0]
        selData = data[selInd, :]

        return selData

    def getData(self):

        if self.blockIndex == self.blocksPerFile:
            if not(self.__setNextFileOffline()):
                self.dataOut.flagNoData = True
                return 0

        self.__setDataOut()
        self.dataOut.flagNoData = False

        self.blockIndex += 1

        return

    def run(self, **kwargs):

        if not(self.isConfig):
            self.setup(**kwargs)
            self.isConfig = True

        self.getData()

        return


@MPDecorator
class ParamWriter(Operation):
    '''
    HDF5 Writer, stores parameters data in HDF5 format files

    path: path where the files will be stored
    blocksPerFile: number of blocks that will be saved per HDF5 format file
    mode: selects the data stacking mode: '0' channels, '1' parameters, '3' table (for meteors)
    metadataList: list of attributes that will be stored as metadata
    dataList: list of attributes that will be stored as data
    '''
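
    # Usage sketch (illustrative, not part of the original file; the controller
    # API names and the attribute names in dataList/metadataList are assumed):
    #
    #   opObj = procUnit.addOperation(name='ParamWriter', optype='other')
    #   opObj.addParameter(name='path', value='/data/out/hdf5')
    #   opObj.addParameter(name='blocksPerFile', value='100', format='int')
    #   opObj.addParameter(name='metadataList', value='heightList,timeZone', format='list')
    #   opObj.addParameter(name='dataList', value='data_param,utctime', format='list')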

    ext = ".hdf5"
    optchar = "D"
    metaoptchar = "M"
    metaFile = None
    filename = None
    path = None
    setFile = None
    fp = None
    grp = None
    ds = None
    firsttime = True
    #Configurations
    blocksPerFile = None
    blockIndex = None
    dataOut = None
    #Data Arrays
    dataList = None
    metadataList = None
    dsList = None    #List of dictionaries with dataset properties
    tableDim = None
    dtype = [('arrayName', 'S20'), ('nDimensions', 'i'), ('dim2', 'i'), ('dim1', 'i'), ('dim0', 'i'), ('mode', 'b')]
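    # Each row of the table built with `dtype` (written later as the
    # 'array dimensions' metadata dataset) describes one attribute of dataList:
    # its name, the number of array dimensions, the sizes dim2/dim1/dim0 and the
    # storage mode (0: channels/scalars, 1: parameters, 2: table).
    # ParamReader.__setDataArray reads these same fields back to rebuild the arrays.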
    currentDay = None
    lastTime = None
    setType = None

    def __init__(self):

        Operation.__init__(self)
        return

    def setup(self, dataOut, path=None, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):

        self.path = path
        self.blocksPerFile = blocksPerFile
        self.metadataList = metadataList
        self.dataList = dataList
        self.dataOut = dataOut
        self.mode = mode
        if self.mode is not None:
            self.mode = numpy.zeros(len(self.dataList)) + mode
        else:
            self.mode = numpy.ones(len(self.dataList))

        self.setType = setType

        arrayDim = numpy.zeros((len(self.dataList), 5))

        #Table dimensions
        dtype0 = self.dtype
        tableList = []

        #Dictionary and list of tables
        dsList = []

        for i in range(len(self.dataList)):
            dsDict = {}
            dataAux = getattr(self.dataOut, self.dataList[i])
            dsDict['variable'] = self.dataList[i]

            #--------------------- Conditionals ------------------------
            #There is no data
            if dataAux is None:
                return 0

            if isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
                dsDict['mode'] = 0
                dsDict['nDim'] = 0
                arrayDim[i, 0] = 0
                dsList.append(dsDict)

            #Mode 2: meteors
            elif self.mode[i] == 2:
                dsDict['dsName'] = 'table0'
                dsDict['mode'] = 2    # Mode meteors
                dsDict['shape'] = dataAux.shape[-1]
                dsDict['nDim'] = 0
                dsDict['dsNumber'] = 1
                arrayDim[i, 3] = dataAux.shape[-1]
                arrayDim[i, 4] = self.mode[i]    #Mode the data was stored in
                dsList.append(dsDict)

            #Mode 1
            else:
                arrayDim0 = dataAux.shape            #Data dimensions
                arrayDim[i, 0] = len(arrayDim0)      #Number of array dimensions
                arrayDim[i, 4] = self.mode[i]        #Mode the data was stored in
                strtable = 'table'
                dsDict['mode'] = 1    # Mode parameters

                # Three-dimension arrays
                if len(arrayDim0) == 3:
                    arrayDim[i, 1:-1] = numpy.array(arrayDim0)
                    nTables = int(arrayDim[i, 2])
                    dsDict['dsNumber'] = nTables
                    dsDict['shape'] = arrayDim[i, 2:4]
                    dsDict['nDim'] = 3

                    for j in range(nTables):
                        dsDict = dsDict.copy()
                        dsDict['dsName'] = strtable + str(j)
                        dsList.append(dsDict)

                # Two-dimension arrays
                elif len(arrayDim0) == 2:
                    arrayDim[i, 2:-1] = numpy.array(arrayDim0)
                    nTables = int(arrayDim[i, 2])
                    dsDict['dsNumber'] = nTables
                    dsDict['shape'] = arrayDim[i, 3]
                    dsDict['nDim'] = 2

                    for j in range(nTables):
                        dsDict = dsDict.copy()
                        dsDict['dsName'] = strtable + str(j)
                        dsList.append(dsDict)

                # One-dimension arrays
                elif len(arrayDim0) == 1:
                    arrayDim[i, 3] = arrayDim0[0]
                    dsDict['shape'] = arrayDim0[0]
                    dsDict['dsNumber'] = 1
                    dsDict['dsName'] = strtable + str(0)
                    dsDict['nDim'] = 1
                    dsList.append(dsDict)

            table = numpy.array((self.dataList[i],) + tuple(arrayDim[i, :]), dtype=dtype0)
            tableList.append(table)

        self.dsList = dsList
        self.tableDim = numpy.array(tableList, dtype=dtype0)
        self.blockIndex = 0
        timeTuple = time.localtime(dataOut.utctime)
        self.currentDay = timeTuple.tm_yday

    def putMetadata(self):

        fp = self.createMetadataFile()
        self.writeMetadata(fp)
        fp.close()
        return

    def createMetadataFile(self):
        ext = self.ext
        path = self.path
        setFile = self.setFile

        timeTuple = time.localtime(self.dataOut.utctime)

        subfolder = ''
        fullpath = os.path.join(path, subfolder)

        if not(os.path.exists(fullpath)):
            os.mkdir(fullpath)
            setFile = -1    #initialize the set counter

        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
        fullpath = os.path.join(path, subfolder)

        if not(os.path.exists(fullpath)):
            os.mkdir(fullpath)
            setFile = -1    #initialize the set counter

        else:
            filesList = os.listdir(fullpath)
            filesList = sorted(filesList, key=str.lower)
            if len(filesList) > 0:
                filesList = [k for k in filesList if k.startswith(self.metaoptchar)]
                filen = filesList[-1]
                # the filename should have the following format
                # 0 1234 567 89A BCDE (hex)
                # x YYYY DDD SSS .ext
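                # Worked example (illustrative filename, not real data): a metadata
                # file 'M2017123004.hdf5' encodes year 2017, day-of-year 123 and
                # set number 004; filen[8:11] below extracts that set number.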
                if isNumber(filen[8:11]):
                    setFile = int(filen[8:11])    #start the set counter at the set of the last file
                else:
                    setFile = -1
            else:
                setFile = -1    #initialize the set counter

        if self.setType is None:
            setFile += 1
            file = '%s%4.4d%3.3d%03d%s' % (self.metaoptchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)
        else:
            setFile = timeTuple.tm_hour*60 + timeTuple.tm_min
            file = '%s%4.4d%3.3d%04d%s' % (self.metaoptchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)

        filename = os.path.join(path, subfolder, file)
        self.metaFile = file
        #Setting HDF5 File
        fp = h5py.File(filename, 'w')
        return fp

    def writeMetadata(self, fp):

        grp = fp.create_group("Metadata")
        grp.create_dataset('array dimensions', data=self.tableDim, dtype=self.dtype)

        for i in range(len(self.metadataList)):
            grp.create_dataset(self.metadataList[i], data=getattr(self.dataOut, self.metadataList[i]))
        return

    def timeFlag(self):
        currentTime = self.dataOut.utctime

        if self.lastTime is None:
            self.lastTime = currentTime

        #Day
        timeTuple = time.localtime(currentTime)
        dataDay = timeTuple.tm_yday

        #Time
        timeDiff = currentTime - self.lastTime

        #If the day changed or the gap between samples exceeds the limit, start a new file
        if dataDay != self.currentDay:
            self.currentDay = dataDay
            return True
        elif timeDiff > 3*60*60:
            self.lastTime = currentTime
            return True
        else:
            self.lastTime = currentTime
            return False

    def setNextFile(self):

        ext = self.ext
        path = self.path
        setFile = self.setFile
        mode = self.mode

        timeTuple = time.localtime(self.dataOut.utctime)
        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
        fullpath = os.path.join(path, subfolder)

        if os.path.exists(fullpath):
            filesList = os.listdir(fullpath)
            ##filesList = [k for k in filesList if 'M' in k]
            if len(filesList) > 0:
                filesList = sorted(filesList, key=str.lower)
                filen = filesList[-1]
                # the filename should have the following format
                # 0 1234 567 89A BCDE (hex)
                # x YYYY DDD SSS .ext
                if isNumber(filen[8:11]):
                    setFile = int(filen[8:11])    #start the set counter at the set of the last file
                else:
                    setFile = -1
            else:
                setFile = -1    #initialize the set counter
        else:
            os.makedirs(fullpath)
            setFile = -1    #initialize the set counter

        if self.setType is None:
            setFile += 1
            file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)
        else:
            setFile = timeTuple.tm_hour*60 + timeTuple.tm_min
            file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)

        filename = os.path.join(path, subfolder, file)
        #Setting HDF5 File
        fp = h5py.File(filename, 'w')
        #write metadata
        self.writeMetadata(fp)
        #Write data
        grp = fp.create_group("Data")
        ds = []
        data = []
        dsList = self.dsList
        i = 0

        while i < len(dsList):
            dsInfo = dsList[i]
            #One-dimension data
            if dsInfo['mode'] == 0:
                ds0 = grp.create_dataset(dsInfo['variable'], (1, 1), maxshape=(1, self.blocksPerFile), chunks=True, dtype=numpy.float64)
                ds.append(ds0)
                data.append([])
                i += 1
                continue
            elif dsInfo['mode'] == 2:
                grp0 = grp.create_group(dsInfo['variable'])
                ds0 = grp0.create_dataset(dsInfo['dsName'], (1, dsInfo['shape']), data=numpy.zeros((1, dsInfo['shape'])), maxshape=(None, dsInfo['shape']), chunks=True)
                ds.append(ds0)
                data.append([])
                i += 1
                continue

            elif dsInfo['mode'] == 1:
                grp0 = grp.create_group(dsInfo['variable'])

                for j in range(dsInfo['dsNumber']):
                    dsInfo = dsList[i]
                    tableName = dsInfo['dsName']

                    if dsInfo['nDim'] == 3:
                        shape = dsInfo['shape'].astype(int)
                        ds0 = grp0.create_dataset(tableName, (shape[0], shape[1], 1), data=numpy.zeros((shape[0], shape[1], 1)), maxshape=(None, shape[1], None), chunks=True)
                    else:
                        shape = int(dsInfo['shape'])
                        ds0 = grp0.create_dataset(tableName, (1, shape), data=numpy.zeros((1, shape)), maxshape=(None, shape), chunks=True)

                    ds.append(ds0)
                    data.append([])
                    i += 1

        fp.flush()
        fp.close()

        log.log('creating file: {}'.format(filename), 'Writing')

        self.filename = filename
        self.ds = ds
        self.data = data
        self.firsttime = True
        self.blockIndex = 0
        return

    def putData(self):

        if self.blockIndex == self.blocksPerFile or self.timeFlag():
            self.setNextFile()

        self.readBlock()
        self.setBlock()      #Prepare data to be written
        self.writeBlock()    #Write data

        return

    def readBlock(self):
        '''
        data Array configured

        self.data
        '''
        dsList = self.dsList
        ds = self.ds
        #Setting HDF5 File
        fp = h5py.File(self.filename, 'r+')
        grp = fp["Data"]
        ind = 0

        while ind < len(dsList):
            dsInfo = dsList[ind]

            if dsInfo['mode'] == 0:
                ds0 = grp[dsInfo['variable']]
                ds[ind] = ds0
                ind += 1
            else:

                grp0 = grp[dsInfo['variable']]

                for j in range(dsInfo['dsNumber']):
                    dsInfo = dsList[ind]
                    ds0 = grp0[dsInfo['dsName']]
                    ds[ind] = ds0
                    ind += 1

        self.fp = fp
        self.grp = grp
        self.ds = ds

        return

    def setBlock(self):
        '''
        data Array configured

        self.data
        '''
        #Creating Arrays
        dsList = self.dsList
        data = self.data
        ind = 0

        while ind < len(dsList):
            dsInfo = dsList[ind]
            dataAux = getattr(self.dataOut, dsInfo['variable'])

            mode = dsInfo['mode']
            nDim = dsInfo['nDim']

            if mode == 0 or mode == 2 or nDim == 1:
                data[ind] = dataAux
                ind += 1
            # elif nDim == 1:
            #     data[ind] = numpy.reshape(dataAux, (numpy.size(dataAux), 1))
            #     ind += 1
            elif nDim == 2:
                for j in range(dsInfo['dsNumber']):
                    data[ind] = dataAux[j, :]
                    ind += 1
            elif nDim == 3:
                for j in range(dsInfo['dsNumber']):
                    data[ind] = dataAux[:, j, :]
                    ind += 1

        self.data = data
        return

    def writeBlock(self):
        '''
        Saves the block in the HDF5 file
        '''
        dsList = self.dsList

        for i in range(len(self.ds)):
            dsInfo = dsList[i]
            nDim = dsInfo['nDim']
            mode = dsInfo['mode']

            # First time
            if self.firsttime:
                if type(self.data[i]) == numpy.ndarray:

                    if nDim == 3:
                        self.data[i] = self.data[i].reshape((self.data[i].shape[0], self.data[i].shape[1], 1))
                    self.ds[i].resize(self.data[i].shape)
                if mode == 2:
                    self.ds[i].resize(self.data[i].shape)
                self.ds[i][:] = self.data[i]
            else:

                # From the second time on
                # Meteors!
                if mode == 2:
                    dataShape = self.data[i].shape
                    dsShape = self.ds[i].shape
                    self.ds[i].resize((self.ds[i].shape[0] + dataShape[0], self.ds[i].shape[1]))
                    self.ds[i][dsShape[0]:, :] = self.data[i]
                # No dimension
                elif mode == 0:
                    self.ds[i].resize((self.ds[i].shape[0], self.ds[i].shape[1] + 1))
                    self.ds[i][0, -1] = self.data[i]
                # One dimension
                elif nDim == 1:
                    self.ds[i].resize((self.ds[i].shape[0] + 1, self.ds[i].shape[1]))
                    self.ds[i][-1, :] = self.data[i]
                # Two dimensions
                elif nDim == 2:
                    self.ds[i].resize((self.ds[i].shape[0] + 1, self.ds[i].shape[1]))
                    self.ds[i][self.blockIndex, :] = self.data[i]
                # Three dimensions
                elif nDim == 3:
                    self.ds[i].resize((self.ds[i].shape[0], self.ds[i].shape[1], self.ds[i].shape[2] + 1))
                    self.ds[i][:, :, -1] = self.data[i]

        self.firsttime = False
        self.blockIndex += 1

        #Close to save changes
        self.fp.flush()
        self.fp.close()
        return

    def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):

        self.dataOut = dataOut
        if not(self.isConfig):
            self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
                       metadataList=metadataList, dataList=dataList, mode=mode,
                       setType=setType)

            self.isConfig = True
            self.setNextFile()

        self.putData()
        return


@MPDecorator
class ParameterReader(Reader, ProcessingUnit):
    '''
    Reads HDF5 format files
    '''
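
    # Usage sketch (illustrative; controller API names are assumed). Unlike the
    # older ParamReader above, this unit inherits the generic Reader search
    # helpers, so kwargs such as online, delay and nTries are also accepted:
    #
    #   readUnit = project.addReadUnit(datatype='ParameterReader',
    #                                  path='/data/experiment/hdf5',
    #                                  startDate='2019/01/01', endDate='2019/01/02',
    #                                  startTime='00:00:00', endTime='23:59:59',
    #                                  walk=1)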

    def __init__(self):
        ProcessingUnit.__init__(self)
        self.dataOut = Parameters()
        self.ext = ".hdf5"
        self.optchar = "D"
        self.timezone = "lt"
        self.listMetaname = []
        self.listMeta = []
        self.listDataname = []
        self.listData = []
        self.listShapes = []
        self.open_file = h5py.File
        self.open_mode = 'r'
        self.metadata = False
        self.filefmt = "*%Y%j***"
        self.folderfmt = "*%Y%j"

    def setup(self, **kwargs):

        self.set_kwargs(**kwargs)
        if not self.ext.startswith('.'):
            self.ext = '.{}'.format(self.ext)

        if self.online:
            log.log("Searching files in online mode...", self.name)

            for nTries in range(self.nTries):
                fullpath = self.searchFilesOnLine(self.path, self.startDate,
                    self.endDate, self.expLabel, self.ext, self.walk,
                    self.filefmt, self.folderfmt)

                try:
                    fullpath = next(fullpath)
                except:
                    fullpath = None

                if fullpath:
                    break

                log.warning(
                    'Waiting {} sec for a valid file in {}: try {} ...'.format(
                        self.delay, self.path, nTries + 1),
                    self.name)
                time.sleep(self.delay)

            if not(fullpath):
                raise schainpy.admin.SchainError(
                    'There isn\'t any valid file in {}'.format(self.path))

            pathname, filename = os.path.split(fullpath)
            self.year = int(filename[1:5])
            self.doy = int(filename[5:8])
            self.set = int(filename[8:11]) - 1
        else:
            log.log("Searching files in {}".format(self.path), self.name)
            self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
                self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)

        self.setNextFile()

        return

    def readFirstHeader(self):
        '''Read metadata and data'''

        self.__readMetadata()
        self.__readData()
        self.__setBlockList()
        self.blockIndex = 0

        return

    def __setBlockList(self):
        '''
        Selects the data within the times defined

        self.fp
        self.startTime
        self.endTime

        self.blockList
        self.blocksPerFile
        '''

        startTime = self.startTime
        endTime = self.endTime
        index = self.listDataname.index('utctime')
        thisUtcTime = self.listData[index]
        self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])

        if self.timezone == 'lt':
            thisUtcTime -= 5*3600

        thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)

        thisDate = thisDatetime.date()
        thisTime = thisDatetime.time()

        startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()

        ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]

        self.blockList = ind
        self.blocksPerFile = len(ind)
        return

    def __readMetadata(self):
        '''
        Reads Metadata
        '''

        listMetaname = []
        listMetadata = []
        if 'Metadata' in self.fp:
            gp = self.fp['Metadata']
            for item in list(gp.items()):
                name = item[0]
                if name == 'variables':
                    table = gp[name][:]
                    listShapes = {}
                    for shapes in table:
                        listShapes[shapes[0].decode()] = numpy.array([shapes[1]])
                else:
                    data = gp[name].value
                    listMetaname.append(name)
                    listMetadata.append(data)
        elif self.metadata:
            metadata = json.loads(self.metadata)
            listShapes = {}
            for tup in metadata:
                name, values, dim = tup
                if dim == -1:
                    listMetaname.append(name)
                    listMetadata.append(self.fp[values].value)
                else:
                    listShapes[name] = numpy.array([dim])
        else:
            raise IOError('Missing Metadata group in file or metadata info')

        self.listShapes = listShapes
        self.listMetaname = listMetaname
        self.listMeta = listMetadata

        return

    def __readData(self):

        listdataname = []
        listdata = []

        if 'Data' in self.fp:
            grp = self.fp['Data']
            for item in list(grp.items()):
                name = item[0]
                listdataname.append(name)
                dim = self.listShapes[name][0]
                if dim == 0:
                    array = grp[name].value
                else:
                    array = []
                    for i in range(dim):
                        array.append(grp[name]['table{:02d}'.format(i)].value)
                    array = numpy.array(array)

                listdata.append(array)
        elif self.metadata:
            metadata = json.loads(self.metadata)
            for tup in metadata:
                name, values, dim = tup
                listdataname.append(name)
                if dim == -1:
                    continue
                elif dim == 0:
                    array = self.fp[values].value
                else:
                    array = []
                    for var in values:
                        array.append(self.fp[var].value)
                    array = numpy.array(array)
                listdata.append(array)
        else:
            raise IOError('Missing Data group in file or metadata info')

        self.listDataname = listdataname
        self.listData = listdata
        return

    def getData(self):

        for i in range(len(self.listMeta)):
            setattr(self.dataOut, self.listMetaname[i], self.listMeta[i])

        for j in range(len(self.listData)):
            dim = self.listShapes[self.listDataname[j]][0]
            if dim == 0:
                setattr(self.dataOut, self.listDataname[j], self.listData[j][self.blockIndex])
            else:
                setattr(self.dataOut, self.listDataname[j], self.listData[j][:, self.blockIndex])

        self.dataOut.paramInterval = self.interval
        self.dataOut.flagNoData = False
        self.blockIndex += 1

        return

    def run(self, **kwargs):

        if not(self.isConfig):
            self.setup(**kwargs)
            self.isConfig = True

        if self.blockIndex == self.blocksPerFile:
            self.setNextFile()

        self.getData()

        return


@MPDecorator
class ParameterWriter(Operation):
    '''
    HDF5 Writer, stores parameters data in HDF5 format files

    path: path where the files will be stored
    blocksPerFile: number of blocks that will be saved per HDF5 format file
    metadataList: list of attributes that will be stored as metadata
    dataList: list of attributes that will be stored as data
    '''
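
    # Usage sketch (illustrative; attribute names in dataList/metadataList are
    # assumed examples). One dataset (or a group of 'tableNN' datasets) is
    # created per dataList entry, with blocksPerFile rows per output file:
    #
    #   opObj = procUnit.addOperation(name='ParameterWriter', optype='other')
    #   opObj.addParameter(name='path', value='/data/out/hdf5')
    #   opObj.addParameter(name='blocksPerFile', value='100', format='int')
    #   opObj.addParameter(name='metadataList', value='heightList,timeZone', format='list')
    #   opObj.addParameter(name='dataList', value='data_param,utctime', format='list')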

    ext = ".hdf5"
    optchar = "D"
    metaoptchar = "M"
    metaFile = None
    filename = None
    path = None
    setFile = None
    fp = None
    grp = None
    ds = None
    firsttime = True
    #Configurations
    blocksPerFile = None
    blockIndex = None
    dataOut = None
    #Data Arrays
    dataList = None
    metadataList = None
    dsList = None    #List of dictionaries with dataset properties
    tableDim = None
    dtype = [('name', 'S20'), ('nDim', 'i')]
    currentDay = None
    lastTime = None

    def __init__(self):

        Operation.__init__(self)
        return

    def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None):

        self.path = path
        self.blocksPerFile = blocksPerFile
        self.metadataList = metadataList
        self.dataList = dataList
        self.setType = setType

        tableList = []
        dsList = []

        for i in range(len(self.dataList)):
            dsDict = {}
            dataAux = getattr(self.dataOut, self.dataList[i])
            dsDict['variable'] = self.dataList[i]

            if dataAux is None:
                continue
            elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
                dsDict['nDim'] = 0
            else:
                dsDict['nDim'] = len(dataAux.shape)
                dsDict['shape'] = dataAux.shape
                dsDict['dsNumber'] = dataAux.shape[0]

            dsList.append(dsDict)
            tableList.append((self.dataList[i], dsDict['nDim']))

        self.dsList = dsList
        self.tableDim = numpy.array(tableList, dtype=self.dtype)
        self.currentDay = self.dataOut.datatime.date()

    def timeFlag(self):
        currentTime = self.dataOut.utctime
        timeTuple = time.localtime(currentTime)
        dataDay = timeTuple.tm_yday

        if self.lastTime is None:
            self.lastTime = currentTime
            self.currentDay = dataDay
            return False

        timeDiff = currentTime - self.lastTime

        #If the day changed or the gap between samples exceeds the limit, start a new file
        if dataDay != self.currentDay:
            self.currentDay = dataDay
            return True
        elif timeDiff > 3*60*60:
            self.lastTime = currentTime
            return True
        else:
            self.lastTime = currentTime
            return False

    def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, setType=None):

        self.dataOut = dataOut
        if not(self.isConfig):
            self.setup(path=path, blocksPerFile=blocksPerFile,
                       metadataList=metadataList, dataList=dataList,
                       setType=setType)

            self.isConfig = True
            self.setNextFile()

        self.putData()
        return

    def setNextFile(self):

        ext = self.ext
        path = self.path
        setFile = self.setFile
        timeTuple = time.localtime(self.dataOut.utctime)
        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
        fullpath = os.path.join(path, subfolder)

        if os.path.exists(fullpath):
            filesList = os.listdir(fullpath)
            filesList = [k for k in filesList if k.startswith(self.optchar)]
            if len(filesList) > 0:
                filesList = sorted(filesList, key=str.lower)
                filen = filesList[-1]
                # the filename should have the following format
                # 0 1234 567 89A BCDE (hex)
                # x YYYY DDD SSS .ext
                if isNumber(filen[8:11]):
                    setFile = int(filen[8:11])    #start the set counter at the set of the last file
                else:
                    setFile = -1
            else:
                setFile = -1    #initialize the set counter
        else:
            os.makedirs(fullpath)
            setFile = -1    #initialize the set counter

        if self.setType is None:
            setFile += 1
            file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)
        else:
            setFile = timeTuple.tm_hour*60 + timeTuple.tm_min
            file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)

        self.filename = os.path.join(path, subfolder, file)
        #Setting HDF5 File
        self.fp = h5py.File(self.filename, 'w')
        #write metadata
        self.writeMetadata(self.fp)
        #Write data
        self.writeData(self.fp)

    def writeMetadata(self, fp):

        grp = fp.create_group("Metadata")
        grp.create_dataset('variables', data=self.tableDim, dtype=self.dtype)

        for i in range(len(self.metadataList)):
            if not hasattr(self.dataOut, self.metadataList[i]):
                log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
                continue
            value = getattr(self.dataOut, self.metadataList[i])
            grp.create_dataset(self.metadataList[i], data=value)
        return

    def writeData(self, fp):

        grp = fp.create_group("Data")
        dtsets = []
        data = []

        for dsInfo in self.dsList:
            if dsInfo['nDim'] == 0:
                ds = grp.create_dataset(
                    dsInfo['variable'],
                    (self.blocksPerFile, ),
                    chunks=True,
                    dtype=numpy.float64)
                dtsets.append(ds)
                data.append((dsInfo['variable'], -1))
            else:
                sgrp = grp.create_group(dsInfo['variable'])
                for i in range(dsInfo['dsNumber']):
                    ds = sgrp.create_dataset(
                        'table{:02d}'.format(i),
                        (self.blocksPerFile, ) + dsInfo['shape'][1:],
                        chunks=True)
                    dtsets.append(ds)
                    data.append((dsInfo['variable'], i))
        fp.flush()

        log.log('Creating file: {}'.format(fp.filename), self.name)

        self.ds = dtsets
        self.data = data
        self.firsttime = True
        self.blockIndex = 0
        return

    def putData(self):

        if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
            self.closeFile()
            self.setNextFile()

        for i, ds in enumerate(self.ds):
            attr, ch = self.data[i]
            if ch == -1:
                ds[self.blockIndex] = getattr(self.dataOut, attr)
            else:
                ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]

        self.fp.flush()
        self.blockIndex += 1
        log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)

        return

    def closeFile(self):

        if self.blockIndex != self.blocksPerFile:
            for ds in self.ds:
                ds.resize(self.blockIndex, axis=0)

        self.fp.flush()
        self.fp.close()

    def close(self):

        self.closeFile()