Scaling and Optimizing Data Pipelines with Polars
Liam Brannigan
Data Scientist & Polars Contributor
monthly_requests
shape: (5, 3)
| MONTH | DEPARTMENT | count |
| --- | --- | --- |
| i8 | str | u32 |
| 1 | Aviation | 1 |
| 1 | Animal Control | 342 |
| 1 | Fire | 2 |
| 1 | Health | 42 |
| 2 | Finance | 47 |
department_names = ["Aviation", "Animal Control", "Finance", "Fire", "Health"]
monthly_requests.pivot(
    on="DEPARTMENT",
    index="MONTH",
    values="count",
)
monthly_requests.pivot(
    on="DEPARTMENT",
    index="MONTH",
    values="count",
    on_columns=department_names,
)
shape: (8, 6)
| MONTH | Aviation | Animal Control | Finance | Fire | Health |
| --- | --- | --- | --- | --- | --- |
| i8 | u32 | u32 | u32 | u32 | u32 |
| 1 | 1 | 342 | 0 | 2 | 42 |
| 2 | 1086 | 311 | 47 | 10 | 28 |
| 3 | 386 | 309 | 16 | 2 | 46 |
| 4 | 4 | 260 | 0 | 1 | 44 |
result, timings = (
    requests.with_columns(pl.selectors.integer().cast(pl.Int32))
    .filter(pl.col("STATUS") == "Completed")
    .sort("COST")
    .profile()
)
timings
shape: (3, 3)
| node | start | end |
| --- | --- | --- |
| str | u64 | u64 |
| optimization | 0 | 16115139 |
| with_column(ZIP_CODE, STREET_N | 16115139 | 18423529 |
| sort(COST) | 18423529 | 41897636 |
timings
shape: (3, 4)
| node | start | end | duration_seconds |
| --- | --- | --- | --- |
| str | u64 | u64 | f64 |
| optimization | 0 | 16115139 | 16.11 |
| with_column(ZIP_CODE, STREET_N | 16115139 | 18423529 | 2.308 |
| sort(COST) | 18423529 | 41897636 | 23.47 |


requests.set_sorted("CREATED_DATE").filter(
    pl.col("CREATED_DATE") < pl.date(2020, 1, 1)
).collect()
shape: (4, 39)
| TYPE | STATUS | DEPARTMENT | CREATED_DATE | ... |
| --- | --- | --- | --- | --- |
| str | str | str | str | ... |
| Pothole in Street Complaint | Completed | Transportation | 2019-12-16T10:09:08 | ... |
| Tree Trim Request | Canceled | Sanitation | 2019-09-18T01:05:08 | ... |
| Pothole in Street Complaint | Completed | Transportation | 2019-03-21T10:41:01 | ... |
| Alley Light Out | Completed | Sanitation | 2019-11-03T08:22:15 | ... |
requests.set_sorted("CREATED_DATE").select(
    pl.col("CREATED_DATE").min().alias("min"),
    pl.col("CREATED_DATE").median().alias("median"),
).collect()
shape: (1, 2)
| min | median |
| --- | --- |
| datetime[µs] | datetime[µs] |
| 2018-07-01 01:01:29 | 2019-06-03 09:46:29.500 |
requests.set_sorted("CREATED_DATE").join(
    responses.set_sorted("CREATED_DATE"), on="CREATED_DATE"
)