# jroIO_digitalRF.py
'''
Created on Jul 3, 2014

@author: roj-idl71
'''

# SUBCHANNELS INSTEAD OF CHANNELS
# BENCHMARKS -> PROBLEMS WITH LARGE FILES -> INCONSISTENT OVER TIME
# VERSION UPDATE
# HEADERS
# WRITER MODULE
# METADATA
import os
import datetime
import numpy
import timeit

from fractions import Fraction

try:
    from gevent import sleep
except:
    from time import sleep

from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
from schainpy.model.data.jrodata import Voltage
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation

from time import time

import pickle

try:
    import digital_rf
except:
    print('You should install "digital_rf" module if you want to read Digital RF data')


class DigitalRFReader(ProcessingUnit):
    '''
    Reads Digital RF data and delivers it as Voltage objects.
    '''

    def __init__(self, **kwargs):
        '''
        Constructor
        '''

        ProcessingUnit.__init__(self, **kwargs)

        self.dataOut = Voltage()
        self.__printInfo = True
        self.__flagDiscontinuousBlock = False
        self.__bufferIndex = 9999999
        self.__ippKm = None
        self.__codeType = 0
        self.__nCode = None
        self.__nBaud = None
        self.__code = None
        self.dtype = None
        self.oldAverage = None

    def close(self):
        print('Average of reading from digital rf format is ', self.oldAverage * 1000)
        return

    def __getCurrentSecond(self):

        return self.__thisUnixSample / self.__sample_rate

    thisSecond = property(__getCurrentSecond, "I'm the 'thisSecond' property.")

    def __setFileHeader(self):
        '''
        In this method every header parameter of the dataOut object is initialized (no data is set here).
        '''

        ippSeconds = 1.0 * self.__nSamples / self.__sample_rate

        nProfiles = 1.0 / ippSeconds  # Number of profiles in one second

        try:
            self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
                self.__radarControllerHeader)
        except:
            self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
                txA=0,
                txB=0,
                nWindows=1,
                nHeights=self.__nSamples,
                firstHeight=self.__firstHeigth,
                deltaHeight=self.__deltaHeigth,
                codeType=self.__codeType,
                nCode=self.__nCode, nBaud=self.__nBaud,
                code=self.__code)

        try:
            self.dataOut.systemHeaderObj = SystemHeader(self.__systemHeader)
        except:
            self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
                                                        nProfiles=nProfiles,
                                                        nChannels=len(
                                                            self.__channelList),
                                                        adcResolution=14)

        self.dataOut.type = "Voltage"

        self.dataOut.data = None

        self.dataOut.dtype = self.dtype

        # self.dataOut.nChannels = 0

        # self.dataOut.nHeights = 0

        self.dataOut.nProfiles = int(nProfiles)

        self.dataOut.heightList = self.__firstHeigth + \
            numpy.arange(self.__nSamples, dtype=float) * \
            self.__deltaHeigth

        self.dataOut.channelList = list(range(self.__num_subchannels))

        self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights()

        # self.dataOut.channelIndexList = None

        self.dataOut.flagNoData = True

        self.dataOut.flagDataAsBlock = False
        # Set to TRUE if the data is discontinuous
        self.dataOut.flagDiscontinuousBlock = False

        self.dataOut.utctime = None

        # timezone like jroheader, difference in minutes between UTC and localtime
        self.dataOut.timeZone = self.__timezone / 60

        self.dataOut.dstFlag = 0

        self.dataOut.errorCount = 0

        try:
            self.dataOut.nCohInt = self.fixed_metadata_dict.get(
                'nCohInt', self.nCohInt)

            # assume the data is already decoded
            self.dataOut.flagDecodeData = self.fixed_metadata_dict.get(
                'flagDecodeData', self.flagDecodeData)

            # assume the data has not been flipped
            self.dataOut.flagDeflipData = self.fixed_metadata_dict['flagDeflipData']

            self.dataOut.flagShiftFFT = self.fixed_metadata_dict['flagShiftFFT']

            self.dataOut.useLocalTime = self.fixed_metadata_dict['useLocalTime']
        except:
            pass

        self.dataOut.ippSeconds = ippSeconds

        # Time interval between profiles
        # self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt

        self.dataOut.frequency = self.__frequency

        self.dataOut.realtime = self.__online

    def findDatafiles(self, path, startDate=None, endDate=None):

        if not os.path.isdir(path):
            return []

        try:
            digitalReadObj = digital_rf.DigitalRFReader(
                path, load_all_metadata=True)
        except:
            digitalReadObj = digital_rf.DigitalRFReader(path)

        channelNameList = digitalReadObj.get_channels()

        if not channelNameList:
            return []

        metadata_dict = digitalReadObj.get_rf_file_metadata(channelNameList[0])

        sample_rate = metadata_dict['sample_rate'][0]

        this_metadata_file = digitalReadObj.get_metadata(channelNameList[0])

        try:
            timezone = this_metadata_file['timezone'].value
        except:
            timezone = 0

        start_index, end_index = digitalReadObj.get_bounds(channelNameList[0])
        startUTCSecond = start_index / sample_rate - timezone
        endUTCSecond = end_index / sample_rate - timezone

        startDatetime = datetime.datetime.utcfromtimestamp(startUTCSecond)
        endDatatime = datetime.datetime.utcfromtimestamp(endUTCSecond)

        if not startDate:
            startDate = startDatetime.date()

        if not endDate:
            endDate = endDatatime.date()

        dateList = []

        thisDatetime = startDatetime

        while(thisDatetime <= endDatatime):

            thisDate = thisDatetime.date()

            if thisDate < startDate:
                thisDatetime += datetime.timedelta(1)
                continue

            if thisDate > endDate:
                break

            dateList.append(thisDate)

            thisDatetime += datetime.timedelta(1)

        return dateList

    def setup(self, path=None,
              startDate=None,
              endDate=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              channelList=None,
              nSamples=None,
              online=False,
              delay=60,
              buffer_size=1024,
              ippKm=None,
              nCohInt=1,
              nCode=1,
              nBaud=1,
              flagDecodeData=False,
              code=numpy.ones((1, 1), dtype=int),
              **kwargs):
        '''
        In this method we should set all initial parameters.

        Inputs:
            path
            startDate
            endDate
            startTime
            endTime
            set
            expLabel
            ext
            online
            delay
        '''
        self.nCohInt = nCohInt
        self.flagDecodeData = flagDecodeData
        self.i = 0

        if not os.path.isdir(path):
            raise ValueError("[Reading] Directory %s does not exist" % path)

        try:
            self.digitalReadObj = digital_rf.DigitalRFReader(
                path, load_all_metadata=True)
        except:
            self.digitalReadObj = digital_rf.DigitalRFReader(path)

        channelNameList = self.digitalReadObj.get_channels()

        if not channelNameList:
            raise ValueError("[Reading] Directory %s does not have any files" % path)

        if not channelList:
            channelList = list(range(len(channelNameList)))

        ##########  Reading metadata ######################

        top_properties = self.digitalReadObj.get_properties(
            channelNameList[channelList[0]])

        self.__num_subchannels = top_properties['num_subchannels']
        self.__sample_rate = 1.0 * \
            top_properties['sample_rate_numerator'] / \
            top_properties['sample_rate_denominator']
        # self.__samples_per_file = top_properties['samples_per_file'][0]
        self.__deltaHeigth = 1e6 * 0.15 / self.__sample_rate  # why 0.15?
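        # 0.15 km/us is c/2 (half the speed of light, in km per microsecond), so
        # 1e6 * 0.15 / sample_rate converts one sample period into the two-way
        # range spacing between samples, i.e. the height resolution in km.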

        this_metadata_file = self.digitalReadObj.get_digital_metadata(
            channelNameList[channelList[0]])

        metadata_bounds = this_metadata_file.get_bounds()

        self.fixed_metadata_dict = this_metadata_file.read(
            metadata_bounds[0])[metadata_bounds[0]]  # GET FIRST HEADER

        try:
            self.__processingHeader = self.fixed_metadata_dict['processingHeader']
            self.__radarControllerHeader = self.fixed_metadata_dict['radarControllerHeader']
            self.__systemHeader = self.fixed_metadata_dict['systemHeader']
            self.dtype = pickle.loads(self.fixed_metadata_dict['dtype'])
        except:
            pass

        self.__frequency = None

        self.__frequency = self.fixed_metadata_dict.get('frequency', 1)

        self.__timezone = self.fixed_metadata_dict.get('timezone', 300)

        try:
            nSamples = self.fixed_metadata_dict['nSamples']
        except:
            nSamples = None

        self.__firstHeigth = 0

        try:
            codeType = self.__radarControllerHeader['codeType']
        except:
            codeType = 0

        try:
            if codeType:
                nCode = self.__radarControllerHeader['nCode']
                nBaud = self.__radarControllerHeader['nBaud']
                code = self.__radarControllerHeader['code']
        except:
            pass

        if not ippKm:
            try:
                # seconds to km
                ippKm = self.__radarControllerHeader['ipp']
            except:
                ippKm = None
        ####################################################

        self.__ippKm = ippKm

        startUTCSecond = None
        endUTCSecond = None

        if startDate:
            startDatetime = datetime.datetime.combine(startDate, startTime)
            startUTCSecond = (
                startDatetime - datetime.datetime(1970, 1, 1)).total_seconds() + self.__timezone

        if endDate:
            endDatetime = datetime.datetime.combine(endDate, endTime)
            endUTCSecond = (endDatetime - datetime.datetime(1970,
                                                            1, 1)).total_seconds() + self.__timezone

        start_index, end_index = self.digitalReadObj.get_bounds(
            channelNameList[channelList[0]])

        if not startUTCSecond:
            startUTCSecond = start_index / self.__sample_rate

        if start_index > startUTCSecond * self.__sample_rate:
            startUTCSecond = start_index / self.__sample_rate

        if not endUTCSecond:
            endUTCSecond = end_index / self.__sample_rate

        if end_index < endUTCSecond * self.__sample_rate:
            endUTCSecond = end_index / self.__sample_rate

        if not nSamples:
            if not ippKm:
                raise ValueError("[Reading] nSamples or ippKm should be defined")
            nSamples = int(ippKm / (1e6 * 0.15 / self.__sample_rate))

        channelBoundList = []
        channelNameListFiltered = []

        for thisIndexChannel in channelList:
            thisChannelName = channelNameList[thisIndexChannel]
            start_index, end_index = self.digitalReadObj.get_bounds(
                thisChannelName)
            channelBoundList.append((start_index, end_index))
            channelNameListFiltered.append(thisChannelName)

        self.profileIndex = 0
        self.i = 0
        self.__delay = delay

        self.__codeType = codeType
        self.__nCode = nCode
        self.__nBaud = nBaud
        self.__code = code

        self.__datapath = path
        self.__online = online
        self.__channelList = channelList
        self.__channelNameList = channelNameListFiltered
        self.__channelBoundList = channelBoundList
        self.__nSamples = nSamples
        self.__samples_to_read = int(nSamples)  # FIXED: NOW 40
        self.__nChannels = len(self.__channelList)

        self.__startUTCSecond = startUTCSecond
        self.__endUTCSecond = endUTCSecond

        self.__timeInterval = 1.0 * self.__samples_to_read / \
            self.__sample_rate  # Time interval

        if online:
            # self.__thisUnixSample = int(endUTCSecond*self.__sample_rate - 4*self.__samples_to_read)
            startUTCSecond = numpy.floor(endUTCSecond)

        # start one block behind because __readNextBlock() first advances by __samples_to_read
        self.__thisUnixSample = int(
            startUTCSecond * self.__sample_rate) - self.__samples_to_read

        self.__data_buffer = numpy.zeros(
            (self.__num_subchannels, self.__samples_to_read), dtype=complex)

        self.__setFileHeader()
        self.isConfig = True

        print("[Reading] Digital RF Data was found from %s to %s " % (
            datetime.datetime.utcfromtimestamp(
                self.__startUTCSecond - self.__timezone),
            datetime.datetime.utcfromtimestamp(
                self.__endUTCSecond - self.__timezone)
        ))

        print("[Reading] Starting process from %s to %s" % (datetime.datetime.utcfromtimestamp(startUTCSecond - self.__timezone),
                                                            datetime.datetime.utcfromtimestamp(
                                                                endUTCSecond - self.__timezone)
                                                            ))

        self.oldAverage = None
        self.count = 0
        self.executionTime = 0

    def __reload(self):
        # print "%s not in range [%s, %s]" %(
        #     datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
        #     datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
        #     datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
        # )
        print("[Reading] reloading metadata ...")

        try:
            self.digitalReadObj.reload(complete_update=True)
        except:
            self.digitalReadObj.reload()

        start_index, end_index = self.digitalReadObj.get_bounds(
            self.__channelNameList[self.__channelList[0]])

        if start_index > self.__startUTCSecond * self.__sample_rate:
            self.__startUTCSecond = 1.0 * start_index / self.__sample_rate

        if end_index > self.__endUTCSecond * self.__sample_rate:
            self.__endUTCSecond = 1.0 * end_index / self.__sample_rate

            print()
            print("[Reading] New timerange found [%s, %s] " % (
                datetime.datetime.utcfromtimestamp(
                    self.__startUTCSecond - self.__timezone),
                datetime.datetime.utcfromtimestamp(
                    self.__endUTCSecond - self.__timezone)
            ))

            return True

        return False

    def timeit(self, toExecute):
        t0 = time()
        toExecute()
        self.executionTime = time() - t0
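        # keep a cumulative running average of the execution time:
        # new_average = (t_new + count * old_average) / (count + 1)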
        if self.oldAverage is None:
            self.oldAverage = self.executionTime
        self.oldAverage = (self.executionTime + self.count *
                           self.oldAverage) / (self.count + 1.0)
        self.count = self.count + 1.0
        return

    def __readNextBlock(self, seconds=30, volt_scale=1):
        '''
        '''

        # Set the next data
        self.__flagDiscontinuousBlock = False
        self.__thisUnixSample += self.__samples_to_read

        if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
            print("[Reading] There are no more data into selected time-range")
            if self.__online:
                self.__reload()
            else:
                return False

            if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
                return False

            self.__thisUnixSample -= self.__samples_to_read

        indexChannel = 0

        dataOk = False

        for thisChannelName in self.__channelNameList:  # TODO SEVERAL CHANNELS?

            for indexSubchannel in range(self.__num_subchannels):
                try:
                    t0 = time()
                    result = self.digitalReadObj.read_vector_c81d(self.__thisUnixSample,
                                                                  self.__samples_to_read,
                                                                  thisChannelName, sub_channel=indexSubchannel)
                    self.executionTime = time() - t0
                    if self.oldAverage is None:
                        self.oldAverage = self.executionTime
                    self.oldAverage = (
                        self.executionTime + self.count * self.oldAverage) / (self.count + 1.0)
                    self.count = self.count + 1.0

                except IOError as e:
                    # read next profile
                    self.__flagDiscontinuousBlock = True
                    print("[Reading] %s" % datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone), e)
                    break

                if result.shape[0] != self.__samples_to_read:
                    self.__flagDiscontinuousBlock = True
                    print("[Reading] %s: Too few samples were found, just %d/%d samples" % (datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
                                                                                            result.shape[0],
                                                                                            self.__samples_to_read))
                    break

                self.__data_buffer[indexSubchannel, :] = result * volt_scale

            indexChannel += 1

            dataOk = True

        self.__utctime = self.__thisUnixSample / self.__sample_rate

        if not dataOk:
            return False

        print("[Reading] %s: %d samples <> %f sec" % (datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
                                                      self.__samples_to_read,
                                                      self.__timeInterval))

        self.__bufferIndex = 0

        return True

    def __isBufferEmpty(self):
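        # The internal buffer holds __samples_to_read samples per subchannel; it
        # is considered empty once fewer than __nSamples unread samples remain,
        # i.e. there is not enough data left for one more profile.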
        return self.__bufferIndex > self.__samples_to_read - self.__nSamples  # 40960 - 40

    def getData(self, seconds=30, nTries=5):
        '''
        This method gets the data from files and puts it into the dataOut object.
        In addition, it increases the buffer counter by one.

        Return:
            data : returns one profile of voltages (heights * channels) copied from the
                   buffer. Returns None if there are no more files to read.

        Affected:
            self.dataOut
            self.profileIndex
            self.flagDiscontinuousBlock
            self.flagIsNewBlock
        '''
        err_counter = 0
        self.dataOut.flagNoData = True

        if self.__isBufferEmpty():
            self.__flagDiscontinuousBlock = False

            while True:
                if self.__readNextBlock():
                    break

                if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate:
                    return False

                if self.__flagDiscontinuousBlock:
                    print('[Reading] discontinuous block found ... continue with the next block')
                    continue

                if not self.__online:
                    return False

                err_counter += 1
                if err_counter > nTries:
                    return False

                print('[Reading] waiting %d seconds to read a new block' % seconds)
                sleep(seconds)

        self.dataOut.data = self.__data_buffer[:,
                                               self.__bufferIndex:self.__bufferIndex + self.__nSamples]
        self.dataOut.utctime = (
            self.__thisUnixSample + self.__bufferIndex) / self.__sample_rate
        self.dataOut.flagNoData = False
        self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
        self.dataOut.profileIndex = self.profileIndex

        self.__bufferIndex += self.__nSamples
        self.profileIndex += 1

        if self.profileIndex == self.dataOut.nProfiles:
            self.profileIndex = 0

        return True

    def printInfo(self):
        '''
        '''
        if self.__printInfo == False:
            return

        # self.systemHeaderObj.printInfo()
        # self.radarControllerHeaderObj.printInfo()

        self.__printInfo = False

    def printNumberOfBlock(self):
        '''
        '''
        return
        # print self.profileIndex

    def run(self, **kwargs):
        '''
        This method will be called many times so here you should put all your code
        '''

        if not self.isConfig:
            self.setup(**kwargs)
        # self.i = self.i + 1
        self.getData(seconds=self.__delay)

        return


class DigitalRFWriter(Operation):
    '''
    Writes Voltage data out in Digital RF format.
    '''

    def __init__(self, **kwargs):
        '''
        Constructor
        '''
        Operation.__init__(self, **kwargs)
        self.metadata_dict = {}
        self.dataOut = None
        self.dtype = None
        self.oldAverage = 0

    def setHeader(self):

        self.metadata_dict['frequency'] = self.dataOut.frequency
        self.metadata_dict['timezone'] = self.dataOut.timeZone
        self.metadata_dict['dtype'] = pickle.dumps(self.dataOut.dtype)
        self.metadata_dict['nProfiles'] = self.dataOut.nProfiles
        self.metadata_dict['heightList'] = self.dataOut.heightList
        self.metadata_dict['channelList'] = self.dataOut.channelList
        self.metadata_dict['flagDecodeData'] = self.dataOut.flagDecodeData
        self.metadata_dict['flagDeflipData'] = self.dataOut.flagDeflipData
        self.metadata_dict['flagShiftFFT'] = self.dataOut.flagShiftFFT
        self.metadata_dict['useLocalTime'] = self.dataOut.useLocalTime
        self.metadata_dict['nCohInt'] = self.dataOut.nCohInt
        self.metadata_dict['type'] = self.dataOut.type
        self.metadata_dict['flagDataAsBlock'] = getattr(
            self.dataOut, 'flagDataAsBlock', None)  # check
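        # metadata_dict is serialized later by writeMetadata() through
        # digital_rf's DigitalMetadataWriter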

    def setup(self, dataOut, path, frequency, fileCadence, dirCadence, metadataCadence, set=0, metadataFile='metadata', ext='.h5'):
        '''
        In this method we should set all initial parameters.

        Input:
            dataOut: input data; it is also used as the output data
        '''
        self.setHeader()
        self.__ippSeconds = dataOut.ippSeconds
        self.__deltaH = dataOut.getDeltaH()
        self.__sample_rate = 1e6 * 0.15 / self.__deltaH
        self.__dtype = dataOut.dtype
        if len(dataOut.dtype) == 2:
            self.__dtype = dataOut.dtype[0]
        self.__nSamples = dataOut.systemHeaderObj.nSamples
        self.__nProfiles = dataOut.nProfiles

        if self.dataOut.type != 'Voltage':
            raise ValueError('Digital RF cannot be used with this data type')
            self.arr_data = numpy.ones((1, dataOut.nFFTPoints * len(
                self.dataOut.channelList)), dtype=[('r', self.__dtype), ('i', self.__dtype)])
        else:
            self.arr_data = numpy.ones((self.__nSamples, len(
                self.dataOut.channelList)), dtype=[('r', self.__dtype), ('i', self.__dtype)])

        file_cadence_millisecs = 1000

        sample_rate_fraction = Fraction(self.__sample_rate).limit_denominator()
        sample_rate_numerator = int(sample_rate_fraction.numerator)
        sample_rate_denominator = int(sample_rate_fraction.denominator)
        start_global_index = dataOut.utctime * self.__sample_rate

        uuid = 'prueba'
        compression_level = 0
        checksum = False
        is_complex = True
        num_subchannels = len(dataOut.channelList)
        is_continuous = True
        marching_periods = False
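
        # NOTE: dirCadence and fileCadence are passed positionally to
        # digital_rf.DigitalRFWriter as subdir_cadence_secs (seconds of data per
        # subdirectory) and file_cadence_millisecs (milliseconds of data per file).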
        self.digitalWriteObj = digital_rf.DigitalRFWriter(path, self.__dtype, dirCadence,
                                                          fileCadence, start_global_index,
                                                          sample_rate_numerator, sample_rate_denominator, uuid, compression_level, checksum,
                                                          is_complex, num_subchannels, is_continuous, marching_periods)

        metadata_dir = os.path.join(path, 'metadata')
        os.system('mkdir %s' % (metadata_dir))

        self.digitalMetadataWriteObj = digital_rf.DigitalMetadataWriter(metadata_dir, dirCadence, 1,  # 236, file_cadence_millisecs / 1000
                                                                        sample_rate_numerator, sample_rate_denominator,
                                                                        metadataFile)

        self.isConfig = True
        self.currentSample = 0
        self.oldAverage = 0
        self.count = 0
        return

    def writeMetadata(self):
        start_idx = self.__sample_rate * self.dataOut.utctime

        self.metadata_dict['processingHeader'] = self.dataOut.processingHeaderObj.getAsDict(
        )
        self.metadata_dict['radarControllerHeader'] = self.dataOut.radarControllerHeaderObj.getAsDict(
        )
        self.metadata_dict['systemHeader'] = self.dataOut.systemHeaderObj.getAsDict(
        )
        self.digitalMetadataWriteObj.write(start_idx, self.metadata_dict)
        return

    def timeit(self, toExecute):
        t0 = time()
        toExecute()
        self.executionTime = time() - t0
        if self.oldAverage is None:
            self.oldAverage = self.executionTime
        self.oldAverage = (self.executionTime + self.count *
                           self.oldAverage) / (self.count + 1.0)
        self.count = self.count + 1.0
        return

    def writeData(self):
        if self.dataOut.type != 'Voltage':
            raise ValueError('Digital RF cannot be used with this data type')
            for channel in self.dataOut.channelList:
                for i in range(self.dataOut.nFFTPoints):
                    self.arr_data[1][channel * self.dataOut.nFFTPoints +
                                     i]['r'] = self.dataOut.data[channel][i].real
                    self.arr_data[1][channel * self.dataOut.nFFTPoints +
                                     i]['i'] = self.dataOut.data[channel][i].imag
        else:
            for i in range(self.dataOut.systemHeaderObj.nSamples):
                for channel in self.dataOut.channelList:
                    self.arr_data[i][channel]['r'] = self.dataOut.data[channel][i].real
                    self.arr_data[i][channel]['i'] = self.dataOut.data[channel][i].imag
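
        # rf_write() accepts a structured array with 'r'/'i' fields (shaped
        # samples x channels here); the loops above split each complex voltage
        # sample into those two fields before writing.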
        def f(): return self.digitalWriteObj.rf_write(self.arr_data)
        self.timeit(f)

        return

    def run(self, dataOut, frequency=49.92e6, path=None, fileCadence=1000, dirCadence=36000, metadataCadence=1, **kwargs):
        '''
        This method will be called many times so here you should put all your code

        Inputs:
            dataOut: object with the data
        '''
        # print dataOut.__dict__
        self.dataOut = dataOut
        if not self.isConfig:
            self.setup(dataOut, path, frequency, fileCadence,
                       dirCadence, metadataCadence, **kwargs)
            self.writeMetadata()

        self.writeData()

        ## self.currentSample += 1
        # if self.dataOut.flagDataAsBlock or self.currentSample == 1:
        #     self.writeMetadata()
        ## if self.currentSample == self.__nProfiles: self.currentSample = 0

    def close(self):
        print('[Writing] - Closing files ')
        print('Average of writing to digital rf format is ', self.oldAverage * 1000)
        try:
            self.digitalWriteObj.close()
        except:
            pass

        # raise


if __name__ == '__main__':

    readObj = DigitalRFReader()

    while True:
        readObj.run(path='/home/jchavez/jicamarca/mocked_data/')
        # readObj.printInfo()
        # readObj.printNumberOfBlock()
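
# A minimal offline-reading sketch (hypothetical dataset path and example dates;
# the keyword names match DigitalRFReader.setup() above):
#
#     readObj = DigitalRFReader()
#     while True:
#         readObj.run(path='/data/drf_example/',
#                     startDate=datetime.date(2014, 7, 3),
#                     endDate=datetime.date(2014, 7, 3),
#                     online=False,
#                     nSamples=1000)
#         if readObj.dataOut.flagNoData:
#             break
#         # each successful call leaves one profile in readObj.dataOut.data,
#         # shaped (channels, heights)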