From 4c9132045c0c770e0992797927f8f0db7acf906b Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Tue, 20 Aug 2024 13:42:23 -0500
Subject: [PATCH] rename directory

---
 prep-parquet-for-s5cmd/fpart-db.py     |  62 --------
 prep-parquet-for-s5cmd/run-fpart-db.sh | 188 -------------------------
 2 files changed, 250 deletions(-)
 delete mode 100755 prep-parquet-for-s5cmd/fpart-db.py
 delete mode 100755 prep-parquet-for-s5cmd/run-fpart-db.sh

diff --git a/prep-parquet-for-s5cmd/fpart-db.py b/prep-parquet-for-s5cmd/fpart-db.py
deleted file mode 100755
index 11ecc9e..0000000
--- a/prep-parquet-for-s5cmd/fpart-db.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#!/usr/bin/env python3
-import dask.dataframe as dd
-import numpy as np
-import re
-import os
-import argparse
-
-def parse_args():
-    parser = argparse.ArgumentParser()
-    parser.add_argument('-c','--split-count',type=int,default=10000)
-    parser.add_argument('-p','--partition-dir',type=str,default='part')
-    parser.add_argument('-f','--filter',type=str,required=True)
-    parser.add_argument('-i','--input-parquet',type=str,required=True)
-    parser.add_argument('-d','--destination',type=str,required=True)
-    parser.add_argument('-n','--dry-run',action='store_true',default=False)
-    args = parser.parse_args()
-    return args
-
-def create_sync_cmd(path,filter,dest):
-    # Replace the filter prefix with the destination URI to form the object path
-    path_rn = re.sub(filter,dest,path)
-    cmd = f"sync --raw {path} {path_rn}"
-    return cmd
-
-def main():
-    args = parse_args()
-
-    split_count = args.split_count
-    part_dir = args.partition_dir
-    input_parquet = args.input_parquet
-    dest = re.sub(r'/*$','',args.destination)
-    filter = re.sub(r'/*$','',args.filter)
-
-    # Keep only non-directory entries whose paths fall under the filter
-    ddf = dd.read_parquet(input_parquet)
-    ddf = ddf.loc[ddf['path'].str.startswith(filter)].sort_values('path')
-    ddf = ddf.loc[~ddf['mode'].str.startswith('d')]
-
-    ddf['cmd'] = ddf['path'].map(lambda x: create_sync_cmd(x, filter=filter, dest=dest), meta=str)
-
-    df = ddf[['cmd']].compute().reset_index(drop=True)
-    df['group'] = np.floor(df.index/split_count).astype(int) + 1
-
-    os.makedirs(part_dir,exist_ok=True)
-    grouped = df.groupby('group')
-
-    # Iterate over each group
-    for group_number, group_data in grouped:
-        # Extract the 'cmd' column as a list
-        values = group_data['cmd'].tolist()
-
-        # Create a file name using the group number
-        file_name = f'part_{group_number}.txt'
-
-        # Write the commands to the text file, one per line
-        with open(os.path.join(part_dir,file_name), 'wt') as f:
-            for value in values:
-                f.write(f"{value}\n")
-
-if __name__ == "__main__":
-    main()
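For reference, the deleted partitioning script was self-contained and could be run outside the wrapper below. A representative standalone invocation, using a hypothetical bucket name and paths, might have looked like:

    python3 fpart-db.py \
        -c 5000 \
        -p ./part \
        -f /scratch/user1 \
        -i /data/gpfs-logs/list-policy.parquet \
        -d s3://my-bucket

This writes part/part_1.txt, part/part_2.txt, and so on, where each file holds at most 5000 s5cmd sync commands.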
diff --git a/prep-parquet-for-s5cmd/run-fpart-db.sh b/prep-parquet-for-s5cmd/run-fpart-db.sh
deleted file mode 100755
index e89f2b7..0000000
--- a/prep-parquet-for-s5cmd/run-fpart-db.sh
+++ /dev/null
@@ -1,188 +0,0 @@
-#!/bin/bash
-
-set -euo pipefail
-
-############################################################
-#                      Default Values                      #
-############################################################
-
-ntasks=8
-mem="16G"
-time="12:00:00"
-partition="amd-hdr100"
-split_count=10000
-part_dir='./part'
-sif="gitlab.rc.uab.edu:4567/mdefende/gpfs-policy:latest"
-credentials_file="${HOME}/.aws/credentials"
-profile="default"
-
-############################################################
-#                           Help                           #
-############################################################
-usage()
-{
->&2 cat << EOF
-Usage: $0 [ -h ] [ -n | --ntasks ] [ -p | --partition ] [ -t | --time ] [ -m | --mem ]
-          [ -c | --split-count ] [ -d | --part-dir ]
-          [ -a | --aws-credentials-file ] [ -u | --credentials-profile ]
-          filter input_parquet destination
-EOF
-}
-
-help()
-{
-# Display Help
->&2 cat << EOF
-Submits an array job to transfer files listed in a GPFS dataset to a bucket on LTS using s5cmd
-
-Usage: $0 [ -h ] [ -n | --ntasks ] [ -p | --partition ] [ -t | --time ] [ -m | --mem ]
-          [ -c | --split-count ] [ -d | --part-dir ]
-          [ -a | --aws-credentials-file ] [ -u | --credentials-profile ]
-          filter input_parquet destination
-
-General:
-  -h|--help                   Print this help.
-  --dry-run                   Only print the generated partition and transfer commands without running them
-
-Required:
-  filter                      Parent path to transfer. For example, /scratch/user1 will transfer all files in the GPFS log
-                              that begin with /scratch/user1. Object prefixes will retain all subdirectory listings directly
-                              underneath the filter. For example, a file with absolute path /scratch/user1/subdir1/file.txt
-                              will be synced to an LTS bucket with prefix /bucket/subdir1.
-  input_parquet               Path to the GPFS parquet dataset to read from
-  destination                 URI to sync data to. Only LTS buckets for now. Should be specified as 's3://bucket[/prefix]',
-                              where any additional prefix is optional.
-
-File Partition:
-  -c|--split-count            Number of files to sync in each s5cmd partition (default: 10000)
-  -d|--part-dir               Location to store the partition files (default: ./part)
-
-s5cmd:
-  -a|--aws-credentials-file   Path to AWS credentials file containing the access and secret keys (default: $HOME/.aws/credentials)
-  -u|--credentials-profile    Profile to use in the AWS credentials file (default: default)
-
-Job Parameters for File Transfer:
-  -n|--ntasks                 Number of tasks for each array index (default: 8)
-  -p|--partition              Partition to submit tasks to (default: amd-hdr100)
-  -t|--time                   Max walltime (default: 12:00:00)
-  -m|--mem                    Memory for each task (default: 16G)
-EOF
-exit 0
-}
-
-if ! args=$(getopt -a -o hn:p:t:m:c:d:a:u: --long help,ntasks:,partition:,time:,mem:,split-count:,part-dir:,aws-credentials-file:,credentials-profile: -- "$@"); then
-    usage
-    exit 1
-fi
-
-eval set -- "${args}"
-
-while :
-do
-    case $1 in
-        -h | --help)                 help ;;
-        -n | --ntasks)               ntasks=$2 ; shift 2 ;;
-        -p | --partition)            partition=$2 ; shift 2 ;;
-        -t | --time)                 time=$2 ; shift 2 ;;
-        -m | --mem)                  mem=$2 ; shift 2 ;;
-        -c | --split-count)          split_count=$2 ; shift 2 ;;
-        -d | --part-dir)             part_dir=$2 ; shift 2 ;;
-        -a | --aws-credentials-file) credentials_file=$2 ; shift 2 ;;
-        -u | --credentials-profile)  profile=$2 ; shift 2 ;;
-        --) shift; break ;;
-        *) >&2 echo "Unsupported option: $1"
-           usage
-           exit 1 ;;
    esac
-done
-
-if [[ $# -lt 3 ]]; then
-    usage
-    exit 1
-fi
-
-filter="$1"
-input_parquet="$2"
-destination="$3"
-
-# Ensure positional arguments are set
-if [[ -z $filter || -z $input_parquet || -z $destination ]]; then
-    echo "Missing positional argument"
-    usage
-    exit 1
-fi
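With argument parsing and validation finished, a representative end-to-end invocation of the deleted wrapper (bucket name and paths are hypothetical) might have been:

    ./run-fpart-db.sh \
        --split-count 5000 \
        --time 06:00:00 \
        --mem 32G \
        /scratch/user1 /data/gpfs-logs/list-policy.parquet s3://my-bucket

Any option left unset falls back to the defaults declared at the top of the script.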
-
-singularity pull --force gpfs.sif docker://${sif}
-
-split_cmd="singularity exec --bind /data,/scratch \
-            gpfs.sif python3 fpart-db.py \
-            -c ${split_count} \
-            -p ${part_dir} \
-            -f ${filter} \
-            -i ${input_parquet} \
-            -d ${destination}"
-
-transfer_cmd="singularity exec --bind /data,/scratch \
-              gpfs.sif s5cmd \
-              --numworkers ${ntasks} \
-              --endpoint-url https://s3.lts.rc.uab.edu \
-              --credentials-file ${credentials_file} \
-              --profile $profile \
-              --retry-count 3 \
-              run ${part_dir}/part_\${SLURM_ARRAY_TASK_ID}.txt"
-
->&2 cat << EOF
---------------------------------------------------------------------------------
-filter:               ${filter}
-input parquet:        ${input_parquet}
-destination:          ${destination}
-
-sif:                  ${sif}
-
-split count:          ${split_count}
-partition dir:        ${part_dir}
-
-credentials file:     ${credentials_file}
-credentials profile:  ${profile}
-
-ntasks:               ${ntasks}
-partition:            ${partition}
-time:                 ${time}
-mem:                  ${mem}
-
-split dataset command:
-$(printf "%s" "${split_cmd}")
-
-transfer command:
-$(printf "%s" "${transfer_cmd}")
---------------------------------------------------------------------------------
-EOF
-
-mkdir -p out
-mkdir -p err
-
-############################################################
-#                      Split Dataset                       #
-############################################################
-
-$split_cmd
-
-nparts=$(ls ${part_dir}/part* | wc -l)
-
-############################################################
-#                     Submit Array Job                     #
-############################################################
-
-sbatch << EOF
-#!/bin/bash
-#
-#SBATCH --job-name=s5-array
-#SBATCH --ntasks=${ntasks}
-#SBATCH --partition=${partition}
-#SBATCH --time=${time}
-#SBATCH --mem=${mem}
-#SBATCH --output=out/%A_%a.out
-#SBATCH --error=err/%A_%a.err
-#SBATCH --array=1-${nparts}
-
-${transfer_cmd}
-EOF
-
-exit 0
\ No newline at end of file
--
GitLab
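Each partition file consumed by the s5cmd run command above holds one command per line, as emitted by create_sync_cmd() in fpart-db.py. For the hypothetical invocation shown earlier, part/part_1.txt would begin:

    sync --raw /scratch/user1/subdir1/file1.txt s3://my-bucket/subdir1/file1.txt
    sync --raw /scratch/user1/subdir1/file2.txt s3://my-bucket/subdir1/file2.txt

The --raw flag tells s5cmd to treat wildcard characters in the paths literally rather than as glob patterns.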