General Python

Python developers do weird stuff.

Sometimes our work fits familiar frameworks, like dataframes, geospatial analysis, or machine learning, but sometimes we do weird stuff that doesn’t fit any common pattern.  We’re on our own, charting a new path.

Fortunately, Python is made for weird stuff.  The language and ecosystem are designed for rapid prototyping.  Dask and Coiled follow suit: they’re designed to support creative and novel workflows at a low level.  This page points you in useful directions if you find yourself in this situation.

First, an illustrative example: a nested for loop over parameters and data files.  The same workload is shown three ways below: in plain Python, parallelized with Dask on your local machine, and scaled out to the cloud with Coiled.

Plain Python:



def score(param: dict, data: object) -> float:
    ...

results = []
for filename in filenames:                # Nested for loop
    data = load(filename)                 # Load data in outer loop
    for param in params:                  # Score in inner loop over all pairs
        result = score(param, data)  
        results.append(result)



best = max(results)                       # Get best score

Dask (parallel on your local machine):

from dask.distributed import LocalCluster

cluster = LocalCluster()                  # Use all cores on local machine
client = cluster.get_client()

futures = []
for filename in filenames:                # Nested for loop
    data = client.submit(load, filename)  # Load data in outer loop
    for param in params:                  # Score in inner loop over all pairs
        future = client.submit(score, param, data)       
        futures.append(future)

results = client.gather(futures)

best = max(results)                       # Get best score

Coiled (parallel on the cloud):

import coiled

cluster = coiled.Cluster(n_workers=100)   # Scale out to 100 machines
client = cluster.get_client()

futures = []
for filename in s3_filenames:             # Nested for loop
    data = client.submit(load, filename)  # Load data in outer loop
    for param in params:                  # Score in inner loop over all pairs
        future = client.submit(score, param, data)       
        futures.append(future)

results = client.gather(futures)

best = max(results)                       # Get best score

Parallel For Loops

Often we want to apply one function in parallel across many files.  This use case is simple.  There are several ways to do this with Coiled, including serverless functions and Dask futures, shown below.

import coiled

@coiled.function(
    memory=...,          # Specify hardware details here
    region=...,
)
def process(filename):   # Define your own processing function
    ...


result = process(filename)              # Run once on a cloud VM

results = list(process.map(filenames))  # Apply in parallel on many VMs


For more information, see the Serverless Functions documentation, or see the S3 arXiv example.

Alternatively, you can do the same with Dask futures on a Coiled cluster:

import coiled

cluster = coiled.Cluster(...)
client = cluster.get_client()


def process(filename):                  # Define your own Python function
    ...

futures = []
for filename in filenames:              # Submit many tasks in a loop
    future = client.submit(process, filename)
    futures.append(future)

results = client.gather(futures)        # Wait until done and gather results

For more information, see the Dask Futures documentation or see this example video.
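If you want to try the submit-and-gather pattern before starting a cluster, the standard library’s concurrent.futures has the same shape locally.  This is just a runnable sketch: `process` and the filenames here are hypothetical stand-ins for your own function and data.

```python
from concurrent.futures import ThreadPoolExecutor

def process(filename):                    # Hypothetical stand-in for your own function
    return len(filename)

filenames = ["a.csv", "bb.csv", "ccc.csv"]

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process, f) for f in filenames]  # Submit many tasks
    results = [future.result() for future in futures]           # Gather results

print(results)  # [5, 6, 7]
```

Dask futures follow this same submit/result interface, but run the work on a cluster and add features like passing futures directly into later `client.submit` calls.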

Complex Workloads

Sometimes the workloads we want to parallelize are highly complex, with deeply nested loops, functions calling functions, branching logic, and so on.  We commonly find these in complex data processing pipelines, or in the advanced algorithms used within, for example, quantitative trading firms or banks.

As an example, consider this image, a visual depiction of the credit risk model within a commercial bank.  Each point represents a part of the risk model, and each line represents a data dependency between two parts.  The workload is complex, and defies any common parallelism pattern like map-reduce.  It’s not the sort of thing you can write down in a dataframe or array abstraction.

https://blog.dask.org/images/credit_models/simple-model.svg

Complex task graph representing a credit risk model
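With Dask futures, a graph like this is built by passing earlier futures into later `client.submit` calls, and the scheduler runs each task as soon as its inputs are ready.  As a purely local sketch of the same dependency structure (the functions and filenames here are invented for illustration), the standard library forces you to resolve each input by hand:

```python
from concurrent.futures import ThreadPoolExecutor

def load(name):                       # Invented example functions
    return len(name)

def combine(x, y):
    return x + y

with ThreadPoolExecutor() as ex:
    a = ex.submit(load, "trades.csv")               # Two independent leaf tasks
    b = ex.submit(load, "rates.csv")
    c = ex.submit(combine, a.result(), b.result())  # Depends on a and b
    total = c.result()

print(total)  # 19
```

With dask.distributed you would instead write `client.submit(combine, a, b)` directly, without calling `.result()` mid-graph, and Dask handles the dependency for you.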

Fortunately, Dask contains simple tools that you can combine to construct these systems, including asynchronous futures, distributed locks, queues, and more.  You can find more information in the Dask Futures documentation.
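Dask’s coordination primitives, such as distributed.Lock and distributed.Queue, mirror the standard library’s threading and queue equivalents, just coordinated across a cluster instead of across threads.  Here is a local sketch of the kind of pattern they enable, using the standard-library stand-ins (the doubling workload is invented for illustration):

```python
import queue
import threading

work = queue.Queue()          # distributed.Queue plays this role on a cluster
lock = threading.Lock()       # distributed.Lock plays this role on a cluster
totals = []

def worker():
    while True:
        item = work.get()
        if item is None:      # Sentinel: no more work
            break
        with lock:            # Guard shared state across workers
            totals.append(item * 2)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()

for i in range(10):           # Producer side: enqueue work items
    work.put(i)
for _ in threads:             # One sentinel per worker
    work.put(None)
for t in threads:
    t.join()

print(sorted(totals))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

On a cluster the same structure works with distributed.Queue feeding tasks to workers and distributed.Lock guarding shared resources, while futures tie the pieces together.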