Skip to content
Snippets Groups Projects

Draft: Partition parquet dataset for sync with s5cmd

Closed Matthew K Defenderfer requested to merge partition-parquet-dataset into main
Files
5
+ 63
0
 
#!/usr/bin/env python3
 
import dask.dataframe as dd
 
import numpy as np
 
import re
 
import os
 
import argparse
 
 
def parse_args():
 
parser = argparse.ArgumentParser()
 
parser.add_argument('-c','--split-count',type=int,default=10000)
 
parser.add_argument('-p','--partition-dir',type=str,default='part')
 
parser.add_argument('-f','--filter',type=str,required=True)
 
parser.add_argument('-i','--input-parquet',type=str,required=True)
 
parser.add_argument('-d','--destination',type=str,required=True)
 
parser.add_argument('-n','--dry-run',action='store_true',default=False)
 
args = parser.parse_args()
 
return args
 
 
def create_sync_cmd(path,filter,dest):
 
path_rn = re.sub(filter,dest,path)
 
cmd = f"sync --raw {path} {path_rn}"
 
return cmd
 
 
def main():
 
args = parse_args()
 
 
split_count = args.split_count
 
part_dir = args.partition_dir
 
input_parquet = args.input_parquet
 
dest = re.sub(r'/*$','',args.destination)
 
filter = re.sub(r'/*$','',args.filter)
 
 
 
ddf = dd.read_parquet(input_parquet)
 
ddf = ddf.loc[ddf['path'].str.startswith(filter)].sort_values('path')
 
 
if 'mode' in ddf.columns:
 
ddf = ddf.loc[~ddf['mode'].str.startswith('d')]
 
 
ddf['cmd'] = ddf['path'].map(lambda x: create_sync_cmd(x, filter=filter, dest=dest), meta=str)
 
 
df = ddf[['cmd']].compute().reset_index(drop=True)
 
df['group'] = np.floor(df.index/split_count).astype(int) + 1
 
 
 
os.makedirs(part_dir,exist_ok=True)
 
grouped = df.groupby('group')
 
 
# Iterate over each group
 
for group_number, group_data in grouped:
 
# Extract the 'value' column as a list
 
values = group_data['cmd'].tolist()
 
 
# Create a file name using the group number
 
file_name = f'part_{group_number}.txt'
 
 
# Write the values to the text file
 
with open(os.path.join(part_dir,file_name), 'wt') as f:
 
for value in values:
 
f.write(f"{value}\n")
 
 
if __name__ == "__main__":
 
main()
Loading