Skip to content
Snippets Groups Projects
Commit 6f38ecb6 authored by Matthew K Defenderfer's avatar Matthew K Defenderfer
Browse files

Add --no-clobber to hive conversion tools

parent 0249a4d9
No related branches found
No related tags found
1 merge request!59Add --no-clobber to hive conversion tools
......@@ -35,6 +35,11 @@ def parse_args():
type=str,
default='100MiB',
help='Max size of in-memory data for each partition in a single hive dataset. Smaller partitions cause more files to be written. Can pass the byte size as an integer or as a human-readable byte string. For example, 1024 and 1KiB are equivalent.')
# BUG FIX: argparse's store_true action does not accept a `type` keyword —
# passing type=bool here raises TypeError("unexpected keyword argument 'type'")
# as soon as the parser is built. store_true already implies a bool dest with
# default False, so only the explicit default and help text are kept.
parser.add_argument('--no-clobber',
                    default=False,
                    action='store_true',
                    help="Flag to set whether contents of a hive cell will be overwritten. If True, the pipeline will exit if any parquet files are found in the cell directory. No processing will occur in that case. If False (default), any files existing in the cell directory will be removed prior to data writing.")
args = parser.parse_args()
return vars(args)
......
......@@ -36,6 +36,34 @@ def collect_hive_df(parquet_path: str | Path, acq: str, tld: str | List[str] |
print("Finished collecting queries",flush=True)
return df
def _remove_clobbered_cells(df: pl.DataFrame, hive_path: Path):
    """Drop rows whose target hive cell directory already contains parquet files.

    Builds the ``tld=<tld>/acq=<acq>`` directory path for every unique
    (tld, acq) pair in *df*, checks each directory on disk for existing
    ``*.parquet`` output, and returns *df* with the rows belonging to those
    already-populated ("clobbered") cells filtered out.
    """
    # One directory path per unique (tld, acq) pair.
    cell_paths = (
        df.select("tld", "acq")
        .unique()
        .with_columns(
            pl.struct("tld", "acq")
            .map_elements(
                lambda row: str(hive_path.joinpath(f"tld={row['tld']}", f"acq={row['acq']}")),
                return_dtype=pl.String,
            )
            .alias("hive_cell")
        )
    )

    # A cell is "clobbered" when its directory already holds parquet output.
    occupied = [
        cell
        for cell in cell_paths["hive_cell"].to_list()
        if any(Path(cell).glob("*.parquet"))
    ]

    # NOTE(review): the left join leaves a 'hive_cell' column on the returned
    # frame — presumably downstream writers ignore or drop it; confirm.
    return (
        df.join(cell_paths, how="left", on=["tld", "acq"])
        .filter(pl.col("hive_cell").is_in(occupied).not_())
    )
def hivize(
parquet_path: str | Path,
hive_path: str | Path,
......@@ -43,6 +71,7 @@ def hivize(
partition_chunk_size_bytes: int | str = '200MiB',
staging_path: str | Path | None = None,
write_metadata: bool = True,
no_clobber: bool = False,
**kwargs
) -> None:
......@@ -73,6 +102,13 @@ def hivize(
df = collect_hive_df(parquet_path,acq,tld)
if no_clobber:
df = _remove_clobbered_cells(df, hive_path)
if df.is_empty():
print("INFO: All passed tlds already have parquet files in their hive cell directories. Cleaning temp directories and exiting",flush=True)
shutil.rmtree(staging_path)
return
print('Writing to hive')
df.write_parquet(
staging_path,
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment