# backend.py
from pathlib import Path
from typing import Literal

from dask.distributed import Client, LocalCluster
from dask_cuda import LocalCUDACluster

from .backend_defs import backend_options
from .utils import *
from ..utils import parse_scontrol

__all__ = ['start_backend', 'start_local_cluster']

# ENH: Add default parameters for cluster creation based on the defined type and
# available resources. For instance, creating a LocalCluster should default to
# using all available CPUs and all available RAM.
class DaskClusterManager:
    def __init__(self, cluster_type: type[LocalCluster] | type[LocalCUDACluster] = LocalCluster, **kwargs):
        """Manage a Dask cluster and its attached client as a single object.

        Parameters
        ----------
        cluster_type : type[LocalCluster] | type[LocalCUDACluster], default = LocalCluster
            The class of cluster to create: either a CPU-only cluster
            (LocalCluster) or a GPU-enabled cluster (LocalCUDACluster). All
            remaining kwargs are passed to the cluster constructor.
        """

        # Instantiate the requested cluster type
        self.cluster = cluster_type(**kwargs)

        # Connect a client to the new cluster
        self.client = Client(self.cluster)
    
    def scale(self, n_workers):
        """Scale the cluster to the specified number of workers."""
        self.cluster.scale(n_workers)

    def close(self):
        """Close the client and cluster."""
        print("INFO: Closing Dask client and cluster")
        self.client.close()
        self.cluster.close()

    def __getattr__(self, name):
        """
        Delegate attribute access to the client or cluster.

        This allows methods and properties of both objects to be accessed
        directly through this manager class.
        """
        # Guard the delegated attributes themselves: if __init__ has not yet
        # set them, looking them up here would recurse infinitely.
        if name in ('client', 'cluster'):
            raise AttributeError(name)
        if hasattr(self.client, name):
            return getattr(self.client, name)
        elif hasattr(self.cluster, name):
            return getattr(self.cluster, name)
        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")

    def __repr__(self):
        return (f"<DaskClusterManager(cluster={self.cluster}, "
                f"client={self.client})>")

def start_local_cluster(with_cuda=False, **kwargs) -> DaskClusterManager:
    """Start a local Dask cluster, GPU-backed if with_cuda is True."""
    if with_cuda:
        # One worker per GPU by default; spill to fast local scratch space
        n_workers = kwargs.pop('n_workers', get_gpu_info()[0])
        local_directory = kwargs.pop('local_directory', '/scratch/local')
        manager = DaskClusterManager(cluster_type=LocalCUDACluster,
                                     n_workers=n_workers,
                                     local_directory=local_directory,
                                     **kwargs)
    else:
        # One worker per allocated core by default
        n_workers = kwargs.pop('n_workers', parse_scontrol()[0])
        memory_limit = kwargs.pop('memory_limit', None)
        manager = DaskClusterManager(cluster_type=LocalCluster,
                                     n_workers=n_workers,
                                     memory_limit=memory_limit,
                                     **kwargs)
    return manager
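
# Example (illustrative): one worker per GPU on a GPU node, or an explicitly
# sized CPU cluster; defaults otherwise come from get_gpu_info()/parse_scontrol().
#
#     gpu_manager = start_local_cluster(with_cuda=True)   # LocalCUDACluster
#     cpu_manager = start_local_cluster(n_workers=8)      # LocalCluster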

def select_backend_type(with_cuda, with_dask) -> Literal['dask_cuda', 'cudf', 'dask', 'pandas']:
    if with_cuda and with_dask:
        backend = 'dask_cuda'
    elif with_cuda:
        backend = 'cudf'
    elif with_dask:
        backend = 'dask'
    else:
        backend = 'pandas'
    return backend
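
# Backend selection truth table (for reference):
#
#   with_cuda  with_dask  ->  backend
#   True       True           'dask_cuda'
#   True       False          'cudf'
#   False      True           'dask'
#   False      False          'pandas'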
    
def infer_cuda() -> bool:
    ngpus, _ = get_gpu_info()

    print(f"INFO: {ngpus} GPUs are available.")
    if ngpus > 0:
        print("INFO: Using CUDA compute")
    else:
        print("INFO: Using CPU compute")
    return ngpus > 0

def infer_dask(est_dataset_gb: int | float,
               ram: int | float = 0,
               mem_frac: float = 0.9,
               ngpus: int = 0,
               vram_per_gpu: int | float = 0) -> bool:
    """Decide whether Dask is needed, based on dataset size versus the memory budget."""
    if ngpus > 1:
        print("INFO: Multi-GPU detected. Setting Dask CUDA as backend")
        with_dask = True
    elif ngpus == 1:
        with_dask = (vram_per_gpu * mem_frac) < est_dataset_gb

        if with_dask:
            print(f"INFO: Estimated dataset size ({round(est_dataset_gb, 2)} GB) exceeds the GPU VRAM limit ({round(vram_per_gpu * mem_frac, 2)} GB) after deducting {round(vram_per_gpu * (1 - mem_frac), 2)} GB reserved for compute.")
            print(f"INFO: Setting Dask CUDA as the backend to process the dataset in partitions. Increasing mem_frac above {mem_frac} may allow complete in-memory processing but increases the likelihood of OOM errors.")
        else:
            print(f"INFO: Estimated dataset size ({round(est_dataset_gb, 2)} GB) is within the GPU VRAM limit ({round(vram_per_gpu * mem_frac, 2)} GB) after deducting {round(vram_per_gpu * (1 - mem_frac), 2)} GB reserved for compute.")
            print("INFO: Setting cuDF as the compute backend")
    else:
        with_dask = (ram * mem_frac) < est_dataset_gb

        if with_dask:
            print(f"INFO: Estimated dataset size ({round(est_dataset_gb, 2)} GB) exceeds the job's allocated RAM ({round(ram * mem_frac, 2)} GB) after deducting {round(ram * (1 - mem_frac), 2)} GB reserved for compute.")
            print(f"INFO: Setting Dask as the compute backend to process the dataset in partitions. Increasing mem_frac above {mem_frac} may allow complete in-memory processing but increases the likelihood of OOM errors.")
        else:
            print(f"INFO: Estimated dataset size ({round(est_dataset_gb, 2)} GB) is within the allocated memory limit ({round(ram * mem_frac, 2)} GB) after deducting {round(ram * (1 - mem_frac), 2)} GB reserved for compute.")
            print("INFO: Setting pandas as the compute backend")
    return with_dask
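
# Example (illustrative numbers): a 100 GB dataset on a single 40 GB GPU with
# the default mem_frac=0.9 leaves a 36 GB budget, so Dask CUDA is selected:
#
#     infer_dask(100, ngpus=1, vram_per_gpu=40)   # -> True  (100 > 36)
#     infer_dask(20, ngpus=1, vram_per_gpu=40)    # -> False (20 <= 36)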

def start_backend(dataset_path: str | Path,
                  with_cuda: bool | Literal['infer'] = 'infer',
                  with_dask: bool | Literal['infer'] = 'infer',
                  **kwargs) -> tuple[DaskClusterManager | None, str]:

    # Collect compute and dataset information to make an informed choice of backend
    est_dataset_gb = estimate_dataset_size(dataset_path)
    cores, mem = parse_scontrol()
    ngpus, vram_per_gpu = get_gpu_info() if with_cuda in ('infer', True) else (0, 0)

    print(f"INFO: cores={cores}, mem={mem} GB, ngpus={ngpus}, vram_per_gpu={vram_per_gpu} GB")
    
    # If either flag is 'infer', determine whether it applies to the current
    # dataset given the resources available
    if with_cuda == 'infer':
        with_cuda = infer_cuda()
    if with_dask == 'infer':
        # pop() so mem_frac is not forwarded to the cluster constructor below
        mem_frac = kwargs.pop('mem_frac', 0.9)
        with_dask = infer_dask(est_dataset_gb,
                               ram=mem,
                               mem_frac=mem_frac,
                               ngpus=ngpus,
                               vram_per_gpu=vram_per_gpu)

    backend = select_backend_type(with_cuda, with_dask)
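
    # Sketch of the expected backend_options schema (the authoritative
    # definition lives in backend_defs.py; the keys shown are the ones read here):
    #
    #   'dask_cuda': {
    #       'imports': {'package': [...], 'alias': [...]},
    #       'pre_import_hooks': {...},   # optional, keyed by package name
    #       'post_import_hooks': {...},  # optional, keyed by package name
    #       'guidance_message': '...',   # or None
    #   }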

    imports = backend_options[backend].get('imports', {})
    imports_zipped = zip(imports.get('package', []), imports.get('alias', []))
    pre_import_hooks = backend_options[backend].get('pre_import_hooks', {})
    post_import_hooks = backend_options[backend].get('post_import_hooks', {})

    # Run any per-package hooks around each import
    for package, alias in imports_zipped:
        if package in pre_import_hooks:
            wrap_hook(pre_import_hooks[package])

        import_package(package, alias)

        if package in post_import_hooks:
            wrap_hook(post_import_hooks[package])

    guidance_message = backend_options[backend]['guidance_message']
    if guidance_message is not None:
        print(f"INFO: {guidance_message}")

    if backend in ['dask', 'dask_cuda']:
        # Default to one worker per core (CPU) or one worker per GPU
        n_workers_default = cores if backend == 'dask' else ngpus
        n_workers = kwargs.pop('n_workers', n_workers_default)
        manager = start_local_cluster(with_cuda, n_workers=n_workers, **kwargs)
    else:
        # pandas/cuDF run in-process; no cluster manager is needed
        manager = None
    return manager, backend
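
# Example (illustrative; the dataset path is hypothetical): infer the backend
# from the dataset size and the available hardware, then clean up.
#
#     manager, backend = start_backend('/data/events.parquet')
#     print(backend)              # one of 'dask_cuda', 'cudf', 'dask', 'pandas'
#     if manager is not None:     # only the Dask backends return a manager
#         manager.close()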