Parallel read and write with MongoDB#

In this guide, you’ll learn how to use Coiled to create a Dask cluster in your cloud account and use the dask-mongo connector to read and write a dataset in parallel. You can download this jupyter notebook to follow along in your own JupyterLab session.

MongoDB is a document-oriented NoSQL database. You can use Coiled and MongoDB together: MongoDB handles the data storage and indexing, while Coiled handles the backend infrastructure for dealing with out-of-memory computations in Python.

Loading data from MongoDB into Python typically involves using the PyMongo driver. This works well for small datasets, but can be limiting when working with larger datasets and more complex queries. The dask-mongo connector enables parallel reading and writing between MongoDB and your Dask clusters.

Before you start#

You’ll first need to create consistent local and remote software environments with dask, coiled, and the necessary dependencies installed. If you are unfamiliar with creating software environments, you can first follow the tutorial on setting up a custom software environment.

You can install dask-mongo, pymongo, and coiled-runtime, a Dask meta-package. Save the following file as environment.yml. You can find the most up-to-date version of coiled-runtime from the latest tag in the public coiled-runtime repository.

channels:
  - conda-forge
dependencies:
  - python=3.9
  - coiled-runtime=<x.x.x>
  - dask-mongo=<x.x.x>
  - pip
  - pip:
    - pymongo[srv]==<x.x.x>

Next, create a local software environment using the environment.yml file:

conda env create -f environment.yml -n mongo-example
conda activate mongo-example

Lastly, create a remote software environment using the same environment.yml file:

coiled env create -n mongo-example --conda environment.yml

Setting up MongoDB Atlas#

In this section you’ll create a database deployed on a free cluster with MongoDB Atlas. You’ll then load the sample_airbnb dataset provided by Atlas.

  1. Follow MongoDB’s Get Started with Atlas tutorial to create an Atlas account and deploy a free cluster.

Note

For the purposes of this example, you can allow connections from any IP address so your Coiled cluster can connect to your MongoDB instance. You can configure this by selecting Network Access from the left navigation pane and selecting “allow access from anywhere” to add CIDR 0.0.0.0/0 (see the MongoDB documentation on adding IP addresses).

  2. Follow the MongoDB guide on loading sample data to load the sample_airbnb dataset into your database.

Create a Coiled cluster#

In this section, you’ll use Coiled to create a Dask cluster in your cloud account. You can start by creating a cluster, using the software argument to use the Python environment you created previously:

import coiled

cluster = coiled.Cluster(
    name="mongo-example",
    n_workers=5,
    software="mongo-example",
)

and then connecting Dask to your remote Coiled cluster:

from dask.distributed import Client

client = Client(cluster)

Read data in parallel#

Now you’re ready to use the dask-mongo connector to read data in parallel using your Coiled cluster! First, you’ll connect to your database deployment using the PyMongo driver (see the MongoDB guide on connecting to your Atlas cluster).

Replace the username, password, and cluster address with your own connection details:

username = "<USERNAME>"
password = "<PASSWORD>"
cluster_address = "<ADDRESS>"
host_uri = f"mongodb+srv://{username}:{password}@{cluster_address}/?retryWrites=true&w=majority"
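Note that if your password contains special characters such as `@`, `:`, or `/`, MongoDB requires them to be percent-encoded before they are embedded in the connection URI. A minimal sketch using the standard library (the credentials below are placeholders, not real values):

```python
from urllib.parse import quote_plus

# hypothetical credentials for illustration only
username = quote_plus("my-user")
password = quote_plus("p@ssw0rd/2024")  # "@" and "/" must be escaped in a URI
cluster_address = "cluster0.example.mongodb.net"

host_uri = f"mongodb+srv://{username}:{password}@{cluster_address}/?retryWrites=true&w=majority"
```

Without this escaping, PyMongo will reject the URI as invalid.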

Now you’re ready to read the sample dataset in parallel with dask-mongo:

from dask_mongo import read_mongo

# lazily read in the sample_airbnb dataset
my_bag = read_mongo(
    connection_kwargs={"host": host_uri},
    database="sample_airbnb",
    collection="listingsAndReviews",
    chunksize=500,
)
# uncomment to look at the first record of the dataset
# my_bag.take(1)
# compute the frequencies of each property type in the dataset
my_bag.pluck("property_type").frequencies().compute()
[('House', 606),
 ('Apartment', 3626),
 ('Condominium', 399),
 ('Loft', 142),
 ('Guesthouse', 50),
 ('Hostel', 34),
 ('Serviced apartment', 185),
 ('Bed and breakfast', 69),
 ('Treehouse', 1),
 ('Bungalow', 14),
 ('Guest suite', 81),
 ('Townhouse', 108),
 ('Villa', 32),
 ('Cabin', 15),
 ('Other', 18),
 ('Chalet', 2),
 ('Farm stay', 9),
 ('Boutique hotel', 53),
 ('Boat', 2),
 ('Cottage', 20),
 ('Earth house', 1),
 ('Aparthotel', 23),
 ('Resort', 11),
 ('Tiny house', 7),
 ('Nature lodge', 2),
 ('Hotel', 26),
 ('Casa particular (Cuba)', 9),
 ('Barn', 1),
 ('Hut', 1),
 ('Camper/RV', 2),
 ('Heritage hotel (India)', 1),
 ('Pension (South Korea)', 1),
 ('Campsite', 1),
 ('Houseboat', 1),
 ('Castle', 1),
 ('Train', 1)]

After a few seconds, you should see the frequency of each property type in the dataset. Since MongoDB stores data in the form of JSON documents, dask-mongo uses the Dask Bag API to lazily read in the sample_airbnb dataset (see the Dask documentation on Dask Bag). As usual, operations in Dask are lazy until computed; calling .compute() triggers the parallel read, and you can uncomment the Bag.take line to inspect the first record.

Work with Dask#

After you’ve loaded your dataset onto your Coiled cluster, you can perform typical Dask operations. Since this is a rich, unstructured dataset, let’s filter out some useful information and get it into a structured Dask DataFrame. You will first flatten this data to be able to use common pandas operations:

def process(record):
    """Flatten a nested record, skipping records with missing fields."""
    try:
        yield {
            "accomodates": record["accommodates"],
            "bedrooms": record["bedrooms"],
            # price is stored as Decimal128, so convert via str to float
            "price": float(str(record["price"])),
            "country": record["address"]["country"],
        }
    except KeyError:
        pass

# keep only apartment listings, then flatten each record
b_flattened = (
    my_bag.filter(lambda record: record["property_type"] == "Apartment")
    .map(process)
    .flatten()
)

b_flattened.take(3)
({'accomodates': 4, 'bedrooms': 1, 'price': 317.0, 'country': 'Brazil'},
 {'accomodates': 1, 'bedrooms': 1, 'price': 40.0, 'country': 'United States'},
 {'accomodates': 2, 'bedrooms': 1, 'price': 701.0, 'country': 'Brazil'})
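The process function is a generator: it yields a single flattened record when all fields are present and yields nothing when a KeyError is raised, which is why .flatten() follows .map(process). You can see this behavior on plain dictionaries; the sample records below are made up for illustration:

```python
def process(record):
    try:
        yield {
            "accomodates": record["accommodates"],
            "bedrooms": record["bedrooms"],
            "price": float(str(record["price"])),
            "country": record["address"]["country"],
        }
    except KeyError:
        pass

# a complete record yields one flattened dict
complete = {
    "accommodates": 4,
    "bedrooms": 1,
    "price": "317.00",
    "address": {"country": "Brazil"},
}
print(list(process(complete)))

# a record missing "bedrooms" yields nothing, so it is silently dropped
incomplete = {"accommodates": 2, "price": "40.00", "address": {"country": "Brazil"}}
print(list(process(incomplete)))  # []
```

Records with incomplete fields are dropped rather than raising, so the resulting bag contains only fully populated rows.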

You can now convert this Dask Bag into a Dask DataFrame with to_dataframe:

ddf = b_flattened.to_dataframe()

# look at the first 5 rows
ddf.head()
   accomodates  bedrooms  price        country
0            4         1  317.0         Brazil
1            1         1   40.0  United States
2            2         1  701.0         Brazil
3            2         1  135.0  United States
4            4         1  119.0         Brazil
# compute the average price of each listing by country
ddf.groupby(["country"])["price"].mean().compute()
country
Australia        168.174174
Brazil           485.767033
Canada            84.860814
Hong Kong        684.622120
Portugal          66.112272
Spain             91.846442
Turkey           366.143552
United States    137.884228
China            448.300000
Name: price, dtype: float64
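Under the hood, this groupby-mean is simply an average of price per country, computed across partitions in parallel. As a local illustration only, here is the same aggregation in plain Python over the three sample records returned by b_flattened.take(3) above:

```python
from collections import defaultdict
from statistics import mean

# the three records shown by b_flattened.take(3)
records = [
    {"accomodates": 4, "bedrooms": 1, "price": 317.0, "country": "Brazil"},
    {"accomodates": 1, "bedrooms": 1, "price": 40.0, "country": "United States"},
    {"accomodates": 2, "bedrooms": 1, "price": 701.0, "country": "Brazil"},
]

# group prices by country, then average each group
prices_by_country = defaultdict(list)
for r in records:
    prices_by_country[r["country"]].append(r["price"])

averages = {country: mean(prices) for country, prices in prices_by_country.items()}
print(averages)  # {'Brazil': 509.0, 'United States': 40.0}
```

Dask performs the same grouping and averaging across the whole dataset without pulling it into local memory.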

Don’t forget you can monitor your Dask computations in real-time using the Dask dashboard; get the dashboard address at any time with client.dashboard_link.

Write data in parallel#

With dask-mongo, you can write data to your database deployment in parallel using to_mongo:

# convert dataframe back to a bag before writing
new_bag = ddf.to_bag(index=False, format='dict')

# look at the first record
new_bag.take(1)
({'accomodates': 4, 'bedrooms': 1, 'price': 317.0, 'country': 'Brazil'},)
from dask_mongo import to_mongo

# write in parallel
to_mongo(
    new_bag,
    database="new_database",
    collection="new_collection",
    connection_kwargs={"host": host_uri},
)

Once you’re done, you can shut down the cluster (it will shut down automatically after 20 minutes of inactivity):

cluster.close()
client.close()