from pathlib import Path
import pandas as pd
import numpy as np
from ..compute import start_backend
from .utils import extract_run_date_from_filename
from ..db.utils import df_to_sql, create_db
from ..utils import as_path  # NOTE: assumed module path for the as_path helper used below
from .factory import get_aggregator
from typing import Literal, List
from typeguard import typechecked
__all__ = ['aggregate_gpfs_dataset','calculate_churn']
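# Example usage (an illustrative sketch only: the package import path, file paths, tld, and dates below are
# hypothetical placeholders, not values taken from this repository):
#
#   from <package>.process import aggregate_gpfs_dataset, calculate_churn
#
#   aggregate_gpfs_dataset('/data/list-policy.parquet', delta_vals=[30, 90, 180], delta_unit='D')
#   calculate_churn('/data/hive', tld='example_tld', acq_dates=['2024-01-01', '2024-02-01'])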
def _check_dataset_path(dataset_path) -> Path:
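    """Coerce the input to a Path and confirm it exists, reporting how many parquet files were found."""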
dataset_path = as_path(dataset_path)
    if not dataset_path.exists():
        raise FileNotFoundError(f"{dataset_path} does not exist. Please check your input")
    if dataset_path.is_file():
        print(f"INFO: Found 1 file ({dataset_path})")
    elif dataset_path.is_dir():
        n_files = len(list(dataset_path.glob('*.parquet')))
        print(f"INFO: {n_files} parquet files found in dataset")
    return dataset_path
# ENH: The default naming scheme for the aggregated data needs to be refined. It should describe which aggregations
# were performed, such as including the tld and cutoff date when both are specified
def _check_report_paths(
report_dir: str | Path | None,
report_name: str | Path | None,
parent_path: Path
) -> Path:
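    """Resolve where the aggregated report will be written, defaulting to a 'reports' directory under
    parent_path with a generic 'tld_agg.parquet' file name."""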
if report_dir is None:
report_dir = parent_path.joinpath('reports')
elif not isinstance(report_dir,Path):
report_dir = Path(report_dir)
report_dir.mkdir(exist_ok=True,mode=0o2770)
if report_name is None:
report_name = 'tld_agg.parquet'
report_path = report_dir.joinpath(report_name)
return report_path
def _check_timedelta_values(vals,unit):
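    """Ensure delta_vals and delta_unit are either both specified or both omitted."""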
    if (vals is None) != (unit is None):
        raise ValueError("Must specify either both or neither of delta_vals and delta_unit")
# ENH: Refactor this, it should not automatically create the compute backend, that should be passed to it. Creating the
# backend is beyond the scope of the process module and this function specifically. Can wrap this with the backend
# creation in a CLI command later for convenience, but that shouldn't be baked in
# ENH: Make the grouping more generic as opposed to only working for time delta ranges. Splitting files by size and
# kballoc is also important in some circumstances
@typechecked
def aggregate_gpfs_dataset(
dataset_path: str | Path,
acq_date: pd.Timestamp | None = None,
delta_vals: int | List[int] | None = None,
delta_unit: Literal['D','W','M','Y'] | None = None,
time_val: Literal['access','modify','create'] = 'access',
report_dir: str | Path | None = None,
report_name: str | Path | None = None,
n_workers: int | None = None,
with_cuda: Literal['infer'] | bool = 'infer',
with_dask: Literal['infer'] | bool = 'infer',
**kwargs
) -> None:
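    """
    Aggregate a GPFS policy dataset by tld and write per-group byte totals and file counts to a parquet report.

    When delta_vals and delta_unit are given, files are additionally bucketed into age groups by cutting the
    chosen time_val column ('access', 'modify', or 'create') at timedelta cutoffs relative to acq_date. If
    acq_date is not provided, it is inferred from the dataset file name. Remaining keyword arguments are
    forwarded to the compute backend.
    """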
# Input checking
dataset_path = _check_dataset_path(dataset_path)
if dataset_path.is_file():
parent_path = dataset_path.parent.parent
else:
parent_path = dataset_path.parent
report_path = _check_report_paths(report_dir,report_name,parent_path)
_check_timedelta_values(delta_vals,delta_unit)
if acq_date is None:
acq_date = extract_run_date_from_filename(dataset_path)
manager,backend = start_backend(
dataset_path=dataset_path,
with_cuda=with_cuda,
with_dask=with_dask,
n_workers=n_workers,
**kwargs
)
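    # start_backend returns a (manager, backend) pair: the manager owns whatever compute resources were started
    # (e.g. a Dask cluster when with_dask is enabled) and is shut down in the finally block below, while backend
    # identifies which dataframe library the aggregator factory should target.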
try:
aggregator = get_aggregator(backend)
df = aggregator.read_parquet(dataset_path)
grps = ['tld']
if delta_vals is not None:
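            # Bucket files into age groups by cutting the selected timestamp column at timedelta cutoffs
            # relative to acq_date; the resulting labels become an additional groupby key alongside tld.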
cutoffs = aggregator.create_timedelta_cutoffs(delta_vals,delta_unit,acq_date)
labels = aggregator.create_timedelta_labels(delta_vals,delta_unit)
df['dt_grp'] = aggregator.cut_dt(df[time_val],cutoffs,labels)
grps.append('dt_grp')
else:
            cutoffs, labels = None, None
df_agg = aggregator.aggregate(df, col = ['size'], grps = grps, funcs = ['sum','count'])
df_agg = df_agg.rename(columns={'sum':'bytes','count':'file_count'})
df_agg.to_parquet(report_path)
finally:
if manager is not None:
manager.shutdown()
@typechecked
def calculate_churn(
hive_path: str | Path,
tld: str,
acq_dates: List[ pd.Timestamp | np.datetime64 | str ],
with_cuda: Literal['infer'] | bool = 'infer',
write_db: bool | Path | str = True,
**kwargs
) -> pd.DataFrame:
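    """
    Calculate file churn for a single tld between consecutive policy acquisition dates stored in a
    hive-partitioned dataset (tld=<tld>/acq=<date> directories). Dates without data for the tld are dropped.
    Returns one row per consecutive pair of dates; when write_db is True or a path, the result is also
    written to a 'churn' table via df_to_sql.
    """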
## Input checking
hive_path = as_path(hive_path)
dataset_path = hive_path.joinpath(f"tld={tld}")
acq_dirs = [np.datetime64(d.name.removeprefix('acq=')) for d in dataset_path.glob("acq=*")]
# Conversion of datetimes to np.datetime64 provides compatibility for either cudf or pandas dataframe backends.
# At the same time, remove any datetimes for which the given tld does not have data.
acq_dates = [np.datetime64(d) for d in acq_dates if np.datetime64(d) in acq_dirs]
acq_dates.sort()
if len(acq_dates) <= 1:
raise ValueError(f"Fewer than two given policy acquisition dates contained data for {tld} in {hive_path}.")
manager,backend = start_backend(
dataset_path=dataset_path,
with_cuda=with_cuda,
with_dask=False,
**kwargs
)
try:
churn_l = []
aggregator = get_aggregator(backend)
if backend == 'cudf':
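            # On the cuDF backend, pre-allocate a GPU memory pool up front (presumably to avoid repeated
            # allocations while looping over acquisition dates); pool_size may be overridden through kwargs.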
pool_size = kwargs.pop('pool_size',70*(1024**3)) # 70 GiB as default
aggregator.create_memory_pool(pool_size,**kwargs)
df_init = (
aggregator
.read_parquet(
dataset_path.joinpath(f"acq={np.datetime_as_string(acq_dates[0],'D')}"),
columns=['size','modify','access']
)
)
df_init['acq'] = acq_dates[0]
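        # Walk the sorted acquisition dates pairwise: each iteration compares the previous snapshot (df_init)
        # against the next one (df_target) and records the churn between the two.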
for i in range(1,len(acq_dates)):
df_target = (
aggregator
.read_parquet(
dataset_path.joinpath(f"acq={np.datetime_as_string(acq_dates[i],'D')}"),
columns=['modify','size','access']
)
)
df_target['acq'] = acq_dates[i]
churn = (
aggregator
.calculate_churn(df_init,df_target)
.to_frame()
.T
.assign(
log_dt = df_target['acq'].iloc[0],
prior_log_dt = df_init['acq'].iloc[0],
tld = tld)
)
churn_l.append(churn)
            # This delete pattern paired with the loop creates a rotating window: each dataframe, aside from the
            # initial and final, is processed first as the target of one comparison and then as the source of the
            # next as we move through the time series. Each source is dropped from memory once it is no longer
            # needed, which limits memory usage to two datasets at a time while reading each dataset only once.
del df_init
df_init = df_target
del df_target
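        # Measures that are absent for a given interval become NaN after concatenation; treat them as zero.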
churn_df = pd.concat(churn_l).fillna(0).reset_index(drop=True)
if isinstance(write_db,(str,Path)):
db = as_path(write_db)
db.parent.mkdir(exist_ok=True,parents=True)
df_to_sql(churn_df,db,'churn')
elif write_db is True:
db_name = hive_path.name
db_path = hive_path.parent.joinpath('db')
db_path.mkdir(parents=True,exist_ok=True)
db = db_path.joinpath(f"{db_name}.db")
df_to_sql(churn_df,db,'churn')
return churn_df
finally:
if manager is not None:
manager.shutdown()