diff --git a/pyproject.toml b/pyproject.toml
index 1edf54b0b227e67a99ef983f76a93eaab77ca167..b7c20dd8351d2fa9dcf0db3e57de33915fd1925b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,6 +37,7 @@ priority = "supplemental"
 convert-to-hive = "rc_gpfs.cli:convert_flat_to_hive"
 convert-to-parquet = "rc_gpfs.cli:convert_to_parquet"
 split-log = "rc_gpfs.cli:split_log"
+gpfspart = "rc_gpfs.cli:gpfs_fpart"
 
 [tool.poetry-dynamic-versioning]
 enable = true
diff --git a/src/rc_gpfs/cli/__init__.py b/src/rc_gpfs/cli/__init__.py
index 305d2af147fe8f3425e8e6fed55ee08a41e4bd95..725011624c89776ef4c504679a988a51ba8aeb93 100644
--- a/src/rc_gpfs/cli/__init__.py
+++ b/src/rc_gpfs/cli/__init__.py
@@ -1,3 +1,4 @@
 from .convert_flat_to_hive import convert_flat_to_hive
 from .convert_to_parquet import convert_to_parquet
-from .split_log import split_log
\ No newline at end of file
+from .split_log import split_log
+from .gpfs_fpart import gpfs_fpart
\ No newline at end of file
diff --git a/src/rc_gpfs/cli/gpfs_fpart.py b/src/rc_gpfs/cli/gpfs_fpart.py
new file mode 100644
index 0000000000000000000000000000000000000000..1b881ab5c64b5cd44dabe45e888feed01cf06ece
--- /dev/null
+++ b/src/rc_gpfs/cli/gpfs_fpart.py
@@ -0,0 +1,70 @@
+import argparse
+from pathlib import Path
+import pandas as pd
+
+from ..process import gpfspart
+
+DESCRIPTION = """
+gpfspart is a custom implementation of the fpart algorithm designed specifically to work with processed GPFS policy outputs. fpart crawls a directory tree, partitioning files by size up to a specified maximum and storing them in a number of lists. These lists are useful for passing to rsync and its derivative implementations to load balance and parallelize large data transfers. However, fpart has to crawl the file tree to create these partitions, which can degrade performance on very large network file systems such as GPFS.
+
+Instead, gpfspart reads a GPFS policy log output that has been parsed and converted to a parquet dataset and creates file partitions without needing to crawl the filesystem. It simulates fpart's --live mode, a naive partitioning scheme where the file listing is run through only once and a new partition is started whenever the running total of bytes reaches the specified limit. The resulting partitions are not optimized to minimize the total number of partitions, but the order of files from the original log is preserved in case that sorting is important.
+
+Tiny and Large Files
+--------------------
+
+Options are included to specify minimum and maximum size cutoffs that define which files are included in the default grouping. Files outside those cutoffs can then either be partitioned separately or excluded entirely with another option. This was written to support tools such as parsyncfp2, which has built-in options to process tiny or large files differently. For example, parsyncfp2 will tar tiny files together and transfer the tarball, and it will chunk large files and transfer the chunks concurrently.
+"""
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        description=DESCRIPTION,
+        formatter_class=argparse.RawTextHelpFormatter
+    )
+    parser.add_argument('parquet_path',
+                        type=Path,
+                        help="Input path for the parquet GPFS dataset to chunk")
+    parser.add_argument('-p','--partition-path',
+                        type=Path,
+                        default=None,
+                        help="Path to write partition files. Defaults to ${parquet_path}/_partitions")
+    parser.add_argument('-m','--max-part-size',
+                        type=str,
+                        default='50GiB',
+                        help="Max combined size of all files in a partition. This can be specified either as a human-readable byte string (e.g. 10M, 10MiB, 100G, 100GiB) or as a raw integer byte count. Byte strings are always interpreted as base 2 (e.g. 1kB is treated as 1024 bytes)")
+    parser.add_argument('-f','--max-part-files',
+                        type=int,
+                        default=None,
+                        help="Maximum number of files to include in any partition. Works together with --max-part-size; every partition satisfies both limits")
+    parser.add_argument('-t','--tiny-size',
+                        type=str,
+                        default=None,
+                        help="Maximum size for a file to be classified as 'tiny'. Tiny files are partitioned separately from other files by default. They can be excluded entirely using the --exclude-nonstandard flag")
+    parser.add_argument('--max-tiny-part-size',
+                        type=str,
+                        default='1GiB',
+                        help="Max partition size for tiny files")
+    parser.add_argument('--max-tiny-part-files',
+                        type=int,
+                        default=250000,
+                        help="Max number of files in a partition of tiny files")
+    parser.add_argument('-b','--big-size',
+                        type=str,
+                        default=None,
+                        help="Minimum size for a file to be classified as 'big'. Each file above this limit is assigned to its own partition. If unset, this value defaults to the max partition size; setting it above the max partition size has no effect. These files can be excluded entirely using the --exclude-nonstandard flag")
+    parser.add_argument('--exclude-nonstandard',
+                        default=False,
+                        action="store_true",
+                        help="Exclude all tiny and big files from partitioning. Partitions will only include files between tiny-size and big-size.")
+
+    args = parser.parse_args()
+    return vars(args)
+
+def gpfs_fpart():
+    args = parse_args()
+
+    if args.get('partition_path') is None:
+        pq_path = args.get('parquet_path')
+        args.update({'partition_path': pq_path.joinpath('_partitions')})
+
+    df = pd.read_parquet(args.get('parquet_path'))
+    gpfspart(df,**args)
\ No newline at end of file
diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py
index 16f1c2dfcead0723660eb0ef5f01e3ff9763f037..73c26d012ebf79a34baf704da79873717a0cca41 100755
--- a/src/rc_gpfs/policy/convert.py
+++ b/src/rc_gpfs/policy/convert.py
@@ -12,6 +12,7 @@
 import GPUtil
 import cudf
 import pandas as pd
+import numpy as np
 import dask.dataframe as dd
 import dask.config
 import rmm
@@ -165,6 +166,63 @@ def hivize(
     shutil.rmtree(staging_path)
     pass
 
+def bytes_to_human_readable_size(num_bytes):
+    units = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']
+
+    # Handle the case where num_bytes is 0
+    if num_bytes == 0:
+        return "0 B"
+
+    # Calculate the appropriate unit index,
+    # flooring log2(num_bytes) to the nearest power of 1024
+    unit_index = min(len(units) - 1, int(np.log2(num_bytes) // 10))
+    num = int(num_bytes // (1024 ** unit_index))
+
+    # Truncate to a whole number and append the unit
+    return f"{num} {units[unit_index]}"
+
+def create_bin_labels(bins, include_outer_bins = True):
+    if any([b < 0 for b in bins]):
+        raise ValueError("File size cannot be less than 0 B")
+    bins = list(set(bins))  # Remove any duplicate cutoffs
+    bins.sort()
+    if np.isinf(bins[-1]):
+        bins.remove(np.inf)
+
+    labels = []
+
+    # Optionally include a bin for all files smaller than the lowest cutoff
+    if include_outer_bins and bins[0] != 0:
+        labels.append(f"0 B-{bytes_to_human_readable_size(bins[0])}")
+
+    for i in range(len(bins) - 1):
+        lower_bound = bytes_to_human_readable_size(bins[i])
+        upper_bound = bytes_to_human_readable_size(bins[i + 1])
+        labels.append(f"{lower_bound}-{upper_bound}")
+
+    if include_outer_bins:
+        labels.append(f">{bytes_to_human_readable_size(bins[-1])}")
+    return labels,bins
+
+def calculate_size_distribution(
+    sizes: pd.Series | cudf.Series,
+    size_bins: List[int | float] = [0,4096] + [x*1024**3 for x in [1,10,50,100]] + [1024**4],
+    include_outer_bins: bool = True,
+    **kwargs
+) -> pd.Series | cudf.Series:
+
+    size_labels,size_bins = create_bin_labels(size_bins,include_outer_bins)
+
+    if include_outer_bins:
+        if size_bins[0] != 0:
+            size_bins.insert(0,0)
+        if float("inf") not in size_bins:
+            size_bins.append(float("inf"))
+
+    fn = pd.cut if isinstance(sizes,pd.Series) else cudf.cut
+    size_grps = fn(sizes,bins=size_bins,labels=size_labels,ordered=True,include_lowest=True,**kwargs)
+
+    return size_grps
+
 def write_dataset_metadata(df: cudf.DataFrame | pd.DataFrame, parquet_path: str | Path) -> dict:
     parquet_path = as_path(parquet_path)
     if isinstance(df,cudf.DataFrame):
@@ -177,11 +235,24 @@
         vram_usage = 'NA'
     ram_usage = df.memory_usage(deep=True).to_dict()
 
+    size_df = df[['size']].copy()
+    size_df['grp'] = calculate_size_distribution(size_df['size'])
+
+    size_dist = (
+        size_df
+        .groupby('grp',observed=True)['size']
+        .agg(['count','sum'])
+        .rename(columns={'count':'file_count','sum':'bytes'})
+        .T
+        .to_dict()
+    )
+
     metadata = {
         'num_rows': df.shape[0],
         'num_columns': df.shape[1],
         'vram_usage': vram_usage,
-        'ram_usage': ram_usage
+        'ram_usage': ram_usage,
+        'size_distribution': size_dist
     }
 
     with open(parquet_path.joinpath('_metadata.json'),'w') as f:
diff --git a/src/rc_gpfs/process/__init__.py b/src/rc_gpfs/process/__init__.py
index 40cc503a9b6a1b7dc7455fa23c20642b90982665..9bf1d1e2d5cffd912984faff6950ca8f386d1b38 100644
--- a/src/rc_gpfs/process/__init__.py
+++ b/src/rc_gpfs/process/__init__.py
@@ -1 +1,2 @@
-from .process import *
\ No newline at end of file
+from .process import *
+from .gpfspart import gpfspart
\ No newline at end of file
diff --git a/src/rc_gpfs/process/gpfspart.py b/src/rc_gpfs/process/gpfspart.py
new file mode 100644
index 0000000000000000000000000000000000000000..d062324394abc362c24c377dbf7dfa7402cf60a9
--- /dev/null
+++ b/src/rc_gpfs/process/gpfspart.py
@@ -0,0 +1,121 @@
+import re
+from pathlib import Path
+import pandas as pd
+
+from ..utils import as_path, convert_si
+
+def _to_bytes(val: str | int | None, default: int | None) -> int | None:
+    if isinstance(val,str):
+        val = re.sub(r'[iB]*$','',val)
+        val = convert_si(val,use_binary=True)
+    elif val is None and default is not None:
+        val = default
+    return val
+
+def _write_partition(files, fname, delim='\0'):
+    # Write a NUL-delimited file list (cf. find -print0 / rsync --from0)
+    with open(fname, 'w') as f:
+        f.write(delim.join(files))
+
+def write_partitions(df, partition_path, partition_prefix):
+    dig = len(str(df['partition'].max()))
+    df['partition'] = df['partition'].astype(str).str.zfill(dig)
+    for ind,grp in df.groupby('partition'):
+        files = grp.index.values
+        fname = Path(partition_path).joinpath(f'{partition_prefix}-{ind}')
+        _write_partition(files,fname)
+
+def fpart(series, max_part_files, max_part_size, max_file_size, init_grp=0):
+    group = init_grp
+    cumsum = 0
+    cur_grp_files = 0
+    groups = []
+
+    for i,value in enumerate(series):
+        # Start a new partition once the file-count limit is reached
+        if cur_grp_files >= max_part_files:
+            group += 1
+            cumsum = 0
+            cur_grp_files = 0
+
+        if value > max_file_size:
+            # Files above max_file_size start a new partition
+            if i > 0:
+                group += 1
+            groups.append(group)
+            cumsum = 0
+            cur_grp_files = 0
+        else:
+            cumsum += value
+            if cumsum >= max_part_size:
+                group += 1
+                cumsum = value
+            groups.append(group)
+            cur_grp_files += 1
+
+    return groups
+
+def gpfspart(
+    df: pd.DataFrame,
+    partition_path: str | Path,
+    partition_prefix: str = 'part',
+    max_part_size: str | int | None = '50GiB',
+    max_part_files: int | None = None,
+    tiny_size: str | int | None = '4kiB',
+    max_tiny_part_size: str | int | None = '1GiB',
+    max_tiny_part_files: int | None = 250000,
+    big_size: str | int | float | None = None,
+    exclude_nonstandard: bool = False,
+    ret_df: bool = False,
+    **kwargs
+):
+    if max_part_files is None and max_part_size is None:
+        raise ValueError("At least one of max_part_files or max_part_size must be set")
+
+    partition_path = as_path(partition_path)
+    partition_path.mkdir(exist_ok=True,parents=True)
+
+    max_part_files = float("inf") if max_part_files is None else max_part_files
+
+    max_part_size = _to_bytes(max_part_size, None)
+    tiny_size = _to_bytes(tiny_size, 0)
+    max_tiny_part_size = _to_bytes(max_tiny_part_size,1024**3)
+    big_size = _to_bytes(big_size, max_part_size)
+
+    if tiny_size == 0:
+        bins = [0,big_size,float('inf')]
+        labels = ['standard','big']
+    else:
+        bins=[0, tiny_size, big_size, float("inf")]
+        labels=['tiny','standard','big']
+
+    df['size_grp'] = pd.cut(
+        df['size'],
+        bins=bins,
+        labels=labels,
+        include_lowest=True,
+    )
+
+    if 'tiny' in labels:
+        tiny = df.loc[df['size_grp'] == 'tiny']['size'].to_frame()
+    else:
+        tiny = pd.DataFrame()
+
+    big = df.loc[df['size_grp'] == 'big']['size'].to_frame()
+    df = df.loc[df['size_grp'] == 'standard']['size'].to_frame()
+
+    df['partition'] = fpart(df['size'], max_part_files, max_part_size, big_size)
+
+    if not exclude_nonstandard:
+        if not tiny.empty:
+            init_grp = df['partition'].max() + 1
+            tiny['partition'] = fpart(
+                tiny['size'],
+                max_tiny_part_files,
+                max_tiny_part_size,
+                tiny_size,
+                init_grp=init_grp
+            )
+            df = pd.concat([df,tiny])
+
+        if not big.empty:
+            init_grp = df['partition'].max() + 1
+            big['partition'] = range(init_grp, init_grp + big.shape[0])
+            df = pd.concat([df,big])
+
+    write_partitions(df, partition_path=partition_path, partition_prefix=partition_prefix)
+
+    return df if ret_df else None
\ No newline at end of file
diff --git a/src/rc_gpfs/utils.py b/src/rc_gpfs/utils.py
index dc54e4736619da9a90e2e584c2a103de61d68970..d48634309232a5d59237fbb11b377e2313ac1dad 100644
--- a/src/rc_gpfs/utils.py
+++ b/src/rc_gpfs/utils.py
@@ -45,7 +45,7 @@ def convert_si(value: str | float | int,
     # Extract numeric part and unit part
     value = value.strip()
     for suffix in ['K', 'M', 'G', 'T']:
-        if value.endswith(suffix):
+        if value.upper().endswith(suffix):
             unit = suffix
             value = value[:-1]
             break
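
For context, a minimal usage sketch of the pieces added above. It is illustrative only: the parquet path is hypothetical, and the dataset is assumed to be indexed by file path with a 'size' column in bytes, which is what gpfspart and write_partitions read.

import pandas as pd
from rc_gpfs.process import gpfspart

# Hypothetical dataset produced by convert-to-parquet; the path is illustrative only.
df = pd.read_parquet('/data/gpfs-logs/list-policy.parquet')

# Partition into roughly 50 GiB chunks, grouping tiny files (<= 4 KiB) separately and
# giving each file larger than 50 GiB (the implicit big-size default) its own partition.
parts = gpfspart(
    df,
    partition_path='/data/gpfs-logs/_partitions',
    max_part_size='50GiB',
    tiny_size='4kiB',
    ret_df=True,  # also return the labeled frame instead of only writing partition files
)
print(parts['partition'].value_counts().sort_index())

The equivalent console-script invocation added in pyproject.toml would be along the lines of: gpfspart /data/gpfs-logs/list-policy.parquet -m 50GiB -t 4kiB, with partition files written to ${parquet_path}/_partitions by default.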