API
- class coiled.Cluster(name=None, *, software=None, container=None, n_workers=None, worker_class=None, worker_options=None, worker_vm_types=None, worker_cpu=None, worker_memory=None, worker_disk_size=None, worker_disk_throughput=None, worker_gpu=None, worker_gpu_type=None, scheduler_class=None, scheduler_options=None, scheduler_vm_types=None, scheduler_cpu=None, scheduler_memory=None, scheduler_disk_size=None, scheduler_gpu=None, asynchronous=False, cloud=None, account=None, shutdown_on_close=None, idle_timeout=None, use_scheduler_public_ip=None, use_dashboard_https=None, credentials='local', credentials_duration_seconds=None, timeout=None, environ=None, tags=None, send_dask_config=True, backend_options=None, show_widget=True, custom_widget=None, configure_logging=None, wait_for_workers=None, package_sync=False, package_sync_strict=False, package_sync_ignore=None, package_sync_fail_on='critical-only', private_to_creator=None, use_best_zone=True, compute_purchase_option=None, spot_policy=None, extra_worker_on_scheduler=None, _n_worker_specs_per_host=None, scheduler_port=None, allow_ingress_from=None, allow_ssh=None, jupyter=None, region=None, arm=None)
Create a Dask cluster with Coiled
- Parameters
n_workers (Optional[int]) – Number of workers in this cluster. Defaults to 4. If this argument is not specified, adaptive scaling is enabled.
name (Optional[str]) – Name to use for identifying this cluster. Defaults to None.
software (Optional[str]) – Name of the software environment to use; this allows you to use and re-use existing Coiled software environments, and should not be used with package sync or when specifying a container to use for this specific cluster.
container (Optional[str]) – Name or URI of container image to use; when using a pre-made container image with Coiled, this allows you to skip the step of explicitly creating a Coiled software environment from that image. Note that this should not be used with package sync or when specifying an existing Coiled software environment.
worker_class (Optional[str]) – Worker class to use. Defaults to distributed.nanny.Nanny.
worker_options (Optional[dict]) – Mapping with keyword arguments to pass to worker_class. Defaults to {}.
worker_vm_types (Optional[list]) – List of instance types that you would like workers to use. The default instance type has 4 cores. You can use coiled.list_instance_types() to see a list of allowed types.
worker_cpu (Union[int, List[int], None]) – Number, or range, of CPUs requested for each worker. Specify a range by using a list of two elements, for example: worker_cpu=[2, 8].
worker_memory (Union[str, List[str], None]) – Amount of memory to request for each worker. Coiled will use a +/- 10% buffer around the memory that you specify. You may specify a range of memory by using a list of two elements, for example: worker_memory=["2GiB", "4GiB"].
worker_disk_size (Optional[int]) – Non-default size of persistent disk attached to each worker instance, specified in GiB.
worker_disk_throughput (Optional[int]) – EXPERIMENTAL. For AWS, non-default throughput (in MB/s) for EBS gp3 volumes attached to workers.
worker_gpu (Union[int, bool, None]) – Number of GPUs to attach to each worker. Default is 0; True is interpreted as 1. Note that this is ignored if you’re explicitly specifying an instance type which includes a fixed number of GPUs.
worker_gpu_type (Optional[str]) – For GCP, this lets you specify the type of guest GPU for instances. Should match the way the cloud provider specifies the GPU, for example: worker_gpu_type="nvidia-tesla-t4". By default, Coiled will request NVIDIA T4 if GPU type isn’t specified. For AWS, if you want a GPU other than T4, you’ll need to explicitly specify the VM instance type (e.g., p3.2xlarge for an instance with one NVIDIA Tesla V100).
scheduler_class (Optional[str]) – Scheduler class to use. Defaults to distributed.scheduler.Scheduler.
scheduler_options (Optional[dict]) – Mapping with keyword arguments to pass to scheduler_class. Defaults to {}.
scheduler_vm_types (Optional[list]) – List of instance types that you would like the scheduler to use. The default instance type has 4 cores. You can use coiled.list_instance_types() to see a list of allowed types.
scheduler_cpu (Union[int, List[int], None]) – Number, or range, of CPUs requested for the scheduler. Specify a range by using a list of two elements, for example: scheduler_cpu=[2, 8].
scheduler_memory (Union[str, List[str], None]) – Amount of memory to request for the scheduler. Coiled will use a +/- 10% buffer around the memory that you specify. You may specify a range of memory by using a list of two elements, for example: scheduler_memory=["2GiB", "4GiB"].
scheduler_gpu (Optional[bool]) – Whether to attach a GPU to the scheduler; this would be a single NVIDIA T4. The best practice for Dask is to have a GPU on the scheduler if you are using GPUs on your workers, so if you don’t explicitly specify, Coiled will follow this best practice and give you a scheduler GPU if you have worker_gpu set.
asynchronous (bool) – Set to True if using this Cloud within async/await functions or within Tornado gen.coroutines. Otherwise this should remain False for normal use. Default is False.
cloud (Optional[CloudV2]) – Cloud object to use for interacting with Coiled. This object contains user/authentication/account information. If this is None (default), we look for a recently-cached Cloud object, and if none exists create one.
account (Optional[str]) – Name of Coiled account to use. If not provided, will default to the user account for the cloud object being used.
shutdown_on_close – Whether or not to shut down the cluster when it finishes. Defaults to True, unless name points to an existing cluster.
idle_timeout (Optional[str]) – Shut down the cluster after this duration if no activity has occurred, e.g. “30 minutes”. Default: “20 minutes”.
use_scheduler_public_ip (Optional[bool]) – Boolean value that determines if the Python client connects to the Dask scheduler using the scheduler machine’s public IP address. The default behaviour when set to True is to connect to the scheduler using its public IP address, which means traffic will be routed over the public internet. When set to False, traffic will be routed over the local network the scheduler lives in, so make sure the scheduler private IP address is routable from where this function call is made when setting this to False.
use_dashboard_https (Optional[bool]) – When public IP address is used for dashboard, we’ll enable HTTPS + auth by default. You may want to disable this if using something that needs to connect directly to the scheduler dashboard without authentication, such as jupyter dask-labextension<=6.1.0.
credentials (Optional[str]) – Which credentials to use for Dask operations and forward to Dask clusters – options are “local”, or None. The default behavior is to use local credentials if available. NOTE: credential handling currently only works with AWS credentials.
credentials_duration_seconds (Optional[int]) – For “local” credentials shipped to cluster as STS token, set the duration of STS token. If not specified, the AWS default will be used.
timeout (Union[int, float, None]) – Timeout in seconds to wait for a cluster to start; by default, the default_cluster_timeout set on the parent Cloud is used.
environ (Optional[Dict[str, str]]) – Dictionary of environment variables. Values will be transmitted to Coiled; for private environment variables (e.g., passwords or access keys you use for data access), send_private_envs() is recommended.
send_dask_config (bool) – Whether to send a frozen copy of local dask.config to the cluster.
backend_options (Union[AWSOptions, GCPOptions, None]) – Dictionary of backend specific options.
show_widget (bool) – Whether to use the rich-based widget display in IPython/Jupyter (ignored if not in those environments). For use cases involving multiple Clusters at once, show_widget=False is recommended. (Default: True)
custom_widget (Optional[ClusterWidget]) – Use the rich-based widget display outside of IPython/Jupyter (Default: False)
tags (Optional[Dict[str, str]]) – Dictionary of tags.
wait_for_workers (Union[int, float, bool, None]) – Whether to wait for a number of workers before returning control of the prompt back to the user. Usually, computations will run better if you wait for most workers before submitting tasks to the cluster. You can wait for all workers by passing True, or not wait for any by passing False. You can pass a fraction of the total number of workers requested as a float (like 0.6), or a fixed number of workers as an int (like 13). If None, the value from coiled.wait-for-workers in your Dask config will be used. Default: 0.3. If the requested number of workers don’t launch within 10 minutes, the cluster will be shut down, then a TimeoutError is raised.
package_sync (Union[bool, List[str]]) – Synchronize package versions between your local environment and the cluster. Cannot be used with the software option. Passing True will sync all packages (recommended). If software is not given, defaults to True. Passing specific packages as a list of strings will attempt to synchronize only those packages; use with caution. We recommend reading the additional documentation for this feature.
package_sync_ignore (Optional[List[str]]) – A list of package names to exclude from the environment. Note their dependencies may still be installed, or they may be installed by another package that depends on them!
package_sync_strict (bool) – Only allow exact package matches; not recommended unless your client platform/architecture matches the cluster platform/architecture.
private_to_creator (Optional[bool]) – Only allow the cluster creator, not other members of team account, to connect to this cluster.
use_best_zone (bool) – Allow the cloud provider to pick the zone (in your specified region) that has best availability for your requested instances. We’ll keep the scheduler and workers all in a single zone in order to avoid any interzone network traffic (which would be billed).
spot_policy (Optional[Literal['on-demand', 'spot', 'spot_with_fallback']]) – Purchase option to use for workers in your cluster, options are “on-demand”, “spot”, and “spot_with_fallback”; by default this is “on-demand”. (Google Cloud refers to this as “provisioning model” for your instances.) Spot instances are much cheaper, but can have more limited availability and may be terminated while you’re still using them if the cloud provider needs more capacity for other customers. On-demand instances have the best availability and are almost never terminated while still in use, but they’re significantly more expensive than spot instances. For most workloads, “spot_with_fallback” is likely to be a good choice: Coiled will try to get as many spot instances as we can, and if we get fewer than you requested, we’ll try to get the remaining instances as on-demand. For AWS, when we’re notified that an active spot instance is going to be terminated, we’ll attempt to get a replacement instance (spot if available, but could be on-demand if you’ve enabled “fallback”). Dask on the active instance will attempt a graceful shutdown before the instance is terminated so that computed results won’t be lost.
scheduler_port (Optional[int]) – Specify a port other than the default (8786) for communication with Dask scheduler; this is useful if your client is on a network that blocks 8786.
allow_ingress_from (Optional[str]) – Control the CIDR from which cluster firewall allows ingress to scheduler; by default this is open to any source address (0.0.0.0/0). You can specify CIDR, or “me” for just your IP address.
allow_ssh (Optional[bool]) – Allow connections to scheduler over port 22, used for SSH.
jupyter (Optional[bool]) – Start a Jupyter server in the same process as Dask scheduler. The Jupyter server will be behind HTTPS with authentication (unless you disable use_dashboard_https, which we strongly recommend against). Note that jupyterlab will need to be installed in the software environment used on the cluster (or in your local environment if using package sync). Once the cluster is running, you can use jupyter_link to get a link to access the Jupyter server.
region (Optional[str]) – The cloud provider region in which to run the cluster.
arm (Optional[bool]) – Use ARM instances for cluster; default is x86 (Intel) instances.
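As a rough sketch of how several of the parameters above fit together, the helper below assembles keyword arguments for coiled.Cluster and rejects the combinations the descriptions warn against (software, container and package sync used together). The names build_cluster_kwargs and start_cluster are hypothetical, the values are only illustrative, and actually creating a cluster assumes the coiled package is installed and you are logged in to a Coiled account.

```python
from typing import Any, Dict, Optional


def build_cluster_kwargs(
    name: str,
    n_workers: int = 4,
    software: Optional[str] = None,
    container: Optional[str] = None,
    package_sync: bool = False,
) -> Dict[str, Any]:
    """Hypothetical helper: collect keyword arguments for coiled.Cluster."""
    # The parameter descriptions say software, container and package sync
    # should not be combined with one another.
    chosen = [software is not None, container is not None, bool(package_sync)]
    if sum(chosen) > 1:
        raise ValueError("use only one of software, container, package_sync")

    kwargs: Dict[str, Any] = {
        "name": name,
        "n_workers": n_workers,
        "worker_cpu": [2, 8],                # range of CPUs per worker
        "worker_memory": ["2GiB", "4GiB"],   # range of memory per worker
        "spot_policy": "spot_with_fallback",
        "idle_timeout": "30 minutes",
        "allow_ingress_from": "me",          # only allow your own IP address
        "tags": {"project": "demo"},         # illustrative tag
    }
    if software is not None:
        kwargs["software"] = software
    if container is not None:
        kwargs["container"] = container
    if package_sync:
        kwargs["package_sync"] = True
    return kwargs


def start_cluster(**kwargs):
    """Create the cluster; coiled is imported lazily so the helper above
    can be used (and tested) without the package installed."""
    import coiled

    return coiled.Cluster(**kwargs)
```

With these helpers, start_cluster(**build_cluster_kwargs("demo", software="my-env")) would request a cluster named demo that uses an existing software environment; "my-env" is a placeholder name.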
- adapt(Adaptive=<class 'coiled.cluster.CoiledAdaptive'>, *, minimum=1, maximum=200, target_duration='3m', wait_count=24, interval='5s', **kwargs)
Dynamically scale the number of workers in the cluster based on scaling heuristics.
- Parameters
minimum (int) – Minimum number of workers that the cluster should have while on low load, defaults to 1.
maximum (int) – Maximum number of workers that the cluster should have while on high load, defaults to 200.
wait_count (int) – Number of consecutive times that a worker should be suggested for removal before the cluster removes it.
interval (timedelta or str) – Time between checks, defaults to '5s' (5000 ms).
target_duration (timedelta or str) – Amount of time we want a computation to take. This affects how aggressively the cluster scales up.
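The sketch below shows one way adapt might be called, together with the arithmetic implied by wait_count and interval: if a worker has to be suggested for removal on wait_count consecutive checks, it stays around for roughly wait_count × interval before being removed. Both enable_adaptive and scale_down_delay_seconds are hypothetical helpers, and the delay estimate is an interpretation of the parameter descriptions rather than documented behaviour.

```python
def scale_down_delay_seconds(wait_count: int = 24, interval_seconds: float = 5.0) -> float:
    """Approximate time a worker must be idle before removal (assumption:
    one removal suggestion per check, checks spaced `interval_seconds` apart)."""
    return wait_count * interval_seconds


def enable_adaptive(cluster, minimum=1, maximum=200, target_duration="3m"):
    """Hypothetical wrapper: sanity-check the bounds, then call cluster.adapt."""
    if minimum > maximum:
        raise ValueError("minimum must not exceed maximum")
    return cluster.adapt(
        minimum=minimum,
        maximum=maximum,
        target_duration=target_duration,
    )
```

With the defaults in the signature above (wait_count=24, interval='5s'), the estimate comes to about two minutes.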
- Return type
- property asynchronous
Are we running in the event loop?
- close(force_shutdown=False)
Close the cluster.
- Return type
Optional[Awaitable[None]]
- get_client()
Return a client for the cluster.
If a client has already been initialized for the cluster, return that; otherwise, initialize a new client object.
- get_logs(scheduler=True, workers=True)
Return logs for the scheduler and workers
- Parameters
scheduler (bool) – Whether or not to collect logs for the scheduler
workers (bool) – Whether or not to collect logs for the workers
- Returns
logs – A dictionary of logs, with one item for the scheduler and one for the workers
- Return type
Dict[str]
- async recommendations(target)
Make scale up/down recommendations based on current state and target.
Return a recommendation of the form
- {"status": "same"}
- {"status": "up", "n": <desired number of total workers>}
- {"status": "down", "workers": <list of workers to close>}
- Return type
dict
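To make the three recommendation shapes concrete, here is a small sketch that turns such a dictionary into a readable message. The function describe_recommendation is hypothetical and only relies on the keys listed above; the worker addresses in the usage note are made up.

```python
def describe_recommendation(rec: dict) -> str:
    """Summarize a recommendation dict of the documented form."""
    status = rec["status"]
    if status == "same":
        return "keep the current number of workers"
    if status == "up":
        # "n" is the desired total number of workers, not an increment
        return f"scale up to {rec['n']} workers in total"
    if status == "down":
        return f"close {len(rec['workers'])} worker(s)"
    raise ValueError(f"unexpected status: {status!r}")
```

For example, describe_recommendation({"status": "down", "workers": ["tcp://10.0.0.1:1234", "tcp://10.0.0.2:1234"]}) reports that two workers would be closed.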
- scale(n)
Scale cluster to n workers
- Parameters
n (int) – Number of workers to scale cluster size to.
- Return type
Optional[Awaitable[None]]
- async scale_up(n, reason=None)
Scales up to a target number of n workers
It’s documented that scale_up should scale up to a certain target, not scale up BY a certain amount:
https://github.com/dask/distributed/blob/main/distributed/deploy/adaptive_core.py#L60
- Return type
None
- send_credentials(automatic_refresh=False)
Manually trigger sending STS token to cluster.
Usually the STS token is sent and refreshed automatically; this allows you to manually force a refresh in case that’s needed for any reason.
- send_private_envs(env)
Send potentially private environment variables to be set on scheduler and all workers.
You can use this to send secrets (passwords, auth tokens) that you can use in code running on cluster. Unlike environment variables set with coiled.Cluster(environ=...), the values will be transmitted directly to your cluster without being transmitted to Coiled, logged, or written to disk.
The Dask scheduler will ensure that these environment variables are set on any new workers you add to the cluster.
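A minimal sketch of how secrets might be gathered locally and passed to send_private_envs, rather than through environ=. The helpers collect_private_envs and send_secrets are hypothetical, as are the variable names in the usage note; only the send_private_envs(env) call itself comes from the signature above.

```python
import os
from typing import Dict, Iterable, List, Mapping


def collect_private_envs(
    names: Iterable[str], source: Mapping[str, str] = os.environ
) -> Dict[str, str]:
    """Pick the named variables out of `source`, skipping any that are unset."""
    return {name: source[name] for name in names if name in source}


def send_secrets(
    cluster, names: Iterable[str], source: Mapping[str, str] = os.environ
) -> List[str]:
    """Send the selected variables to the cluster; return the names that were sent."""
    env = collect_private_envs(names, source)
    if env:
        cluster.send_private_envs(env)
    # Return only the names, so the secret values are not echoed or logged.
    return sorted(env)
```

For instance, send_secrets(cluster, ["DB_PASSWORD", "API_TOKEN"]) would forward whichever of those two variables are set in the local environment.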
- set_keepalive(keepalive)
Set how long to keep cluster running if all the clients have disconnected.
This is a way to shut down a no-longer-used cluster, in addition to the Dask idle timeout. With no keepalive set, the cluster will not shut down on account of clients going away.
- Parameters
keepalive – duration string like “30s” or “5m”
- shutdown()
Shut down the cluster; useful when shutdown_on_close is False.
- Return type
Optional[Awaitable[None]]
- sync(func, *args, asynchronous=None, callback_timeout=None, **kwargs)
Call func with args synchronously or asynchronously depending on the calling context
- Return type
Union[TypeVar(_T), Coroutine[Any, Any, TypeVar(_T)]]
- wait_for_workers(n_workers='__no_default__', timeout=None)
Blocking call to wait for n workers before continuing
- Parameters
n_workers (int) – The number of workers
timeout (number, optional) – Time in seconds after which to raise a dask.distributed.TimeoutError
- Return type
None
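The constructor's wait_for_workers option accepts True, False, a fraction or a fixed count, while this method takes a plain number of workers. The sketch below shows one way to translate between the two before blocking. The helpers resolve_wait_target and wait_then_connect are hypothetical, and rounding a fraction up with math.ceil is an assumption, not documented behaviour.

```python
import math


def resolve_wait_target(wait_for_workers, n_workers: int) -> int:
    """Translate True / False / fraction / count into a number of workers."""
    # bool is checked first because bool is a subclass of int in Python
    if wait_for_workers is True:
        return n_workers
    if wait_for_workers is False:
        return 0
    if isinstance(wait_for_workers, float):
        if not 0.0 <= wait_for_workers <= 1.0:
            raise ValueError("fraction must be between 0 and 1")
        return math.ceil(wait_for_workers * n_workers)  # rounding up is an assumption
    if isinstance(wait_for_workers, int):
        return wait_for_workers
    raise ValueError("this sketch does not handle None or other types")


def wait_then_connect(cluster, wait_for_workers, n_workers: int, timeout=None):
    """Block until the resolved number of workers has arrived, then return a client."""
    target = resolve_wait_target(wait_for_workers, n_workers)
    if target > 0:
        cluster.wait_for_workers(n_workers=target, timeout=timeout)
    return cluster.get_client()
```

Passing a timeout in seconds keeps the call from blocking indefinitely; per the parameter description above, a dask.distributed.TimeoutError is raised once it elapses.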
- async workers_to_close(target)
Determine which, if any, workers should potentially be removed from the cluster.
Notes
Cluster.workers_to_close dispatches to Scheduler.workers_to_close(), but may be overridden in subclasses.
- Return type
List of worker addresses to close, if any
See also
Scheduler.workers_to_close