From 88f4a16aca24fd64cc5a9448648c872e579873c4 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Thu, 12 Dec 2024 13:22:00 -0600 Subject: [PATCH] Add CLI functionality for log preprocessing --- create_symlinks.sh | 9 -- pyproject.toml | 4 +- src/rc_gpfs/cli/convert_to_parquet.py | 120 ++++++++++++++++ src/rc_gpfs/cli/split_log.py | 78 ++++++++++ src/rc_gpfs/cli/utils.py | 21 +++ src/rc_gpfs/compute/backend.py | 1 + src/rc_gpfs/compute/utils.py | 17 --- src/rc_gpfs/policy/__init__.py | 2 + src/rc_gpfs/policy/convert-to-parquet.py | 122 ---------------- src/rc_gpfs/policy/convert.py | 68 +++++++++ src/rc_gpfs/policy/policy_defs.py | 15 ++ src/rc_gpfs/policy/run-convert-to-parquet.sh | 133 ----------------- src/rc_gpfs/policy/split-info-file.sh | 144 ------------------- src/rc_gpfs/policy/split.py | 85 +++++++++++ src/rc_gpfs/policy/utils.py | 6 + src/rc_gpfs/utils.py | 16 +++ 16 files changed, 414 insertions(+), 427 deletions(-) delete mode 100755 create_symlinks.sh create mode 100644 src/rc_gpfs/cli/convert_to_parquet.py create mode 100644 src/rc_gpfs/cli/split_log.py create mode 100644 src/rc_gpfs/cli/utils.py delete mode 100755 src/rc_gpfs/policy/convert-to-parquet.py create mode 100755 src/rc_gpfs/policy/convert.py create mode 100644 src/rc_gpfs/policy/policy_defs.py delete mode 100755 src/rc_gpfs/policy/run-convert-to-parquet.sh delete mode 100755 src/rc_gpfs/policy/split-info-file.sh create mode 100644 src/rc_gpfs/policy/split.py create mode 100644 src/rc_gpfs/policy/utils.py diff --git a/create_symlinks.sh b/create_symlinks.sh deleted file mode 100755 index 5f36987..0000000 --- a/create_symlinks.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash -mkdir -p bin -# Find all .py and .sh files in the src directory and its subdirectories -find src/ -type f \( -name "*.py" -o -name "*.sh" \) | while read -r file; do - # Get the base name of the file - base=$(basename "$file") - # Create a symbolic link in the bin directory - ln -sf "../$file" "bin/$base" -done \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index b232963..f7f43d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,8 +34,8 @@ url="https://pypi.nvidia.com" priority = "supplemental" [tool.poetry.scripts] -gpfs-preproc = "report.cli:aggregate_gpfs_dataset" -gpfs-report = "report.cli:report" +convert-to-parquet = "rc_gpfs.cli:convert_to_parquet" +split-log = "rc_gpfs.cli:split_log" [tool.poetry-dynamic-versioning] enable = true diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py new file mode 100644 index 0000000..791330a --- /dev/null +++ b/src/rc_gpfs/cli/convert_to_parquet.py @@ -0,0 +1,120 @@ +import argparse +import subprocess +from pathlib import Path +import multiprocessing +from .utils import define_python_interpreter +from ..policy import convert +from ..utils import parse_scontrol + +__all__ = ['convert_to_parquet'] + +DESCRIPTION = """ +Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n +Works with data from /data/user, /data/user/home, /data/project, and /scratch +The data are parsed and the directory beneath any of those listed above is set as the top-level +directory ('tld'). +""" + +EPILOGUE = """\ +Local Parallel Processing: + If processing is done via a local parallel pool, the requested cores need to be accessible by the invoking Python process. When run in a Slurm job context where the number of cores were only specified with the --ntasks property, only 1 core will be available to the Python process regardless of the number of cores requested by the job. Instead, use the --cpus-per-task property to set the number of cores paired with --ntasks=1. This will correctly allow the parallel pool to utilize all cores assigned to the job. +""" + +def parse_args(): + parser = argparse.ArgumentParser( + description=DESCRIPTION, + formatter_class=argparse.RawTextHelpFormatter, + epilog=EPILOGUE + ) + parser.add_argument('input', + type=Path, + help="Path to a log file or directory of log files from mmlspolicy to be converted to parquet. If a directory path is given, all files within the directory will be converted. If the --batch option is set, this will be done in a batch array job. Otherwise, it will be done in a local parallel pool using available compute resources.") + + parser.add_argument('-o','--output-dir', + type=Path, + default = None, + help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../parquet") + parser.add_argument('--pool-size',type=int,default=None, + help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process") + + slurm = parser.add_argument_group(title='Slurm Options') + slurm.add_argument('--batch', action='store_true', default=False, + help="Run the conversion as a batch job. If a directory path is given as an input, an array job will be created to run the conversion in parallel.") + slurm.add_argument('-n','--ntasks', type=int, default=1) + slurm.add_argument('-p','--partition', type=str, default='amd-hdr100') + slurm.add_argument('-t','--time',type=str,default='02:00:00') + slurm.add_argument('-m','--mem',type=str,default='16G') + slurm.add_argument('--slurm-log-dir',type=Path,default='./out', + help='Output log directory. If the directory does not exist, it will be created automatically. Logs will be named convert_%A_%a to differentiate amongs job IDs and task IDs') + interpreter = slurm.add_mutually_exclusive_group() + interpreter.add_argument('--python-path',type=Path, + help="Path to Python interpreter to use for conversion. This interpreter should have access to a pandas library, preferably version >=2.0. If not specified, the path to the active python3 interpreter will be used.") + interpreter.add_argument('--conda-env',type=str,default=None, + help="The name or prefix of a conda environment to activate as opposed to a python3 path") + args = parser.parse_args() + return vars(args) + +BATCH_SCRIPT = """\ +#!/bin/bash +# +#SBATCH --job-name=parquet-list-%a +#SBATCH --ntasks={ntasks} +#SBATCH --partition={partition} +#SBATCH --time={time} +#SBATCH --mem={mem} +#SBATCH --output={output_log} +#SBATCH --error={error_log} +#SBATCH --array=1-{nlogs} + +{env_cmd} + +log=$(ls {input}/*.gz | awk "NR==${{SLURM_ARRAY_TASK_ID}} {{ print $1 }}") + +convert-to-parquet -o {output_dir} ${{log}} +""" + +def setup_slurm_logs(slurm_log_dir): + slurm_log_dir = slurm_log_dir.absolute() + slurm_log_dir.mkdir(exist_ok = True,parents=True,mode = 0o2770) + out_log,err_log = [slurm_log_dir.joinpath('convert_%A_%a.out'),slurm_log_dir.joinpath('convert_%A_%a.err')] + slurm_logs = {'output_log':out_log,'error_log':err_log} + return slurm_logs + +def submit_batch(**kwargs): + env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env')) + kwargs.update({"env_cmd":env_cmd}) + + slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir')) + kwargs.update(slurm_logs) + + script = BATCH_SCRIPT.format(**kwargs) + + subprocess.run(['sbatch'],input=script,shell=True,text=True) + pass + +def convert_to_parquet() -> None: + args = parse_args() + if args['output_dir'] is None: + args['output_dir'] = args['input'].parent.parent.joinpath('parquet') + + args['output_dir'].mkdir(exist_ok = True, mode = 0o2770) + + if args['input'].is_file(): + nlogs = 1 + else: + logs = list(args.get('input').glob('*.gz')) + nlogs = len(logs) + + args.update({'nlogs':nlogs}) + + if args['batch']: + submit_batch(**args) + elif nlogs > 1: + ncpus,_ = parse_scontrol() + pool_size = args.get('pool_size',ncpus) + with multiprocessing.Pool(processes=pool_size) as pool: + fargs = list(zip(logs,[args['output_dir']]*nlogs)) + pool.starmap(convert, fargs) + else: + convert(args['input'],args['output_dir']) + pass diff --git a/src/rc_gpfs/cli/split_log.py b/src/rc_gpfs/cli/split_log.py new file mode 100644 index 0000000..0e9730c --- /dev/null +++ b/src/rc_gpfs/cli/split_log.py @@ -0,0 +1,78 @@ +import argparse +import subprocess +from pathlib import Path +from .utils import define_python_interpreter +from ..policy import split + +BATCH_SCRIPT = """\ +#!/bin/bash +# +#SBATCH --job-name=split +#SBATCH --ntasks=1 +#SBATCH --cpus-per-task={cpus_per_task} +#SBATCH --partition={partition} +#SBATCH --time={time} +#SBATCH --mem={mem} +#SBATCH --output={output_log} +#SBATCH --error={error_log} + +{env_cmd} + +split-log -o {output_dir} -l {lines} {log} +""" + +def parse_args(): + parser = argparse.ArgumentParser( + description="Splits a GPFS policy log into multiple parts for batch array processing.", + formatter_class=argparse.RawTextHelpFormatter + ) + parser.add_argument('log', + type=Path, + help="Path to a raw GPFS log file. The log can be either uncompressed or gz compressed.") + + parser.add_argument('-o','--output-dir', + type=Path, + default = None, + help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../chunks") + parser.add_argument('-l', '--lines', type=int, default=int(5e6), + help="Number of lines to split the log file by") + + slurm = parser.add_argument_group(title='Slurm Options') + slurm.add_argument('--batch', action='store_true', default=False, + help="Run as a batch job. Otherwise use the current processing environment") + slurm.add_argument('-n','--cpus-per-task', type=int, default=24, + help="Number of cores assigned to the job. Ntasks is always set to 1") + slurm.add_argument('-p','--partition', type=str, default='amd-hdr100') + slurm.add_argument('-t','--time',type=str,default='02:00:00') + slurm.add_argument('-m','--mem',type=str,default='48G') + slurm.add_argument('--slurm-log-dir',type=Path,default='./out', + help='Output log directory. If the directory does not exist, it will be created automatically') + args = parser.parse_args() + return vars(args) + +def submit_batch(**kwargs): + env_cmd = define_python_interpreter() + kwargs.update({"env_cmd":env_cmd}) + + kwargs.get('slurm_log_dir').mkdir(exist_ok=True,parents=True,mode = 0o2770) + output_log = kwargs.get('slurm_log_dir').joinpath('split.out') + error_log = kwargs.get('slurm_log_dir').joinpath('split.err') + kwargs.update({'output_log':output_log,'error_log':error_log}) + + script = BATCH_SCRIPT.format(**kwargs) + + subprocess.run(['sbatch'],input=script,shell=True,text=True) + pass + +def split_log(): + args = parse_args() + + if args['output_dir'] is None: + args['output_dir'] = args['log'].parent.parent.joinpath('chunks') + args['output_dir'].mkdir(exist_ok = True, mode = 0o2770) + + if args.get('batch'): + submit_batch(**args) + else: + split(**args) + pass \ No newline at end of file diff --git a/src/rc_gpfs/cli/utils.py b/src/rc_gpfs/cli/utils.py new file mode 100644 index 0000000..8115f44 --- /dev/null +++ b/src/rc_gpfs/cli/utils.py @@ -0,0 +1,21 @@ +import sys +import os +from pathlib import Path + +def define_python_interpreter(python_path=None, conda_env=None): + conda_base = "module load Anaconda3\nconda activate {conda_env}" + venv_base = "source {python_path}" + + if conda_env is not None: + env = conda_base.format(conda_env=conda_env) + elif python_path is not None: + parent = Path(python_path).absolute().parent + env = venv_base.format(python_path=parent.joinpath('activate')) + else: + conda_env = os.environ.get('CONDA_PREFIX') + if conda_env: + env = conda_base.format(conda_env=conda_env) + else: + parent = Path(sys.executable).absolute().parent + env = venv_base.format(python_path=parent.joinpath('activate')) + return env \ No newline at end of file diff --git a/src/rc_gpfs/compute/backend.py b/src/rc_gpfs/compute/backend.py index e38f134..5f8022e 100644 --- a/src/rc_gpfs/compute/backend.py +++ b/src/rc_gpfs/compute/backend.py @@ -2,6 +2,7 @@ from .backend_defs import backend_options from dask_cuda import LocalCUDACluster from dask.distributed import Client, LocalCluster from .utils import * +from ..utils import parse_scontrol from typing import Literal __all__ = ['start_backend'] diff --git a/src/rc_gpfs/compute/utils.py b/src/rc_gpfs/compute/utils.py index b6d38b0..723f05d 100644 --- a/src/rc_gpfs/compute/utils.py +++ b/src/rc_gpfs/compute/utils.py @@ -1,27 +1,10 @@ import sys import ast import pyarrow.parquet as pq -import os -import subprocess -import re import pynvml import importlib from typing import Any from pathlib import Path -from ..utils import convert_si - -def parse_scontrol(): - job_id = os.getenv('SLURM_JOB_ID') - - command = f"scontrol show job {job_id} | grep TRES=" - result = subprocess.run(command, shell=True, capture_output=True, text=True).stdout.strip() - - tres_pattern=r'.*cpu=(?P<cores>[\d]+),mem=(?P<mem>[\d]+[KMGT]?).*' - cores,mem = re.search(tres_pattern,result).groupdict().values() - - cores = int(cores) - mem = convert_si(mem,to_unit='G',use_binary=True) - return [cores,mem] def estimate_dataset_size(path: str | Path) -> float: if not isinstance(path,Path): diff --git a/src/rc_gpfs/policy/__init__.py b/src/rc_gpfs/policy/__init__.py index e69de29..754c706 100644 --- a/src/rc_gpfs/policy/__init__.py +++ b/src/rc_gpfs/policy/__init__.py @@ -0,0 +1,2 @@ +from .split import split,compress_logs +from .convert import convert \ No newline at end of file diff --git a/src/rc_gpfs/policy/convert-to-parquet.py b/src/rc_gpfs/policy/convert-to-parquet.py deleted file mode 100755 index 575b512..0000000 --- a/src/rc_gpfs/policy/convert-to-parquet.py +++ /dev/null @@ -1,122 +0,0 @@ -from urllib.parse import unquote -import os -import re -import argparse -import pandas as pd -import gzip -from pathlib import PurePath - -desc = """ -Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n -Works with data from /data/user, /data/user/home, /data/project, and /scratch -The data are parsed and the directory beneath any of those listed above is set as the top-level -directory ('tld'). -""" - -def parse_args(): - parser = argparse.ArgumentParser(description=desc, - formatter_class=argparse.RawTextHelpFormatter - ) - parser.add_argument('-p','--policy',help="Print the policy the script uses as a template and exit",action='store_true') - parser.add_argument('-o','--output-dir',help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to input_file_dir/parquet") - parser.add_argument('-f','--file',help="Log file from mmlspolicy run to be converted to parquet. Can be either a full log or just a part") - args = parser.parse_args() - return args - -def parse_line(line): - try: - ul = unquote(line).strip() - ul = re.sub(r'[\n\t]','',ul) - details,path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups() - - d = dict([re.match(r'([\w]+)=(.*)',l).groups() for l in details.split('|')]) - - grp = re.match(r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)',path) - if grp: - tld = grp.group(1) - else: - tld = None - - d.update({'path': path, - 'tld': tld}) - return d - except: - return line - -def print_policy(): - policy = """ -/* list of files to include */ -define( include_list, - (PATH_NAME LIKE 'FILEPATH%') -) - -/* define access_age */ -define(access_age, - (DAYS(CURRENT_TIMESTAMP) - DAYS(ACCESS_TIME)) -) - -RULE 'gather-exec' EXTERNAL LIST 'gather-info' EXEC '' OPTS 'JOBID' ESCAPE '%' -RULE 'list-path' LIST 'gather-info' - SHOW ('|size=' || varchar(FILE_SIZE) || - '|kballoc='|| varchar(KB_ALLOCATED) || - '|access=' || varchar(ACCESS_TIME) || - '|create=' || varchar(CREATION_TIME) || - '|modify=' || varchar(MODIFICATION_TIME) || - '|uid=' || varchar(USER_ID) || - '|gid=' || varchar(GROUP_ID) || - '|heat=' || varchar(FILE_HEAT) || - '|pool=' || varchar(POOL_NAME) || - '|mode=' || varchar(MODE) || - '|misc=' || varchar(MISC_ATTRIBUTES) || - '|' - ) - WHERE include_list -""" - print(policy) - return - -schema = { - 'size': 'int64', - 'kballoc': 'int64', - 'access': 'datetime64[ns]', - 'create': 'datetime64[ns]', - 'modify': 'datetime64[ns]', - 'uid': 'int64', - 'gid': 'int64', - 'heat': 'str', - 'pool': 'str', - 'mode': 'str', - 'misc': 'str', - 'path': 'str', - 'tld': 'str' -} - -def main(): - args = parse_args() - if args.policy: - print_policy() - exit() - - if args.file: - file = PurePath(args.file) - else: - exit('Error: must specify a file to convert') - - if not args.output_dir: - outdir = file.parent.joinpath('parquet') - else: - outdir = PurePath(args.output_dir) - - os.makedirs(outdir,exist_ok=True) - - #bag = db.read_text(file).map(parse_line) - with gzip.open(file,'r') as f: - dicts = [parse_line(l) for l in f] - df = pd.DataFrame.from_dict(dicts).sort_values('tld') - df = df.astype(schema) - - outname = file.with_suffix('.parquet').name - df.to_parquet(outdir.joinpath(outname),engine = 'pyarrow') - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py new file mode 100755 index 0000000..a3f7ad9 --- /dev/null +++ b/src/rc_gpfs/policy/convert.py @@ -0,0 +1,68 @@ +from urllib.parse import unquote +import re +import pandas as pd +import gzip +from pathlib import Path +from .utils import as_path +from .policy_defs import SCHEMA + +def parse_line(line): + try: + ul = unquote(line).strip() + ul = re.sub(r'[\n\t]','',ul) + details,path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups() + + d = dict([re.match(r'([\w]+)=(.*)',l).groups() for l in details.split('|')]) + + grp = re.match(r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)',path) + if grp: + tld = grp.group(1) + else: + tld = None + + d.update({'path': path, + 'tld': tld}) + return d + except: + return line + +def convert( + input_file: str | Path, + output_dir: str | Path | None = None, + output_name: str | Path | None = None + ) -> None: + """ + Converts a GPFS log file to parquet format. The data schema assumes the same policy definition from list-path-external and list-path-dirplus. + + Parameters + ---------- + input_file : str | Path + Path to the file to convert. Can be either compressed or uncompressed. + output_dir : str | Path | None, optional + Directory path in which to store the file. If the directory does not exist, it will be created. If None, the output directory will be set to ./input_file/../../parquet in accordance with our standard organization. By default None + output_name : str | Path | None, optional + Name of the output file. The name will be automatically appended with .parquet. If None, the name of the input file will be used. By default None + """ + + input_file = as_path(input_file) + + if output_dir is not None: + output_dir = as_path(output_dir) + else: + output_dir = input_file.parent.parent.joinpath('parquet') + + if output_name is None: + output_name = input_file.with_suffix('.parquet').name + else: + output_name = as_path(output_name).with_suffix('.parquet').name + + output_path = output_dir.joinpath(output_name) + + output_dir.mkdir(mode=0o2770, exist_ok=True) + + with gzip.open(input_file,'r') as f: + dicts = [parse_line(l) for l in f] + df = pd.DataFrame.from_dict(dicts).sort_values('tld') + df = df.astype(SCHEMA) + + df.to_parquet(output_path,engine = 'pyarrow') \ No newline at end of file diff --git a/src/rc_gpfs/policy/policy_defs.py b/src/rc_gpfs/policy/policy_defs.py new file mode 100644 index 0000000..5f2f320 --- /dev/null +++ b/src/rc_gpfs/policy/policy_defs.py @@ -0,0 +1,15 @@ +SCHEMA = { + 'size': 'int64', + 'kballoc': 'int64', + 'access': 'datetime64[ns]', + 'create': 'datetime64[ns]', + 'modify': 'datetime64[ns]', + 'uid': 'int64', + 'gid': 'int64', + 'heat': 'str', + 'pool': 'str', + 'mode': 'str', + 'misc': 'str', + 'path': 'str', + 'tld': 'str' +} \ No newline at end of file diff --git a/src/rc_gpfs/policy/run-convert-to-parquet.sh b/src/rc_gpfs/policy/run-convert-to-parquet.sh deleted file mode 100755 index c729a08..0000000 --- a/src/rc_gpfs/policy/run-convert-to-parquet.sh +++ /dev/null @@ -1,133 +0,0 @@ -#!/bin/bash - -set -euo pipefail - -############################################################ -# Default Values # -############################################################ - -ntasks=1 -mem="16G" -time="02:00:00" -partition="amd-hdr100" -outdir="" - -############################################################ -# Help # -############################################################ -usage() -{ ->&2 cat << EOF -Usage: $0 [ -h ] [ -o | --outdir ] [ -n | --ntasks ] [ -p | --partition] [ -t | --time ] [ -m | --mem ] gpfs_logdir" -EOF -exit 1 -} - -help() -{ ->&2 cat << EOF -Submits an array job to convert parts of a GPFS log to parquet format -Syntax: $0 [ -h ] [ -o | --outdir ] [ -n | --ntasks ] [ -p | --partition] [ -t | --time ] [ -m | --mem ] gpfs_logdir - -options: - -h|--help Print this Help. - -Required: - gpfs_log_dir Directory containing GPFS log outputs - -Path: - -o|--outdir Directory to save parquet files to - -sbatch options: - -n|--ntasks Number of tasks for each array index (default: 1) - -p|--partition Partition to submit tasks to (default: amd-hdr100) - -t|--time Max walltime (default: 02:00:00) - -m|--mem Memory for each task (default: 16G) -EOF -exit 0 -} - -args=$(getopt -a -o ho:n:p:t:m: --long help,outdir:,ntasks:,partition:,time:,mem: -- "$@") -if [[ $? -gt 0 ]]; then - usage -fi - -eval set -- ${args} - -while : -do - case $1 in - -h | --help) help ;; - -o | --outdir) outdir=$2 ; shift 2 ;; - -n | --ntasks) ntasks=$2 ; shift 2 ;; - -p | --partition) partition=$2 ; shift 2 ;; - -t | --time) time=$2 ; shift 2 ;; - -m | --mem) mem=$2 ; shift 2 ;; - --) shift; break ;; - *) >&2 echo Unsupported option: $1 - usage ;; - esac -done - -if [[ $# -eq 0 ]]; then - usage -fi - -gpfs_logdir="$1" - -# Ensure gpfs_logdir is set -if [[ -z "$gpfs_logdir" ]]; then - echo "Error: gpfs_logdir is a required positional argument." - exit 1 -fi - -# If outdir not set, set to ${gpfs_logdir}/parquet -if [[ -z "$outdir" ]]; then - outdir="${gpfs_logdir}/parquet" -fi - -nlogs=$(ls ${gpfs_logdir}/list-* | wc -l) - -cmd="python3 convert-to-parquet.py -o ${outdir} -f \${log}" - ->&2 cat << EOF --------------------------------------------------------------------------------- -output dir: ${outdir} -GPFS logs: ${gpfs_logdir} - -ntasks: ${ntasks} -partition: ${partition} -time: ${time} -mem: ${mem} - -command: ${cmd} --------------------------------------------------------------------------------- -EOF - -mkdir -p out -mkdir -p err - -############################################################ -# Create Array Job Script # -############################################################ - -{ cat | sbatch; } << EOF -#!/bin/bash -# -#SBATCH --job-name=parquet-list-%a -#SBATCH --ntasks=${ntasks} -#SBATCH --partition=${partition} -#SBATCH --time=${time} -#SBATCH --mem=${mem} -#SBATCH --output=out/%A_%a.out -#SBATCH --error=err/%A_%a.err -#SBATCH --array=1-${nlogs} - -source /data/rc/gpfs-policy/venv/bin/activate - -log=\$(ls ${gpfs_logdir}/list-* | awk "NR==\${SLURM_ARRAY_TASK_ID} { print \$1 }") - -${cmd} -EOF - -exit 0 \ No newline at end of file diff --git a/src/rc_gpfs/policy/split-info-file.sh b/src/rc_gpfs/policy/split-info-file.sh deleted file mode 100755 index 56572e1..0000000 --- a/src/rc_gpfs/policy/split-info-file.sh +++ /dev/null @@ -1,144 +0,0 @@ -#!/bin/bash - -set -euxo pipefail - -############################################################ -# Default Values # -############################################################ - -ntasks=4 -mem="16G" -time="12:00:00" -partition="amd-hdr100" -lines=5000000 -outdir="" - -############################################################ -# Help # -############################################################ -usage() -{ ->&2 cat << EOF -Usage: $0 [ -h ] [ -l | --lines ] [ -o | --outdir ] - [ -n | --ntasks ] [ -p | --partition] [ -t | --time ] [ -m | --mem ] - log -EOF -exit 0 -} - -help() -{ -# Display Help ->&2 cat << EOF -Splits a GPFS policy log into multiple parts for batch array processing -Usage: $0 [ -h ] [ -l | --lines ] [ -o | --outdir ] - [ -n | --ntasks ] [ -p | --partition] [ -t | --time ] [ -m | --mem ] - log - -General: - -h|--help Print this help. - -Required: - log Path to the log file to split - -Split Parameters: - -l|--lines Max number of records to save in each split (default: 5000000) - -File Parameters: - -o|--outdir Directory path to store split files in. Defaults to log.d in log's parent directory. - -Job Parameters: - -n|--ntasks Number of job tasks (default: 4) - -p|--partition Partition to submit tasks to (default: amd-hdr100) - -t|--time Max walltime (default: 12:00:00) - -m|--mem Memory (default: 16G) -EOF -exit 0 -} - -args=$(getopt -a -o hl:o:n:p:t:m: --long help,lines:,outdir:,ntasks:,partition:,time:,mem: -- "$@") -if [[ $? -gt 0 ]]; then - usage -fi - -eval set -- ${args} - -while : -do - case $1 in - -h | --help) help ;; - -l | --lines) lines=$2 ; shift 2 ;; - -o | --outdir) outdir=$2 ; shift 2 ;; - -n | --ntasks) ntasks=$2 ; shift 2 ;; - -p | --partition) partition=$2 ; shift 2 ;; - -t | --time) time=$2 ; shift 2 ;; - -m | --mem) mem=$2 ; shift 2 ;; - --) shift; break ;; - *) >&2 echo Unsupported option: $1 - usage ;; - esac -done - -if [[ $# -eq 0 ]]; then - usage -fi - -log=$1 - -if [[ -z "${log}" ]]; then - echo "Log path is required" - usage -fi - -if [[ -z "${outdir}" ]]; then - outdir="$(readlink -f ${log}).d" -fi - -prefix=${outdir}/list- - -split_cmd="cat ${log} | split -a 3 -d -l ${lines} - ${prefix}" -zip_cmd="ls ${prefix}* | xargs -i -P 0 bash -c 'gzip {} && echo {} done'" - -if [[ $(file -b --mime-type ${log}) == *'gzip'* ]]; then - split_cmd="z${split_cmd}" -fi - ->&2 cat << EOF --------------------------------------------------------------------------------- -GPFS log: ${log} -Output Directory ${outdir} -Lines per File: ${lines} - -ntasks: ${ntasks} -partition: ${partition} -time: ${time} -mem: ${mem} - -split cmd: ${split_cmd} -zip cmd: ${zip_cmd} --------------------------------------------------------------------------------- -EOF - -mkdir -p ${outdir} -mkdir -p out -mkdir -p err - -############################################################ -# Create Array Job Script # -############################################################ - -{ cat | sbatch; } << EOF -#!/bin/bash -# -#SBATCH --job-name=split-gpfs-log -#SBATCH --ntasks=${ntasks} -#SBATCH --partition=${partition} -#SBATCH --time=${time} -#SBATCH --mem=${mem} -#SBATCH --output=out/%A.out -#SBATCH --error=err/%A.err - -${split_cmd} -${zip_cmd} -EOF - diff --git a/src/rc_gpfs/policy/split.py b/src/rc_gpfs/policy/split.py new file mode 100644 index 0000000..ac5fe69 --- /dev/null +++ b/src/rc_gpfs/policy/split.py @@ -0,0 +1,85 @@ +from pathlib import Path +import subprocess +from .utils import as_path +from ..utils import parse_scontrol + +__all__ = ['split','compress_logs'] + +def is_gz(filepath) -> bool: + with open(filepath, 'rb') as f: + return f.read(2) == b'\x1f\x8b' + +def pigz_exists() -> bool: + print("INFO: Checking for pigz") + proc = subprocess.run('which pigz',shell=True,text=True,capture_output=True) + + if proc.returncode == 1: + print("WARNING: pigz was not found on the PATH. Defaulting to slower zcat. Install pigz or add it to PATH to improve performance") + return False + else: + print(f"INFO: pigz found at {proc.stdout.strip()}") + return True + +def split(log: str | Path, output_dir: str | Path | None = None, + lines: int = 5e6, prefix: str = 'list-', compress: bool = True,**kwargs) -> None: + """ + Split a raw GPFS log file into smaller chunks. These chunks can be converted to parquet format to create a full parquet dataset for parallel analysis. Chunks are optionally recompressed after being split from the raw log. + + NOTE: pigz is the preferred tool for decompressing the raw log file due to its parallelization compared to gzip. pigz will be used as the default decompression tool if found on the PATH, otherwise gzip will be used. Install or make available pigz to improve splitting performance. + + Parameters + ---------- + log : str | Path + Path to the raw log file. Can be either compressed or uncompressed. + output_dir : str | Path | None, optional + Path to the output directory to store the chunks. If the directory does not exist, it will be created. If None, the path defaults to ./log/../../chunks in accordance with our standard organization scheme. By default None + lines : int, optional + Number of lines to split the log file by. By default 5e6 + prefix : str, optional + String to prefix the individual chunk files with. Each file name will have the form '<prefix>XXX' where XXX is an incrementing index assigned by the 'split' command. By default 'list-' + compress : bool, optional + If True, compress each chunk using gzip. By default True + """ + log = as_path(log) + + if output_dir is None: + output_dir = log.parent.parent.joinpath('chunks') + else: + output_dir = as_path(output_dir) + output_dir.mkdir(exist_ok=True,mode=0o2770) + + out = output_dir.joinpath(prefix) + + if is_gz(log): + cat = 'pigz -dc' if pigz_exists() else 'zcat' + else: + cat = 'cat' + + split_cmd = f"{cat} {log} | split -a 3 -d -l {int(lines)} - {out}" + + subprocess.run(split_cmd,shell=True,text=True) + + if compress: + compress_logs(output_dir) + pass + +def compress_logs(log_dir: str | Path) -> None: + """ + Compress raw logs using gzip in parallel. If a directory contains both uncompressed and compressed logs, the compressed logs will be ignored. + + Parameters + ---------- + log_dir : str | Path + Path to the directory containing the raw logs. + """ + nproc,_ = parse_scontrol() + log_dir = as_path(log_dir) + + logs = [str(p) for p in log_dir.glob('*[!gz]')] + log_str = '\n'.join(logs) + + print(f"INFO: {len(logs)} logs found. Beginning compression") + + zip_cmd = f"echo '{log_str}' | xargs -I {{}} -P {nproc} bash -c 'gzip {{}}'" + subprocess.run(zip_cmd, shell=True) + pass \ No newline at end of file diff --git a/src/rc_gpfs/policy/utils.py b/src/rc_gpfs/policy/utils.py new file mode 100644 index 0000000..bd89f7d --- /dev/null +++ b/src/rc_gpfs/policy/utils.py @@ -0,0 +1,6 @@ +from pathlib import Path + +def as_path(s: str | Path) -> Path: + if not isinstance(s,Path): + s = Path(s) + return s \ No newline at end of file diff --git a/src/rc_gpfs/utils.py b/src/rc_gpfs/utils.py index d109d9a..2c205b9 100644 --- a/src/rc_gpfs/utils.py +++ b/src/rc_gpfs/utils.py @@ -1,4 +1,7 @@ from typing import Literal +import os +import re +import subprocess # ENH: if this package becomes merged with noctua, need to replace this function since it's copied directly from there def convert_si(value: str | float | int, @@ -60,3 +63,16 @@ def convert_si(value: str | float | int, converted_value = base_value / unit_multipliers[to_unit] return converted_value + +def parse_scontrol(): + job_id = os.getenv('SLURM_JOB_ID') + + command = f"scontrol show job {job_id} | grep TRES=" + result = subprocess.run(command, shell=True, capture_output=True, text=True).stdout.strip() + + tres_pattern=r'.*cpu=(?P<cores>[\d]+),mem=(?P<mem>[\d]+[KMGT]?).*' + cores,mem = re.search(tres_pattern,result).groupdict().values() + + cores = int(cores) + mem = convert_si(mem,to_unit='G',use_binary=True) + return [cores,mem] \ No newline at end of file -- GitLab