##// END OF EJS Templates
Task #487: Vista de Operation...
Task #487: Vista de Operation git-svn-id: http://jro-dev.igp.gob.pe/svn/jro_hard/radarsys/trunk/webapp@99 aa17d016-51d5-4e8b-934c-7b2bbb1bbe71

File last commit:

r45:f516e8aeadc3
r78:9f9aafc9f148
Show More
models.py
235 lines | 9.1 KiB | text/x-python | PythonLexer
import ast
import json
import numpy as np
from polymorphic import PolymorphicModel
from django.db import models
from django.core.validators import MinValueValidator, MaxValueValidator
from apps.main.models import Configuration
from .utils import pulses, pulses_from_code, create_mask
# Create your models here.
# Choices for ``RCLineType.name``: (stored value, human-readable label).
LINE_TYPES = (
    ('none', 'Not used'),
    ('tr', 'Transmission/reception selector signal'),
    ('tx', 'A modulating signal (Transmission pulse)'),
    ('codes', 'BPSK modulating signal'),
    ('windows', 'Sample window signal'),
    ('sync', 'Synchronizing signal'),
    ('flip', 'IPP related periodic signal'),
    ('prog_pulses', 'Programmable pulse'),
)
class RCConfiguration(Configuration):
    """Configuration record whose lines are stored as ``RCLine`` rows.

    Holds the timing parameters (inter pulse period, number of TX, clock
    settings, before/after times and synchro delay) that ``RCLine`` reads
    when it generates its pulses.
    """

    ipp = models.FloatField(
        verbose_name='Inter pulse period (Km)',
        validators=[MinValueValidator(1), MaxValueValidator(1000)],
        default=10)
    ntx = models.PositiveIntegerField(verbose_name='Number of TX', default=1)
    clock = models.FloatField(
        verbose_name='Clock Master (MHz)',
        validators=[MinValueValidator(1), MaxValueValidator(80)],
        default=1)
    clock_divider = models.PositiveIntegerField(
        verbose_name='Clock divider',
        validators=[MinValueValidator(1), MaxValueValidator(256)],
        default=1)
    time_before = models.PositiveIntegerField(verbose_name='Time before', default=0)
    time_after = models.PositiveIntegerField(verbose_name='Time after', default=0)
    sync = models.PositiveIntegerField(verbose_name='Synchro delay', default=0)

    class Meta:
        db_table = 'rc_configurations'

    def get_number_position(self):
        """Return the highest ``position`` among this configuration's lines.

        Returns ``None`` when the configuration has no lines.
        """
        # Filter on this configuration's own primary key, exactly as
        # get_refs_lines() and get_lines() do. The previous lookup used
        # ``self.rc_configuration``, an attribute this class does not define.
        positions = [line.position
                     for line in RCLine.objects.filter(rc_configuration=self.pk)]
        if positions:
            return max(positions)
        return None

    def get_refs_lines(self):
        """Return ``(pk, name)`` pairs for every 'tx' line of this configuration."""
        lines = RCLine.objects.filter(rc_configuration=self.pk, line_type__name='tx')
        return [(line.pk, line.get_name()) for line in lines]

    def get_lines(self, type=None):
        """Return this configuration's lines as a queryset.

        ``type`` optionally restricts the result to lines whose line type has
        that name (one of the ``LINE_TYPES`` keys); ``None`` returns all lines.
        """
        if type is not None:
            return RCLine.objects.filter(rc_configuration=self.pk, line_type__name=type)
        return RCLine.objects.filter(rc_configuration=self.pk)
class RCLineCode(models.Model):
    """Named code set that a 'codes' line refers to through its params."""

    name = models.CharField(max_length=40)
    bits_per_code = models.PositiveIntegerField(default=0)
    number_of_codes = models.PositiveIntegerField(default=0)
    # Text form of the codes; RCLine.update_pulses parses it with
    # ast.literal_eval.
    codes = models.TextField(blank=True, null=True)

    class Meta:
        db_table = 'rc_line_codes'
        ordering = ('name',)

    def __unicode__(self):
        """Display the code set by its name."""
        return u'%s' % (self.name,)
class RCLineType(models.Model):
    """Kind of line; ``name`` is restricted to the ``LINE_TYPES`` choices."""

    name = models.CharField(choices=LINE_TYPES, max_length=40)
    description = models.TextField(blank=True, null=True)
    # Stored as text; defaults to the string '[]'.
    params = models.TextField(default='[]')

    class Meta:
        db_table = 'rc_line_types'

    def __unicode__(self):
        """Display as '<NAME> - <choice label>'."""
        short_name = self.name.upper()
        label = self.get_name_display()
        return u'%s - %s' % (short_name, label)
class RCLine(models.Model):
    """A single line belonging to an ``RCConfiguration``.

    ``params`` holds a JSON document whose keys depend on the line type.
    ``pulses`` holds the generated samples, one character per sample, where
    each character code is the sample value plus 48 (see ``update_pulses`` and
    ``pulses_as_array``).
    """

    rc_configuration = models.ForeignKey(RCConfiguration)
    line_type = models.ForeignKey(RCLineType)
    channel = models.PositiveIntegerField(default=0)
    position = models.PositiveIntegerField(default=0)
    params = models.TextField(default='{}')
    pulses = models.TextField(default='')

    class Meta:
        db_table = 'rc_lines'
        ordering = ['channel']

    def __unicode__(self):
        """Display as '<configuration> - <line name>'."""
        return u'%s - %s' % (self.rc_configuration, self.get_name())

    def get_name(self):
        """Return a short label for the line, built from its type.

        * 'tx': type name plus the letter indexed by ``position``.
        * 'codes', 'windows', 'tr': type name plus the letter(s) of the TX
          line(s) referenced by ``params['TX_ref']`` (0 means every 'tx' line
          of the configuration).
        * 'flip', 'prog_pulses', 'sync': type name plus ``channel``.
        * anything else: the upper-cased type name.
        """
        chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
        if self.line_type.name in ('tx',):
            return '%s %s' % (self.line_type.name.upper(), chars[self.position])
        elif self.line_type.name in ('codes', 'windows', 'tr'):
            pk = json.loads(self.params)['TX_ref']
            if pk in (0, '0'):
                refs = ','.join(chars[l.position] for l in self.rc_configuration.get_lines('tx'))
                return '%s (%s)' % (self.line_type.name.upper(), refs)
            else:
                ref = RCLine.objects.get(pk=pk)
                return '%s (%s)' % (self.line_type.name.upper(), chars[ref.position])
        elif self.line_type.name in ('flip', 'prog_pulses', 'sync'):
            return '%s %s' % (self.line_type.name.upper(), self.channel)
        else:
            return self.line_type.name.upper()

    def get_lines(self, type=None):
        """Return the lines of this line's configuration as a queryset.

        ``type`` optionally restricts the result to lines whose line type has
        that name; ``None`` returns all of them.
        """
        if type is not None:
            return RCLine.objects.filter(rc_configuration=self.rc_configuration, line_type__name=type)
        return RCLine.objects.filter(rc_configuration=self.rc_configuration)

    def pulses_as_array(self):
        """Decode ``pulses`` into an int8 array (character code minus 48)."""
        return (np.fromstring(self.pulses, dtype=np.uint8)-48).astype(np.int8)

    @property
    def km2unit(self):
        """Factor used to scale the Km-valued parameters to sample units.

        Computed as 20/3 * (clock / clock_divider) of the configuration.
        """
        return 20./3*(self.rc_configuration.clock/self.rc_configuration.clock_divider)

    def _range_mask(self, ranges_csv, ipp_u, ntx):
        """Return the ``create_mask`` result for a comma-separated range spec.

        Returns ``None`` when the first entry is '0', in which case callers
        apply no mask.
        """
        ranges = ranges_csv.split(',')
        # str.split always yields at least one element, so only the first
        # entry needs checking.
        if ranges[0] != '0':
            return create_mask(ranges, ipp_u, ntx, self.rc_configuration.sync)
        return None

    def update_pulses(self, save=True, tr=False):
        """Generate the sample sequence for this line from its parameters.

        Parameters:
            save: when true, encode the samples into ``self.pulses``, save the
                model and return ``None``; when false, return the sample array
                without touching the database.
            tr: only read by 'tx' lines; when true the configuration's
                ``time_after`` is passed to ``pulses`` (otherwise 0). The 'tr'
                branch sets it when it asks its TX lines for their samples.

        A 'tr' line returns ``None`` without saving when it has no TX lines or
        any of them yields an empty sequence.
        """
        conf = self.rc_configuration
        km2unit = self.km2unit
        ipp = conf.ipp
        ntx = conf.ntx
        ipp_u = int(ipp*km2unit)
        x = np.arange(0, ipp_u*ntx)
        line_type = self.line_type.name

        if line_type == 'tr':
            params = json.loads(self.params)
            if params['TX_ref'] in ('0', 0):
                # A reference of 0 means every 'tx' line of the configuration.
                tx_lines = self.get_lines('tx')
            else:
                tx_lines = RCLine.objects.filter(pk=params['TX_ref'])
            txs = [tx.update_pulses(save=False, tr=True) for tx in tx_lines]
            if len(txs) == 0 or 0 in [len(tx) for tx in txs]:
                return
            y = np.any(txs, axis=0, out=np.ones(ipp_u*ntx))
            mask = self._range_mask(params['range'], ipp_u, ntx)
            if mask is not None:
                y = y.astype(np.int8) & mask

        elif line_type == 'tx':
            params = json.loads(self.params)
            delays = [float(d)*km2unit for d in params['delays'].split(',') if d]
            y = pulses(x, ipp_u, float(params['pulse_width'])*km2unit,
                       delay=delays,
                       before=conf.time_before,
                       after=conf.time_after if tr else 0,
                       sync=conf.sync)
            mask = self._range_mask(params['range'], ipp_u, ntx)
            if mask is not None:
                y = y & mask

        elif line_type == 'flip':
            width = float(json.loads(self.params)['number_of_flips'])*ipp*km2unit
            y = pulses(x, 2*width, width)

        elif line_type == 'codes':
            params = json.loads(self.params)
            codes = ast.literal_eval(RCLineCode.objects.get(pk=params['code']).codes)
            # NOTE(review): unlike the 'tr' branch, a TX_ref of 0 is not
            # special-cased here, so this lookup raises DoesNotExist for it.
            tx = RCLine.objects.get(pk=params['TX_ref'])
            tx_params = json.loads(tx.params)
            y = pulses_from_code(ipp_u, ntx, codes,
                                 int(float(tx_params['pulse_width'])*km2unit),
                                 before=conf.time_before+conf.sync)
            # The mask comes from the referenced TX line's range.
            mask = self._range_mask(tx_params['range'], ipp_u, ntx)
            if mask is not None:
                y = y.astype(np.int8) & mask

        elif line_type == 'sync':
            params = json.loads(self.params)
            y = np.zeros(ipp_u*ntx)
            if params['invert'] in ('1', 1):
                y[-1] = 1
            else:
                y[0] = 1

        elif line_type == 'prog_pulses':
            params = json.loads(self.params)
            if int(params['periodic']) == 0:
                nntx = ntx
            else:
                nntx = 1
            if 'params' in params and len(params['params']) > 0:
                y = sum([pulses(x, ipp_u*nntx, (pp['end']-pp['begin']), shift=pp['begin'])
                         for pp in params['params']])
            else:
                y = np.zeros(ipp_u*ntx)

        elif line_type == 'windows':
            params = json.loads(self.params)
            if 'params' in params and len(params['params']) > 0:
                y = sum([pulses(x, ipp_u, pp['resolution']*pp['number_of_samples']*km2unit,
                                delay=(pp['first_height']-pp['resolution'])*km2unit,
                                before=conf.time_before)
                         for pp in params['params']])
                # The mask comes from the first 'tr' line of the configuration.
                # When there is none, skip masking instead of raising
                # IndexError.
                tr_lines = list(self.get_lines('tr')[:1])
                if tr_lines:
                    mask = self._range_mask(json.loads(tr_lines[0].params)['range'], ipp_u, ntx)
                    if mask is not None:
                        y = y & mask
            else:
                y = np.zeros(ipp_u*ntx)

        else:
            y = np.zeros(ipp_u*ntx)

        if save:
            # Store each sample as the character with code (value + 48).
            self.pulses = (y+48).astype(np.uint8).tostring()
            self.save()
        else:
            return y