|
|
import ast
|
|
|
import json
|
|
|
import requests
|
|
|
import base64
|
|
|
from struct import pack
|
|
|
|
|
|
from django.db import models
|
|
|
from django.urls import reverse
|
|
|
from django.core.validators import MinValueValidator, MaxValueValidator
|
|
|
|
|
|
from apps.main.models import Configuration
|
|
|
|
|
|
BOARD_VALUE = (
|
|
|
('1', 'A:AB'),
|
|
|
('2', 'AB')
|
|
|
)
|
|
|
|
|
|
ANT_VALUE = (
|
|
|
('1', 'RX'),
|
|
|
('2', 'TX'),
|
|
|
('3', 'TX/RX')
|
|
|
)
|
|
|
|
|
|
|
|
|
class TXCode(models.Model):
|
|
|
|
|
|
name = models.CharField(max_length=40)
|
|
|
bits_per_code = models.PositiveIntegerField(default=0)
|
|
|
number_of_codes = models.PositiveIntegerField(default=0)
|
|
|
codes = models.TextField(blank=True, null=True)
|
|
|
|
|
|
class Meta:
|
|
|
db_table = 'tx_codes'
|
|
|
ordering = ('name',)
|
|
|
|
|
|
def __str__(self):
|
|
|
return u'%s' % self.name
|
|
|
|
|
|
|
|
|
class USRPTXConfiguration(Configuration):
|
|
|
|
|
|
ip_address = models.GenericIPAddressField(
|
|
|
verbose_name = 'IP address',
|
|
|
protocol='IPv4',
|
|
|
default='0.0.0.0')
|
|
|
|
|
|
daughterboard = models.CharField(
|
|
|
verbose_name='Daughterboard',
|
|
|
max_length=3,
|
|
|
choices=BOARD_VALUE,
|
|
|
null=False,
|
|
|
blank=False
|
|
|
)
|
|
|
|
|
|
antenna = models.CharField(
|
|
|
verbose_name='Antenna',
|
|
|
max_length=3,
|
|
|
choices=ANT_VALUE,
|
|
|
null=False,
|
|
|
blank=False
|
|
|
)
|
|
|
|
|
|
frequency = models.FloatField(
|
|
|
verbose_name='Frequency [MHz]',
|
|
|
validators=[MinValueValidator(0.1)],
|
|
|
blank=False,
|
|
|
null=False,
|
|
|
help_text='Introduce the value in MHz (10^6)'
|
|
|
)
|
|
|
|
|
|
samplerate = models.FloatField(
|
|
|
verbose_name='Sample rate [MHz]',
|
|
|
validators=[MinValueValidator(0.1)],
|
|
|
blank=False,
|
|
|
null=False,
|
|
|
help_text='Introduce the value in MHz (10^6)'
|
|
|
)
|
|
|
|
|
|
ipp = models.FloatField(
|
|
|
verbose_name='IPP [km]',
|
|
|
validators=[MinValueValidator(0.1)],
|
|
|
blank=False,
|
|
|
null=False,
|
|
|
help_text='Introduce the value in km'
|
|
|
)
|
|
|
|
|
|
delay = models.FloatField(
|
|
|
verbose_name='Delay [us]',
|
|
|
validators=[MinValueValidator(0.0)],
|
|
|
null=False,
|
|
|
help_text='Introduce the value in µs'
|
|
|
)
|
|
|
|
|
|
pulse_1 = models.FloatField(
|
|
|
verbose_name='Pulse Width 1 [us]',
|
|
|
validators=[MinValueValidator(0.1)],
|
|
|
blank=False,
|
|
|
null=False,
|
|
|
help_text='Introduce the value in us'
|
|
|
)
|
|
|
|
|
|
code_type_1 = models.ForeignKey(
|
|
|
TXCode,
|
|
|
on_delete=models.CASCADE,
|
|
|
related_name='%(class)s_code_1',
|
|
|
verbose_name="Code Type 1"
|
|
|
)
|
|
|
|
|
|
code_1 = models.TextField(
|
|
|
verbose_name="Code 1",
|
|
|
blank=True,
|
|
|
null=True,
|
|
|
help_text="binary code"
|
|
|
)
|
|
|
|
|
|
repetitions_1 = models.IntegerField(
|
|
|
verbose_name='NTX 1',
|
|
|
default=1,
|
|
|
blank=True,
|
|
|
null=True
|
|
|
)
|
|
|
|
|
|
enable_2 = models.BooleanField(
|
|
|
verbose_name='Pulse 2',
|
|
|
blank=False,
|
|
|
null=False
|
|
|
)
|
|
|
|
|
|
pulse_2 = models.FloatField(
|
|
|
verbose_name='Pulse Width 2 [us]',
|
|
|
default=1,
|
|
|
blank=True,
|
|
|
null=True,
|
|
|
help_text='Introduce the value in µs'
|
|
|
)
|
|
|
|
|
|
code_type_2 = models.ForeignKey(
|
|
|
TXCode,
|
|
|
on_delete=models.CASCADE,
|
|
|
related_name='%(class)s_code_2',
|
|
|
verbose_name="Code Type 2",
|
|
|
blank=True,
|
|
|
null=True,
|
|
|
)
|
|
|
|
|
|
code_2 = models.TextField(
|
|
|
verbose_name="Code 2",
|
|
|
blank=True,
|
|
|
null=True,
|
|
|
help_text="Introduce the binary code"
|
|
|
)
|
|
|
|
|
|
repetitions_2 = models.IntegerField(
|
|
|
verbose_name='NTX 2',
|
|
|
default=0,
|
|
|
blank=True,
|
|
|
null=True
|
|
|
)
|
|
|
|
|
|
class Meta:
|
|
|
db_table = 'usrp_tx_configurations'
|
|
|
|
|
|
def __str__(self):
|
|
|
|
|
|
if not self.enable_2:
|
|
|
return 'TX with IPP={}km, Pulse 1: PW={} us, CODE={}'.format(self.ipp, self.pulse_1, self.code_type_1)
|
|
|
else:
|
|
|
return 'TX with IPP={}km, Pulse 1: PW={} us, CODE={} Pulse 2: PW={} us, CODE={}'.format(self.ipp, self.pulse_1, self.code_type_1, self.pulse_2, self.code_type_2)
|
|
|
|
|
|
def get_absolute_url_plot(self):
|
|
|
return reverse('url_plot_usrp_tx_pulses', args=[str(self.id)])
|
|
|
|
|
|
def request(self, cmd, method='get', **kwargs):
|
|
|
|
|
|
req = getattr(requests, method)(self.device.url(cmd), **kwargs)
|
|
|
payload = req.json()
|
|
|
return payload
|
|
|
|
|
|
def status_device(self):
|
|
|
|
|
|
try:
|
|
|
|
|
|
payload = requests.get(self.device.url())
|
|
|
|
|
|
if payload:
|
|
|
self.device.status = 1
|
|
|
elif payload['status']=='disable':
|
|
|
self.device.status = 2
|
|
|
else:
|
|
|
self.device.status = 1
|
|
|
self.device.save()
|
|
|
self.message = 'USRP Tx status: {}'.format(payload['status'])
|
|
|
return False
|
|
|
except Exception as e:
|
|
|
if 'No route to host' not in str(e):
|
|
|
self.device.status = 4
|
|
|
self.device.save()
|
|
|
self.message = 'USRP Tx status: {}'.format(str(e))
|
|
|
return False
|
|
|
|
|
|
self.device.save()
|
|
|
return True
|
|
|
|
|
|
def reset_device(self):
|
|
|
|
|
|
try:
|
|
|
payload = self.request('reset', 'post')
|
|
|
if payload['reset']=='ok':
|
|
|
self.message = 'USRP Tx restarted OK'
|
|
|
self.device.status = 2
|
|
|
self.device.save()
|
|
|
else:
|
|
|
self.message = 'USRP Tx restart fail'
|
|
|
self.device.status = 4
|
|
|
self.device.save()
|
|
|
except Exception as e:
|
|
|
self.message = 'USRP Tx reset: {}'.format(str(e))
|
|
|
return False
|
|
|
|
|
|
return True
|
|
|
|
|
|
def stop_device(self):
|
|
|
|
|
|
try:
|
|
|
command = self.device.url() + "stoptx"
|
|
|
r = requests.get(command)
|
|
|
if r:
|
|
|
self.device.status = 4
|
|
|
self.device.save()
|
|
|
self.message = 'USRP stopped'
|
|
|
else:
|
|
|
self.device.status = 4
|
|
|
self.device.save()
|
|
|
return False
|
|
|
except Exception as e:
|
|
|
if 'No route to host' not in str(e):
|
|
|
self.device.status = 4
|
|
|
else:
|
|
|
self.device.status = 0
|
|
|
#self.message = 'USRP Tx stop: {}'.format(str(e))
|
|
|
self.message = "USRP Tx can't start, please check network/device connection or IP address/port configuration"
|
|
|
self.device.save()
|
|
|
return False
|
|
|
|
|
|
return True
|
|
|
|
|
|
def start_device(self):
|
|
|
|
|
|
try:
|
|
|
usrp = USRPTXConfiguration.objects.get(pk=self)
|
|
|
usrp_daughterboard = usrp.get_daughterboard_display()
|
|
|
usrp_antenna = usrp.get_antenna_display()
|
|
|
|
|
|
if usrp.enable_2 is True:
|
|
|
payload = {'ip_address': usrp.ip_address, 'daughterboard': usrp_daughterboard, 'antenna': usrp_antenna,
|
|
|
'frequency': usrp.frequency, 'sample_rate': usrp.samplerate, 'ipp': usrp.ipp, 'delay': usrp.delay,
|
|
|
'pulse_1': usrp.pulse_1, 'code_type_1': usrp.code_type_1.name, 'code_1': usrp.code_1,
|
|
|
'repetitions_1': usrp.repetitions_1, 'enable_2': usrp.enable_2, 'pulse_2': usrp.pulse_2, 'code_type_2': usrp.code_type_2.name,
|
|
|
'code_2': usrp.code_2, 'repetitions_2': usrp.repetitions_2}
|
|
|
else:
|
|
|
payload = {'ip_address': usrp.ip_address, 'daughterboard': usrp_daughterboard, 'antenna': usrp_antenna,
|
|
|
'frequency': usrp.frequency, 'sample_rate': usrp.samplerate, 'ipp': usrp.ipp, 'delay': usrp.delay,
|
|
|
'pulse_1': usrp.pulse_1, 'code_type_1': usrp.code_type_1.name, 'code_1': usrp.code_1,
|
|
|
'repetitions_1': usrp.repetitions_1, 'enable_2': usrp.enable_2}
|
|
|
|
|
|
r = requests.post(self.device.url("usrptx"), json=payload)
|
|
|
|
|
|
if r:
|
|
|
self.device.status = 3
|
|
|
self.device.save()
|
|
|
self.message = 'USRP Tx configured and started'
|
|
|
else:
|
|
|
return False
|
|
|
except Exception as e:
|
|
|
if 'No route to host' not in str(e):
|
|
|
self.device.status = 4
|
|
|
else:
|
|
|
self.device.status = 0
|
|
|
#self.message = 'USRP Tx start: {}'.format(str(e))
|
|
|
self.message = "USRP Tx can't start, please check network/device connection or IP address/port configuration"
|
|
|
self.device.save()
|
|
|
return False
|
|
|
|
|
|
return True
|
|
|
|
|
|
def get_absolute_url_import(self):
|
|
|
return reverse('url_import_usrp_tx_conf', args=[str(self.id)])
|