from typing import List

import cupy
import cudf
import dask.dataframe as dd
import dask_cudf
import numpy as np
import pandas as pd
import rmm

from typeguard import typechecked

from ..db.utils import CHURN_TBL_COLS

__all__ = ['get_aggregator']

# ENH: In the future, probably need to wrap the manager and backend type into a class. That class would contain the 
# read_parquet function instead of putting it in the aggregation classes. This would separate everything out more 
# sensibly

@typechecked
class Aggregator:
    def __init__(self):
        self.backend = None
        self.cuda = None
    
    def _cut(
        self,
        ser: pd.Series | cudf.Series,
        bins: List[int | pd.Timestamp],
        labels: List[str] | None = None,
        **kwargs
    ) -> pd.Series | cudf.Series:
        right = kwargs.pop('right',False)
               
        if self.cuda:
            # The GPU path works in int64 nanoseconds (create_timedelta_cutoffs
            # converts the bin edges), so cast the series to match before cutting.
            func = cudf.cut
            ser = ser.astype('int64')
        else:
            func = pd.cut

        grps = func(ser,bins=bins,labels=labels,right=right,**kwargs)
        if labels is not None:
            grps = grps.cat.reorder_categories(labels[::-1], ordered = True)
        return grps

    def create_timedelta_cutoffs(
        self,
        run_date: pd.Timestamp | np.datetime64,
        delta_vals: List[int],
        delta_unit: str
    ) -> List[int] | List[pd.Timestamp]:
        deltas = pd.Series([as_timedelta(c,delta_unit) for c in delta_vals])
        cutoffs = pd.to_datetime(run_date - deltas)
        cutoffs = (
            pd.concat(
                [
                    cutoffs,
                    pd.Series([pd.to_datetime('today'),pd.to_datetime('1970-01-01')])
                ]
            )
            .sort_values()
        )

        return cutoffs.astype('int64').to_list() if self.cuda else cutoffs.to_list()
    
    def create_timedelta_labels(
        self,
        delta_vals: List[int],
        delta_unit: str
    ) -> List[str]:
        delta_vals.sort(reverse=True)
        deltas = [f'{d}{delta_unit}' for d in delta_vals]
        labels = [f'>{deltas[0]}'] + [f'{deltas[i+1]}-{deltas[i]}' for i in range(len(deltas)-1)] + [f'<{deltas[-1]}']
        return labels
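
    # A minimal usage sketch of the two helpers above; the delta values and
    # unit are illustrative assumptions (not defaults), and cut_dt is provided
    # by the concrete subclasses below:
    #
    #   cutoffs = agg.create_timedelta_cutoffs(run_date, [30, 90, 365], 'D')  # 5 bin edges
    #   labels  = agg.create_timedelta_labels([30, 90, 365], 'D')             # 4 labels: '>365D', '90D-365D', '30D-90D', '<30D'
    #   grps    = agg.cut_dt(df['access'], cutoffs, labels=labels)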

class PandasAggregator(Aggregator):
    def __init__(self):
        self.backend = 'pandas'
        self.cuda = False

    def read_parquet(self,dataset_path,**kwargs) -> pd.DataFrame:
        return pd.read_parquet(dataset_path,**kwargs)
        
    def cut_dt(self,series,*args,**kwargs) -> pd.Series:
        return self._cut(series,*args,**kwargs)

    def aggregate(
        self,
        df: pd.DataFrame,
        col: str | List[str],
        grps: str | List[str],
        funcs: str | List[str]
    ) -> pd.DataFrame:
        
        df_agg = (
            df.groupby(grps,observed = True)[col]
            .agg(funcs)
            .sort_index(level=[0,1])
            .reset_index()
        )
        return df_agg

    def calculate_churn(self,df1,df2) -> pd.Series:
        dfm = pd.merge(df1,df2,how='outer',left_index=True,right_index=True)
        
        # Condition order matches the choices/mapping below:
        #   access_x missing -> row absent from df1 -> created
        #   access_y missing -> row absent from df2 -> deleted
        #   modify changed   -> modified
        #   access changed   -> accessed
        conditions = [
            dfm['access_x'].isna(),
            dfm['access_y'].isna(),
            (dfm['modify_x'] != dfm['modify_y']).fillna(False),
            (dfm['access_x'] != dfm['access_y']).fillna(False)
        ]

        choices = np.arange(0,4)

        mapping = {
            0:'created',
            1:'deleted',
            2:'modified',
            3:'accessed',
            -1:None
        }

        dfm['type'] = np.select(conditions,choices,default=-1)
        dfm['type'] = dfm['type'].map(mapping)

        if dfm['type'].isna().all():
            ser = pd.Series(
                data = 0,
                index = CHURN_TBL_COLS
            )
            return ser
        else:
            dfm = dfm.loc[dfm['type'].notna()]

            modified = dfm.loc[dfm['type'] == 'modified']
            modified_bytes_net = modified['size_y'].sum() - modified['size_x'].sum()

            # Instead of writing logic to aggregate across initial size for deleted files and final size for all other 
            # files, we can essentially condense size across both columns into a new column. Size of deleted files will 
            # come from size_x while all other files will come from size_y.
            dfm['size'] = dfm['size_y'].where(dfm['size_y'].notna(),dfm['size_x'])

            agg_df = dfm.groupby('type',observed=True)['size'].agg(['sum','count'])
            agg_df.columns = ['bytes','files']
            agg_df = agg_df.melt(value_vars=['bytes','files'],ignore_index=False).set_index('variable',append=True)

            agg_df.index = agg_df.index.map('_'.join).str.removesuffix('_files')
            agg_df.loc['modified_bytes_net'] = modified_bytes_net

            for c in CHURN_TBL_COLS:
                if c not in agg_df.index:
                    agg_df.loc[c] = 0

            return agg_df['value']
    
class CUDFAggregator(Aggregator):
    def __init__(self):
        self.backend = 'cudf'
        self.cuda = True

    def read_parquet(self,dataset_path,**kwargs) -> cudf.DataFrame:
        return cudf.read_parquet(dataset_path,**kwargs)
    
    def cut_dt(self,series,*args,**kwargs) -> cudf.Series:
        return self._cut(series,*args,**kwargs)
    
    def aggregate(
        self,
        df: cudf.DataFrame,
        col: str | List[str],
        grps: str | List[str],
        funcs: str | List[str]
    ) -> pd.DataFrame:
        df_agg = (
            df.groupby(grps,observed = True)[col]
            .agg(funcs)
            .sort_index(level=[0,1])
            .to_pandas()
            .reset_index()
        )
        return df_agg
    
    def create_memory_pool(self,size,**kwargs):
        pool_allocator = kwargs.pop('pool_allocator',True)
        managed_memory = kwargs.pop('managed_memory',True)
        
        rmm.reinitialize(
            pool_allocator=pool_allocator,
            managed_memory=managed_memory,
            initial_pool_size=size,
            **kwargs
        )
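
    # Illustrative only: the pool size below is an assumption, not a default.
    #
    #   agg = get_aggregator('cudf')
    #   agg.create_memory_pool(2**30)   # 1 GiB RMM pool, managed memory enabled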
    
    def calculate_churn(self,df1,df2) -> pd.Series:
        dfm = cudf.merge(df1,df2,how='outer',left_index=True,right_index=True)
        
        conditions = [
            dfm['access_x'].isna(),
            dfm['access_y'].isna(),
            (dfm['modify_x'] != dfm['modify_y']).fillna(False),
            (dfm['access_x'] != dfm['access_y']).fillna(False)
        ]

        choices = cupy.arange(0,4)

        mapping = {
            0:'created',
            1:'deleted',
            2:'modified',
            3:'accessed',
            -1:None
        }

        dfm['type'] = cupy.select(conditions,choices,default=-1)
        dfm['type'] = dfm['type'].map(mapping)

        if dfm['type'].isna().all():
            ser = pd.Series(
                data = 0,
                index = CHURN_TBL_COLS
            )
            return ser
        else:
            dfm = dfm.loc[dfm['type'].notna()]

            modified = dfm.loc[dfm['type'] == 'modified']
            modified_bytes_net = modified['size_y'].sum() - modified['size_x'].sum()

            # As in the pandas path: use size_y when present, otherwise fall back
            # to size_x (deleted files only have an initial size).
            dfm['size'] = dfm['size_y'].where(dfm['size_y'].notna(),dfm['size_x'])

            agg_df = dfm.groupby('type',observed=True)['size'].agg(['sum','count'])
            agg_df.columns = ['bytes','files']
            agg_df = agg_df.to_pandas().melt(value_vars=['bytes','files'],ignore_index=False).set_index('variable',append=True)

            agg_df.index = agg_df.index.map('_'.join).str.removesuffix('_files')
            agg_df.loc['modified_bytes_net'] = modified_bytes_net

            for c in CHURN_TBL_COLS:
                if c not in agg_df.index:
                    agg_df.loc[c] = 0

            return agg_df['value']


class DaskAggregator(Aggregator):
    def __init__(self):
        self.backend = 'dask'
        self.cuda = False

    def cut_dt(self,series,*args,**kwargs) -> dd.Series:
        return series.map_partitions(self._cut,*args,**kwargs)

    def aggregate(
        self,
        df: dd.DataFrame,
        col: str | List[str],
        grps: str | List[str],
        funcs: str | List[str]
    ) -> pd.DataFrame:
        df_agg = (
            df.groupby(grps,observed = True)[col]
            .agg(funcs)
            .compute()
            .sort_index(level=[0,1])
            .reset_index()
        )
        return df_agg
    
    def read_parquet(self,dataset_path,**kwargs) -> dd.DataFrame:
        split_row_groups = kwargs.pop('split_row_groups',False)
        return dd.read_parquet(dataset_path,split_row_groups=split_row_groups,**kwargs)


class DaskCUDFAggregator(Aggregator):
    def __init__(self):
        self.backend = 'dask_cuda'
        self.cuda = True

    def cut_dt(self,series,*args,**kwargs) -> dask_cudf.Series:
        return series.map_partitions(self._cut,*args,**kwargs)
    
    def aggregate(
        self,
        df: dask_cudf.DataFrame,
        col: str | List[str],
        grps: str | List[str],
        funcs: str | List[str]
    ) -> pd.DataFrame:
        df_agg = (
            df.groupby(grps,observed = True)[col]
            .agg(funcs)
            .compute()
            .sort_index(level=[0,1])
            .to_pandas()
            .reset_index()
        )
        return df_agg
    
    def read_parquet(self,dataset_path,**kwargs) -> dd.DataFrame:
        split_row_groups = kwargs.pop('split_row_groups',False)
        return dd.read_parquet(dataset_path,split_row_groups=split_row_groups,**kwargs)
    


def get_aggregator(backend) -> PandasAggregator | CUDFAggregator | DaskAggregator | DaskCUDFAggregator:
    match backend:
        case 'pandas':
            return PandasAggregator()
        case 'cudf':
            return CUDFAggregator()
        case 'dask':
            return DaskAggregator()
        case 'dask_cuda':
            return DaskCUDFAggregator()
        case _:
            raise ValueError(f"Unsupported backend: {backend}")
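

# A minimal end-to-end sketch. The path, column names ('access', 'user', 'size'),
# and parameters are illustrative assumptions, not values defined by this module:
#
#   agg = get_aggregator('pandas')
#   df = agg.read_parquet('/path/to/dataset')
#   cutoffs = agg.create_timedelta_cutoffs(pd.Timestamp.now(), [30, 90], 'D')
#   labels = agg.create_timedelta_labels([30, 90], 'D')
#   df['age'] = agg.cut_dt(df['access'], cutoffs, labels=labels)
#   summary = agg.aggregate(df, col='size', grps=['user', 'age'], funcs=['sum', 'count'])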