From 259145213e897307aef4395bd6d1a453735229fc Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Tue, 29 Apr 2025 17:20:36 -0500 Subject: [PATCH] Improve hive conversion functionality --- src/rc_gpfs/cli/convert_flat_to_hive.py | 130 +++++++++++++++---- src/rc_gpfs/cli/utils.py | 33 +++++ src/rc_gpfs/policy/hive.py | 161 ++++++++++++------------ src/rc_gpfs/utils/core.py | 13 +- 4 files changed, 235 insertions(+), 102 deletions(-) diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py index 89d5bb3..0fe6463 100644 --- a/src/rc_gpfs/cli/convert_flat_to_hive.py +++ b/src/rc_gpfs/cli/convert_flat_to_hive.py @@ -1,26 +1,29 @@ import argparse import subprocess +import re import time import random from pathlib import Path import polars as pl +from typing import List from ..policy.hive import hivize -from .utils import define_python_interpreter,batch_parser,setup_slurm_logs +from .utils import define_python_interpreter,batch_parser_no_mem,setup_slurm_logs +from ..utils import get_parquet_dataset_size DESCRIPTION = """ Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This creates a timeseries of structured datasets for each tld for much easier more efficient log comparisons within tld. Output parquets are sorted by the path column for convenience, but no index is set. -Setting the --batch flag will create a Slurm array job where each task processes one tld, either from the passed parameter or from the unique values in the parquet dataset. +Setting the --batch flag will create a Slurm array job where each task processes one tld, either from the passed parameter or from the unique values in the parquet dataset. Each tld's in-memory size will be estimated based on header information from the parquet dataset and how many records a given tld has. These estimated sizes are used to group tlds, and each group will be submitted as a separate array job with different requested memory. This helps maximize throughput while also mitigating OOM errors. All processing is done via Polars and so can take advantage of parallel processing. Higher core counts can provide a limited benefit to performance, and a GPU is not required. """ -def parse_args(): +def parse_args(cli_args=None): parser = argparse.ArgumentParser( description=DESCRIPTION, formatter_class=argparse.RawTextHelpFormatter, - parents=[batch_parser(cpus_per_task=16, gpus=0, partition='amd-hdr100', mem='32G',time='02:00:00')] + parents=[batch_parser_no_mem(cpus_per_task=8, gpus=0, partition='amd-hdr100',time='02:00:00')] ) parser.add_argument('parquet_path', type=Path, @@ -35,18 +38,26 @@ def parse_args(): parser.add_argument('--partition-size', dest='partition_chunk_size_bytes', type=str, - default='100MiB', + default='200MiB', help='Max size of in-memory data for each partition in a single hive dataset. Smaller partitions cause more files to be written. Can pass the byte size as an integer or as a human-readable byte string. For example, 1024 and 1KiB are equivalent.') + parser.add_argument('--mem-factor', + type=int, + default=3, + help="Factor to scale each tld's estimated memory by when setting memory requirements in the Slurm job. A factor of 3 accounts for almost all cases. A lower factor can result in more OOM errors but would increase task throughput, and vice versa for a higher factor.") parser.add_argument('--no-clobber', default=False, action='store_true', help="Flag to set whether contents of a hive cell will be overwritten. If True, the pipeline will exit if any parquet files are found in the cell directory. No processing will occur in that case. If False (default), any files existing in the cell directory will be removed prior to data writing.") + parser.add_argument('--dry-run', + default=False, + action='store_true', + help="If set, no jobs will be submitted at the end. Tld grouping still occurs, and the text files with the groups are still written.") - args = parser.parse_args() + args = parser.parse_args(cli_args) return vars(args) SLURM_OPTS = """\ -#SBATCH --job-name=hivize +#SBATCH --job-name=hivize-{mem} #SBATCH --ntasks={ntasks} #SBATCH --cpus-per-task={cpus_per_task} #SBATCH --partition={partition} @@ -78,17 +89,76 @@ def submit_batch(**kwargs): script = f"#!/bin/bash\n#\n{slurm_opts}\n{BATCH_CMDS.format(**kwargs)}" - # Wait between 1 and 5 seconds before batch submission. This helps avoid a situation where this setup is running in - # a batch array job and all of the array tasks submit their child array jobs at the same time. That results in jobs - # failing to be submitted due to overwhelming the scheduler with simultaneous requests. Adding a random delay should - # fix that - time.sleep(random.uniform(1,5)) + if not kwargs['dry_run']: + # Wait between 1 and 5 seconds before batch submission. This helps avoid a situation where this setup is running + # in a batch array job and all of the array tasks submit their child array jobs at the same time. That results + # in jobs failing to be submitted due to overwhelming the scheduler with simultaneous requests. Adding a random + # delay should fix that + time.sleep(random.uniform(1,5)) - subprocess.run(['sbatch'],input=script,shell=True,text=True) + subprocess.run(['sbatch'],input=script,shell=True,text=True) pass -def convert_flat_to_hive(): - args = parse_args() +def submit_tld_groups(df,grp_dir,args): + mem_grp = df[0,'req_mem_grp'] + + tld_file = grp_dir.joinpath(f"tld-{mem_grp}.txt") + + tlds = df["tld"].to_list() + + with open(tld_file,'wt') as f: + f.writelines('\n'.join(tlds)) + + args['ntlds'] = len(tlds) + args['tld_file'] = tld_file + args['mem'] = mem_grp + + submit_batch(**args) + pass + +def get_tld_row_counts(parquet_path: Path) -> pl.DataFrame: + + tld_rows = ( + pl.scan_parquet(parquet_path, parallel="prefiltered") + .group_by("tld") + .agg(pl.col("tld").count().alias("file_count")) + .collect(engine="streaming") + ) + return tld_rows + +def estimate_req_mem( + parquet_path: Path, + tld: List[str] | pl.Series, + mem_factor: int = 3 +) -> pl.DataFrame: + mem_breaks = [8, 16, 32, 64] + mem_labels = ["8G", "16G", "32G", "64G", "128G"] + + dataset_size = get_parquet_dataset_size(parquet_path) / (1024**3) + + tld_sizes = get_tld_row_counts(parquet_path) + + tld_sizes = ( + tld_sizes.with_columns( + (pl.col("file_count") / pl.col("file_count").sum() * dataset_size).alias( + "est_size" + ) + ) + .with_columns((pl.col("est_size") * mem_factor).alias("est_req_mem")) + .with_columns( + ( + pl.col("est_req_mem") + .cut(breaks=mem_breaks, labels=mem_labels) + .alias("req_mem_grp") + ) + ) + .filter(pl.col("tld").is_in(tld)) + ) + + return tld_sizes + +def convert_flat_to_hive(cli_args=None): + args = parse_args(cli_args) if args['tld'] is None: tlds = ( @@ -102,18 +172,34 @@ def convert_flat_to_hive(): ) else: tlds = args['tld'].split(',') + + if args['no_clobber']: + acq = re.search(r"\d{4}-\d{2}-\d{2}", str(args['parquet_path'].absolute())).group(0) + + existing_acq_paths = [p for p in args['hive_path'].rglob(f'*/acq={acq}') if len(list(p.glob('*.parquet'))) > 0] + existing_tlds = [re.search(r"tld=([^/]+)/", str(p)).group(1) for p in existing_acq_paths] + + args['tld'] = [t for t in tlds if t not in existing_tlds] + + if len(args['tld']) == 0: + print("INFO: All tlds already exist, and no-clobber is set. Exiting without converting") + return if args['batch']: - tld_file = args['parquet_path'].parent.joinpath('misc','tld.txt') - tld_file.parent.mkdir(parents=True,exist_ok=True) + req_mem = estimate_req_mem(args['parquet_path'],args['tld'],args['mem_factor']) - with open(tld_file,'wt') as f: - f.writelines('\n'.join(tlds)) + grp_dir = args["parquet_path"].parent.joinpath("misc") + grp_dir.mkdir(parents=True,exist_ok=True) + [f.unlink() for f in grp_dir.glob('tld*.txt')] - args['ntlds'] = len(tlds) - args['tld_file'] = tld_file + req_mem_file = grp_dir.joinpath('tld_est_mem.parquet') + req_mem_file.unlink(missing_ok=True) + + req_mem.write_parquet(req_mem_file) - submit_batch(**args) + for grp, df in req_mem.group_by('req_mem_grp'): + print(f"INFO: Submitting array job for {grp[0]}",flush=True) + submit_tld_groups(df,grp_dir,args) else: _ = args.pop('tld') diff --git a/src/rc_gpfs/cli/utils.py b/src/rc_gpfs/cli/utils.py index 18cb8fb..e879259 100644 --- a/src/rc_gpfs/cli/utils.py +++ b/src/rc_gpfs/cli/utils.py @@ -27,6 +27,39 @@ class CustomHelpFormatter(argparse.MetavarTypeHelpFormatter): actions = sorted(actions, key=lambda x: x.container.title if x.container.title else '') super(CustomHelpFormatter, self).add_arguments(actions) +def batch_parser_no_mem( + cpus_per_task: int | None = None, + gpus: int | None = None, + partition: str | None = None, + time: str | None = "12:00:00", + reservation: str | None = None, + slurm_log_dir: str | Path | None = "./out", + **kwargs, +) -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + add_help=False, formatter_class=CustomHelpFormatter + ) + slurm = parser.add_argument_group(title="Slurm Options") + slurm.add_argument( + "--batch", + action="store_true", + default=False, + help="Convert as a batch array job.", + ) + slurm.add_argument("-n", "--ntasks", type=int, default=1) + slurm.add_argument("-c", "--cpus-per-task", type=int, default=cpus_per_task) + slurm.add_argument("-g", "--gpus", type=int, default=gpus, choices=[0, 1]) + slurm.add_argument("-p", "--partition", type=str, default=partition) + slurm.add_argument("-t", "--time", type=str, default=time) + slurm.add_argument("--reservation", type=str, default=reservation) + slurm.add_argument( + "--slurm-log-dir", + type=Path, + default=slurm_log_dir, + help="Output log directory. If the directory does not exist, it will be created automatically.", + ) + return parser + def batch_parser( cpus_per_task: int | None = None, gpus: int | None = None, diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py index 03d1c5b..db840bc 100644 --- a/src/rc_gpfs/policy/hive.py +++ b/src/rc_gpfs/policy/hive.py @@ -16,53 +16,54 @@ from ..utils import ( calculate_age_distribution ) -def collect_hive_df(parquet_path: str | Path, acq: str, tld: str | List[str] | None = None): - queries = [] - for pq in parquet_path.glob('*.parquet'): - q = ( - pl.scan_parquet(pq,parallel='prefiltered',rechunk=True) - .select(['size','kballoc','access','create','modify','uid','gid','path','tld']) - .with_columns(pl.lit(acq).alias('acq')) +def collect_hive_df( + parquet_path: str | Path, + acq: str, + tld: str | List[str] | None = None, + hive_path: str | Path | None = None, + no_clobber: bool = False +) -> pl.DataFrame: + if not isinstance(tld,list) and tld is not None: + tld = [tld] + + print("Collecting dataframe",flush=True) + + df = ( + pl.scan_parquet(parquet_path, parallel="prefiltered", rechunk=True) + .select( + [ + "size", + "kballoc", + "access", + "create", + "modify", + "uid", + "gid", + "path", + "tld", + ] ) + .with_columns(pl.lit(acq).alias("acq")) + ) - if tld is not None: - q = q.filter(pl.col('tld').is_in(tld)) - - queries.append(q) + if tld is not None: + df = df.filter(pl.col("tld").is_in(tld)) - print("Collecting queries",flush=True) - dfs = pl.collect_all(queries,engine='streaming') - df = pl.concat(dfs).sort('path') - print("Finished collecting queries",flush=True) - return df + if no_clobber: + existing_tlds = _get_existing_hive_cells(hive_path,acq) + df = df.filter(pl.col('tld').is_in(existing_tlds).not_()) -def _remove_clobbered_cells(df: pl.DataFrame, hive_path: Path): - hive_cells = ( - df.select("tld", "acq") - .unique() - .with_columns( - pl.struct("tld", "acq") - .map_elements( - lambda x: str(hive_path.joinpath(f"tld={x['tld']}", f"acq={x['acq']}")), - return_dtype=pl.String, - ) - .alias("hive_cell") - ) - ) + df = df.collect(engine='streaming') - clobbered = [] - for s in hive_cells["hive_cell"].to_list(): - pqs = list(Path(s).glob("*.parquet")) - if len(pqs) > 0: - clobbered.append(s) + print("Finished collecting queries", flush=True) - no_clobber = ( - df - .join(hive_cells, how="left", on=["tld", "acq"]) - .filter(pl.col("hive_cell").is_in(clobbered).not_()) - ) + return df - return no_clobber +def _get_existing_hive_cells(hive_path: str | Path, acq: str): + hive_path = as_path(hive_path) + existing_pq = hive_path.rglob(f"*/acq={acq}/*.parquet") + existing_tlds = list(set([p.parent.parent.name.removeprefix("tld=") for p in existing_pq])) + return existing_tlds def hivize( parquet_path: str | Path, @@ -100,14 +101,15 @@ def hivize( acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0) print(f"DEBUG: Acquisition date is {acq}",flush=True) - df = collect_hive_df(parquet_path,acq,tld) + df = collect_hive_df(parquet_path, acq, tld, hive_path, no_clobber) - if no_clobber: - df = _remove_clobbered_cells(df, hive_path) - if df.is_empty(): - print("INFO: All passed tlds already have parquet files in their hive cell directories. Cleaning temp directories and exiting",flush=True) - shutil.rmtree(staging_path) - return + if df.is_empty(): + print( + "INFO: All passed tlds already have parquet files in their hive cell directories. Aborting", + flush=True, + ) + shutil.rmtree(staging_path) + return print('Writing to hive') df.write_parquet( @@ -119,7 +121,11 @@ def hivize( print("Finished writing hive dataset",flush=True) if write_metadata: - for grp in df['tld'].unique(): + tlds = df["tld"].unique().to_list() + df = df.drop( + [c for c in df.columns if c not in ["tld", "size", "modify", "access"]] + ).lazy() + for grp in tlds: tdf = df.filter(pl.col('tld').eq(grp)) output_dir=staging_path.joinpath(f"tld={grp}",f"acq={acq}") write_dataset_metadata(tdf,output_dir,acq) @@ -128,49 +134,46 @@ def hivize( shutil.rmtree(staging_path) def write_dataset_metadata( - df: pl.DataFrame, + df: pl.DataFrame | pl.LazyFrame, parquet_path: Path, acq: str, **kwargs ) -> dict: - size_df = df[['size']] - size_df = size_df.with_columns(calculate_size_distribution(size_df['size'],**kwargs).alias('grp')) - + df = df.lazy() + size_dist = ( - size_df - .group_by('grp') - .agg([ - pl.sum('size').alias('bytes'), - pl.count('size').alias('file_count') - ]) + df.select("size") + .with_columns( + calculate_size_distribution(pl.col("size"), **kwargs).alias("grp") + ) + .group_by("grp") + .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")]) + .sort("grp", descending=True) + .collect(engine="streaming") .to_dicts() ) - access_df = df[['access','size']] - access_df = access_df.with_columns(calculate_age_distribution(access_df['access'],acq,**kwargs).alias('grp')) - access_dist = ( - access_df - .group_by('grp') - .agg([ - pl.sum('size').alias('bytes'), - pl.count('size').alias('file_count') - ]) - .sort('grp',descending=True) + df.select("access", "size") + .with_columns( + calculate_age_distribution(pl.col('access'), acq, **kwargs).alias("grp") + ) + .group_by("grp") + .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")]) + .sort("grp", descending=True) + .collect(engine='streaming') .to_dicts() ) - - modify_df = df[['modify','size']] - modify_df = modify_df.with_columns(calculate_age_distribution(modify_df['modify'],acq,**kwargs).alias('grp')) - + modify_dist = ( - modify_df - .group_by('grp') - .agg([ - pl.sum('size').alias('bytes'), - pl.count('size').alias('file_count') - ]) - .sort('grp',descending=True) + df.select("modify", "size") + .with_columns( + calculate_age_distribution(pl.col("modify"), acq, **kwargs).alias("grp") + ) + .group_by("grp") + .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")]) + .sort("grp", descending=True) + .collect(engine="streaming") .to_dicts() ) diff --git a/src/rc_gpfs/utils/core.py b/src/rc_gpfs/utils/core.py index ff8fa4c..1409d4f 100644 --- a/src/rc_gpfs/utils/core.py +++ b/src/rc_gpfs/utils/core.py @@ -4,6 +4,7 @@ import subprocess from pathlib import Path from typing import List, Literal, Tuple import polars as pl +import pyarrow.parquet as pq import numpy as np from .units import as_bytes, convert_si, create_size_bin_labels @@ -103,4 +104,14 @@ def calculate_age_distribution( .cast(pl.String) .cast(pl.Enum(age_labels)) ) - return age_grps \ No newline at end of file + return age_grps + +def get_parquet_dataset_size(parquet_path): + tot_size = 0 + + for p in parquet_path.glob("*.parquet"): + md = pq.read_metadata(p) + for rg in range(0, md.num_row_groups): + tot_size += md.row_group(rg).total_byte_size + + return tot_size \ No newline at end of file -- GitLab