##// END OF EJS Templates
Add power monitor
Add power monitor

File last commit:

r385:9039baf7f871
r393:737695221ef9
Show More
models.py
286 lines | 8.4 KiB | text/x-python | PythonLexer
import ast
import json
import requests
import base64
from struct import pack
from django.db import models
from django.urls import reverse
from django.core.validators import MinValueValidator, MaxValueValidator
from apps.main.models import Configuration
# Choices for USRPTXConfiguration.daughterboard as (stored value, display label).
# start_device() sends the display label (not the stored value) to the device.
BOARD_VALUE = (
    ('1', 'A:AB'),
    ('2', 'AB')
)
# Choices for USRPTXConfiguration.antenna as (stored value, display label).
# start_device() sends the display label (not the stored value) to the device.
ANT_VALUE = (
    ('1', 'RX'),
    ('2', 'TX'),
    ('3', 'TX/RX')
)
class TXCode(models.Model):
    """A named transmit code definition stored in the ``tx_codes`` table.

    Instances are listed alphabetically by ``name`` and are referenced by
    ``USRPTXConfiguration.code_type_1`` / ``code_type_2``.
    """

    # Human-readable identifier; also used as the string representation.
    name = models.CharField(max_length=40)
    bits_per_code = models.PositiveIntegerField(default=0)
    number_of_codes = models.PositiveIntegerField(default=0)
    # Free-form text payload for the code(s); optional.
    codes = models.TextField(blank=True, null=True)

    class Meta:
        db_table = 'tx_codes'
        ordering = ('name',)

    def __str__(self):
        """Return the code's name as text."""
        return str(self.name)
class USRPTXConfiguration(Configuration):
    """Transmitter settings for a USRP device, stored in ``usrp_tx_configurations``.

    Besides the persisted fields, the class offers helpers that talk to the
    device over HTTP (``status_device``, ``reset_device``, ``stop_device``,
    ``start_device``). Each helper returns ``True``/``False`` and leaves a
    human-readable result in ``self.message``; most also update
    ``self.device.status`` with a numeric state code.

    NOTE(review): ``self.device`` (with ``url()``, ``status`` and ``save()``)
    is presumably provided by the ``Configuration`` base class -- it is not
    defined in this file. The meaning of the numeric status codes is likewise
    defined elsewhere; this class only assigns 0, 1, 2, 3 and 4.
    """

    ip_address = models.GenericIPAddressField(
        verbose_name='IP address',
        protocol='IPv4',
        default='0.0.0.0')

    daughterboard = models.CharField(
        verbose_name='Daughterboard',
        max_length=3,
        choices=BOARD_VALUE,
        null=False,
        blank=False
    )

    antenna = models.CharField(
        verbose_name='Antenna',
        max_length=3,
        choices=ANT_VALUE,
        null=False,
        blank=False
    )

    frequency = models.FloatField(
        verbose_name='Frequency [MHz]',
        validators=[MinValueValidator(0.1)],
        blank=False,
        null=False,
        help_text='Introduce the value in MHz (10^6)'
    )

    samplerate = models.FloatField(
        verbose_name='Sample rate [MHz]',
        validators=[MinValueValidator(0.1)],
        blank=False,
        null=False,
        help_text='Introduce the value in MHz (10^6)'
    )

    ipp = models.FloatField(
        verbose_name='IPP [km]',
        validators=[MinValueValidator(0.1)],
        blank=False,
        null=False,
        help_text='Introduce the value in km'
    )

    delay = models.FloatField(
        verbose_name='Delay [us]',
        validators=[MinValueValidator(0.0)],
        null=False,
        help_text='Introduce the value in µs'
    )

    # --- Pulse 1 (always present) ---
    pulse_1 = models.FloatField(
        verbose_name='Pulse Width 1 [us]',
        validators=[MinValueValidator(0.1)],
        blank=False,
        null=False,
        help_text='Introduce the value in us'
    )

    code_type_1 = models.ForeignKey(
        TXCode,
        on_delete=models.CASCADE,
        related_name='%(class)s_code_1',
        verbose_name="Code Type 1"
    )

    code_1 = models.TextField(
        verbose_name="Code 1",
        blank=True,
        null=True,
        help_text="binary code"
    )

    repetitions_1 = models.IntegerField(
        verbose_name='NTX 1',
        default=1,
        blank=True,
        null=True
    )

    # --- Pulse 2 (optional; only sent to the device when enable_2 is set) ---
    enable_2 = models.BooleanField(
        verbose_name='Pulse 2',
        blank=False,
        null=False
    )

    pulse_2 = models.FloatField(
        verbose_name='Pulse Width 2 [us]',
        default=1,
        blank=True,
        null=True,
        help_text='Introduce the value in µs'
    )

    code_type_2 = models.ForeignKey(
        TXCode,
        on_delete=models.CASCADE,
        related_name='%(class)s_code_2',
        verbose_name="Code Type 2",
        blank=True,
        null=True,
    )

    code_2 = models.TextField(
        verbose_name="Code 2",
        blank=True,
        null=True,
        help_text="Introduce the binary code"
    )

    repetitions_2 = models.IntegerField(
        verbose_name='NTX 2',
        default=0,
        blank=True,
        null=True
    )

    class Meta:
        db_table = 'usrp_tx_configurations'

    def __str__(self):
        """Summarise IPP and pulse settings; pulse 2 is included only if enabled."""
        if not self.enable_2:
            return 'TX with IPP={}km, Pulse 1: PW={} us, CODE={}'.format(self.ipp, self.pulse_1, self.code_type_1)
        else:
            return 'TX with IPP={}km, Pulse 1: PW={} us, CODE={} Pulse 2: PW={} us, CODE={}'.format(self.ipp, self.pulse_1, self.code_type_1, self.pulse_2, self.code_type_2)

    def get_absolute_url_plot(self):
        """Return the URL of the pulse-plot view for this configuration."""
        return reverse('url_plot_usrp_tx_pulses', args=[str(self.id)])

    def request(self, cmd, method='get', **kwargs):
        """Send an HTTP request to the device and return the decoded JSON body.

        Args:
            cmd: Command name passed to ``self.device.url`` to build the URL.
            method: Name of the ``requests`` function to call ('get', 'post', ...).
            **kwargs: Forwarded unchanged to that ``requests`` function.

        Raises:
            Whatever ``requests`` raises on connection problems, and the JSON
            decoding error raised by ``Response.json()`` for a non-JSON body.
        """
        response = getattr(requests, method)(self.device.url(cmd), **kwargs)
        return response.json()

    def status_device(self):
        """Probe the device's base URL and record whether it answered.

        Returns:
            True if the device replied with a successful HTTP status (device
            status is set to 1); False otherwise, with the reason stored in
            ``self.message``.
        """
        try:
            response = requests.get(self.device.url())
        except Exception as e:
            # A "No route to host" failure leaves the stored status untouched;
            # any other failure marks the device with status 4.
            if 'No route to host' not in str(e):
                self.device.status = 4
            self.device.save()
            self.message = 'USRP Tx status: {}'.format(str(e))
            return False

        # Only the HTTP status is inspected. The previous code subscripted the
        # Response object (payload['status']), which always raised TypeError
        # and surfaced as a confusing "not subscriptable" message.
        if not response.ok:
            self.device.status = 4
            self.device.save()
            self.message = 'USRP Tx status: HTTP {}'.format(response.status_code)
            return False

        self.device.status = 1
        self.device.save()
        return True

    def reset_device(self):
        """Ask the device to reset via a POST to its 'reset' command.

        Sets device status 2 when the reply's 'reset' entry equals 'ok',
        otherwise status 4.

        Returns:
            False if the request or reply handling raised; True otherwise.
        """
        try:
            payload = self.request('reset', 'post')
            if payload['reset'] == 'ok':
                self.message = 'USRP Tx restarted OK'
                self.device.status = 2
            else:
                # NOTE(review): a failed restart still falls through to
                # "return True" below -- confirm callers rely on this.
                self.message = 'USRP Tx restart fail'
                self.device.status = 4
            self.device.save()
        except Exception as e:
            self.message = 'USRP Tx reset: {}'.format(str(e))
            return False
        return True

    def stop_device(self):
        """Request the device to stop transmitting (GET on ``<device url>stoptx``).

        Returns:
            True if the device replied with a successful HTTP status; False on
            an error reply or on any exception. ``self.message`` describes
            the outcome.
        """
        try:
            response = requests.get(self.device.url() + "stoptx")
            # Status 4 is recorded whether or not the device accepted the stop.
            self.device.status = 4
            self.device.save()
            if not response.ok:
                self.message = 'USRP Tx stop failed: HTTP {}'.format(response.status_code)
                return False
            self.message = 'USRP stopped'
        except Exception as e:
            if 'No route to host' not in str(e):
                self.device.status = 4
            else:
                self.device.status = 0
            # Was "can't start" (copy-paste from start_device); this is the stop path.
            self.message = "USRP Tx can't stop, please check network/device connection or IP address/port configuration"
            self.device.save()
            return False
        return True

    def _start_payload(self):
        """Build the JSON-serialisable dict that start_device posts to the device."""
        payload = {
            'ip_address': self.ip_address,
            # The device receives the display labels, not the stored choice keys.
            'daughterboard': self.get_daughterboard_display(),
            'antenna': self.get_antenna_display(),
            'frequency': self.frequency,
            'sample_rate': self.samplerate,
            'ipp': self.ipp,
            'delay': self.delay,
            'pulse_1': self.pulse_1,
            'code_type_1': self.code_type_1.name,
            'code_1': self.code_1,
            'repetitions_1': self.repetitions_1,
            'enable_2': self.enable_2,
        }
        if self.enable_2:
            # code_type_2 is nullable; if it is unset this raises AttributeError,
            # which start_device reports as a failed start (same as before).
            payload.update({
                'pulse_2': self.pulse_2,
                'code_type_2': self.code_type_2.name,
                'code_2': self.code_2,
                'repetitions_2': self.repetitions_2,
            })
        return payload

    def start_device(self):
        """Send the saved configuration to the device and start transmission.

        The configuration is re-read from the database, then POSTed as JSON
        to the device's 'usrptx' URL.

        Returns:
            True if the device replied with a successful HTTP status (device
            status is set to 3); False on an error reply or on any exception.
            ``self.message`` describes the outcome.
        """
        try:
            # Look up by primary-key value rather than by passing the instance.
            usrp = USRPTXConfiguration.objects.get(pk=self.pk)
            response = requests.post(self.device.url("usrptx"), json=usrp._start_payload())
            if not response.ok:
                self.message = 'USRP Tx start failed: HTTP {}'.format(response.status_code)
                return False
            self.device.status = 3
            self.device.save()
            self.message = 'USRP Tx configured and started'
        except Exception as e:
            if 'No route to host' not in str(e):
                self.device.status = 4
            else:
                self.device.status = 0
            self.message = "USRP Tx can't start, please check network/device connection or IP address/port configuration"
            self.device.save()
            return False
        return True

    def get_absolute_url_import(self):
        """Return the URL of the configuration-import view for this configuration."""
        return reverse('url_import_usrp_tx_conf', args=[str(self.id)])