Scaling and Optimizing Data Pipelines with Polars
Liam Brannigan
Data Scientist & Polars Contributor




# Imported here so this snippet is self-contained; otherwise `pl` is only
# imported much further down, after it has already been used.
import polars as pl

# scan_parquet (rather than read_parquet) builds a lazy query over the file;
# the work runs when .collect() is called below.
requests = pl.scan_parquet("311_Service_Requests.parquet")

# Count completed requests per department, largest first.
# group_by(...).len() names its count column "len", which is the column the
# sort refers to (see the "len" header in the result table).
query = (
    requests
    .filter(pl.col("STATUS") == "Completed")
    .group_by("DEPARTMENT")
    .len()
    .sort("len", descending=True)
)
query.collect()
shape: (5, 2)
| DEPARTMENT | len |
| --- | --- |
| str | u32 |
|-------------------|---------|
| 311 City Services | 4859161 |
| Sanitation | 3406631 |
| Aviation | 2337842 |
| Transportation | 1468240 |
| Water | 416535 |
# Run the same lazy query with the streaming engine. The result table matches
# the default-engine output shown above.
query.collect(engine="streaming")
shape: (5, 2)
| DEPARTMENT | len |
| --- | --- |
| str | u32 |
|-------------------|---------|
| 311 City Services | 4859161 |
| Sanitation | 3406631 |
| Aviation | 2337842 |
| Transportation | 1468240 |
| Water | 416535 |

import polars as pl
# Set a global engine preference. Presumably this is used for collect() calls
# that do not name an engine explicitly; confirm against the Polars Config
# docs for the installed version.
pl.Config.set_engine_affinity("streaming")

# This call names its engine explicitly, so it requests the GPU engine.
# NOTE(review): the GPU engine likely needs an extra GPU-enabled install;
# confirm the environment before relying on it.
query.collect(engine="gpu")

# Keep only rows whose STATUS is "Completed", projected to four columns.
# This still builds on the lazy `requests` query, so nothing runs yet.
completed_requests = requests.filter(
    pl.col("STATUS").eq("Completed")
).select(["SR_NUMBER", "TYPE", "DEPARTMENT", "CREATED_DATE"])
# Pull the completed requests out in DataFrame batches of up to 50,000 rows,
# instead of collecting the whole result at once. The printed shapes show
# that the final batch can be smaller than chunk_size.
for batch in completed_requests.collect_batches(
    chunk_size=50_000,
):
    print(batch.shape)
    # write_json() is called with no file argument. Presumably it returns the
    # JSON text for this batch; confirm against the Polars docs. The value is
    # only bound here, as a stand-in for handing it to a downstream consumer.
    batch_json = batch.write_json()
(50000, 4)
(50000, 4)
...
(50000, 4)
(12409, 4)
Scaling and Optimizing Data Pipelines with Polars