Skip to content
Snippets Groups Projects
fparq.py 7.22 KiB
Newer Older
import re
from pathlib import Path
import pandas as pd

from ..utils import as_path, convert_si

def _to_bytes(val: str | int | None, default: int | None) -> int | None:
    if isinstance(val,str):
        val = re.sub(r'[iB]*$','',val)
        val = convert_si(val,use_binary=True)
    elif val is None and default is not None:
        val = default
    return val

def _write_partition(files, fname, delim='\0'):
    """
    Write the paths of one partition to a single list file.

    Parameters
    ----------
    files : iterable
        Paths belonging to the partition. Each element is converted with
        ``str`` before being written.
    fname : str | Path
        File to create (or overwrite) with the joined paths.
    delim : str, optional
        Separator placed between paths; no trailing separator is written.
        Defaults to the NUL character so that paths containing newlines stay
        unambiguous. This must be the real one-character ``'\\0'``: a raw
        string ``r'\\0'`` is a backslash followed by a zero and is not
        recognised as a separator by tools expecting NUL-delimited lists.
    """
    # newline='' disables newline translation so a '\n' inside a file name is
    # written byte-for-byte on every platform.
    with open(fname, 'w', newline='') as f:
        f.write(delim.join(map(str, files)))

def write_partitions(df, partition_path, partition_prefix):
    """
    Write one list file per partition found in ``df``.

    The 'partition' column of ``df`` is rewritten in place as strings that are
    zero-padded to the width of the largest partition value, so the generated
    file names sort in partition order. Each file is named
    ``{partition_prefix}-{partition}`` inside ``partition_path`` and lists the
    index values (the file paths) of the rows in that partition.

    Parameters
    ----------
    df : pd.DataFrame
        Frame indexed by file path with a 'partition' column. Modified in
        place (see above).
    partition_path : str | Path
        Directory that receives the partition files.
    partition_prefix : str
        Text placed before the partition number in each file name.
    """
    # Width of the largest partition label decides the amount of zero padding.
    width = len(str(df['partition'].max()))
    df['partition'] = df['partition'].astype(str).str.zfill(width)

    out_dir = Path(partition_path)
    for label, members in df.groupby('partition'):
        target = out_dir.joinpath(f'{partition_prefix}-{label}')
        _write_partition(members.index.values, target)

def fpart(series, max_part_files, max_part_size, max_file_size, init_grp=0):
    """
    Sequentially assign each file size in ``series`` to a partition number.

    Sizes are visited in order. A new partition is opened before a file when
    the current partition already holds ``max_part_files`` files, or when
    adding the file would bring the partition's total size to
    ``max_part_size`` or more. A partition is never closed while it is still
    empty, so every partition number between the first and last one returned
    is used by at least one file. A file larger than ``max_file_size`` is
    placed alone in its own partition.

    Parameters
    ----------
    series : iterable of numbers
        File sizes, in the order they should be packed.
    max_part_files : int | float | None
        Maximum number of files per partition. None means no limit.
    max_part_size : int | float | None
        Total-size threshold per partition. None means no limit.
    max_file_size : int | float | None
        Files strictly larger than this get a partition of their own.
        None means no file is treated as oversized.
    init_grp : int, optional
        Number given to the first partition. By default 0

    Returns
    -------
    list
        One partition number per element of ``series``, in the same order.
    """
    no_limit = float('inf')
    if max_part_files is None:
        max_part_files = no_limit
    if max_part_size is None:
        max_part_size = no_limit
    if max_file_size is None:
        max_file_size = no_limit

    group = init_grp
    cumsum = 0          # total size of the files in the current partition
    cur_grp_size = 0    # number of files in the current partition
    groups = []

    for value in series:
        if value > max_file_size:
            # Oversized file: close the partition in progress (if it holds
            # anything), give the file its own number, then move on so the
            # following files cannot share that partition.
            if cur_grp_size > 0:
                group += 1
            groups.append(group)
            group += 1
            cumsum = 0
            cur_grp_size = 0
            continue

        is_full = (
            cur_grp_size >= max_part_files
            or cumsum + value >= max_part_size
        )
        # Only roll over when the current partition is non-empty; otherwise a
        # single file at or above the size threshold would leave an unused
        # partition number behind.
        if cur_grp_size > 0 and is_full:
            group += 1
            cumsum = 0
            cur_grp_size = 0

        cumsum += value
        cur_grp_size += 1
        groups.append(group)

    return groups

        df: pd.DataFrame,
        partition_path: str | Path,
        partition_prefix: str = 'part',
        max_part_size: str | int | None = '50GiB',
        max_part_files: int | None = None,
        tiny_size: str | int | None = '4kiB',
        max_tiny_part_size: str | int | None = '1GiB',
        max_tiny_part_files: int | None = 250000,
        big_size: str | int | float | None = None,
        exclude_nonstandard: bool = False,
        ret_df: bool = False,
        **kwargs
):
    """
    Replicates GNU fpart's file partitioning on GPFS policy lists in parquet datasets for use in sync tools such as rsync. This takes in a dataframe of file sizes in bytes indexed by their full file path. Files in the dataframe are sequentially assigned to a partition until the sum of their size reaches a specified threshold or number of files. This behaves identically to fpart's live mode and is the only supported mode.

    Files can also be classified as either 'tiny' or 'big' and partitioned separately from the rest of the dataset. Tiny files can cause performance issues where the overhead of transferring individual files exceeds the time to transfer the file data itself. Instead, tiny files can be archived in a tarball prior to sync and transferred all at once. Transferring very large files can also be slow when done in a single chunk/thread. Instead, big files will be assigned each to their own partition to help facilitate chunking and parallel transfer.

    A file is created for each partition listing the file paths in that partition. Paths are null-delimited to account for cases where newlines are included in file names

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe containing a column called 'size' and the file paths as the index.
    partition_path : str | Path
        Path to a directory to store the partition files
    partition_prefix : str, optional
        String to prepend to the partition number in the file name. By default 'part'
    max_part_size : str | int | None, optional
        Max total size of a standard partition in bytes. This can be specified either as an integer or as a human-readable byte string (e.g. 10*(1024**3), '10GiB', '10GB', and '10G' all refer to 10 Gibibytes). All byte strings are interpreted as base 2 instead of 10. If None, files will only be partitioned by max_part_files. By default '50GiB'
    max_part_files : int | None, optional
        Max number of files in a standard partition. Can be used in conjunction with max_part_size. At least one must be set. By default None
    tiny_size : str | int | None, optional
        Integer or human-readable byte string specifying which files are classified as tiny. If None, tiny_size is set to 0. By default '4kiB'
    max_tiny_part_size : str | int | None, optional
        Max total size per partition for tiny files. By default '1GiB'
    max_tiny_part_files : int | None, optional
        Max total files per partition for tiny files. By default 250000
    big_size : str | int | float | None, optional
        Integer or byte string specifying which files are classified as 'big'. Big files are each assigned to their own partition even if the big_size is much less than the max_part_size. If None, big_size is set to the lesser of max_part_size or float("inf"). By default None
    exclude_nonstandard : bool, optional
        Optionally exclude tiny and big files from the partition lists at the end. If files are categorized as big or tiny while this option is set, they will be ignored, and only files with "standard" sizes will be written to partition files. By default False
    ret_df : bool, optional
        Optionally return the file paths with their assigned partition for further processing and aggregation. By default False

    Returns
    -------
    pd.DataFrame
        Dataframe with file paths and their assigned partitions. Only returned if ret_df is True

    Raises
    ------
    ValueError
        If both max_part_size and max_part_files are None, a ValueError is raised
    """""""""
    if max_part_files is None and max_part_size is None:
        raise ValueError("At least one of max_part_files or max_part_size must be set")
    partition_path = as_path(partition_path)
    partition_path.mkdir(exist_ok=True,parents=True)

    max_part_files = float("inf") if max_part_files is None else max_part_files

    max_part_size = _to_bytes(max_part_size, None)
    tiny_size = _to_bytes(tiny_size, 0)
    max_tiny_part_size = _to_bytes(max_tiny_part_size,1024**3)
    big_size = _to_bytes(big_size, max_part_size)

    if tiny_size == 0:
        bins = [0,big_size,float('inf')]
        labels = ['standard','big']
    else:
        bins=[0, tiny_size, big_size, float("inf")]
        labels=['tiny','standard','big']

    df['size_grp'] = pd.cut(
        df['size'],
        bins=bins,
        labels=labels,
        include_lowest=True,
    )

    if 'tiny' in labels:
        tiny = df.loc[df['size_grp'] == 'tiny']['size'].to_frame()
    else:
        tiny = pd.DataFrame()
    big = df.loc[df['size_grp'] == 'big']['size'].to_frame()
    df = df.loc[df['size_grp'] == 'standard']['size'].to_frame()
    df['partition'] = fpart(df['size'], max_part_files, max_part_size, big_size)
    if not exclude_nonstandard:
        if not tiny.empty:
            init_grp = df['partition'].max() + 1
            tiny['partition'] = fpart(
                tiny['size'], 
                max_tiny_part_files, 
                max_tiny_part_size, 
                tiny_size,
                init_grp=init_grp
            )
            df = pd.concat([df,tiny])

        if not big.empty:
            init_grp = df['partition'].max() + 1
            big['partition'] = range(init_grp, init_grp + big.shape[0])
            df = pd.concat([df,big])

    write_partitions(df, partition_path=partition_path, partition_prefix=partition_prefix)

    return df if ret_df else None