import re
from pathlib import Path

import pandas as pd

from ..utils import as_path, convert_si


def _to_bytes(val: str | int | None, default: int | None) -> int | None:
    # Convert a human-readable byte string (e.g. '50GiB') into an integer byte
    # count; fall back to the provided default when val is None.
    if isinstance(val, str):
        val = re.sub(r'[iB]*$', '', val)
        val = convert_si(val, use_binary=True)
    elif val is None and default is not None:
        val = default
    return val
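
# For reference, byte strings are interpreted with binary (base-2) multipliers,
# so _to_bytes('10GiB', None), _to_bytes('10GB', None), and _to_bytes('10G', None)
# should all yield 10 * 1024**3, assuming convert_si applies 1024-based prefixes
# when called with use_binary=True.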


def _write_partition(files, fname, delim='\0'):
    # Paths are NUL-delimited so file names containing newlines survive
    with open(fname, 'w') as f:
        f.write(delim.join(files))


def write_partitions(df, partition_path, partition_prefix):
    # Zero-pad partition numbers so partition file names sort naturally
    dig = len(str(df['partition'].max()))
    df['partition'] = df['partition'].astype(str).str.zfill(dig)

    for ind, grp in df.groupby('partition'):
        files = grp.index.values
        fname = Path(partition_path).joinpath(f'{partition_prefix}-{ind}')
        _write_partition(files, fname)


def fpart(series, max_part_files, max_part_size, max_file_size, init_grp=0):
    # Sequentially assign each file size to a numbered partition, starting a new
    # partition whenever the file-count or cumulative-size limit is reached
    group = init_grp
    cumsum = 0
    cur_grp_files = 0
    groups = []

    for i, value in enumerate(series):
        if cur_grp_files >= max_part_files:
            group += 1
            cumsum = 0
            cur_grp_files = 0

        if value > max_file_size:
            # Files larger than max_file_size are split off into a new partition
            if i > 0:
                group += 1
            groups.append(group)
            cumsum = 0
            cur_grp_files = 0
        else:
            cumsum += value
            if cumsum >= max_part_size:
                group += 1
                cumsum = value
                cur_grp_files = 0
            groups.append(group)
            cur_grp_files += 1
    return groups
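
# Illustration (values assumed): with max_part_size=100 and no file-count or
# file-size limit, the sizes [60, 50, 30, 80] map to partitions [0, 1, 1, 2];
# the running total starts a new partition and resets to the current file's
# size each time it reaches the limit.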


def gpfspart(  # NOTE: function name assumed; the original definition line was not preserved
    df: pd.DataFrame,
    partition_path: str | Path,
    partition_prefix: str = 'part',
    max_part_size: str | int | None = '50GiB',
    max_part_files: int | None = None,
    tiny_size: str | int | None = '4kiB',
    max_tiny_part_size: str | int | None = '1GiB',
    max_tiny_part_files: int | None = 250000,
    big_size: str | int | float | None = None,
    exclude_nonstandard: bool = False,
    ret_df: bool = False,
    **kwargs
):
"""
Replicates GNU fpart's file partitioning on GPFS policy lists in parquet datasets for use in sync tools such as rsync. This takes in a dataframe of file sizes in bytes indexed by their full file path. Files in the dataframe are sequentially assigned to a partition until the sum of their size reaches a specified threshold or number of files. This behaves identically to fpart's live mode and is the only supported mode.
Files can also be classified as either 'tiny' or 'big' and partitioned separately from the rest of the dataset. Tiny files can cause performance issues where the overhead of transferring individual files exceeds the time to transfer the file data itself. Instead, tiny files can be archived in a tarball prior to sync and transferred all at once. Transferring very large files can also be slow when done in a single chunk/thread. Instead, big files will be assigned each to their own partition to help facilitate chunking and parallel transfer.
A file is created for each partition listing the file paths in that partition. Paths are null-delimited to account for cases where newlines are included in file names
Parameters
----------
df : pd.DataFrame
Dataframe containing a column called 'size' and the file paths as the index.
partition_path : str | Path
Path to a directory to store the partition files
partition_prefix : str, optional
String to prepend to the partition number in the file name. By default 'part'
max_part_size : str | int | None, optional
Max total size of a standard partition in bytes. This can be specified either as an integer or as a human-readable byte string (e.g. 10*(1024**3), '10GiB', '10GB', and '10G' all refer to 10 Gibibytes). All byte strings are interpreted as base 2 instead of 10. If None, files will only be partitioned by max_part_files. By default '50GiB'
max_part_files : int | None, optional
Max number of files in a standard partition. Can be used in conjunction with max_part_size. At least one must be set. By default None
tiny_size : str | int | None, optional
Integer or human-readable byte string specifying which files are classified as tiny. If None, tiny_size is set to 0. By default '4kiB'
max_tiny_part_size : str | int | None, optional
Max total size per partition for tiny files. By default '1GiB'
max_tiny_part_files : int | None, optional
Max total files per partition for tiny files. By default 250000
big_size : str | int | float | None, optional
Integer or byte string specifying which files are classified as 'big'. Big files are each assigned to their own partition even if the big_size is much less than the max_part_size. If None, big_size is set to the lesser of max_part_size or float("inf"). By default None
exclude_nonstandard : bool, optional
Optionally exclude tiny and big files from the partition lists at the end. If files are categorized as big or tiny while this option is set, they will be ignored, and only files with "standard" sizes will be written to partition files. By default False
ret_df : bool, optional
Optionally return the file paths with their assigned partition for further processing and aggregation. By default False
Returns
-------
pd.DataFrame
Dataframe with file paths and their assigned partitions. Only returned if ret_df is True
Raises
------
ValueError
If both max_part_size and max_part_files are None, a ValueError is raised
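
    Examples
    --------
    A minimal sketch, assuming this function is exposed as ``gpfspart`` (the
    paths and sizes below are illustrative):

    >>> import pandas as pd
    >>> df = pd.DataFrame(
    ...     {'size': [100, 20 * 1024**2, 120 * 1024**3]},
    ...     index=['/data/tiny.txt', '/data/doc.pdf', '/data/movie.mkv'],
    ... )
    >>> parts = gpfspart(df, '/tmp/partitions', ret_df=True)
    >>> parts['partition'].nunique()  # one standard, one tiny, and one big partition
    3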
"""""""""
    if max_part_files is None and max_part_size is None:
        raise ValueError("At least one of max_part_files or max_part_size must be set")

    partition_path = as_path(partition_path)
    partition_path.mkdir(exist_ok=True, parents=True)

    # Normalize all limits to numeric byte/file counts
    max_part_files = float("inf") if max_part_files is None else max_part_files
    max_part_size = _to_bytes(max_part_size, None)
    if max_part_size is None:
        max_part_size = float("inf")  # partition by file count only
    tiny_size = _to_bytes(tiny_size, 0)
    max_tiny_part_size = _to_bytes(max_tiny_part_size, 1024**3)
    big_size = _to_bytes(big_size, max_part_size)

    # Classify each file as tiny, standard, or big based on its size
    if tiny_size == 0:
        bins = [0, big_size, float('inf')]
        labels = ['standard', 'big']
    else:
        bins = [0, tiny_size, big_size, float("inf")]
        labels = ['tiny', 'standard', 'big']

    df['size_grp'] = pd.cut(
        df['size'],
        bins=bins,
        labels=labels,
        include_lowest=True,
    )

    if 'tiny' in labels:
        tiny = df.loc[df['size_grp'] == 'tiny']['size'].to_frame()
    else:
        tiny = pd.DataFrame()
    big = df.loc[df['size_grp'] == 'big']['size'].to_frame()
    df = df.loc[df['size_grp'] == 'standard']['size'].to_frame()

    # Standard files are partitioned by cumulative size and file count
    df['partition'] = fpart(df['size'], max_part_files, max_part_size, big_size)

    if not exclude_nonstandard:
        # Tiny files get their own partitions, numbered after the standard ones
        if not tiny.empty:
            init_grp = df['partition'].max() + 1
            tiny['partition'] = fpart(
                tiny['size'],
                max_tiny_part_files,
                max_tiny_part_size,
                tiny_size,
                init_grp=init_grp
            )
            df = pd.concat([df, tiny])

        # Each big file is assigned its own partition
        if not big.empty:
            init_grp = df['partition'].max() + 1
            big['partition'] = range(init_grp, init_grp + big.shape[0])
            df = pd.concat([df, big])

    write_partitions(df, partition_path=partition_path, partition_prefix=partition_prefix)
    return df if ret_df else None