import ast import json import numpy as np from polymorphic import PolymorphicModel from django.db import models from django.core.validators import MinValueValidator, MaxValueValidator from apps.main.models import Configuration from .utils import pulses, pulses_from_code, create_mask # Create your models here. LINE_TYPES = ( ('none', 'Not used'), ('tr', 'Transmission/reception selector signal'), ('tx', 'A modulating signal (Transmission pulse)'), ('codes', 'BPSK modulating signal'), ('windows', 'Sample window signal'), ('sync', 'Synchronizing signal'), ('flip', 'IPP related periodic signal'), ('prog_pulses', 'Programmable pulse'), ) class RCConfiguration(Configuration): ipp = models.FloatField(verbose_name='Inter pulse period (Km)', validators=[MinValueValidator(1), MaxValueValidator(1000)], default=10) ntx = models.PositiveIntegerField(verbose_name='Number of TX', default=1) clock = models.FloatField(verbose_name='Clock Master (MHz)', validators=[MinValueValidator(1), MaxValueValidator(80)], default=1) clock_divider = models.PositiveIntegerField(verbose_name='Clock divider', validators=[MinValueValidator(1), MaxValueValidator(256)], default=1) time_before = models.PositiveIntegerField(verbose_name='Time before', default=0) time_after = models.PositiveIntegerField(verbose_name='Time after', default=0) sync = models.PositiveIntegerField(verbose_name='Synchro delay', default=0) class Meta: db_table = 'rc_configurations' def get_number_position(self): lines = RCLine.objects.filter(rc_configuration=self.rc_configuration) if lines: return max([line.position for line in lines]) def get_refs_lines(self): lines = RCLine.objects.filter(rc_configuration=self.pk, line_type__name='tx') return [(line.pk, line.get_name()) for line in lines] def get_lines(self, type=None): if type is not None: return RCLine.objects.filter(rc_configuration=self.pk, line_type__name=type) else: return RCLine.objects.filter(rc_configuration=self.pk) class RCLineCode(models.Model): name = models.CharField(max_length=40) bits_per_code = models.PositiveIntegerField(default=0) number_of_codes = models.PositiveIntegerField(default=0) codes = models.TextField(blank=True, null=True) class Meta: db_table = 'rc_line_codes' ordering = ('name',) def __unicode__(self): return u'%s' % self.name class RCLineType(models.Model): name = models.CharField(choices=LINE_TYPES, max_length=40) description = models.TextField(blank=True, null=True) params = models.TextField(default='[]') class Meta: db_table = 'rc_line_types' def __unicode__(self): return u'%s - %s' % (self.name.upper(), self.get_name_display()) class RCLine(models.Model): rc_configuration = models.ForeignKey(RCConfiguration) line_type = models.ForeignKey(RCLineType) channel = models.PositiveIntegerField(default=0) position = models.PositiveIntegerField(default=0) params = models.TextField(default='{}') pulses = models.TextField(default='') class Meta: db_table = 'rc_lines' ordering = ['channel'] def __unicode__(self): return u'%s - %s' % (self.rc_configuration, self.get_name()) def get_name(self): chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' if self.line_type.name in ('tx',): return '%s %s' % (self.line_type.name.upper(), chars[self.position]) elif self.line_type.name in ('codes', 'windows', 'tr'): pk = json.loads(self.params)['TX_ref'] if pk in (0, '0'): refs = ','.join(chars[l.position] for l in self.rc_configuration.get_lines('tx')) return '%s (%s)' % (self.line_type.name.upper(), refs) else: ref = RCLine.objects.get(pk=pk) return '%s (%s)' % (self.line_type.name.upper(), chars[ref.position]) elif self.line_type.name in ('flip', 'prog_pulses', 'sync'): return '%s %s' % (self.line_type.name.upper(), self.channel) else: return self.line_type.name.upper() def get_lines(self, type=None): if type is not None: return RCLine.objects.filter(rc_configuration=self.rc_configuration, line_type__name=type) else: return RCLine.objects.filter(rc_configuration=self.rc_configuration) def pulses_as_array(self): return (np.fromstring(self.pulses, dtype=np.uint8)-48).astype(np.int8) @property def km2unit(self): return 20./3*(self.rc_configuration.clock/self.rc_configuration.clock_divider) def update_pulses(self, save=True, tr=False): print self KM2U = 20./3*(self.rc_configuration.clock/self.rc_configuration.clock_divider) ipp = self.rc_configuration.ipp ntx = self.rc_configuration.ntx ipp_u = int(ipp*KM2U) x = np.arange(0, ipp_u*ntx) if self.line_type.name=='tr': params = json.loads(self.params) if params['TX_ref'] in ('0', 0): txs = [tx.update_pulses(save=False, tr=True) for tx in self.get_lines('tx')] else: txs = [tx.update_pulses(save=False, tr=True) for tx in RCLine.objects.filter(pk=params['TX_ref'])] if len(txs)==0 or 0 in [len(tx) for tx in txs]: return y = np.any(txs, axis=0, out=np.ones(ipp_u*ntx)) ranges = params['range'].split(',') if len(ranges)>0 and ranges[0]<>'0': mask = create_mask(ranges, ipp_u, ntx, self.rc_configuration.sync) y = y.astype(np.int8) & mask elif self.line_type.name=='tx': params = json.loads(self.params) delays = [float(d)*KM2U for d in params['delays'].split(',') if d] y = pulses(x, ipp_u, float(params['pulse_width'])*KM2U, delay=delays, before=self.rc_configuration.time_before, after=self.rc_configuration.time_after if tr else 0, sync=self.rc_configuration.sync) ranges = params['range'].split(',') if len(ranges)>0 and ranges[0]<>'0': mask = create_mask(ranges, ipp_u, ntx, self.rc_configuration.sync) y = y & mask elif self.line_type.name=='flip': width = float(json.loads(self.params)['number_of_flips'])*ipp*KM2U y = pulses(x, 2*width, width) elif self.line_type.name=='codes': params = json.loads(self.params) codes = ast.literal_eval(RCLineCode.objects.get(pk=json.loads(self.params)['code']).codes) tx = RCLine.objects.get(pk=params['TX_ref']) tx_params = json.loads(tx.params) y = pulses_from_code(ipp_u, ntx, codes, int(float(tx_params['pulse_width'])*KM2U), before=self.rc_configuration.time_before+self.rc_configuration.sync) ranges = tx_params['range'].split(',') if len(ranges)>0 and ranges[0]<>'0': mask = create_mask(ranges, ipp_u, ntx, self.rc_configuration.sync) y = y.astype(np.int8) & mask elif self.line_type.name=='sync': params = json.loads(self.params) y = np.zeros(ipp_u*ntx) if params['invert'] in ('1', 1): y[-1] = 1 else: y[0] = 1 elif self.line_type.name=='prog_pulses': params = json.loads(self.params) if int(params['periodic'])==0: nntx = ntx else: nntx = 1 if 'params' in params and len(params['params'])>0: y = sum([pulses(x, ipp_u*nntx, (pp['end']-pp['begin']), shift=pp['begin']) for pp in params['params']]) else: y = np.zeros(ipp_u*ntx) elif self.line_type.name=='windows': params = json.loads(self.params) if 'params' in params and len(params['params'])>0: y = sum([pulses(x, ipp_u, pp['resolution']*pp['number_of_samples']*KM2U, delay=(pp['first_height']-pp['resolution'])*KM2U, before=self.rc_configuration.time_before) for pp in params['params']]) tr = self.get_lines('tr')[0] ranges = json.loads(tr.params)['range'].split(',') if len(ranges)>0 and ranges[0]<>'0': mask = create_mask(ranges, ipp_u, ntx, self.rc_configuration.sync) y = y & mask else: y = np.zeros(ipp_u*ntx) else: y = np.zeros(ipp_u*ntx) if save: self.pulses = (y+48).astype(np.uint8).tostring() self.save() else: return y