import re
from pathlib import Path
import pandas as pd
from ..utils import as_path, convert_si

def _to_bytes(val: str | int | None, default: int | None) -> int | None:
    """Convert a human-readable size like '50GiB' into an integer byte count."""
    if isinstance(val, str):
        # Drop any trailing 'iB'/'B', then parse the SI prefix; use_binary=True
        # asks convert_si for binary (1024-based) multipliers.
        val = re.sub(r'[iB]*$', '', val)
        val = convert_si(val, use_binary=True)
    elif val is None and default is not None:
        val = default
    return val
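
# For example, assuming convert_si() from ..utils applies binary (1024-based)
# multipliers when use_binary=True:
#   _to_bytes('50GiB', None)  -> 53687091200  (50 * 1024**3)
#   _to_bytes(None, 4096)     -> 4096          (falls back to the default)
#   _to_bytes(1024, None)     -> 1024          (ints pass through unchanged)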

def _write_partition(files, fname, delim='\0'):
    # Join paths with a NUL byte (not the two-character string r'\0') so the
    # lists are safe for null-delimited consumers such as rsync --from0.
    with open(fname, 'w') as f:
        f.write(delim.join(files))

def write_partitions(df, partition_path, partition_prefix):
    # Zero-pad the partition labels so the output files sort lexicographically
    # (part-00, part-01, ..., part-10).
    dig = len(str(df['partition'].max()))
    df['partition'] = df['partition'].astype(str).str.zfill(dig)
    for ind, grp in df.groupby('partition'):
        files = grp.index.values
        fname = Path(partition_path).joinpath(f'{partition_prefix}-{ind}')
        _write_partition(files, fname)

def fpart(series, max_part_files, max_part_size, max_file_size, init_grp=0):
    """Assign each size in `series` to a numbered partition, fpart-style.

    A new partition starts once the current one holds max_part_files entries
    or max_part_size cumulative bytes. A file larger than max_file_size
    closes the current partition and opens a fresh one, and its size is not
    counted toward that partition's total.
    """
    group = init_grp
    cumsum = 0       # byte total of the current partition
    n_files = 0      # entry count of the current partition
    groups = []
    for i, value in enumerate(series):
        # File-count limit reached: close out the current partition.
        if n_files >= max_part_files:
            group += 1
            cumsum = 0
            n_files = 0
        if value > max_file_size:
            # Oversized file: start a new partition at this file.
            if i > 0:
                group += 1
            groups.append(group)
            cumsum = 0
            n_files = 0
        else:
            cumsum += value
            n_files += 1
            # Size limit reached: the file that crossed the threshold is
            # carried over as the first entry of the next partition.
            if cumsum >= max_part_size:
                group += 1
                cumsum = value
                n_files = 1
            groups.append(group)
    return groups
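
# A worked example with made-up sizes: packing five 10-byte files into
# partitions capped at 25 bytes, with no file-count limit:
#   fpart([10, 10, 10, 10, 10], float('inf'), 25, 100)  -> [0, 0, 1, 1, 2]
# The third file pushes the running total to 30 >= 25, so it is carried into
# partition 1; the fifth file likewise opens partition 2.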

def gpfspart(
    df: pd.DataFrame,
    partition_path: str | Path,
    partition_prefix: str = 'part',
    max_part_size: str | int | None = '50GiB',
    max_part_files: int | None = None,
    tiny_size: str | int | None = '4kiB',
    max_tiny_part_size: str | int | None = '1GiB',
    max_tiny_part_files: int | None = 250000,
    big_size: str | int | float | None = None,
    exclude_nonstandard: bool = False,
    ret_df: bool = False,
    **kwargs,  # extra options are accepted and ignored
) -> pd.DataFrame | None:
    if max_part_files is None and max_part_size is None:
        raise ValueError("At least one of max_part_files or max_part_size must be set")
    partition_path = as_path(partition_path)
    partition_path.mkdir(exist_ok=True, parents=True)
    # Normalize unset limits to "unbounded" so fpart's comparisons stay valid.
    max_part_files = float('inf') if max_part_files is None else max_part_files
    max_tiny_part_files = float('inf') if max_tiny_part_files is None else max_tiny_part_files
    max_part_size = float('inf') if max_part_size is None else _to_bytes(max_part_size, None)
    tiny_size = _to_bytes(tiny_size, 0)
    max_tiny_part_size = _to_bytes(max_tiny_part_size, 1024**3)
    # Files above big_size each get a partition to themselves; by default the
    # cutoff is the partition size cap itself.
    big_size = _to_bytes(big_size, max_part_size)
    # Bin every file as tiny, standard, or big. Tiny files are packed under
    # their own, looser limits; big files are handled one per partition.
    if tiny_size == 0:
        bins = [0, big_size, float('inf')]
        labels = ['standard', 'big']
    else:
        bins = [0, tiny_size, big_size, float('inf')]
        labels = ['tiny', 'standard', 'big']
    df['size_grp'] = pd.cut(
        df['size'],
        bins=bins,
        labels=labels,
        include_lowest=True,
    )
    if 'tiny' in labels:
        tiny = df.loc[df['size_grp'] == 'tiny']['size'].to_frame()
    else:
        tiny = pd.DataFrame()
    big = df.loc[df['size_grp'] == 'big']['size'].to_frame()
    df = df.loc[df['size_grp'] == 'standard']['size'].to_frame()
    df['partition'] = fpart(df['size'], max_part_files, max_part_size, big_size)
    if not exclude_nonstandard:
        if not tiny.empty:
            # Continue numbering where the standard partitions left off.
            init_grp = df['partition'].max() + 1
            tiny['partition'] = fpart(
                tiny['size'],
                max_tiny_part_files,
                max_tiny_part_size,
                tiny_size,
                init_grp=init_grp,
            )
            df = pd.concat([df, tiny])
        if not big.empty:
            # One partition per big file.
            init_grp = df['partition'].max() + 1
            big['partition'] = range(init_grp, init_grp + big.shape[0])
            df = pd.concat([df, big])
    write_partitions(df, partition_path=partition_path, partition_prefix=partition_prefix)
    return df if ret_df else None
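
A minimal usage sketch (hypothetical paths, sizes, and import path; the module must be imported from its package since it relies on the relative import of as_path and convert_si). gpfspart expects a DataFrame indexed by file path with a 'size' column in bytes:

import pandas as pd
from mypkg.partition import gpfspart  # hypothetical package/module names

df = pd.DataFrame(
    {'size': [2048, 10 * 1024**2, 80 * 1024**3]},
    index=['/data/a.txt', '/data/b.bin', '/data/c.tar'],
)

# Writes NUL-delimited lists part-0, part-1, part-2 under ./parts: the 10 MiB
# file bins as 'standard', the 2 kiB file as 'tiny', and the 80 GiB file
# exceeds big_size (which defaults to max_part_size) and gets its own partition.
parts = gpfspart(df, './parts', max_part_size='50GiB', ret_df=True)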