Something went wrong on our end
-
Matthew K Defenderfer authoredbe759db8
convert_flat_to_hive.py 6.44 KiB
import argparse
import subprocess
from pathlib import Path
import dask.dataframe as dd
import dask.config
# Set the dask config key 'dataframe.backend' to 'cudf'. This runs at import
# time, before any dataframe is created by the functions in this module.
dask.config.set({'dataframe.backend':'cudf'})
from dask.diagnostics import ProgressBar
from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
from ..policy import hivize
# Help text passed to argparse as the parser description in parse_args().
# The parser uses RawTextHelpFormatter, so this text is shown as written.
DESCRIPTION = """
Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This essentially creates a timeseries of structured datasets for each tld for much easier more efficient log comparisons within tld. Each file path is set as the index and sorted, and all final output parquets are partitioned to have similar in-memory sizes.
Batch Processing
----------------
This script can be used to convert a number of tlds directly or to automatically process all tlds in a batch array job. When submitting a batch job, tlds will be grouped by their estimated memory usage over the entire parquet dataset, roughly related to the number of rows belonging to any given tld. A tld taking up more space in the log will be grouped with fewer (or no) other tlds than a tld taking up a smaller space. The max size of a group in memory is editable by the user.
This method was implemented to account for GPFS logs being larger than memory in most cases, especially when using GPUs for processing. Sorting along an index is critical for cross-dataset join performance and must be done for efficient storage of parquet files within each parquet directory. However, this initial sort requires the full dataset to be loaded into memory which is not possible on our current hardware for very large logs while leaving memory available for actual processing.
When submitting a batch array, be cognizant of the amount of memory available to your job, either RAM for CPU tasks or VRAM for GPU tasks. For example, on an 80 GiB A100, setting an upper group limit of 40 GiB would suffice in most cases. This will minimize the number of array tasks while ensuring enough memory is free for processing.
"""
def parse_args():
    """Parse the command-line arguments for the flat-to-hive conversion.

    The parser inherits the options of the shared parser returned by
    ``batch_parser`` (defined outside this file) and adds the options that
    are specific to this script.

    Returns:
        dict: The parsed arguments as a plain dictionary (``vars`` of the
        ``argparse.Namespace``).
    """
    parser = argparse.ArgumentParser(
        description=DESCRIPTION,
        formatter_class=argparse.RawTextHelpFormatter,
        parents=[batch_parser(cpus_per_task=16, gpus=1, partition='amperenodes', mem='90G')]
    )
    parser.add_argument('parquet_path',
                        type=Path,
                        help="Path to a directory containing a flat parquet dataset.")
    parser.add_argument('hive_path',
                        type=Path,
                        help="Parent directory for the hive. This can be either a new directory or an existing hive directory. If this is an existing hive dataset, new data will be appended to the old data. This will not alter any existing data provided the tld or date of acquisition differ from the existing hive data.")
    parser.add_argument('--tld',
                        type=str,
                        help='Comma-separated list of tld to convert to hive')
    # The cutoff is compared against per-tld memory estimates expressed in
    # GiB (bytes / 1024**3), so the unit is spelled out for the user.
    parser.add_argument('--cutoff',
                        type=float,
                        default=30,
                        help="Maximum estimated in-memory size, in GiB, of a group of tlds when groups are computed for batch processing. Ignored when --grp-file is given. Default: 30.")
    parser.add_argument('--grp-file',
                        type=Path,
                        help="Path to an existing group file for batch processing.")

    args = parser.parse_args()
    return vars(args)
# Template for the #SBATCH header of the array job. submit_batch() fills the
# placeholders with str.format, so every name in braces must be present in
# the keyword arguments it receives. The template ends with a newline, which
# lets submit_batch() append an optional reservation line directly after it.
SLURM_OPTS = """\
#SBATCH --job-name=hivize
#SBATCH --ntasks={ntasks}
#SBATCH --cpus-per-task={cpus_per_task}
#SBATCH --partition={partition}
#SBATCH --time={time}
#SBATCH --mem={mem}
#SBATCH --gres=gpu:{gpus}
#SBATCH --output={output_log}
#SBATCH --error={error_log}
#SBATCH --array=1-{ngroups}
"""
# Template for the commands run by each array task. The task reads the line
# of the group file whose number equals SLURM_ARRAY_TASK_ID and passes it as
# the --tld value. Doubled braces become literal shell braces after
# str.format; single-brace names are filled in by submit_batch().
BATCH_CMDS = """\
{env_cmd}
tld=$(sed -n "${{SLURM_ARRAY_TASK_ID}}p" {grp_file})
convert-to-hive --tld ${{tld}} {parquet_path} {hive_path}
"""
def submit_batch(**kwargs):
    """Render the Slurm array-job script and submit it with ``sbatch``.

    Args:
        **kwargs: Values used to fill the ``SLURM_OPTS`` and ``BATCH_CMDS``
            templates; every placeholder in those templates must be
            available. ``python_path``, ``conda_env``, ``slurm_log_dir`` and
            ``reservation`` are additionally read when present.

    Raises:
        KeyError: If a template placeholder has no matching keyword argument.
    """
    env_cmd = define_python_interpreter(kwargs.get('python_path'), kwargs.get('conda_env'))
    kwargs.update({"env_cmd": env_cmd})

    # The result is merged into kwargs, so it must be a mapping; presumably it
    # supplies output_log / error_log for the template -- confirm in .utils.
    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'), 'hive')
    kwargs.update(slurm_logs)

    slurm_opts = SLURM_OPTS.format(**kwargs)
    if kwargs.get('reservation') is not None:
        # SLURM_OPTS ends with a newline, so this lands on its own line.
        slurm_opts = f"{slurm_opts}#SBATCH --reservation={kwargs.get('reservation')}"

    script = f"#!/bin/bash\n#\n{slurm_opts}\n{BATCH_CMDS.format(**kwargs)}"

    # The script is handed to sbatch on stdin, so no intermediate shell is
    # needed; running the argv list directly avoids shell=True.
    # The return code of sbatch is not checked here.
    subprocess.run(['sbatch'], input=script, text=True)
def split_into_groups(series, cutoff):
    """Greedily pack labelled sizes into groups whose total stays within a cutoff.

    Entries are scanned in order. During one pass every entry that still fits
    under ``cutoff`` is added to the current group; entries that do not fit are
    kept for a later pass. An entry that on its own exceeds ``cutoff`` becomes
    a single-member group and ends the current pass.

    Args:
        series: Object exposing ``items()`` that yields ``(label, size)``
            pairs, e.g. a pandas Series indexed by label.
        cutoff: Maximum summed size of a group, in the same unit as the sizes.

    Returns:
        list[set]: The groups in the order they were created.
    """
    remaining = list(series.items())
    groups = []

    while remaining:
        current_group = []
        current_sum = 0
        leftover = []

        for position, (label, size) in enumerate(remaining):
            # "not <=" instead of ">" so that a NaN size is also isolated in
            # its own group; otherwise it would never be placed anywhere and
            # the outer loop would not terminate.
            if not size <= cutoff:
                groups.append({label})
                # Everything after the oversized entry is untouched this pass.
                leftover.extend(remaining[position + 1:])
                break
            if current_sum + size <= cutoff:
                current_group.append(label)
                current_sum += size
            else:
                leftover.append((label, size))

        if current_group:
            groups.append(set(current_group))
        remaining = leftover

    return groups
def calc_tld_mem(df):
    """Return the deep memory usage, in bytes, of the rows of each tld.

    Args:
        df: DataFrame with a ``tld`` column.

    Returns:
        The per-``tld`` result of summing ``memory_usage(deep=True)`` over
        each group's rows.
    """
    def _deep_bytes(rows):
        # Total bytes reported for this group's rows.
        return rows.memory_usage(deep=True).sum()

    by_tld = df.groupby('tld', observed=True)
    return by_tld.apply(_deep_bytes)
def define_tld_groups(input,cutoff):
    """Estimate per-tld memory use of a parquet dataset and group the tlds.

    Args:
        input: Location of the parquet dataset, passed to ``dd.read_parquet``.
        cutoff: Maximum summed size of a group in GiB (the byte totals are
            divided by ``1024**3`` before grouping).

    Returns:
        list[set]: Groups of tlds as produced by ``split_into_groups``.
    """
    wanted_columns = ['size','kballoc','access','create','modify','uid','gid','path','tld']
    ddf = dd.read_parquet(input, columns=wanted_columns)

    # Each partition reports bytes per tld; a tld may appear in several
    # partitions, so the partial results are summed per tld afterwards.
    with ProgressBar():
        per_partition = ddf.map_partitions(calc_tld_mem).compute()

    total_bytes = per_partition.groupby(per_partition.index).sum()
    # to_pandas() implies the computed result is not a pandas object
    # (presumably cuDF, given the configured backend).
    total_gib = total_bytes.divide(1024**3).to_pandas()

    return split_into_groups(total_gib, cutoff)
def nested_list_to_log(nest,file):
    """
    Writes a list of lists to a text log, one comma-separated line per inner
    collection.

    Args:
        nest (list): A list of collections (lists or sets) to be converted.
            Members are converted with ``str`` before being joined.
        file (str or Path): Destination file. It is created, or overwritten
            if it already exists.
    """
    with open(file, 'w', newline='') as f:
        for group in nest:
            # Sets have no defined iteration order; sorting them keeps the
            # written file reproducible between runs. Ordered inputs such as
            # lists are written as given.
            if isinstance(group, (set, frozenset)):
                members = sorted(group, key=str)
            else:
                members = group
            f.write(f"{','.join(map(str, members))}\n")
def convert_flat_to_hive():
    """Command-line entry point: convert directly or submit a batch array job.

    In batch mode the tld groups are either computed from the parquet dataset
    and written to ``<parquet_path parent>/misc/tld_grps.txt``, or taken from
    the user-supplied group file; one array task is submitted per group line.
    Otherwise the comma-separated ``--tld`` value is split into a list and
    handed to ``hivize`` together with the remaining arguments.

    Raises:
        ValueError: If ``--tld`` is missing when not running in batch mode.
    """
    args = parse_args()

    if args.get('batch'):
        if not args.get('grp_file'):
            grps = define_tld_groups(args.get('parquet_path'), args.get('cutoff'))

            misc_path = args.get('parquet_path').parent.joinpath('misc', 'tld_grps.txt')
            misc_path.parent.mkdir(exist_ok=True, parents=True)
            nested_list_to_log(grps, misc_path)

            args.update({'ngroups': len(grps),
                         'grp_file': str(misc_path)})
        else:
            # One array task per line of the group file; the context manager
            # makes sure the file handle is closed after counting.
            with open(args.get('grp_file')) as grp_handle:
                ngroups = sum(1 for _ in grp_handle)
            args.update({'ngroups': ngroups})

        submit_batch(**args)
    else:
        tld = args.get('tld')
        if tld is None:
            # Previously this failed with an AttributeError on None.split.
            raise ValueError("--tld must be provided when not submitting a batch job")
        args.update({'tld': tld.split(',')})
        hivize(**args)