Source code for coiled.analytics

from __future__ import annotations

from types import ModuleType
from typing import Optional

import dask
import urllib3
from dask.distributed import Client, get_client
from dask.utils import has_keyword

import coiled


def register(client: Optional[Client] = None):
    """
    Attach the Coiled analytics plugin to the scheduler of a Dask cluster.

    The plugin source is downloaded from the configured Coiled server,
    executed in a throwaway module, and the resulting ``CoiledTelemetry``
    instance is registered as a scheduler plugin through the client.

    Parameters
    ----------
    client:
        The Dask client connected to the cluster to instrument.
        When omitted, the current client (as returned by
        ``dask.distributed.get_client``) is used.

    Raises
    ------
    Exception
        If the server answers the plugin request with a status other than 200.

    Examples
    --------
    >>> from dask.distributed import Client
    >>> client = Client()

    >>> import coiled.analytics
    >>> coiled.analytics.register()

    Or use this module as a preload

    $ dask-scheduler --preload coiled.analytics
    """
    # Entering and leaving a Cloud session is only a login check.
    with coiled.Cloud():
        pass

    target = client or get_client()

    server = dask.config.get("coiled.server")
    url = server + "/api/v2/analytics/preload"
    with urllib3.PoolManager() as pool:
        reply = pool.request("GET", url)
    status = reply.status
    if status != 200:
        raise Exception("Unable to collect Coiled plugin", status)

    # NOTE(security): the payload is Python source fetched over the network and
    # executed as-is; its trustworthiness rests entirely on the "coiled.server"
    # configuration value pointing at a trusted host.
    code = compile(reply.data, url, "exec")
    namespace = ModuleType(url)
    exec(code, namespace.__dict__)
    plugin = namespace.CoiledTelemetry()  # type: ignore

    # Only pass ``idempotent`` when this client's method accepts the keyword.
    extra = {}
    if has_keyword(target.register_scheduler_plugin, "idempotent"):  # type: ignore
        extra["idempotent"] = True
    target.register_scheduler_plugin(plugin, **extra)  # type: ignore
def list_clusters(
    account: Optional[str] = None,
    since: Optional[str] = "7 days",
    user: Optional[str] = None,
):
    """Return the clusters recorded for an account

    The result is whatever ``coiled.Cloud.list_dask_scheduler`` returns for
    the given filters. Clusters are expected in reverse chronological order
    (most recent first).

    Parameters
    ----------
    account:
        Account to query; you must be a member of it.
        ``None`` falls back to your default account.
    since:
        How far back in history to look, seven days by default.
        Any value understood by ``dask.utils.parse_timedelta`` is accepted;
        ``None`` collects the full history.
    user:
        Restrict the listing to one username.
        ``None`` includes every user.

    Examples
    --------
    >>> import coiled.analytics
    >>> coiled.analytics.list_clusters()  # doctest: +SKIP
    >>> coiled.analytics.list_clusters(since="30 days", user="alice")  # doctest: +SKIP
    """
    filters = {"account": account, "since": since, "user": user}
    with coiled.Cloud() as cloud:
        clusters = cloud.list_dask_scheduler(**filters)
    return clusters
def list_computations(cluster_id: int, account: Optional[str] = None):
    """Return the computations recorded for one cluster

    Parameters
    ----------
    cluster_id:
        Identifier of the cluster to inspect
    account:
        Account that owns the cluster; you must be a member of it.
        ``None`` falls back to your default account.

    Examples
    --------
    >>> import coiled.analytics
    >>> clusters = coiled.analytics.list_clusters()  # doctest: +SKIP
    >>> coiled.analytics.list_computations(clusters[0]["id"])  # doctest: +SKIP

    See Also
    --------
    list_clusters
    """
    with coiled.Cloud() as cloud:
        computations = cloud.list_computations(cluster_id, account=account)
    return computations
def list_events(cluster_id: int, account: Optional[str] = None):
    """Return the events recorded for one cluster

    Parameters
    ----------
    cluster_id:
        Identifier of the cluster to inspect
    account:
        Account that owns the cluster; you must be a member of it.
        ``None`` falls back to your default account.

    Examples
    --------
    >>> import coiled.analytics
    >>> clusters = coiled.analytics.list_clusters()  # doctest: +SKIP
    >>> coiled.analytics.list_events(clusters[0]["id"])  # doctest: +SKIP

    See Also
    --------
    list_clusters
    """
    with coiled.Cloud() as cloud:
        events = cloud.list_events(cluster_id, account=account)
    return events
def close(cluster_id: int, account: Optional[str] = None):
    """Ask a cluster to shut down

    A request with the desired status ``"stopped"`` is sent for the cluster,
    asking the remote scheduler to shut down (``Scheduler.close`` on that
    machine). Workers commonly, though not necessarily, shut themselves down
    some time afterwards.

    Parameters
    ----------
    cluster_id:
        Identifier of the cluster to close
    account:
        Account that owns the cluster; you must be a member of it.
        ``None`` falls back to your default account.

    Examples
    --------
    >>> import coiled.analytics
    >>> clusters = coiled.analytics.list_clusters()  # doctest: +SKIP
    >>> coiled.analytics.close(clusters[0]["id"])  # doctest: +SKIP

    See Also
    --------
    list_clusters
    """
    with coiled.Cloud() as cloud:
        outcome = cloud.send_state(
            cluster_id, account=account, desired_status="stopped"
        )
    return outcome


def list_exceptions(
    account: Optional[str] = None,
    since: Optional[str] = "7 days",
    user: Optional[str] = None,
    cluster_id: Optional[int] = None,
    scheduler_id: Optional[int] = None,
):
    """Return recorded user exceptions

    Parameters
    ----------
    account (optional):
        Account to query; you must be a member of it.
        ``None`` falls back to your default account.
    since (optional):
        How far back in history to look, seven days by default.
        Any value understood by ``dask.utils.parse_timedelta`` is accepted;
        ``None`` collects the full history.
    user (optional):
        Restrict the listing to one username.
        ``None`` includes every user.
    cluster_id (optional):
        Identifier of the cluster to select
    scheduler_id (optional):
        Identifier of the scheduler analytics record to select

    Examples
    --------
    >>> import coiled.analytics
    >>> coiled.analytics.list_exceptions()  # doctest: +SKIP
    >>> coiled.analytics.list_exceptions(since="30 days", user="alice")  # doctest: +SKIP
    """
    filters = {
        "account": account,
        "since": since,
        "user": user,
        "cluster_id": cluster_id,
        "scheduler_id": scheduler_id,
    }
    with coiled.Cloud() as cloud:
        found = cloud.list_exceptions(**filters)
    return found


async def dask_setup(scheduler):
    """Install the Coiled analytics plugin directly on a scheduler

    This is the entry point used when the module is loaded as a preload
    (``dask-scheduler --preload coiled.analytics``). The plugin source is
    downloaded from the configured Coiled server, executed in a throwaway
    module, and the resulting ``CoiledTelemetry`` instance is added to the
    scheduler and started. Nothing is done if a plugin with the same name is
    already present.

    Parameters
    ----------
    scheduler:
        The scheduler object to attach the plugin to

    Raises
    ------
    Exception
        If the server answers the plugin request with a status other than 200.
    """
    server = dask.config.get("coiled.server")
    url = server + "/api/v2/analytics/preload"
    with urllib3.PoolManager() as pool:
        reply = pool.request("GET", url)
    status = reply.status
    if status != 200:
        raise Exception("Unable to collect Coiled plugin", status)

    # NOTE(security): the payload is Python source fetched over the network and
    # executed as-is; its trustworthiness rests entirely on the "coiled.server"
    # configuration value pointing at a trusted host.
    code = compile(reply.data, url, "exec")
    namespace = ModuleType(url)
    exec(code, namespace.__dict__)
    plugin = namespace.CoiledTelemetry()  # type: ignore

    # Skip installation when the scheduler already carries this plugin.
    if plugin.name not in scheduler.plugins:
        scheduler.add_plugin(plugin, name=plugin.name, idempotent=True)
        await plugin.start(scheduler)