#!/bin/python3
# run-submit-pol-job.py

import argparse
from pathlib import Path
import subprocess
import re
from exceptions import InvalidDeviceError, ValueError

# Program description shown in the --help output; parse_args() passes it to argparse.ArgumentParser together with
# RawTextHelpFormatter, so the line breaks below are printed as written.
description = """
Interface for non-privileged users to execute the run-mmpol.sh script with elevated permissions. Calls the 
submit-pol-job wrapper. The default applied policy is ./policy-def/list-path-external but can be changed to 
./policy-def/list-path-dirplus using the --with-dirs flag. No other policy is available via this script.
"""

def parse_args(argv=None):
    """Build the command-line parser and return the parsed options as a dict.

    Args:
        argv: Optional list of argument strings to parse. When None (the default) argparse falls back to
            ``sys.argv[1:]``, which is what the original zero-argument call did.

    Returns:
        dict keyed by argparse destination name: ``outdir``, ``log_prefix``, ``with_dirs``, ``nodes``, ``cores``,
        ``partition``, ``time``, ``mem_per_cpu``, ``dry_run`` and ``device``. Values are not validated here.
    """
    parser = argparse.ArgumentParser(description=description,
                                     formatter_class=argparse.RawTextHelpFormatter
                                     )
    parser.add_argument('-o','--outdir', type=str,
                        help="Directory to store the policy output in",
                        default='/data/rc/gpfs-policy/data')
    parser.add_argument('-f','--log-prefix', type=str,
                        help="Prefix for the policy output file. Defaults to 'list-policy_[device-id]' where 'device-id' is the device stem. The final file name will have the policy type, the job ID, and the run date tagged on the end")
    parser.add_argument('--with-dirs', action='store_true',
                        help="Include directories as entries in the policy output (Default: false)")

    sbatch = parser.add_argument_group('sbatch parameters')
    sbatch.add_argument('-N','--nodes',type=int,default=1,
                        help='Number of nodes to run job across')
    sbatch.add_argument('-c','--cores',type=int,default=16,
                        help='Number of cores to request')
    sbatch.add_argument('-p','--partition',type=str,default='amd-hdr100,medium',
                        help='Partition to submit job to. Can be a comma-separated list of multiple partitions')
    sbatch.add_argument('-t','--time',type=str,default='24:00:00',
                        help='Time limit for job formatted as [D-]HH:MM:SS')
    sbatch.add_argument('-m','--mem-per-cpu',type=str,default='8G',
                        help='Amount of RAM to allocate per core')

    # Adjacent string literals are joined with no separator, so each continuation line must carry its own
    # leading space; otherwise the help output runs words together.
    parser.add_argument('--dry-run', action='store_true',
                        help="Do not submit any jobs, run any policies, or create or remove any files or directories."
                             " Used for testing")
    parser.add_argument('device',type=str,
                        help="GPFS fileset/directory to apply the policy to. Can be specified as either the name of the"
                             " fileset or the full path to the directory. (Examples: scratch, /data/user/[username])")
    args = vars(parser.parse_args(argv))
    return args

# Validate that the string supplied to 'device' is either the name of a predefined fileset ('scratch' or 'home') or
# an existing path located under /data, /home, or /scratch. 'data' and '/data' alone are not accepted; choose a
# subdirectory such as /data/user or /data/project instead.
def validate_device(device):
    """Resolve the user-supplied device into an absolute path the policy may run on.

    Accepted inputs are the bare names 'scratch' and 'home', the fixed paths '/scratch', '/home', '/data/user' and
    '/data/project', or any existing path that resolves to somewhere beneath /data, /home or /scratch.

    Args:
        device: Fileset name or directory path as typed by the user. Surrounding whitespace is ignored.

    Returns:
        pathlib.Path: the resolved absolute path.

    Raises:
        InvalidDeviceError: if the device is the whole 'data' fileset, does not exist, or lies outside the
            permitted roots.
    """
    device = device.strip()
    full_data_msg = "A policy run cannot be performed on the full 'data' fileset. Choose a valid subdirectory such as '/data/user' or '/data/project'"
    if device in ('data', '/data'):
        raise InvalidDeviceError(full_data_msg)

    if device in ('scratch', 'home'):
        return Path('/').joinpath(device).resolve()

    if device in ('/scratch', '/home', '/data/user', '/data/project'):
        return Path(device).resolve()

    p = Path(device).resolve()  # resolve given path into absolute path

    # Spellings such as '/data/' or '/data/.' slip past the literal comparison above but still resolve to the data
    # root, so report them with the same explanation instead of a misleading "does not exist" error.
    if p == Path('/data').resolve():
        raise InvalidDeviceError(full_data_msg)

    # p must exist and sit strictly below one of the permitted roots (the roots themselves are handled above).
    valid_parents = [Path(parent).resolve() for parent in ('/data', '/home', '/scratch')]
    if p.exists() and any(parent in p.parents for parent in valid_parents):
        return p
    raise InvalidDeviceError(f'The path or fileset {device} does not exist within /data, /home, or /scratch')

def validate_time(time):
    """Check that a job time limit is written as [[H]H]H:MM:SS or D-HH:MM:SS (D from 0 to 6).

    Args:
        time: Time-limit string supplied on the command line.

    Raises:
        ValueError: if the string does not match either accepted layout.
    """
    # fullmatch rather than match + '$': '$' also matches just before a trailing newline, which would let
    # "24:00:00\n" through and inject a line break into the command built from this value.
    if not re.fullmatch(r'(?:[0-6]-\d{2}|\d{1,3}):\d{2}:\d{2}', time):
        raise ValueError("Time must have format [[H]H]H:MM:SS or D-HH:MM:SS")
    
def validate_mem(mem):
    """Ensure the per-core memory request is a run of digits with an optional trailing 'M' or 'G'.

    Args:
        mem: Memory string supplied on the command line.

    Raises:
        ValueError: if the whole string does not have that shape.
    """
    matched = re.fullmatch(r'^[\d]+[GM]?$', mem)
    if matched is None:
        raise ValueError("Mem per CPU must be an integer. May be followed by M or G to denote units")

def validate_partition(partition):
    """Normalise a partition list and confirm every entry is reported by ``sinfo``.

    Args:
        partition: One partition name or a comma-separated list of names. Whitespace around entries is removed.

    Returns:
        str: the partition list with all whitespace stripped.

    Raises:
        ValueError: if several names are separated by whitespace instead of commas, or if any entry (including an
            empty one left by a stray comma) is not in the output of ``sinfo -h -o "%R"``.
    """
    # str.split() with no argument splits on runs of any whitespace and ignores leading/trailing whitespace.
    # (The previous split('\s') looked for a literal backslash followed by 's' and therefore never triggered.)
    if len(partition.split()) > 1 and ',' not in partition:
        raise ValueError("Multiple partitions should be given as a comma-separated list")

    partition = re.sub(r'\s', r'', partition)
    partitions = partition.split(',')

    # Constant command with no user input, so running it through the shell is safe here.
    cmd = r'sinfo -h -o "%R"'
    sinfo = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE)
    avail_partitions = set(sinfo.stdout.decode('utf-8').splitlines())

    # Test the list itself rather than any(...) of its items: an empty name is falsy and would otherwise pass.
    incorrect = [p for p in partitions if p not in avail_partitions]
    if incorrect:
        shown = ', '.join(p if p else '<empty>' for p in incorrect)
        raise ValueError(f"The following partition(s) do not exist: {shown}. To see a list of valid partitions, visit https://rc.uab.edu/")

    return partition

def validate_nodes(n):
    """Check that the requested node count is an integer from 1 to 4 inclusive.

    Args:
        n: Node count parsed from the command line.

    Raises:
        ValueError: if ``n`` is not an int or lies outside 1..4.
    """
    # The negation must cover the whole condition; `not isinstance(...) and ...` only negated the type check,
    # so out-of-range integers were never rejected.
    if not (isinstance(n, int) and 1 <= n <= 4):
        raise ValueError('Nodes must be an integer between 1 and 4')

def validate_cores(n):
    """Check that the requested core count is an integer from 1 to 48 inclusive.

    Args:
        n: Core count parsed from the command line.

    Raises:
        ValueError: if ``n`` is not an int or lies outside 1..48.
    """
    # The negation must cover the whole condition; `not isinstance(...) and ...` only negated the type check,
    # so out-of-range integers were never rejected.
    if not (isinstance(n, int) and 1 <= n <= 48):
        raise ValueError('Cores must be an integer between 1 and 48')

def validate_output_directory(outdir):
    """Resolve ``outdir`` and confirm it is an existing directory.

    The directory is never created here; a path that is missing or is not a directory is rejected.

    Args:
        outdir: Output directory as given on the command line.

    Returns:
        pathlib.Path: the resolved absolute directory path.

    Raises:
        ValueError: if the resolved path is not a directory.
    """
    resolved = Path(outdir).resolve()
    if resolved.is_dir():
        return resolved
    raise ValueError(f"{resolved} is not a valid output directory")

def create_default_log_prefix(device):
    """Derive the default output-file prefix from the device path.

    Leading and trailing slashes are dropped and each remaining '/' becomes '-', then the result is appended to
    'list-policy_'.

    Args:
        device: Device path (str or Path-like; converted with ``str``).

    Returns:
        str: for example '/data/user/abc' gives 'list-policy_data-user-abc'.
    """
    trimmed = str(device).strip('/')
    stem = '-'.join(trimmed.split('/'))
    return f"list-policy_{stem}"

def main():
    """Parse and validate the command-line options, then run ./submit-pol-job with them.

    The wrapper is invoked with an argument list (no shell), so values such as --log-prefix cannot inject extra
    shell commands. The command is printed before it runs, and this process exits with the wrapper's return code.

    Raises:
        SystemExit: always, carrying the return code of ./submit-pol-job.
        Whatever the validate_* helpers raise for invalid options.
    """
    import shlex  # local import: only used to render the command line for display

    args = parse_args()
    args['device'] = validate_device(args['device'])

    validate_nodes(args['nodes'])
    validate_cores(args['cores'])
    validate_mem(args['mem_per_cpu'])
    validate_time(args['time'])
    args['partition'] = validate_partition(args['partition'])

    args['outdir'] = validate_output_directory(args['outdir'])

    if args['log_prefix'] is None:
        args['log_prefix'] = create_default_log_prefix(args['device'])

    # Paths to policy definitions are hard-coded here and should not be altered in any way. This script gives elevated
    # permissions to run the GPFS policy engine to non-admins. These are the only two policy files non-admins should
    # use. Any other policy needs should go through an admin.
    if args['with_dirs']:
        args['policy'] = './policy-def/list-path-dirplus'
    else:
        args['policy'] = './policy-def/list-path-external'

    # Build argv as a list so each value reaches the wrapper as exactly one argument, whatever characters it holds.
    cmd = [
        './submit-pol-job',
        '-o', str(args['outdir']),
        '-f', str(args['log_prefix']),
        '-P', args['policy'],
        '-N', str(args['nodes']),
        '-c', str(args['cores']),
        '-p', args['partition'],
        '-t', args['time'],
        '-m', args['mem_per_cpu'],
    ]
    if args['dry_run']:
        cmd.append('--dry-run')
    cmd.append(str(args['device']))  # positional device goes last, after the optional --dry-run flag

    printable = ' '.join(shlex.quote(part) for part in cmd)
    print(f"Command: {printable}")

    completed = subprocess.run(cmd)
    # Propagate the wrapper's status instead of always exiting 0.
    raise SystemExit(completed.returncode)

# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()