
Draft: Partition parquet dataset for sync with s5cmd

Closed · Matthew K Defenderfer requested to merge partition-parquet-dataset into main
Files: 6
@@ -27,23 +27,19 @@ def main():
     split_count = args.split_count
     part_dir = args.partition_dir
     input_parquet = args.input_parquet
-    dest = re.sub(r'/$','',args.destination)
-    filter = re.sub(r'/$','',args.filter)
+    dest = re.sub(r'/*$','',args.destination)
+    filter = re.sub(r'/*$','',args.filter)
     ddf = dd.read_parquet(input_parquet)
     ddf = ddf.loc[ddf['path'].str.startswith(filter)].sort_values('path')
-    ddf = ddf.loc[~ddf['mode'].str.startswith('d')].reset_index(drop=True)
-    ddf['group'] = np.floor(ddf.index/split_count).astype(int)
+    ddf = ddf.loc[~ddf['mode'].str.startswith('d')]
     ddf['cmd'] = ddf['path'].map(lambda x: create_sync_cmd(x, filter=filter, dest=dest), meta=str)
-    df = ddf[['group','cmd']].compute()
+    df = ddf[['cmd']].compute().reset_index(drop=True)
+    df['group'] = np.floor(df.index/split_count).astype(int) + 1
     os.makedirs(part_dir,exist_ok=True)
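
For context on the hunk above: `r'/*$'` strips any run of trailing slashes rather than only a single one, and the group number is now assigned after `.compute()`, where the pandas index is one contiguous 0..n-1 range. Presumably this is the motivation for the change, since `reset_index(drop=True)` on a Dask DataFrame restarts the index within each partition, so group numbers derived from the Dask index could repeat across partitions. A minimal sketch of both behaviours, using made-up values rather than anything from this repository:

```python
# Toy illustration of the two changes in this hunk; all values are made up.
import re
import numpy as np
import pandas as pd

# 1) Trailing-slash stripping: r'/$' removes at most one slash, r'/*$' removes them all.
print(re.sub(r'/$', '', 'bucket/prefix//'))   # bucket/prefix/
print(re.sub(r'/*$', '', 'bucket/prefix//'))  # bucket/prefix

# 2) Group assignment on the computed (pandas) frame: the index is a single
#    0..n-1 range, so integer division yields contiguous 1-based group numbers.
split_count = 3
df = pd.DataFrame({'cmd': [f'cmd {i}' for i in range(8)]})
df['group'] = np.floor(df.index / split_count).astype(int) + 1
print(df['group'].tolist())  # [1, 1, 1, 2, 2, 2, 3, 3]
```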
@@ -62,28 +58,5 @@ def main():
             for value in values:
                 f.write(f"{value}\n")
-    array_sh = f"""#!/bin/bash
-#
-#SBATCH --job-name=s5-array-%a
-#SBATCH --partition=amd-hdr100
-#SBATCH --ntasks=8
-#SBATCH --mem=16G
-#SBATCH --time=02:00:00
-#SBATCH --output=out/%A-%a.out
-#SBATCH --error=err/%A-%a.err
-#SBATCH --array=0-{df['group'].max()}%10
-module load Anaconda3
-conda activate s3
-s5cmd --nworkers 8 --endpoint-url https://s3.lts.rc.uab.edu run {part_dir}/part_${{SLURM_ARRAY_TASK_ID}}.txt
-"""
-    with open('s5cmd_array.sh','wt') as f:
-        f.write(array_sh)
 if __name__ == "__main__":
     main()
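
The two context lines at the top of the second hunk are only the inner write loop; the enclosing loop is not shown in this excerpt. A hedged sketch of how the partition files are presumably written, with the loop variable names assumed rather than taken from the source:

```python
# Hedged sketch: write one part_<N>.txt per group under part_dir.
# Only df, part_dir and the 'group'/'cmd' columns come from the diff above;
# group_id, chunk and values are assumptions for illustration.
import os

for group_id, chunk in df.groupby('group'):
    values = chunk['cmd'].tolist()
    with open(os.path.join(part_dir, f'part_{group_id}.txt'), 'wt') as f:
        for value in values:
            f.write(f"{value}\n")
```

In the removed heredoc, each SLURM array task then consumed one of these files via `s5cmd ... run {part_dir}/part_${SLURM_ARRAY_TASK_ID}.txt`, with the `%10` suffix on `--array` capping the array at ten concurrently running tasks; this hunk drops that inline generation of `s5cmd_array.sh`.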