Scaling and Optimizing Data Pipelines with Polars
Liam Brannigan
Data Scientist & Polars Contributor


def completed_by_department(requests: pl.LazyFrame) -> pl.LazyFrame:
    """Pipeline stub: no steps have been added yet, so nothing is returned."""
    return None
def completed_by_department(requests: pl.LazyFrame) -> pl.LazyFrame:
    """Hand the lazy query back unchanged; no transformation is applied yet."""
    return requests
def completed_by_department(requests: pl.LazyFrame) -> pl.LazyFrame:
    """Count the rows of ``requests`` within each ``DEPARTMENT`` group.

    Args:
        requests: Lazy query that exposes a ``DEPARTMENT`` column.

    Returns:
        A lazy query with one row per department and the group's row count
        in a ``len`` column.
    """
    per_department = requests.group_by("DEPARTMENT")
    return per_department.len()
def completed_by_department(requests: pl.LazyFrame) -> pl.LazyFrame:
    """Count rows per department as Int32, ordered by department name.

    No filter on ``STATUS`` is applied here, so every request is counted
    regardless of its status.

    Args:
        requests: Lazy query that exposes a ``DEPARTMENT`` column.

    Returns:
        A lazy query with columns ``DEPARTMENT`` and ``len`` (cast to Int32),
        sorted by ``DEPARTMENT``.
    """
    counts = requests.group_by("DEPARTMENT").len()
    # Fix the count dtype so the output schema is explicit.
    counts = counts.with_columns(pl.col("len").cast(pl.Int32))
    # Sort so the row order is stable for comparisons.
    return counts.sort("DEPARTMENT")
# Four service requests across three departments; only SR2 is not completed.
sample = pl.DataFrame(
    data={
        "SR_NUMBER": ["SR1", "SR2", "SR3", "SR4"],
        "DEPARTMENT": ["Sanitation", "Water", "Sanitation", "Aviation"],
        "STATUS": ["Completed", "Open", "Completed", "Completed"],
    },
)
shape: (4, 3)
| SR_NUMBER | DEPARTMENT | STATUS |
| --- | --- | --- |
| str | str | str |
|-----------|------------|-----------|
| SR1 | Sanitation | Completed |
| SR2 | Water | Open |
| SR3 | Sanitation | Completed |
| SR4 | Aviation | Completed |
# Expected completed-request counts; no schema is given, so dtypes are inferred.
expected = pl.DataFrame(
    data={
        "DEPARTMENT": ["Aviation", "Sanitation"],
        "len": [1, 2],
    },
)
shape: (2, 2)
| DEPARTMENT | len |
| --- | --- |
| str | i64 |
|------------|-----|
| Aviation | 1 |
| Sanitation | 2 |
actual = completed_by_department(
# Run the lazy pipeline on the sample data and materialize the result.
actual = completed_by_department(sample.lazy()).collect()
# Run the lazy pipeline on the sample data and materialize the result.
actual = completed_by_department(sample.lazy()).collect()
shape: (3, 2)
| DEPARTMENT | len |
| --- | --- |
| str | i32 |
|------------|-----|
| Aviation | 1 |
| Sanitation | 2 |
| Water | 1 |
# equals() yields a single boolean, so a mismatch gives no detail about what differs.
actual.equals(expected)
False
from polars.testing import (
assert_frame_equal,
assert_schema_equal,
)
assert_schema_equal(
)
# Compare only the schemas (column names and dtypes) of the two frames.
assert_schema_equal(actual.schema, expected.schema)
AssertionError: Schemas are different (dtypes do not match)
[left]: [String, Int32]
[right]: [String, Int64]
# Pin the dtypes explicitly so ``len`` is Int32 rather than the inferred default.
expected = pl.DataFrame(
    data={
        "DEPARTMENT": ["Aviation", "Sanitation"],
        "len": [1, 2],
    },
    schema={"DEPARTMENT": pl.String, "len": pl.Int32},
)
shape: (2, 2)
| DEPARTMENT | len |
| --- | --- |
| str | i32 |
|------------|-----|
| Aviation | 1 |
| Sanitation | 2 |
# With the expected dtypes pinned, the schema assertion no longer raises.
assert_schema_equal(actual.schema, expected.schema)
print("Schema test passed!")
Schema test passed!
assert_frame_equal(
)
# Compare the full contents of the two frames, not just their schemas.
assert_frame_equal(actual, expected)
AssertionError: DataFrames are different height (row count) mismatch
[left]: 3
[right]: 2
shape: (3, 2)
| DEPARTMENT | len |
| --- | --- |
| str | i32 |
|------------|-----|
| Aviation | 1 |
| Sanitation | 2 |
| Water | 1 |
shape: (2, 2)
| DEPARTMENT | len |
| --- | --- |
| str | i32 |
|------------|-----|
| Aviation | 1 |
| Sanitation | 2 |
def completed_by_department(requests: pl.LazyFrame) -> pl.LazyFrame:
    """Count completed requests per department.

    Args:
        requests: Lazy query that exposes ``STATUS`` and ``DEPARTMENT``
            columns.

    Returns:
        A lazy query with one row per department that has at least one row
        whose ``STATUS`` equals ``"Completed"``. Columns are ``DEPARTMENT``
        and ``len`` (cast to Int32), sorted by ``DEPARTMENT``.
    """
    is_completed = pl.col("STATUS") == "Completed"
    # Filter before grouping so only completed requests are counted.
    counts = requests.filter(is_completed).group_by("DEPARTMENT").len()
    # Fix the count dtype so the output schema is explicit.
    counts = counts.with_columns(pl.col("len").cast(pl.Int32))
    # Sort so the row order is stable for comparisons.
    return counts.sort("DEPARTMENT")
# Re-run the pipeline, then check the schema first and the full frame second.
actual = completed_by_department(sample.lazy()).collect()
assert_schema_equal(
    actual.schema,
    expected.schema,
)
assert_frame_equal(
    actual,
    expected,
)
print("All tests passed!")
All tests passed!
Scaling and Optimizing Data Pipelines with Polars