import argparse
import multiprocessing
import subprocess
import sys
from pathlib import Path

from .utils import define_python_interpreter, batch_parser, setup_slurm_logs
from ..policy import convert
from ..utils import parse_scontrol
__all__ = ['convert_to_parquet']
DESCRIPTION = """
Converts GPFS policy run logs to parquet files for easier aggregation and analysis.

Works with data from /data/user, /data/user/home, /data/project, and /scratch.
The logs are parsed and the directory beneath any of the paths listed above is
recorded as the top-level directory ('tld').
"""
EPILOGUE = """\
Local Parallel Processing:
If processing is done via a local parallel pool, the requested cores need to be
accessible by the invoking Python process. In a Slurm job where the number of cores
was specified only with the --ntasks property, only 1 core will be available to the
Python process regardless of how many cores the job requested. Instead, pair
--ntasks=1 with the --cpus-per-task property to set the core count. This allows the
parallel pool to utilize all cores assigned to the job.
"""
def parse_args():
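    """Parse command-line arguments and return them as a dict."""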
    parser = argparse.ArgumentParser(
        description=DESCRIPTION,
        parents=[batch_parser(partition='amd-hdr100,express', time='02:00:00', mem='16G', cpus_per_task=1)],
        epilog=EPILOGUE
    )
    parser.add_argument('input',
                        type=Path,
                        help="Path to a log file or directory of log files from mmlspolicy to be "
                             "converted to parquet. If a directory path is given, all files within "
                             "the directory will be converted. If the --batch option is set, this "
                             "will be done in a batch array job. Otherwise, it will be done in a "
                             "local parallel pool using available compute resources.")
    parser.add_argument('-o', '--output-dir',
                        type=Path,
                        default=None,
                        help="Directory to store the output parquet. The parquet file will have "
                             "the same name as the input. Defaults to ./input/../parquet")
    parser.add_argument('--pool-size',
                        type=int,
                        default=None,
                        help="Number of cores to include in the pool for local parallel processing. "
                             "If None, will default to all cores available to the invoking Python process.")
    parser.add_argument('--no-clobber',
                        action='store_true',
                        default=False,
                        help="When set and existing parquet files are found, immediately exits "
                             "without any processing.")
    args = parser.parse_args()
    return vars(args)
BATCH_SCRIPT = """\
#!/bin/bash
#
#SBATCH --job-name=parquet-list-%a
#SBATCH --ntasks={ntasks}
#SBATCH --partition={partition}
#SBATCH --time={time}
#SBATCH --mem={mem}
#SBATCH --output={output_log}
#SBATCH --error={error_log}
#SBATCH --array=1-{nlogs}
{env_cmd}
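# Each array task converts one log: pick the Nth .gz file in the input directory,
# where N is this task's SLURM_ARRAY_TASK_ID.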
log=$(ls {input}/*.gz | awk "NR==${{SLURM_ARRAY_TASK_ID}}")
convert-to-parquet -o {output_dir} ${{log}}
"""
def submit_batch(**kwargs):
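    """Fill in the batch script template with the parsed arguments and submit it via sbatch."""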
    env_cmd = define_python_interpreter(kwargs.get('python_path'), kwargs.get('conda_env'))
    kwargs.update({"env_cmd": env_cmd})

    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'), 'parquet')
    kwargs.update(slurm_logs)

    script = BATCH_SCRIPT.format(**kwargs)

    # Pass the rendered script to sbatch on stdin
    subprocess.run(['sbatch'], input=script, text=True)
def convert_to_parquet() -> None:
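    """Entry point: convert one log file or a directory of log files to parquet."""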
    args = parse_args()

    if args['output_dir'] is None:
        args['output_dir'] = args['input'].parent.joinpath('parquet')

    args['output_dir'].mkdir(exist_ok=True, mode=0o2770)

    output_files_exist = len(list(args['output_dir'].glob('*.parquet'))) > 0
    if args['no_clobber'] and output_files_exist:
        sys.exit('The output directory already contains parquet files. Exiting')

    if args['input'].is_file():
        nlogs = 1
    else:
        logs = list(args['input'].glob('*.gz'))
        nlogs = len(logs)

    args.update({'nlogs': nlogs})

    if args['batch']:
        submit_batch(**args)
    elif nlogs > 1:
        # Size the local pool from the cores Slurm allocated unless --pool-size was given.
        ncpus, _ = parse_scontrol()
        pool_size = args['pool_size'] if args['pool_size'] is not None else ncpus
        fargs = list(zip(logs, [args['output_dir']] * nlogs))
        with multiprocessing.Pool(processes=pool_size) as pool:
            pool.starmap(convert, fargs)
    else:
        convert(args['input'], args['output_dir'])