Scaling and Optimizing Data Pipelines with Polars
Liam Brannigan
Data Scientist & Polars Contributor






# Slide build-up: the query below is grown one method call at a time.
# Each assignment rebinds department_request_types, so when run as a script
# only the final (complete) version is kept.
# NOTE(review): `requests` is presumably a LazyFrame created with
# pl.scan_csv("311_Service_Requests.csv") (the printed plans show a
# Csv SCAN of that file) — confirm.
department_request_types = (
requests
)
department_request_types = (
requests
.filter(pl.col("STATUS") == "Completed")
)
department_request_types = (
requests
.filter(pl.col("STATUS") == "Completed")
.group_by("DEPARTMENT")
)
department_request_types = (
requests
.filter(pl.col("STATUS") == "Completed")
.group_by("DEPARTMENT")
.agg(pl.col("TYPE").n_unique().alias("n_request_types"))
)
# Complete query: for completed requests, count the distinct request TYPEs
# per DEPARTMENT and keep the 5 departments with the most types.
department_request_types = (
requests
.filter(pl.col("STATUS") == "Completed")
.group_by("DEPARTMENT")
.agg(pl.col("TYPE").n_unique().alias("n_request_types"))
# NOTE(review): group_by runs with maintain_order: false (see the plan),
# so departments tied on n_request_types may come back in any order;
# add a secondary sort key if the top 5 must be deterministic.
.sort("n_request_types", descending=True)
.head(5)
)
# Printing the LazyFrame shows its query plan as written (not data and not
# optimized): the CSV scan reads every column (PROJECT */39 COLUMNS).
# NOTE(review): the repeated print is presumably slide build-up residue.
print(department_request_types)
print(department_request_types)
Csv SCAN [311_Service_Requests.csv]
PROJECT */39 COLUMNS
# Slide build-up step: naive plan once .filter() is added; the FILTER node
# sits above a scan that still reads all 39 columns.
print(department_request_types)
FILTER [(col("STATUS")) == ("Completed")]
FROM
Csv SCAN [311_Service_Requests.csv]
PROJECT */39 COLUMNS
# Slide build-up step: naive plan once group_by/agg is added; AGGREGATE
# sits above the FILTER node.
print(department_request_types)
AGGREGATE[maintain_order: false]
[col("TYPE").n_unique().alias("n_request_types")] BY [col("DEPARTMENT")]
FROM
FILTER [(col("STATUS")) == ("Completed")]
FROM
Csv SCAN [311_Service_Requests.csv]
PROJECT */39 COLUMNS
# Slide build-up step: full naive plan; SORT and then SLICE (head(5)) are
# applied on top of the aggregate.
print(department_request_types)
SLICE[offset: 0, len: 5]
SORT BY [descending: [true]] [col("n_request_types")]
AGGREGATE[maintain_order: false]
[col("TYPE").n_unique().alias("n_request_types")] BY [col("DEPARTMENT")]
FROM
FILTER [(col("STATUS")) == ("Completed")]
FROM
Csv SCAN [311_Service_Requests.csv]
PROJECT */39 COLUMNS
# explain() gives the optimized plan as text. Compared with printing the
# LazyFrame, only 3 of the 39 columns are read (PROJECT 3/39 COLUMNS) and the
# STATUS filter is applied during the CSV scan (SELECTION).
print(department_request_types.explain())
Csv SCAN [311_Service_Requests.csv]
PROJECT 3/39 COLUMNS
SELECTION: [(col("STATUS")) == ("Completed")]
simple π 2/2 ["TYPE", "DEPARTMENT"]
Csv SCAN [311_Service_Requests.csv]
PROJECT 3/39 COLUMNS
SELECTION: [(col("STATUS")) == ("Completed")]
AGGREGATE[maintain_order: false]
[col("TYPE").n_unique().alias("n_request_types")] BY [col("DEPARTMENT")]
FROM
simple π 2/2 ["TYPE", "DEPARTMENT"]
Csv SCAN [311_Service_Requests.csv]
PROJECT 3/39 COLUMNS
SELECTION: [(col("STATUS")) == ("Completed")]
SORT BY [slice: (0, 5, ...), descending: [true]] [col("n_request_types")]
FILTER col("n_request_types").dynamic_predicate() FROM
AGGREGATE[maintain_order: false]
[col("TYPE").n_unique().alias("n_request_types")] BY [col("DEPARTMENT")]
FROM
simple π 2/2 ["TYPE", "DEPARTMENT"]
Csv SCAN [311_Service_Requests.csv]
PROJECT 3/39 COLUMNS
SELECTION: [(col("STATUS")) == ("Completed")]
n_request_types  # NOTE(review): slide annotation label for the plan above; presumably extraction residue
# show_graph() renders the query-plan diagram itself and, by default, returns
# None, so wrapping it in print() would only add a stray "None" to the output.
department_request_types.show_graph()

# Slide example: a query written "naively" with two separate filter() calls and
# two separate with_columns() calls. In the plan printed after it, both
# predicates appear as a single combined SELECTION on the CSV scan and both new
# columns as a single WITH_COLUMNS node, i.e. the optimizer merges them.
# NOTE(review): these expressions are not bound to a name; presumably each is
# evaluated for display in a notebook cell — confirm.
(
requests
)
(
requests
.filter(pl.col("STATUS") == "Completed")
.filter(pl.col("DEPARTMENT") == "Sanitation")
)
(
requests
.filter(pl.col("STATUS") == "Completed")
.filter(pl.col("DEPARTMENT") == "Sanitation")
.with_columns(
pl.col("TYPE").str.to_lowercase().alias("type_lower")
)
.with_columns(
pl.col("STATUS").str.to_lowercase().alias("status_lower")
)
)
Csv SCAN [311_Service_Requests.csv]
PROJECT */39 COLUMNS
SELECTION: [([(col("DEPARTMENT")) == ("Sanitation")]) & ([(col("STATUS")) == ("Completed")])]
AND predicate
WITH_COLUMNS:
[col("TYPE").str.to_lowercase().alias("type_lower"), col("STATUS").str.to_lowercase().alias("status_lower")]
Csv SCAN [311_Service_Requests.csv]
PROJECT */39 COLUMNS
SELECTION: [([(col("DEPARTMENT")) == ("Sanitation")]) & ([(col("STATUS")) == ("Completed")])]
AND predicate
WITH_COLUMNS expressions
Scaling and Optimizing Data Pipelines with Polars