Skip to content
Snippets Groups Projects
convert_to_parquet.py 4.24 KiB
Newer Older
import argparse
import multiprocessing
import subprocess
import sys
from pathlib import Path

from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
from ..policy import convert
from ..utils import parse_scontrol

# Names exported by `from <module> import *`; only the function that drives
# the command-line workflow is public.
__all__ = ['convert_to_parquet']

# Program description handed to argparse.ArgumentParser(description=...) in
# parse_args().
DESCRIPTION = """
Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n
Works with data from /data/user, /data/user/home, /data/project, and /scratch
The data are parsed and the directory beneath any of those listed above is set as the top-level
directory ('tld').
"""

# Help epilog handed to argparse.ArgumentParser(epilog=...) in parse_args().
# It explains how Slurm core requests interact with the local parallel pool.
EPILOGUE = """\
Local Parallel Processing:
    If processing is done via a local parallel pool, the requested cores need to be accessible by the invoking Python process. When run in a Slurm job context where the number of cores were only specified with the --ntasks property, only 1 core will be available to the Python process regardless of the number of cores requested by the job. Instead, use the --cpus-per-task property to set the number of cores paired with --ntasks=1. This will correctly allow the parallel pool to utilize all cores assigned to the job.
"""

def parse_args():
    """Parse the command line for the log-to-parquet conversion tool.

    The parser inherits every option contributed by ``batch_parser`` (called
    here with this tool's default partition, time, memory and cpus-per-task
    values) and adds the positional ``input`` plus the ``--output-dir``,
    ``--pool-size`` and ``--no-clobber`` options.

    Returns:
        dict: The parsed namespace converted with ``vars``, keyed by argument
        destination (``input``, ``output_dir``, ``pool_size``, ``no_clobber``
        and whatever the parent batch parser defines).
    """
    # Parent parser supplying the batch-submission options shared with other
    # tools; built first so the ArgumentParser call below stays readable.
    batch_options = batch_parser(
        partition='amd-hdr100,express',
        time='02:00:00',
        mem='16G',
        cpus_per_task=1,
    )

    cli = argparse.ArgumentParser(
        description=DESCRIPTION,
        epilog=EPILOGUE,
        parents=[batch_options],
    )

    cli.add_argument(
        'input',
        type=Path,
        help="Path to a log file or directory of log files from mmlspolicy to be converted to parquet. If a directory path is given, all files within the directory will be converted. If the --batch option is set, this will be done in a batch array job. Otherwise, it will be done in a local parallel pool using available compute resources.",
    )
    cli.add_argument(
        '-o', '--output-dir',
        type=Path,
        default=None,
        help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../parquet",
    )
    cli.add_argument(
        '--pool-size',
        type=int,
        default=None,
        help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process",
    )
    cli.add_argument(
        '--no-clobber',
        action='store_true',
        default=False,
        help='When set and existing parquet files are found, immediately exits without any processing',
    )

    return vars(cli.parse_args())

# Template for the Slurm array-job script piped to `sbatch` by submit_batch().
# It is rendered with str.format, so single-brace fields ({ntasks},
# {partition}, {time}, {mem}, {output_log}, {error_log}, {nlogs}, {env_cmd},
# {input}, {output_dir}) are substituted from the keyword arguments, while
# doubled braces ({{ }}) come out as literal braces for the shell/awk code.
# Each array task picks the N-th `*.gz` file listed under {input}, where N is
# its SLURM_ARRAY_TASK_ID, and runs `convert-to-parquet` on that single file.
# NOTE(review): {output_log}/{error_log} are presumably supplied by
# setup_slurm_logs() and {ntasks}/{partition}/{time}/{mem} by the batch_parser
# options -- confirm against those helpers.
BATCH_SCRIPT = """\
#!/bin/bash
#
#SBATCH --job-name=parquet-list-%a
#SBATCH --ntasks={ntasks}
#SBATCH --partition={partition}
#SBATCH --time={time}
#SBATCH --mem={mem}
#SBATCH --output={output_log}
#SBATCH --error={error_log}
#SBATCH --array=1-{nlogs}

{env_cmd}

log=$(ls {input}/*.gz | awk "NR==${{SLURM_ARRAY_TASK_ID}} {{ print $1 }}")

convert-to-parquet -o {output_dir} ${{log}}
"""


def submit_batch(**kwargs):
    """Render BATCH_SCRIPT from the given options and submit it with ``sbatch``.

    Args:
        **kwargs: Values used to fill the BATCH_SCRIPT template. The keys
            ``python_path``, ``conda_env`` and ``slurm_log_dir`` are read here
            (missing keys are treated as ``None``) to build the environment
            setup command and the Slurm log settings; every remaining
            placeholder of BATCH_SCRIPT must also be present.

    Raises:
        KeyError: If a placeholder required by BATCH_SCRIPT is missing.
        subprocess.CalledProcessError: If ``sbatch`` exits with a non-zero
            status, so a failed submission is not silently ignored.
    """
    env_cmd = define_python_interpreter(kwargs.get('python_path'), kwargs.get('conda_env'))
    kwargs.update({"env_cmd": env_cmd})

    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'), 'parquet')
    kwargs.update(slurm_logs)

    script = BATCH_SCRIPT.format(**kwargs)

    # The script is fed to sbatch on stdin. No shell is needed for that, and
    # combining a list argument with shell=True is a known pitfall, so sbatch
    # is executed directly.
    subprocess.run(['sbatch'], input=script, text=True, check=True)

def convert_to_parquet() -> None:
    """Convert one policy log, or a directory of ``*.gz`` logs, to parquet.

    Arguments are read from the command line via ``parse_args``. Depending on
    them the work is either submitted as a Slurm array job (``--batch``), run
    in a local ``multiprocessing`` pool (several logs), or run directly in
    this process (a single log).

    The output directory defaults to ``<input parent>/parquet`` and is created
    if it does not exist.

    Exits (via ``sys.exit``) when ``--no-clobber`` is set and the output
    directory already holds parquet files, or when no log file is found.
    """
    args = parse_args()
    input_path = args['input']

    if args['output_dir'] is None:
        args['output_dir'] = input_path.parent.joinpath('parquet')
    output_dir = args['output_dir']
    output_dir.mkdir(exist_ok=True, mode=0o2770)

    if args['no_clobber'] and any(output_dir.glob('*.parquet')):
        sys.exit('The output directory already contains parquet files. Exiting')

    # Always build the list of logs so every branch below works on files,
    # never on the containing directory.
    if input_path.is_file():
        logs = [input_path]
    else:
        logs = sorted(input_path.glob('*.gz'))
    nlogs = len(logs)

    if nlogs == 0:
        # Nothing to convert; also avoids submitting an empty job array.
        sys.exit(f'No log files to convert were found at {input_path}. Exiting')

    args.update({'nlogs': nlogs})

    if args['batch']:
        # NOTE(review): BATCH_SCRIPT lists `{input}/*.gz`, so batch mode
        # appears to expect `input` to be a directory -- confirm behaviour
        # when a single file is given together with --batch.
        submit_batch(**args)
    elif nlogs > 1:
        ncpus, _ = parse_scontrol()
        # argparse always sets 'pool_size' (to None when the option is not
        # given), so dict.get's default would never apply; fall back
        # explicitly to the core count reported by parse_scontrol.
        pool_size = args['pool_size'] if args['pool_size'] is not None else ncpus
        with multiprocessing.Pool(processes=pool_size) as pool:
            pool.starmap(convert, [(log, output_dir) for log in logs])
    else:
        # Exactly one log: pass the file itself, even when `input` was a
        # directory that contains a single log.
        convert(logs[0], output_dir)