diff --git a/transfer-gpfs-with-s5cmd/fpart-db.py b/transfer-gpfs-with-s5cmd/fpart-db.py
new file mode 100755
index 0000000000000000000000000000000000000000..11ecc9e238d9948b8d44f4b6a22ca4402688773a
--- /dev/null
+++ b/transfer-gpfs-with-s5cmd/fpart-db.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python3
+import dask.dataframe as dd
+import numpy as np
+import re
+import os
+import argparse
+
+def parse_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-c','--split-count',type=int,default=10000)
+    parser.add_argument('-p','--partition-dir',type=str,default='part')
+    parser.add_argument('-f','--filter',type=str,required=True)
+    parser.add_argument('-i','--input-parquet',type=str,required=True)
+    parser.add_argument('-d','--destination',type=str,required=True)
+    parser.add_argument('-n','--dry-run',action='store_true',default=False)
+    args = parser.parse_args()
+    return args
+
+def create_sync_cmd(path,filter,dest):
+    path_rn = re.sub('^' + re.escape(filter), dest, path)
+    cmd = f"sync --raw {path} {path_rn}"
+    return cmd
+
+def main():
+    args = parse_args()
+
+    split_count = args.split_count
+    part_dir = args.partition_dir
+    input_parquet = args.input_parquet
+    dest = re.sub(r'/*$','',args.destination)
+    filter = re.sub(r'/*$','',args.filter)
+
+    # Keep only regular files whose paths start with the filter (directories are dropped)
+    ddf = dd.read_parquet(input_parquet)
+    ddf = ddf.loc[ddf['path'].str.startswith(filter)].sort_values('path')
+
+    ddf = ddf.loc[~ddf['mode'].str.startswith('d')]
+
+    ddf['cmd'] = ddf['path'].map(lambda x: create_sync_cmd(x, filter=filter, dest=dest), meta=str)
+
+    df = ddf[['cmd']].compute().reset_index(drop=True)
+    df['group'] = np.floor(df.index/split_count).astype(int) + 1
+
+    # Write each group of sync commands to its own partition file
+    os.makedirs(part_dir,exist_ok=True)
+    grouped = df.groupby('group')
+
+    # Iterate over each group
+    for group_number, group_data in grouped:
+        # Extract the 'cmd' column as a list
+        values = group_data['cmd'].tolist()
+
+        # Create a file name using the group number
+        file_name = f'part_{group_number}.txt'
+
+        # Write the values to the text file
+        with open(os.path.join(part_dir,file_name), 'wt') as f:
+            for value in values:
+                f.write(f"{value}\n")
+
+if __name__ == "__main__":
+    main()
diff --git a/transfer-gpfs-with-s5cmd/run-fpart-db.sh b/transfer-gpfs-with-s5cmd/run-fpart-db.sh
new file mode 100755
index 0000000000000000000000000000000000000000..e89f2b7c3b2db7bb060ef942ab379c52ea9ab622
--- /dev/null
+++ b/transfer-gpfs-with-s5cmd/run-fpart-db.sh
@@ -0,0 +1,188 @@
+#!/bin/bash
+
+set -euo pipefail
+
+############################################################
+# Default Values                                           #
+############################################################
+
+ntasks=8
+mem="16G"
+time="12:00:00"
+partition="amd-hdr100"
+split_count=10000
+part_dir='./part'
+sif="gitlab.rc.uab.edu:4567/mdefende/gpfs-policy:latest"
+credentials_file="${HOME}/.aws/credentials"
+profile="default"
+
+############################################################
+# Help                                                     #
+############################################################
+usage()
+{
+>&2 cat << EOF
+Usage: $0 [ -h ] [ -n | --ntasks ] [ -p | --partition ] [ -t | --time ] [ -m | --mem ]
+          [ -c | --split-count ] [ -d | --part-dir ]
+          [ -a | --aws-credentials-file ] [ -u | --credentials-profile ]
+          filter input_parquet destination
+EOF
+}
+
+help()
+{
+# Display Help
+>&2 cat << EOF
+Submits an array job to transfer files listed in a GPFS dataset to a bucket on LTS using s5cmd
+Usage: $0 [ -h ] [ -n | --ntasks ] [ -p | --partition ] [ -t | --time ] [ -m | --mem ]
+          [ -c | --split-count ] [ -d | --part-dir ]
+          [ -a | --aws-credentials-file ] [ -u | --credentials-profile ]
+          filter input_parquet destination
+
+General:
+  -h|--help                    Print this help
+  --dry-run                    Only print the generated commands without submitting the transfer job
+
+Required:
+  filter                       Parent path to transfer. For example, /scratch/user1 will transfer all files in the GPFS log
+                               that begin with /scratch/user1. Object prefixes will retain all subdirectory listings directly
+                               underneath the filter. For example, a file with absolute path /scratch/user1/subdir1/file.txt
+                               will be synced to an LTS bucket with prefix /bucket/subdir1.
+  input_parquet                Path to the GPFS parquet dataset to read from
+  destination                  URI to sync data to. Only LTS buckets are supported for now. Should be specified as 's3://bucket[/prefix]'
+                               where any additional prefix is optional.
+
+File Partition:
+  -c|--split-count             Number of files to sync in each s5cmd partition (default: 10000)
+  -d|--part-dir                Location to store the partition files (default: ./part)
+
+s5cmd:
+  -a|--aws-credentials-file    Path to the AWS credentials file containing the access and secret keys (default: $HOME/.aws/credentials)
+  -u|--credentials-profile     Profile to use in the AWS credentials file (default: default)
+
+Job Parameters for File Transfer:
+  -n|--ntasks                  Number of tasks for each array index (default: 8)
+  -p|--partition               Partition to submit tasks to (default: amd-hdr100)
+  -t|--time                    Max walltime (default: 12:00:00)
+  -m|--mem                     Memory for each task (default: 16G)
+EOF
+exit 0
+}
+
+if ! args=$(getopt -a -o hn:p:t:m:c:d:a:u: --long help,ntasks:,partition:,time:,mem:,split-count:,part-dir:,aws-credentials-file:,credentials-profile: -- "$@"); then
+    usage
+    exit 2
+fi
+
+eval set -- "${args}"
+
+while :
+do
+    case $1 in
+        -h | --help)                 help                ;;
+        -n | --ntasks)               ntasks=$2           ; shift 2 ;;
+        -p | --partition)            partition=$2        ; shift 2 ;;
+        -t | --time)                 time=$2             ; shift 2 ;;
+        -m | --mem)                  mem=$2              ; shift 2 ;;
+        -c | --split-count)          split_count=$2      ; shift 2 ;;
+        -d | --part-dir)             part_dir=$2         ; shift 2 ;;
+        -a | --aws-credentials-file) credentials_file=$2 ; shift 2 ;;
+        -u | --credentials-profile)  profile=$2          ; shift 2 ;;
+        --) shift; break ;;
+        *) >&2 echo "Unsupported option: $1"
+           usage ; exit 1 ;;
+    esac
+done
+
+if [[ $# -lt 3 ]]; then
+    usage
+    exit 1
+fi
+
+filter="$1"
+input_parquet="$2"
+destination="$3"
+
+# Ensure positional arguments are not empty
+if [[ -z $filter || -z $input_parquet || -z $destination ]]; then
+    >&2 echo "Missing positional argument"
+    usage
+    exit 1
+fi
+
+singularity pull --force gpfs.sif docker://${sif}
+
+split_cmd="singularity exec --bind /data,/scratch \
+               gpfs.sif python3 fpart-db.py \
+               -c ${split_count} \
+               -p ${part_dir} \
+               -f ${filter} \
+               -i ${input_parquet} \
+               -d ${destination}"
+
+transfer_cmd="singularity exec --bind /data,/scratch \
+                  gpfs.sif s5cmd \
+                  --numworkers ${ntasks} \
+                  --endpoint-url https://s3.lts.rc.uab.edu \
+                  --credentials-file ${credentials_file} \
+                  --profile $profile \
+                  --retry-count 3 \
+                  run ${part_dir}/part_\${SLURM_ARRAY_TASK_ID}.txt"
+
+>&2 cat << EOF
+--------------------------------------------------------------------------------
+filter:               ${filter}
+input parquet:        ${input_parquet}
+destination:          ${destination}
+
+sif:                  ${sif}
+
+split count:          ${split_count}
+partition dir:        ${part_dir}
+
+credentials file:     ${credentials_file}
+credentials profile:  ${profile}
+
+ntasks:               ${ntasks}
+partition:            ${partition}
+time:                 ${time}
+mem:                  ${mem}
+
+split dataset command:
+$(printf "%s" "${split_cmd}")
+
+transfer command:
+$(printf "%s" "${transfer_cmd}")
+--------------------------------------------------------------------------------
+EOF
+
+mkdir -p out err
+
+############################################################
+# Split Dataset                                            #
+############################################################
+
+$split_cmd
+
+nparts=$(ls ${part_dir}/part* | wc -l)
+
+############################################################
+# Create and Submit Array Job                              #
+############################################################
+
+sbatch << EOF
+#!/bin/bash
+#
+#SBATCH --job-name=s5-array
+#SBATCH --ntasks=${ntasks}
+#SBATCH --partition=${partition}
+#SBATCH --time=${time}
+#SBATCH --mem=${mem}
+#SBATCH --output=out/%A_%a.out
+#SBATCH --error=err/%A_%a.err
+#SBATCH --array=1-${nparts}
+
+${transfer_cmd}
+EOF
+
+exit 0
\ No newline at end of file
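
Example usage, as a sketch only: the filter path, parquet dataset location, and bucket name below are placeholders, and it assumes run-fpart-db.sh is launched from the transfer-gpfs-with-s5cmd directory on a cluster login node with Slurm and Singularity available.

./run-fpart-db.sh \
    -c 5000 \
    -p amd-hdr100 \
    -t 06:00:00 \
    /scratch/user1 \
    /data/gpfs-logs/list-policy-parquet \
    s3://user1-backup

# With those arguments, fpart-db.py rewrites each file path by swapping the filter
# prefix for the destination URI, so a partition file such as part/part_1.txt would
# contain one s5cmd directive per file, for example:
# sync --raw /scratch/user1/subdir1/file.txt s3://user1-backup/subdir1/file.txt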