models.py
785 lines
| 28.2 KiB
| text/x-python
|
PythonLexer
|
r23 | ||
|
import ast
import json

import numpy as np
from polymorphic import PolymorphicModel
from django.db import models
from django.core.urlresolvers import reverse
from django.core.validators import MinValueValidator, MaxValueValidator

from apps.main.models import Configuration
from devices.rc import api
from .utils import RCFile
|
r79 | ||
|
r0 | # Create your models here. | |
|
r6 | ||
|
# Choices for RCLineType.name: (stored key, human-readable label).
LINE_TYPES = (
    ('none', 'Not used'),
    ('tr', 'Transmission/reception selector signal'),
    ('tx', 'A modulating signal (Transmission pulse)'),
    ('codes', 'BPSK modulating signal'),
    ('windows', 'Sample window signal'),
    ('sync', 'Synchronizing signal'),
    ('flip', 'IPP related periodic signal'),
    ('prog_pulses', 'Programmable pulse'),
    ('mix', 'Mixed line'),
)

# Choices for RCConfiguration.sampling_reference: (stored key, label).
SAMPLING_REFS = (
    ('none', 'No Reference'),
    ('begin_baud', 'Begin of the first baud'),
    ('first_baud', 'Middle of the first baud'),
    ('sub_baud', 'Middle of the sub-baud')
)

# Command codes of the ".dat" stream.  RCConfiguration.add_cmd() emits each
# one as the pair (255, code); data bytes are emitted as (254, value - 1).
DAT_CMDS = {
    # Pulse Design commands
    'DISABLE' : 0, # Disables pulse generation
    'ENABLE' : 24, # Enables pulse generation
    'DELAY_START' : 40, # Write delay status to memory
    'FLIP_START' : 48, # Write flip status to memory
    'SAMPLING_PERIOD' : 64, # Establish Sampling Period
    'TX_ONE' : 72, # Output '1' in line TX (comment previously said '0', a copy-paste slip)
    'TX_ZERO' : 88, # Output '0' in line TX
    # NOTE(review): the next two comments were "Output '0'" for SW_ONE and
    # "Output '1'" for SW_ZERO, which contradicts the key names; the levels
    # below follow the names -- confirm against the controller firmware.
    'SW_ONE' : 104, # Output '1' in line SW
    'SW_ZERO' : 112, # Output '0' in line SW
    'RESTART': 120, # Restarts CR8 Firmware
    'CONTINUE' : 253, # Function Unknown
    # Commands available to new controllers
    # In the Pulse Design executable the clock divisor code is written as 12
    # at the start, but it should be written as code 22 (below) just before
    # the final enable.
    'CLOCK_DIVISOR_INIT' : 12, # Specifies Clock Divisor. Legacy command, ignored in the actual .dat conversion
    'CLOCK_DIVISOR_LAST' : 22, # Specifies Clock Divisor (default 60 if not included) syntax: 255,22 254,N-1.
    'CLOCK_DIVIDER' : 8, # Code written by parms_to_binary() right before the divider value
}
|
class RCConfiguration(Configuration):
    '''
    Radar controller (RC) configuration.

    Stores the timing parameters of a controller and owns a set of RCLine
    rows (one per output channel).  Besides persistence it knows how to
    serialize itself to a dict / ".dat" command stream, import itself from
    a file, plot its lines and talk to the device through `devices.rc.api`.
    '''

    ipp = models.FloatField(verbose_name='IPP [Km]', validators=[MinValueValidator(1), MaxValueValidator(9000)], default=300)
    ntx = models.PositiveIntegerField(verbose_name='Number of TX', validators=[MinValueValidator(1), MaxValueValidator(400)], default=1)
    clock_in = models.FloatField(verbose_name='Clock in [MHz]', validators=[MinValueValidator(1), MaxValueValidator(80)], default=1)
    clock_divider = models.PositiveIntegerField(verbose_name='Clock divider', validators=[MinValueValidator(1), MaxValueValidator(256)], default=1)
    clock = models.FloatField(verbose_name='Clock Master [MHz]', blank=True, default=1)
    time_before = models.PositiveIntegerField(verbose_name='Time before [μS]', default=12)
    time_after = models.PositiveIntegerField(verbose_name='Time after [μS]', default=1)
    sync = models.PositiveIntegerField(verbose_name='Synchro delay', default=0)
    sampling_reference = models.CharField(verbose_name='Sampling Reference', choices=SAMPLING_REFS, default='none', max_length=40)
    control_tx = models.BooleanField(verbose_name='Control Switch TX', default=False)
    control_sw = models.BooleanField(verbose_name='Control Switch SW', default=False)
    # Length of every line, in controller units; 0 means "not computed yet"
    # (RCLine.update_pulses() then falls back to int(ipp*km2unit)*ntx).
    total_units = models.PositiveIntegerField(default=0)
    mix = models.BooleanField(default=False)

    class Meta:
        db_table = 'rc_configurations'

    def get_absolute_url_plot(self):
        return reverse('url_plot_rc_pulses', args=[str(self.id)])

    def get_absolute_url_import(self):
        return reverse('url_import_rc_conf', args=[str(self.id)])

    @property
    def ipp_unit(self):
        '''
        IPP formatted as "<km> (<units>)"
        '''
        return '{} ({})'.format(self.ipp, int(self.ipp*self.km2unit))

    @property
    def us2unit(self):
        '''
        Factor converting microseconds to controller units
        '''
        return self.clock_in/self.clock_divider

    @property
    def km2unit(self):
        '''
        Factor converting kilometers to controller units
        '''
        return 20./3*(self.clock_in/self.clock_divider)

    def clone(self, **kwargs):
        '''
        Save a copy of this configuration (and of its lines) as a new row.

        `kwargs` are attribute overrides applied to the copy before saving.
        The instance itself becomes the copy and is returned.
        '''
        # Materialize the lines while they still point to the source row.
        lines = list(self.get_lines())
        self.pk = None
        self.id = None
        for attr, value in kwargs.items():
            setattr(self, attr, value)
        self.save()

        for line in lines:
            line.clone(rc_configuration=self)

        return self

    def get_lines(self, **kwargs):
        '''
        Retrieve configuration lines, optionally narrowed by extra filters
        '''
        return RCLine.objects.filter(rc_configuration=self.pk, **kwargs)

    def clean_lines(self):
        '''
        Reset every line to the 'none' type with empty params
        '''
        empty_line = RCLineType.objects.get(name='none')

        for line in self.get_lines():
            line.line_type = empty_line
            line.params = '{}'
            line.save()

    def parms_to_dict(self):
        '''
        Serialize the configuration, its lines, delays and pulses to a dict.

        Model fields are stringified; each line contributes its params plus
        'type' and 'name'; code primary keys are replaced by code names.
        '''
        ignored = ('parameters', 'type', 'polymorphic_ctype', 'configuration_ptr',
                   'created_date', 'programmed_date')

        data = {}
        for field in self._meta.fields:
            if field.name in ignored:
                continue
            data[field.name] = '{}'.format(field.value_from_object(self))

        data['device_id'] = data.pop('device')
        data['lines'] = []

        for line in self.get_lines():
            line_data = json.loads(line.params)
            if 'TX_ref' in line_data and line_data['TX_ref'] not in (0, '0'):
                line_data['TX_ref'] = line.get_name()
            if 'code' in line_data:
                line_data['code'] = RCLineCode.objects.get(pk=line_data['code']).name
            line_data['type'] = line.line_type.name
            line_data['name'] = line.get_name()
            data['lines'].append(line_data)

        data['delays'] = self.get_delays()
        data['pulses'] = self.get_pulses()

        return data

    def dict_to_parms(self, data):
        '''
        Load the configuration and its lines from a dict and save them.

        `data` must carry the scalar parameters read below and a 'lines'
        list whose items hold a 'type' key plus the line params.
        NOTE: the 'type' key is popped from each item of data['lines'].
        '''
        self.name = data['name']
        self.ipp = float(data['ipp'])
        self.ntx = int(data['ntx'])
        self.clock_in = float(data['clock_in'])
        self.clock_divider = int(data['clock_divider'])
        self.clock = float(data['clock'])
        self.time_before = data['time_before']
        self.time_after = data['time_after']
        self.sync = data['sync']
        self.sampling_reference = data['sampling_reference']
        # total_units is an integer field and is later used as an array
        # size, so truncate here instead of leaving a float in memory.
        self.total_units = int(self.ipp*self.ntx*self.km2unit)
        self.save()
        self.clean_lines()

        lines = []
        positions = {'tx':0, 'tr':0}

        for i, line_data in enumerate(data['lines']):
            line_type = RCLineType.objects.get(name=line_data.pop('type'))
            if line_type.name=='codes':
                code = RCLineCode.objects.get(name=line_data['code'])
                line_data['code'] = code.pk

            # Reuse the row already assigned to this channel, if any.
            line = RCLine.objects.filter(rc_configuration=self, channel=i).first()
            if line is None:
                line = RCLine(rc_configuration=self, line_type=line_type,
                              params=json.dumps(line_data),
                              channel=i)
            else:
                line.line_type = line_type
                line.params = json.dumps(line_data)

            # TX and TR lines are numbered independently, in channel order.
            if line_type.name in positions:
                line.position = positions[line_type.name]
                positions[line_type.name] += 1

            line.save()
            lines.append(line)

        # Second pass: now that every line has a pk, turn the textual
        # TX references into primary keys.
        for line, line_data in zip(lines, data['lines']):
            if 'TX_ref' not in line_data:
                continue
            params = json.loads(line.params)
            if line_data['TX_ref'] in (0, '0'):
                params['TX_ref'] = '0'
            else:
                # NOTE(review): raises IndexError when no TX line name
                # contains the reference text.
                params['TX_ref'] = [l.pk for l in lines if l.line_type.name=='tx' and line_data['TX_ref'] in l.get_name()][0]
            line.params = json.dumps(params)
            line.save()

    def get_delays(self):
        '''
        Intervals between consecutive pulse edges of all lines, measured
        from 0.  Returns [] when no line has pulses.
        '''
        points = sorted(set(x for line in self.get_lines()
                            for tup in line.pulses_as_points()
                            for x in tup))
        if not points:
            return []
        if points[0] != 0:
            points.insert(0, 0)

        return [after-before for before, after in zip(points, points[1:])]

    def get_pulses(self, binary=True):
        '''
        Flip pattern of the lines at every pulse edge except the last one.

        For each edge (sorted) a list with one 0/1 per line is built: 1 when
        that line has a pulse starting or ending there.  With `binary` each
        list is packed into an int whose least significant bit is the first
        line, the same packing parms_to_binary() uses.
        '''
        # Query and parse the lines only once.
        line_pulses = [line.pulses_as_points() for line in self.get_lines()]
        points = sorted(set(x for tups in line_pulses for tup in tups for x in tup))

        # Pulses are stored as (start, end) tuples, so both values are edges.
        edges = [set(x for tup in tups for x in tup) for tups in line_pulses]
        states = [[1 if x in line_edges else 0 for line_edges in edges] for x in points]

        if binary:
            # Reverse the bits of each state, not the order of the states.
            states = [int(''.join(str(x) for x in reversed(flips)), 2) for flips in states]

        return states[:-1]

    def add_cmd(self, cmd):
        '''
        Return the (255, code) pair of a DAT_CMDS command, None if unknown
        '''
        if cmd in DAT_CMDS:
            return (255, DAT_CMDS[cmd])

    def add_data(self, value):
        '''
        Return the (254, value-1) pair that encodes a data value
        '''
        return (254, value-1)

    def parms_to_binary(self):
        '''
        Create "dat" stream to be send to CR

        Returns the stream as text, one number per line.
        '''
        data = []
        # create header
        data.append(self.add_cmd('DISABLE'))
        data.append(self.add_cmd('CONTINUE'))
        data.append(self.add_cmd('RESTART'))
        data.append(self.add_cmd('SW_ONE' if self.control_sw else 'SW_ZERO'))
        data.append(self.add_cmd('TX_ONE' if self.control_tx else 'TX_ZERO'))
        # write divider
        data.append(self.add_cmd('CLOCK_DIVIDER'))
        data.append(self.add_data(self.clock_divider))
        # write delays
        data.append(self.add_cmd('DELAY_START'))
        # first delay is always zero
        data.append(self.add_data(1))

        delays = self.get_delays()

        for delay in delays:
            # delays above 252 are split in chunks of 253
            while delay>252:
                data.append(self.add_data(253))
                delay -= 253
            data.append(self.add_data(delay))

        # write flips
        data.append(self.add_cmd('FLIP_START'))

        states = self.get_pulses(binary=False)

        for flips, delay in zip(states, delays):
            flip = int(''.join(str(x) for x in reversed(flips)), 2)
            data.append(self.add_data(flip+1))
            # one "no flip" entry for every extra delay chunk written above
            while delay>252:
                data.append(self.add_data(1))
                delay -= 253

        # write sampling period
        data.append(self.add_cmd('SAMPLING_PERIOD'))
        dh = 1
        wins = self.get_lines(line_type__name='windows')
        if wins:
            win_params = json.loads(wins[0].params).get('params')
            if win_params:
                dh = int(win_params[0]['resolution']*self.km2unit)
        data.append(self.add_data(dh))

        # write enable
        data.append(self.add_cmd('ENABLE'))

        return '\n'.join(['{}'.format(x) for tup in data for x in tup])

    def update_from_file(self, filename):
        '''
        Update instance from file
        '''
        f = RCFile(filename)
        self.dict_to_parms(f.data)
        self.update_pulses()

    def update_pulses(self):
        '''
        Recompute the stored pulses of every line
        '''
        for line in self.get_lines():
            line.update_pulses()

    def plot_pulses(self, km=False):
        '''
        Plot every line and return the bokeh components of the figure.

        With `km` the x axis is scaled to kilometers instead of units.
        '''
        import matplotlib.pyplot as plt
        from bokeh.resources import CDN
        from bokeh.embed import components
        from bokeh.mpl import to_bokeh
        from bokeh.models.tools import WheelZoomTool, ResetTool, PanTool, PreviewSaveTool

        lines = self.get_lines()

        N = len(lines)
        npoints = self.total_units/self.km2unit if km else self.total_units
        fig = plt.figure(figsize=(10, 2+N*0.5))
        ax = fig.add_subplot(111)
        labels = ['IPP']

        for i, line in enumerate(lines):
            labels.append(line.get_name(channel=True))
            l = ax.plot((0, npoints),(N-i-1, N-i-1))
            # broken_barh expects (start, width); (0, 0) marks masked pulses
            points = [(tup[0], tup[1]-tup[0]) for tup in line.pulses_as_points(km=km) if tup!=(0,0)]
            ax.broken_barh(points, (N-i-1, 0.5),
                           edgecolor=l[0].get_color(), facecolor='none')

        # Label one IPP out of every `f`; floor division keeps `f` an
        # integer regardless of the interpreter's division semantics.
        n = 0
        f = ((self.ntx+50)//100)*5 or 2
        for x in np.arange(0, npoints, self.ipp if km else self.ipp*self.km2unit):
            if n%f==0:
                ax.text(x, N, '%s' % n, size=10)
            n += 1

        labels.reverse()
        ax.set_yticklabels(labels)

        # set_xlabel is a method; assigning to it never set the label.
        ax.set_xlabel('Units')
        plot = to_bokeh(fig, use_pandas=False)
        plot.tools = [PanTool(dimensions=['width']), WheelZoomTool(dimensions=['width']), ResetTool(), PreviewSaveTool()]

        return components(plot, CDN)

    def _store_answer(self, answer):
        '''
        Keep the device reply in self.message; return 1 on success else 0.

        A reply whose first character is "1" counts as success and its
        text from the third character on is stored; otherwise the whole
        reply is stored.
        '''
        if answer[0] != "1":
            self.message = answer[0:]
            return 0

        self.message = answer[2:]
        return 1

    def status_device(self):
        return 0

    def stop_device(self):
        '''
        Send "disable" to the device; return 1 on success else 0
        '''
        answer = api.disable(ip = self.device.ip_address,
                             port = self.device.port_address)
        return self._store_answer(answer)

    def start_device(self):
        '''
        Send "enable" to the device; return 1 on success else 0
        '''
        answer = api.enable(ip = self.device.ip_address,
                            port = self.device.port_address)
        return self._store_answer(answer)

    def write_device(self):
        '''
        Send this configuration to the device; return 1 on success else 0
        '''
        answer = api.write_config(ip = self.device.ip_address,
                                  port = self.device.port_address,
                                  parms = self.parms_to_dict())
        return self._store_answer(answer)
|
class RCLineCode(models.Model):
    '''
    Named set of codes, stored in table 'rc_line_codes' and listed by name.
    '''

    name = models.CharField(max_length=40)
    bits_per_code = models.PositiveIntegerField(default=0)
    number_of_codes = models.PositiveIntegerField(default=0)
    codes = models.TextField(blank=True, null=True)

    class Meta:
        db_table = 'rc_line_codes'
        ordering = ('name',)

    def __unicode__(self):
        # The code name is the whole label.
        return u'{}'.format(self.name)
|
r23 | ||
|
r107 | ||
|
class RCLineType(models.Model):
    '''
    Kind of signal a line carries; `name` is one of the LINE_TYPES keys.
    '''

    name = models.CharField(choices=LINE_TYPES, max_length=40)
    description = models.TextField(blank=True, null=True)
    params = models.TextField(default='[]')

    class Meta:
        db_table = 'rc_line_types'

    def __unicode__(self):
        # "<KEY> - <human readable label of the key>"
        key = self.name.upper()
        label = self.get_name_display()
        return u'{} - {}'.format(key, label)
class RCLine(models.Model):
    '''
    One output line (channel) of an RCConfiguration.

    `params` holds the JSON parameters of the line and `pulses` the text
    form of a list of (start, end) tuples computed by update_pulses().
    '''

    rc_configuration = models.ForeignKey(RCConfiguration, on_delete=models.CASCADE)
    line_type = models.ForeignKey(RCLineType, on_delete=models.CASCADE)
    channel = models.PositiveIntegerField(default=0)
    position = models.PositiveIntegerField(default=0)
    params = models.TextField(default='{}')
    pulses = models.TextField(default='')

    class Meta:
        db_table = 'rc_lines'
        ordering = ['channel']

    def __unicode__(self):
        if self.rc_configuration:
            return u'%s - %s' % (self.rc_configuration, self.get_name())
        # Never return None from __unicode__.
        return u'RCLine %s' % self.pk

    def clone(self, **kwargs):
        '''
        Save a copy of this line as a new row and return it.

        `kwargs` are attribute overrides applied before saving.
        '''
        self.pk = None

        for attr, value in kwargs.items():
            setattr(self, attr, value)

        self.save()

        return self

    def get_name(self, channel=False):
        '''
        Display name of the line: the upper-cased type plus, for TX lines,
        their position letter or, for lines with a 'TX_ref' param, the
        letter(s) of the referenced TX in parentheses.  With `channel` the
        channel number is appended.
        '''
        chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
        s = ''

        if self.line_type.name in ('tx',):
            s = chars[self.position]
        elif self.line_type.name in ('codes', 'windows', 'tr'):
            params = json.loads(self.params)
            if 'TX_ref' in params:
                pk = params['TX_ref']
                if pk in (0, '0'):
                    # reference 0 stands for every TX line
                    s = ','.join(chars[l.position] for l in self.rc_configuration.get_lines(line_type__name='tx'))
                else:
                    ref = RCLine.objects.get(pk=pk)
                    s = chars[ref.position]
                s = '({})'.format(s)

        s = '{}{}'.format(self.line_type.name.upper(), s)

        if channel:
            return '{} {}'.format(s, self.channel)
        return s

    def get_lines(self, **kwargs):
        '''
        Lines of the same configuration, optionally narrowed by filters
        '''
        return RCLine.objects.filter(rc_configuration=self.rc_configuration, **kwargs)

    def _stored_points(self):
        '''
        Parse the `pulses` text; an empty field (the default) means no pulses
        '''
        if not self.pulses:
            return []
        return ast.literal_eval(self.pulses)

    @staticmethod
    def _points_to_array(points, size):
        '''
        Build a 0/1 int8 array of `size` samples set to 1 inside each pulse
        '''
        y = np.zeros(int(size), dtype=np.int8)

        for tup in points:
            # points may hold floats; slices need integers
            y[int(tup[0]):int(tup[1])] = 1

        return y

    def pulses_as_array(self):
        '''
        Stored pulses as a 0/1 int8 array of total_units samples
        '''
        return self._points_to_array(self._stored_points(),
                                     self.rc_configuration.total_units)

    def pulses_as_points(self, km=False):
        '''
        Stored pulses as a list of (start, end) tuples, in km if requested
        '''
        points = self._stored_points()

        if km:
            unit2km = 1/self.rc_configuration.km2unit
            return [(tup[0]*unit2km, tup[1]*unit2km) for tup in points]
        return points

    def get_win_ref(self, params, tx_id, km2unit):
        '''
        Offset (in units) of a sampling window according to the
        configuration's sampling reference; 0 for the other references.

        `params` is one window entry (reads 'first_height' and
        'resolution'), `tx_id` the pk of the referenced TX line.
        '''
        ref = self.rc_configuration.sampling_reference

        if ref=='sub_baud':
            return int(1 + params['first_height']*km2unit - params['resolution']*km2unit/2)

        if ref!='first_baud':
            return 0

        # Width of one baud: the TX pulse width, divided by the code
        # length when a codes line references the same TX.
        tx_width = float(json.loads(RCLine.objects.get(pk=tx_id).params)['pulse_width'])*km2unit
        codes = [line for line in self.get_lines(line_type__name='codes') if int(json.loads(line.params)['TX_ref'])==int(tx_id)]
        if codes:
            tx_width = tx_width/len(json.loads(codes[0].params)['codes'][0])

        return int(1 + (tx_width + 1)/2 + params['first_height']*km2unit - params['resolution']*km2unit)

    @staticmethod
    def _has_ranges(ranges):
        '''
        True when a split 'range' param selects pulses ('0' means all)
        '''
        return len(ranges)>0 and ranges[0]!='0'

    def _tr_points(self, ntx, ipp_u, km2unit, us2unit, total):
        '''
        Pulses of a TR line: the referenced TX pulses widened by the
        configuration's time before/after, merged where they overlap
        '''
        conf = self.rc_configuration
        tr_params = json.loads(self.params)

        if tr_params['TX_ref'] in ('0', 0):
            txs = self.get_lines(line_type__name='tx')
        else:
            txs = RCLine.objects.filter(pk=tr_params['TX_ref'])

        tr_ranges = tr_params['range'].split(',')
        y = []

        for tx in txs:
            params = json.loads(tx.params)

            if float(params['pulse_width'])==0:
                continue

            delays = [float(d)*km2unit for d in params['delays'].split(',') if d]
            width = float(params['pulse_width'])*km2unit+int(conf.time_before*us2unit)
            y_tx = self.points(ntx, ipp_u, width,
                               delay=delays,
                               before=0,
                               after=int(conf.time_after*us2unit),
                               sync=conf.sync)

            ranges = params['range'].split(',')
            if self._has_ranges(ranges):
                y_tx = self.mask_ranges(y_tx, ranges)
            if self._has_ranges(tr_ranges):
                y_tx = self.mask_ranges(y_tx, tr_ranges)

            y.extend(y_tx)

        # Rasterize and convert back so overlapping pulses get merged.
        return self.array_to_points(self._points_to_array(y, total))

    def _tx_points(self, ntx, ipp_u, km2unit, us2unit):
        '''
        Pulses of a TX line, one per IPP, shifted by the time before
        '''
        conf = self.rc_configuration
        params = json.loads(self.params)
        delays = [float(d)*km2unit for d in params['delays'].split(',') if d]
        width = float(params['pulse_width'])*km2unit

        if width<=0:
            return []

        y = self.points(ntx, ipp_u, width,
                        delay=delays,
                        before=int(conf.time_before*us2unit),
                        after=0,
                        sync=conf.sync)

        ranges = params['range'].split(',')
        if self._has_ranges(ranges):
            y = self.mask_ranges(y, ranges)

        return y

    def _codes_points(self, km2unit):
        '''
        Pulses of a codes line: each code is stretched to the TX pulse
        width and laid over the stored pulses of the referenced TX,
        cycling through the codes
        '''
        params = json.loads(self.params)
        tx = RCLine.objects.get(pk=params['TX_ref'])
        tx_params = json.loads(tx.params)

        # samples per code bit; must be an int to repeat the bits
        f = int(float(tx_params['pulse_width'])*km2unit)//len(params['codes'][0])
        codes = [np.repeat(np.array([int(s) for s in code], dtype=np.int8), f)
                 for code in params['codes']]
        codes = [self.array_to_points(code) for code in codes]
        n = len(codes)

        y = []
        for i, tup in enumerate(tx.pulses_as_points()):
            code = codes[i%n]
            y.extend([(c[0]+tup[0], c[1]+tup[0]) for c in code])

        ranges = tx_params['range'].split(',')
        if self._has_ranges(ranges):
            y = self.mask_ranges(y, ranges)

        return y

    def _prog_points(self, ntx, ipp_u):
        '''
        Pulses of a programmable line; when not periodic the pulses are
        placed once over the whole ntx*ipp span
        '''
        params = json.loads(self.params)

        if int(params['periodic'])==0:
            nntx = 1
            nipp = ipp_u*ntx
        else:
            nntx = ntx
            nipp = ipp_u

        y = []
        for p in params.get('params', []):
            y.extend(self.points(nntx, nipp,
                                 p['end']-p['begin'],
                                 before=p['begin']))

        return y

    def _window_points(self, ntx, ipp_u, km2unit, us2unit):
        '''
        Pulses of a sampling-window line, masked with the range of the
        first TR line of the configuration
        '''
        conf = self.rc_configuration
        params = json.loads(self.params)
        y = []

        if 'params' in params and len(params['params'])>0:
            tr_params = json.loads(self.get_lines(line_type__name='tr')[0].params)
            tr_ranges = tr_params['range'].split(',')
            for p in params['params']:
                y_win = self.points(ntx, ipp_u,
                                    p['resolution']*p['number_of_samples']*km2unit,
                                    before=int(conf.time_before*us2unit)+self.get_win_ref(p, params['TX_ref'], km2unit),
                                    sync=conf.sync)
                if self._has_ranges(tr_ranges):
                    y_win = self.mask_ranges(y_win, tr_ranges)
                y.extend(y_win)

        return y

    def _mix_array(self, km2unit):
        '''
        Combine this channel of several configurations into one 0/1 array.

        The configuration's `parameters` text is a '-' separated list of
        "pk|mode|operator|delay|mask" items.  Items whose mask has this
        channel's bit cleared are skipped.  Mode 'P' combines the delayed
        line with the result using the operator (OR, XOR, AND, NAND); any
        other mode appends the line to the result.
        '''
        items = [value.split('|') for value in self.rc_configuration.parameters.split('-')]
        confs = [RCConfiguration.objects.get(pk=item[0]) for item in items]
        modes = [item[1] for item in items]
        ops = [item[2] for item in items]
        delays = [item[3] for item in items]
        masks = [item[4] for item in items]

        def enabled(i):
            # bit `channel` of the mask, least significant bit first
            return (int(masks[i]) >> self.channel) & 1 == 1

        if enabled(0):
            y = confs[0].get_lines(channel=self.channel)[0].pulses_as_array()
        else:
            y = np.zeros(confs[0].total_units, dtype=np.int8)

        for i in range(1, len(items)):
            if not enabled(i):
                continue

            Y = confs[i].get_lines(channel=self.channel)[0].pulses_as_array()

            if modes[i]!='P':
                y = np.concatenate([y, Y])
                continue

            delay_units = float(delays[i])*km2unit
            # array indices must be integers; negative delays count as 0
            delay = max(int(delay_units), 0)

            if delay>0 and delay_units<self.rc_configuration.ipp*km2unit and len(Y)==len(y):
                # short delay on equal lengths: shift right, dropping the tail
                y_temp = np.zeros_like(Y)
                y_temp[delay:] = Y[:-delay]
            else:
                # otherwise place Y at `delay`, growing the result if needed
                size = max(len(y), delay+len(Y))
                if size>len(y):
                    y_new = np.zeros(size, dtype=np.int8)
                    y_new[:len(y)] = y
                    y = y_new
                y_temp = np.zeros(size, dtype=np.int8)
                y_temp[delay:delay+len(Y)] = Y

            if ops[i]=='OR':
                y = y | y_temp
            elif ops[i]=='XOR':
                y = y ^ y_temp
            elif ops[i]=='AND':
                y = y & y_temp
            elif ops[i]=='NAND':
                y = y & ~y_temp

        return y

    def update_pulses(self):
        '''
        Update pulses field

        Recomputes the pulses for the line type, saves the line and, when
        the total length changed, the configuration as well.
        '''
        conf = self.rc_configuration
        km2unit = conf.km2unit
        us2unit = conf.us2unit
        ipp = conf.ipp
        ntx = conf.ntx
        ipp_u = int(ipp*km2unit)
        total = ipp_u*ntx if conf.total_units==0 else conf.total_units
        kind = self.line_type.name

        if kind=='tr':
            y = self._tr_points(ntx, ipp_u, km2unit, us2unit, total)
        elif kind=='tx':
            y = self._tx_points(ntx, ipp_u, km2unit, us2unit)
        elif kind=='flip':
            n = float(json.loads(self.params)['number_of_flips'])
            width = n*ipp*km2unit
            y = self.points(int((ntx+1)/(2*n)), ipp_u*n*2, width)
        elif kind=='codes':
            y = self._codes_points(km2unit)
        elif kind=='sync':
            params = json.loads(self.params)
            n = ipp_u*ntx
            if params['invert'] in ('1', 1):
                y = [(n-1, n)]
            else:
                y = [(0, 1)]
        elif kind=='prog_pulses':
            y = self._prog_points(ntx, ipp_u)
        elif kind=='windows':
            y = self._window_points(ntx, ipp_u, km2unit, us2unit)
        elif kind=='mix':
            mixed = self._mix_array(km2unit)
            total = len(mixed)
            y = self.array_to_points(mixed)
        else:
            y = []

        if conf.total_units != total:
            conf.total_units = total
            conf.save()

        self.pulses = u'{}'.format(y)
        self.save()

    @staticmethod
    def array_to_points(X):
        '''
        Convert a 0/1 array to a list of (start, end) tuples, one per run
        of ones (end exclusive).  An empty array gives [].
        '''
        X = np.asarray(X).astype(np.int8)
        if len(X)==0:
            return []

        d = X[1:]-X[:-1]

        up = np.where(d==1)[0]
        if X[0]==1:
            up = np.concatenate((np.array([-1]), up))
        up += 1

        dw = np.where(d==-1)[0]
        if X[-1]==1:
            dw = np.concatenate((dw, np.array([len(X)-1])))
        dw += 1

        # plain ints so the text form can be parsed back with literal_eval
        return [(int(tup[0]), int(tup[1])) for tup in zip(up, dw)]

    @staticmethod
    def mask_ranges(Y, ranges):
        '''
        Keep only the pulses selected by `ranges`; the rest become (0, 0).

        `ranges` is a list of strings, each a 1-based index ("3") or an
        inclusive 1-based interval ("2-5").  Blank items are ignored.
        '''
        y = [(0, 0) for __ in Y]

        for index in ranges:
            index = index.strip()
            if not index:
                continue
            if '-' in index:
                args = [int(a) for a in index.split('-')]
                y[args[0]-1:args[1]] = Y[args[0]-1:args[1]]
            else:
                # convert first, then shift to 0-based
                i = int(index)-1
                y[i] = Y[i]

        return y

    @staticmethod
    def points(ntx, ipp, width, delay=(0,), before=0, after=0, sync=0):
        '''
        Build `ntx` pulses as (start, end) tuples, one every `ipp` units.

        Pulse x starts at ipp*x + before + delay[x % len(delay)] and lasts
        width + after.  An empty `delay` is treated as no delay.  `sync`
        is accepted but not used.
        '''
        if len(delay)==0:
            delay = (0,)

        delays = len(delay)

        Y = [(ipp*x+before+delay[x%delays], ipp*x+width+before+delay[x%delays]+after) for x in range(ntx)]

        return Y