From 943e956e52e00504f46d687389df588de9f33f83 Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Fri, 25 Apr 2025 14:42:03 -0500
Subject: [PATCH 1/3] implement no-clobber in the convert function instead of
 the CLI function

Allows users to submit full array jobs that only target log chunks which are
missing parquet conversions.
---
 src/rc_gpfs/cli/convert_to_parquet.py | 12 ++++--------
 src/rc_gpfs/policy/convert.py         | 13 ++++++++++++-
 2 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py
index 1e00596..b0478dd 100644
--- a/src/rc_gpfs/cli/convert_to_parquet.py
+++ b/src/rc_gpfs/cli/convert_to_parquet.py
@@ -3,6 +3,8 @@ import argparse
 import subprocess
 from pathlib import Path
 import multiprocessing
+
+from ..policy.convert import convert
 from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
 from ..utils import parse_scontrol
 
@@ -37,7 +39,7 @@ def parse_args():
     parser.add_argument('--pool-size',type=int,default=None,
                         help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process")
     parser.add_argument('--no-clobber', action='store_true',default=False,
-                        help='When set and existing parquet files are found, immediately exits without any processing')
+                        help='When set, skips any log chunks that already have corresponding parquet files. Chunks without a parquet file are processed as normal.')
     args = parser.parse_args()
     return vars(args)
 
@@ -51,7 +53,7 @@ BATCH_SCRIPT = """\
 #SBATCH --mem={mem}
 #SBATCH --output={output_log}
 #SBATCH --error={error_log}
-#SBATCH --array=1-{nlogs}
+#SBATCH --array={array_idxs}
 
 {env_cmd}
 
@@ -77,17 +79,11 @@ def submit_batch(**kwargs):
 def convert_to_parquet() -> None:
     args = parse_args()
 
-    from ..policy.convert import convert
-
     if args['output_dir'] is None:
         args['output_dir'] = args['input'].parent.joinpath('parquet')
 
     args['output_dir'].mkdir(exist_ok = True, mode = 0o2770)
 
-    output_files_exist = len(list(args['output_dir'].glob('*.parquet'))) > 0
-    if args['no_clobber'] and output_files_exist:
-        sys.exit('The output directory already contains parquet files. Exiting')
-
     if args['input'].is_file():
         nlogs = 1
     else:
diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py
index ab42702..05f5f62 100755
--- a/src/rc_gpfs/policy/convert.py
+++ b/src/rc_gpfs/policy/convert.py
@@ -31,7 +31,8 @@ def parse_line(line):
 def convert(
     input_file: str | Path,
     output_dir: str | Path | None = None,
-    output_name: str | Path | None = None
+    output_name: str | Path | None = None,
+    no_clobber: bool = False,
 ) -> None:
     """
     Converts a GPFS log file to parquet format. The data schema assumes the same policy definition from list-path-external and list-path-dirplus.
@@ -44,6 +45,8 @@ def convert(
         Directory path in which to store the file. If the directory does not exist, it will be created. If None, the output directory will be set to ./input_file/../../parquet in accordance with our standard organization. By default None
     output_name : str | Path | None, optional
         Name of the output file. The name will be automatically appended with .parquet. If None, the name of the input file will be used. By default None
+    no_clobber : bool, optional
+        When set to True, if output_dir/output_name.parquet already exists, exit without overwriting the existing file. If False (default), any existing parquet file will be overwritten.
     """
 
     input_file = as_path(input_file)
@@ -60,6 +63,14 @@ def convert(
 
     output_path = output_dir.joinpath(output_name)
 
+    if output_path.exists() and no_clobber:
+        print(
+            "INFO: Output file already exists. Pass no_clobber=False to overwrite any existing output parquet file.",
+            flush=True,
+        )
+        print("INFO: Skipping and exiting.",flush=True)
+        return
+
     output_dir.mkdir(mode=0o2770, exist_ok=True)
 
     with gzip.open(input_file,'r') as f:
--
GitLab
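
The skip behavior this patch adds to `convert` can be exercised in isolation. The sketch below is a minimal stand-in, not the real function: `convert_stub` and the chunk paths are hypothetical, and the placeholder write stands in for the actual log parsing and parquet output.

```python
from pathlib import Path

def convert_stub(input_file: Path, output_dir: Path, no_clobber: bool = False) -> None:
    # Same ordering as the patch: resolve the target parquet path first,
    # then return early, before doing any parsing work, if it already exists.
    output_path = output_dir.joinpath(input_file.with_suffix(".parquet").name)
    if output_path.exists() and no_clobber:
        print("INFO: Output file already exists. Skipping and exiting.", flush=True)
        return
    output_dir.mkdir(mode=0o2770, exist_ok=True)
    output_path.touch()  # placeholder for the real parquet write

# The second call is a no-op, so a resubmitted array task is safe to re-run.
convert_stub(Path("list-000.gz"), Path("parquet"), no_clobber=True)
convert_stub(Path("list-000.gz"), Path("parquet"), no_clobber=True)
```
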
From 5695579205e6371409726ee0aa0f066059bbb62c Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Fri, 25 Apr 2025 15:46:19 -0500
Subject: [PATCH 2/3] split set_output_filename to its own function so it can
 be used in the CLI function

---
 src/rc_gpfs/policy/convert.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py
index 05f5f62..0873e9a 100755
--- a/src/rc_gpfs/policy/convert.py
+++ b/src/rc_gpfs/policy/convert.py
@@ -28,6 +28,14 @@ def parse_line(line):
     except:
         return line
 
+def set_output_filename(input_file,output_name = None):
+    if output_name is None:
+        output_name = input_file.with_suffix(".parquet").name
+    else:
+        output_name = as_path(output_name).with_suffix(".parquet").name
+
+    return str(output_name)
+
 def convert(
     input_file: str | Path,
     output_dir: str | Path | None = None,
@@ -55,12 +63,8 @@ def convert(
         output_dir = as_path(output_dir)
     else:
         output_dir = input_file.parent.parent.joinpath('parquet')
-
-    if output_name is None:
-        output_name = input_file.with_suffix('.parquet').name
-    else:
-        output_name = as_path(output_name).with_suffix('.parquet').name
 
+    output_name = set_output_filename(input_file,output_name)
     output_path = output_dir.joinpath(output_name)
 
     if output_path.exists() and no_clobber:
--
GitLab
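
The extracted helper is pure path manipulation, so its contract is easy to check on its own. A usage sketch follows, with `as_path` swapped for `pathlib.Path` so the snippet runs standalone, and with hypothetical file names:

```python
from pathlib import Path

def set_output_filename(input_file, output_name=None):
    # As in the patch: derive the name from the input file unless an
    # explicit name is given; either way force a .parquet suffix.
    if output_name is None:
        output_name = input_file.with_suffix(".parquet").name
    else:
        output_name = Path(output_name).with_suffix(".parquet").name
    return str(output_name)

print(set_output_filename(Path("chunks/list-000.gz")))             # list-000.parquet
print(set_output_filename(Path("list-001.gz"), "custom-name.gz"))  # custom-name.parquet
```
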
From 5d1a1f2e328c1ae84e28ba2bc7a563d6d5373c8b Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Fri, 25 Apr 2025 15:48:46 -0500
Subject: [PATCH 3/3] update no-clobber option to apply to each individual log
 chunk instead of quitting execution if any parquet files are found

Also update the array job to be 0-indexed to match the indexes in the chunk
file names themselves.
---
 src/rc_gpfs/cli/convert_to_parquet.py | 72 ++++++++++++++++++++++++---
 1 file changed, 66 insertions(+), 6 deletions(-)

diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py
index b0478dd..0a0f78a 100644
--- a/src/rc_gpfs/cli/convert_to_parquet.py
+++ b/src/rc_gpfs/cli/convert_to_parquet.py
@@ -1,10 +1,10 @@
-import sys
 import argparse
+import re
 import subprocess
 from pathlib import Path
 import multiprocessing
 
-from ..policy.convert import convert
+from ..policy.convert import convert, set_output_filename
 from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
 from ..utils import parse_scontrol
 
@@ -57,13 +57,11 @@ BATCH_SCRIPT = """\
 
 {env_cmd}
 
-log=$(ls {input}/*.gz | awk "NR==${{SLURM_ARRAY_TASK_ID}} {{ print $1 }}")
+log=$(ls {input}/*.gz | sort | awk "NR==$((SLURM_ARRAY_TASK_ID+1)) {{ print $1 }}")
 
-convert-to-parquet -o {output_dir} ${{log}}
+convert-to-parquet {no_clobber_opt} -o {output_dir} ${{log}}
 """
-
-
 def submit_batch(**kwargs):
     env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env'))
     kwargs.update({"env_cmd":env_cmd})
@@ -76,6 +74,39 @@ def submit_batch(**kwargs):
     subprocess.run(['sbatch'],input=script,shell=True,text=True)
     pass
 
+def _find_sequential_indexes(idxs):
+    if not idxs:
+        return []
+
+    idxs = sorted(idxs)
+    result = []
+    start = idxs[0]
+    prev = idxs[0]
+
+    for num in idxs[1:]:
+        if num == prev + 1:
+            prev = num
+        else:
+            if start == prev:
+                result.append(str(start))
+            else:
+                result.append(f"{start}-{prev}")
+            start = num
+            prev = num
+    # Add the last sequence
+    if start == prev:
+        result.append(str(start))
+    else:
+        result.append(f"{start}-{prev}")
+
+    return result
+
+def _get_missing_indexes(chunks, parquets):
+    missing_indexes = [
+        index for index, element in enumerate(chunks) if element not in parquets
+    ]
+    return missing_indexes
+
 def convert_to_parquet() -> None:
     args = parse_args()
 
@@ -90,6 +121,35 @@ def convert_to_parquet() -> None:
         logs = list(args.get('input').glob('*.gz'))
         nlogs = len(logs)
 
+    if args["no_clobber"]:
+        args.update({"no_clobber_opt": "--no-clobber"})
+
+        if args['input'].is_dir():
+            chunks = logs
+            chunks.sort()
+        else:
+            chunks = [args["input"]]
+
+        pqs = [f.name for f in args["output_dir"].glob("*.parquet")]
+
+        target_pqs = [set_output_filename(f) for f in chunks]
+
+        idxs_to_run = _get_missing_indexes(target_pqs,pqs)
+
+        if len(idxs_to_run) == 0:
+            print("INFO: All log chunks have been converted to parquet. Exiting without processing")
+            return
+
+        array_idxs = _find_sequential_indexes(idxs_to_run)
+        args['array_idxs'] = ','.join(array_idxs)
+    else:
+        args.update(
+            {
+                "no_clobber_opt": "",
+                "array_idxs" : f"0-{nlogs-1}"
+            }
+        )
+
     args.update({'nlogs':nlogs})
 
     if args['batch']:
--
GitLab
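
Taken together, the two new helpers turn the set of unconverted chunks into a compact Slurm array specification, which is why the batch script now indexes with `NR==$((SLURM_ARRAY_TASK_ID+1))` over a sorted listing. The sketch below is self-contained, with hypothetical chunk names; `_find_sequential_indexes` is a condensed but behavior-equivalent version of the one in the patch:

```python
def _get_missing_indexes(chunks, parquets):
    # Positions of expected parquet names that do not exist yet.
    return [index for index, element in enumerate(chunks) if element not in parquets]

def _find_sequential_indexes(idxs):
    # Collapse sorted integers into Slurm range syntax, e.g. [0, 1, 3] -> ["0-1", "3"].
    if not idxs:
        return []
    idxs = sorted(idxs)
    result = []
    start = prev = idxs[0]
    for num in idxs[1:]:
        if num != prev + 1:
            result.append(str(start) if start == prev else f"{start}-{prev}")
            start = num
        prev = num
    result.append(str(start) if start == prev else f"{start}-{prev}")
    return result

target_pqs = [f"list-{i:03d}.parquet" for i in range(6)]  # expected outputs
existing = ["list-002.parquet", "list-004.parquet"]       # already converted
missing = _get_missing_indexes(target_pqs, existing)      # [0, 1, 3, 5]
print(",".join(_find_sequential_indexes(missing)))        # prints: 0-1,3,5
```

The joined string is what lands in `#SBATCH --array={array_idxs}`, so only the missing chunks are scheduled.
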