https://pandas.pydata.org/static/img/pandas.svg https://docs.dask.org/en/stable/_images/dask_horizontal.svg

Cloud Parquet ETL with Dask DataFrame

This notebook looks at every Uber and Lyft ride in New York City over the last several years. Because the data is large, we operate on a cluster in the cloud.

Install Dask and Coiled

This example requires the packages dask s3fs matplotlib coiled. We do this in a new virtual environment below, but you could also install them in whatever environment you’re already using.

conda create -n coiled-nyc -c conda-forge python=3.11 coiled dask s3fs matplotlib
conda activate coiled-nyc

You also could use pip for everything, or any other package manager you prefer. conda isn’t required.

When you later create a Coiled cluster, your local coiled-nyc environment will be automatically replicated on your cluster.

Create Dask cluster in the cloud

This dataset is ~100 GB in size, and so is too large for a typical laptop. We use Coiled to create a Dask cluster that’s big enough to handle our data comfortably.

import coiled

cluster = coiled.Cluster(
    n_workers=30,
    region="us-east-2",  # Start workers in same region as data to minimize costs
)

client = cluster.get_client()

This is the only Coiled-specific code. The remainder of this example is normal Dask code that would look the same for any Dask deployment.

Load and prepare data

Let’s look at our data using Dask DataFrame’s read_parquet function to load our dataset from S3.

import dask.dataframe as dd

df = dd.read_parquet("s3://coiled-data/uber/")
df.head()
hvfhs_license_num dispatching_base_num originating_base_num request_datetime on_scene_datetime pickup_datetime dropoff_datetime PULocationID DOLocationID trip_miles ... sales_tax congestion_surcharge airport_fee tips driver_pay shared_request_flag shared_match_flag access_a_ride_flag wav_request_flag wav_match_flag
0 HV0003 B02867 B02867 2019-02-01 00:01:26 2019-02-01 00:02:55 2019-02-01 00:05:18 2019-02-01 00:14:57 245 251 2.45 ... 0.83 0.0 NaN 0.0 7.480000 Y N N N NaN
1 HV0003 B02879 B02879 2019-02-01 00:26:08 2019-02-01 00:41:29 2019-02-01 00:41:29 2019-02-01 00:49:39 216 197 1.71 ... 0.70 0.0 NaN 2.0 7.930000 N N N N NaN
2 HV0005 B02510 <NA> 2019-02-01 00:48:58 NaT 2019-02-01 00:51:34 2019-02-01 01:28:29 261 234 5.01 ... 3.99 0.0 NaN 0.0 35.970001 N Y N N NaN
3 HV0005 B02510 <NA> 2019-02-01 00:02:15 NaT 2019-02-01 00:03:51 2019-02-01 00:07:16 87 87 0.34 ... 0.64 0.0 NaN 3.0 5.390000 N Y N N NaN
4 HV0005 B02510 <NA> 2019-02-01 00:06:17 NaT 2019-02-01 00:09:44 2019-02-01 00:39:56 87 198 6.84 ... 2.16 0.0 NaN 4.0 17.070000 N Y N N NaN

5 rows × 24 columns

In preparation for exploring this dataset, let’s apply a few of feature engineering steps. Specifically:

  • Create a new tipped column that indicates if a ride included a tip or not.

  • Create a new tip_frac column for the tip amount relative to the overall cost of the ride.

  • Replace the hvfhs_license_num column with a more familiar service column with names like "uber" instead of codes like "HV0003".

df["tipped"] = df["tips"] != 0
df["tip_frac"] = df["tips"] / (df["base_passenger_fare"] + df["tolls"] + df["bcf"] + df["sales_tax"] + df["congestion_surcharge"].fillna(0) + df["airport_fee"].fillna(0))
df["service"] = df["hvfhs_license_num"].map({
    "HV0003": "uber",
    "HV0005": "lyft",
    "HV0002": "juno",
    "HV0004": "via",
})
df = df.drop(columns="hvfhs_license_num")
df.head()
dispatching_base_num originating_base_num request_datetime on_scene_datetime pickup_datetime dropoff_datetime PULocationID DOLocationID trip_miles trip_time ... tips driver_pay shared_request_flag shared_match_flag access_a_ride_flag wav_request_flag wav_match_flag tipped tip_frac service
0 B02867 B02867 2019-02-01 00:01:26 2019-02-01 00:02:55 2019-02-01 00:05:18 2019-02-01 00:14:57 245 251 2.45 579 ... 0.0 7.480000 Y N N N NaN False 0.000000 uber
1 B02879 B02879 2019-02-01 00:26:08 2019-02-01 00:41:29 2019-02-01 00:41:29 2019-02-01 00:49:39 216 197 1.71 490 ... 2.0 7.930000 N N N N NaN True 0.227015 uber
2 B02510 <NA> 2019-02-01 00:48:58 NaT 2019-02-01 00:51:34 2019-02-01 01:28:29 261 234 5.01 2159 ... 0.0 35.970001 N Y N N NaN False 0.000000 lyft
3 B02510 <NA> 2019-02-01 00:02:15 NaT 2019-02-01 00:03:51 2019-02-01 00:07:16 87 87 0.34 179 ... 3.0 5.390000 N Y N N NaN True 0.374532 lyft
4 B02510 <NA> 2019-02-01 00:06:17 NaT 2019-02-01 00:09:44 2019-02-01 00:39:56 87 198 6.84 1799 ... 4.0 17.070000 N Y N N NaN True 0.147438 lyft

5 rows × 26 columns

We’re now ready to explore this dataset in more detail.

Explore

Let’s analyze our dataset to answer a few questions about tipping practices and driver pay.

As a first step, let’s load the dataset into our cluster’s distributed memory using df.persist(). This loads and caches the dataset on the cluster, allowing us to avoid repeated expensive data loading steps when doing multiple computations on the same dataset.

df = df.persist()

Tipping frequency

How often do New Yorkers tip?

df["tipped"].mean().compute()
0.15895858700806212

About 16% of the time.

Broken down by carrier

Do ride-share services get tipped at different rates?

df.groupby("service").tipped.mean().compute()
service
juno    0.084400
lyft    0.191230
uber    0.149856
via     0.092949
Name: tipped, dtype: float64

Lyft riders tip more often at a frequency of 19% than Uber riders at 15%.

Tipping amount

Do riders of different services tip different amounts?

df = df.set_index("pickup_datetime").persist()
import matplotlib.pyplot as plt

ax = plt.subplot()
df2 = df.loc[df["tipped"]]
df2.loc[df2["service"] == "uber", "tip_frac"].resample("1w").median().compute().plot(ax=ax)
df2.loc[df2["service"] == "lyft", "tip_frac"].resample("1w").median().compute().plot(ax=ax)
plt.legend(["Uber", "Lyft"])
plt.title("How much do riders tip?")
plt.xlabel("Time")
plt.ylabel("Tip [%]");
../_images/fd639592807b5acf8d6b64b5fa08ff741f51b95ae9e13b95f837b1a2721c50c4.png

We can see Lyft rider tip relatively consistently at ~18%, while Uber rider’s tip amount is more volatile and generally less than Lyft riders.

Driver Pay

Let’s also take a look at how much drivers make relative to how much passengers pay.

df.base_passenger_fare.sum().compute()
15818734000.0
df.driver_pay.sum().compute()
12794474000.0

Riders spent about $16 Billion over the range of the dataset, and Drivers took home almost all of that (about $13 Billion). Let’s see how that tracks over time.

import matplotlib.pyplot as plt

ax = plt.subplot()
df.base_passenger_fare.resample("1w").sum().compute().plot(ax=ax)
df.driver_pay.resample("1w").sum().compute().plot(ax=ax)
plt.legend(["Base Passener Fare", "Driver Pay"])
plt.xlabel("Time")
plt.ylabel("Weekly Revenue ($)");
../_images/28817e0ed87eface34da0714dc6aa53ff8d61ae7750a24b23dc270af6e1495ec.png

We see some big events, like the massive drop in ridership during COVID-19 and a smaller drop when the Omicron variant was first detected, along with an eventual recovery to greater-than-pre-COVID levels.

We can also see how, ever since the financial outlook of Uber/Lyft style companies has become a bit less friendly, they’ve started to ask for a larger share of revenue from their rides.

(df.driver_pay.resample("1w").sum() / df.base_passenger_fare.resample("1w").sum()).compute().plot()

plt.title("How much of a fare do drivers take home?")
plt.xlabel("Time")
plt.ylabel("Percentage (%)");
../_images/01c7813a47b8b80b9f99d052f3af7d4215456bcd2904770942749ff662287c43.png

What’s next?

The Uber/Lyft Rides dataset is complex and rich. What other questions do you want to answer?

Conclusion

Dask DataFrame makes it easy to analyze a larger-than-memory cloud dataset using the familiar pandas API. Coiled makes it easy to scale up computing resources by creating large Dask clusters on the cloud with just a few lines of code.

If you have a cloud account it’s easy to try this out yourself. The data is public and this computation costs less than $0.10 in AWS cloud costs. You can sign up under the Coiled free tier here (setup takes a couple minutes).