import cudf
import cupy
import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask_cudf
import rmm

from .utils import as_timedelta
from ..db.utils import CHURN_TBL_COLS

from typing import Literal, List
from typeguard import typechecked

__all__ = ['get_aggregator']


# ENH: In the future, we probably need to wrap the manager and backend type into a class. That class would contain
# the read_parquet function instead of putting it in the aggregation classes. This would separate everything out
# more sensibly.
@typechecked
class Aggregator:

    def __init__(self):
        self.backend = None
        self.cuda = None

    def _cut(
        self,
        ser: pd.Series | cudf.Series,
        bins: List[int | pd.Timestamp],
        labels: List[str] | None = None,
        **kwargs
    ) -> pd.Series | cudf.Series:
        right = kwargs.pop('right', False)
        if self.cuda:
            func = cudf.cut
            # cudf.cut does not bin datetime columns directly, so use the underlying int64 representation.
            ser = ser.astype('int64')
        else:
            func = pd.cut
        grps = func(ser, bins=bins, labels=labels, right=right, **kwargs)
        if labels is not None:
            grps = grps.cat.reorder_categories(labels[::-1], ordered=True)
        return grps

    def create_timedelta_cutoffs(
        self,
        delta_vals: int | List[int],
        delta_unit: Literal['D', 'W', 'M', 'Y'],
        run_date: pd.Timestamp
    ) -> List[int | pd.Timestamp]:
        # Accept a single offset as well as a list of offsets.
        if isinstance(delta_vals, int):
            delta_vals = [delta_vals]
        deltas = pd.Series([as_timedelta(c, delta_unit) for c in delta_vals])
        cutoffs = pd.to_datetime(run_date - deltas)
        cutoffs = (
            pd.concat(
                [
                    cutoffs,
                    pd.Series([pd.to_datetime('today'), pd.to_datetime('1970-01-01')])
                ]
            )
            .sort_values()
        )
        # CUDA backends need int64 bin edges to match the int64-cast datetime series in _cut.
        return cutoffs.astype('int64').to_list() if self.cuda else cutoffs.to_list()

    def create_timedelta_labels(
        self,
        delta_vals: List[int],
        delta_unit: Literal['D', 'W', 'M', 'Y'],
    ) -> List[str]:
        # e.g. delta_vals=[30, 90] with delta_unit='D' -> ['>90D', '30D-90D', '<30D']
        delta_vals.sort(reverse=True)
        deltas = [f'{d}{delta_unit}' for d in delta_vals]
        labels = [f'>{deltas[0]}'] + [f'{deltas[i+1]}-{deltas[i]}' for i in range(len(deltas)-1)] + [f'<{deltas[-1]}']
        return labels


class PandasAggregator(Aggregator):

    def __init__(self):
        self.backend = 'pandas'
        self.cuda = False

    def read_parquet(self, dataset_path, **kwargs) -> pd.DataFrame:
        return pd.read_parquet(dataset_path, **kwargs)

    def cut_dt(self, series, *args, **kwargs) -> pd.Series:
        return self._cut(series, *args, **kwargs)

    def aggregate(
        self,
        df: pd.DataFrame,
        col: str | List[str],
        grps: str | List[str],
        funcs: str | List[str]
    ) -> pd.DataFrame:
        df_agg = (
            df.groupby(grps, observed=True)[col]
            .agg(funcs)
            .sort_index(level=[0, 1])
            .reset_index()
        )
        return df_agg

    def calculate_churn(self, df1, df2) -> pd.Series:
        dfm = pd.merge(df1, df2, how='outer', left_index=True, right_index=True)
        conditions = [
            dfm['access_x'].isna(),
            dfm['access_y'].isna(),
            (dfm['modify_x'] != dfm['modify_y']).fillna(False),
            (dfm['access_x'] != dfm['access_y']).fillna(False)
        ]
        choices = np.arange(0, 4)
        mapping = {
            0: 'created',
            1: 'deleted',
            2: 'modified',
            3: 'accessed',
            -1: None
        }
        dfm['type'] = np.select(conditions, choices, default=-1)
        dfm['type'] = dfm['type'].map(mapping)
        if dfm['type'].isna().all():
            ser = pd.Series(
                data=0,
                index=CHURN_TBL_COLS
            )
            return ser
        else:
            dfm = dfm.loc[dfm['type'].notna()]
            modified = dfm.loc[dfm['type'] == 'modified']
            modified_bytes_net = modified['size_y'].sum() - modified['size_x'].sum()
            # Instead of writing logic to aggregate across initial size for deleted files and final size for all
            # other files, we can essentially condense size across both columns into a new column. Size of deleted
            # files will come from size_x while all other files will come from size_y.
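            # For example, a deleted file only exists in the earlier snapshot, so size_y is NaN and its size falls
            # back to size_x; created, modified, and accessed files all take size_y, their size in the later scan.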
            dfm['size'] = dfm['size_y'].where(dfm['size_y'].notna(), dfm['size_x'])
            agg_df = dfm.groupby('type', observed=True)['size'].agg(['sum', 'count'])
            agg_df.columns = ['bytes', 'files']
            agg_df = agg_df.melt(value_vars=['bytes', 'files'], ignore_index=False).set_index('variable', append=True)
            # Flatten (type, variable) into labels like 'created_bytes' and 'created' (the '_files' suffix is dropped).
            agg_df.index = agg_df.index.map('_'.join).str.removesuffix('_files')
            agg_df.loc['modified_bytes_net'] = modified_bytes_net
            for c in CHURN_TBL_COLS:
                if c not in agg_df.index:
                    agg_df.loc[c] = 0
            return agg_df['value']


class CUDFAggregator(Aggregator):

    def __init__(self):
        self.backend = 'cudf'
        self.cuda = True

    def read_parquet(self, dataset_path, **kwargs) -> cudf.DataFrame:
        return cudf.read_parquet(dataset_path, **kwargs)

    def cut_dt(self, series, *args, **kwargs) -> cudf.Series:
        return self._cut(series, *args, **kwargs)

    def aggregate(
        self,
        df: cudf.DataFrame,
        col: str | List[str],
        grps: str | List[str],
        funcs: str | List[str]
    ) -> pd.DataFrame:
        df_agg = (
            df.groupby(grps, observed=True)[col]
            .agg(funcs)
            .sort_index(level=[0, 1])
            .to_pandas()
            .reset_index()
        )
        return df_agg

    def create_memory_pool(self, size, **kwargs):
        pool_allocator = kwargs.pop('pool_allocator', True)
        managed_memory = kwargs.pop('managed_memory', True)
        rmm.reinitialize(
            pool_allocator=pool_allocator,
            managed_memory=managed_memory,
            initial_pool_size=size,
            **kwargs
        )

    def calculate_churn(self, df1, df2) -> pd.Series:
        dfm = cudf.merge(df1, df2, how='outer', left_index=True, right_index=True)
        conditions = [
            dfm['access_x'].isna(),
            dfm['access_y'].isna(),
            (dfm['modify_x'] != dfm['modify_y']).fillna(False),
            (dfm['access_x'] != dfm['access_y']).fillna(False)
        ]
        choices = cupy.arange(0, 4)
        mapping = {
            0: 'created',
            1: 'deleted',
            2: 'modified',
            3: 'accessed',
            -1: None
        }
        dfm['type'] = cupy.select(conditions, choices, default=-1)
        dfm['type'] = dfm['type'].map(mapping)
        if dfm['type'].isna().all():
            ser = pd.Series(
                data=0,
                index=CHURN_TBL_COLS
            )
            return ser
        else:
            dfm = dfm.loc[dfm['type'].notna()]
            modified = dfm.loc[dfm['type'] == 'modified']
            modified_bytes_net = modified['size_y'].sum() - modified['size_x'].sum()
            dfm['size'] = dfm['size_y'].where(dfm['size_y'].notna(), dfm['size_x'])
            agg_df = dfm.groupby('type', observed=True)['size'].agg(['sum', 'count'])
            agg_df.columns = ['bytes', 'files']
            agg_df = agg_df.to_pandas().melt(value_vars=['bytes', 'files'], ignore_index=False).set_index('variable', append=True)
            agg_df.index = agg_df.index.map('_'.join).str.removesuffix('_files')
            agg_df.loc['modified_bytes_net'] = modified_bytes_net
            for c in CHURN_TBL_COLS:
                if c not in agg_df.index:
                    agg_df.loc[c] = 0
            return agg_df['value']


class DaskAggregator(Aggregator):

    def __init__(self):
        self.backend = 'dask'
        self.cuda = False

    def cut_dt(self, series, *args, **kwargs) -> dd.Series:
        return series.map_partitions(self._cut, *args, **kwargs)

    def aggregate(
        self,
        df: dd.DataFrame,
        col: str | List[str],
        grps: str | List[str],
        funcs: str | List[str]
    ) -> pd.DataFrame:
        df_agg = (
            df.groupby(grps, observed=True)[col]
            .agg(funcs)
            .compute()
            .sort_index(level=[0, 1])
            .reset_index()
        )
        return df_agg

    def read_parquet(self, dataset_path, **kwargs) -> dd.DataFrame:
        split_row_groups = kwargs.pop('split_row_groups', False)
        return dd.read_parquet(dataset_path, split_row_groups=split_row_groups, **kwargs)


class DaskCUDFAggregator(Aggregator):

    def __init__(self):
        self.backend = 'dask_cuda'
        self.cuda = True

    def cut_dt(self, series, *args, **kwargs) -> dask_cudf.Series:
        return series.map_partitions(self._cut, *args, **kwargs)

    def aggregate(
        self,
        df: dask_cudf.DataFrame,
        col: str | List[str],
        grps: str | List[str],
        funcs: str | List[str]
    ) -> pd.DataFrame:
        df_agg = (
            df.groupby(grps, observed=True)[col]
            .agg(funcs)
            .compute()
            .sort_index(level=[0, 1])
            .to_pandas()
            .reset_index()
        )
        return df_agg

    def read_parquet(self, dataset_path, **kwargs) -> dd.DataFrame:
        split_row_groups = kwargs.pop('split_row_groups', False)
        return dd.read_parquet(dataset_path, split_row_groups=split_row_groups, **kwargs)


def get_aggregator(backend: str) -> PandasAggregator | CUDFAggregator | DaskAggregator | DaskCUDFAggregator:
    match backend:
        case 'pandas':
            return PandasAggregator()
        case 'cudf':
            return CUDFAggregator()
        case 'dask':
            return DaskAggregator()
        case 'dask_cuda':
            return DaskCUDFAggregator()
        case _:
            raise ValueError(f"Unsupported backend: {backend}")
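# Example (illustrative sketch only): typical use of a backend-agnostic aggregator. The parquet path and the
# 'username'/'access'/'size' column names below are assumptions for the sake of the example, not names defined
# in this module.
#
#   agg = get_aggregator('pandas')
#   df = agg.read_parquet('/path/to/scan.parquet')
#   run_date = pd.Timestamp.now()
#   bins = agg.create_timedelta_cutoffs([30, 90, 365], 'D', run_date)
#   labels = agg.create_timedelta_labels([30, 90, 365], 'D')
#   df['age_group'] = agg.cut_dt(df['access'], bins, labels)   # 'access' assumed to be a datetime column
#   summary = agg.aggregate(df, 'size', ['username', 'age_group'], ['sum', 'count'])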