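"""CLI for converting GPFS policy run logs to parquet files; see DESCRIPTION below."""
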
import argparse
import subprocess
from pathlib import Path
import multiprocessing
from .utils import define_python_interpreter
from ..policy import convert
from ..utils import parse_scontrol

__all__ = ['convert_to_parquet']

DESCRIPTION = """
Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n
Works with data from /data/user, /data/user/home, /data/project, and /scratch
The data are parsed and the directory beneath any of those listed above is set as the top-level
directory ('tld').
"""
EPILOGUE = """\
Local Parallel Processing:
If processing is done via a local parallel pool, the requested cores need to be accessible
by the invoking Python process. When run in a Slurm job context where the number of cores
was only specified with the --ntasks property, only 1 core will be available to the Python
process regardless of the number of cores requested by the job. Instead, use the
--cpus-per-task property to set the number of cores paired with --ntasks=1. This will
correctly allow the parallel pool to utilize all cores assigned to the job.
"""

def parse_args():
    parser = argparse.ArgumentParser(
        description=DESCRIPTION,
        formatter_class=argparse.RawTextHelpFormatter,
        epilog=EPILOGUE
    )
    parser.add_argument('input',
                        type=Path,
                        help="Path to a log file or directory of log files from mmlspolicy to be converted to parquet. If a directory path is given, all files within the directory will be converted. If the --batch option is set, this will be done in a batch array job. Otherwise, it will be done in a local parallel pool using available compute resources.")
    parser.add_argument('-o', '--output-dir',
                        type=Path,
                        default=None,
                        help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../parquet")
    parser.add_argument('--pool-size', type=int, default=None,
                        help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process")

    slurm = parser.add_argument_group(title='Slurm Options')
    slurm.add_argument('--batch', action='store_true', default=False,
                       help="Run the conversion as a batch job. If a directory path is given as an input, an array job will be created to run the conversion in parallel.")
    slurm.add_argument('-n', '--ntasks', type=int, default=1)
    slurm.add_argument('-p', '--partition', type=str, default='amd-hdr100')
    slurm.add_argument('-t', '--time', type=str, default='02:00:00')
    slurm.add_argument('-m', '--mem', type=str, default='16G')
    # argparse treats % as a format character in help text, so literal % must be escaped as %%
    slurm.add_argument('--slurm-log-dir', type=Path, default='./out',
                       help='Output log directory. If the directory does not exist, it will be created automatically. Logs will be named convert_%%A_%%a to differentiate among job IDs and task IDs')

    interpreter = slurm.add_mutually_exclusive_group()
    interpreter.add_argument('--python-path', type=Path,
                             help="Path to Python interpreter to use for conversion. This interpreter should have access to a pandas library, preferably version >=2.0. If not specified, the path to the active python3 interpreter will be used.")
    interpreter.add_argument('--conda-env', type=str, default=None,
                             help="The name or prefix of a conda environment to activate as opposed to a python3 path")

    args = parser.parse_args()
    return vars(args)
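
# Illustrative invocations built from the arguments defined in parse_args(); the
# paths below are hypothetical:
#
#   convert-to-parquet /data/logs/list-policy.gz              # single log, converted locally
#   convert-to-parquet --pool-size 8 /data/logs/               # directory, local parallel pool
#   convert-to-parquet --batch -o /data/parquet /data/logs/    # directory, Slurm array job
#
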
BATCH_SCRIPT = """\
#!/bin/bash
#
#SBATCH --job-name=parquet-list-%a
#SBATCH --ntasks={ntasks}
#SBATCH --partition={partition}
#SBATCH --time={time}
#SBATCH --mem={mem}
#SBATCH --output={output_log}
#SBATCH --error={error_log}
#SBATCH --array=1-{nlogs}
{env_cmd}
log=$(ls {input}/*.gz | awk "NR==${{SLURM_ARRAY_TASK_ID}} {{ print $1 }}")
convert-to-parquet -o {output_dir} ${{log}}
"""

def setup_slurm_logs(slurm_log_dir):
    # %A and %a are Slurm filename patterns for the parent job ID and array task ID
    slurm_log_dir = slurm_log_dir.absolute()
    slurm_log_dir.mkdir(exist_ok=True, parents=True, mode=0o2770)
    out_log, err_log = [slurm_log_dir.joinpath('convert_%A_%a.out'), slurm_log_dir.joinpath('convert_%A_%a.err')]
    slurm_logs = {'output_log': out_log, 'error_log': err_log}
    return slurm_logs


def submit_batch(**kwargs):
    env_cmd = define_python_interpreter(kwargs.get('python_path'), kwargs.get('conda_env'))
    kwargs.update({"env_cmd": env_cmd})

    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'))
    kwargs.update(slurm_logs)

    script = BATCH_SCRIPT.format(**kwargs)

    # sbatch reads the rendered job script from stdin
    subprocess.run(['sbatch'], input=script, text=True)


def convert_to_parquet() -> None:
    args = parse_args()
    if args['output_dir'] is None:
        args['output_dir'] = args['input'].parent.parent.joinpath('parquet')
    args['output_dir'].mkdir(exist_ok=True, mode=0o2770)

    if args['input'].is_file():
        nlogs = 1
    else:
        logs = list(args['input'].glob('*.gz'))
        nlogs = len(logs)
    args.update({'nlogs': nlogs})

    if args['batch']:
        submit_batch(**args)
    elif nlogs > 1:
        # fall back to the Slurm-reported core count when --pool-size was not given
        ncpus, _ = parse_scontrol()
        pool_size = args['pool_size'] if args['pool_size'] is not None else ncpus
        with multiprocessing.Pool(processes=pool_size) as pool:
            fargs = list(zip(logs, [args['output_dir']]*nlogs))
            pool.starmap(convert, fargs)
    else:
        # single log: either the input file itself or the lone *.gz in the input directory
        log = args['input'] if args['input'].is_file() else logs[0]
        convert(log, args['output_dir'])
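

# A console-script entry point named convert-to-parquet (the command used in
# BATCH_SCRIPT) is assumed to map onto convert_to_parquet(); a minimal sketch with a
# hypothetical module path, shown only for orientation:
#
#   [project.scripts]
#   convert-to-parquet = "<package>.cli.convert:convert_to_parquet"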