Skip to content
Snippets Groups Projects
gpfs_preproc.py 2.48 KiB
Newer Older
import argparse
from pathlib import Path
from pandas import to_datetime, Timestamp
from typing import Literal
from . import process
from . import report

# Public API of this module: only the ``preproc`` entry point is exported by
# ``from <module> import *``; the underscore-prefixed helpers stay private.
__all__ = ['preproc']

def _as_path(p) -> Path:
    """Build a ``Path`` from *p* and return it in absolute form.

    Used as an argparse ``type=`` converter. ``Path.absolute`` anchors a
    relative path at the current working directory; it does not resolve
    symlinks or ``..`` components.
    """
    candidate = Path(p)
    return candidate.absolute()

def _as_datetime(d) -> Timestamp:
    """Parse *d* with ``pandas.to_datetime`` and return the result.

    Used as an argparse ``type=`` converter, so *d* is normally a single
    command-line string.
    """
    parsed = to_datetime(d)
    return parsed

def _parse_preprocessing_args(argv=None):
    """Parse the command-line options for the preprocessing entry point.

    Args:
        argv: Optional sequence of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``, which matches the
            behaviour of calling this function with no arguments.

    Returns:
        dict: The parsed namespace converted with ``vars``. Keys are
        ``dataset``, ``policy_run_date``, ``atime_deltas``, ``delta_unit``,
        ``outdir``, ``outfile``, ``n_workers``, ``with_cuda`` and
        ``with_dask``. Options that are not supplied are ``None``, except
        ``with_cuda`` and ``with_dask`` which default to ``'infer'``.
    """
    parser = argparse.ArgumentParser()

    # The objects returned by add_argument() are not needed afterwards, so
    # they are intentionally not bound to local names.
    parser.add_argument(
        'dataset',
        type=_as_path,
        help="Path to parquet file/dataset for processing. This can be given as the path to the containing directory if there are multiple parquet files in the dataset."
    )
    parser.add_argument(
        '-r', '--policy-run-date',
        type=_as_datetime,
    )
    parser.add_argument(
        '-a', '--atime-deltas',
        nargs='+',
        type=int,
    )
    parser.add_argument(
        '-u', '--delta-unit',
        type=str,
        choices=['D', 'W', 'M', 'Y'],
    )
    parser.add_argument(
        '-d', '--outdir',
        type=_as_path,
    )
    parser.add_argument(
        '-o', '--outfile',
        type=str,
    )

    cluster = parser.add_argument_group(
        title='Local Cluster Options',
        description='Arguments to control Dask local cluster behavior'
    )
    cluster.add_argument('-n', '--n-workers', type=int)
    # These two stay raw strings here; _fix_cluster_args() converts them to
    # True / False / 'infer' after parsing.
    cluster.add_argument('--with-cuda', default='infer')
    cluster.add_argument('--with-dask', default='infer')

    return vars(parser.parse_args(argv))

def _str_to_bool(val):
    """Convert a truth-like value to ``True``, ``False`` or ``'infer'``.

    Matching is case-insensitive. True values are 'y', 'yes', 't', 'true',
    'on', and '1'; false values are 'n', 'no', 'f', 'false', 'off', and '0'.
    Any other value yields the string ``'infer'``; no exception is raised.

    Non-string input is compared through its ``str()`` form, so ``True`` and
    ``False`` map to themselves and the conversion can safely be applied to
    an already-converted value.

    Args:
        val: The value to interpret, normally a command-line string.

    Returns:
        ``True``, ``False``, or the string ``'infer'``.
    """
    # str() is a no-op for real strings and keeps bool/None/int input from
    # failing on the missing .lower() attribute.
    text = str(val).lower()
    if text in ('y', 'yes', 't', 'true', 'on', '1'):
        return True
    if text in ('n', 'no', 'f', 'false', 'off', '0'):
        return False
    return 'infer'

def _convert_int(val):
    """Return ``int(val)``, or ``None`` when *val* cannot be converted.

    Args:
        val: Any value accepted by ``int()``; ``None`` and unparsable
            strings are tolerated.

    Returns:
        The converted integer, or ``None`` if ``int(val)`` raises
        ``TypeError``, ``ValueError`` or ``OverflowError``.
    """
    try:
        return int(val)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures are swallowed; KeyboardInterrupt and
        # SystemExit must keep propagating, which a bare ``except`` prevented.
        return None

def _fix_cluster_args(args) -> dict:
    """Normalise the cluster-related entries of *args* in place.

    ``with_cuda`` and ``with_dask`` are passed through ``_str_to_bool``;
    ``n_workers`` is passed through ``_convert_int``. The same mapping
    object is mutated and returned.
    """
    for flag in ('with_cuda', 'with_dask'):
        args[flag] = _str_to_bool(args[flag])
    args['n_workers'] = _convert_int(args['n_workers'])
    return args

def preproc():
    """Parse the CLI arguments, normalise them, and run the aggregation.

    Returns:
        Whatever ``process.aggregate_gpfs_dataset()`` returns.
    """
    # Parsing still runs even though its result is unused below: argparse
    # validates the command line and exits on errors or ``--help``.
    # NOTE(review): the parsed/normalised arguments are never forwarded to
    # aggregate_gpfs_dataset() -- confirm whether they should be passed in.
    cli_args = _fix_cluster_args(_parse_preprocessing_args())

    return process.aggregate_gpfs_dataset()