From 098c6a152edc08eca863cebff44e774765fbc377 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Fri, 20 Dec 2024 13:48:07 -0600 Subject: [PATCH 01/31] Fix pip repo tag. Was trying to install pip for py 3.13.0rc1 --- deps.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/deps.yml b/deps.yml index 46cce6c..b0e1f0e 100644 --- a/deps.yml +++ b/deps.yml @@ -179,7 +179,7 @@ dependencies: - pexpect=4.9.0=pyhd8ed1ab_1 - pickleshare=0.7.5=pyhd8ed1ab_1004 - pillow=11.0.0=py311h49e9ac3_0 - - pip=24.3.1=pyh145f28c_1 + - pip=24.3.1=pyh8b19718_2 - platformdirs=4.3.6=pyhd8ed1ab_1 - prompt-toolkit=3.0.48=pyha770c72_1 - psutil=6.1.0=py311h9ecbd09_0 @@ -207,6 +207,7 @@ dependencies: - rich=13.9.4=pyhd8ed1ab_1 - rmm=24.10.00=cuda12_py311_241009_g3223f841_0 - s2n=1.5.10=hb5b8611_0 + - setuptools=75.6.0=pyhff2d567_1 - six=1.17.0=pyhd8ed1ab_0 - snappy=1.2.1=h8bd8927_1 - sortedcontainers=2.4.0=pyhd8ed1ab_0 @@ -223,6 +224,7 @@ dependencies: - tzdata=2024b=hc8b5060_0 - urllib3=2.2.3=pyhd8ed1ab_1 - wcwidth=0.2.13=pyhd8ed1ab_1 + - wheel=0.45.1=pyhd8ed1ab_1 - xorg-libxau=1.0.12=hb9d3cd8_0 - xorg-libxdmcp=1.1.5=hb9d3cd8_0 - xyzservices=2024.9.0=pyhd8ed1ab_1 -- GitLab From a67fef04ba577dc4599ea02832ae2b4429649e84 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Fri, 20 Dec 2024 15:28:13 -0600 Subject: [PATCH 02/31] add dask-cudf dependency --- deps.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/deps.yml b/deps.yml index b0e1f0e..d4b8f7a 100644 --- a/deps.yml +++ b/deps.yml @@ -62,6 +62,7 @@ dependencies: - dask=2024.9.0=pyhd8ed1ab_0 - dask-core=2024.9.0=pyhd8ed1ab_0 - dask-cuda=24.10.00=py311_241009_g4e45758_0 + - dask-cudf=24.10.01=cuda12_py311_241009_g7b0adfa253_0 - dask-expr=1.1.14=pyhd8ed1ab_0 - debugpy=1.8.11=py311hfdbb021_0 - decorator=5.1.1=pyhd8ed1ab_1 -- GitLab From 3812d048a98a6db40fbbaafdb97af098442fa394 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Mon, 23 Dec 2024 14:10:20 -0600 Subject: [PATCH 03/31] add function for converting flat parquet structure to hive for easier aggregation --- src/rc_gpfs/policy/convert.py | 61 ++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index a3f7ad9..c41b034 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -1,10 +1,20 @@ from urllib.parse import unquote +import os import re import pandas as pd import gzip from pathlib import Path +from typing import Literal +import random +import string +import shutil + +import dask.dataframe as dd +import dask.config + from .utils import as_path from .policy_defs import SCHEMA +from ..compute.backend import infer_cuda def parse_line(line): try: @@ -65,4 +75,53 @@ def convert( df = pd.DataFrame.from_dict(dicts).sort_values('tld') df = df.astype(SCHEMA) - df.to_parquet(output_path,engine = 'pyarrow') \ No newline at end of file + df.to_parquet(output_path,engine = 'pyarrow') + + +def convert_flat_to_hive( + parquet_path: str | Path, + hive_path: str | Path, + tld: str | list[str] | None = None, + staging_path: str | Path | None = None, + partition_size: str = '100MiB', + with_cuda: bool | Literal['infer'] = 'infer' + ) -> None: + parquet_path = as_path(parquet_path) + hive_path = as_path(hive_path) + + if staging_path is None: + rand_str = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) + staging_path = Path(os.getenv('TMPDIR')).joinpath(os.getenv('USER'),f'hive-{rand_str}') + print(f"INFO: Using {staging_path} as temporary directory") + else: + staging_path = as_path(staging_path) + + hive_path.mkdir(exist_ok=True,parents=True) + staging_path.mkdir(exist_ok=True,parents=True) + + if with_cuda == 'infer': + with_cuda = infer_cuda() + + if with_cuda and dask.config.get('dataframe.backend') != 'cuda': + dask.config.set({'dataframe.backend':'cudf'}) + + def indexed_name(ind): + return f"indexed-{ind}.parquet" + + if tld is not None: + if not isinstance(tld,list): + tld = [tld] + predicates = [[('tld','==',g)] for g in tld] + else: + predicates = None + print(f"DEBUG: Filtering predicates are: {predicates}") + + acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0) + print(f"DEBUG: Acquisition date is {acq}") + + df = dd.read_parquet(parquet_path,filters=predicates) + df = df.set_index('path',sort=True).repartition(partition_size=partition_size).assign(acq=acq) + df.to_parquet(staging_path,partition_on=['tld','acq'],name_function = indexed_name) + + shutil.copytree(staging_path,hive_path,dirs_exist_ok=True) + pass \ No newline at end of file -- GitLab From c917f74c38fa7b4bf6b5a85bdf1d7db83c226e4d Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Mon, 23 Dec 2024 14:21:09 -0600 Subject: [PATCH 04/31] add example log scripts --- example-job-scripts/convert-logs.sh | 19 +++++++++++++++++++ example-job-scripts/split-logs.sh | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 example-job-scripts/convert-logs.sh create mode 100644 example-job-scripts/split-logs.sh diff --git a/example-job-scripts/convert-logs.sh b/example-job-scripts/convert-logs.sh new file mode 100644 index 0000000..a3f68db --- /dev/null +++ b/example-job-scripts/convert-logs.sh @@ -0,0 +1,19 @@ +#! /bin/bash +# +#SBATCH --job-name=convert +#SBATCH --ntasks=1 +#SBATCH --cpus-per-task=1 +#SBATCH --mem=8G +#SBATCH --partition=amd-hdr100 +#SBATCH --time=02:00:00 +#SBATCH --output=out/convert-%a.out +#SBATCH --error=out/convert-%a.err +#SBATCH --array=0-37 + +module load Anaconda3 +conda activate gpfs + +logs=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-30*/chunks")) +log=${logs[${SLURM_ARRAY_TASK_ID}]} + +convert-to-parquet --batch ${log} \ No newline at end of file diff --git a/example-job-scripts/split-logs.sh b/example-job-scripts/split-logs.sh new file mode 100644 index 0000000..7622e02 --- /dev/null +++ b/example-job-scripts/split-logs.sh @@ -0,0 +1,19 @@ +#! /bin/bash +# +#SBATCH --job-name=split +#SBATCH --ntasks=1 +#SBATCH --cpus-per-task=24 +#SBATCH --mem=8G +#SBATCH --partition=amd-hdr100 +#SBATCH --time=02:00:00 +#SBATCH --output=out/split-%a.out +#SBATCH --error=out/split-%a.err +#SBATCH --array=0-35 + +module load Anaconda3 +conda activate gpfs-dev + +logs=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-30*/raw/*.gz")) +log=${logs[${SLURM_ARRAY_TASK_ID}]} + +split-log ${log} \ No newline at end of file -- GitLab From 17a9c3bd9627e99082d181b49ca00004e32eb69f Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Mon, 23 Dec 2024 14:21:22 -0600 Subject: [PATCH 05/31] clean and update gitignore --- .gitignore | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index e979021..f9e7191 100644 --- a/.gitignore +++ b/.gitignore @@ -1,14 +1,27 @@ +# Ignore paths to actual GPFS logs data -local-data/ -joblogs/ + +# Ignore all potential Slurm outputs from running jobs slurm-* out/ err/ -*.sif + +# Ignore cache directories __pycache__ + +# Ignore quarto outputs quarto* -cufile.log *.html general-report_files + +# Ignore CUDA and dask logs +cufile.log +output.log +rmm_log.txt +dask-logs/* + +# Ignore poetry configuration poetry.toml + +# Ignore local vscode config .vscode \ No newline at end of file -- GitLab From 8613604f3ed720857ddb58f320b6a64114514bbf Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Mon, 23 Dec 2024 16:16:05 -0600 Subject: [PATCH 06/31] reorganize module imports --- src/rc_gpfs/policy/convert.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index c41b034..c8db3fb 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -1,14 +1,14 @@ -from urllib.parse import unquote import os import re -import pandas as pd import gzip -from pathlib import Path -from typing import Literal import random import string import shutil +from pathlib import Path +from typing import Literal +from urllib.parse import unquote +import pandas as pd import dask.dataframe as dd import dask.config -- GitLab From 8cdf94f05e1e377cb9a24ee53e587f1e2e116c13 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Mon, 23 Dec 2024 16:17:08 -0600 Subject: [PATCH 07/31] add logic for automatically choosing a dataframe backend based on available compute resources --- src/rc_gpfs/policy/convert.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index c8db3fb..e818427 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -102,8 +102,14 @@ def convert_flat_to_hive( if with_cuda == 'infer': with_cuda = infer_cuda() - if with_cuda and dask.config.get('dataframe.backend') != 'cuda': + if with_cuda: + import dask_cudf as backend + from dask_cudf.core import from_cudf as from_local dask.config.set({'dataframe.backend':'cudf'}) + else: + import dask as backend + from dask.dataframe import from_pandas as from_local + dask.config.set({'dataframe.backend':'pandas'}) def indexed_name(ind): return f"indexed-{ind}.parquet" -- GitLab From 85f3f1fda88f31edecd00244fe42f0a2df783a18 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Mon, 23 Dec 2024 16:17:49 -0600 Subject: [PATCH 08/31] simplify predicate logic to use 'in' instead of many ORs --- src/rc_gpfs/policy/convert.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index e818427..523090f 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -117,7 +117,7 @@ def convert_flat_to_hive( if tld is not None: if not isinstance(tld,list): tld = [tld] - predicates = [[('tld','==',g)] for g in tld] + predicates = [('tld','in',tld)] else: predicates = None print(f"DEBUG: Filtering predicates are: {predicates}") -- GitLab From 91033678f216226814f0b5f8fbca22bebc74d17a Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Mon, 23 Dec 2024 16:19:01 -0600 Subject: [PATCH 09/31] convert between local and distributed dataframe backends depending on the task to improve performance --- src/rc_gpfs/policy/convert.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index 523090f..8d53aa3 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -125,9 +125,15 @@ def convert_flat_to_hive( acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0) print(f"DEBUG: Acquisition date is {acq}") - df = dd.read_parquet(parquet_path,filters=predicates) - df = df.set_index('path',sort=True).repartition(partition_size=partition_size).assign(acq=acq) - df.to_parquet(staging_path,partition_on=['tld','acq'],name_function = indexed_name) + # The flat parquet is initially read in via dask to avoid reading the full dataset into memory which happens even + # when including predicates for filtering. The dask dataframe is converted to a regular dataframe to drastically + # improve indexing and sorting by removing partitions. The sorted dataframe is converted back to a dask dataframe + # to create partitions within the parquet dataset and write to multiple files defined by those partitions. + ddf = backend.read_parquet(parquet_path,filters=predicates) + df = ddf.compute() + df = df.set_index('path').sort_index().assign(acq=acq) + ddf = from_local(df).repartition(partition_size=partition_size) + ddf.to_parquet(staging_path,partition_on=['tld','acq'],name_function = indexed_name) shutil.copytree(staging_path,hive_path,dirs_exist_ok=True) pass \ No newline at end of file -- GitLab From 5025d7ee52832b390a9bf71982ff96ae19e5f9e7 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Mon, 23 Dec 2024 16:19:17 -0600 Subject: [PATCH 10/31] remove the staging directory after everything is finished --- src/rc_gpfs/policy/convert.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index 8d53aa3..7d1511f 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -136,4 +136,5 @@ def convert_flat_to_hive( ddf.to_parquet(staging_path,partition_on=['tld','acq'],name_function = indexed_name) shutil.copytree(staging_path,hive_path,dirs_exist_ok=True) + shutil.rmtree(staging_path) pass \ No newline at end of file -- GitLab From fc0fba24d102601c1ab7cb92c8d6acd6570577bc Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Mon, 23 Dec 2024 18:56:08 -0600 Subject: [PATCH 11/31] wip: add the initial commands for converting flat parquet to hive --- src/rc_gpfs/cli/convert_flat_to_hive.py | 75 +++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 src/rc_gpfs/cli/convert_flat_to_hive.py diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py new file mode 100644 index 0000000..934a844 --- /dev/null +++ b/src/rc_gpfs/cli/convert_flat_to_hive.py @@ -0,0 +1,75 @@ +import argparse +import subprocess +from pathlib import Path +from .utils import define_python_interpreter,gpu_batch_parser +from ..policy import convert_flat_to_hive +from ..utils import parse_scontrol + +DESCRIPTION = """ +Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This essentially creates a timeseries of structured datasets for each tld for much easier more efficient log comparisons within tld. Each file path is set as the index and sorted, and all final output parquets are partitioned to have similar in-memory sizes. + +Batch Processing +---------------- + +This script can be used to convert a number of tlds directly or to automatically process all tlds in a batch array job. When submitting a batch job, tlds will be grouped by their estimated memory usage over the entire parquet dataset, roughly related to the number of rows belonging to any given tld. A tld taking up more space in the log will be grouped with fewer (or no) other tlds than a tld taking up a smaller space. The max size of a group in memory is editable by the user. + +This method was implemented to account for GPFS logs being larger than memory in most cases, especially when using GPUs for processing. Sorting along an index is critical for cross-dataset join performance and must be done for efficient storage of parquet files within each parquet directory. However, this initial sort requires the full dataset to be loaded into memory which is not possible on our current hardware for very large logs while leaving memory available for actual processing. + +When submitting a batch array, be cognizant of the amount of memory available to your job, either RAM for CPU tasks or VRAM for GPU tasks. For example, on an 80 GiB A100, setting an upper group limit of 40 GiB would suffice in most cases. This will minimize the number of array tasks while ensuring enough memory is free for processing. +""" + +def parse_args(): + parser = argparse.ArgumentParser( + description=DESCRIPTION, + formatter_class=argparse.RawTextHelpFormatter, + parents=[gpu_batch_parser()] + ) + parser.add_argument('input', + type=Path, + help="Path to a directory containing a flat parquet dataset.") + + parser.add_argument('hive', + type=Path, + help="Parent directory for the hive. This can be either a new directory or an existing hive directory. If this is an existing hive dataset, new data will be appended to the old data. This will not alter any existing data provided the tld or date of acquisition differ from the existing hive data.") + parser.add_argument('--tld',) + + args = parser.parse_args() + return vars(args) + +BATCH_SCRIPT = """\ +#!/bin/bash +# +#SBATCH --job-name=hivize +#SBATCH --ntasks={ntasks} +#SBATCH --partition={partition} +#SBATCH --time={time} +#SBATCH --mem={mem} +#SBATCH --output={output_log} +#SBATCH --error={error_log} +#SBATCH --array=1-{ngroups} + +{env_cmd} + +tld={tld} + +convert-flat-to-hive -o {hive_dir} {parquet} +""" + +def setup_slurm_logs(slurm_log_dir): + slurm_log_dir = slurm_log_dir.absolute() + slurm_log_dir.mkdir(exist_ok = True,parents=True,mode = 0o2770) + out_log,err_log = [slurm_log_dir.joinpath('convert_%A_%a.out'),slurm_log_dir.joinpath('convert_%A_%a.err')] + slurm_logs = {'output_log':out_log,'error_log':err_log} + return slurm_logs + +def submit_batch(**kwargs): + env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env')) + kwargs.update({"env_cmd":env_cmd}) + + slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir')) + kwargs.update(slurm_logs) + + script = BATCH_SCRIPT.format(**kwargs) + + subprocess.run(['sbatch'],input=script,shell=True,text=True) + pass \ No newline at end of file -- GitLab From d4dd7699b3c9f94649be2f0e3f23dff1edadde27 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Mon, 23 Dec 2024 18:57:54 -0600 Subject: [PATCH 12/31] add functions for creating variable argument parsers for Slurm batch options. This should reduce code duplication by declaring the Slurm options in a single location and building on them with command-specific arguments --- src/rc_gpfs/cli/utils.py | 42 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/src/rc_gpfs/cli/utils.py b/src/rc_gpfs/cli/utils.py index 8115f44..91a9e2c 100644 --- a/src/rc_gpfs/cli/utils.py +++ b/src/rc_gpfs/cli/utils.py @@ -1,3 +1,4 @@ +import argparse import sys import os from pathlib import Path @@ -18,4 +19,43 @@ def define_python_interpreter(python_path=None, conda_env=None): else: parent = Path(sys.executable).absolute().parent env = venv_base.format(python_path=parent.joinpath('activate')) - return env \ No newline at end of file + return env + +class CustomHelpFormatter(argparse.MetavarTypeHelpFormatter): + def add_arguments(self, actions): + # Sort actions by their group title + actions = sorted(actions, key=lambda x: x.container.title if x.container.title else '') + super(CustomHelpFormatter, self).add_arguments(actions) + +def base_batch_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(add_help=False) + return parser + +def add_common_arguments( + parser: argparse.ArgumentParser, + cpus_per_task: int, + gpus: int, + partition: str, + mem: str +) -> None: + slurm = parser.add_argument_group(title='Slurm Options') + slurm.add_argument('--batch', action='store_true', default=False, + help="Convert as a batch array job.") + slurm.add_argument('-n', '--ntasks', type=int, default=1) + slurm.add_argument('-c', '--cpus-per-task', type=int, default=cpus_per_task) + slurm.add_argument('-g', '--gpus', type=int, default=gpus, choices=[0, 1]) + slurm.add_argument('-p', '--partition', type=str, default=partition) + slurm.add_argument('-t', '--time', type=str, default='12:00:00') + slurm.add_argument('-m', '--mem', type=str, default=mem) + slurm.add_argument('--slurm-log-dir', type=Path, default='./out', + help='Output log directory. If the directory does not exist, it will be created automatically.') + +def cpu_batch_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(add_help=False, parents=[base_batch_parser()], formatter_class=CustomHelpFormatter) + add_common_arguments(parser, cpus_per_task=1, gpus=0, partition='amd-hdr100', mem='16G') + return parser + +def gpu_batch_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(add_help=False, parents=[base_batch_parser()], formatter_class=CustomHelpFormatter) + add_common_arguments(parser, cpus_per_task=16, gpus=1, partition='amperenodes', mem='80G') + return parser \ No newline at end of file -- GitLab From a0f6469fb925597a845347eedc1dec5261a7ddf7 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Mon, 23 Dec 2024 18:58:09 -0600 Subject: [PATCH 13/31] add convert_flag_to_hive to import --- src/rc_gpfs/policy/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rc_gpfs/policy/__init__.py b/src/rc_gpfs/policy/__init__.py index 754c706..8a2cfe5 100644 --- a/src/rc_gpfs/policy/__init__.py +++ b/src/rc_gpfs/policy/__init__.py @@ -1,2 +1,2 @@ from .split import split,compress_logs -from .convert import convert \ No newline at end of file +from .convert import convert, convert_flat_to_hive \ No newline at end of file -- GitLab From bbe413b55a94fa48e55a794839d110797b7909cb Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 01:25:07 -0600 Subject: [PATCH 14/31] add extra to gitignore --- .gitignore | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index f9e7191..c5d8ecd 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,7 @@ dask-logs/* poetry.toml # Ignore local vscode config -.vscode \ No newline at end of file +.vscode + +# Ignore random extra files +extra/ \ No newline at end of file -- GitLab From e625450eaea49eb79324773315eb0cd356f16d8f Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 01:26:50 -0600 Subject: [PATCH 15/31] add conversion of flat parquet to hive structure as CLI functionality --- pyproject.toml | 1 + src/rc_gpfs/cli/__init__.py | 1 + src/rc_gpfs/cli/convert_flat_to_hive.py | 101 ++++++++++++++++++++++-- 3 files changed, 95 insertions(+), 8 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f7f43d5..1edf54b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ url="https://pypi.nvidia.com" priority = "supplemental" [tool.poetry.scripts] +convert-to-hive = "rc_gpfs.cli:convert_flat_to_hive" convert-to-parquet = "rc_gpfs.cli:convert_to_parquet" split-log = "rc_gpfs.cli:split_log" diff --git a/src/rc_gpfs/cli/__init__.py b/src/rc_gpfs/cli/__init__.py index fcbd7b1..305d2af 100644 --- a/src/rc_gpfs/cli/__init__.py +++ b/src/rc_gpfs/cli/__init__.py @@ -1,2 +1,3 @@ +from .convert_flat_to_hive import convert_flat_to_hive from .convert_to_parquet import convert_to_parquet from .split_log import split_log \ No newline at end of file diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py index 934a844..926074e 100644 --- a/src/rc_gpfs/cli/convert_flat_to_hive.py +++ b/src/rc_gpfs/cli/convert_flat_to_hive.py @@ -1,9 +1,14 @@ import argparse import subprocess from pathlib import Path + +import dask.dataframe as dd +import dask.config +dask.config.set({'dataframe.backend':'cudf'}) +from dask.diagnostics import ProgressBar + from .utils import define_python_interpreter,gpu_batch_parser -from ..policy import convert_flat_to_hive -from ..utils import parse_scontrol +from ..policy import hivize DESCRIPTION = """ Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This essentially creates a timeseries of structured datasets for each tld for much easier more efficient log comparisons within tld. Each file path is set as the index and sorted, and all final output parquets are partitioned to have similar in-memory sizes. @@ -24,14 +29,22 @@ def parse_args(): formatter_class=argparse.RawTextHelpFormatter, parents=[gpu_batch_parser()] ) - parser.add_argument('input', + parser.add_argument('parquet_path', type=Path, help="Path to a directory containing a flat parquet dataset.") - parser.add_argument('hive', + parser.add_argument('hive_path', type=Path, help="Parent directory for the hive. This can be either a new directory or an existing hive directory. If this is an existing hive dataset, new data will be appended to the old data. This will not alter any existing data provided the tld or date of acquisition differ from the existing hive data.") - parser.add_argument('--tld',) + parser.add_argument('--tld', + type=str, + help='Comma-separated list of tld to convert to hive') + parser.add_argument('--cutoff', + type=float, + default=30) + parser.add_argument('--grp-file', + type=Path, + help="Path to an existing group file for batch processing.") args = parser.parse_args() return vars(args) @@ -41,24 +54,26 @@ BATCH_SCRIPT = """\ # #SBATCH --job-name=hivize #SBATCH --ntasks={ntasks} +#SBATCH --cpus-per-task={cpus_per_task} #SBATCH --partition={partition} #SBATCH --time={time} #SBATCH --mem={mem} +#SBATCH --gres=gpu:{gpus} #SBATCH --output={output_log} #SBATCH --error={error_log} #SBATCH --array=1-{ngroups} {env_cmd} -tld={tld} +tld=$(sed -n "${{SLURM_ARRAY_TASK_ID}}p" {grp_file}) -convert-flat-to-hive -o {hive_dir} {parquet} +convert-to-hive --tld ${{tld}} {parquet_path} {hive_path} """ def setup_slurm_logs(slurm_log_dir): slurm_log_dir = slurm_log_dir.absolute() slurm_log_dir.mkdir(exist_ok = True,parents=True,mode = 0o2770) - out_log,err_log = [slurm_log_dir.joinpath('convert_%A_%a.out'),slurm_log_dir.joinpath('convert_%A_%a.err')] + out_log,err_log = [slurm_log_dir.joinpath('hive_%A_%a.out'),slurm_log_dir.joinpath('hive_%A_%a.err')] slurm_logs = {'output_log':out_log,'error_log':err_log} return slurm_logs @@ -72,4 +87,74 @@ def submit_batch(**kwargs): script = BATCH_SCRIPT.format(**kwargs) subprocess.run(['sbatch'],input=script,shell=True,text=True) + pass + +def split_into_groups(series, cutoff): + groups = [] + + while len(series.index) > 0: + current_group = [] + current_sum = 0 + for username, storage_size in series.items(): + if storage_size > cutoff: + groups.append({username}) + series = series.drop(username) + break + elif current_sum + storage_size <= cutoff: + current_group.append(username) + current_sum += storage_size + + series = series.drop(current_group) + if current_group: + groups.append(set(current_group)) + + return groups + +def calc_tld_mem(df): + mem = df.groupby('tld',observed=True).apply(lambda x: x.memory_usage(deep=True).sum()) + return mem + +def define_tld_groups(input,cutoff): + ddf = dd.read_parquet(input,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld']) + with ProgressBar(): + tld_mem = ddf.map_partitions(calc_tld_mem).compute() + tld_mem = tld_mem.groupby(tld_mem.index).sum().divide(1024**3).to_pandas() + grps = split_into_groups(tld_mem,cutoff) + return grps + +def nested_list_to_log(nest,file): + """ + Writes a list of lists to a text log + + Args: + nest (list): A list of lists to be converted. + """ + with open(file, 'w', newline='') as f: + for l in nest: + f.write(f"{','.join(l)}\n") + +def convert_flat_to_hive(): + args = parse_args() + + if args.get('batch'): + if not args.get('grp_file'): + grps = define_tld_groups(args.get('parquet_path'),args.get('cutoff')) + + misc_path = args.get('parquet_path').parent.joinpath('misc','tld_grps.txt') + misc_path.parent.mkdir(exist_ok = True, parents = True) + nested_list_to_log(grps,misc_path) + ngroups = len(grps) + grp_file = str(misc_path) + args.update({'ngroups':ngroups, + 'grp_file':grp_file}) + else: + ngroups = sum(1 for line in open(args.get('grp_file'))) + args.update({'ngroups':ngroups}) + + submit_batch(**args) + + else: + tld = args.get('tld').split(',') + args.update({'tld':tld}) + hivize(**args) pass \ No newline at end of file -- GitLab From e78d450ac6864b850ff854fe13c921d3a4c1b627 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 01:27:43 -0600 Subject: [PATCH 16/31] add start_local_cluster to all exports to simplify backend creation for some cases --- src/rc_gpfs/compute/backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rc_gpfs/compute/backend.py b/src/rc_gpfs/compute/backend.py index 5f8022e..ba8a86f 100644 --- a/src/rc_gpfs/compute/backend.py +++ b/src/rc_gpfs/compute/backend.py @@ -5,7 +5,7 @@ from .utils import * from ..utils import parse_scontrol from typing import Literal -__all__ = ['start_backend'] +__all__ = ['start_backend','start_local_cluster'] # ENH: Add default parameters for cluster creation based on defined type and available resources. For instance, creating a LocalCluster should default to using all available CPUs and all available RAM. class DaskClusterManager: -- GitLab From 99a3625f7c2d161ffb182894f1158963d95b4d7f Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 01:28:26 -0600 Subject: [PATCH 17/31] change name of processing function to hivize to distinguish from the CLI function --- src/rc_gpfs/policy/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rc_gpfs/policy/__init__.py b/src/rc_gpfs/policy/__init__.py index 8a2cfe5..c24d1c3 100644 --- a/src/rc_gpfs/policy/__init__.py +++ b/src/rc_gpfs/policy/__init__.py @@ -1,2 +1,2 @@ from .split import split,compress_logs -from .convert import convert, convert_flat_to_hive \ No newline at end of file +from .convert import convert, hivize \ No newline at end of file -- GitLab From 8aa9b28e6b7a9be0664d5a0b0a09cc33a52f7915 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 01:29:07 -0600 Subject: [PATCH 18/31] add force to make sure partitions can be expanded to the specified size, if necessary --- src/rc_gpfs/policy/convert.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index 7d1511f..5c2f7cd 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -132,7 +132,7 @@ def convert_flat_to_hive( ddf = backend.read_parquet(parquet_path,filters=predicates) df = ddf.compute() df = df.set_index('path').sort_index().assign(acq=acq) - ddf = from_local(df).repartition(partition_size=partition_size) + ddf = from_local(df).repartition(partition_size=partition_size,force=True) ddf.to_parquet(staging_path,partition_on=['tld','acq'],name_function = indexed_name) shutil.copytree(staging_path,hive_path,dirs_exist_ok=True) -- GitLab From dc004e13fa069091b450ea0f74dbc333c21d9c43 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 01:31:12 -0600 Subject: [PATCH 19/31] remove heat, mode, misc, and pool to reduce memory usage and avoid OOM events on the GPU during index sorting --- src/rc_gpfs/policy/convert.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index 5c2f7cd..941e267 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -129,7 +129,7 @@ def convert_flat_to_hive( # when including predicates for filtering. The dask dataframe is converted to a regular dataframe to drastically # improve indexing and sorting by removing partitions. The sorted dataframe is converted back to a dask dataframe # to create partitions within the parquet dataset and write to multiple files defined by those partitions. - ddf = backend.read_parquet(parquet_path,filters=predicates) + ddf = backend.read_parquet(parquet_path,filters=predicates,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld']) df = ddf.compute() df = df.set_index('path').sort_index().assign(acq=acq) ddf = from_local(df).repartition(partition_size=partition_size,force=True) -- GitLab From 2d88a7868e257f5b0ccf701e7e72f522e383a259 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 01:33:23 -0600 Subject: [PATCH 20/31] change name of processing function to hivize to distinguish from the CLI function --- src/rc_gpfs/policy/convert.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index 941e267..a19e8a0 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -12,9 +12,9 @@ import pandas as pd import dask.dataframe as dd import dask.config -from .utils import as_path from .policy_defs import SCHEMA from ..compute.backend import infer_cuda +from ..utils import as_path def parse_line(line): try: @@ -78,13 +78,14 @@ def convert( df.to_parquet(output_path,engine = 'pyarrow') -def convert_flat_to_hive( +def hivize( parquet_path: str | Path, hive_path: str | Path, tld: str | list[str] | None = None, staging_path: str | Path | None = None, partition_size: str = '100MiB', - with_cuda: bool | Literal['infer'] = 'infer' + with_cuda: bool | Literal['infer'] = 'infer', + **kwargs ) -> None: parquet_path = as_path(parquet_path) hive_path = as_path(hive_path) -- GitLab From 936b7c1880e8b6100d39b94cd852275df256ba17 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 01:33:58 -0600 Subject: [PATCH 21/31] move as_path to main utils since it is used in more than one module --- src/rc_gpfs/policy/split.py | 3 +-- src/rc_gpfs/policy/utils.py | 6 ------ src/rc_gpfs/utils.py | 8 +++++++- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/rc_gpfs/policy/split.py b/src/rc_gpfs/policy/split.py index ac5fe69..1438a20 100644 --- a/src/rc_gpfs/policy/split.py +++ b/src/rc_gpfs/policy/split.py @@ -1,7 +1,6 @@ from pathlib import Path import subprocess -from .utils import as_path -from ..utils import parse_scontrol +from ..utils import parse_scontrol,as_path __all__ = ['split','compress_logs'] diff --git a/src/rc_gpfs/policy/utils.py b/src/rc_gpfs/policy/utils.py index bd89f7d..e69de29 100644 --- a/src/rc_gpfs/policy/utils.py +++ b/src/rc_gpfs/policy/utils.py @@ -1,6 +0,0 @@ -from pathlib import Path - -def as_path(s: str | Path) -> Path: - if not isinstance(s,Path): - s = Path(s) - return s \ No newline at end of file diff --git a/src/rc_gpfs/utils.py b/src/rc_gpfs/utils.py index 2c205b9..dc54e47 100644 --- a/src/rc_gpfs/utils.py +++ b/src/rc_gpfs/utils.py @@ -2,6 +2,7 @@ from typing import Literal import os import re import subprocess +from pathlib import Path # ENH: if this package becomes merged with noctua, need to replace this function since it's copied directly from there def convert_si(value: str | float | int, @@ -75,4 +76,9 @@ def parse_scontrol(): cores = int(cores) mem = convert_si(mem,to_unit='G',use_binary=True) - return [cores,mem] \ No newline at end of file + return [cores,mem] + +def as_path(s: str | Path) -> Path: + if not isinstance(s,Path): + s = Path(s) + return s \ No newline at end of file -- GitLab From dab1fd3cd6e4063aff3f2d07159c938ad8ee30c1 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 10:52:34 -0600 Subject: [PATCH 22/31] add batch_parser as parent to reduce clutter --- src/rc_gpfs/cli/split_log.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/rc_gpfs/cli/split_log.py b/src/rc_gpfs/cli/split_log.py index 6c37bde..76950bc 100644 --- a/src/rc_gpfs/cli/split_log.py +++ b/src/rc_gpfs/cli/split_log.py @@ -1,7 +1,7 @@ import argparse import subprocess from pathlib import Path -from .utils import define_python_interpreter +from .utils import define_python_interpreter,batch_parser from ..policy import split BATCH_SCRIPT = """\ @@ -24,7 +24,7 @@ split-log -o {output_dir} -l {lines} {log} def parse_args(): parser = argparse.ArgumentParser( description="Splits a GPFS policy log into multiple parts for batch array processing.", - formatter_class=argparse.RawTextHelpFormatter + parents=[batch_parser(cpus_per_task=24,time='02:00:00',mem='8G',gpus=0,partition='amd-hdr100,express')] ) parser.add_argument('log', type=Path, @@ -37,16 +37,6 @@ def parse_args(): parser.add_argument('-l', '--lines', type=int, default=int(5e6), help="Number of lines to split the log file by") - slurm = parser.add_argument_group(title='Slurm Options') - slurm.add_argument('--batch', action='store_true', default=False, - help="Run as a batch job. Otherwise use the current processing environment") - slurm.add_argument('-n','--cpus-per-task', type=int, default=24, - help="Number of cores assigned to the job. Ntasks is always set to 1") - slurm.add_argument('-p','--partition', type=str, default='amd-hdr100') - slurm.add_argument('-t','--time',type=str,default='02:00:00') - slurm.add_argument('-m','--mem',type=str,default='8G') - slurm.add_argument('--slurm-log-dir',type=Path,default='./out', - help='Output log directory. If the directory does not exist, it will be created automatically') args = parser.parse_args() return vars(args) -- GitLab From fcd806b9f51a3256a7fab52db1d32cef41edfaa5 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 10:55:13 -0600 Subject: [PATCH 23/31] change to batch_parser and add defaults --- src/rc_gpfs/cli/convert_flat_to_hive.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py index 926074e..caaa89d 100644 --- a/src/rc_gpfs/cli/convert_flat_to_hive.py +++ b/src/rc_gpfs/cli/convert_flat_to_hive.py @@ -7,7 +7,7 @@ import dask.config dask.config.set({'dataframe.backend':'cudf'}) from dask.diagnostics import ProgressBar -from .utils import define_python_interpreter,gpu_batch_parser +from .utils import define_python_interpreter,batch_parser from ..policy import hivize DESCRIPTION = """ @@ -27,7 +27,7 @@ def parse_args(): parser = argparse.ArgumentParser( description=DESCRIPTION, formatter_class=argparse.RawTextHelpFormatter, - parents=[gpu_batch_parser()] + parents=[batch_parser(cpus_per_task=16, gpus=1, partition='amperenodes', mem='90G')] ) parser.add_argument('parquet_path', type=Path, -- GitLab From 33b90555840f49c0f389f1cbe88b561fabb15dac Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 10:55:39 -0600 Subject: [PATCH 24/31] unify gpu and cpu batch parsers into single batch parser that takes arguments --- src/rc_gpfs/cli/utils.py | 34 +++++++++++++--------------------- 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/src/rc_gpfs/cli/utils.py b/src/rc_gpfs/cli/utils.py index 91a9e2c..e424827 100644 --- a/src/rc_gpfs/cli/utils.py +++ b/src/rc_gpfs/cli/utils.py @@ -27,17 +27,17 @@ class CustomHelpFormatter(argparse.MetavarTypeHelpFormatter): actions = sorted(actions, key=lambda x: x.container.title if x.container.title else '') super(CustomHelpFormatter, self).add_arguments(actions) -def base_batch_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(add_help=False) - return parser - -def add_common_arguments( - parser: argparse.ArgumentParser, - cpus_per_task: int, - gpus: int, - partition: str, - mem: str -) -> None: +def batch_parser( + cpus_per_task: int | None = None, + gpus: int | None = None, + partition: str | None = None, + mem: str | None = None, + time: str | None = '12:00:00', + reservation: str | None = None, + **kwargs +) -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(add_help=False, + formatter_class=CustomHelpFormatter) slurm = parser.add_argument_group(title='Slurm Options') slurm.add_argument('--batch', action='store_true', default=False, help="Convert as a batch array job.") @@ -45,17 +45,9 @@ def add_common_arguments( slurm.add_argument('-c', '--cpus-per-task', type=int, default=cpus_per_task) slurm.add_argument('-g', '--gpus', type=int, default=gpus, choices=[0, 1]) slurm.add_argument('-p', '--partition', type=str, default=partition) - slurm.add_argument('-t', '--time', type=str, default='12:00:00') + slurm.add_argument('-t', '--time', type=str, default=time) slurm.add_argument('-m', '--mem', type=str, default=mem) + slurm.add_argument('--reservation',type=str,default=reservation) slurm.add_argument('--slurm-log-dir', type=Path, default='./out', help='Output log directory. If the directory does not exist, it will be created automatically.') - -def cpu_batch_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(add_help=False, parents=[base_batch_parser()], formatter_class=CustomHelpFormatter) - add_common_arguments(parser, cpus_per_task=1, gpus=0, partition='amd-hdr100', mem='16G') - return parser - -def gpu_batch_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(add_help=False, parents=[base_batch_parser()], formatter_class=CustomHelpFormatter) - add_common_arguments(parser, cpus_per_task=16, gpus=1, partition='amperenodes', mem='80G') return parser \ No newline at end of file -- GitLab From 5a784d6738589903c5a11a25055b464a9c420260 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 10:59:49 -0600 Subject: [PATCH 25/31] add slurm_log_dir options --- src/rc_gpfs/cli/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rc_gpfs/cli/utils.py b/src/rc_gpfs/cli/utils.py index e424827..530911c 100644 --- a/src/rc_gpfs/cli/utils.py +++ b/src/rc_gpfs/cli/utils.py @@ -34,6 +34,7 @@ def batch_parser( mem: str | None = None, time: str | None = '12:00:00', reservation: str | None = None, + slurm_log_dir: str | Path | None = './out', **kwargs ) -> argparse.ArgumentParser: parser = argparse.ArgumentParser(add_help=False, @@ -48,6 +49,6 @@ def batch_parser( slurm.add_argument('-t', '--time', type=str, default=time) slurm.add_argument('-m', '--mem', type=str, default=mem) slurm.add_argument('--reservation',type=str,default=reservation) - slurm.add_argument('--slurm-log-dir', type=Path, default='./out', + slurm.add_argument('--slurm-log-dir', type=Path, default=slurm_log_dir, help='Output log directory. If the directory does not exist, it will be created automatically.') return parser \ No newline at end of file -- GitLab From b1c129bfd18c6e9022f23b1d3268fe3237da1eb4 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 11:00:07 -0600 Subject: [PATCH 26/31] switch to batch_parser to reduce argparse clutter --- src/rc_gpfs/cli/convert_to_parquet.py | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py index e684935..3d72495 100644 --- a/src/rc_gpfs/cli/convert_to_parquet.py +++ b/src/rc_gpfs/cli/convert_to_parquet.py @@ -2,7 +2,7 @@ import argparse import subprocess from pathlib import Path import multiprocessing -from .utils import define_python_interpreter +from .utils import define_python_interpreter,batch_parser from ..policy import convert from ..utils import parse_scontrol @@ -23,7 +23,7 @@ Local Parallel Processing: def parse_args(): parser = argparse.ArgumentParser( description=DESCRIPTION, - formatter_class=argparse.RawTextHelpFormatter, + parents=[batch_parser(partition='amd-hdr100,express',time='02:00:00',mem='16G',cpus_per_task=1)], epilog=EPILOGUE ) parser.add_argument('input', @@ -36,21 +36,6 @@ def parse_args(): help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../parquet") parser.add_argument('--pool-size',type=int,default=None, help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process") - - slurm = parser.add_argument_group(title='Slurm Options') - slurm.add_argument('--batch', action='store_true', default=False, - help="Run the conversion as a batch job. If a directory path is given as an input, an array job will be created to run the conversion in parallel.") - slurm.add_argument('-n','--ntasks', type=int, default=1) - slurm.add_argument('-p','--partition', type=str, default='amd-hdr100') - slurm.add_argument('-t','--time',type=str,default='02:00:00') - slurm.add_argument('-m','--mem',type=str,default='16G') - slurm.add_argument('--slurm-log-dir',type=Path, default='./out', - help='Output log directory. If the directory does not exist, it will be created automatically. Logs will be named convert_%%A_%%a to differentiate amongs job IDs and task IDs') - interpreter = slurm.add_mutually_exclusive_group() - interpreter.add_argument('--python-path',type=Path, - help="Path to Python interpreter to use for conversion. This interpreter should have access to a pandas library, preferably version >=2.0. If not specified, the path to the active python3 interpreter will be used.") - interpreter.add_argument('--conda-env',type=str,default=None, - help="The name or prefix of a conda environment to activate as opposed to a python3 path") args = parser.parse_args() return vars(args) -- GitLab From 838cd2f6f6af6be2ea137530970f7d0d42d73de6 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 11:13:01 -0600 Subject: [PATCH 27/31] move setup_slurm_logs to utils since it's essentially the same for every CLI command --- src/rc_gpfs/cli/convert_flat_to_hive.py | 11 ++--------- src/rc_gpfs/cli/convert_to_parquet.py | 11 +++-------- src/rc_gpfs/cli/split_log.py | 8 +++----- src/rc_gpfs/cli/utils.py | 9 ++++++++- 4 files changed, 16 insertions(+), 23 deletions(-) diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py index caaa89d..f0e169f 100644 --- a/src/rc_gpfs/cli/convert_flat_to_hive.py +++ b/src/rc_gpfs/cli/convert_flat_to_hive.py @@ -7,7 +7,7 @@ import dask.config dask.config.set({'dataframe.backend':'cudf'}) from dask.diagnostics import ProgressBar -from .utils import define_python_interpreter,batch_parser +from .utils import define_python_interpreter,batch_parser,setup_slurm_logs from ..policy import hivize DESCRIPTION = """ @@ -70,18 +70,11 @@ tld=$(sed -n "${{SLURM_ARRAY_TASK_ID}}p" {grp_file}) convert-to-hive --tld ${{tld}} {parquet_path} {hive_path} """ -def setup_slurm_logs(slurm_log_dir): - slurm_log_dir = slurm_log_dir.absolute() - slurm_log_dir.mkdir(exist_ok = True,parents=True,mode = 0o2770) - out_log,err_log = [slurm_log_dir.joinpath('hive_%A_%a.out'),slurm_log_dir.joinpath('hive_%A_%a.err')] - slurm_logs = {'output_log':out_log,'error_log':err_log} - return slurm_logs - def submit_batch(**kwargs): env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env')) kwargs.update({"env_cmd":env_cmd}) - slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir')) + slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'hive') kwargs.update(slurm_logs) script = BATCH_SCRIPT.format(**kwargs) diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py index 3d72495..c67e82b 100644 --- a/src/rc_gpfs/cli/convert_to_parquet.py +++ b/src/rc_gpfs/cli/convert_to_parquet.py @@ -2,7 +2,7 @@ import argparse import subprocess from pathlib import Path import multiprocessing -from .utils import define_python_interpreter,batch_parser +from .utils import define_python_interpreter,batch_parser,setup_slurm_logs from ..policy import convert from ..utils import parse_scontrol @@ -58,18 +58,13 @@ log=$(ls {input}/*.gz | awk "NR==${{SLURM_ARRAY_TASK_ID}} {{ print $1 }}") convert-to-parquet -o {output_dir} ${{log}} """ -def setup_slurm_logs(slurm_log_dir): - slurm_log_dir = slurm_log_dir.absolute() - slurm_log_dir.mkdir(exist_ok = True,parents=True,mode = 0o2770) - out_log,err_log = [str(slurm_log_dir.joinpath('convert_%A_%a.out')),str(slurm_log_dir.joinpath('convert_%A_%a.err'))] - slurm_logs = {'output_log':out_log,'error_log':err_log} - return slurm_logs + def submit_batch(**kwargs): env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env')) kwargs.update({"env_cmd":env_cmd}) - slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir')) + slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'parquet') kwargs.update(slurm_logs) script = BATCH_SCRIPT.format(**kwargs) diff --git a/src/rc_gpfs/cli/split_log.py b/src/rc_gpfs/cli/split_log.py index 76950bc..9117e2f 100644 --- a/src/rc_gpfs/cli/split_log.py +++ b/src/rc_gpfs/cli/split_log.py @@ -1,7 +1,7 @@ import argparse import subprocess from pathlib import Path -from .utils import define_python_interpreter,batch_parser +from .utils import define_python_interpreter,batch_parser,setup_slurm_logs from ..policy import split BATCH_SCRIPT = """\ @@ -44,10 +44,8 @@ def submit_batch(**kwargs): env_cmd = define_python_interpreter() kwargs.update({"env_cmd":env_cmd}) - kwargs.get('slurm_log_dir').mkdir(exist_ok=True,parents=True,mode = 0o2770) - output_log = kwargs.get('slurm_log_dir').joinpath('split.out') - error_log = kwargs.get('slurm_log_dir').joinpath('split.err') - kwargs.update({'output_log':output_log,'error_log':error_log}) + slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'split') + kwargs.update(slurm_logs) script = BATCH_SCRIPT.format(**kwargs) diff --git a/src/rc_gpfs/cli/utils.py b/src/rc_gpfs/cli/utils.py index 530911c..18cb8fb 100644 --- a/src/rc_gpfs/cli/utils.py +++ b/src/rc_gpfs/cli/utils.py @@ -51,4 +51,11 @@ def batch_parser( slurm.add_argument('--reservation',type=str,default=reservation) slurm.add_argument('--slurm-log-dir', type=Path, default=slurm_log_dir, help='Output log directory. If the directory does not exist, it will be created automatically.') - return parser \ No newline at end of file + return parser + +def setup_slurm_logs(slurm_log_dir,log_basename): + slurm_log_dir = slurm_log_dir.absolute() + slurm_log_dir.mkdir(exist_ok = True,parents=True,mode = 0o2770) + out_log,err_log = [str(slurm_log_dir.joinpath(f'{log_basename}_%A_%a.out')),str(slurm_log_dir.joinpath(f'{log_basename}_%A_%a.err'))] + slurm_logs = {'output_log':out_log,'error_log':err_log} + return slurm_logs \ No newline at end of file -- GitLab From ec9e451346695b13668b21499f807d0cfce95282 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 1 Jan 2025 12:01:23 -0600 Subject: [PATCH 28/31] added no-clobber options to easily skip datasets in batch jobs where preprocessing was already performed --- src/rc_gpfs/cli/convert_to_parquet.py | 10 ++++++++-- src/rc_gpfs/cli/split_log.py | 7 +++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py index c67e82b..f035843 100644 --- a/src/rc_gpfs/cli/convert_to_parquet.py +++ b/src/rc_gpfs/cli/convert_to_parquet.py @@ -1,3 +1,4 @@ +import sys import argparse import subprocess from pathlib import Path @@ -36,6 +37,8 @@ def parse_args(): help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../parquet") parser.add_argument('--pool-size',type=int,default=None, help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process") + parser.add_argument('--no-clobber', action='store_true',default=False, + help='When set and existing parquet files are found, immediately exits without any processing') args = parser.parse_args() return vars(args) @@ -76,8 +79,12 @@ def convert_to_parquet() -> None: args = parse_args() if args['output_dir'] is None: args['output_dir'] = args['input'].parent.joinpath('parquet') - + args['output_dir'].mkdir(exist_ok = True, mode = 0o2770) + + output_files_exist = len(list(args['output_dir'].glob('*.parquet'))) > 0 + if args['no_clobber'] and output_files_exist: + sys.exit('The output directory already contains parquet files. Exiting') if args['input'].is_file(): nlogs = 1 @@ -97,4 +104,3 @@ def convert_to_parquet() -> None: pool.starmap(convert, fargs) else: convert(args['input'],args['output_dir']) - pass diff --git a/src/rc_gpfs/cli/split_log.py b/src/rc_gpfs/cli/split_log.py index 9117e2f..c0cc64a 100644 --- a/src/rc_gpfs/cli/split_log.py +++ b/src/rc_gpfs/cli/split_log.py @@ -1,3 +1,4 @@ +import sys import argparse import subprocess from pathlib import Path @@ -36,6 +37,8 @@ def parse_args(): help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../chunks") parser.add_argument('-l', '--lines', type=int, default=int(5e6), help="Number of lines to split the log file by") + parser.add_argument('--no-clobber', action='store_true',default=False, + help='When set and existing split logs are found, immediately exits without any processing') args = parser.parse_args() return vars(args) @@ -59,6 +62,10 @@ def split_log(): args['output_dir'] = args['log'].parent.parent.joinpath('chunks') args['output_dir'].mkdir(exist_ok = True, mode = 0o2770) + output_files_exist = len(list(args['output_dir'].glob('*.gz'))) > 0 + if args['no_clobber'] and output_files_exist: + sys.exit('The output directory already contains split log files. Exiting') + if args.get('batch'): submit_batch(**args) else: -- GitLab From 622bce289302cdb5181a5ff7ebe6e820e46212c2 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Thu, 2 Jan 2025 15:15:48 -0600 Subject: [PATCH 29/31] add flush to immediately print to stdout in case of error. errors caused the buffer to be lost without printing debug statements --- src/rc_gpfs/policy/convert.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index a19e8a0..1414580 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -93,7 +93,7 @@ def hivize( if staging_path is None: rand_str = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) staging_path = Path(os.getenv('TMPDIR')).joinpath(os.getenv('USER'),f'hive-{rand_str}') - print(f"INFO: Using {staging_path} as temporary directory") + print(f"INFO: Using {staging_path} as temporary directory",flush=True) else: staging_path = as_path(staging_path) @@ -121,10 +121,10 @@ def hivize( predicates = [('tld','in',tld)] else: predicates = None - print(f"DEBUG: Filtering predicates are: {predicates}") + print(f"DEBUG: Filtering predicates are: {predicates}",flush=True) acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0) - print(f"DEBUG: Acquisition date is {acq}") + print(f"DEBUG: Acquisition date is {acq}",flush=True) # The flat parquet is initially read in via dask to avoid reading the full dataset into memory which happens even # when including predicates for filtering. The dask dataframe is converted to a regular dataframe to drastically -- GitLab From 093df9fb3b3805d089425e94952e2a0417f3980f Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Thu, 2 Jan 2025 15:17:06 -0600 Subject: [PATCH 30/31] add no-clobber option; add jobID back to slurm log names --- example-job-scripts/convert-logs.sh | 14 +++++++------- example-job-scripts/split-logs.sh | 10 +++++----- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/example-job-scripts/convert-logs.sh b/example-job-scripts/convert-logs.sh index a3f68db..f503652 100644 --- a/example-job-scripts/convert-logs.sh +++ b/example-job-scripts/convert-logs.sh @@ -4,16 +4,16 @@ #SBATCH --ntasks=1 #SBATCH --cpus-per-task=1 #SBATCH --mem=8G -#SBATCH --partition=amd-hdr100 +#SBATCH --partition=amd-hdr100,intel-dcb,express #SBATCH --time=02:00:00 -#SBATCH --output=out/convert-%a.out -#SBATCH --error=out/convert-%a.err -#SBATCH --array=0-37 +#SBATCH --output=out/convert-%A-%a.out +#SBATCH --error=out/convert-%A-%a.err +#SBATCH --array=0-49 module load Anaconda3 -conda activate gpfs +conda activate gpfs-dev -logs=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-30*/chunks")) +logs=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-*/chunks")) log=${logs[${SLURM_ARRAY_TASK_ID}]} -convert-to-parquet --batch ${log} \ No newline at end of file +convert-to-parquet --batch --no-clobber --partition=amd-hdr100,express,intel-dcb ${log} \ No newline at end of file diff --git a/example-job-scripts/split-logs.sh b/example-job-scripts/split-logs.sh index 7622e02..137fcd3 100644 --- a/example-job-scripts/split-logs.sh +++ b/example-job-scripts/split-logs.sh @@ -6,14 +6,14 @@ #SBATCH --mem=8G #SBATCH --partition=amd-hdr100 #SBATCH --time=02:00:00 -#SBATCH --output=out/split-%a.out -#SBATCH --error=out/split-%a.err -#SBATCH --array=0-35 +#SBATCH --output=out/split-%A-%a.out +#SBATCH --error=out/split-%A-%a.err +#SBATCH --array=0-49 module load Anaconda3 conda activate gpfs-dev -logs=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-30*/raw/*.gz")) +logs=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-*/raw/*.gz")) log=${logs[${SLURM_ARRAY_TASK_ID}]} -split-log ${log} \ No newline at end of file +split-log --no-clobber ${log} \ No newline at end of file -- GitLab From 3d75e34c6241afebae6038cde2c4dc39e81cfb80 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Thu, 2 Jan 2025 15:18:14 -0600 Subject: [PATCH 31/31] add control script to submit hive conversion to each found parquet directory for data-project --- example-job-scripts/convert-to-hive.sh | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 example-job-scripts/convert-to-hive.sh diff --git a/example-job-scripts/convert-to-hive.sh b/example-job-scripts/convert-to-hive.sh new file mode 100644 index 0000000..6178f1f --- /dev/null +++ b/example-job-scripts/convert-to-hive.sh @@ -0,0 +1,21 @@ +#! /bin/bash +# +#SBATCH --job-name=hive-setup +#SBATCH --ntasks=1 +#SBATCH --cpus-per-task=16 +#SBATCH --mem=90G +#SBATCH --partition=amperenodes +#SBATCH --time=02:00:00 +#SBATCH --reservation=rc-gpfs +#SBATCH --gres=gpu:1 +#SBATCH --output=out/hive-setup-%A-%a.out +#SBATCH --error=out/hive-setup-%A-%a.err +#SBATCH --array=0-49 + +module load Anaconda3 +conda activate gpfs-dev + +parquets=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-*/parquet")) +pq=${parquets[${SLURM_ARRAY_TASK_ID}]} + +convert-to-hive --batch ${pq} /scratch/mdefende/project-hive -- GitLab