from pathlib import Path
from typing import List, Literal

import numpy as np
import pandas as pd
from typeguard import typechecked  # assumed provider of the @typechecked decorator used below

from ..utils import as_path
from ..db.utils import df_to_sql, create_db

__all__ = ['aggregate_gpfs_dataset','calculate_churn']

def _check_dataset_path(dataset_path: str | Path) -> Path:
    dataset_path = as_path(dataset_path)

    if dataset_path.is_file():
        print(f"INFO: Found 1 file ({dataset_path})")
    elif dataset_path.is_dir():
        n_files = len(list(dataset_path.glob('*.parquet')))
        print(f"INFO: {n_files} parquet files found in dataset")
        
    if not dataset_path.exists():
        raise FileNotFoundError(f"{dataset_path} does not exist. Please check your input")
    
    return dataset_path

# ENH: The default naming scheme for the aggregated data needs to be refined. It should describe which aggregations
# were performed, e.g. by including the tld and cutoff date when both are specified.
def _check_report_paths(
        report_dir: str | Path | None,
        report_name: str | Path | None,
        parent_path: Path
        ) -> Path:
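    """
    Resolve the report output path, defaulting to parent_path/'reports'/'tld_agg.parquet',
    and create the report directory (mode 0o2770) if it does not already exist.
    """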
    if report_dir is None:
        report_dir = parent_path.joinpath('reports')
    elif not isinstance(report_dir,Path):
        report_dir = Path(report_dir)
    
    report_dir.mkdir(exist_ok=True,mode=0o2770)
    
    if report_name is None:
        report_name = 'tld_agg.parquet'
    
    report_path = report_dir.joinpath(report_name)
    return report_path

def _check_timedelta_values(vals,unit):
    # delta_vals and delta_unit are only meaningful together, so specifying exactly one is an error.
    if (vals is None) != (unit is None):
        raise ValueError("Must specify either both or neither of delta_vals and delta_unit")

# ENH: Refactor this, it should not automatically create the compute backend, that should be passed to it. Creating the 
# backend is beyond the scope of the process module and this function specifically. Can wrap this with the backend 
# creation in a CLI command later for convenience, but that shouldn't be baked in

# ENH: Make the grouping more generic as opposed to only working for time delta ranges. Splitting files by size and 
# kballoc is also important in some circumstances

@typechecked
def aggregate_gpfs_dataset(
    dataset_path: str | Path,
    acq_date: pd.Timestamp | None = None,
    delta_vals: int | List[int] | None = None,
    delta_unit: Literal['D','W','M','Y'] | None = None,
    time_val: Literal['access','modify','create'] = 'access',
    report_dir: str | Path | None = None,
    report_name: str | Path | None = None,
    n_workers: int | None = None,
    with_cuda: Literal['infer'] | bool = 'infer',
    with_dask: Literal['infer'] | bool = 'infer',
    **kwargs
) -> None:
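    """
    Aggregate a GPFS policy dataset by tld and, optionally, by time-delta group.

    The dataset at dataset_path (a single parquet file or a directory of parquet files) is read
    with the selected compute backend. When delta_vals and delta_unit are given, records are
    binned by their time_val timestamp into cutoff ranges relative to acq_date (which defaults
    to the run date parsed from the dataset filename), and the bin is added as a second grouping
    key. Total bytes and file counts per group are written to report_dir/report_name as parquet;
    nothing is returned.

    Example (illustrative paths and values only):
        aggregate_gpfs_dataset(
            '/data/gpfs/list-policy.parquet',
            delta_vals=[30, 90, 180],
            delta_unit='D',
        )
    """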
    # Input checking
    dataset_path = _check_dataset_path(dataset_path)
    if dataset_path.is_file():
        parent_path = dataset_path.parent.parent
    else:
        parent_path = dataset_path.parent

    report_path = _check_report_paths(report_dir,report_name,parent_path)
    _check_timedelta_values(delta_vals,delta_unit)

    if acq_date is None:
        acq_date = extract_run_date_from_filename(dataset_path)

    manager,backend = start_backend(
        dataset_path=dataset_path,
        with_cuda=with_cuda,
        with_dask=with_dask,
        n_workers=n_workers,
        **kwargs
        )
    
    try:
        aggregator = get_aggregator(backend)
        df = aggregator.read_parquet(dataset_path)
        grps = ['tld']
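        # When delta_vals/delta_unit are given, bin each record's time_val timestamp into cutoff
        # ranges relative to acq_date and add the bin ('dt_grp') as a second grouping key.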
        if delta_vals is not None:
            cutoffs = aggregator.create_timedelta_cutoffs(delta_vals,delta_unit,acq_date)
            labels  = aggregator.create_timedelta_labels(delta_vals,delta_unit)
            df['dt_grp'] = aggregator.cut_dt(df[time_val],cutoffs,labels)
            grps.append('dt_grp')
        else:
            cutoffs, labels = None, None
        
        df_agg = aggregator.aggregate(df, col = ['size'], grps = grps, funcs = ['sum','count'])
        df_agg = df_agg.rename(columns={'sum':'bytes','count':'file_count'})
        df_agg.to_parquet(report_path)
    finally:
        if manager is not None:
            manager.shutdown()
    
@typechecked
def calculate_churn(
    hive_path: str | Path,
    tld: str,
    acq_dates: List[ pd.Timestamp | np.datetime64 | str ],
    with_cuda: Literal['infer'] | bool = 'infer',
    write_db: bool | Path | str = True,
    **kwargs
) -> pd.DataFrame:
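    """
    Calculate file churn for a single tld across consecutive policy acquisition dates.

    Datasets are read from the hive layout hive_path/tld=<tld>/acq=<date>; acq_dates are
    filtered to those acquisitions that actually exist for the tld, and at least two are
    required. Each adjacent pair of acquisitions is compared with the backend's churn
    calculation, and the per-pair results are concatenated into a single dataframe. If
    write_db is a str or Path, the 'churn' table is written to that database file; if True,
    it is written to <hive_path.parent>/db/<hive_path.name>.db; if False, nothing is written.
    The churn dataframe is returned in all cases.

    Example (illustrative paths and values only):
        calculate_churn('/data/hive', 'some_tld', ['2024-01-01', '2024-02-01'])
    """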
    
    ## Input checking
    hive_path = as_path(hive_path)
    dataset_path = hive_path.joinpath(f"tld={tld}")
    acq_dirs = [np.datetime64(d.name.removeprefix('acq=')) for d in dataset_path.glob("acq=*")]

    # Conversion of datetimes to np.datetime64 provides compatibility for either cudf or pandas dataframe backends.
    # At the same time, remove any datetimes for which the given tld does not have data.
    acq_dates = [np.datetime64(d) for d in acq_dates if np.datetime64(d) in acq_dirs]
    acq_dates.sort()

    if len(acq_dates) <= 1:
        raise ValueError(f"Fewer than two given policy acquisition dates contained data for {tld} in {hive_path}.")

    manager,backend = start_backend(
        dataset_path=dataset_path,
        with_cuda=with_cuda,
        with_dask=False,
        **kwargs
    )

    try:
        churn_l = []
        aggregator = get_aggregator(backend)
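        # Strategy: read the earliest acquisition as the initial source snapshot, then compare each
        # later acquisition against it in turn, rotating source and target as the loop advances.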
        
        if backend == 'cudf':
            pool_size = kwargs.pop('pool_size',70*(1024**3)) # 70 GiB as default
            aggregator.create_memory_pool(pool_size,**kwargs)
        
        df_init = (
            aggregator
            .read_parquet(
                dataset_path.joinpath(f"acq={np.datetime_as_string(acq_dates[0],'D')}"),
                columns=['size','modify','access']
            )
        )
        df_init['acq'] = acq_dates[0]
        
        for i in range(1,len(acq_dates)):
            df_target = (
                aggregator
                .read_parquet(
                    dataset_path.joinpath(f"acq={np.datetime_as_string(acq_dates[i],'D')}"),
                    columns=['modify','size','access']
                )
            )
            df_target['acq'] = acq_dates[i]
            churn = (
                aggregator
                .calculate_churn(df_init,df_target)
                .to_frame()
                .T
                .assign(
                    log_dt = df_target['acq'].iloc[0],
                    prior_log_dt = df_init['acq'].iloc[0],
                    tld = tld)
            )
            churn_l.append(churn)

            # This delete-and-reassign pattern rotates the dataframes through the loop: each dataset,
            # aside from the first and last, is used once as the comparison target and then becomes
            # the source for the next comparison as we move through the time series. Dropping the
            # previous source before promoting the target keeps at most two datasets in memory at a
            # time while still reading each dataset from disk only once.
            del df_init
            df_init = df_target
            del df_target
            
        churn_df = pd.concat(churn_l).fillna(0).reset_index(drop=True)

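        # write_db: a str/Path names an explicit database file for the 'churn' table; True (the
        # default) derives <hive_path.parent>/db/<hive_path.name>.db; False skips writing entirely.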
        if isinstance(write_db,(str,Path)):
            db = as_path(write_db)
            db.parent.mkdir(exist_ok=True,parents=True)
            df_to_sql(churn_df,db,'churn')
        elif write_db is True:
            db_name = hive_path.name
            db_path = hive_path.parent.joinpath('db')
            db_path.mkdir(parents=True,exist_ok=True)
            db = db_path.joinpath(f"{db_name}.db")
            df_to_sql(churn_df,db,'churn')
        
        return churn_df
    finally:
        if manager is not None:
            manager.shutdown()