From 796a25f70d0b2e013ce36d170431cfa7317261ec Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Fri, 2 May 2025 17:31:26 -0500
Subject: [PATCH 1/2] added tld grouping back to the batch processing pipeline.
 This removes the need to submit multiple batch jobs for each run of the hive
 pipeline and maximizes resource use per array task

---
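The grouping added here behaves roughly as in the sketch below (illustrative only:
the per-tld memory estimates are made up, and the helper mirrors the
_calc_closest_pwr / _group_by_cumsum logic introduced by this patch):

    import numpy as np
    import polars as pl

    def _calc_closest_pwr(val, base=2) -> int:
        # smallest power of `base` at or above `val`, never below base**0
        exp = np.max([np.ceil(np.emath.logn(n=base, x=val)), 0])
        return int(base**exp)

    est_req_mem = pl.Series("est_req_mem", [1.5, 2.0, 3.0, 9.0, 11.0])  # GiB, sorted ascending
    mem = max(_calc_closest_pwr(est_req_mem.max()), 16)  # -> 16, becomes --mem=16G

    # same steps as _group_by_cumsum: bin the running total by the cutoff,
    # then re-base the labels so they start at 0
    cumsum = est_req_mem.cum_sum()                 # 1.5, 3.5, 6.5, 15.5, 26.5
    group = (cumsum / mem).floor().cast(pl.Int32)  # 0, 0, 0, 0, 1
    grp = group.diff().fill_null(0).cum_sum()      # 0, 0, 0, 0, 1
    ngrps = grp.n_unique() - 1                     # Slurm array indices run 0..1
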
 src/rc_gpfs/cli/convert_flat_to_hive.py | 94 ++++++++++++++-----------
 1 file changed, 51 insertions(+), 43 deletions(-)

diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
index 678646e..9667e2b 100644
--- a/src/rc_gpfs/cli/convert_flat_to_hive.py
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -4,6 +4,7 @@ import re
 import time
 import random
 from pathlib import Path
+import numpy as np
 import polars as pl
 from typing import List
 
@@ -28,7 +29,6 @@ def parse_args(cli_args=None):
     parser.add_argument('parquet_path',
                         type=Path,
                         help="Path to a directory containing a flat parquet dataset.")
-    
     parser.add_argument('hive_path',
                         type=Path,
                         help="Parent directory for the hive. This can be either a new directory or an existing hive directory. If this is an existing hive dataset, new data will be appended to the old data. This will not alter any existing data provided the tld or date of acquisition differ from the existing hive data.")
@@ -57,7 +57,7 @@ def parse_args(cli_args=None):
     return vars(args)
 
 SLURM_OPTS = """\
-#SBATCH --job-name=hivize-{mem}
+#SBATCH --job-name=hivize
 #SBATCH --ntasks={ntasks}
 #SBATCH --cpus-per-task={cpus_per_task}
 #SBATCH --partition={partition}
@@ -65,15 +65,15 @@ SLURM_OPTS = """\
 #SBATCH --mem={mem}
 #SBATCH --output={output_log}
 #SBATCH --error={error_log}
-#SBATCH --array=1-{ntlds}
+#SBATCH --array=0-{ngrps}
 """
 
 BATCH_CMDS = """\
 {env_cmd}
 
-tld=$(sed -n "${{SLURM_ARRAY_TASK_ID}}p" {tld_file})
+tld=$(cat {grp_dir}/tld-${{SLURM_ARRAY_TASK_ID}}.txt | paste -s -d ',')
 
-convert-to-hive --tld ${{tld}} --partition-size={partition_chunk_size_bytes} {parquet_path} {hive_path}
+convert-to-hive --tld=${{tld}} --partition-size={partition_chunk_size_bytes} {parquet_path} {hive_path}
 """
 
 def submit_batch(**kwargs):
@@ -100,24 +100,31 @@ def submit_batch(**kwargs):
             subprocess.run(['sbatch'],input=script,shell=True,text=True,check=True,capture_output=True)
         except subprocess.CalledProcessError as e:
             raise subprocess.CalledProcessError(e.stderr.decode())
+    else:
+        print(script)
     pass
 
-def submit_tld_groups(df,grp_dir,args):
-    mem_grp = df[0,'req_mem_grp']
-    
-    tld_file = grp_dir.joinpath(f"tld-{mem_grp}.txt")
-    
-    tlds = df["tld"].to_list()
+def _calc_closest_pwr(val, base=2) -> int:
+    exp = np.max([np.ceil(np.emath.logn(n=base, x=val)), 0])
+    return int(base**exp)
 
-    with open(tld_file,'wt') as f:
-        f.writelines('\n'.join(tlds))
-    
-    args['ntlds'] = len(tlds)
-    args['tld_file'] = tld_file
-    args['mem'] = mem_grp
+def _group_by_cumsum(series: pl.Series, cutoff: float) -> pl.Series:
+    # Compute cumulative sum
+    cumsum = series.cum_sum()
 
-    submit_batch(**args)
-    pass
+    # Determine group boundaries where cumulative sum exceeds cutoff
+    group = (cumsum / cutoff).floor().cast(pl.Int32)
+
+    # Re-base the group labels so the first group is 0. Binning the running total keeps
+    # each group's size near the cutoff, though a group can exceed it at a bin boundary
+    group_diff = group.diff().fill_null(0)
+    group_counter = group_diff.cum_sum()
+
+    return group_counter
+
+def group_by_required_mem(ser: pl.Series, mem_cutoff: int) -> pl.Series:
+    grps = _group_by_cumsum(ser, mem_cutoff).cast(pl.Int32)
+    return grps
 
 def get_tld_row_counts(parquet_path: Path) -> pl.DataFrame:
     
@@ -134,9 +141,6 @@ def estimate_req_mem(
     tld: List[str] | pl.Series, 
     mem_factor: int = 3
 ) -> pl.DataFrame:
-    mem_breaks = [8, 16, 32, 64]
-    mem_labels = ["8G", "16G", "32G", "64G", "128G"]
-
     dataset_size = get_parquet_dataset_size(parquet_path) / (1024**3)
 
     tld_sizes = get_tld_row_counts(parquet_path)
@@ -148,14 +152,8 @@ def estimate_req_mem(
             )
         )
         .with_columns((pl.col("est_size") * mem_factor).alias("est_req_mem"))
-        .with_columns(
-            (
-                pl.col("est_req_mem")
-                .cut(breaks=mem_breaks, labels=mem_labels)
-                .alias("req_mem_grp")
-            )
-        )
         .filter(pl.col("tld").is_in(tld))
+        .sort('est_req_mem')
     )
 
     return tld_sizes
@@ -164,7 +162,7 @@ def convert_flat_to_hive(cli_args=None):
     args = parse_args(cli_args)
 
     if args['tld'] is None:
-        tlds = (
+        args['tld'] = (
             pl.scan_parquet(args['parquet_path'],cache=False)
             .select('tld')
             .unique()
@@ -174,7 +172,7 @@ def convert_flat_to_hive(cli_args=None):
             .to_list()
         )
     else:
-        tlds = args['tld'].split(',')
+        args['tld'] = args['tld'].split(',')
 
     if args['no_clobber']:
         acq = re.search(r"\d{4}-\d{2}-\d{2}", str(args['parquet_path'].absolute())).group(0)
@@ -182,30 +180,40 @@ def convert_flat_to_hive(cli_args=None):
         existing_acq_paths = [p for p in args['hive_path'].rglob(f'*/acq={acq}') if len(list(p.glob('*.parquet'))) > 0]
         existing_tlds = [re.search(r"tld=([^/]+)/", str(p)).group(1) for p in existing_acq_paths]
 
-        args['tld'] = [t for t in tlds if t not in existing_tlds]
+        args['tld'] = [t for t in args['tld'] if t not in existing_tlds]
 
         if len(args['tld']) == 0:
             print("INFO: All tlds already exist, and no-clobber is set. Exiting without converting")
             return
     
     if args['batch']:
-        req_mem = estimate_req_mem(args['parquet_path'],args['tld'],args['mem_factor'])
+        req_mem = estimate_req_mem(args['parquet_path'], args['tld'], args['mem_factor'])
+
+        # The memory request (also the grouping cutoff) is the smallest power of 2 at or
+        # above the largest tld's estimated requirement, or 16, whichever is larger
+        mem = max([_calc_closest_pwr(req_mem['est_req_mem'].max()),16])
+        args['mem'] = f"{mem}G"
+
+        req_mem = req_mem.with_columns(group_by_required_mem(pl.col('est_req_mem'), mem).alias('grp'))
+
+        args['ngrps'] = req_mem['grp'].n_unique() - 1
         
-        grp_dir = args["parquet_path"].parent.joinpath("misc")
-        grp_dir.mkdir(parents=True,exist_ok=True)
-        [f.unlink() for f in grp_dir.glob('tld*.txt')]
+        args['grp_dir'] = args["parquet_path"].parent.joinpath("misc")
+        args['grp_dir'].mkdir(parents=True,exist_ok=True)
+        _ = [f.unlink() for f in args['grp_dir'].glob('tld*.txt')]
+
+        for grp in req_mem['grp'].unique().to_list():
+            grp_tlds = req_mem.filter(pl.col('grp').eq(grp))['tld'].to_list()
+            with open(args['grp_dir'].joinpath(f"tld-{grp}.txt"),'w') as f:
+                f.write('\n'.join(grp_tlds))
         
-        req_mem_file = grp_dir.joinpath('tld_est_mem.parquet')
+        req_mem_file = args['grp_dir'].joinpath('tld_est_mem.parquet')
         req_mem_file.unlink(missing_ok=True)
 
         req_mem.write_parquet(req_mem_file)
 
-        for grp, df in req_mem.group_by('req_mem_grp'):
-            print(f"INFO: Submitting array job for {grp[0]}",flush=True)
-            submit_tld_groups(df,grp_dir,args)
+        submit_batch(**args)
     
     else:
-        _ = args.pop('tld')
-        for tld in tlds:
-            hivize(**args,tld=tld)
+        hivize(**args)
     pass
\ No newline at end of file
-- 
GitLab
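
For reference, each array task in the job submitted above reads its own
tld-<SLURM_ARRAY_TASK_ID>.txt group file and hands the comma-joined list to
convert-to-hive. A rough Python equivalent of that shell line follows (the group
directory is a hypothetical path; the real script derives it from
parquet_path.parent / "misc" and joins the file with paste):

    import os
    from pathlib import Path

    grp_dir = Path("/data/list-policy-2025-05-02/misc")   # hypothetical; written by the driver
    task_id = os.environ.get("SLURM_ARRAY_TASK_ID", "0")  # one task per group, 0..ngrps

    # each tld-<n>.txt holds one tld per line; the task passes them as one --tld argument
    tlds = (grp_dir / f"tld-{task_id}.txt").read_text().splitlines()
    print(f"convert-to-hive --tld={','.join(tlds)} "
          f"--partition-size=200MiB <parquet_path> <hive_path>")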


From 9075a10140c8cdb538e595c6aba5831c7406904e Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Fri, 2 May 2025 17:32:21 -0500
Subject: [PATCH 2/2] apply ruff formatting

---
 src/rc_gpfs/cli/convert_flat_to_hive.py | 211 +++++++++++++++---------
 1 file changed, 129 insertions(+), 82 deletions(-)

diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
index 9667e2b..b2e17f5 100644
--- a/src/rc_gpfs/cli/convert_flat_to_hive.py
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -9,7 +9,7 @@ import polars as pl
 from typing import List
 
 from ..policy.hive import hivize
-from .utils import define_python_interpreter,batch_parser_no_mem,setup_slurm_logs
+from .utils import define_python_interpreter, batch_parser_no_mem, setup_slurm_logs
 from ..utils import get_parquet_dataset_size
 
 DESCRIPTION = """
@@ -20,42 +20,62 @@ Setting the --batch flag will create a Slurm array job where each task processes
 All processing is done via Polars and so can take advantage of parallel processing. Higher core counts can provide a limited benefit to performance, and a GPU is not required.
 """
 
+
 def parse_args(cli_args=None):
     parser = argparse.ArgumentParser(
         description=DESCRIPTION,
         formatter_class=argparse.RawTextHelpFormatter,
-        parents=[batch_parser_no_mem(cpus_per_task=8, gpus=0, partition='amd-hdr100',time='02:00:00')]
+        parents=[
+            batch_parser_no_mem(
+                cpus_per_task=8, gpus=0, partition="amd-hdr100", time="02:00:00"
+            )
+        ],
+    )
+    parser.add_argument(
+        "parquet_path",
+        type=Path,
+        help="Path to a directory containing a flat parquet dataset.",
+    )
+    parser.add_argument(
+        "hive_path",
+        type=Path,
+        help="Parent directory for the hive. This can be either a new directory or an existing hive directory. If this is an existing hive dataset, new data will be appended to the old data. This will not alter any existing data provided the tld or date of acquisition differ from the existing hive data.",
+    )
+    parser.add_argument(
+        "--tld",
+        type=str,
+        help="Comma-separated list of tld to convert to hive. If not specified, all unique tld values in the dataset will be processed.",
+    )
+    parser.add_argument(
+        "--partition-size",
+        dest="partition_chunk_size_bytes",
+        type=str,
+        default="200MiB",
+        help="Max size of in-memory data for each partition in a single hive dataset. Smaller partitions cause more files to be written. Can pass the byte size as an integer or as a human-readable byte string. For example, 1024 and 1KiB are equivalent.",
+    )
+    parser.add_argument(
+        "--mem-factor",
+        type=int,
+        default=3,
+        help="Factor to scale each tld's estimated memory by when setting memory requirements in the Slurm job. A factor of 3 accounts for almost all cases. A lower factor can result in more OOM errors but would increase task throughput, and vice versa for a higher factor.",
+    )
+    parser.add_argument(
+        "--no-clobber",
+        default=False,
+        action="store_true",
+        help="Flag to set whether contents of a hive cell will be overwritten. If True, the pipeline will exit if any parquet files are found in the cell directory. No processing will occur in that case. If False (default), any files existing in the cell directory will be removed prior to data writing.",
+    )
+    parser.add_argument(
+        "--dry-run",
+        default=False,
+        action="store_true",
+        help="If set, no jobs will be submitted at the end. Tld grouping still occurs, and the text files with the groups are still written.",
     )
-    parser.add_argument('parquet_path',
-                        type=Path,
-                        help="Path to a directory containing a flat parquet dataset.")
-    parser.add_argument('hive_path',
-                        type=Path,
-                        help="Parent directory for the hive. This can be either a new directory or an existing hive directory. If this is an existing hive dataset, new data will be appended to the old data. This will not alter any existing data provided the tld or date of acquisition differ from the existing hive data.")
-    parser.add_argument('--tld',
-                        type=str,
-                        help='Comma-separated list of tld to convert to hive. If not specified, all unique tld values in the dataset will be processed.')
-    parser.add_argument('--partition-size',
-                        dest='partition_chunk_size_bytes',
-                        type=str,
-                        default='200MiB',
-                        help='Max size of in-memory data for each partition in a single hive dataset. Smaller partitions cause more files to be written. Can pass the byte size as an integer or as a human-readable byte string. For example, 1024 and 1KiB are equivalent.')
-    parser.add_argument('--mem-factor',
-                        type=int,
-                        default=3,
-                        help="Factor to scale each tld's estimated memory by when setting memory requirements in the Slurm job. A factor of 3 accounts for almost all cases. A lower factor can result in more OOM errors but would increase task throughput, and vice versa for a higher factor.")
-    parser.add_argument('--no-clobber',
-                        default=False,
-                        action='store_true',
-                        help="Flag to set whether contents of a hive cell will be overwritten. If True, the pipeline will exit if any parquet files are found in the cell directory. No processing will occur in that case. If False (default), any files existing in the cell directory will be removed prior to data writing.")
-    parser.add_argument('--dry-run',
-                        default=False,
-                        action='store_true',
-                        help="If set, no jobs will be submitted at the end. Tld grouping still occurs, and the text files with the groups are still written.")
 
     args = parser.parse_args(cli_args)
     return vars(args)
 
+
 SLURM_OPTS = """\
 #SBATCH --job-name=hivize
 #SBATCH --ntasks={ntasks}
@@ -76,38 +96,50 @@ tld=$(cat {grp_dir}/tld-${{SLURM_ARRAY_TASK_ID}}.txt | paste -s -d ',')
 convert-to-hive --tld=${{tld}} --partition-size={partition_chunk_size_bytes} {parquet_path} {hive_path}
 """
 
+
 def submit_batch(**kwargs):
-    env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env'))
-    kwargs.update({"env_cmd":env_cmd})
-    
-    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'hive')
+    env_cmd = define_python_interpreter(
+        kwargs.get("python_path"), kwargs.get("conda_env")
+    )
+    kwargs.update({"env_cmd": env_cmd})
+
+    slurm_logs = setup_slurm_logs(kwargs.get("slurm_log_dir"), "hive")
     kwargs.update(slurm_logs)
-    
+
     slurm_opts = SLURM_OPTS.format(**kwargs)
-    if kwargs.get('reservation') is not None:
+    if kwargs.get("reservation") is not None:
         slurm_opts = f"{slurm_opts}#SBATCH --reservation={kwargs.get('reservation')}"
 
     script = f"#!/bin/bash\n#\n{slurm_opts}\n{BATCH_CMDS.format(**kwargs)}"
 
-    if not kwargs['dry_run']:
+    if not kwargs["dry_run"]:
         # Wait between 1 and 5 seconds before batch submission. This helps avoid a situation where this setup is running
         # in a batch array job and all of the array tasks submit their child array jobs at the same time. That results
         # in jobs failing to be submitted due to overwhelming the scheduler with simultaneous requests. Adding a random
         # delay should fix that
-        time.sleep(random.uniform(1,5))
+        time.sleep(random.uniform(1, 5))
 
         try:
-            subprocess.run(['sbatch'],input=script,shell=True,text=True,check=True,capture_output=True)
+            subprocess.run(
+                ["sbatch"],
+                input=script,
+                shell=True,
+                text=True,
+                check=True,
+                capture_output=True,
+            )
         except subprocess.CalledProcessError as e:
             raise subprocess.CalledProcessError(e.stderr.decode())
     else:
         print(script)
     pass
 
+
 def _calc_closest_pwr(val, base=2) -> int:
     exp = np.max([np.ceil(np.emath.logn(n=base, x=val)), 0])
     return int(base**exp)
 
+
 def _group_by_cumsum(series: pl.Series, cutoff: float) -> pl.Series:
     # Compute cumulative sum
     cumsum = series.cum_sum()
@@ -122,12 +154,13 @@ def _group_by_cumsum(series: pl.Series, cutoff: float) -> pl.Series:
 
     return group_counter
 
+
 def group_by_required_mem(ser: pl.Series, mem_cutoff: int) -> pl.Series:
     grps = _group_by_cumsum(ser, mem_cutoff).cast(pl.Int32)
     return grps
 
+
 def get_tld_row_counts(parquet_path: Path) -> pl.DataFrame:
-    
     tld_rows = (
         pl.scan_parquet(parquet_path, parallel="prefiltered")
         .group_by("tld")
@@ -136,10 +169,9 @@ def get_tld_row_counts(parquet_path: Path) -> pl.DataFrame:
     )
     return tld_rows
 
+
 def estimate_req_mem(
-    parquet_path: Path, 
-    tld: List[str] | pl.Series, 
-    mem_factor: int = 3
+    parquet_path: Path, tld: List[str] | pl.Series, mem_factor: int = 3
 ) -> pl.DataFrame:
     dataset_size = get_parquet_dataset_size(parquet_path) / (1024**3)
 
@@ -153,67 +185,82 @@ def estimate_req_mem(
         )
         .with_columns((pl.col("est_size") * mem_factor).alias("est_req_mem"))
         .filter(pl.col("tld").is_in(tld))
-        .sort('est_req_mem')
+        .sort("est_req_mem")
     )
 
     return tld_sizes
 
+
 def convert_flat_to_hive(cli_args=None):
     args = parse_args(cli_args)
 
-    if args['tld'] is None:
-        args['tld'] = (
-            pl.scan_parquet(args['parquet_path'],cache=False)
-            .select('tld')
+    if args["tld"] is None:
+        args["tld"] = (
+            pl.scan_parquet(args["parquet_path"], cache=False)
+            .select("tld")
             .unique()
-            .collect(engine='streaming')
-            .sort('tld')
-            .get_column('tld')
+            .collect(engine="streaming")
+            .sort("tld")
+            .get_column("tld")
             .to_list()
         )
     else:
-        args['tld'] = args['tld'].split(',')
+        args["tld"] = args["tld"].split(",")
+
+    if args["no_clobber"]:
+        acq = re.search(
+            r"\d{4}-\d{2}-\d{2}", str(args["parquet_path"].absolute())
+        ).group(0)
+
+        existing_acq_paths = [
+            p
+            for p in args["hive_path"].rglob(f"*/acq={acq}")
+            if len(list(p.glob("*.parquet"))) > 0
+        ]
+        existing_tlds = [
+            re.search(r"tld=([^/]+)/", str(p)).group(1) for p in existing_acq_paths
+        ]
+
+        args["tld"] = [t for t in args["tld"] if t not in existing_tlds]
+
+        if len(args["tld"]) == 0:
+            print(
+                "INFO: All tlds already exist, and no-clobber is set. Exiting without converting"
+            )
+            return
 
-    if args['no_clobber']:
-        acq = re.search(r"\d{4}-\d{2}-\d{2}", str(args['parquet_path'].absolute())).group(0)
+    if args["batch"]:
+        req_mem = estimate_req_mem(
+            args["parquet_path"], args["tld"], args["mem_factor"]
+        )
 
-        existing_acq_paths = [p for p in args['hive_path'].rglob(f'*/acq={acq}') if len(list(p.glob('*.parquet'))) > 0]
-        existing_tlds = [re.search(r"tld=([^/]+)/", str(p)).group(1) for p in existing_acq_paths]
+        # The memory request (also the grouping cutoff) is the smallest power of 2 at or
+        # above the largest tld's estimated requirement, or 16, whichever is larger
+        mem = max([_calc_closest_pwr(req_mem["est_req_mem"].max()), 16])
+        args["mem"] = f"{mem}G"
 
-        args['tld'] = [t for t in args['tld'] if t not in existing_tlds]
+        req_mem = req_mem.with_columns(
+            group_by_required_mem(pl.col("est_req_mem"), mem).alias("grp")
+        )
 
-        if len(args['tld']) == 0:
-            print("INFO: All tlds already exist, and no-clobber is set. Exiting without converting")
-            return
-    
-    if args['batch']:
-        req_mem = estimate_req_mem(args['parquet_path'], args['tld'], args['mem_factor'])
+        args["ngrps"] = req_mem["grp"].n_unique() - 1
 
-        # The memory request (also the grouping cutoff) is the smallest power of 2 at or
-        # above the largest tld's estimated requirement, or 16, whichever is larger
-        mem = max([_calc_closest_pwr(req_mem['est_req_mem'].max()),16])
-        args['mem'] = f"{mem}G"
-
-        req_mem = req_mem.with_columns(group_by_required_mem(pl.col('est_req_mem'), mem).alias('grp'))
-
-        args['ngrps'] = req_mem['grp'].n_unique() - 1
-        
-        args['grp_dir'] = args["parquet_path"].parent.joinpath("misc")
-        args['grp_dir'].mkdir(parents=True,exist_ok=True)
-        _ = [f.unlink() for f in args['grp_dir'].glob('tld*.txt')]
-
-        for grp in req_mem['grp'].unique().to_list():
-            grp_tlds = req_mem.filter(pl.col('grp').eq(grp))['tld'].to_list()
-            with open(args['grp_dir'].joinpath(f"tld-{grp}.txt"),'w') as f:
-                f.write('\n'.join(grp_tlds))
-        
-        req_mem_file = args['grp_dir'].joinpath('tld_est_mem.parquet')
+        args["grp_dir"] = args["parquet_path"].parent.joinpath("misc")
+        args["grp_dir"].mkdir(parents=True, exist_ok=True)
+        _ = [f.unlink() for f in args["grp_dir"].glob("tld*.txt")]
+
+        for grp in req_mem["grp"].unique().to_list():
+            grp_tlds = req_mem.filter(pl.col("grp").eq(grp))["tld"].to_list()
+            with open(args["grp_dir"].joinpath(f"tld-{grp}.txt"), "w") as f:
+                f.write("\n".join(grp_tlds))
+
+        req_mem_file = args["grp_dir"].joinpath("tld_est_mem.parquet")
         req_mem_file.unlink(missing_ok=True)
 
         req_mem.write_parquet(req_mem_file)
 
         submit_batch(**args)
-    
+
     else:
         hivize(**args)
-    pass
\ No newline at end of file
+    pass
-- 
GitLab