restart_reception.py
156 lines
| 5.6 KiB
| text/x-python
|
PythonLexer
r443 | from django.core.management.base import BaseCommand | ||
from apps.main.models import Experiment | |||
from django.shortcuts import get_object_or_404 | |||
import os, fnmatch | |||
import time | |||
from datetime import datetime | |||
import requests | |||
class Command(BaseCommand):
    """
    Check data acquisition and restart it when it is not progressing.

    Meant to be scheduled periodically (every 5 minutes according to the
    original note on this command).

    Example:
        manage.py restart_reception
    """

    def handle(self, *args, **options):
        """Wait 15 seconds, then run the acquisition check once."""
        # NOTE(review): the reason for the initial delay is not visible in
        # this file - confirm before changing it.
        startup_delay = 15
        time.sleep(startup_delay)
        restart_acquisition(self)
def check_experiment():
    """Report whether at least one experiment currently has ``status=2``.

    Returns:
        bool: ``True`` when the ``status=2`` filter matches any experiment,
        ``False`` otherwise.
    """
    # exists() lets the database answer with a single-row probe instead of
    # loading every matching Experiment only to count them.
    return Experiment.objects.filter(status=2).exists()
def acquisition_start(self, id_exp, timeout=10):
    """Restart reception for experiment ``id_exp`` and then restart schain.

    Nothing is done unless the first experiment with ``status=2`` has the
    primary key ``id_exp``.

    Args:
        self: Command instance; only ``self.stdout.write`` is used.
        id_exp: Primary key of the experiment to restart.
        timeout: Seconds allowed for each HTTP request to the schain
            service. Without it an unresponsive service would block this
            command indefinitely.

    Raises:
        requests.RequestException: If the schain service cannot be reached
            or does not answer within ``timeout``.
    """
    all_status = Experiment.objects.filter(status=2)
    check_id = False
    if len(all_status) > 0:
        check_id = all_status[0].pk
    if not check_id or check_id != id_exp:
        return

    exp = get_object_or_404(Experiment, pk=id_exp)
    # The device is started under a new name carrying the restart timestamp.
    name = '{}-R@{}'.format(exp.name, datetime.now().strftime('%Y-%m-%dT%H-%M-%S'))
    exp.reception_rx.start_device(name_experiment=name, restart=True)
    self.stdout.write(f'"{exp.name}" experiment: Data acquisition was restarted')

    self.stdout.write('Restarting schain...')
    # schain receives the data directory without the EXPOSE_NAS prefix and
    # without the '/rawdata' component.
    datadir_exp = exp.reception_rx.datadir
    datadir_exp = datadir_exp.replace(os.environ.get('EXPOSE_NAS', '/DATA_RM/DATA') + '/', '')
    datadir_exp = datadir_exp.replace('/rawdata', '')

    schain_url = 'http://' + os.environ.get('SCHAIN_SITE', 'sophy-schain')
    # NOTE(review): 10 s default is a guess at a safe bound - confirm it
    # covers the slowest expected /start response.
    requests.get(schain_url + '/stop', timeout=timeout)
    time.sleep(1)
    requests.post(schain_url + '/start', json={'name': datadir_exp}, timeout=timeout)
def acquisition_stop(self, id_exp):
    """Stop reception for experiment ``id_exp``.

    Nothing is done unless the first experiment with ``status=2`` has the
    primary key ``id_exp``.

    Args:
        self: Command instance; only ``self.stdout.write`` is used.
        id_exp: Primary key of the experiment to stop.
    """
    running = Experiment.objects.filter(status=2)
    running_pk = running[0].pk if len(running) > 0 else False
    if not running_pk or running_pk != id_exp:
        return

    exp = get_object_or_404(Experiment, pk=id_exp)
    exp.reception_rx.stop_device()
    self.stdout.write(f'"{exp.name}" experiment: Data acquisition "{exp.name}" was stopped')
def count_data(last_channel):
    """Count the ``rf@*.h5`` entries directly inside ``last_channel``.

    Args:
        last_channel: Path of the directory to inspect.

    Returns:
        int: Number of directory entries whose name matches ``rf@*.h5``.

    Raises:
        OSError: Propagated from ``os.listdir`` when the directory cannot
            be listed.
    """
    # Only the number of matches matters, so the listing is not sorted.
    return len(fnmatch.filter(os.listdir(last_channel), "rf@*.h5"))
def response_data(datadir, old_channel, old_rawdata, new_rawdata, search):
    """Sample the file counts of the ``ch0`` / ``ch1`` directories.

    Args:
        datadir: Directory expected to contain ``ch0`` and ``ch1``.
        old_channel: Mapping channel -> directory returned by the previous
            sample (``False`` when there was none).
        old_rawdata: Mapping channel -> baseline file count; updated in
            place when the sampled directory is new or has changed.
        new_rawdata: Mapping channel -> latest file count; updated in place.
        search: Channel names whose counts must be sampled on this call.

    Returns:
        tuple: ``(path_channels, channel, old_rawdata, new_rawdata)`` where
        ``path_channels`` tells whether each channel directory exists and
        ``channel`` holds what ``path_data`` returned for it.
    """
    # NOTE(review): the nesting below is the reading in which every sampled
    # channel waits one second before its new count is taken, and a sampled
    # channel without a dated directory just waits - confirm against the
    # intended indentation.
    path_channels = {'ch0': True, 'ch1': True}
    channel = {'ch0': False, 'ch1': False}
    for key in ('ch0', 'ch1'):
        rootdir = os.path.join(datadir, key)
        if not os.path.isdir(rootdir):
            path_channels[key] = False
            continue

        latest = path_data(rootdir)
        channel[key] = latest
        if key not in search:
            continue
        if not latest:
            time.sleep(1)
            continue

        # Re-take the baseline only when the sampled directory differs from
        # the one seen on the previous call.
        if not old_channel[key] or latest != old_channel[key]:
            old_rawdata[key] = count_data(latest)
        time.sleep(1)
        new_rawdata[key] = count_data(latest)
    return path_channels, channel, old_rawdata, new_rawdata
def path_data(rootdir):
    """Return the last dated sub-directory of ``rootdir``.

    Only immediate sub-directories whose name parses with the format
    ``%Y-%m-%dT%H-00-00`` are considered.

    Args:
        rootdir: Directory to scan.

    Returns:
        str | bool: Path of the candidate that sorts last as a string, or
        ``False`` when no sub-directory matches.

    Raises:
        OSError: Propagated from ``os.scandir`` when ``rootdir`` cannot be
            scanned.
    """
    candidates = []
    with os.scandir(rootdir) as entries:
        for entry in entries:
            if not entry.is_dir():
                continue
            try:
                # Parsing is used purely as a name-format check.
                datetime.strptime(entry.name, "%Y-%m-%dT%H-00-00")
            except ValueError:
                continue
            candidates.append(entry.path)
    # All candidates share the rootdir prefix, so comparing full paths is
    # the same as comparing directory names.
    return max(candidates, default=False)
def check_count(datadir):
    """Decide, per channel, whether the number of data files is growing.

    An initial sample is taken for both channels. A channel whose count did
    not grow is re-sampled up to five times before it is declared stalled.

    Args:
        datadir: Directory expected to contain ``ch0`` and ``ch1``.

    Returns:
        tuple: ``(path_channels, channel, validation)`` where the first two
        come from the last ``response_data`` call and ``validation`` maps
        each channel to ``True`` when its file count increased.
    """
    old_numbers = {'ch0': 0, 'ch1': 0}
    new_numbers = {'ch0': 0, 'ch1': 0}
    validation = {'ch0': False, 'ch1': False}
    channel = {'ch0': False, 'ch1': False}
    path_channels, channel, old_numbers, new_numbers = response_data(
        datadir, channel, old_numbers, new_numbers, ['ch0', 'ch1'])

    for key in ('ch0', 'ch1'):
        for _ in range(5):
            if new_numbers[key] > old_numbers[key]:
                break
            path_channels, channel, old_numbers, new_numbers = response_data(
                datadir, channel, old_numbers, new_numbers, [key])
        # Evaluated after the loop so the sample taken by the final retry is
        # also taken into account instead of being discarded.
        validation[key] = new_numbers[key] > old_numbers[key]
    return path_channels, channel, validation
def restart_acquisition(self):
    """Check the experiment with ``status=2`` and restart its acquisition
    when its data files are not increasing.

    Args:
        self: Command instance; ``self.stdout.write`` is used for reporting
            and the instance is forwarded to the stop/start helpers.
    """
    if check_experiment():
        # Index the lazy queryset once: each [0] on an unevaluated queryset
        # is a separate query, so the pk and the datadir could otherwise
        # come from different rows.
        exp = Experiment.objects.filter(status=2)[0]
        id_exp = exp.pk
        # Translate the stored data path to the '/data' location used here.
        datadir_exp = exp.reception_rx.datadir.replace(
            os.environ.get('EXPOSE_NAS', '/DATA_RM/DATA'), '/data')
        path_channels, channel, validation = check_count(datadir_exp)

        if path_channels['ch0'] and path_channels['ch1']:
            if validation['ch0'] and validation['ch1']:
                self.stdout.write('Data acquisition is running')
            else:
                if not channel['ch0'] or not channel['ch1']:
                    for key, value in channel.items():
                        if not value:
                            self.stdout.write(f'No such directory with datetime format "%Y-%m-%dT%H-00-00": channel["{key}"], retry!')
                else:
                    for key, value in validation.items():
                        if not value:
                            self.stdout.write(f'No file increment: channel["{key}"]')
                # NOTE(review): the restart is assumed to apply to both
                # failure modes above (missing dated directory and no file
                # increment) - confirm against the intended nesting.
                acquisition_stop(self, id_exp)
                time.sleep(3)
                acquisition_start(self, id_exp)
        else:
            for key, value in path_channels.items():
                if not value:
                    self.stdout.write(f'No such directory: channel["{key}"], fail!')