Prefect#

Prefect on Coiled#

Note

This is experimental and only works on Linux and/or an environment where a user is running from a path entirely contained in /home/ for now.

You can use the coiled prefect worker start CLI tool to start Prefect workers for use in running your deployed workflows

coiled prefect worker start --help

Get Started#

  1. Have the following installed in your environment:

    • coiled>=0.8.0

    • prefect~=2.10.15

    • prefect-dask~=0.2.4

  2. Sign up for an acount at prefect.io.

  3. Authenticate with Prefect locally in your terminal

    prefect cloud login -w
    

Example#

Let’s start by creating a simple Prefect flow in a file we’ll call myflow.py:

# file: myflow.py

from prefect import flow, task
from prefect.deployments import Deployment
from coiled.extensions.prefect import CoiledTaskRunner


@task
def inc(x):
    return x + 1


@task
def double(x):
    return x * 2


@task
def add(x, y):
    return x + y


@task
def sum_all(z):
    return sum(z)


@flow(log_prints=True, task_runner=CoiledTaskRunner())
def my_flow():
    a = inc.map(range(10))
    b = double.map(range(10))
    c = add.map(x=a, y=b)
    return sum_all(c)


# Make deployment from this flow
if __name__ == "__main__":

    deployment = Deployment.build_from_flow(
        flow=my_flow,
        name="prefect-on-coiled",
        work_queue_name="default",
        work_pool_name="default"
    )
    deployment.apply()

Start a Prefect Worker on Coiled#

With that in place, let’s go start a Prefect worker using the coiled prefect worker start CLI:

coiled prefect worker start --prefect-workspace MY_WORKSPACE_NAME

Note

It’s assumed the above is ran in the current directory where myflow.py is located, and from the same environment the flow’s dependencies are installed. This will leverage the syncing of this flow’s code along with automatic package synchronization feature of Coiled.

Additionally, this will upload the entire working directory.

This functionality will not be enabled if you are using the --container option.

In Coiled we can see our worker is running:

Running cluster on Coiled.

Cluster running our Prefect worker on Coiled (click to enlarge)#

And the work-queue we created is available in Prefect:

Healthy work queue.

The work queue is healthy! (click to enlarge)#

Make and Run a Prefect Deployment#

Now we make a Deployment to Prefect Cloud by running our myflow.py file:

python myflow.py

which we can see in Prefect:

Deployment on Prefect Cloud.

Deployment on Prefect Cloud (click to enlarge)#

Things are looking good, let’s run the deployment using the prefect deployment run CLI:

Running deployment from terminal.

Running our deployment from the terminal (click to enlarge)#

In Prefect we see the successful flow run:

Completed flow run

Completed flow run using cluster hosted on Coiled (click to enlarge)#

API#

coiled prefect worker start#

Deploy Prefect Worker(s) on a Coiled cluster.

This should be ran from the same code (and environment) used when deploying your Prefect flow/deployment. It will sync that directory’s code to the VM(s) where the worker(s) will be running. Running the command from the desired Python environment also ensures all packages are synced to the Worker(s)/VM(s).

coiled prefect worker start [OPTIONS]

Options

--prefect-workspace <prefect_workspace>#

Required Name of the Prefect workspace.

--prefect-key <prefect_key>#

API key for Prefect workspace, used to login Prefect workers.

--prefect-pool <prefect_pool>#

Name of pool for Prefect worker.

--prefect-work-queue <prefect_work_queue>#

Name of Prefect work-queue for worker.

--cluster-name <cluster_name>#

Name of Coiled cluster

--n-workers <n_workers>#

Number of Prefect workers to launch

--disable-adaptive#

Opt out of defaulting to an adaptive cluster

--adaptive-min <adaptive_min>#

Minimum workers in adaptive cluster configuration

--adaptive-max <adaptive_max>#

Maximum workers in adaptive cluster configuration

--container <container>#

Docker image uri to use, ex: coiled/default:sha-2021.06.0