Skip to content
Snippets Groups Projects
split_log.py 2.39 KiB
Newer Older
import argparse
import subprocess
import sys
from pathlib import Path
from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
from ..policy import split

# Template for the Slurm batch script that submit_batch() pipes to `sbatch`
# on stdin. The `#SBATCH` lines are part of the script text (scheduler
# directives), not Python comments. Every `{placeholder}` is filled with
# str.format() from the keyword arguments passed to submit_batch():
#   - cpus_per_task, partition, time, mem: resource requests
#   - output_log, error_log: presumably supplied by setup_slurm_logs() --
#     TODO confirm against .utils
#   - env_cmd: the value returned by define_python_interpreter()
#   - output_dir, lines, log: forwarded to the `split-log` command line
BATCH_SCRIPT = """\
#!/bin/bash
#
#SBATCH --job-name=split
#SBATCH --ntasks=1
#SBATCH --cpus-per-task={cpus_per_task}
#SBATCH --partition={partition}
#SBATCH --time={time}
#SBATCH --mem={mem}
#SBATCH --output={output_log}
#SBATCH --error={error_log}

{env_cmd}

split-log -o {output_dir} -l {lines} {log}
"""

def parse_args():
    """Parse the command-line arguments of the log splitter.

    Additional options are inherited from the parent parser built by
    ``batch_parser`` (presumably the Slurm/batch settings such as
    ``cpus_per_task`` and ``partition`` -- confirm against ``.utils``).

    Returns:
        dict: The parsed arguments keyed by destination name. The keys
        defined here are ``log``, ``output_dir``, ``lines`` and
        ``no_clobber``; the parent parser contributes the rest.
    """
    parser = argparse.ArgumentParser(
        description="Splits a GPFS policy log into multiple parts for batch array processing.",
        parents=[
            batch_parser(
                cpus_per_task=24,
                time='02:00:00',
                mem='8G',
                gpus=0,
                partition='amd-hdr100,express',
            )
        ],
    )
    parser.add_argument(
        'log',
        type=Path,
        help="Path to a raw GPFS log file. The log can be either uncompressed or gz compressed.",
    )
    # The help text previously described parquet output, which this tool does
    # not produce: it writes the split pieces of the log.
    parser.add_argument(
        '-o', '--output-dir',
        type=Path,
        default=None,
        help="Directory to store the split log files. Defaults to ./input/../chunks",
    )
    parser.add_argument(
        '-l', '--lines',
        type=int,
        default=int(5e6),
        help="Number of lines to split the log file by",
    )
    # store_true already defaults to False, so no explicit default is needed.
    parser.add_argument(
        '--no-clobber',
        action='store_true',
        help='When set and existing split logs are found, immediately exits without any processing',
    )

    return vars(parser.parse_args())

def submit_batch(**kwargs):
    """Render the Slurm batch script and submit it with ``sbatch``.

    The keyword arguments are used as the substitution values for
    ``BATCH_SCRIPT``; ``env_cmd`` and the entries returned by
    ``setup_slurm_logs`` are added to them before formatting.

    Args:
        **kwargs: Values for the ``BATCH_SCRIPT`` placeholders. The optional
            ``slurm_log_dir`` entry is forwarded to ``setup_slurm_logs``.

    Raises:
        KeyError: If a placeholder of ``BATCH_SCRIPT`` has no matching value.
        FileNotFoundError: If the ``sbatch`` executable cannot be found.
        subprocess.CalledProcessError: If ``sbatch`` exits with a non-zero
            status, i.e. the job was not accepted.
    """
    kwargs["env_cmd"] = define_python_interpreter()

    # Presumably a mapping providing `output_log` / `error_log` -- confirm
    # against .utils.
    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'), 'split')
    kwargs.update(slurm_logs)

    script = BATCH_SCRIPT.format(**kwargs)

    # The script is fed to sbatch on stdin. No shell is needed for a single
    # fixed executable, and check=True surfaces a rejected submission instead
    # of silently ignoring the exit status.
    subprocess.run(['sbatch'], input=script, text=True, check=True)

def split_log():
    """Command-line entry point: split a GPFS log locally or via Slurm.

    Parses the command line, resolves and creates the output directory,
    honours ``--no-clobber``, and then either submits a batch job (when the
    ``batch`` argument is truthy) or calls ``split`` in-process.

    Raises:
        SystemExit: If ``--no-clobber`` is set and the output directory
            already contains ``*.gz`` files.
    """
    args = parse_args()

    if args['output_dir'] is None:
        # Default location: a `chunks` directory next to the log's parent.
        args['output_dir'] = args['log'].parent.parent.joinpath('chunks')
    args['output_dir'].mkdir(exist_ok=True, mode=0o2770)

    # any() stops at the first match instead of listing the whole directory,
    # and the scan is skipped entirely when --no-clobber is not set.
    if args['no_clobber'] and any(args['output_dir'].glob('*.gz')):
        sys.exit('The output directory already contains split log files. Exiting')

    if args.get('batch'):
        submit_batch(**args)
    else:
        split(**args)