# gpfspart.py — partition a file-size listing into size/count-bounded chunks.
import re
from pathlib import Path
import pandas as pd

from ..utils import as_path, convert_si

def _to_bytes(val: str | int | None, default: int | None) -> int | None:
    if isinstance(val,str):
        val = re.sub(r'[iB]*$','',val)
        val = convert_si(val,use_binary=True)
    elif val is None and default is not None:
        val = default
    return val

def _write_partition(files, fname, delim=r'\0'):
    with open(fname, 'w') as f:
        f.write(delim.join(files))

def write_partitions(df, partition_path, partition_prefix):
    """Write one list file per partition, named '<prefix>-<NN>'.

    Partition ids are zero-padded to the width of the largest id so the
    emitted files sort lexicographically. The file paths written are the
    dataframe's index values. NOTE: mutates df['partition'] into
    zero-padded strings as a side effect.
    """
    width = len(str(df['partition'].max()))
    df['partition'] = df['partition'].astype(str).str.zfill(width)
    for part_id, part in df.groupby('partition'):
        out_file = Path(partition_path) / f'{partition_prefix}-{part_id}'
        _write_partition(part.index.values, out_file)

def fpart(series, max_part_files, max_part_size, max_file_size, init_grp=0):
    """Assign each size in *series* to a partition (fpart-style bin packing).

    Sizes are scanned in order and accumulated into the current partition
    until either the cumulative size reaches *max_part_size* or the
    partition already holds *max_part_files* entries; then a new partition
    is opened. A value larger than *max_file_size* is pushed into a fresh
    partition of its own (except when it is the very first entry, whose
    partition is still empty).

    Parameters
    ----------
    series : iterable of int
        File sizes in bytes, in the order they should be packed.
    max_part_files : int or float
        Maximum number of files per partition (use float('inf') for no cap).
    max_part_size : int or float
        Maximum cumulative size per partition.
    max_file_size : int or float
        Threshold above which a file is treated as oversized.
    init_grp : int
        Partition id to start numbering from.

    Returns
    -------
    list of int
        One partition id per element of *series*.
    """
    group = init_grp
    cumsum = 0    # cumulative size of the current partition
    n_files = 0   # file count of the current partition
    groups = []

    for i, value in enumerate(series):
        # BUG FIX: the original counter was never incremented or reset,
        # so the max_part_files cap was silently ignored. Roll over to a
        # new partition once the current one is full by file count.
        if n_files >= max_part_files:
            group += 1
            cumsum = 0
            n_files = 0

        if value > max_file_size:
            # Oversized file: isolate it in a fresh partition (unless it
            # is the first entry, which already has an empty partition).
            if i > 0:
                group += 1
            groups.append(group)
            cumsum = 0
            n_files = 1
        else:
            cumsum += value
            # Size cap reached: the current file becomes the first
            # member of the next partition.
            if cumsum >= max_part_size:
                group += 1
                cumsum = value
                n_files = 0
            groups.append(group)
            n_files += 1

    return groups

def gpfspart(
        df: pd.DataFrame,
        partition_path: str | Path,
        partition_prefix: str = 'part',
        max_part_size: str | int | None = '50GiB',
        max_part_files: int | None = None,
        tiny_size: str | int | None = '4kiB',
        max_tiny_part_size: str | int | None = '1GiB',
        max_tiny_part_files: int | None = 250000,
        big_size: str | int | float | None = None,
        exclude_nonstandard: bool = False,
        ret_df: bool = False,
        **kwargs
):
    """Partition a file listing into chunks bounded by size and/or count.

    Files are classified by size into 'tiny', 'standard' and 'big' groups
    which are packed separately (each 'big' file gets a partition of its
    own). One list file per partition is then written to *partition_path*
    with names '<partition_prefix>-<NN>'.

    Parameters
    ----------
    df : pd.DataFrame
        Indexed by file path with a 'size' column in bytes.
        NOTE: gains a 'size_grp' column as a side effect.
    partition_path : str | Path
        Output directory; created if missing.
    partition_prefix : str
        Filename prefix for partition list files.
    max_part_size, max_part_files
        Caps for 'standard' partitions; at least one must be set.
    tiny_size
        Files at or below this size are packed separately; 0 disables.
    max_tiny_part_size, max_tiny_part_files
        Caps for 'tiny' partitions.
    big_size
        Files above this size get one partition each; defaults to
        max_part_size.
    exclude_nonstandard : bool
        If True, only 'standard' files are partitioned and written.
    ret_df : bool
        If True, return the partitioned dataframe; else None.

    Raises
    ------
    ValueError
        If both max_part_files and max_part_size are unset.
    """
    if max_part_files is None and max_part_size is None:
        raise ValueError("At least one of max_part_files or max_part_size must be set")

    partition_path = as_path(partition_path)
    partition_path.mkdir(exist_ok=True, parents=True)

    max_part_size = _to_bytes(max_part_size, None)
    tiny_size = _to_bytes(tiny_size, 0)
    max_tiny_part_size = _to_bytes(max_tiny_part_size, 1024**3)
    big_size = _to_bytes(big_size, max_part_size)

    # BUG FIX: the guard above explicitly allows max_part_size=None when
    # max_part_files is set, but the original code then passed None into
    # fpart (int >= None -> TypeError) and, via the big_size fallback,
    # into pd.cut's bin edges. Treat unset caps as unbounded instead.
    max_part_files = float('inf') if max_part_files is None else max_part_files
    max_tiny_part_files = float('inf') if max_tiny_part_files is None else max_tiny_part_files
    max_part_size = float('inf') if max_part_size is None else max_part_size
    big_size = float('inf') if big_size is None else big_size

    # Build size-class bins, skipping degenerate bins so pd.cut always
    # receives strictly increasing edges.
    bins, labels = [0], []
    if tiny_size > 0:
        bins.append(tiny_size)
        labels.append('tiny')
    if big_size < float('inf'):
        bins.append(big_size)
        labels.extend(['standard', 'big'])
    else:
        labels.append('standard')
    bins.append(float('inf'))

    df['size_grp'] = pd.cut(
        df['size'],
        bins=bins,
        labels=labels,
        include_lowest=True,
    )

    if 'tiny' in labels:
        tiny = df.loc[df['size_grp'] == 'tiny']['size'].to_frame()
    else:
        tiny = pd.DataFrame()

    big = df.loc[df['size_grp'] == 'big']['size'].to_frame()
    df = df.loc[df['size_grp'] == 'standard']['size'].to_frame()

    df['partition'] = fpart(df['size'], max_part_files, max_part_size, big_size)

    if not exclude_nonstandard:
        if not tiny.empty:
            # Continue numbering after the standard partitions; an empty
            # standard frame would otherwise yield a NaN max().
            init_grp = 0 if df.empty else df['partition'].max() + 1
            tiny['partition'] = fpart(
                tiny['size'],
                max_tiny_part_files,
                max_tiny_part_size,
                tiny_size,
                init_grp=init_grp,
            )
            df = pd.concat([df, tiny])

        if not big.empty:
            # Each big file becomes its own single-file partition.
            init_grp = 0 if df.empty else df['partition'].max() + 1
            big['partition'] = range(init_grp, init_grp + big.shape[0])
            df = pd.concat([df, big])

    write_partitions(df, partition_path=partition_path, partition_prefix=partition_prefix)

    return df if ret_df else None