Commit 9fbceed1 authored by Matthew K Defenderfer

add scripts for converting gpfs logs to parquet via array job

parent 91e8f412
1 merge request: !8 Automate conversion of GPFS policy outputs to parquet without Jupyter
#!/bin/bash
#
#SBATCH --job-name=parquet-list-%a
#SBATCH --ntasks=1
#SBATCH --partition=amd-hdr100
#SBATCH --time=02:00:00
#SBATCH --mem=16G
#SBATCH --output=out/%A_%a.out
#SBATCH --error=err/%A_%a.err
#SBATCH --array=1-111
module load Anaconda3
conda activate gpfs
# select the log file for this array index (one file per task)
log=$(ls /data/rc/gpfs-policy/data/list-policy_data-user_2024-07-23/list-* | awk -v taskid="${SLURM_ARRAY_TASK_ID}" 'NR==taskid { print $1 }')
python convert-to-parquet.py -o /home/mdefende/Desktop/test_data_user_parquet -f ${log}
from urllib.parse import unquote
import os
import re
import dask.bag as db
import dask.dataframe as dd
import argparse
from pathlib import PurePath
desc = """
Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n
Works with data from /data/user, /data/user/home, /data/project, and /scratch
The data are parsed and the directory beneath any of those listed above is set as the top-level
directory ('tld'). The tld is then set as the index for the parquet file to improve aggregation speed
If the full log is split into multiple parts, the full dataset will need to be repartitioned after all of
the parts are converted to parquet to make sure all entries for each tld are in the same partition. This
can be done with a separate script
"""
def parse_args():
    parser = argparse.ArgumentParser(description=desc,
                                     formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('-p','--policy',action='store_true',
                        help="Print the policy the script uses as a template and exit")
    parser.add_argument('-o','--output-dir',
                        help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to input_file_dir/parquet")
    parser.add_argument('-f','--file',
                        help="Log file from mmlspolicy run to be converted to parquet. Can be either a full log or just a part")
    args = parser.parse_args()

    # -f is only optional when we are just printing the policy template
    if not args.policy and args.file is None:
        parser.error('-f/--file is required unless -p/--policy is given')
    return args
def parse_line(line):
    try:
        # undo the policy's '%' URL-encoding and strip stray whitespace
        ul = unquote(line).strip()
        ul = re.sub(r'[\n\t]','',ul)

        # split the record into the '|'-delimited attribute section and the file path
        details,path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups()
        d = dict([re.match(r'([\w]+)=(.*)',l).groups() for l in details.split('|')])

        # the tld is the first path component below one of the known roots
        tld = re.match(r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)',path).group(1)
        d.update({'path': path,
                  'tld': tld})
        return d
    except AttributeError:
        # line did not match the expected record format; return it unparsed
        return line
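# Illustrative example (hypothetical values, not taken from a real log): after
# URL-decoding, a record produced by the policy template below looks roughly like
#
#   <header> |size=4096|kballoc=8|access=2024-07-01 12:00:00|uid=1000|...| -- /data/user/jdoe/project/file.txt
#
# and parse_line() returns a dict such as
#
#   {'size': '4096', 'kballoc': '8', ..., 'uid': '1000',
#    'path': '/data/user/jdoe/project/file.txt', 'tld': 'jdoe'}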
def print_policy():
    policy = """
/* list of files to include */
define( include_list,
  (PATH_NAME LIKE 'FILEPATH%')
)

/* define access_age */
define(access_age,
  (DAYS(CURRENT_TIMESTAMP) - DAYS(ACCESS_TIME))
)

RULE 'gather-exec' EXTERNAL LIST 'gather-info' EXEC '' OPTS 'JOBID' ESCAPE '%'

RULE 'list-path' LIST 'gather-info'
    SHOW ('|size='    || varchar(FILE_SIZE) ||
          '|kballoc=' || varchar(KB_ALLOCATED) ||
          '|access='  || varchar(ACCESS_TIME) ||
          '|create='  || varchar(CREATION_TIME) ||
          '|modify='  || varchar(MODIFICATION_TIME) ||
          '|uid='     || varchar(USER_ID) ||
          '|gid='     || varchar(GROUP_ID) ||
          '|heat='    || varchar(FILE_HEAT) ||
          '|pool='    || varchar(POOL_NAME) ||
          '|mode='    || varchar(MODE) ||
          '|misc='    || varchar(MISC_ATTRIBUTES) ||
          '|'
          )
    WHERE include_list
"""
    print(policy)
    return
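# Note: 'FILEPATH' and 'JOBID' in the template above are presumably
# placeholders substituted with the scanned filesystem path and a job
# identifier before the policy is applied. The ESCAPE '%' clause URL-encodes
# special characters in path names, which is why parse_line() runs unquote()
# on every record.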
schema = {
    'size':    'int64',
    'kballoc': 'int64',
    'access':  'datetime64[ns]',
    'create':  'datetime64[ns]',
    'modify':  'datetime64[ns]',
    'uid':     'int64',
    'gid':     'int64',
    'heat':    'str',
    'pool':    'str',
    'mode':    'str',
    'misc':    'str',
    'path':    'str',
    'tld':     'str'
}
def main():
    args = parse_args()

    if args.policy:
        print_policy()
        exit()

    file = PurePath(args.file)

    # default the output location to a 'parquet' directory next to the input
    if args.output_dir:
        outdir = PurePath(args.output_dir)
    else:
        outdir = file.parent.joinpath('parquet')

    os.makedirs(outdir,exist_ok=True)

    # parse each log line into a dict, then build a dask dataframe indexed by tld
    bag = db.read_text(file).map(parse_line)
    ddf = bag.to_dataframe(meta=schema).set_index('tld')

    outname = file.with_suffix('.parquet').name
    ddf.to_parquet(outdir.joinpath(outname))

if __name__ == '__main__':
    main()
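# The repartitioning pass mentioned in the description above lives in a
# separate script. A minimal sketch of that step (paths hypothetical), reading
# every converted part and shuffling so all rows for a tld share a partition:
#
#   import dask.dataframe as dd
#   ddf = dd.read_parquet('/path/to/parquet/')   # all converted parts
#   ddf = ddf.reset_index().set_index('tld')     # shuffle rows by tld
#   ddf.to_parquet('/path/to/parquet-repartitioned/')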
#!/bin/bash
set -euo pipefail
############################################################
# Default Values #
############################################################
ntasks=1
mem="16G"
time="02:00:00"
partition="amd-hdr100"
outdir=""
############################################################
# Help #
############################################################
usage()
{
>&2 cat << EOF
Usage: $0 [ -h ] [ -o | --outdir ] [ -n | --ntasks ] [ -p | --partition ] [ -t | --time ] [ -m | --mem ] <gpfs_logdir>
EOF
exit 1
}
help()
{
# Display Help
echo "Submits an array job to convert parts of a GPFS log to parquet format"
echo
echo "Syntax: $0 [ -h ] [ -o | --outdir ] [ -n | --ntasks ] [ -p | --partition] [ -t | --time ] [ -m | --mem ] <gpfs_logdir>"
echo "options:"
echo "-h|--help Print this Help."
echo
echo "Required:"
echo " gpfs_log_dir Directory containing GPFS log outputs"
echo
echo "Path:"
echo " -o|--outdir Directory to save parquet files to"
echo
echo "sbatch options:"
echo " -n|--ntasks Number of tasks for each array index (default: 1)"
echo " -p|--partition Partition to submit tasks to (default: amd-hdr100)"
echo " -t|--time Max walltime (default: 02:00:00)"
echo " -m|--mem Memory for each task (default: 16G)"
echo
exit 1
}
# note: with 'set -e' a failed getopt would abort the script before a separate
# status check could run, so fall back to usage() inline instead
args=$(getopt -a -o ho:n:p:t:m: --long help,outdir:,ntasks:,partition:,time:,mem: -- "$@") || usage

eval set -- "${args}"
while :
do
case $1 in
-h | --help) help ;;
-o | --outdir) outdir=$2 ; shift 2 ;;
-n | --ntasks) ntasks=$2 ; shift 2 ;;
-p | --partition) partition=$2 ; shift 2 ;;
-t | --time) time=$2 ; shift 2 ;;
-m | --mem) mem=$2 ; shift 2 ;;
# -- means the end of the arguments; drop this, and break out of the while loop
--) shift; break ;;
*) >&2 echo "Unsupported option: $1"
   usage ;;
esac
done
# Ensure the required gpfs_logdir positional argument was given
gpfs_logdir="${1:-}"
if [[ -z "$gpfs_logdir" ]]; then
    >&2 echo "Error: gpfs_logdir is a required positional argument."
    usage
fi
>&2 echo "output dir: ${outdir}"
>&2 echo "GPFS logs: ${gpfs_logdir}"
>&2 echo "ntasks: ${ntasks}"
>&2 echo "partition: ${partition}"
>&2 echo "time: ${time}"
>&2 echo "mem: ${mem}"
# one array task per log part
nlogs=$(ls "${gpfs_logdir}"/list-* | wc -l)
mkdir -p out
mkdir -p err
############################################################
# Create Array Job Script #
############################################################
cat > convert-parquet-array.sh <<EOF
#!/bin/bash
#
#SBATCH --job-name=parquet-list-%a
#SBATCH --ntasks=${ntasks}
#SBATCH --partition=${partition}
#SBATCH --time=${time}
#SBATCH --mem=${mem}
#SBATCH --output=out/%A_%a.out
#SBATCH --error=err/%A_%a.err
#SBATCH --array=1-${nlogs}
module load Anaconda3
conda activate gpfs
# select the log file for this array index; only pass -o when outdir was set
log=\$(ls ${gpfs_logdir}/list-* | awk -v taskid="\${SLURM_ARRAY_TASK_ID}" 'NR==taskid { print \$1 }')
python convert-to-parquet.py ${outdir:+-o ${outdir}} -f \${log}
EOF
exit 0
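# Example invocation (hypothetical script name and paths). Note this wrapper
# only writes convert-parquet-array.sh; submit it afterwards with sbatch:
#
#   ./submit-convert-array.sh -o /scratch/$USER/parquet -m 32G \
#       /data/rc/gpfs-policy/data/list-policy_data-user_2024-07-23
#   sbatch convert-parquet-array.sh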