jroIO_digitalRF.py
800 lines
| 27.8 KiB
| text/x-python
|
PythonLexer
|
r979 | |||
|
'''
Created on Jul 3, 2014

@author: roj-idl71
'''
|
# SUBCHANNELS INSTEAD OF CHANNELS
# BENCHMARKS -> PROBLEMS WITH LARGE FILES -> INCONSISTENT OVER TIME
# VERSION UPDATE
# HEADERS
# WRITING MODULE
# METADATA
|
r973 | import os | ||
import datetime | ||||
import numpy | ||||
|
r985 | import timeit | ||
|
r979 | from fractions import Fraction | ||
|
r973 | |||
try: | ||||
from gevent import sleep | ||||
except: | ||||
from time import sleep | ||||
from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader | ||||
from schainpy.model.data.jrodata import Voltage | ||||
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation | ||||
|
r981 | from time import time | ||
|
r985 | |||
|
r981 | import cPickle | ||
|
r973 | try: | ||
import digital_rf | ||||
except: | ||||
print 'You should install "digital_rf" module if you want to read Digital RF data' | ||||
|
r1092 | |||
|
class DigitalRFReader(ProcessingUnit):
    '''
    Processing unit that reads Digital RF recordings into a Voltage object.

    setup() opens the recording, loads the first metadata record and fixes
    the time range; every run() call then publishes one profile of
    nSamples samples per subchannel in self.dataOut.
    '''

    def __init__(self, **kwargs):
        '''
        Create the output Voltage object and reset the reader state.
        '''
        ProcessingUnit.__init__(self, **kwargs)
        self.dataOut = Voltage()
        self.__printInfo = True
        self.__flagDiscontinuousBlock = False
        # Larger than any block size, so the first getData() call sees an
        # "empty" buffer and triggers a read.
        self.__bufferIndex = 9999999
        self.__ippKm = None
        self.__codeType = 0
        self.__nCode = None
        self.__nBaud = None
        self.__code = None
        self.dtype = None
        # Running mean of the read time in seconds; None until a read is timed.
        self.oldAverage = None

    def close(self):
        '''
        Print the average read time (in milliseconds), if any read was timed.
        '''
        # oldAverage stays None when no block was read; multiplying it
        # would raise TypeError.
        if self.oldAverage is not None:
            print('Average of reading from digital rf format is  %s' %
                  (self.oldAverage * 1000))
        return

    def __getCurrentSecond(self):
        '''
        Return the current read position converted from samples to seconds.
        '''
        return self.__thisUnixSample / self.__sample_rate

    # The text must be passed as doc=: the second positional argument of
    # property() is the setter.
    thisSecond = property(__getCurrentSecond,
                          doc="I'm the 'thisSecond' property.")

    def __setFileHeader(self):
        '''
        In this method will be initialized every parameter of dataOut object (header, no data)
        '''
        ippSeconds = 1.0 * self.__nSamples / self.__sample_rate
        nProfiles = 1.0 / ippSeconds  # Number of profiles in one second

        # Prefer the header stored in the metadata; the attribute does not
        # exist when the metadata had no headers, hence the fallback.
        try:
            self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
                self.__radarControllerHeader)
        except Exception:
            self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
                txA=0,
                txB=0,
                nWindows=1,
                nHeights=self.__nSamples,
                firstHeight=self.__firstHeigth,
                deltaHeight=self.__deltaHeigth,
                codeType=self.__codeType,
                nCode=self.__nCode, nBaud=self.__nBaud,
                code=self.__code)

        try:
            self.dataOut.systemHeaderObj = SystemHeader(self.__systemHeader)
        except Exception:
            self.dataOut.systemHeaderObj = SystemHeader(
                nSamples=self.__nSamples,
                nProfiles=nProfiles,
                nChannels=len(self.__channelList),
                adcResolution=14)

        self.dataOut.type = "Voltage"
        self.dataOut.data = None
        self.dataOut.dtype = self.dtype
        self.dataOut.nProfiles = int(nProfiles)
        self.dataOut.heightList = self.__firstHeigth + \
            numpy.arange(self.__nSamples, dtype=float) * self.__deltaHeigth
        self.dataOut.channelList = range(self.__num_subchannels)
        self.dataOut.blocksize = self.dataOut.getNChannels() * \
            self.dataOut.getNHeights()
        self.dataOut.flagNoData = True
        self.dataOut.flagDataAsBlock = False
        # Set to TRUE if the data is discontinuous
        self.dataOut.flagDiscontinuousBlock = False
        self.dataOut.utctime = None
        # timezone like jroheader, difference in minutes between UTC and localtime
        self.dataOut.timeZone = self.__timezone / 60
        self.dataOut.dstFlag = 0
        self.dataOut.errorCount = 0

        # Best effort: the last three keys are optional in the metadata, and
        # a missing one leaves the remaining flags at their defaults.
        try:
            self.dataOut.nCohInt = self.fixed_metadata_dict.get(
                'nCohInt', self.nCohInt)
            # the data is assumed to be decoded
            self.dataOut.flagDecodeData = self.fixed_metadata_dict.get(
                'flagDecodeData', self.flagDecodeData)
            # the data is assumed not to be flipped
            self.dataOut.flagDeflipData = self.fixed_metadata_dict['flagDeflipData']
            self.dataOut.flagShiftFFT = self.fixed_metadata_dict['flagShiftFFT']
            self.dataOut.useLocalTime = self.fixed_metadata_dict['useLocalTime']
        except Exception:
            pass

        # Time interval between profiles
        self.dataOut.ippSeconds = ippSeconds
        self.dataOut.frequency = self.__frequency
        self.dataOut.realtime = self.__online

    def findDatafiles(self, path, startDate=None, endDate=None):
        '''
        Return the dates covered by the recording in path.

        Inputs:
            path      : directory of the Digital RF recording
            startDate : first date to include (default: first date with data)
            endDate   : last date to include (default: last date with data)

        Return:
            list of datetime.date, empty when path is not a directory or
            holds no channels.
        '''
        if not os.path.isdir(path):
            return []

        try:
            digitalReadObj = digital_rf.DigitalRFReader(
                path, load_all_metadata=True)
        except Exception:
            digitalReadObj = digital_rf.DigitalRFReader(path)

        channelNameList = digitalReadObj.get_channels()
        if not channelNameList:
            return []

        # NOTE(review): get_rf_file_metadata/get_metadata differ from the
        # get_properties/get_digital_metadata calls used in setup() -
        # confirm which digital_rf version this method targets.
        metadata_dict = digitalReadObj.get_rf_file_metadata(channelNameList[0])
        sample_rate = metadata_dict['sample_rate'][0]
        this_metadata_file = digitalReadObj.get_metadata(channelNameList[0])
        try:
            timezone = this_metadata_file['timezone'].value
        except Exception:
            timezone = 0

        # get_bounds() returns a (start, end) pair; convert each bound
        # separately instead of dividing the pair itself.
        start_index, end_index = digitalReadObj.get_bounds(channelNameList[0])
        startUTCSecond = 1.0 * start_index / sample_rate - timezone
        endUTCSecond = 1.0 * end_index / sample_rate - timezone

        firstDate = datetime.datetime.utcfromtimestamp(startUTCSecond).date()
        lastDate = datetime.datetime.utcfromtimestamp(endUTCSecond).date()
        if startDate and startDate > firstDate:
            firstDate = startDate
        if endDate and endDate < lastDate:
            lastDate = endDate

        # Step over calendar dates so the loop always advances and the last
        # day is included regardless of the time of day.
        dateList = []
        oneDay = datetime.timedelta(days=1)
        thisDate = firstDate
        while thisDate <= lastDate:
            dateList.append(thisDate)
            thisDate += oneDay
        return dateList

    def setup(self, path=None,
              startDate=None,
              endDate=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              channelList=None,
              nSamples=None,
              online=False,
              delay=60,
              buffer_size=1024,
              ippKm=None,
              nCohInt=1,
              nCode=1,
              nBaud=1,
              flagDecodeData=False,
              code=numpy.ones((1, 1), dtype=int),
              **kwargs):
        '''
        In this method we should set all initial parameters.

        Inputs:
            path           : directory of the Digital RF recording
            startDate      : first date to read (default: start of the data)
            endDate        : last date to read (default: end of the data)
            startTime      : time of day combined with startDate
            endTime        : time of day combined with endDate
            channelList    : indexes of the channels to use (default: all)
            nSamples       : samples per profile; a value stored in the
                             metadata takes precedence
            online         : if True, start at the end of the available data
                             and reload the bounds when it is exhausted
            delay          : seconds to wait between retries in online mode
            buffer_size    : not used
            ippKm          : inter-pulse period in km, used to derive
                             nSamples when it is not otherwise known
            nCohInt, flagDecodeData : defaults for values missing from the
                             metadata
            nCode, nBaud, code : code parameters used when the metadata has
                             no radar controller header

        Raises:
            ValueError if path is not a directory, holds no channels, or
            neither nSamples nor ippKm can be determined.
        '''
        self.nCohInt = nCohInt
        self.flagDecodeData = flagDecodeData
        self.i = 0

        if path is None or not os.path.isdir(path):
            raise ValueError("[Reading] Directory %s does not exist" % path)

        try:
            self.digitalReadObj = digital_rf.DigitalRFReader(
                path, load_all_metadata=True)
        except Exception:
            self.digitalReadObj = digital_rf.DigitalRFReader(path)

        channelNameList = self.digitalReadObj.get_channels()
        if not channelNameList:
            raise ValueError(
                "[Reading] Directory %s does not have any files" % path)

        if not channelList:
            channelList = range(len(channelNameList))

        ########## Reading metadata ######################
        firstChannelName = channelNameList[channelList[0]]
        top_properties = self.digitalReadObj.get_properties(firstChannelName)

        self.__num_subchannels = top_properties['num_subchannels']
        self.__sample_rate = 1.0 * \
            top_properties['sample_rate_numerator'] / \
            top_properties['sample_rate_denominator']
        # 1e6 / sample_rate is the sample period in microseconds; 0.15 is
        # presumably c/2 in km per microsecond (two-way range) - TODO confirm.
        self.__deltaHeigth = 1e6 * 0.15 / self.__sample_rate

        this_metadata_file = self.digitalReadObj.get_digital_metadata(
            firstChannelName)
        metadata_bounds = this_metadata_file.get_bounds()
        self.fixed_metadata_dict = this_metadata_file.read(
            metadata_bounds[0])[metadata_bounds[0]]  # GET FIRST HEADER

        # The headers are optional. When they are missing the private
        # attributes stay undefined, and the lookups below fall back to
        # defaults through their own except clauses.
        try:
            self.__processingHeader = self.fixed_metadata_dict['processingHeader']
            self.__radarControllerHeader = self.fixed_metadata_dict['radarControllerHeader']
            self.__systemHeader = self.fixed_metadata_dict['systemHeader']
            # NOTE(review): unpickling executes arbitrary code; only read
            # recordings from trusted sources.
            self.dtype = cPickle.loads(self.fixed_metadata_dict['dtype'])
        except Exception:
            pass

        self.__frequency = self.fixed_metadata_dict.get('frequency', 1)
        # NOTE(review): used below as an offset in seconds, while dataOut
        # receives this value divided by 60 - confirm the stored unit.
        self.__timezone = self.fixed_metadata_dict.get('timezone', 300)

        # The metadata value wins; otherwise keep the caller's nSamples
        # instead of discarding it.
        nSamples = self.fixed_metadata_dict.get('nSamples', nSamples)

        self.__firstHeigth = 0

        try:
            codeType = self.__radarControllerHeader['codeType']
        except Exception:
            codeType = 0

        try:
            if codeType:
                nCode = self.__radarControllerHeader['nCode']
                nBaud = self.__radarControllerHeader['nBaud']
                code = self.__radarControllerHeader['code']
        except Exception:
            pass

        if not ippKm:
            try:
                ippKm = self.__radarControllerHeader['ipp']
            except Exception:
                ippKm = None
        ####################################################

        self.__ippKm = ippKm

        epoch = datetime.datetime(1970, 1, 1)
        startUTCSecond = None
        endUTCSecond = None

        if startDate:
            startDatetime = datetime.datetime.combine(startDate, startTime)
            startUTCSecond = (
                startDatetime - epoch).total_seconds() + self.__timezone

        if endDate:
            endDatetime = datetime.datetime.combine(endDate, endTime)
            endUTCSecond = (
                endDatetime - epoch).total_seconds() + self.__timezone

        start_index, end_index = self.digitalReadObj.get_bounds(
            firstChannelName)

        # Clip the requested range to the samples actually available.
        if not startUTCSecond:
            startUTCSecond = start_index / self.__sample_rate
        if start_index > startUTCSecond * self.__sample_rate:
            startUTCSecond = start_index / self.__sample_rate

        if not endUTCSecond:
            endUTCSecond = end_index / self.__sample_rate
        if end_index < endUTCSecond * self.__sample_rate:
            endUTCSecond = end_index / self.__sample_rate

        if not nSamples:
            if not ippKm:
                raise ValueError(
                    "[Reading] nSamples or ippKm should be defined")
            nSamples = int(ippKm / self.__deltaHeigth)

        channelBoundList = []
        channelNameListFiltered = []
        for thisIndexChannel in channelList:
            thisChannelName = channelNameList[thisIndexChannel]
            channelBoundList.append(
                self.digitalReadObj.get_bounds(thisChannelName))
            channelNameListFiltered.append(thisChannelName)

        self.profileIndex = 0
        self.i = 0
        self.__delay = delay

        self.__codeType = codeType
        self.__nCode = nCode
        self.__nBaud = nBaud
        self.__code = code
        self.__datapath = path
        self.__online = online
        self.__channelList = channelList
        self.__channelNameList = channelNameListFiltered
        self.__channelBoundList = channelBoundList
        self.__nSamples = nSamples
        self.__samples_to_read = long(nSamples)
        self.__nChannels = len(self.__channelList)
        self.__startUTCSecond = startUTCSecond
        self.__endUTCSecond = endUTCSecond

        self.__timeInterval = 1.0 * self.__samples_to_read / \
            self.__sample_rate  # Time interval

        if online:
            startUTCSecond = numpy.floor(endUTCSecond)

        # One block is subtracted because __readNextBlock() starts by
        # advancing the position by samples_to_read.
        self.__thisUnixSample = long(
            startUTCSecond * self.__sample_rate) - self.__samples_to_read

        self.__data_buffer = numpy.zeros(
            (self.__num_subchannels, self.__samples_to_read), dtype=complex)

        self.__setFileHeader()
        self.isConfig = True

        print("[Reading] Digital RF Data was found from %s to %s " % (
            datetime.datetime.utcfromtimestamp(
                self.__startUTCSecond - self.__timezone),
            datetime.datetime.utcfromtimestamp(
                self.__endUTCSecond - self.__timezone)
        ))
        print("[Reading] Starting process from %s to %s" % (
            datetime.datetime.utcfromtimestamp(
                startUTCSecond - self.__timezone),
            datetime.datetime.utcfromtimestamp(
                endUTCSecond - self.__timezone)
        ))

        self.oldAverage = None
        self.count = 0
        self.executionTime = 0

    def __reload(self):
        '''
        Refresh the reader bounds (online mode).

        Return:
            True if the end of the available data moved forward, else False.
        '''
        print("[Reading] reloading metadata ...")
        try:
            self.digitalReadObj.reload(complete_update=True)
        except Exception:
            self.digitalReadObj.reload()

        # __channelNameList is already filtered by channelList, so its first
        # entry is the reference channel.
        start_index, end_index = self.digitalReadObj.get_bounds(
            self.__channelNameList[0])

        if start_index > self.__startUTCSecond * self.__sample_rate:
            self.__startUTCSecond = 1.0 * start_index / self.__sample_rate

        if end_index > self.__endUTCSecond * self.__sample_rate:
            self.__endUTCSecond = 1.0 * end_index / self.__sample_rate
            print("[Reading] New timerange found [%s, %s] " % (
                datetime.datetime.utcfromtimestamp(
                    self.__startUTCSecond - self.__timezone),
                datetime.datetime.utcfromtimestamp(
                    self.__endUTCSecond - self.__timezone)
            ))
            return True

        return False

    def __updateAverage(self, elapsed):
        '''
        Fold one elapsed time (seconds) into the running mean oldAverage.
        '''
        self.executionTime = elapsed
        if self.oldAverage is None:
            self.oldAverage = elapsed
        self.oldAverage = (elapsed + self.count *
                           self.oldAverage) / (self.count + 1.0)
        self.count = self.count + 1.0

    def timeit(self, toExecute):
        '''
        Call toExecute() and add its duration to the running average.
        '''
        t0 = time()
        toExecute()
        self.__updateAverage(time() - t0)
        return

    def __readNextBlock(self, seconds=30, volt_scale=1):
        '''
        Advance one block and read it into the internal buffer.

        Inputs:
            seconds    : not used
            volt_scale : factor applied to the samples read

        Return:
            True if at least one subchannel was read, False otherwise.
        '''
        # Set the next data
        self.__flagDiscontinuousBlock = False
        self.__thisUnixSample += self.__samples_to_read

        if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
            print("[Reading] There are no more data into selected time-range")
            if self.__online:
                self.__reload()
            else:
                return False

            if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
                return False
            self.__thisUnixSample -= self.__samples_to_read

        dataOk = False

        for thisChannelName in self.__channelNameList:  # TODO: several channels?
            for indexSubchannel in range(self.__num_subchannels):
                try:
                    t0 = time()
                    result = self.digitalReadObj.read_vector_c81d(
                        self.__thisUnixSample,
                        self.__samples_to_read,
                        thisChannelName, sub_channel=indexSubchannel)
                    self.__updateAverage(time() - t0)
                except IOError as e:
                    # read next profile
                    self.__flagDiscontinuousBlock = True
                    print("[Reading] %s %s" % (
                        datetime.datetime.utcfromtimestamp(
                            self.thisSecond - self.__timezone), e))
                    break

                if result.shape[0] != self.__samples_to_read:
                    self.__flagDiscontinuousBlock = True
                    print("[Reading] %s: Too few samples were found, just %d/%d samples" % (
                        datetime.datetime.utcfromtimestamp(
                            self.thisSecond - self.__timezone),
                        result.shape[0],
                        self.__samples_to_read))
                    break

                self.__data_buffer[indexSubchannel, :] = result * volt_scale
                dataOk = True

        self.__utctime = self.__thisUnixSample / self.__sample_rate

        if not dataOk:
            return False

        print("[Reading] %s: %d samples <> %f sec" % (
            datetime.datetime.utcfromtimestamp(
                self.thisSecond - self.__timezone),
            self.__samples_to_read,
            self.__timeInterval))

        self.__bufferIndex = 0
        return True

    def __isBufferEmpty(self):
        '''
        Return True when the buffer has no complete profile left to deliver.
        '''
        return self.__bufferIndex > self.__samples_to_read - self.__nSamples

    def getData(self, seconds=30, nTries=5):
        '''
        This method gets the data from files and put the data into the dataOut object

        In addition, it advances the buffer position by one profile.

        Inputs:
            seconds : seconds to wait between retries in online mode
            nTries  : number of retries allowed in online mode

        Return:
            True when a profile (channels x heights) was copied from the
            buffer into dataOut, False when no more data could be read.

        Affected:
            self.dataOut
            self.profileIndex
            self.flagDiscontinuousBlock
            self.flagIsNewBlock
        '''
        err_counter = 0
        self.dataOut.flagNoData = True

        if self.__isBufferEmpty():
            self.__flagDiscontinuousBlock = False
            while True:
                if self.__readNextBlock():
                    break
                if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate:
                    return False
                if self.__flagDiscontinuousBlock:
                    print('[Reading] discontinuous block found ... continue with the next block')
                    continue
                if not self.__online:
                    return False
                err_counter += 1
                if err_counter > nTries:
                    return False
                print('[Reading] waiting %d seconds to read a new block' % seconds)
                sleep(seconds)

        self.dataOut.data = self.__data_buffer[
            :, self.__bufferIndex:self.__bufferIndex + self.__nSamples]
        self.dataOut.utctime = (
            self.__thisUnixSample + self.__bufferIndex) / self.__sample_rate
        self.dataOut.flagNoData = False
        self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
        self.dataOut.profileIndex = self.profileIndex

        self.__bufferIndex += self.__nSamples
        self.profileIndex += 1
        if self.profileIndex == self.dataOut.nProfiles:
            self.profileIndex = 0

        return True

    def printInfo(self):
        '''
        Clear the print-info flag; nothing is printed at the moment.
        '''
        if self.__printInfo == False:
            return
        self.__printInfo = False

    def printNumberOfBlock(self):
        '''
        Placeholder; does nothing.
        '''
        return

    def run(self, **kwargs):
        '''
        This method will be called many times so here you should put all your code

        The first call configures the reader with setup(**kwargs); every
        call then reads one profile with getData().
        '''
        if not self.isConfig:
            self.setup(**kwargs)
        self.getData(seconds=self.__delay)
        return
|
r1092 | |||
|
class DigitalRFWriter(Operation):
    '''
    Operation that writes Voltage data and its headers in Digital RF format.

    The first run() call creates the Digital RF data and metadata writers
    and stores one metadata record; every run() call writes one profile.
    '''

    def __init__(self, **kwargs):
        '''
        Reset the writer state.
        '''
        Operation.__init__(self, **kwargs)
        self.metadata_dict = {}
        self.dataOut = None
        self.dtype = None
        # Running mean of the write time in seconds.
        self.oldAverage = 0

    def setHeader(self):
        '''
        Copy the dataOut attributes that go into the metadata record.
        '''
        self.metadata_dict['frequency'] = self.dataOut.frequency
        self.metadata_dict['timezone'] = self.dataOut.timeZone
        self.metadata_dict['dtype'] = cPickle.dumps(self.dataOut.dtype)
        self.metadata_dict['nProfiles'] = self.dataOut.nProfiles
        self.metadata_dict['heightList'] = self.dataOut.heightList
        self.metadata_dict['channelList'] = self.dataOut.channelList
        self.metadata_dict['flagDecodeData'] = self.dataOut.flagDecodeData
        self.metadata_dict['flagDeflipData'] = self.dataOut.flagDeflipData
        self.metadata_dict['flagShiftFFT'] = self.dataOut.flagShiftFFT
        self.metadata_dict['useLocalTime'] = self.dataOut.useLocalTime
        self.metadata_dict['nCohInt'] = self.dataOut.nCohInt
        self.metadata_dict['type'] = self.dataOut.type
        # TODO: check whether every data type defines flagDataAsBlock
        self.metadata_dict['flagDataAsBlock'] = getattr(
            self.dataOut, 'flagDataAsBlock', None)

    def setup(self, dataOut, path, frequency, fileCadence, dirCadence, metadataCadence, set=0, metadataFile='metadata', ext='.h5'):
        '''
        In this method we should set all initial parameters.

        Input:
            dataOut      : input data object; it is also kept as self.dataOut
            path         : output directory for the Digital RF channel
            fileCadence  : file cadence passed to digital_rf.DigitalRFWriter
            dirCadence   : directory cadence passed to both writers
            metadataFile : file name prefix of the metadata files
            frequency, metadataCadence, set, ext : not used

        Raises:
            TypeError if dataOut.type is not 'Voltage'.
            ValueError if path is None.
        '''
        # setHeader() and the buffer allocation read self.dataOut, so make
        # sure it is set even when setup() is called directly.
        self.dataOut = dataOut

        # TypeError is what the former "raise '<string>'" produced at runtime.
        if dataOut.type != 'Voltage':
            raise TypeError('Digital RF cannot be used with this data type')
        if path is None:
            raise ValueError('[Writing] path should be defined')

        self.setHeader()

        self.__ippSeconds = dataOut.ippSeconds
        self.__deltaH = dataOut.getDeltaH()
        # Inverse of the reader's height-step formula.
        self.__sample_rate = 1e6 * 0.15 / self.__deltaH
        self.__dtype = dataOut.dtype
        if len(dataOut.dtype) == 2:
            # Two-field dtype: store with the type of the first field.
            self.__dtype = dataOut.dtype[0]
        self.__nSamples = dataOut.systemHeaderObj.nSamples
        self.__nProfiles = dataOut.nProfiles

        # Write buffer: one row per sample, one ('r', 'i') pair per channel.
        self.arr_data = numpy.ones(
            (self.__nSamples, len(dataOut.channelList)),
            dtype=[('r', self.__dtype), ('i', self.__dtype)])

        sample_rate_fraction = Fraction(self.__sample_rate).limit_denominator()
        sample_rate_numerator = long(sample_rate_fraction.numerator)
        sample_rate_denominator = long(sample_rate_fraction.denominator)
        start_global_index = dataOut.utctime * self.__sample_rate

        uuid = 'prueba'
        compression_level = 0
        checksum = False
        is_complex = True
        num_subchannels = len(dataOut.channelList)
        is_continuous = True
        marching_periods = False

        self.digitalWriteObj = digital_rf.DigitalRFWriter(
            path, self.__dtype, dirCadence,
            fileCadence, start_global_index,
            sample_rate_numerator, sample_rate_denominator, uuid,
            compression_level, checksum,
            is_complex, num_subchannels, is_continuous, marching_periods)

        # Create the directory without a shell so that spaces or shell
        # metacharacters in path cannot break or alter the command.
        metadata_dir = os.path.join(path, 'metadata')
        if not os.path.isdir(metadata_dir):
            try:
                os.makedirs(metadata_dir)
            except OSError:
                # Tolerate a concurrent creation; anything else is fatal.
                if not os.path.isdir(metadata_dir):
                    raise

        self.digitalMetadataWriteObj = digital_rf.DigitalMetadataWriter(
            metadata_dir, dirCadence, 1,
            sample_rate_numerator, sample_rate_denominator,
            metadataFile)

        self.isConfig = True
        self.currentSample = 0
        self.oldAverage = 0
        self.count = 0
        return

    def writeMetadata(self):
        '''
        Write the metadata record, including the three headers as dicts,
        at the sample index of the current dataOut.utctime.
        '''
        start_idx = self.__sample_rate * self.dataOut.utctime

        self.metadata_dict['processingHeader'] = \
            self.dataOut.processingHeaderObj.getAsDict()
        self.metadata_dict['radarControllerHeader'] = \
            self.dataOut.radarControllerHeaderObj.getAsDict()
        self.metadata_dict['systemHeader'] = \
            self.dataOut.systemHeaderObj.getAsDict()

        self.digitalMetadataWriteObj.write(start_idx, self.metadata_dict)
        return

    def timeit(self, toExecute):
        '''
        Call toExecute() and add its duration to the running average.
        '''
        t0 = time()
        toExecute()
        self.executionTime = time() - t0

        if self.oldAverage is None:
            self.oldAverage = self.executionTime
        self.oldAverage = (self.executionTime + self.count *
                           self.oldAverage) / (self.count + 1.0)
        self.count = self.count + 1.0
        return

    def writeData(self):
        '''
        Copy the current profile into the write buffer and write it.

        Raises:
            TypeError if dataOut.type is not 'Voltage'.
        '''
        if self.dataOut.type != 'Voltage':
            raise TypeError('Digital RF cannot be used with this data type')

        nSamples = self.dataOut.systemHeaderObj.nSamples
        # dataOut.data is indexed [channel][sample], the buffer
        # [sample][channel]; copy one channel column at a time.
        for channel in self.dataOut.channelList:
            samples = numpy.asarray(self.dataOut.data[channel])[:nSamples]
            self.arr_data['r'][:nSamples, channel] = samples.real
            self.arr_data['i'][:nSamples, channel] = samples.imag

        def f(): return self.digitalWriteObj.rf_write(self.arr_data)
        self.timeit(f)
        return

    def run(self, dataOut, frequency=49.92e6, path=None, fileCadence=1000, dirCadence=36000, metadataCadence=1, **kwargs):
        '''
        This method will be called many times so here you should put all your code

        Inputs:
            dataOut : object with the data
            The remaining arguments are passed to setup() on the first call.
        '''
        self.dataOut = dataOut

        if not self.isConfig:
            self.setup(dataOut, path, frequency, fileCadence,
                       dirCadence, metadataCadence, **kwargs)
            # The metadata record is written once, right after setup.
            self.writeMetadata()

        self.writeData()

    def close(self):
        '''
        Print the average write time (in milliseconds) and close the writer.
        '''
        print('[Writing] - Closing files ')
        print('Average of writing to digital rf format is  %s' %
              (self.oldAverage * 1000))

        # The writer only exists after setup() has run.
        writer = getattr(self, 'digitalWriteObj', None)
        if writer is None:
            return
        try:
            writer.close()
        except Exception as e:
            # Closing is best effort, but a failure may mean lost data, so
            # report it instead of hiding it.
            print('[Writing] - Error while closing Digital RF files: %s' % e)
|
if __name__ == '__main__':
    # Manual run: keep reading profiles from a local mock recording.
    reader = DigitalRFReader()
    while True:
        reader.run(path='/home/jchavez/jicamarca/mocked_data/')
        # reader.printInfo()
        # reader.printNumberOfBlock()