`.gitignore`:
```
data/
data
joblogs/
slurm-*
out/
err/
*.sif
```
The output file is an unsorted list of files in uncompressed ASCII. Further processing is needed for efficient aggregation and analysis.
### Pre-parse output for Python
Processing GPFS log outputs is controlled by the `run-convert-to-parquet.sh` script and assumes the GPFS log has been split into a number of files of the form `list-XXX.gz`, where `XXX` is an incrementing numeric index. The script submits an array job in which each task reads the quoted text in one file, parses it into a dataframe, and exports it as a parquet file named `list-XXX.parquet`.
While each file is parsed, the top-level directory (`tld`) of every entry is extracted and added as a separate column to make common aggregations easier.
This script parses the `list-path-external` policy format, in which special characters are percent-quoted; a short sketch of the decoding appears after the option list below.
```
Usage: ./run-convert-to-parquet.sh [ -h ]
       [ -o | --outdir ] [ -n | --ntasks ] [ -p | --partition ]
       [ -t | --time ] [ -m | --mem ]
       gpfs_logdir
```
- `outdir`: Path to save parquet outputs. Defaults to `${gpfs_logdir}/parquet`
- `gpfs_logdir`: Directory path containing the split log files as `*.gz`
All other options control the array job resources. The default resources can parse a 5-million-line file in approximately 3 minutes, which should cover most common use cases.
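To make the parsing step concrete, here is a minimal sketch of how one quoted log line becomes a row. The sample line and its field values are invented for illustration, but the regular expressions mirror `parse_line` in `convert-to-parquet.py` below.

```
from urllib.parse import unquote
import re

# Hypothetical line in the list-path-external format: a header, '|'-separated
# SHOW fields, then ' -- ' and the %-escaped file path
raw = "15997968 0 0  |size=4096|kballoc=8|uid=10001|gid=10001| -- /data/project/mylab/my%20file.txt"

ul = unquote(raw).strip()   # decode escapes: 'my%20file.txt' -> 'my file.txt'
details, path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups()
fields = dict(re.match(r'(\w+)=(.*)', f).groups() for f in details.split('|'))

# The entry directly beneath /data/user, /data/user/home, /data/project, or
# /scratch becomes the top-level directory (tld)
tld = re.match(r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)', path).group(1)

print(fields['size'], tld)   # prints: 4096 mylab
```

Each parsed line becomes one row in the dataframe, so per-`tld` aggregations reduce to a simple `groupby`.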
## Running reports
### Disk usage by top level directories
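With the parquet outputs in place, this report is a short pandas aggregation. A minimal sketch, assuming the parquet files were written to `${gpfs_logdir}/parquet` (the path below is a hypothetical example):

```
import pandas as pd

# pyarrow reads a directory of parquet parts as a single dataset
df = pd.read_parquet('/data/gpfs-logs/parquet')

# kballoc is allocated space in KB; divide by 1024**3 to report binary TB
usage = (df.groupby('tld')['kballoc'].sum()
           .div(1024**3)
           .sort_values(ascending=False))
print(usage.head(10))
```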
`convert-to-parquet.py`:
```
import argparse
import gzip
import os
import re
import sys
from pathlib import PurePath
from urllib.parse import unquote

import pandas as pd
desc = """
Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n
Works with data from /data/user, /data/user/home, /data/project, and /scratch
The data are parsed and the directory beneath any of those listed above is set as the top-level
directory ('tld').
"""
def parse_args():
    parser = argparse.ArgumentParser(description=desc,
                                     formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('-p','--policy',
                        help="Print the policy the script uses as a template and exit",
                        action='store_true')
    parser.add_argument('-o','--output-dir',
                        help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to input_file_dir/parquet")
    parser.add_argument('-f','--file',
                        help="Log file from mmlspolicy run to be converted to parquet. Can be either a full log or just a part")
    args = parser.parse_args()
    return args
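
# Parse a single policy log line into a dict of its SHOW fields plus the
# full path and its top-level directory (tld)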
def parse_line(line):
    try:
        # Decode %-escaped characters and strip embedded newlines/tabs
        ul = unquote(line).strip()
        ul = re.sub(r'[\n\t]','',ul)

        # Split the record into its SHOW fields and the file path
        details,path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups()
        d = dict([re.match(r'([\w]+)=(.*)',l).groups() for l in details.split('|')])

        tld = re.match(r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)',path).group(1)
        d.update({'path': path,
                  'tld': tld})
        return d
    except AttributeError:
        # A regex did not match: return the raw line so malformed records
        # can be inspected downstream instead of being dropped silently
        return line
def print_policy():
    policy = """
/* list of files to include */
define( include_list,
(PATH_NAME LIKE 'FILEPATH%')
)
/* define access_age */
define(access_age,
(DAYS(CURRENT_TIMESTAMP) - DAYS(ACCESS_TIME))
)
RULE 'gather-exec' EXTERNAL LIST 'gather-info' EXEC '' OPTS 'JOBID' ESCAPE '%'
RULE 'list-path' LIST 'gather-info'
SHOW ('|size=' || varchar(FILE_SIZE) ||
'|kballoc='|| varchar(KB_ALLOCATED) ||
'|access=' || varchar(ACCESS_TIME) ||
'|create=' || varchar(CREATION_TIME) ||
'|modify=' || varchar(MODIFICATION_TIME) ||
'|uid=' || varchar(USER_ID) ||
'|gid=' || varchar(GROUP_ID) ||
'|heat=' || varchar(FILE_HEAT) ||
'|pool=' || varchar(POOL_NAME) ||
'|mode=' || varchar(MODE) ||
'|misc=' || varchar(MISC_ATTRIBUTES) ||
'|'
)
WHERE include_list
"""
    print(policy)
    return
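
# Expected dtypes for each parsed field; timestamp strings from the policy
# output are converted to datetime64 during astype()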
schema = {
'size': 'int64',
'kballoc': 'int64',
'access': 'datetime64[ns]',
'create': 'datetime64[ns]',
'modify': 'datetime64[ns]',
'uid': 'int64',
'gid': 'int64',
'heat': 'str',
'pool': 'str',
'mode': 'str',
'misc': 'str',
'path': 'str',
'tld': 'str'
}
def main():
    args = parse_args()

    if args.policy:
        print_policy()
        sys.exit()

    if args.file:
        file = PurePath(args.file)
    else:
        sys.exit('Error: must specify a file to convert')

    if not args.output_dir:
        outdir = file.parent.joinpath('parquet')
    else:
        outdir = PurePath(args.output_dir)

    os.makedirs(outdir, exist_ok=True)

    # Read in text mode so each line is a str, as unquote() expects
    with gzip.open(file, 'rt') as f:
        dicts = [parse_line(l) for l in f]

    df = pd.DataFrame(dicts).sort_values('tld')
    df = df.astype(schema)

    outname = file.with_suffix('.parquet').name
    df.to_parquet(outdir.joinpath(outname), engine='pyarrow')

if __name__ == '__main__':
    main()
```

`run-convert-to-parquet.sh`:
```
#!/bin/bash
set -euo pipefail
############################################################
# Default Values #
############################################################
ntasks=1
mem="16G"
time="02:00:00"
partition="amd-hdr100"
outdir=""
sif="gitlab.rc.uab.edu:4567/mdefende/gpfs-policy:latest"
############################################################
# Help #
############################################################
usage()
{
>&2 cat << EOF
Usage: $0 [ -h ] [ -o | --outdir ] [ -n | --ntasks ] [ -p | --partition ] [ -t | --time ] [ -m | --mem ] gpfs_logdir
EOF
exit 1
}
help()
{
>&2 cat << EOF
Submits an array job to convert parts of a GPFS log to parquet format
Syntax: $0 [ -h ] [ -o | --outdir ] [ -n | --ntasks ] [ -p | --partition ] [ -t | --time ] [ -m | --mem ] gpfs_logdir
options:
-h|--help Print this Help.
Required:
gpfs_logdir    Directory containing GPFS log outputs
Path:
-o|--outdir Directory to save parquet files to
sbatch options:
-n|--ntasks Number of tasks for each array index (default: 1)
-p|--partition Partition to submit tasks to (default: amd-hdr100)
-t|--time Max walltime (default: 02:00:00)
-m|--mem Memory for each task (default: 16G)
EOF
exit 0
}
# getopt failure must be caught in the if itself; a separate $? check would
# never run under set -e
if ! args=$(getopt -a -o ho:n:p:t:m: --long help,outdir:,ntasks:,partition:,time:,mem: -- "$@"); then
    usage
fi
eval set -- "${args}"
while :
do
case $1 in
-h | --help) help ;;
-o | --outdir) outdir=$2 ; shift 2 ;;
-n | --ntasks) ntasks=$2 ; shift 2 ;;
-p | --partition) partition=$2 ; shift 2 ;;
-t | --time) time=$2 ; shift 2 ;;
-m | --mem) mem=$2 ; shift 2 ;;
--) shift; break ;;
*) >&2 echo "Unsupported option: $1"
   usage ;;
esac
done
if [[ $# -eq 0 ]]; then
usage
fi
gpfs_logdir="$1"
# Ensure gpfs_logdir is set
if [[ -z "$gpfs_logdir" ]]; then
echo "Error: gpfs_logdir is a required positional argument."
exit 1
fi
# If outdir not set, set to ${gpfs_logdir}/parquet
if [[ -z "$outdir" ]]; then
outdir="${gpfs_logdir}/parquet"
fi
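# Pull the container image once before submitting; every array task runs
# the conversion inside it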
singularity pull --force gpfs.sif docker://${sif}
# Count the split log files to size the array job
nlogs=$(ls "${gpfs_logdir}"/list-* | wc -l)
cmd="singularity exec --bind /data,/scratch gpfs.sif python3 convert-to-parquet.py -o ${outdir} -f \${log}"
>&2 cat << EOF
--------------------------------------------------------------------------------
sif: ${sif}
output dir: ${outdir}
GPFS logs: ${gpfs_logdir}
ntasks: ${ntasks}
partition: ${partition}
time: ${time}
mem: ${mem}
command: ${cmd}
--------------------------------------------------------------------------------
EOF
mkdir -p out
mkdir -p err
############################################################
# Create Array Job Script #
############################################################
sbatch << EOF
#!/bin/bash
#
#SBATCH --job-name=parquet-list
#SBATCH --ntasks=${ntasks}
#SBATCH --partition=${partition}
#SBATCH --time=${time}
#SBATCH --mem=${mem}
#SBATCH --output=out/%A_%a.out
#SBATCH --error=err/%A_%a.err
#SBATCH --array=1-${nlogs}
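# Pick the log file matching this task's array index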
log=\$(ls ${gpfs_logdir}/list-* | awk -v n=\${SLURM_ARRAY_TASK_ID} 'NR == n { print \$1 }')
${cmd}
EOF
exit 0
```