'''
Created on Jul 3, 2014

@author: roj-idl71
'''
|
# TODO:
# - use SUBCHANNELS instead of CHANNELS
# - benchmarks -> problems with large files -> inconsistent over time
# - version update
# - headers
# - writer module
# - metadata
|
r973 | import os | ||
import datetime | ||||
import numpy | ||||
|
r985 | import timeit | ||
|
r981 | from profilehooks import coverage, profile | ||
|
r979 | from fractions import Fraction | ||
|
r973 | |||
try: | ||||
from gevent import sleep | ||||
except: | ||||
from time import sleep | ||||
from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader | ||||
from schainpy.model.data.jrodata import Voltage | ||||
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation | ||||
|
r981 | from time import time | ||
|
r985 | |||
|
r981 | import cPickle | ||
|
r973 | try: | ||
import digital_rf | ||||
except: | ||||
print 'You should install "digital_rf" module if you want to read Digital RF data' | ||||
class DigitalRFReader(ProcessingUnit):
    '''
    Reads Digital RF data from disk and delivers it, one profile at a
    time, through a Voltage object.
    '''

    def __init__(self, **kwargs):
        '''
        Constructor: initialize reader state only; the actual
        configuration is done in setup().
        '''
        ProcessingUnit.__init__(self, **kwargs)

        self.dataOut = Voltage()
        # printInfo() runs only once; this flag tracks that
        self.__printInfo = True
        self.__flagDiscontinuousBlock = False
        # a huge index forces a buffer refill on the first getData() call
        self.__bufferIndex = 9999999
        # radar parameters, filled in later by setup()
        self.__ippKm = None
        self.__codeType = 0
        self.__nCode = None
        self.__nBaud = None
        self.__code = None
        self.dtype = None
r991 | def close(self): | ||
print 'Average of writing to digital rf format is ', self.oldAverage * 1000 | ||||
return | ||||
|
r973 | def __getCurrentSecond(self): | ||
return self.__thisUnixSample/self.__sample_rate | ||||
thisSecond = property(__getCurrentSecond, "I'm the 'thisSecond' property.") | ||||
def __setFileHeader(self): | ||||
''' | ||||
In this method will be initialized every parameter of dataOut object (header, no data) | ||||
''' | ||||
ippSeconds = 1.0*self.__nSamples/self.__sample_rate | ||||
|
r981 | nProfiles = 1.0/ippSeconds # Number of profiles in one second | ||
|
r1054 | |||
try: | ||||
self.dataOut.radarControllerHeaderObj = RadarControllerHeader(self.__radarControllerHeader) | ||||
except: | ||||
self.dataOut.radarControllerHeaderObj = RadarControllerHeader( | ||||
txA=0, | ||||
txB=0, | ||||
nWindows=1, | ||||
nHeights=self.__nSamples, | ||||
firstHeight=self.__firstHeigth, | ||||
deltaHeight=self.__deltaHeigth, | ||||
codeType=self.__codeType, | ||||
nCode=self.__nCode, nBaud=self.__nBaud, | ||||
code = self.__code) | ||||
try: | ||||
self.dataOut.systemHeaderObj = SystemHeader(self.__systemHeader) | ||||
except: | ||||
self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples, | ||||
nProfiles=nProfiles, | ||||
nChannels=len(self.__channelList), | ||||
adcResolution=14) | ||||
|
r973 | self.dataOut.type = "Voltage" | ||
|
r1054 | |||
|
r973 | self.dataOut.data = None | ||
|
r981 | self.dataOut.dtype = self.dtype | ||
|
r973 | |||
|
r981 | # self.dataOut.nChannels = 0 | ||
|
r973 | |||
|
r981 | # self.dataOut.nHeights = 0 | ||
|
r973 | |||
|
r1069 | self.dataOut.nProfiles = int(nProfiles) | ||
|
r973 | |||
self.dataOut.heightList = self.__firstHeigth + numpy.arange(self.__nSamples, dtype = numpy.float)*self.__deltaHeigth | ||||
|
r985 | self.dataOut.channelList = range(self.__num_subchannels) | ||
|
r973 | |||
self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights() | ||||
|
r981 | # self.dataOut.channelIndexList = None | ||
|
r973 | |||
self.dataOut.flagNoData = True | ||||
|
r982 | |||
|
r981 | self.dataOut.flagDataAsBlock = False | ||
# Set to TRUE if the data is discontinuous | ||||
|
r973 | self.dataOut.flagDiscontinuousBlock = False | ||
self.dataOut.utctime = None | ||||
|
r981 | self.dataOut.timeZone = self.__timezone/60 # timezone like jroheader, difference in minutes between UTC and localtime | ||
|
r973 | |||
self.dataOut.dstFlag = 0 | ||||
self.dataOut.errorCount = 0 | ||||
|
r1054 | try: | ||
self.dataOut.nCohInt = self.fixed_metadata_dict.get('nCohInt', 1) | ||||
|
r982 | |||
|
r1054 | self.dataOut.flagDecodeData = self.fixed_metadata_dict['flagDecodeData'] # asumo que la data esta decodificada | ||
|
r973 | |||
|
r1054 | self.dataOut.flagDeflipData = self.fixed_metadata_dict['flagDeflipData'] # asumo que la data esta sin flip | ||
|
r973 | |||
|
r1054 | self.dataOut.flagShiftFFT = self.fixed_metadata_dict['flagShiftFFT'] | ||
|
r973 | |||
|
r1054 | self.dataOut.useLocalTime = self.fixed_metadata_dict['useLocalTime'] | ||
except: | ||||
pass | ||||
|
r973 | |||
self.dataOut.ippSeconds = ippSeconds | ||||
|
r981 | # Time interval between profiles | ||
# self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt | ||||
|
r973 | |||
self.dataOut.frequency = self.__frequency | ||||
self.dataOut.realtime = self.__online | ||||
def findDatafiles(self, path, startDate=None, endDate=None): | ||||
if not os.path.isdir(path): | ||||
return [] | ||||
try: | ||||
digitalReadObj = digital_rf.DigitalRFReader(path, load_all_metadata=True) | ||||
except: | ||||
digitalReadObj = digital_rf.DigitalRFReader(path) | ||||
channelNameList = digitalReadObj.get_channels() | ||||
if not channelNameList: | ||||
return [] | ||||
metadata_dict = digitalReadObj.get_rf_file_metadata(channelNameList[0]) | ||||
sample_rate = metadata_dict['sample_rate'][0] | ||||
this_metadata_file = digitalReadObj.get_metadata(channelNameList[0]) | ||||
try: | ||||
timezone = this_metadata_file['timezone'].value | ||||
except: | ||||
timezone = 0 | ||||
startUTCSecond, endUTCSecond = digitalReadObj.get_bounds(channelNameList[0])/sample_rate - timezone | ||||
startDatetime = datetime.datetime.utcfromtimestamp(startUTCSecond) | ||||
endDatatime = datetime.datetime.utcfromtimestamp(endUTCSecond) | ||||
if not startDate: | ||||
startDate = startDatetime.date() | ||||
if not endDate: | ||||
endDate = endDatatime.date() | ||||
dateList = [] | ||||
thisDatetime = startDatetime | ||||
while(thisDatetime<=endDatatime): | ||||
thisDate = thisDatetime.date() | ||||
if thisDate < startDate: | ||||
continue | ||||
if thisDate > endDate: | ||||
break | ||||
dateList.append(thisDate) | ||||
thisDatetime += datetime.timedelta(1) | ||||
return dateList | ||||
    def setup(self, path = None,
              startDate = None,
              endDate = None,
              startTime = datetime.time(0,0,0),
              endTime = datetime.time(23,59,59),
              channelList = None,
              nSamples = None,
              online = False,
              delay = 60,
              buffer_size = 1024,
              ippKm=None,
              **kwargs):
        '''
        In this method we should set all initial parameters.

        Inputs:
            path      : Digital RF top-level directory
            startDate / endDate / startTime / endTime : time range to read
            channelList : indexes into the channel names found on disk
            nSamples  : samples per profile (derived from ippKm when None)
            online    : keep polling for new files when True
            delay     : seconds to wait between online retries
        '''
        self.i = 0
        if not os.path.isdir(path):
            raise ValueError, "[Reading] Directory %s does not exist" %path
        try:
            self.digitalReadObj = digital_rf.DigitalRFReader(path, load_all_metadata=True)
        except:
            self.digitalReadObj = digital_rf.DigitalRFReader(path)
        channelNameList = self.digitalReadObj.get_channels()
        if not channelNameList:
            raise ValueError, "[Reading] Directory %s does not have any files" %path
        if not channelList:
            channelList = range(len(channelNameList))

        ########## Reading metadata ######################
        top_properties = self.digitalReadObj.get_properties(channelNameList[channelList[0]])

        self.__num_subchannels = top_properties['num_subchannels']
        self.__sample_rate = 1.0 * top_properties['sample_rate_numerator'] / top_properties['sample_rate_denominator']
        # self.__samples_per_file = top_properties['samples_per_file'][0]
        # 0.15 km per microsecond of round-trip time (speed of light / 2)
        self.__deltaHeigth = 1e6*0.15/self.__sample_rate

        this_metadata_file = self.digitalReadObj.get_digital_metadata(channelNameList[channelList[0]])
        metadata_bounds = this_metadata_file.get_bounds()
        self.fixed_metadata_dict = this_metadata_file.read(metadata_bounds[0])[metadata_bounds[0]]  # GET FIRST HEADER

        # Headers are optional: data written by other tools may not have them.
        try:
            self.__processingHeader = self.fixed_metadata_dict['processingHeader']
            self.__radarControllerHeader = self.fixed_metadata_dict['radarControllerHeader']
            self.__systemHeader = self.fixed_metadata_dict['systemHeader']
            self.dtype = cPickle.loads(self.fixed_metadata_dict['dtype'])
        except:
            pass

        self.__frequency = None
        self.__frequency = self.fixed_metadata_dict.get('frequency', 1)
        # default 300 minutes; NOTE(review): stored/used as seconds elsewhere — confirm units
        self.__timezone = self.fixed_metadata_dict.get('timezone', 300)

        try:
            nSamples = self.fixed_metadata_dict['nSamples']
        except:
            nSamples = None

        self.__firstHeigth = 0
        try:
            codeType = self.__radarControllerHeader['codeType']
        except:
            codeType = 0
            nCode = 1
            nBaud = 1
            code = numpy.ones((nCode, nBaud), dtype=numpy.int)
        try:
            if codeType:
                nCode = self.__radarControllerHeader['nCode']
                nBaud = self.__radarControllerHeader['nBaud']
                code = self.__radarControllerHeader['code']
        except:
            pass
        if not ippKm:
            try:
                # seconds to km
                ippKm = self.__radarControllerHeader['ipp']
            except:
                ippKm = None
        ####################################################
        self.__ippKm = ippKm
        startUTCSecond = None
        endUTCSecond = None
        if startDate:
            startDatetime = datetime.datetime.combine(startDate, startTime)
            startUTCSecond = (startDatetime-datetime.datetime(1970,1,1)).total_seconds() + self.__timezone
        if endDate:
            endDatetime = datetime.datetime.combine(endDate, endTime)
            endUTCSecond = (endDatetime-datetime.datetime(1970,1,1)).total_seconds() + self.__timezone
        # clamp the requested range to the data actually present on disk
        start_index, end_index = self.digitalReadObj.get_bounds(channelNameList[channelList[0]])
        if not startUTCSecond:
            startUTCSecond = start_index/self.__sample_rate
        if start_index > startUTCSecond*self.__sample_rate:
            startUTCSecond = start_index/self.__sample_rate
        if not endUTCSecond:
            endUTCSecond = end_index/self.__sample_rate
        if end_index < endUTCSecond*self.__sample_rate:
            endUTCSecond = end_index/self.__sample_rate
        if not nSamples:
            if not ippKm:
                raise ValueError, "[Reading] nSamples or ippKm should be defined"
            nSamples = int(ippKm / (1e6*0.15/self.__sample_rate))
        channelBoundList = []
        channelNameListFiltered = []
        for thisIndexChannel in channelList:
            thisChannelName = channelNameList[thisIndexChannel]
            start_index, end_index = self.digitalReadObj.get_bounds(thisChannelName)
            channelBoundList.append((start_index, end_index))
            channelNameListFiltered.append(thisChannelName)
        self.profileIndex = 0
        self.i= 0
        self.__delay = delay
        self.__codeType = codeType
        self.__nCode = nCode
        self.__nBaud = nBaud
        self.__code = code
        self.__datapath = path
        self.__online = online
        self.__channelList = channelList
        self.__channelNameList = channelNameListFiltered
        self.__channelBoundList = channelBoundList
        self.__nSamples = nSamples
        self.__samples_to_read = long(nSamples)  # FIXED: currently 40
        self.__nChannels = len(self.__channelList)
        self.__startUTCSecond = startUTCSecond
        self.__endUTCSecond = endUTCSecond
        self.__timeInterval = 1.0 * self.__samples_to_read/self.__sample_rate  # Time interval
        if online:
            # self.__thisUnixSample = int(endUTCSecond*self.__sample_rate - 4*self.__samples_to_read)
            startUTCSecond = numpy.floor(endUTCSecond)
        # step back one block because __readNextBlock() advances first
        self.__thisUnixSample = long(startUTCSecond*self.__sample_rate) - self.__samples_to_read
        self.__data_buffer = numpy.zeros((self.__num_subchannels, self.__samples_to_read), dtype = numpy.complex)
        self.__setFileHeader()
        self.isConfig = True
        print "[Reading] Digital RF Data was found from %s to %s " %(
            datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
            datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
        )
        print "[Reading] Starting process from %s to %s" %(datetime.datetime.utcfromtimestamp(startUTCSecond - self.__timezone),
                                                           datetime.datetime.utcfromtimestamp(endUTCSecond - self.__timezone)
                                                           )
        # running-average bookkeeping for read timing
        self.oldAverage = None
        self.count = 0
        self.executionTime = 0
    def __reload(self):
        '''
        Re-scan the Digital RF directory (online mode) and widen the known
        [start, end] bounds when new files have arrived.

        Returns True when the end bound moved forward, False otherwise.
        '''
        # print "%s not in range [%s, %s]" %(
        #     datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
        #     datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
        #     datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
        # )
        print "[Reading] reloading metadata ..."
        try:
            self.digitalReadObj.reload(complete_update=True)
        except:
            self.digitalReadObj.reload()
        start_index, end_index = self.digitalReadObj.get_bounds(self.__channelNameList[self.__channelList[0]])
        if start_index > self.__startUTCSecond*self.__sample_rate:
            self.__startUTCSecond = 1.0*start_index/self.__sample_rate
        if end_index > self.__endUTCSecond*self.__sample_rate:
            self.__endUTCSecond = 1.0*end_index/self.__sample_rate
            print "[Reading] New timerange found [%s, %s] " %(
                datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
                datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
            )
            return True
        return False
r991 | def timeit(self, toExecute): | ||
t0 = time() | ||||
toExecute() | ||||
self.executionTime = time() - t0 | ||||
if self.oldAverage is None: self.oldAverage = self.executionTime | ||||
self.oldAverage = (self.executionTime + self.count*self.oldAverage) / (self.count + 1.0) | ||||
self.count = self.count + 1.0 | ||||
return | ||||
|
    def __readNextBlock(self, seconds=30, volt_scale = 1):
        '''
        Advance the read cursor one block and fill self.__data_buffer with
        __samples_to_read samples per subchannel.

        Returns True when the buffer was refilled, False when no more data
        is available.  volt_scale multiplies the raw complex samples.
        '''
        # Set the next data
        self.__flagDiscontinuousBlock = False
        self.__thisUnixSample += self.__samples_to_read

        if self.__thisUnixSample + 2*self.__samples_to_read > self.__endUTCSecond*self.__sample_rate:
            print "[Reading] There are no more data into selected time-range"
            if self.__online:
                self.__reload()
            else:
                return False

            if self.__thisUnixSample + 2*self.__samples_to_read > self.__endUTCSecond*self.__sample_rate:
                return False
            # NOTE(review): after a successful reload the cursor is stepped
            # back one block — looks intended to re-read from the previous
            # position; confirm against upstream.
            self.__thisUnixSample -= self.__samples_to_read

        indexChannel = 0
        dataOk = False

        for thisChannelName in self.__channelNameList:  # TODO: several channels?
            for indexSubchannel in range(self.__num_subchannels):
                try:
                    t0 = time()
                    result = self.digitalReadObj.read_vector_c81d(self.__thisUnixSample,
                                                                  self.__samples_to_read,
                                                                  thisChannelName, sub_channel=indexSubchannel)
                    # fold this read's latency into the running average
                    self.executionTime = time() - t0
                    if self.oldAverage is None: self.oldAverage = self.executionTime
                    self.oldAverage = (self.executionTime + self.count*self.oldAverage) / (self.count + 1.0)
                    self.count = self.count + 1.0
                except IOError, e:
                    # read next profile
                    self.__flagDiscontinuousBlock = True
                    print "[Reading] %s" %datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone), e
                    break

                if result.shape[0] != self.__samples_to_read:
                    self.__flagDiscontinuousBlock = True
                    print "[Reading] %s: Too few samples were found, just %d/%d samples" %(datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
                                                                                           result.shape[0],
                                                                                           self.__samples_to_read)
                    break

                self.__data_buffer[indexSubchannel,:] = result*volt_scale

            indexChannel += 1
            dataOk = True

        self.__utctime = self.__thisUnixSample/self.__sample_rate

        if not dataOk:
            return False

        print "[Reading] %s: %d samples <> %f sec" %(datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
                                                     self.__samples_to_read,
                                                     self.__timeInterval)

        self.__bufferIndex = 0
        return True
def __isBufferEmpty(self): | ||||
return self.__bufferIndex > self.__samples_to_read - self.__nSamples #40960 - 40 | ||||
    def getData(self, seconds=30, nTries=5):
        '''
        This method gets the data from files and puts it into the dataOut
        object; in addition, it increases the buffer counter by one.

        Return:
            True when a profile (heights x channels) was copied from the
            buffer into self.dataOut; False when there are no more files
            to read (or the online retries were exhausted).

        Affected:
            self.dataOut
            self.profileIndex
            self.flagDiscontinuousBlock
            self.flagIsNewBlock
        '''
        err_counter = 0
        self.dataOut.flagNoData = True

        if self.__isBufferEmpty():
            self.__flagDiscontinuousBlock = False
            # refill the buffer, retrying in online mode
            while True:
                if self.__readNextBlock():
                    break
                if self.__thisUnixSample > self.__endUTCSecond*self.__sample_rate:
                    return False
                if self.__flagDiscontinuousBlock:
                    print '[Reading] discontinuous block found ... continue with the next block'
                    continue
                if not self.__online:
                    return False
                err_counter += 1
                if err_counter > nTries:
                    return False
                print '[Reading] waiting %d seconds to read a new block' %seconds
                sleep(seconds)

        # hand out the next profile (a slice of the buffer, not a copy)
        self.dataOut.data = self.__data_buffer[:,self.__bufferIndex:self.__bufferIndex+self.__nSamples]
        self.dataOut.utctime = (self.__thisUnixSample + self.__bufferIndex)/self.__sample_rate
        self.dataOut.flagNoData = False
        self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
        self.dataOut.profileIndex = self.profileIndex

        self.__bufferIndex += self.__nSamples
        self.profileIndex += 1
        if self.profileIndex == self.dataOut.nProfiles:
            self.profileIndex = 0
        return True
def printInfo(self): | ||||
''' | ||||
''' | ||||
if self.__printInfo == False: | ||||
return | ||||
|
r981 | # self.systemHeaderObj.printInfo() | ||
# self.radarControllerHeaderObj.printInfo() | ||||
|
r973 | |||
self.__printInfo = False | ||||
def printNumberOfBlock(self): | ||||
''' | ||||
''' | ||||
return | ||||
|
r981 | # print self.profileIndex | ||
|
r973 | |||
|
r998 | |||
|
r973 | def run(self, **kwargs): | ||
''' | ||||
This method will be called many times so here you should put all your code | ||||
''' | ||||
|
r981 | |||
|
r973 | if not self.isConfig: | ||
self.setup(**kwargs) | ||||
|
r998 | #self.i = self.i+1 | ||
|
r973 | self.getData(seconds=self.__delay) | ||
return | ||||
class DigitalRFWriter(Operation):
    '''
    Writes Voltage data out to disk in Digital RF format.
    '''

    def __init__(self, **kwargs):
        '''
        Constructor: initialize writer state only; the actual
        configuration is done in setup().
        '''
        Operation.__init__(self, **kwargs)

        # header values to be stored next to the RF samples
        self.metadata_dict = {}
        self.dataOut = None
        self.dtype = None
def setHeader(self): | ||||
self.metadata_dict['frequency'] = self.dataOut.frequency | ||||
self.metadata_dict['timezone'] = self.dataOut.timeZone | ||||
self.metadata_dict['dtype'] = cPickle.dumps(self.dataOut.dtype) | ||||
self.metadata_dict['nProfiles'] = self.dataOut.nProfiles | ||||
self.metadata_dict['heightList'] = self.dataOut.heightList | ||||
self.metadata_dict['channelList'] = self.dataOut.channelList | ||||
self.metadata_dict['flagDecodeData'] = self.dataOut.flagDecodeData | ||||
self.metadata_dict['flagDeflipData'] = self.dataOut.flagDeflipData | ||||
self.metadata_dict['flagShiftFFT'] = self.dataOut.flagShiftFFT | ||||
self.metadata_dict['flagDataAsBlock'] = self.dataOut.flagDataAsBlock | ||||
self.metadata_dict['useLocalTime'] = self.dataOut.useLocalTime | ||||
self.metadata_dict['nCohInt'] = self.dataOut.nCohInt | ||||
|
r982 | return | ||
|
r991 | def setup(self, dataOut, path, frequency, fileCadence, dirCadence, metadataCadence, set=0, metadataFile='metadata', ext='.h5'): | ||
|
r973 | ''' | ||
In this method we should set all initial parameters. | ||||
Input: | ||||
|
r981 | dataOut: Input data will also be outputa data | ||
|
r973 | ''' | ||
|
r991 | self.setHeader() | ||
|
r980 | self.__ippSeconds = dataOut.ippSeconds | ||
self.__deltaH = dataOut.getDeltaH() | ||||
|
r979 | self.__sample_rate = 1e6*0.15/self.__deltaH | ||
|
r980 | self.__dtype = dataOut.dtype | ||
if len(dataOut.dtype) == 2: | ||||
self.__dtype = dataOut.dtype[0] | ||||
self.__nSamples = dataOut.systemHeaderObj.nSamples | ||||
self.__nProfiles = dataOut.nProfiles | ||||
self.__blocks_per_file = dataOut.processingHeaderObj.dataBlocksPerFile | ||||
|
r998 | |||
self.arr_data = arr_data = numpy.ones((self.__nSamples, len(self.dataOut.channelList)), dtype=[('r', self.__dtype), ('i', self.__dtype)]) | ||||
|
r980 | |||
|
r981 | file_cadence_millisecs = long(1.0 * self.__blocks_per_file * self.__nProfiles * self.__nSamples / self.__sample_rate) * 1000 | ||
sub_cadence_secs = file_cadence_millisecs / 500 | ||||
|
r980 | |||
|
r979 | sample_rate_fraction = Fraction(self.__sample_rate).limit_denominator() | ||
sample_rate_numerator = long(sample_rate_fraction.numerator) | ||||
sample_rate_denominator = long(sample_rate_fraction.denominator) | ||||
|
r980 | start_global_index = dataOut.utctime * self.__sample_rate | ||
|
r979 | uuid = 'prueba' | ||
compression_level = 1 | ||||
checksum = False | ||||
is_complex = True | ||||
|
r985 | num_subchannels = len(dataOut.channelList) | ||
|
r979 | is_continuous = True | ||
marching_periods = False | ||||
|
r991 | self.digitalWriteObj = digital_rf.DigitalRFWriter(path, self.__dtype, dirCadence, | ||
fileCadence, start_global_index, | ||||
|
r979 | sample_rate_numerator, sample_rate_denominator, uuid, compression_level, checksum, | ||
is_complex, num_subchannels, is_continuous, marching_periods) | ||||
|
r980 | |||
metadata_dir = os.path.join(path, 'metadata') | ||||
os.system('mkdir %s' % (metadata_dir)) | ||||
|
r991 | self.digitalMetadataWriteObj = digital_rf.DigitalMetadataWriter(metadata_dir, dirCadence, 1, ##236, file_cadence_millisecs / 1000 | ||
|
r980 | sample_rate_numerator, sample_rate_denominator, | ||
metadataFile) | ||||
|
r973 | |||
self.isConfig = True | ||||
|
r980 | self.currentSample = 0 | ||
|
r985 | self.oldAverage = 0 | ||
self.count = 0 | ||||
|
r973 | return | ||
|
r981 | |||
def writeMetadata(self): | ||||
print '[Writing] - Writing metadata' | ||||
start_idx = self.__sample_rate * self.dataOut.utctime | ||||
self.metadata_dict['processingHeader'] = self.dataOut.processingHeaderObj.getAsDict() | ||||
self.metadata_dict['radarControllerHeader'] = self.dataOut.radarControllerHeaderObj.getAsDict() | ||||
self.metadata_dict['systemHeader'] = self.dataOut.systemHeaderObj.getAsDict() | ||||
self.digitalMetadataWriteObj.write(start_idx, self.metadata_dict) | ||||
return | ||||
|
r973 | |||
|
r981 | |||
|
r991 | def timeit(self, toExecute): | ||
|
r985 | t0 = time() | ||
|
r991 | toExecute() | ||
|
r985 | self.executionTime = time() - t0 | ||
if self.oldAverage is None: self.oldAverage = self.executionTime | ||||
self.oldAverage = (self.executionTime + self.count*self.oldAverage) / (self.count + 1.0) | ||||
self.count = self.count + 1.0 | ||||
|
r991 | return | ||
def writeData(self): | ||||
for i in range(self.dataOut.systemHeaderObj.nSamples): | ||||
for channel in self.dataOut.channelList: | ||||
self.arr_data[i][channel]['r'] = self.dataOut.data[channel][i].real | ||||
self.arr_data[i][channel]['i'] = self.dataOut.data[channel][i].imag | ||||
def f(): return self.digitalWriteObj.rf_write(self.arr_data) | ||||
self.timeit(f) | ||||
|
r985 | |||
|
r981 | return | ||
|
r998 | def run(self, dataOut, frequency=49.92e6, path=None, fileCadence=100, dirCadence=25, metadataCadence=1, **kwargs): | ||
|
r973 | ''' | ||
This method will be called many times so here you should put all your code | ||||
Inputs: | ||||
|
r981 | dataOut: object with the data | ||
|
r973 | ''' | ||
|
r981 | # print dataOut.__dict__ | ||
|
r980 | self.dataOut = dataOut | ||
|
r973 | if not self.isConfig: | ||
|
r991 | self.setup(dataOut, path, frequency, fileCadence, dirCadence, metadataCadence, **kwargs) | ||
|
r998 | self.writeMetadata() | ||
|
r979 | |||
|
r981 | self.writeData() | ||
|
r998 | ## self.currentSample += 1 | ||
## if self.dataOut.flagDataAsBlock or self.currentSample == 1: | ||||
## self.writeMetadata() | ||||
## if self.currentSample == self.__nProfiles: self.currentSample = 0 | ||||
|
r981 | |||
|
r980 | def close(self): | ||
print '[Writing] - Closing files ' | ||||
|
r985 | print 'Average of writing to digital rf format is ', self.oldAverage * 1000 | ||
|
r981 | try: | ||
self.digitalWriteObj.close() | ||||
except: | ||||
pass | ||||
# raise | ||||
|
if __name__ == '__main__':
    # Smoke test: keep pulling profiles from a fixed test directory forever.
    readObj = DigitalRFReader()
    while True:
        readObj.run(path='/home/jchavez/jicamarca/mocked_data/')
        # readObj.printInfo()
        # readObj.printNumberOfBlock()