{ "cells": [ { "cell_type": "markdown", "id": "91b3d13c", "metadata": {}, "source": [ "# Parallel read and write with MongoDB\n", "\n", "In this guide, you'll learn how to use Coiled to create a Dask cluster in your cloud account and use the `dask-mongo` connector to read and write a dataset in parallel. You can download {download}`this jupyter notebook ` to follow along in your own JupyterLab session.\n", "\n", "[MongoDB](https://www.mongodb.com/) is a document-oriented NoSQL database. You can use Coiled and MongoDB together-- MongoDB handles the data storage and indexing while Coiled handles the backend infrastructure for dealing with out-of-memory computations in Python.\n", "\n", "Loading data from MongoDB into Python typically involves using the [PyMongo driver](https://github.com/mongodb/mongo-python-driver). This works well for small datasets, but can be limiting when working with larger datasets and more complex queries. The [dask-mongo](https://github.com/coiled/dask-mongo) connector enables parallel reading and writing between MongoDB your Dask clusters.\n", "\n", "## Before you start\n", "\n", "You'll first need to create consistent local and remote software environments with `dask`, `coiled`, and the necessary dependencies installed. If you are unfamiliar with creating software environments, you can first follow the [tutorial on setting up a custom software environment](https://docs.coiled.io/user_guide/tutorials/matching_coiled_senvs.html).\n", "\n", "You can install `dask-mongo`, `pymongo`, and [coiled-runtime](https://docs.coiled.io/user_guide/software_environment.html#coiled-runtime), a Dask meta-package. Save the following file as `environment.yml`. You can find the most up-to-date version of coiled-runtime from the [latest tag](https://github.com/coiled/coiled-runtime/tags) in the public coiled-runtime repository." ] }, { "cell_type": "markdown", "id": "e4ba6112", "metadata": {}, "source": [ "```\n", "channels:\n", " - conda-forge\n", "dependencies:\n", " - python=3.9\n", " - coiled-runtime=\n", " - dask-mongo=\n", " - pip\n", " - pip:\n", " - pymongo[srv]=\n", " ```" ] }, { "cell_type": "markdown", "id": "c5ae1415", "metadata": {}, "source": [ "Next, create a local software environment using the `environment.yml` file:" ] }, { "cell_type": "markdown", "id": "867ffdf5", "metadata": {}, "source": [ "```\n", "conda env create -f environment.yml -n mongo-example\n", "conda activate mongo-example\n", "```" ] }, { "cell_type": "markdown", "id": "0638cdd1", "metadata": {}, "source": [ "Lastly, create a remote software environment using the same `environment.yml` file:" ] }, { "cell_type": "markdown", "id": "2b4dacc6", "metadata": {}, "source": [ "```\n", "coiled env create -n mongo-example --conda environment.yml\n", "```" ] }, { "cell_type": "markdown", "id": "a941b588", "metadata": {}, "source": [ "## Setting up MongoDB Atlas\n", "\n", "In this section you'll create a database deployed on a free cluster with MongoDB Atlas. You'll then load the `sample_airbnb` dataset provided by Atlas.\n", "\n", "1. Follow MongoDB's [Get Started with Atlas](https://docs.atlas.mongodb.com/getting-started/) tutorial to create an Atlas account and deploy a free cluster.\n", "\n", ":::{note}\n", "For the purposes of this example, you can allow connections from any IP address so your Coiled cluster can connect to your MongoDB instance. You can configure this by selecting Network Access from the left navigation pane and selecting \"allow access from anywhere\" to add CIDR 0.0.0.0/0 (see the MongoDB documentation on [adding IP addresses](https://www.mongodb.com/docs/atlas/security/ip-access-list/#add-ip-access-list-entries)).\n", ":::\n", "\n", "2. Follow the MongoDB guide on [loading sample data](https://docs.atlas.mongodb.com/sample-data/) to load the `sample_airbnb` dataset into your database.\n", "\n", "## Create a Coiled cluster\n", "\n", "In this section, you'll use Coiled to create a Dask cluster in your cloud account. You can start by creating a cluster, using the `software` argument to use the Python environment you created previously:" ] }, { "cell_type": "code", "execution_count": null, "id": "3307da0a", "metadata": { "tags": [] }, "outputs": [], "source": [ "import coiled\n", " \n", "cluster = coiled.Cluster(\n", " name=\"mongo-example\",\n", " n_workers=5,\n", " software=\"mongo-example\",\n", ")" ] }, { "cell_type": "markdown", "id": "f30b7ef0", "metadata": {}, "source": [ "and then connecting Dask to your remote Coiled cluster:" ] }, { "cell_type": "code", "execution_count": 2, "id": "d76c4dc1", "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client\n", "\n", "client = Client(cluster)" ] }, { "cell_type": "markdown", "id": "efa163d3", "metadata": {}, "source": [ "## Read data in parallel\n", "\n", "Now you're ready to use the `dask-mongo` connector to read data in parallel using your Coiled cluster! First, you'll connect to your database deployment using the PyMongo driver (see the MongoDB guide on [connecting to your Atlas cluster](https://www.mongodb.com/docs/atlas/tutorial/connect-to-your-cluster/#connect-to-your-atlas-cluster)).\n", "\n", "Replace the username, password, and cluster address with your own connection details:" ] }, { "cell_type": "code", "execution_count": 3, "id": "5f0efe37", "metadata": {}, "outputs": [], "source": [ "username = \"\"\n", "password = \"\"\n", "cluster_address = \"
\"\n", "host_uri = f\"mongodb+srv://{username}:{password}@{cluster_address}/?retryWrites=true&w=majority\"" ] }, { "cell_type": "markdown", "id": "c6081dd1", "metadata": {}, "source": [ "Now you're reading to read a sample dataset in parallel with `dask-mongo`:" ] }, { "cell_type": "code", "execution_count": 4, "id": "cedd98b5", "metadata": {}, "outputs": [], "source": [ "from dask_mongo import read_mongo\n", "\n", "# lazily read in the sample_airbnb dataset\n", "my_bag = read_mongo(\n", " connection_kwargs={\"host\": host_uri},\n", " database=\"sample_airbnb\",\n", " collection=\"listingsAndReviews\",\n", " chunksize=500,\n", ")" ] }, { "cell_type": "code", "execution_count": 6, "id": "e9c998cc", "metadata": {}, "outputs": [], "source": [ "# uncomment to look at the first record of the dataset\n", "# my_bag.take(1)" ] }, { "cell_type": "code", "execution_count": 7, "id": "1a2189fe", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('House', 606),\n", " ('Apartment', 3626),\n", " ('Condominium', 399),\n", " ('Loft', 142),\n", " ('Guesthouse', 50),\n", " ('Hostel', 34),\n", " ('Serviced apartment', 185),\n", " ('Bed and breakfast', 69),\n", " ('Treehouse', 1),\n", " ('Bungalow', 14),\n", " ('Guest suite', 81),\n", " ('Townhouse', 108),\n", " ('Villa', 32),\n", " ('Cabin', 15),\n", " ('Other', 18),\n", " ('Chalet', 2),\n", " ('Farm stay', 9),\n", " ('Boutique hotel', 53),\n", " ('Boat', 2),\n", " ('Cottage', 20),\n", " ('Earth house', 1),\n", " ('Aparthotel', 23),\n", " ('Resort', 11),\n", " ('Tiny house', 7),\n", " ('Nature lodge', 2),\n", " ('Hotel', 26),\n", " ('Casa particular (Cuba)', 9),\n", " ('Barn', 1),\n", " ('Hut', 1),\n", " ('Camper/RV', 2),\n", " ('Heritage hotel (India)', 1),\n", " ('Pension (South Korea)', 1),\n", " ('Campsite', 1),\n", " ('Houseboat', 1),\n", " ('Castle', 1),\n", " ('Train', 1)]" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# compute the frequencies of each property type in the dataset\n", "my_bag.pluck(\"property_type\").frequencies().compute()" ] }, { "cell_type": "markdown", "id": "4e693d23", "metadata": {}, "source": [ "After a few seconds, you should see the first record from the dataset. Since MongoDB stores data in the form of JSON documents, you used the Dask Bag API to lazily read in the `sample_airbnb` dataset (see the [Dask documentation on Dask Bag](https://docs.dask.org/en/stable/bag.html)). As usual, operations in Dask are lazy until computed, so you used `Bag.take` to compute the first record. \n", "\n", "## Work with Dask\n", "\n", "After you've loaded your dataset on to your Coiled cluster, you can perform typical Dask operations. Since this is a rich, unstructured dataset, let's filter some useful information and get it into a structured Dask DataFrame. You will first flatten down this data to be able to use common pandas operations:" ] }, { "cell_type": "code", "execution_count": 8, "id": "48f834b1", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "({'accomodates': 4, 'bedrooms': 1, 'price': 317.0, 'country': 'Brazil'},\n", " {'accomodates': 1, 'bedrooms': 1, 'price': 40.0, 'country': 'United States'},\n", " {'accomodates': 2, 'bedrooms': 1, 'price': 701.0, 'country': 'Brazil'})" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def process(record):\n", " try:\n", " yield {\n", " \"accomodates\": record[\"accommodates\"],\n", " \"bedrooms\": record[\"bedrooms\"],\n", " \"price\": float(str(record[\"price\"])),\n", " \"country\": record[\"address\"][\"country\"],\n", " }\n", " except KeyError:\n", " pass\n", "\n", "# Filter only apartments \n", "b_flattened = (\n", " my_bag.filter(lambda record: record[\"property_type\"] == \"Apartment\")\n", " .map(process)\n", " .flatten()\n", ")\n", "\n", "b_flattened.take(3)" ] }, { "cell_type": "markdown", "id": "b5380e0d", "metadata": {}, "source": [ "You can now convert this Dask Bag into a Dask DataFrame with `to_dataframe`:" ] }, { "cell_type": "code", "execution_count": 9, "id": "c6faf80c", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
accomodatesbedroomspricecountry
041317.0Brazil
11140.0United States
221701.0Brazil
321135.0United States
441119.0Brazil
\n", "
" ], "text/plain": [ " accomodates bedrooms price country\n", "0 4 1 317.0 Brazil\n", "1 1 1 40.0 United States\n", "2 2 1 701.0 Brazil\n", "3 2 1 135.0 United States\n", "4 4 1 119.0 Brazil" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf = b_flattened.to_dataframe()\n", "\n", "# look at the first 5 rows\n", "ddf.head()" ] }, { "cell_type": "code", "execution_count": 10, "id": "ef998795", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "country\n", "Australia 168.174174\n", "Brazil 485.767033\n", "Canada 84.860814\n", "Hong Kong 684.622120\n", "Portugal 66.112272\n", "Spain 91.846442\n", "Turkey 366.143552\n", "United States 137.884228\n", "China 448.300000\n", "Name: price, dtype: float64" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# compute the average price of each listing by country\n", "ddf.groupby([\"country\"])[\"price\"].mean().compute()" ] }, { "cell_type": "markdown", "id": "97808eb9", "metadata": {}, "source": [ "Don't forget you can monitor your Dask computations in real-time using the [Dask dashboard](https://docs.dask.org/en/stable/dashboard.html); get the dashboard address at any time with `client.dashboard_link`." ] }, { "cell_type": "markdown", "id": "f2acf16d", "metadata": {}, "source": [ "## Write data in parallel\n", "\n", "With `dask_mongo`, you can write data to your database deployment in parallel using `to_mongo`:" ] }, { "cell_type": "code", "execution_count": 11, "id": "d86bd5fb", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "({'accomodates': 4, 'bedrooms': 1, 'price': 317.0, 'country': 'Brazil'},)" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# convert dataframe back to a bag before writing\n", "new_bag = ddf.to_bag(index=False, format='dict')\n", "\n", "# look at the first record\n", "new_bag.take(1)" ] }, { "cell_type": "code", "execution_count": 12, "id": "3f83c746", "metadata": {}, "outputs": [], "source": [ "from dask_mongo import to_mongo\n", "\n", "# write in parallel\n", "to_mongo(\n", " new_bag,\n", " database=\"new_database\",\n", " collection=\"new_collection\",\n", " connection_kwargs={\"host\": host_uri},\n", ")" ] }, { "cell_type": "markdown", "id": "b17004a6", "metadata": {}, "source": [ "Once you're done, you can shutdown the cluster (it will shutdown automatically after 20 minutes of inactivity):" ] }, { "cell_type": "code", "execution_count": 13, "id": "bd50be86", "metadata": {}, "outputs": [], "source": [ "cluster.close()\n", "client.close()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.13" } }, "nbformat": 4, "nbformat_minor": 5 }