diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
index 89d5bb30cbe79bc1962faf40f63456c716047c92..0fe64634dcc47dde28327efd60c8c8942754506e 100644
--- a/src/rc_gpfs/cli/convert_flat_to_hive.py
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -1,26 +1,29 @@
 import argparse
 import subprocess
+import re
 import time
 import random
 from pathlib import Path
 import polars as pl
+from typing import List
 
 from ..policy.hive import hivize
-from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
+from .utils import define_python_interpreter,batch_parser_no_mem,setup_slurm_logs
+from ..utils import get_parquet_dataset_size
 
 DESCRIPTION = """
 Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This creates a time series of structured datasets for each tld, making log comparisons within a tld much easier and more efficient. Output parquets are sorted by the path column for convenience, but no index is set.
 
-Setting the --batch flag will create a Slurm array job where each task processes one tld, either from the passed parameter or from the unique values in the parquet dataset.
+Setting the --batch flag will create a Slurm array job where each task processes one tld, either from the passed parameter or from the unique values in the parquet dataset. Each tld's in-memory size is estimated from the parquet dataset's row-group metadata and the number of records belonging to that tld. These estimates are used to group tlds, and each group is submitted as a separate array job with its own memory request. This maximizes throughput while mitigating OOM errors.
 
 All processing is done via Polars and so can take advantage of parallel processing. Higher core counts can provide a limited benefit to performance, and a GPU is not required.
 """
 
-def parse_args():
+def parse_args(cli_args=None):
     parser = argparse.ArgumentParser(
         description=DESCRIPTION,
         formatter_class=argparse.RawTextHelpFormatter,
-        parents=[batch_parser(cpus_per_task=16, gpus=0, partition='amd-hdr100', mem='32G',time='02:00:00')]
+        parents=[batch_parser_no_mem(cpus_per_task=8, gpus=0, partition='amd-hdr100',time='02:00:00')]
     )
     parser.add_argument('parquet_path',
                         type=Path,
@@ -35,18 +38,26 @@ def parse_args():
     parser.add_argument('--partition-size',
                         dest='partition_chunk_size_bytes',
                         type=str,
-                        default='100MiB',
+                        default='200MiB',
                         help='Max size of in-memory data for each partition in a single hive dataset. Smaller partitions cause more files to be written. Can pass the byte size as an integer or as a human-readable byte string. For example, 1024 and 1KiB are equivalent.')
+    parser.add_argument('--mem-factor',
+                        type=int,
+                        default=3,
+                        help="Factor to scale each tld's estimated memory by when setting memory requirements in the Slurm job. A factor of 3 accounts for almost all cases. A lower factor can result in more OOM errors but would increase task throughput, and vice versa for a higher factor.")
     parser.add_argument('--no-clobber',
                         default=False,
                         action='store_true',
                         help="Flag to set whether contents of a hive cell will be overwritten. If True, the pipeline will exit if any parquet files are found in the cell directory. No processing will occur in that case. If False (default), any files existing in the cell directory will be removed prior to data writing.")
+    parser.add_argument('--dry-run',
+                        default=False,
+                        action='store_true',
+                        help="If set, no jobs will be submitted at the end. Tld grouping still occurs, and the text files with the groups are still written.")
 
-    args = parser.parse_args()
+    args = parser.parse_args(cli_args)
     return vars(args)
 
 SLURM_OPTS = """\
-#SBATCH --job-name=hivize
+#SBATCH --job-name=hivize-{mem}
 #SBATCH --ntasks={ntasks}
 #SBATCH --cpus-per-task={cpus_per_task}
 #SBATCH --partition={partition}
@@ -78,17 +89,76 @@ def submit_batch(**kwargs):
 
     script = f"#!/bin/bash\n#\n{slurm_opts}\n{BATCH_CMDS.format(**kwargs)}"
 
-    # Wait between 1 and 5 seconds before batch submission. This helps avoid a situation where this setup is running in
-    # a batch array job and all of the array tasks submit their child array jobs at the same time. That results in jobs
-    # failing to be submitted due to overwhelming the scheduler with simultaneous requests. Adding a random delay should
-    # fix that
-    time.sleep(random.uniform(1,5))
+    if not kwargs['dry_run']:
+        # Wait between 1 and 5 seconds before batch submission. This helps avoid a situation where this setup is running
+        # in a batch array job and all of the array tasks submit their child array jobs at the same time. That results
+        # in jobs failing to be submitted due to overwhelming the scheduler with simultaneous requests. Adding a random
+        # delay should fix that
+        time.sleep(random.uniform(1,5))
 
-    subprocess.run(['sbatch'],input=script,shell=True,text=True)
+        subprocess.run(['sbatch'], input=script, text=True)
     pass
 
-def convert_flat_to_hive():
-    args = parse_args()
+def submit_tld_groups(df,grp_dir,args):
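+    # Every row in df shares the same requested-memory group, so write the
+    # group's tlds to a single text file and submit one array job sized for it.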
+    mem_grp = df[0,'req_mem_grp']
+    
+    tld_file = grp_dir.joinpath(f"tld-{mem_grp}.txt")
+    
+    tlds = df["tld"].to_list()
+
+    with open(tld_file,'wt') as f:
+        f.write('\n'.join(tlds))
+    
+    args['ntlds'] = len(tlds)
+    args['tld_file'] = tld_file
+    args['mem'] = mem_grp
+
+    submit_batch(**args)
+    pass
+
+def get_tld_row_counts(parquet_path: Path) -> pl.DataFrame:
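+    # Count rows per tld with a lazy group_by and a streaming collect so the
+    # full dataset never has to fit in memory.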
+    
+    tld_rows = (
+        pl.scan_parquet(parquet_path, parallel="prefiltered")
+        .group_by("tld")
+        .agg(pl.col("tld").count().alias("file_count"))
+        .collect(engine="streaming")
+    )
+    return tld_rows
+
+def estimate_req_mem(
+    parquet_path: Path, 
+    tld: List[str] | pl.Series, 
+    mem_factor: int = 3
+) -> pl.DataFrame:
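+    # Approximate each tld's in-memory footprint by splitting the dataset's total
+    # uncompressed size proportionally to the tld's share of rows, scale it by
+    # mem_factor, and bin the result into standard Slurm memory requests. For
+    # example, a tld holding 10% of the rows of a 40 GiB dataset is estimated at
+    # 4 GiB; with mem_factor=3 that becomes 12 GiB, which lands in the 16G group.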
+    mem_breaks = [8, 16, 32, 64]
+    mem_labels = ["8G", "16G", "32G", "64G", "128G"]
+
+    dataset_size = get_parquet_dataset_size(parquet_path) / (1024**3)
+
+    tld_sizes = get_tld_row_counts(parquet_path)
+
+    tld_sizes = (
+        tld_sizes.with_columns(
+            (pl.col("file_count") / pl.col("file_count").sum() * dataset_size).alias(
+                "est_size"
+            )
+        )
+        .with_columns((pl.col("est_size") * mem_factor).alias("est_req_mem"))
+        .with_columns(
+            (
+                pl.col("est_req_mem")
+                .cut(breaks=mem_breaks, labels=mem_labels)
+                .alias("req_mem_grp")
+            )
+        )
+        .filter(pl.col("tld").is_in(tld))
+    )
+
+    return tld_sizes
+
+def convert_flat_to_hive(cli_args=None):
+    args = parse_args(cli_args)
 
     if args['tld'] is None:
         tlds = (
@@ -102,18 +172,34 @@ def convert_flat_to_hive():
         )
     else:
         tlds = args['tld'].split(',')
+
+    if args['no_clobber']:
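+        # Find tlds that already have parquet files for this acquisition date
+        # and drop them from the work list.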
+        acq = re.search(r"\d{4}-\d{2}-\d{2}", str(args['parquet_path'].absolute())).group(0)
+
+        existing_acq_paths = [p for p in args['hive_path'].rglob(f'*/acq={acq}') if len(list(p.glob('*.parquet'))) > 0]
+        existing_tlds = [re.search(r"tld=([^/]+)/", str(p)).group(1) for p in existing_acq_paths]
+
+        tlds = [t for t in tlds if t not in existing_tlds]
+        args['tld'] = tlds
+
+        if len(tlds) == 0:
+            print("INFO: All tlds already exist, and no-clobber is set. Exiting without converting")
+            return
     
     if args['batch']:
-        tld_file = args['parquet_path'].parent.joinpath('misc','tld.txt')
-        tld_file.parent.mkdir(parents=True,exist_ok=True)
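+        # Estimate per-tld memory needs and bin tlds into groups; each group is
+        # submitted as its own array job with a matching memory request.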
+        req_mem = estimate_req_mem(args['parquet_path'],tlds,args['mem_factor'])
         
-        with open(tld_file,'wt') as f:
-            f.writelines('\n'.join(tlds))
+        grp_dir = args["parquet_path"].parent.joinpath("misc")
+        grp_dir.mkdir(parents=True,exist_ok=True)
+        for f in grp_dir.glob('tld*.txt'):
+            f.unlink()
         
-        args['ntlds'] = len(tlds)
-        args['tld_file'] = tld_file
+        req_mem_file = grp_dir.joinpath('tld_est_mem.parquet')
+        req_mem_file.unlink(missing_ok=True)
+
+        req_mem.write_parquet(req_mem_file)
 
-        submit_batch(**args)
+        for grp, df in req_mem.group_by('req_mem_grp'):
+            print(f"INFO: Submitting array job for {grp[0]}",flush=True)
+            submit_tld_groups(df,grp_dir,args)
     
     else:
         _ = args.pop('tld')
diff --git a/src/rc_gpfs/cli/utils.py b/src/rc_gpfs/cli/utils.py
index 18cb8fb142b99bccad2a03426f38588a333ea38f..e879259e8cbf9581a229fbbaa2e96fbcf08be5d4 100644
--- a/src/rc_gpfs/cli/utils.py
+++ b/src/rc_gpfs/cli/utils.py
@@ -27,6 +27,39 @@ class CustomHelpFormatter(argparse.MetavarTypeHelpFormatter):
         actions = sorted(actions, key=lambda x: x.container.title if x.container.title else '')
         super(CustomHelpFormatter, self).add_arguments(actions)
 
+def batch_parser_no_mem(
+    cpus_per_task: int | None = None,
+    gpus: int | None = None,
+    partition: str | None = None,
+    time: str | None = "12:00:00",
+    reservation: str | None = None,
+    slurm_log_dir: str | Path | None = "./out",
+    **kwargs,
+) -> argparse.ArgumentParser:
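+    # Mirrors batch_parser but omits the --mem option, since memory is assigned
+    # per tld group by the caller rather than via a CLI flag.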
+    parser = argparse.ArgumentParser(
+        add_help=False, formatter_class=CustomHelpFormatter
+    )
+    slurm = parser.add_argument_group(title="Slurm Options")
+    slurm.add_argument(
+        "--batch",
+        action="store_true",
+        default=False,
+        help="Convert as a batch array job.",
+    )
+    slurm.add_argument("-n", "--ntasks", type=int, default=1)
+    slurm.add_argument("-c", "--cpus-per-task", type=int, default=cpus_per_task)
+    slurm.add_argument("-g", "--gpus", type=int, default=gpus, choices=[0, 1])
+    slurm.add_argument("-p", "--partition", type=str, default=partition)
+    slurm.add_argument("-t", "--time", type=str, default=time)
+    slurm.add_argument("--reservation", type=str, default=reservation)
+    slurm.add_argument(
+        "--slurm-log-dir",
+        type=Path,
+        default=slurm_log_dir,
+        help="Output log directory. If the directory does not exist, it will be created automatically.",
+    )
+    return parser
+
 def batch_parser(
         cpus_per_task: int | None = None,
         gpus: int | None = None, 
diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py
index 03d1c5bb53f4676e1886587b8d089cac168121c0..db840bc862bf8df3ed9d5226e64f5b8a3fea86a9 100644
--- a/src/rc_gpfs/policy/hive.py
+++ b/src/rc_gpfs/policy/hive.py
@@ -16,53 +16,54 @@ from ..utils import (
     calculate_age_distribution
 )
 
-def collect_hive_df(parquet_path: str | Path,  acq: str, tld: str | List[str] | None = None):
-    queries = []
-    for pq in parquet_path.glob('*.parquet'):
-        q = (
-            pl.scan_parquet(pq,parallel='prefiltered',rechunk=True)
-            .select(['size','kballoc','access','create','modify','uid','gid','path','tld'])
-            .with_columns(pl.lit(acq).alias('acq'))
+def collect_hive_df(
+        parquet_path: str | Path,
+        acq: str, 
+        tld: str | List[str] | None = None,
+        hive_path: str | Path | None = None,
+        no_clobber: bool = False
+) -> pl.DataFrame:
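+    # Build a single lazy scan over the flat dataset; tld and no-clobber filters
+    # are applied lazily, and the result is collected once with the streaming engine.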
+    if not isinstance(tld,list) and tld is not None:
+        tld = [tld]
+    
+    print("Collecting dataframe",flush=True)
+        
+    df = (
+        pl.scan_parquet(parquet_path, parallel="prefiltered", rechunk=True)
+        .select(
+            [
+                "size",
+                "kballoc",
+                "access",
+                "create",
+                "modify",
+                "uid",
+                "gid",
+                "path",
+                "tld",
+            ]
         )
+        .with_columns(pl.lit(acq).alias("acq"))
+    )
 
-        if tld is not None:
-            q = q.filter(pl.col('tld').is_in(tld))
-
-        queries.append(q)
+    if tld is not None:
+        df = df.filter(pl.col("tld").is_in(tld))
 
-    print("Collecting queries",flush=True)
-    dfs = pl.collect_all(queries,engine='streaming')
-    df = pl.concat(dfs).sort('path')
-    print("Finished collecting queries",flush=True)
-    return df
+    if no_clobber:
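+        # Skip tlds that already have parquet files in their hive cell for this
+        # acquisition date.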
+        existing_tlds = _get_existing_hive_cells(hive_path,acq)
+        df = df.filter(pl.col('tld').is_in(existing_tlds).not_())
 
-def _remove_clobbered_cells(df: pl.DataFrame, hive_path: Path):
-    hive_cells = (
-        df.select("tld", "acq")
-        .unique()
-        .with_columns(
-            pl.struct("tld", "acq")
-            .map_elements(
-                lambda x: str(hive_path.joinpath(f"tld={x['tld']}", f"acq={x['acq']}")),
-                return_dtype=pl.String,
-            )
-            .alias("hive_cell")
-        )
-    )
+    df = df.collect(engine='streaming')
 
-    clobbered = []
-    for s in hive_cells["hive_cell"].to_list():
-        pqs = list(Path(s).glob("*.parquet"))
-        if len(pqs) > 0:
-            clobbered.append(s)
+    print("Finished collecting queries", flush=True)
 
-    no_clobber = (
-        df
-        .join(hive_cells, how="left", on=["tld", "acq"])
-        .filter(pl.col("hive_cell").is_in(clobbered).not_())
-    )
+    return df
 
-    return no_clobber
+def _get_existing_hive_cells(hive_path: str | Path, acq: str):
+    hive_path = as_path(hive_path)
+    existing_pq = hive_path.rglob(f"*/acq={acq}/*.parquet")
+    existing_tlds = list(set([p.parent.parent.name.removeprefix("tld=") for p in existing_pq]))
+    return existing_tlds
 
 def hivize(
         parquet_path: str | Path,
@@ -100,14 +101,15 @@ def hivize(
     acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0)
     print(f"DEBUG: Acquisition date is {acq}",flush=True)
 
-    df = collect_hive_df(parquet_path,acq,tld)
+    df = collect_hive_df(parquet_path, acq, tld, hive_path, no_clobber)
 
-    if no_clobber:
-        df = _remove_clobbered_cells(df, hive_path)
-        if df.is_empty():
-            print("INFO: All passed tlds already have parquet files in their hive cell directories. Cleaning temp directories and exiting",flush=True)
-            shutil.rmtree(staging_path)
-            return
+    if df.is_empty():
+        print(
+            "INFO: All passed tlds already have parquet files in their hive cell directories. Aborting",
+            flush=True,
+        )
+        shutil.rmtree(staging_path)
+        return
 
     print('Writing to hive')
     df.write_parquet(
@@ -119,7 +121,11 @@ def hivize(
     print("Finished writing hive dataset",flush=True)
 
     if write_metadata:
-        for grp in df['tld'].unique():
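+        # Capture the tld list before trimming to the columns needed for the
+        # metadata aggregations and switching to lazy evaluation so each per-tld
+        # aggregation can stream.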
+        tlds = df["tld"].unique().to_list()
+        df = df.drop(
+            [c for c in df.columns if c not in ["tld", "size", "modify", "access"]]
+        ).lazy()
+        for grp in tlds:
             tdf = df.filter(pl.col('tld').eq(grp))
             output_dir=staging_path.joinpath(f"tld={grp}",f"acq={acq}")
             write_dataset_metadata(tdf,output_dir,acq)
@@ -128,49 +134,46 @@ def hivize(
     shutil.rmtree(staging_path)
 
 def write_dataset_metadata(
-        df: pl.DataFrame, 
+        df: pl.DataFrame | pl.LazyFrame, 
         parquet_path: Path,
         acq: str,
         **kwargs
 ) -> dict:
-    size_df = df[['size']]
-    size_df = size_df.with_columns(calculate_size_distribution(size_df['size'],**kwargs).alias('grp'))
-        
+    df = df.lazy()
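+    # (.lazy() is a no-op on a LazyFrame, so eager and lazy inputs behave the same.)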
+
     size_dist = (
-        size_df
-        .group_by('grp')
-        .agg([
-            pl.sum('size').alias('bytes'),
-            pl.count('size').alias('file_count')
-        ])
+        df.select("size")
+        .with_columns(
+            calculate_size_distribution(pl.col("size"), **kwargs).alias("grp")
+        )
+        .group_by("grp")
+        .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")])
+        .sort("grp", descending=True)
+        .collect(engine="streaming")
         .to_dicts()
     )
 
-    access_df = df[['access','size']]
-    access_df = access_df.with_columns(calculate_age_distribution(access_df['access'],acq,**kwargs).alias('grp'))
-
     access_dist = (
-        access_df
-        .group_by('grp')
-        .agg([
-            pl.sum('size').alias('bytes'),
-            pl.count('size').alias('file_count')
-        ])
-        .sort('grp',descending=True)
+        df.select("access", "size")
+        .with_columns(
+            calculate_age_distribution(pl.col('access'), acq, **kwargs).alias("grp")
+        )
+        .group_by("grp")
+        .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")])
+        .sort("grp", descending=True)
+        .collect(engine='streaming')
         .to_dicts()
     )
- 
-    modify_df = df[['modify','size']]
-    modify_df = modify_df.with_columns(calculate_age_distribution(modify_df['modify'],acq,**kwargs).alias('grp'))
-
+    
     modify_dist = (
-        modify_df
-        .group_by('grp')
-        .agg([
-            pl.sum('size').alias('bytes'),
-            pl.count('size').alias('file_count')
-        ])
-        .sort('grp',descending=True)
+        df.select("modify", "size")
+        .with_columns(
+            calculate_age_distribution(pl.col("modify"), acq, **kwargs).alias("grp")
+        )
+        .group_by("grp")
+        .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")])
+        .sort("grp", descending=True)
+        .collect(engine="streaming")
         .to_dicts()
     )
 
diff --git a/src/rc_gpfs/utils/core.py b/src/rc_gpfs/utils/core.py
index ff8fa4cd75d1d93161ad615095a101e0c8902a3c..1409d4f2633ef35b457cd0ad55a55bcc9ede6d65 100644
--- a/src/rc_gpfs/utils/core.py
+++ b/src/rc_gpfs/utils/core.py
@@ -4,6 +4,7 @@ import subprocess
 from pathlib import Path
 from typing import List, Literal, Tuple
 import polars as pl
+import pyarrow.parquet as pq
 import numpy as np
 
 from .units import as_bytes, convert_si, create_size_bin_labels
@@ -103,4 +104,14 @@ def calculate_age_distribution(
         .cast(pl.String)
         .cast(pl.Enum(age_labels))
     )
-    return age_grps
\ No newline at end of file
+    return age_grps
+
+def get_parquet_dataset_size(parquet_path: Path) -> int:
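+    # Sum the uncompressed row-group sizes recorded in each file's parquet
+    # footer; compressed on-disk sizes would underestimate the memory needed
+    # to hold the data.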
+    tot_size = 0
+
+    for p in parquet_path.glob("*.parquet"):
+        md = pq.read_metadata(p)
+        for rg in range(0, md.num_row_groups):
+            tot_size += md.row_group(rg).total_byte_size
+    
+    return tot_size
\ No newline at end of file