MongoDB#
In this guide, you’ll learn how to use Coiled to create a Dask cluster in your cloud account and use the dask-mongo
connector to read and write a dataset in parallel. You can download this Jupyter notebook
to follow along in your own JupyterLab session.
MongoDB is a document-oriented NoSQL database. You can use Coiled and MongoDB together: MongoDB handles the data storage and indexing, while Coiled handles the backend infrastructure for out-of-memory computations in Python.
Loading data from MongoDB into Python typically involves using the PyMongo driver. This works well for small datasets, but can be limiting when working with larger datasets and more complex queries. The dask-mongo connector enables parallel reading and writing between MongoDB and your Dask clusters.
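For comparison, here is a minimal sketch of that single-machine workflow with PyMongo (it assumes the connection string and the sample_airbnb dataset you'll set up later in this guide); everything it fetches has to fit in local memory:

from pymongo import MongoClient

# connect and fetch a single document from the sample dataset
client = MongoClient("<YOUR CONNECTION STRING>")
collection = client["sample_airbnb"]["listingsAndReviews"]
first_record = collection.find_one()

# pulling a whole collection this way materializes every document locally
all_records = list(collection.find())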
Before you start#
You’ll first need to install the necessary packages. For the purposes of this example, we’ll do this in a new virtual environment, but you could also install them in whatever environment you’re already using for your project.
$ conda create -n mongo-example -c conda-forge python=3.9 dask-mongo coiled dask jupyterlab dask-labextension pip
$ conda activate mongo-example
(mongo-example) $ pip install 'pymongo[srv]'
You could also use pip, or any other package manager you prefer; conda isn’t required.
When you create a cluster, Coiled will automatically replicate your local mongo-example
environment in your cluster.
Setting up MongoDB Atlas#
In this section you’ll create a database deployed on a free cluster with MongoDB Atlas. You’ll then load the sample_airbnb
dataset provided by Atlas.
Follow MongoDB’s Get Started with Atlas tutorial to create an Atlas account and deploy a free cluster.
Note
For the purposes of this example, you can allow connections from any IP address so your Coiled cluster can connect to your MongoDB instance. You can configure this by selecting Network Access from the left navigation pane and selecting “allow access from anywhere” to add CIDR 0.0.0.0/0 (see the MongoDB documentation on adding IP addresses).
Follow the MongoDB guide on loading sample data to load the
sample_airbnb
dataset into your database.
Create a Coiled cluster#
In this section, you’ll use Coiled to create a Dask cluster in your cloud account. Start by creating a cluster from the mongo-example environment you activated earlier; Coiled will automatically replicate that environment on the cluster:
import coiled

cluster = coiled.Cluster(
    name="mongo-example",
    n_workers=5,
)
client = cluster.get_client()
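Optionally, you can block until all of the workers have joined before submitting work; this is a convenience from the standard Dask distributed client, not a required step:

# optionally wait until all 5 workers are available
client.wait_for_workers(5)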
Read data in parallel#
Now you’re ready to use the dask-mongo
connector to read data in parallel using your Coiled cluster! First, you’ll connect to your database deployment using the PyMongo driver (see the MongoDB guide on connecting to your Atlas cluster).
Replace the username, password, and cluster address with your own connection details:
username = "<USERNAME>"
password = "<PASSWORD>"
cluster_address = "<ADDRESS>"
host_uri = f"mongodb+srv://{username}:{password}@{cluster_address}/?retryWrites=true&w=majority"
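Optionally, you can sanity-check the connection string locally with PyMongo before using it from the cluster; the ping admin command is a lightweight way to confirm that your credentials and network access are set up correctly:

from pymongo import MongoClient

# a quick round trip to verify credentials and network access
MongoClient(host_uri).admin.command("ping")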
Now you’re ready to read a sample dataset in parallel with dask-mongo:
from dask_mongo import read_mongo

# lazily read in the sample_airbnb dataset
my_bag = read_mongo(
    connection_kwargs={"host": host_uri},
    database="sample_airbnb",
    collection="listingsAndReviews",
    chunksize=500,
)

# uncomment to look at the first record of the dataset
# my_bag.take(1)

# compute the frequencies of each property type in the dataset
my_bag.pluck("property_type").frequencies().compute()
[('House', 606),
('Apartment', 3626),
('Condominium', 399),
('Loft', 142),
('Guesthouse', 50),
('Hostel', 34),
('Serviced apartment', 185),
('Bed and breakfast', 69),
('Treehouse', 1),
('Bungalow', 14),
('Guest suite', 81),
('Townhouse', 108),
('Villa', 32),
('Cabin', 15),
('Other', 18),
('Chalet', 2),
('Farm stay', 9),
('Boutique hotel', 53),
('Boat', 2),
('Cottage', 20),
('Earth house', 1),
('Aparthotel', 23),
('Resort', 11),
('Tiny house', 7),
('Nature lodge', 2),
('Hotel', 26),
('Casa particular (Cuba)', 9),
('Barn', 1),
('Hut', 1),
('Camper/RV', 2),
('Heritage hotel (India)', 1),
('Pension (South Korea)', 1),
('Campsite', 1),
('Houseboat', 1),
('Castle', 1),
('Train', 1)]
After a few seconds, you should see the frequency of each property type in the dataset. Since MongoDB stores data in the form of JSON documents, you used the Dask Bag API to lazily read in the sample_airbnb
dataset (see the Dask documentation on Dask Bag). As usual, operations in Dask are lazy until computed, so nothing is read from MongoDB until you call compute (or Bag.take, which you can uncomment above to peek at the first record).
Work with Dask#
After you’ve loaded your dataset onto your Coiled cluster, you can perform typical Dask operations. Since this is a rich, unstructured dataset, let’s extract some useful fields and get them into a structured Dask DataFrame. First, flatten the records so you can use common pandas operations:
def process(record):
    try:
        yield {
            "accomodates": record["accommodates"],
            "bedrooms": record["bedrooms"],
            # convert the price to a plain float
            "price": float(str(record["price"])),
            "country": record["address"]["country"],
        }
    except KeyError:
        # skip records that are missing any of these fields
        pass

# filter only apartments
b_flattened = (
    my_bag.filter(lambda record: record["property_type"] == "Apartment")
    .map(process)
    .flatten()
)
b_flattened.take(3)
({'accomodates': 4, 'bedrooms': 1, 'price': 317.0, 'country': 'Brazil'},
{'accomodates': 1, 'bedrooms': 1, 'price': 40.0, 'country': 'United States'},
{'accomodates': 2, 'bedrooms': 1, 'price': 701.0, 'country': 'Brazil'})
You can now convert this Dask Bag into a Dask DataFrame with to_dataframe
:
ddf = b_flattened.to_dataframe()
# look at the first 5 rows
ddf.head()
|   | accomodates | bedrooms | price | country |
|---|---|---|---|---|
| 0 | 4 | 1 | 317.0 | Brazil |
| 1 | 1 | 1 | 40.0 | United States |
| 2 | 2 | 1 | 701.0 | Brazil |
| 3 | 2 | 1 | 135.0 | United States |
| 4 | 4 | 1 | 119.0 | Brazil |
# compute the average price of each listing by country
ddf.groupby(["country"])["price"].mean().compute()
country
Australia 168.174174
Brazil 485.767033
Canada 84.860814
Hong Kong 684.622120
Portugal 66.112272
Spain 91.846442
Turkey 366.143552
United States 137.884228
China 448.300000
Name: price, dtype: float64
Don’t forget you can monitor your Dask computations in real-time using the Dask dashboard; get the dashboard address at any time with client.dashboard_link
.
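For example, you can print the link from your session at any point:

# print a link to the Dask dashboard for this cluster
print(client.dashboard_link)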
Write data in parallel#
With dask_mongo
, you can write data to your database deployment in parallel using to_mongo
:
# convert dataframe back to a bag before writing
new_bag = ddf.to_bag(index=False, format='dict')
# look at the first record
new_bag.take(1)
({'accomodates': 4, 'bedrooms': 1, 'price': 317.0, 'country': 'Brazil'},)
from dask_mongo import to_mongo

# write in parallel
to_mongo(
    new_bag,
    database="new_database",
    collection="new_collection",
    connection_kwargs={"host": host_uri},
)
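If you want to confirm the write, one option is to count the documents in the new collection with PyMongo (new_database and new_collection match the names passed to to_mongo above):

from pymongo import MongoClient

# count the documents that were just written
MongoClient(host_uri)["new_database"]["new_collection"].count_documents({})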
Once you’re done, you can shut down the cluster (it will shut down automatically after 20 minutes of inactivity):
cluster.shutdown()