Commit f9708260 authored by Matthew K Defenderfer

Change partitioning name to fparq

parent defe2c26
1 merge request: !49 Change partitioning name to fparq
@@ -48,6 +48,12 @@ name="rapids"
url="https://pypi.nvidia.com"
priority = "supplemental"
[tool.poetry.scripts]
convert-to-hive = "rc_gpfs.cli.convert_flat_to_hive:convert_flat_to_hive"
convert-to-parquet = "rc_gpfs.cli.convert_to_parquet:convert_to_parquet"
split-log = "rc_gpfs.cli.split_log:split_log"
fparq = "rc_gpfs.cli.fparq_cli:fparq_cli"
[tool.poetry.requires-plugins]
poetry-dynamic-versioning = { version = ">=1.0.0,<2.0.0", extras = ["plugin"] }
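Note: the [tool.poetry.scripts] entries above map installed console commands to Python functions, so the new fparq command resolves to the fparq_cli function. As a minimal sketch (not part of this diff), running the installed command is equivalent to calling that function directly, assuming the rc_gpfs package is installed:

from rc_gpfs.cli.fparq_cli import fparq_cli

if __name__ == "__main__":
    # Parses sys.argv and runs the partitioner, the same as invoking the
    # installed `fparq` console script from a shell.
    fparq_cli()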
from .convert_flat_to_hive import convert_flat_to_hive
from .convert_to_parquet import convert_to_parquet
from .split_log import split_log
from .gpfs_fpart import gpfs_fpart
\ No newline at end of file
from .fparq_cli import fparq_cli
\ No newline at end of file
@@ -2,7 +2,7 @@ import argparse
from pathlib import Path
import pandas as pd
from ..process import gpfspart
from ..process import fparq
DESCRIPTION = """
gpfspart is a custom implementation of the fpart algorithm specifically designed to work with processed GPFS policy outputs. fpart crawls a directory tree partitioning the files by size up to a specified max and stores them in a number of lists. These lists are useful for passing to rsync and its derivative implementations to load balance and parallelize large data transfers. However, fpart has to crawl a file tree to create these partitions which can impact performance on very large, network file systems such as GPFS.
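To illustrate the fpart-style scheme described above, here is a minimal sketch (not code from this commit; greedy_partition and its signature are invented for the example). Files are assigned to the current partition until adding the next one would exceed a size cap, at which point a new partition starts:

def greedy_partition(sizes, max_part_size):
    # Assign each file (given by its size, in input order) a partition index.
    # A new partition begins whenever the running total would exceed the cap.
    groups = []
    part, running = 0, 0
    for size in sizes:
        if running and running + size > max_part_size:
            part += 1
            running = 0
        running += size
        groups.append(part)
    return groups

# Files of 60, 50, 30, and 80 bytes with a 100-byte cap land in
# partitions [0, 1, 1, 2].
print(greedy_partition([60, 50, 30, 80], max_part_size=100))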
@@ -59,12 +59,12 @@ def parse_args():
args = parser.parse_args()
return vars(args)
def gpfs_fpart():
def fparq_cli():
args = parse_args()
if args.get('partition_path') is None:
pq_path = args.get('parquet_path')
args.update({'partition_path': pq_path.joinpath('_partitions')})
df = pd.read_parquet(args.get('parquet_path'))
gpfspart(df,**args)
\ No newline at end of file
fparq(df,**args)
from .process import *
from .gpfspart import gpfspart
\ No newline at end of file
from .fparq import fparq
@@ -48,7 +48,7 @@ def fpart(series, max_part_files, max_part_size, max_file_size, init_grp=0):
return groups
def gpfspart(
def fparq(
df: pd.DataFrame,
partition_path: str | Path,
partition_prefix: str = 'part',
@@ -62,9 +62,51 @@ def gpfspart(
ret_df: bool = False,
**kwargs
):
"""
Replicates GNU fpart's file partitioning on GPFS policy lists in parquet datasets for use in sync tools such as rsync. This takes in a dataframe of file sizes in bytes indexed by their full file path. Files in the dataframe are sequentially assigned to a partition until the sum of their size reaches a specified threshold or number of files. This behaves identically to fpart's live mode and is the only supported mode.
Files can also be classified as either 'tiny' or 'big' and partitioned separately from the rest of the dataset. Tiny files can cause performance issues where the overhead of transferring individual files exceeds the time to transfer the file data itself. Instead, tiny files can be archived in a tarball prior to sync and transferred all at once. Transferring very large files can also be slow when done in a single chunk/thread. Instead, big files will be assigned each to their own partition to help facilitate chunking and parallel transfer.
A file is created for each partition listing the file paths in that partition. Paths are null-delimited to account for cases where newlines are included in file names
Parameters
----------
df : pd.DataFrame
Dataframe containing a column called 'size' and the file paths as the index.
partition_path : str | Path
Path to a directory to store the partition files
partition_prefix : str, optional
String to prepend to the partition number in the file name. By default 'part'
max_part_size : str | int | None, optional
Max total size of a standard partition in bytes. This can be specified either as an integer or as a human-readable byte string (e.g. 10*(1024**3), '10GiB', '10GB', and '10G' all refer to 10 Gibibytes). All byte strings are interpreted as base 2 instead of 10. If None, files will only be partitioned by max_part_files. By default '50GiB'
max_part_files : int | None, optional
Max number of files in a standard partition. Can be used in conjunction with max_part_size. At least one must be set. By default None
tiny_size : str | int | None, optional
Integer or human-readable byte string specifying which files are classified as tiny. If None, tiny_size is set to 0. By default '4kiB'
max_tiny_part_size : str | int | None, optional
Max total size per partition for tiny files. By default '1GiB'
max_tiny_part_files : int | None, optional
Max total files per partition for tiny files. By default 250000
big_size : str | int | float | None, optional
Integer or byte string specifying which files are classified as 'big'. Big files are each assigned to their own partition even if the big_size is much less than the max_part_size. If None, big_size is set to the lesser of max_part_size or float("inf"). By default None
exclude_nonstandard : bool, optional
Optionally exclude tiny and big files from the partition lists at the end. If files are categorized as big or tiny while this option is set, they will be ignored, and only files with "standard" sizes will be written to partition files. By default False
ret_df : bool, optional
Optionally return the file paths with their assigned partition for further processing and aggregation. By default False
Returns
-------
pd.DataFrame
Dataframe with file paths and their assigned partitions. Only returned if ret_df is True
Raises
------
ValueError
If both max_part_size and max_part_files are None, a ValueError is raised
"""""""""
if max_part_files is None and max_part_size is None:
raise ValueError("At least one of max_part_files or max_part_size must be set")
partition_path = as_path(partition_path)
partition_path.mkdir(exist_ok=True,parents=True)
@@ -93,12 +135,12 @@ def gpfspart(
tiny = df.loc[df['size_grp'] == 'tiny']['size'].to_frame()
else:
tiny = pd.DataFrame()
big = df.loc[df['size_grp'] == 'big']['size'].to_frame()
df = df.loc[df['size_grp'] == 'standard']['size'].to_frame()
df['partition'] = fpart(df['size'], max_part_files, max_part_size, big_size)
if not exclude_nonstandard:
if not tiny.empty:
init_grp = df['partition'].max() + 1
@@ -117,5 +159,5 @@ def gpfspart(
df = pd.concat([df,big])
write_partitions(df, partition_path=partition_path, partition_prefix=partition_prefix)
return df if ret_df else None
\ No newline at end of file
return df if ret_df else None
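For reference, a hedged usage sketch of the renamed function, based on the CLI flow and the documented defaults above (the paths are placeholders, not taken from the repository):

import pandas as pd
from rc_gpfs.process import fparq

# GPFS policy output converted to parquet: file sizes in bytes indexed by path.
df = pd.read_parquet("/data/list-policy.parquet")  # placeholder path

# Write null-delimited partition lists under partition_path, using the
# documented defaults: 50GiB per standard partition and tiny files below
# 4kiB grouped into their own partitions.
fparq(
    df,
    partition_path="/data/list-policy.parquet/_partitions",  # mirrors the CLI default
    max_part_size="50GiB",
    tiny_size="4kiB",
)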