models.py
878 lines
| 29.7 KiB
| text/x-python
|
PythonLexer
|
r23 | |||
|
r45 | import ast | ||
|
r25 | import json | ||
|
r175 | import requests | ||
|
r45 | import numpy as np | ||
|
r175 | from base64 import b64encode | ||
|
r25 | |||
|
r0 | from django.db import models | ||
|
r79 | from django.core.urlresolvers import reverse | ||
|
r23 | from django.core.validators import MinValueValidator, MaxValueValidator | ||
|
r6 | from apps.main.models import Configuration | ||
|
r107 | from devices.rc import api | ||
|
r111 | from .utils import RCFile | ||
|
r79 | |||
|
r0 | # Create your models here. | ||
|
r6 | |||
|
# Kinds of signal a controller line can carry, as (stored key, display label)
# pairs. Used as the choices of RCLineType.name.
LINE_TYPES = (
    ('none', 'Not used'),
    ('tr', 'Transmission/reception selector signal'),
    ('tx', 'A modulating signal (Transmission pulse)'),
    ('codes', 'BPSK modulating signal'),
    ('windows', 'Sample window signal'),
    ('sync', 'Synchronizing signal'),
    ('flip', 'IPP related periodic signal'),
    ('prog_pulses', 'Programmable pulse'),
    ('mix', 'Mixed line'),
    )

# Choices of RCConfiguration.sampling_reference, as (stored key, display label).
SAMPLING_REFS = (
    ('none', 'No Reference'),
    ('begin_baud', 'Begin of the first baud'),
    ('first_baud', 'Middle of the first baud'),
    ('sub_baud', 'Middle of the sub-baud')
    )

# Command name -> numeric code. RCConfiguration.add_cmd emits each code as the
# pair (255, code) when building the binary stream.
DAT_CMDS = {
    # Pulse Design commands
    'DISABLE' : 0,    # Disables pulse generation
    'ENABLE' : 24,    # Enables pulse generation
    'DELAY_START' : 40,    # Write delay status to memory
    'FLIP_START' : 48,    # Write flip status to memory
    'SAMPLING_PERIOD' : 64,    # Establish Sampling Period
    'TX_ONE' : 72,    # Output '1' in line TX (inferred from the command name; TODO confirm)
    'TX_ZERO' : 88,    # Output '0' in line TX
    'SW_ONE' : 104,    # Output '1' in line SW (inferred from the command name; TODO confirm)
    'SW_ZERO' : 112,    # Output '0' in line SW (inferred from the command name; TODO confirm)
    'RESTART': 120,    # Restarts CR8 Firmware
    'CONTINUE' : 253,    # Function Unknown
    # Commands available to new controllers
    # In the Pulse Design executable the clock divisor code is written as 12 at the start,
    # but it should be written as code 22 (below) just before the final enable.
    'CLOCK_DIVISOR_INIT' : 12,    # Specifies Clock Divisor. Legacy command, ignored in the actual .dat conversion
    'CLOCK_DIVISOR_LAST' : 22,    # Specifies Clock Divisor (default 60 if not included) syntax: 255,22 254,N-1.
    'CLOCK_DIVIDER' : 8,    # Code sent by parms_to_binary right before the clock_divider value
}
|
class RCConfiguration(Configuration):
    '''
    Radar controller (RC) configuration, stored in table ``rc_configurations``.

    Holds the timing parameters; the individual signal lines are RCLine rows
    that point back to this configuration.
    '''

    # IPP in km; validated to 1..9000.
    ipp = models.FloatField(verbose_name='IPP [Km]', validators=[MinValueValidator(1), MaxValueValidator(9000)], default=300)
    # Number of transmissions; validated to 1..400.
    ntx = models.PositiveIntegerField(verbose_name='Number of TX', validators=[MinValueValidator(1), MaxValueValidator(400)], default=1)
    # Input clock in MHz; validated to 1..80.
    clock_in = models.FloatField(verbose_name='Clock in [MHz]', validators=[MinValueValidator(1), MaxValueValidator(80)], default=1)
    # Divider applied to clock_in (see us2unit / km2unit); validated to 1..256.
    clock_divider = models.PositiveIntegerField(verbose_name='Clock divider', validators=[MinValueValidator(1), MaxValueValidator(256)], default=1)
    # Master clock in MHz; presumably clock_in/clock_divider, but it is stored independently -- TODO confirm.
    clock = models.FloatField(verbose_name='Clock Master [MHz]', blank=True, default=1)
    # Microseconds added around TX pulses; converted with us2unit in RCLine.update_pulses.
    time_before = models.PositiveIntegerField(verbose_name='Time before [μS]', default=12)
    time_after = models.PositiveIntegerField(verbose_name='Time after [μS]', default=1)
    # Synchro delay; forwarded as ``sync`` to RCLine.points.
    sync = models.PositiveIntegerField(verbose_name='Synchro delay', default=0)
    # One of the SAMPLING_REFS keys; read by RCLine.get_win_ref.
    sampling_reference = models.CharField(verbose_name='Sampling Reference', choices=SAMPLING_REFS, default='none', max_length=40)
    # Select the SW_ONE/SW_ZERO and TX_ONE/TX_ZERO commands in parms_to_binary.
    control_tx = models.BooleanField(verbose_name='Control Switch TX', default=False)
    control_sw = models.BooleanField(verbose_name='Control Switch SW', default=False)
    # Total length of the pulse pattern in clock units; refreshed by RCLine.update_pulses.
    total_units = models.PositiveIntegerField(default=0)
    # Presumably marks a configuration built by mixing other configurations -- TODO confirm.
    mix = models.BooleanField(default=False)

    class Meta:
        db_table = 'rc_configurations'
|
r172 | |||
|
def get_absolute_url_plot(self):
    '''
    Return the URL registered as 'url_plot_rc_pulses' for this configuration
    '''
    url_args = [str(self.id)]
    return reverse('url_plot_rc_pulses', args=url_args)
|
r172 | |||
|
def get_absolute_url_import(self):
    '''
    Return the URL registered as 'url_import_rc_conf' for this configuration
    '''
    url_args = [str(self.id)]
    return reverse('url_import_rc_conf', args=url_args)
|
r172 | |||
|
@property
def ipp_unit(self):
    '''
    IPP formatted as '<ipp> (<ipp in units>)', the unit count truncated to int
    '''
    ipp_in_units = int(self.ipp*self.km2unit)
    return '{} ({})'.format(self.ipp, ipp_in_units)
|
r172 | |||
|
@property
def us2unit(self):
    '''
    Factor converting microseconds to clock units (clock_in over clock_divider)
    '''
    factor = self.clock_in/self.clock_divider
    return factor
@property
def km2unit(self):
    '''
    Factor converting km to clock units: 20/3 times clock_in over clock_divider
    '''
    clock_ratio = self.clock_in/self.clock_divider
    return 20./3*clock_ratio
|
def clone(self, **kwargs):
    '''
    Save this configuration as a new record and clone its lines onto it.

    Keyword arguments are set as attributes before saving. Returns self,
    which after the call refers to the new record.
    '''
    # Fetch the lines while pk still identifies the source configuration.
    source_lines = self.get_lines()

    self.pk = None
    self.id = None
    for attr_name in kwargs:
        setattr(self, attr_name, kwargs[attr_name])
    self.save()

    for source_line in source_lines:
        source_line.clone(rc_configuration=self)

    return self
|
r85 | |||
|
def get_lines(self, **kwargs):
    '''
    Retrieve the RCLine queryset of this configuration.

    Extra keyword arguments are passed on as additional filter conditions.
    '''
    queryset = RCLine.objects.filter(rc_configuration=self.pk, **kwargs)
    return queryset
|
r172 | |||
|
r45 | |||
|
def clean_lines(self):
    '''
    Reset every line of this configuration to the 'none' type with empty params
    '''
    unused_type = RCLineType.objects.get(name='none')

    for rc_line in self.get_lines():
        rc_line.params = '{}'
        rc_line.line_type = unused_type
        rc_line.save()
def parms_to_dict(self):
    '''
    Export the configuration as a plain dict.

    Model fields are rendered as strings (bookkeeping fields are skipped and
    'device' is renamed to 'device_id'). 'lines' holds one dict per line with
    references replaced by names; 'delays' and 'pulses' come from
    get_delays() and get_pulses().
    '''
    skipped = ('parameters', 'type', 'polymorphic_ctype', 'configuration_ptr',
               'created_date', 'programmed_date')

    data = {}
    for field in self._meta.fields:
        if field.name not in skipped:
            data[field.name] = '{}'.format(field.value_from_object(self))

    data['device_id'] = data.pop('device')

    exported_lines = []
    data['lines'] = exported_lines

    for rc_line in self.get_lines():
        entry = json.loads(rc_line.params)
        # A TX reference of 0 is kept as is; otherwise the pk is replaced by the line name.
        if 'TX_ref' in entry and entry['TX_ref'] not in (0, '0'):
            referenced = RCLine.objects.get(pk=entry['TX_ref'])
            entry['TX_ref'] = referenced.get_name()
        if 'code' in entry:
            entry['code'] = RCLineCode.objects.get(pk=entry['code']).name
        entry['type'] = rc_line.line_type.name
        entry['name'] = rc_line.get_name()
        exported_lines.append(entry)

    data['delays'] = self.get_delays()
    data['pulses'] = self.get_pulses()

    return data
|
r172 | |||
|
def dict_to_parms(self, data):
    '''
    Load the configuration and its lines from a dict.

    Reads the scalar parameters from ``data``, saves the configuration,
    resets the existing lines and then creates or updates one RCLine per
    entry of ``data['lines']`` (entry index = channel). In a second pass the
    'TX_ref' of each line is turned into the pk of the matching tx line.

    Note: the entries of ``data['lines']`` are modified in place ('name' and
    'type' are popped, 'code' is replaced by a pk).
    '''

    self.name = data['name']
    self.ipp = float(data['ipp'])
    self.ntx = int(data['ntx'])
    self.clock_in = float(data['clock_in'])
    self.clock_divider = int(data['clock_divider'])
    self.clock = float(data['clock'])
    # NOTE(review): these four values are stored without conversion -- confirm the caller provides suitable types
    self.time_before = data['time_before']
    self.time_after = data['time_after']
    self.sync = data['sync']
    self.sampling_reference = data['sampling_reference']
    # km2unit depends on clock_in/clock_divider, which were assigned above
    self.total_units = self.ipp*self.ntx*self.km2unit
    self.save()
    self.clean_lines()

    lines = []
    # running position counters for tx and tr lines
    positions = {'tx':0, 'tr':0}

    for i, line_data in enumerate(data['lines']):
        name = line_data.pop('name', '')
        line_type = RCLineType.objects.get(name=line_data.pop('type'))
        if line_type.name=='codes':
            # the dict carries the code name; the line params store its pk
            code = RCLineCode.objects.get(name=line_data['code'])
            line_data['code'] = code.pk
        # reuse the line already on this channel, otherwise create a new one
        line = RCLine.objects.filter(rc_configuration=self, channel=i)
        if line:
            line = line[0]
            line.line_type = line_type
            line.params = json.dumps(line_data)
        else:
            line = RCLine(rc_configuration=self, line_type=line_type,
                          params=json.dumps(line_data),
                          channel=i)

        if line_type.name=='tx':
            line.position = positions['tx']
            positions['tx'] += 1

        if line_type.name=='tr':
            line.position = positions['tr']
            positions['tr'] += 1

        line.save()
        lines.append(line)

    # second pass: every line now has a pk, so TX references can be resolved
    for line, line_data in zip(lines, data['lines']):
        if 'TX_ref' in line_data:
            params = json.loads(line.params)
            if line_data['TX_ref'] in (0, '0'):
                params['TX_ref'] = '0'
            else:
                # NOTE(review): substring match against the tx line name; raises IndexError when no tx line matches
                params['TX_ref'] = [l.pk for l in lines if l.line_type.name=='tx' and line_data['TX_ref'] in l.get_name()][0]
            line.params = json.dumps(params)
            line.save()
|
r172 | |||
|
def get_delays(self):
    '''
    Return the gaps between consecutive pulse edges of all lines.

    Every value found in the pulse tuples of every line is collected, the
    distinct values are sorted, 0 is prepended when missing, and the
    differences between neighbours are returned. An empty list is returned
    when no line has any pulse.
    '''
    edges = set()
    for line in self.get_lines():
        for pulse in line.pulses_as_points():
            edges.update(pulse)

    # Without pulses there is nothing to index below.
    if not edges:
        return []

    points = sorted(edges)
    if points[0]!=0:
        points.insert(0, 0)

    return [following-current for current, following in zip(points, points[1:])]
|
r172 | |||
|
def get_pulses(self, binary=True):
    '''
    Return the per-edge state of every line, without the last edge.

    For each distinct value found in the pulse tuples (sorted) a list with
    one 0/1 flag per line is built; the flag is 1 when the value is marked
    for that line. With ``binary=True`` the list of states is reversed and
    each state is packed into an integer (first line = most significant bit).

    The lines and their pulses are read once and reused for both steps.
    '''
    per_line = [line.pulses_as_points() for line in self.get_lines()]

    points = sorted(set(x for pulses in per_line for pulse in pulses for x in pulse))

    # NOTE(review): each tuple (x, y) is marked at x and x+y, i.e. y is treated
    # as a width, while other methods treat the tuple as (start, end) -- confirm.
    marks = [set(t for x, y in pulses for t in (x, x+y)) for pulses in per_line]

    states = [[1 if x in line_marks else 0 for line_marks in marks] for x in points]

    if binary:
        # NOTE(review): this reverses the order of the states, whereas
        # parms_to_binary reverses the bits inside each state -- confirm intent.
        states.reverse()
        states = [int(''.join([str(x) for x in flips]), 2) for flips in states]

    return states[:-1]
|
r172 | |||
|
def add_cmd(self, cmd):
    '''
    Return the pair (255, code) for a command name listed in DAT_CMDS.

    Returns None for an unknown name.
    '''
    if cmd not in DAT_CMDS:
        return None
    return (255, DAT_CMDS[cmd])
|
r172 | |||
def add_data(self, value):
    '''
    Return the data pair (254, value-1)
    '''
    encoded = value-1
    return (254, encoded)
|
r175 | |||
def parms_to_binary(self, dat=True):
    '''
    Create "dat" stream to be sent to CR

    The stream is a sequence of (255, command) and (254, value-1) byte pairs:
    header, switch states, clock divider, delays, flips, sampling period and
    the final enable.

    Delays longer than 253 are split into chunks of 253 followed by the
    remainder. The remainder is always in 1..253, so a delay that is an exact
    multiple of 253 no longer produces a zero remainder, which add_data would
    encode as the invalid byte -1.

    Returns the bytearray when ``dat`` is false, otherwise the byte values as
    decimal text, one per line.
    '''
    max_chunk = 253

    data = bytearray()
    # create header
    data.extend(self.add_cmd('DISABLE'))
    data.extend(self.add_cmd('CONTINUE'))
    data.extend(self.add_cmd('RESTART'))

    if self.control_sw:
        data.extend(self.add_cmd('SW_ONE'))
    else:
        data.extend(self.add_cmd('SW_ZERO'))

    if self.control_tx:
        data.extend(self.add_cmd('TX_ONE'))
    else:
        data.extend(self.add_cmd('TX_ZERO'))

    # write divider
    data.extend(self.add_cmd('CLOCK_DIVIDER'))
    data.extend(self.add_data(self.clock_divider))

    # write delays
    data.extend(self.add_cmd('DELAY_START'))
    # first delay is always zero
    data.extend(self.add_data(1))

    delays = self.get_delays()

    for delay in delays:
        while delay>max_chunk:
            data.extend(self.add_data(max_chunk))
            delay -= max_chunk
        data.extend(self.add_data(int(delay)))

    # write flips
    data.extend(self.add_cmd('FLIP_START'))

    states = self.get_pulses(binary=False)
    for flips, delay in zip(states, delays):
        # first line ends up in the least significant bit
        flip = int(''.join([str(x) for x in reversed(flips)]), 2)
        data.extend(self.add_data(flip+1))
        # one filler entry per extra delay chunk keeps flips aligned with delays
        while delay>max_chunk:
            data.extend(self.add_data(1))
            delay -= max_chunk

    # write sampling period
    data.extend(self.add_cmd('SAMPLING_PERIOD'))
    wins = self.get_lines(line_type__name='windows')
    dh = 1
    if wins:
        win_params = json.loads(wins[0].params)['params']
        if win_params:
            dh = int(win_params[0]['resolution']*self.km2unit)
    data.extend(self.add_data(dh))

    # write enable
    data.extend(self.add_cmd('ENABLE'))

    if not dat:
        return data

    return '\n'.join(['{}'.format(x) for x in data])
|
r172 | |||
|
def update_from_file(self, filename):
    '''
    Update instance from file: parse it with RCFile, load the parsed data and
    recompute the pulses of every line
    '''
    rc_file = RCFile(filename)
    parsed = rc_file.data
    self.dict_to_parms(parsed)
    self.update_pulses()
|
r107 | |||
def update_pulses(self):
    '''
    Ask every line of this configuration to recompute its pulses
    '''
    rc_lines = self.get_lines()
    for rc_line in rc_lines:
        rc_line.update_pulses()
|
r172 | |||
|
def plot_pulses2(self, km=False):
    '''
    Draw the pulses of every line with matplotlib and convert the figure to
    Bokeh; returns ``components(plot, CDN)``.

    With ``km`` true the x axis is in km, otherwise in clock units. The IPP
    index is written above the lines every ``step`` IPPs.
    '''

    import matplotlib.pyplot as plt
    from bokeh.resources import CDN
    from bokeh.embed import components
    from bokeh.mpl import to_bokeh
    from bokeh.models.tools import WheelZoomTool, ResetTool, PanTool, SaveTool

    lines = self.get_lines()

    N = len(lines)
    npoints = self.total_units/self.km2unit if km else self.total_units
    fig = plt.figure(figsize=(12, 2+N*0.5))
    ax = fig.add_subplot(111)
    labels = ['IPP']

    for i, line in enumerate(lines):
        labels.append(line.get_name(channel=True))
        # baseline of the line, drawn from top (first line) to bottom
        l = ax.plot((0, npoints),(N-i-1, N-i-1))
        # broken_barh expects (start, width) pairs
        points = [(tup[0], tup[1]-tup[0]) for tup in line.pulses_as_points(km=km) if tup!=(0,0)]
        ax.broken_barh(points, (N-i-1, 0.5),
                       edgecolor=l[0].get_color(), facecolor='none')

    # Integer division keeps the label step an integer; with true division
    # the modulo test below would almost never be zero.
    step = ((self.ntx+50)//100)*5 or 2
    ipp_positions = np.arange(0, npoints, self.ipp if km else self.ipp*self.km2unit)
    for n, x in enumerate(ipp_positions):
        if n%step==0:
            ax.text(x, N, '%s' % n, size=10)

    labels.reverse()
    ax.set_yticks(range(len(labels)))
    ax.set_yticklabels(labels)
    # set_xlabel is a method: call it instead of overwriting it
    ax.set_xlabel('Km' if km else 'Units')
    plot = to_bokeh(fig, use_pandas=False)
    plot.tools = [PanTool(dimensions=['width']), WheelZoomTool(dimensions=['width']), ResetTool(), SaveTool()]
    plot.toolbar_location="above"

    return components(plot, CDN)
|
r172 | |||
|
def plot_pulses(self, km=False):
    '''
    Build a Bokeh figure with one row of rectangles per line and return
    ``components(plot, CDN)``.

    With ``km`` true the x axis is in km, otherwise in clock units.
    '''

    from bokeh.plotting import figure
    from bokeh.resources import CDN
    from bokeh.embed import components
    from bokeh.models import FixedTicker, PrintfTickFormatter
    from bokeh.models.tools import WheelZoomTool, ResetTool, PanTool, HoverTool, SaveTool
    from bokeh.models.sources import ColumnDataSource

    lines = self.get_lines().reverse()
    N = len(lines)

    if km:
        npoints = self.total_units/self.km2unit
        ipp = self.ipp
    else:
        npoints = self.total_units
        ipp = self.ipp*self.km2unit

    hover = HoverTool(tooltips=[("Line", "@name"),
                                ("IPP", "@ipp"),
                                ("X", "@left")])

    pan = PanTool(dimensions=['width'])
    zoom = WheelZoomTool(dimensions=['width'])
    tools = [pan, zoom, hover, SaveTool()]

    plot = figure(width=1000,
                  height=40+N*50,
                  y_range = (0, N),
                  tools=tools,
                  toolbar_location='above',
                  toolbar_sticky=False,)

    plot.xaxis.axis_label = 'Km' if km else 'Units'
    plot.xaxis[0].formatter = PrintfTickFormatter(format='%d')
    plot.yaxis.axis_label = 'Pulses'
    plot.yaxis[0].ticker=FixedTicker(ticks=list(range(N)))
    plot.yaxis[0].formatter = PrintfTickFormatter(format='Line %d')

    for row, line in enumerate(lines):
        # (0, 0) tuples are placeholders for masked pulses and are not drawn
        points = [tup for tup in line.pulses_as_points(km=km) if tup!=(0,0)]

        columns = dict(
            bottom = [row for tup in points],
            top = [row+0.5 for tup in points],
            left = [tup[0] for tup in points],
            right = [tup[1] for tup in points],
            ipp = [int(tup[0]/ipp) for tup in points],
            name = [line.get_name() for tup in points]
        )
        source = ColumnDataSource(data = columns)

        plot.quad(
            bottom = 'bottom',
            top = 'top',
            left = 'left',
            right = 'right',
            source = source,
            fill_alpha = 0,
        )

        # baseline of the row
        plot.line([0, npoints], [row, row])

    return components(plot, CDN)
|
r172 | |||
|
def status_device(self):
    '''
    Query the device URL and store the resulting status on the device.

    Status is 3 when the JSON answer has status 'ok', 1 for any other answer
    and 0 when the request or its decoding fails. The device is saved and the
    status returned.
    '''
    try:
        req = requests.get(self.device.url)
        payload = req.json()
        if payload['status']=='ok':
            self.device.status = 3
        else:
            self.device.status = 1
    except Exception:
        # Best effort: any failure counts as status 0, but KeyboardInterrupt
        # and SystemExit are no longer swallowed.
        self.device.status = 0

    self.device.save()
    return self.device.status
def reset_device(self):
    '''
    PUT the base64 encoded RESTART command to the device URL.

    Returns 1 when the response text equals the encoded payload, else 0.
    '''
    command = bytearray(self.add_cmd('RESTART'))
    encoded = b64encode(command)
    response = requests.put(self.device.url, encoded)
    return 1 if encoded==response.text.encode('utf8') else 0
|
r175 | |||
def stop_device(self):
    '''
    PUT the base64 encoded DISABLE command to the device URL.

    Returns 1 when the response text equals the encoded payload, else 0.
    '''
    command = bytearray(self.add_cmd('DISABLE'))
    encoded = b64encode(command)
    response = requests.put(self.device.url, encoded)
    return 1 if encoded==response.text.encode('utf8') else 0
|
r172 | |||
|
def start_device(self):
    '''
    PUT the base64 encoded ENABLE command to the device URL.

    Returns 1 when the response text equals the encoded payload, else 0.
    '''
    command = bytearray(self.add_cmd('ENABLE'))
    encoded = b64encode(command)
    response = requests.put(self.device.url, encoded)
    return 1 if encoded==response.text.encode('utf8') else 0
|
r172 | |||
|
def write_device(self):
    '''
    PUT the base64 encoded binary stream of this configuration to the device URL.

    The response text is printed. Returns 1 when it equals the encoded
    payload, else 0.
    '''
    stream = self.parms_to_binary(dat=False)
    encoded = b64encode(stream)
    response = requests.put(self.device.url, encoded)
    print(response.text)
    return 1 if encoded==response.text.encode('utf8') else 0
|
r172 | |||
|
r107 | |||
|
class RCLineCode(models.Model):
    '''
    Named code set stored in table ``rc_line_codes``, ordered by name.
    '''

    name = models.CharField(max_length=40)
    bits_per_code = models.PositiveIntegerField(default=0)
    number_of_codes = models.PositiveIntegerField(default=0)
    # Free text; the expected format is not visible in this module.
    codes = models.TextField(blank=True, null=True)

    class Meta:
        db_table = 'rc_line_codes'
        ordering = ('name',)

    def __str__(self):
        '''Return the code name.'''
        return u'{}'.format(self.name)
|
r23 | |||
|
r107 | |||
|
class RCLineType(models.Model):
    '''
    Line type stored in table ``rc_line_types``; ``name`` is one of the
    LINE_TYPES keys.
    '''

    name = models.CharField(choices=LINE_TYPES, max_length=40)
    description = models.TextField(blank=True, null=True)
    # Text field defaulting to '[]'; presumably a JSON list -- TODO confirm.
    params = models.TextField(default='[]')

    class Meta:
        db_table = 'rc_line_types'

    def __str__(self):
        '''Return '<NAME> - <display label>'.'''
        key = self.name.upper()
        label = self.get_name_display()
        return u'{} - {}'.format(key, label)
|
r172 | |||
|
class RCLine(models.Model):
    '''
    One signal line of an RCConfiguration, stored in table ``rc_lines`` and
    ordered by channel.
    '''

    # Owning configuration; lines are deleted together with it.
    rc_configuration = models.ForeignKey(RCConfiguration, on_delete=models.CASCADE)
    line_type = models.ForeignKey(RCLineType)
    # Index of the line inside the configuration (see RCConfiguration.dict_to_parms).
    channel = models.PositiveIntegerField(default=0)
    # Running index among lines of the same type; get_name maps it to a letter.
    position = models.PositiveIntegerField(default=0)
    # JSON encoded parameters of the line (parsed with json.loads).
    params = models.TextField(default='{}')
    # Python literal list of pulse tuples (parsed with ast.literal_eval).
    pulses = models.TextField(default='')

    class Meta:
        db_table = 'rc_lines'
        ordering = ['channel']
|
r172 | |||
def __str__(self):
    '''
    Return '<configuration> - <line name>', or only the line name when the
    line has no configuration.
    '''
    if self.rc_configuration:
        return u'%s - %s' % (self.rc_configuration, self.get_name())
    # __str__ must always return a string; falling through would return None.
    return u'%s' % self.get_name()
|
r172 | |||
|
def clone(self, **kwargs):
    '''
    Save this line as a new record.

    The primary key is cleared, keyword arguments are set as attributes, the
    line is saved and self is returned.
    '''
    self.pk = None

    for attr_name in kwargs:
        setattr(self, attr_name, kwargs[attr_name])

    self.save()

    return self
|
r172 | |||
|
def get_name(self, channel=False):
    '''
    Return the display name of the line.

    The name is the upper-cased type followed by a suffix: the position
    letter for tx lines, or '(<letters>)' naming the referenced tx line(s)
    for codes/windows/tr lines that carry a 'TX_ref'. With ``channel`` true
    the channel number is appended after a space.
    '''
    alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    kind = self.line_type.name
    suffix = ''

    if kind in ('tx',):
        suffix = alphabet[self.position]
    elif kind in ('codes', 'windows', 'tr'):
        params = json.loads(self.params)
        if 'TX_ref' in params:
            pk = params['TX_ref']
            if pk in (0, '0'):
                # reference 0 stands for every tx line of the configuration
                tx_lines = self.rc_configuration.get_lines(line_type__name='tx')
                suffix = ','.join(alphabet[l.position] for l in tx_lines)
            else:
                ref = RCLine.objects.get(pk=pk)
                suffix = alphabet[ref.position]
            suffix = '({})'.format(suffix)

    name = '{}{}'.format(kind.upper(), suffix)

    if not channel:
        return name
    return '{} {}'.format(name, self.channel)
|
r45 | |||
|
def get_lines(self, **kwargs):
    '''
    Return the lines that share this line's configuration, narrowed by any
    extra filter keyword arguments
    '''
    siblings = RCLine.objects.filter(rc_configuration=self.rc_configuration, **kwargs)
    return siblings
|
def pulses_as_array(self):
    '''
    Return the pulses as an int8 array of length total_units, with 1 inside
    every pulse interval [start, end) and 0 elsewhere
    '''
    levels = np.zeros(self.rc_configuration.total_units)

    for pulse in ast.literal_eval(self.pulses):
        start, stop = pulse[0], pulse[1]
        levels[start:stop] = 1

    return levels.astype(np.int8)
|
r172 | |||
|
def pulses_as_points(self, km=False):
    '''
    Return the stored pulses as a list of tuples.

    With ``km`` true both values of every tuple are divided by the
    configuration's km2unit factor.
    '''
    if not km:
        return ast.literal_eval(self.pulses)

    unit2km = 1/self.rc_configuration.km2unit
    return [(pulse[0]*unit2km, pulse[1]*unit2km) for pulse in ast.literal_eval(self.pulses)]
|
r172 | |||
|
def get_win_ref(self, params, tx_id, km2unit):
    '''
    Return the window start offset, in units, for the configured sampling
    reference.

    ``params`` is one window entry ('first_height', 'resolution'), ``tx_id``
    the pk of the referenced tx line. The tx pulse width is divided by the
    code length when a codes line references the same tx. Returns 0 for any
    reference other than 'first_baud' and 'sub_baud'.
    '''
    reference = self.rc_configuration.sampling_reference
    matching_codes = [line for line in self.get_lines(line_type__name='codes') if int(json.loads(line.params)['TX_ref'])==int(tx_id)]

    tx_line_params = json.loads(RCLine.objects.get(pk=tx_id).params)
    tx_width = float(tx_line_params['pulse_width'])*km2unit
    if matching_codes:
        # with codes the width of a single baud is used
        tx_width = tx_width/len(json.loads(matching_codes[0].params)['codes'][0])

    if reference=='first_baud':
        return int(1 + (tx_width + 1)/2 + params['first_height']*km2unit - params['resolution']*km2unit)
    if reference=='sub_baud':
        return int(1 + params['first_height']*km2unit - params['resolution']*km2unit/2)
    return 0
|
r172 | |||
|
def update_pulses(self):
    '''
    Update pulses field

    Recomputes the list of pulse tuples of this line from its type and JSON
    params, stores it as text in ``self.pulses`` and saves the line. When the
    resulting total length differs from the configuration's ``total_units``
    the configuration is updated and saved too.
    '''

    km2unit = self.rc_configuration.km2unit
    us2unit = self.rc_configuration.us2unit
    ipp = self.rc_configuration.ipp
    ntx = int(self.rc_configuration.ntx)
    # IPP expressed in clock units
    ipp_u = int(ipp*km2unit)
    # keep the stored total unless it has never been computed (0)
    total = ipp_u*ntx if self.rc_configuration.total_units==0 else self.rc_configuration.total_units
    y = []

    if self.line_type.name=='tr':
        tr_params = json.loads(self.params)

        # TX_ref 0 means "all tx lines", otherwise a single tx line pk
        if tr_params['TX_ref'] in ('0', 0):
            txs = self.get_lines(line_type__name='tx')
        else:
            txs = RCLine.objects.filter(pk=tr_params['TX_ref'])

        for tx in txs:
            params = json.loads(tx.params)

            if float(params['pulse_width'])==0:
                continue
            delays = [float(d)*km2unit for d in params['delays'].split(',') if d]
            # the TR pulse covers the tx pulse plus the time before/after it
            width = float(params['pulse_width'])*km2unit+int(self.rc_configuration.time_before*us2unit)
            before = 0
            after = int(self.rc_configuration.time_after*us2unit)

            y_tx = self.points(ntx, ipp_u, width,
                               delay=delays,
                               before=before,
                               after=after,
                               sync=self.rc_configuration.sync)

            ranges = params['range'].split(',')

            if len(ranges)>0 and ranges[0]!='0':
                y_tx = self.mask_ranges(y_tx, ranges)

            tr_ranges = tr_params['range'].split(',')

            if len(tr_ranges)>0 and tr_ranges[0]!='0':
                y_tx = self.mask_ranges(y_tx, tr_ranges)

            y.extend(y_tx)

        # round trip through the array form, presumably to merge overlapping pulses -- TODO confirm
        self.pulses = str(y)
        y = self.array_to_points(self.pulses_as_array())

    elif self.line_type.name=='tx':
        params = json.loads(self.params)
        delays = [float(d)*km2unit for d in params['delays'].split(',') if d]
        width = float(params['pulse_width'])*km2unit

        if width>0:
            before = int(self.rc_configuration.time_before*us2unit)
            after = 0

            y = self.points(ntx, ipp_u, width,
                            delay=delays,
                            before=before,
                            after=after,
                            sync=self.rc_configuration.sync)

            ranges = params['range'].split(',')

            if len(ranges)>0 and ranges[0]!='0':
                y = self.mask_ranges(y, ranges)

    elif self.line_type.name=='flip':
        # one pulse of n IPPs every 2*n IPPs
        n = float(json.loads(self.params)['number_of_flips'])
        width = n*ipp*km2unit
        y = self.points(int((ntx+1)/(2*n)), ipp_u*n*2, width)

    elif self.line_type.name=='codes':
        params = json.loads(self.params)
        tx = RCLine.objects.get(pk=params['TX_ref'])
        tx_params = json.loads(tx.params)
        delays = [float(d)*km2unit for d in tx_params['delays'].split(',') if d]
        # units per code character (baud)
        f = int(float(tx_params['pulse_width'])*km2unit/len(params['codes'][0]))
        # repeat every code character f times and turn '0'/'1' into 0/1 (ASCII code minus 48)
        codes = [(np.fromstring(''.join([s*f for s in code]), dtype=np.uint8)-48).astype(np.int8) for code in params['codes']]
        codes = [self.array_to_points(code) for code in codes]
        n = len(codes)

        # place the codes, cycling through them, at the start of every tx pulse
        for i, tup in enumerate(tx.pulses_as_points()):
            code = codes[i%n]
            y.extend([(c[0]+tup[0], c[1]+tup[0]) for c in code])

        ranges = tx_params['range'].split(',')
        if len(ranges)>0 and ranges[0]!='0':
            y = self.mask_ranges(y, ranges)

    elif self.line_type.name=='sync':
        params = json.loads(self.params)
        n = ipp_u*ntx
        # a single one-unit pulse at the end (inverted) or at the start
        if params['invert'] in ('1', 1):
            y = [(n-1, n)]
        else:
            y = [(0, 1)]

    elif self.line_type.name=='prog_pulses':
        params = json.loads(self.params)
        # non periodic pulses are placed once over the whole pattern
        if int(params['periodic'])==0:
            nntx = 1
            nipp = ipp_u*ntx
        else:
            nntx = ntx
            nipp = ipp_u

        if 'params' in params and len(params['params'])>0:
            for p in params['params']:
                y_pp = self.points(nntx, nipp,
                                   p['end']-p['begin'],
                                   before=p['begin'])
                y.extend(y_pp)

    elif self.line_type.name=='windows':
        params = json.loads(self.params)
        if 'params' in params and len(params['params'])>0:
            # windows are masked with the ranges of the first tr line
            tr_params = json.loads(self.get_lines(line_type__name='tr')[0].params)
            tr_ranges = tr_params['range'].split(',')
            for p in params['params']:
                y_win = self.points(ntx, ipp_u,
                                    p['resolution']*p['number_of_samples']*km2unit,
                                    before=int(self.rc_configuration.time_before*us2unit)+self.get_win_ref(p, params['TX_ref'], km2unit),
                                    sync=self.rc_configuration.sync)

                if len(tr_ranges)>0 and tr_ranges[0]!='0':
                    y_win = self.mask_ranges(y_win, tr_ranges)

                y.extend(y_win)

    elif self.line_type.name=='mix':
        # parameters is a '-' separated list of 'conf_pk|mode|operation|delay|mask' entries
        values = self.rc_configuration.parameters.split('-')
        confs = [RCConfiguration.objects.get(pk=value.split('|')[0]) for value in values]
        modes = [value.split('|')[1] for value in values]
        ops = [value.split('|')[2] for value in values]
        delays = [value.split('|')[3] for value in values]
        masks = [value.split('|')[4] for value in values]
        # mask bits, least significant first; bit <channel> enables this line
        mask = list('{:8b}'.format(int(masks[0])))
        mask.reverse()
        if mask[self.channel] in ('0', '', ' '):
            y = np.zeros(confs[0].total_units, dtype=np.int8)
        else:
            y = confs[0].get_lines(channel=self.channel)[0].pulses_as_array()

        for i in range(1, len(values)):
            mask = list('{:8b}'.format(int(masks[i])))
            mask.reverse()

            if mask[self.channel] in ('0', '', ' '):
                continue
            Y = confs[i].get_lines(channel=self.channel)[0].pulses_as_array()
            # NOTE(review): delay is a float but is used below as an array size and slice index -- confirm
            delay = float(delays[i])*km2unit

            if modes[i]=='P':
                # mode 'P': combine Y, shifted by delay, with y using the operation
                if delay>0:
                    if delay<self.rc_configuration.ipp*km2unit and len(Y)==len(y):
                        y_temp = np.empty_like(Y)
                        y_temp[:delay] = 0
                        y_temp[delay:] = Y[:-delay]
                    elif delay+len(Y)>len(y):
                        y_new = np.zeros(delay+len(Y), dtype=np.int8)
                        y_new[:len(y)] = y
                        y = y_new
                        y_temp = np.zeros(delay+len(Y), dtype=np.int8)
                        y_temp[-len(Y):] = Y
                    elif delay+len(Y)==len(y):
                        y_temp = np.zeros(delay+len(Y))
                        y_temp[-len(Y):] = Y
                    elif delay+len(Y)<len(y):
                        y_temp = np.zeros(len(y), dtype=np.int8)
                        y_temp[delay:delay+len(Y)] = Y

                # NOTE(review): y_temp is only assigned when delay > 0 -- confirm delay <= 0 cannot reach this point
                if ops[i]=='OR':
                    y = y | y_temp
                elif ops[i]=='XOR':
                    y = y ^ y_temp
                elif ops[i]=='AND':
                    y = y & y_temp
                elif ops[i]=='NAND':
                    y = y & ~y_temp
            else:
                # any other mode: append Y after y
                y = np.concatenate([y, Y])

        total = len(y)
        y = self.array_to_points(y)

    else:
        y = []

    if self.rc_configuration.total_units != total:
        self.rc_configuration.total_units = total
        self.rc_configuration.save()

    self.pulses = str(y)
    self.save()
|
r172 | |||
|
@staticmethod
def array_to_points(X):
    '''
    Convert a 0/1 array into a list of (start, end) tuples, one per run of
    ones, with ``end`` exclusive.

    The input is converted to a signed int8 array so that falling edges are
    detected for unsigned input as well. The tuples hold plain Python ints,
    so ``str()`` of the result can be parsed back with ast.literal_eval. An
    empty input gives an empty list.
    '''
    X = np.asarray(X).astype(np.int8)
    if len(X)==0:
        return []

    d = X[1:]-X[:-1]

    # rising edges; a leading 1 counts as an edge at index 0
    up = np.where(d==1)[0]
    if X[0]==1:
        up = np.concatenate((np.array([-1]), up))
    up += 1

    # falling edges; a trailing 1 ends at len(X)
    dw = np.where(d==-1)[0]
    if X[-1]==1:
        dw = np.concatenate((dw, np.array([len(X)-1])))
    dw += 1

    return [(int(start), int(end)) for start, end in zip(up, dw)]
@staticmethod
def mask_ranges(Y, ranges):
    '''
    Keep only the selected pulses of ``Y``; all others become (0, 0).

    ``ranges`` is a list of 1-based selectors, each either a single index
    ('3') or an inclusive range ('2-5'). Blank selectors are ignored. The
    result has the same length as ``Y``.
    '''
    y = [(0, 0) for __ in Y]

    for index in ranges:
        index = str(index).strip()
        if not index:
            continue
        if '-' in index:
            args = [int(a) for a in index.split('-')]
            y[args[0]-1:args[1]] = Y[args[0]-1:args[1]]
        else:
            # convert first, then shift to 0-based
            position = int(index)-1
            y[position] = Y[position]

    return y
|
r172 | |||
|
@staticmethod
def points(ntx, ipp, width, delay=(0,), before=0, after=0, sync=0):
    '''
    Return ``ntx`` pulse tuples (start, end), one per IPP.

    Pulse ``x`` starts at ``ipp*x + before + delay[x % len(delay)]`` and ends
    ``width + after`` later. An empty ``delay`` is treated as no delay.
    ``sync`` is accepted for the callers but not used in the computation.
    '''
    # An empty list would make the modulo below divide by zero.
    if not delay:
        delay = (0,)
    delays = len(delay)

    Y = []
    for x in range(ntx):
        shift = delay[x%delays]
        Y.append((ipp*x+before+shift, ipp*x+width+before+shift+after))

    return Y