import ast
import base64
import json
from struct import pack

import requests
from django.core.validators import MinValueValidator, MaxValueValidator
from django.db import models
from django.urls import reverse

from apps.main.models import Configuration

# Choices for USRPTXConfiguration.daughterboard (stored value, display label).
BOARD_VALUE = (
    ('1', 'A:AB'),
    ('2', 'AB'),
)

# Choices for USRPTXConfiguration.antenna (stored value, display label).
ANT_VALUE = (
    ('1', 'RX'),
    ('2', 'TX'),
    ('3', 'TX/RX'),
)


def _response_status(response):
    """Return the ``'status'`` field of a JSON response body, or ``None``.

    ``None`` is returned when the body is not valid JSON or is not a JSON
    object, so callers never have to subscript the response directly.
    """
    try:
        body = response.json()
    except ValueError:
        # requests raises a ValueError subclass when the body is not JSON.
        return None
    if isinstance(body, dict):
        return body.get('status')
    return None


class TXCode(models.Model):
    """A named transmit code set stored in the ``tx_codes`` table."""

    name = models.CharField(max_length=40)
    bits_per_code = models.PositiveIntegerField(default=0)
    number_of_codes = models.PositiveIntegerField(default=0)
    codes = models.TextField(blank=True, null=True)

    class Meta:
        db_table = 'tx_codes'
        ordering = ('name',)

    def __str__(self):
        return u'%s' % self.name


class USRPTXConfiguration(Configuration):
    """Configuration of a USRP transmitter, controlled over HTTP.

    The model stores the RF and pulse parameters and exposes
    ``status_device`` / ``reset_device`` / ``stop_device`` / ``start_device``
    helpers. Each helper talks to the URL provided by ``self.device.url(...)``,
    updates ``self.device.status`` and ``self.message``, and returns a bool.

    NOTE(review): the meaning of the numeric ``device.status`` values
    (0, 1, 2, 3, 4) is defined outside this file -- confirm against the
    device model before changing any of them.
    """

    ip_address = models.GenericIPAddressField(
        verbose_name = 'IP address',
        protocol='IPv4',
        default='0.0.0.0')

    daughterboard = models.CharField(
        verbose_name='Daughterboard',
        max_length=3,
        choices=BOARD_VALUE,
        null=False,
        blank=False
    )

    antenna = models.CharField(
        verbose_name='Antenna',
        max_length=3,
        choices=ANT_VALUE,
        null=False,
        blank=False
    )

    frequency = models.FloatField(
        verbose_name='Frequency [MHz]',
        validators=[MinValueValidator(0.1)],
        blank=False,
        null=False,
        help_text='Introduce the value in MHz (10^6)'
    )

    samplerate = models.FloatField(
        verbose_name='Sample rate [MHz]',
        validators=[MinValueValidator(0.1)],
        blank=False,
        null=False,
        help_text='Introduce the value in MHz (10^6)'
    )

    ipp = models.FloatField(
        verbose_name='IPP [km]',
        validators=[MinValueValidator(0.1)],
        blank=False,
        null=False,
        help_text='Introduce the value in km'
    )

    delay = models.FloatField(
        verbose_name='Delay [us]',
        validators=[MinValueValidator(0.0)],
        null=False,
        help_text='Introduce the value in µs'
    )

    # --- Pulse 1 (always present) ---
    pulse_1 = models.FloatField(
        verbose_name='Pulse Width 1 [us]',
        validators=[MinValueValidator(0.1)],
        blank=False,
        null=False,
        help_text='Introduce the value in us'
    )

    code_type_1 = models.ForeignKey(
        TXCode,
        on_delete=models.CASCADE,
        related_name='%(class)s_code_1',
        verbose_name="Code Type 1"
    )

    code_1 = models.TextField(
        verbose_name="Code 1",
        blank=True,
        null=True,
        help_text="binary code"
    )

    repetitions_1 = models.IntegerField(
        verbose_name='NTX 1',
        default=1,
        blank=True,
        null=True
    )

    # --- Pulse 2 (only sent to the device when enable_2 is True) ---
    enable_2 = models.BooleanField(
        verbose_name='Pulse 2',
        blank=False,
        null=False
    )

    pulse_2 = models.FloatField(
        verbose_name='Pulse Width 2 [us]',
        default=1,
        blank=True,
        null=True,
        help_text='Introduce the value in µs'
    )

    code_type_2 = models.ForeignKey(
        TXCode,
        on_delete=models.CASCADE,
        related_name='%(class)s_code_2',
        verbose_name="Code Type 2",
        blank=True,
        null=True,
    )

    code_2 = models.TextField(
        verbose_name="Code 2",
        blank=True,
        null=True,
        help_text="Introduce the binary code"
    )

    repetitions_2 = models.IntegerField(
        verbose_name='NTX 2',
        default=0,
        blank=True,
        null=True
    )

    class Meta:
        db_table = 'usrp_tx_configurations'

    def __str__(self):
        """Summarise the IPP and pulse 1, plus pulse 2 when it is enabled."""
        if not self.enable_2:
            return 'TX with IPP={}km, Pulse 1: PW={} us, CODE={}'.format(self.ipp, self.pulse_1, self.code_type_1)
        else:
            return 'TX with IPP={}km, Pulse 1: PW={} us, CODE={} Pulse 2: PW={} us, CODE={}'.format(self.ipp, self.pulse_1, self.code_type_1, self.pulse_2, self.code_type_2)

    def get_absolute_url_plot(self):
        """Return the URL of the pulse-plot view for this configuration."""
        return reverse('url_plot_usrp_tx_pulses', args=[str(self.id)])

    def request(self, cmd, method='get', **kwargs):
        """Send ``cmd`` to the device and return the decoded JSON body.

        ``method`` is the name of a ``requests`` function (``'get'``,
        ``'post'``, ...); ``kwargs`` are forwarded to it unchanged.
        Network errors and JSON decoding errors propagate to the caller.
        """
        req = getattr(requests, method)(self.device.url(cmd), **kwargs)
        payload = req.json()

        return payload

    def status_device(self):
        """Query the device and update ``self.device.status``.

        Returns ``True`` when the device answered with a successful HTTP
        response (status 1) or reported ``'disable'`` (status 2). Returns
        ``False`` and sets ``self.message`` otherwise.
        """
        try:
            response = requests.get(self.device.url())

            if response:
                # A Response is truthy when the HTTP status code is < 400.
                self.device.status = 1
            else:
                # The original code subscripted the Response object itself,
                # which always raised TypeError; read the decoded body instead.
                reported = _response_status(response)
                if reported == 'disable':
                    self.device.status = 2
                else:
                    if reported is None:
                        # No readable status: keep the status (4) that the
                        # previous TypeError path ended up storing.
                        self.device.status = 4
                        reported = 'HTTP {}'.format(response.status_code)
                    else:
                        self.device.status = 1
                    self.device.save()
                    self.message = 'USRP Tx status: {}'.format(reported)
                    return False
        except Exception as e:
            if 'No route to host' not in str(e):
                self.device.status = 4
            self.device.save()
            self.message = 'USRP Tx status: {}'.format(str(e))
            return False

        self.device.save()
        return True

    def reset_device(self):
        """Ask the device to reset and record the outcome.

        Returns ``False`` only when the request itself raised.
        NOTE(review): a reply other than ``{'reset': 'ok'}`` sets status 4
        and a failure message but still returns ``True`` -- confirm callers
        rely on this before changing it.
        """
        try:
            payload = self.request('reset', 'post')
            if payload['reset']=='ok':
                self.message = 'USRP Tx restarted OK'
                self.device.status = 2
            else:
                self.message = 'USRP Tx restart fail'
                self.device.status = 4
            self.device.save()
        except Exception as e:
            self.message = 'USRP Tx reset: {}'.format(str(e))
            return False

        return True

    def stop_device(self):
        """Ask the device to stop transmitting.

        Returns ``True`` on a successful HTTP response, ``False`` (with
        ``self.message`` set) on an error response or a network failure.
        """
        try:
            command = self.device.url() + "stoptx"
            r = requests.get(command)

            # Both outcomes store status 4, exactly as before.
            self.device.status = 4
            self.device.save()
            if r:
                self.message = 'USRP stopped'
            else:
                # Previously no message was set on this path.
                self.message = 'USRP Tx stop failed: HTTP {}'.format(r.status_code)
                return False
        except Exception as e:
            if 'No route to host' not in str(e):
                self.device.status = 4
            else:
                self.device.status = 0
            # The message used to say "can't start" (copied from start_device).
            self.message = "USRP Tx can't stop, please check network/device connection or IP address/port configuration"
            self.device.save()
            return False

        return True

    def start_device(self):
        """Send this configuration to the device and start transmitting.

        The configuration is re-read from the database, serialised to JSON
        and POSTed to the ``usrptx`` endpoint. Pulse 2 fields are included
        only when ``enable_2`` is True. Returns ``True`` on a successful
        HTTP response, ``False`` otherwise.
        """
        try:
            # Look up by primary-key value rather than passing the instance.
            usrp = USRPTXConfiguration.objects.get(pk=self.pk)
            usrp_daughterboard = usrp.get_daughterboard_display()
            usrp_antenna = usrp.get_antenna_display()

            payload = {
                'ip_address': usrp.ip_address,
                'daughterboard': usrp_daughterboard,
                'antenna': usrp_antenna,
                'frequency': usrp.frequency,
                'sample_rate': usrp.samplerate,
                'ipp': usrp.ipp,
                'delay': usrp.delay,
                'pulse_1': usrp.pulse_1,
                'code_type_1': usrp.code_type_1.name,
                'code_1': usrp.code_1,
                'repetitions_1': usrp.repetitions_1,
                'enable_2': usrp.enable_2,
            }
            if usrp.enable_2 is True:
                payload.update({
                    'pulse_2': usrp.pulse_2,
                    'code_type_2': usrp.code_type_2.name,
                    'code_2': usrp.code_2,
                    'repetitions_2': usrp.repetitions_2,
                })

            r = requests.post(self.device.url("usrptx"), json=payload)

            if r:
                self.device.status = 3
                self.device.save()
                self.message = 'USRP Tx configured and started'
            else:
                # Previously no message was set on this path.
                self.message = 'USRP Tx start failed: HTTP {}'.format(r.status_code)
                return False
        except Exception as e:
            if 'No route to host' not in str(e):
                self.device.status = 4
            else:
                self.device.status = 0
            self.message = "USRP Tx can't start, please check network/device connection or IP address/port configuration"
            self.device.save()
            return False

        return True

    def get_absolute_url_import(self):
        """Return the URL of the import view for this configuration."""
        return reverse('url_import_usrp_tx_conf', args=[str(self.id)])