From be759db811d715fff30cfd0320b8273fb082579d Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Fri, 17 Jan 2025 15:00:55 -0600
Subject: [PATCH] Add policy comparison pipeline

---
 churn-analysis.ipynb                    | 269 ++++++++++++++++++++++++
 deps.yml                                |   6 +-
 example-job-scripts/calculate-churn.sh  |  38 ++++
 example-job-scripts/convert-logs.sh     |   2 +-
 example-job-scripts/convert-to-hive.sh  |   8 +-
 example-job-scripts/split-logs.sh       |   2 +-
 example-job-scripts/write-metadata.sh   |  39 ++++
 src/rc_gpfs/cli/convert_flat_to_hive.py |  12 +-
 src/rc_gpfs/db/__init__.py              |   2 +
 src/rc_gpfs/db/utils.py                 |  31 +++
 src/rc_gpfs/policy/convert.py           |  49 ++++-
 src/rc_gpfs/process/factory.py          |  90 +++++++-
 src/rc_gpfs/process/process.py          | 117 ++++++++++-
 13 files changed, 646 insertions(+), 19 deletions(-)
 create mode 100644 churn-analysis.ipynb
 create mode 100644 example-job-scripts/calculate-churn.sh
 create mode 100644 example-job-scripts/write-metadata.sh
 create mode 100644 src/rc_gpfs/db/__init__.py
 create mode 100644 src/rc_gpfs/db/utils.py

diff --git a/churn-analysis.ipynb b/churn-analysis.ipynb
new file mode 100644
index 0000000..17df93f
--- /dev/null
+++ b/churn-analysis.ipynb
@@ -0,0 +1,269 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import sqlalchemy\n",
+    "import pandas as pd\n",
+    "import numpy as np\n",
+    "from pathlib import Path\n",
+    "import cudf"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "hive_dir = '/data/rc/gpfs-policy/data/gpfs-hive/data-project/'\n",
+    "db = Path('/data/rc/gpfs-policy/data/gpfs-hive/db/data-project.db')\n",
+    "engine = sqlalchemy.create_engine(f\"sqlite:///{db}\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df = pd.read_sql(\"SELECT * FROM churn WHERE prior_log_dt >= '2024-11-14'\",engine)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df['total_churn'] = df['created'] + df['deleted'] + df['modified']\n",
+    "df[['log_dt','prior_log_dt']] = df[['log_dt','prior_log_dt']].apply(lambda x: pd.to_datetime(x))\n",
+    "df['tld'] = df['tld'].astype('category')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "tld_agg = df.groupby('tld',observed=True)['total_churn'].sum().sort_values(ascending=False)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "no_churn = tld_agg.loc[tld_agg.eq(0)].index"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "cdf = cudf.read_parquet(hive_dir,filters = [('tld','in',no_churn.to_list()),('acq','==','2025-01-15')],columns=['tld','size','kballoc'],categorical_partitions=True)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 8,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "cdf['tld'] = cdf['tld'].astype('category')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "inactive_storage = cdf.groupby('tld',observed=True)[['size','kballoc']].sum()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "inactive_storage['kballoc'].divide(1024**3).sum()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Plotting"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 11,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "active = df.loc[~df['tld'].isin(no_churn.to_list())].copy()\n",
+    "active['tld'] = active['tld'].cat.remove_unused_categories()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 12,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# order by total churn over the whole time period\n",
+    "order = active.groupby('tld',observed=True)['total_churn'].sum().sort_values(ascending=False).index.as_ordered()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 13,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# order by daily activity, percentage of days in the time period where at least one change was made\n",
+    "order = active.groupby('tld',observed=True)['total_churn'].apply(lambda x: x.ne(0).sum()).sort_values(ascending=False).index.as_ordered()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 14,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import plotly.graph_objects as go"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "fig = go.Figure(\n",
+    "    data = go.Heatmap(\n",
+    "        z = np.log10(active['total_churn']),\n",
+    "        y = active['log_dt'],\n",
+    "        x = active['tld'],\n",
+    "        xgap=1,\n",
+    "        colorscale='thermal',\n",
+    "        colorbar=dict(\n",
+    "            tickvals=np.arange(0,9),\n",
+    "            ticktext=[str(10**d) for d in np.arange(0,9)],\n",
+    "            tickfont=dict(\n",
+    "                size = 14\n",
+    "            ),\n",
+    "            title=dict(\n",
+    "                text='Churn (files altered)',\n",
+    "                font=dict(\n",
+    "                    size = 16\n",
+    "                )\n",
+    "            )\n",
+    "        ),\n",
+    "        hovertemplate='Dir: %{x}<br>Date: %{y}<br>Churn: %{customdata}<extra></extra>',\n",
+    "        customdata=active['total_churn']\n",
+    "    )\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 163,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "fig = fig.update_layout(\n",
+    "    template = 'plotly_white',\n",
+    "    height = 1000,\n",
+    "    width = 2000,\n",
+    "    title_text = 'Time Course of Total Churn For Project Directories Over 2 Months',\n",
+    "    title_x = 0.5,\n",
+    "    title_xanchor = 'center',\n",
+    "    title_font_size = 30,\n",
+    "\n",
+    "    xaxis = dict(\n",
+    "        title = dict(\n",
+    "            text = 'Directory Name',\n",
+    "            font_size = 20\n",
+    "        ),\n",
+    "        gridwidth = 2,\n",
+    "        showgrid = True,\n",
+    "        gridcolor='black'\n",
+    "    ),\n",
+    "    \n",
+    "    yaxis = dict(\n",
+    "        showgrid = False,\n",
+    "        title = dict(\n",
+    "            text = 'Policy Run Date',\n",
+    "            font_size = 20\n",
+    "        ),\n",
+    "        gridcolor = 'black',\n",
+    "    ),\n",
+    "    \n",
+    "    coloraxis_colorbar=dict(\n",
+    "        title=\"Raw Values\",  # Change the title of the z-axis\n",
+    "        titlefont=dict(size=20)  # Increase the font size\n",
+    "    ),\n",
+    "\n",
+    "    margin=dict(t=100, b=20, l=40, r=40)\n",
+    ")\n",
+    "\n",
+    "fig = fig.update_xaxes(\n",
+    "    categoryorder='array',\n",
+    "    categoryarray=order,\n",
+    "    tickfont={'size':14},\n",
+    "    ticklabelshift = 3,\n",
+    "    tickson = 'boundaries',\n",
+    "    gridwidth=2\n",
+    ")\n",
+    "\n",
+    "fig = fig.update_yaxes(\n",
+    "    tickfont={'size':16},\n",
+    "    tickformat = \"%Y-%m-%d\",\n",
+    "    tick0 = '2024-11-15',\n",
+    "    ticklabelstep=2,\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "fig.show()"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.11.11"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/deps.yml b/deps.yml
index d7f0098..b27a195 100644
--- a/deps.yml
+++ b/deps.yml
@@ -74,8 +74,10 @@ dependencies:
   - fmt=11.0.2=h434a139_0
   - freetype=2.12.1=h267a509_2
   - fsspec=2024.10.0=pyhd8ed1ab_1
+  - greenlet=3.1.1=py311hfdbb021_1
   - gflags=2.2.2=h5888daf_1005
   - glog=0.7.1=hbabe93e_0
+  - gputil=1.4.0=pyhd8ed1ab_1
   - h2=4.1.0=pyhd8ed1ab_1
   - hpack=4.0.0=pyhd8ed1ab_1
   - hyperframe=6.0.1=pyhd8ed1ab_1
@@ -171,7 +173,7 @@ dependencies:
   - nvcomp=4.0.1=hbc370b7_0
   - nvtx=0.2.10=py311h9ecbd09_2
   - openjpeg=2.5.3=h5fbd93e_0
-  - openssl=3.4.0=hb9d3cd8_0
+  - openssl=3.4.0=h7b32b05_1
   - orc=2.0.3=h97ab989_1
   - packaging=24.2=pyhd8ed1ab_2
   - pandas=2.2.2=py311h14de704_1
@@ -214,6 +216,8 @@ dependencies:
   - snappy=1.2.1=h8bd8927_1
   - sortedcontainers=2.4.0=pyhd8ed1ab_0
   - spdlog=1.14.1=hed91bc2_1
+  - sqlalchemy=2.0.36=py311h9ecbd09_0
+  - sqlite=3.47.2=h9eae976_0
   - stack_data=0.6.3=pyhd8ed1ab_1
   - tblib=3.0.0=pyhd8ed1ab_1
   - tenacity=9.0.0=pyhd8ed1ab_1
diff --git a/example-job-scripts/calculate-churn.sh b/example-job-scripts/calculate-churn.sh
new file mode 100644
index 0000000..56858a8
--- /dev/null
+++ b/example-job-scripts/calculate-churn.sh
@@ -0,0 +1,38 @@
+#! /bin/bash
+#
+#SBATCH --job-name=calculate-churn
+#SBATCH --ntasks=1
+#SBATCH --cpus-per-task=16
+#SBATCH --mem=90G
+#SBATCH --partition=amperenodes-reserve
+#SBATCH --time=12:00:00
+#SBATCH --reservation=rc-gpfs
+#SBATCH --gres=gpu:1
+#SBATCH --output=out/churn-%A-%a.out
+#SBATCH --error=out/churn-%A-%a.err
+#SBATCH --array=1-162
+
+module load Anaconda3
+conda activate gpfs-dev
+
+hive="/data/rc/gpfs-policy/data/gpfs-hive/data-project/"
+tlds=($(find ${hive} -name "tld=*" -type d | sed -n "s/.*tld=//p"))
+tld=${tlds[${SLURM_ARRAY_TASK_ID}]}
+
+echo "TLD: ${tld}"
+
+python << END
+from pathlib import Path
+from rc_gpfs.process.process import calculate_churn
+
+hive_path = Path("${hive}")
+tld = "${tld}"
+
+acqs = hive_path.joinpath(f"tld={tld}").glob("acq=*")
+acq_dates = [p.name.removeprefix("acq=") for p in acqs]
+
+with_cuda=True
+
+churn = calculate_churn(hive_path,tld,acq_dates,with_cuda)
+
+END
diff --git a/example-job-scripts/convert-logs.sh b/example-job-scripts/convert-logs.sh
index f503652..7ce3c09 100644
--- a/example-job-scripts/convert-logs.sh
+++ b/example-job-scripts/convert-logs.sh
@@ -11,7 +11,7 @@
 #SBATCH --array=0-49
 
 module load Anaconda3
-conda activate gpfs-dev
+conda activate gpfs
 
 logs=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-*/chunks"))
 log=${logs[${SLURM_ARRAY_TASK_ID}]}
diff --git a/example-job-scripts/convert-to-hive.sh b/example-job-scripts/convert-to-hive.sh
index 6178f1f..14acda9 100644
--- a/example-job-scripts/convert-to-hive.sh
+++ b/example-job-scripts/convert-to-hive.sh
@@ -18,4 +18,10 @@ conda activate gpfs-dev
 parquets=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-*/parquet"))
 pq=${parquets[${SLURM_ARRAY_TASK_ID}]}
 
-convert-to-hive --batch ${pq} /scratch/mdefende/project-hive
+convert-to-hive --batch \
+    --reservation=rc-gpfs \
+    --partition=amperenodes-reserve \
+    --mem=120G \
+    --grp-file "${pq}/../misc/tld_grps.txt" \
+    ${pq} \
+    /scratch/mdefende/project-hive
diff --git a/example-job-scripts/split-logs.sh b/example-job-scripts/split-logs.sh
index 137fcd3..8ac746f 100644
--- a/example-job-scripts/split-logs.sh
+++ b/example-job-scripts/split-logs.sh
@@ -11,7 +11,7 @@
 #SBATCH --array=0-49
 
 module load Anaconda3
-conda activate gpfs-dev
+conda activate gpfs
 
 logs=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-*/raw/*.gz"))
 log=${logs[${SLURM_ARRAY_TASK_ID}]}
diff --git a/example-job-scripts/write-metadata.sh b/example-job-scripts/write-metadata.sh
new file mode 100644
index 0000000..cc87d0d
--- /dev/null
+++ b/example-job-scripts/write-metadata.sh
@@ -0,0 +1,39 @@
+#! /bin/bash
+#
+#SBATCH --job-name=write-meta
+#SBATCH --ntasks=1
+#SBATCH --cpus-per-task=16
+#SBATCH --mem=90G
+#SBATCH --partition=amperenodes-reserve
+#SBATCH --time=12:00:00
+#SBATCH --reservation=rc-gpfs
+#SBATCH --gres=gpu:1
+#SBATCH --output=out/metadata-%A-%a.out
+#SBATCH --error=out/metadata-%A-%a.err
+#SBATCH --array=0-162
+
+module load Anaconda3
+conda activate gpfs-dev
+
+tlds=($(find /data/rc/gpfs-policy/data/gpfs-hive/data-project/ -name "tld=*" -type d))
+tld=${tlds[${SLURM_ARRAY_TASK_ID}]}
+
+python << END
+import cudf 
+from rc_gpfs.policy.convert import write_dataset_metadata
+import rmm
+
+from pathlib import Path
+
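+# Assumption in this sketch: a pooled, managed-memory allocator lets cuDF
+# oversubscribe the GPU and page to host RAM if a single acquisition's
+# parquet data exceeds the 70GiB pool configured below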
+rmm.reinitialize(
+    pool_allocator=True,
+    managed_memory=True,
+    initial_pool_size='70GiB'
+)
+
+acqs = Path("${tld}").glob("acq=*")
+for i in acqs:
+    df = cudf.read_parquet(i)
+    write_dataset_metadata(df,i)
+    del df
+END
diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
index f0e169f..a10eee0 100644
--- a/src/rc_gpfs/cli/convert_flat_to_hive.py
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -49,9 +49,7 @@ def parse_args():
     args = parser.parse_args()
     return vars(args)
 
-BATCH_SCRIPT = """\
-#!/bin/bash
-#
+SLURM_OPTS = """\
 #SBATCH --job-name=hivize
 #SBATCH --ntasks={ntasks}
 #SBATCH --cpus-per-task={cpus_per_task}
@@ -62,7 +60,9 @@ BATCH_SCRIPT = """\
 #SBATCH --output={output_log}
 #SBATCH --error={error_log}
 #SBATCH --array=1-{ngroups}
+"""
 
+BATCH_CMDS = """\
 {env_cmd}
 
 tld=$(sed -n "${{SLURM_ARRAY_TASK_ID}}p" {grp_file})
@@ -77,7 +77,11 @@ def submit_batch(**kwargs):
     slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'hive')
     kwargs.update(slurm_logs)
     
-    script = BATCH_SCRIPT.format(**kwargs)
+    slurm_opts = SLURM_OPTS.format(**kwargs)
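+    # Append the reservation directive only when one was supplied so the generated
+    # script never contains a literal '--reservation=None'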
+    if kwargs.get('reservation') is not None:
+        slurm_opts = f"{slurm_opts}#SBATCH --reservation={kwargs.get('reservation')}"
+
+    script = f"#!/bin/bash\n#\n{slurm_opts}\n{BATCH_CMDS.format(**kwargs)}"
 
     subprocess.run(['sbatch'],input=script,shell=True,text=True)
     pass
diff --git a/src/rc_gpfs/db/__init__.py b/src/rc_gpfs/db/__init__.py
new file mode 100644
index 0000000..1cd63b8
--- /dev/null
+++ b/src/rc_gpfs/db/__init__.py
@@ -0,0 +1,2 @@
+
+from .utils import create_dbs, df_to_sql
\ No newline at end of file
diff --git a/src/rc_gpfs/db/utils.py b/src/rc_gpfs/db/utils.py
new file mode 100644
index 0000000..52f75b9
--- /dev/null
+++ b/src/rc_gpfs/db/utils.py
@@ -0,0 +1,31 @@
+import pandas as pd
+from pathlib import Path
+from sqlalchemy import create_engine, text
+
+from ..utils import as_path
+
+__all__= ['create_dbs','df_to_sql']
+
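+# Illustrative use of the helpers below; churn_df stands for any DataFrame of
+# churn counts such as the one returned by process.calculate_churn:
+#
+#   create_dbs('/data/rc/gpfs-policy/data/gpfs-hive/db', 'data-project')
+#   df_to_sql(churn_df, '/data/rc/gpfs-policy/data/gpfs-hive/db/data-project.db', 'churn')
+
+# One row per (tld, log_dt) pair: log_dt is the newer of the two compared
+# policy-run dates and prior_log_dt the older one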
+CHURN_TBL_DEFINITION = """\
+tld TEXT,
+log_dt INTEGER,
+created INTEGER,
+deleted INTEGER,
+modified INTEGER,
+prior_log_dt INTEGER,
+PRIMARY KEY (tld, log_dt)
+"""
+
+def create_dbs(db_dir,device):
+    db_dir = as_path(db_dir)
+    db_dir.mkdir(exist_ok=True,parents=True)
+    db_path = db_dir.joinpath(f"{device}.db")
+    engine = create_engine(f"sqlite:///{db_path}")
+    with engine.connect() as conn:
+        conn.execute(text(f"CREATE TABLE IF NOT EXISTS churn ({CHURN_TBL_DEFINITION})"))
+        conn.execute(text("CREATE INDEX IF NOT EXISTS churn_idx ON churn (tld, log_dt)"))
+
+def df_to_sql(df: pd.DataFrame, path: Path | str, table: str):
+    path = as_path(path)
+    engine = create_engine(f"sqlite:///{path}")
+    df.to_sql(table,engine,if_exists='append',index=False)
\ No newline at end of file
diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py
index 1f2788b..16f1c2d 100755
--- a/src/rc_gpfs/policy/convert.py
+++ b/src/rc_gpfs/policy/convert.py
@@ -1,16 +1,23 @@
 import os
 import re
 import gzip
+import json
 import random
 import string
 import shutil
 from pathlib import Path
 from typing import Literal, List
 from urllib.parse import unquote
+import GPUtil
 
+import cudf
 import pandas as pd
 import dask.dataframe as dd
 import dask.config
+import rmm
+
+import pyarrow as pa
+import pyarrow.parquet as pq
 
 from .policy_defs import SCHEMA
 from ..compute.backend import infer_cuda
@@ -85,6 +92,7 @@ def hivize(
         staging_path: str | Path | None = None,
         partition_size: str = '100MiB', 
         with_cuda: bool | Literal['infer'] = 'infer',
+        write_metadata: bool = True,
         **kwargs
     ) -> None:
     parquet_path = as_path(parquet_path)
@@ -107,6 +115,11 @@ def hivize(
         import dask_cudf as backend
         from dask_cudf.core import from_cudf as from_local
         dask.config.set({'dataframe.backend':'cudf'})
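+        # Pre-allocate a pooled, managed-memory allocator; managed memory can
+        # oversubscribe the GPU and page to host RAM when a converted log
+        # exceeds the 60GiB pool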
+        rmm.reinitialize(
+            pool_allocator=True,
+            managed_memory=True,
+            initial_pool_size='60GiB'
+        )
     else:
         import dask as backend
         from dask.dataframe import from_pandas as from_local
@@ -136,6 +149,40 @@ def hivize(
     ddf = from_local(df).repartition(partition_size=partition_size,force=True)
     ddf.to_parquet(staging_path,partition_on=['tld','acq'],name_function = indexed_name)
 
+    if write_metadata:
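+        # Write a _metadata.json sidecar into each tld/acq partition directory
+        # created by to_parquet above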
+        (
+            df
+            .groupby(['tld','acq'])
+            .apply(lambda x: 
+                   write_dataset_metadata(
+                       x, 
+                       staging_path.joinpath(f"tld={x['tld'].iloc[0]}",f"acq={x['acq'].iloc[0]}")
+                    )
+                )
+        )
+
     shutil.copytree(staging_path,hive_path,dirs_exist_ok=True)
     shutil.rmtree(staging_path)
-    pass
\ No newline at end of file
+    pass
+
+def write_dataset_metadata(df: cudf.DataFrame | pd.DataFrame, parquet_path: str | Path) -> None:
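+    # Record row/column counts and per-column memory footprints, on device and
+    # on host, as a _metadata.json sidecar next to the partition's parquet files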
+    parquet_path = as_path(parquet_path)
+    if isinstance(df,cudf.DataFrame):
+        vram_usage = df.memory_usage(deep=True).to_dict()
+        ram_usage  = df.to_pandas().memory_usage(deep=True).to_dict()
+    else:
+        if len(GPUtil.getAvailable()) > 0:
+            vram_usage = cudf.from_pandas(df).memory_usage(deep=True).to_dict()
+        else:
+            vram_usage = 'NA'
+        ram_usage = df.memory_usage(deep=True).to_dict()
+    
+    metadata = {
+        'num_rows': df.shape[0],
+        'num_columns': df.shape[1],
+        'vram_usage': vram_usage,
+        'ram_usage': ram_usage
+    }
+
+    with open(parquet_path.joinpath('_metadata.json'),'w') as f:
+        json.dump(metadata,f)
\ No newline at end of file
diff --git a/src/rc_gpfs/process/factory.py b/src/rc_gpfs/process/factory.py
index c608f82..b4d0767 100644
--- a/src/rc_gpfs/process/factory.py
+++ b/src/rc_gpfs/process/factory.py
@@ -1,7 +1,10 @@
 import cudf
+import cupy
 import pandas as pd
+import numpy as np
 import dask.dataframe as dd
 import dask_cudf
+import rmm
 from .utils import as_timedelta
 from typing import Literal, List
 from typeguard import typechecked
@@ -81,7 +84,7 @@ class PandasAggregator(Aggregator):
 
     def aggregate(
         self,
-        df: cudf.DataFrame,
+        df: pd.DataFrame,
         col: str | List[str],
         grps: str | List[str],
         funcs: str | List[str]
@@ -95,6 +98,45 @@ class PandasAggregator(Aggregator):
         )
         return df_agg
 
+    def calculate_churn(self,df1,df2):
+        df = pd.concat([df1,df2])
+        df['acq'] = pd.to_datetime(df['acq'].astype('str'))
+        diff = (
+            df
+            .reset_index()
+            .drop_duplicates(subset=[c for c in df.columns if c != 'acq'],keep=False)
+            .set_index('path')
+        )
+        agg = diff.groupby(level=0)['acq'].agg(['count','max'])
+
+        dates = diff['acq'].unique()
+
+        # When using duplicates to find if files change, there are 4 possible options for any file:
+        # 1. 2 records with different modification times: 'modified'
+        # 2. 1 record with `acq` from the older GPFS log: 'deleted'
+        # 3. 1 record with `acq` from the newer GPFS log: 'created'
+        # 4. 0 records: 'unchanged' and ignored
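+        # Example: a path present in both logs with a changed modify time keeps both
+        # of its records after drop_duplicates (count == 2) and is labeled 'modified';
+        # a single surviving record is 'deleted' if it came from the older log and
+        # 'created' if it came from the newer one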
+        conditions = [agg['count'] == 2, agg['max'] == dates.min(), agg['max'] == dates.max()]
+        choices = ['modified','deleted','created']
+
+        agg['type'] = np.select(conditions,choices,default=np.array(np.nan, dtype='object'))
+        agg['type'] = agg['type'].astype('category').cat.set_categories(choices)
+
+        if agg.empty:
+            ser = pd.Series(data = [0,0,0], name='count')
+
+            # The index needs to be specified in exactly this manner to match how type is being converted to a 
+            # categorical above. This essentially empty series needs to have the exact same structure as a non-empty 
+            # series
+            ser.index = pd.CategoricalIndex(
+                data=['created', 'deleted', 'modified'],
+                categories=['modified','deleted','created'],
+                ordered=False,
+                name='type')
+            return ser
+
+        return agg['type'].value_counts()
+
     
 class CUDFAggregator(Aggregator):
     def __init__(self):
@@ -122,6 +164,52 @@ class CUDFAggregator(Aggregator):
             .reset_index()
         )
         return df_agg
+    
+    def create_memory_pool(self,size,**kwargs):
+        pool_allocator = kwargs.pop('pool_allocator',True)
+        managed_memory = kwargs.pop('managed_memory',True)
+        
+        rmm.reinitialize(
+            pool_allocator=pool_allocator,
+            managed_memory=managed_memory,
+            initial_pool_size=size,
+            **kwargs
+        )
+
+    def calculate_churn(self,df1,df2):
+        df = cudf.concat([df1,df2])
+        #df['acq'] = cudf.to_datetime(df['acq'].astype('str'))
+        diff = (
+            df
+            .reset_index()
+            .drop_duplicates(subset=[c for c in df.columns if c != 'acq'],keep=False)
+            .set_index('path')
+        )
+        agg = diff.groupby(level=0)['acq'].agg(['count','max'])
+
+        dates = np.unique(df['acq'].to_numpy())
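+
+        # Same duplicate-based classification as the pandas backend, but computed
+        # with cupy: 0 = 'modified' (both records kept), 1 = 'deleted' (only the
+        # older acquisition survived), 2 = 'created' (only the newer one survived)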
+        conditions = [agg['count'] == 2, agg['max'] == dates.min(), agg['max'] == dates.max()]
+        choices = cupy.array([0,1,2])
+
+        agg['type'] = cupy.select(conditions,choices)
+        agg['type'] = agg['type'].map({0:'modified',1:'deleted',2:'created'})
+        agg['type'] = agg['type'].astype('category').cat.set_categories(['modified','deleted','created'])
+
+        if agg.empty:
+            ser = cudf.Series(data = [0,0,0], name='count')
+
+            # The index needs to be specified in exactly this manner to match how type is being converted to a 
+            # categorical above. This essentially empty series needs to have the exact same structure as a non-empty 
+            # series
+            ser.index = cudf.CategoricalIndex(
+                data=['created', 'deleted', 'modified'],
+                categories=['modified','deleted','created'],
+                ordered=False)
+            return ser
+        else:
+            counts = agg['type'].value_counts()
+            counts.index.name = None
+            return counts
 
 
 class DaskAggregator(Aggregator):
diff --git a/src/rc_gpfs/process/process.py b/src/rc_gpfs/process/process.py
index 2de789f..390defe 100644
--- a/src/rc_gpfs/process/process.py
+++ b/src/rc_gpfs/process/process.py
@@ -1,16 +1,18 @@
 from pathlib import Path
 import pandas as pd
+import numpy as np
 from ..compute import start_backend
+from ..utils import as_path
 from .utils import extract_run_date_from_filename
+from ..db.utils import df_to_sql, create_dbs
 from .factory import get_aggregator
 from typing import Literal, List
 from typeguard import typechecked
 
-__all__ = ['aggregate_gpfs_dataset']
+__all__ = ['aggregate_gpfs_dataset','calculate_churn']
 
 def _check_dataset_path(dataset_path) -> Path:
-    if not isinstance(dataset_path,Path):
-        dataset_path = Path(dataset_path)
+    dataset_path = as_path(dataset_path)
 
     if dataset_path.is_file():
         print(f"INFO: Found 1 file ({dataset_path})")
@@ -52,10 +54,13 @@ def _check_timedelta_values(vals,unit):
 # backend is beyond the scope of the process module and this function specifically. Can wrap this with the backend 
 # creation in a CLI command later for convenience, but that shouldn't be baked in
 
+# ENH: Make the grouping more generic as opposed to only working for time delta ranges. Splitting files by size and 
+# kballoc is also important in some circumstances
+
 @typechecked
 def aggregate_gpfs_dataset(
     dataset_path: str | Path,
-    run_date: pd.Timestamp | None = None,
+    acq_date: pd.Timestamp | None = None,
     delta_vals: int | List[int] | None = None,
     delta_unit: Literal['D','W','M','Y'] | None = None,
     time_val: Literal['access','modify','create'] = 'access',
@@ -76,8 +81,8 @@ def aggregate_gpfs_dataset(
     report_path = _check_report_paths(report_dir,report_name,parent_path)
     _check_timedelta_values(delta_vals,delta_unit)
 
-    if run_date is None:
-        run_date = extract_run_date_from_filename(dataset_path)
+    if acq_date is None:
+        acq_date = extract_run_date_from_filename(dataset_path)
 
     manager,backend = start_backend(
         dataset_path=dataset_path,
@@ -92,7 +97,7 @@ def aggregate_gpfs_dataset(
         df = aggregator.read_parquet(dataset_path)
         grps = ['tld']
         if delta_vals is not None:
-            cutoffs = aggregator.create_timedelta_cutoffs(delta_vals,delta_unit,run_date)
+            cutoffs = aggregator.create_timedelta_cutoffs(delta_vals,delta_unit,acq_date)
             labels  = aggregator.create_timedelta_labels(delta_vals,delta_unit)
             df['dt_grp'] = aggregator.cut_dt(df[time_val],cutoffs,labels)
             grps.append('dt_grp')
@@ -104,6 +109,100 @@ def aggregate_gpfs_dataset(
         df_agg.to_parquet(report_path)
     finally:
         if manager is not None:
-            manager.close()
+            manager.shutdown()
+    
+@typechecked
+def calculate_churn(
+    hive_path: str | Path,
+    tld: str,
+    acq_dates: List[ pd.Timestamp | np.datetime64 | str ],
+    with_cuda: Literal['infer'] | bool = 'infer',
+    write_db: bool | Path | str = True,
+    **kwargs
+) -> pd.DataFrame:
+    
+    ## Input checking
+    hive_path = as_path(hive_path)
+    dataset_path = hive_path.joinpath(f"tld={tld}")
+    acq_dirs = [np.datetime64(d.name.removeprefix('acq=')) for d in dataset_path.glob("acq=*")]
+
+    # Conversion of datetimes to np.datetime64 provides compatibility for either cudf or pandas dataframe backends.
+    # At the same time, remove any datetimes for which the given tld does not have data.
+    acq_dates = [np.datetime64(d) for d in acq_dates if np.datetime64(d) in acq_dirs]
+    acq_dates.sort()
+
+    if len(acq_dates) <= 1:
+        raise ValueError(f"Fewer than two given policy acquisition dates contained data for {tld} in {hive_path}.")
     
-    return None
\ No newline at end of file
+    manager,backend = start_backend(
+        dataset_path=dataset_path,
+        with_cuda=with_cuda,
+        with_dask=False,
+        **kwargs
+    )
+
+    try:
+        churn_l = []
+        aggregator = get_aggregator(backend)
+        
+        if backend == 'cudf':
+            pool_size = kwargs.pop('pool_size',70*(1024**3)) # 70 GiB as default
+            aggregator.create_memory_pool(pool_size,**kwargs)
+        
+        df_init = (
+            aggregator
+            .read_parquet(
+                dataset_path.joinpath(f"acq={np.datetime_as_string(acq_dates[0],'D')}"),
+                columns=['modify']
+            )
+        )
+        df_init['acq'] = acq_dates[0]
+        
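+        # Compare consecutive acquisitions pairwise; each iteration classifies every
+        # changed path between acq_dates[i-1] and acq_dates[i] and yields one row of
+        # created/deleted/modified counts for the newer date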
+        for i in range(1,len(acq_dates)):
+            df_target = (
+                aggregator
+                .read_parquet(
+                    dataset_path.joinpath(f"acq={np.datetime_as_string(acq_dates[i],'D')}"),
+                    columns=['modify']
+                )
+            )
+            df_target['acq'] = acq_dates[i]
+            counts = aggregator.calculate_churn(df_init,df_target)
+
+            # The cudf backend returns a cudf.Series of counts; convert to pandas so
+            # downstream concatenation and the SQLite write share one code path
+            if hasattr(counts,'to_pandas'):
+                counts = counts.to_pandas()
+
+            churn = (
+                counts
+                .to_frame()
+                .T
+                .assign(
+                    log_dt = df_target['acq'].iloc[0],
+                    prior_log_dt = df_init['acq'].iloc[0],
+                    tld = tld)
+            )
+            churn_l.append(churn)
+
+            # This delete pattern paired with the loop creates a type of rotating list where each dataframe, aside from 
+            # the initial and final, is processed as the target and the source for which files exist at a given time. 
+            # The target is then referred to as the source as we move through the time series. Each source is removed 
+            # from memory. This both limits the amount of memory used to only two datasets at a time while also 
+            # only reading each dataset once.
+            del df_init
+            df_init = df_target
+            del df_target
+            
+        churn_df = pd.concat(churn_l).fillna(0).reset_index(drop=True)
+
+        if isinstance(write_db,(str,Path)):
+            db = as_path(write_db)
+            db.parent.mkdir(exist_ok=True,parents=True)
+            df_to_sql(churn_df,db,'churn')
+        elif write_db is True:
+            db_name = hive_path.name
+            db_path = hive_path.parent.joinpath('db')
+            db_path.mkdir(parents=True,exist_ok=True)
+            db = db_path.joinpath(f"{db_name}.db")
+            df_to_sql(churn_df,db,'churn')
+        
+        return churn_df
+    finally:
+        if manager is not None:
+            manager.shutdown()
\ No newline at end of file
-- 
GitLab