Commit b56e9346 authored by Matthew K Defenderfer

rename directory

parent dea299f6
1 merge request: !9 Draft: Partition parquet dataset for sync with s5cmd
#!/usr/bin/env python3
import argparse
import os
import re

import dask.dataframe as dd
def parse_args():
    parser = argparse.ArgumentParser(
        description='Partition a GPFS parquet dataset into s5cmd command files'
    )
    parser.add_argument('-c', '--split-count', type=int, default=10000,
                        help='number of files per partition file (default: 10000)')
    parser.add_argument('-p', '--partition-dir', type=str, default='part',
                        help='directory to write the partition files to (default: part)')
    parser.add_argument('-f', '--filter', type=str, required=True,
                        help='parent path prefix to transfer')
    parser.add_argument('-i', '--input-parquet', type=str, required=True,
                        help='path to the GPFS parquet dataset to read')
    parser.add_argument('-d', '--destination', type=str, required=True,
                        help='s3://bucket[/prefix] URI to sync to')
    parser.add_argument('-n', '--dry-run', action='store_true', default=False,
                        help='print the generated commands instead of writing partition files')
    return parser.parse_args()
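
# Example invocation (paths are illustrative):
#   python3 fpart-db.py -f /scratch/user1 -i /data/gpfs-list.parquet \
#       -d s3://bucket -c 10000 -p ./part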

def create_sync_cmd(path, filter, dest):
    # Rewrite the leading filter prefix to the destination URI. Escaping and
    # anchoring the prefix keeps regex metacharacters in paths literal and
    # avoids rewriting matches deeper in the path.
    path_rn = re.sub(rf'^{re.escape(filter)}', dest, path)
    cmd = f"sync --raw {path} {path_rn}"
    return cmd
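
# For example, with filter='/scratch/user1' and dest='s3://bucket', the path
# '/scratch/user1/subdir1/file.txt' becomes:
#   sync --raw /scratch/user1/subdir1/file.txt s3://bucket/subdir1/file.txt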

def main():
    args = parse_args()
    split_count = args.split_count
    part_dir = args.partition_dir
    input_parquet = args.input_parquet

    # Strip trailing slashes so the prefix rewrite is consistent
    dest = re.sub(r'/*$', '', args.destination)
    filter = re.sub(r'/*$', '', args.filter)

    # Keep only non-directory entries under the filter path, sorted so that
    # partition contents are deterministic
    ddf = dd.read_parquet(input_parquet)
    ddf = ddf.loc[ddf['path'].str.startswith(filter)].sort_values('path')
    ddf = ddf.loc[~ddf['mode'].str.startswith('d')]
    ddf['cmd'] = ddf['path'].map(
        lambda x: create_sync_cmd(x, filter=filter, dest=dest), meta=str
    )

    df = ddf[['cmd']].compute().reset_index(drop=True)

    # Assign every split_count consecutive commands to a numbered group
    df['group'] = (df.index // split_count) + 1

    if args.dry_run:
        # --dry-run: print the generated commands instead of writing files
        print('\n'.join(df['cmd']))
        return

    os.makedirs(part_dir, exist_ok=True)

    # Write each group's commands to its own s5cmd run file
    for group_number, group_data in df.groupby('group'):
        file_name = f'part_{group_number}.txt'
        with open(os.path.join(part_dir, file_name), 'wt') as f:
            for cmd in group_data['cmd']:
                f.write(f"{cmd}\n")


if __name__ == "__main__":
    main()
#!/bin/bash
set -euo pipefail
############################################################
# Default Values #
############################################################
ntasks=8
mem="16G"
time="12:00:00"
partition="amd-hdr100"
split_count=10000
part_dir='./part'
sif="gitlab.rc.uab.edu:4567/mdefende/gpfs-policy:latest"
credentials_file="${HOME}/.aws/credentials"
profile="default"
############################################################
# Help #
############################################################
usage()
{
>&2 cat << EOF
Usage: $0 [ -h ] [ -n | --ntasks ] [ -p | --partition ] [ -t | --time ] [ -m | --mem ]
[ -c | --split-count ] [ -d | --part-dir ]
[ -a | --aws-credentials-file ] [ -u | --credentials-profile ]
filter input_parquet destination
EOF
}
help()
{
# Display Help
>&2 cat << EOF
Submits an array job to transfer files listed in a GPFS dataset to a bucket on LTS using s5cmd
Usage: $0 [ -h ] [ -n | --ntasks ] [ -p | --partition ] [ -t | --time ] [ -m | --mem ]
[ -c | --split-count ] [ -d | --part-dir ]
[ -a | --aws-credentials-file ] [ -u | --credentials-profile ]
filter input_parquet destination
General:
-h|--help Print this Help.
Required:
filter Parent path to transfer. For example, /scratch/user1 will transfer all files in the GPFS log
that begin with /scratch/user1. Object prefixes will retain all subdirectory listings directly
underneath the filter. For example, a file with absolute path /scratch/user1/subdir1/file.txt
will be synced to an LTS bucket with prefix /bucket/subdir1.
input_parquet Path to the GPFS parquet dataset to read from
destination URI to sync data to. Only LTS buckets are supported for now. Specify as 's3://bucket[/prefix]' where
the additional prefix is optional.
File Partition:
-c|--split-count Number of files to sync in each s5cmd partition (default: 10000)
-d|--part-dir Location to store the partition files (default: ./part)
s5cmd:
-a|--aws-credentials-file Path to AWS credentials file containing the access and secret keys (default: $HOME/.aws/credentials)
-u|--credentials-profile Profile to use in the AWS credentials (default: default)
Job Parameters for File Transfer:
-n|--ntasks Number of tasks for each array index (default: 8)
-p|--partition Partition to submit tasks to (default: amd-hdr100)
-t|--time Max walltime (default: 12:00:00)
-m|--mem Memory for each task (default: 16G)
EOF
exit 0
}
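
# Example (script name and paths are illustrative):
#   ./submit-transfer.sh -c 5000 -p ./part /scratch/user1 /data/gpfs-list.parquet s3://bucket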
# With 'set -e', a failed getopt on its own line would abort before $? could
# be checked, so test the assignment directly
if ! args=$(getopt -a -o hn:p:t:m:c:d:a:u: --long help,ntasks:,partition:,time:,mem:,split-count:,part-dir:,aws-credentials-file:,credentials-profile: -- "$@"); then
    usage
    exit 1
fi

eval set -- ${args}
while :
do
case $1 in
-h | --help) help ;;
-n | --ntasks) ntasks=$2 ; shift 2 ;;
-p | --partition) partition=$2 ; shift 2 ;;
-t | --time) time=$2 ; shift 2 ;;
-m | --mem) mem=$2 ; shift 2 ;;
-c | --split-count) split_count=$2 ; shift 2 ;;
-d | --part-dir) part_dir=$2 ; shift 2 ;;
-a | --aws-credentials-file) credentials_file=$2 ; shift 2 ;;
-u | --credentials-profile) profile=$2 ; shift 2 ;;
--) shift; break ;;
*) >&2 echo "Unsupported option: $1"
   usage
   exit 1 ;;
esac
done
# Validate the positional argument count before reading the arguments; with
# 'set -u', an unset $2 or $3 would otherwise abort with an unhelpful error
if [[ $# -lt 3 ]]; then
    >&2 echo "Missing positional argument"
    usage
    exit 1
fi

filter="$1"
input_parquet="$2"
destination="$3"
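
# Pull the container image providing the partitioning script's Python
# environment and the s5cmd binary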
singularity pull --force gpfs.sif docker://${sif}
split_cmd="singularity exec --bind /data,/scratch \
gpfs.sif python3 fpart-db.py \
-c ${split_count} \
-p ${part_dir} \
-f ${filter} \
-i ${input_parquet} \
-d ${destination}"
transfer_cmd="singularity exec --bind /data,/scratch \
gpfs.sif s5cmd \
--numworkers ${ntasks} \
--endpoint-url https://s3.lts.rc.uab.edu \
--credentials-file ${credentials_file} \
--profile $profile \
--retry-count 3 \
run ${part_dir}/part_\${SLURM_ARRAY_TASK_ID}.txt"
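
# Each partition file holds one s5cmd command per line, e.g.
#   sync --raw /scratch/user1/subdir1/file.txt s3://bucket/subdir1/file.txt
# SLURM_ARRAY_TASK_ID is escaped in transfer_cmd so it expands inside the
# array job rather than at submit time.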
>&2 cat << EOF
--------------------------------------------------------------------------------
filter: ${filter}
input parquet: ${input_parquet}
destination: ${destination}
sif: ${sif}
split count: ${split_count}
partition dir: ${part_dir}
credentials file: ${credentials_file}
credentials profile: ${profile}
ntasks: ${ntasks}
partition: ${partition}
time: ${time}
mem: ${mem}
split dataset command:
$(printf "%s" "${split_cmd}")
transfer command:
$(printf "%s" "${transfer_cmd}")
--------------------------------------------------------------------------------
EOF
mkdir -p out
mkdir -p err
############################################################
# Split Dataset #
############################################################
$split_cmd
nparts=$(find "${part_dir}" -maxdepth 1 -name 'part_*.txt' | wc -l)
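# One array index per partition file: task N syncs part_N.txt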
############################################################
#                     Submit Array Job                     #
############################################################

sbatch << EOF
#!/bin/bash
#
#SBATCH --job-name=s5-array
#SBATCH --ntasks=${ntasks}
#SBATCH --partition=${partition}
#SBATCH --time=${time}
#SBATCH --mem=${mem}
#SBATCH --output=out/%A_%a.out
#SBATCH --error=err/%A_%a.err
#SBATCH --array=1-${nparts}

${transfer_cmd}
EOF
exit 0