From 0416def9e8022b5a93ccdbcd6972f2e8e6455b1e Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Fri, 25 Apr 2025 14:15:49 -0500
Subject: [PATCH] add no-clobber option for hive conversion. It can be invoked
 both at the CLI level and at the function level

---
 src/rc_gpfs/cli/convert_flat_to_hive.py |  4 ++++
 src/rc_gpfs/policy/hive.py              | 36 +++++++++++++++++++++++++
 2 files changed, 40 insertions(+)

diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
index 355d608..ae155bf 100644
--- a/src/rc_gpfs/cli/convert_flat_to_hive.py
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -35,6 +35,10 @@ def parse_args():
                         type=str,
                         default='100MiB',
                         help='Max size of in-memory data for each partition in a single hive dataset. Smaller partitions cause more files to be written. Can pass the byte size as an integer or as a human-readable byte string. For example, 1024 and 1KiB are equivalent.')
+    parser.add_argument('--no-clobber',
+                        default=False,
+                        action='store_true',
+                        help="Flag controlling whether existing contents of a hive cell are overwritten. If set, any hive cell directory that already contains parquet files is skipped, and the pipeline exits without processing if no cells remain. If not set (default), any files existing in the cell directory will be removed prior to data writing.")
     args = parser.parse_args()
     return vars(args)
 
diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py
index 3b485ec..03d1c5b 100644
--- a/src/rc_gpfs/policy/hive.py
+++ b/src/rc_gpfs/policy/hive.py
@@ -36,6 +36,34 @@ def collect_hive_df(parquet_path: str | Path, acq: str, tld: str | List[str] |
     print("Finished collecting queries",flush=True)
     return df
 
+def _remove_clobbered_cells(df: pl.DataFrame, hive_path: Path):
+    hive_cells = (
+        df.select("tld", "acq")
+        .unique()
+        .with_columns(
+            pl.struct("tld", "acq")
+            .map_elements(
+                lambda x: str(hive_path.joinpath(f"tld={x['tld']}", f"acq={x['acq']}")),
+                return_dtype=pl.String,
+            )
+            .alias("hive_cell")
+        )
+    )
+
+    clobbered = []
+    for s in hive_cells["hive_cell"].to_list():
+        pqs = list(Path(s).glob("*.parquet"))
+        if len(pqs) > 0:
+            clobbered.append(s)
+
+    no_clobber = (
+        df
+        .join(hive_cells, how="left", on=["tld", "acq"])
+        .filter(pl.col("hive_cell").is_in(clobbered).not_())
+    )
+
+    return no_clobber
+
 def hivize(
     parquet_path: str | Path,
     hive_path: str | Path,
@@ -43,6 +71,7 @@ def hivize(
     partition_chunk_size_bytes: int | str = '200MiB',
     staging_path: str | Path | None = None,
     write_metadata: bool = True,
+    no_clobber: bool = False,
     **kwargs
 ) -> None:
 
@@ -73,6 +102,13 @@ def hivize(
 
     df = collect_hive_df(parquet_path,acq,tld)
 
+    if no_clobber:
+        df = _remove_clobbered_cells(df, hive_path)
+        if df.is_empty():
+            print("INFO: All passed tlds already have parquet files in their hive cell directories. Cleaning temp directories and exiting",flush=True)
+            shutil.rmtree(staging_path)
+            return
+
     print('Writing to hive')
     df.write_parquet(
         staging_path,
--
GitLab
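
A minimal usage sketch of the new option at the function level, assuming hypothetical paths and that the remaining hivize parameters (e.g. acq and tld) are either defaulted or supplied as needed:

    from rc_gpfs.policy.hive import hivize

    # With no_clobber=True, hive cells that already contain parquet files are
    # filtered out before writing; if every requested cell is skipped, the
    # staging directory is removed and the function returns early.
    hivize(
        parquet_path="/scratch/policy/flat.parquet",  # hypothetical input dataset
        hive_path="/scratch/policy/hive",             # hypothetical hive root
        no_clobber=True,
    )

At the CLI level, the same behavior is enabled by passing the --no-clobber flag to the convert_flat_to_hive entry point.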