Creating clusters#
Spinning up Dask clusters with Coiled is done by creating a
coiled.Cluster
instance. coiled.Cluster
objects manage a Dask
cluster much like other cluster objects you may have seen before, such as
distributed.LocalCluster
or dask_kubernetes.KubeCluster
.
Simple example#
In the simplest case, you can create a cluster with five Dask workers:
import coiled
cluster = coiled.Cluster(n_workers=5)
Note
Creating a cluster involves provisioning various resources on cloud-based infrastructure. This process takes a couple of minutes in most cases.
Once a cluster has been created, you can connect Dask to the cluster by creating
a distributed.Client
instance:
from dask.distributed import Client
client = Client(cluster)
To view the
Dask diagnostic dashboard
for your cluster, navigate to the cluster’s dashboard_link
:
cluster.dashboard_link
which should output a dashboard address similar to
http://35.174.137.175:8787
.
Tip
Any Coiled cluster you create will automatically shut down after 20 minutes of inactivity. You can also customize this idle timeout if needed, see the Custom workers and scheduler section for an example.
The coiled.Cluster
class has several keyword arguments you can use to
further specify the details of your cluster. These parameters are discussed in
the following sections.
Hardware resources#
The hardware resources your cluster is launched on (e.g. number of CPUs, amount
of RAM, etc.) can be configured with the following coiled.Cluster
keyword
arguments:
| Parameter | Description | Default |
|---|---|---|
| `worker_cpu` | Number of CPUs allocated for each worker | |
| `worker_gpu` | Number of GPUs allocated for each worker. Note that GPU access is disabled by default. If you would like access to GPUs, please contact sales@coiled.io. | |
| `worker_memory` | Amount of memory to allocate for each worker | |
| `worker_vm_types` | Instance types allocated for the workers | |
| `scheduler_cpu` | Number of CPUs allocated for the scheduler | 2 |
| `scheduler_memory` | Amount of memory to allocate for the scheduler | |
| `scheduler_vm_types` | Instance types allocated to the scheduler | |
For example, the following creates a cluster with five workers, each with 2 CPUs and 8 GiB of memory available, and a scheduler with 2 CPUs (default value) and 8 GiB of memory available:
import coiled
cluster = coiled.Cluster(
n_workers=5,
worker_cpu=2,
worker_memory="8 GiB",
scheduler_memory="8 GiB",
)
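Memory amounts like "8 GiB" are human-readable strings that are converted to a number of bytes (Dask provides dask.utils.parse_bytes for this). A simplified, hypothetical sketch of that conversion, for illustration only:

```python
import re

# Binary unit multipliers, matching the "GiB"-style strings used above.
UNITS = {"B": 1, "KiB": 2**10, "MiB": 2**20, "GiB": 2**30, "TiB": 2**40}

def parse_memory(s: str) -> int:
    """Convert a string like '8 GiB' to a number of bytes.

    Simplified illustration; Dask's own dask.utils.parse_bytes
    handles many more spellings.
    """
    match = re.fullmatch(r"(\d+(?:\.\d+)?)\s*([KMGT]?i?B)", s.strip())
    if match is None:
        raise ValueError(f"cannot parse memory string: {s!r}")
    value, unit = match.groups()
    return int(float(value) * UNITS[unit])

print(parse_memory("8 GiB"))  # 8589934592
```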
Note that while specifying worker_gpu
will give your cluster workers access
to GPUs, there are some additional best practices to ensure GPU-accelerated
hardware is fully utilized. See the GPU best practices
documentation for more information.
Software environment#
The scheduler and each worker in a Coiled cluster are all launched with the same software environment. By default, they will use a software environment with Python, Dask, Distributed, NumPy, Pandas, and a few more commonly used libraries. This default environment is great for basic tasks, but you’ll also want to create your own custom software environments with the packages you need on your cluster.
Coiled supports building and managing custom software environments using pip and
conda environment files. For more details on custom software environments, see
the Software Environments documentation page. Once you have a custom software
environment you can use the software
keyword argument for coiled.Cluster
to use that software environment on your cluster.
Note
Software environments used in Coiled clusters must have
distributed >= 2.23.0
installed as
Distributed is required to launch Dask
scheduler and worker processes.
For example, the following uses a custom software environment with XGBoost installed:
import coiled
cluster = coiled.Cluster(software="examples/scaling-xgboost")
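A custom environment like this one might be built from a conda environment file. The file below is a hypothetical sketch (the package versions are illustrative), including `distributed >= 2.23.0` per the note above:

```yaml
# environment.yml -- hypothetical example for an XGBoost environment
channels:
  - conda-forge
dependencies:
  - python=3.9
  - dask
  - distributed>=2.23.0
  - xgboost
```

You could then build this file into a named Coiled software environment with coiled.create_software_environment (see the Software Environments documentation for details) and reference it by name as shown above.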
Custom workers and scheduler#
Dask supports using custom worker and scheduler classes in a cluster which
allows for increased flexibility and functionality in some use cases (e.g.
Dask-CUDA’s CUDAWorker
class for
running Dask workers on NVIDIA GPUs). Additionally, worker and scheduler classes
also have keyword arguments that can be specified to control their behavior (for
an example, see
Dask’s worker class API documentation).
The worker and scheduler classes used in a Coiled cluster, as well as the
keyword arguments passed to those classes, can be specified with the following
coiled.Cluster
keyword arguments:
| Parameter | Description | Default |
|---|---|---|
| `worker_class` | Class to use for cluster workers | `distributed.Nanny` |
| `worker_options` | Mapping with keyword arguments to pass to `worker_class` | |
| `scheduler_class` | Class to use for the cluster scheduler | `distributed.Scheduler` |
| `scheduler_options` | Mapping with keyword arguments to pass to `scheduler_class` | |
For example, the following creates a cluster which uses Distributed’s Worker
class for workers (instead of the default Nanny
class) and specifies
idle_timeout="2 hours"
when creating the cluster’s scheduler:
import coiled
cluster = coiled.Cluster(
worker_class="distributed.Worker",
scheduler_options={"idle_timeout": "2 hours"},
)
Instance Types#
Coiled will choose instance types that match your cluster CPU/Memory requirements. If you wish, you can specify a list of instance types when creating your cluster (see the tutorial on Selecting Instance Types). Since cloud providers might have availability issues for a specific instance type, it’s recommended you specify more than one type in the list.
| Parameter | Description | Default [AWS/GCP] |
|---|---|---|
| `scheduler_vm_types` | List of instance types for the scheduler | |
| `worker_vm_types` | List of instance types for workers | |
Backend options#
Depending on where you’re running Coiled, there may be backend-specific options (e.g. which AWS region to use) that you can specify to customize Coiled’s behavior. For more information on what options are available, see the Configure Your Cloud Provider documentation.
Environment Variables#
To add environment variables to your clusters, use the environ
keyword argument of coiled.Cluster
, which accepts a dictionary.
import coiled
cluster = coiled.Cluster(
n_workers=5,
software="examples/scaling-xgboost",
environ={
"DASK_COILED__ACCOUNT": "alice",
"DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING": True,
"DASK_DISTRIBUTED__LOGGING__DISTRIBUTED": "info",
},
)
Attention
Environment variables are not encrypted and will be available as plain text. For security reasons, you should not use environment variables to add secrets to your clusters.
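Because environment variables are always plain text, it is safest to pass string values explicitly rather than Python objects like booleans or integers. A small hypothetical helper (not part of the coiled API) for coercing a settings mapping:

```python
def stringify_environ(environ: dict) -> dict:
    """Coerce all values to strings, since environment variables
    are always plain text.

    Hypothetical helper for illustration only.
    """
    return {key: str(value) for key, value in environ.items()}

print(stringify_environ({
    "DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING": True,
    "DASK_DISTRIBUTED__LOGGING__DISTRIBUTED": "info",
}))
```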
Communication Protocol#
Coiled uses tls
by default, but in some restricted environments (corporate networks, Binder notebooks, etc.)
the default port (8786) may be blocked. Since the websocket protocol uses the standard port for secure web traffic, it’s unlikely to be blocked.
This can be changed with the protocol
parameter of scheduler_options
and worker_options
.
import coiled
cluster = coiled.Cluster(
n_workers=5,
scheduler_options={"protocol": "wss"},
worker_options={"protocol": "wss"},
)
Tip
Coiled provides a top level protocol
keyword argument that acts as a shortcut for both these options.
import coiled
cluster = coiled.Cluster(
n_workers=5,
protocol="wss",
)
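The reason wss traffic usually gets through restrictive networks is that secure websockets typically travel over port 443, the same port as ordinary HTTPS. A quick standard-library illustration, using a made-up scheduler address:

```python
from urllib.parse import urlparse

# Hypothetical wss scheduler address, for illustration only.
address = "wss://35.174.137.175:443"

parsed = urlparse(address)
print(parsed.scheme)  # wss
print(parsed.port)    # 443, the standard port for secure web traffic
```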
Waiting for workers#
Usually, computations will run better if you wait for most workers before submitting tasks to the cluster. By default,
Coiled will wait for 30% of the requested workers before returning the prompt to the user. You can override
this behavior with the wait_for_workers
parameter.
If you use an int, the Coiled client will wait for that number of workers. Note that you can only wait for a number
between 0 and the requested n_workers
.
import coiled
cluster = coiled.Cluster(n_workers=10, wait_for_workers=4)
Alternatively, you might want to use a fraction of workers instead. Note that you need to specify a number between 0.0 and 1.0.
import coiled
cluster = coiled.Cluster(n_workers=60, wait_for_workers=0.5)
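As an illustration only (Coiled's actual rounding and validation may differ), the different wait_for_workers values might map to a worker count like this:

```python
import math

def workers_to_wait_for(n_workers, wait_for_workers=0.3):
    """Map a wait_for_workers value to a number of workers to wait for.

    Hypothetical sketch for illustration; not part of the coiled API.
    The default of 0.3 mirrors Coiled's "wait for 30%" behavior.
    """
    if wait_for_workers is True:       # wait for every requested worker
        return n_workers
    if wait_for_workers is False:      # don't wait at all
        return 0
    if isinstance(wait_for_workers, float):  # fraction of n_workers
        if not 0.0 <= wait_for_workers <= 1.0:
            raise ValueError("fraction must be between 0.0 and 1.0")
        return math.ceil(n_workers * wait_for_workers)
    if not 0 <= wait_for_workers <= n_workers:  # explicit count
        raise ValueError("count must be between 0 and n_workers")
    return wait_for_workers

print(workers_to_wait_for(60, 0.5))  # 30
```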
You can also set wait_for_workers
to True if you want to wait for all the requested workers. This option should be used
with caution when requesting large clusters, due to availability issues from the chosen cloud provider.
import coiled
cluster = coiled.Cluster(n_workers=25, wait_for_workers=True)
If you would rather get the prompt back as soon as the scheduler is up, you can set wait_for_workers
to False or 0.
import coiled
cluster = coiled.Cluster(n_workers=25, wait_for_workers=False)
| Parameter | Description | Default |
|---|---|---|
| `wait_for_workers` | Number of workers to wait for before running computations | `0.3` |
If you always want to use the same value for the wait_for_workers
parameter, you can set it in your
Coiled configuration file:
# ~/.config/dask/coiled.yaml
coiled:
wait_for_workers: false