Scaling and Optimizing Data Pipelines with Polars
Liam Brannigan
Data Scientist & Polars Contributor
311_requests_daily_csv/
    311_requests_2025-12-31.csv
    311_requests_2026-01-01.csv
    311_requests_2026-01-02.csv
    311_requests_2026-01-03.csv
    311_requests_2026-01-04.csv
import polars as pl

pl.scan_csv(
    "311_requests_daily_csv/311_requests_2026*.csv",
    try_parse_dates=True,
)
pl.scan_csv(
    "311_requests_daily_csv/311_requests_2026*.csv",
    try_parse_dates=True,
).select("REPORTED_AT", "STATUS", "WARD").collect()
shape: (4, 3)
| REPORTED_AT | STATUS | WARD |
| --- | --- | --- |
| datetime[us] | str | i64 |
|---------------------|-------------|------|
| 2026-01-01 07:10:00 | In Progress | 46 |
| 2026-01-01 08:25:00 | Open | 46 |
| 2026-01-04 07:45:00 | Open | 12 |
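To see which daily files the `2026*` pattern above selects, here is a minimal sketch using the standard library's `fnmatch`. It assumes the pattern follows ordinary shell-style glob rules; the file names are copied from the directory listing, and `files` and `matched` are illustrative names only.

```python
from fnmatch import fnmatch

# File names copied from the 311_requests_daily_csv/ listing.
files = [
    "311_requests_2025-12-31.csv",
    "311_requests_2026-01-01.csv",
    "311_requests_2026-01-02.csv",
    "311_requests_2026-01-03.csv",
    "311_requests_2026-01-04.csv",
]

# Same file-name pattern as in the scan_csv call (directory prefix omitted).
pattern = "311_requests_2026*.csv"

# Keep only the names the glob pattern matches.
matched = [name for name in files if fnmatch(name, pattern)]
print(matched)
```

Under that assumption the 2025-12-31 file is left out, so only the four 2026 files feed the query.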
# Read the full CSV once, then write one Parquet directory per CREATED_DATE value
pl.read_csv(
    "311_Service_Requests.csv",
    try_parse_dates=True,
).write_parquet(
    "311_requests_parquet/",
    partition_by=["CREATED_DATE"],
)
311_requests_parquet/
    CREATED_DATE=2025-12-31/
        data.parquet
    CREATED_DATE=2026-01-01/
        data.parquet
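The `CREATED_DATE=.../` directory names carry the partition value in the path itself. As a rough illustration of that hive-style convention (not Polars' own implementation), the sketch below pulls the key and value back out of a path. `parse_hive_partitions` is a hypothetical helper written for this example.

```python
from datetime import date
from pathlib import PurePosixPath


def parse_hive_partitions(path: str) -> dict[str, str]:
    """Collect key=value directory segments from a hive-style path."""
    partitions = {}
    for segment in PurePosixPath(path).parts:
        if "=" in segment:
            key, _, value = segment.partition("=")
            partitions[key] = value
    return partitions


# Path taken from the directory layout above.
path = "311_requests_parquet/CREATED_DATE=2026-01-01/data.parquet"
partitions = parse_hive_partitions(path)

# The value is stored as text in the path; parse it to get a real date.
created = date.fromisoformat(partitions["CREATED_DATE"])
print(partitions, created)
```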
requests = pl.scan_parquet(
    "311_requests_parquet/",
    hive_partitioning=True,
)
from datetime import date

requests.filter(
    pl.col("CREATED_DATE") >= date(2026, 1, 1)
)
requests.filter(
    pl.col("CREATED_DATE") >= date(2026, 1, 1)
).group_by("DEPARTMENT").len().collect()
shape: (2, 2)
| DEPARTMENT | len |
| --- | --- |
| str | u32 |
|----------------|-----|
| Transportation | 208 |
| Sanitation | 175 |
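Filtering on the partition column means whole directories can be ruled out from their names alone, before any Parquet file is opened. The sketch below illustrates that idea in plain Python. It is a conceptual model under that assumption, not a description of how Polars implements pruning, and `partition_date` is a hypothetical helper.

```python
from datetime import date

# Partition directories from the layout shown earlier.
partition_dirs = [
    "CREATED_DATE=2025-12-31",
    "CREATED_DATE=2026-01-01",
]

# Same cutoff as the filter on CREATED_DATE.
cutoff = date(2026, 1, 1)


def partition_date(dirname: str) -> date:
    """Parse the date encoded after '=' in a partition directory name."""
    return date.fromisoformat(dirname.split("=", 1)[1])


# Directories that can satisfy the filter versus those that can be skipped.
kept = [d for d in partition_dirs if partition_date(d) >= cutoff]
skipped = [d for d in partition_dirs if partition_date(d) < cutoff]
print("read:", kept)
print("skip:", skipped)
```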
requests.filter(
    pl.col("YEAR") >= 2026
)
# 2026-01-01 daily file
REPORTED_AT,STATUS,WARD,BLOCK_CODE,REPORT_TEXT
2026-01-01 07:10:00,In Progress,46,0021,...
# 2026-01-02 daily file
REPORTED_AT,STATUS,WARD,REPORT_TEXT
2026-01-02 09:05:00,Open,29,...
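Before scanning many daily files together, it can help to check which columns each file actually has. This is a small standard-library sketch using only the two header rows shown above. The `daily_files` mapping stands in for reading the first line of each real file and is an assumption of this example.

```python
import csv
import io

# Header rows copied from the two daily files above; data rows are omitted.
daily_files = {
    "2026-01-01": "REPORTED_AT,STATUS,WARD,BLOCK_CODE,REPORT_TEXT\n",
    "2026-01-02": "REPORTED_AT,STATUS,WARD,REPORT_TEXT\n",
}

# Parse the header line of each file into a list of column names.
headers = {
    day: next(csv.reader(io.StringIO(text)))
    for day, text in daily_files.items()
}

# Union of all columns seen across files, then the per-file gaps.
all_columns = sorted(set().union(*headers.values()))
missing = {
    day: [col for col in all_columns if col not in cols]
    for day, cols in headers.items()
}
print(missing)
```

Here the 2026-01-02 file lacks `BLOCK_CODE`, which is the gap the scan has to fill with nulls.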
requests = pl.scan_csv(
    "311_requests_daily_csv/311_requests_2026*.csv",
    try_parse_dates=True,
    missing_columns="insert",
)
requests.select(
    "REPORTED_AT", "STATUS", "WARD", "BLOCK_CODE"
).collect()
shape: (8, 4)
| REPORTED_AT | STATUS | WARD | BLOCK_CODE |
| --- | --- | --- | --- |
| datetime[us] | str | i64 | i64 |
|---------------------|-------------|------|------------|
| 2026-01-01 07:10:00 | In Progress | 46 | 21 |
| 2026-01-01 08:25:00 | Open | 46 | 21 |
| 2026-01-02 09:05:00 | Open | 29 | null |
| 2026-01-02 10:42:00 | Completed | 29 | null |
# 2025-12-31 daily file
WARD
35
# 2026-01-03 daily file
WARD
"44"
ward_int = pl.scan_csv(
    "311_requests_daily_csv/311_requests_2025-12-31.csv",
    try_parse_dates=True,
)
ward_str = pl.scan_csv(
    "311_requests_daily_csv/311_requests_2026-01-03.csv",
    try_parse_dates=True,
    schema_overrides={"WARD": pl.String},
)
combined = pl.concat(
    [ward_int, ward_str],
    how="vertical_relaxed",
)
combined.collect()
shape: (4, 5)
| REPORTED_AT | STATUS | WARD | BLOCK_CODE | REPORT_TEXT |
| --- | --- | --- | --- | --- |
| datetime[us] | str | str | i64 | str |
|---------------------|-----------|------|------------|----------------------------------|
| 2025-12-31 16:20:00 | Open | 35 | 17 | Massive pothole getting worse |
| 2025-12-31 18:05:00 | Open | 35 | 17 | Cyclists forced into traffic |
| 2026-01-03 07:35:00 | Open | 44 | 12 | How is this still not repaired |
| 2026-01-03 08:15:00 | Completed | 44 | 12 | Every bus hit shakes the vehicle |