Commit 25914521 authored by Matthew K Defenderfer

Improve hive conversion functionality

parent 70d40694
1 merge request: !65 Improve hive conversion functionality

import argparse
import subprocess
import re
import time
import random
from pathlib import Path
import polars as pl
from typing import List

from ..policy.hive import hivize
from .utils import define_python_interpreter,batch_parser_no_mem,setup_slurm_logs
from ..utils import get_parquet_dataset_size

DESCRIPTION = """
Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This creates a timeseries of structured datasets for each tld for much easier, more efficient log comparisons within tld. Output parquets are sorted by the path column for convenience, but no index is set.

Setting the --batch flag will create a Slurm array job where each task processes one tld, either from the passed parameter or from the unique values in the parquet dataset. Each tld's in-memory size is estimated from header information in the parquet dataset and the number of records belonging to that tld. These estimated sizes are used to group tlds, and each group is submitted as a separate array job with a different requested memory. This helps maximize throughput while also mitigating OOM errors.

All processing is done via Polars and so can take advantage of parallel processing. Higher core counts can provide a limited benefit to performance, and a GPU is not required.
"""
def parse_args(cli_args=None):
    parser = argparse.ArgumentParser(
        description=DESCRIPTION,
        formatter_class=argparse.RawTextHelpFormatter,
        parents=[batch_parser_no_mem(cpus_per_task=8, gpus=0, partition='amd-hdr100', time='02:00:00')]
    )
    parser.add_argument('parquet_path',
                        type=Path,
@@ -35,18 +38,26 @@ def parse_args():
    parser.add_argument('--partition-size',
                        dest='partition_chunk_size_bytes',
                        type=str,
                        default='200MiB',
                        help='Max size of in-memory data for each partition in a single hive dataset. Smaller partitions cause more files to be written. Can pass the byte size as an integer or as a human-readable byte string. For example, 1024 and 1KiB are equivalent.')
    parser.add_argument('--mem-factor',
                        type=int,
                        default=3,
                        help="Factor to scale each tld's estimated memory by when setting memory requirements in the Slurm job. A factor of 3 accounts for almost all cases. A lower factor can result in more OOM errors but would increase task throughput, and vice versa for a higher factor.")
    parser.add_argument('--no-clobber',
                        default=False,
                        action='store_true',
                        help="Flag to set whether contents of a hive cell will be overwritten. If True, the pipeline will exit if any parquet files are found in the cell directory. No processing will occur in that case. If False (default), any files existing in the cell directory will be removed prior to data writing.")
    parser.add_argument('--dry-run',
                        default=False,
                        action='store_true',
                        help="If set, no jobs will be submitted at the end. Tld grouping still occurs, and the text files with the groups are still written.")

    args = parser.parse_args(cli_args)
    return vars(args)
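Note: the new cli_args parameter lets the parser (and convert_flat_to_hive() below) be driven programmatically, for example from a test, instead of reading sys.argv. A minimal sketch; the flags are real, but the dataset path is hypothetical and any required arguments elided from this hunk would also need to be supplied:

args = parse_args([
    "--batch",
    "--mem-factor", "3",
    "--dry-run",
    "/data/gpfs-logs/parquet/list-policy_2025-01-01",
])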
SLURM_OPTS = """\ SLURM_OPTS = """\
#SBATCH --job-name=hivize #SBATCH --job-name=hivize-{mem}
#SBATCH --ntasks={ntasks} #SBATCH --ntasks={ntasks}
#SBATCH --cpus-per-task={cpus_per_task} #SBATCH --cpus-per-task={cpus_per_task}
#SBATCH --partition={partition} #SBATCH --partition={partition}
...@@ -78,17 +89,76 @@ def submit_batch(**kwargs): ...@@ -78,17 +89,76 @@ def submit_batch(**kwargs):
    script = f"#!/bin/bash\n#\n{slurm_opts}\n{BATCH_CMDS.format(**kwargs)}"

    if not kwargs['dry_run']:
        # Wait between 1 and 5 seconds before batch submission. This helps avoid a situation where this setup is running
        # in a batch array job and all of the array tasks submit their child array jobs at the same time. That results
        # in jobs failing to be submitted due to overwhelming the scheduler with simultaneous requests. Adding a random
        # delay should fix that
        time.sleep(random.uniform(1,5))

        subprocess.run(['sbatch'],input=script,shell=True,text=True)
    pass
def submit_tld_groups(df,grp_dir,args):
    mem_grp = df[0,'req_mem_grp']

    tld_file = grp_dir.joinpath(f"tld-{mem_grp}.txt")

    tlds = df["tld"].to_list()
    with open(tld_file,'wt') as f:
        f.writelines('\n'.join(tlds))

    args['ntlds'] = len(tlds)
    args['tld_file'] = tld_file
    args['mem'] = mem_grp

    submit_batch(**args)
    pass
def get_tld_row_counts(parquet_path: Path) -> pl.DataFrame:
    tld_rows = (
        pl.scan_parquet(parquet_path, parallel="prefiltered")
        .group_by("tld")
        .agg(pl.col("tld").count().alias("file_count"))
        .collect(engine="streaming")
    )
    return tld_rows
def estimate_req_mem(
    parquet_path: Path,
    tld: List[str] | pl.Series,
    mem_factor: int = 3
) -> pl.DataFrame:
    mem_breaks = [8, 16, 32, 64]
    mem_labels = ["8G", "16G", "32G", "64G", "128G"]

    dataset_size = get_parquet_dataset_size(parquet_path) / (1024**3)
    tld_sizes = get_tld_row_counts(parquet_path)

    tld_sizes = (
        tld_sizes.with_columns(
            (pl.col("file_count") / pl.col("file_count").sum() * dataset_size).alias(
                "est_size"
            )
        )
        .with_columns((pl.col("est_size") * mem_factor).alias("est_req_mem"))
        .with_columns(
            (
                pl.col("est_req_mem")
                .cut(breaks=mem_breaks, labels=mem_labels)
                .alias("req_mem_grp")
            )
        )
        .filter(pl.col("tld").is_in(tld))
    )
    return tld_sizes
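A minimal sketch of the bucketing above, using hypothetical tlds and estimated sizes (in GiB); each estimated requirement is cut into one of the Slurm memory groups that submit_tld_groups() later submits as its own array job:

import polars as pl

example = pl.DataFrame({"tld": ["labA", "labB", "labC"], "est_req_mem": [2.5, 20.0, 75.0]})
example = example.with_columns(
    pl.col("est_req_mem")
    .cut(breaks=[8, 16, 32, 64], labels=["8G", "16G", "32G", "64G", "128G"])
    .alias("req_mem_grp")
)
# req_mem_grp -> "8G", "32G", "128G": three separate array submissions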
def convert_flat_to_hive(cli_args=None):
    args = parse_args(cli_args)

    if args['tld'] is None:
        tlds = (
@@ -102,18 +172,34 @@ def convert_flat_to_hive():
        )
    else:
        tlds = args['tld'].split(',')

    if args['no_clobber']:
        acq = re.search(r"\d{4}-\d{2}-\d{2}", str(args['parquet_path'].absolute())).group(0)
        existing_acq_paths = [p for p in args['hive_path'].rglob(f'*/acq={acq}') if len(list(p.glob('*.parquet'))) > 0]
        existing_tlds = [re.search(r"tld=([^/]+)/", str(p)).group(1) for p in existing_acq_paths]
        args['tld'] = [t for t in tlds if t not in existing_tlds]

        if len(args['tld']) == 0:
            print("INFO: All tlds already exist, and no-clobber is set. Exiting without converting")
            return

    if args['batch']:
        req_mem = estimate_req_mem(args['parquet_path'],args['tld'],args['mem_factor'])

        grp_dir = args["parquet_path"].parent.joinpath("misc")
        grp_dir.mkdir(parents=True,exist_ok=True)
        [f.unlink() for f in grp_dir.glob('tld*.txt')]

        req_mem_file = grp_dir.joinpath('tld_est_mem.parquet')
        req_mem_file.unlink(missing_ok=True)
        req_mem.write_parquet(req_mem_file)

        for grp, df in req_mem.group_by('req_mem_grp'):
            print(f"INFO: Submitting array job for {grp[0]}",flush=True)
            submit_tld_groups(df,grp_dir,args)
    else:
        _ = args.pop('tld')
...
@@ -27,6 +27,39 @@ class CustomHelpFormatter(argparse.MetavarTypeHelpFormatter):
        actions = sorted(actions, key=lambda x: x.container.title if x.container.title else '')
        super(CustomHelpFormatter, self).add_arguments(actions)
def batch_parser_no_mem(
    cpus_per_task: int | None = None,
    gpus: int | None = None,
    partition: str | None = None,
    time: str | None = "12:00:00",
    reservation: str | None = None,
    slurm_log_dir: str | Path | None = "./out",
    **kwargs,
) -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        add_help=False, formatter_class=CustomHelpFormatter
    )
    slurm = parser.add_argument_group(title="Slurm Options")
    slurm.add_argument(
        "--batch",
        action="store_true",
        default=False,
        help="Convert as a batch array job.",
    )
    slurm.add_argument("-n", "--ntasks", type=int, default=1)
    slurm.add_argument("-c", "--cpus-per-task", type=int, default=cpus_per_task)
    slurm.add_argument("-g", "--gpus", type=int, default=gpus, choices=[0, 1])
    slurm.add_argument("-p", "--partition", type=str, default=partition)
    slurm.add_argument("-t", "--time", type=str, default=time)
    slurm.add_argument("--reservation", type=str, default=reservation)
    slurm.add_argument(
        "--slurm-log-dir",
        type=Path,
        default=slurm_log_dir,
        help="Output log directory. If the directory does not exist, it will be created automatically.",
    )
    return parser
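Like batch_parser(), this helper is meant to be passed as a parent parser so a subcommand inherits the Slurm options, just without a --mem flag since the hive converter now chooses memory per tld group. A minimal usage sketch with illustrative values:

import argparse

parser = argparse.ArgumentParser(
    description="example subcommand",  # illustrative only
    parents=[batch_parser_no_mem(cpus_per_task=8, partition="amd-hdr100", time="02:00:00")],
)
args = parser.parse_args(["--batch", "-t", "01:00:00"])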
def batch_parser(
    cpus_per_task: int | None = None,
    gpus: int | None = None,
...
@@ -16,53 +16,54 @@ from ..utils import (
    calculate_age_distribution
)
def collect_hive_df(
    parquet_path: str | Path,
    acq: str,
    tld: str | List[str] | None = None,
    hive_path: str | Path | None = None,
    no_clobber: bool = False
) -> pl.DataFrame:
    if not isinstance(tld,list) and tld is not None:
        tld = [tld]

    print("Collecting dataframe",flush=True)

    df = (
        pl.scan_parquet(parquet_path, parallel="prefiltered", rechunk=True)
        .select(
            [
                "size",
                "kballoc",
                "access",
                "create",
                "modify",
                "uid",
                "gid",
                "path",
                "tld",
            ]
        )
        .with_columns(pl.lit(acq).alias("acq"))
    )

    if tld is not None:
        df = df.filter(pl.col("tld").is_in(tld))

    if no_clobber:
        existing_tlds = _get_existing_hive_cells(hive_path,acq)
        df = df.filter(pl.col('tld').is_in(existing_tlds).not_())

    df = df.collect(engine='streaming')

    print("Finished collecting queries", flush=True)

    return df

def _get_existing_hive_cells(hive_path: str | Path, acq: str):
    hive_path = as_path(hive_path)
    existing_pq = hive_path.rglob(f"*/acq={acq}/*.parquet")
    existing_tlds = list(set([p.parent.parent.name.removeprefix("tld=") for p in existing_pq]))
    return existing_tlds
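For reference, _get_existing_hive_cells() walks the same tld=<tld>/acq=<date> layout that hivize() writes, so the tld can be read back off a parquet file's grandparent directory. A small sketch with a hypothetical path:

from pathlib import Path

p = Path("/data/hive/tld=labA/acq=2025-01-01/part-0.parquet")
tld = p.parent.parent.name.removeprefix("tld=")  # -> "labA"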
def hivize(
    parquet_path: str | Path,
@@ -100,14 +101,15 @@ def hivize(
    acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0)
    print(f"DEBUG: Acquisition date is {acq}",flush=True)

    df = collect_hive_df(parquet_path, acq, tld, hive_path, no_clobber)

    if df.is_empty():
        print(
            "INFO: All passed tlds already have parquet files in their hive cell directories. Aborting",
            flush=True,
        )
        shutil.rmtree(staging_path)
        return

    print('Writing to hive')
    df.write_parquet(
@@ -119,7 +121,11 @@ def hivize(
    print("Finished writing hive dataset",flush=True)

    if write_metadata:
        tlds = df["tld"].unique().to_list()
        df = df.drop(
            [c for c in df.columns if c not in ["tld", "size", "modify", "access"]]
        ).lazy()
        for grp in tlds:
            tdf = df.filter(pl.col('tld').eq(grp))
            output_dir=staging_path.joinpath(f"tld={grp}",f"acq={acq}")
            write_dataset_metadata(tdf,output_dir,acq)
@@ -128,49 +134,46 @@ def hivize(
    shutil.rmtree(staging_path)
def write_dataset_metadata(
    df: pl.DataFrame | pl.LazyFrame,
    parquet_path: Path,
    acq: str,
    **kwargs
) -> dict:
    df = df.lazy()

    size_dist = (
        df.select("size")
        .with_columns(
            calculate_size_distribution(pl.col("size"), **kwargs).alias("grp")
        )
        .group_by("grp")
        .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")])
        .sort("grp", descending=True)
        .collect(engine="streaming")
        .to_dicts()
    )

    access_dist = (
        df.select("access", "size")
        .with_columns(
            calculate_age_distribution(pl.col('access'), acq, **kwargs).alias("grp")
        )
        .group_by("grp")
        .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")])
        .sort("grp", descending=True)
        .collect(engine='streaming')
        .to_dicts()
    )

    modify_dist = (
        df.select("modify", "size")
        .with_columns(
            calculate_age_distribution(pl.col("modify"), acq, **kwargs).alias("grp")
        )
        .group_by("grp")
        .agg([pl.sum("size").alias("bytes"), pl.count("size").alias("file_count")])
        .sort("grp", descending=True)
        .collect(engine="streaming")
        .to_dicts()
    )
...
@@ -4,6 +4,7 @@ import subprocess
from pathlib import Path
from typing import List, Literal, Tuple
import polars as pl
import pyarrow.parquet as pq
import numpy as np

from .units import as_bytes, convert_si, create_size_bin_labels
@@ -103,4 +104,14 @@ def calculate_age_distribution(
        .cast(pl.String)
        .cast(pl.Enum(age_labels))
    )
    return age_grps
def get_parquet_dataset_size(parquet_path):
    tot_size = 0

    for p in parquet_path.glob("*.parquet"):
        md = pq.read_metadata(p)

        for rg in range(0, md.num_row_groups):
            tot_size += md.row_group(rg).total_byte_size

    return tot_size
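get_parquet_dataset_size() sums each file's row-group total_byte_size values from the parquet footer, which in pyarrow reflect uncompressed column data, so the result approximates the dataset's in-memory footprint rather than its on-disk size; estimate_req_mem() then divides by 1024**3 to work in GiB. A usage sketch with a hypothetical path:

from pathlib import Path

n_bytes = get_parquet_dataset_size(Path("/data/gpfs-logs/parquet/list-policy_2025-01-01"))
print(f"~{n_bytes / 1024**3:.1f} GiB uncompressed")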