diff --git a/src/rc_gpfs/__init__.py b/src/rc_gpfs/__init__.py index 3ff8aafb283c19a37cefeefdd4979757d6a44c6c..52a1dfab1ab3ec32fb0ce6b45ee2ecaa1d518646 100644 --- a/src/rc_gpfs/__init__.py +++ b/src/rc_gpfs/__init__.py @@ -1 +1,3 @@ -from .__version__ import __version__, __version_tuple__ \ No newline at end of file +from .__version__ import __version__, __version_tuple__ + +__all__ = [__version__, __version_tuple__] \ No newline at end of file diff --git a/src/rc_gpfs/cli/__init__.py b/src/rc_gpfs/cli/__init__.py index c5e66dbcd6f50d016ed18fcb1183bb27ce008fb2..53cb54e73051f1ce1b9ef059c7b896663f64309e 100644 --- a/src/rc_gpfs/cli/__init__.py +++ b/src/rc_gpfs/cli/__init__.py @@ -1,4 +1,6 @@ from .convert_flat_to_hive import convert_flat_to_hive from .convert_to_parquet import convert_to_parquet +from .fparq_cli import fparq_cli from .split_log import split_log -from .fparq_cli import fparq_cli \ No newline at end of file + +__all__ = [convert_flat_to_hive, convert_to_parquet, fparq_cli, split_log] \ No newline at end of file diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py index b2e17f5a376b6df2e05397c933a4b6cb0bc173e2..64ee8822e933ea4655d701a48f36dceebc94e0ec 100644 --- a/src/rc_gpfs/cli/convert_flat_to_hive.py +++ b/src/rc_gpfs/cli/convert_flat_to_hive.py @@ -1,16 +1,17 @@ import argparse -import subprocess +import random import re +import subprocess import time -import random from pathlib import Path +from typing import List + import numpy as np import polars as pl -from typing import List from ..policy.hive import hivize -from .utils import define_python_interpreter, batch_parser_no_mem, setup_slurm_logs from ..utils import get_parquet_dataset_size +from .utils import batch_parser_no_mem, define_python_interpreter, setup_slurm_logs DESCRIPTION = """ Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This creates a timeseries of structured datasets for each tld for much easier more efficient log comparisons within tld. Output parquets are sorted by the path column for convenience, but no index is set. diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py index 5c94df4e0f5a09cacfe347c18e3395699a36c13b..ede25587ea22728cff8ae81cd89d8da7735124c3 100644 --- a/src/rc_gpfs/cli/convert_to_parquet.py +++ b/src/rc_gpfs/cli/convert_to_parquet.py @@ -1,15 +1,15 @@ import argparse -import time +import multiprocessing import random import subprocess +import time from pathlib import Path -import multiprocessing from ..policy.convert import convert, set_output_filename -from .utils import define_python_interpreter,batch_parser,setup_slurm_logs from ..utils import parse_scontrol +from .utils import batch_parser, define_python_interpreter, setup_slurm_logs -__all__ = ['convert_to_parquet'] +__all__ = ["convert_to_parquet"] DESCRIPTION = """ Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n @@ -23,27 +23,49 @@ Local Parallel Processing: If processing is done via a local parallel pool, the requested cores need to be accessible by the invoking Python process. When run in a Slurm job context where the number of cores were only specified with the --ntasks property, only 1 core will be available to the Python process regardless of the number of cores requested by the job. Instead, use the --cpus-per-task property to set the number of cores paired with --ntasks=1. This will correctly allow the parallel pool to utilize all cores assigned to the job. 
""" + def parse_args(arg_str=None): parser = argparse.ArgumentParser( description=DESCRIPTION, - parents=[batch_parser(partition='amd-hdr100,express',time='02:00:00',mem='16G',cpus_per_task=1)], - epilog=EPILOGUE + parents=[ + batch_parser( + partition="amd-hdr100,express", + time="02:00:00", + mem="16G", + cpus_per_task=1, + ) + ], + epilog=EPILOGUE, + ) + parser.add_argument( + "input", + type=Path, + help="Path to a log file or directory of log files from mmlspolicy to be converted to parquet. If a directory path is given, all files within the directory will be converted. If the --batch option is set, this will be done in a batch array job. Otherwise, it will be done in a local parallel pool using available compute resources.", + ) + + parser.add_argument( + "-o", + "--output-dir", + type=Path, + default=None, + help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../parquet", + ) + parser.add_argument( + "--pool-size", + type=int, + default=None, + help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process", + ) + parser.add_argument( + "--no-clobber", + action="store_true", + default=False, + help="When set, skips any log chunks that already have corresponding parquet files. Chunks without a parquet file are processed as normal.", ) - parser.add_argument('input', - type=Path, - help="Path to a log file or directory of log files from mmlspolicy to be converted to parquet. If a directory path is given, all files within the directory will be converted. If the --batch option is set, this will be done in a batch array job. Otherwise, it will be done in a local parallel pool using available compute resources.") - - parser.add_argument('-o','--output-dir', - type=Path, - default = None, - help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../parquet") - parser.add_argument('--pool-size',type=int,default=None, - help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process") - parser.add_argument('--no-clobber', action='store_true',default=False, - help='When set, skips any log chunks that already have corresponding parquet files. Chunks without a parquet file are processed as normal.') args = parser.parse_args(arg_str) return vars(args) + BATCH_SCRIPT = """\ #!/bin/bash # @@ -65,13 +87,16 @@ log=$(ls {input}/*.gz | sort | awk "NR==${{idx}} {{ print $1 }}") convert-to-parquet {no_clobber_opt} -o {output_dir} ${{log}} """ + def submit_batch(**kwargs): - env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env')) - kwargs.update({"env_cmd":env_cmd}) - - slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'parquet') + env_cmd = define_python_interpreter( + kwargs.get("python_path"), kwargs.get("conda_env") + ) + kwargs.update({"env_cmd": env_cmd}) + + slurm_logs = setup_slurm_logs(kwargs.get("slurm_log_dir"), "parquet") kwargs.update(slurm_logs) - + script = BATCH_SCRIPT.format(**kwargs) # Wait between 1 and 5 seconds before batch submission. 
This helps avoid a situation where this setup is running in @@ -81,11 +106,19 @@ def submit_batch(**kwargs): time.sleep(random.uniform(1, 5)) try: - subprocess.run(['sbatch'],input=script,shell=True,text=True,check=True,capture_output=True) + subprocess.run( + ["sbatch"], + input=script, + shell=True, + text=True, + check=True, + capture_output=True, + ) except subprocess.CalledProcessError as e: raise subprocess.CalledProcessError(e.stderr.decode()) pass + def _find_sequential_indexes(idxs): if not idxs: return [] @@ -113,67 +146,66 @@ def _find_sequential_indexes(idxs): return result + def _get_missing_indexes(chunks, parquets): missing_indexes = [ index for index, element in enumerate(chunks) if element not in parquets ] return missing_indexes + def convert_to_parquet() -> None: args = parse_args() - if args['output_dir'] is None: - if args['input'].is_file(): - args['output_dir'] = args['input'].parent.parent.joinpath('parquet') + if args["output_dir"] is None: + if args["input"].is_file(): + args["output_dir"] = args["input"].parent.parent.joinpath("parquet") else: args["output_dir"] = args["input"].parent.joinpath("parquet") - args['output_dir'].mkdir(exist_ok = True, mode = 0o2770) + args["output_dir"].mkdir(exist_ok=True, mode=0o2770) - if args['input'].is_file(): + if args["input"].is_file(): nlogs = 1 else: - logs = list(args.get('input').glob('*.gz')) + logs = list(args.get("input").glob("*.gz")) nlogs = len(logs) - + if args["no_clobber"]: args.update({"no_clobber_opt": "--no-clobber"}) - if args['input'].is_dir(): + if args["input"].is_dir(): chunks = logs chunks.sort() else: chunks = [args["input"]] - + pqs = [f.name for f in args["output_dir"].glob("*.parquet")] target_pqs = [set_output_filename(f) for f in chunks] - idxs_to_run = _get_missing_indexes(target_pqs,pqs) - + idxs_to_run = _get_missing_indexes(target_pqs, pqs) + if len(idxs_to_run) == 0: - print("INFO: All log chunks have been converted to parquet. Exiting without processing") + print( + "INFO: All log chunks have been converted to parquet. Exiting without processing" + ) return - + array_idxs = _find_sequential_indexes(idxs_to_run) - args['array_idxs'] = ','.join(array_idxs) + args["array_idxs"] = ",".join(array_idxs) else: - args.update( - { - "no_clobber_opt": "", - "array_idxs" : f"0-{nlogs-1}" - } - ) - - args.update({'nlogs':nlogs}) + args.update({"no_clobber_opt": "", "array_idxs": f"0-{nlogs - 1}"}) + + args.update({"nlogs": nlogs}) - if args['batch']: + if args["batch"]: submit_batch(**args) elif nlogs > 1: - ncpus,_ = parse_scontrol() - pool_size = args.get('pool_size',ncpus) + ncpus, _ = parse_scontrol() + pool_size = args.get("pool_size", ncpus) with multiprocessing.Pool(processes=pool_size) as pool: - fargs = list(zip(logs,[args['output_dir']]*nlogs)) + fargs = list(zip(logs, [args["output_dir"]] * nlogs)) pool.starmap(convert, fargs) else: - convert(args['input'],args['output_dir']) + convert(args["input"], args["output_dir"]) diff --git a/src/rc_gpfs/cli/fparq_cli.py b/src/rc_gpfs/cli/fparq_cli.py index be3a1c054158bdc3ae41ec966b6d5fefd6c123a7..a2ea26d7c9a8cdbb328c9549c3a25d5752a7be3d 100644 --- a/src/rc_gpfs/cli/fparq_cli.py +++ b/src/rc_gpfs/cli/fparq_cli.py @@ -12,59 +12,86 @@ Tiny and Large Files Options are included to specify minimum and maximum size cutoffs to define which files are included in the default grouping. These excluded files can then either be partitioned separately or not at all with another option. 
This was written to optimize functions such as parsyncfp2 which have built-in options to process tiny or large files differently. For example, parsyncfp2 will tar tiny files togather and transfer the tarball as well as chunk large files and transfer chunks concurrently. """ + def parse_args(): parser = argparse.ArgumentParser( - description=DESCRIPTION, - formatter_class=argparse.RawTextHelpFormatter + description=DESCRIPTION, formatter_class=argparse.RawTextHelpFormatter + ) + parser.add_argument( + "parquet_path", + type=Path, + help="Input path for the parquet GPFS dataset to chunk", + ) + parser.add_argument( + "-p", + "--partition-path", + type=Path, + default=None, + help="Path to write partition files. Defaults to ${{parquet_path}}/_partitions", + ) + parser.add_argument( + "-m", + "--max-part-size", + type=str, + default="50GiB", + help="Max combined size of all files in a partition. This can be specified either as a human-readable byte string (e.g. 10M[[i]B], 100G[[i]B]) or as a raw integer. Byte strings will be interpreted as base 2 (e.g 1kB is always 1024 bytes)", + ) + parser.add_argument( + "-f", + "--max-part-files", + type=int, + default=None, + help="Maximum number of files to include in any partition. Works with --max-size where all partitions meet both criteria", + ) + parser.add_argument( + "-t", + "--tiny-size", + type=str, + default=None, + help="Max size of file to be specified as 'tiny'. Tiny files are partitioned separately from other files by default. They can be excluded entirely using the --exclude-nonstandard flag", + ) + parser.add_argument( + "--max-tiny-part-size", + type=str, + default="1GiB", + help="Max partition size for tiny files", + ) + parser.add_argument( + "--max-tiny-part-files", + type=int, + default=250000, + help="Max number of files in a partition of tiny files", + ) + parser.add_argument( + "-b", + "--big-size", + type=str, + default=None, + help="Minimum file size to specified as 'big'. Files above this limit will be assigned to their own unique partition. This value is implicitly set to the max partition size. Setting this value above the max partition size would have no effect. These files can be excluded entirely using the --exclude-nonstandard flag", + ) + parser.add_argument( + "--exclude-nonstandard", + default=False, + action="store_true", + help="Exclude all tiny and big files from partitioning. Partitions will only include files between tiny-size and big-size.", ) - parser.add_argument('parquet_path', - type=Path, - help="Input path for the parquet GPFS dataset to chunk") - parser.add_argument('-p','--partition-path', - type=Path, - default=None, - help="Path to write partition files. Defaults to ${{parquet_path}}/_partitions") - parser.add_argument('-m','--max-part-size', - type=str, - default='50GiB', - help="Max combined size of all files in a partition. This can be specified either as a human-readable byte string (e.g. 10M[[i]B], 100G[[i]B]) or as a raw integer. Byte strings will be interpreted as base 2 (e.g 1kB is always 1024 bytes)") - parser.add_argument('-f','--max-part-files', - type=int, - default=None, - help="Maximum number of files to include in any partition. Works with --max-size where all partitions meet both criteria") - parser.add_argument('-t','--tiny-size', - type=str, - default=None, - help="Max size of file to be specified as 'tiny'. Tiny files are partitioned separately from other files by default. 
They can be excluded entirely using the --exclude-nonstandard flag") - parser.add_argument('--max-tiny-part-size', - type=str, - default='1GiB', - help="Max partition size for tiny files") - parser.add_argument('--max-tiny-part-files', - type=int, - default=250000, - help="Max number of files in a partition of tiny files") - parser.add_argument('-b','--big-size', - type=str, - default=None, - help="Minimum file size to specified as 'big'. Files above this limit will be assigned to their own unique partition. This value is implicitly set to the max partition size. Setting this value above the max partition size would have no effect. These files can be excluded entirely using the --exclude-nonstandard flag") - parser.add_argument('--exclude-nonstandard', - default=False, - action="store_true", - help="Exclude all tiny and big files from partitioning. Partitions will only include files between tiny-size and big-size.") args = parser.parse_args() return vars(args) + def fparq_cli(): args = parse_args() import polars as pl + from ..process import fparq - if args.get('partition_path') is None: - pq_path = args.get('parquet_path') - args.update({'partition_path': pq_path.joinpath('_partitions')}) + if args.get("partition_path") is None: + pq_path = args.get("parquet_path") + args.update({"partition_path": pq_path.joinpath("_partitions")}) - df = pl.read_parquet(args.get('parquet_path').joinpath('*.parquet'), columns=['path','size']) - fparq(df,**args) + df = pl.read_parquet( + args.get("parquet_path").joinpath("*.parquet"), columns=["path", "size"] + ) + fparq(df, **args) diff --git a/src/rc_gpfs/cli/split_log.py b/src/rc_gpfs/cli/split_log.py index a535893eae316f63a676bb230ecc1a2e8d3b2b7d..f8fc7c38a3ccd1c2fbb7c399f62ed4708f1ff26b 100644 --- a/src/rc_gpfs/cli/split_log.py +++ b/src/rc_gpfs/cli/split_log.py @@ -1,8 +1,9 @@ -import sys import argparse import subprocess +import sys from pathlib import Path -from .utils import define_python_interpreter,batch_parser,setup_slurm_logs + +from .utils import batch_parser, define_python_interpreter, setup_slurm_logs BATCH_SCRIPT = """\ #!/bin/bash @@ -21,56 +22,89 @@ BATCH_SCRIPT = """\ split-log -o {output_dir} -l {lines} {log} """ + def parse_args(): parser = argparse.ArgumentParser( description="Splits a GPFS policy log into multiple parts for batch array processing.", - parents=[batch_parser(cpus_per_task=24,time='02:00:00',mem='8G',gpus=0,partition='amd-hdr100,express')] + parents=[ + batch_parser( + cpus_per_task=24, + time="02:00:00", + mem="8G", + gpus=0, + partition="amd-hdr100,express", + ) + ], + ) + parser.add_argument( + "log", + type=Path, + help="Path to a raw GPFS log file. The log can be either uncompressed or gz compressed.", + ) + + parser.add_argument( + "-o", + "--output-dir", + type=Path, + default=None, + help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../chunks", + ) + parser.add_argument( + "-l", + "--lines", + type=int, + default=int(5e6), + help="Number of lines to split the log file by", + ) + parser.add_argument( + "--no-clobber", + action="store_true", + default=False, + help="When set and existing split logs are found, immediately exits without any processing", ) - parser.add_argument('log', - type=Path, - help="Path to a raw GPFS log file. The log can be either uncompressed or gz compressed.") - - parser.add_argument('-o','--output-dir', - type=Path, - default = None, - help="Directory to store the output parquet. 
The parquet file will have the same name as the input. Defaults to ./input/../chunks") - parser.add_argument('-l', '--lines', type=int, default=int(5e6), - help="Number of lines to split the log file by") - parser.add_argument('--no-clobber', action='store_true',default=False, - help='When set and existing split logs are found, immediately exits without any processing') args = parser.parse_args() return vars(args) + def submit_batch(**kwargs): env_cmd = define_python_interpreter() - kwargs.update({"env_cmd":env_cmd}) - - slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'split') + kwargs.update({"env_cmd": env_cmd}) + + slurm_logs = setup_slurm_logs(kwargs.get("slurm_log_dir"), "split") kwargs.update(slurm_logs) script = BATCH_SCRIPT.format(**kwargs) try: - subprocess.run(['sbatch'],input=script,shell=True,text=True,check=True,capture_output=True) + subprocess.run( + ["sbatch"], + input=script, + shell=True, + text=True, + check=True, + capture_output=True, + ) except subprocess.CalledProcessError as e: raise subprocess.CalledProcessError(e.stderr.decode()) pass + def split_log(): args = parse_args() - if args['output_dir'] is None: - args['output_dir'] = args['log'].parent.parent.joinpath('chunks') - args['output_dir'].mkdir(exist_ok = True, mode = 0o2770) + if args["output_dir"] is None: + args["output_dir"] = args["log"].parent.parent.joinpath("chunks") + args["output_dir"].mkdir(exist_ok=True, mode=0o2770) - output_files_exist = len(list(args['output_dir'].glob('*.gz'))) > 0 - if args['no_clobber'] and output_files_exist: - sys.exit('The output directory already contains split log files. Exiting') + output_files_exist = len(list(args["output_dir"].glob("*.gz"))) > 0 + if args["no_clobber"] and output_files_exist: + sys.exit("The output directory already contains split log files. 
Exiting") - if args.get('batch'): + if args.get("batch"): submit_batch(**args) else: from ..policy.split import split + split(**args) - pass \ No newline at end of file + pass diff --git a/src/rc_gpfs/cli/utils.py b/src/rc_gpfs/cli/utils.py index e879259e8cbf9581a229fbbaa2e96fbcf08be5d4..63326f2f92c7898454aa1adcb6e40f5f9ba88de9 100644 --- a/src/rc_gpfs/cli/utils.py +++ b/src/rc_gpfs/cli/utils.py @@ -1,32 +1,37 @@ import argparse -import sys import os +import sys from pathlib import Path + def define_python_interpreter(python_path=None, conda_env=None): conda_base = "module load Anaconda3\nconda activate {conda_env}" venv_base = "source {python_path}" if conda_env is not None: - env = conda_base.format(conda_env=conda_env) + env = conda_base.format(conda_env=conda_env) elif python_path is not None: parent = Path(python_path).absolute().parent - env = venv_base.format(python_path=parent.joinpath('activate')) + env = venv_base.format(python_path=parent.joinpath("activate")) else: - conda_env = os.environ.get('CONDA_PREFIX') + conda_env = os.environ.get("CONDA_PREFIX") if conda_env: - env = conda_base.format(conda_env=conda_env) + env = conda_base.format(conda_env=conda_env) else: parent = Path(sys.executable).absolute().parent - env = venv_base.format(python_path=parent.joinpath('activate')) + env = venv_base.format(python_path=parent.joinpath("activate")) return env + class CustomHelpFormatter(argparse.MetavarTypeHelpFormatter): def add_arguments(self, actions): # Sort actions by their group title - actions = sorted(actions, key=lambda x: x.container.title if x.container.title else '') + actions = sorted( + actions, key=lambda x: x.container.title if x.container.title else "" + ) super(CustomHelpFormatter, self).add_arguments(actions) + def batch_parser_no_mem( cpus_per_task: int | None = None, gpus: int | None = None, @@ -60,35 +65,49 @@ def batch_parser_no_mem( ) return parser + def batch_parser( - cpus_per_task: int | None = None, - gpus: int | None = None, - partition: str | None = None, - mem: str | None = None, - time: str | None = '12:00:00', - reservation: str | None = None, - slurm_log_dir: str | Path | None = './out', - **kwargs + cpus_per_task: int | None = None, + gpus: int | None = None, + partition: str | None = None, + mem: str | None = None, + time: str | None = "12:00:00", + reservation: str | None = None, + slurm_log_dir: str | Path | None = "./out", + **kwargs, ) -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(add_help=False, - formatter_class=CustomHelpFormatter) - slurm = parser.add_argument_group(title='Slurm Options') - slurm.add_argument('--batch', action='store_true', default=False, - help="Convert as a batch array job.") - slurm.add_argument('-n', '--ntasks', type=int, default=1) - slurm.add_argument('-c', '--cpus-per-task', type=int, default=cpus_per_task) - slurm.add_argument('-g', '--gpus', type=int, default=gpus, choices=[0, 1]) - slurm.add_argument('-p', '--partition', type=str, default=partition) - slurm.add_argument('-t', '--time', type=str, default=time) - slurm.add_argument('-m', '--mem', type=str, default=mem) - slurm.add_argument('--reservation',type=str,default=reservation) - slurm.add_argument('--slurm-log-dir', type=Path, default=slurm_log_dir, - help='Output log directory. 
If the directory does not exist, it will be created automatically.') + parser = argparse.ArgumentParser( + add_help=False, formatter_class=CustomHelpFormatter + ) + slurm = parser.add_argument_group(title="Slurm Options") + slurm.add_argument( + "--batch", + action="store_true", + default=False, + help="Convert as a batch array job.", + ) + slurm.add_argument("-n", "--ntasks", type=int, default=1) + slurm.add_argument("-c", "--cpus-per-task", type=int, default=cpus_per_task) + slurm.add_argument("-g", "--gpus", type=int, default=gpus, choices=[0, 1]) + slurm.add_argument("-p", "--partition", type=str, default=partition) + slurm.add_argument("-t", "--time", type=str, default=time) + slurm.add_argument("-m", "--mem", type=str, default=mem) + slurm.add_argument("--reservation", type=str, default=reservation) + slurm.add_argument( + "--slurm-log-dir", + type=Path, + default=slurm_log_dir, + help="Output log directory. If the directory does not exist, it will be created automatically.", + ) return parser -def setup_slurm_logs(slurm_log_dir,log_basename): + +def setup_slurm_logs(slurm_log_dir, log_basename): slurm_log_dir = slurm_log_dir.absolute() - slurm_log_dir.mkdir(exist_ok = True,parents=True,mode = 0o2770) - out_log,err_log = [str(slurm_log_dir.joinpath(f'{log_basename}_%A_%a.out')),str(slurm_log_dir.joinpath(f'{log_basename}_%A_%a.err'))] - slurm_logs = {'output_log':out_log,'error_log':err_log} - return slurm_logs \ No newline at end of file + slurm_log_dir.mkdir(exist_ok=True, parents=True, mode=0o2770) + out_log, err_log = [ + str(slurm_log_dir.joinpath(f"{log_basename}_%A_%a.out")), + str(slurm_log_dir.joinpath(f"{log_basename}_%A_%a.err")), + ] + slurm_logs = {"output_log": out_log, "error_log": err_log} + return slurm_logs diff --git a/src/rc_gpfs/db/__init__.py b/src/rc_gpfs/db/__init__.py index 2d3eee9c34c99764449776d4d6468dbe10edabd9..1b5049335222216865394d230ebd70fb65e992bf 100644 --- a/src/rc_gpfs/db/__init__.py +++ b/src/rc_gpfs/db/__init__.py @@ -1,2 +1,3 @@ +from .utils import create_db, df_to_sql -from .utils import create_db, df_to_sql \ No newline at end of file +__all__ = [create_db, df_to_sql] diff --git a/src/rc_gpfs/db/utils.py b/src/rc_gpfs/db/utils.py index af816a05f9fa8c50bdd93a3273098d715d105ecb..467a30c3de4a67ad804fc3e6d939a86083ef641d 100644 --- a/src/rc_gpfs/db/utils.py +++ b/src/rc_gpfs/db/utils.py @@ -1,13 +1,14 @@ -import polars as pl from pathlib import Path + +import polars as pl from sqlalchemy import create_engine, text from ..utils import as_path -__all__= ['create_db','df_to_sql'] +__all__ = ["create_db", "df_to_sql"] -# Definitions are fairly self-explanatory except for modified bytes. Modified bytes are calculated in two different +# Definitions are fairly self-explanatory except for modified bytes. Modified bytes are calculated in two different # ways: # 1. modified_bytes_new: total storage used by the new version of the modified files # 2. 
modified_bytes_net: net storage difference between old and new versions of modified files @@ -27,25 +28,36 @@ accessed_bytes INTEGER, PRIMARY KEY (tld, log_dt) """ -CHURN_TBL_COLS = ['created','created_bytes','deleted','deleted_bytes', - 'modified','modified_bytes','modified_bytes_net', - 'accessed','accessed_bytes'] +CHURN_TBL_COLS = [ + "created", + "created_bytes", + "deleted", + "deleted_bytes", + "modified", + "modified_bytes", + "modified_bytes_net", + "accessed", + "accessed_bytes", +] + def create_db(db_path: str | Path, table: str, definition: str): db_path = as_path(db_path) - + db_path.parent.mkdir(exist_ok=True, parents=True) engine = create_engine(f"sqlite:///{db_path}") with engine.connect() as conn: conn.execute(text(f"CREATE TABLE IF NOT EXISTS {table} ({definition})")) + def df_to_sql(df: pl.DataFrame, path: Path | str, table: str): path = as_path(path) uri = f"sqlite:///{path}" - df.write_database(table,connection=uri,if_table_exists='append') + df.write_database(table, connection=uri, if_table_exists="append") + def create_churn_db(db_path: str | Path): - table = 'churn' - definition=CHURN_TBL_DEFINITION - create_db(db_path, table, definition) \ No newline at end of file + table = "churn" + definition = CHURN_TBL_DEFINITION + create_db(db_path, table, definition) diff --git a/src/rc_gpfs/policy/__init__.py b/src/rc_gpfs/policy/__init__.py index 71457c264be1b2b6b411c3a40383f2dfef7da54b..08002eb18ccd6ba56123ec7492b2521affa841cd 100644 --- a/src/rc_gpfs/policy/__init__.py +++ b/src/rc_gpfs/policy/__init__.py @@ -1,3 +1,5 @@ -from .split import split, compress_logs from .convert import convert -from .hive import hivize \ No newline at end of file +from .hive import hivize +from .split import compress_logs, split + +__all__ = [convert, hivize, compress_logs, split] \ No newline at end of file diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index 4890bd60c18c6df2abd4ed8a2ec9963b1c71b858..6f1f673f06fe962d0007e948c736a80ee4d9aebc 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -1,47 +1,53 @@ -import re import gzip +import re from pathlib import Path from urllib.parse import unquote import polars as pl -from .policy_defs import SCHEMA from ..utils import as_path +from .policy_defs import SCHEMA + def parse_line(line): try: ul = unquote(line).strip() - ul = re.sub(r'[\n\t]','',ul) - details,path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups() - - d = dict([re.match(r'([\w]+)=(.*)',l).groups() for l in details.split('|')]) + ul = re.sub(r"[\n\t]", "", ul) + details, path = re.match(r"^[^|]+\|(.*)\| -- (.*)$", ul).groups() + + d = dict( + [re.match(r"([\w]+)=(.*)", line).groups() for line in details.split("|")] + ) - grp = re.match(r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)',path) + grp = re.match( + r"(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)", path + ) if grp: tld = grp.group(1) else: tld = None - - d.update({'path': path, - 'tld': tld}) + + d.update({"path": path, "tld": tld}) return d - except: + except AttributeError: return line -def set_output_filename(input_file,output_name = None): + +def set_output_filename(input_file, output_name=None): if output_name is None: output_name = input_file.with_suffix(".parquet").name else: output_name = as_path(output_name).with_suffix(".parquet").name - + return str(output_name) + def convert( - input_file: str | Path, - output_dir: str | Path | None = None, - output_name: str | Path | None = None, - no_clobber: bool = False, - ) -> None: + 
input_file: str | Path, + output_dir: str | Path | None = None, + output_name: str | Path | None = None, + no_clobber: bool = False, +) -> None: """ Converts a GPFS log file to parquet format. The data schema assumes the same policy definition from list-path-external and list-path-dirplus. @@ -56,15 +62,15 @@ def convert( no_clobber : bool, optional When set to True, if output_dir/output_name.parquet already exists, exit without overwriting the existing file. If False (default), any existing parquet file will be overwritten """ - + input_file = as_path(input_file) - + if output_dir is not None: output_dir = as_path(output_dir) else: - output_dir = input_file.parent.parent.joinpath('parquet') + output_dir = input_file.parent.parent.joinpath("parquet") - output_name = set_output_filename(input_file,output_name) + output_name = set_output_filename(input_file, output_name) output_path = output_dir.joinpath(output_name) if output_path.exists() and no_clobber: @@ -72,20 +78,21 @@ def convert( "INFO: Output file already exists. Pass no_clobber=False to overwrite any existing output parquet file.", flush=True, ) - print("INFO: Cleaning and exiting.",flush=True) + print("INFO: Cleaning and exiting.", flush=True) return output_dir.mkdir(mode=0o2770, exist_ok=True) - - with gzip.open(input_file,'r') as f: - dicts = [parse_line(l) for l in f] - + + with gzip.open(input_file, "r") as f: + dicts = [parse_line(line) for line in f] + df = ( - pl.from_dicts(dicts,schema=SCHEMA) + pl.from_dicts(dicts, schema=SCHEMA) .with_columns( - pl.col(name).str.to_datetime(time_unit='ns') for name in ['access','create','modify'] + pl.col(name).str.to_datetime(time_unit="ns") + for name in ["access", "create", "modify"] ) - .sort('path') + .sort("path") ) - df.write_parquet(output_path,statistics='full') \ No newline at end of file + df.write_parquet(output_path, statistics="full") diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py index db840bc862bf8df3ed9d5226e64f5b8a3fea86a9..2e0d34f3110208d14f370218f3516f5ec870a4dd 100644 --- a/src/rc_gpfs/policy/hive.py +++ b/src/rc_gpfs/policy/hive.py @@ -1,33 +1,34 @@ -import os -import re import json +import os import random -import string +import re import shutil +import string from pathlib import Path -from typing import Literal, List +from typing import List import polars as pl from ..utils import ( - as_path, - as_bytes, + as_bytes, + as_path, + calculate_age_distribution, calculate_size_distribution, - calculate_age_distribution ) + def collect_hive_df( - parquet_path: str | Path, - acq: str, - tld: str | List[str] | None = None, - hive_path: str | Path | None = None, - no_clobber: bool = False + parquet_path: str | Path, + acq: str, + tld: str | List[str] | None = None, + hive_path: str | Path | None = None, + no_clobber: bool = False, ) -> pl.DataFrame: - if not isinstance(tld,list) and tld is not None: + if not isinstance(tld, list) and tld is not None: tld = [tld] - - print("Collecting dataframe",flush=True) - + + print("Collecting dataframe", flush=True) + df = ( pl.scan_parquet(parquet_path, parallel="prefiltered", rechunk=True) .select( @@ -50,56 +51,61 @@ def collect_hive_df( df = df.filter(pl.col("tld").is_in(tld)) if no_clobber: - existing_tlds = _get_existing_hive_cells(hive_path,acq) - df = df.filter(pl.col('tld').is_in(existing_tlds).not_()) + existing_tlds = _get_existing_hive_cells(hive_path, acq) + df = df.filter(pl.col("tld").is_in(existing_tlds).not_()) - df = df.collect(engine='streaming') + df = df.collect(engine="streaming") 
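# A minimal sketch (separate from this change) of how the no_clobber filter just above
# behaves, assuming a toy LazyFrame and a hand-written existing_tlds list in place of
# _get_existing_hive_cells(); polars is assumed at a version supporting engine="streaming".
import polars as pl

lf = pl.LazyFrame({"tld": ["lab_a", "lab_b", "lab_c"], "size": [10, 20, 30]})
existing_tlds = ["lab_b"]  # tlds that already have a parquet cell for this acq date

kept = lf.filter(pl.col("tld").is_in(existing_tlds).not_()).collect(engine="streaming")
# kept contains only lab_a and lab_c, so only their hive cells get written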
print("Finished collecting queries", flush=True) return df + def _get_existing_hive_cells(hive_path: str | Path, acq: str): hive_path = as_path(hive_path) existing_pq = hive_path.rglob(f"*/acq={acq}/*.parquet") - existing_tlds = list(set([p.parent.parent.name.removeprefix("tld=") for p in existing_pq])) + existing_tlds = list( + set([p.parent.parent.name.removeprefix("tld=") for p in existing_pq]) + ) return existing_tlds + def hivize( - parquet_path: str | Path, - hive_path: str | Path, - tld: str | List[str] | None = None, - partition_chunk_size_bytes: int | str = '200MiB', - staging_path: str | Path | None = None, - write_metadata: bool = True, - no_clobber: bool = False, - **kwargs - ) -> None: - + parquet_path: str | Path, + hive_path: str | Path, + tld: str | List[str] | None = None, + partition_chunk_size_bytes: int | str = "200MiB", + staging_path: str | Path | None = None, + write_metadata: bool = True, + no_clobber: bool = False, + **kwargs, +) -> None: parquet_path = as_path(parquet_path).resolve() hive_path = as_path(hive_path).resolve() partition_chunk_size_bytes = int(as_bytes(partition_chunk_size_bytes)) if staging_path is None: - rand_str = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - staging_path = Path(os.getenv('TMPDIR')).joinpath(os.getenv('USER'),f'hive-{rand_str}') - print(f"INFO: Using {staging_path} as temporary directory",flush=True) + rand_str = "".join(random.choices(string.ascii_letters + string.digits, k=8)) + staging_path = Path(os.getenv("TMPDIR")).joinpath( + os.getenv("USER"), f"hive-{rand_str}" + ) + print(f"INFO: Using {staging_path} as temporary directory", flush=True) else: staging_path = as_path(staging_path) - - hive_path.mkdir(exist_ok=True,parents=True) - staging_path.mkdir(exist_ok=True,parents=True) - + + hive_path.mkdir(exist_ok=True, parents=True) + staging_path.mkdir(exist_ok=True, parents=True) + if tld is not None: - if not isinstance(tld,list): + if not isinstance(tld, list): tld = [tld] - print(f"DEBUG: Hivizing {','.join(tld)}",flush=True) + print(f"DEBUG: Hivizing {','.join(tld)}", flush=True) else: - print(f"DEBUG: Hivizing all tlds",flush=True) + print("DEBUG: Hivizing all tlds", flush=True) - acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0) - print(f"DEBUG: Acquisition date is {acq}",flush=True) + acq = re.search(r"[\d]{4}-[\d]{2}-[\d]{2}", str(parquet_path)).group(0) + print(f"DEBUG: Acquisition date is {acq}", flush=True) df = collect_hive_df(parquet_path, acq, tld, hive_path, no_clobber) @@ -111,14 +117,14 @@ def hivize( shutil.rmtree(staging_path) return - print('Writing to hive') + print("Writing to hive") df.write_parquet( staging_path, - partition_by=['tld','acq'], + partition_by=["tld", "acq"], partition_chunk_size_bytes=partition_chunk_size_bytes, - statistics='full' + statistics="full", ) - print("Finished writing hive dataset",flush=True) + print("Finished writing hive dataset", flush=True) if write_metadata: tlds = df["tld"].unique().to_list() @@ -126,18 +132,16 @@ def hivize( [c for c in df.columns if c not in ["tld", "size", "modify", "access"]] ).lazy() for grp in tlds: - tdf = df.filter(pl.col('tld').eq(grp)) - output_dir=staging_path.joinpath(f"tld={grp}",f"acq={acq}") - write_dataset_metadata(tdf,output_dir,acq) + tdf = df.filter(pl.col("tld").eq(grp)) + output_dir = staging_path.joinpath(f"tld={grp}", f"acq={acq}") + write_dataset_metadata(tdf, output_dir, acq) - shutil.copytree(staging_path,hive_path,dirs_exist_ok=True) + shutil.copytree(staging_path, hive_path, 
dirs_exist_ok=True) shutil.rmtree(staging_path) + def write_dataset_metadata( - df: pl.DataFrame | pl.LazyFrame, - parquet_path: Path, - acq: str, - **kwargs + df: pl.DataFrame | pl.LazyFrame, parquet_path: Path, acq: str, **kwargs ) -> dict: df = df.lazy() @@ -156,15 +160,15 @@ def write_dataset_metadata( access_dist = ( df.select("access", "size") .with_columns( - calculate_age_distribution(pl.col('access'), acq, **kwargs).alias("grp") + calculate_age_distribution(pl.col("access"), acq, **kwargs).alias("grp") ) .group_by("grp") .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")]) .sort("grp", descending=True) - .collect(engine='streaming') + .collect(engine="streaming") .to_dicts() ) - + modify_dist = ( df.select("modify", "size") .with_columns( @@ -178,10 +182,10 @@ def write_dataset_metadata( ) metadata = { - 'file_sizes': size_dist, - 'access_times': access_dist, - 'modify_times': modify_dist + "file_sizes": size_dist, + "access_times": access_dist, + "modify_times": modify_dist, } - with open(parquet_path.joinpath('_metadata.json'),'w') as f: - json.dump(metadata,f) \ No newline at end of file + with open(parquet_path.joinpath("_metadata.json"), "w") as f: + json.dump(metadata, f) diff --git a/src/rc_gpfs/policy/policy_defs.py b/src/rc_gpfs/policy/policy_defs.py index 2cd72c9244d0c999870b0166c7c67ffd84d86980..aa32bd27edd173347f63375e05f2442ce3155cba 100644 --- a/src/rc_gpfs/policy/policy_defs.py +++ b/src/rc_gpfs/policy/policy_defs.py @@ -1,16 +1,19 @@ import polars as pl -SCHEMA = pl.Schema({ - 'size': pl.Int64, - 'kballoc': pl.Int64, - 'access': pl.String, - 'create': pl.String, - 'modify': pl.String, - 'uid': pl.Int64, - 'gid': pl.Int64, - 'heat': pl.String, - 'pool': pl.String, - 'mode': pl.String, - 'misc': pl.String, - 'path': pl.String, - 'tld': pl.String -}) \ No newline at end of file + +SCHEMA = pl.Schema( + { + "size": pl.Int64, + "kballoc": pl.Int64, + "access": pl.String, + "create": pl.String, + "modify": pl.String, + "uid": pl.Int64, + "gid": pl.Int64, + "heat": pl.String, + "pool": pl.String, + "mode": pl.String, + "misc": pl.String, + "path": pl.String, + "tld": pl.String, + } +) diff --git a/src/rc_gpfs/policy/split.py b/src/rc_gpfs/policy/split.py index 1438a20238cab29f5817187ed189855d420dea2d..e02b588419e2fb2cf066c0c909c25475594b63d0 100644 --- a/src/rc_gpfs/policy/split.py +++ b/src/rc_gpfs/policy/split.py @@ -1,26 +1,38 @@ -from pathlib import Path import subprocess -from ..utils import parse_scontrol,as_path +from pathlib import Path + +from ..utils import as_path, parse_scontrol + +__all__ = ["split", "compress_logs"] -__all__ = ['split','compress_logs'] def is_gz(filepath) -> bool: - with open(filepath, 'rb') as f: - return f.read(2) == b'\x1f\x8b' + with open(filepath, "rb") as f: + return f.read(2) == b"\x1f\x8b" + def pigz_exists() -> bool: print("INFO: Checking for pigz") - proc = subprocess.run('which pigz',shell=True,text=True,capture_output=True) - + proc = subprocess.run("which pigz", shell=True, text=True, capture_output=True) + if proc.returncode == 1: - print("WARNING: pigz was not found on the PATH. Defaulting to slower zcat. Install pigz or add it to PATH to improve performance") + print( + "WARNING: pigz was not found on the PATH. Defaulting to slower zcat. 
Install pigz or add it to PATH to improve performance" + ) return False else: print(f"INFO: pigz found at {proc.stdout.strip()}") return True -def split(log: str | Path, output_dir: str | Path | None = None, - lines: int = 5e6, prefix: str = 'list-', compress: bool = True,**kwargs) -> None: + +def split( + log: str | Path, + output_dir: str | Path | None = None, + lines: int = 5e6, + prefix: str = "list-", + compress: bool = True, + **kwargs, +) -> None: """ Split a raw GPFS log file into smaller chunks. These chunks can be converted to parquet format to create a full parquet dataset for parallel analysis. Chunks are optionally recompressed after being split from the raw log. @@ -40,28 +52,29 @@ def split(log: str | Path, output_dir: str | Path | None = None, If True, compress each chunk using gzip. By default True """ log = as_path(log) - + if output_dir is None: - output_dir = log.parent.parent.joinpath('chunks') + output_dir = log.parent.parent.joinpath("chunks") else: output_dir = as_path(output_dir) - output_dir.mkdir(exist_ok=True,mode=0o2770) + output_dir.mkdir(exist_ok=True, mode=0o2770) out = output_dir.joinpath(prefix) if is_gz(log): - cat = 'pigz -dc' if pigz_exists() else 'zcat' + cat = "pigz -dc" if pigz_exists() else "zcat" else: - cat = 'cat' - + cat = "cat" + split_cmd = f"{cat} {log} | split -a 3 -d -l {int(lines)} - {out}" - subprocess.run(split_cmd,shell=True,text=True) + subprocess.run(split_cmd, shell=True, text=True) if compress: compress_logs(output_dir) pass + def compress_logs(log_dir: str | Path) -> None: """ Compress raw logs using gzip in parallel. If a directory contains both uncompressed and compressed logs, the compressed logs will be ignored. @@ -71,14 +84,14 @@ def compress_logs(log_dir: str | Path) -> None: log_dir : str | Path Path to the directory containing the raw logs. """ - nproc,_ = parse_scontrol() + nproc, _ = parse_scontrol() log_dir = as_path(log_dir) - logs = [str(p) for p in log_dir.glob('*[!gz]')] - log_str = '\n'.join(logs) + logs = [str(p) for p in log_dir.glob("*[!gz]")] + log_str = "\n".join(logs) print(f"INFO: {len(logs)} logs found. 
Beginning compression") - + zip_cmd = f"echo '{log_str}' | xargs -I {{}} -P {nproc} bash -c 'gzip {{}}'" subprocess.run(zip_cmd, shell=True) - pass \ No newline at end of file + pass diff --git a/src/rc_gpfs/process/__init__.py b/src/rc_gpfs/process/__init__.py index f09e3ee82185929ce08fa74931fc5aeba8e94a8f..423dc0cfb5b209a998ad190ada7e11938fe2e82d 100644 --- a/src/rc_gpfs/process/__init__.py +++ b/src/rc_gpfs/process/__init__.py @@ -1,2 +1,4 @@ -from .process import * from .fparq import fparq +from .process import aggregate_gpfs_dataset, calculate_churn + +__all__ = [fparq, aggregate_gpfs_dataset, calculate_churn] diff --git a/src/rc_gpfs/process/fparq.py b/src/rc_gpfs/process/fparq.py index 8f9cbb1145313110530d7dad23c183eda68bfdb6..c96bd57b30ed824239b27fc2ae12437230cdec37 100644 --- a/src/rc_gpfs/process/fparq.py +++ b/src/rc_gpfs/process/fparq.py @@ -1,32 +1,41 @@ -import re from pathlib import Path -import polars as pl from typing import List -from ..utils import as_path, as_bytes +import polars as pl + +from ..utils import as_bytes, as_path + -def _write_partition(files: List[str], fname: str | Path, delim: str = r'\0'): - with open(fname, 'w') as f: +def _write_partition(files: List[str], fname: str | Path, delim: str = r"\0"): + with open(fname, "w") as f: f.write(delim.join(files)) + def write_partitions(df: pl.DataFrame, partition_path: str, partition_prefix: str): - dig = len(str(df['partition'].max())) - df = df.with_columns(pl.col('partition').cast(str).str.zfill(dig)) - for ind, grp in df.group_by('partition'): - files = grp['path'].to_list() - fname = Path(partition_path).joinpath(f'{partition_prefix}-{ind[0]}') - _write_partition(files,fname) - -def fpart(sizes: List[int], max_part_files: int, max_part_size: int, max_file_size: int, init_grp: int = 0): + dig = len(str(df["partition"].max())) + df = df.with_columns(pl.col("partition").cast(str).str.zfill(dig)) + for ind, grp in df.group_by("partition"): + files = grp["path"].to_list() + fname = Path(partition_path).joinpath(f"{partition_prefix}-{ind[0]}") + _write_partition(files, fname) + + +def fpart( + sizes: List[int], + max_part_files: int, + max_part_size: int, + max_file_size: int, + init_grp: int = 0, +): group = init_grp cumsum = 0 cur_grp_size = 0 groups = [] - for i,value in enumerate(sizes): + for i, value in enumerate(sizes): if cur_grp_size > max_part_files: group += 1 - + if value > max_file_size: if i > 0: group += 1 @@ -38,24 +47,26 @@ def fpart(sizes: List[int], max_part_files: int, max_part_size: int, max_file_si group += 1 cumsum = value groups.append(group) - + return groups + def fparq( - df: pl.DataFrame, - partition_path: str | Path, - partition_prefix: str = 'part', - max_part_size: str | int | None = '50GiB', - max_part_files: int | None = None, - tiny_size: str | int | None = '4kiB', - max_tiny_part_size: str | int | None = '1GiB', - max_tiny_part_files: int | None = 250000, - big_size: str | int | float | None = None, - exclude_nonstandard: bool = False, - ret_df: bool = False, - **kwargs + df: pl.DataFrame, + partition_path: str | Path, + partition_prefix: str = "part", + max_part_size: str | int | None = "50GiB", + max_part_files: int | None = None, + tiny_size: str | int | None = "4kiB", + max_tiny_part_size: str | int | None = "1GiB", + max_tiny_part_files: int | None = 250000, + big_size: str | int | float | None = None, + exclude_nonstandard: bool = False, + ret_df: bool = False, + **kwargs, ): - """ + ( + """ Replicates GNU fpart's file partitioning on GPFS policy lists in parquet datasets 
for use in sync tools such as rsync. This takes in a dataframe of file sizes in bytes indexed by their full file path. Files in the dataframe are sequentially assigned to a partition until the sum of their size reaches a specified threshold or number of files. This behaves identically to fpart's live mode and is the only supported mode. Files can also be classified as either 'tiny' or 'big' and partitioned separately from the rest of the dataset. Tiny files can cause performance issues where the overhead of transferring individual files exceeds the time to transfer the file data itself. Instead, tiny files can be archived in a tarball prior to sync and transferred all at once. Transferring very large files can also be slow when done in a single chunk/thread. Instead, big files will be assigned each to their own partition to help facilitate chunking and parallel transfer. @@ -96,74 +107,74 @@ def fparq( ------ ValueError If both max_part_size and max_part_files are None, a ValueError is raised - """"""""" + """ + """""" + ) if max_part_files is None and max_part_size is None: raise ValueError("At least one of max_part_files or max_part_size must be set") partition_path = as_path(partition_path) - partition_path.mkdir(exist_ok=True,parents=True) + partition_path.mkdir(exist_ok=True, parents=True) max_part_files = float("inf") if max_part_files is None else max_part_files max_part_size = as_bytes(max_part_size, None) tiny_size = as_bytes(tiny_size, 0) - max_tiny_part_size = as_bytes(max_tiny_part_size,1024**3) + max_tiny_part_size = as_bytes(max_tiny_part_size, 1024**3) big_size = as_bytes(big_size, max_part_size) if tiny_size == 0: breaks = [big_size] - labels = ['standard','big'] + labels = ["standard", "big"] else: breaks = [tiny_size, big_size] - labels = ['tiny','standard','big'] + labels = ["tiny", "standard", "big"] df = df.with_columns( - pl.col('size') - .cut( - breaks = breaks, - labels = labels, - left_closed=True - ) - .alias('size_grp') + pl.col("size") + .cut(breaks=breaks, labels=labels, left_closed=True) + .alias("size_grp") ) - if 'tiny' in labels: - tiny = df.filter(pl.col('size_grp').eq('tiny')) + if "tiny" in labels: + tiny = df.filter(pl.col("size_grp").eq("tiny")) else: tiny = pl.DataFrame() - big = df.filter(pl.col('size_grp').eq('big')) - df = df.filter(pl.col('size_grp').eq('standard')) + big = df.filter(pl.col("size_grp").eq("big")) + df = df.filter(pl.col("size_grp").eq("standard")) df = df.with_columns( pl.Series( - name='partition', - values = fpart(df['size'].to_list(), max_part_files, max_part_size, big_size) + name="partition", + values=fpart(df["size"].to_list(), max_part_files, max_part_size, big_size), ) ) if not exclude_nonstandard: if not tiny.is_empty(): - init_grp = df['partition'].max() + 1 + init_grp = df["partition"].max() + 1 tiny = tiny.with_columns( pl.Series( - name='partition', + name="partition", values=fpart( - tiny['size'].to_list(), + tiny["size"].to_list(), max_tiny_part_files, max_tiny_part_size, tiny_size, - init_grp=init_grp - ) + init_grp=init_grp, + ), ) ) - df = pl.concat([df,tiny]) + df = pl.concat([df, tiny]) if not big.is_empty(): - init_grp = df['partition'].max() + 1 + init_grp = df["partition"].max() + 1 big = big.with_columns(partition=range(init_grp, init_grp + big.shape[0])) - df = pl.concat([df,big]) + df = pl.concat([df, big]) - write_partitions(df, partition_path=partition_path, partition_prefix=partition_prefix) + write_partitions( + df, partition_path=partition_path, partition_prefix=partition_prefix + ) return df if 
ret_df else None diff --git a/src/rc_gpfs/process/process.py b/src/rc_gpfs/process/process.py index 76abf70f2d0f7464a42990167c0b4aebb1a0a429..6d0a3c39025468812157114cdcf73a4b7db2a354 100644 --- a/src/rc_gpfs/process/process.py +++ b/src/rc_gpfs/process/process.py @@ -1,20 +1,22 @@ from pathlib import Path -import polars as pl +from typing import List, Literal + import numpy as np -from typing import Literal, List +import polars as pl from typeguard import typechecked from ..db.utils import CHURN_TBL_COLS from ..utils import as_path, prep_age_distribution, prep_size_distribution -__all__ = ['aggregate_gpfs_dataset','calculate_churn'] +__all__ = ["aggregate_gpfs_dataset", "calculate_churn"] -def _check_dataset_path(dataset_path,file_glob) -> Path: + +def _check_dataset_path(dataset_path, file_glob) -> Path: dataset_path = as_path(dataset_path).resolve() if not dataset_path.exists(): raise FileNotFoundError(f"{dataset_path} does not exist") - + if dataset_path.is_file(): if file_glob is not None: print(f"INFO: {dataset_path} is a file, ignoring glob.") @@ -23,216 +25,218 @@ def _check_dataset_path(dataset_path,file_glob) -> Path: elif dataset_path.is_dir(): n_files = len(list(dataset_path.glob(file_glob))) if n_files == 0: - raise FileNotFoundError(f"No parquet files were found in {dataset_path.joinpath(file_glob)}") + raise FileNotFoundError( + f"No parquet files were found in {dataset_path.joinpath(file_glob)}" + ) dataset_path = dataset_path.joinpath(file_glob) - + print(f"INFO: {n_files} parquet file(s) found in {dataset_path}") - + return dataset_path -def _check_timedelta_values(vals,unit): + +def _check_timedelta_values(vals, unit): if (vals is None) != (unit is None): - print(f"WARNING: time_breakpoints and time_unit were not both set. Skipping age aggregation") + print( + "WARNING: time_breakpoints and time_unit were not both set. Skipping age aggregation" + ) vals = None unit = None - return vals,unit + return vals, unit + @typechecked def aggregate_gpfs_dataset( df: pl.DataFrame | pl.LazyFrame, acq: str | np.datetime64, time_breakpoints: int | List[int] | None = [30, 60, 90, 180], - time_unit: Literal['D','W','M','Y'] | None = 'D', - time_val: Literal['access','modify','create'] | None = 'access', - size_breakpoints: int | str | List[int | str] | None = ['4 kiB','4 MiB','1 GiB','10 GiB','100 GiB','1 TiB'], - **kwargs + time_unit: Literal["D", "W", "M", "Y"] | None = "D", + time_val: Literal["access", "modify", "create"] | None = "access", + size_breakpoints: int | str | List[int | str] | None = [ + "4 kiB", + "4 MiB", + "1 GiB", + "10 GiB", + "100 GiB", + "1 TiB", + ], + **kwargs, ) -> pl.DataFrame: - # Cast to LazyFrame to optimize aggregation. 
Is idempotent, will not affect a passed LazyFrame df = df.lazy() # Input checking - time_breakpoints,time_unit = _check_timedelta_values(time_breakpoints,time_unit) - - grps = ['tld'] + time_breakpoints, time_unit = _check_timedelta_values(time_breakpoints, time_unit) + + grps = ["tld"] if time_breakpoints is not None: - age_bins, age_labels = prep_age_distribution(acq,time_breakpoints,time_unit) - - df = ( - df - .with_columns( - pl.col(time_val) - .cut(breaks=age_bins,labels=age_labels) - .cast(pl.String) - .cast(pl.Enum(age_labels)) - .alias('dt_grp') - ) + age_bins, age_labels = prep_age_distribution(acq, time_breakpoints, time_unit) + + df = df.with_columns( + pl.col(time_val) + .cut(breaks=age_bins, labels=age_labels) + .cast(pl.String) + .cast(pl.Enum(age_labels)) + .alias("dt_grp") ) - grps.append('dt_grp') + grps.append("dt_grp") if size_breakpoints is not None: size_bins, size_labels = prep_size_distribution(size_breakpoints) - df = ( - df - .with_columns( - pl.col('size') - .cut(breaks=size_bins,labels=size_labels) - .cast(pl.String) - .cast(pl.Enum(size_labels)) - .alias('size_grp') - ) + df = df.with_columns( + pl.col("size") + .cut(breaks=size_bins, labels=size_labels) + .cast(pl.String) + .cast(pl.Enum(size_labels)) + .alias("size_grp") ) - grps.append('size_grp') - + grps.append("size_grp") + df_agg = ( - df - .group_by(grps) - .agg([ - pl.col('size').sum().alias('bytes'), - pl.col('size').count().alias('file_count') - ]) + df.group_by(grps) + .agg( + [ + pl.col("size").sum().alias("bytes"), + pl.col("size").count().alias("file_count"), + ] + ) .sort(grps) - .collect(engine='streaming') + .collect(engine="streaming") ) - + return df_agg - + + @typechecked def calculate_churn( - hive_path: str | Path, - tld: str, - acq_dates: List[ np.datetime64 | str ], - **kwargs + hive_path: str | Path, tld: str, acq_dates: List[np.datetime64 | str], **kwargs ) -> pl.DataFrame: - - acq_dates = [np.datetime_as_string(d,'D') if isinstance(d,np.datetime64) else d for d in acq_dates] + acq_dates = [ + np.datetime_as_string(d, "D") if isinstance(d, np.datetime64) else d + for d in acq_dates + ] acq_dates.sort() ## Input checking hive_path = as_path(hive_path) dataset_path = hive_path.joinpath(f"tld={tld}") - acq_dirs = [d.name.removeprefix('acq=') for d in dataset_path.glob("acq=*")] + acq_dirs = [d.name.removeprefix("acq=") for d in dataset_path.glob("acq=*")] # remove any datetimes for which the given tld does not have data. acq_dates = [d for d in acq_dates if d in acq_dirs] if len(acq_dates) <= 1: - raise ValueError(f"Fewer than two given policy acquisition dates contained data for {tld} in {hive_path}.") + raise ValueError( + f"Fewer than two given policy acquisition dates contained data for {tld} in {hive_path}." 
+ ) churn_l = [] - + df_init = ( - pl.scan_parquet( - dataset_path.joinpath('**/*.parquet'), + pl.scan_parquet( + dataset_path.joinpath("**/*.parquet"), hive_partitioning=True, - hive_schema=pl.Schema({'tld':pl.String,'acq':pl.String}), - parallel='prefiltered' + hive_schema=pl.Schema({"tld": pl.String, "acq": pl.String}), + parallel="prefiltered", ) - .filter(pl.col('tld').eq(tld), pl.col('acq').eq(acq_dates[0])) - .select(['path','modify','size','access']) - ).collect(engine='streaming') - - for i in range(1,len(acq_dates)): + .filter(pl.col("tld").eq(tld), pl.col("acq").eq(acq_dates[0])) + .select(["path", "modify", "size", "access"]) + ).collect(engine="streaming") + + for i in range(1, len(acq_dates)): df_target = ( pl.scan_parquet( - dataset_path.joinpath('**/*.parquet'), + dataset_path.joinpath("**/*.parquet"), hive_partitioning=True, - hive_schema=pl.Schema({'tld':pl.String,'acq':pl.String}), - parallel='prefiltered' - ) - .filter(pl.col('tld').eq(tld), pl.col('acq').eq(acq_dates[i])) - .select(['path','modify','size','access']) - ).collect(engine='streaming') - - churn = ( - _calculate_churn(df_init,df_target) - .with_columns( - pl.lit(acq_dates[i]).alias('log_dt'), - pl.lit(acq_dates[i-1]).alias('prior_log_dt'), - pl.lit(tld).alias('tld') + hive_schema=pl.Schema({"tld": pl.String, "acq": pl.String}), + parallel="prefiltered", ) + .filter(pl.col("tld").eq(tld), pl.col("acq").eq(acq_dates[i])) + .select(["path", "modify", "size", "access"]) + ).collect(engine="streaming") + + churn = _calculate_churn(df_init, df_target).with_columns( + pl.lit(acq_dates[i]).alias("log_dt"), + pl.lit(acq_dates[i - 1]).alias("prior_log_dt"), + pl.lit(tld).alias("tld"), ) churn_l.append(churn) - # This delete pattern paired with the loop creates a type of rotating list where each dataframe, aside from - # the initial and final, is processed as the target and the source for which files exist at a given time. - # The target is then referred to as the source as we move through the time series. Each source is removed - # from memory. This both limits the amount of memory used to only two datasets at a time while also + # This delete pattern paired with the loop creates a type of rotating list where each dataframe, aside from + # the initial and final, is processed as the target and the source for which files exist at a given time. + # The target is then referred to as the source as we move through the time series. Each source is removed + # from memory. This both limits the amount of memory used to only two datasets at a time while also # only reading each dataset once. 
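# A sketch (not part of this change) of the rotating-reference pattern the comment above
# describes; load() and compare() are assumed stand-ins for the per-acquisition
# scan_parquet/collect block and for _calculate_churn().
def rolling_compare(dates, load, compare):
    results = []
    prev = load(dates[0])
    for d in dates[1:]:
        cur = load(d)                      # each acquisition is read exactly once
        results.append(compare(prev, cur))
        del prev                           # at most two frames are alive at any time
        prev = cur                         # this target becomes the next comparison's source
    return results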
del df_init - df_init = df_target + df_init = df_target # noqa: F841 del df_target - + churn_df = pl.concat(churn_l) - + return churn_df -def _calculate_churn(df1,df2) -> None: - empty_df = pl.DataFrame(data = {'name':CHURN_TBL_COLS,'value':0}) + +def _calculate_churn(df1, df2) -> None: + empty_df = pl.DataFrame(data={"name": CHURN_TBL_COLS, "value": 0}) if df1.equals(df2): - return empty_df.transpose(column_names='name') + return empty_df.transpose(column_names="name") - dfm = df1.join(df2,how='full',on='path',suffix='_updated',coalesce=True) + dfm = df1.join(df2, how="full", on="path", suffix="_updated", coalesce=True) conditions = [ - dfm['access'].is_null(), - dfm['access_updated'].is_null(), - (dfm['modify'] != dfm['modify_updated']).fill_null(False), - (dfm['access'] != dfm['access_updated']).fill_null(False) + dfm["access"].is_null(), + dfm["access_updated"].is_null(), + (dfm["modify"] != dfm["modify_updated"]).fill_null(False), + (dfm["access"] != dfm["access_updated"]).fill_null(False), ] - choices = ['created','deleted','modified','accessed'] + choices = ["created", "deleted", "modified", "accessed"] - dfm = ( - dfm.with_columns( - pl.Series( - name='type', - values=np.select(conditions,choices,default='unchanged'), - dtype=pl.Categorical - ) + dfm = dfm.with_columns( + pl.Series( + name="type", + values=np.select(conditions, choices, default="unchanged"), + dtype=pl.Categorical, ) ) - dfm = ( - dfm - .filter(pl.col('type').ne('unchanged')) - .drop(['modify','access','modify_updated','access_updated']) + dfm = dfm.filter(pl.col("type").ne("unchanged")).drop( + ["modify", "access", "modify_updated", "access_updated"] ) - modified = dfm.filter(pl.col('type').eq('modified')) - modified_bytes_net = modified['size_updated'].sum() - modified['size'].sum() - - # Instead of writing logic to aggregate across initial size for deleted files and final size for all other - # files, we can essentially condense size across both columns into a new column. Size of deleted files will + modified = dfm.filter(pl.col("type").eq("modified")) + modified_bytes_net = modified["size_updated"].sum() - modified["size"].sum() + + # Instead of writing logic to aggregate across initial size for deleted files and final size for all other + # files, we can essentially condense size across both columns into a new column. Size of deleted files will # come from size while all other files will come from size_updated. 
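# Toy illustration (assumed data, not from this change) of the coalescing step described
# above: after the full join, deleted files have a null size_updated, so fill_null falls
# back to their old size while every other file keeps its updated size.
import polars as pl

joined = pl.DataFrame(
    {"path": ["kept", "deleted"], "size": [5, 7], "size_updated": [6, None]}
)
condensed = (
    joined.with_columns(pl.col("size_updated").fill_null(pl.col("size")))
    .drop("size")
    .rename({"size_updated": "size"})
)
# condensed["size"] is [6, 7]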
     dfm = (
-        dfm
-        .with_columns(
-            pl.col('size_updated').fill_null(pl.col('size'))
-        )
-        .drop('size')
-        .rename({'size_updated':'size'})
+        dfm.with_columns(pl.col("size_updated").fill_null(pl.col("size")))
+        .drop("size")
+        .rename({"size_updated": "size"})
     )

     agg_df = (
-        dfm
-        .group_by('type')
-        .agg([
-            pl.sum('size').alias('bytes'),
-            pl.count('size').alias('files')
-        ])
-        .unpivot(on=['bytes','files'],index='type')
-        .with_columns(pl.concat_str([pl.col('type'),pl.col('variable')],separator='_').alias('name'))
-        .with_columns(pl.col('name').str.strip_suffix('_files'))
-        .select(['name','value'])
-        .extend(pl.DataFrame({'name':'modified_bytes_net','value':modified_bytes_net}))
-        .join(empty_df.select('name'), on='name', how='right')
+        dfm.group_by("type")
+        .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("files")])
+        .unpivot(on=["bytes", "files"], index="type")
+        .with_columns(
+            pl.concat_str([pl.col("type"), pl.col("variable")], separator="_").alias(
+                "name"
+            )
+        )
+        .with_columns(pl.col("name").str.strip_suffix("_files"))
+        .select(["name", "value"])
+        .extend(
+            pl.DataFrame({"name": "modified_bytes_net", "value": modified_bytes_net})
+        )
+        .join(empty_df.select("name"), on="name", how="right")
         .fill_null(0)
-        .transpose(column_names='name')
+        .transpose(column_names="name")
     )

-    return agg_df
\ No newline at end of file
+    return agg_df
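Editorial aside (not part of the patch): the churn classification above reduces to a full outer join on the path column followed by np.select over the null/changed columns. A minimal sketch with made-up toy frames, mirroring the logic in this hunk:

import numpy as np
import polars as pl

old = pl.DataFrame({"path": ["a", "b", "c"], "modify": [1, 1, 1], "size": [10, 20, 30], "access": [1, 1, 1]})
new = pl.DataFrame({"path": ["b", "c", "d"], "modify": [1, 2, 1], "size": [20, 35, 5], "access": [2, 1, 1]})

dfm = old.join(new, how="full", on="path", suffix="_updated", coalesce=True)
conditions = [
    dfm["access"].is_null(),          # missing in the source log  -> created
    dfm["access_updated"].is_null(),  # missing in the target log  -> deleted
    (dfm["modify"] != dfm["modify_updated"]).fill_null(False),  # modify time changed -> modified
    (dfm["access"] != dfm["access_updated"]).fill_null(False),  # access time changed -> accessed
]
dfm = dfm.with_columns(
    pl.Series("type", np.select(conditions, ["created", "deleted", "modified", "accessed"], default="unchanged"))
)
print(dfm.select(["path", "type"]))  # a: deleted, b: accessed, c: modified, d: created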
diff --git a/src/rc_gpfs/process/utils.py b/src/rc_gpfs/process/utils.py
index daf8d2b69c3e97008e83d14d60ec380d3e3c8148..5c747f8c250c36afbfef3748d8f4d49e6bd3072f 100644
--- a/src/rc_gpfs/process/utils.py
+++ b/src/rc_gpfs/process/utils.py
@@ -1,17 +1,19 @@
 import re
 from pathlib import Path
+
 from numpy import datetime64

+
 def extract_run_date_from_filename(
-    path: str | Path,
-    pattern: str = r'[\d]{4}-[\d]{2}-[\d]{2}'
-    ) -> datetime64:
-    if isinstance(path,Path):
+    path: str | Path, pattern: str = r"[\d]{4}-[\d]{2}-[\d]{2}"
+) -> datetime64:
+    if isinstance(path, Path):
         path = str(path.absolute())
-
+
     try:
-        run_date = re.search(pattern,path).group()
+        run_date = re.search(pattern, path).group()
         return run_date
-    except:
-        raise ValueError("No acquisition date was passed or could be inferred from the dataset path. Is the date formatted as YYYY-MM-DD?")
-
\ No newline at end of file
+    except AttributeError:
+        raise ValueError(
+            "No acquisition date was passed or could be inferred from the dataset path. Is the date formatted as YYYY-MM-DD?"
+        )
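Editorial note (not part of the patch): with the narrowed except clause, a path without a YYYY-MM-DD token now surfaces as a ValueError instead of masking unrelated errors; note the annotation still says datetime64 even though the matched text is returned as a string. A quick check with hypothetical filenames:

from rc_gpfs.process.utils import extract_run_date_from_filename

extract_run_date_from_filename("/data/logs/list-policy_2025-01-15.gz")  # returns the matched string "2025-01-15"
extract_run_date_from_filename("/data/logs/no-date-here.gz")            # raises ValueError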
diff --git a/src/rc_gpfs/report/plotting.py b/src/rc_gpfs/report/plotting.py
index a316b08d2dfea906b9f211e7fa00527d58e84f04..c387d17836fca4632da065957cdf14e8964350da 100644
--- a/src/rc_gpfs/report/plotting.py
+++ b/src/rc_gpfs/report/plotting.py
@@ -1,45 +1,67 @@
-import polars as pl
-from plotly.graph_objects import Figure
 import plotly.graph_objects as go
+import polars as pl
 from plotly import colors
+from plotly.graph_objects import Figure
+
+__all__ = ["bar_plot", "pareto_chart"]

-__all__ = ['bar_plot','pareto_chart']

-def choose_appropriate_storage_unit(size,starting_unit='B'):
+def choose_appropriate_storage_unit(size, starting_unit="B"):
     if hasattr(size, "__len__"):
         size = size.max()
-
+
     try:
-        units = ['B','kiB','MiB','GiB','TiB']
-        units_b10 = ['kB','MB','GB','TB']
+        units = ["B", "kiB", "MiB", "GiB", "TiB"]
+        units_b10 = ["kB", "MB", "GB", "TB"]
         if starting_unit in units_b10:
-            starting_unit = starting_unit.replace('B','iB')
+            starting_unit = starting_unit.replace("B", "iB")
             # add logging message here saying the specified base 10 unit is being interpreted as base 2
         exp = units.index(starting_unit)
-    except (ValueError):
-        raise(f"{starting_unit} is not a valid storage unit. Choose from 'B','kB','MB','GB', or 'TB'")
-
-    while ((size/1024) >= 1) & (exp < 4):
-        size = size/1024
+    except ValueError:
+        raise ValueError(
+            f"{starting_unit} is not a valid storage unit. Choose from 'B','kB','MB','GB', or 'TB'"
+        )
+
+    while ((size / 1024) >= 1) & (exp < 4):
+        size = size / 1024
         exp += 1
-    return exp,units[exp]
+    return exp, units[exp]
+

-def _format_number(num,dec=2):
+def _format_number(num, dec=2):
     return f"{num:,.{dec}f}"

-def bar_plot(df,x,y,show_legend=True,legend_labels=None,add_text=True,textposition=None,text_decimals=2,
-             group_colors=colors.qualitative.Plotly,title=None,xlabel=None,ylabel=None,enable_text_hover=False) -> Figure:
-    if not isinstance(y,list):
+
+def bar_plot(
+    df,
+    x,
+    y,
+    show_legend=True,
+    legend_labels=None,
+    add_text=True,
+    textposition=None,
+    text_decimals=2,
+    group_colors=colors.qualitative.Plotly,
+    title=None,
+    xlabel=None,
+    ylabel=None,
+    enable_text_hover=False,
+) -> Figure:
+    if not isinstance(y, list):
         y = [y]
     if show_legend and legend_labels is None:
         legend_labels = y
-
+
     textposition = textposition if add_text else None
-
+
     fig = go.Figure()
-    for idx,c in enumerate(y):
-        text = df[c].map_elements(lambda x: _format_number(x,dec=text_decimals)) if add_text else None
+    for idx, c in enumerate(y):
+        text = (
+            df[c].map_elements(lambda x: _format_number(x, dec=text_decimals))
+            if add_text
+            else None
+        )
         fig.add_bar(
             x=df[x],
             y=df[c],
@@ -47,56 +69,70 @@ def bar_plot(df,x,y,show_legend=True,legend_labels=None,add_text=True,textpositi
             textposition=textposition,
             name=legend_labels[idx],
             marker_color=group_colors[idx],
-            uid=idx
+            uid=idx,
         )

     # If plotting multiple traces, make some updates to the layout
     if len(y) > 1:
         for idx in range(len(y)):
-            fig.update_traces(
-                patch = {'offsetgroup':idx},
-                selector = {'uid':idx}
-            )
-
+            fig.update_traces(patch={"offsetgroup": idx}, selector={"uid": idx})
+
         fig.update_layout(
-            barmode='group', # Grouped bar chart
-            bargap=0.3
+            barmode="group",  # Grouped bar chart
+            bargap=0.3,
         )

     fig.update_layout(
         title_text=title,
         title_x=0.5,
-        title_xanchor='center',
-        title_font_size = 24,
+        title_xanchor="center",
+        title_font_size=24,
         xaxis_title=xlabel,
         yaxis_title=ylabel,
         margin=dict(t=100, b=20, l=40, r=40),
-        template='plotly_white',
-        hovermode=enable_text_hover
+        template="plotly_white",
+        hovermode=enable_text_hover,
     )
     return fig

-def pareto_chart(df, x, y, csum_col=None, show_legend=True, legend_labels=['Raw','Cumulative'], add_text=True,
-                 textposition_bar=None, textposition_scatter=None, text_decimals=2,
-                 group_colors=colors.qualitative.Plotly, title=None,xlabel=None,ylabel=None,enable_text_hover=False) -> Figure:
+
+def pareto_chart(
+    df,
+    x,
+    y,
+    csum_col=None,
+    show_legend=True,
+    legend_labels=["Raw", "Cumulative"],
+    add_text=True,
+    textposition_bar=None,
+    textposition_scatter=None,
+    text_decimals=2,
+    group_colors=colors.qualitative.Plotly,
+    title=None,
+    xlabel=None,
+    ylabel=None,
+    enable_text_hover=False,
+) -> Figure:
     df_ = df.clone()
     if csum_col is None:
-        csum_col = f'{y}_cumsum'
+        csum_col = f"{y}_cumsum"
         df_ = df_.with_columns(pl.col(y).cum_sum().alias(csum_col))
-
+
     if show_legend and legend_labels is None:
-        legend_labels = [y,csum_col]
-
+        legend_labels = [y, csum_col]
+
     if add_text and textposition_bar is None:
-        textposition_bar = 'outside'
-
+        textposition_bar = "outside"
+
     if add_text and textposition_scatter is None:
-        textposition_scatter = 'top center'
-
+        textposition_scatter = "top center"
+
     fig = go.Figure()
     if add_text:
-        bar_text = df_[y].map_elements(lambda x: _format_number(x,dec=text_decimals),return_dtype=pl.String)
+        bar_text = df_[y].map_elements(
+            lambda x: _format_number(x, dec=text_decimals), return_dtype=pl.String
+        )
     else:
         bar_text = None
     fig.add_bar(
@@ -105,41 +141,37 @@ def pareto_chart(df, x, y, csum_col=None, show_legend=True, legend_labels=['Raw'
         text=bar_text,
         textposition=textposition_bar,
         name=legend_labels[0],
-        marker_color=group_colors[0]
+        marker_color=group_colors[0],
     )

     if add_text:
-        scatter_text = (
-            df_[csum_col]
-            .map_elements(
-                lambda x: _format_number(x,dec=text_decimals),
-                return_dtype=pl.String
-            )
+        scatter_text = df_[csum_col].map_elements(
+            lambda x: _format_number(x, dec=text_decimals), return_dtype=pl.String
         )
         scatter_text[0] = None
     else:
         scatter_text = None
-
+
     fig.add_scatter(
         x=df_[x],
         y=df_[csum_col],
         text=scatter_text,
         textposition=textposition_scatter,
         name=legend_labels[1],
-        mode='lines+markers+text',
-        marker_color=group_colors[1]
+        mode="lines+markers+text",
+        marker_color=group_colors[1],
     )

     fig.update_layout(
         title_text=title,
         title_x=0.5,
-        title_xanchor='center',
-        title_font_size = 24,
+        title_xanchor="center",
+        title_font_size=24,
         xaxis_title=xlabel,
         yaxis_title=ylabel,
         margin=dict(t=100, b=20, l=40, r=40),
-        template='plotly_white',
-        hovermode=enable_text_hover
+        template="plotly_white",
+        hovermode=enable_text_hover,
     )
-    return fig
\ No newline at end of file
+    return fig
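Editorial aside (not part of the patch): the keyword-expanded signatures above are easiest to read at a call site. A sketch with toy data, assuming the module imports as rc_gpfs.report.plotting:

import polars as pl
from rc_gpfs.report.plotting import bar_plot, pareto_chart

df = pl.DataFrame({"tld": ["lab_a", "lab_b", "lab_c"], "tib": [120.0, 45.0, 12.5]})

# Single-trace bar chart with value labels on each bar
fig = bar_plot(df, x="tld", y="tib", title="Usage by tld", ylabel="TiB stored")
fig.show()

# Pareto chart: bars plus a cumulative-sum line computed from the y column
fig = pareto_chart(df.sort("tib", descending=True), x="tld", y="tib", title="Cumulative usage")
fig.show()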
diff --git a/src/rc_gpfs/utils/__init__.py b/src/rc_gpfs/utils/__init__.py
index e9e9fd4d3c451e2f9ed8ee5368c6618cffb7da56..fe2452bbe9b4d3eacc5c42a9595b4615afa2e375 100644
--- a/src/rc_gpfs/utils/__init__.py
+++ b/src/rc_gpfs/utils/__init__.py
@@ -1,3 +1,3 @@
-from .core import *
-from .datetime import *
-from .units import *
\ No newline at end of file
+from .core import *  # noqa: F403
+from .datetime import *  # noqa: F403
+from .units import *  # noqa: F403
\ No newline at end of file
diff --git a/src/rc_gpfs/utils/core.py b/src/rc_gpfs/utils/core.py
index d62115ababdfc742f6da29b8552becbaae90f107..3638ebc488b43d4bbdf08d5c9b850693d582cf2b 100644
--- a/src/rc_gpfs/utils/core.py
+++ b/src/rc_gpfs/utils/core.py
@@ -3,111 +3,127 @@
 import re
 import subprocess
 from pathlib import Path
 from typing import List, Literal, Tuple
+
+import numpy as np
 import polars as pl
 import pyarrow.parquet as pq
-import numpy as np

+from .datetime import as_datetime, create_timedelta_breakpoints, create_timedelta_labels
 from .units import as_bytes, convert_si, create_size_bin_labels
-from .datetime import as_datetime,create_timedelta_breakpoints,create_timedelta_labels
+

 def parse_scontrol():
-    job_id = os.getenv('SLURM_JOB_ID')
+    job_id = os.getenv("SLURM_JOB_ID")
     command = f"scontrol show job {job_id} | grep TRES="
-    result = subprocess.run(command, shell=True, capture_output=True, text=True).stdout.strip()
+    result = subprocess.run(
+        command, shell=True, capture_output=True, text=True
+    ).stdout.strip()
+
+    tres_pattern = r".*cpu=(?P<cores>[\d]+),mem=(?P<mem>[\d]+[KMGT]?).*"
+    cores, mem = re.search(tres_pattern, result).groupdict().values()

-    tres_pattern=r'.*cpu=(?P<cores>[\d]+),mem=(?P<mem>[\d]+[KMGT]?).*'
-    cores,mem = re.search(tres_pattern,result).groupdict().values()
-
     cores = int(cores)
-    mem = convert_si(mem,to_unit='G',use_binary=True)
-    return [cores,mem]
+    mem = convert_si(mem, to_unit="G", use_binary=True)
+    return [cores, mem]
+

 def as_path(s: str | Path) -> Path:
-    if not isinstance(s,Path):
+    if not isinstance(s, Path):
         s = Path(s)
     return s

+
 def prep_size_distribution(
-    size_bins: int | str | List[int | str] = ['4 kiB','4 MiB','1 GiB','10 GiB','100 GiB','1 TiB'],
-    **kwargs
-) -> Tuple[List[int],List[str]]:
-    if not isinstance(size_bins,list):
+    size_bins: int | str | List[int | str] = [
+        "4 kiB",
+        "4 MiB",
+        "1 GiB",
+        "10 GiB",
+        "100 GiB",
+        "1 TiB",
+    ],
+    **kwargs,
+) -> Tuple[List[int], List[str]]:
+    if not isinstance(size_bins, list):
         size_bins = [size_bins]
-    size_bins = [as_bytes(s) if isinstance(s,str) else s for s in size_bins]
+    size_bins = [as_bytes(s) if isinstance(s, str) else s for s in size_bins]
     size_bins = list(set(size_bins))
-    size_bins.sort() # Sorts and removes any duplicates
-    size_bins = [s for s in size_bins if s > 0] # Removes 0, as it will be implicit as the left-most break point
-
+    size_bins.sort()  # Sorts and removes any duplicates
+    size_bins = [
+        s for s in size_bins if s > 0
+    ]  # Removes 0, as it will be implicit as the left-most break point
+
     size_labels = create_size_bin_labels(size_bins)

-    return size_bins,size_labels
+    return size_bins, size_labels
+

 def calculate_size_distribution(
-    sizes: pl.Series,
-    size_bins: int | str | List[int | str] = ['4 kiB','4 MiB','1 GiB','10 GiB','100 GiB','1 TiB'],
-    **kwargs
+    sizes: pl.Series,
+    size_bins: int | str | List[int | str] = [
+        "4 kiB",
+        "4 MiB",
+        "1 GiB",
+        "10 GiB",
+        "100 GiB",
+        "1 TiB",
+    ],
+    **kwargs,
 ) -> pl.Series:
-
-    size_bins,size_labels = prep_size_distribution(size_bins)
+    size_bins, size_labels = prep_size_distribution(size_bins)

     size_grps = (
-        sizes
-        .cut(
-            breaks=size_bins,
-            labels=size_labels,
-            **kwargs
-        )
+        sizes.cut(breaks=size_bins, labels=size_labels, **kwargs)
         .cast(pl.String)
         .cast(pl.Enum(size_labels))
     )

     return size_grps

+
 def prep_age_distribution(
-    acq: str | np.datetime64,
-    age_breakpoints: int | List[int],
-    time_unit: Literal['D','W']
-) -> Tuple[List[np.datetime64],List[str]]:
-    if not isinstance(age_breakpoints,list):
+    acq: str | np.datetime64,
+    age_breakpoints: int | List[int],
+    time_unit: Literal["D", "W"],
+) -> Tuple[List[np.datetime64], List[str]]:
+    if not isinstance(age_breakpoints, list):
         age_breakpoints = [age_breakpoints]
     else:
         age_breakpoints = list(set(age_breakpoints))
-
+
     age_breakpoints.sort()
     age_breakpoints = [t for t in age_breakpoints if t > 0]

     # Create age bin labels before converting to duration for easier parsing
-    age_labels = create_timedelta_labels(age_breakpoints,time_unit)
+    age_labels = create_timedelta_labels(age_breakpoints, time_unit)

     # # Create age bins by subtracting the number of days from the date
-    age_breakpoints = create_timedelta_breakpoints(as_datetime(acq),age_breakpoints,time_unit)
+    age_breakpoints = create_timedelta_breakpoints(
+        as_datetime(acq), age_breakpoints, time_unit
+    )
+
+    return age_breakpoints, age_labels

-    return age_breakpoints,age_labels
+

 def calculate_age_distribution(
-    timestamps: pl.Series,
-    acq: str | np.datetime64,
-    age_breakpoints: List[ int ] = [30,60,90,180],
-    time_unit: Literal['D','W'] = 'D',
-    **kwargs
+    timestamps: pl.Series,
+    acq: str | np.datetime64,
+    age_breakpoints: List[int] = [30, 60, 90, 180],
+    time_unit: Literal["D", "W"] = "D",
+    **kwargs,
 ) -> pl.Series:
-
     age_breakpoints, age_labels = prep_age_distribution(acq, age_breakpoints, time_unit)
-
+
     age_grps = (
-        timestamps
-        .cut(
-            breaks=age_breakpoints,
-            labels=age_labels,
-            **kwargs
-        )
+        timestamps.cut(breaks=age_breakpoints, labels=age_labels, **kwargs)
         .cast(pl.String)
         .cast(pl.Enum(age_labels))
     )

     return age_grps

+
 def get_parquet_dataset_size(parquet_path):
     tot_size = 0
@@ -115,5 +131,5 @@
         md = pq.read_metadata(p)
         for rg in range(0, md.num_row_groups):
             tot_size += md.row_group(rg).total_byte_size
-
-    return tot_size
\ No newline at end of file
+
+    return tot_size
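Editorial aside (not part of the patch): the size and age helpers above bin a Series against human-readable breakpoints. Toy values below; the import path follows the module layout shown in this diff:

import polars as pl
from rc_gpfs.utils.core import calculate_size_distribution, prep_age_distribution

sizes = pl.Series("size", [512, 5_000_000, 20_000_000_000])
print(calculate_size_distribution(sizes).to_list())
# ['0 B-4 kiB', '4 MiB-1 GiB', '10 GiB-100 GiB'] with the default break points

breakpoints, labels = prep_age_distribution(acq="2025-02-20", age_breakpoints=[30, 60, 90, 180], time_unit="D")
print(labels)  # ['>180D', '90D-180D', '60D-90D', '30D-60D', '<30D']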
diff --git a/src/rc_gpfs/utils/datetime.py b/src/rc_gpfs/utils/datetime.py
index 2f5067adb35e18c73b1ce40b22f1a4464aa32b37..d460f3755117bd6001f0b581bfe26c7482a5bc7a 100644
--- a/src/rc_gpfs/utils/datetime.py
+++ b/src/rc_gpfs/utils/datetime.py
@@ -1,45 +1,51 @@
-from typing import Literal, List
+from typing import List, Literal
+
 import numpy as np

+
 # As the logic changed from revision to revision, these are solely transparent wrappers for numpy's datetime and
 # timedelta conversions. These are most likely unnecessary and can be removed at a later date
-def as_datetime(
-    date: str | np.datetime64
-    ) -> np.datetime64:
-    return np.datetime64(date,'ns') if isinstance(date,str) else date
-
-def as_timedelta(
-    val: int,
-    unit: Literal['D','W']
-    ) -> np.timedelta64:
+def as_datetime(date: str | np.datetime64) -> np.datetime64:
+    return np.datetime64(date, "ns") if isinstance(date, str) else date
+
+
+def as_timedelta(val: int, unit: Literal["D", "W"]) -> np.timedelta64:
     return np.timedelta64(val, unit)

+
 def create_timedelta_breakpoints(
     run_date: str | np.datetime64,
     delta_vals: int | List[int],
-    delta_unit: Literal['D','W']
+    delta_unit: Literal["D", "W"],
 ) -> List[np.datetime64]:
-    if not isinstance(delta_vals,list):
-        delta_vals=[delta_vals]
+    if not isinstance(delta_vals, list):
+        delta_vals = [delta_vals]
     run_date = as_datetime(run_date)
-    return [run_date - as_timedelta(c,delta_unit) for c in delta_vals if c > 0]
+    return [run_date - as_timedelta(c, delta_unit) for c in delta_vals if c > 0]
+

 def create_timedelta_labels(
     delta_vals: int | List[int],
-    delta_unit: Literal['D','W'],
+    delta_unit: Literal["D", "W"],
 ) -> List[str]:
-    if not isinstance(delta_vals,list):
-        delta_vals=[delta_vals]
-
+    if not isinstance(delta_vals, list):
+        delta_vals = [delta_vals]
+
     delta_vals = [v for v in delta_vals if v > 0]
     if len(delta_vals) == 0:
-        raise ValueError('Passed delta_vals are all less than or equal to 0. delta_vals must be positive integers')
+        raise ValueError(
+            "Passed delta_vals are all less than or equal to 0. delta_vals must be positive integers"
+        )

     delta_vals.sort(reverse=True)
-    deltas = [f'{d}{delta_unit}' for d in delta_vals]
+    deltas = [f"{d}{delta_unit}" for d in delta_vals]

     if len(delta_vals) > 1:
-        labels = [f'>{deltas[0]}'] + [f'{deltas[i+1]}-{deltas[i]}' for i in range(len(deltas)-1)] + [f'<{deltas[-1]}']
+        labels = (
+            [f">{deltas[0]}"]
+            + [f"{deltas[i + 1]}-{deltas[i]}" for i in range(len(deltas) - 1)]
+            + [f"<{deltas[-1]}"]
+        )
     else:
-        labels = [f'>{deltas[0]}'] + [f'<{deltas[0]}']
-    return labels
\ No newline at end of file
+        labels = [f">{deltas[0]}"] + [f"<{deltas[0]}"]
+    return labels
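Editorial aside (not part of the patch): the wrappers above keep all date math in numpy. Expected behavior for a single breakpoint, as a quick check:

from rc_gpfs.utils.datetime import create_timedelta_breakpoints, create_timedelta_labels

print(create_timedelta_labels(90, "D"))
# ['>90D', '<90D']
print(create_timedelta_breakpoints("2025-02-20", 90, "D"))
# [2024-11-22 as a nanosecond-precision numpy.datetime64]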
diff --git a/src/rc_gpfs/utils/units.py b/src/rc_gpfs/utils/units.py
index 01e3372e181d7698a2b31ac718a8f94aced22dad..dc2b9b06fccee5fde706efed01b4fc413fcac3b8 100644
--- a/src/rc_gpfs/utils/units.py
+++ b/src/rc_gpfs/utils/units.py
@@ -1,12 +1,15 @@
 import re
+from typing import List, Literal
+
 import numpy as np
-from typing import Literal, List
+

 # ENH: if this package becomes merged with noctua, need to replace this function since it's copied directly from there
-def convert_si(value: str | float | int,
-               unit: Literal['base','K','M','G','T'] | None = None,
-               to_unit: Literal['base','K','M','G','T'] = 'base',
-               use_binary: bool = False
+def convert_si(
+    value: str | float | int,
+    unit: Literal["base", "K", "M", "G", "T"] | None = None,
+    to_unit: Literal["base", "K", "M", "G", "T"] = "base",
+    use_binary: bool = False,
 ) -> float:
     """_summary_

@@ -31,76 +34,72 @@ def convert_si(value: str | float | int,
     # Unit multipliers
     unit_multipliers = {
-        'base': 1,
-        'K': factor,
-        'M': factor ** 2,
-        'G': factor ** 3,
-        'T': factor ** 4,
+        "base": 1,
+        "K": factor,
+        "M": factor**2,
+        "G": factor**3,
+        "T": factor**4,
     }

     # If value is a string, extract the number and the unit
     if isinstance(value, str):
         # Extract numeric part and unit part
         value = value.strip()
-        for suffix in ['K', 'M', 'G', 'T']:
+        for suffix in ["K", "M", "G", "T"]:
             if value.upper().endswith(suffix):
                 unit = suffix
                 value = value[:-1]
                 break
         else:
-            unit = 'base'
+            unit = "base"
         value = float(value)
-
+
     # If value is numeric, use the provided unit or default to 'base'
     if unit is None:
-        unit = 'base'
+        unit = "base"

     # Convert the input value to base
     base_value = value * unit_multipliers[unit]

     # Convert base value to the target unit
     converted_value = base_value / unit_multipliers[to_unit]
-
+
     return converted_value

-def as_bytes(
-    val: str | int | None,
-    default: int | None = None
-) -> int | None:
-    if isinstance(val,str):
-        val = re.sub(r'[iB]*$','',val)
-        val = convert_si(val,use_binary=True)
+
+def as_bytes(val: str | int | None, default: int | None = None) -> int | None:
+    if isinstance(val, str):
+        val = re.sub(r"[iB]*$", "", val)
+        val = convert_si(val, use_binary=True)
     elif val is None and default is not None:
         val = default
     return val

-def bytes_to_human_readable_size(
-    num_bytes: int
-) -> str:
-    units = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']
-
+
+def bytes_to_human_readable_size(num_bytes: int) -> str:
+    units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
+
     # Handle the case where num_bytes is 0
     if num_bytes == 0:
         return "0 B"
-
+
     # Calculate the appropriate unit. Take the floor if the number is not divisible by 1024
     unit_index = min(len(units) - 1, int(np.log2(num_bytes) // 10))
-    num = int(num_bytes // (1024 ** unit_index))
-
+    num = int(num_bytes // (1024**unit_index))
+
     # Format the number to 2 decimal places and append the unit
     return f"{num} {units[unit_index]}"

-def create_size_bin_labels(
-    size_bins: int | List[int]
-) -> List[str]:
+
+def create_size_bin_labels(size_bins: int | List[int]) -> List[str]:
     labels = []
-    labels.append(f"0 B-{bytes_to_human_readable_size(size_bins[0])}")
-
+    labels.append(f"0 B-{bytes_to_human_readable_size(size_bins[0])}")
+
     for i in range(len(size_bins) - 1):
         lower_bound = bytes_to_human_readable_size(size_bins[i])
         upper_bound = bytes_to_human_readable_size(size_bins[i + 1])
         labels.append(f"{lower_bound}-{upper_bound}")
-
+
     labels.append(f">{bytes_to_human_readable_size(size_bins[-1])}")
-    return labels
\ No newline at end of file
+    return labels
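Editorial aside (not part of the patch): a quick round-trip of the unit helpers above, using binary units throughout so nothing is assumed about the decimal factor defined in the part of the file this hunk elides:

from rc_gpfs.utils.units import as_bytes, bytes_to_human_readable_size, convert_si

print(convert_si("2G", to_unit="M", use_binary=True))  # 2048.0
print(as_bytes("4 kiB"))                               # 4096.0
print(as_bytes(None, default=0))                       # 0
print(bytes_to_human_readable_size(123_456_789))       # '117 MiB'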