from pathlib import Path

from .backend_defs import backend_options
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, LocalCluster
from .utils import *
from ..utils import parse_scontrol
from typing import Literal
__all__ = ['start_backend','start_local_cluster']
# ENH: Add default parameters for cluster creation based on defined type and available resources. For instance, creating a LocalCluster should default to using all available CPUs and all available RAM.
class DaskClusterManager:
    def __init__(self, cluster_type: type[LocalCluster | LocalCUDACluster] = LocalCluster, **kwargs):
        """Create a Dask cluster of the given type and connect a client to it.

        Parameters
        ----------
        cluster_type : type[LocalCluster | LocalCUDACluster], default = LocalCluster
            Sets the type of cluster to be created. Specifically designed to create either a CPU-only
            cluster (LocalCluster) or a GPU-enabled cluster (LocalCUDACluster). All specified kwargs
            are passed to the cluster constructor.
        """
        # Initialize the cluster (adjust this to the cluster type you are using)
        self.cluster = cluster_type(**kwargs)

        # Initialize the client connected to that cluster
        self.client = Client(self.cluster)
    def scale(self, n_workers):
        """Scale the cluster to the specified number of workers."""
        self.cluster.scale(n_workers)

    def close(self):
        """Close the client and cluster."""
        print("INFO: Closing Dask client and cluster")
        self.client.close()
        self.cluster.close()

    def __getattr__(self, name):
        """
        Delegate attribute access to the client or cluster.

        This allows accessing methods and properties from both objects directly
        through this manager class.
        """
        if hasattr(self.client, name):
            return getattr(self.client, name)
        elif hasattr(self.cluster, name):
            return getattr(self.cluster, name)
        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")

    def __repr__(self):
        return (f"<DaskClusterManager(cluster={self.cluster}, "
                f"client={self.client})>")
def start_local_cluster(with_cuda: bool = False, **kwargs) -> DaskClusterManager:
    """Start a local Dask cluster, GPU-enabled when with_cuda is True, and return its manager."""
    if with_cuda:
        cluster_type = LocalCUDACluster
        n_workers = kwargs.pop('n_workers', get_gpu_info()[0])
        local_directory = kwargs.pop('local_directory', '/scratch/local')
        manager = DaskClusterManager(cluster_type=cluster_type,
                                     n_workers=n_workers,
                                     local_directory=local_directory,
                                     **kwargs)
    else:
        cluster_type = LocalCluster
        n_workers = kwargs.pop('n_workers', parse_scontrol()[0])
        memory_limit = kwargs.pop('memory_limit', None)
        manager = DaskClusterManager(cluster_type=cluster_type,
                                     n_workers=n_workers,
                                     memory_limit=memory_limit,
                                     **kwargs)
    return manager
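
# For example (a sketch; worker counts default to the detected GPU count or the
# cores reported by scontrol, so both arguments below are optional overrides):
#
#     manager = start_local_cluster(with_cuda=True)   # one worker per detected GPU
#     manager = start_local_cluster(n_workers=8)      # CPU-only cluster with 8 workers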
def select_backend_type(with_cuda: bool, with_dask: bool) -> Literal['dask_cuda', 'cudf', 'dask', 'pandas']:
    """Map the (with_cuda, with_dask) flags to the name of the dataframe backend to use."""
    if with_cuda and with_dask:
        backend = 'dask_cuda'
    elif with_cuda and not with_dask:
        backend = 'cudf'
    elif not with_cuda and with_dask:
        backend = 'dask'
    else:
        backend = 'pandas'
    return backend
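
# The branches exhaustively cover the four flag combinations:
#
#     select_backend_type(True, True)    # -> 'dask_cuda'
#     select_backend_type(True, False)   # -> 'cudf'
#     select_backend_type(False, True)   # -> 'dask'
#     select_backend_type(False, False)  # -> 'pandas'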
def infer_cuda() -> bool:
    """Use CUDA compute when at least one GPU is visible to the job."""
    ngpus, _ = get_gpu_info()
    print(f"INFO: {ngpus} GPUs are available.")
    if ngpus > 0:
        print("INFO: Using CUDA compute")
    else:
        print("INFO: Using CPU compute")
    return ngpus > 0
def infer_dask(est_dataset_gb: int | float,
               ram: int | float = 0,
               mem_frac: float = 0.9,
               ngpus: int = 0,
               vram_per_gpu: int | float = 0) -> bool:
    """Decide whether Dask is needed: True when the dataset does not fit in a single device's memory after reserving (1 - mem_frac) of that memory for compute."""
    if ngpus > 1:
        print("INFO: Multi-GPU detected. Setting Dask CUDA as backend")
        with_dask = True
    elif ngpus == 1:
        with_dask = (vram_per_gpu * mem_frac) < est_dataset_gb
        if with_dask:
            print(f"INFO: Estimated dataset size ({round(est_dataset_gb,2)} GB) exceeds the GPU VRAM limit ({round(vram_per_gpu*mem_frac,2)} GB) after deducting {round(vram_per_gpu*(1-mem_frac),2)} GB reserved for compute.")
            print(f"INFO: Setting Dask CUDA as the backend to process the dataset in partitions. Increasing mem_frac above {mem_frac} may allow complete in-memory processing but increases the likelihood of OOM errors.")
        else:
            print(f"INFO: Estimated dataset size ({round(est_dataset_gb,2)} GB) is within the GPU VRAM limit ({round(vram_per_gpu*mem_frac,2)} GB) after deducting {round(vram_per_gpu*(1-mem_frac),2)} GB reserved for compute.")
            print("INFO: Setting cuDF as the compute backend")
    else:
        with_dask = (ram * mem_frac) < est_dataset_gb
        if with_dask:
            print(f"INFO: Estimated dataset size ({round(est_dataset_gb,2)} GB) exceeds the job's allocated RAM ({round(ram*mem_frac,2)} GB) after deducting {round(ram*(1-mem_frac),2)} GB reserved for compute.")
            print(f"INFO: Setting Dask as the compute backend to process the dataset in partitions. Increasing mem_frac above {mem_frac} may allow complete in-memory processing but increases the likelihood of OOM errors.")
        else:
            print(f"INFO: Estimated dataset size ({round(est_dataset_gb,2)} GB) is within the allocated memory limit ({round(ram*mem_frac,2)} GB) after deducting {round(ram*(1-mem_frac),2)} GB reserved for compute.")
            print("INFO: Setting pandas as the compute backend")
    return with_dask
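
# For example, with a 100 GB dataset and the default mem_frac of 0.9
# (hypothetical numbers):
#
#     infer_dask(100, ngpus=1, vram_per_gpu=40)  # 40 * 0.9 = 36 GB < 100 GB  -> True (Dask CUDA)
#     infer_dask(100, ram=512)                   # 512 * 0.9 = 460.8 GB >= 100 GB -> False (pandas)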
def start_backend(dataset_path: str | Path,
                  with_cuda: bool | Literal['infer'] = 'infer',
                  with_dask: bool | Literal['infer'] = 'infer',
                  **kwargs) -> tuple[DaskClusterManager | None, str]:
    """Choose a compute backend for the dataset, import its packages, and start a Dask cluster if one is needed."""
    # Collect compute and dataset information to make an informed choice on the backend
    est_dataset_gb = estimate_dataset_size(dataset_path)
    cores, mem = parse_scontrol()
    ngpus, vram_per_gpu = get_gpu_info() if with_cuda in ['infer', True] else (0, 0)
    print(f"cores: {cores} \nmem: {mem} \nngpus: {ngpus} \nvram: {vram_per_gpu}")

    # If either cuda or dask is set to 'infer', determine which is applicable to the
    # current dataset based on the resources available
    if with_cuda == 'infer':
        with_cuda = infer_cuda()
    if with_dask == 'infer':
        # Pop mem_frac so it is not forwarded to the cluster constructor below
        mem_frac = kwargs.pop('mem_frac', 0.9)
        with_dask = infer_dask(est_dataset_gb,
                               ram=mem,
                               mem_frac=mem_frac,
                               ngpus=ngpus,
                               vram_per_gpu=vram_per_gpu)

    backend = select_backend_type(with_cuda, with_dask)

    # Import the backend's packages, running any registered hooks before and after each import
    imports = backend_options[backend].get('imports', {})
    imports_zipped = zip(imports.get('package', []), imports.get('alias', []))
    pre_import_hooks = backend_options[backend].get('pre_import_hooks', {})
    post_import_hooks = backend_options[backend].get('post_import_hooks', {})

    for package, alias in imports_zipped:
        if package in pre_import_hooks:
            wrap_hook(pre_import_hooks[package])
        import_package(package, alias)
        if package in post_import_hooks:
            wrap_hook(post_import_hooks[package])

    guidance_message = backend_options[backend]['guidance_message']
    if guidance_message is not None:
        print(f"INFO: {guidance_message}")

    # Dask backends need a running cluster; in-memory backends (pandas, cuDF) do not
    if backend in ['dask', 'dask_cuda']:
        n_workers_default = cores if backend == 'dask' else ngpus
        n_workers = kwargs.pop('n_workers', n_workers_default)
        manager = start_local_cluster(with_cuda, n_workers=n_workers, **kwargs)
    else:
        manager = None
    return manager, backend
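
# Typical call (a sketch; the path is hypothetical):
#
#     manager, backend = start_backend('/data/dataset.parquet')
#     ...  # compute with the chosen backend
#     if manager is not None:  # only the Dask backends return a cluster manager
#         manager.close()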