import argparse
import multiprocessing
import subprocess
from pathlib import Path

from ..policy.convert import convert, set_output_filename
from .utils import define_python_interpreter, batch_parser, setup_slurm_logs
from ..utils import parse_scontrol

__all__ = ['convert_to_parquet']
DESCRIPTION = """
Converts GPFS policy run logs to parquet files for easier aggregation and analysis.
Works with data from /data/user, /data/user/home, /data/project, and /scratch.
The data are parsed, and the directory beneath any of the paths listed above is set as the
top-level directory ('tld').
"""
EPILOGUE = """\
Local Parallel Processing:
If processing is done via a local parallel pool, the requested cores need to be accessible
by the invoking Python process. When run in a Slurm job context where the number of cores
was specified only with the --ntasks property, only 1 core will be available to the Python
process regardless of the number of cores requested by the job. Instead, pair --ntasks=1
with the --cpus-per-task property to set the number of cores. This allows the parallel pool
to utilize all cores assigned to the job.
"""
def parse_args():
    parser = argparse.ArgumentParser(
        description=DESCRIPTION,
        parents=[batch_parser(partition='amd-hdr100,express', time='02:00:00', mem='16G', cpus_per_task=1)],
        epilog=EPILOGUE
    )
    parser.add_argument('input',
                        type=Path,
                        help="Path to a log file or directory of log files from mmlspolicy to be converted "
                             "to parquet. If a directory path is given, all files within the directory will "
                             "be converted. If the --batch option is set, this will be done in a batch array "
                             "job. Otherwise, it will be done in a local parallel pool using available "
                             "compute resources.")
    parser.add_argument('-o', '--output-dir',
                        type=Path,
                        default=None,
                        help="Directory to store the output parquet. The parquet file will have the same "
                             "name as the input. Defaults to ./input/../parquet")
    parser.add_argument('--pool-size', type=int, default=None,
                        help="Number of cores to include in the pool for local parallel processing. If "
                             "None, will default to all cores available to the invoking Python process")
    parser.add_argument('--no-clobber', action='store_true', default=False,
                        help="When set, skips any log chunks that already have corresponding parquet files. "
                             "Chunks without a parquet file are processed as normal.")
    args = parser.parse_args()
    return vars(args)
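
# Note: batch_parser() (imported from .utils) supplies the batch-submission options consumed
# later in this module; judging from how the parsed args are used in submit_batch() and
# BATCH_SCRIPT, that includes at least 'batch', 'ntasks', 'partition', 'time', 'mem',
# 'slurm_log_dir', 'python_path', and 'conda_env'.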
BATCH_SCRIPT = """\
#!/bin/bash
#
#SBATCH --job-name=parquet-list-%a
#SBATCH --ntasks={ntasks}
#SBATCH --partition={partition}
#SBATCH --time={time}
#SBATCH --mem={mem}
#SBATCH --output={output_log}
#SBATCH --error={error_log}
#SBATCH --array={array_idxs}
{env_cmd}

log=$(ls {input}/*.gz | sort | awk -v n=$((SLURM_ARRAY_TASK_ID + 1)) 'NR == n {{ print }}')
convert-to-parquet {no_clobber_opt} -o {output_dir} ${{log}}
"""
def submit_batch(**kwargs):
    env_cmd = define_python_interpreter(kwargs.get('python_path'), kwargs.get('conda_env'))
    kwargs.update({"env_cmd": env_cmd})

    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'), 'parquet')
    kwargs.update(slurm_logs)

    script = BATCH_SCRIPT.format(**kwargs)

    # Pipe the rendered batch script to sbatch on stdin
    subprocess.run(['sbatch'], input=script, text=True)
def _find_sequential_indexes(idxs):
    """Collapse a list of integer indexes into Slurm-style array range strings."""
    if not idxs:
        return []

    idxs = sorted(idxs)

    result = []
    start = idxs[0]
    prev = idxs[0]

    for num in idxs[1:]:
        if num == prev + 1:
            prev = num
        else:
            if start == prev:
                result.append(str(start))
            else:
                result.append(f"{start}-{prev}")
            start = num
            prev = num

    # Add the last sequence
    if start == prev:
        result.append(str(start))
    else:
        result.append(f"{start}-{prev}")

    return result
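
# Example: _find_sequential_indexes([0, 1, 2, 5, 7, 8]) returns ['0-2', '5', '7-8'],
# which is joined with commas below to form a Slurm --array specification.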
def _get_missing_indexes(chunks, parquets):
    """Return the indexes of chunks that do not yet have a matching parquet file."""
    missing_indexes = [
        index for index, element in enumerate(chunks) if element not in parquets
    ]
    return missing_indexes
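
# Example: _get_missing_indexes(['a.parquet', 'b.parquet', 'c.parquet'], ['b.parquet'])
# returns [0, 2], i.e. the positions of the chunks that still need to be converted.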
def convert_to_parquet() -> None:
    args = parse_args()

    if args['output_dir'] is None:
        args['output_dir'] = args['input'].parent.joinpath('parquet')

    args['output_dir'].mkdir(exist_ok=True, mode=0o2770)

    if args['input'].is_file():
        nlogs = 1
    else:
        logs = list(args['input'].glob('*.gz'))
        nlogs = len(logs)
if args["no_clobber"]:
args.update({"no_clobber_opt": "--no-clobber"})
if args['input'].is_dir():
chunks = logs
chunks.sort()
else:
chunks = [args["input"].name]
pqs = [f.name for f in args["output_dir"].glob("*.parquet")]
target_pqs = [set_output_filename(f) for f in chunks]
idxs_to_run = _get_missing_indexes(target_pqs,pqs)
if len(idxs_to_run) == 0:
print("INFO: All log chunks have been converted to parquet. Exiting without processing")
return
array_idxs = _find_sequential_indexes(idxs_to_run)
args['array_idxs'] = ','.join(array_idxs)
else:
args.update(
{
"no_clobber_opt": "",
"array_idxs" : f"0-{nlogs-1}"
}
)
args.update({'nlogs':nlogs})
if args['batch']:
submit_batch(**args)
elif nlogs > 1:
ncpus,_ = parse_scontrol()
pool_size = args.get('pool_size',ncpus)
with multiprocessing.Pool(processes=pool_size) as pool:
fargs = list(zip(logs,[args['output_dir']]*nlogs))
pool.starmap(convert, fargs)
else:
convert(args['input'],args['output_dir'])
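
# Convenience guard (assumption: the normal entry point is the installed `convert-to-parquet`
# script referenced in BATCH_SCRIPT). Because of the relative imports above, direct execution
# only works via `python -m <package>.<this_module>`.
if __name__ == "__main__":
    convert_to_parquet()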