From 248238ff277bedc8b6ca11d823dab694a36b911c Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Mon, 28 Apr 2025 14:15:55 -0500
Subject: [PATCH 01/11] add no-clobber to CLI function instead of relying on it
 only in the inner function

---
 src/rc_gpfs/cli/convert_flat_to_hive.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
index 89d5bb3..b1c3cf8 100644
--- a/src/rc_gpfs/cli/convert_flat_to_hive.py
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -1,5 +1,6 @@
 import argparse
 import subprocess
+import re
 import time
 import random
 from pathlib import Path
@@ -102,6 +103,17 @@ def convert_flat_to_hive():
         )
     else:
         tlds = args['tld'].split(',')
+
+    if args['no_clobber']:
+        acq = re.search(r"\d{4}-\d{2}-\d{2}", str(args['parquet_path'].absolute())).group(0)
+
+        existing_acq_paths = [p for p in args['hive_path'].rglob(f'*/acq={acq}') if len(list(p.glob('*.parquet'))) > 0]
+        existing_tlds = [re.search(r"tld=([^/]+)/", str(p)).group(1) for p in existing_acq_paths]
+
+        args['tld'] = [t for t in tlds if t not in existing_tlds]
+
+        if len(args['tld']) == 0:
+            print("INFO: All tlds already exist, and no-clobber is set. Exiting with converting")
     
     if args['batch']:
         tld_file = args['parquet_path'].parent.joinpath('misc','tld.txt')
-- 
GitLab

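The no-clobber filter above relies on the hive layout hive_path/tld=<tld>/acq=<acq>/*.parquet: the acquisition date is pulled from the flat parquet path, and any tld that already has parquet files for that date is dropped from the work list. A minimal standalone sketch of that check, with a hypothetical helper name and placeholder paths, assuming the same layout:

    import re
    from pathlib import Path

    def remaining_tlds(hive_path: Path, parquet_path: Path, tlds: list[str]) -> list[str]:
        # acquisition date (YYYY-MM-DD) is embedded somewhere in the flat parquet path
        acq = re.search(r"\d{4}-\d{2}-\d{2}", str(parquet_path.absolute())).group(0)

        # hive cells that already contain parquet files for this acquisition date
        existing = [
            p for p in hive_path.rglob(f"*/acq={acq}")
            if any(p.glob("*.parquet"))
        ]
        existing_tlds = {re.search(r"tld=([^/]+)", str(p)).group(1) for p in existing}

        # only tlds without a populated hive cell still need to be converted
        return [t for t in tlds if t not in existing_tlds]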

From 60660f5aed4f66bc251bc11e7090ec673977755d Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Mon, 28 Apr 2025 15:22:50 -0500
Subject: [PATCH 02/11] add automatic conversion to list if tld is not a list
 already

---
 src/rc_gpfs/policy/hive.py | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py
index 03d1c5b..fd5b2a8 100644
--- a/src/rc_gpfs/policy/hive.py
+++ b/src/rc_gpfs/policy/hive.py
@@ -17,13 +17,8 @@ from ..utils import (
 )
 
 def collect_hive_df(parquet_path: str | Path,  acq: str, tld: str | List[str] | None = None):
-    queries = []
-    for pq in parquet_path.glob('*.parquet'):
-        q = (
-            pl.scan_parquet(pq,parallel='prefiltered',rechunk=True)
-            .select(['size','kballoc','access','create','modify','uid','gid','path','tld'])
-            .with_columns(pl.lit(acq).alias('acq'))
-        )
+    if not isinstance(tld,list) and tld is not None:
+        tld = [tld]
 
         if tld is not None:
             q = q.filter(pl.col('tld').is_in(tld))
-- 
GitLab


From 396dd4c71979245fc42ad018b80c9321f9a36885 Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Mon, 28 Apr 2025 15:23:48 -0500
Subject: [PATCH 03/11] replace multi-query method with a more straightforward
 scan-and-filter combination

---
 src/rc_gpfs/policy/hive.py | 29 +++++++++++++++++++++++------
 1 file changed, 23 insertions(+), 6 deletions(-)

diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py
index fd5b2a8..e3abc51 100644
--- a/src/rc_gpfs/policy/hive.py
+++ b/src/rc_gpfs/policy/hive.py
@@ -19,15 +19,32 @@ from ..utils import (
 def collect_hive_df(parquet_path: str | Path,  acq: str, tld: str | List[str] | None = None):
     if not isinstance(tld,list) and tld is not None:
         tld = [tld]
+    
+    print("Collecting dataframe",flush=True)
+    
+    df = (
+        pl.scan_parquet(parquet_path, parallel="prefiltered")
+        .select(
+            [
+                "size",
+                "kballoc",
+                "access",
+                "create",
+                "modify",
+                "uid",
+                "gid",
+                "path",
+                "tld",
+            ]
+        )
+        .with_columns(pl.lit(acq).alias("acq"))
+    )
 
-        if tld is not None:
-            q = q.filter(pl.col('tld').is_in(tld))
+    if tld is not None:
+        df = df.filter(pl.col("tld").is_in(tld))
 
-        queries.append(q)
+    df = df.collect(engine='streaming')
 
-    print("Collecting queries",flush=True)
-    dfs = pl.collect_all(queries,engine='streaming')
-    df = pl.concat(dfs).sort('path')
     print("Finished collecting queries",flush=True)
     return df
 
-- 
GitLab


From 46f2825b4cc7dcb33846b4724aa7632dce7bc724 Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Mon, 28 Apr 2025 15:25:09 -0500
Subject: [PATCH 04/11] improve memory usage by not performing any join, only
 filtering on whether a tld already has parquet files in the specific hive
 cell, and using lazyframes with streaming wherever possible

---
 src/rc_gpfs/policy/hive.py | 27 ++++++++++++++++-----------
 1 file changed, 16 insertions(+), 11 deletions(-)

diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py
index e3abc51..e650cfc 100644
--- a/src/rc_gpfs/policy/hive.py
+++ b/src/rc_gpfs/policy/hive.py
@@ -50,28 +50,33 @@ def collect_hive_df(parquet_path: str | Path,  acq: str, tld: str | List[str] |
 
 def _remove_clobbered_cells(df: pl.DataFrame, hive_path: Path):
     hive_cells = (
-        df.select("tld", "acq")
+        df.lazy()
+        .select("tld", "acq")
         .unique()
         .with_columns(
             pl.struct("tld", "acq")
             .map_elements(
-                lambda x: str(hive_path.joinpath(f"tld={x['tld']}", f"acq={x['acq']}")),
-                return_dtype=pl.String,
+                lambda x: len(
+                    list(
+                        hive_path.joinpath(f"tld={x['tld']}", f"acq={x['acq']}").glob(
+                            "*.parquet"
+                        )
+                    )
+                ) > 0,
+                return_dtype=pl.Boolean,
             )
-            .alias("hive_cell")
+            .alias("exists")
         )
+        .collect(engine="streaming")
     )
 
-    clobbered = []
-    for s in hive_cells["hive_cell"].to_list():
-        pqs = list(Path(s).glob("*.parquet"))
-        if len(pqs) > 0:
-            clobbered.append(s)
+    filter_df = hive_cells.filter(pl.col("exists").not_()).select("tld")
 
     no_clobber = (
         df
-        .join(hive_cells, how="left", on=["tld", "acq"])
-        .filter(pl.col("hive_cell").is_in(clobbered).not_())
+        .lazy()
+        .filter(pl.col('tld').is_in(filter_df['tld']))
+        .collect(engine='streaming')
     )
 
     return no_clobber
-- 
GitLab


From bf5b505445bdcf93343d0c3af4c5b47a94c2bcce Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Tue, 29 Apr 2025 09:43:56 -0500
Subject: [PATCH 05/11] add missing return statement

---
 src/rc_gpfs/cli/convert_flat_to_hive.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
index b1c3cf8..77f88d0 100644
--- a/src/rc_gpfs/cli/convert_flat_to_hive.py
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -113,7 +113,8 @@ def convert_flat_to_hive():
         args['tld'] = [t for t in tlds if t not in existing_tlds]
 
         if len(args['tld']) == 0:
-            print("INFO: All tlds already exist, and no-clobber is set. Exiting with converting")
+            print("INFO: All tlds already exist, and no-clobber is set. Exiting without converting")
+            return
     
     if args['batch']:
         tld_file = args['parquet_path'].parent.joinpath('misc','tld.txt')
-- 
GitLab


From e086a3971d057e763bc6c63568f8d6287b52b2a3 Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Tue, 29 Apr 2025 09:46:09 -0500
Subject: [PATCH 06/11] handle no-clobber in collect_hive_df as a filter step
 on the lazyframe instead of filtering the full dataframe

---
 src/rc_gpfs/policy/hive.py | 71 +++++++++++++++-----------------------
 1 file changed, 28 insertions(+), 43 deletions(-)

diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py
index e650cfc..897dc03 100644
--- a/src/rc_gpfs/policy/hive.py
+++ b/src/rc_gpfs/policy/hive.py
@@ -16,14 +16,20 @@ from ..utils import (
     calculate_age_distribution
 )
 
-def collect_hive_df(parquet_path: str | Path,  acq: str, tld: str | List[str] | None = None):
+def collect_hive_df(
+        parquet_path: str | Path,
+        acq: str, 
+        tld: str | List[str] | None = None,
+        hive_path: str | Path | None = None,
+        no_clobber: bool = False
+) -> pl.DataFrame:
     if not isinstance(tld,list) and tld is not None:
         tld = [tld]
     
     print("Collecting dataframe",flush=True)
-    
+        
     df = (
-        pl.scan_parquet(parquet_path, parallel="prefiltered")
+        pl.scan_parquet(parquet_path, parallel="prefiltered", rechunk=True)
         .select(
             [
                 "size",
@@ -43,43 +49,21 @@ def collect_hive_df(parquet_path: str | Path,  acq: str, tld: str | List[str] |
     if tld is not None:
         df = df.filter(pl.col("tld").is_in(tld))
 
-    df = df.collect(engine='streaming')
-
-    print("Finished collecting queries",flush=True)
-    return df
+    if no_clobber:
+        existing_tlds = _get_existing_hive_cells(hive_path,acq)
+        df = df.filter(pl.col('tld').is_in(existing_tlds).not_())
 
-def _remove_clobbered_cells(df: pl.DataFrame, hive_path: Path):
-    hive_cells = (
-        df.lazy()
-        .select("tld", "acq")
-        .unique()
-        .with_columns(
-            pl.struct("tld", "acq")
-            .map_elements(
-                lambda x: len(
-                    list(
-                        hive_path.joinpath(f"tld={x['tld']}", f"acq={x['acq']}").glob(
-                            "*.parquet"
-                        )
-                    )
-                ) > 0,
-                return_dtype=pl.Boolean,
-            )
-            .alias("exists")
-        )
-        .collect(engine="streaming")
-    )
+    df = df.collect(engine='streaming')
 
-    filter_df = hive_cells.filter(pl.col("exists").not_()).select("tld")
+    print("Finished collecting queries", flush=True)
 
-    no_clobber = (
-        df
-        .lazy()
-        .filter(pl.col('tld').is_in(filter_df['tld']))
-        .collect(engine='streaming')
-    )
+    return df
 
-    return no_clobber
+def _get_existing_hive_cells(hive_path: str | Path, acq: str):
+    hive_path = as_path(hive_path)
+    existing_pq = hive_path.rglob(f"*/acq={acq}/*.parquet")
+    existing_tlds = list(set([p.parent.parent.name.removeprefix("tld=") for p in existing_pq]))
+    return existing_tlds
 
 def hivize(
         parquet_path: str | Path,
@@ -117,14 +101,15 @@ def hivize(
     acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0)
     print(f"DEBUG: Acquisition date is {acq}",flush=True)
 
-    df = collect_hive_df(parquet_path,acq,tld)
+    df = collect_hive_df(parquet_path, acq, tld, hive_path, no_clobber)
 
-    if no_clobber:
-        df = _remove_clobbered_cells(df, hive_path)
-        if df.is_empty():
-            print("INFO: All passed tlds already have parquet files in their hive cell directories. Cleaning temp directories and exiting",flush=True)
-            shutil.rmtree(staging_path)
-            return
+    if df.is_empty():
+        print(
+            "INFO: All passed tlds already have parquet files in their hive cell directories. Aborting",
+            flush=True,
+        )
+        shutil.rmtree(staging_path)
+        return
 
     print('Writing to hive')
     df.write_parquet(
-- 
GitLab

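For reference, a hedged example of calling collect_hive_df with the signature introduced above; the paths and tld names are placeholders, not project defaults:

    from rc_gpfs.policy.hive import collect_hive_df

    df = collect_hive_df(
        parquet_path="/data/gpfs/2025-04-28/parquet",  # placeholder flat parquet dataset
        acq="2025-04-28",
        tld=["lab1", "lab2"],           # optional subset; a bare string is wrapped in a list
        hive_path="/data/gpfs/hive",    # placeholder hive root, only used when no_clobber=True
        no_clobber=True,                # skip tlds whose hive cell already holds parquet files
    )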

From 25319fcb7684424dc550364105c4c7ce4df3240a Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Tue, 29 Apr 2025 09:47:17 -0500
Subject: [PATCH 07/11] change to lazyframe processing to better manage memory
 usage

---
 src/rc_gpfs/policy/hive.py | 65 +++++++++++++++++++-------------------
 1 file changed, 33 insertions(+), 32 deletions(-)

diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py
index 897dc03..db840bc 100644
--- a/src/rc_gpfs/policy/hive.py
+++ b/src/rc_gpfs/policy/hive.py
@@ -121,7 +121,11 @@ def hivize(
     print("Finished writing hive dataset",flush=True)
 
     if write_metadata:
-        for grp in df['tld'].unique():
+        tlds = df["tld"].unique().to_list()
+        df = df.drop(
+            [c for c in df.columns if c not in ["tld", "size", "modify", "access"]]
+        ).lazy()
+        for grp in tlds:
             tdf = df.filter(pl.col('tld').eq(grp))
             output_dir=staging_path.joinpath(f"tld={grp}",f"acq={acq}")
             write_dataset_metadata(tdf,output_dir,acq)
@@ -130,49 +134,46 @@ def hivize(
     shutil.rmtree(staging_path)
 
 def write_dataset_metadata(
-        df: pl.DataFrame, 
+        df: pl.DataFrame | pl.LazyFrame, 
         parquet_path: Path,
         acq: str,
         **kwargs
 ) -> dict:
-    size_df = df[['size']]
-    size_df = size_df.with_columns(calculate_size_distribution(size_df['size'],**kwargs).alias('grp'))
-        
+    df = df.lazy()
+
     size_dist = (
-        size_df
-        .group_by('grp')
-        .agg([
-            pl.sum('size').alias('bytes'),
-            pl.count('size').alias('file_count')
-        ])
+        df.select("size")
+        .with_columns(
+            calculate_size_distribution(pl.col("size"), **kwargs).alias("grp")
+        )
+        .group_by("grp")
+        .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")])
+        .sort("grp", descending=True)
+        .collect(engine="streaming")
         .to_dicts()
     )
 
-    access_df = df[['access','size']]
-    access_df = access_df.with_columns(calculate_age_distribution(access_df['access'],acq,**kwargs).alias('grp'))
-
     access_dist = (
-        access_df
-        .group_by('grp')
-        .agg([
-            pl.sum('size').alias('bytes'),
-            pl.count('size').alias('file_count')
-        ])
-        .sort('grp',descending=True)
+        df.select("access", "size")
+        .with_columns(
+            calculate_age_distribution(pl.col('access'), acq, **kwargs).alias("grp")
+        )
+        .group_by("grp")
+        .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")])
+        .sort("grp", descending=True)
+        .collect(engine='streaming')
         .to_dicts()
     )
- 
-    modify_df = df[['modify','size']]
-    modify_df = modify_df.with_columns(calculate_age_distribution(modify_df['modify'],acq,**kwargs).alias('grp'))
-
+    
     modify_dist = (
-        modify_df
-        .group_by('grp')
-        .agg([
-            pl.sum('size').alias('bytes'),
-            pl.count('size').alias('file_count')
-        ])
-        .sort('grp',descending=True)
+        df.select("modify", "size")
+        .with_columns(
+            calculate_age_distribution(pl.col("modify"), acq, **kwargs).alias("grp")
+        )
+        .group_by("grp")
+        .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")])
+        .sort("grp", descending=True)
+        .collect(engine="streaming")
         .to_dicts()
     )
 
-- 
GitLab

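Each distribution in the rewritten write_dataset_metadata is now a single lazy pipeline (select only the needed columns, bin, group, aggregate) materialized by one streaming collect. A toy sketch of that pattern, using Expr.cut in place of the project's calculate_size_distribution helper:

    import polars as pl

    lf = pl.LazyFrame({"size": [10, 2_000, 5_000_000, 42]})

    size_dist = (
        lf.select("size")
        .with_columns(
            pl.col("size")
            .cut(breaks=[1_000, 1_000_000], labels=["small", "medium", "large"])
            .alias("grp")
        )
        .group_by("grp")
        .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")])
        .sort("grp", descending=True)
        .collect(engine="streaming")  # only the small grouped result is materialized
        .to_dicts()
    )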

From 5ee09d9c3c905b8ce9930e3322def53c7d5f04bf Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Tue, 29 Apr 2025 16:02:49 -0500
Subject: [PATCH 08/11] Add functionality to group tlds based on estimated
 dataset size in the GPFS logs. Grouping allows us to specify a different
 memory request for each group to improve throughput while still mitigating
 OOM errors

---
 src/rc_gpfs/cli/convert_flat_to_hive.py | 95 +++++++++++++++++++++----
 src/rc_gpfs/cli/utils.py                | 33 +++++++++
 src/rc_gpfs/utils/core.py               | 13 +++-
 3 files changed, 126 insertions(+), 15 deletions(-)

diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
index 77f88d0..6fbfb0b 100644
--- a/src/rc_gpfs/cli/convert_flat_to_hive.py
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -7,12 +7,13 @@ from pathlib import Path
 import polars as pl
 
 from ..policy.hive import hivize
-from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
+from .utils import define_python_interpreter,batch_parser_no_mem,setup_slurm_logs
+from ..utils import get_parquet_dataset_size
 
 DESCRIPTION = """
 Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This creates a timeseries of structured datasets for each tld for much easier more efficient log comparisons within tld. Output parquets are sorted by the path column for convenience, but no index is set.
 
-Setting the --batch flag will create a Slurm array job where each task processes one tld, either from the passed parameter or from the unique values in the parquet dataset.
+Setting the --batch flag will create a Slurm array job where each task processes one tld, either from the passed parameter or from the unique values in the parquet dataset. Each tld's in-memory size will be estimated based on header information from the parquet dataset and how many records a given tld has. These estimated sizes are used to group tlds, and each group will be submitted as a separate array job with different requested memory. This helps maximize throughput while also mitigating OOM errors.
 
 All processing is done via Polars and so can take advantage of parallel processing. Higher core counts can provide a limited benefit to performance, and a GPU is not required.
 """
@@ -21,7 +22,7 @@ def parse_args():
     parser = argparse.ArgumentParser(
         description=DESCRIPTION,
         formatter_class=argparse.RawTextHelpFormatter,
-        parents=[batch_parser(cpus_per_task=16, gpus=0, partition='amd-hdr100', mem='32G',time='02:00:00')]
+        parents=[batch_parser_no_mem(cpus_per_task=16, gpus=0, partition='amd-hdr100',time='02:00:00')]
     )
     parser.add_argument('parquet_path',
                         type=Path,
@@ -36,8 +37,12 @@ def parse_args():
     parser.add_argument('--partition-size',
                         dest='partition_chunk_size_bytes',
                         type=str,
-                        default='100MiB',
+                        default='200MiB',
                         help='Max size of in-memory data for each partition in a single hive dataset. Smaller partitions cause more files to be written. Can pass the byte size as an integer or as a human-readable byte string. For example, 1024 and 1KiB are equivalent.')
+    parser.add_argument('--mem-factor',
+                        type=int,
+                        default=3,
+                        help="Factor to scale each tld's estimated memory by when setting memory requirements in the Slurm job. A factor of 3 accounts for almost all cases. A lower factor can result in more OOM errors but would increase task throughput, and vice versa for a higher factor.")
     parser.add_argument('--no-clobber',
                         default=False,
                         action='store_true',
@@ -47,7 +52,7 @@ def parse_args():
     return vars(args)
 
 SLURM_OPTS = """\
-#SBATCH --job-name=hivize
+#SBATCH --job-name=hivize-{mem}
 #SBATCH --ntasks={ntasks}
 #SBATCH --cpus-per-task={cpus_per_task}
 #SBATCH --partition={partition}
@@ -88,8 +93,66 @@ def submit_batch(**kwargs):
     subprocess.run(['sbatch'],input=script,shell=True,text=True)
     pass
 
-def convert_flat_to_hive():
-    args = parse_args()
+def submit_tld_groups(df,grp_dir,args):
+    mem_grp = df[0,'req_mem_grp']
+    
+    tld_file = grp_dir.joinpath(f"tld-{mem_grp}.txt")
+    
+    tlds = df["tld"].to_list()
+
+    with open(tld_file,'wt') as f:
+        f.writelines('\n'.join(tlds))
+    
+    args['ntlds'] = len(tlds)
+    args['tld_file'] = tld_file
+    args['mem'] = mem_grp
+
+    submit_batch(**args)
+    pass
+
+def get_tld_row_counts(parquet_path: Path) -> pl.DataFrame:
+    
+    tld_rows = (
+        pl.scan_parquet(parquet_path, parallel="prefiltered")
+        .group_by("tld")
+        .agg(pl.col("tld").count().alias("file_count"))
+        .collect(engine="streaming")
+    )
+    return tld_rows
+
+def estimate_req_mem(
+    parquet_path: Path, 
+    tld: List[str] | pl.Series, 
+    mem_factor: int = 3
+) -> pl.DataFrame:
+    mem_breaks = [8, 16, 32, 64]
+    mem_labels = ["8G", "16G", "32G", "64G", "128G"]
+
+    dataset_size = get_parquet_dataset_size(parquet_path) / (1024**3)
+
+    tld_sizes = get_tld_row_counts(parquet_path)
+
+    tld_sizes = (
+        tld_sizes.with_columns(
+            (pl.col("file_count") / pl.col("file_count").sum() * dataset_size).alias(
+                "est_size"
+            )
+        )
+        .with_columns((pl.col("est_size") * mem_factor).alias("est_req_mem"))
+        .with_columns(
+            (
+                pl.col("est_req_mem")
+                .cut(breaks=mem_breaks, labels=mem_labels)
+                .alias("req_mem_grp")
+            )
+        )
+        .filter(pl.col("tld").is_in(tld))
+    )
+
+    return tld_sizes
+
+def convert_flat_to_hive(cli_args=None):
+    args = parse_args(cli_args)
 
     if args['tld'] is None:
         tlds = (
@@ -117,16 +180,20 @@ def convert_flat_to_hive():
             return
     
     if args['batch']:
-        tld_file = args['parquet_path'].parent.joinpath('misc','tld.txt')
-        tld_file.parent.mkdir(parents=True,exist_ok=True)
+        req_mem = estimate_req_mem(args['parquet_path'],args['tld'],args['mem_factor'])
         
-        with open(tld_file,'wt') as f:
-            f.writelines('\n'.join(tlds))
+        grp_dir = args["parquet_path"].parent.joinpath("misc")
+        grp_dir.mkdir(parents=True,exist_ok=True)
+        [f.unlink() for f in grp_dir.glob('tld*.txt')]
         
-        args['ntlds'] = len(tlds)
-        args['tld_file'] = tld_file
+        req_mem_file = grp_dir.joinpath('tld_est_mem.parquet')
+        req_mem_file.unlink(missing_ok=True)
+
+        req_mem.write_parquet(req_mem_file)
 
-        submit_batch(**args)
+        for grp, df in req_mem.group_by('req_mem_grp'):
+            print(f"INFO: Submitting array job for {grp[0]}",flush=True)
+            submit_tld_groups(df,grp_dir,args)
     
     else:
         _ = args.pop('tld')
diff --git a/src/rc_gpfs/cli/utils.py b/src/rc_gpfs/cli/utils.py
index 18cb8fb..e879259 100644
--- a/src/rc_gpfs/cli/utils.py
+++ b/src/rc_gpfs/cli/utils.py
@@ -27,6 +27,39 @@ class CustomHelpFormatter(argparse.MetavarTypeHelpFormatter):
         actions = sorted(actions, key=lambda x: x.container.title if x.container.title else '')
         super(CustomHelpFormatter, self).add_arguments(actions)
 
+def batch_parser_no_mem(
+    cpus_per_task: int | None = None,
+    gpus: int | None = None,
+    partition: str | None = None,
+    time: str | None = "12:00:00",
+    reservation: str | None = None,
+    slurm_log_dir: str | Path | None = "./out",
+    **kwargs,
+) -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(
+        add_help=False, formatter_class=CustomHelpFormatter
+    )
+    slurm = parser.add_argument_group(title="Slurm Options")
+    slurm.add_argument(
+        "--batch",
+        action="store_true",
+        default=False,
+        help="Convert as a batch array job.",
+    )
+    slurm.add_argument("-n", "--ntasks", type=int, default=1)
+    slurm.add_argument("-c", "--cpus-per-task", type=int, default=cpus_per_task)
+    slurm.add_argument("-g", "--gpus", type=int, default=gpus, choices=[0, 1])
+    slurm.add_argument("-p", "--partition", type=str, default=partition)
+    slurm.add_argument("-t", "--time", type=str, default=time)
+    slurm.add_argument("--reservation", type=str, default=reservation)
+    slurm.add_argument(
+        "--slurm-log-dir",
+        type=Path,
+        default=slurm_log_dir,
+        help="Output log directory. If the directory does not exist, it will be created automatically.",
+    )
+    return parser
+
 def batch_parser(
         cpus_per_task: int | None = None,
         gpus: int | None = None, 
diff --git a/src/rc_gpfs/utils/core.py b/src/rc_gpfs/utils/core.py
index ff8fa4c..1409d4f 100644
--- a/src/rc_gpfs/utils/core.py
+++ b/src/rc_gpfs/utils/core.py
@@ -4,6 +4,7 @@ import subprocess
 from pathlib import Path
 from typing import List, Literal, Tuple
 import polars as pl
+import pyarrow.parquet as pq
 import numpy as np
 
 from .units import as_bytes, convert_si, create_size_bin_labels
@@ -103,4 +104,14 @@ def calculate_age_distribution(
         .cast(pl.String)
         .cast(pl.Enum(age_labels))
     )
-    return age_grps
\ No newline at end of file
+    return age_grps
+
+def get_parquet_dataset_size(parquet_path):
+    tot_size = 0
+
+    for p in parquet_path.glob("*.parquet"):
+        md = pq.read_metadata(p)
+        for rg in range(0, md.num_row_groups):
+            tot_size += md.row_group(rg).total_byte_size
+    
+    return tot_size
\ No newline at end of file
-- 
GitLab

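The grouping above keys off the breaks and labels in estimate_req_mem: estimated per-tld memory is cut into Slurm-sized bins, and each bin becomes one array job. A worked illustration with made-up estimates (in GiB):

    import polars as pl

    est = pl.DataFrame({"tld": ["a", "b", "c"], "est_req_mem": [3.2, 21.0, 70.5]})

    est = est.with_columns(
        pl.col("est_req_mem")
        .cut(breaks=[8, 16, 32, 64], labels=["8G", "16G", "32G", "64G", "128G"])
        .alias("req_mem_grp")
    )
    # tld "a" lands in the 8G bin, "b" in 32G, and "c" in 128G; each distinct
    # req_mem_grp is then submitted as its own array job with that --mem value.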

From f84873ecc75173d15895a731fa93215b1ff8b934 Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Tue, 29 Apr 2025 16:03:31 -0500
Subject: [PATCH 09/11] add cli_args parameter for testing in Jupyter

---
 src/rc_gpfs/cli/convert_flat_to_hive.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
index 6fbfb0b..07bb5f3 100644
--- a/src/rc_gpfs/cli/convert_flat_to_hive.py
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -18,7 +18,7 @@ Setting the --batch flag will create a Slurm array job where each task processes
 All processing is done via Polars and so can take advantage of parallel processing. Higher core counts can provide a limited benefit to performance, and a GPU is not required.
 """
 
-def parse_args():
+def parse_args(cli_args=None):
     parser = argparse.ArgumentParser(
         description=DESCRIPTION,
         formatter_class=argparse.RawTextHelpFormatter,
@@ -48,7 +48,7 @@ def parse_args():
                         action='store_true',
                         help="Flag to set whether contents of a hive cell will be overwritten. If True, the pipeline will exit if any parquet files are found in the cell directory. No processing will occur in that case. If False (default), any files existing in the cell directory will be removed prior to data writing.")
 
-    args = parser.parse_args()
+    args = parser.parse_args(cli_args)
     return vars(args)
 
 SLURM_OPTS = """\
-- 
GitLab


From 2147abf9a99be42f1290557335693a5d6163d525 Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Tue, 29 Apr 2025 16:06:14 -0500
Subject: [PATCH 10/11] add dry-run option and add typing import

---
 src/rc_gpfs/cli/convert_flat_to_hive.py | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
index 07bb5f3..2c08352 100644
--- a/src/rc_gpfs/cli/convert_flat_to_hive.py
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -5,6 +5,7 @@ import time
 import random
 from pathlib import Path
 import polars as pl
+from typing import List
 
 from ..policy.hive import hivize
 from .utils import define_python_interpreter,batch_parser_no_mem,setup_slurm_logs
@@ -47,6 +48,10 @@ def parse_args(cli_args=None):
                         default=False,
                         action='store_true',
                         help="Flag to set whether contents of a hive cell will be overwritten. If True, the pipeline will exit if any parquet files are found in the cell directory. No processing will occur in that case. If False (default), any files existing in the cell directory will be removed prior to data writing.")
+    parser.add_argument('--dry-run',
+                        default=False,
+                        action='store_true',
+                        help="If set, no jobs will be submitted at the end. Tld grouping still occurs, and the text files with the groups are still written.")
 
     args = parser.parse_args(cli_args)
     return vars(args)
@@ -84,13 +89,14 @@ def submit_batch(**kwargs):
 
     script = f"#!/bin/bash\n#\n{slurm_opts}\n{BATCH_CMDS.format(**kwargs)}"
 
-    # Wait between 1 and 5 seconds before batch submission. This helps avoid a situation where this setup is running in
-    # a batch array job and all of the array tasks submit their child array jobs at the same time. That results in jobs
-    # failing to be submitted due to overwhelming the scheduler with simultaneous requests. Adding a random delay should
-    # fix that
-    time.sleep(random.uniform(1,5))
+    if not kwargs['dry_run']:
+        # Wait between 1 and 5 seconds before batch submission. This helps avoid a situation where this setup is running
+        # in a batch array job and all of the array tasks submit their child array jobs at the same time. That results
+        # in jobs failing to be submitted due to overwhelming the scheduler with simultaneous requests. Adding a random
+        # delay should fix that
+        time.sleep(random.uniform(1,5))
 
-    subprocess.run(['sbatch'],input=script,shell=True,text=True)
+        subprocess.run(['sbatch'],input=script,shell=True,text=True)
     pass
 
 def submit_tld_groups(df,grp_dir,args):
-- 
GitLab

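Combined with the cli_args hook from the previous patch, --dry-run allows the tld grouping to be exercised from a notebook without touching the scheduler. A hedged sketch; the parquet path is a placeholder, and any remaining arguments the parser requires (e.g. the hive output path) must be supplied as they would be on the command line:

    from rc_gpfs.cli.convert_flat_to_hive import convert_flat_to_hive

    convert_flat_to_hive([
        "/data/gpfs/2025-04-28/parquet",  # placeholder parquet_path
        # ...plus any other required positional arguments defined by the parser
        "--batch",
        "--dry-run",       # group tlds and write the tld-<mem>.txt files, but skip sbatch
        "--mem-factor", "3",
    ])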

From 1d5b37f75c0ed76f78c881cbb82f61507f4a47d4 Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Tue, 29 Apr 2025 16:07:43 -0500
Subject: [PATCH 11/11] Lower the default CPU count; it doesn't seem to
 drastically impact performance

---
 src/rc_gpfs/cli/convert_flat_to_hive.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
index 2c08352..0fe6463 100644
--- a/src/rc_gpfs/cli/convert_flat_to_hive.py
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -23,7 +23,7 @@ def parse_args(cli_args=None):
     parser = argparse.ArgumentParser(
         description=DESCRIPTION,
         formatter_class=argparse.RawTextHelpFormatter,
-        parents=[batch_parser_no_mem(cpus_per_task=16, gpus=0, partition='amd-hdr100',time='02:00:00')]
+        parents=[batch_parser_no_mem(cpus_per_task=8, gpus=0, partition='amd-hdr100',time='02:00:00')]
     )
     parser.add_argument('parquet_path',
                         type=Path,
-- 
GitLab