models.py
235 lines
| 9.1 KiB
| text/x-python
|
PythonLexer
|
r23 | ||
|
r45 | import ast | |
|
r25 | import json | |
|
r45 | import numpy as np | |
|
r25 | ||
|
r23 | from polymorphic import PolymorphicModel | |
|
r0 | from django.db import models | |
|
r23 | from django.core.validators import MinValueValidator, MaxValueValidator | |
|
r6 | from apps.main.models import Configuration | |
|
r45 | from .utils import pulses, pulses_from_code, create_mask | |
|
r0 | # Create your models here. | |
|
r6 | ||
|
r23 | LINE_TYPES = ( | |
|
r45 | ('none', 'Not used'), | |
|
r23 | ('tr', 'Transmission/reception selector signal'), | |
('tx', 'A modulating signal (Transmission pulse)'), | |||
('codes', 'BPSK modulating signal'), | |||
('windows', 'Sample window signal'), | |||
('sync', 'Synchronizing signal'), | |||
('flip', 'IPP related periodic signal'), | |||
('prog_pulses', 'Programmable pulse'), | |||
) | |||
|
r6 | class RCConfiguration(Configuration): | |
|
r45 | ||
ipp = models.FloatField(verbose_name='Inter pulse period (Km)', validators=[MinValueValidator(1), MaxValueValidator(1000)], default=10) | |||
ntx = models.PositiveIntegerField(verbose_name='Number of TX', default=1) | |||
clock = models.FloatField(verbose_name='Clock Master (MHz)', validators=[MinValueValidator(1), MaxValueValidator(80)], default=1) | |||
clock_divider = models.PositiveIntegerField(verbose_name='Clock divider', validators=[MinValueValidator(1), MaxValueValidator(256)], default=1) | |||
time_before = models.PositiveIntegerField(verbose_name='Time before', default=0) | |||
time_after = models.PositiveIntegerField(verbose_name='Time after', default=0) | |||
sync = models.PositiveIntegerField(verbose_name='Synchro delay', default=0) | |||
|
r6 | ||
class Meta: | |||
db_table = 'rc_configurations' | |||
|
r23 | ||
def get_number_position(self): | |||
lines = RCLine.objects.filter(rc_configuration=self.rc_configuration) | |||
if lines: | |||
return max([line.position for line in lines]) | |||
|
r25 | def get_refs_lines(self): | |
lines = RCLine.objects.filter(rc_configuration=self.pk, line_type__name='tx') | |||
return [(line.pk, line.get_name()) for line in lines] | |||
|
r45 | def get_lines(self, type=None): | |
if type is not None: | |||
return RCLine.objects.filter(rc_configuration=self.pk, line_type__name=type) | |||
else: | |||
return RCLine.objects.filter(rc_configuration=self.pk) | |||
|
r23 | class RCLineCode(models.Model): | |
|
r25 | name = models.CharField(max_length=40) | |
|
r23 | bits_per_code = models.PositiveIntegerField(default=0) | |
number_of_codes = models.PositiveIntegerField(default=0) | |||
codes = models.TextField(blank=True, null=True) | |||
class Meta: | |||
db_table = 'rc_line_codes' | |||
|
r25 | ordering = ('name',) | |
def __unicode__(self): | |||
return u'%s' % self.name | |||
|
r23 | ||
class RCLineType(models.Model): | |||
name = models.CharField(choices=LINE_TYPES, max_length=40) | |||
description = models.TextField(blank=True, null=True) | |||
params = models.TextField(default='[]') | |||
class Meta: | |||
db_table = 'rc_line_types' | |||
def __unicode__(self): | |||
return u'%s - %s' % (self.name.upper(), self.get_name_display()) | |||
class RCLine(models.Model): | |||
rc_configuration = models.ForeignKey(RCConfiguration) | |||
line_type = models.ForeignKey(RCLineType) | |||
channel = models.PositiveIntegerField(default=0) | |||
position = models.PositiveIntegerField(default=0) | |||
params = models.TextField(default='{}') | |||
|
r45 | pulses = models.TextField(default='') | |
|
r23 | ||
class Meta: | |||
db_table = 'rc_lines' | |||
|
r45 | ordering = ['channel'] | |
|
r23 | ||
def __unicode__(self): | |||
return u'%s - %s' % (self.rc_configuration, self.get_name()) | |||
def get_name(self): | |||
chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' | |||
|
r45 | if self.line_type.name in ('tx',): | |
|
r25 | return '%s %s' % (self.line_type.name.upper(), chars[self.position]) | |
|
r45 | elif self.line_type.name in ('codes', 'windows', 'tr'): | |
|
r25 | pk = json.loads(self.params)['TX_ref'] | |
|
r45 | if pk in (0, '0'): | |
refs = ','.join(chars[l.position] for l in self.rc_configuration.get_lines('tx')) | |||
return '%s (%s)' % (self.line_type.name.upper(), refs) | |||
else: | |||
ref = RCLine.objects.get(pk=pk) | |||
return '%s (%s)' % (self.line_type.name.upper(), chars[ref.position]) | |||
elif self.line_type.name in ('flip', 'prog_pulses', 'sync'): | |||
return '%s %s' % (self.line_type.name.upper(), self.channel) | |||
|
r25 | else: | |
return self.line_type.name.upper() | |||
|
r45 | ||
def get_lines(self, type=None): | |||
if type is not None: | |||
return RCLine.objects.filter(rc_configuration=self.rc_configuration, line_type__name=type) | |||
else: | |||
return RCLine.objects.filter(rc_configuration=self.rc_configuration) | |||
def pulses_as_array(self): | |||
return (np.fromstring(self.pulses, dtype=np.uint8)-48).astype(np.int8) | |||
@property | |||
def km2unit(self): | |||
return 20./3*(self.rc_configuration.clock/self.rc_configuration.clock_divider) | |||
def update_pulses(self, save=True, tr=False): | |||
print self | |||
KM2U = 20./3*(self.rc_configuration.clock/self.rc_configuration.clock_divider) | |||
ipp = self.rc_configuration.ipp | |||
ntx = self.rc_configuration.ntx | |||
ipp_u = int(ipp*KM2U) | |||
x = np.arange(0, ipp_u*ntx) | |||
if self.line_type.name=='tr': | |||
params = json.loads(self.params) | |||
if params['TX_ref'] in ('0', 0): | |||
txs = [tx.update_pulses(save=False, tr=True) for tx in self.get_lines('tx')] | |||
else: | |||
txs = [tx.update_pulses(save=False, tr=True) for tx in RCLine.objects.filter(pk=params['TX_ref'])] | |||
if len(txs)==0 or 0 in [len(tx) for tx in txs]: | |||
return | |||
y = np.any(txs, axis=0, out=np.ones(ipp_u*ntx)) | |||
ranges = params['range'].split(',') | |||
if len(ranges)>0 and ranges[0]<>'0': | |||
mask = create_mask(ranges, ipp_u, ntx, self.rc_configuration.sync) | |||
y = y.astype(np.int8) & mask | |||
elif self.line_type.name=='tx': | |||
params = json.loads(self.params) | |||
delays = [float(d)*KM2U for d in params['delays'].split(',') if d] | |||
y = pulses(x, ipp_u, float(params['pulse_width'])*KM2U, | |||
delay=delays, | |||
before=self.rc_configuration.time_before, | |||
after=self.rc_configuration.time_after if tr else 0, | |||
sync=self.rc_configuration.sync) | |||
ranges = params['range'].split(',') | |||
if len(ranges)>0 and ranges[0]<>'0': | |||
mask = create_mask(ranges, ipp_u, ntx, self.rc_configuration.sync) | |||
y = y & mask | |||
elif self.line_type.name=='flip': | |||
width = float(json.loads(self.params)['number_of_flips'])*ipp*KM2U | |||
y = pulses(x, 2*width, width) | |||
elif self.line_type.name=='codes': | |||
params = json.loads(self.params) | |||
codes = ast.literal_eval(RCLineCode.objects.get(pk=json.loads(self.params)['code']).codes) | |||
tx = RCLine.objects.get(pk=params['TX_ref']) | |||
tx_params = json.loads(tx.params) | |||
y = pulses_from_code(ipp_u, ntx, codes, | |||
int(float(tx_params['pulse_width'])*KM2U), | |||
before=self.rc_configuration.time_before+self.rc_configuration.sync) | |||
ranges = tx_params['range'].split(',') | |||
if len(ranges)>0 and ranges[0]<>'0': | |||
mask = create_mask(ranges, ipp_u, ntx, self.rc_configuration.sync) | |||
y = y.astype(np.int8) & mask | |||
elif self.line_type.name=='sync': | |||
params = json.loads(self.params) | |||
y = np.zeros(ipp_u*ntx) | |||
if params['invert'] in ('1', 1): | |||
y[-1] = 1 | |||
else: | |||
y[0] = 1 | |||
elif self.line_type.name=='prog_pulses': | |||
params = json.loads(self.params) | |||
if int(params['periodic'])==0: | |||
nntx = ntx | |||
else: | |||
nntx = 1 | |||
if 'params' in params and len(params['params'])>0: | |||
y = sum([pulses(x, ipp_u*nntx, (pp['end']-pp['begin']), shift=pp['begin']) for pp in params['params']]) | |||
else: | |||
y = np.zeros(ipp_u*ntx) | |||
elif self.line_type.name=='windows': | |||
params = json.loads(self.params) | |||
if 'params' in params and len(params['params'])>0: | |||
y = sum([pulses(x, ipp_u, pp['resolution']*pp['number_of_samples']*KM2U, | |||
delay=(pp['first_height']-pp['resolution'])*KM2U, | |||
before=self.rc_configuration.time_before) for pp in params['params']]) | |||
tr = self.get_lines('tr')[0] | |||
ranges = json.loads(tr.params)['range'].split(',') | |||
if len(ranges)>0 and ranges[0]<>'0': | |||
mask = create_mask(ranges, ipp_u, ntx, self.rc_configuration.sync) | |||
y = y & mask | |||
else: | |||
y = np.zeros(ipp_u*ntx) | |||
else: | |||
y = np.zeros(ipp_u*ntx) | |||
if save: | |||
self.pulses = (y+48).astype(np.uint8).tostring() | |||
self.save() | |||
else: | |||
return y | |||