diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
index 355d6082e21780e5f93c7199c49d77f07fa49fa0..ae155bfe65387f0c8e23027b5114891022e1d130 100644
--- a/src/rc_gpfs/cli/convert_flat_to_hive.py
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -35,6 +35,10 @@ def parse_args():
                         type=str,
                         default='100MiB',
                         help='Max size of in-memory data for each partition in a single hive dataset. Smaller partitions cause more files to be written. Can pass the byte size as an integer or as a human-readable byte string. For example, 1024 and 1KiB are equivalent.')
+    parser.add_argument('--no-clobber',
+                        default=False,
+                        action='store_true',
+                        help="Flag controlling whether existing hive cell contents are overwritten. If set, any cell directory that already contains parquet files is skipped, and the pipeline exits without processing if every requested cell already contains data. If not set (default), any files existing in the cell directory will be removed prior to data writing.")
     args = parser.parse_args()
     return vars(args)
 
diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py
index 3b485ecba7cad0eb0703746f5c35429aa482ed52..03d1c5bb53f4676e1886587b8d089cac168121c0 100644
--- a/src/rc_gpfs/policy/hive.py
+++ b/src/rc_gpfs/policy/hive.py
@@ -36,6 +36,37 @@ def collect_hive_df(parquet_path: str | Path, acq: str, tld: str | List[str] |
     print("Finished collecting queries",flush=True)
     return df
 
+def _remove_clobbered_cells(df: pl.DataFrame, hive_path: Path):
+    # Build the hive cell directory path (tld=<tld>/acq=<acq>) for every unique tld/acq pair
+    hive_cells = (
+        df.select("tld", "acq")
+        .unique()
+        .with_columns(
+            pl.struct("tld", "acq")
+            .map_elements(
+                lambda x: str(hive_path.joinpath(f"tld={x['tld']}", f"acq={x['acq']}")),
+                return_dtype=pl.String,
+            )
+            .alias("hive_cell")
+        )
+    )
+
+    # A cell is considered clobbered if its directory already contains parquet files
+    clobbered = []
+    for s in hive_cells["hive_cell"].to_list():
+        pqs = list(Path(s).glob("*.parquet"))
+        if len(pqs) > 0:
+            clobbered.append(s)
+
+    # Keep only rows whose target cell has no existing parquet data
+    no_clobber = (
+        df
+        .join(hive_cells, how="left", on=["tld", "acq"])
+        .filter(pl.col("hive_cell").is_in(clobbered).not_())
+    )
+
+    return no_clobber
+
 def hivize(
     parquet_path: str | Path,
     hive_path: str | Path,
@@ -43,6 +74,7 @@ def hivize(
     partition_chunk_size_bytes: int | str = '200MiB',
     staging_path: str | Path | None = None,
     write_metadata: bool = True,
+    no_clobber: bool = False,
     **kwargs
 ) -> None:
 
@@ -73,6 +105,13 @@ def hivize(
 
     df = collect_hive_df(parquet_path,acq,tld)
 
+    if no_clobber:
+        df = _remove_clobbered_cells(df, hive_path)
+        if df.is_empty():
+            print("INFO: All passed tlds already have parquet files in their hive cell directories. Cleaning temp directories and exiting",flush=True)
+            shutil.rmtree(staging_path)
+            return
+
     print('Writing to hive')
     df.write_parquet(
         staging_path,
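Not part of the diff above: a small, self-contained sketch of how the new _remove_clobbered_cells helper behaves once the patch is applied. The tld/acq values, temporary directory layout, and file names are made up for illustration; only the helper itself comes from the change.

    # Sanity-check sketch for _remove_clobbered_cells (assumes the patch above is applied).
    import tempfile
    from pathlib import Path

    import polars as pl

    from rc_gpfs.policy.hive import _remove_clobbered_cells

    with tempfile.TemporaryDirectory() as tmp:
        hive_path = Path(tmp)

        # Pretend one hive cell (tld=lab_a, acq=2024-01-01) already holds parquet data.
        existing = hive_path / "tld=lab_a" / "acq=2024-01-01"
        existing.mkdir(parents=True)
        (existing / "part-0.parquet").touch()

        df = pl.DataFrame(
            {
                "tld": ["lab_a", "lab_b"],
                "acq": ["2024-01-01", "2024-01-01"],
                "size": [100, 200],
            }
        )

        filtered = _remove_clobbered_cells(df, hive_path)
        # Only the lab_b row survives; lab_a is dropped because its cell already contains parquet.
        print(filtered["tld"].to_list())  # ['lab_b']

The --no-clobber CLI flag added above is presumably forwarded into hivize's new no_clobber parameter, which applies this same filtering before writing and exits early if nothing is left to write.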