From 9fbceed1e5df98a509856a6d8ee23ae5233c7a5a Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Wed, 14 Aug 2024 17:53:01 -0500
Subject: [PATCH] add scripts for converting gpfs logs to parquet via array job

---
 convert-to-parquet/convert-parquet-array.sh  |  20 +++
 convert-to-parquet/convert-to-parquet.py     | 127 ++++++++++++++++++
 convert-to-parquet/run-convert-to-parquet.sh | 127 ++++++++++++++++++
 3 files changed, 274 insertions(+)
 create mode 100644 convert-to-parquet/convert-parquet-array.sh
 create mode 100755 convert-to-parquet/convert-to-parquet.py
 create mode 100755 convert-to-parquet/run-convert-to-parquet.sh

diff --git a/convert-to-parquet/convert-parquet-array.sh b/convert-to-parquet/convert-parquet-array.sh
new file mode 100644
index 0000000..b84902f
--- /dev/null
+++ b/convert-to-parquet/convert-parquet-array.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+#
+#SBATCH --job-name=parquet-list-%a
+#SBATCH --ntasks=1
+#SBATCH --partition=amd-hdr100
+#SBATCH --time=02:00:00
+#SBATCH --mem=16G
+#SBATCH --output=out/%A_%a.out
+#SBATCH --error=err/%A_%a.err
+#SBATCH --array=1-111
+
+module load Anaconda3
+conda activate gpfs
+
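+# NOTE: the log directory and array range below are hardcoded for the
+# 2024-07-23 data-user policy run; run-convert-to-parquet.sh regenerates
+# this script with the appropriate values for any log directory.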
+log=$(ls /data/rc/gpfs-policy/data/list-policy_data-user_2024-07-23/list-* | awk -v idx="${SLURM_ARRAY_TASK_ID}" 'NR==idx { print $1 }')
+
+python convert-to-parquet.py -o /home/mdefende/Desktop/test_data_user_parquet -f ${log}
diff --git a/convert-to-parquet/convert-to-parquet.py b/convert-to-parquet/convert-to-parquet.py
new file mode 100755
index 0000000..2f716b4
--- /dev/null
+++ b/convert-to-parquet/convert-to-parquet.py
@@ -0,0 +1,127 @@
+from urllib.parse import unquote
+import os
+import re
+import dask.bag as db
+import dask.dataframe as dd
+import argparse
+from pathlib import PurePath
+
+desc = """
+Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n
+Works with data from /data/user, /data/user/home, /data/project, and /scratch
+The data are parsed and the directory beneath any of those listed above is set as the top-level
+directory ('tld'). The tld is then set as the index for the parquet file to improve aggregation speed
+
+If the full log is split into multiple parts, the full dataset will need to be repartitioned after all of
+the parts are converted to parquet to make sure all entries for each tld are in the same partition. This
+can be done with a separate script
+"""
+
+def parse_args():
+    parser = argparse.ArgumentParser(description=desc,
+                                     formatter_class=argparse.RawTextHelpFormatter
+                                     )
+    parser.add_argument('-p','--policy',action='store_true',help="Print the policy the script uses as a template and exit")
+    parser.add_argument('-o','--output-dir',help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to input_file_dir/parquet")
+    parser.add_argument('-f','--file',required=True,help="Log file from an mmapplypolicy run to be converted to parquet. Can be either a full log or just a part")
+    args = parser.parse_args()
+    return args
+
+def parse_line(line):
+    # Parse one record from a policy list file into a dict of attributes.
+    # Lines that fail to parse are returned as-is so they can be identified downstream.
+    try:
+        ul = unquote(line).strip()
+        ul = re.sub(r'[\n\t]','',ul)
+        details,path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups()
+
+        d = dict([re.match(r'([\w]+)=(.*)',l).groups() for l in details.split('|')])
+
+        # tld is the directory immediately below one of the known filesystem roots
+        tld = re.match(r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)',path).group(1)
+        d.update({'path': path,
+                  'tld': tld})
+        return d
+    except AttributeError:
+        return line
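+# A raw record looks roughly like the following (hypothetical values):
+# 515056276 673392718 0   |size=4096|kballoc=8|access=2024-07-01 10:00:00|...| -- /data/user/jdoe/file.txt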
+
+def print_policy():
+    policy = """
+/* list of files to include; replace FILEPATH with the path to scan */
+define( include_list,
+  (PATH_NAME LIKE 'FILEPATH%')
+)
+
+/* define access_age */
+define(access_age,
+  (DAYS(CURRENT_TIMESTAMP) - DAYS(ACCESS_TIME))
+)
+
+RULE 'gather-exec' EXTERNAL LIST 'gather-info' EXEC '' OPTS 'JOBID' ESCAPE '%'
+RULE 'list-path' LIST 'gather-info'
+  SHOW ('|size='   || varchar(FILE_SIZE) ||
+        '|kballoc='|| varchar(KB_ALLOCATED) ||
+        '|access=' || varchar(ACCESS_TIME) ||
+        '|create=' || varchar(CREATION_TIME) ||
+        '|modify=' || varchar(MODIFICATION_TIME) ||
+        '|uid='    || varchar(USER_ID) ||
+        '|gid='    || varchar(GROUP_ID) ||
+        '|heat='   || varchar(FILE_HEAT) ||
+        '|pool='   || varchar(POOL_NAME) ||
+        '|mode='   || varchar(MODE) ||
+        '|misc='   || varchar(MISC_ATTRIBUTES) ||
+        '|'
+       )
+  WHERE include_list
+"""
+    print(policy)
+    return
+
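+# Expected dtypes for the output dataframe; the keys match the fields emitted
+# by the SHOW clause in the policy template plus the derived 'path' and 'tld'.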
+schema = {
+    'size': 'int64',
+    'kballoc': 'int64',
+    'access': 'datetime64[ns]',
+    'create': 'datetime64[ns]',
+    'modify': 'datetime64[ns]',
+    'uid': 'int64',
+    'gid': 'int64',
+    'heat': 'str',
+    'pool': 'str',
+    'mode': 'str',
+    'misc': 'str',
+    'path': 'str',
+    'tld': 'str'
+}
+
+def main():
+    args = parse_args()
+    if args.policy:
+        print_policy()
+        return
+
+    file = PurePath(args.file)
+
+    # Default the output location to a 'parquet' directory next to the input file
+    if args.output_dir:
+        outdir = PurePath(args.output_dir)
+    else:
+        outdir = file.parent.joinpath('parquet')
+
+    os.makedirs(outdir,exist_ok=True)
+
+    # Parse each line of the log and build a dataframe indexed by tld
+    bag = db.read_text(str(file)).map(parse_line)
+    ddf = bag.to_dataframe(meta=schema).set_index('tld')
+
+    outname = file.with_suffix('.parquet').name
+    ddf.to_parquet(outdir.joinpath(outname))
+
+if __name__ == '__main__':
+    main()
diff --git a/convert-to-parquet/run-convert-to-parquet.sh b/convert-to-parquet/run-convert-to-parquet.sh
new file mode 100755
index 0000000..58bf3d8
--- /dev/null
+++ b/convert-to-parquet/run-convert-to-parquet.sh
@@ -0,0 +1,127 @@
+#!/bin/bash
+
+set -euo pipefail
+
+############################################################
+# Default Values                                           #
+############################################################
+
+ntasks=1
+mem="16G"
+time="02:00:00"
+partition="amd-hdr100"
+outdir=""
+
+############################################################
+# Help                                                     #
+############################################################
+usage()
+{
+>&2 cat << EOF
+Usage: $0 [ -h ] [ -o | --outdir ] [ -n | --ntasks ] [ -p | --partition ] [ -t | --time ] [ -m | --mem ] <gpfs_logdir>
+EOF
+exit 1
+}
+
+help()
+{
+# Display Help
+echo "Submits an array job to convert parts of a GPFS log to parquet format"
+echo
+echo "Syntax: $0 [ -h ] [ -o | --outdir ] [ -n | --ntasks ] [ -p | --partition] [ -t | --time ] [ -m | --mem ] <gpfs_logdir>"
+echo "options:"
+echo "-h|--help     Print this Help."
+echo 
+echo "Required:"
+echo "   gpfs_log_dir           Directory containing GPFS log outputs"
+echo
+echo "Path:"
+echo "   -o|--outdir      Directory to save parquet files to"
+echo 
+echo "sbatch options:"
+echo "   -n|--ntasks      Number of tasks for each array index (default: 1)"
+echo "   -p|--partition   Partition to submit tasks to (default: amd-hdr100)"
+echo "   -t|--time        Max walltime (default: 02:00:00)"
+echo "   -m|--mem         Memory for each task (default: 16G)"
+echo
+exit 0
+}
+
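+# Example invocation (hypothetical paths):
+#   ./run-convert-to-parquet.sh -o /scratch/jdoe/parquet /data/rc/gpfs-policy/data/list-policy_data-user_2024-07-23
+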
+# getopt returns non-zero on an invalid option; fall back to usage in that case
+args=$(getopt -a -o ho:n:p:t:m: --long help,outdir:,ntasks:,partition:,time:,mem: -- "$@") || usage
+
+eval set -- "${args}"
+
+while :
+do
+  case $1 in
+    -h | --help)      help           ;;
+    -o | --outdir)    outdir=$2      ; shift 2 ;;
+    -n | --ntasks)    ntasks=$2      ; shift 2 ;;
+    -p | --partition) partition=$2   ; shift 2 ;;
+    -t | --time)      time=$2        ; shift 2 ;;
+    -m | --mem)       mem=$2         ; shift 2 ;;
+    # -- means the end of the arguments; drop this, and break out of the while loop
+    --) shift; break ;;
+    *) >&2 echo "Unsupported option: $1"
+       usage ;;
+  esac
+done
+
+if [[ $# -eq 0 ]]; then
+  echo "Error: gpfs_logdir is a required positional argument." >&2
+  usage
+fi
+
+gpfs_logdir="$1"
+
+# Default the output directory to a 'parquet' folder inside the log directory,
+# matching the default used by convert-to-parquet.py
+if [[ -z "$outdir" ]]; then
+  outdir="${gpfs_logdir}/parquet"
+fi
+
+>&2 echo "output dir: ${outdir}"
+>&2 echo "GPFS logs:  ${gpfs_logdir}"
+>&2 echo "ntasks:     ${ntasks}"
+>&2 echo "partition:  ${partition}"
+>&2 echo "time:       ${time}"
+>&2 echo "mem:        ${mem}"
+
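+# One array task per list file in the log directory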
+nlogs=$(ls "${gpfs_logdir}"/list-* | wc -l)
+
+mkdir -p out
+mkdir -p err
+
+############################################################
+# Create Array Job Script                                  #
+############################################################
+
+cat > convert-parquet-array.sh <<EOF
+#!/bin/bash
+#
+#SBATCH --job-name=parquet-list-%a
+#SBATCH --ntasks=${ntasks}
+#SBATCH --partition=${partition}
+#SBATCH --time=${time}
+#SBATCH --mem=${mem}
+#SBATCH --output=out/%A_%a.out
+#SBATCH --error=err/%A_%a.err
+#SBATCH --array=1-${nlogs}
+
+module load Anaconda3
+conda activate gpfs
+
+log=\$(ls ${gpfs_logdir}/list-* | awk -v idx="\${SLURM_ARRAY_TASK_ID}" 'NR==idx { print \$1 }')
+
+python convert-to-parquet.py -o ${outdir} -f \${log}
+EOF
+
+# Submit the generated array job
+sbatch convert-parquet-array.sh
+
+exit 0
-- 
GitLab