diff --git a/poetry.lock b/poetry.lock index 02961ca52705cfe03743868fd762d95a17cc1c9c..24f58d2d829bfc926703c90db341920b82602bab 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,18 @@ # This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. +[[package]] +name = "colorama" +version = "0.4.6" +description = "Cross-platform colored terminal text." +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +groups = ["main"] +markers = "sys_platform == \"win32\"" +files = [ + {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, + {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, +] + [[package]] name = "colormaps" version = "0.4.2" @@ -82,6 +95,18 @@ files = [ docs = ["Sphinx", "furo"] test = ["objgraph", "psutil"] +[[package]] +name = "iniconfig" +version = "2.1.0" +description = "brain-dead simple config-ini parsing" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760"}, + {file = "iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7"}, +] + [[package]] name = "numpy" version = "2.2.5" @@ -175,6 +200,22 @@ files = [ packaging = "*" tenacity = ">=6.2.0" +[[package]] +name = "pluggy" +version = "1.5.0" +description = "plugin and hook calling mechanisms for python" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, + {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, +] + +[package.extras] +dev = ["pre-commit", "tox"] +testing = ["pytest", "pytest-benchmark"] + [[package]] name = "polars" version = "1.27.1" @@ -274,6 +315,42 @@ files = [ [package.extras] test = ["cffi", "hypothesis", "pandas", "pytest", "pytz"] +[[package]] +name = "pytest" +version = "8.3.5" +description = "pytest: simple powerful testing with Python" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820"}, + {file = "pytest-8.3.5.tar.gz", hash = "sha256:f4efe70cc14e511565ac476b57c279e12a855b11f48f212af1080ef2263d3845"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "sys_platform == \"win32\""} +iniconfig = "*" +packaging = "*" +pluggy = ">=1.5,<2" + +[package.extras] +dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] + +[[package]] +name = "pytest-datafiles" +version = "3.0.0" +description = "py.test plugin to create a 'tmp_path' containing predefined files/directories." 
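The lock and pyproject changes pull in pytest and pytest-datafiles as main-group dependencies, and the `[tool.pytest.ini_options]` table later in this diff sets `required_plugins = ["pytest-datafiles>=3.0.0"]` and `testpaths = ["tests"]`, so pytest will refuse to run without the plugin and will only collect from `tests/`. A minimal sketch of the kind of test this enables follows; the test module, fixture directory, and sample log name are hypothetical and not part of this changeset.

```python
# tests/test_convert.py (hypothetical; "tests/" matches the new testpaths setting)
from pathlib import Path

import pytest

from rc_gpfs.policy.convert import convert

# Assumed location of a small gzipped GPFS policy log kept as a test fixture.
FIXTURE_DIR = Path(__file__).parent / "data"


@pytest.mark.datafiles(FIXTURE_DIR / "list-000.gz")
def test_convert_creates_parquet(datafiles):
    # pytest-datafiles copies the files named in the marker into a fresh
    # tmp_path-backed directory and exposes that directory as `datafiles`.
    log = datafiles / "list-000.gz"
    out_dir = datafiles / "parquet"
    convert(log, output_dir=out_dir)
    assert (out_dir / "list-000.parquet").exists()
```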
+optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "pytest-datafiles-3.0.0.tar.gz", hash = "sha256:a70c4c66a36d1cdcfc095607f04eee66eaef3fa64cbb62d60c47ce169901d1d4"}, + {file = "pytest_datafiles-3.0.0-py2.py3-none-any.whl", hash = "sha256:2176e10d3f6e76f358925a897e21e2bcc5a0170b92fac4e66ed055eaa2ca6a22"}, +] + +[package.dependencies] +pytest = ">=3.6" + [[package]] name = "sqlalchemy" version = "2.0.40" @@ -419,5 +496,5 @@ files = [ [metadata] lock-version = "2.1" -python-versions = ">=3.12" -content-hash = "b94c26465ac498c7656fafdf498f424379a1e882e5da6ca3a9dcac48b076b770" +python-versions = ">=3.12,<4.0" +content-hash = "b774e71210a804105de6c03976133f42953ebfc5be59ef0d2c6cc62c842a6372" diff --git a/pyproject.toml b/pyproject.toml index 62cb958abe3568216da745ee959c520a021029ba..0120b6f4a8fd720fa86ccb718ef2b30839b78eed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ maintainers = [ ] license = "AFL" readme = "README.md" -requires-python = ">=3.12" +requires-python = ">=3.12,<4.0" keywords = ["GPFS", "policy", "aggregation", "reporting"] dynamic = ["version","dependencies","classifiers"] @@ -33,12 +33,14 @@ classifiers = [ version = "0.0.0" [tool.poetry.dependencies] -python = ">=3.12" +python = ">=3.12,<4.0" colormaps = "*" numpy = "*" plotly = "^5.24.1" polars = ">=1.27.0" pyarrow = "^19.0.1" +pytest = "^8.3.5" +pytest-datafiles = "^3.0.0" sqlalchemy = "*" typeguard = "*" @@ -66,3 +68,10 @@ folders = [ requires = ["poetry-core>=2.0.0,<3.0.0","poetry-dynamic-versioning>=1.0.0,<2.0.0"] build-backend = "poetry_dynamic_versioning.backend" + +[tool.pytest.ini_options] +addopts = [ + "--import-mode=importlib", +] +required_plugins = ["pytest-datafiles>=3.0.0"] +testpaths = ["tests"] \ No newline at end of file diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 0000000000000000000000000000000000000000..10bc61b33a98623344329b56b0d07a195f11e0d9 --- /dev/null +++ b/ruff.toml @@ -0,0 +1,85 @@ +# Exclude a variety of commonly ignored directories. +exclude = [ + ".bzr", + ".direnv", + ".eggs", + ".git", + ".gitlab-ci.yml", + ".git-rewrite", + ".hg", + ".ipynb_checkpoints", + ".mypy_cache", + ".nox", + ".pants.d", + ".poetry", + ".pyenv", + ".pytest_cache", + ".pytype", + ".ruff_cache", + ".svn", + ".tox", + ".venv", + ".vscode", + "__pypackages__", + "_build", + "buck-out", + "build", + "dist", + "node_modules", + "site-packages", + "venv", + "poetry.toml", + "poetry.lock", + "test-data", + "legacy-scripts", + "extra", + "data" +] + +# Same as Black. +line-length = 88 +indent-width = 4 + +# Assume Python 3.13 +target-version = "py313" + +[lint] +# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. +# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or +# McCabe complexity (`C901`) by default. +select = ["E4", "E7", "E9", "F"] +ignore = [] + +# Allow fix for all enabled rules (when `--fix`) is provided. +fixable = ["ALL"] +unfixable = [] + +# Allow unused variables when underscore-prefixed. +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" + +[format] +# Like Black, use double quotes for strings. +quote-style = "double" + +# Like Black, indent with spaces, rather than tabs. +indent-style = "space" + +# Like Black, respect magic trailing commas. +skip-magic-trailing-comma = false + +# Like Black, automatically detect the appropriate line ending. +line-ending = "auto" + +# Enable auto-formatting of code examples in docstrings. 
Markdown, +# reStructuredText code/literal blocks and doctests are all supported. +# +# This is currently disabled by default, but it is planned for this +# to be opt-out in the future. +docstring-code-format = false + +# Set the line length limit used when formatting code snippets in +# docstrings. +# +# This only has an effect when the `docstring-code-format` setting is +# enabled. +docstring-code-line-length = "dynamic" \ No newline at end of file diff --git a/src/rc_gpfs/__init__.py b/src/rc_gpfs/__init__.py index 3ff8aafb283c19a37cefeefdd4979757d6a44c6c..52a1dfab1ab3ec32fb0ce6b45ee2ecaa1d518646 100644 --- a/src/rc_gpfs/__init__.py +++ b/src/rc_gpfs/__init__.py @@ -1 +1,3 @@ -from .__version__ import __version__, __version_tuple__ \ No newline at end of file +from .__version__ import __version__, __version_tuple__ + +__all__ = [__version__, __version_tuple__] \ No newline at end of file diff --git a/src/rc_gpfs/cli/__init__.py b/src/rc_gpfs/cli/__init__.py index c5e66dbcd6f50d016ed18fcb1183bb27ce008fb2..53cb54e73051f1ce1b9ef059c7b896663f64309e 100644 --- a/src/rc_gpfs/cli/__init__.py +++ b/src/rc_gpfs/cli/__init__.py @@ -1,4 +1,6 @@ from .convert_flat_to_hive import convert_flat_to_hive from .convert_to_parquet import convert_to_parquet +from .fparq_cli import fparq_cli from .split_log import split_log -from .fparq_cli import fparq_cli \ No newline at end of file + +__all__ = [convert_flat_to_hive, convert_to_parquet, fparq_cli, split_log] \ No newline at end of file diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py index 0fe64634dcc47dde28327efd60c8c8942754506e..64ee8822e933ea4655d701a48f36dceebc94e0ec 100644 --- a/src/rc_gpfs/cli/convert_flat_to_hive.py +++ b/src/rc_gpfs/cli/convert_flat_to_hive.py @@ -1,15 +1,17 @@ import argparse -import subprocess +import random import re +import subprocess import time -import random from pathlib import Path -import polars as pl from typing import List +import numpy as np +import polars as pl + from ..policy.hive import hivize -from .utils import define_python_interpreter,batch_parser_no_mem,setup_slurm_logs from ..utils import get_parquet_dataset_size +from .utils import batch_parser_no_mem, define_python_interpreter, setup_slurm_logs DESCRIPTION = """ Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This creates a timeseries of structured datasets for each tld for much easier more efficient log comparisons within tld. Output parquets are sorted by the path column for convenience, but no index is set. @@ -19,45 +21,64 @@ Setting the --batch flag will create a Slurm array job where each task processes All processing is done via Polars and so can take advantage of parallel processing. Higher core counts can provide a limited benefit to performance, and a GPU is not required. """ + def parse_args(cli_args=None): parser = argparse.ArgumentParser( description=DESCRIPTION, formatter_class=argparse.RawTextHelpFormatter, - parents=[batch_parser_no_mem(cpus_per_task=8, gpus=0, partition='amd-hdr100',time='02:00:00')] + parents=[ + batch_parser_no_mem( + cpus_per_task=8, gpus=0, partition="amd-hdr100", time="02:00:00" + ) + ], + ) + parser.add_argument( + "parquet_path", + type=Path, + help="Path to a directory containing a flat parquet dataset.", + ) + parser.add_argument( + "hive_path", + type=Path, + help="Parent directory for the hive. This can be either a new directory or an existing hive directory. 
If this is an existing hive dataset, new data will be appended to the old data. This will not alter any existing data provided the tld or date of acquisition differ from the existing hive data.", + ) + parser.add_argument( + "--tld", + type=str, + help="Comma-separated list of tld to convert to hive. If not specified, all unique tld values in the dataset will be processed.", + ) + parser.add_argument( + "--partition-size", + dest="partition_chunk_size_bytes", + type=str, + default="200MiB", + help="Max size of in-memory data for each partition in a single hive dataset. Smaller partitions cause more files to be written. Can pass the byte size as an integer or as a human-readable byte string. For example, 1024 and 1KiB are equivalent.", + ) + parser.add_argument( + "--mem-factor", + type=int, + default=3, + help="Factor to scale each tld's estimated memory by when setting memory requirements in the Slurm job. A factor of 3 accounts for almost all cases. A lower factor can result in more OOM errors but would increase task throughput, and vice versa for a higher factor.", + ) + parser.add_argument( + "--no-clobber", + default=False, + action="store_true", + help="Flag to set whether contents of a hive cell will be overwritten. If True, the pipeline will exit if any parquet files are found in the cell directory. No processing will occur in that case. If False (default), any files existing in the cell directory will be removed prior to data writing.", + ) + parser.add_argument( + "--dry-run", + default=False, + action="store_true", + help="If set, no jobs will be submitted at the end. Tld grouping still occurs, and the text files with the groups are still written.", ) - parser.add_argument('parquet_path', - type=Path, - help="Path to a directory containing a flat parquet dataset.") - - parser.add_argument('hive_path', - type=Path, - help="Parent directory for the hive. This can be either a new directory or an existing hive directory. If this is an existing hive dataset, new data will be appended to the old data. This will not alter any existing data provided the tld or date of acquisition differ from the existing hive data.") - parser.add_argument('--tld', - type=str, - help='Comma-separated list of tld to convert to hive. If not specified, all unique tld values in the dataset will be processed.') - parser.add_argument('--partition-size', - dest='partition_chunk_size_bytes', - type=str, - default='200MiB', - help='Max size of in-memory data for each partition in a single hive dataset. Smaller partitions cause more files to be written. Can pass the byte size as an integer or as a human-readable byte string. For example, 1024 and 1KiB are equivalent.') - parser.add_argument('--mem-factor', - type=int, - default=3, - help="Factor to scale each tld's estimated memory by when setting memory requirements in the Slurm job. A factor of 3 accounts for almost all cases. A lower factor can result in more OOM errors but would increase task throughput, and vice versa for a higher factor.") - parser.add_argument('--no-clobber', - default=False, - action='store_true', - help="Flag to set whether contents of a hive cell will be overwritten. If True, the pipeline will exit if any parquet files are found in the cell directory. No processing will occur in that case. If False (default), any files existing in the cell directory will be removed prior to data writing.") - parser.add_argument('--dry-run', - default=False, - action='store_true', - help="If set, no jobs will be submitted at the end. 
Tld grouping still occurs, and the text files with the groups are still written.") args = parser.parse_args(cli_args) return vars(args) + SLURM_OPTS = """\ -#SBATCH --job-name=hivize-{mem} +#SBATCH --job-name=hivize #SBATCH --ntasks={ntasks} #SBATCH --cpus-per-task={cpus_per_task} #SBATCH --partition={partition} @@ -65,59 +86,82 @@ SLURM_OPTS = """\ #SBATCH --mem={mem} #SBATCH --output={output_log} #SBATCH --error={error_log} -#SBATCH --array=1-{ntlds} +#SBATCH --array=0-{ngrps} """ BATCH_CMDS = """\ {env_cmd} -tld=$(sed -n "${{SLURM_ARRAY_TASK_ID}}p" {tld_file}) +tld=$(cat {grp_dir}/tld-${{SLURM_ARRAY_TASK_ID}}.txt | paste -s -d ',') -convert-to-hive --tld ${{tld}} --partition-size={partition_chunk_size_bytes} {parquet_path} {hive_path} +convert-to-hive --tld=${{tld}} --partition-size={partition_chunk_size_bytes} {parquet_path} {hive_path} """ + def submit_batch(**kwargs): - env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env')) - kwargs.update({"env_cmd":env_cmd}) - - slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'hive') + env_cmd = define_python_interpreter( + kwargs.get("python_path"), kwargs.get("conda_env") + ) + kwargs.update({"env_cmd": env_cmd}) + + slurm_logs = setup_slurm_logs(kwargs.get("slurm_log_dir"), "hive") kwargs.update(slurm_logs) - + slurm_opts = SLURM_OPTS.format(**kwargs) - if kwargs.get('reservation') is not None: + if kwargs.get("reservation") is not None: slurm_opts = f"{slurm_opts}#SBATCH --reservation={kwargs.get('reservation')}" script = f"#!/bin/bash\n#\n{slurm_opts}\n{BATCH_CMDS.format(**kwargs)}" - if not kwargs['dry_run']: + if not kwargs["dry_run"]: # Wait between 1 and 5 seconds before batch submission. This helps avoid a situation where this setup is running # in a batch array job and all of the array tasks submit their child array jobs at the same time. That results # in jobs failing to be submitted due to overwhelming the scheduler with simultaneous requests. 
Adding a random # delay should fix that - time.sleep(random.uniform(1,5)) + time.sleep(random.uniform(1, 5)) - subprocess.run(['sbatch'],input=script,shell=True,text=True) + try: + subprocess.run( + ["sbatch"], + input=script, + shell=True, + text=True, + check=True, + capture_output=True, + ) + except subprocess.CalledProcessError as e: + raise subprocess.CalledProcessError(e.stderr.decode()) + else: + print(script) pass -def submit_tld_groups(df,grp_dir,args): - mem_grp = df[0,'req_mem_grp'] - - tld_file = grp_dir.joinpath(f"tld-{mem_grp}.txt") - - tlds = df["tld"].to_list() - - with open(tld_file,'wt') as f: - f.writelines('\n'.join(tlds)) - - args['ntlds'] = len(tlds) - args['tld_file'] = tld_file - args['mem'] = mem_grp - - submit_batch(**args) - pass + +def _calc_closest_pwr(val, base=2) -> int: + exp = np.max([np.ceil(np.emath.logn(n=base, x=val)), 0]) + return int(base**exp) + + +def _group_by_cumsum(series: pl.Series, cutoff: float) -> pl.Series: + # Compute cumulative sum + cumsum = series.cum_sum() + + # Determine group boundaries where cumulative sum exceeds cutoff + group = (cumsum / cutoff).floor().cast(pl.Int32) + + # Reset group counter when cumulative sum crosses cutoff + # This ensures each group's sum is <= cutoff + group_diff = group.diff().fill_null(0) + group_counter = group_diff.cum_sum() + + return group_counter + + +def group_by_required_mem(ser: pl.Series, mem_cutoff: int) -> pl.Series: + grps = _group_by_cumsum(ser, mem_cutoff).cast(pl.Int32) + return grps + def get_tld_row_counts(parquet_path: Path) -> pl.DataFrame: - tld_rows = ( pl.scan_parquet(parquet_path, parallel="prefiltered") .group_by("tld") @@ -126,14 +170,10 @@ def get_tld_row_counts(parquet_path: Path) -> pl.DataFrame: ) return tld_rows + def estimate_req_mem( - parquet_path: Path, - tld: List[str] | pl.Series, - mem_factor: int = 3 + parquet_path: Path, tld: List[str] | pl.Series, mem_factor: int = 3 ) -> pl.DataFrame: - mem_breaks = [8, 16, 32, 64] - mem_labels = ["8G", "16G", "32G", "64G", "128G"] - dataset_size = get_parquet_dataset_size(parquet_path) / (1024**3) tld_sizes = get_tld_row_counts(parquet_path) @@ -145,64 +185,83 @@ def estimate_req_mem( ) ) .with_columns((pl.col("est_size") * mem_factor).alias("est_req_mem")) - .with_columns( - ( - pl.col("est_req_mem") - .cut(breaks=mem_breaks, labels=mem_labels) - .alias("req_mem_grp") - ) - ) .filter(pl.col("tld").is_in(tld)) + .sort("est_req_mem") ) return tld_sizes + def convert_flat_to_hive(cli_args=None): args = parse_args(cli_args) - if args['tld'] is None: - tlds = ( - pl.scan_parquet(args['parquet_path'],cache=False) - .select('tld') + if args["tld"] is None: + args["tld"] = ( + pl.scan_parquet(args["parquet_path"], cache=False) + .select("tld") .unique() - .collect(engine='streaming') - .sort('tld') - .get_column('tld') + .collect(engine="streaming") + .sort("tld") + .get_column("tld") .to_list() ) else: - tlds = args['tld'].split(',') + args["tld"] = args["tld"].split(",") - if args['no_clobber']: - acq = re.search(r"\d{4}-\d{2}-\d{2}", str(args['parquet_path'].absolute())).group(0) + if args["no_clobber"]: + acq = re.search( + r"\d{4}-\d{2}-\d{2}", str(args["parquet_path"].absolute()) + ).group(0) - existing_acq_paths = [p for p in args['hive_path'].rglob(f'*/acq={acq}') if len(list(p.glob('*.parquet'))) > 0] - existing_tlds = [re.search(r"tld=([^/]+)/", str(p)).group(1) for p in existing_acq_paths] + existing_acq_paths = [ + p + for p in args["hive_path"].rglob(f"*/acq={acq}") + if len(list(p.glob("*.parquet"))) > 0 + ] + existing_tlds 
= [ + re.search(r"tld=([^/]+)/", str(p)).group(1) for p in existing_acq_paths + ] - args['tld'] = [t for t in tlds if t not in existing_tlds] + args["tld"] = [t for t in args["tld"] if t not in existing_tlds] - if len(args['tld']) == 0: - print("INFO: All tlds already exist, and no-clobber is set. Exiting without converting") + if len(args["tld"]) == 0: + print( + "INFO: All tlds already exist, and no-clobber is set. Exiting without converting" + ) return - - if args['batch']: - req_mem = estimate_req_mem(args['parquet_path'],args['tld'],args['mem_factor']) - - grp_dir = args["parquet_path"].parent.joinpath("misc") - grp_dir.mkdir(parents=True,exist_ok=True) - [f.unlink() for f in grp_dir.glob('tld*.txt')] - - req_mem_file = grp_dir.joinpath('tld_est_mem.parquet') + + if args["batch"]: + req_mem = estimate_req_mem( + args["parquet_path"], args["tld"], args["mem_factor"] + ) + + # Group sizes will be set either to the next largest power of 2 above the max estimated memory required for the + # largest tld or 16, whichever is larger + mem = max([_calc_closest_pwr(req_mem["est_req_mem"].max()), 16]) + args["mem"] = f"{mem}G" + + req_mem = req_mem.with_columns( + group_by_required_mem(pl.col("est_req_mem"), mem).alias("grp") + ) + + args["ngrps"] = req_mem["grp"].n_unique() - 1 + + args["grp_dir"] = args["parquet_path"].parent.joinpath("misc") + args["grp_dir"].mkdir(parents=True, exist_ok=True) + _ = [f.unlink() for f in args["grp_dir"].glob("tld*.txt")] + + for grp in req_mem["grp"].unique().to_list(): + grp_tlds = req_mem.filter(pl.col("grp").eq(grp))["tld"].to_list() + with open(args["grp_dir"].joinpath(f"tld-{grp}.txt"), "w") as f: + f.write("\n".join(grp_tlds)) + + req_mem_file = args["grp_dir"].joinpath("tld_est_mem.parquet") req_mem_file.unlink(missing_ok=True) req_mem.write_parquet(req_mem_file) - for grp, df in req_mem.group_by('req_mem_grp'): - print(f"INFO: Submitting array job for {grp[0]}",flush=True) - submit_tld_groups(df,grp_dir,args) - + submit_batch(**args) + else: - _ = args.pop('tld') - for tld in tlds: - hivize(**args,tld=tld) - pass \ No newline at end of file + hivize(**args) + pass diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py index ac84448023782b1752affaaa69aac0dfdafd423c..ede25587ea22728cff8ae81cd89d8da7735124c3 100644 --- a/src/rc_gpfs/cli/convert_to_parquet.py +++ b/src/rc_gpfs/cli/convert_to_parquet.py @@ -1,15 +1,15 @@ import argparse -import time +import multiprocessing import random import subprocess +import time from pathlib import Path -import multiprocessing from ..policy.convert import convert, set_output_filename -from .utils import define_python_interpreter,batch_parser,setup_slurm_logs from ..utils import parse_scontrol +from .utils import batch_parser, define_python_interpreter, setup_slurm_logs -__all__ = ['convert_to_parquet'] +__all__ = ["convert_to_parquet"] DESCRIPTION = """ Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n @@ -23,27 +23,49 @@ Local Parallel Processing: If processing is done via a local parallel pool, the requested cores need to be accessible by the invoking Python process. When run in a Slurm job context where the number of cores were only specified with the --ntasks property, only 1 core will be available to the Python process regardless of the number of cores requested by the job. Instead, use the --cpus-per-task property to set the number of cores paired with --ntasks=1. 
This will correctly allow the parallel pool to utilize all cores assigned to the job. """ + def parse_args(arg_str=None): parser = argparse.ArgumentParser( description=DESCRIPTION, - parents=[batch_parser(partition='amd-hdr100,express',time='02:00:00',mem='16G',cpus_per_task=1)], - epilog=EPILOGUE + parents=[ + batch_parser( + partition="amd-hdr100,express", + time="02:00:00", + mem="16G", + cpus_per_task=1, + ) + ], + epilog=EPILOGUE, + ) + parser.add_argument( + "input", + type=Path, + help="Path to a log file or directory of log files from mmlspolicy to be converted to parquet. If a directory path is given, all files within the directory will be converted. If the --batch option is set, this will be done in a batch array job. Otherwise, it will be done in a local parallel pool using available compute resources.", + ) + + parser.add_argument( + "-o", + "--output-dir", + type=Path, + default=None, + help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../parquet", + ) + parser.add_argument( + "--pool-size", + type=int, + default=None, + help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process", + ) + parser.add_argument( + "--no-clobber", + action="store_true", + default=False, + help="When set, skips any log chunks that already have corresponding parquet files. Chunks without a parquet file are processed as normal.", ) - parser.add_argument('input', - type=Path, - help="Path to a log file or directory of log files from mmlspolicy to be converted to parquet. If a directory path is given, all files within the directory will be converted. If the --batch option is set, this will be done in a batch array job. Otherwise, it will be done in a local parallel pool using available compute resources.") - - parser.add_argument('-o','--output-dir', - type=Path, - default = None, - help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../parquet") - parser.add_argument('--pool-size',type=int,default=None, - help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process") - parser.add_argument('--no-clobber', action='store_true',default=False, - help='When set, skips any log chunks that already have corresponding parquet files. Chunks without a parquet file are processed as normal.') args = parser.parse_args(arg_str) return vars(args) + BATCH_SCRIPT = """\ #!/bin/bash # @@ -65,13 +87,16 @@ log=$(ls {input}/*.gz | sort | awk "NR==${{idx}} {{ print $1 }}") convert-to-parquet {no_clobber_opt} -o {output_dir} ${{log}} """ + def submit_batch(**kwargs): - env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env')) - kwargs.update({"env_cmd":env_cmd}) - - slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'parquet') + env_cmd = define_python_interpreter( + kwargs.get("python_path"), kwargs.get("conda_env") + ) + kwargs.update({"env_cmd": env_cmd}) + + slurm_logs = setup_slurm_logs(kwargs.get("slurm_log_dir"), "parquet") kwargs.update(slurm_logs) - + script = BATCH_SCRIPT.format(**kwargs) # Wait between 1 and 5 seconds before batch submission. 
This helps avoid a situation where this setup is running in @@ -80,9 +105,20 @@ def submit_batch(**kwargs): # fix that time.sleep(random.uniform(1, 5)) - subprocess.run(['sbatch'],input=script,shell=True,text=True) + try: + subprocess.run( + ["sbatch"], + input=script, + shell=True, + text=True, + check=True, + capture_output=True, + ) + except subprocess.CalledProcessError as e: + raise subprocess.CalledProcessError(e.stderr.decode()) pass + def _find_sequential_indexes(idxs): if not idxs: return [] @@ -110,67 +146,66 @@ def _find_sequential_indexes(idxs): return result + def _get_missing_indexes(chunks, parquets): missing_indexes = [ index for index, element in enumerate(chunks) if element not in parquets ] return missing_indexes + def convert_to_parquet() -> None: args = parse_args() - if args['output_dir'] is None: - if args['input'].is_file(): - args['output_dir'] = args['input'].parent.parent.joinpath('parquet') + if args["output_dir"] is None: + if args["input"].is_file(): + args["output_dir"] = args["input"].parent.parent.joinpath("parquet") else: args["output_dir"] = args["input"].parent.joinpath("parquet") - args['output_dir'].mkdir(exist_ok = True, mode = 0o2770) + args["output_dir"].mkdir(exist_ok=True, mode=0o2770) - if args['input'].is_file(): + if args["input"].is_file(): nlogs = 1 else: - logs = list(args.get('input').glob('*.gz')) + logs = list(args.get("input").glob("*.gz")) nlogs = len(logs) - + if args["no_clobber"]: args.update({"no_clobber_opt": "--no-clobber"}) - if args['input'].is_dir(): + if args["input"].is_dir(): chunks = logs chunks.sort() else: chunks = [args["input"]] - + pqs = [f.name for f in args["output_dir"].glob("*.parquet")] target_pqs = [set_output_filename(f) for f in chunks] - idxs_to_run = _get_missing_indexes(target_pqs,pqs) - + idxs_to_run = _get_missing_indexes(target_pqs, pqs) + if len(idxs_to_run) == 0: - print("INFO: All log chunks have been converted to parquet. Exiting without processing") + print( + "INFO: All log chunks have been converted to parquet. Exiting without processing" + ) return - + array_idxs = _find_sequential_indexes(idxs_to_run) - args['array_idxs'] = ','.join(array_idxs) + args["array_idxs"] = ",".join(array_idxs) else: - args.update( - { - "no_clobber_opt": "", - "array_idxs" : f"0-{nlogs-1}" - } - ) - - args.update({'nlogs':nlogs}) + args.update({"no_clobber_opt": "", "array_idxs": f"0-{nlogs - 1}"}) + + args.update({"nlogs": nlogs}) - if args['batch']: + if args["batch"]: submit_batch(**args) elif nlogs > 1: - ncpus,_ = parse_scontrol() - pool_size = args.get('pool_size',ncpus) + ncpus, _ = parse_scontrol() + pool_size = args.get("pool_size", ncpus) with multiprocessing.Pool(processes=pool_size) as pool: - fargs = list(zip(logs,[args['output_dir']]*nlogs)) + fargs = list(zip(logs, [args["output_dir"]] * nlogs)) pool.starmap(convert, fargs) else: - convert(args['input'],args['output_dir']) + convert(args["input"], args["output_dir"]) diff --git a/src/rc_gpfs/cli/fparq_cli.py b/src/rc_gpfs/cli/fparq_cli.py index be3a1c054158bdc3ae41ec966b6d5fefd6c123a7..a2ea26d7c9a8cdbb328c9549c3a25d5752a7be3d 100644 --- a/src/rc_gpfs/cli/fparq_cli.py +++ b/src/rc_gpfs/cli/fparq_cli.py @@ -12,59 +12,86 @@ Tiny and Large Files Options are included to specify minimum and maximum size cutoffs to define which files are included in the default grouping. These excluded files can then either be partitioned separately or not at all with another option. 
This was written to optimize functions such as parsyncfp2 which have built-in options to process tiny or large files differently. For example, parsyncfp2 will tar tiny files togather and transfer the tarball as well as chunk large files and transfer chunks concurrently. """ + def parse_args(): parser = argparse.ArgumentParser( - description=DESCRIPTION, - formatter_class=argparse.RawTextHelpFormatter + description=DESCRIPTION, formatter_class=argparse.RawTextHelpFormatter + ) + parser.add_argument( + "parquet_path", + type=Path, + help="Input path for the parquet GPFS dataset to chunk", + ) + parser.add_argument( + "-p", + "--partition-path", + type=Path, + default=None, + help="Path to write partition files. Defaults to ${{parquet_path}}/_partitions", + ) + parser.add_argument( + "-m", + "--max-part-size", + type=str, + default="50GiB", + help="Max combined size of all files in a partition. This can be specified either as a human-readable byte string (e.g. 10M[[i]B], 100G[[i]B]) or as a raw integer. Byte strings will be interpreted as base 2 (e.g 1kB is always 1024 bytes)", + ) + parser.add_argument( + "-f", + "--max-part-files", + type=int, + default=None, + help="Maximum number of files to include in any partition. Works with --max-size where all partitions meet both criteria", + ) + parser.add_argument( + "-t", + "--tiny-size", + type=str, + default=None, + help="Max size of file to be specified as 'tiny'. Tiny files are partitioned separately from other files by default. They can be excluded entirely using the --exclude-nonstandard flag", + ) + parser.add_argument( + "--max-tiny-part-size", + type=str, + default="1GiB", + help="Max partition size for tiny files", + ) + parser.add_argument( + "--max-tiny-part-files", + type=int, + default=250000, + help="Max number of files in a partition of tiny files", + ) + parser.add_argument( + "-b", + "--big-size", + type=str, + default=None, + help="Minimum file size to specified as 'big'. Files above this limit will be assigned to their own unique partition. This value is implicitly set to the max partition size. Setting this value above the max partition size would have no effect. These files can be excluded entirely using the --exclude-nonstandard flag", + ) + parser.add_argument( + "--exclude-nonstandard", + default=False, + action="store_true", + help="Exclude all tiny and big files from partitioning. Partitions will only include files between tiny-size and big-size.", ) - parser.add_argument('parquet_path', - type=Path, - help="Input path for the parquet GPFS dataset to chunk") - parser.add_argument('-p','--partition-path', - type=Path, - default=None, - help="Path to write partition files. Defaults to ${{parquet_path}}/_partitions") - parser.add_argument('-m','--max-part-size', - type=str, - default='50GiB', - help="Max combined size of all files in a partition. This can be specified either as a human-readable byte string (e.g. 10M[[i]B], 100G[[i]B]) or as a raw integer. Byte strings will be interpreted as base 2 (e.g 1kB is always 1024 bytes)") - parser.add_argument('-f','--max-part-files', - type=int, - default=None, - help="Maximum number of files to include in any partition. Works with --max-size where all partitions meet both criteria") - parser.add_argument('-t','--tiny-size', - type=str, - default=None, - help="Max size of file to be specified as 'tiny'. Tiny files are partitioned separately from other files by default. 
They can be excluded entirely using the --exclude-nonstandard flag") - parser.add_argument('--max-tiny-part-size', - type=str, - default='1GiB', - help="Max partition size for tiny files") - parser.add_argument('--max-tiny-part-files', - type=int, - default=250000, - help="Max number of files in a partition of tiny files") - parser.add_argument('-b','--big-size', - type=str, - default=None, - help="Minimum file size to specified as 'big'. Files above this limit will be assigned to their own unique partition. This value is implicitly set to the max partition size. Setting this value above the max partition size would have no effect. These files can be excluded entirely using the --exclude-nonstandard flag") - parser.add_argument('--exclude-nonstandard', - default=False, - action="store_true", - help="Exclude all tiny and big files from partitioning. Partitions will only include files between tiny-size and big-size.") args = parser.parse_args() return vars(args) + def fparq_cli(): args = parse_args() import polars as pl + from ..process import fparq - if args.get('partition_path') is None: - pq_path = args.get('parquet_path') - args.update({'partition_path': pq_path.joinpath('_partitions')}) + if args.get("partition_path") is None: + pq_path = args.get("parquet_path") + args.update({"partition_path": pq_path.joinpath("_partitions")}) - df = pl.read_parquet(args.get('parquet_path').joinpath('*.parquet'), columns=['path','size']) - fparq(df,**args) + df = pl.read_parquet( + args.get("parquet_path").joinpath("*.parquet"), columns=["path", "size"] + ) + fparq(df, **args) diff --git a/src/rc_gpfs/cli/split_log.py b/src/rc_gpfs/cli/split_log.py index 44c8461365465ba6bc529eb2f4642a29f2df1fa7..f8fc7c38a3ccd1c2fbb7c399f62ed4708f1ff26b 100644 --- a/src/rc_gpfs/cli/split_log.py +++ b/src/rc_gpfs/cli/split_log.py @@ -1,8 +1,9 @@ -import sys import argparse import subprocess +import sys from pathlib import Path -from .utils import define_python_interpreter,batch_parser,setup_slurm_logs + +from .utils import batch_parser, define_python_interpreter, setup_slurm_logs BATCH_SCRIPT = """\ #!/bin/bash @@ -21,53 +22,89 @@ BATCH_SCRIPT = """\ split-log -o {output_dir} -l {lines} {log} """ + def parse_args(): parser = argparse.ArgumentParser( description="Splits a GPFS policy log into multiple parts for batch array processing.", - parents=[batch_parser(cpus_per_task=24,time='02:00:00',mem='8G',gpus=0,partition='amd-hdr100,express')] + parents=[ + batch_parser( + cpus_per_task=24, + time="02:00:00", + mem="8G", + gpus=0, + partition="amd-hdr100,express", + ) + ], + ) + parser.add_argument( + "log", + type=Path, + help="Path to a raw GPFS log file. The log can be either uncompressed or gz compressed.", + ) + + parser.add_argument( + "-o", + "--output-dir", + type=Path, + default=None, + help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../chunks", + ) + parser.add_argument( + "-l", + "--lines", + type=int, + default=int(5e6), + help="Number of lines to split the log file by", + ) + parser.add_argument( + "--no-clobber", + action="store_true", + default=False, + help="When set and existing split logs are found, immediately exits without any processing", ) - parser.add_argument('log', - type=Path, - help="Path to a raw GPFS log file. The log can be either uncompressed or gz compressed.") - - parser.add_argument('-o','--output-dir', - type=Path, - default = None, - help="Directory to store the output parquet. 
The parquet file will have the same name as the input. Defaults to ./input/../chunks") - parser.add_argument('-l', '--lines', type=int, default=int(5e6), - help="Number of lines to split the log file by") - parser.add_argument('--no-clobber', action='store_true',default=False, - help='When set and existing split logs are found, immediately exits without any processing') args = parser.parse_args() return vars(args) + def submit_batch(**kwargs): env_cmd = define_python_interpreter() - kwargs.update({"env_cmd":env_cmd}) - - slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'split') + kwargs.update({"env_cmd": env_cmd}) + + slurm_logs = setup_slurm_logs(kwargs.get("slurm_log_dir"), "split") kwargs.update(slurm_logs) script = BATCH_SCRIPT.format(**kwargs) - subprocess.run(['sbatch'],input=script,shell=True,text=True) + try: + subprocess.run( + ["sbatch"], + input=script, + shell=True, + text=True, + check=True, + capture_output=True, + ) + except subprocess.CalledProcessError as e: + raise subprocess.CalledProcessError(e.stderr.decode()) pass + def split_log(): args = parse_args() - if args['output_dir'] is None: - args['output_dir'] = args['log'].parent.parent.joinpath('chunks') - args['output_dir'].mkdir(exist_ok = True, mode = 0o2770) + if args["output_dir"] is None: + args["output_dir"] = args["log"].parent.parent.joinpath("chunks") + args["output_dir"].mkdir(exist_ok=True, mode=0o2770) - output_files_exist = len(list(args['output_dir'].glob('*.gz'))) > 0 - if args['no_clobber'] and output_files_exist: - sys.exit('The output directory already contains split log files. Exiting') + output_files_exist = len(list(args["output_dir"].glob("*.gz"))) > 0 + if args["no_clobber"] and output_files_exist: + sys.exit("The output directory already contains split log files. 
Exiting") - if args.get('batch'): + if args.get("batch"): submit_batch(**args) else: from ..policy.split import split + split(**args) - pass \ No newline at end of file + pass diff --git a/src/rc_gpfs/cli/utils.py b/src/rc_gpfs/cli/utils.py index e879259e8cbf9581a229fbbaa2e96fbcf08be5d4..63326f2f92c7898454aa1adcb6e40f5f9ba88de9 100644 --- a/src/rc_gpfs/cli/utils.py +++ b/src/rc_gpfs/cli/utils.py @@ -1,32 +1,37 @@ import argparse -import sys import os +import sys from pathlib import Path + def define_python_interpreter(python_path=None, conda_env=None): conda_base = "module load Anaconda3\nconda activate {conda_env}" venv_base = "source {python_path}" if conda_env is not None: - env = conda_base.format(conda_env=conda_env) + env = conda_base.format(conda_env=conda_env) elif python_path is not None: parent = Path(python_path).absolute().parent - env = venv_base.format(python_path=parent.joinpath('activate')) + env = venv_base.format(python_path=parent.joinpath("activate")) else: - conda_env = os.environ.get('CONDA_PREFIX') + conda_env = os.environ.get("CONDA_PREFIX") if conda_env: - env = conda_base.format(conda_env=conda_env) + env = conda_base.format(conda_env=conda_env) else: parent = Path(sys.executable).absolute().parent - env = venv_base.format(python_path=parent.joinpath('activate')) + env = venv_base.format(python_path=parent.joinpath("activate")) return env + class CustomHelpFormatter(argparse.MetavarTypeHelpFormatter): def add_arguments(self, actions): # Sort actions by their group title - actions = sorted(actions, key=lambda x: x.container.title if x.container.title else '') + actions = sorted( + actions, key=lambda x: x.container.title if x.container.title else "" + ) super(CustomHelpFormatter, self).add_arguments(actions) + def batch_parser_no_mem( cpus_per_task: int | None = None, gpus: int | None = None, @@ -60,35 +65,49 @@ def batch_parser_no_mem( ) return parser + def batch_parser( - cpus_per_task: int | None = None, - gpus: int | None = None, - partition: str | None = None, - mem: str | None = None, - time: str | None = '12:00:00', - reservation: str | None = None, - slurm_log_dir: str | Path | None = './out', - **kwargs + cpus_per_task: int | None = None, + gpus: int | None = None, + partition: str | None = None, + mem: str | None = None, + time: str | None = "12:00:00", + reservation: str | None = None, + slurm_log_dir: str | Path | None = "./out", + **kwargs, ) -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(add_help=False, - formatter_class=CustomHelpFormatter) - slurm = parser.add_argument_group(title='Slurm Options') - slurm.add_argument('--batch', action='store_true', default=False, - help="Convert as a batch array job.") - slurm.add_argument('-n', '--ntasks', type=int, default=1) - slurm.add_argument('-c', '--cpus-per-task', type=int, default=cpus_per_task) - slurm.add_argument('-g', '--gpus', type=int, default=gpus, choices=[0, 1]) - slurm.add_argument('-p', '--partition', type=str, default=partition) - slurm.add_argument('-t', '--time', type=str, default=time) - slurm.add_argument('-m', '--mem', type=str, default=mem) - slurm.add_argument('--reservation',type=str,default=reservation) - slurm.add_argument('--slurm-log-dir', type=Path, default=slurm_log_dir, - help='Output log directory. 
If the directory does not exist, it will be created automatically.') + parser = argparse.ArgumentParser( + add_help=False, formatter_class=CustomHelpFormatter + ) + slurm = parser.add_argument_group(title="Slurm Options") + slurm.add_argument( + "--batch", + action="store_true", + default=False, + help="Convert as a batch array job.", + ) + slurm.add_argument("-n", "--ntasks", type=int, default=1) + slurm.add_argument("-c", "--cpus-per-task", type=int, default=cpus_per_task) + slurm.add_argument("-g", "--gpus", type=int, default=gpus, choices=[0, 1]) + slurm.add_argument("-p", "--partition", type=str, default=partition) + slurm.add_argument("-t", "--time", type=str, default=time) + slurm.add_argument("-m", "--mem", type=str, default=mem) + slurm.add_argument("--reservation", type=str, default=reservation) + slurm.add_argument( + "--slurm-log-dir", + type=Path, + default=slurm_log_dir, + help="Output log directory. If the directory does not exist, it will be created automatically.", + ) return parser -def setup_slurm_logs(slurm_log_dir,log_basename): + +def setup_slurm_logs(slurm_log_dir, log_basename): slurm_log_dir = slurm_log_dir.absolute() - slurm_log_dir.mkdir(exist_ok = True,parents=True,mode = 0o2770) - out_log,err_log = [str(slurm_log_dir.joinpath(f'{log_basename}_%A_%a.out')),str(slurm_log_dir.joinpath(f'{log_basename}_%A_%a.err'))] - slurm_logs = {'output_log':out_log,'error_log':err_log} - return slurm_logs \ No newline at end of file + slurm_log_dir.mkdir(exist_ok=True, parents=True, mode=0o2770) + out_log, err_log = [ + str(slurm_log_dir.joinpath(f"{log_basename}_%A_%a.out")), + str(slurm_log_dir.joinpath(f"{log_basename}_%A_%a.err")), + ] + slurm_logs = {"output_log": out_log, "error_log": err_log} + return slurm_logs diff --git a/src/rc_gpfs/db/__init__.py b/src/rc_gpfs/db/__init__.py index 2d3eee9c34c99764449776d4d6468dbe10edabd9..1b5049335222216865394d230ebd70fb65e992bf 100644 --- a/src/rc_gpfs/db/__init__.py +++ b/src/rc_gpfs/db/__init__.py @@ -1,2 +1,3 @@ +from .utils import create_db, df_to_sql -from .utils import create_db, df_to_sql \ No newline at end of file +__all__ = [create_db, df_to_sql] diff --git a/src/rc_gpfs/db/utils.py b/src/rc_gpfs/db/utils.py index af816a05f9fa8c50bdd93a3273098d715d105ecb..467a30c3de4a67ad804fc3e6d939a86083ef641d 100644 --- a/src/rc_gpfs/db/utils.py +++ b/src/rc_gpfs/db/utils.py @@ -1,13 +1,14 @@ -import polars as pl from pathlib import Path + +import polars as pl from sqlalchemy import create_engine, text from ..utils import as_path -__all__= ['create_db','df_to_sql'] +__all__ = ["create_db", "df_to_sql"] -# Definitions are fairly self-explanatory except for modified bytes. Modified bytes are calculated in two different +# Definitions are fairly self-explanatory except for modified bytes. Modified bytes are calculated in two different # ways: # 1. modified_bytes_new: total storage used by the new version of the modified files # 2. 
modified_bytes_net: net storage difference between old and new versions of modified files @@ -27,25 +28,36 @@ accessed_bytes INTEGER, PRIMARY KEY (tld, log_dt) """ -CHURN_TBL_COLS = ['created','created_bytes','deleted','deleted_bytes', - 'modified','modified_bytes','modified_bytes_net', - 'accessed','accessed_bytes'] +CHURN_TBL_COLS = [ + "created", + "created_bytes", + "deleted", + "deleted_bytes", + "modified", + "modified_bytes", + "modified_bytes_net", + "accessed", + "accessed_bytes", +] + def create_db(db_path: str | Path, table: str, definition: str): db_path = as_path(db_path) - + db_path.parent.mkdir(exist_ok=True, parents=True) engine = create_engine(f"sqlite:///{db_path}") with engine.connect() as conn: conn.execute(text(f"CREATE TABLE IF NOT EXISTS {table} ({definition})")) + def df_to_sql(df: pl.DataFrame, path: Path | str, table: str): path = as_path(path) uri = f"sqlite:///{path}" - df.write_database(table,connection=uri,if_table_exists='append') + df.write_database(table, connection=uri, if_table_exists="append") + def create_churn_db(db_path: str | Path): - table = 'churn' - definition=CHURN_TBL_DEFINITION - create_db(db_path, table, definition) \ No newline at end of file + table = "churn" + definition = CHURN_TBL_DEFINITION + create_db(db_path, table, definition) diff --git a/src/rc_gpfs/policy/__init__.py b/src/rc_gpfs/policy/__init__.py index 71457c264be1b2b6b411c3a40383f2dfef7da54b..08002eb18ccd6ba56123ec7492b2521affa841cd 100644 --- a/src/rc_gpfs/policy/__init__.py +++ b/src/rc_gpfs/policy/__init__.py @@ -1,3 +1,5 @@ -from .split import split, compress_logs from .convert import convert -from .hive import hivize \ No newline at end of file +from .hive import hivize +from .split import compress_logs, split + +__all__ = [convert, hivize, compress_logs, split] \ No newline at end of file diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index 0873e9a07adc92ef826c5bfc72d037f0798bace2..6f1f673f06fe962d0007e948c736a80ee4d9aebc 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -1,47 +1,53 @@ -import re import gzip +import re from pathlib import Path from urllib.parse import unquote import polars as pl -from .policy_defs import SCHEMA from ..utils import as_path +from .policy_defs import SCHEMA + def parse_line(line): try: ul = unquote(line).strip() - ul = re.sub(r'[\n\t]','',ul) - details,path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups() - - d = dict([re.match(r'([\w]+)=(.*)',l).groups() for l in details.split('|')]) + ul = re.sub(r"[\n\t]", "", ul) + details, path = re.match(r"^[^|]+\|(.*)\| -- (.*)$", ul).groups() + + d = dict( + [re.match(r"([\w]+)=(.*)", line).groups() for line in details.split("|")] + ) - grp = re.match(r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)',path) + grp = re.match( + r"(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)", path + ) if grp: tld = grp.group(1) else: tld = None - - d.update({'path': path, - 'tld': tld}) + + d.update({"path": path, "tld": tld}) return d - except: + except AttributeError: return line -def set_output_filename(input_file,output_name = None): + +def set_output_filename(input_file, output_name=None): if output_name is None: output_name = input_file.with_suffix(".parquet").name else: output_name = as_path(output_name).with_suffix(".parquet").name - + return str(output_name) + def convert( - input_file: str | Path, - output_dir: str | Path | None = None, - output_name: str | Path | None = None, - no_clobber: bool = False, - ) -> None: + 
input_file: str | Path, + output_dir: str | Path | None = None, + output_name: str | Path | None = None, + no_clobber: bool = False, +) -> None: """ Converts a GPFS log file to parquet format. The data schema assumes the same policy definition from list-path-external and list-path-dirplus. @@ -56,15 +62,15 @@ def convert( no_clobber : bool, optional When set to True, if output_dir/output_name.parquet already exists, exit without overwriting the existing file. If False (default), any existing parquet file will be overwritten """ - + input_file = as_path(input_file) - + if output_dir is not None: output_dir = as_path(output_dir) else: - output_dir = input_file.parent.parent.joinpath('parquet') + output_dir = input_file.parent.parent.joinpath("parquet") - output_name = set_output_filename(input_file,output_name) + output_name = set_output_filename(input_file, output_name) output_path = output_dir.joinpath(output_name) if output_path.exists() and no_clobber: @@ -72,20 +78,21 @@ def convert( "INFO: Output file already exists. Pass no_clobber=False to overwrite any existing output parquet file.", flush=True, ) - print("INFO: Cleaning and exiting.",flush=True) + print("INFO: Cleaning and exiting.", flush=True) return output_dir.mkdir(mode=0o2770, exist_ok=True) - - with gzip.open(input_file,'r') as f: - dicts = [parse_line(l) for l in f] - + + with gzip.open(input_file, "r") as f: + dicts = [parse_line(line) for line in f] + df = ( - pl.from_dicts(dicts,schema=SCHEMA) + pl.from_dicts(dicts, schema=SCHEMA) .with_columns( - pl.col(name).str.to_datetime(time_unit='ns') for name in ['access','create','modify'] + pl.col(name).str.to_datetime(time_unit="ns") + for name in ["access", "create", "modify"] ) - .sort('path') + .sort("path") ) - df.write_parquet(output_path,use_pyarrow=True) \ No newline at end of file + df.write_parquet(output_path, statistics="full") diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py index db840bc862bf8df3ed9d5226e64f5b8a3fea86a9..2e0d34f3110208d14f370218f3516f5ec870a4dd 100644 --- a/src/rc_gpfs/policy/hive.py +++ b/src/rc_gpfs/policy/hive.py @@ -1,33 +1,34 @@ -import os -import re import json +import os import random -import string +import re import shutil +import string from pathlib import Path -from typing import Literal, List +from typing import List import polars as pl from ..utils import ( - as_path, - as_bytes, + as_bytes, + as_path, + calculate_age_distribution, calculate_size_distribution, - calculate_age_distribution ) + def collect_hive_df( - parquet_path: str | Path, - acq: str, - tld: str | List[str] | None = None, - hive_path: str | Path | None = None, - no_clobber: bool = False + parquet_path: str | Path, + acq: str, + tld: str | List[str] | None = None, + hive_path: str | Path | None = None, + no_clobber: bool = False, ) -> pl.DataFrame: - if not isinstance(tld,list) and tld is not None: + if not isinstance(tld, list) and tld is not None: tld = [tld] - - print("Collecting dataframe",flush=True) - + + print("Collecting dataframe", flush=True) + df = ( pl.scan_parquet(parquet_path, parallel="prefiltered", rechunk=True) .select( @@ -50,56 +51,61 @@ def collect_hive_df( df = df.filter(pl.col("tld").is_in(tld)) if no_clobber: - existing_tlds = _get_existing_hive_cells(hive_path,acq) - df = df.filter(pl.col('tld').is_in(existing_tlds).not_()) + existing_tlds = _get_existing_hive_cells(hive_path, acq) + df = df.filter(pl.col("tld").is_in(existing_tlds).not_()) - df = df.collect(engine='streaming') + df = df.collect(engine="streaming") 
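The `no_clobber` branch just above prunes every tld that already has a populated `tld=<tld>/acq=<acq>` cell before anything is collected or written. A minimal illustration of that pruning is sketched below; the tld names and on-disk layout are made up, and the snippet simply mirrors the `is_in(...).not_()` filter rather than adding behavior.

```python
import polars as pl

# Suppose the hive already holds, for this acquisition date:
#   hive/tld=labA/acq=2025-01-01/part-0.parquet
#   hive/tld=labB/acq=2025-01-01/part-0.parquet
existing_tlds = ["labA", "labB"]  # what _get_existing_hive_cells() below would report

df = pl.DataFrame({"tld": ["labA", "labC"], "size": [100, 200]})

# Keep only rows whose tld does not yet have a cell for this acq date.
new_only = df.filter(pl.col("tld").is_in(existing_tlds).not_())
# new_only retains just the "labC" row, so labA's existing cell is never rewritten.
```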
print("Finished collecting queries", flush=True) return df + def _get_existing_hive_cells(hive_path: str | Path, acq: str): hive_path = as_path(hive_path) existing_pq = hive_path.rglob(f"*/acq={acq}/*.parquet") - existing_tlds = list(set([p.parent.parent.name.removeprefix("tld=") for p in existing_pq])) + existing_tlds = list( + set([p.parent.parent.name.removeprefix("tld=") for p in existing_pq]) + ) return existing_tlds + def hivize( - parquet_path: str | Path, - hive_path: str | Path, - tld: str | List[str] | None = None, - partition_chunk_size_bytes: int | str = '200MiB', - staging_path: str | Path | None = None, - write_metadata: bool = True, - no_clobber: bool = False, - **kwargs - ) -> None: - + parquet_path: str | Path, + hive_path: str | Path, + tld: str | List[str] | None = None, + partition_chunk_size_bytes: int | str = "200MiB", + staging_path: str | Path | None = None, + write_metadata: bool = True, + no_clobber: bool = False, + **kwargs, +) -> None: parquet_path = as_path(parquet_path).resolve() hive_path = as_path(hive_path).resolve() partition_chunk_size_bytes = int(as_bytes(partition_chunk_size_bytes)) if staging_path is None: - rand_str = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - staging_path = Path(os.getenv('TMPDIR')).joinpath(os.getenv('USER'),f'hive-{rand_str}') - print(f"INFO: Using {staging_path} as temporary directory",flush=True) + rand_str = "".join(random.choices(string.ascii_letters + string.digits, k=8)) + staging_path = Path(os.getenv("TMPDIR")).joinpath( + os.getenv("USER"), f"hive-{rand_str}" + ) + print(f"INFO: Using {staging_path} as temporary directory", flush=True) else: staging_path = as_path(staging_path) - - hive_path.mkdir(exist_ok=True,parents=True) - staging_path.mkdir(exist_ok=True,parents=True) - + + hive_path.mkdir(exist_ok=True, parents=True) + staging_path.mkdir(exist_ok=True, parents=True) + if tld is not None: - if not isinstance(tld,list): + if not isinstance(tld, list): tld = [tld] - print(f"DEBUG: Hivizing {','.join(tld)}",flush=True) + print(f"DEBUG: Hivizing {','.join(tld)}", flush=True) else: - print(f"DEBUG: Hivizing all tlds",flush=True) + print("DEBUG: Hivizing all tlds", flush=True) - acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0) - print(f"DEBUG: Acquisition date is {acq}",flush=True) + acq = re.search(r"[\d]{4}-[\d]{2}-[\d]{2}", str(parquet_path)).group(0) + print(f"DEBUG: Acquisition date is {acq}", flush=True) df = collect_hive_df(parquet_path, acq, tld, hive_path, no_clobber) @@ -111,14 +117,14 @@ def hivize( shutil.rmtree(staging_path) return - print('Writing to hive') + print("Writing to hive") df.write_parquet( staging_path, - partition_by=['tld','acq'], + partition_by=["tld", "acq"], partition_chunk_size_bytes=partition_chunk_size_bytes, - statistics='full' + statistics="full", ) - print("Finished writing hive dataset",flush=True) + print("Finished writing hive dataset", flush=True) if write_metadata: tlds = df["tld"].unique().to_list() @@ -126,18 +132,16 @@ def hivize( [c for c in df.columns if c not in ["tld", "size", "modify", "access"]] ).lazy() for grp in tlds: - tdf = df.filter(pl.col('tld').eq(grp)) - output_dir=staging_path.joinpath(f"tld={grp}",f"acq={acq}") - write_dataset_metadata(tdf,output_dir,acq) + tdf = df.filter(pl.col("tld").eq(grp)) + output_dir = staging_path.joinpath(f"tld={grp}", f"acq={acq}") + write_dataset_metadata(tdf, output_dir, acq) - shutil.copytree(staging_path,hive_path,dirs_exist_ok=True) + shutil.copytree(staging_path, hive_path, 
dirs_exist_ok=True) shutil.rmtree(staging_path) + def write_dataset_metadata( - df: pl.DataFrame | pl.LazyFrame, - parquet_path: Path, - acq: str, - **kwargs + df: pl.DataFrame | pl.LazyFrame, parquet_path: Path, acq: str, **kwargs ) -> dict: df = df.lazy() @@ -156,15 +160,15 @@ def write_dataset_metadata( access_dist = ( df.select("access", "size") .with_columns( - calculate_age_distribution(pl.col('access'), acq, **kwargs).alias("grp") + calculate_age_distribution(pl.col("access"), acq, **kwargs).alias("grp") ) .group_by("grp") .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")]) .sort("grp", descending=True) - .collect(engine='streaming') + .collect(engine="streaming") .to_dicts() ) - + modify_dist = ( df.select("modify", "size") .with_columns( @@ -178,10 +182,10 @@ def write_dataset_metadata( ) metadata = { - 'file_sizes': size_dist, - 'access_times': access_dist, - 'modify_times': modify_dist + "file_sizes": size_dist, + "access_times": access_dist, + "modify_times": modify_dist, } - with open(parquet_path.joinpath('_metadata.json'),'w') as f: - json.dump(metadata,f) \ No newline at end of file + with open(parquet_path.joinpath("_metadata.json"), "w") as f: + json.dump(metadata, f) diff --git a/src/rc_gpfs/policy/policy_defs.py b/src/rc_gpfs/policy/policy_defs.py index 2cd72c9244d0c999870b0166c7c67ffd84d86980..aa32bd27edd173347f63375e05f2442ce3155cba 100644 --- a/src/rc_gpfs/policy/policy_defs.py +++ b/src/rc_gpfs/policy/policy_defs.py @@ -1,16 +1,19 @@ import polars as pl -SCHEMA = pl.Schema({ - 'size': pl.Int64, - 'kballoc': pl.Int64, - 'access': pl.String, - 'create': pl.String, - 'modify': pl.String, - 'uid': pl.Int64, - 'gid': pl.Int64, - 'heat': pl.String, - 'pool': pl.String, - 'mode': pl.String, - 'misc': pl.String, - 'path': pl.String, - 'tld': pl.String -}) \ No newline at end of file + +SCHEMA = pl.Schema( + { + "size": pl.Int64, + "kballoc": pl.Int64, + "access": pl.String, + "create": pl.String, + "modify": pl.String, + "uid": pl.Int64, + "gid": pl.Int64, + "heat": pl.String, + "pool": pl.String, + "mode": pl.String, + "misc": pl.String, + "path": pl.String, + "tld": pl.String, + } +) diff --git a/src/rc_gpfs/policy/split.py b/src/rc_gpfs/policy/split.py index 1438a20238cab29f5817187ed189855d420dea2d..e02b588419e2fb2cf066c0c909c25475594b63d0 100644 --- a/src/rc_gpfs/policy/split.py +++ b/src/rc_gpfs/policy/split.py @@ -1,26 +1,38 @@ -from pathlib import Path import subprocess -from ..utils import parse_scontrol,as_path +from pathlib import Path + +from ..utils import as_path, parse_scontrol + +__all__ = ["split", "compress_logs"] -__all__ = ['split','compress_logs'] def is_gz(filepath) -> bool: - with open(filepath, 'rb') as f: - return f.read(2) == b'\x1f\x8b' + with open(filepath, "rb") as f: + return f.read(2) == b"\x1f\x8b" + def pigz_exists() -> bool: print("INFO: Checking for pigz") - proc = subprocess.run('which pigz',shell=True,text=True,capture_output=True) - + proc = subprocess.run("which pigz", shell=True, text=True, capture_output=True) + if proc.returncode == 1: - print("WARNING: pigz was not found on the PATH. Defaulting to slower zcat. Install pigz or add it to PATH to improve performance") + print( + "WARNING: pigz was not found on the PATH. Defaulting to slower zcat. 
Install pigz or add it to PATH to improve performance" + ) return False else: print(f"INFO: pigz found at {proc.stdout.strip()}") return True -def split(log: str | Path, output_dir: str | Path | None = None, - lines: int = 5e6, prefix: str = 'list-', compress: bool = True,**kwargs) -> None: + +def split( + log: str | Path, + output_dir: str | Path | None = None, + lines: int = 5e6, + prefix: str = "list-", + compress: bool = True, + **kwargs, +) -> None: """ Split a raw GPFS log file into smaller chunks. These chunks can be converted to parquet format to create a full parquet dataset for parallel analysis. Chunks are optionally recompressed after being split from the raw log. @@ -40,28 +52,29 @@ def split(log: str | Path, output_dir: str | Path | None = None, If True, compress each chunk using gzip. By default True """ log = as_path(log) - + if output_dir is None: - output_dir = log.parent.parent.joinpath('chunks') + output_dir = log.parent.parent.joinpath("chunks") else: output_dir = as_path(output_dir) - output_dir.mkdir(exist_ok=True,mode=0o2770) + output_dir.mkdir(exist_ok=True, mode=0o2770) out = output_dir.joinpath(prefix) if is_gz(log): - cat = 'pigz -dc' if pigz_exists() else 'zcat' + cat = "pigz -dc" if pigz_exists() else "zcat" else: - cat = 'cat' - + cat = "cat" + split_cmd = f"{cat} {log} | split -a 3 -d -l {int(lines)} - {out}" - subprocess.run(split_cmd,shell=True,text=True) + subprocess.run(split_cmd, shell=True, text=True) if compress: compress_logs(output_dir) pass + def compress_logs(log_dir: str | Path) -> None: """ Compress raw logs using gzip in parallel. If a directory contains both uncompressed and compressed logs, the compressed logs will be ignored. @@ -71,14 +84,14 @@ def compress_logs(log_dir: str | Path) -> None: log_dir : str | Path Path to the directory containing the raw logs. """ - nproc,_ = parse_scontrol() + nproc, _ = parse_scontrol() log_dir = as_path(log_dir) - logs = [str(p) for p in log_dir.glob('*[!gz]')] - log_str = '\n'.join(logs) + logs = [str(p) for p in log_dir.glob("*[!gz]")] + log_str = "\n".join(logs) print(f"INFO: {len(logs)} logs found. 
Beginning compression") - + zip_cmd = f"echo '{log_str}' | xargs -I {{}} -P {nproc} bash -c 'gzip {{}}'" subprocess.run(zip_cmd, shell=True) - pass \ No newline at end of file + pass diff --git a/src/rc_gpfs/process/__init__.py b/src/rc_gpfs/process/__init__.py index f09e3ee82185929ce08fa74931fc5aeba8e94a8f..423dc0cfb5b209a998ad190ada7e11938fe2e82d 100644 --- a/src/rc_gpfs/process/__init__.py +++ b/src/rc_gpfs/process/__init__.py @@ -1,2 +1,4 @@ -from .process import * from .fparq import fparq +from .process import aggregate_gpfs_dataset, calculate_churn + +__all__ = [fparq, aggregate_gpfs_dataset, calculate_churn] diff --git a/src/rc_gpfs/process/fparq.py b/src/rc_gpfs/process/fparq.py index 8f9cbb1145313110530d7dad23c183eda68bfdb6..c96bd57b30ed824239b27fc2ae12437230cdec37 100644 --- a/src/rc_gpfs/process/fparq.py +++ b/src/rc_gpfs/process/fparq.py @@ -1,32 +1,41 @@ -import re from pathlib import Path -import polars as pl from typing import List -from ..utils import as_path, as_bytes +import polars as pl + +from ..utils import as_bytes, as_path + -def _write_partition(files: List[str], fname: str | Path, delim: str = r'\0'): - with open(fname, 'w') as f: +def _write_partition(files: List[str], fname: str | Path, delim: str = r"\0"): + with open(fname, "w") as f: f.write(delim.join(files)) + def write_partitions(df: pl.DataFrame, partition_path: str, partition_prefix: str): - dig = len(str(df['partition'].max())) - df = df.with_columns(pl.col('partition').cast(str).str.zfill(dig)) - for ind, grp in df.group_by('partition'): - files = grp['path'].to_list() - fname = Path(partition_path).joinpath(f'{partition_prefix}-{ind[0]}') - _write_partition(files,fname) - -def fpart(sizes: List[int], max_part_files: int, max_part_size: int, max_file_size: int, init_grp: int = 0): + dig = len(str(df["partition"].max())) + df = df.with_columns(pl.col("partition").cast(str).str.zfill(dig)) + for ind, grp in df.group_by("partition"): + files = grp["path"].to_list() + fname = Path(partition_path).joinpath(f"{partition_prefix}-{ind[0]}") + _write_partition(files, fname) + + +def fpart( + sizes: List[int], + max_part_files: int, + max_part_size: int, + max_file_size: int, + init_grp: int = 0, +): group = init_grp cumsum = 0 cur_grp_size = 0 groups = [] - for i,value in enumerate(sizes): + for i, value in enumerate(sizes): if cur_grp_size > max_part_files: group += 1 - + if value > max_file_size: if i > 0: group += 1 @@ -38,24 +47,26 @@ def fpart(sizes: List[int], max_part_files: int, max_part_size: int, max_file_si group += 1 cumsum = value groups.append(group) - + return groups + def fparq( - df: pl.DataFrame, - partition_path: str | Path, - partition_prefix: str = 'part', - max_part_size: str | int | None = '50GiB', - max_part_files: int | None = None, - tiny_size: str | int | None = '4kiB', - max_tiny_part_size: str | int | None = '1GiB', - max_tiny_part_files: int | None = 250000, - big_size: str | int | float | None = None, - exclude_nonstandard: bool = False, - ret_df: bool = False, - **kwargs + df: pl.DataFrame, + partition_path: str | Path, + partition_prefix: str = "part", + max_part_size: str | int | None = "50GiB", + max_part_files: int | None = None, + tiny_size: str | int | None = "4kiB", + max_tiny_part_size: str | int | None = "1GiB", + max_tiny_part_files: int | None = 250000, + big_size: str | int | float | None = None, + exclude_nonstandard: bool = False, + ret_df: bool = False, + **kwargs, ): - """ + ( + """ Replicates GNU fpart's file partitioning on GPFS policy lists in parquet datasets 
for use in sync tools such as rsync. This takes in a dataframe of file sizes in bytes indexed by their full file path. Files in the dataframe are sequentially assigned to a partition until the sum of their size reaches a specified threshold or number of files. This behaves identically to fpart's live mode and is the only supported mode. Files can also be classified as either 'tiny' or 'big' and partitioned separately from the rest of the dataset. Tiny files can cause performance issues where the overhead of transferring individual files exceeds the time to transfer the file data itself. Instead, tiny files can be archived in a tarball prior to sync and transferred all at once. Transferring very large files can also be slow when done in a single chunk/thread. Instead, big files will be assigned each to their own partition to help facilitate chunking and parallel transfer. @@ -96,74 +107,74 @@ def fparq( ------ ValueError If both max_part_size and max_part_files are None, a ValueError is raised - """"""""" + """ + """""" + ) if max_part_files is None and max_part_size is None: raise ValueError("At least one of max_part_files or max_part_size must be set") partition_path = as_path(partition_path) - partition_path.mkdir(exist_ok=True,parents=True) + partition_path.mkdir(exist_ok=True, parents=True) max_part_files = float("inf") if max_part_files is None else max_part_files max_part_size = as_bytes(max_part_size, None) tiny_size = as_bytes(tiny_size, 0) - max_tiny_part_size = as_bytes(max_tiny_part_size,1024**3) + max_tiny_part_size = as_bytes(max_tiny_part_size, 1024**3) big_size = as_bytes(big_size, max_part_size) if tiny_size == 0: breaks = [big_size] - labels = ['standard','big'] + labels = ["standard", "big"] else: breaks = [tiny_size, big_size] - labels = ['tiny','standard','big'] + labels = ["tiny", "standard", "big"] df = df.with_columns( - pl.col('size') - .cut( - breaks = breaks, - labels = labels, - left_closed=True - ) - .alias('size_grp') + pl.col("size") + .cut(breaks=breaks, labels=labels, left_closed=True) + .alias("size_grp") ) - if 'tiny' in labels: - tiny = df.filter(pl.col('size_grp').eq('tiny')) + if "tiny" in labels: + tiny = df.filter(pl.col("size_grp").eq("tiny")) else: tiny = pl.DataFrame() - big = df.filter(pl.col('size_grp').eq('big')) - df = df.filter(pl.col('size_grp').eq('standard')) + big = df.filter(pl.col("size_grp").eq("big")) + df = df.filter(pl.col("size_grp").eq("standard")) df = df.with_columns( pl.Series( - name='partition', - values = fpart(df['size'].to_list(), max_part_files, max_part_size, big_size) + name="partition", + values=fpart(df["size"].to_list(), max_part_files, max_part_size, big_size), ) ) if not exclude_nonstandard: if not tiny.is_empty(): - init_grp = df['partition'].max() + 1 + init_grp = df["partition"].max() + 1 tiny = tiny.with_columns( pl.Series( - name='partition', + name="partition", values=fpart( - tiny['size'].to_list(), + tiny["size"].to_list(), max_tiny_part_files, max_tiny_part_size, tiny_size, - init_grp=init_grp - ) + init_grp=init_grp, + ), ) ) - df = pl.concat([df,tiny]) + df = pl.concat([df, tiny]) if not big.is_empty(): - init_grp = df['partition'].max() + 1 + init_grp = df["partition"].max() + 1 big = big.with_columns(partition=range(init_grp, init_grp + big.shape[0])) - df = pl.concat([df,big]) + df = pl.concat([df, big]) - write_partitions(df, partition_path=partition_path, partition_prefix=partition_prefix) + write_partitions( + df, partition_path=partition_path, partition_prefix=partition_prefix + ) return df if 
ret_df else None diff --git a/src/rc_gpfs/process/process.py b/src/rc_gpfs/process/process.py index 76abf70f2d0f7464a42990167c0b4aebb1a0a429..6d0a3c39025468812157114cdcf73a4b7db2a354 100644 --- a/src/rc_gpfs/process/process.py +++ b/src/rc_gpfs/process/process.py @@ -1,20 +1,22 @@ from pathlib import Path -import polars as pl +from typing import List, Literal + import numpy as np -from typing import Literal, List +import polars as pl from typeguard import typechecked from ..db.utils import CHURN_TBL_COLS from ..utils import as_path, prep_age_distribution, prep_size_distribution -__all__ = ['aggregate_gpfs_dataset','calculate_churn'] +__all__ = ["aggregate_gpfs_dataset", "calculate_churn"] -def _check_dataset_path(dataset_path,file_glob) -> Path: + +def _check_dataset_path(dataset_path, file_glob) -> Path: dataset_path = as_path(dataset_path).resolve() if not dataset_path.exists(): raise FileNotFoundError(f"{dataset_path} does not exist") - + if dataset_path.is_file(): if file_glob is not None: print(f"INFO: {dataset_path} is a file, ignoring glob.") @@ -23,216 +25,218 @@ def _check_dataset_path(dataset_path,file_glob) -> Path: elif dataset_path.is_dir(): n_files = len(list(dataset_path.glob(file_glob))) if n_files == 0: - raise FileNotFoundError(f"No parquet files were found in {dataset_path.joinpath(file_glob)}") + raise FileNotFoundError( + f"No parquet files were found in {dataset_path.joinpath(file_glob)}" + ) dataset_path = dataset_path.joinpath(file_glob) - + print(f"INFO: {n_files} parquet file(s) found in {dataset_path}") - + return dataset_path -def _check_timedelta_values(vals,unit): + +def _check_timedelta_values(vals, unit): if (vals is None) != (unit is None): - print(f"WARNING: time_breakpoints and time_unit were not both set. Skipping age aggregation") + print( + "WARNING: time_breakpoints and time_unit were not both set. Skipping age aggregation" + ) vals = None unit = None - return vals,unit + return vals, unit + @typechecked def aggregate_gpfs_dataset( df: pl.DataFrame | pl.LazyFrame, acq: str | np.datetime64, time_breakpoints: int | List[int] | None = [30, 60, 90, 180], - time_unit: Literal['D','W','M','Y'] | None = 'D', - time_val: Literal['access','modify','create'] | None = 'access', - size_breakpoints: int | str | List[int | str] | None = ['4 kiB','4 MiB','1 GiB','10 GiB','100 GiB','1 TiB'], - **kwargs + time_unit: Literal["D", "W", "M", "Y"] | None = "D", + time_val: Literal["access", "modify", "create"] | None = "access", + size_breakpoints: int | str | List[int | str] | None = [ + "4 kiB", + "4 MiB", + "1 GiB", + "10 GiB", + "100 GiB", + "1 TiB", + ], + **kwargs, ) -> pl.DataFrame: - # Cast to LazyFrame to optimize aggregation. 
Is idempotent, will not affect a passed LazyFrame df = df.lazy() # Input checking - time_breakpoints,time_unit = _check_timedelta_values(time_breakpoints,time_unit) - - grps = ['tld'] + time_breakpoints, time_unit = _check_timedelta_values(time_breakpoints, time_unit) + + grps = ["tld"] if time_breakpoints is not None: - age_bins, age_labels = prep_age_distribution(acq,time_breakpoints,time_unit) - - df = ( - df - .with_columns( - pl.col(time_val) - .cut(breaks=age_bins,labels=age_labels) - .cast(pl.String) - .cast(pl.Enum(age_labels)) - .alias('dt_grp') - ) + age_bins, age_labels = prep_age_distribution(acq, time_breakpoints, time_unit) + + df = df.with_columns( + pl.col(time_val) + .cut(breaks=age_bins, labels=age_labels) + .cast(pl.String) + .cast(pl.Enum(age_labels)) + .alias("dt_grp") ) - grps.append('dt_grp') + grps.append("dt_grp") if size_breakpoints is not None: size_bins, size_labels = prep_size_distribution(size_breakpoints) - df = ( - df - .with_columns( - pl.col('size') - .cut(breaks=size_bins,labels=size_labels) - .cast(pl.String) - .cast(pl.Enum(size_labels)) - .alias('size_grp') - ) + df = df.with_columns( + pl.col("size") + .cut(breaks=size_bins, labels=size_labels) + .cast(pl.String) + .cast(pl.Enum(size_labels)) + .alias("size_grp") ) - grps.append('size_grp') - + grps.append("size_grp") + df_agg = ( - df - .group_by(grps) - .agg([ - pl.col('size').sum().alias('bytes'), - pl.col('size').count().alias('file_count') - ]) + df.group_by(grps) + .agg( + [ + pl.col("size").sum().alias("bytes"), + pl.col("size").count().alias("file_count"), + ] + ) .sort(grps) - .collect(engine='streaming') + .collect(engine="streaming") ) - + return df_agg - + + @typechecked def calculate_churn( - hive_path: str | Path, - tld: str, - acq_dates: List[ np.datetime64 | str ], - **kwargs + hive_path: str | Path, tld: str, acq_dates: List[np.datetime64 | str], **kwargs ) -> pl.DataFrame: - - acq_dates = [np.datetime_as_string(d,'D') if isinstance(d,np.datetime64) else d for d in acq_dates] + acq_dates = [ + np.datetime_as_string(d, "D") if isinstance(d, np.datetime64) else d + for d in acq_dates + ] acq_dates.sort() ## Input checking hive_path = as_path(hive_path) dataset_path = hive_path.joinpath(f"tld={tld}") - acq_dirs = [d.name.removeprefix('acq=') for d in dataset_path.glob("acq=*")] + acq_dirs = [d.name.removeprefix("acq=") for d in dataset_path.glob("acq=*")] # remove any datetimes for which the given tld does not have data. acq_dates = [d for d in acq_dates if d in acq_dirs] if len(acq_dates) <= 1: - raise ValueError(f"Fewer than two given policy acquisition dates contained data for {tld} in {hive_path}.") + raise ValueError( + f"Fewer than two given policy acquisition dates contained data for {tld} in {hive_path}." 
+ ) churn_l = [] - + df_init = ( - pl.scan_parquet( - dataset_path.joinpath('**/*.parquet'), + pl.scan_parquet( + dataset_path.joinpath("**/*.parquet"), hive_partitioning=True, - hive_schema=pl.Schema({'tld':pl.String,'acq':pl.String}), - parallel='prefiltered' + hive_schema=pl.Schema({"tld": pl.String, "acq": pl.String}), + parallel="prefiltered", ) - .filter(pl.col('tld').eq(tld), pl.col('acq').eq(acq_dates[0])) - .select(['path','modify','size','access']) - ).collect(engine='streaming') - - for i in range(1,len(acq_dates)): + .filter(pl.col("tld").eq(tld), pl.col("acq").eq(acq_dates[0])) + .select(["path", "modify", "size", "access"]) + ).collect(engine="streaming") + + for i in range(1, len(acq_dates)): df_target = ( pl.scan_parquet( - dataset_path.joinpath('**/*.parquet'), + dataset_path.joinpath("**/*.parquet"), hive_partitioning=True, - hive_schema=pl.Schema({'tld':pl.String,'acq':pl.String}), - parallel='prefiltered' - ) - .filter(pl.col('tld').eq(tld), pl.col('acq').eq(acq_dates[i])) - .select(['path','modify','size','access']) - ).collect(engine='streaming') - - churn = ( - _calculate_churn(df_init,df_target) - .with_columns( - pl.lit(acq_dates[i]).alias('log_dt'), - pl.lit(acq_dates[i-1]).alias('prior_log_dt'), - pl.lit(tld).alias('tld') + hive_schema=pl.Schema({"tld": pl.String, "acq": pl.String}), + parallel="prefiltered", ) + .filter(pl.col("tld").eq(tld), pl.col("acq").eq(acq_dates[i])) + .select(["path", "modify", "size", "access"]) + ).collect(engine="streaming") + + churn = _calculate_churn(df_init, df_target).with_columns( + pl.lit(acq_dates[i]).alias("log_dt"), + pl.lit(acq_dates[i - 1]).alias("prior_log_dt"), + pl.lit(tld).alias("tld"), ) churn_l.append(churn) - # This delete pattern paired with the loop creates a type of rotating list where each dataframe, aside from - # the initial and final, is processed as the target and the source for which files exist at a given time. - # The target is then referred to as the source as we move through the time series. Each source is removed - # from memory. This both limits the amount of memory used to only two datasets at a time while also + # This delete pattern paired with the loop creates a type of rotating list where each dataframe, aside from + # the initial and final, is processed as the target and the source for which files exist at a given time. + # The target is then referred to as the source as we move through the time series. Each source is removed + # from memory. This both limits the amount of memory used to only two datasets at a time while also # only reading each dataset once. 
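The rotation described in the comment above is easier to see in isolation. Below is a minimal sketch of the same pattern; the scan_acq helper and the plain glob it scans are simplifications invented for this illustration (the real code above also passes a hive schema and a parallel strategy), while _calculate_churn is the helper defined later in this module. At most two acquisitions are held in memory at once, and each one is read from disk exactly once.

import polars as pl

from rc_gpfs.process.process import _calculate_churn


def scan_acq(dataset_path: str, tld: str, acq: str) -> pl.DataFrame:
    # Simplified stand-in for the scan_parquet/filter/select/collect chain
    # used by calculate_churn above.
    return (
        pl.scan_parquet(f"{dataset_path}/**/*.parquet", hive_partitioning=True)
        .filter(pl.col("tld").eq(tld), pl.col("acq").eq(acq))
        .select(["path", "modify", "size", "access"])
        .collect(engine="streaming")
    )


def churn_series(dataset_path: str, tld: str, acq_dates: list[str]) -> pl.DataFrame:
    churn_l = []
    df_init = scan_acq(dataset_path, tld, acq_dates[0])
    for acq in acq_dates[1:]:
        df_target = scan_acq(dataset_path, tld, acq)
        churn_l.append(_calculate_churn(df_init, df_target))
        # Rotate: the current target becomes the next source, so at most two
        # acquisitions are resident and each parquet dataset is read once.
        del df_init
        df_init = df_target
        del df_target
    return pl.concat(churn_l)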
del df_init - df_init = df_target + df_init = df_target # noqa: F841 del df_target - + churn_df = pl.concat(churn_l) - + return churn_df -def _calculate_churn(df1,df2) -> None: - empty_df = pl.DataFrame(data = {'name':CHURN_TBL_COLS,'value':0}) + +def _calculate_churn(df1, df2) -> None: + empty_df = pl.DataFrame(data={"name": CHURN_TBL_COLS, "value": 0}) if df1.equals(df2): - return empty_df.transpose(column_names='name') + return empty_df.transpose(column_names="name") - dfm = df1.join(df2,how='full',on='path',suffix='_updated',coalesce=True) + dfm = df1.join(df2, how="full", on="path", suffix="_updated", coalesce=True) conditions = [ - dfm['access'].is_null(), - dfm['access_updated'].is_null(), - (dfm['modify'] != dfm['modify_updated']).fill_null(False), - (dfm['access'] != dfm['access_updated']).fill_null(False) + dfm["access"].is_null(), + dfm["access_updated"].is_null(), + (dfm["modify"] != dfm["modify_updated"]).fill_null(False), + (dfm["access"] != dfm["access_updated"]).fill_null(False), ] - choices = ['created','deleted','modified','accessed'] + choices = ["created", "deleted", "modified", "accessed"] - dfm = ( - dfm.with_columns( - pl.Series( - name='type', - values=np.select(conditions,choices,default='unchanged'), - dtype=pl.Categorical - ) + dfm = dfm.with_columns( + pl.Series( + name="type", + values=np.select(conditions, choices, default="unchanged"), + dtype=pl.Categorical, ) ) - dfm = ( - dfm - .filter(pl.col('type').ne('unchanged')) - .drop(['modify','access','modify_updated','access_updated']) + dfm = dfm.filter(pl.col("type").ne("unchanged")).drop( + ["modify", "access", "modify_updated", "access_updated"] ) - modified = dfm.filter(pl.col('type').eq('modified')) - modified_bytes_net = modified['size_updated'].sum() - modified['size'].sum() - - # Instead of writing logic to aggregate across initial size for deleted files and final size for all other - # files, we can essentially condense size across both columns into a new column. Size of deleted files will + modified = dfm.filter(pl.col("type").eq("modified")) + modified_bytes_net = modified["size_updated"].sum() - modified["size"].sum() + + # Instead of writing logic to aggregate across initial size for deleted files and final size for all other + # files, we can essentially condense size across both columns into a new column. Size of deleted files will # come from size while all other files will come from size_updated. 
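As a quick, made-up illustration of the condensing step described above: after the full join, a deleted file only carries a value in size, and every other row carries its current value in size_updated, so a single fill_null collapses the two columns into the one that gets aggregated. The values below are toy data, not taken from this diff.

import polars as pl

# Toy stand-in for dfm after the full join on "path"; the values are invented.
dfm = pl.DataFrame(
    {
        "path": ["a.txt", "b.txt", "c.txt"],
        "type": ["deleted", "created", "modified"],
        "size": [100, None, 250],
        "size_updated": [None, 300, 400],
    }
)

condensed = (
    dfm.with_columns(pl.col("size_updated").fill_null(pl.col("size")))
    .drop("size")
    .rename({"size_updated": "size"})
)
# "size" is now 100 (the deleted file's last known size), 300, and 400,
# so one group_by/sum covers created, deleted, and modified rows alike.
print(condensed)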
dfm = ( - dfm - .with_columns( - pl.col('size_updated').fill_null(pl.col('size')) - ) - .drop('size') - .rename({'size_updated':'size'}) + dfm.with_columns(pl.col("size_updated").fill_null(pl.col("size"))) + .drop("size") + .rename({"size_updated": "size"}) ) agg_df = ( - dfm - .group_by('type') - .agg([ - pl.sum('size').alias('bytes'), - pl.count('size').alias('files') - ]) - .unpivot(on=['bytes','files'],index='type') - .with_columns(pl.concat_str([pl.col('type'),pl.col('variable')],separator='_').alias('name')) - .with_columns(pl.col('name').str.strip_suffix('_files')) - .select(['name','value']) - .extend(pl.DataFrame({'name':'modified_bytes_net','value':modified_bytes_net})) - .join(empty_df.select('name'), on='name', how='right') + dfm.group_by("type") + .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("files")]) + .unpivot(on=["bytes", "files"], index="type") + .with_columns( + pl.concat_str([pl.col("type"), pl.col("variable")], separator="_").alias( + "name" + ) + ) + .with_columns(pl.col("name").str.strip_suffix("_files")) + .select(["name", "value"]) + .extend( + pl.DataFrame({"name": "modified_bytes_net", "value": modified_bytes_net}) + ) + .join(empty_df.select("name"), on="name", how="right") .fill_null(0) - .transpose(column_names='name') + .transpose(column_names="name") ) - return agg_df \ No newline at end of file + return agg_df diff --git a/src/rc_gpfs/process/utils.py b/src/rc_gpfs/process/utils.py index daf8d2b69c3e97008e83d14d60ec380d3e3c8148..5c747f8c250c36afbfef3748d8f4d49e6bd3072f 100644 --- a/src/rc_gpfs/process/utils.py +++ b/src/rc_gpfs/process/utils.py @@ -1,17 +1,19 @@ import re from pathlib import Path + from numpy import datetime64 + def extract_run_date_from_filename( - path: str | Path, - pattern: str = r'[\d]{4}-[\d]{2}-[\d]{2}' - ) -> datetime64: - if isinstance(path,Path): + path: str | Path, pattern: str = r"[\d]{4}-[\d]{2}-[\d]{2}" +) -> datetime64: + if isinstance(path, Path): path = str(path.absolute()) - + try: - run_date = re.search(pattern,path).group() + run_date = re.search(pattern, path).group() return run_date - except: - raise ValueError("No acquisition date was passed or could be inferred from the dataset path. Is the date formatted as YYYY-MM-DD?") - \ No newline at end of file + except AttributeError: + raise ValueError( + "No acquisition date was passed or could be inferred from the dataset path. Is the date formatted as YYYY-MM-DD?" 
+ ) diff --git a/src/rc_gpfs/report/plotting.py b/src/rc_gpfs/report/plotting.py index a316b08d2dfea906b9f211e7fa00527d58e84f04..c387d17836fca4632da065957cdf14e8964350da 100644 --- a/src/rc_gpfs/report/plotting.py +++ b/src/rc_gpfs/report/plotting.py @@ -1,45 +1,67 @@ -import polars as pl -from plotly.graph_objects import Figure import plotly.graph_objects as go +import polars as pl from plotly import colors +from plotly.graph_objects import Figure + +__all__ = ["bar_plot", "pareto_chart"] -__all__ = ['bar_plot','pareto_chart'] -def choose_appropriate_storage_unit(size,starting_unit='B'): +def choose_appropriate_storage_unit(size, starting_unit="B"): if hasattr(size, "__len__"): size = size.max() - + try: - units = ['B','kiB','MiB','GiB','TiB'] - units_b10 = ['kB','MB','GB','TB'] + units = ["B", "kiB", "MiB", "GiB", "TiB"] + units_b10 = ["kB", "MB", "GB", "TB"] if starting_unit in units_b10: - starting_unit = starting_unit.replace('B','iB') + starting_unit = starting_unit.replace("B", "iB") # add logging message here saying the specified base 10 unit is being interpreted as base 2 exp = units.index(starting_unit) - except (ValueError): - raise(f"{starting_unit} is not a valid storage unit. Choose from 'B','kB','MB','GB', or 'TB'") - - while ((size/1024) >= 1) & (exp < 4): - size = size/1024 + except ValueError: + raise ( + f"{starting_unit} is not a valid storage unit. Choose from 'B','kB','MB','GB', or 'TB'" + ) + + while ((size / 1024) >= 1) & (exp < 4): + size = size / 1024 exp += 1 - return exp,units[exp] + return exp, units[exp] + -def _format_number(num,dec=2): +def _format_number(num, dec=2): return f"{num:,.{dec}f}" -def bar_plot(df,x,y,show_legend=True,legend_labels=None,add_text=True,textposition=None,text_decimals=2, - group_colors=colors.qualitative.Plotly,title=None,xlabel=None,ylabel=None,enable_text_hover=False) -> Figure: - if not isinstance(y,list): + +def bar_plot( + df, + x, + y, + show_legend=True, + legend_labels=None, + add_text=True, + textposition=None, + text_decimals=2, + group_colors=colors.qualitative.Plotly, + title=None, + xlabel=None, + ylabel=None, + enable_text_hover=False, +) -> Figure: + if not isinstance(y, list): y = [y] if show_legend and legend_labels is None: legend_labels = y - + textposition = textposition if add_text else None - + fig = go.Figure() - for idx,c in enumerate(y): - text = df[c].map_elements(lambda x: _format_number(x,dec=text_decimals)) if add_text else None + for idx, c in enumerate(y): + text = ( + df[c].map_elements(lambda x: _format_number(x, dec=text_decimals)) + if add_text + else None + ) fig.add_bar( x=df[x], y=df[c], @@ -47,56 +69,70 @@ def bar_plot(df,x,y,show_legend=True,legend_labels=None,add_text=True,textpositi textposition=textposition, name=legend_labels[idx], marker_color=group_colors[idx], - uid=idx + uid=idx, ) # If plotting multiple traces, make some updates to the layout if len(y) > 1: for idx in range(len(y)): - fig.update_traces( - patch = {'offsetgroup':idx}, - selector = {'uid':idx} - ) - + fig.update_traces(patch={"offsetgroup": idx}, selector={"uid": idx}) + fig.update_layout( - barmode='group', # Grouped bar chart - bargap=0.3 + barmode="group", # Grouped bar chart + bargap=0.3, ) fig.update_layout( title_text=title, title_x=0.5, - title_xanchor='center', - title_font_size = 24, + title_xanchor="center", + title_font_size=24, xaxis_title=xlabel, yaxis_title=ylabel, margin=dict(t=100, b=20, l=40, r=40), - template='plotly_white', - hovermode=enable_text_hover + template="plotly_white", + 
hovermode=enable_text_hover, ) return fig -def pareto_chart(df, x, y, csum_col=None, show_legend=True, legend_labels=['Raw','Cumulative'], add_text=True, - textposition_bar=None, textposition_scatter=None, text_decimals=2, - group_colors=colors.qualitative.Plotly, title=None,xlabel=None,ylabel=None,enable_text_hover=False) -> Figure: + +def pareto_chart( + df, + x, + y, + csum_col=None, + show_legend=True, + legend_labels=["Raw", "Cumulative"], + add_text=True, + textposition_bar=None, + textposition_scatter=None, + text_decimals=2, + group_colors=colors.qualitative.Plotly, + title=None, + xlabel=None, + ylabel=None, + enable_text_hover=False, +) -> Figure: df_ = df.clone() if csum_col is None: - csum_col = f'{y}_cumsum' + csum_col = f"{y}_cumsum" df_ = df_.with_columns(pl.col(y).cum_sum().alias(csum_col)) - + if show_legend and legend_labels is None: - legend_labels = [y,csum_col] - + legend_labels = [y, csum_col] + if add_text and textposition_bar is None: - textposition_bar = 'outside' - + textposition_bar = "outside" + if add_text and textposition_scatter is None: - textposition_scatter = 'top center' - + textposition_scatter = "top center" + fig = go.Figure() if add_text: - bar_text = df_[y].map_elements(lambda x: _format_number(x,dec=text_decimals),return_dtype=pl.String) + bar_text = df_[y].map_elements( + lambda x: _format_number(x, dec=text_decimals), return_dtype=pl.String + ) else: bar_text = None fig.add_bar( @@ -105,41 +141,37 @@ def pareto_chart(df, x, y, csum_col=None, show_legend=True, legend_labels=['Raw' text=bar_text, textposition=textposition_bar, name=legend_labels[0], - marker_color=group_colors[0] + marker_color=group_colors[0], ) if add_text: - scatter_text = ( - df_[csum_col] - .map_elements( - lambda x: _format_number(x,dec=text_decimals), - return_dtype=pl.String - ) + scatter_text = df_[csum_col].map_elements( + lambda x: _format_number(x, dec=text_decimals), return_dtype=pl.String ) scatter_text[0] = None else: scatter_text = None - + fig.add_scatter( x=df_[x], y=df_[csum_col], text=scatter_text, textposition=textposition_scatter, name=legend_labels[1], - mode='lines+markers+text', - marker_color=group_colors[1] + mode="lines+markers+text", + marker_color=group_colors[1], ) fig.update_layout( title_text=title, title_x=0.5, - title_xanchor='center', - title_font_size = 24, + title_xanchor="center", + title_font_size=24, xaxis_title=xlabel, yaxis_title=ylabel, margin=dict(t=100, b=20, l=40, r=40), - template='plotly_white', - hovermode=enable_text_hover + template="plotly_white", + hovermode=enable_text_hover, ) - return fig \ No newline at end of file + return fig diff --git a/src/rc_gpfs/utils/__init__.py b/src/rc_gpfs/utils/__init__.py index e9e9fd4d3c451e2f9ed8ee5368c6618cffb7da56..fe2452bbe9b4d3eacc5c42a9595b4615afa2e375 100644 --- a/src/rc_gpfs/utils/__init__.py +++ b/src/rc_gpfs/utils/__init__.py @@ -1,3 +1,3 @@ -from .core import * -from .datetime import * -from .units import * \ No newline at end of file +from .core import * # noqa: F403 +from .datetime import * # noqa: F403 +from .units import * # noqa: F403 \ No newline at end of file diff --git a/src/rc_gpfs/utils/core.py b/src/rc_gpfs/utils/core.py index 1409d4f2633ef35b457cd0ad55a55bcc9ede6d65..3638ebc488b43d4bbdf08d5c9b850693d582cf2b 100644 --- a/src/rc_gpfs/utils/core.py +++ b/src/rc_gpfs/utils/core.py @@ -3,109 +3,127 @@ import re import subprocess from pathlib import Path from typing import List, Literal, Tuple + +import numpy as np import polars as pl import pyarrow.parquet as pq -import numpy 
as np +from .datetime import as_datetime, create_timedelta_breakpoints, create_timedelta_labels from .units import as_bytes, convert_si, create_size_bin_labels -from .datetime import * + def parse_scontrol(): - job_id = os.getenv('SLURM_JOB_ID') + job_id = os.getenv("SLURM_JOB_ID") command = f"scontrol show job {job_id} | grep TRES=" - result = subprocess.run(command, shell=True, capture_output=True, text=True).stdout.strip() + result = subprocess.run( + command, shell=True, capture_output=True, text=True + ).stdout.strip() + + tres_pattern = r".*cpu=(?P<cores>[\d]+),mem=(?P<mem>[\d]+[KMGT]?).*" + cores, mem = re.search(tres_pattern, result).groupdict().values() - tres_pattern=r'.*cpu=(?P<cores>[\d]+),mem=(?P<mem>[\d]+[KMGT]?).*' - cores,mem = re.search(tres_pattern,result).groupdict().values() - cores = int(cores) - mem = convert_si(mem,to_unit='G',use_binary=True) - return [cores,mem] + mem = convert_si(mem, to_unit="G", use_binary=True) + return [cores, mem] + def as_path(s: str | Path) -> Path: - if not isinstance(s,Path): + if not isinstance(s, Path): s = Path(s) return s + def prep_size_distribution( - size_bins: int | str | List[int | str] = ['4 kiB','4 MiB','1 GiB','10 GiB','100 GiB','1 TiB'], - **kwargs -) -> Tuple[List[int],List[str]]: - if not isinstance(size_bins,list): + size_bins: int | str | List[int | str] = [ + "4 kiB", + "4 MiB", + "1 GiB", + "10 GiB", + "100 GiB", + "1 TiB", + ], + **kwargs, +) -> Tuple[List[int], List[str]]: + if not isinstance(size_bins, list): size_bins = [size_bins] - size_bins = [as_bytes(s) if isinstance(s,str) else s for s in size_bins] + size_bins = [as_bytes(s) if isinstance(s, str) else s for s in size_bins] size_bins = list(set(size_bins)) - size_bins.sort() # Sorts and removes any duplicates - size_bins = [s for s in size_bins if s > 0] # Removes 0, as it will be implicit as the left-most break point - + size_bins.sort() # Sorts and removes any duplicates + size_bins = [ + s for s in size_bins if s > 0 + ] # Removes 0, as it will be implicit as the left-most break point + size_labels = create_size_bin_labels(size_bins) - return size_bins,size_labels + return size_bins, size_labels + def calculate_size_distribution( - sizes: pl.Series, - size_bins: int | str | List[int | str] = ['4 kiB','4 MiB','1 GiB','10 GiB','100 GiB','1 TiB'], - **kwargs + sizes: pl.Series, + size_bins: int | str | List[int | str] = [ + "4 kiB", + "4 MiB", + "1 GiB", + "10 GiB", + "100 GiB", + "1 TiB", + ], + **kwargs, ) -> pl.Series: - - size_bins,size_labels = prep_size_distribution(size_bins) + size_bins, size_labels = prep_size_distribution(size_bins) size_grps = ( - sizes - .cut( - breaks=size_bins, - labels=size_labels, - **kwargs - ) + sizes.cut(breaks=size_bins, labels=size_labels, **kwargs) .cast(pl.String) .cast(pl.Enum(size_labels)) ) return size_grps + def prep_age_distribution( - acq: str | np.datetime64, - age_breakpoints: int | List[int], - time_unit: Literal['D','W'] -) -> Tuple[List[np.datetime64],List[str]]: - if any([b < 0 for b in age_breakpoints]): - raise ValueError("Timestamp cannot be newer than the current date time") - age_breakpoints = list(set(age_breakpoints)) + acq: str | np.datetime64, + age_breakpoints: int | List[int], + time_unit: Literal["D", "W"], +) -> Tuple[List[np.datetime64], List[str]]: + if not isinstance(age_breakpoints, list): + age_breakpoints = [age_breakpoints] + else: + age_breakpoints = list(set(age_breakpoints)) + age_breakpoints.sort() age_breakpoints = [t for t in age_breakpoints if t > 0] # Create age bin labels before 
converting to duration for easier parsing - age_labels = create_timedelta_labels(age_breakpoints,time_unit) + age_labels = create_timedelta_labels(age_breakpoints, time_unit) # # Create age bins by subtracting the number of days from the date - age_breakpoints = create_timedelta_breakpoints(as_datetime(acq),age_breakpoints,time_unit) + age_breakpoints = create_timedelta_breakpoints( + as_datetime(acq), age_breakpoints, time_unit + ) + + return age_breakpoints, age_labels - return age_breakpoints,age_labels def calculate_age_distribution( - timestamps: pl.Series, - acq: str | np.datetime64, - age_breakpoints: List[ int ] = [30,60,90,180], - time_unit: Literal['D','W'] = 'D', - **kwargs + timestamps: pl.Series, + acq: str | np.datetime64, + age_breakpoints: List[int] = [30, 60, 90, 180], + time_unit: Literal["D", "W"] = "D", + **kwargs, ) -> pl.Series: - age_breakpoints, age_labels = prep_age_distribution(acq, age_breakpoints, time_unit) - + age_grps = ( - timestamps - .cut( - breaks=age_breakpoints, - labels=age_labels, - **kwargs - ) + timestamps.cut(breaks=age_breakpoints, labels=age_labels, **kwargs) .cast(pl.String) .cast(pl.Enum(age_labels)) ) return age_grps + def get_parquet_dataset_size(parquet_path): tot_size = 0 @@ -113,5 +131,5 @@ def get_parquet_dataset_size(parquet_path): md = pq.read_metadata(p) for rg in range(0, md.num_row_groups): tot_size += md.row_group(rg).total_byte_size - - return tot_size \ No newline at end of file + + return tot_size diff --git a/src/rc_gpfs/utils/datetime.py b/src/rc_gpfs/utils/datetime.py index 06c30b277234db21c93e6ba5b96b3c6e4c78f7ca..d460f3755117bd6001f0b581bfe26c7482a5bc7a 100644 --- a/src/rc_gpfs/utils/datetime.py +++ b/src/rc_gpfs/utils/datetime.py @@ -1,38 +1,51 @@ -from typing import Literal, List +from typing import List, Literal + import numpy as np + # As the logic changed from revision to revision, these are solely transparent wrappers for numpy's datetime and # timedelta conversions. These are most likely unnecessary and can be removed at a later date -def as_datetime( - date: str | np.datetime64 - ) -> np.datetime64: - return np.datetime64(date,'ns') if isinstance(date,str) else date - -def as_timedelta( - val: int, - unit: Literal['D','W'] - ) -> np.timedelta64: +def as_datetime(date: str | np.datetime64) -> np.datetime64: + return np.datetime64(date, "ns") if isinstance(date, str) else date + + +def as_timedelta(val: int, unit: Literal["D", "W"]) -> np.timedelta64: return np.timedelta64(val, unit) + def create_timedelta_breakpoints( run_date: str | np.datetime64, delta_vals: int | List[int], - delta_unit: Literal['D','W'] + delta_unit: Literal["D", "W"], ) -> List[np.datetime64]: + if not isinstance(delta_vals, list): + delta_vals = [delta_vals] run_date = as_datetime(run_date) - return [run_date - as_timedelta(c,delta_unit) for c in delta_vals] + return [run_date - as_timedelta(c, delta_unit) for c in delta_vals if c > 0] + def create_timedelta_labels( delta_vals: int | List[int], - delta_unit: Literal['D','W'], + delta_unit: Literal["D", "W"], ) -> List[str]: - if not isinstance(delta_vals,list): - delta_vals=[delta_vals] + if not isinstance(delta_vals, list): + delta_vals = [delta_vals] + + delta_vals = [v for v in delta_vals if v > 0] + + if len(delta_vals) == 0: + raise ValueError( + "Passed delta_vals are all less than or equal to 0. 
delta_vals must be positive integers" + ) delta_vals.sort(reverse=True) - deltas = [f'{d}{delta_unit}' for d in delta_vals] + deltas = [f"{d}{delta_unit}" for d in delta_vals] if len(delta_vals) > 1: - labels = [f'>{deltas[0]}'] + [f'{deltas[i+1]}-{deltas[i]}' for i in range(len(deltas)-1)] + [f'<{deltas[-1]}'] + labels = ( + [f">{deltas[0]}"] + + [f"{deltas[i + 1]}-{deltas[i]}" for i in range(len(deltas) - 1)] + + [f"<{deltas[-1]}"] + ) else: - labels = [f'>{deltas[0]}'] + [f'<{deltas[0]}'] - return labels \ No newline at end of file + labels = [f">{deltas[0]}"] + [f"<{deltas[0]}"] + return labels diff --git a/src/rc_gpfs/utils/units.py b/src/rc_gpfs/utils/units.py index 01e3372e181d7698a2b31ac718a8f94aced22dad..dc2b9b06fccee5fde706efed01b4fc413fcac3b8 100644 --- a/src/rc_gpfs/utils/units.py +++ b/src/rc_gpfs/utils/units.py @@ -1,12 +1,15 @@ import re +from typing import List, Literal + import numpy as np -from typing import Literal, List + # ENH: if this package becomes merged with noctua, need to replace this function since it's copied directly from there -def convert_si(value: str | float | int, - unit: Literal['base','K','M','G','T'] | None = None, - to_unit: Literal['base','K','M','G','T'] = 'base', - use_binary: bool = False +def convert_si( + value: str | float | int, + unit: Literal["base", "K", "M", "G", "T"] | None = None, + to_unit: Literal["base", "K", "M", "G", "T"] = "base", + use_binary: bool = False, ) -> float: """_summary_ @@ -31,76 +34,72 @@ def convert_si(value: str | float | int, # Unit multipliers unit_multipliers = { - 'base': 1, - 'K': factor, - 'M': factor ** 2, - 'G': factor ** 3, - 'T': factor ** 4, + "base": 1, + "K": factor, + "M": factor**2, + "G": factor**3, + "T": factor**4, } # If value is a string, extract the number and the unit if isinstance(value, str): # Extract numeric part and unit part value = value.strip() - for suffix in ['K', 'M', 'G', 'T']: + for suffix in ["K", "M", "G", "T"]: if value.upper().endswith(suffix): unit = suffix value = value[:-1] break else: - unit = 'base' + unit = "base" value = float(value) - + # If value is numeric, use the provided unit or default to 'base' if unit is None: - unit = 'base' + unit = "base" # Convert the input value to base base_value = value * unit_multipliers[unit] # Convert base value to the target unit converted_value = base_value / unit_multipliers[to_unit] - + return converted_value -def as_bytes( - val: str | int | None, - default: int | None = None -) -> int | None: - if isinstance(val,str): - val = re.sub(r'[iB]*$','',val) - val = convert_si(val,use_binary=True) + +def as_bytes(val: str | int | None, default: int | None = None) -> int | None: + if isinstance(val, str): + val = re.sub(r"[iB]*$", "", val) + val = convert_si(val, use_binary=True) elif val is None and default is not None: val = default return val -def bytes_to_human_readable_size( - num_bytes: int -) -> str: - units = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB'] - + +def bytes_to_human_readable_size(num_bytes: int) -> str: + units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"] + # Handle the case where num_bytes is 0 if num_bytes == 0: return "0 B" - + # Calculate the appropriate unit. 
Take the floor if the number is not divisible by 1024 unit_index = min(len(units) - 1, int(np.log2(num_bytes) // 10)) - num = int(num_bytes // (1024 ** unit_index)) - + num = int(num_bytes // (1024**unit_index)) + # Format the number to 2 decimal places and append the unit return f"{num} {units[unit_index]}" -def create_size_bin_labels( - size_bins: int | List[int] -) -> List[str]: + +def create_size_bin_labels(size_bins: int | List[int]) -> List[str]: labels = [] - labels.append(f"0 B-{bytes_to_human_readable_size(size_bins[0])}") - + labels.append(f"0 B-{bytes_to_human_readable_size(size_bins[0])}") + for i in range(len(size_bins) - 1): lower_bound = bytes_to_human_readable_size(size_bins[i]) upper_bound = bytes_to_human_readable_size(size_bins[i + 1]) labels.append(f"{lower_bound}-{upper_bound}") - + labels.append(f">{bytes_to_human_readable_size(size_bins[-1])}") - return labels \ No newline at end of file + return labels diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..dc1802e761b9533c7bcfabf958da49a28d8b1569 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,346 @@ +import pytest +from typing import Literal +from pathlib import Path +from rc_gpfs import utils + +from polars.testing import assert_series_equal +import polars as pl +import numpy as np + + +### General Purpose Utils +@pytest.mark.parametrize("path", ["/data/rc/gpfs-policy", Path("/data/rc/gpfs-policy")]) +def test_as_path_valid(path: Path | Literal["/data/rc/gpfs-policy"]): + p_path = utils.as_path(path) + assert isinstance(p_path, Path) + + +@pytest.mark.parametrize( + "value,unit,to_unit,use_binary,expected", + [ + (1, "G", "K", False, 1000000), + (4, "K", "base", True, 4096), + ("100", "base", "T", False, 1e-10), + ], +) +def test_convert_si(value, unit, to_unit, use_binary, expected): + assert utils.convert_si(value, unit, to_unit, use_binary) == expected + + +### Memory and File Size Utils +@pytest.mark.parametrize( + "val,default,expected", + [ + ("1 kiB", None, 1024), + ("1 kiB", (1024**4), 1024), + ("10 TiB", None, 10 * (1024**4)), + (None, 1024, 1024), + (None, None, None), + ("1.5 MiB", None, 1572864), + ], +) +def test_as_bytes(val, default, expected): + assert utils.as_bytes(val, default) == expected + + +class TestSizeGrouping: + input_sizes = pl.Series( + name="size", + values=[ + 0, + 2048, + 4096, + 1024**2, # 1 MiB + 1024**3, # 1 GiB + 20 * 1024**3, # 20 GiB + 1024**5, # 1 PiB + ], + dtype=pl.Int128(), + ) + + expected_size_groups = [ + pl.Series( + name="size", + values=[ + "0 B-4 KiB", + "0 B-4 KiB", + "0 B-4 KiB", + "4 KiB-4 MiB", + "4 MiB-1 GiB", + "10 GiB-100 GiB", + ">1 TiB", + ], + dtype=pl.Enum( + [ + "0 B-4 KiB", + "4 KiB-4 MiB", + "4 MiB-1 GiB", + "1 GiB-10 GiB", + "10 GiB-100 GiB", + "100 GiB-1 TiB", + ">1 TiB", + ] + ), + ), + pl.Series( + name="size", + values=[ + "0 B-4 KiB", + "0 B-4 KiB", + "0 B-4 KiB", + "4 KiB-4 MiB", + "4 MiB-1 GiB", + "10 GiB-100 GiB", + ">1 TiB", + ], + dtype=pl.Enum( + [ + "0 B-4 KiB", + "4 KiB-4 MiB", + "4 MiB-1 GiB", + "1 GiB-10 GiB", + "10 GiB-100 GiB", + "100 GiB-1 TiB", + ">1 TiB", + ] + ), + ), + pl.Series( + name="size", + values=[ + "0 B-1 KiB", + "1 KiB-4 KiB", + "1 KiB-4 KiB", + "4 KiB-10 GiB", + "4 KiB-10 GiB", + ">10 GiB", + ">10 GiB", + ], + dtype=pl.Enum(["0 B-1 KiB", "1 KiB-4 KiB", "4 KiB-10 GiB", ">10 GiB"]), + ), + pl.Series( + name="size", + values=[ + "0 B-1 MiB", + "0 B-1 MiB", + "0 B-1 MiB", + "0 B-1 MiB", + ">1 MiB", + ">1 MiB", + ">1 MiB", + ], + dtype=pl.Enum( + [ + "0 
B-1 MiB", + ">1 MiB", + ] + ), + ), + ] + + input_bins = [ + ["4 KiB", "4 MiB", "1 GiB", "10 GiB", "100 GiB", "1 TiB"], + [4096, 4194304, 1073741824, 10737418240, 107374182400, 1099511627776], + ["10 GiB", 1024, 4096, "1 KiB", 0], + "1 MiB", + ] + + expected_bins = [ + [4096, 4194304, 1073741824, 10737418240, 107374182400, 1099511627776], + [4096, 4194304, 1073741824, 10737418240, 107374182400, 1099511627776], + [1024, 4096, 10737418240], + [1048576], + ] + + expected_labels = [ + [ + "0 B-4 KiB", + "4 KiB-4 MiB", + "4 MiB-1 GiB", + "1 GiB-10 GiB", + "10 GiB-100 GiB", + "100 GiB-1 TiB", + ">1 TiB", + ], + [ + "0 B-4 KiB", + "4 KiB-4 MiB", + "4 MiB-1 GiB", + "1 GiB-10 GiB", + "10 GiB-100 GiB", + "100 GiB-1 TiB", + ">1 TiB", + ], + ["0 B-1 KiB", "1 KiB-4 KiB", "4 KiB-10 GiB", ">10 GiB"], + ["0 B-1 MiB", ">1 MiB"], + ] + + @pytest.mark.parametrize("bins,expected", list(zip(expected_bins, expected_labels))) + def test_create_size_bin_labels(self, bins, expected): + assert utils.create_size_bin_labels(bins) == expected + + @pytest.mark.parametrize( + "bins,expected", + list(zip(input_bins, list(zip(expected_bins, expected_labels)))), + ) + def test_prep_size_distribution(self, bins, expected): + assert utils.prep_size_distribution(size_bins=bins) == expected + + @pytest.mark.parametrize( + "bins,expected", list(zip(input_bins, expected_size_groups)), ids=[1, 2, 3, 4] + ) + def test_calculate_size_distribution(self, bins, expected): + assert_series_equal( + utils.calculate_size_distribution(self.input_sizes, size_bins=bins), + expected, + ) + + +### DateTime and File Age Utils +@pytest.mark.parametrize( + "date,expected", + [ + ("2025-01-01", np.datetime64("2025-01-01T00:00:00.000000000")), + ( + np.datetime64("2025-01-01T00:00:00.000000000"), + np.datetime64("2025-01-01T00:00:00.000000000"), + ), + (1735689600000000000, 1735689600000000000), + (None, None), + ], +) +def test_as_datetime(date, expected): + assert utils.as_datetime(date) == expected + + +def test_as_datetime_fails(): + with pytest.raises(ValueError): + utils.as_datetime("not a date") + + +@pytest.mark.parametrize("val", [1, 3, 5]) +@pytest.mark.parametrize("unit", ["D", "W"]) +def test_as_timedelta(val, unit): + assert utils.as_timedelta(val, unit) == np.timedelta64(val, unit) + + +class TestAgeGrouping: + timestamps = pl.Series( + name="access", + values=[ + "2024-05-12", + "2025-02-21", + "2025-04-02", + "2025-04-17", + "2025-05-17", + ], + ).str.to_datetime(time_unit="ns") + + acq_date = "2025-06-01" + + delta_vals = [[-2, 30, 60, 90, 180], 365, [0, 4, 8, 12, 16], 52] + + delta_unit = ["D", "D", "W", "W"] + + expected_breakpoints = [ + [ + np.datetime64("2025-05-02T00:00:00.000000000"), + np.datetime64("2025-04-02T00:00:00.000000000"), + np.datetime64("2025-03-03T00:00:00.000000000"), + np.datetime64("2024-12-03T00:00:00.000000000"), + ], + np.datetime64("2024-06-01T00:00:00.000000000"), + [ + np.datetime64("2025-05-04T00:00:00.000000000"), + np.datetime64("2025-04-06T00:00:00.000000000"), + np.datetime64("2025-03-09T00:00:00.000000000"), + np.datetime64("2025-02-09T00:00:00.000000000"), + ], + np.datetime64("2024-06-02T00:00:00.000000000"), + ] + + expected_labels = [ + [">180D", "90D-180D", "60D-90D", "30D-60D", "<30D"], + [ + ">365D", + "<365D", + ], + [">16W", "12W-16W", "8W-12W", "4W-8W", "<4W"], + [">52W", "<52W"], + ] + + expected_age_groups_days = [ + pl.Series( + name="access", + values=[ + ">180D", + "90D-180D", + "60D-90D", + "30D-60D", + "<30D", + ], + dtype=pl.Enum([">180D", "90D-180D", "60D-90D", "30D-60D", 
"<30D"]), + ), + pl.Series( + name="access", + values=[">365D", "<365D", "<365D", "<365D", "<365D"], + dtype=pl.Enum([">365D", "<365D"]), + ), + pl.Series( + name="access", + values=[">16W", "12W-16W", "8W-12W", "4W-8W", "<4W"], + dtype=pl.Enum( + [">16W", "12W-16W", "8W-12W", "4W-8W", "<4W"], + ), + ), + pl.Series( + name="access", + values=[">52W", "<52W", "<52W", "<52W", "<52W"], + dtype=pl.Enum([">52W", "<52W"]), + ), + ] + + @pytest.mark.parametrize( + "delta_vals,delta_unit,expected", + list(zip(delta_vals, delta_unit, expected_breakpoints)), + ) + def test_create_timedelta_breakpoints_days(self, delta_vals, delta_unit, expected): + assert ( + utils.create_timedelta_breakpoints(self.acq_date, delta_vals, delta_unit) + == expected + ) + + @pytest.mark.parametrize( + "delta_vals,delta_unit,expected", + list(zip(delta_vals, delta_unit, expected_labels)), + ) + def test_create_timedelta_labels(self, delta_vals, delta_unit, expected): + assert utils.create_timedelta_labels(delta_vals, delta_unit) == expected + + @pytest.mark.parametrize( + "age_breakpoints,delta_unit,expected", + list( + zip( + delta_vals, delta_unit, list(zip(expected_breakpoints, expected_labels)) + ) + ), + ) + def test_prep_age_distribution_days(self, age_breakpoints, delta_unit, expected): + assert ( + utils.prep_age_distribution(self.acq_date, age_breakpoints, delta_unit) + == expected + ) + + @pytest.mark.parametrize( + "age_breakpoints,delta_unit,expected", + list(zip(delta_vals, delta_unit, expected_age_groups_days)), + ) + def test_calculate_age_distribution_days( + self, age_breakpoints, delta_unit, expected + ): + assert_series_equal( + utils.calculate_age_distribution( + self.timestamps, self.acq_date, age_breakpoints, delta_unit + ), + expected, + )