Newer
Older
import argparse
import subprocess
from pathlib import Path
from ..policy.hive import hivize
from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
# Long-form help text handed to argparse as the parser description in
# parse_args(). The parser uses RawTextHelpFormatter, so the line breaks
# inside this string are shown to the user exactly as written here.
DESCRIPTION = """
Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This creates a timeseries of structured datasets for each tld for much easier more efficient log comparisons within tld. Output parquets are sorted by the path column for convenience, but no index is set.
Setting the --batch flag will create a Slurm array job where each task processes one tld, either from the passed parameter or from the unique values in the parquet dataset.
All processing is done via Polars and so can take advantage of parallel processing. Higher core counts can provide a limited benefit to performance, and a GPU is not required.
"""
def parse_args():
    """Parse the command line for the flat-to-hive conversion tool.

    The parser inherits whatever options the project's ``batch_parser``
    helper defines (passed as a parent parser with this tool's resource
    defaults) and adds the two positional dataset paths plus the
    hive-specific options ``--tld``, ``--partition-size`` and
    ``--no-clobber``.

    Returns:
        dict: The parsed arguments as a plain dictionary keyed by argparse
        destination name (``vars()`` of the resulting namespace).
    """
    # Parent parser carrying the batch-related options and their defaults.
    batch_parent = batch_parser(
        cpus_per_task=16,
        gpus=0,
        partition='amd-hdr100',
        mem='32G',
        time='02:00:00',
    )

    cli = argparse.ArgumentParser(
        parents=[batch_parent],
        formatter_class=argparse.RawTextHelpFormatter,
        description=DESCRIPTION,
    )

    # Positional arguments: source dataset, then destination hive.
    cli.add_argument(
        'parquet_path',
        help="Path to a directory containing a flat parquet dataset.",
        type=Path,
    )
    cli.add_argument(
        'hive_path',
        help="Parent directory for the hive. This can be either a new directory or an existing hive directory. If this is an existing hive dataset, new data will be appended to the old data. This will not alter any existing data provided the tld or date of acquisition differ from the existing hive data.",
        type=Path,
    )

    # Optional arguments controlling which tlds are converted and how.
    cli.add_argument(
        '--tld',
        help='Comma-separated list of tld to convert to hive. If not specified, all unique tld values in the dataset will be processed.',
        type=str,
    )
    cli.add_argument(
        '--partition-size',
        help='Max size of in-memory data for each partition in a single hive dataset. Smaller partitions cause more files to be written. Can pass the byte size as an integer or as a human-readable byte string. For example, 1024 and 1KiB are equivalent.',
        default='100MiB',
        type=str,
        dest='partition_chunk_size_bytes',
    )
    cli.add_argument(
        '--no-clobber',
        help="Flag to set whether contents of a hive cell will be overwritten. If True, the pipeline will exit if any parquet files are found in the cell directory. No processing will occur in that case. If False (default), any files existing in the cell directory will be removed prior to data writing.",
        action='store_true',
        default=False,
    )

    namespace = cli.parse_args()
    return vars(namespace)
#SBATCH --job-name=hivize
#SBATCH --ntasks={ntasks}
#SBATCH --cpus-per-task={cpus_per_task}
#SBATCH --partition={partition}
#SBATCH --time={time}
#SBATCH --mem={mem}
#SBATCH --output={output_log}
#SBATCH --error={error_log}
tld=$(sed -n "${{SLURM_ARRAY_TASK_ID}}p" {tld_file})
convert-to-hive --tld ${{tld}} --partition-size={partition_chunk_size_bytes} {parquet_path} {hive_path}
"""
def submit_batch(**kwargs):
    """Render a Slurm batch script from ``kwargs`` and pipe it to ``sbatch``.

    The script is assembled from the module-level ``SLURM_OPTS`` and
    ``BATCH_CMDS`` templates, both of which are filled in with
    ``str.format(**kwargs)``; every placeholder they contain must therefore
    be present in ``kwargs`` (or be added below).

    Keyword Args:
        python_path, conda_env: Forwarded to ``define_python_interpreter``;
            its result is stored under the ``env_cmd`` key.
        slurm_log_dir: Forwarded to ``setup_slurm_logs`` together with the
            fixed label ``'hive'``; the mapping it returns is merged into
            ``kwargs``.
        reservation: Optional. When not ``None``, an
            ``#SBATCH --reservation=...`` line is appended to the options.
        **kwargs: Any further keys are only used as template values.

    Returns:
        None. The exit status of ``sbatch`` is not inspected; any error
        message it prints goes straight to this process's stderr.
    """
    env_cmd = define_python_interpreter(kwargs.get('python_path'), kwargs.get('conda_env'))
    kwargs.update({"env_cmd": env_cmd})

    slurm_logs = setup_slurm_logs(kwargs.get('slurm_log_dir'), 'hive')
    kwargs.update(slurm_logs)

    slurm_opts = SLURM_OPTS.format(**kwargs)
    reservation = kwargs.get('reservation')
    if reservation is not None:
        # Appended directly: relies on SLURM_OPTS ending with a newline.
        slurm_opts = f"{slurm_opts}#SBATCH --reservation={reservation}"

    script = f"#!/bin/bash\n#\n{slurm_opts}\n{BATCH_CMDS.format(**kwargs)}"

    # sbatch reads the job script from stdin when no script file is given.
    # Run it directly from an argv list instead of through a shell: a list
    # combined with shell=True only adds an unnecessary /bin/sh layer.
    subprocess.run(['sbatch'], input=script, text=True)
def convert_flat_to_hive():
    """Entry point: convert a flat parquet dataset into a tld-partitioned hive.

    Steps:
        1. Parse the command line with ``parse_args``.
        2. Determine the tlds to process: the comma-separated ``--tld``
           value when given, otherwise the sorted unique values of the
           ``tld`` column of the parquet dataset.
        3. With ``--batch`` set, write one tld per line to
           ``<parquet_path parent>/misc/tld.txt``, record ``ntlds`` and
           ``tld_file`` in the argument dict and submit a Slurm job via
           ``submit_batch``. Nothing is converted in the current process.
        4. Otherwise call ``hivize`` once per tld in the current process.

    Returns:
        None.
    """
    args = parse_args()

    if args['tld'] is None:
        # NOTE(review): `pl` (polars) has to be imported at module level; that
        # import is not visible in the excerpt this was reviewed from - confirm.
        tlds = (
            pl.scan_parquet(args['parquet_path'], cache=False)
            .select('tld')
            .unique()
            .collect(engine='streaming')
            .sort('tld')
            .get_column('tld')
            .to_list()
        )
    else:
        # Tolerate "a, b" and trailing commas in the user-supplied list.
        tlds = [name.strip() for name in args['tld'].split(',') if name.strip()]

    if args['batch']:
        # The batch commands pick the tld for each array task by line number
        # from this file (sed -n "${SLURM_ARRAY_TASK_ID}p").
        tld_file = args['parquet_path'].parent.joinpath('misc', 'tld.txt')
        tld_file.parent.mkdir(parents=True, exist_ok=True)
        with open(tld_file, 'wt') as f:
            f.write('\n'.join(tlds))

        args['ntlds'] = len(tlds)
        args['tld_file'] = tld_file

        # Hand the work to Slurm; the array tasks re-invoke this tool with a
        # single --tld each, so no local processing must happen here.
        submit_batch(**args)
        return

    # Local run: the combined --tld string is replaced by one tld per call.
    args.pop('tld')
    for tld in tlds:
        hivize(**args, tld=tld)