Creating clusters

Spinning up Dask clusters with Coiled is done by creating a coiled.Cluster instance. coiled.Cluster objects manage a Dask cluster much like other cluster objects you may have seen before, such as distributed.LocalCluster or dask_kubernetes.KubeCluster.

The video below walks you through the process of spinning up a simple cluster.

Simple example

In a simple case, you can create a cluster with five Dask workers with:

import coiled

cluster = coiled.Cluster(n_workers=5)

Note

Creating a cluster involves provisioning various resources on cloud-based infrastructure. This process takes a couple of minutes in most cases.

Once a cluster has been created, you can connect Dask to the cluster by creating a distributed.Client instance:

from dask.distributed import Client

client = Client(cluster)

To view the Dask diagnostic dashboard for your cluster, navigate to the cluster’s dashboard_link:

cluster.dashboard_link

which should output an address along the lines of "https://ec2-...compute.amazonaws.com:8787/status".

Tip

Any Coiled cluster you create will automatically shut down after 20 minutes of inactivity. No more large bills from leaving a cluster running over the weekend 🎉! You can also customize this idle timeout if needed – see the Custom workers and scheduler section for an example that does this.

The coiled.Cluster class has several keyword arguments you can use to further specify the details of your cluster. These parameters are discussed in the following sections.

Hardware resources

The hardware resources your cluster is launched on (e.g. number of CPUs, amount of RAM, etc.) can be configured with the following coiled.Cluster keyword arguments:

worker_cpu (default: 4)
    Number of CPUs allocated for each worker

worker_gpu (default: 0)
    Number of GPUs allocated for each worker. Note that GPU access is disabled by default. If you would like access to GPUs, please contact sales@coiled.io.

worker_memory (default: "16 GiB")
    Amount of memory to allocate for each worker

scheduler_cpu (default: 1)
    Number of CPUs allocated for the scheduler

scheduler_memory (default: "8 GiB")
    Amount of memory to allocate for the scheduler

For example, the following creates a cluster with five workers, each with 2 CPUs and 8 GiB of memory available, and a scheduler with 1 CPU (the default) and 16 GiB of memory available:

import coiled

cluster = coiled.Cluster(
    n_workers=5,
    worker_cpu=2,
    worker_memory="8 GiB",
    scheduler_memory="16 GiB",
)

Note that while specifying worker_gpu will give your cluster workers access to GPUs, there are some additional best practices to ensure GPU-accelerated hardware is fully utilized. See the GPU best practices documentation for more information.
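For instance, GPU-enabled workers are often paired with Dask-CUDA's worker class. A hedged sketch, assuming GPU access has been enabled on your account and your software environment includes dask-cuda:

```python
import coiled

# Sketch only: requires GPU access on your Coiled account and a
# software environment containing dask-cuda.
cluster = coiled.Cluster(
    n_workers=2,
    worker_gpu=1,                         # one GPU per worker
    worker_class="dask_cuda.CUDAWorker",  # GPU-aware worker class
)
```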

Software environment

The scheduler and each worker in a Coiled cluster are all launched with the same software environment. By default, they will use a software environment with Python, Dask, Distributed, NumPy, Pandas, and a few more commonly used libraries. This default environment is great for basic tasks, but you’ll also want to create your own custom software environments with the packages you need on your cluster.

Coiled supports building and managing custom software environments using pip and conda environment files. For more details on custom software environments, see the Software Environments documentation page. Once you have a custom software environment you can use the software keyword argument for coiled.Cluster to use that software environment on your cluster.
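For instance, a custom environment can be built from a conda-style package list and then referenced by name. A sketch, where the environment name and dependency list are illustrative (this call requires a Coiled account):

```python
import coiled

# Build a software environment from a conda-style specification.
# The name and dependency list below are illustrative.
coiled.create_software_environment(
    name="my-xgboost-env",
    conda={
        "channels": ["conda-forge"],
        "dependencies": ["python=3.9", "dask", "distributed", "xgboost"],
    },
)

# Use the environment on a cluster.
cluster = coiled.Cluster(software="my-xgboost-env")
```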

Note

Software environments used in Coiled clusters must have distributed >= 2.23.0 installed as Distributed is required to launch Dask scheduler and worker processes.

For example, the following uses a custom software environment with XGBoost installed:

import coiled

cluster = coiled.Cluster(software="examples/scaling-xgboost")

Custom workers and scheduler

Dask supports using custom worker and scheduler classes in a cluster which allows for increased flexibility and functionality in some use cases (e.g. Dask-CUDA’s CUDAWorker class for running Dask workers on NVIDIA GPUs). Additionally, worker and scheduler classes also have keyword arguments that can be specified to control their behavior (for an example, see Dask’s worker class API documentation).

The worker and scheduler classes used in a Coiled cluster, as well as the keyword arguments passed to those classes, can be specified with the following coiled.Cluster keyword arguments:

worker_class (default: "distributed.Nanny")
    Class to use for cluster workers

worker_options (default: {})
    Mapping with keyword arguments to pass to worker_class

scheduler_class (default: "distributed.Scheduler")
    Class to use for the cluster scheduler

scheduler_options (default: {})
    Mapping with keyword arguments to pass to scheduler_class

For example, the following creates a cluster which uses Distributed’s Worker class for workers (instead of the default Nanny class) and specifies idle_timeout="2 hours" when creating the cluster’s scheduler:

import coiled

cluster = coiled.Cluster(
    worker_class="distributed.Worker",
    scheduler_options={"idle_timeout": "2 hours"},
)

Backend options

Depending on where you’re running Coiled, there may be backend-specific options (e.g. which AWS region to use) that you can specify to customize Coiled’s behavior. For more information on what options are available, see the Backends documentation.
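As a sketch, assuming the AWS backend and that backend_options accepts a region key (check the Backends page for the exact option names your deployment supports):

```python
import coiled

# Illustrative only: the available keys depend on your backend; see
# the Backends documentation for the options your deployment supports.
cluster = coiled.Cluster(
    n_workers=5,
    backend_options={"region": "us-east-2"},
)
```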

Environment Variables

To add environment variables to your clusters, use the environ keyword argument of coiled.Cluster, which takes a dictionary mapping variable names to values.

import coiled

cluster = coiled.Cluster(
    n_workers=5,
    software="examples/scaling-xgboost",
    environ={
        "DASK_COILED__ACCOUNT": "alice",
        "DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING": True,
        "DASK_DISTRIBUTED__LOGGING__DISTRIBUTED": "info",
    },
)

Attention

Environment variables are not encrypted and will be available as plain text. For security reasons, you should not use environment variables to add secrets to your clusters.
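The DASK_-prefixed variables above follow Dask's configuration naming convention: strip the DASK_ prefix, lowercase the rest, and treat double underscores as nesting separators (Dask also treats underscores and hyphens in key names as interchangeable). A small sketch of that rule:

```python
def dask_config_key(env_var: str) -> str:
    """Sketch of Dask's environment-variable naming rule: strip the
    DASK_ prefix, lowercase, and turn '__' into '.' separators."""
    prefix = "DASK_"
    if not env_var.startswith(prefix):
        raise ValueError(f"not a Dask config variable: {env_var}")
    return env_var[len(prefix):].lower().replace("__", ".")

print(dask_config_key("DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING"))
# → distributed.scheduler.work_stealing
```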

Cluster configurations

As seen in the previous sections, there are a variety of ways in which you can customize your clusters. In some cases it’s useful to save the various parameters that define a cluster (e.g. worker_memory, scheduler_options, etc.) so they can be easily reused or shared with colleagues.

Coiled supports this through the concept of a cluster configuration. A cluster configuration is a group of cluster parameters (e.g. worker_memory, scheduler_cpu, software, etc.) that can be saved and then reused by yourself or other Coiled users. When creating a cluster, you can use the configuration keyword argument for coiled.Cluster to specify a cluster configuration to use.

configuration (default: "coiled/default")
    Name of the cluster configuration to create the cluster from

Cluster configurations are discussed in detail on the Cluster configurations documentation page.
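Creating and reusing a configuration looks roughly like the following sketch; the configuration name and parameters are illustrative, and the call requires a Coiled account (see the Cluster configurations page for the full API):

```python
import coiled

# Save a named group of cluster parameters (values illustrative).
coiled.create_cluster_configuration(
    name="my-config",
    worker_cpu=2,
    worker_memory="8 GiB",
    software="examples/scaling-xgboost",
)

# Anyone with access can now reuse the configuration by name.
cluster = coiled.Cluster(configuration="<your-account>/my-config")
```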

Communication Protocol

Coiled uses TLS over the default Dask port (8786) by default, but in some restricted environments (corporate networks, Binder notebooks, etc.) that port may be blocked. Since the secure websocket protocol (wss) uses the standard port for secure web traffic (443), it's unlikely to be blocked. The protocol can be changed with the protocol parameter of scheduler_options and worker_options.

import coiled

cluster = coiled.Cluster(
    n_workers=5,
    scheduler_options={"protocol": "wss"},
    worker_options={"protocol": "wss"},
)

Tip

Coiled provides a top-level protocol keyword argument that acts as a shortcut for setting both of these options.

import coiled

cluster = coiled.Cluster(
    n_workers=5,
    protocol="wss",
)