From b9c64f28692454e0a92b85b3632026bd324deb57 Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Thu, 29 May 2025 11:07:12 -0500
Subject: [PATCH] Add CLI to calculate churn

---
 example-job-scripts/30-calculate-churn.sh |  46 +---
 pyproject.toml                            |   1 +
 src/rc_gpfs/cli/__init__.py               |   3 +-
 src/rc_gpfs/cli/calculate_churn_cli.py    | 254 ++++++++++++++++++++++
 4 files changed, 264 insertions(+), 40 deletions(-)
 create mode 100644 src/rc_gpfs/cli/calculate_churn_cli.py

diff --git a/example-job-scripts/30-calculate-churn.sh b/example-job-scripts/30-calculate-churn.sh
index d2225fc..486b9dd 100644
--- a/example-job-scripts/30-calculate-churn.sh
+++ b/example-job-scripts/30-calculate-churn.sh
@@ -4,48 +4,16 @@
 #SBATCH --ntasks=1
 #SBATCH --cpus-per-task=16
 #SBATCH --mem=32G
-#SBATCH --partition=amperenodes-reserve
-#SBATCH --time=12:00:00
-#SBATCH --reservation=rc-gpfs
-#SBATCH --output=out/churn-%A-%a.out
-#SBATCH --error=out/churn-%A-%a.err
-#SBATCH --array=0-177
+#SBATCH --partition=express,amd-hdr100
+#SBATCH --time=02:00:00
+#SBATCH --output=out/churn-setup-%j.out
+#SBATCH --error=out/churn-setup-%j.err
 
 module load Anaconda3
 conda activate gpfs
 
 hive="/data/rc/gpfs-policy/data/gpfs-hive/data-project/"
-tlds=($(find ${hive} -name "tld=*" -type d | sed -n "s/.*tld=//p"))
-tld=${tlds[${SLURM_ARRAY_TASK_ID}]}
+start_date=""
+tlds=""
 
-echo "TLD: ${tld}"
-
-python << END
-from pathlib import Path
-import numpy as np
-import time
-from rc_gpfs.db.utils import df_to_sql
-from rc_gpfs.process import calculate_churn
-
-hive_path = Path("${hive}")
-tld = "${tld}"
-
-db = hive_path.parent.joinpath('db','data-project.db')
-
-start_date = '2024-05-08'
-end_date = '2025-05-10'
-
-acq_dates = list(np.arange(np.datetime64(start_date), np.datetime64(end_date), step=1))
-
-churn = calculate_churn(hive_path,tld,acq_dates)
-
-finished = False
-
-while not finished:
-    try:
-        df_to_sql(churn, db, 'churn')
-        finished=True
-    except:
-        time.sleep(1)
-
-END
+calculate-churn --batch ${tlds:+ --tld $tlds} ${start_date:+ -s ${start_date}} ${hive}
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index f56d1ee..ef65f7a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,6 +19,7 @@ dynamic = ["version","dependencies","classifiers"]
 repository = "https://gitlab.rc.uab.edu/rc/gpfs-policy"
 
 [project.scripts]
+calculate-churn = "rc_gpfs.cli:calculate_churn_cli"
 convert-to-hive = "rc_gpfs.cli.convert_flat_to_hive:convert_flat_to_hive"
 convert-to-parquet = "rc_gpfs.cli.convert_to_parquet:convert_to_parquet"
 split-log = "rc_gpfs.cli.split_log:split_log"
diff --git a/src/rc_gpfs/cli/__init__.py b/src/rc_gpfs/cli/__init__.py
index 53cb54e..270eac1 100644
--- a/src/rc_gpfs/cli/__init__.py
+++ b/src/rc_gpfs/cli/__init__.py
@@ -1,6 +1,7 @@
+from .calculate_churn_cli import calculate_churn_cli
 from .convert_flat_to_hive import convert_flat_to_hive
 from .convert_to_parquet import convert_to_parquet
 from .fparq_cli import fparq_cli
 from .split_log import split_log
 
-__all__ = [convert_flat_to_hive, convert_to_parquet, fparq_cli, split_log]
\ No newline at end of file
+__all__ = ["calculate_churn_cli", "convert_flat_to_hive", "convert_to_parquet", "fparq_cli", "split_log"]
\ No newline at end of file
diff --git a/src/rc_gpfs/cli/calculate_churn_cli.py b/src/rc_gpfs/cli/calculate_churn_cli.py
new file mode 100644
index 0000000..75f034c
--- /dev/null
+++ b/src/rc_gpfs/cli/calculate_churn_cli.py
@@ -0,0 +1,254 @@
+import argparse
+import subprocess
+from pathlib import Path
+import time
+import polars as pl
+from sqlalchemy import create_engine, text
+
+from rc_gpfs.process import calculate_churn
+from rc_gpfs.db.utils import df_to_sql, create_churn_db
+from .utils import batch_parser, define_python_interpreter, setup_slurm_logs
+
+__all__ = ['calculate_churn_cli']
+
+DESCRIPTION = """
+Calculates the rate of churn of files in a GPFS dataset per top-level directory (tld) over time. Churn is measured as the number and size of files that were created, deleted, or modified from one GPFS policy run to the next. The number of files accessed but not modified is also reported but is not included in any composite churn values.
+
+This operates on a hive parquet dataset partitioned on tld and acquisition date. Output is written to a local SQLite database sorted on tld, the acquisition date of the original log, and the acquisition date of the more recent comparison log. All sizes are reported as bytes from the "size" column in the log file.
+
+This command supports batch processing with the --batch option. When used, a Slurm array job is submitted in which each passed or discovered tld is processed in its own array task.
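+
+For reference, an example batch invocation (the hive path and dates below are illustrative, mirroring the values used in the example job scripts):
+
+    calculate-churn --batch -s 2024-05-08 -e 2025-05-10 /data/rc/gpfs-policy/data/gpfs-hive/data-project/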
+"""
+
+def parse_args(cli_args=None):
+    parser = argparse.ArgumentParser(
+        description=DESCRIPTION,
+        formatter_class=argparse.RawTextHelpFormatter,
+        parents=[
+            batch_parser(
+                cpus_per_task=16, mem='16G', gpus=0, partition="amd-hdr100", time="12:00:00"
+            )
+        ],
+    )
+    parser.add_argument(
+        "hive_path",
+        type=Path,
+        help="Parent directory for the hive.",
+    )
+    parser.add_argument(
+        "--tld",
+        type=str,
+        nargs='*',
+        action='extend',
+        default = None,
+        help="List of tld to convert to hive. If not specified, all unique tld values in the dataset will be processed.",
+    )
+    parser.add_argument(
+        '-s','--start-date',
+        required=True,
+        type=str,
+        help="Date to start churn calculation from. This will be the first original log date and so will not show as a target date in the output. If neither an end date nor a number of acquisitions are specified, all acquisitions after the start date are included."
+    )
+    date_determinants = parser.add_mutually_exclusive_group()
+    date_determinants.add_argument(
+        '-e','--end-date',
+        type=str,
+        default=None,
+        help="Date to end churn calculation on. All date found between start and end will be used. Cannot be specified with --num-acqs"
+    )
+    date_determinants.add_argument(
+        '--num-acqs',
+        type=int,
+        default=None,
+        help="Number of GPFS log acquisitions to calculate churn across starting from --start-date. Cannot be used with --end-date" 
+    )
+    parser.add_argument(
+        '-d','--db-path',
+        type=Path,
+        default=None,
+        help="Path to database to store churn values in. If not specified, will default to hive_path/../db/<hive_name>.db, where the name of the database file matches the name of the hive_path directory. Ex. if the hive_path is '/data/project/gpfs/hive', the default db path would be /data/project/gpfs/db/hive.db"
+    )
+    parser.add_argument(
+        '-r','--retries',
+        type=int,
+        default=10,
+        help="Number of times to retry saving churn data to database. If limit is reached, job will exit with error"
+    )
+
+    args = parser.parse_args(cli_args)
+    return vars(args)
+
+SLURM_OPTS = """\
+#SBATCH --job-name=churn
+#SBATCH --ntasks={ntasks}
+#SBATCH --cpus-per-task={cpus_per_task}
+#SBATCH --partition={partition}
+#SBATCH --time={time}
+#SBATCH --mem={mem}
+#SBATCH --output={output_log}
+#SBATCH --error={error_log}
+#SBATCH --array=1-{ntlds}
+"""
+
+BATCH_CMDS = """\
+{env_cmd}
+
+all_tlds="{tld_str}"
+
+tld=$(echo ${{all_tlds}} | tr ',' '\\n' | sed -n "${{SLURM_ARRAY_TASK_ID}}p")
+
+calculate-churn --tld=${{tld}} -s {start_date} {end_opt} -d {db_path} {hive_path}
+"""
+
+def submit_batch(**kwargs):
+    env_cmd = define_python_interpreter(
+        kwargs.get("python_path"), kwargs.get("conda_env")
+    )
+    kwargs.update({"env_cmd": env_cmd})
+
+    slurm_logs = setup_slurm_logs(kwargs.get("slurm_log_dir"), "churn")
+    kwargs.update(slurm_logs)
+
+    slurm_opts = SLURM_OPTS.format(**kwargs,**kwargs['opt_strs'])
+    if kwargs.get("reservation") is not None:
+        slurm_opts = f"{slurm_opts}#SBATCH --reservation={kwargs.get('reservation')}"
+
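+    # Assemble the full batch script (shebang, SBATCH directives, commands) and
+    # pipe it to sbatch on stdin rather than writing a temporary script file.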
+    script = f"#!/bin/bash\n#\n{slurm_opts}\n{BATCH_CMDS.format(**kwargs,**kwargs['opt_strs'])}"
+    try:
+        proc = subprocess.run(
+            ["sbatch"],
+            input=script,
+            shell=True,
+            text=True,
+            check=True,
+            capture_output=True,
+        )
+        print(proc.stdout)
+    except subprocess.CalledProcessError as e:
+        # stderr is already a str because text=True was passed to subprocess.run
+        print(e.stderr)
+        raise
+
+def write_to_db(df, db_path, retries=10):
+    finished = False
+    cur_try = 0
+
+    while not finished and cur_try < retries:
+        try:
+            df_to_sql(df, db_path, "churn")
+            print("INFO: Finished writing to database", flush=True)
+            finished = True
+        except:  # noqa: E722
+            cur_try += 1
+            print(f"WARNING: Unable to write to database. Retry {cur_try}",flush=True)
+            time.sleep(1)
+
+    if not finished:
+        exit("ERROR: Unable to write to database. Exiting")
+
+def _get_existing_records(db_path,tld):
+    # Remove records from df which conflict in the current database (based on primary keys)
+    query = text(f"SELECT tld,log_dt,prior_log_dt FROM churn WHERE tld = '{tld}'")
+
+    uri = create_engine(f"sqlite:///{db_path}")
+    with uri.connect() as conn:
+        cur_records = pl.read_database(
+            query = query,
+            connection=conn
+        )
+
+    return cur_records
+
+def _find_date(acqs,date,hive_path):
+    try:
+        return acqs.index(date)
+    except ValueError:
+        raise ValueError(
+            f"Date {date} was not found in the list of acquisitions in hive {hive_path}."
+        )
+
+def calculate_churn_cli(cli_args=None):
+    args = parse_args(cli_args)
+
+    ## Parse DB path
+    if args['db_path'] is None:
+        db_name = args['hive_path'].name
+        args['db_path'] = args['hive_path'].parent.joinpath('db',f'{db_name}.db')
+        args['db_path'].parent.mkdir(exist_ok=True,parents=True)
+
+    if not args['db_path'].exists():
+        create_churn_db(args['db_path'])
+
+    ## Parse acquisition start and end dates
+
+    # Choose root user here specifically since it should always have something in it and should never be removed from
+    # the system
+    acqs = list(set([d.name.removeprefix('acq=') for d in args['hive_path'].joinpath('tld=root').glob('acq=*')]))
+    acqs.sort()
+
+    start_idx = _find_date(acqs,args['start_date'],args['hive_path'])
+    
+    if args['end_date'] is None and args['num_acqs'] is None:
+        end_idx = len(acqs)
+    elif args['end_date'] is not None:
+        end_idx = _find_date(acqs, args["end_date"], args["hive_path"]) + 1 # Add one for idx offset used in slice later
+    elif args['num_acqs'] is not None:
+        if args['num_acqs'] <= 1:
+            raise ValueError('Specified number of acquisitions should be greater than or equal to 2')
+        end_idx = start_idx + args['num_acqs']
+        if end_idx > len(acqs):
+            print(
+                f"WARNING: The specified number of acquisitions exceeded the number of acquisitions following start date {args['start_date']}. Using the most recent available acquisition {acqs[-1]} as the end date instead.",
+                flush=True,
+            )
+            end_idx = len(acqs)
+
+    if args['tld'] is None:
+        all_tlds = list(set([p.name.removeprefix('tld=') for p in args['hive_path'].glob('tld=*')]))
+    else:
+        all_tlds = args['tld']
+
+    ## Remove tlds that have fewer than 2 acquisitions in the specified date range
+    for t in list(all_tlds):  # iterate over a copy since entries may be removed below
+        tld_acqs = [d.name.removeprefix('acq=') for d in args['hive_path'].joinpath(f'tld={t}').glob('acq=*')]
+        relevant_acqs = [a for a in tld_acqs if a in acqs[start_idx:end_idx]]
+        if len(relevant_acqs) < 2:
+            print(
+                f"WARNING: {t} has fewer than 2 acquisitions in the specified range from {acqs[start_idx]} to {acqs[end_idx - 1]}. {t} will be skipped",
+                flush=True,
+            )
+            all_tlds.remove(t)
+    
+    if len(all_tlds) == 0:
+        print("INFO: No tlds have acquisitions matching the specified criteria. Exiting", flush=True)
+        return
+    
+    if args['batch']:
+        print(f"INFO: {len(all_tlds)} tlds will be processed", flush=True)
+        args['opt_strs'] = {}
+        args["opt_strs"].update(
+            {
+                "tld_str": ",".join(all_tlds),
+                "ntlds": len(all_tlds),
+                # end_idx is a stopping point, equivalent to the stop arg in range(). Subtract 1 to account for this.
+                "end_opt": f"-e {acqs[end_idx - 1]}",
+            }
+        )
+        submit_batch(**args)
+    else:
+        if len(all_tlds) != 1:
+            raise ValueError("Multiple tlds cannot be run in a non-batch context. Please add the --batch flag to the function call")
+
+        args['tld'] = all_tlds[0]
+        acq_dates = acqs[start_idx:end_idx]
+
+        df = calculate_churn(args['hive_path'],args['tld'],acq_dates)
+
+        exist = _get_existing_records(args['db_path'],args['tld'])
+
+        if not exist.is_empty():
+            df = df.join(exist,on=['tld','log_dt','prior_log_dt'], how = 'anti')
+
+        if df.is_empty():
+            print("INFO: Churn for the given acquisition dates already exist. Exiting without writing")
+            return
+
+        write_to_db(df, args['db_path'], args['retries'])
-- 
GitLab