Commit ce4e2735 authored by Matthew K Defenderfer
parent 91e8f412

add script to partition a parquet dataset to prep for s5cmd transfer

1 merge request: !9 Draft: Partition parquet dataset for sync with s5cmd
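
For context, a hypothetical invocation of the added script (the script name, dataset path, and bucket below are illustrative, not part of the commit):

python partition_dataset.py \
    -i /path/to/listing.parquet \
    -f /data/project/foo \
    -d s3://bucket/foo \
    -c 10000 \
    -p part

This selects the files under /data/project/foo from the parquet listing, rewrites that prefix to s3://bucket/foo, and writes the resulting s5cmd sync commands to part/part_0.txt, part/part_1.txt, ... in chunks of 10000 commands each.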
#!/usr/bin/env python3
import argparse
import os
import re

import dask.dataframe as dd
import numpy as np


def parse_args():
    parser = argparse.ArgumentParser(
        description='Partition a parquet file listing into fixed-size s5cmd command files'
    )
    parser.add_argument('-c', '--split-count', type=int, default=10000,
                        help='number of sync commands per partition file')
    parser.add_argument('-p', '--partition-dir', type=str, default='part',
                        help='directory the partition files are written to')
    parser.add_argument('-f', '--filter', type=str, required=True,
                        help='source path prefix selecting the files to sync')
    parser.add_argument('-i', '--input-parquet', type=str, required=True,
                        help='parquet dataset with path and mode columns')
    parser.add_argument('-d', '--destination', type=str, required=True,
                        help='destination prefix the filter prefix is rewritten to')
    parser.add_argument('-n', '--dry-run', action='store_true', default=False,
                        help='currently unused')
    return parser.parse_args()


def create_sync_cmd(path, filter, dest):
    # rewrite the source prefix to the destination prefix; escape the
    # prefix so any regex metacharacters in it are matched literally
    path_rn = re.sub('^' + re.escape(filter), dest, path)
    return f"sync --raw {path} {path_rn}"
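
# Example with hypothetical values: with --filter /data/project and
# --destination s3://bucket/project, a path of /data/project/a/b.txt
# produces the line:
#   sync --raw /data/project/a/b.txt s3://bucket/project/a/b.txt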

def main():
    args = parse_args()
    split_count = args.split_count
    part_dir = args.partition_dir
    input_parquet = args.input_parquet
    # strip trailing slashes so the prefix rewrite is consistent
    dest = re.sub(r'/$', '', args.destination)
    filter = re.sub(r'/$', '', args.filter)

    # keep only entries under the source prefix and drop directories
    # (mode strings beginning with 'd')
    ddf = dd.read_parquet(input_parquet)
    ddf = ddf.loc[ddf['path'].str.startswith(filter)]
    ddf = ddf.loc[~ddf['mode'].str.startswith('d')]

    # compute to pandas before numbering rows: a dask index reset with
    # reset_index(drop=True) restarts at 0 in every partition, so group
    # numbers derived from it would collide across partitions
    df = ddf[['path']].compute().sort_values('path').reset_index(drop=True)
    df['group'] = np.floor(df.index / split_count).astype(int)
    df['cmd'] = df['path'].map(lambda x: create_sync_cmd(x, filter=filter, dest=dest))

    os.makedirs(part_dir, exist_ok=True)

    # write each group's commands to its own s5cmd run file
    for group_number, group_data in df.groupby('group'):
        file_name = f'part_{group_number}.txt'
        with open(os.path.join(part_dir, file_name), 'wt') as f:
            for cmd in group_data['cmd']:
                f.write(f"{cmd}\n")
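    # Each part_N.txt is a plain s5cmd command file, one command per
    # line, e.g. (hypothetical paths):
    #   sync --raw /data/project/a.txt s3://bucket/project/a.txt
    #   sync --raw /data/project/b.txt s3://bucket/project/b.txt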

    # generate a Slurm array job: one task per partition file, at most
    # 10 tasks running concurrently (%10)
    array_sh = f"""#!/bin/bash
#
#SBATCH --job-name=s5-array
#SBATCH --partition=amd-hdr100
#SBATCH --ntasks=8
#SBATCH --mem=16G
#SBATCH --time=02:00:00
#SBATCH --output=out/%A-%a.out
#SBATCH --error=err/%A-%a.err
#SBATCH --array=0-{df['group'].max()}%10

module load Anaconda3
conda activate s3

s5cmd --nworkers 8 --endpoint-url https://s3.lts.rc.uab.edu run {part_dir}/part_${{SLURM_ARRAY_TASK_ID}}.txt
"""
    # Slurm does not create the stdout/stderr directories itself
    os.makedirs('out', exist_ok=True)
    os.makedirs('err', exist_ok=True)

    with open('s5cmd_array.sh', 'wt') as f:
        f.write(array_sh)


if __name__ == "__main__":
    main()
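
Once the partition files and batch script have been generated, the transfer can be started by submitting the array job (sketch; the endpoint and Slurm partition come from the script above):

sbatch s5cmd_array.sh

Each array task runs s5cmd against one part file, so the transfer proceeds as up to 10 concurrent batches of --split-count files each.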