diff --git a/.gitignore b/.gitignore
index e97902184d964bd8e55c51788ff1b2998e4ea737..c5d8ecd25c0343facbcbdee5eff42a463164bdc5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,14 +1,30 @@
+# Ignore paths to actual GPFS logs data
-local-data/
-joblogs/
+
+# Ignore all potential Slurm outputs from running jobs
 slurm-*
 out/
 err/
-*.sif
+
+# Ignore cache directories
 __pycache__
+
+# Ignore quarto outputs
 quarto*
-cufile.log
 *.html
 general-report_files
+
+# Ignore CUDA and dask logs
+cufile.log
+output.log
+rmm_log.txt
+dask-logs/*
+
+# Ignore poetry configuration
 poetry.toml
-.vscode
\ No newline at end of file
+
+# Ignore local vscode config
+.vscode
+
+# Ignore random extra files
+extra/
\ No newline at end of file
diff --git a/example-job-scripts/convert-logs.sh b/example-job-scripts/convert-logs.sh
new file mode 100644
index 0000000000000000000000000000000000000000..f503652120303c2949f75fcec7234a07d9627528
--- /dev/null
+++ b/example-job-scripts/convert-logs.sh
@@ -0,0 +1,19 @@
+#! /bin/bash
+#
+#SBATCH --job-name=convert
+#SBATCH --ntasks=1
+#SBATCH --cpus-per-task=1
+#SBATCH --mem=8G
+#SBATCH --partition=amd-hdr100,intel-dcb,express
+#SBATCH --time=02:00:00
+#SBATCH --output=out/convert-%A-%a.out
+#SBATCH --error=out/convert-%A-%a.err
+#SBATCH --array=0-49
+
+module load Anaconda3
+conda activate gpfs-dev
+
+logs=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-*/chunks"))
+log=${logs[${SLURM_ARRAY_TASK_ID}]}
+
+convert-to-parquet --batch --no-clobber --partition=amd-hdr100,express,intel-dcb ${log}
\ No newline at end of file
diff --git a/example-job-scripts/convert-to-hive.sh b/example-job-scripts/convert-to-hive.sh
new file mode 100644
index 0000000000000000000000000000000000000000..6178f1f62ae28aa0ec68d6247cbe24e33879711b
--- /dev/null
+++ b/example-job-scripts/convert-to-hive.sh
@@ -0,0 +1,21 @@
+#! /bin/bash
+#
+#SBATCH --job-name=hive-setup
+#SBATCH --ntasks=1
+#SBATCH --cpus-per-task=16
+#SBATCH --mem=90G
+#SBATCH --partition=amperenodes
+#SBATCH --time=02:00:00
+#SBATCH --reservation=rc-gpfs
+#SBATCH --gres=gpu:1
+#SBATCH --output=out/hive-setup-%A-%a.out
+#SBATCH --error=out/hive-setup-%A-%a.err
+#SBATCH --array=0-49
+
+module load Anaconda3
+conda activate gpfs-dev
+
+parquets=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-*/parquet"))
+pq=${parquets[${SLURM_ARRAY_TASK_ID}]}
+
+convert-to-hive --batch ${pq} /scratch/mdefende/project-hive
diff --git a/example-job-scripts/split-logs.sh b/example-job-scripts/split-logs.sh
new file mode 100644
index 0000000000000000000000000000000000000000..137fcd386314f8b700da95ff09419eda7eefa8a3
--- /dev/null
+++ b/example-job-scripts/split-logs.sh
@@ -0,0 +1,19 @@
+#! /bin/bash
+#
+#SBATCH --job-name=split
+#SBATCH --ntasks=1
+#SBATCH --cpus-per-task=24
+#SBATCH --mem=8G
+#SBATCH --partition=amd-hdr100
+#SBATCH --time=02:00:00
+#SBATCH --output=out/split-%A-%a.out
+#SBATCH --error=out/split-%A-%a.err
+#SBATCH --array=0-49
+
+module load Anaconda3
+conda activate gpfs-dev
+
+logs=($(find /data/rc/gpfs-policy/data -path "*/list-policy_data-project_list-path-external_slurm-*/raw/*.gz"))
+log=${logs[${SLURM_ARRAY_TASK_ID}]}
+
+split-log --no-clobber ${log}
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index f7f43d559f493a5594084ef49712362e68494357..1edf54b0b227e67a99ef983f76a93eaab77ca167 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -34,6 +34,7 @@ url="https://pypi.nvidia.com"
 priority = "supplemental"
 
 [tool.poetry.scripts]
+convert-to-hive = "rc_gpfs.cli:convert_flat_to_hive"
 convert-to-parquet = "rc_gpfs.cli:convert_to_parquet"
 split-log = "rc_gpfs.cli:split_log"
 
diff --git a/src/rc_gpfs/cli/__init__.py b/src/rc_gpfs/cli/__init__.py
index fcbd7b13c818a7dce13ed4e5fe983803d7ad126e..305d2af147fe8f3425e8e6fed55ee08a41e4bd95 100644
--- a/src/rc_gpfs/cli/__init__.py
+++ b/src/rc_gpfs/cli/__init__.py
@@ -1,2 +1,3 @@
+from .convert_flat_to_hive import convert_flat_to_hive
 from .convert_to_parquet import convert_to_parquet
 from .split_log import split_log
\ No newline at end of file
diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py
new file mode 100644
index 0000000000000000000000000000000000000000..f0e169f0f13646002af85dc50dec2dc2ba2e201c
--- /dev/null
+++ b/src/rc_gpfs/cli/convert_flat_to_hive.py
@@ -0,0 +1,153 @@
+import argparse
+import subprocess
+from pathlib import Path
+
+import dask.dataframe as dd
+import dask.config
+dask.config.set({'dataframe.backend':'cudf'})
+from dask.diagnostics import ProgressBar
+
+from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
+from ..policy import hivize
+
+DESCRIPTION = """
+Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This essentially creates a timeseries of structured datasets for each tld for much easier and more efficient log comparisons within tld. Each file path is set as the index and sorted, and all final output parquets are partitioned to have similar in-memory sizes.
+
+Batch Processing
+----------------
+
+This script can be used to convert a number of tlds directly or to automatically process all tlds in a batch array job. When submitting a batch job, tlds will be grouped by their estimated memory usage over the entire parquet dataset, roughly related to the number of rows belonging to any given tld. A tld taking up more space in the log will be grouped with fewer (or no) other tlds than a tld taking up less space. The maximum size of a group in memory is configurable by the user.
+
+This method was implemented to account for GPFS logs being larger than memory in most cases, especially when using GPUs for processing. Sorting along an index is critical for cross-dataset join performance and must be done for efficient storage of parquet files within each parquet directory. However, this initial sort requires the full dataset to be loaded into memory, which is not possible on our current hardware for very large logs while leaving memory available for actual processing.
+
+When submitting a batch array, be cognizant of the amount of memory available to your job, either RAM for CPU tasks or VRAM for GPU tasks. For example, on an 80 GiB A100, setting an upper group limit of 40 GiB would suffice in most cases. This will minimize the number of array tasks while ensuring enough memory is free for processing.
+"""
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        description=DESCRIPTION,
+        formatter_class=argparse.RawTextHelpFormatter,
+        parents=[batch_parser(cpus_per_task=16, gpus=1, partition='amperenodes', mem='90G')]
+    )
+    parser.add_argument('parquet_path',
+                        type=Path,
+                        help="Path to a directory containing a flat parquet dataset.")
+
+    parser.add_argument('hive_path',
+                        type=Path,
+                        help="Parent directory for the hive. This can be either a new directory or an existing hive directory. If this is an existing hive dataset, new data will be appended to the old data. This will not alter any existing data provided the tld or date of acquisition differ from the existing hive data.")
+    parser.add_argument('--tld',
+                        type=str,
+                        help='Comma-separated list of tld to convert to hive')
+    parser.add_argument('--cutoff',
+                        type=float,
+                        default=30, help='Maximum estimated in-memory size (GiB) of a tld group when batch processing')
+    parser.add_argument('--grp-file',
+                        type=Path,
+                        help="Path to an existing group file for batch processing.")
+
+    args = parser.parse_args()
+    return vars(args)
+
+BATCH_SCRIPT = """\
+#!/bin/bash
+#
+#SBATCH --job-name=hivize
+#SBATCH --ntasks={ntasks}
+#SBATCH --cpus-per-task={cpus_per_task}
+#SBATCH --partition={partition}
+#SBATCH --time={time}
+#SBATCH --mem={mem}
+#SBATCH --gres=gpu:{gpus}
+#SBATCH --output={output_log}
+#SBATCH --error={error_log}
+#SBATCH --array=1-{ngroups}
+
+{env_cmd}
+
+tld=$(sed -n "${{SLURM_ARRAY_TASK_ID}}p" {grp_file})
+
+convert-to-hive --tld ${{tld}} {parquet_path} {hive_path}
+"""
+
+def submit_batch(**kwargs):
+    env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env'))
+    kwargs.update({"env_cmd":env_cmd})
+
+    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'hive')
+    kwargs.update(slurm_logs)
+
+    script = BATCH_SCRIPT.format(**kwargs)
+
+    subprocess.run(['sbatch'],input=script,shell=True,text=True)
+    pass
+
+def split_into_groups(series, cutoff):
+    groups = []
+
+    while len(series.index) > 0:
+        current_group = []
+        current_sum = 0
+        for username, storage_size in series.items():
+            if storage_size > cutoff:
+                groups.append({username})
+                series = series.drop(username)
+                break
+            elif current_sum + storage_size <= cutoff:
+                current_group.append(username)
+                current_sum += storage_size
+
+        series = series.drop(current_group)
+        if current_group:
+            groups.append(set(current_group))
+
+    return groups
+
+def calc_tld_mem(df):
+    mem = df.groupby('tld',observed=True).apply(lambda x: x.memory_usage(deep=True).sum())
+    return mem
+
+def define_tld_groups(input,cutoff):
+    ddf = dd.read_parquet(input,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld'])
+    with ProgressBar():
+        tld_mem = ddf.map_partitions(calc_tld_mem).compute()
+    tld_mem = tld_mem.groupby(tld_mem.index).sum().divide(1024**3).to_pandas()
+    grps = split_into_groups(tld_mem,cutoff)
+    return grps
+
+def nested_list_to_log(nest,file):
+    """
+    Writes a list of lists to a text log
+
+    Args:
+        nest (list): A list of lists to be written, one line per inner list.
+ """ + with open(file, 'w', newline='') as f: + for l in nest: + f.write(f"{','.join(l)}\n") + +def convert_flat_to_hive(): + args = parse_args() + + if args.get('batch'): + if not args.get('grp_file'): + grps = define_tld_groups(args.get('parquet_path'),args.get('cutoff')) + + misc_path = args.get('parquet_path').parent.joinpath('misc','tld_grps.txt') + misc_path.parent.mkdir(exist_ok = True, parents = True) + nested_list_to_log(grps,misc_path) + ngroups = len(grps) + grp_file = str(misc_path) + args.update({'ngroups':ngroups, + 'grp_file':grp_file}) + else: + ngroups = sum(1 for line in open(args.get('grp_file'))) + args.update({'ngroups':ngroups}) + + submit_batch(**args) + + else: + tld = args.get('tld').split(',') + args.update({'tld':tld}) + hivize(**args) + pass \ No newline at end of file diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py index e684935bede99c13ec5025320614ecd497adc0e4..f035843cb54588d6394501efc3d3b8f9f7fcdb23 100644 --- a/src/rc_gpfs/cli/convert_to_parquet.py +++ b/src/rc_gpfs/cli/convert_to_parquet.py @@ -1,8 +1,9 @@ +import sys import argparse import subprocess from pathlib import Path import multiprocessing -from .utils import define_python_interpreter +from .utils import define_python_interpreter,batch_parser,setup_slurm_logs from ..policy import convert from ..utils import parse_scontrol @@ -23,7 +24,7 @@ Local Parallel Processing: def parse_args(): parser = argparse.ArgumentParser( description=DESCRIPTION, - formatter_class=argparse.RawTextHelpFormatter, + parents=[batch_parser(partition='amd-hdr100,express',time='02:00:00',mem='16G',cpus_per_task=1)], epilog=EPILOGUE ) parser.add_argument('input', @@ -36,21 +37,8 @@ def parse_args(): help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../parquet") parser.add_argument('--pool-size',type=int,default=None, help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process") - - slurm = parser.add_argument_group(title='Slurm Options') - slurm.add_argument('--batch', action='store_true', default=False, - help="Run the conversion as a batch job. If a directory path is given as an input, an array job will be created to run the conversion in parallel.") - slurm.add_argument('-n','--ntasks', type=int, default=1) - slurm.add_argument('-p','--partition', type=str, default='amd-hdr100') - slurm.add_argument('-t','--time',type=str,default='02:00:00') - slurm.add_argument('-m','--mem',type=str,default='16G') - slurm.add_argument('--slurm-log-dir',type=Path, default='./out', - help='Output log directory. If the directory does not exist, it will be created automatically. Logs will be named convert_%%A_%%a to differentiate amongs job IDs and task IDs') - interpreter = slurm.add_mutually_exclusive_group() - interpreter.add_argument('--python-path',type=Path, - help="Path to Python interpreter to use for conversion. This interpreter should have access to a pandas library, preferably version >=2.0. 
-                             help="Path to Python interpreter to use for conversion. This interpreter should have access to a pandas library, preferably version >=2.0. If not specified, the path to the active python3 interpreter will be used.")
-    interpreter.add_argument('--conda-env',type=str,default=None,
-                             help="The name or prefix of a conda environment to activate as opposed to a python3 path")
+    parser.add_argument('--no-clobber', action='store_true',default=False,
+                        help='If set and parquet files already exist in the output directory, exit immediately without any processing')
 
     args = parser.parse_args()
     return vars(args)
@@ -73,18 +61,13 @@ log=$(ls {input}/*.gz | awk "NR==${{SLURM_ARRAY_TASK_ID}} {{ print $1 }}")
 convert-to-parquet -o {output_dir} ${{log}}
 """
 
-def setup_slurm_logs(slurm_log_dir):
-    slurm_log_dir = slurm_log_dir.absolute()
-    slurm_log_dir.mkdir(exist_ok = True,parents=True,mode = 0o2770)
-    out_log,err_log = [str(slurm_log_dir.joinpath('convert_%A_%a.out')),str(slurm_log_dir.joinpath('convert_%A_%a.err'))]
-    slurm_logs = {'output_log':out_log,'error_log':err_log}
-    return slurm_logs
+
 
 def submit_batch(**kwargs):
     env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env'))
     kwargs.update({"env_cmd":env_cmd})
 
-    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'))
+    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'parquet')
     kwargs.update(slurm_logs)
 
     script = BATCH_SCRIPT.format(**kwargs)
@@ -96,8 +79,12 @@ def convert_to_parquet() -> None:
     args = parse_args()
     if args['output_dir'] is None:
         args['output_dir'] = args['input'].parent.joinpath('parquet')
-    
+
     args['output_dir'].mkdir(exist_ok = True, mode = 0o2770)
+
+    output_files_exist = len(list(args['output_dir'].glob('*.parquet'))) > 0
+    if args['no_clobber'] and output_files_exist:
+        sys.exit('The output directory already contains parquet files. Exiting')
 
     if args['input'].is_file():
         nlogs = 1
@@ -117,4 +104,3 @@ def convert_to_parquet() -> None:
         pool.starmap(convert, fargs)
     else:
         convert(args['input'],args['output_dir'])
-        pass
diff --git a/src/rc_gpfs/cli/split_log.py b/src/rc_gpfs/cli/split_log.py
index 6c37bdeaabdb7d68c7d15570fbdd38c033855a1e..c0cc64a672eca4cd9bdbc28ffe5af51f190d56da 100644
--- a/src/rc_gpfs/cli/split_log.py
+++ b/src/rc_gpfs/cli/split_log.py
@@ -1,7 +1,8 @@
+import sys
 import argparse
 import subprocess
 from pathlib import Path
-from .utils import define_python_interpreter
+from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
 from ..policy import split
 
 BATCH_SCRIPT = """\
@@ -24,7 +25,7 @@ split-log -o {output_dir} -l {lines} {log}
 def parse_args():
     parser = argparse.ArgumentParser(
         description="Splits a GPFS policy log into multiple parts for batch array processing.",
-        formatter_class=argparse.RawTextHelpFormatter
+        parents=[batch_parser(cpus_per_task=24,time='02:00:00',mem='8G',gpus=0,partition='amd-hdr100,express')]
     )
     parser.add_argument('log',
                         type=Path,
@@ -36,17 +37,9 @@
                         help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to ./input/../chunks")
     parser.add_argument('-l', '--lines', type=int, default=int(5e6),
                         help="Number of lines to split the log file by")
+    parser.add_argument('--no-clobber', action='store_true',default=False,
+                        help='If set and split logs already exist in the output directory, exit immediately without any processing')
 
-    slurm = parser.add_argument_group(title='Slurm Options')
-    slurm.add_argument('--batch', action='store_true', default=False,
-                       help="Run as a batch job. Otherwise use the current processing environment")
-    slurm.add_argument('-n','--cpus-per-task', type=int, default=24,
-                       help="Number of cores assigned to the job. Ntasks is always set to 1")
-    slurm.add_argument('-p','--partition', type=str, default='amd-hdr100')
-    slurm.add_argument('-t','--time',type=str,default='02:00:00')
-    slurm.add_argument('-m','--mem',type=str,default='8G')
-    slurm.add_argument('--slurm-log-dir',type=Path,default='./out',
-                       help='Output log directory. If the directory does not exist, it will be created automatically')
 
     args = parser.parse_args()
     return vars(args)
@@ -54,10 +47,8 @@ def submit_batch(**kwargs):
     env_cmd = define_python_interpreter()
     kwargs.update({"env_cmd":env_cmd})
 
-    kwargs.get('slurm_log_dir').mkdir(exist_ok=True,parents=True,mode = 0o2770)
-    output_log = kwargs.get('slurm_log_dir').joinpath('split.out')
-    error_log = kwargs.get('slurm_log_dir').joinpath('split.err')
-    kwargs.update({'output_log':output_log,'error_log':error_log})
+    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'),'split')
+    kwargs.update(slurm_logs)
 
     script = BATCH_SCRIPT.format(**kwargs)
 
@@ -71,6 +62,10 @@ def split_log():
         args['output_dir'] = args['log'].parent.parent.joinpath('chunks')
     args['output_dir'].mkdir(exist_ok = True, mode = 0o2770)
 
+    output_files_exist = len(list(args['output_dir'].glob('*.gz'))) > 0
+    if args['no_clobber'] and output_files_exist:
+        sys.exit('The output directory already contains split log files. Exiting')
+
     if args.get('batch'):
         submit_batch(**args)
     else:
diff --git a/src/rc_gpfs/cli/utils.py b/src/rc_gpfs/cli/utils.py
index 8115f4466172d2bf7f0aea47c829bce4b028462e..18cb8fb142b99bccad2a03426f38588a333ea38f 100644
--- a/src/rc_gpfs/cli/utils.py
+++ b/src/rc_gpfs/cli/utils.py
@@ -1,3 +1,4 @@
+import argparse
 import sys
 import os
 from pathlib import Path
@@ -18,4 +19,43 @@ def define_python_interpreter(python_path=None, conda_env=None):
     else:
         parent = Path(sys.executable).absolute().parent
         env = venv_base.format(python_path=parent.joinpath('activate'))
-    return env
\ No newline at end of file
+    return env
+
+class CustomHelpFormatter(argparse.MetavarTypeHelpFormatter):
+    def add_arguments(self, actions):
+        # Sort actions by their group title
+        actions = sorted(actions, key=lambda x: x.container.title if x.container.title else '')
+        super(CustomHelpFormatter, self).add_arguments(actions)
+
+def batch_parser(
+    cpus_per_task: int | None = None,
+    gpus: int | None = None,
+    partition: str | None = None,
+    mem: str | None = None,
+    time: str | None = '12:00:00',
+    reservation: str | None = None,
+    slurm_log_dir: str | Path | None = './out',
+    **kwargs
+) -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(add_help=False,
+                                     formatter_class=CustomHelpFormatter)
+    slurm = parser.add_argument_group(title='Slurm Options')
+    slurm.add_argument('--batch', action='store_true', default=False,
+                       help="Run as a batch array job.")
+    slurm.add_argument('-n', '--ntasks', type=int, default=1)
+    slurm.add_argument('-c', '--cpus-per-task', type=int, default=cpus_per_task)
+    slurm.add_argument('-g', '--gpus', type=int, default=gpus, choices=[0, 1])
+    slurm.add_argument('-p', '--partition', type=str, default=partition)
+    slurm.add_argument('-t', '--time', type=str, default=time)
+    slurm.add_argument('-m', '--mem', type=str, default=mem)
+    slurm.add_argument('--reservation',type=str,default=reservation)
+    slurm.add_argument('--slurm-log-dir', type=Path, default=slurm_log_dir,
+                       help='Output log directory. If the directory does not exist, it will be created automatically.')
+    return parser
+
+def setup_slurm_logs(slurm_log_dir,log_basename):
+    slurm_log_dir = slurm_log_dir.absolute()
+    slurm_log_dir.mkdir(exist_ok = True,parents=True,mode = 0o2770)
+    out_log,err_log = [str(slurm_log_dir.joinpath(f'{log_basename}_%A_%a.out')),str(slurm_log_dir.joinpath(f'{log_basename}_%A_%a.err'))]
+    slurm_logs = {'output_log':out_log,'error_log':err_log}
+    return slurm_logs
\ No newline at end of file
diff --git a/src/rc_gpfs/compute/backend.py b/src/rc_gpfs/compute/backend.py
index 5f8022ef2261ae71ea797c25d0232f48d879ab48..ba8a86f1110b1860729765cd1516263d68c48094 100644
--- a/src/rc_gpfs/compute/backend.py
+++ b/src/rc_gpfs/compute/backend.py
@@ -5,7 +5,7 @@ from .utils import *
 from ..utils import parse_scontrol
 from typing import Literal
 
-__all__ = ['start_backend']
+__all__ = ['start_backend','start_local_cluster']
 
 # ENH: Add default parameters for cluster creation based on defined type and available resources. For instance, creating a LocalCluster should default to using all available CPUs and all available RAM.
 class DaskClusterManager:
diff --git a/src/rc_gpfs/policy/__init__.py b/src/rc_gpfs/policy/__init__.py
index 754c70633780aa388fc8b75ed651cd793a530ed6..c24d1c30ce26f7b9c49e0df278f96df4e4ba2590 100644
--- a/src/rc_gpfs/policy/__init__.py
+++ b/src/rc_gpfs/policy/__init__.py
@@ -1,2 +1,2 @@
 from .split import split,compress_logs
-from .convert import convert
\ No newline at end of file
+from .convert import convert, hivize
\ No newline at end of file
diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py
index a3f7ad9ce61b47db1fa8335cd8f2042ec905f982..1414580b74f9b4b46ec7b764820bc5cf47b09b48 100755
--- a/src/rc_gpfs/policy/convert.py
+++ b/src/rc_gpfs/policy/convert.py
@@ -1,10 +1,20 @@
-from urllib.parse import unquote
+import os
 import re
-import pandas as pd
 import gzip
+import random
+import string
+import shutil
 from pathlib import Path
-from .utils import as_path
+from typing import Literal
+from urllib.parse import unquote
+
+import pandas as pd
+import dask.dataframe as dd
+import dask.config
+
 from .policy_defs import SCHEMA
+from ..compute.backend import infer_cuda
+from ..utils import as_path
 
 def parse_line(line):
     try:
@@ -65,4 +75,67 @@ def convert(
 
     df = pd.DataFrame.from_dict(dicts).sort_values('tld')
     df = df.astype(SCHEMA)
-    df.to_parquet(output_path,engine = 'pyarrow')
\ No newline at end of file
+    df.to_parquet(output_path,engine = 'pyarrow')
+
+
+def hivize(
+        parquet_path: str | Path,
+        hive_path: str | Path,
+        tld: str | list[str] | None = None,
+        staging_path: str | Path | None = None,
+        partition_size: str = '100MiB',
+        with_cuda: bool | Literal['infer'] = 'infer',
+        **kwargs
+    ) -> None:
+    parquet_path = as_path(parquet_path)
+    hive_path = as_path(hive_path)
+
+    if staging_path is None:
+        rand_str = ''.join(random.choices(string.ascii_letters + string.digits, k=8))
+        staging_path = Path(os.getenv('TMPDIR')).joinpath(os.getenv('USER'),f'hive-{rand_str}')
+        print(f"INFO: Using {staging_path} as temporary directory",flush=True)
+    else:
+        staging_path = as_path(staging_path)
+
+    hive_path.mkdir(exist_ok=True,parents=True)
+    staging_path.mkdir(exist_ok=True,parents=True)
+
+    if with_cuda == 'infer':
+        with_cuda = infer_cuda()
+
+    if with_cuda:
+        import dask_cudf as backend
+        from dask_cudf.core import from_cudf as from_local
+        dask.config.set({'dataframe.backend':'cudf'})
+    else:
+        import dask as backend
+        from dask.dataframe import from_pandas as from_local
+        dask.config.set({'dataframe.backend':'pandas'})
+
+    def indexed_name(ind):
+        return f"indexed-{ind}.parquet"
+
+    if tld is not None:
+        if not isinstance(tld,list):
+            tld = [tld]
+        predicates = [('tld','in',tld)]
+    else:
+        predicates = None
+    print(f"DEBUG: Filtering predicates are: {predicates}",flush=True)
+
+    acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0)
+    print(f"DEBUG: Acquisition date is {acq}",flush=True)
+
+    # The flat parquet is initially read in via dask to avoid reading the full dataset into memory which happens even
+    # when including predicates for filtering. The dask dataframe is converted to a regular dataframe to drastically
+    # improve indexing and sorting by removing partitions. The sorted dataframe is converted back to a dask dataframe
+    # to create partitions within the parquet dataset and write to multiple files defined by those partitions.
+    ddf = backend.read_parquet(parquet_path,filters=predicates,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld'])
+    df = ddf.compute()
+    df = df.set_index('path').sort_index().assign(acq=acq)
+    ddf = from_local(df).repartition(partition_size=partition_size,force=True)
+    ddf.to_parquet(staging_path,partition_on=['tld','acq'],name_function = indexed_name)
+
+    shutil.copytree(staging_path,hive_path,dirs_exist_ok=True)
+    shutil.rmtree(staging_path)
+    pass
\ No newline at end of file
diff --git a/src/rc_gpfs/policy/split.py b/src/rc_gpfs/policy/split.py
index ac5fe693267d078c5980a873a0421c10ee8f186c..1438a20238cab29f5817187ed189855d420dea2d 100644
--- a/src/rc_gpfs/policy/split.py
+++ b/src/rc_gpfs/policy/split.py
@@ -1,7 +1,6 @@
 from pathlib import Path
 import subprocess
-from .utils import as_path
-from ..utils import parse_scontrol
+from ..utils import parse_scontrol,as_path
 
 __all__ = ['split','compress_logs']
 
diff --git a/src/rc_gpfs/policy/utils.py b/src/rc_gpfs/policy/utils.py
index bd89f7db9bbdcc9f2bc657c6fd0d7c0532f3150e..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644
--- a/src/rc_gpfs/policy/utils.py
+++ b/src/rc_gpfs/policy/utils.py
@@ -1,6 +0,0 @@
-from pathlib import Path
-
-def as_path(s: str | Path) -> Path:
-    if not isinstance(s,Path):
-        s = Path(s)
-    return s
\ No newline at end of file
diff --git a/src/rc_gpfs/utils.py b/src/rc_gpfs/utils.py
index 2c205b90c99bc4dc8ee0d900a68351c10783dceb..dc54e4736619da9a90e2e584c2a103de61d68970 100644
--- a/src/rc_gpfs/utils.py
+++ b/src/rc_gpfs/utils.py
@@ -2,6 +2,7 @@ from typing import Literal
 import os
 import re
 import subprocess
+from pathlib import Path
 
 # ENH: if this package becomes merged with noctua, need to replace this function since it's copied directly from there
 def convert_si(value: str | float | int,
@@ -75,4 +76,9 @@ def parse_scontrol():
     cores = int(cores)
     mem = convert_si(mem,to_unit='G',use_binary=True)
 
-    return [cores,mem]
\ No newline at end of file
+    return [cores,mem]
+
+def as_path(s: str | Path) -> Path:
+    if not isinstance(s,Path):
+        s = Path(s)
+    return s
\ No newline at end of file