Newer
Older
import argparse
import shlex
import subprocess
import sys
from pathlib import Path

from .utils import define_python_interpreter, batch_parser, setup_slurm_logs
from ..policy import split
# Slurm job script template. submit_batch() fills it with str.format(**kwargs)
# and pipes the result to `sbatch` on stdin.
# Placeholders: cpus_per_task, partition, time, mem (Slurm resources, presumably
# supplied by batch_parser's options — verify), output_log / error_log
# (presumably from setup_slurm_logs), env_cmd (from define_python_interpreter),
# and output_dir, lines, log (this tool's own CLI arguments).
# Values are substituted verbatim, so any shell quoting of paths must be done by
# the caller before formatting. Literal braces in the template would need doubling.
# NOTE(review): the job invokes `split-log` without --batch; presumably that is
# this module's console entry point, so the job splits in-process — confirm.
BATCH_SCRIPT = """\
#!/bin/bash
#
#SBATCH --job-name=split
#SBATCH --ntasks=1
#SBATCH --cpus-per-task={cpus_per_task}
#SBATCH --partition={partition}
#SBATCH --time={time}
#SBATCH --mem={mem}
#SBATCH --output={output_log}
#SBATCH --error={error_log}
{env_cmd}
split-log -o {output_dir} -l {lines} {log}
"""
def parse_args(argv=None):
    """Build the ``split-log`` argument parser and parse the command line.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) parses ``sys.argv[1:]``, exactly as before.

    Returns:
        dict: The parsed arguments as a plain dictionary (``vars(namespace)``),
        including whatever options the shared ``batch_parser`` parent adds.
    """
    parser = argparse.ArgumentParser(
        description="Splits a GPFS policy log into multiple parts for batch array processing.",
        # Slurm resource defaults for this tool; the option definitions
        # themselves live in the shared batch_parser parent.
        parents=[
            batch_parser(
                cpus_per_task=24,
                time='02:00:00',
                mem='8G',
                gpus=0,
                partition='amd-hdr100,express',
            )
        ],
    )
    parser.add_argument(
        'log',
        type=Path,
        help="Path to a raw GPFS log file. The log can be either uncompressed or gz compressed.",
    )
    parser.add_argument(
        '-o', '--output-dir',
        type=Path,
        default=None,
        # None is resolved later by split_log to <log dir>/../chunks.
        help="Directory to store the split log files. Defaults to <log dir>/../chunks",
    )
    parser.add_argument(
        '-l', '--lines',
        type=int,
        default=int(5e6),
        help="Number of lines to split the log file by",
    )
    parser.add_argument(
        '--no-clobber',
        action='store_true',
        default=False,
        help='When set and existing split logs are found, immediately exits without any processing',
    )
    args = parser.parse_args(argv)
    return vars(args)
def submit_batch(**kwargs):
    """Render BATCH_SCRIPT from the parsed arguments and submit it via ``sbatch``.

    Args:
        **kwargs: Parsed command-line arguments (as returned by parse_args).
            They must provide every BATCH_SCRIPT placeholder except
            ``env_cmd``, ``output_log`` and ``error_log``, which are filled in
            here. ``kwargs`` is a fresh dict, so the caller's mapping is not
            modified.

    Raises:
        subprocess.CalledProcessError: If ``sbatch`` exits with a non-zero status.
    """
    # Environment setup line inserted at the top of the job script.
    kwargs["env_cmd"] = define_python_interpreter()
    # Presumably supplies the output_log / error_log paths used by the
    # #SBATCH --output/--error directives — TODO confirm against utils.
    kwargs.update(setup_slurm_logs(kwargs.get('slurm_log_dir'), 'split'))
    # These paths end up as arguments on a shell command line inside the job
    # script; quote them so names with spaces or metacharacters survive.
    # Ordinary paths are returned unchanged by shlex.quote.
    for key in ('log', 'output_dir'):
        kwargs[key] = shlex.quote(str(kwargs[key]))
    script = BATCH_SCRIPT.format(**kwargs)
    # sbatch reads the job script from stdin; no intermediate shell is needed.
    # check=True surfaces submission failures instead of silently ignoring them.
    subprocess.run(['sbatch'], input=script, text=True, check=True)
def split_log():
    """Command-line entry point: split a GPFS policy log into chunk files.

    It performs these steps in order:

    1. Resolves the output directory, defaulting to ``<log dir>/../chunks``.
    2. Creates the directory if it does not exist.
    3. Exits early when ``--no-clobber`` is set and ``*.gz`` files are already
       present.
    4. Submits a Slurm job when the batch option is set; otherwise it calls
       ``split`` directly with all parsed arguments.
    """
    args = parse_args()
    if args['output_dir'] is None:
        args['output_dir'] = args['log'].parent.parent / 'chunks'
    # parents=True lets a user-supplied nested output path be created;
    # intermediate directories get default permissions, the leaf gets `mode`.
    args['output_dir'].mkdir(parents=True, exist_ok=True, mode=0o2770)
    # any() stops at the first match and is only evaluated when --no-clobber is set.
    if args['no_clobber'] and any(args['output_dir'].glob('*.gz')):
        sys.exit('The output directory already contains split log files. Exiting')
    if args.get('batch'):
        submit_batch(**args)
    else:
        split(**args)