Source code for coiled.types

from enum import Enum
from logging import getLogger
from pathlib import Path
from typing import BinaryIO, Dict, List, Optional, Tuple, Union

from typing_extensions import Literal, TypedDict
from urllib3.util import parse_url

# Module-level logger. Note that it is registered under the "coiled.package_sync"
# name rather than under this module's own name.
logger = getLogger("coiled.package_sync")


# Literal type enumerating every event-type string accepted wherever
# ``event_type_list`` is used as an annotation.
event_type_list = Literal[
    "add_role_to_profile",
    "attach_gateway_to_router",
    "attach_subnet_to_router",
    "create_vm",
    "create_machine_image",
    # These are two distinct events; they must stay separate entries (a missing
    # comma would silently fuse them into one meaningless string).
    "create_scheduler",
    "create_worker",
    "delete_machine_image",
    "create_fw_rule",
    "create_fw",
    "create_network_cidr",
    "create_subnet",
    "create_network",
    "create_log_sink",
    "create_router",
    "create_iam_role",
    "create_log_bucket",
    "create_storage_bucket",
    "create_instance_profile",
    "check_log_sink_exists",
    "check_or_attach_cloudwatch_policy",
    "delete_vm",
    "delete_route",
    "get_firewall",
    "get_network",
    "get_subnet",
    "get_policy_arn",
    "get_log_group",
    "gcp_instance_create",
    "net_gateways_get_or_create",
    "scale",
]


class CondaPlaceHolder(dict):
    """Plain ``dict`` subclass that adds no attributes or behaviour of its own."""


class PackageInfo(TypedDict):
    """Typed dictionary describing one package.

    The TypedDict is total, so every key must be present; fields typed
    ``Optional`` may hold ``None``.
    """

    name: str
    path: Optional[Path]
    # Restricted to exactly "pip" or "conda".
    source: Literal["pip", "conda"]
    # NOTE(review): the fields below look conda-specific (presumably None for
    # pip packages) -- confirm against the producers of this dict.
    channel_url: Optional[str]
    channel: Optional[str]
    subdir: Optional[str]
    conda_name: Optional[str]
    version: str
    wheel_target: Optional[str]


class PackageSchema(TypedDict):
    """Typed dictionary for one package entry; used as the element type of ``SoftwareEnvSpec["packages"]``.

    The TypedDict is total, so every key must be present; fields typed
    ``Optional`` may hold ``None``.
    """

    name: str
    # Restricted to exactly "pip" or "conda".
    source: Literal["pip", "conda"]
    channel: Optional[str]
    conda_name: Optional[str]
    client_version: Optional[str]
    specifier: str
    include: bool
    # NOTE(review): an integer here is presumably an id of an uploaded file -- confirm.
    file: Optional[int]


class ResolvedPackageInfo(TypedDict):
    """Typed dictionary for a package entry that also carries note/error text and an optional sdist.

    Shares ``name``, ``source``, ``channel``, ``conda_name``, ``client_version``,
    ``specifier`` and ``include`` with ``PackageSchema``. The TypedDict is total,
    so every key must be present; fields typed ``Optional`` may hold ``None``.
    """

    name: str
    # Restricted to exactly "pip" or "conda".
    source: Literal["pip", "conda"]
    channel: Optional[str]
    conda_name: Optional[str]
    client_version: Optional[str]
    specifier: str
    include: bool
    note: Optional[str]
    error: Optional[str]
    # Binary file object when present.
    sdist: Optional[BinaryIO]
    # NOTE(review): presumably the MD5 digest of ``sdist`` -- confirm.
    md5: Optional[str]


class PackageLevelEnum(int, Enum):
    """
    Package mismatch severity level.

    High integer values are used so there is room to add extra levels as needed.

    Members are also ``int`` instances, so they are ordered by value, which
    allows comparisons like

    if somelevel >= PackageLevelEnum.STRICT_MATCH:
        <some logic for high or critical levels>

    Note that LOOSE, IGNORE and MATCH_MINOR have negative values and therefore
    compare below NONE.
    """

    CRITICAL = 100
    STRICT_MATCH = 75
    WARN = 50
    NONE = 0
    LOOSE = -1
    IGNORE = -2
    MATCH_MINOR = -3


class ApproximatePackageRequest(TypedDict):
    """Typed dictionary for a package request that includes Python version parts.

    Carries the same package fields as ``PackageInfo`` except ``path``, plus an
    optional ``priority_override`` level and the Python version split into
    major/minor/patch strings. The TypedDict is total, so every key must be
    present; fields typed ``Optional`` may hold ``None``.
    """

    name: str
    priority_override: Optional[PackageLevelEnum]
    # The three version components are stored as strings, not ints.
    python_major_version: str
    python_minor_version: str
    python_patch_version: str
    # Restricted to exactly "pip" or "conda".
    source: Literal["pip", "conda"]
    channel_url: Optional[str]
    channel: Optional[str]
    subdir: Optional[str]
    conda_name: Optional[str]
    version: str
    wheel_target: Optional[str]


class ApproximatePackageResult(TypedDict):
    """Typed dictionary for a package result with an optional specifier and note/error text.

    NOTE(review): presumably the response counterpart of
    ``ApproximatePackageRequest`` -- confirm against the API client.
    The TypedDict is total, so every key must be present; fields typed
    ``Optional`` may hold ``None``.
    """

    name: str
    conda_name: Optional[str]
    # Unlike ``PackageSchema.specifier``, this may be None.
    specifier: Optional[str]
    include: bool
    note: Optional[str]
    error: Optional[str]
    channel_url: Optional[str]
    client_version: Optional[str]


class PiplessCondaEnvSchema(TypedDict, total=False):
    """Conda environment mapping whose ``dependencies`` are plain strings only.

    Declared with ``total=False``, so every key may be omitted. Compare
    ``CondaEnvSchema``, whose dependency list may also contain dict entries.
    """

    name: Optional[str]
    channels: List[str]
    dependencies: List[str]


class CondaEnvSchema(TypedDict, total=False):
    """Conda environment mapping with ``name``, ``channels`` and ``dependencies`` keys.

    Declared with ``total=False``, so every key may be omitted.
    """

    name: Optional[str]
    channels: List[str]
    # Each entry is either a string or a dict mapping a string to a list of
    # strings. NOTE(review): the dict form is presumably the nested
    # ``{"pip": [...]}`` section of a conda environment file -- confirm.
    dependencies: List[Union[str, Dict[str, List[str]]]]


class SoftwareEnvSpec(TypedDict):
    """Typed dictionary combining a package list with optional raw pip and conda inputs.

    The TypedDict is total, so every key must be present; ``raw_pip`` and
    ``raw_conda`` may hold ``None``.
    """

    packages: List[PackageSchema]
    raw_pip: Optional[List[str]]
    raw_conda: Optional[CondaEnvSchema]


# This function is in this module to prevent circular import issues
def parse_conda_channel(package_name: str, channel: str, subdir: str) -> Tuple[Optional[str], str]:
    """Normalise a conda channel into a ``(channel, channel_url)`` pair.

    Parameters
    ----------
    package_name
        Name of the package; only used in warning messages.
    channel
        Raw channel value. The special value ``"<unknown>"`` is replaced by
        ``"conda-forge"`` (with a warning).
    subdir
        Platform subdirectory; a trailing ``/<subdir>`` is removed from
        ``channel`` before any further processing.

    Returns
    -------
    tuple
        ``(channel, channel_url)``. ``channel`` is ``None`` when it would
        otherwise be an empty string.

        * ``http:``/``https:`` channels: the URL is returned unchanged as
          ``channel_url`` and its path, without leading/trailing slashes, is the
          channel.
        * ``file:`` channels: a warning is logged and the value is returned as
          both channel and URL.
        * anything else: the URL is built on a host chosen from the channel
          prefix (``pkgs/``, ``repo/``, or the default host).
    """
    # Handle unknown channels
    if channel == "<unknown>":
        logger.warning(f"Channel for {package_name} is unknown, setting to conda-forge")
        channel = "conda-forge"

    # Strip subdir from channel
    subdir_suffix = f"/{subdir}"
    if channel.endswith(subdir_suffix):
        channel = channel[: -len(subdir_suffix)]

    # Handle channel urls
    if channel.startswith(("http:", "https:")):
        url_path = parse_url(channel).path or ""
        return (url_path.strip("/") or None), channel

    # TODO: Actually upload these files to S3
    if channel.startswith("file:"):
        logger.warning(f"Channel for {package_name} is a local file, which is not currently supported")
        return (channel or None), channel

    # Plain channel name: choose the host from the channel prefix.
    host = "conda.anaconda.org"
    for prefix, prefix_host in (("pkgs/", "repo.anaconda.com"), ("repo/", "repo.anaconda.cloud")):
        if channel.startswith(prefix):
            host = prefix_host
            break
    return (channel or None), f"https://{host}/{channel}"


class CondaPackage:
    """Conda package record built from a metadata mapping and an environment prefix.

    ``meta_json`` must contain the keys ``name``, ``version``, ``subdir``,
    ``files`` and ``channel`` (a missing one raises ``KeyError``); ``depends``
    and ``constrains`` default to empty lists. The raw channel is normalised
    through ``parse_conda_channel`` into ``channel`` and ``channel_url``.
    """

    def __init__(self, meta_json: Dict, prefix: Path):
        self.prefix = prefix
        self.name: str = meta_json["name"]
        self.version: str = meta_json["version"]
        self.subdir: str = meta_json["subdir"]
        # NOTE(review): annotated as ``str``; if ``files`` is actually a list
        # of paths the annotation should be ``List[str]`` -- confirm.
        self.files: str = meta_json["files"]
        self.depends: List[str] = meta_json.get("depends", [])
        self.constrains: List[str] = meta_json.get("constrains", [])
        raw_channel = meta_json["channel"]
        parsed_channel, parsed_url = parse_conda_channel(self.name, raw_channel, self.subdir)
        self.channel = parsed_channel
        self.channel_url = parsed_url

    def __repr__(self):
        # Rendered like a constructor call; note that 'channel' shows the
        # normalised channel, not the raw value that was passed in.
        shown = ("name", "version", "subdir", "files", "depends", "constrains", "channel")
        meta_items = ", ".join(f"{key!r}: {getattr(self, key)!r}" for key in shown)
        return f"CondaPackage(meta_json={{{meta_items}}}, prefix={self.prefix!r})"

    def __str__(self):
        return "{} {} from {}".format(self.name, self.version, self.channel_url)


class PackageLevel(TypedDict):
    """Typed dictionary pairing a package name and source with a ``PackageLevelEnum`` level.

    The TypedDict is total, so every key must be present.
    """

    name: str
    level: PackageLevelEnum
    # Restricted to exactly "pip" or "conda".
    source: Literal["pip", "conda"]


class ApiBase(TypedDict):
    """Base TypedDict providing the ``id``, ``created`` and ``updated`` keys.

    Inherited by the ``SoftwareEnvironment*`` TypedDicts in this module.
    """

    id: int
    # Both timestamps are carried as strings.
    # NOTE(review): presumably ISO 8601 -- confirm the format with the API.
    created: str
    updated: str


class SoftwareEnvironmentBuild(ApiBase):
    """``ApiBase`` record with a ``state`` key."""

    # Restricted to exactly these four values.
    state: Literal["built", "building", "error", "queued"]


class SoftwareEnvironmentSpec(ApiBase):
    """``ApiBase`` record with a ``latest_build`` key."""

    # May be None.
    latest_build: Optional[SoftwareEnvironmentBuild]


class SoftwareEnvironmentAlias(ApiBase):
    """``ApiBase`` record with ``name`` and ``latest_spec`` keys."""

    name: str
    # May be None.
    latest_spec: Optional[SoftwareEnvironmentSpec]


class ArchitectureTypesEnum(str, Enum):
    """
    All currently supported architecture types.

    Members are also ``str`` instances, and ``str(member)`` yields the raw value.
    """

    X86_64 = "x86_64"
    ARM64 = "aarch64"

    def __str__(self) -> str:
        return self.value

    @property
    def conda_suffix(self) -> str:
        """Return ``"64"`` for X86_64 and the member's own value otherwise."""
        return "64" if self is ArchitectureTypesEnum.X86_64 else self.value

    @property
    def vm_arch(self) -> Literal["x86_64", "arm64"]:
        """Return ``"arm64"`` for ARM64 and the member's own value otherwise."""
        return "arm64" if self is ArchitectureTypesEnum.ARM64 else self.value


class ClusterDetailsState(TypedDict):
    """Typed dictionary with ``state``, ``reason`` and ``updated`` keys, all strings.

    Used as the ``current_state`` value in ``ClusterDetails`` and
    ``ClusterDetailsProcess``.
    """

    state: str
    reason: str
    updated: str


class ClusterDetailsProcess(TypedDict):
    """Typed dictionary for one process entry; the type of the scheduler and of each worker in ``ClusterDetails``."""

    created: str
    name: str
    current_state: ClusterDetailsState
    # Untyped mapping; its schema is not declared in this module.
    instance: dict


class ClusterDetails(TypedDict):
    """Typed dictionary with a cluster's id, name, state, worker list and optional scheduler.

    The TypedDict is total, so every key must be present; ``scheduler`` may
    hold ``None``.
    """

    id: int
    name: str
    workers: List[ClusterDetailsProcess]
    scheduler: Optional[ClusterDetailsProcess]
    current_state: ClusterDetailsState
    created: str


class FirewallOptions(TypedDict):
    """
    A dictionary with the following key/value pairs

    Parameters
    ----------
    ports
        List of ports to open to ``cidr`` on the scheduler.
        For example, ``[22, 8786]`` opens port 22 for SSH and 8786 for the client to Dask connection.
    cidr
        CIDR block from which to allow access. For example, ``0.0.0.0/0`` allows access from any IP address.
    """

    ports: List[int]
    cidr: str


class BackendOptions(TypedDict, total=False):
    """
    A dictionary with the following key/value pairs

    Parameters
    ----------
    region_name
        Region name to launch cluster in. For example: us-east-2
    zone_name
        Zone name to launch cluster in. For example: us-east-2a
    firewall
        Deprecated; use ``ingress`` instead.
    ingress
        Allows you to specify multiple CIDR blocks (and corresponding ports) to open for ingress
        on the scheduler firewall.
    spot
        Whether to request spot instances.
    spot_on_demand_fallback
        If requesting spot, whether to request non-spot instances if we get fewer spot instances
        than desired.
    spot_replacement
        By default we'll attempt to replace interrupted spot instances; set to False to disable.
    multizone
        Tell the cloud provider to pick zone with best availability; all VMs will be in a single zone
        unless you also use ``multizone_allow_cross_zone``.
    multizone_allow_cross_zone
        By default, "multizone" cluster is still in a single zone (which zone is picked by cloud provider).
        This option allows the cluster to have VMs in distinct zones. There's a cost for cross-zone traffic
        (usually pennies per GB), so this is a bad choice for shuffle-heavy workloads, but can be a good
        choice for large embarrassingly parallel workloads.
    use_dashboard_public_ip
        Public IP is used by default, lets you choose to use private IP for dashboard link.
    use_dashboard_https
        When public IP address is used for dashboard, we'll enable HTTPS + auth by default.
        You may want to disable this if using something that needs to connect directly to
        the scheduler dashboard without authentication, such as jupyter dask-labextension.
    network_volumes
        Very experimental option to allow mounting SMB volume on cluster nodes.
    docker_shm_size
        Non-default value for shm_size.

    Notes
    -----
    ``send_prometheus_metrics`` and ``prometheus_write`` are also accepted keys; both are
    marked for deprecation in the code and are otherwise undocumented.
    """

    region_name: Optional[str]
    zone_name: Optional[str]
    firewall: Optional[FirewallOptions]  # TODO deprecate, use ingress instead
    ingress: Optional[List[FirewallOptions]]
    spot: Optional[bool]
    spot_on_demand_fallback: Optional[bool]
    spot_replacement: Optional[bool]
    multizone: Optional[bool]
    multizone_allow_cross_zone: Optional[bool]
    use_dashboard_public_ip: Optional[bool]
    use_dashboard_https: Optional[bool]
    send_prometheus_metrics: Optional[bool]  # TODO deprecate
    prometheus_write: Optional[dict]  # TODO deprecate
    network_volumes: Optional[List[dict]]
    docker_shm_size: Optional[str]


class AWSOptions(BackendOptions, total=False):
    """
    A dictionary with the following key/value pairs plus any pairs in :py:class:`BackendOptions`

    Parameters
    ----------
    keypair_name
        AWS Keypair to assign worker/scheduler instances. This would need to be an existing keypair in your
        account, and needs to be in the same region as your cluster. Note that Coiled can also manage
        adding a unique, ephemeral keypair for SSH access to your cluster;
        see :doc:`ssh` for more information.
    use_placement_group
        If possible, this will attempt to put workers in the same cluster placement group (in theory this can
        result in better network between workers, since they'd be physically close to each other in datacenter,
        though we haven't seen this to have much benefit in practice).
    """

    keypair_name: Optional[str]
    use_placement_group: Optional[bool]


class GCPOptions(BackendOptions, total=False):
    """
    A dictionary with GCP specific key/value pairs plus any pairs in :py:class:`BackendOptions`
    """

    scheduler_accelerator_count: Optional[int]
    scheduler_accelerator_type: Optional[str]
    worker_accelerator_count: Optional[int]
    worker_accelerator_type: Optional[str]