diff --git a/prep-parquet-for-s5cmd/fpart-db.py b/prep-parquet-for-s5cmd/fpart-db.py
new file mode 100755
index 0000000000000000000000000000000000000000..3e5e6c23bc7c01b21dca66de71785eda952995b0
--- /dev/null
+++ b/prep-parquet-for-s5cmd/fpart-db.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python3
+"""Split a parquet file listing into s5cmd sync command files and an
+accompanying Slurm array script that uploads each part in parallel."""
+import argparse
+import os
+
+import dask.dataframe as dd
+
+
+def parse_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-c', '--split-count', type=int, default=10000,
+                        help='number of sync commands per part file')
+    parser.add_argument('-p', '--partition-dir', type=str, default='part',
+                        help='directory to write the part files to')
+    parser.add_argument('-f', '--filter', type=str, required=True,
+                        help='source path prefix to select and rewrite')
+    parser.add_argument('-i', '--input-parquet', type=str, required=True,
+                        help='parquet file listing paths and modes')
+    parser.add_argument('-d', '--destination', type=str, required=True,
+                        help='destination prefix, e.g. s3://bucket/prefix')
+    parser.add_argument('-n', '--dry-run', action='store_true', default=False,
+                        help='report what would be written without writing')
+    return parser.parse_args()
+
+
+def create_sync_cmd(path, prefix, dest):
+    # Replace the literal source prefix with the destination prefix; avoid
+    # re.sub here so regex metacharacters in the prefix are not interpreted.
+    path_rn = dest + path[len(prefix):]
+    return f"sync --raw {path} {path_rn}"
+
+
+def main():
+    args = parse_args()
+
+    split_count = args.split_count
+    part_dir = args.partition_dir
+    input_parquet = args.input_parquet
+    dest = args.destination.rstrip('/')
+    prefix = args.filter.rstrip('/')
+
+    # Keep only files (not directories) under the source prefix.
+    ddf = dd.read_parquet(input_parquet)
+    ddf = ddf.loc[ddf['path'].str.startswith(prefix)]
+    ddf = ddf.loc[~ddf['mode'].str.startswith('d')]
+
+    # Materialize in pandas before grouping: dask's reset_index resets the
+    # index within each partition, so the contiguous global index needed
+    # below has to be built after compute().
+    df = ddf[['path']].compute().sort_values('path').reset_index(drop=True)
+
+    # Assign each run of split_count consecutive paths to one part file.
+    df['group'] = (df.index // split_count).astype(int)
+    df['cmd'] = df['path'].map(lambda p: create_sync_cmd(p, prefix, dest))
+
+    if args.dry_run:
+        print(f"would write {df['group'].nunique()} part files "
+              f"covering {len(df)} paths under {part_dir}/")
+        return
+
+    os.makedirs(part_dir, exist_ok=True)
+
+    # Write one s5cmd run file per group.
+    for group_number, group_data in df.groupby('group'):
+        file_name = f'part_{group_number}.txt'
+        with open(os.path.join(part_dir, file_name), 'wt') as f:
+            for cmd in group_data['cmd']:
+                f.write(f"{cmd}\n")
+
+    # Slurm writes stdout/stderr into these directories but will not
+    # create them; missing directories make the array tasks fail.
+    os.makedirs('out', exist_ok=True)
+    os.makedirs('err', exist_ok=True)
+
+    array_sh = f"""#!/bin/bash
+#
+#SBATCH --job-name=s5-array
+#SBATCH --partition=amd-hdr100
+#SBATCH --ntasks=8
+#SBATCH --mem=16G
+#SBATCH --time=02:00:00
+#SBATCH --output=out/%A-%a.out
+#SBATCH --error=err/%A-%a.err
+#SBATCH --array=0-{df['group'].max()}%10
+
+module load Anaconda3
+conda activate s3
+
+s5cmd --numworkers 8 --endpoint-url https://s3.lts.rc.uab.edu run {part_dir}/part_${{SLURM_ARRAY_TASK_ID}}.txt
+"""
+
+    with open('s5cmd_array.sh', 'wt') as f:
+        f.write(array_sh)
+
+
+if __name__ == "__main__":
+    main()
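
Usage sketch (the parquet file name, source prefix, and bucket name below are hypothetical; only the flags and output file names come from the script above). The input parquet is assumed to carry 'path' and 'mode' columns, since the script filters on both:

    ./fpart-db.py \
        -i filelist.parquet \
        -f /data/project/mylab \
        -d s3://mylab-bucket \
        -c 10000 \
        -p part
    sbatch s5cmd_array.sh

This writes one part/part_N.txt run file per group of 10000 paths plus s5cmd_array.sh; the array job then runs one s5cmd task per part file, with the %10 suffix in --array limiting it to 10 tasks at a time.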