From e2f22b0c316ae886c483e395aa98911761131a05 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Fri, 7 Feb 2025 15:03:50 -0600 Subject: [PATCH 01/10] test CLI commands to fix install issues --- pyproject.toml | 1 + src/rc_gpfs/cli/__init__.py | 8 ++++---- src/rc_gpfs/cli/test_cli.py | 15 +++++++++++++++ 3 files changed, 20 insertions(+), 4 deletions(-) create mode 100644 src/rc_gpfs/cli/test_cli.py diff --git a/pyproject.toml b/pyproject.toml index cfa6b73..394bb84 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,7 @@ convert-to-hive = "rc_gpfs.cli.convert_flat_to_hive:convert_flat_to_hive" convert-to-parquet = "rc_gpfs.cli.convert_to_parquet:convert_to_parquet" split-log = "rc_gpfs.cli.split_log:split_log" fparq = "rc_gpfs.cli.fparq_cli:fparq_cli" +test-cli = "rc_gpfs.cli.test_cli:main" [tool.poetry.requires-plugins] poetry-dynamic-versioning = { version = ">=1.0.0,<2.0.0", extras = ["plugin"] } diff --git a/src/rc_gpfs/cli/__init__.py b/src/rc_gpfs/cli/__init__.py index c5e66db..fe74913 100644 --- a/src/rc_gpfs/cli/__init__.py +++ b/src/rc_gpfs/cli/__init__.py @@ -1,4 +1,4 @@ -from .convert_flat_to_hive import convert_flat_to_hive -from .convert_to_parquet import convert_to_parquet -from .split_log import split_log -from .fparq_cli import fparq_cli \ No newline at end of file +#from .convert_flat_to_hive import convert_flat_to_hive +#from .convert_to_parquet import convert_to_parquet +#from .split_log import split_log +#from .fparq_cli import fparq_cli \ No newline at end of file diff --git a/src/rc_gpfs/cli/test_cli.py b/src/rc_gpfs/cli/test_cli.py new file mode 100644 index 0000000..717d4bd --- /dev/null +++ b/src/rc_gpfs/cli/test_cli.py @@ -0,0 +1,15 @@ +import argparse +from .utils import define_python_interpreter,batch_parser,setup_slurm_logs + +def parse_args(): + parser = argparse.ArgumentParser( + description="Meh", + formatter_class=argparse.RawTextHelpFormatter, + parents=[batch_parser(cpus_per_task=16, gpus=1, partition='amperenodes', mem='90G')] + ) + + args = parser.parse_args() + return vars(args) + +def main(): + args = parse_args() -- GitLab From 5827a0979a2b5eb1bbe10359ca4f2fdcff3b59a1 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Tue, 18 Feb 2025 10:59:04 -0600 Subject: [PATCH 02/10] remove extra scripts section --- pyproject.toml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 394bb84..c27f907 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,13 +48,6 @@ name="rapids" url="https://pypi.nvidia.com" priority = "supplemental" -[tool.poetry.scripts] -convert-to-hive = "rc_gpfs.cli.convert_flat_to_hive:convert_flat_to_hive" -convert-to-parquet = "rc_gpfs.cli.convert_to_parquet:convert_to_parquet" -split-log = "rc_gpfs.cli.split_log:split_log" -fparq = "rc_gpfs.cli.fparq_cli:fparq_cli" -test-cli = "rc_gpfs.cli.test_cli:main" - [tool.poetry.requires-plugins] poetry-dynamic-versioning = { version = ">=1.0.0,<2.0.0", extras = ["plugin"] } -- GitLab From 4bb354ef55bc56e701ac65d5b92330cf72f6245a Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Tue, 18 Feb 2025 10:59:32 -0600 Subject: [PATCH 03/10] reference directly to the script instead of the init to avoid loading all cli modules and dependencies --- pyproject.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c27f907..a5f1f62 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,10 +19,10 @@ dynamic = ["version","dependencies","classifiers"] repository = "https://gitlab.rc.uab.edu/rc/gpfs-policy" [project.scripts] -convert-to-hive = "rc_gpfs.cli:convert_flat_to_hive" -convert-to-parquet = "rc_gpfs.cli:convert_to_parquet" -split-log = "rc_gpfs.cli:split_log" -gpfspart = "rc_gpfs.cli:gpfs_fpart" +convert-to-hive = "rc_gpfs.cli.convert_flat_to_hive:convert_flat_to_hive" +convert-to-parquet = "rc_gpfs.cli.convert_to_parquet:convert_to_parquet" +split-log = "rc_gpfs.cli.split_log:split_log" +fparq = "rc_gpfs.cli.fparq_cli:fparq_cli" [tool.poetry] requires-poetry = ">=2.0" -- GitLab From 0e43f597f0cbea117712092040741273f5066fe0 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Tue, 18 Feb 2025 11:00:00 -0600 Subject: [PATCH 04/10] reference down to split.py instead of module init --- src/rc_gpfs/cli/split_log.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rc_gpfs/cli/split_log.py b/src/rc_gpfs/cli/split_log.py index c0cc64a..90c2fee 100644 --- a/src/rc_gpfs/cli/split_log.py +++ b/src/rc_gpfs/cli/split_log.py @@ -3,7 +3,7 @@ import argparse import subprocess from pathlib import Path from .utils import define_python_interpreter,batch_parser,setup_slurm_logs -from ..policy import split +from ..policy.split import split BATCH_SCRIPT = """\ #!/bin/bash -- GitLab From ffce2092a6858ad7bca5da99bea4a754b8eaa400 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Tue, 18 Feb 2025 11:00:18 -0600 Subject: [PATCH 05/10] remove module loading from init for now --- src/rc_gpfs/policy/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rc_gpfs/policy/__init__.py b/src/rc_gpfs/policy/__init__.py index c24d1c3..1a7b96e 100644 --- a/src/rc_gpfs/policy/__init__.py +++ b/src/rc_gpfs/policy/__init__.py @@ -1,2 +1,2 @@ -from .split import split,compress_logs -from .convert import convert, hivize \ No newline at end of file +#from .split import split, compress_logs +#from .convert import convert, hivize \ No newline at end of file -- GitLab From 6a050689bb0ea15b45191b4eb176a444b72e3b2a Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Tue, 18 Feb 2025 11:00:25 -0600 Subject: [PATCH 06/10] remove test --- src/rc_gpfs/cli/test_cli.py | 15 --------------- 1 file changed, 15 deletions(-) delete mode 100644 src/rc_gpfs/cli/test_cli.py diff --git a/src/rc_gpfs/cli/test_cli.py b/src/rc_gpfs/cli/test_cli.py deleted file mode 100644 index 717d4bd..0000000 --- a/src/rc_gpfs/cli/test_cli.py +++ /dev/null @@ -1,15 +0,0 @@ -import argparse -from .utils import define_python_interpreter,batch_parser,setup_slurm_logs - -def parse_args(): - parser = argparse.ArgumentParser( - description="Meh", - formatter_class=argparse.RawTextHelpFormatter, - parents=[batch_parser(cpus_per_task=16, gpus=1, partition='amperenodes', mem='90G')] - ) - - args = parser.parse_args() - return vars(args) - -def main(): - args = parse_args() -- GitLab From df7ab8d969fd8efd12c02d0b5b386994537c0e1d Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Fri, 7 Mar 2025 11:39:57 -0600 Subject: [PATCH 07/10] split hive functions off from standard conversion function. This moves the group computation logic out of the CLI script and into the policy module where it belongs --- src/rc_gpfs/cli/convert_flat_to_hive.py | 66 +------ src/rc_gpfs/policy/convert.py | 191 +----------------- src/rc_gpfs/policy/hive.py | 249 ++++++++++++++++++++++++ 3 files changed, 255 insertions(+), 251 deletions(-) create mode 100644 src/rc_gpfs/policy/hive.py diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py index a10eee0..b676808 100644 --- a/src/rc_gpfs/cli/convert_flat_to_hive.py +++ b/src/rc_gpfs/cli/convert_flat_to_hive.py @@ -2,13 +2,7 @@ import argparse import subprocess from pathlib import Path -import dask.dataframe as dd -import dask.config -dask.config.set({'dataframe.backend':'cudf'}) -from dask.diagnostics import ProgressBar - from .utils import define_python_interpreter,batch_parser,setup_slurm_logs -from ..policy import hivize DESCRIPTION = """ Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This essentially creates a timeseries of structured datasets for each tld for much easier more efficient log comparisons within tld. Each file path is set as the index and sorted, and all final output parquets are partitioned to have similar in-memory sizes. @@ -86,68 +80,18 @@ def submit_batch(**kwargs): subprocess.run(['sbatch'],input=script,shell=True,text=True) pass -def split_into_groups(series, cutoff): - groups = [] - - while len(series.index) > 0: - current_group = [] - current_sum = 0 - for username, storage_size in series.items(): - if storage_size > cutoff: - groups.append({username}) - series = series.drop(username) - break - elif current_sum + storage_size <= cutoff: - current_group.append(username) - current_sum += storage_size - - series = series.drop(current_group) - if current_group: - groups.append(set(current_group)) - - return groups - -def calc_tld_mem(df): - mem = df.groupby('tld',observed=True).apply(lambda x: x.memory_usage(deep=True).sum()) - return mem - -def define_tld_groups(input,cutoff): - ddf = dd.read_parquet(input,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld']) - with ProgressBar(): - tld_mem = ddf.map_partitions(calc_tld_mem).compute() - tld_mem = tld_mem.groupby(tld_mem.index).sum().divide(1024**3).to_pandas() - grps = split_into_groups(tld_mem,cutoff) - return grps - -def nested_list_to_log(nest,file): - """ - Writes a list of lists to a text log - - Args: - nest (list): A list of lists to be converted. - """ - with open(file, 'w', newline='') as f: - for l in nest: - f.write(f"{','.join(l)}\n") - def convert_flat_to_hive(): args = parse_args() + from ..policy.hive import prep_hivize, hivize + if args.get('batch'): if not args.get('grp_file'): - grps = define_tld_groups(args.get('parquet_path'),args.get('cutoff')) - - misc_path = args.get('parquet_path').parent.joinpath('misc','tld_grps.txt') - misc_path.parent.mkdir(exist_ok = True, parents = True) - nested_list_to_log(grps,misc_path) - ngroups = len(grps) - grp_file = str(misc_path) - args.update({'ngroups':ngroups, - 'grp_file':grp_file}) + prep_hivize(args) else: ngroups = sum(1 for line in open(args.get('grp_file'))) - args.update({'ngroups':ngroups}) - + args.update({'ngroups':ngroups}) + submit_batch(**args) else: diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index 73c26d0..54f05f7 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -1,27 +1,11 @@ -import os import re import gzip -import json -import random -import string -import shutil from pathlib import Path -from typing import Literal, List from urllib.parse import unquote -import GPUtil -import cudf import pandas as pd -import numpy as np -import dask.dataframe as dd -import dask.config -import rmm - -import pyarrow as pa -import pyarrow.parquet as pq from .policy_defs import SCHEMA -from ..compute.backend import infer_cuda from ..utils import as_path def parse_line(line): @@ -83,177 +67,4 @@ def convert( df = pd.DataFrame.from_dict(dicts).sort_values('tld') df = df.astype(SCHEMA) - df.to_parquet(output_path,engine = 'pyarrow') - - -def hivize( - parquet_path: str | Path, - hive_path: str | Path, - tld: str | List[str] | None = None, - staging_path: str | Path | None = None, - partition_size: str = '100MiB', - with_cuda: bool | Literal['infer'] = 'infer', - write_metadata: bool = True, - **kwargs - ) -> None: - parquet_path = as_path(parquet_path) - hive_path = as_path(hive_path) - - if staging_path is None: - rand_str = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - staging_path = Path(os.getenv('TMPDIR')).joinpath(os.getenv('USER'),f'hive-{rand_str}') - print(f"INFO: Using {staging_path} as temporary directory",flush=True) - else: - staging_path = as_path(staging_path) - - hive_path.mkdir(exist_ok=True,parents=True) - staging_path.mkdir(exist_ok=True,parents=True) - - if with_cuda == 'infer': - with_cuda = infer_cuda() - - if with_cuda: - import dask_cudf as backend - from dask_cudf.core import from_cudf as from_local - dask.config.set({'dataframe.backend':'cudf'}) - rmm.reinitialize( - pool_allocator=True, - managed_memory=True, - initial_pool_size='60GiB' - ) - else: - import dask as backend - from dask.dataframe import from_pandas as from_local - dask.config.set({'dataframe.backend':'pandas'}) - - def indexed_name(ind): - return f"indexed-{ind}.parquet" - - if tld is not None: - if not isinstance(tld,list): - tld = [tld] - predicates = [('tld','in',tld)] - else: - predicates = None - print(f"DEBUG: Filtering predicates are: {predicates}",flush=True) - - acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0) - print(f"DEBUG: Acquisition date is {acq}",flush=True) - - # The flat parquet is initially read in via dask to avoid reading the full dataset into memory which happens even - # when including predicates for filtering. The dask dataframe is converted to a regular dataframe to drastically - # improve indexing and sorting by removing partitions. The sorted dataframe is converted back to a dask dataframe - # to create partitions within the parquet dataset and write to multiple files defined by those partitions. - ddf = backend.read_parquet(parquet_path,filters=predicates,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld']) - df = ddf.compute() - df = df.set_index('path').sort_index().assign(acq=acq) - ddf = from_local(df).repartition(partition_size=partition_size,force=True) - ddf.to_parquet(staging_path,partition_on=['tld','acq'],name_function = indexed_name) - - if write_metadata: - ( - df - .groupby(['tld','acq']) - .apply(lambda x: - write_dataset_metadata( - x, - staging_path.joinpath(f"tld={x['tld'].iloc[0]}",f"acq={x['acq'].iloc[0]}") - ) - ) - ) - - shutil.copytree(staging_path,hive_path,dirs_exist_ok=True) - shutil.rmtree(staging_path) - pass - -def bytes_to_human_readable_size(num_bytes): - units = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB'] - - # Handle the case where num_bytes is 0 - if num_bytes == 0: - return "0 B" - - # Calculate the appropriate unit. Take the floor if the number is not divisible by 1024 - unit_index = min(len(units) - 1, int(np.log2(num_bytes) // 10)) - num = int(num_bytes // (1024 ** unit_index)) - - # Format the number to 2 decimal places and append the unit - return f"{num} {units[unit_index]}" - -def create_bin_labels(bins, include_outer_bins = True): - if any([b < 0 for b in bins]): - raise ValueError("File size cannot be less than 0 B") - bins = list(set(bins)) - bins.sort() # Sorts and removes any duplicates - if np.isinf(bins[-1]): - bins.remove(np.inf) - - labels = [] - - # Check for including a bin for all files less than the smallest value if - if include_outer_bins and bins[0] != 0: - labels.append(f"0 B-{bytes_to_human_readable_size(bins[0])}") - - for i in range(len(bins) - 1): - lower_bound = bytes_to_human_readable_size(bins[i]) - upper_bound = bytes_to_human_readable_size(bins[i + 1]) - labels.append(f"{lower_bound}-{upper_bound}") - - if include_outer_bins: - labels.append(f">{bytes_to_human_readable_size(bins[-1])}") - return labels,bins - -def calculate_size_distribution( - sizes: pd.Series | cudf.Series, - size_bins: List[int | float] = [0,4096] + [x*1024**3 for x in [1,10,50,100]] + [1024**4], - include_outer_bins: bool = True, - **kwargs -) -> pd.Series | cudf.Series: - - size_labels,size_bins = create_bin_labels(size_bins,include_outer_bins) - - if include_outer_bins: - if size_bins[0] < 0: - size_bins.insert(0,0) - if float("inf") not in size_bins: - size_bins.append(float("inf")) - - fn = pd.cut if isinstance(sizes,pd.Series) else cudf.cut - size_grps = fn(sizes,bins=size_bins,labels=size_labels,ordered=True,include_lowest=True,**kwargs) - - return size_grps - -def write_dataset_metadata(df: cudf.DataFrame | pd.DataFrame, parquet_path: str | Path) -> dict: - parquet_path = as_path(parquet_path) - if isinstance(df,cudf.DataFrame): - vram_usage = df.memory_usage(deep=True).to_dict() - ram_usage = df.to_pandas().memory_usage(deep=True).to_dict() - else: - if len(GPUtil.getAvailable()) > 0: - vram_usage = cudf.from_pandas(df).memory_usage(deep=True).to_dict() - else: - vram_usage = 'NA' - ram_usage = df.memory_usage(deep=True).to_dict() - - df = df[['size']] - df['grp'] = calculate_size_distribution(df['size']) - - size_dist = ( - df - .groupby('grp',observed=True)['size'] - .agg(['count','sum']) - .rename(columns={'count':'file_count','sum':'bytes'}) - .T - .to_dict() - ) - - metadata = { - 'num_rows': df.shape[0], - 'num_columns': df.shape[1], - 'vram_usage': vram_usage, - 'ram_usage': ram_usage, - 'size_distribution': size_dist - } - - with open(parquet_path.joinpath('_metadata.json'),'w') as f: - json.dump(metadata,f) \ No newline at end of file + df.to_parquet(output_path,engine = 'pyarrow') \ No newline at end of file diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py new file mode 100644 index 0000000..9dca089 --- /dev/null +++ b/src/rc_gpfs/policy/hive.py @@ -0,0 +1,249 @@ +import os +import re +import json +import random +import string +import shutil +from pathlib import Path +from typing import Literal, List +import GPUtil + +import cudf +import pandas as pd +import numpy as np + +import dask.dataframe as dd +import dask.config +dask.config.set({'dataframe.backend':'cudf'}) +from dask.diagnostics import ProgressBar +import rmm + +from ..compute.backend import infer_cuda +from ..utils import as_path + +def split_into_groups(series, cutoff): + groups = [] + + while len(series.index) > 0: + current_group = [] + current_sum = 0 + for username, storage_size in series.items(): + if storage_size > cutoff: + groups.append({username}) + series = series.drop(username) + break + elif current_sum + storage_size <= cutoff: + current_group.append(username) + current_sum += storage_size + + series = series.drop(current_group) + if current_group: + groups.append(set(current_group)) + + return groups + +def calc_tld_mem(df): + mem = df.groupby('tld',observed=True).apply(lambda x: x.memory_usage(deep=True).sum()) + return mem + +def define_tld_groups(input,cutoff): + ddf = dd.read_parquet(input,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld']) + with ProgressBar(): + tld_mem = ddf.map_partitions(calc_tld_mem).compute() + tld_mem = tld_mem.groupby(tld_mem.index).sum().divide(1024**3).to_pandas() + grps = split_into_groups(tld_mem,cutoff) + return grps + +def nested_list_to_log(nest,file): + """ + Writes a list of lists to a text log + + Args: + nest (list): A list of lists to be converted. + """ + with open(file, 'w', newline='') as f: + for l in nest: + f.write(f"{','.join(l)}\n") + +def prep_hivize(args): + grps = define_tld_groups(args.get('parquet_path'),args.get('cutoff')) + misc_path = args.get('parquet_path').parent.joinpath('misc','tld_grps.txt') + misc_path.parent.mkdir(exist_ok = True, parents = True) + nested_list_to_log(grps,misc_path) + ngroups = len(grps) + grp_file = str(misc_path) + args.update({'ngroups':ngroups, + 'grp_file':grp_file}) + return args + +def hivize( + parquet_path: str | Path, + hive_path: str | Path, + tld: str | List[str] | None = None, + staging_path: str | Path | None = None, + partition_size: str = '100MiB', + with_cuda: bool | Literal['infer'] = 'infer', + write_metadata: bool = True, + **kwargs + ) -> None: + parquet_path = as_path(parquet_path) + hive_path = as_path(hive_path) + + if staging_path is None: + rand_str = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) + staging_path = Path(os.getenv('TMPDIR')).joinpath(os.getenv('USER'),f'hive-{rand_str}') + print(f"INFO: Using {staging_path} as temporary directory",flush=True) + else: + staging_path = as_path(staging_path) + + hive_path.mkdir(exist_ok=True,parents=True) + staging_path.mkdir(exist_ok=True,parents=True) + + if with_cuda == 'infer': + with_cuda = infer_cuda() + + if with_cuda: + import dask_cudf as backend + from dask_cudf.core import from_cudf as from_local + dask.config.set({'dataframe.backend':'cudf'}) + rmm.reinitialize( + pool_allocator=True, + managed_memory=True, + initial_pool_size='60GiB' + ) + else: + import dask as backend + from dask.dataframe import from_pandas as from_local + dask.config.set({'dataframe.backend':'pandas'}) + + def indexed_name(ind): + return f"indexed-{ind}.parquet" + + if tld is not None: + if not isinstance(tld,list): + tld = [tld] + predicates = [('tld','in',tld)] + else: + predicates = None + print(f"DEBUG: Filtering predicates are: {predicates}",flush=True) + + acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0) + print(f"DEBUG: Acquisition date is {acq}",flush=True) + + # The flat parquet is initially read in via dask to avoid reading the full dataset into memory which happens even + # when including predicates for filtering. The dask dataframe is converted to a regular dataframe to drastically + # improve indexing and sorting by removing partitions. The sorted dataframe is converted back to a dask dataframe + # to create partitions within the parquet dataset and write to multiple files defined by those partitions. + ddf = backend.read_parquet(parquet_path,filters=predicates,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld']) + df = ddf.compute() + df = df.set_index('path').sort_index().assign(acq=acq) + ddf = from_local(df).repartition(partition_size=partition_size,force=True) + ddf.to_parquet(staging_path,partition_on=['tld','acq'],name_function = indexed_name) + + if write_metadata: + ( + df + .groupby(['tld','acq']) + .apply(lambda x: + write_dataset_metadata( + x, + staging_path.joinpath(f"tld={x['tld'].iloc[0]}",f"acq={x['acq'].iloc[0]}") + ) + ) + ) + + shutil.copytree(staging_path,hive_path,dirs_exist_ok=True) + shutil.rmtree(staging_path) + pass + +def bytes_to_human_readable_size(num_bytes): + units = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB'] + + # Handle the case where num_bytes is 0 + if num_bytes == 0: + return "0 B" + + # Calculate the appropriate unit. Take the floor if the number is not divisible by 1024 + unit_index = min(len(units) - 1, int(np.log2(num_bytes) // 10)) + num = int(num_bytes // (1024 ** unit_index)) + + # Format the number to 2 decimal places and append the unit + return f"{num} {units[unit_index]}" + +def create_bin_labels(bins, include_outer_bins = True): + if any([b < 0 for b in bins]): + raise ValueError("File size cannot be less than 0 B") + bins = list(set(bins)) + bins.sort() # Sorts and removes any duplicates + if np.isinf(bins[-1]): + bins.remove(np.inf) + + labels = [] + + # Check for including a bin for all files less than the smallest value if + if include_outer_bins and bins[0] != 0: + labels.append(f"0 B-{bytes_to_human_readable_size(bins[0])}") + + for i in range(len(bins) - 1): + lower_bound = bytes_to_human_readable_size(bins[i]) + upper_bound = bytes_to_human_readable_size(bins[i + 1]) + labels.append(f"{lower_bound}-{upper_bound}") + + if include_outer_bins: + labels.append(f">{bytes_to_human_readable_size(bins[-1])}") + return labels,bins + +def calculate_size_distribution( + sizes: pd.Series | cudf.Series, + size_bins: List[int | float] = [0,4096] + [x*1024**3 for x in [1,10,50,100]] + [1024**4], + include_outer_bins: bool = True, + **kwargs +) -> pd.Series | cudf.Series: + + size_labels,size_bins = create_bin_labels(size_bins,include_outer_bins) + + if include_outer_bins: + if size_bins[0] < 0: + size_bins.insert(0,0) + if float("inf") not in size_bins: + size_bins.append(float("inf")) + + fn = pd.cut if isinstance(sizes,pd.Series) else cudf.cut + size_grps = fn(sizes,bins=size_bins,labels=size_labels,ordered=True,include_lowest=True,**kwargs) + + return size_grps + +def write_dataset_metadata(df: cudf.DataFrame | pd.DataFrame, parquet_path: str | Path) -> dict: + parquet_path = as_path(parquet_path) + if isinstance(df,cudf.DataFrame): + vram_usage = df.memory_usage(deep=True).to_dict() + ram_usage = df.to_pandas().memory_usage(deep=True).to_dict() + else: + if len(GPUtil.getAvailable()) > 0: + vram_usage = cudf.from_pandas(df).memory_usage(deep=True).to_dict() + else: + vram_usage = 'NA' + ram_usage = df.memory_usage(deep=True).to_dict() + + df = df[['size']] + df['grp'] = calculate_size_distribution(df['size']) + + size_dist = ( + df + .groupby('grp',observed=True)['size'] + .agg(['count','sum']) + .rename(columns={'count':'file_count','sum':'bytes'}) + .T + .to_dict() + ) + + metadata = { + 'num_rows': df.shape[0], + 'num_columns': df.shape[1], + 'vram_usage': vram_usage, + 'ram_usage': ram_usage, + 'size_distribution': size_dist + } + + with open(parquet_path.joinpath('_metadata.json'),'w') as f: + json.dump(metadata,f) \ No newline at end of file -- GitLab From 632ddeb46a491c2e3490f970fae66b7e6f7f89f3 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Fri, 7 Mar 2025 11:41:09 -0600 Subject: [PATCH 08/10] moved business logic imports into the main function below where args are parsed to avoid having to load large modules just to print help messages --- src/rc_gpfs/cli/convert_to_parquet.py | 4 +++- src/rc_gpfs/cli/fparq_cli.py | 6 +++--- src/rc_gpfs/cli/split_log.py | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py index f035843..5b6e5ef 100644 --- a/src/rc_gpfs/cli/convert_to_parquet.py +++ b/src/rc_gpfs/cli/convert_to_parquet.py @@ -4,7 +4,6 @@ import subprocess from pathlib import Path import multiprocessing from .utils import define_python_interpreter,batch_parser,setup_slurm_logs -from ..policy import convert from ..utils import parse_scontrol __all__ = ['convert_to_parquet'] @@ -77,6 +76,9 @@ def submit_batch(**kwargs): def convert_to_parquet() -> None: args = parse_args() + + from ..policy import convert + if args['output_dir'] is None: args['output_dir'] = args['input'].parent.joinpath('parquet') diff --git a/src/rc_gpfs/cli/fparq_cli.py b/src/rc_gpfs/cli/fparq_cli.py index 34fd6b0..da60a8a 100644 --- a/src/rc_gpfs/cli/fparq_cli.py +++ b/src/rc_gpfs/cli/fparq_cli.py @@ -1,8 +1,5 @@ import argparse from pathlib import Path -import pandas as pd - -from ..process import fparq DESCRIPTION = """ gpfspart is a custom implementation of the fpart algorithm specifically designed to work with processed GPFS policy outputs. fpart crawls a directory tree partitioning the files by size up to a specified max and stores them in a number of lists. These lists are useful for passing to rsync and its derivative implementations to load balance and parallelize large data transfers. However, fpart has to crawl a file tree to create these partitions which can impact performance on very large, network file systems such as GPFS. @@ -62,6 +59,9 @@ def parse_args(): def fparq_cli(): args = parse_args() + import pandas as pd + from ..process import fparq + if args.get('partition_path') is None: pq_path = args.get('parquet_path') args.update({'partition_path': pq_path.joinpath('_partitions')}) diff --git a/src/rc_gpfs/cli/split_log.py b/src/rc_gpfs/cli/split_log.py index 90c2fee..44c8461 100644 --- a/src/rc_gpfs/cli/split_log.py +++ b/src/rc_gpfs/cli/split_log.py @@ -3,7 +3,6 @@ import argparse import subprocess from pathlib import Path from .utils import define_python_interpreter,batch_parser,setup_slurm_logs -from ..policy.split import split BATCH_SCRIPT = """\ #!/bin/bash @@ -69,5 +68,6 @@ def split_log(): if args.get('batch'): submit_batch(**args) else: + from ..policy.split import split split(**args) pass \ No newline at end of file -- GitLab From cc37724e4badec4c87fde9e76e11514ff6d0a114 Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Fri, 7 Mar 2025 12:25:25 -0600 Subject: [PATCH 09/10] fix import statement to import from the submodule instead of the policy module --- src/rc_gpfs/cli/convert_to_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py index 5b6e5ef..1e00596 100644 --- a/src/rc_gpfs/cli/convert_to_parquet.py +++ b/src/rc_gpfs/cli/convert_to_parquet.py @@ -77,7 +77,7 @@ def submit_batch(**kwargs): def convert_to_parquet() -> None: args = parse_args() - from ..policy import convert + from ..policy.convert import convert if args['output_dir'] is None: args['output_dir'] = args['input'].parent.joinpath('parquet') -- GitLab From 57318418a3cdbb7dc96d4a577305dfc24aa3ee2d Mon Sep 17 00:00:00 2001 From: Matthew K Defenderfer <mdefende@uab.edu> Date: Fri, 7 Mar 2025 13:13:03 -0600 Subject: [PATCH 10/10] resolve into absolute paths to correctly extract date even when the full path to the parquet dataset is not given --- src/rc_gpfs/policy/hive.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py index 9dca089..29c9f16 100644 --- a/src/rc_gpfs/policy/hive.py +++ b/src/rc_gpfs/policy/hive.py @@ -86,8 +86,8 @@ def hivize( write_metadata: bool = True, **kwargs ) -> None: - parquet_path = as_path(parquet_path) - hive_path = as_path(hive_path) + parquet_path = as_path(parquet_path).resolve() + hive_path = as_path(hive_path).resolve() if staging_path is None: rand_str = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) -- GitLab