Testing Polars Pipelines

Scaling and Optimizing Data Pipelines with Polars

Liam Brannigan

Data Scientist & Polars Contributor

Why test pipelines?

Diagram showing two DataFrames that are different in one cell.

Diagram showing a Polars transformation tested against an expected DataFrame.

Completed requests by department

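import polars as pl

def completed_by_department(requests: pl.LazyFrame) -> pl.LazyFrame:
    return (
        requests
        # Count rows per department; the count column is named "len"
        .group_by("DEPARTMENT")
        .len()
        # Cast the unsigned count to a signed 32-bit integer
        .with_columns(pl.col("len").cast(pl.Int32))
        # Sort so the row order is deterministic
        .sort("DEPARTMENT")
    )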

Test input

sample = pl.DataFrame({
    "SR_NUMBER": ["SR1", "SR2", "SR3", "SR4"],
    "DEPARTMENT": ["Sanitation", "Water", "Sanitation", "Aviation"],
    "STATUS": ["Completed", "Open", "Completed", "Completed"],
})
shape: (4, 3)
| SR_NUMBER | DEPARTMENT | STATUS    |
| ---       | ---        | ---       |
| str       | str        | str       |
|-----------|------------|-----------|
| SR1       | Sanitation | Completed |
| SR2       | Water      | Open      |
| SR3       | Sanitation | Completed |
| SR4       | Aviation   | Completed |

Expected output

expected = pl.DataFrame(
    {
        "DEPARTMENT": ["Aviation", "Sanitation"],
        "len": [1, 2],
    }
)
shape: (2, 2)
| DEPARTMENT | len |
| ---        | --- |
| str        | i64 |
|------------|-----|
| Aviation   | 1   |
| Sanitation | 2   |

Getting the actual result

actual = completed_by_department(
    sample.lazy()
).collect()
shape: (3, 2)
| DEPARTMENT | len |
| ---        | --- |
| str        | i32 |
|------------|-----|
| Aviation   | 1   |
| Sanitation | 2   |
| Water      | 1   |

Comparing with equals

actual.equals(expected)
False

Importing Polars testing

from polars.testing import (
    assert_frame_equal,
    assert_schema_equal,
)

Testing the schema

assert_schema_equal(
    actual.schema,
    expected.schema,
)
AssertionError: Schemas are different (dtypes do not match)
[left]: [String, Int32]
[right]: [String, Int64]

Fixing the expected schema

expected = pl.DataFrame(
    {
        "DEPARTMENT": ["Aviation", "Sanitation"],
        "len": [1, 2],
    },
    schema={"DEPARTMENT": pl.String, "len": pl.Int32},
)
shape: (2, 2)
| DEPARTMENT | len |
| ---        | --- |
| str        | i32 |
|------------|-----|
| Aviation   | 1   |
| Sanitation | 2   |

Testing the schema

assert_schema_equal(
    actual.schema,
    expected.schema,
)
print("Schema test passed!")
Schema test passed!

Testing the frame

assert_frame_equal(
    actual,
    expected,
)
AssertionError: DataFrames are different height (row count) mismatch
[left]: 3
[right]: 2

Comparing actual and expected

actual:
shape: (3, 2)
| DEPARTMENT | len |
| ---        | --- |
| str        | i32 |
|------------|-----|
| Aviation   | 1   |
| Sanitation | 2   |
| Water      | 1   |

expected:
shape: (2, 2)
| DEPARTMENT | len |
| ---        | --- |
| str        | i32 |
|------------|-----|
| Aviation   | 1   |
| Sanitation | 2   |

Fixing the query

def completed_by_department(requests: pl.LazyFrame) -> pl.LazyFrame:
    return (
        requests
        # Keep only completed requests before counting
        .filter(pl.col("STATUS") == "Completed")
        .group_by("DEPARTMENT")
        .len()
        .with_columns(pl.col("len").cast(pl.Int32))
        .sort("DEPARTMENT")
    )

actual = completed_by_department(
    sample.lazy()
).collect()

Final assertions

assert_schema_equal(actual.schema, expected.schema)
assert_frame_equal(actual, expected)
print("All tests passed!")
All tests passed!

Let's practice!
