From 94a2b791e92a096c39832853b89e927839b246ef Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Fri, 31 Jan 2025 12:42:12 -0600
Subject: [PATCH] Add log partitioning similar to fpart

---
 pyproject.toml                  |   1 +
 src/rc_gpfs/cli/__init__.py     |   3 +-
 src/rc_gpfs/cli/gpfs_fpart.py   |  70 ++++++++++++++++++
 src/rc_gpfs/policy/convert.py   |  73 ++++++++++++++++++-
 src/rc_gpfs/process/__init__.py |   3 +-
 src/rc_gpfs/process/gpfspart.py | 121 ++++++++++++++++++++++++++++++++
 src/rc_gpfs/utils.py            |   2 +-
 7 files changed, 269 insertions(+), 4 deletions(-)
 create mode 100644 src/rc_gpfs/cli/gpfs_fpart.py
 create mode 100644 src/rc_gpfs/process/gpfspart.py

diff --git a/pyproject.toml b/pyproject.toml
index 1edf54b..b7c20dd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,6 +37,7 @@ priority = "supplemental"
 convert-to-hive = "rc_gpfs.cli:convert_flat_to_hive"
 convert-to-parquet = "rc_gpfs.cli:convert_to_parquet"
 split-log = "rc_gpfs.cli:split_log"
+gpfspart = "rc_gpfs.cli:gpfs_fpart"
 
 [tool.poetry-dynamic-versioning]
 enable = true
diff --git a/src/rc_gpfs/cli/__init__.py b/src/rc_gpfs/cli/__init__.py
index 305d2af..7250116 100644
--- a/src/rc_gpfs/cli/__init__.py
+++ b/src/rc_gpfs/cli/__init__.py
@@ -1,3 +1,4 @@
 from .convert_flat_to_hive import convert_flat_to_hive
 from .convert_to_parquet import convert_to_parquet
-from .split_log import split_log
\ No newline at end of file
+from .split_log import split_log
+from .gpfs_fpart import gpfs_fpart
\ No newline at end of file
diff --git a/src/rc_gpfs/cli/gpfs_fpart.py b/src/rc_gpfs/cli/gpfs_fpart.py
new file mode 100644
index 0000000..1b881ab
--- /dev/null
+++ b/src/rc_gpfs/cli/gpfs_fpart.py
@@ -0,0 +1,70 @@
+import argparse
+from pathlib import Path
+import pandas as pd
+
+from ..process import gpfspart
+
+DESCRIPTION = """
+gpfspart is a custom implementation of the fpart algorithm designed specifically to work with processed GPFS policy outputs. fpart crawls a directory tree, partitioning the files by size up to a specified maximum, and stores the resulting groups in a number of list files. These lists are useful for passing to rsync and its derivative implementations to load balance and parallelize large data transfers. However, fpart has to crawl the file tree to create these partitions, which can hurt performance on very large network file systems such as GPFS.
+
+Instead, gpfspart reads a GPFS policy log that has been parsed and converted to a parquet dataset and creates file partitions without needing to crawl the filesystem. It simulates fpart's --live mode, a naive partitioning scheme in which the file listing is traversed only once and a new partition is started whenever the running total of bytes reaches the specified limit. This does not minimize the number of partitions, but it does preserve the order of the files from the original log in case that ordering is important.
+
+Tiny and Large Files
+--------------------
+
+Options are included to specify minimum and maximum size cutoffs defining which files are included in the default grouping. Files outside these cutoffs can either be partitioned separately (the default) or excluded entirely via the --exclude-nonstandard flag. This was written to support tools such as parsyncfp2 which have built-in options to process tiny or large files differently. For example, parsyncfp2 will tar tiny files together and transfer the tarball, and it will chunk large files and transfer the chunks concurrently.
+"""
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        description=DESCRIPTION,
+        formatter_class=argparse.RawTextHelpFormatter
+    )
+    parser.add_argument('parquet_path',
+                        type=Path,
+                        help="Input path for the parquet GPFS dataset to chunk")
+    parser.add_argument('-p','--partition-path',
+                        type=Path,
+                        default=None,
+                        help="Path to write partition files. Defaults to ${{parquet_path}}/_partitions")
+    parser.add_argument('-m','--max-part-size',
+                        type=str,
+                        default='50GiB',
+                        help="Max combined size of all files in a partition. This can be specified either as a human-readable byte string (e.g. 10M[[i]B], 100G[[i]B]) or as a raw integer. Byte strings will be interpreted as base 2 (e.g 1kB is always 1024 bytes)")
+    parser.add_argument('-f','--max-part-files',
+                        type=int,
+                        default=None,
+                        help="Maximum number of files to include in any partition. Works with --max-size where all partitions meet both criteria")
+    parser.add_argument('-t','--tiny-size',
+                        type=str,
+                        default=None,
+                        help="Max size of file to be specified as 'tiny'. Tiny files are partitioned separately from other files by default. They can be excluded entirely using the --exclude-nonstandard flag")
+    parser.add_argument('--max-tiny-part-size',
+                        type=str,
+                        default='1GiB',
+                        help="Max partition size for tiny files")
+    parser.add_argument('--max-tiny-part-files',
+                        type=int,
+                        default=250000,
+                        help="Max number of files in a partition of tiny files")
+    parser.add_argument('-b','--big-size',
+                        type=str,
+                        default=None,
+                        help="Minimum file size to specified as 'big'. Files above this limit will be assigned to their own unique partition. This value is implicitly set to the max partition size. Setting this value above the max partition size would have no effect. These files can be excluded entirely using the --exclude-nonstandard flag")
+    parser.add_argument('--exclude-nonstandard',
+                        default=False,
+                        action="store_true",
+                        help="Exclude all tiny and big files from partitioning. Partitions will only include files between tiny-size and big-size.")
+
+    args = parser.parse_args()
+    return vars(args)
+
+def gpfs_fpart():
+    args = parse_args()
+
+    if args.get('partition_path') is None:
+        pq_path = args.get('parquet_path')
+        args.update({'partition_path': pq_path.joinpath('_partitions')})
+    
+    df = pd.read_parquet(args.get('parquet_path'))
+    gpfspart(df,**args)
\ No newline at end of file
diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py
index 16f1c2d..73c26d0 100755
--- a/src/rc_gpfs/policy/convert.py
+++ b/src/rc_gpfs/policy/convert.py
@@ -12,6 +12,7 @@ import GPUtil
 
 import cudf
 import pandas as pd
+import numpy as np
 import dask.dataframe as dd
 import dask.config
 import rmm
@@ -165,6 +166,63 @@ def hivize(
     shutil.rmtree(staging_path)
     pass
 
+def bytes_to_human_readable_size(num_bytes):
+    units = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']
+    
+    # Handle the case where num_bytes is 0
+    if num_bytes == 0:
+        return "0 B"
+    
+    # Calculate the appropriate unit index; each successive unit is a factor of 1024 larger
+    unit_index = min(len(units) - 1, int(np.log2(num_bytes) // 10))
+    num = int(num_bytes // (1024 ** unit_index))
+    
+    # Append the unit to the truncated integer value
+    return f"{num} {units[unit_index]}"
+
+def create_bin_labels(bins, include_outer_bins = True):
+    if any([b < 0 for b in bins]):
+        raise ValueError("File size cannot be less than 0 B")
+    bins = list(set(bins)) # Remove any duplicate bin edges
+    bins.sort()
+    if np.isinf(bins[-1]):
+        bins.remove(np.inf)
+    
+    labels = []
+    
+    # If requested, add a label covering all files smaller than the smallest given bin edge
+    if include_outer_bins and bins[0] != 0:
+        labels.append(f"0 B-{bytes_to_human_readable_size(bins[0])}")
+    
+    for i in range(len(bins) - 1):
+        lower_bound = bytes_to_human_readable_size(bins[i])
+        upper_bound = bytes_to_human_readable_size(bins[i + 1])
+        labels.append(f"{lower_bound}-{upper_bound}")
+    
+    if include_outer_bins:
+        labels.append(f">{bytes_to_human_readable_size(bins[-1])}")
+    return labels,bins
+
+def calculate_size_distribution(
+        sizes: pd.Series | cudf.Series, 
+        size_bins: List[int | float] = [0,4096] + [x*1024**3 for x in [1,10,50,100]] + [1024**4],
+        include_outer_bins: bool = True,
+        **kwargs
+) -> pd.Series | cudf.Series:
+    
+    size_labels,size_bins = create_bin_labels(size_bins,include_outer_bins)
+
+    if include_outer_bins:
+        if size_bins[0] > 0:
+            size_bins.insert(0,0)
+        if float("inf") not in size_bins:
+            size_bins.append(float("inf"))
+
+    fn = pd.cut if isinstance(sizes,pd.Series) else cudf.cut
+    size_grps = fn(sizes,bins=size_bins,labels=size_labels,ordered=True,include_lowest=True,**kwargs)
+
+    return size_grps
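+
+# Illustrative usage with the default bins (assuming a pandas Series of file sizes in bytes):
+#   calculate_size_distribution(pd.Series([100, 5 * 1024**3]))
+#   -> categories '0 B-4 KiB' and '1 GiB-10 GiB', respectively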
+
 def write_dataset_metadata(df: cudf.DataFrame | pd.DataFrame, parquet_path: str | Path) -> dict:
     parquet_path = as_path(parquet_path)
     if isinstance(df,cudf.DataFrame):
@@ -177,11 +235,24 @@ def write_dataset_metadata(df: cudf.DataFrame | pd.DataFrame, parquet_path: str
             vram_usage = 'NA'
         ram_usage = df.memory_usage(deep=True).to_dict()
     
+    # Compute the size distribution on a copy so the original frame's shape is preserved
+    size_df = df[['size']].copy()
+    size_df['grp'] = calculate_size_distribution(size_df['size'])
+    
+    size_dist = (
+        size_df
+        .groupby('grp',observed=True)['size']
+        .agg(['count','sum'])
+        .rename(columns={'count':'file_count','sum':'bytes'})
+        .T
+        .to_dict()
+    )
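+    # size_dist maps each size-bin label to {'file_count': <int>, 'bytes': <int>}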
+
     metadata = {
         'num_rows': df.shape[0],
         'num_columns': df.shape[1],
         'vram_usage': vram_usage,
-        'ram_usage': ram_usage
+        'ram_usage': ram_usage,
+        'size_distribution': size_dist
     }
 
     with open(parquet_path.joinpath('_metadata.json'),'w') as f:
diff --git a/src/rc_gpfs/process/__init__.py b/src/rc_gpfs/process/__init__.py
index 40cc503..9bf1d1e 100644
--- a/src/rc_gpfs/process/__init__.py
+++ b/src/rc_gpfs/process/__init__.py
@@ -1 +1,2 @@
-from .process import *
\ No newline at end of file
+from .process import *
+from .gpfspart import gpfspart
\ No newline at end of file
diff --git a/src/rc_gpfs/process/gpfspart.py b/src/rc_gpfs/process/gpfspart.py
new file mode 100644
index 0000000..d062324
--- /dev/null
+++ b/src/rc_gpfs/process/gpfspart.py
@@ -0,0 +1,121 @@
+import re
+from pathlib import Path
+import pandas as pd
+
+from ..utils import as_path, convert_si
+
+def _to_bytes(val: str | int | None, default: int | None) -> int | None:
+    if isinstance(val,str):
+        val = re.sub(r'[iB]*$','',val)
+        val = convert_si(val,use_binary=True)
+    elif val is None and default is not None:
+        val = default
+    return val
+
+def _write_partition(files, fname, delim='\0'):
+    # Write the file list as a single null-delimited string (e.g. for rsync --files-from with --from0)
+    with open(fname, 'w') as f:
+        f.write(delim.join(files))
+
+def write_partitions(df, partition_path, partition_prefix):
+    dig = len(str(df['partition'].max()))
+    df['partition'] = df['partition'].astype(str).str.zfill(dig)
+    for ind,grp in df.groupby('partition'):
+        files = grp.index.values
+        fname = Path(partition_path).joinpath(f'{partition_prefix}-{ind}')
+        _write_partition(files,fname)
+
+def fpart(series, max_part_files, max_part_size, max_file_size, init_grp=0):
+    group = init_grp
+    cumsum = 0
+    cur_grp_files = 0
+    groups = []
+
+    for i,value in enumerate(series):
+        # Start a new partition once the current one has reached the file-count limit
+        if cur_grp_files >= max_part_files:
+            group += 1
+            cumsum = 0
+            cur_grp_files = 0
+
+        if value > max_file_size:
+            # Oversized files start a new partition and reset the running byte total
+            if i > 0:
+                group += 1
+            groups.append(group)
+            cumsum = 0
+            cur_grp_files = 0
+        else:
+            cumsum += value
+            # Start a new partition once the running byte total reaches the size limit
+            if cumsum >= max_part_size:
+                group += 1
+                cumsum = value
+                cur_grp_files = 0
+            groups.append(group)
+            cur_grp_files += 1
+    
+    return groups
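+
+# Illustrative example of the partitioning above:
+#   fpart(pd.Series([2, 2, 2, 2]), max_part_files=float('inf'), max_part_size=5, max_file_size=100)
+#   returns [0, 0, 1, 1]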
+
+def gpfspart(
+        df: pd.DataFrame,
+        partition_path: str | Path,
+        partition_prefix: str = 'part',
+        max_part_size: str | int | None = '50GiB',
+        max_part_files: int | None = None,
+        tiny_size: str | int | None = '4kiB',
+        max_tiny_part_size: str | int | None = '1GiB',
+        max_tiny_part_files: int | None = 250000,
+        big_size: str | int | float | None = None,
+        exclude_nonstandard: bool = False,
+        ret_df: bool = False,
+        **kwargs
+):
+    if max_part_files is None and max_part_size is None:
+        raise ValueError("At least one of max_part_files or max_part_size must be set")
+    
+    partition_path = as_path(partition_path)
+    partition_path.mkdir(exist_ok=True,parents=True)
+
+    max_part_files = float("inf") if max_part_files is None else max_part_files
+
+    max_part_size = _to_bytes(max_part_size, None)
+    tiny_size = _to_bytes(tiny_size, 0)
+    max_tiny_part_size = _to_bytes(max_tiny_part_size,1024**3)
+    big_size = _to_bytes(big_size, max_part_size)
+
+    if tiny_size == 0:
+        bins = [0,big_size,float('inf')]
+        labels = ['standard','big']
+    else:
+        bins=[0, tiny_size, big_size, float("inf")]
+        labels=['tiny','standard','big']
+
+    df['size_grp'] = pd.cut(
+        df['size'],
+        bins=bins,
+        labels=labels,
+        include_lowest=True,
+    )
+
+    if 'tiny' in labels:
+        tiny = df.loc[df['size_grp'] == 'tiny']['size'].to_frame()
+    else:
+        tiny = pd.DataFrame()
+    
+    big = df.loc[df['size_grp'] == 'big']['size'].to_frame()
+    df = df.loc[df['size_grp'] == 'standard']['size'].to_frame()
+    
+    df['partition'] = fpart(df['size'], max_part_files, max_part_size, big_size)
+    
+    if not exclude_nonstandard:
+        if not tiny.empty:
+            init_grp = df['partition'].max() + 1
+            tiny['partition'] = fpart(
+                tiny['size'], 
+                max_tiny_part_files, 
+                max_tiny_part_size, 
+                tiny_size,
+                init_grp=init_grp
+            )
+            df = pd.concat([df,tiny])
+
+        if not big.empty:
+            init_grp = df['partition'].max() + 1
+            big['partition'] = range(init_grp, init_grp + big.shape[0])
+            df = pd.concat([df,big])
+
+    write_partitions(df, partition_path=partition_path, partition_prefix=partition_prefix)
+    
+    return df if ret_df else None
\ No newline at end of file
diff --git a/src/rc_gpfs/utils.py b/src/rc_gpfs/utils.py
index dc54e47..d486343 100644
--- a/src/rc_gpfs/utils.py
+++ b/src/rc_gpfs/utils.py
@@ -45,7 +45,7 @@ def convert_si(value: str | float | int,
         # Extract numeric part and unit part
         value = value.strip()
         for suffix in ['K', 'M', 'G', 'T']:
-            if value.endswith(suffix):
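+            # Match the suffix case-insensitively so '50g' and '50G' are equivalent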
+            if value.upper().endswith(suffix):
                 unit = suffix
                 value = value[:-1]
                 break
-- 
GitLab