Cloud Parquet ETL with Dask DataFrame#
This notebook looks at every Uber and Lyft ride in New York City over the last several years. Because the data is large, we operate on a cluster in the cloud.
Install Dask and Coiled#
This example requires the `dask`, `s3fs`, `matplotlib`, and `coiled` packages. We install them in a new virtual environment below, but you could also install them in whatever environment you're already using.
conda create -n coiled-nyc -c conda-forge python=3.11 coiled dask s3fs matplotlib
conda activate coiled-nyc
You could also use `pip` for everything, or any other package manager you prefer; `conda` isn't required.
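For example, a `pip`-based setup might look like the following sketch (same packages; `dask[complete]` is the pip extra that pulls in the distributed scheduler and DataFrame dependencies):

python -m venv coiled-nyc
source coiled-nyc/bin/activate
pip install coiled "dask[complete]" s3fs matplotlib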
When you later create a Coiled cluster, your local `coiled-nyc` environment will be automatically replicated on your cluster.
Create Dask cluster in the cloud#
This dataset is ~100 GB in size, and so is too large for a typical laptop. We use Coiled to create a Dask cluster that’s big enough to handle our data comfortably.
import coiled

cluster = coiled.Cluster(
    n_workers=30,
    region="us-east-2",  # Start workers in same region as data to minimize costs
)
client = cluster.get_client()
This is the only Coiled-specific code. The remainder of this example is normal Dask code that would look the same for any Dask deployment.
Load and prepare data#
Let's load our dataset from S3 using Dask DataFrame's `read_parquet` function.
import dask.dataframe as dd
df = dd.read_parquet("s3://coiled-data/uber/")
df.head()
| | hvfhs_license_num | dispatching_base_num | originating_base_num | request_datetime | on_scene_datetime | pickup_datetime | dropoff_datetime | PULocationID | DOLocationID | trip_miles | ... | sales_tax | congestion_surcharge | airport_fee | tips | driver_pay | shared_request_flag | shared_match_flag | access_a_ride_flag | wav_request_flag | wav_match_flag |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | HV0003 | B02867 | B02867 | 2019-02-01 00:01:26 | 2019-02-01 00:02:55 | 2019-02-01 00:05:18 | 2019-02-01 00:14:57 | 245 | 251 | 2.45 | ... | 0.83 | 0.0 | NaN | 0.0 | 7.480000 | Y | N | N | N | NaN |
| 1 | HV0003 | B02879 | B02879 | 2019-02-01 00:26:08 | 2019-02-01 00:41:29 | 2019-02-01 00:41:29 | 2019-02-01 00:49:39 | 216 | 197 | 1.71 | ... | 0.70 | 0.0 | NaN | 2.0 | 7.930000 | N | N | N | N | NaN |
| 2 | HV0005 | B02510 | <NA> | 2019-02-01 00:48:58 | NaT | 2019-02-01 00:51:34 | 2019-02-01 01:28:29 | 261 | 234 | 5.01 | ... | 3.99 | 0.0 | NaN | 0.0 | 35.970001 | N | Y | N | N | NaN |
| 3 | HV0005 | B02510 | <NA> | 2019-02-01 00:02:15 | NaT | 2019-02-01 00:03:51 | 2019-02-01 00:07:16 | 87 | 87 | 0.34 | ... | 0.64 | 0.0 | NaN | 3.0 | 5.390000 | N | Y | N | N | NaN |
| 4 | HV0005 | B02510 | <NA> | 2019-02-01 00:06:17 | NaT | 2019-02-01 00:09:44 | 2019-02-01 00:39:56 | 87 | 198 | 6.84 | ... | 2.16 | 0.0 | NaN | 4.0 | 17.070000 | N | Y | N | N | NaN |

5 rows × 24 columns
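Note that `read_parquet` is lazy: at this point Dask has only read the Parquet metadata, not the data itself. If you're curious about the structure, standard Dask DataFrame attributes are cheap to inspect:

df.npartitions  # number of partitions backing the DataFrame
df.dtypes       # column dtypes inferred from the Parquet metadata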
In preparation for exploring this dataset, let's apply a few feature engineering steps. Specifically:
- Create a new `tipped` column that indicates if a ride included a tip or not.
- Create a new `tip_frac` column for the tip amount relative to the overall cost of the ride.
- Replace the `hvfhs_license_num` column with a more familiar `service` column with names like `"uber"` instead of codes like `"HV0003"`.
df["tipped"] = df["tips"] != 0
df["tip_frac"] = df["tips"] / (df["base_passenger_fare"] + df["tolls"] + df["bcf"] + df["sales_tax"] + df["congestion_surcharge"].fillna(0) + df["airport_fee"].fillna(0))
df["service"] = df["hvfhs_license_num"].map({
"HV0003": "uber",
"HV0005": "lyft",
"HV0002": "juno",
"HV0004": "via",
})
df = df.drop(columns="hvfhs_license_num")
df.head()
| | dispatching_base_num | originating_base_num | request_datetime | on_scene_datetime | pickup_datetime | dropoff_datetime | PULocationID | DOLocationID | trip_miles | trip_time | ... | tips | driver_pay | shared_request_flag | shared_match_flag | access_a_ride_flag | wav_request_flag | wav_match_flag | tipped | tip_frac | service |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | B02867 | B02867 | 2019-02-01 00:01:26 | 2019-02-01 00:02:55 | 2019-02-01 00:05:18 | 2019-02-01 00:14:57 | 245 | 251 | 2.45 | 579 | ... | 0.0 | 7.480000 | Y | N | N | N | NaN | False | 0.000000 | uber |
| 1 | B02879 | B02879 | 2019-02-01 00:26:08 | 2019-02-01 00:41:29 | 2019-02-01 00:41:29 | 2019-02-01 00:49:39 | 216 | 197 | 1.71 | 490 | ... | 2.0 | 7.930000 | N | N | N | N | NaN | True | 0.227015 | uber |
| 2 | B02510 | <NA> | 2019-02-01 00:48:58 | NaT | 2019-02-01 00:51:34 | 2019-02-01 01:28:29 | 261 | 234 | 5.01 | 2159 | ... | 0.0 | 35.970001 | N | Y | N | N | NaN | False | 0.000000 | lyft |
| 3 | B02510 | <NA> | 2019-02-01 00:02:15 | NaT | 2019-02-01 00:03:51 | 2019-02-01 00:07:16 | 87 | 87 | 0.34 | 179 | ... | 3.0 | 5.390000 | N | Y | N | N | NaN | True | 0.374532 | lyft |
| 4 | B02510 | <NA> | 2019-02-01 00:06:17 | NaT | 2019-02-01 00:09:44 | 2019-02-01 00:39:56 | 87 | 198 | 6.84 | 1799 | ... | 4.0 | 17.070000 | N | Y | N | N | NaN | True | 0.147438 | lyft |

5 rows × 26 columns
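Since this is an ETL workflow, this is also the natural point to write the transformed data back out as Parquet. Here's a minimal sketch, assuming you have an S3 bucket you can write to (the path below is a placeholder):

# Hypothetical destination bucket -- replace with one you control
df.to_parquet("s3://your-bucket/uber-lyft-prepared/")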
We’re now ready to explore this dataset in more detail.
Explore#
Let’s analyze our dataset to answer a few questions about tipping practices and driver pay.
As a first step, let's load the dataset into our cluster's distributed memory using `df.persist()`. This loads and caches the dataset on the cluster, allowing us to avoid repeated expensive data loading steps when doing multiple computations on the same dataset.
df = df.persist()
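`persist` returns immediately while data loads in the background. If you want to block until everything is in memory, or see how much distributed memory the dataset occupies, something like this works (a quick sketch using standard Dask APIs):

from dask.distributed import wait

wait(df)  # block until all partitions are loaded into cluster memory
df.memory_usage(deep=True).sum().compute()  # total bytes held on the cluster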
Tipping frequency#
How often do New Yorkers tip?
df["tipped"].mean().compute()
0.15895858700806212
About 16% of the time.
Broken down by carrier#
Do ride-share services get tipped at different rates?
df.groupby("service").tipped.mean().compute()
service
juno 0.084400
lyft 0.191230
uber 0.149856
via 0.092949
Name: tipped, dtype: float64
Lyft riders tip more often (about 19% of rides) than Uber riders (about 15%).
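Keep sample sizes in mind, though: Juno and Via were much smaller services, so their rates are computed from far fewer rides. A quick way to check, using Dask's pandas-style `agg` (sketch):

df.groupby("service").tipped.agg(["mean", "count"]).compute()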
Tipping amount#
Do riders of different services tip different amounts?
# Resampling by week requires a DatetimeIndex, so index by pickup time
df = df.set_index("pickup_datetime").persist()
import matplotlib.pyplot as plt
ax = plt.subplot()
df2 = df.loc[df["tipped"]]
df2.loc[df2["service"] == "uber", "tip_frac"].resample("1w").median().compute().plot(ax=ax)
df2.loc[df2["service"] == "lyft", "tip_frac"].resample("1w").median().compute().plot(ax=ax)
plt.legend(["Uber", "Lyft"])
plt.title("How much do riders tip?")
plt.xlabel("Time")
plt.ylabel("Tip [%]");
We can see that Lyft riders tip relatively consistently at ~18%, while Uber riders' tip amounts are more volatile and generally lower than Lyft riders'.
Driver Pay#
Let’s also take a look at how much drivers make relative to how much passengers pay.
df.base_passenger_fare.sum().compute()
15818734000.0
df.driver_pay.sum().compute()
12794474000.0
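Dividing these totals gives the overall share of base fares that drivers take home (a quick sketch; note it compares against the base fare only, not tolls or fees):

(df.driver_pay.sum() / df.base_passenger_fare.sum()).compute()  # roughly 0.8 given the totals above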
Riders spent about $16 billion over the range of the dataset, and drivers took home most of that (about $13 billion). Let's see how that tracks over time.
import matplotlib.pyplot as plt
ax = plt.subplot()
df.base_passenger_fare.resample("1w").sum().compute().plot(ax=ax)
df.driver_pay.resample("1w").sum().compute().plot(ax=ax)
plt.legend(["Base Passener Fare", "Driver Pay"])
plt.xlabel("Time")
plt.ylabel("Weekly Revenue ($)");
We see some big events, like the massive drop in ridership during COVID-19 and a smaller drop when the Omicron variant was first detected, along with an eventual recovery to greater-than-pre-COVID levels.
We can also see that, as the financial outlook for Uber/Lyft-style companies has become a bit less friendly, they've started to take a larger share of revenue from their rides.
(df.driver_pay.resample("1w").sum() / df.base_passenger_fare.resample("1w").sum() * 100).compute().plot()
plt.title("How much of a fare do drivers take home?")
plt.xlabel("Time")
plt.ylabel("Percentage (%)");
What’s next?#
The Uber/Lyft Rides dataset is complex and rich. What other questions do you want to answer?
Conclusion#
Dask DataFrame makes it easy to analyze a larger-than-memory cloud dataset using the familiar `pandas` API. Coiled makes it easy to scale up computing resources by creating large Dask clusters in the cloud with just a few lines of code.
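When you're finished, you can shut the cluster down explicitly (by default, Coiled clusters also shut themselves down after an idle timeout):

cluster.shutdown()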
If you have a cloud account, it's easy to try this out yourself. The data is public, and this computation costs less than $0.10 in AWS cloud costs. You can sign up for the Coiled free tier here (setup takes a couple of minutes).