cli.py
183 lines
| 6.3 KiB
| text/x-python
|
PythonLexer
/ schaincli / cli.py
|
r934 | import click | ||
import schainpy | ||||
|
r935 | import subprocess | ||
|
r936 | import os | ||
import sys | ||||
import glob | ||||
|
r944 | save_stdout = sys.stdout | ||
sys.stdout = open('trash', 'w') | ||||
|
r943 | from multiprocessing import cpu_count | ||
from schaincli import templates | ||||
from schainpy import controller_api | ||||
|
r944 | from schainpy.model import Operation, ProcessingUnit | ||
|
r943 | from schainpy.utils import log | ||
|
r944 | from importlib import import_module | ||
from pydoc import locate | ||||
from fuzzywuzzy import process | ||||
sys.stdout = save_stdout | ||||
|
r934 | |||
def print_version(ctx, param, value): | ||||
if not value or ctx.resilient_parsing: | ||||
return | ||||
click.echo(schainpy.__version__) | ||||
ctx.exit() | ||||
|
r944 | |||
|
r943 | cliLogger = log.makelogger('schain cli') | ||
|
r944 | PREFIX = 'experiment' | ||
|
r934 | |||
@click.command() | ||||
@click.option('--version', '-v', is_flag=True, callback=print_version, help='SChain version', type=str) | ||||
|
r936 | @click.option('--xml', '-x', default=None, help='run an XML file', type=click.Path(exists=True, resolve_path=True)) | ||
|
r934 | @click.argument('command', default='run', required=True) | ||
|
r944 | @click.argument('nextcommand', default=None, required=False, type=str) | ||
|
r936 | def main(command, nextcommand, version, xml): | ||
|
r934 | """COMMAND LINE INTERFACE FOR SIGNAL CHAIN - JICAMARCA RADIO OBSERVATORY""" | ||
|
r935 | if xml is not None: | ||
|
r939 | runFromXML(xml) | ||
|
r935 | elif command == 'generate': | ||
|
r934 | generate() | ||
elif command == 'test': | ||||
test() | ||||
|
r936 | elif command == 'run': | ||
|
r944 | runschain(nextcommand) | ||
elif command == 'search': | ||||
search(nextcommand) | ||||
else: | ||||
log.error('Command {} is not defined'.format(command)) | ||||
def check_module(possible, instance): | ||||
def check(x): | ||||
try: | ||||
instancia = locate('schainpy.model.{}'.format(x)) | ||||
return isinstance(instancia(), instance) | ||||
except Exception as e: | ||||
return False | ||||
clean = clean_modules(possible) | ||||
return [x for x in clean if check(x)] | ||||
def clean_modules(module): | ||||
noEndsUnder = [x for x in module if not x.endswith('__')] | ||||
noStartUnder = [x for x in noEndsUnder if not x.startswith('__')] | ||||
noFullUpper = [x for x in noStartUnder if not x.isupper()] | ||||
return noFullUpper | ||||
def search(nextcommand): | ||||
if nextcommand is None: | ||||
log.error('There is no Operation/ProcessingUnit to search') | ||||
elif nextcommand == 'procs': | ||||
module = dir(import_module('schainpy.model')) | ||||
procs = check_module(module, ProcessingUnit) | ||||
try: | ||||
procs.remove('ProcessingUnit') | ||||
except Exception as e: | ||||
pass | ||||
log.success('Current ProcessingUnits are:\n\033[1m{}\033[0m'.format('\n'.join(procs))) | ||||
elif nextcommand == 'operations': | ||||
module = dir(import_module('schainpy.model')) | ||||
noProcs = [x for x in module if not x.endswith('Proc')] | ||||
operations = check_module(noProcs, Operation) | ||||
try: | ||||
operations.remove('Operation') | ||||
except Exception as e: | ||||
pass | ||||
log.success('Current Operations are:\n\033[1m{}\033[0m'.format('\n'.join(operations))) | ||||
else: | ||||
try: | ||||
module = locate('schainpy.model.{}'.format(nextcommand)) | ||||
args = module().getAllowedArgs() | ||||
|
r945 | log.warning('Use this feature with caution. It may not return all the allowed arguments') | ||
|
r944 | try: | ||
args.remove('self') | ||||
except Exception as e: | ||||
pass | ||||
try: | ||||
args.remove('dataOut') | ||||
except Exception as e: | ||||
pass | ||||
|
r945 | if len(args) == 0: | ||
log.success('{} has no arguments'.format(nextcommand)) | ||||
else: | ||||
log.success('Showing arguments of {} are:\n\033[1m{}\033[0m'.format(nextcommand, '\n'.join(args))) | ||||
|
r944 | except Exception as e: | ||
log.error('Module {} does not exists'.format(nextcommand)) | ||||
allModules = dir(import_module('schainpy.model')) | ||||
module = check_module(allModules, Operation) | ||||
module.extend(check_module(allModules, ProcessingUnit)) | ||||
similar = process.extractOne(nextcommand, module)[0] | ||||
|
r945 | log.success('Searching {} instead'.format(similar)) | ||
|
r944 | search(similar) | ||
def runschain(nextcommand): | ||||
if nextcommand is None: | ||||
currentfiles = glob.glob('./{}_*.py'.format(PREFIX)) | ||||
numberfiles = len(currentfiles) | ||||
if numberfiles > 1: | ||||
log.error('There is more than one file to run') | ||||
elif numberfiles == 1: | ||||
subprocess.call(['python ' + currentfiles[0]], shell=True) | ||||
|
r936 | else: | ||
|
r944 | log.error('There is no file to run') | ||
|
r934 | else: | ||
|
r944 | try: | ||
subprocess.call(['python ' + nextcommand], shell=True) | ||||
except Exception as e: | ||||
log.error("I cannot run the file. Does it exists?") | ||||
|
r934 | |||
|
r939 | |||
|
r935 | def basicInputs(): | ||
|
r934 | inputs = {} | ||
inputs['desc'] = click.prompt('Enter a description', default="A schain project", type=str) | ||||
inputs['name'] = click.prompt('Name of the project', default="project", type=str) | ||||
inputs['path'] = click.prompt('Data path', default=os.getcwd(), type=click.Path(exists=True, resolve_path=True)) | ||||
|
r935 | inputs['startDate'] = click.prompt('Start date', default='1970/01/01', type=str) | ||
inputs['endDate'] = click.prompt('End date', default='2017/12/31', type=str) | ||||
|
r934 | inputs['startHour'] = click.prompt('Start hour', default='00:00:00', type=str) | ||
inputs['endHour'] = click.prompt('End hour', default='23:59:59', type=str) | ||||
|
r935 | inputs['figpath'] = inputs['path'] + '/figs' | ||
return inputs | ||||
|
r939 | |||
|
r935 | def generate(): | ||
inputs = basicInputs() | ||||
inputs['multiprocess'] = click.confirm('Is this a multiprocess script?') | ||||
if inputs['multiprocess']: | ||||
inputs['nProcess'] = click.prompt('How many process?', default=cpu_count(), type=int) | ||||
current = templates.multiprocess.format(**inputs) | ||||
else: | ||||
current = templates.basic.format(**inputs) | ||||
|
r944 | scriptname = '{}_{}.py'.format(PREFIX, inputs['name']) | ||
|
r935 | script = open(scriptname, 'w') | ||
try: | ||||
script.write(current) | ||||
|
r944 | log.success('Script {} generated'.format(scriptname)) | ||
|
r935 | except Exception as e: | ||
|
r943 | log.error('I cannot create the file. Do you have writing permissions?') | ||
|
r935 | |||
|
r934 | |||
def test(): | ||||
|
r943 | log.warning('testing') | ||
|
r939 | |||
def runFromXML(filename): | ||||
controller = controller_api.ControllerThread() | ||||
if not controller.readXml(filename): | ||||
return | ||||
plotterObj = controller.useExternalPlotter() | ||||
controller.start() | ||||
plotterObj.start() | ||||
|
r944 | cliLogger("Finishing all processes") | ||
|
r939 | |||
controller.join(5) | ||||
|
r943 | cliLogger("End of script") | ||
|
r939 | return | ||