diff --git a/.gitignore b/.gitignore
index 37640b42b2e16d2c19d4516b21819a02d79b2655..b6c3ca8d2b36c86bf1e2828ea013da533c533d94 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,6 @@
-data/
+data
 joblogs/
 slurm-*
-*.sif
\ No newline at end of file
+out/
+err/
+*.sif
diff --git a/README.md b/README.md
index 31113d59c55045b27a044768af414a89b3da6f7d..49ff44d3fbd9c7c7dce3f46d257790665ee251c7 100644
--- a/README.md
+++ b/README.md
@@ -86,6 +86,30 @@ In order to access the container, you will need to add a personal access token f
 
 ### Split and compress
 
+Processing GPFS log outputs is controlled by the `run-convert-to-parquet.sh` script, which assumes the GPFS log has been split into a number of files of the form `list-XXX.gz`, where `XXX` is an incrementing numeric index. The script creates an array job in which each task reads the quoted text in one file, parses it into a dataframe, and exports it as a parquet file named `list-XXX.parquet`.
+
+While the file is being parsed, the top-level directory (`tld`) is extracted for each entry and added as a separate column to make common aggregations easier.
+
+This script is written to parse the `list-path-external` policy format with quoted special characters.
+
+```
+Usage: ./run-convert-to-parquet.sh [ -h ]
+       [ -o | --outdir ] [ -n | --ntasks ] [ -p | --partition ]
+       [ -t | --time ] [ -m | --mem ]
+       gpfs_logdir
+```
+
+- `outdir`: Path to save parquet outputs. Defaults to `${gpfs_logdir}/parquet`
+- `gpfs_logdir`: Directory path containing the split log files as `*.gz`
+
+All other options control the array job resources. The defaults can parse a file of 5 million lines in approximately 3 minutes, so they should cover most common use cases.
+
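+For example, a run over a split log in a hypothetical directory (the path below is illustrative) might look like:
+
+```
+./run-convert-to-parquet.sh -p amd-hdr100 -m 16G /data/gpfs-logs/2024-01-01
+```
+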
 ### Pre-parse output for Python
 
 ### Parallel Transfer Using s5cmd
diff --git a/src/convert-to-parquet/convert-to-parquet.py b/src/convert-to-parquet/convert-to-parquet.py
new file mode 100755
index 0000000000000000000000000000000000000000..2a8b70c693a9594a428fbc53724239052caf7ff7
--- /dev/null
+++ b/src/convert-to-parquet/convert-to-parquet.py
@@ -0,0 +1,121 @@
+#!/usr/bin/env python3
+from urllib.parse import unquote
+import os
+import re
+import sys
+import argparse
+import pandas as pd
+import gzip
+from pathlib import PurePath
+
+desc = """
+Converts GPFS policy run logs to parquet files for easier aggregation and analysis.
+Works with data from /data/user, /data/user/home, /data/project, and /scratch.
+The data are parsed, and the directory beneath any of the paths listed above is set
+as the top-level directory ('tld').
+"""
+
+def parse_args():
+    parser = argparse.ArgumentParser(description=desc,
+                                     formatter_class=argparse.RawTextHelpFormatter
+                                     )
+    parser.add_argument('-p','--policy',help="Print the policy the script uses as a template and exit",action='store_true')
+    parser.add_argument('-o','--output-dir',help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to input_file_dir/parquet")
+    parser.add_argument('-f','--file',help="Log file from mmlspolicy run to be converted to parquet. Can be either a full log or just a part")
+    args = parser.parse_args()
+    return args
+
+def parse_line(line):
+    try:
+        ul = unquote(line).strip()
+        ul = re.sub(r'[\n\t]','',ul)
+        details,path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups()
+
+        d = dict([re.match(r'([\w]+)=(.*)',l).groups() for l in details.split('|')])
+
+        tld = re.match(r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)',path).group(1)
+        d.update({'path': path,
+                  'tld': tld})
+        return d
+    except AttributeError:
+        # The line did not match the expected policy output format; return it
+        # unparsed so the failure is visible in the output
+        return line
+
+def print_policy():
+    policy = """
+/* list of files to include */
+define( include_list,
+  (PATH_NAME LIKE 'FILEPATH%')
+)
+
+/* define access_age */
+define(access_age,
+  (DAYS(CURRENT_TIMESTAMP) - DAYS(ACCESS_TIME))
+)
+
+RULE 'gather-exec' EXTERNAL LIST 'gather-info' EXEC '' OPTS 'JOBID' ESCAPE '%'
+RULE 'list-path' LIST 'gather-info'
+    SHOW ('|size=' || varchar(FILE_SIZE) ||
+          '|kballoc='|| varchar(KB_ALLOCATED) ||
+          '|access=' || varchar(ACCESS_TIME) ||
+          '|create=' || varchar(CREATION_TIME) ||
+          '|modify=' || varchar(MODIFICATION_TIME) ||
+          '|uid=' || varchar(USER_ID) ||
+          '|gid=' || varchar(GROUP_ID) ||
+          '|heat=' || varchar(FILE_HEAT) ||
+          '|pool=' || varchar(POOL_NAME) ||
+          '|mode=' || varchar(MODE) ||
+          '|misc=' || varchar(MISC_ATTRIBUTES) ||
+          '|'
+          )
+    WHERE include_list
+"""
+    print(policy)
+    return
+
+schema = {
+    'size': 'int64',
+    'kballoc': 'int64',
+    'access': 'datetime64[ns]',
+    'create': 'datetime64[ns]',
+    'modify': 'datetime64[ns]',
+    'uid': 'int64',
+    'gid': 'int64',
+    'heat': 'str',
+    'pool': 'str',
+    'mode': 'str',
+    'misc': 'str',
+    'path': 'str',
+    'tld': 'str'
+}
+
+def main():
+    args = parse_args()
+    if args.policy:
+        print_policy()
+        sys.exit()
+
+    if args.file:
+        file = PurePath(args.file)
+    else:
+        sys.exit('Error: must specify a file to convert')
+
+    if not args.output_dir:
+        outdir = file.parent.joinpath('parquet')
+    else:
+        outdir = PurePath(args.output_dir)
+
+    os.makedirs(outdir,exist_ok=True)
+
+    # Read the gzipped log in text mode so parse_line receives str, not bytes
+    with gzip.open(file,'rt') as f:
+        dicts = [parse_line(l) for l in f]
+    df = pd.DataFrame.from_dict(dicts).sort_values('tld')
+    df = df.astype(schema)
+
+    outname = file.with_suffix('.parquet').name
+    df.to_parquet(outdir.joinpath(outname),engine='pyarrow')
+
+if __name__ == '__main__':
+    main()
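As a quick sanity check of `parse_line` above, the snippet below feeds it one synthetic `list-path-external` record; the field values and path are invented for illustration, and the function is assumed to be imported or pasted into an interactive session:

```python
# Synthetic record in the quoted 'list-path-external' format (values are illustrative)
sample = (
    "420722 909849380 0   "
    "|size=4096|kballoc=8|access=2024-01-01 10:20:30.000000"
    "|create=2023-12-01 09:00:00.000000|modify=2023-12-15 12:00:00.000000"
    "|uid=10001|gid=10001|heat=+0.1|pool=data|mode=-rw-r--r--|misc=F|"
    " -- /data/user/jdoe/project1/results.csv"
)

d = parse_line(sample)
assert d['tld'] == 'jdoe'           # directory beneath /data/user becomes the tld
assert d['size'] == '4096'          # fields remain strings until df.astype(schema)
assert d['path'].endswith('results.csv')
```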
diff --git a/src/convert-to-parquet/run-convert-to-parquet.sh b/src/convert-to-parquet/run-convert-to-parquet.sh
new file mode 100755
index 0000000000000000000000000000000000000000..d08a2db3c477a3d8514ff620a857ce876dd015bf
--- /dev/null
+++ b/src/convert-to-parquet/run-convert-to-parquet.sh
@@ -0,0 +1,134 @@
+#!/bin/bash
+
+set -euo pipefail
+
+############################################################
+#                      Default Values                      #
+############################################################
+
+ntasks=1
+mem="16G"
+time="02:00:00"
+partition="amd-hdr100"
+outdir=""
+sif="gitlab.rc.uab.edu:4567/mdefende/gpfs-policy:latest"
+
+############################################################
+#                           Help                           #
+############################################################
+usage()
+{
+>&2 cat << EOF
+Usage: $0 [ -h ] [ -o | --outdir ] [ -n | --ntasks ] [ -p | --partition ] [ -t | --time ] [ -m | --mem ] gpfs_logdir
+EOF
+exit 1
+}
+
+help()
+{
+>&2 cat << EOF
+Submits an array job to convert parts of a GPFS log to parquet format
+Syntax: $0 [ -h ] [ -o | --outdir ] [ -n | --ntasks ] [ -p | --partition ] [ -t | --time ] [ -m | --mem ] gpfs_logdir
+
+options:
+ -h|--help          Print this help.
+
+Required:
+ gpfs_logdir        Directory containing GPFS log outputs
+
+Path:
+ -o|--outdir        Directory to save parquet files to
+
+sbatch options:
+ -n|--ntasks        Number of tasks for each array index (default: 1)
+ -p|--partition     Partition to submit tasks to (default: amd-hdr100)
+ -t|--time          Max walltime (default: 02:00:00)
+ -m|--mem           Memory for each task (default: 16G)
+EOF
+exit 0
+}
+
+# Catch getopt failure with || so that set -e does not kill the script
+# before usage() can print
+args=$(getopt -a -o ho:n:p:t:m: --long help,outdir:,ntasks:,partition:,time:,mem: -- "$@") || usage
+
+eval set -- "${args}"
+
+while :
+do
+  case $1 in
+    -h | --help)      help                      ;;
+    -o | --outdir)    outdir="$2"     ; shift 2 ;;
+    -n | --ntasks)    ntasks="$2"     ; shift 2 ;;
+    -p | --partition) partition="$2"  ; shift 2 ;;
+    -t | --time)      time="$2"       ; shift 2 ;;
+    -m | --mem)       mem="$2"        ; shift 2 ;;
+    --) shift; break ;;
+    *) >&2 echo "Unsupported option: $1"
+       usage ;;
+  esac
+done
+
+if [[ $# -eq 0 ]]; then
+  usage
+fi
+
+gpfs_logdir="$1"
+
+# Ensure gpfs_logdir is non-empty
+if [[ -z "$gpfs_logdir" ]]; then
+  echo "Error: gpfs_logdir is a required positional argument."
+  exit 1
+fi
+
+# If outdir is not set, default to ${gpfs_logdir}/parquet
+if [[ -z "$outdir" ]]; then
+  outdir="${gpfs_logdir}/parquet"
+fi
+
+singularity pull --force gpfs.sif "docker://${sif}"
+
+nlogs=$(ls "${gpfs_logdir}"/list-* | wc -l)
+
+cmd="singularity exec --bind /data,/scratch gpfs.sif python3 convert-to-parquet.py -o ${outdir} -f \${log}"
+
+>&2 cat << EOF
+--------------------------------------------------------------------------------
+sif:        ${sif}
+output dir: ${outdir}
+GPFS logs:  ${gpfs_logdir}
+
+ntasks:     ${ntasks}
+partition:  ${partition}
+time:       ${time}
+mem:        ${mem}
+
+command:    ${cmd}
+--------------------------------------------------------------------------------
+EOF
+
+mkdir -p out err
+
+############################################################
+#                  Create Array Job Script                 #
+############################################################
+
+sbatch << EOF
+#!/bin/bash
+#
+#SBATCH --job-name=parquet-list
+#SBATCH --ntasks=${ntasks}
+#SBATCH --partition=${partition}
+#SBATCH --time=${time}
+#SBATCH --mem=${mem}
+#SBATCH --output=out/%A_%a.out
+#SBATCH --error=err/%A_%a.err
+#SBATCH --array=1-${nlogs}
+
+# Pick the log file whose position matches this array index
+log=\$(ls "${gpfs_logdir}"/list-* | awk -v n=\${SLURM_ARRAY_TASK_ID} 'NR==n')
+
+${cmd}
+EOF
+
+exit 0
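Once the array job finishes, the per-part parquet files can be read back as a single dataset for the kind of per-`tld` aggregation the README describes. A minimal sketch, assuming pandas with pyarrow installed; the path is illustrative:

```python
import pandas as pd

# pyarrow treats a directory of parquet part files as one dataset (path is illustrative)
df = pd.read_parquet('/data/gpfs-logs/2024-01-01/parquet')

# bytes and allocated KB per top-level directory, largest consumers first
usage = df.groupby('tld')[['size', 'kballoc']].sum().sort_values('size', ascending=False)
print(usage.head())
```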