|
|
from django.core.management.base import BaseCommand
|
|
|
from apps.main.models import Experiment
|
|
|
from django.shortcuts import get_object_or_404
|
|
|
import os, fnmatch
|
|
|
import time
|
|
|
from datetime import datetime
|
|
|
import requests
|
|
|
|
|
|
class Command(BaseCommand):
    """
    Verify that data acquisition is alive and restart it when it is not.

    The original note says the check happens every 5 minutes; the scheduling
    itself is not part of this file.

    Example:
        manage.py restart_reception
    """

    def handle(self, *args, **options):
        """Wait 15 seconds, then run the acquisition check once."""
        # NOTE(review): the reason for the fixed 15 s delay is not visible
        # in this file -- confirm before changing it.
        time.sleep(15)
        restart_acquisition(self)
|
|
|
|
|
|
def check_experiment():
    """Return True when at least one Experiment has ``status == 2``.

    Uses ``QuerySet.exists()`` so the database only has to report whether a
    matching row is present, instead of loading every matching experiment
    just to measure the length of the result.
    """
    return Experiment.objects.filter(status=2).exists()
|
|
|
|
|
|
def acquisition_start(self, id_exp, timeout=30):
    """Restart reception for experiment ``id_exp`` and then restart schain.

    Nothing happens unless the first experiment with ``status == 2`` has a
    primary key equal to ``id_exp``.

    Args:
        self: Object exposing ``stdout.write`` (the management command).
        id_exp: Primary key of the experiment to restart.
        timeout: Seconds to wait for each HTTP call to the schain service.
            Without a timeout ``requests`` may block forever when the
            service does not answer, which would leave this command hanging.

    Raises:
        requests.exceptions.RequestException: Propagated unchanged when the
            schain service cannot be reached or times out.
    """
    running = Experiment.objects.filter(status=2)
    current_pk = running[0].pk if len(running) > 0 else False
    if not current_pk or current_pk != id_exp:
        return

    exp = get_object_or_404(Experiment, pk=id_exp)
    name = '{}-R@{}'.format(exp.name, datetime.now().strftime('%Y-%m-%dT%H-%M-%S'))
    exp.reception_rx.start_device(name_experiment=name, restart=True)
    self.stdout.write(f'"{exp.name}" experiment: Data acquisition was restarted')

    self.stdout.write('Restarting schain...')
    # The name sent to schain is the data directory with the NAS prefix and
    # the '/rawdata' part removed.
    nas_root = os.environ.get('EXPOSE_NAS', '/DATA_RM/DATA')
    schain_name = exp.reception_rx.datadir.replace(nas_root + '/', '')
    schain_name = schain_name.replace('/rawdata', '')

    schain_url = 'http://' + os.environ.get('SCHAIN_SITE', 'sophy-schain')
    requests.get(schain_url + '/stop', timeout=timeout)
    # Short pause between the stop and start requests.
    time.sleep(1)
    requests.post(schain_url + '/start', json={'name': schain_name}, timeout=timeout)
|
|
|
|
|
|
def acquisition_stop(self, id_exp):
    """Stop reception for experiment ``id_exp``.

    The device is only stopped when the first experiment with
    ``status == 2`` has a primary key equal to ``id_exp``; otherwise the
    call does nothing.

    Args:
        self: Object exposing ``stdout.write`` (the management command).
        id_exp: Primary key of the experiment to stop.
    """
    running = Experiment.objects.filter(status=2)
    current_pk = running[0].pk if len(running) > 0 else False
    if not current_pk or current_pk != id_exp:
        return

    exp = get_object_or_404(Experiment, pk=id_exp)
    exp.reception_rx.stop_device()
    self.stdout.write(f'"{exp.name}" experiment: Data acquisition "{exp.name}" was stopped')
|
|
|
|
|
|
def count_data(last_channel):
    """Return how many entries in ``last_channel`` match ``rf@*.h5``.

    Args:
        last_channel: Path of the directory to inspect.

    Returns:
        int: Number of directory entries whose name matches the pattern.

    Raises:
        OSError: If ``last_channel`` cannot be listed.
    """
    # Order is irrelevant for counting, so the listing is filtered directly.
    return len(fnmatch.filter(os.listdir(last_channel), "rf@*.h5"))
|
|
|
|
|
|
def response_data(datadir, old_channel, old_rawdata, new_rawdata, search):
# Inspect the 'ch0' and 'ch1' sub-directories of `datadir`.
# For each key, path_channels[key] is set to False when
# os.path.join(datadir, key) is not a directory; otherwise channel[key]
# receives the result of path_data() for that directory (a path, or False).
# For keys listed in `search`, count_data() results are written into
# old_rawdata[key] / new_rawdata[key], with 1-second sleeps involved.
# Returns the tuple (path_channels, channel, old_rawdata, new_rawdata); the
# two rawdata dicts are the caller's objects, updated in place.
# NOTE(review): indentation is missing in this copy of the file, so the exact
# nesting of the if/else chain below cannot be confirmed from here -- check
# it against the original before restructuring.
|
|
|
path_channels = {'ch0': True, 'ch1': True}
|
|
|
channel = {'ch0': False, 'ch1': False}
|
|
|
|
|
|
for key, value in path_channels.items():
|
|
|
rootdir = os.path.join(datadir, key)
|
|
|
if os.path.isdir(rootdir):
|
|
|
channel[key] = path_data(os.path.join(datadir, key))
|
|
|
# Only the channels named in `search` get their counters refreshed.
if key in search:
|
|
|
if channel[key]:
|
|
|
# True when nothing was recorded for this key before, or when path_data()
# now returns a different folder than the recorded one.
if not old_channel[key] or channel[key] != old_channel[key]:
|
|
|
old_rawdata[key] = count_data(channel[key])
|
|
|
time.sleep(1)
|
|
|
new_rawdata[key] = count_data(channel[key])
|
|
|
else:
|
|
|
time.sleep(1)
|
|
|
else:
|
|
|
# Presumably paired with the os.path.isdir() test above (channel folder missing) -- confirm.
path_channels[key] = False
|
|
|
|
|
|
return path_channels, channel, old_rawdata, new_rawdata
|
|
|
|
|
|
def path_data(rootdir):
    """Return the highest-sorting hourly folder inside ``rootdir``.

    Only sub-directories whose name parses with the format
    ``%Y-%m-%dT%H-00-00`` are considered.

    Args:
        rootdir: Directory to scan.

    Returns:
        The path (str) of the matching sub-directory that sorts last, or
        ``False`` when there is none. ``False`` is kept as the "nothing
        found" value because callers test the result for truthiness.

    Raises:
        OSError: If ``rootdir`` cannot be scanned.
    """
    hourly_dirs = []
    # The context manager closes the scandir iterator deterministically.
    with os.scandir(rootdir) as entries:
        for entry in entries:
            if not entry.is_dir():
                continue
            try:
                datetime.strptime(entry.name, "%Y-%m-%dT%H-00-00")
            except ValueError:
                # Not an hourly data folder; ignore it.
                continue
            hourly_dirs.append(entry.path)

    # All paths share the same parent, so the lexicographic maximum is the
    # entry that a descending sort would put first (the most recent folder,
    # assuming zero-padded names).
    return max(hourly_dirs) if hourly_dirs else False
|
|
|
|
|
|
def check_count(datadir):
    """Check, per channel, whether the file count in ``datadir`` is growing.

    An initial ``response_data`` pass covers both channels. Each channel is
    then examined up to five times: as soon as its new count exceeds its old
    count it is marked valid; otherwise ``response_data`` is called again
    for that channel only.

    Args:
        datadir: Directory holding the ``ch0`` / ``ch1`` sub-directories.

    Returns:
        tuple: ``(path_channels, channel, validation)`` where the first two
        come from the last ``response_data`` call and ``validation`` maps
        each channel name to True/False.
    """
    channel_names = ('ch0', 'ch1')
    old_numbers = dict.fromkeys(channel_names, 0)
    new_numbers = dict.fromkeys(channel_names, 0)
    validation = dict.fromkeys(channel_names, False)
    channel = dict.fromkeys(channel_names, False)

    path_channels, channel, old_numbers, new_numbers = response_data(
        datadir, channel, old_numbers, new_numbers, list(channel_names)
    )

    for key in channel_names:
        for _ in range(5):
            if new_numbers[key] > old_numbers[key]:
                validation[key] = True
                break
            # No growth seen yet: refresh the counters for this channel only.
            path_channels, channel, old_numbers, new_numbers = response_data(
                datadir, channel, old_numbers, new_numbers, [key]
            )

    return path_channels, channel, validation
|
|
|
|
|
|
def restart_acquisition(self):
# Look at the first experiment with status=2 (if any), run check_count() on
# its reception data directory, report the outcome through self.stdout and
# call acquisition_stop() / acquisition_start() for that experiment.
# `self` must provide `stdout.write`; Command.handle passes the command itself.
# NOTE(review): indentation is missing in this copy of the file. In
# particular, it cannot be confirmed from here whether the stop/start
# sequence near the end belongs only to the "No file increment" branch --
# check against the original before restructuring.
|
|
|
if check_experiment():
|
|
|
all_status = Experiment.objects.filter(status=2)
|
|
|
id_exp = all_status[0].pk
|
|
|
datadir_exp = all_status[0].reception_rx.datadir
|
|
|
# Swap the EXPOSE_NAS prefix (default '/DATA_RM/DATA') for '/data';
# presumably the location as mounted for this process -- confirm.
datadir_exp = datadir_exp.replace(os.environ.get('EXPOSE_NAS', '/DATA_RM/DATA'), '/data')
|
|
|
|
|
|
path_channels, channel, validation = check_count(datadir_exp)
|
|
|
|
|
|
# Both channel directories ('ch0' and 'ch1') were found.
if path_channels['ch0'] and path_channels['ch1']:
|
|
|
# Execute the process
|
|
|
# Both channels showed a growing file count.
if validation['ch0'] and validation['ch1']:
|
|
|
self.stdout.write(f'Data acquisition is running')
|
|
|
else:
|
|
|
# At least one channel has no folder named like "%Y-%m-%dT%H-00-00".
if not channel['ch0'] or not channel['ch1']:
|
|
|
for key, value in channel.items():
|
|
|
if not value:
|
|
|
self.stdout.write(f'No such directory with datetime format "%Y-%m-%dT%H-00-00": channel["{key}"], retry!')
|
|
|
else:
|
|
|
for key, value in validation.items():
|
|
|
if not value:
|
|
|
self.stdout.write(f'No file increment: channel["{key}"]')
|
|
|
|
|
|
# Restart sequence: stop, pause 3 seconds, start again.
acquisition_stop(self, id_exp)
|
|
|
time.sleep(3)
|
|
|
acquisition_start(self, id_exp)
|
|
|
|
|
|
else:
|
|
|
# Report every channel whose directory is missing.
for key, value in path_channels.items():
|
|
|
if not value:
|
|
|
self.stdout.write(f'No such directory: channel["{key}"], fail!')
|