From 88e288ba26843f0ed0b47af656ed5ab4dc8e8190 Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Thu, 2 Jan 2025 17:38:45 -0600
Subject: [PATCH] Add conversion of flat parquet structure to hive

---
 .gitignore                              |  26 +++-
 example-job-scripts/convert-logs.sh     |  19 +++
 example-job-scripts/convert-to-hive.sh  |  21 ++++
 example-job-scripts/split-logs.sh       |  19 +++
 pyproject.toml                          |   1 +
 src/rc_gpfs/cli/__init__.py             |   1 +
 src/rc_gpfs/cli/convert_flat_to_hive.py | 153 ++++++++++++++++++++++++
 src/rc_gpfs/cli/convert_to_parquet.py   |  38 ++----
 src/rc_gpfs/cli/split_log.py            |  27 ++---
 src/rc_gpfs/cli/utils.py                |  42 ++++++-
 src/rc_gpfs/compute/backend.py          |   2 +-
 src/rc_gpfs/policy/__init__.py          |   2 +-
 src/rc_gpfs/policy/convert.py           |  81 ++++++++++++-
 src/rc_gpfs/policy/split.py             |   3 +-
 src/rc_gpfs/policy/utils.py             |   6 -
 src/rc_gpfs/utils.py                    |   8 +-
 16 files changed, 386 insertions(+), 63 deletions(-)
 create mode 100644 example-job-scripts/convert-logs.sh
 create mode 100644 example-job-scripts/convert-to-hive.sh
 create mode 100644 example-job-scripts/split-logs.sh
 create mode 100644 src/rc_gpfs/cli/convert_flat_to_hive.py

diff --git a/.gitignore b/.gitignore
index e979021..c5d8ecd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,14 +1,30 @@
+# Ignore paths to actual GPFS logs
 data
-local-data/
-joblogs/
+
+# Ignore all potential Slurm outputs from running jobs
 slurm-*
 out/
 err/
-*.sif
+
+# Ignore cache directories
 __pycache__
+
+# Ignore quarto outputs
 quarto*
-cufile.log
 *.html
 general-report_files
+
+# Ignore CUDA and dask logs
+cufile.log
+output.log
+rmm_log.txt
+dask-logs/*
+
+# Ignore poetry configuration
 poetry.toml
-.vscode
\ No newline at end of file
+
+# Ignore local vscode config
+.vscode
+
+# Ignore random extra files
+extra/
\ No newline at end of file
diff --git a/example-job-scripts/convert-logs.sh b/example-job-scripts/convert-logs.sh
new file mode 100644
index 0000000..f503652
--- /dev/null
+++ b/example-job-scripts/convert-logs.sh
@@ -0,0 +1,19 @@
+#! /bin/bash
+#
+#SBATCH --job-name=convert
+#SBATCH --ntasks=1
+#SBATCH --cpus-per-task=1
+#SBATCH --mem=8G
+#SBATCH --partition=amd-hdr100,intel-dcb,express
+#SBATCH --time=02:00:00
+#SBATCH --output=out/convert-%A-%a.out
+#SBATCH --error=out/convert-%A-%a.err
+#SBATCH --array=0-49
+
+module load Anaconda3
+conda activate gpfs-dev
+
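+# Each array task selects one chunks directory and submits its parquet conversion as a batch job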
+logs=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-*/chunks"))
+log=${logs[${SLURM_ARRAY_TASK_ID}]}
+
+convert-to-parquet --batch --no-clobber --partition=amd-hdr100,express,intel-dcb ${log}
\ No newline at end of file
diff --git a/example-job-scripts/convert-to-hive.sh b/example-job-scripts/convert-to-hive.sh
new file mode 100644
index 0000000..6178f1f
--- /dev/null
+++ b/example-job-scripts/convert-to-hive.sh
@@ -0,0 +1,21 @@
+#! /bin/bash
+#
+#SBATCH --job-name=hive-setup
+#SBATCH --ntasks=1
+#SBATCH --cpus-per-task=16
+#SBATCH --mem=90G
+#SBATCH --partition=amperenodes
+#SBATCH --time=02:00:00
+#SBATCH --reservation=rc-gpfs
+#SBATCH --gres=gpu:1
+#SBATCH --output=out/hive-setup-%A-%a.out
+#SBATCH --error=out/hive-setup-%A-%a.err
+#SBATCH --array=0-49
+
+module load Anaconda3
+conda activate gpfs-dev
+
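+# Each array task selects one flat parquet dataset; convert-to-hive --batch then groups tlds and submits its own array job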
+parquets=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-*/parquet"))
+pq=${parquets[${SLURM_ARRAY_TASK_ID}]}
+
+convert-to-hive --batch ${pq} /scratch/mdefende/project-hive
diff --git a/example-job-scripts/split-logs.sh b/example-job-scripts/split-logs.sh
new file mode 100644
index 0000000..137fcd3
--- /dev/null
+++ b/example-job-scripts/split-logs.sh
@@ -0,0 +1,19 @@
+#! /bin/bash
+#
+#SBATCH --job-name=split
+#SBATCH --ntasks=1
+#SBATCH --cpus-per-task=24
+#SBATCH --mem=8G
+#SBATCH --partition=amd-hdr100
+#SBATCH --time=02:00:00
+#SBATCH --output=out/split-%A-%a.out
+#SBATCH --error=out/split-%A-%a.err
+#SBATCH --array=0-49
+
+module load Anaconda3
+conda activate gpfs-dev
+
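+# Each array task splits one raw, compressed policy log into chunks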
+logs=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-*/raw/*.gz"))
+log=${logs[${SLURM_ARRAY_TASK_ID}]}
+
+split-log --no-clobber ${log}
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index f7f43d5..1edf54b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -34,6 +34,7 @@ url="https://pypi.nvidia.com"
 priority = "supplemental"
 
 [tool.poetry.scripts]
+convert-to-hive = "rc_gpfs.cli:convert_flat_to_hive"
 convert-to-parquet = "rc_gpfs.cli:convert_to_parquet"
 split-log = "rc_gpfs.cli:split_log"
 
diff --git a/src/rc_gpfs/cli/__init__.py b/src/rc_gpfs/cli/__init__.py
index fcbd7b1..305d2af 100644
--- a/src/rc_gpfs/cli/__init__.py
+++ b/src/rc_gpfs/cli/__init__.py
@@ -1,2 +1,3 @@
+from .convert_flat_to_hive import convert_flat_to_hive
 from .convert_to_parquet import convert_to_parquet
 from .split_log import split_log
\ No newline at end of file
diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
new file mode 100644
index 0000000..f0e169f
--- /dev/null
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -0,0 +1,153 @@
+import argparse
+import subprocess
+from pathlib import Path
+
+import dask.dataframe as dd
+import dask.config
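+# Set the dask dataframe backend to cudf so tld group-size estimation runs on the GPU requested by the default batch resources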
+dask.config.set({'dataframe.backend':'cudf'})
+from dask.diagnostics import ProgressBar
+
+from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
+from ..policy import hivize
+
+DESCRIPTION = """
+Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This essentially creates a time series of structured datasets for each tld, enabling much easier and more efficient log comparisons within a tld. Each file path is set as the index and sorted, and all final output parquets are partitioned to have similar in-memory sizes.
+
+Batch Processing
+----------------
+
+This script can be used to convert a number of tlds directly or to automatically process all tlds in a batch array job. When submitting a batch job, tlds are grouped by their estimated memory usage over the entire parquet dataset, which is roughly related to the number of rows belonging to any given tld. A tld taking up more space in the log will be grouped with fewer (or no) other tlds than a tld taking up less space. The maximum in-memory size of a group can be set with the --cutoff option.
+
+This method was implemented to account for GPFS logs being larger than memory in most cases, especially when using GPUs for processing. Sorting along an index is critical for cross-dataset join performance and for efficient storage of parquet files within each parquet directory. However, this initial sort requires the full dataset to be loaded into memory, which is not possible on our current hardware for very large logs while still leaving memory available for actual processing.
+
+When submitting a batch array, be cognizant of the amount of memory available to your job, either RAM for CPU tasks or VRAM for GPU tasks. For example, on an 80 GiB A100, setting an upper group limit of 40 GiB would suffice in most cases. This will minimize the number of array tasks while ensuring enough memory is free for processing.
+"""
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        description=DESCRIPTION,
+        formatter_class=argparse.RawTextHelpFormatter,
+        parents=[batch_parser(cpus_per_task=16, gpus=1, partition='amperenodes', mem='90G')]
+    )
+    parser.add_argument('parquet_path',
+                        type=Path,
+                        help="Path to a directory containing a flat parquet dataset.")
+    
+    parser.add_argument('hive_path',
+                        type=Path,
+                        help="Parent directory for the hive. This can be either a new directory or an existing hive directory. If this is an existing hive dataset, new data will be appended to the old data. This will not alter any existing data provided the tld or date of acquisition differ from the existing hive data.")
+    parser.add_argument('--tld',
+                        type=str,
+                        help='Comma-separated list of tlds to convert to hive format')
+    parser.add_argument('--cutoff',
+                        type=float,
+                        default=30,
+                        help='Approximate maximum in-memory size (GiB) of any one tld group when batch processing')
+    parser.add_argument('--grp-file',
+                        type=Path,
+                        help="Path to an existing group file for batch processing.")
+
+    args = parser.parse_args()
+    return vars(args)
+
+BATCH_SCRIPT = """\
+#!/bin/bash
+#
+#SBATCH --job-name=hivize
+#SBATCH --ntasks={ntasks}
+#SBATCH --cpus-per-task={cpus_per_task}
+#SBATCH --partition={partition}
+#SBATCH --time={time}
+#SBATCH --mem={mem}
+#SBATCH --gres=gpu:{gpus}
+#SBATCH --output={output_log}
+#SBATCH --error={error_log}
+#SBATCH --array=1-{ngroups}
+
+{env_cmd}
+
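+# Read this array task's comma-separated tld group from the group file (one group per line)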
+tld=$(sed -n "${{SLURM_ARRAY_TASK_ID}}p" {grp_file})
+
+convert-to-hive --tld ${{tld}} {parquet_path} {hive_path}
+"""
+
+def submit_batch(**kwargs):
+    env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env'))
+    kwargs.update({"env_cmd":env_cmd})
+    
+    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'hive')
+    kwargs.update(slurm_logs)
+    
+    script = BATCH_SCRIPT.format(**kwargs)
+
+    subprocess.run(['sbatch'],input=script,shell=True,text=True)
+    pass
+
+def split_into_groups(series, cutoff):
+    """Greedily pack tlds into groups whose estimated in-memory size (GiB) stays at or below cutoff."""
+    groups = []
+
+    while len(series.index) > 0:
+        current_group = []
+        current_sum = 0
+        for tld, mem in series.items():
+            if mem > cutoff:
+                # A tld larger than the cutoff gets its own group
+                groups.append({tld})
+                series = series.drop(tld)
+                break
+            elif current_sum + mem <= cutoff:
+                current_group.append(tld)
+                current_sum += mem
+
+        series = series.drop(current_group)
+        if current_group:
+            groups.append(set(current_group))
+
+    return groups
+
+def calc_tld_mem(df):
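+    # Sum the deep memory usage (bytes) of each tld's rows within a single partition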
+    mem = df.groupby('tld',observed=True).apply(lambda x: x.memory_usage(deep=True).sum())
+    return mem
+
+def define_tld_groups(parquet_path,cutoff):
+    ddf = dd.read_parquet(parquet_path,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld'])
+    with ProgressBar():
+        tld_mem = ddf.map_partitions(calc_tld_mem).compute()
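+    # Combine the per-partition estimates, convert bytes to GiB, and move the result to host memory before grouping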
+    tld_mem = tld_mem.groupby(tld_mem.index).sum().divide(1024**3).to_pandas()
+    grps = split_into_groups(tld_mem,cutoff)
+    return grps
+
+def nested_list_to_log(nest,file):
+    """
+    Writes a list of iterables to a text log, one comma-separated line per group
+
+    Args:
+    nest (list): A list of iterables of strings to be written.
+    file (str or Path): Path to the output text file.
+    """
+    with open(file, 'w', newline='') as f:
+        for l in nest:
+            f.write(f"{','.join(l)}\n")
+
+def convert_flat_to_hive():
+    args = parse_args()
+
+    if args.get('batch'):
+        if not args.get('grp_file'):
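+            # No existing group file: estimate per-tld memory, build groups, and write them to misc/tld_grps.txt under the parquet dataset's parent directory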
+            grps = define_tld_groups(args.get('parquet_path'),args.get('cutoff'))
+            
+            misc_path = args.get('parquet_path').parent.joinpath('misc','tld_grps.txt')
+            misc_path.parent.mkdir(exist_ok = True, parents = True)
+            nested_list_to_log(grps,misc_path)
+            ngroups = len(grps)
+            grp_file = str(misc_path)
+            args.update({'ngroups':ngroups,
+                         'grp_file':grp_file})
+        else:
+            with open(args.get('grp_file'),'r') as f:
+                ngroups = sum(1 for line in f)
+            args.update({'ngroups':ngroups})
+
+        submit_batch(**args)
+
+    else:
+        tld = args.get('tld').split(',')
+        args.update({'tld':tld})
+        hivize(**args)
+    pass
\ No newline at end of file
diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py
index e684935..f035843 100644
--- a/src/rc_gpfs/cli/convert_to_parquet.py
+++ b/src/rc_gpfs/cli/convert_to_parquet.py
@@ -1,8 +1,9 @@
+import sys
 import argparse
 import subprocess
 from pathlib import Path
 import multiprocessing
-from .utils import define_python_interpreter
+from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
 from ..policy import convert
 from ..utils import parse_scontrol
 
@@ -23,7 +24,7 @@ Local Parallel Processing:
 def parse_args():
     parser = argparse.ArgumentParser(
         description=DESCRIPTION,
-        formatter_class=argparse.RawTextHelpFormatter,
+        parents=[batch_parser(partition='amd-hdr100,express',time='02:00:00',mem='16G',cpus_per_task=1)],
         epilog=EPILOGUE
     )
     parser.add_argument('input',
@@ -36,21 +37,8 @@ def parse_args():
                         help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../parquet")
     parser.add_argument('--pool-size',type=int,default=None,
                         help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process")
-
-    slurm = parser.add_argument_group(title='Slurm Options')
-    slurm.add_argument('--batch', action='store_true', default=False,
-                       help="Run the conversion as a batch job. If a directory path is given as an input, an array job will be created to run the conversion in parallel.")
-    slurm.add_argument('-n','--ntasks', type=int, default=1)
-    slurm.add_argument('-p','--partition', type=str, default='amd-hdr100')
-    slurm.add_argument('-t','--time',type=str,default='02:00:00')
-    slurm.add_argument('-m','--mem',type=str,default='16G')
-    slurm.add_argument('--slurm-log-dir',type=Path, default='./out',
-                       help='Output log directory. If the directory does not exist, it will be created automatically. Logs will be named convert_%%A_%%a to differentiate amongs job IDs and task IDs')
-    interpreter = slurm.add_mutually_exclusive_group()
-    interpreter.add_argument('--python-path',type=Path,
-                       help="Path to Python interpreter to use for conversion. This interpreter should have access to a pandas library, preferably version >=2.0. If not specified, the path to the active python3 interpreter will be used.")
-    interpreter.add_argument('--conda-env',type=str,default=None,
-                             help="The name or prefix of a conda environment to activate as opposed to a python3 path")
+    parser.add_argument('--no-clobber', action='store_true',default=False,
+                        help='If existing parquet files are found in the output directory, exit immediately without processing')
     args = parser.parse_args()
     return vars(args)
 
@@ -73,18 +61,13 @@ log=$(ls {input}/*.gz | awk "NR==${{SLURM_ARRAY_TASK_ID}} {{ print $1 }}")
 convert-to-parquet -o {output_dir} ${{log}}
 """
 
-def setup_slurm_logs(slurm_log_dir):
-    slurm_log_dir = slurm_log_dir.absolute()
-    slurm_log_dir.mkdir(exist_ok = True,parents=True,mode = 0o2770)
-    out_log,err_log = [str(slurm_log_dir.joinpath('convert_%A_%a.out')),str(slurm_log_dir.joinpath('convert_%A_%a.err'))]
-    slurm_logs = {'output_log':out_log,'error_log':err_log}
-    return slurm_logs
+
 
 def submit_batch(**kwargs):
     env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env'))
     kwargs.update({"env_cmd":env_cmd})
     
-    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'))
+    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'parquet')
     kwargs.update(slurm_logs)
     
     script = BATCH_SCRIPT.format(**kwargs)
@@ -96,8 +79,12 @@ def convert_to_parquet() -> None:
     args = parse_args()
     if args['output_dir'] is None:
         args['output_dir'] = args['input'].parent.joinpath('parquet')
-    
+
     args['output_dir'].mkdir(exist_ok = True, mode = 0o2770)
+
+    output_files_exist = len(list(args['output_dir'].glob('*.parquet'))) > 0
+    if args['no_clobber'] and output_files_exist:
+        sys.exit('The output directory already contains parquet files. Exiting')
     
     if args['input'].is_file():
         nlogs = 1
@@ -117,4 +104,3 @@ def convert_to_parquet() -> None:
             pool.starmap(convert, fargs)
     else:
         convert(args['input'],args['output_dir'])
-    pass
diff --git a/src/rc_gpfs/cli/split_log.py b/src/rc_gpfs/cli/split_log.py
index 6c37bde..c0cc64a 100644
--- a/src/rc_gpfs/cli/split_log.py
+++ b/src/rc_gpfs/cli/split_log.py
@@ -1,7 +1,8 @@
+import sys
 import argparse
 import subprocess
 from pathlib import Path
-from .utils import define_python_interpreter
+from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
 from ..policy import split
 
 BATCH_SCRIPT = """\
@@ -24,7 +25,7 @@ split-log -o {output_dir} -l {lines} {log}
 def parse_args():
     parser = argparse.ArgumentParser(
         description="Splits a GPFS policy log into multiple parts for batch array processing.",
-        formatter_class=argparse.RawTextHelpFormatter
+        parents=[batch_parser(cpus_per_task=24,time='02:00:00',mem='8G',gpus=0,partition='amd-hdr100,express')]
     )
     parser.add_argument('log',
                         type=Path,
@@ -36,17 +37,9 @@ def parse_args():
                         help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../chunks")
     parser.add_argument('-l', '--lines', type=int, default=int(5e6),
                         help="Number of lines to split the log file by")
+    parser.add_argument('--no-clobber', action='store_true',default=False,
+                        help='If existing split log files are found in the output directory, exit immediately without processing')
 
-    slurm = parser.add_argument_group(title='Slurm Options')
-    slurm.add_argument('--batch', action='store_true', default=False,
-                       help="Run as a batch job. Otherwise use the current processing environment")
-    slurm.add_argument('-n','--cpus-per-task', type=int, default=24,
-                       help="Number of cores assigned to the job. Ntasks is always set to 1")
-    slurm.add_argument('-p','--partition', type=str, default='amd-hdr100')
-    slurm.add_argument('-t','--time',type=str,default='02:00:00')
-    slurm.add_argument('-m','--mem',type=str,default='8G')
-    slurm.add_argument('--slurm-log-dir',type=Path,default='./out',
-                       help='Output log directory. If the directory does not exist, it will be created automatically')
     args = parser.parse_args()
     return vars(args)
 
@@ -54,10 +47,8 @@ def submit_batch(**kwargs):
     env_cmd = define_python_interpreter()
     kwargs.update({"env_cmd":env_cmd})
     
-    kwargs.get('slurm_log_dir').mkdir(exist_ok=True,parents=True,mode = 0o2770)
-    output_log = kwargs.get('slurm_log_dir').joinpath('split.out')
-    error_log = kwargs.get('slurm_log_dir').joinpath('split.err')
-    kwargs.update({'output_log':output_log,'error_log':error_log})
+    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'split')
+    kwargs.update(slurm_logs)
 
     script = BATCH_SCRIPT.format(**kwargs)
 
@@ -71,6 +62,10 @@ def split_log():
         args['output_dir'] = args['log'].parent.parent.joinpath('chunks')
     args['output_dir'].mkdir(exist_ok = True, mode = 0o2770)
 
+    output_files_exist = len(list(args['output_dir'].glob('*.gz'))) > 0
+    if args['no_clobber'] and output_files_exist:
+        sys.exit('The output directory already contains split log files. Exiting')
+
     if args.get('batch'):
         submit_batch(**args)
     else:
diff --git a/src/rc_gpfs/cli/utils.py b/src/rc_gpfs/cli/utils.py
index 8115f44..18cb8fb 100644
--- a/src/rc_gpfs/cli/utils.py
+++ b/src/rc_gpfs/cli/utils.py
@@ -1,3 +1,4 @@
+import argparse
 import sys
 import os
 from pathlib import Path
@@ -18,4 +19,43 @@ def define_python_interpreter(python_path=None, conda_env=None):
         else:
             parent = Path(sys.executable).absolute().parent
             env =  venv_base.format(python_path=parent.joinpath('activate'))
-    return env
\ No newline at end of file
+    return env
+
+class CustomHelpFormatter(argparse.MetavarTypeHelpFormatter):
+    def add_arguments(self, actions):
+        # Sort actions by their group title
+        actions = sorted(actions, key=lambda x: x.container.title if x.container.title else '')
+        super(CustomHelpFormatter, self).add_arguments(actions)
+
+def batch_parser(
+        cpus_per_task: int | None = None,
+        gpus: int | None = None, 
+        partition: str | None = None, 
+        mem: str | None = None,
+        time: str | None = '12:00:00',
+        reservation: str | None = None,
+        slurm_log_dir: str | Path | None = './out',
+        **kwargs
+) -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(add_help=False,
+                                     formatter_class=CustomHelpFormatter)
+    slurm = parser.add_argument_group(title='Slurm Options')
+    slurm.add_argument('--batch', action='store_true', default=False,
+                       help="Run as a batch array job.")
+    slurm.add_argument('-n', '--ntasks', type=int, default=1)
+    slurm.add_argument('-c', '--cpus-per-task', type=int, default=cpus_per_task)
+    slurm.add_argument('-g', '--gpus', type=int, default=gpus, choices=[0, 1])
+    slurm.add_argument('-p', '--partition', type=str, default=partition)
+    slurm.add_argument('-t', '--time', type=str, default=time)
+    slurm.add_argument('-m', '--mem', type=str, default=mem)
+    slurm.add_argument('--reservation',type=str,default=reservation)
+    slurm.add_argument('--slurm-log-dir', type=Path, default=slurm_log_dir,
+                       help='Output log directory. If the directory does not exist, it will be created automatically.')
+    return parser
+
+def setup_slurm_logs(slurm_log_dir,log_basename):
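+    # Create the log directory (setgid, group-writable) and build %A_%a-patterned output/error log paths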
+    slurm_log_dir = slurm_log_dir.absolute()
+    slurm_log_dir.mkdir(exist_ok = True,parents=True,mode = 0o2770)
+    out_log,err_log = [str(slurm_log_dir.joinpath(f'{log_basename}_%A_%a.out')),str(slurm_log_dir.joinpath(f'{log_basename}_%A_%a.err'))]
+    slurm_logs = {'output_log':out_log,'error_log':err_log}
+    return slurm_logs
\ No newline at end of file
diff --git a/src/rc_gpfs/compute/backend.py b/src/rc_gpfs/compute/backend.py
index 5f8022e..ba8a86f 100644
--- a/src/rc_gpfs/compute/backend.py
+++ b/src/rc_gpfs/compute/backend.py
@@ -5,7 +5,7 @@ from .utils import *
 from ..utils import parse_scontrol
 from typing import Literal
 
-__all__ = ['start_backend']
+__all__ = ['start_backend','start_local_cluster']
 
 # ENH: Add default parameters for cluster creation based on defined type and available resources. For instance, creating a LocalCluster should default to using all available CPUs and all available RAM.
 class DaskClusterManager:
diff --git a/src/rc_gpfs/policy/__init__.py b/src/rc_gpfs/policy/__init__.py
index 754c706..c24d1c3 100644
--- a/src/rc_gpfs/policy/__init__.py
+++ b/src/rc_gpfs/policy/__init__.py
@@ -1,2 +1,2 @@
 from .split import split,compress_logs
-from .convert import convert
\ No newline at end of file
+from .convert import convert, hivize
\ No newline at end of file
diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py
index a3f7ad9..1414580 100755
--- a/src/rc_gpfs/policy/convert.py
+++ b/src/rc_gpfs/policy/convert.py
@@ -1,10 +1,20 @@
-from urllib.parse import unquote
+import os
 import re
-import pandas as pd
 import gzip
+import random
+import string
+import shutil
 from pathlib import Path
-from .utils import as_path
+from typing import Literal
+from urllib.parse import unquote
+
+import pandas as pd
+import dask.dataframe as dd
+import dask.config
+
 from .policy_defs import SCHEMA
+from ..compute.backend import infer_cuda
+from ..utils import as_path
 
 def parse_line(line):
     try:
@@ -65,4 +75,67 @@ def convert(
     df = pd.DataFrame.from_dict(dicts).sort_values('tld')
     df = df.astype(SCHEMA)
 
-    df.to_parquet(output_path,engine = 'pyarrow')
\ No newline at end of file
+    df.to_parquet(output_path,engine = 'pyarrow')
+
+
+def hivize(
+        parquet_path: str | Path, 
+        hive_path: str | Path, 
+        tld: str | list[str] | None = None,
+        staging_path: str | Path | None = None,
+        partition_size: str = '100MiB', 
+        with_cuda: bool | Literal['infer'] = 'infer',
+        **kwargs
+    ) -> None:
+    parquet_path = as_path(parquet_path)
+    hive_path = as_path(hive_path)
+    
+    if staging_path is None:
+        rand_str = ''.join(random.choices(string.ascii_letters + string.digits, k=8))
+        # TMPDIR is normally set by the scheduler; fall back to /tmp so a missing variable does not raise a TypeError
+        staging_path = Path(os.getenv('TMPDIR','/tmp')).joinpath(os.getenv('USER'),f'hive-{rand_str}')
+        print(f"INFO: Using {staging_path} as temporary directory",flush=True)
+    else:
+        staging_path = as_path(staging_path)
+
+    hive_path.mkdir(exist_ok=True,parents=True)
+    staging_path.mkdir(exist_ok=True,parents=True)
+
+    if with_cuda == 'infer':
+        with_cuda = infer_cuda()
+
+    if with_cuda:
+        import dask_cudf as backend
+        from dask_cudf.core import from_cudf as from_local
+        dask.config.set({'dataframe.backend':'cudf'})
+    else:
+        import dask as backend
+        from dask.dataframe import from_pandas as from_local
+        dask.config.set({'dataframe.backend':'pandas'})
+
+    def indexed_name(ind):
+        return f"indexed-{ind}.parquet"
+
+    if tld is not None:
+        if not isinstance(tld,list):
+            tld = [tld]
+        predicates = [('tld','in',tld)]
+    else:
+        predicates = None
+    print(f"DEBUG: Filtering predicates are: {predicates}",flush=True)
+
+    acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0)
+    print(f"DEBUG: Acquisition date is {acq}",flush=True)
+
+    # The flat parquet is initially read in via dask to avoid reading the full dataset into memory which happens even 
+    # when including predicates for filtering. The dask dataframe is converted to a regular dataframe to drastically 
+    # improve indexing and sorting by removing partitions. The sorted dataframe is converted back to a dask dataframe 
+    # to create partitions within the parquet dataset and write to multiple files defined by those partitions.
+    ddf = backend.read_parquet(parquet_path,filters=predicates,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld'])
+    df = ddf.compute()
+    df = df.set_index('path').sort_index().assign(acq=acq)
+    ddf = from_local(df).repartition(partition_size=partition_size,force=True)
+    ddf.to_parquet(staging_path,partition_on=['tld','acq'],name_function = indexed_name)
+
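+    # Copy the completed hive partitions from staging into the shared hive path, then remove the staging directory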
+    shutil.copytree(staging_path,hive_path,dirs_exist_ok=True)
+    shutil.rmtree(staging_path)
+    pass
\ No newline at end of file
diff --git a/src/rc_gpfs/policy/split.py b/src/rc_gpfs/policy/split.py
index ac5fe69..1438a20 100644
--- a/src/rc_gpfs/policy/split.py
+++ b/src/rc_gpfs/policy/split.py
@@ -1,7 +1,6 @@
 from pathlib import Path
 import subprocess
-from .utils import as_path
-from ..utils import parse_scontrol
+from ..utils import parse_scontrol,as_path
 
 __all__ = ['split','compress_logs']
 
diff --git a/src/rc_gpfs/policy/utils.py b/src/rc_gpfs/policy/utils.py
index bd89f7d..e69de29 100644
--- a/src/rc_gpfs/policy/utils.py
+++ b/src/rc_gpfs/policy/utils.py
@@ -1,6 +0,0 @@
-from pathlib import Path
-
-def as_path(s: str | Path) -> Path:
-    if not isinstance(s,Path):
-        s = Path(s)
-    return s
\ No newline at end of file
diff --git a/src/rc_gpfs/utils.py b/src/rc_gpfs/utils.py
index 2c205b9..dc54e47 100644
--- a/src/rc_gpfs/utils.py
+++ b/src/rc_gpfs/utils.py
@@ -2,6 +2,7 @@ from typing import Literal
 import os
 import re
 import subprocess
+from pathlib import Path
 
 # ENH: if this package becomes merged with noctua, need to replace this function since it's copied directly from there
 def convert_si(value: str | float | int, 
@@ -75,4 +76,9 @@ def parse_scontrol():
     
     cores = int(cores)
     mem = convert_si(mem,to_unit='G',use_binary=True)
-    return [cores,mem]
\ No newline at end of file
+    return [cores,mem]
+
+def as_path(s: str | Path) -> Path:
+    if not isinstance(s,Path):
+        s = Path(s)
+    return s
\ No newline at end of file
-- 
GitLab