##// END OF EJS Templates
README update
README update

File last commit:

r355:0490296ea16e
r356:63d84cfefb7c
Show More
models.py
251 lines | 7.8 KiB | text/x-python | PythonLexer
import ast
import json
import requests
import base64
import struct
from struct import pack
import time
from django.contrib import messages
from django.db import models
from django.urls import reverse
from django.core.validators import MinValueValidator, MaxValueValidator
from apps.main.models import Configuration
# Choices offered by the ``selector`` field: (stored integer, display label).
SELECTOR_VALUE = (
    (0, 'disable'),
    (1, 'enable'),
)
class GeneratorConfiguration(Configuration):
    """Configuration of a generator device that is driven over HTTP.

    Stores the pulse parameters (``periode``, ``delay``, ``width``) and the
    enable ``selector``, and implements the status/reset/stop/start commands
    by issuing HTTP requests to ``self.device.url()``.

    Every ``*_device`` command returns ``True`` on success and ``False`` on
    failure, and leaves a human-readable result in ``self.message``.  The
    integers written to ``self.device.status`` are defined outside this
    module; the values used here are the ones the original code used.
    """

    periode = models.IntegerField(
        verbose_name='Periode',
        blank=False,
        null=False
    )

    delay = models.IntegerField(
        verbose_name='Delay',
        blank=False,
        null=False
    )

    width = models.IntegerField(
        verbose_name='Width',
        blank=False,
        null=False
    )

    # Restricted to the values listed in SELECTOR_VALUE.
    selector = models.IntegerField(
        verbose_name='Selector',
        choices=SELECTOR_VALUE,
        blank=False,
        null=False
    )

    class Meta:
        db_table = 'generator_configurations'

    def __str__(self):
        """Return the configuration label as a string."""
        return str(self.label)

    def get_absolute_url_plot(self):
        """Return the URL of the pulse-plot view for this configuration."""
        return reverse('url_plot_generator_pulses', args=[str(self.id)])

    def request(self, cmd, method='get', **kwargs):
        """Send ``cmd`` to the device and return the decoded JSON answer.

        ``method`` is the name of the ``requests`` function to call
        (``'get'``, ``'post'``, ...); ``kwargs`` are forwarded to it.
        Raises whatever ``requests`` or the JSON decoding raises.
        """
        response = getattr(requests, method)(self.device.url(cmd), **kwargs)
        return response.json()

    def status_device(self):
        """Check that the device answers on its base URL.

        Returns ``True`` and sets ``device.status`` to 1 when the device
        answers with a successful HTTP status, ``False`` otherwise.
        """
        try:
            response = requests.get(self.device.url())
            # A Response is truthy only for a successful HTTP status.  It is
            # not subscriptable, so report the status code instead of trying
            # to index it (which used to raise TypeError).
            if not response:
                self.device.status = 4
                self.device.save()
                self.message = 'Generator status: HTTP {}'.format(
                    response.status_code)
                return False
            self.device.status = 1
        except Exception as e:
            # Best-effort: any failure is reported through self.message.
            if 'No route to host' not in str(e):
                self.device.status = 4
            self.device.save()
            self.message = 'Generator status: {}'.format(str(e))
            return False

        self.device.save()
        return True

    def reset_device(self):
        """Ask the device to reset itself.

        Returns ``True`` only when the device answers ``{'reset': 'ok'}``;
        any other answer or any exception yields ``False``.
        """
        try:
            payload = self.request('reset', 'post')
            if payload['reset'] == 'ok':
                self.message = 'Generator restarted OK'
                self.device.status = 2
                self.device.save()
            else:
                self.message = 'Generator restart fail'
                self.device.status = 4
                self.device.save()
                # A failed restart must not be reported as success.
                return False
        except Exception as e:
            self.message = 'Generator reset: {}'.format(str(e))
            return False

        return True

    def stop_device(self):
        """Send the ``stop`` command to the device.

        Returns ``True`` when the device answers with a successful HTTP
        status, ``False`` otherwise.
        """
        try:
            command = self.device.url() + "stop"
            r = requests.get(command)
            # Both outcomes store status 4, exactly as the original did.
            self.device.status = 4
            self.device.save()
            if not r:
                self.message = 'Generator stop: HTTP {}'.format(r.status_code)
                return False
            self.message = 'Generator stopped'
        except Exception as e:
            if 'No route to host' not in str(e):
                self.device.status = 4
            else:
                self.device.status = 0
            self.message = "Generator can't stop, please check network/device connection or IP address/port configuration"
            self.device.save()
            return False

        return True

    def start_device(self):
        """Send the stored pulse parameters to the device and start it.

        The parameters are read back from the database, serialised as JSON,
        URL-safe base64 encoded and passed in the ``params`` query argument
        of the ``trmode`` command.  Returns ``True`` when the device answers
        with a successful HTTP status, ``False`` otherwise.
        """
        try:
            # Use the saved values rather than unsaved in-memory edits.
            generator = GeneratorConfiguration.objects.get(pk=self.pk)
            # NOTE(review): "Delay" is capitalised unlike the other keys;
            # kept as-is, confirm against the device API.
            json_trmode = json.dumps({
                "periode": generator.periode,
                "Delay": generator.delay,
                "width": generator.width,
                "enable": generator.selector,
            })
            base64_trmode = base64.urlsafe_b64encode(json_trmode.encode('ascii'))
            trmode_url = self.device.url() + "trmode?params="
            complete_url_trmode = trmode_url + base64_trmode.decode('ascii')

            r = requests.get(complete_url_trmode)
            if not r:
                self.message = 'Generator start: HTTP {}'.format(r.status_code)
                return False
            self.device.status = 3
            self.device.save()
            self.message = 'Generator configured and started'
        except Exception as e:
            if 'No route to host' not in str(e):
                self.device.status = 4
            else:
                self.device.status = 0
            self.message = "Generator can't start, please check network/device connection or IP address/port configuration"
            self.device.save()
            return False

        return True

    def get_absolute_url_import(self):
        """Return the URL of the import view for this configuration."""
        return reverse('url_import_generator_conf', args=[str(self.id)])