Skip to content
Snippets Groups Projects

Draft: Partition parquet dataset for sync with s5cmd

Closed Matthew K Defenderfer requested to merge partition-parquet-dataset into main
Files
5
+ 63
0
#!/usr/bin/env python3
import dask.dataframe as dd
import numpy as np
import re
import os
import argparse
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('-c','--split-count',type=int,default=10000)
parser.add_argument('-p','--partition-dir',type=str,default='part')
parser.add_argument('-f','--filter',type=str,required=True)
parser.add_argument('-i','--input-parquet',type=str,required=True)
parser.add_argument('-d','--destination',type=str,required=True)
parser.add_argument('-n','--dry-run',action='store_true',default=False)
args = parser.parse_args()
return args
def create_sync_cmd(path,filter,dest):
path_rn = re.sub(filter,dest,path)
cmd = f"sync --raw {path} {path_rn}"
return cmd
def main():
args = parse_args()
split_count = args.split_count
part_dir = args.partition_dir
input_parquet = args.input_parquet
dest = re.sub(r'/*$','',args.destination)
filter = re.sub(r'/*$','',args.filter)
ddf = dd.read_parquet(input_parquet)
ddf = ddf.loc[ddf['path'].str.startswith(filter)].sort_values('path')
if 'mode' in ddf.columns:
ddf = ddf.loc[~ddf['mode'].str.startswith('d')]
ddf['cmd'] = ddf['path'].map(lambda x: create_sync_cmd(x, filter=filter, dest=dest), meta=str)
df = ddf[['cmd']].compute().reset_index(drop=True)
df['group'] = np.floor(df.index/split_count).astype(int) + 1
os.makedirs(part_dir,exist_ok=True)
grouped = df.groupby('group')
# Iterate over each group
for group_number, group_data in grouped:
# Extract the 'value' column as a list
values = group_data['cmd'].tolist()
# Create a file name using the group number
file_name = f'part_{group_number}.txt'
# Write the values to the text file
with open(os.path.join(part_dir,file_name), 'wt') as f:
for value in values:
f.write(f"{value}\n")
if __name__ == "__main__":
main()
Loading