Newer
Older
Matthew K Defenderfer
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import argparse
from pathlib import Path
from pandas import to_datetime, Timestamp
from typing import Literal
from . import process
from . import report
__all__ = ['preproc']
def _as_path(p) -> Path:
    """Coerce *p* to an absolute ``Path``.

    Used as an argparse ``type=`` converter. The value is wrapped in
    ``Path`` and made absolute with ``Path.absolute``, so a relative input
    is anchored at the current working directory.
    """
    candidate = Path(p)
    return candidate.absolute()
def _as_datetime(d) -> Timestamp:
    """Convert *d* to a pandas timestamp.

    Used as an argparse ``type=`` converter; the actual parsing is delegated
    entirely to ``pandas.to_datetime``.
    """
    parsed = to_datetime(d)
    return parsed
def _parse_preprocessing_args(argv=None):
    """Parse the command-line options for the preprocessing entry point.

    Args:
        argv: Optional sequence of argument strings to parse. When ``None``
            (the default) argparse falls back to ``sys.argv[1:]``, which is
            what a call without arguments has always done.

    Returns:
        dict: The parsed namespace converted with ``vars``. Keys are
        ``dataset``, ``policy_run_date``, ``atime_deltas``, ``delta_unit``,
        ``outdir``, ``outfile``, ``n_workers``, ``with_cuda`` and
        ``with_dask``. Options that were not supplied are ``None``, except
        ``with_cuda`` and ``with_dask`` which default to the string
        ``'infer'``.

    Raises:
        SystemExit: Raised by argparse on invalid arguments or ``--help``.
    """
    parser = argparse.ArgumentParser()

    # The return values of add_argument are not needed, so they are not bound
    # to local names.
    parser.add_argument(
        'dataset',
        type=_as_path,
        help="Path to parquet file/dataset for processing. This can be given as the path to the containing directory if there are multiple parquet files in the dataset.",
    )
    parser.add_argument(
        '-r', '--policy-run-date',
        type=_as_datetime,
    )
    parser.add_argument(
        '-a', '--atime-deltas',
        nargs='+',
        type=int,
    )
    parser.add_argument(
        '-u', '--delta-unit',
        type=str,
        choices=['D', 'W', 'M', 'Y'],
    )
    parser.add_argument(
        '-d', '--outdir',
        type=_as_path,
    )
    parser.add_argument(
        '-o', '--outfile',
        type=str,
    )

    cluster = parser.add_argument_group(
        title='Local Cluster Options',
        description='Arguments to control Dask local cluster behavior',
    )
    cluster.add_argument('-n', '--n-workers', type=int)
    # These two stay as raw strings here; the 'infer' default is kept as-is
    # so later normalisation can tell "not specified" from yes/no.
    cluster.add_argument('--with-cuda', default='infer')
    cluster.add_argument('--with-dask', default='infer')

    return vars(parser.parse_args(argv))
def _str_to_bool(val):
    """Interpret a yes/no style flag, falling back to ``'infer'``.

    Matching is case-insensitive. True values are 'y', 'yes', 't', 'true',
    'on', and '1'; false values are 'n', 'no', 'f', 'false', 'off', and '0'.
    Any other value is not an error: the sentinel string ``'infer'`` is
    returned instead, so no ``ValueError`` is raised.

    Args:
        val: The value to interpret. Usually a string from the command line,
            but any object is accepted; it is converted with ``str`` first,
            so ``True``/``False``/``1``/``0`` map to the matching boolean and
            ``None`` maps to ``'infer'``.

    Returns:
        ``True``, ``False``, or the string ``'infer'``.
    """
    # Stringify before lowering so non-string input (None, bool, int) does not
    # fail with AttributeError on .lower().
    text = str(val).lower()
    if text in ('y', 'yes', 't', 'true', 'on', '1'):
        return True
    if text in ('n', 'no', 'f', 'false', 'off', '0'):
        return False
    return 'infer'
def _convert_int(val):
    """Return ``int(val)``, or ``None`` when *val* cannot be converted.

    Args:
        val: Any value accepted by ``int`` (number, numeric string, ...).

    Returns:
        The converted integer, or ``None`` if ``int`` rejects the value
        (wrong type such as ``None``, a non-numeric string, or a
        non-finite float).
    """
    try:
        return int(val)
    # Only the failures int() itself produces are treated as "no value";
    # a bare except would also swallow KeyboardInterrupt/SystemExit.
    except (TypeError, ValueError, OverflowError):
        return None
def _fix_cluster_args(args) -> dict:
    """Normalise the cluster-related entries of *args* in place.

    ``with_cuda`` and ``with_dask`` are passed through ``_str_to_bool`` and
    ``n_workers`` through ``_convert_int``. The same mapping object that was
    passed in is modified and returned.
    """
    for flag in ('with_cuda', 'with_dask'):
        args[flag] = _str_to_bool(args[flag])

    worker_count = _convert_int(args['n_workers'])
    args['n_workers'] = worker_count
    return args
def preproc():
    """Command-line entry point for dataset preprocessing.

    Parses the command line, normalises the cluster options, then calls
    ``process.aggregate_gpfs_dataset`` and returns whatever it returns.
    """
    # Parsing still runs for its side effects: argparse validates the command
    # line and exits on bad input or --help.
    # NOTE(review): the parsed/normalised arguments are not forwarded to
    # aggregate_gpfs_dataset(), which is called with no arguments. This looks
    # unfinished; confirm the intended call signature.
    _fix_cluster_args(_parse_preprocessing_args())
    return process.aggregate_gpfs_dataset()