import argparse
import subprocess
from pathlib import Path
import multiprocessing
from .utils import define_python_interpreter
from ..policy import convert
from ..utils import parse_scontrol
__all__ = ['convert_to_parquet']

DESCRIPTION = """
Converts GPFS policy run logs to parquet files for easier aggregation and analysis.

Works with data from /data/user, /data/user/home, /data/project, and /scratch.
The data are parsed, and the directory beneath any of the paths listed above is set
as the top-level directory ('tld').
"""

EPILOGUE = """\
Local Parallel Processing:
    If processing is done via a local parallel pool, the requested cores need to be
    accessible by the invoking Python process. When run in a Slurm job where the core
    count is specified only with the --ntasks option, only 1 core will be available to
    the Python process regardless of the number of cores requested by the job. Instead,
    pair --ntasks=1 with --cpus-per-task to set the number of cores. This correctly
    allows the parallel pool to utilize all cores assigned to the job.
"""


def parse_args():
    parser = argparse.ArgumentParser(
        description=DESCRIPTION,
        formatter_class=argparse.RawTextHelpFormatter,
        epilog=EPILOGUE
    )
    parser.add_argument('input',
                        type=Path,
                        help="Path to a log file or directory of log files from mmlspolicy to be converted to parquet. If a directory path is given, all files within the directory will be converted. If the --batch option is set, this will be done in a batch array job. Otherwise, it will be done in a local parallel pool using available compute resources.")
    parser.add_argument('-o', '--output-dir',
                        type=Path,
                        default=None,
                        help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../parquet")
    parser.add_argument('--pool-size', type=int, default=None,
                        help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process")

    slurm = parser.add_argument_group(title='Slurm Options')
    slurm.add_argument('--batch', action='store_true', default=False,
                       help="Run the conversion as a batch job. If a directory path is given as an input, an array job will be created to run the conversion in parallel.")
    slurm.add_argument('-n', '--ntasks', type=int, default=1)
    slurm.add_argument('-p', '--partition', type=str, default='amd-hdr100')
    slurm.add_argument('-t', '--time', type=str, default='02:00:00')
    slurm.add_argument('-m', '--mem', type=str, default='16G')
    slurm.add_argument('--slurm-log-dir', type=Path, default='./out',
                       help="Output log directory. If the directory does not exist, it will be created automatically. Logs will be named convert_%%A_%%a to differentiate among job IDs and task IDs")

    interpreter = slurm.add_mutually_exclusive_group()
    interpreter.add_argument('--python-path', type=Path,
                             help="Path to the Python interpreter to use for conversion. This interpreter should have access to a pandas library, preferably version >=2.0. If not specified, the path to the active python3 interpreter will be used.")
    interpreter.add_argument('--conda-env', type=str, default=None,
                             help="The name or prefix of a conda environment to activate as opposed to a python3 path")

    args = parser.parse_args()
    return vars(args)


BATCH_SCRIPT = """\
#!/bin/bash
#
#SBATCH --job-name=parquet-list-%a
#SBATCH --ntasks={ntasks}
#SBATCH --partition={partition}
#SBATCH --time={time}
#SBATCH --mem={mem}
#SBATCH --output={output_log}
#SBATCH --error={error_log}
#SBATCH --array=1-{nlogs}
{env_cmd}
# select the Nth gzip'd log file in the input directory for this array task
log=$(ls {input}/*.gz | awk "NR==${{SLURM_ARRAY_TASK_ID}}")
convert-to-parquet -o {output_dir} ${{log}}
"""

def setup_slurm_logs(slurm_log_dir):
    slurm_log_dir = slurm_log_dir.absolute()
    slurm_log_dir.mkdir(exist_ok=True, parents=True, mode=0o2770)

    out_log = str(slurm_log_dir.joinpath('convert_%A_%a.out'))
    err_log = str(slurm_log_dir.joinpath('convert_%A_%a.err'))
    slurm_logs = {'output_log': out_log, 'error_log': err_log}
    return slurm_logs

def submit_batch(**kwargs):
    env_cmd = define_python_interpreter(kwargs.get('python_path'), kwargs.get('conda_env'))
    kwargs.update({"env_cmd": env_cmd})

    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'))
    kwargs.update(slurm_logs)

    script = BATCH_SCRIPT.format(**kwargs)
    subprocess.run(['sbatch'], input=script, text=True)

def convert_to_parquet() -> None:
    args = parse_args()

    if args['output_dir'] is None:
        args['output_dir'] = args['input'].parent.joinpath('parquet')
    args['output_dir'].mkdir(exist_ok=True, mode=0o2770)

    if args['input'].is_file():
        nlogs = 1
    else:
        logs = list(args['input'].glob('*.gz'))
        nlogs = len(logs)
    args.update({'nlogs': nlogs})

    if args['batch']:
        submit_batch(**args)
    elif nlogs > 1:
        # argparse always populates 'pool_size' (default None), so fall back to the core
        # count reported by scontrol only when the option was not set explicitly
        ncpus, _ = parse_scontrol()
        pool_size = args['pool_size'] if args['pool_size'] is not None else ncpus
        with multiprocessing.Pool(processes=pool_size) as pool:
            fargs = list(zip(logs, [args['output_dir']] * nlogs))
            pool.starmap(convert, fargs)
    else:
        convert(args['input'], args['output_dir'])
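
# Example invocations (paths are hypothetical; the console script name matches the one used
# in BATCH_SCRIPT above, and all flags are defined in parse_args):
#   convert-to-parquet /data/policy/list-policy.gz                  # single file, converted locally
#   convert-to-parquet --pool-size 8 /data/policy/logs/             # directory, local parallel pool
#   convert-to-parquet --batch -p amd-hdr100 -t 02:00:00 /data/policy/logs/    # Slurm array job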