# factory.py
import cupy
import cudf
import rmm
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask_cudf
from typing import List, Literal
from typeguard import typechecked

# as_timedelta converts an integer count and a unit ('D','W','M','Y') into a
# pandas timedelta/offset. The import path below is assumed; adjust it to
# wherever that utility lives in this project.
from .utils import as_timedelta

__all__ = ['get_aggregator']

# ENH: In the future, probably need to wrap the manager and backend type into a class. That class would contain the 
# read_parquet function instead of putting it in the aggregation classes. This would separate everything out more 
# sensibly

@typechecked
class Aggregator:
    def __init__(self):
        self.backend = None
        self.cuda = None
    
    def _cut(
        self,
        ser: pd.Series | cudf.Series,
        bins: List[int | pd.Timestamp],
        labels: List[str] | None = None,
        **kwargs
    ) -> pd.Series | cudf.Series:
        right = kwargs.pop('right',False)
               
        if self.cuda:
            func = cudf.cut
            ser = ser.astype('int64')
        else:
            func = pd.cut

        grps = func(ser,bins=bins,labels=labels,right=right,**kwargs)
        if labels is not None:
            grps = grps.cat.reorder_categories(labels[::-1], ordered = True)
        return grps

    def create_timedelta_cutoffs(
        self,
        delta_vals: List[int],
        delta_unit: Literal['D','W','M','Y'],
        run_date: pd.Timestamp
    ) -> List[int | pd.Timestamp]:
        deltas = pd.Series([as_timedelta(c,delta_unit) for c in delta_vals])
        cutoffs = pd.to_datetime(run_date - deltas)
        cutoffs = (
            pd.concat(
                [
                    cutoffs,
                    pd.Series([pd.to_datetime('today'),pd.to_datetime('1970-01-01')])
                ]
            )
            .sort_values()
        )

        return cutoffs.astype('int64').to_list() if self.cuda else cutoffs.to_list()
    
    def create_timedelta_labels(
        self,
        delta_vals: List[int],
        delta_unit: Literal['D','W','M','Y']
    ) -> List[str]:
        delta_vals.sort(reverse=True)
        deltas = [f'{d}{delta_unit}' for d in delta_vals]
        labels = [f'>{deltas[0]}'] + [f'{deltas[i+1]}-{deltas[i]}' for i in range(len(deltas)-1)] + [f'<{deltas[-1]}']
        return labels
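    # Illustrative sketch of how the two helpers above combine (all values are
    # hypothetical): for delta_vals=[30, 60], delta_unit='D', and a run_date of
    # 2024-01-01, create_timedelta_cutoffs returns ascending bin edges roughly
    # [1970-01-01, 2023-11-02, 2023-12-02, <today>], and create_timedelta_labels
    # returns ['>60D', '30D-60D', '<30D'], so _cut assigns '>60D' to anything
    # older than 60 days, '30D-60D' to the middle bin, and '<30D' to the newest
    # files.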

class PandasAggregator(Aggregator):
    def __init__(self):
        self.backend = 'pandas'
        self.cuda = False

    def read_parquet(self,dataset_path,**kwargs) -> pd.DataFrame:
        return pd.read_parquet(dataset_path,**kwargs)
        
    def cut_dt(self,series,*args,**kwargs) -> pd.Series:
        return self._cut(series,*args,**kwargs)

    def aggregate(
        self,
        df: pd.DataFrame,
        col: str | List[str],
        grps: str | List[str],
        funcs: str | List[str]
    ) -> pd.DataFrame:
        
        df_agg = (
            df.groupby(grps,observed = True)[col]
            .agg(funcs)
            .sort_index(level=[0,1])
            .reset_index()
        )
        return df_agg

    def calculate_churn(self,df1,df2) -> pd.Series:
        df = pd.concat([df1,df2])
        df['acq'] = pd.to_datetime(df['acq'].astype('str'))
        diff = (
            df
            .reset_index()
            .drop_duplicates(subset=[c for c in df.columns if c != 'acq'],keep=False)
            .set_index('path')
        )
        agg = diff.groupby(level=0)['acq'].agg(['count','max'])

        dates = diff['acq'].unique()

        # When using duplicates to detect whether files changed, there are 4 possible outcomes for any file
        # (a worked sketch follows this method):
        # 1. 2 records with different modification times: 'modified'
        # 2. 1 record with `acq` from the older GPFS log: 'deleted'
        # 3. 1 record with `acq` from the newer GPFS log: 'created'
        # 4. 0 records: 'unchanged' and ignored
        conditions = [agg['count'] == 2, agg['max'] == dates.min(), agg['max'] == dates.max()]
        choices = ['modified','deleted','created']

        agg['type'] = np.select(conditions,choices,default=np.array(np.nan, dtype='object'))
        agg['type'] = agg['type'].astype('category').cat.set_categories(choices)

        if agg.empty:
            ser = pd.Series(data = [0,0,0], name='count')

            # The index needs to be specified in exactly this manner to match how type is being converted to a 
            # categorical above. This essentially empty series needs to have the exact same structure as a non-empty 
            # series
            ser.index = pd.CategoricalIndex(
                data=['created', 'deleted', 'modified'],
                categories=['modified','deleted','created'],
                ordered=False,
                name='type')
            return ser

        return agg['type'].value_counts()
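    # Worked sketch of the churn logic above (hypothetical paths and dates):
    # two snapshots are concatenated, and drop_duplicates(keep=False) on every
    # column except 'acq' discards files whose metadata is identical in both
    # logs. If '/data/a' changed size between the 2024-01-01 and 2024-01-08
    # snapshots, both of its rows survive (count == 2 -> 'modified'); if
    # '/data/b' only appears in the older log, its single surviving row keeps
    # the older 'acq' (-> 'deleted'); if '/data/c' only appears in the newer
    # log, it keeps the newer 'acq' (-> 'created'). The returned value_counts
    # therefore reports how many files fall into each of the three categories.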

    
class CUDFAggregator(Aggregator):
    def __init__(self):
        self.backend = 'cudf'
        self.cuda = True

    def read_parquet(self,dataset_path,**kwargs) -> cudf.DataFrame:
        return cudf.read_parquet(dataset_path,**kwargs)
    
    def cut_dt(self,series,*args,**kwargs) -> cudf.Series:
        return self._cut(series,*args,**kwargs)
    
    def aggregate(
        self,
        df: cudf.DataFrame,
        col: str | List[str],
        grps: str | List[str],
        funcs: str | List[str]
    ) -> pd.DataFrame:
        df_agg = (
            df.groupby(grps,observed = True)[col]
            .agg(funcs)
            .sort_index(level=[0,1])
            .to_pandas()
            .reset_index()
        )
        return df_agg
    
    def create_memory_pool(self,size,**kwargs):
        pool_allocator = kwargs.pop('pool_allocator',True)
        managed_memory = kwargs.pop('managed_memory',True)
        
        rmm.reinitialize(
            pool_allocator=pool_allocator,
            managed_memory=managed_memory,
            initial_pool_size=size,
            **kwargs
        )
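    # Usage sketch: size is a byte count passed to rmm.reinitialize as the
    # initial pool size; the value below is only illustrative.
    #     agg.create_memory_pool(16 * 2**30)   # ~16 GiB pooled, managed memory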

    def calculate_churn(self,df1,df2):
        df = cudf.concat([df1,df2])
        #df['acq'] = cudf.to_datetime(df['acq'].astype('str'))
        diff = (
            df
            .reset_index()
            .drop_duplicates(subset=[c for c in df.columns if c != 'acq'],keep=False)
            .set_index('path')
        )
        agg = diff.groupby(level=0)['acq'].agg(['count','max'])

        dates = np.unique(df['acq'].to_numpy())
        conditions = [agg['count'] == 2, agg['max'] == dates.min(), agg['max'] == dates.max()]
        choices = cupy.array([0,1,2])

        agg['type'] = cupy.select(conditions,choices)
        agg['type'] = agg['type'].map({0:'modified',1:'deleted',2:'created'})
        agg['type'] = agg['type'].astype('category').cat.set_categories(['modified','deleted','created'])

        if agg.empty:
            ser = cudf.Series(data = [0,0,0], name='count')

            # The index needs to be specified in exactly this manner to match how type is being converted to a 
            # categorical above. This essentially empty series needs to have the exact same structure as a non-empty 
            # series
            ser.index = cudf.CategoricalIndex(
                data=['created', 'deleted', 'modified'],
                categories=['modified','deleted','created'],
                ordered=False)
            return ser
        else:
            counts = agg['type'].value_counts()
            counts.index.name = None
            return counts


class DaskAggregator(Aggregator):
    def __init__(self):
        self.backend = 'dask'
        self.cuda = False

    def cut_dt(self,series,*args,**kwargs) -> dd.Series:
        return series.map_partitions(self._cut,*args,**kwargs)

    def aggregate(
        self,
        df: dd.DataFrame,
        col: str | List[str],
        grps: str | List[str],
        funcs: str | List[str]
    ) -> pd.DataFrame:
        df_agg = (
            df.groupby(grps,observed = True)[col]
            .agg(funcs)
            .compute()
            .sort_index(level=[0,1])
            .reset_index()
        )
        return df_agg
    
    def read_parquet(self,dataset_path,**kwargs) -> dd.DataFrame:
        split_row_groups = kwargs.pop('split_row_groups',False)
        return dd.read_parquet(dataset_path,split_row_groups=split_row_groups,**kwargs)


class DaskCUDFAggregator(Aggregator):
    def __init__(self):
        self.backend = 'dask_cuda'
        self.cuda = True

    def cut_dt(self,series,*args,**kwargs) -> dask_cudf.Series:
        return series.map_partitions(self._cut,*args,**kwargs)
    
    def aggregate(
        self,
        df: dask_cudf.DataFrame,
        col: str | List[str],
        grps: str | List[str],
        funcs: str | List[str]
    ) -> pd.DataFrame:
        df_agg = (
            df.groupby(grps,observed = True)[col]
            .agg(funcs)
            .compute()
            .sort_index(level=[0,1])
            .to_pandas()
            .reset_index()
        )
        return df_agg
    
    def read_parquet(self,dataset_path,**kwargs) -> dd.DataFrame:
        split_row_groups = kwargs.pop('split_row_groups',False)
        return dd.read_parquet(dataset_path,split_row_groups=split_row_groups,**kwargs)
    


def get_aggregator(backend: Literal['pandas','cudf','dask','dask_cuda']) -> PandasAggregator | CUDFAggregator | DaskAggregator | DaskCUDFAggregator:
    match backend:
        case 'pandas':
            return PandasAggregator()
        case 'cudf':
            return CUDFAggregator()
        case 'dask':
            return DaskAggregator()
        case 'dask_cuda':
            return DaskCUDFAggregator()
        case _:
            raise ValueError(f"Unsupported backend: {backend}")
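

# Minimal usage sketch. The parquet path, the column names ('access', 'tld',
# 'size'), and the delta values below are hypothetical placeholders used only
# to show how the pieces fit together.
if __name__ == '__main__':
    agg = get_aggregator('pandas')
    df = agg.read_parquet('dataset.parquet')  # hypothetical path

    delta_vals = [30, 60, 90]
    run_date = pd.Timestamp.today()
    cutoffs = agg.create_timedelta_cutoffs(delta_vals=delta_vals, delta_unit='D', run_date=run_date)
    labels = agg.create_timedelta_labels(delta_vals=delta_vals, delta_unit='D')

    # Bin each file's access time into an age group, then sum bytes per
    # (top-level directory, age group) pair
    df['dt_grp'] = agg.cut_dt(df['access'], cutoffs, labels)
    print(agg.aggregate(df, col='size', grps=['tld', 'dt_grp'], funcs='sum'))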