models.py
251 lines
| 7.8 KiB
| text/x-python
|
PythonLexer
import ast
import base64
import json
import logging
import struct
import time
from struct import pack

import requests
from django.contrib import messages
from django.core.validators import MinValueValidator, MaxValueValidator
from django.db import models
from django.urls import reverse

from apps.main.models import Configuration
# Choices offered by GeneratorConfiguration.selector: (stored value, label).
SELECTOR_VALUE = ((0, 'disable'), (1, 'enable'))
class GeneratorConfiguration(Configuration):
    """Pulse-generator configuration and the HTTP commands that apply it.

    The ``*_device`` methods talk to ``self.device`` over HTTP, store the
    resulting state in ``self.device.status`` and describe failures in
    ``self.message``. Each returns ``True`` on success and ``False`` otherwise.

    NOTE(review): the meaning of the numeric ``device.status`` codes (0-4) is
    defined outside this file; confirm against that definition before
    changing any of the values assigned below.
    """

    _logger = logging.getLogger(__name__)

    periode = models.IntegerField(
        verbose_name='Periode',
        blank=False,
        null=False,
    )

    delay = models.IntegerField(
        verbose_name='Delay',
        blank=False,
        null=False,
    )

    width = models.IntegerField(
        verbose_name='Width',
        blank=False,
        null=False,
    )

    selector = models.IntegerField(
        verbose_name='Selector',
        choices=SELECTOR_VALUE,
        blank=False,
        null=False,
    )

    class Meta:
        db_table = 'generator_configurations'

    def __str__(self):
        """Return the configuration label as text."""
        return str(self.label)

    def get_absolute_url_plot(self):
        """Return the URL of the pulse-plot view for this configuration."""
        return reverse('url_plot_generator_pulses', args=[str(self.id)])

    def get_absolute_url_import(self):
        """Return the URL of the import view for this configuration."""
        return reverse('url_import_generator_conf', args=[str(self.id)])

    def request(self, cmd, method='get', **kwargs):
        """Call the device endpoint *cmd* and return its decoded JSON body.

        Args:
            cmd: Command name handed to ``self.device.url`` to build the URL.
            method: Name of the ``requests`` function to use ('get', 'post'...).
            **kwargs: Forwarded unchanged to that ``requests`` function.

        Raises:
            Whatever ``requests`` or ``Response.json`` raise; nothing is
            caught here.
        """
        http_call = getattr(requests, method)
        response = http_call(self.device.url(cmd), **kwargs)
        return response.json()

    def status_device(self):
        """Ping the device's base URL and record whether it answered.

        Returns:
            bool: ``True`` when the device replied with a successful HTTP
            status; ``False`` otherwise, with the reason in ``self.message``.
        """
        try:
            response = requests.get(self.device.url())
        except Exception as e:
            # NOTE(review): unlike start/stop, a "No route to host" error
            # leaves the stored status untouched here - confirm intended.
            if 'No route to host' not in str(e):
                self.device.status = 4
            self.device.save()
            self.message = 'Generator status: {}'.format(str(e))
            return False

        self._logger.debug('Generator status response: %s', response)
        if not response:
            # A Response is falsy for 4xx/5xx replies and cannot be indexed
            # like a dict, so report the HTTP code instead.
            self.device.status = 4
            self.device.save()
            self.message = 'Generator status: HTTP {}'.format(response.status_code)
            return False

        self.device.status = 1
        self.device.save()
        return True

    def reset_device(self):
        """Send the 'reset' command and store the outcome.

        Returns:
            bool: ``True`` only when the device answered ``{'reset': 'ok'}``;
            ``False`` when it reported anything else or the call failed.
        """
        try:
            payload = self.request('reset', 'post')
            succeeded = payload['reset'] == 'ok'
            if succeeded:
                self.message = 'Generator restarted OK'
                self.device.status = 2
            else:
                self.message = 'Generator restart fail'
                self.device.status = 4
            self.device.save()
        except Exception as e:
            self.message = 'Generator reset: {}'.format(str(e))
            return False
        # A refused reset must not be reported to the caller as a success.
        return succeeded

    def stop_device(self):
        """Request ``<device url>stop`` and store the outcome.

        Returns:
            bool: ``True`` when the device replied with a successful HTTP
            status; ``False`` otherwise, with the reason in ``self.message``.
        """
        try:
            response = requests.get(self.device.url() + "stop")
        except Exception as e:
            if 'No route to host' not in str(e):
                self.device.status = 4
            else:
                self.device.status = 0
            self.message = "Generator can't stop, please check network/device connection or IP address/port configuration"
            self.device.save()
            return False

        # NOTE(review): a successful stop stores the same status (4) as a
        # failed one - confirm this is the intended code for "stopped".
        self.device.status = 4
        self.device.save()
        if not response:
            self.message = 'Generator stop: HTTP {}'.format(response.status_code)
            return False
        self.message = 'Generator stopped'
        return True

    def start_device(self):
        """Send the stored pulse parameters to the device to start it.

        The saved ``periode``, ``delay``, ``width`` and ``selector`` values
        are serialised as JSON, base64-encoded and appended to
        ``<device url>trmode?params=``.

        Returns:
            bool: ``True`` when the device replied with a successful HTTP
            status; ``False`` otherwise, with the reason in ``self.message``.
        """
        try:
            # Re-read the saved row, looking it up by primary-key value
            # rather than by passing the instance itself.
            generator = GeneratorConfiguration.objects.get(pk=self.pk)
            # NOTE(review): "Delay" is capitalised unlike its sibling keys;
            # presumably this is what the device expects - confirm.
            params = json.dumps({
                "periode": generator.periode,
                "Delay": generator.delay,
                "width": generator.width,
                "enable": generator.selector,
            })
            # NOTE(review): standard base64 may contain '+', '/' and '=',
            # which are sent unescaped in the query string - confirm the
            # device tolerates that.
            encoded = base64.standard_b64encode(params.encode('ascii')).decode('ascii')
            url = self.device.url() + "trmode?params=" + encoded
            self._logger.debug('Generator start request: %s', url)
            response = requests.get(url)
        except Exception as e:
            if 'No route to host' not in str(e):
                self.device.status = 4
            else:
                self.device.status = 0
            self.message = "Generator can't start, please check network/device connection or IP address/port configuration"
            self.device.save()
            return False

        if not response:
            # The stored status is deliberately left as it was; only the
            # reason for the failure is reported.
            self.message = 'Generator start: HTTP {}'.format(response.status_code)
            return False

        self.device.status = 3
        self.device.save()
        self.message = 'Generator configured and started'
        return True