From f9708260e1d1580831f8019296a2800c2957fc95 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Wed, 5 Feb 2025 11:41:50 -0600 Subject: [PATCH] Change partitioning name to fparq --- pyproject.toml | 6 ++ src/rc_gpfs/cli/__init__.py | 2 +- .../cli/{gpfs_fpart.py => fparq_cli.py} | 8 +-- src/rc_gpfs/process/__init__.py | 2 +- src/rc_gpfs/process/{gpfspart.py => fparq.py} | 56 ++++++++++++++++--- 5 files changed, 61 insertions(+), 13 deletions(-) rename src/rc_gpfs/cli/{gpfs_fpart.py => fparq_cli.py} (98%) rename src/rc_gpfs/process/{gpfspart.py => fparq.py} (51%) diff --git a/pyproject.toml b/pyproject.toml index 3ea5e67..001321a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,12 @@ name="rapids" url="https://pypi.nvidia.com" priority = "supplemental" +[tool.poetry.scripts] +convert-to-hive = "rc_gpfs.cli.convert_flat_to_hive:convert_flat_to_hive" +convert-to-parquet = "rc_gpfs.cli.convert_to_parquet:convert_to_parquet" +split-log = "rc_gpfs.cli.split_log:split_log" +fparq = "rc_gpfs.cli.fparq_cli:fparq_cli" + [tool.poetry.requires-plugins] poetry-dynamic-versioning = { version = ">=1.0.0,<2.0.0", extras = ["plugin"] } diff --git a/src/rc_gpfs/cli/__init__.py b/src/rc_gpfs/cli/__init__.py index 7250116..c5e66db 100644 --- a/src/rc_gpfs/cli/__init__.py +++ b/src/rc_gpfs/cli/__init__.py @@ -1,4 +1,4 @@ from .convert_flat_to_hive import convert_flat_to_hive from .convert_to_parquet import convert_to_parquet from .split_log import split_log -from .gpfs_fpart import gpfs_fpart \ No newline at end of file +from .fparq_cli import fparq_cli \ No newline at end of file diff --git a/src/rc_gpfs/cli/gpfs_fpart.py b/src/rc_gpfs/cli/fparq_cli.py similarity index 98% rename from src/rc_gpfs/cli/gpfs_fpart.py rename to src/rc_gpfs/cli/fparq_cli.py index 1b881ab..34fd6b0 100644 --- a/src/rc_gpfs/cli/gpfs_fpart.py +++ b/src/rc_gpfs/cli/fparq_cli.py @@ -2,7 +2,7 @@ import argparse from pathlib import Path import pandas as pd -from ..process import gpfspart +from ..process import fparq DESCRIPTION = """ gpfspart is a custom implementation of the fpart algorithm specifically designed to work with processed GPFS policy outputs. fpart crawls a directory tree partitioning the files by size up to a specified max and stores them in a number of lists. These lists are useful for passing to rsync and its derivative implementations to load balance and parallelize large data transfers. However, fpart has to crawl a file tree to create these partitions which can impact performance on very large, network file systems such as GPFS. @@ -59,12 +59,12 @@ def parse_args(): args = parser.parse_args() return vars(args) -def gpfs_fpart(): +def fparq_cli(): args = parse_args() if args.get('partition_path') is None: pq_path = args.get('parquet_path') args.update({'partition_path': pq_path.joinpath('_partitions')}) - + df = pd.read_parquet(args.get('parquet_path')) - gpfspart(df,**args) \ No newline at end of file + fparq(df,**args) diff --git a/src/rc_gpfs/process/__init__.py b/src/rc_gpfs/process/__init__.py index 9bf1d1e..f09e3ee 100644 --- a/src/rc_gpfs/process/__init__.py +++ b/src/rc_gpfs/process/__init__.py @@ -1,2 +1,2 @@ from .process import * -from .gpfspart import gpfspart \ No newline at end of file +from .fparq import fparq diff --git a/src/rc_gpfs/process/gpfspart.py b/src/rc_gpfs/process/fparq.py similarity index 51% rename from src/rc_gpfs/process/gpfspart.py rename to src/rc_gpfs/process/fparq.py index d062324..c84eb28 100644 --- a/src/rc_gpfs/process/gpfspart.py +++ b/src/rc_gpfs/process/fparq.py @@ -48,7 +48,7 @@ def fpart(series, max_part_files, max_part_size, max_file_size, init_grp=0): return groups -def gpfspart( +def fparq( df: pd.DataFrame, partition_path: str | Path, partition_prefix: str = 'part', @@ -62,9 +62,51 @@ def gpfspart( ret_df: bool = False, **kwargs ): + """ + Replicates GNU fpart's file partitioning on GPFS policy lists in parquet datasets for use in sync tools such as rsync. This takes in a dataframe of file sizes in bytes indexed by their full file path. Files in the dataframe are sequentially assigned to a partition until the sum of their size reaches a specified threshold or number of files. This behaves identically to fpart's live mode and is the only supported mode. + + Files can also be classified as either 'tiny' or 'big' and partitioned separately from the rest of the dataset. Tiny files can cause performance issues where the overhead of transferring individual files exceeds the time to transfer the file data itself. Instead, tiny files can be archived in a tarball prior to sync and transferred all at once. Transferring very large files can also be slow when done in a single chunk/thread. Instead, big files will be assigned each to their own partition to help facilitate chunking and parallel transfer. + + A file is created for each partition listing the file paths in that partition. Paths are null-delimited to account for cases where newlines are included in file names + + Parameters + ---------- + df : pd.DataFrame + Dataframe containing a column called 'size' and the file paths as the index. + partition_path : str | Path + Path to a directory to store the partition files + partition_prefix : str, optional + String to prepend to the partition number in the file name. By default 'part' + max_part_size : str | int | None, optional + Max total size of a standard partition in bytes. This can be specified either as an integer or as a human-readable byte string (e.g. 10*(1024**3), '10GiB', '10GB', and '10G' all refer to 10 Gibibytes). All byte strings are interpreted as base 2 instead of 10. If None, files will only be partitioned by max_part_files. By default '50GiB' + max_part_files : int | None, optional + Max number of files in a standard partition. Can be used in conjunction with max_part_size. At least one must be set. By default None + tiny_size : str | int | None, optional + Integer or human-readable byte string specifying which files are classified as tiny. If None, tiny_size is set to 0. By default '4kiB' + max_tiny_part_size : str | int | None, optional + Max total size per partition for tiny files. By default '1GiB' + max_tiny_part_files : int | None, optional + Max total files per partition for tiny files. By default 250000 + big_size : str | int | float | None, optional + Integer or byte string specifying which files are classified as 'big'. Big files are each assigned to their own partition even if the big_size is much less than the max_part_size. If None, big_size is set to the lesser of max_part_size or float("inf"). By default None + exclude_nonstandard : bool, optional + Optionally exclude tiny and big files from the partition lists at the end. If files are categorized as big or tiny while this option is set, they will be ignored, and only files with "standard" sizes will be written to partition files. By default False + ret_df : bool, optional + Optionally return the file paths with their assigned partition for further processing and aggregation. By default False + + Returns + ------- + pd.DataFrame + Dataframe with file paths and their assigned partitions. Only returned if ret_df is True + + Raises + ------ + ValueError + If both max_part_size and max_part_files are None, a ValueError is raised + """"""""" if max_part_files is None and max_part_size is None: raise ValueError("At least one of max_part_files or max_part_size must be set") - + partition_path = as_path(partition_path) partition_path.mkdir(exist_ok=True,parents=True) @@ -93,12 +135,12 @@ def gpfspart( tiny = df.loc[df['size_grp'] == 'tiny']['size'].to_frame() else: tiny = pd.DataFrame() - + big = df.loc[df['size_grp'] == 'big']['size'].to_frame() df = df.loc[df['size_grp'] == 'standard']['size'].to_frame() - + df['partition'] = fpart(df['size'], max_part_files, max_part_size, big_size) - + if not exclude_nonstandard: if not tiny.empty: init_grp = df['partition'].max() + 1 @@ -117,5 +159,5 @@ def gpfspart( df = pd.concat([df,big]) write_partitions(df, partition_path=partition_path, partition_prefix=partition_prefix) - - return df if ret_df else None \ No newline at end of file + + return df if ret_df else None -- GitLab