import argparse
import subprocess
from pathlib import Path
import multiprocessing
from .utils import define_python_interpreter
from ..policy import convert
from ..utils import parse_scontrol

__all__ = ['convert_to_parquet']

DESCRIPTION = """
Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n
Works with data from /data/user, /data/user/home, /data/project, and /scratch
The data are parsed and the directory beneath any of those listed above is set as the top-level
directory ('tld').
"""

EPILOGUE = """\
Local Parallel Processing:
    If processing is done via a local parallel pool, the requested cores need to be accessible by the invoking Python process. When run in a Slurm job context where the number of cores were only specified with the --ntasks property, only 1 core will be available to the Python process regardless of the number of cores requested by the job. Instead, use the --cpus-per-task property to set the number of cores paired with --ntasks=1. This will correctly allow the parallel pool to utilize all cores assigned to the job.
"""

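# Example Slurm request for the local-pool path described in EPILOGUE (the values
# and path below are illustrative only, not a recommendation):
#
#   srun --ntasks=1 --cpus-per-task=16 convert-to-parquet /data/project/example/policy-logs/
#
# Pairing --ntasks=1 with --cpus-per-task keeps all requested cores visible to the
# single Python process that creates the multiprocessing pool.
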
def parse_args():
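    """Parse command-line arguments for the convert-to-parquet CLI and return them as a dict."""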
    parser = argparse.ArgumentParser(
        description=DESCRIPTION,
        formatter_class=argparse.RawTextHelpFormatter,
        epilog=EPILOGUE
    )
    parser.add_argument('input',
                        type=Path,
                        help="Path to a log file or directory of log files from mmlspolicy to be converted to parquet. If a directory path is given, all files within the directory will be converted. If the --batch option is set, this will be done in a batch array job. Otherwise, it will be done in a local parallel pool using available compute resources.")
    
    parser.add_argument('-o','--output-dir',
                        type=Path,
                        default=None,
                        help="Directory to store the output parquet files. Each parquet file will have the same name as its input log. Defaults to a 'parquet' directory alongside the input (i.e. <input>/../parquet).")
    parser.add_argument('--pool-size',type=int,default=None,
                        help="Number of cores to include in the pool for local parallel processing. If not specified, defaults to all cores available to the invoking Python process.")

    slurm = parser.add_argument_group(title='Slurm Options')
    slurm.add_argument('--batch', action='store_true', default=False,
                       help="Run the conversion as a batch job. If a directory path is given as an input, an array job will be created to run the conversion in parallel.")
    slurm.add_argument('-n','--ntasks', type=int, default=1,
                       help="Number of Slurm tasks to request for the conversion job")
    slurm.add_argument('-p','--partition', type=str, default='amd-hdr100',
                       help="Slurm partition to submit the job to")
    slurm.add_argument('-t','--time',type=str,default='02:00:00',
                       help="Walltime limit for the job (HH:MM:SS)")
    slurm.add_argument('-m','--mem',type=str,default='16G',
                       help="Memory request for the job")
    slurm.add_argument('--slurm-log-dir',type=Path, default='./out',
                       help='Output log directory. If the directory does not exist, it will be created automatically. Logs will be named convert_%%A_%%a to differentiate among job IDs and task IDs.')
    interpreter = slurm.add_mutually_exclusive_group()
    interpreter.add_argument('--python-path',type=Path,
                       help="Path to Python interpreter to use for conversion. This interpreter should have access to a pandas library, preferably version >=2.0. If not specified, the path to the active python3 interpreter will be used.")
    interpreter.add_argument('--conda-env',type=str,default=None,
                             help="The name or prefix of a conda environment to activate as opposed to a python3 path")
    args = parser.parse_args()
    return vars(args)

BATCH_SCRIPT = """\
#!/bin/bash
#
#SBATCH --job-name=parquet-list-%a
#SBATCH --ntasks={ntasks}
#SBATCH --partition={partition}
#SBATCH --time={time}
#SBATCH --mem={mem}
#SBATCH --output={output_log}
#SBATCH --error={error_log}
#SBATCH --array=1-{nlogs}

{env_cmd}

log=$(ls {input}/*.gz | awk -v task="${{SLURM_ARRAY_TASK_ID}}" 'NR==task {{ print $1 }}')

convert-to-parquet -o {output_dir} "${{log}}"
"""

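# Note: BATCH_SCRIPT is rendered with str.format() in submit_batch(); single braces
# ({ntasks}, {input}, ...) are filled in by Python, while the doubled braces around
# SLURM_ARRAY_TASK_ID and log survive formatting as single braces so that bash and awk
# expand them at run time.
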
def setup_slurm_logs(slurm_log_dir):
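    """Create the Slurm log directory and return the output/error log path patterns as a dict."""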
    slurm_log_dir = slurm_log_dir.absolute()
    slurm_log_dir.mkdir(exist_ok=True, parents=True, mode=0o2770)
    out_log = str(slurm_log_dir.joinpath('convert_%A_%a.out'))
    err_log = str(slurm_log_dir.joinpath('convert_%A_%a.err'))
    slurm_logs = {'output_log': out_log, 'error_log': err_log}
    return slurm_logs

def submit_batch(**kwargs):
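    """Render the BATCH_SCRIPT template with the parsed arguments and submit it to Slurm via sbatch."""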
    env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env'))
    kwargs.update({"env_cmd":env_cmd})
    
    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'))
    kwargs.update(slurm_logs)
    
    script = BATCH_SCRIPT.format(**kwargs)

    # Pass the rendered script to sbatch via stdin; shell=True is unnecessary here
    subprocess.run(['sbatch'], input=script, text=True)

def convert_to_parquet() -> None:
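    """Entry point for the convert-to-parquet CLI.

    Converts a single log file or a directory of log files to parquet, either in a
    local parallel pool or as a Slurm batch/array job.
    """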
    args = parse_args()
    if args['output_dir'] is None:
        args['output_dir'] = args['input'].parent.joinpath('parquet')
    
    args['output_dir'].mkdir(exist_ok=True, mode=0o2770)
    
    if args['input'].is_file():
        nlogs = 1
    else:
        logs = list(args.get('input').glob('*.gz'))
        nlogs = len(logs)
    
    args.update({'nlogs':nlogs})

    if args['batch']:
        submit_batch(**args)
    elif nlogs > 1:
        ncpus,_ = parse_scontrol()
        # args always contains 'pool_size' (default None), so fall back to the core
        # count reported by scontrol explicitly rather than relying on dict.get's default
        pool_size = args['pool_size'] if args['pool_size'] is not None else ncpus
        with multiprocessing.Pool(processes=pool_size) as pool:
            fargs = list(zip(logs, [args['output_dir']]*nlogs))
            pool.starmap(convert, fargs)
    else:
        # Either a single file was passed directly or the input directory holds only one log
        log = args['input'] if args['input'].is_file() else logs[0]
        convert(log, args['output_dir'])
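
# Optional convenience guard (an assumption: the installed console script is presumed to
# target convert_to_parquet() already). Because of the relative imports above, this only
# works when the module is run as part of its package, e.g.
# `python -m <package>.convert_to_parquet`, where <package> is the parent package name.
if __name__ == '__main__':
    convert_to_parquet()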