"""Database models for devices, experiments and device configurations.

The module defines:

* ``Profile``       - per-user UI settings, created together with each ``User``.
* ``DeviceType``    - the kind of a device (one of ``DEV_TYPES``).
* ``Device``        - a network-reachable device with an IP address and port.
* ``Experiment``    - a set of device configurations started/stopped together.
* ``Configuration`` - polymorphic base class for device configurations.
"""

import base64
import json
import logging
import os
import time
from datetime import datetime

import requests

try:
    from polymorphic.models import PolymorphicModel
except ImportError:
    # Fallback import location; only an import failure should trigger it.
    from polymorphic import PolymorphicModel

from django.template.base import kwarg_re
from django.db import models
from django.urls import reverse
from django.core.validators import MinValueValidator, MaxValueValidator
from django.shortcuts import get_object_or_404
from django.contrib.auth.models import User
from django.db.models.signals import post_save
from django.dispatch import receiver

from apps.main.utils import Params

logger = logging.getLogger(__name__)


DEV_PORTS = {
    'pedestal': 80,
    'pedestal_dev': 80,
    'generator': 80,
    'usrp_rx': 2000,
    'usrp_tx': 2000,
}

RADAR_STATES = (
    (0, 'No connected'),
    (1, 'Connected'),
    (2, 'Configured'),
    (3, 'Running'),
    (4, 'Scheduled'),
)

EXPERIMENT_TYPE = (
    (0, 'RAW_DATA'),
    (1, 'PDATA'),
)

DECODE_TYPE = (
    (0, 'None'),
    (1, 'TimeDomain'),
    (2, 'FreqDomain'),
    (3, 'InvFreqDomain'),
)

DEV_STATES = (
    (0, 'Unknown'),
    (1, 'Connected'),
    (2, 'Configured'),
    (3, 'Running'),
    (4, 'Offline'),
)

DEV_TYPES = (
    ('', 'Select a device type'),
    ('pedestal', 'Pedestal Controller'),
    ('pedestal_dev', 'Pedestal Controller Dev Mode'),
    ('generator', 'Pulse Generator'),
    ('usrp_rx', 'Universal Software Radio Peripheral Rx'),
    ('usrp_tx', 'Universal Software Radio Peripheral Tx'),
)

EXP_STATES = (
    (0, 'Error'),      # RED
    (1, 'Cancelled'),  # YELLOW
    (2, 'Running'),    # GREEN
    (3, 'Scheduled'),  # BLUE
    (4, 'Unknown'),    # WHITE
    (5, 'Other'),      # ORANGE
)

CONF_TYPES = (
    (0, 'Active'),
    (1, 'Historical'),
)


class Profile(models.Model):
    """Per-user settings attached one-to-one to a ``User``."""

    user = models.OneToOneField(User, on_delete=models.CASCADE)
    theme = models.CharField(max_length=30, default='spacelab')


@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
    """Create the ``Profile`` row when a ``User`` is first saved."""
    if created:
        Profile.objects.create(user=instance)


@receiver(post_save, sender=User)
def save_user_profile(sender, instance, **kwargs):
    """Save the user's ``Profile`` every time the ``User`` is saved."""
    instance.profile.save()


class DeviceType(models.Model):
    """Kind of device; ``name`` is restricted to the keys of ``DEV_TYPES``."""

    name = models.CharField(max_length=15, choices=DEV_TYPES, default='pedestal')
    sequence = models.PositiveSmallIntegerField(default=55)
    description = models.TextField(blank=True, null=True)

    class Meta:
        db_table = 'db_device_types'

    def __str__(self):
        return u'%s' % self.name.title()


class Device(models.Model):
    """A device reachable over HTTP at ``ip_address:port_address``."""

    device_type = models.ForeignKey('DeviceType', on_delete=models.CASCADE)
    ip_address = models.GenericIPAddressField(verbose_name='IP address', protocol='IPv4', default='0.0.0.0')
    port_address = models.PositiveSmallIntegerField(default=2000)
    description = models.TextField(blank=True, null=True)
    # Default 4 is 'Offline' in DEV_STATES.
    status = models.PositiveSmallIntegerField(default=4, choices=DEV_STATES)
    conf_active = models.PositiveIntegerField(default=0, verbose_name='Current configuration')

    class Meta:
        db_table = 'db_devices'

    def __str__(self):
        return str(self.device_type)

    @property
    def name(self):
        """Same text as ``str(self)``."""
        return str(self)

    def get_status(self):
        """Return the stored status code (see ``DEV_STATES``)."""
        return self.status

    @property
    def status_color(self):
        """Return the colour name used for the current status.

        Status codes 0-3 map to a specific colour; anything else is 'muted'.
        """
        colors = {
            0: "danger",
            1: "primary",
            2: "info",
            3: "success",
        }
        return colors.get(self.status, 'muted')

    def url(self, path=None):
        """Return the device base URL, or the URL of ``path`` under it.

        Both forms end with a trailing slash.
        """
        base = 'http://{}:{}/'.format(self.ip_address, self.port_address)
        if path:
            return '{}{}/'.format(base, path)
        return base

    def get_absolute_url(self):
        return reverse('url_device', args=[str(self.id)])

    def get_absolute_url_edit(self):
        return reverse('url_edit_device', args=[str(self.id)])

    def get_absolute_url_delete(self):
        return reverse('url_delete_device', args=[str(self.id)])

    def change_ip(self, ip_address, mask, gateway, dns, **kwargs):
        """Ask the device to change its network settings.

        Only implemented for devices whose type name is 'pedestal'. The four
        dotted addresses are sent as lists of integers in a JSON POST to the
        device's ``changeip`` URL. The outcome is stored in ``self.message``.

        Args:
            ip_address: new IP address as a dotted string.
            mask: subnet mask as a dotted string.
            gateway: gateway address as a dotted string.
            dns: DNS server address as a dotted string.

        Returns:
            False when the device type is not supported, True otherwise
            (including when the request fails; inspect ``self.message``).

        Raises:
            ValueError: if any address contains a non-integer component.
        """
        if self.device_type.name != 'pedestal':
            self.message = 'Not implemented'
            return False

        headers = {'content-type': "application/json",
                   'cache-control': "no-cache"}

        payload = {
            "ip": [int(x) for x in ip_address.split('.')],
            "dns": [int(x) for x in dns.split('.')],
            "gateway": [int(x) for x in gateway.split('.')],
            "subnet": [int(x) for x in mask.split('.')],
        }

        try:
            # The request itself is inside the try so that connection errors
            # are reported through self.message like malformed replies are.
            req = requests.post(self.url('changeip'), data=json.dumps(payload), headers=headers)
            answer = req.json()
            if answer['changeip'] == 'ok':
                self.message = '25|IP succesfully changed'
                self.ip_address = ip_address
                self.save()
            else:
                self.message = '30|An error ocuur when changing IP'
        except Exception as e:
            self.message = '40|{}'.format(str(e))

        return True


class Experiment(PolymorphicModel):
    """An experiment: a pedestal, a pulse generator and USRP rx/tx settings."""

    name = models.CharField(max_length=40, default='', unique=True)
    pedestal = models.ForeignKey('pedestal.PedestalConfiguration', null=False, blank=False, on_delete=models.PROTECT, default=None, limit_choices_to={'type': 0}, related_name="pedestal_conf")
    generator = models.ForeignKey('Device', null=False, blank=False, on_delete=models.PROTECT, default=2, editable=False, limit_choices_to={'device_type__name': 'generator'}, related_name="generator_conf")
    reception_rx = models.ForeignKey('usrp_rx.USRPRXConfiguration', null=False, blank=False, on_delete=models.PROTECT, default=None, limit_choices_to={'type': 0}, related_name="usrp_rx_CONF")
    transmission_tx = models.ForeignKey('usrp_tx.USRPTXConfiguration', null=False, blank=False, on_delete=models.PROTECT, default=None, limit_choices_to={'type': 0}, related_name="usrp_tx")
    task = models.CharField(max_length=36, default='', blank=True, null=True)
    # Default 4 is 'Unknown' in EXP_STATES.
    status = models.PositiveSmallIntegerField(default=4, choices=EXP_STATES)
    author = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE)
    hash = models.CharField(default='', max_length=64, null=True, blank=True)
    latitude = models.FloatField(null=False, blank=False, default=0, editable=False)
    longitude = models.FloatField(null=False, blank=False, default=0, editable=False)
    altitude = models.FloatField(null=False, blank=False, default=0, editable=False)
    heading = models.FloatField(null=True, blank=True)
    mode_stop = models.CharField(max_length=3, null=False, blank=False, default='web', editable=False)

    class Meta:
        db_table = 'db_experiments'
        ordering = ('name',)

    def __str__(self):
        return u'%s' % (self.name)

    def jsonify(self):
        """Return a dict with every concrete field value of the experiment.

        The extra key ``'configurations'`` lists, as strings, the primary
        keys of the type-0 configurations matched for this experiment.
        """
        data = {field.name: field.value_from_object(self)
                for field in self._meta.fields}
        data['configurations'] = [
            '{}'.format(conf.pk)
            for conf in Configuration.objects.filter(experiment=self, type=0)
        ]
        return data

    def clone(self, **kwargs):
        """Save this instance as a new row and clone its configurations.

        The copy's name is the current name plus ``_YYMMDD``; ``kwargs`` are
        applied as attribute overrides before saving.

        Returns:
            ``self``, which now refers to the newly saved row.
        """
        # Build the filter before pk/name are changed.
        confs = Configuration.objects.filter(experiment=self, type=0)
        self.pk = None
        self.name = '{}_{:%y%m%d}'.format(self.name, datetime.now())
        for attr, value in kwargs.items():
            setattr(self, attr, value)
        self.save()

        for conf in confs:
            conf.clone(experiment=self)

        return self

    def _send_trmode(self, payload):
        """Send ``payload`` to the generator's ``trmode`` endpoint.

        The payload is JSON-encoded, base64-encoded and appended to the
        generator URL as the ``params`` query value of a GET request.
        """
        experiment = get_object_or_404(Experiment, pk=self.id)
        generator_url = experiment.generator.url()
        encoded = base64.standard_b64encode(json.dumps(payload).encode('ascii'))
        requests.get(generator_url + "trmode?params=" + encoded.decode('ascii'))

    def generator_start(self):
        """Enable the pulse generator using the transmission settings.

        Returns:
            True if the request was sent, False if anything failed.
        """
        try:
            tx = self.transmission_tx
            # NOTE(review): presumably converts ipp from km to a time unit
            # (2/0.3 per km) - confirm against the generator firmware.
            period = tx.ipp*2/0.3
            width1 = tx.pulse_1 + 6
            if tx.enable_2:
                width2 = tx.pulse_2 + 6
                repeat1 = tx.repetitions_1
                repeat2 = tx.repetitions_2
            else:
                # Second pulse disabled: both slots carry pulse 1, once each.
                width2 = width1
                repeat1 = 1
                repeat2 = 1

            payload = {
                "Delay": 1 + tx.delay,
                "periode1": period,
                "width1": width1,
                "repeatability1": repeat1,
                "periode2": period,
                "width2": width2,
                "repeatability2": repeat2,
                "enable": 1,
            }
            self._send_trmode(payload)
        except Exception:
            logger.exception("Could not start generator of experiment %s", self.pk)
            return False
        return True

    def generator_stop(self):
        """Disable the pulse generator.

        Returns:
            True if the request was sent, False if anything failed.
        """
        try:
            self._send_trmode({"enable": 0})
        except Exception:
            logger.exception("Could not stop generator of experiment %s", self.pk)
            return False
        return True

    def start(self):
        """Configure and start the experiment's devices.

        Devices are started in the order pedestal, generator, USRP tx,
        USRP rx, with pauses in between; the collected data is then posted
        to ``http://$PROC_SITE/start``.

        Returns:
            An ``EXP_STATES`` code: 2 if this experiment is (now) running,
            5 if another experiment is already running, 0 on any error.
        """
        if self.status == 2:
            return 2

        # Only one experiment may be running at a time.
        if Experiment.objects.filter(status=2).exists():
            return 5

        data = {
            'name': '{}@{}'.format(self.name, datetime.now().strftime('%Y-%m-%dT%H-%M-%S')),
            'latitude': self.latitude,
            'longitude': self.longitude,
            'altitude': self.altitude,
            'heading': self.heading,
        }

        try:
            data['pedestal'] = self.pedestal.start_device(name_experiment=data['name'])
            time.sleep(1.0)
            self.generator_start()
            time.sleep(1.0)
            data['usrp_tx'] = self.transmission_tx.start_device(name_experiment=data['name'])
            time.sleep(1.0)
            data['usrp_rx'] = self.reception_rx.start_device(name_experiment=data['name'])
            time.sleep(0.1)
            proc_url = 'http://'+os.environ['PROC_SITE']+'/start'
            requests.post(proc_url, json=data)
        except Exception:
            logger.exception("Could not start experiment %s", self.pk)
            return 0

        return 2

    def stop(self):
        """Stop the experiment's devices: pedestal, generator and USRPs.

        Order: USRP tx, generator, USRP rx, pedestal reset, pedestal stop,
        then a GET to ``http://$PROC_SITE/stop``.

        Returns:
            An ``EXP_STATES`` code: 4 on success, 0 on any error.
        """
        try:
            self.transmission_tx.stop_device()
            time.sleep(1.0)
            self.generator_stop()
            time.sleep(0.1)
            self.reception_rx.stop_device()
            time.sleep(0.1)
            self.pedestal.reset_device()
            time.sleep(14)
            self.pedestal.stop_device()
            time.sleep(0.1)
            proc_url = 'http://'+os.environ['PROC_SITE']+'/stop'
            requests.get(proc_url)
        except Exception:
            logger.exception("Could not stop experiment %s", self.pk)
            return 0
        return 4

    def get_status(self):
        """Refresh ``self.status`` from the devices' status codes and save.

        Nothing is done when the status is 3. Otherwise every type-0
        configuration is asked for its device status; the experiment status
        becomes 1 when the summed device status equals 2 per configuration,
        2 when it equals 3 per configuration, and 0 in any other case.
        """
        if self.status == 3:
            return

        confs = Configuration.objects.filter(experiment=self, type=0)

        for conf in confs:
            conf.status_device()

        total = confs.aggregate(models.Sum('device__status'))['device__status__sum']
        count = confs.count()

        if total == 2*count:
            status = 1
        elif total == 3*count:
            status = 2
        else:
            status = 0

        self.status = status
        self.save()

    def status_color(self):
        """Return the colour name used for the current status.

        Status codes 0-3 map to a specific colour; anything else is 'muted'.
        """
        colors = {
            0: "danger",
            1: "warning",
            2: "success",
            3: "info",
        }
        return colors.get(self.status, 'muted')

    def parms_to_dict(self):
        """Return the experiment and its type-0 configurations as a dict."""
        params = Params({})
        params.add(self.jsonify(), 'experiments')

        for conf in Configuration.objects.filter(experiment=self, type=0):
            params.add(conf.jsonify(), 'configurations')

        return params.data

    def dict_to_parms(self, parms, CONF_MODELS, id_exp=None):
        """Rebuild this experiment's configurations from ``parms``.

        Existing configurations matched for the experiment are deleted and
        one new configuration is created per id listed in the experiment
        entry. The experiment is renamed to ``<name>-YYMMDD`` and saved.

        Args:
            parms: dict with 'experiments' and 'configurations' sections,
                each holding a 'byId' mapping ('experiments' also 'allIds').
            CONF_MODELS: mapping from device type name to model class.
            id_exp: id of the experiment entry to use; when None the first
                id in ``parms['experiments']['allIds']`` is used.

        Returns:
            ``self``.
        """
        configurations = Configuration.objects.filter(experiment=self)

        if id_exp is not None:
            exp_parms = parms['experiments']['byId'][id_exp]
        else:
            exp_parms = parms['experiments']['byId'][parms['experiments']['allIds'][0]]

        for configuration in configurations:
            configuration.delete()

        for id_conf in exp_parms['configurations']:
            conf_parms = parms['configurations']['byId'][id_conf]
            device = Device.objects.filter(device_type__name=conf_parms['device_type'])[0]
            model = CONF_MODELS[conf_parms['device_type']]
            conf = model(
                experiment=self,
                device=device,
            )
            conf.dict_to_parms(parms, id=id_conf)

        self.name = '{}-{}'.format(exp_parms['name'], datetime.now().strftime('%y%m%d'))
        self.save()

        return self

    def get_absolute_url(self):
        return reverse('url_experiment', args=[str(self.id)])

    def get_absolute_url_edit(self):
        return reverse('url_edit_experiment', args=[str(self.id)])

    def get_absolute_url_delete(self):
        return reverse('url_delete_experiment', args=[str(self.id)])

    def get_absolute_url_import(self):
        return reverse('url_import_experiment', args=[str(self.id)])

    def get_absolute_url_export(self):
        return reverse('url_export_experiment', args=[str(self.id)])

    def get_absolute_url_start(self):
        return reverse('url_start_experiment', args=[str(self.id)])

    def get_absolute_url_stop(self):
        return reverse('url_stop_experiment', args=[str(self.id)])


class Configuration(PolymorphicModel):
    """Polymorphic base class for a device configuration.

    The device operations (``status_device``, ``start_device``, ...) defined
    here only set ``message`` and return False.
    """

    id = models.AutoField(primary_key=True)
    experiment = models.CharField(default='empty', editable=False, max_length=64, null=True, blank=True)
    experiment_date = models.DateTimeField(default=datetime.now, editable=False)
    device = models.ForeignKey('Device', verbose_name='Device', null=True, on_delete=models.CASCADE)
    type = models.PositiveSmallIntegerField(default=0, choices=CONF_TYPES)
    created_date = models.DateTimeField(auto_now_add=True)
    programmed_date = models.DateTimeField(auto_now=True)
    parameters = models.TextField(default='{}')
    author = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE)
    hash = models.CharField(default='', max_length=64, null=True, blank=True)
    # Plain attribute (not a database field) holding the last operation message.
    message = ""

    class Meta:
        db_table = 'db_configurations'
        ordering = ('device__device_type__name',)

    def __str__(self):
        device_name = self.device.device_type.name.upper()
        field_names = {f.name for f in self._meta.get_fields()}

        ret = u'{} '.format(device_name)

        # 'mix' and 'label' are only used when the model declares such fields.
        if 'mix' in field_names and self.mix:
            ret = '{} MIX '.format(device_name)

        if 'label' in field_names:
            ret += '{}'.format(self.label)

        return ret

    @property
    def name(self):
        """Same text as ``str(self)``."""
        return str(self)

    @property
    def label(self):
        """Same text as ``str(self)``."""
        return str(self)

    def jsonify(self):
        """Return the configuration's field values as a dict.

        Bookkeeping and relation fields are left out, and the device type
        name is added under ``'device_type'``.
        """
        ignored = ('type', 'polymorphic_ctype', 'configuration_ptr',
                   'created_date', 'programmed_date', 'device', 'experiment',
                   'author')

        data = {field.name: field.value_from_object(self)
                for field in self._meta.fields
                if field.name not in ignored}

        data['device_type'] = self.device.device_type.name
        return data

    def clone(self, **kwargs):
        """Save this instance as a new row, applying ``kwargs`` overrides.

        NOTE(review): ``self.id`` is set back to the original id after the
        save, so the returned object carries the original id - confirm
        this is intended.

        Returns:
            ``self``.
        """
        before_id = self.id

        self.pk = None
        self.id = None
        for attr, value in kwargs.items():
            setattr(self, attr, value)
        self.save()

        self.id = before_id
        return self

    def parms_to_dict(self):
        """Return this configuration wrapped in a ``Params`` data dict."""
        params = Params({})
        params.add(self.jsonify(), 'configurations')
        return params.data

    def parms_to_text(self, file_format=None):
        """Return the parameters as text; not implemented in this class.

        ``file_format`` is accepted because ``export_to_file`` passes it; it
        is not used here.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("This method should be implemented in %s Configuration model" %str(self.device.device_type.name).upper())

    def parms_to_binary(self):
        """Return the parameters as binary; not implemented in this class.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("This method should be implemented in %s Configuration model" %str(self.device.device_type.name).upper())

    def dict_to_parms(self, parameters, id=None):
        """Load attribute values from ``parameters`` and save.

        The configuration entry is looked up by ``id`` when given, otherwise
        by this configuration's device type name. Every key except 'id' and
        'device_type' is set as an attribute.
        """
        params = Params(parameters)

        if id:
            data = params.get_conf(id_conf=id)
        else:
            data = params.get_conf(dtype=self.device.device_type.name)

        for key, value in data.items():
            if key not in ('id', 'device_type'):
                setattr(self, key, value)

        self.save()

    def export_to_file(self, format="json"):
        """Render the configuration for download.

        Args:
            format: 'racp', 'text' or 'binary'; any other value gives JSON.

        Returns:
            dict with keys 'content_type', 'filename' and 'content'.
        """
        device_name = self.device.device_type.name

        if format == 'racp':
            content_type = 'text/plain'
            filename = '%s_%s.%s' %(device_name, self.name, 'racp')
            content = self.parms_to_text(file_format = 'racp')
        elif format == 'text':
            content_type = 'text/plain'
            filename = '%s_%s.%s' %(device_name, self.name, device_name)
            content = self.parms_to_text()
        elif format == 'binary':
            content_type = 'application/octet-stream'
            filename = '%s_%s.bin' %(device_name, self.name)
            content = self.parms_to_binary()
        else:
            content_type = 'application/json'
            filename = '%s_%s.json' %(device_name, self.name)
            content = json.dumps(self.parms_to_dict(), indent=2)

        return {
            'content_type': content_type,
            'filename': filename,
            'content': content,
        }

    def import_from_file(self, fp):
        """Return the parameters stored in the open file ``fp``.

        Only files whose name ends in '.json' are read; any other extension
        gives an empty dict.
        """
        _, ext = os.path.splitext(fp.name)

        if ext == '.json':
            return json.load(fp)

        return {}

    def status_device(self):
        self.message = 'Function not supported'
        return False

    def stop_device(self):
        self.message = 'Function not supported'
        return False

    def start_device(self):
        self.message = 'Function not supported'
        return False

    def write_device(self):
        self.message = 'Function not supported'
        return False

    def read_device(self):
        self.message = 'Function not supported'
        return False

    def get_absolute_url(self):
        return reverse('url_%s_conf' % self.device.device_type.name, args=[str(self.id)])

    def get_absolute_url_edit(self):
        return reverse('url_edit_%s_conf' % self.device.device_type.name, args=[str(self.id)])

    def get_absolute_url_delete(self):
        return reverse('url_delete_dev_conf', args=[str(self.id)])

    def get_absolute_url_import(self):
        return reverse('url_import_dev_conf', args=[str(self.id)])

    def get_absolute_url_export(self):
        return reverse('url_export_dev_conf', args=[str(self.id)])

    def get_absolute_url_write(self):
        return reverse('url_write_dev_conf', args=[str(self.id)])

    def get_absolute_url_read(self):
        return reverse('url_read_dev_conf', args=[str(self.id)])

    def get_absolute_url_start(self):
        return reverse('url_start_dev_conf', args=[str(self.id)])

    def get_absolute_url_stop(self):
        return reverse('url_stop_dev_conf', args=[str(self.id)])

    def get_absolute_url_status(self):
        return reverse('url_status_dev_conf', args=[str(self.id)])