Working with Multifile Datasets

Scaling and Optimizing Data Pipelines with Polars

Liam Brannigan

Data Scientist & Polars Contributor

Daily 311 files

311_requests_daily_csv/
  311_requests_2025-12-31.csv
  311_requests_2026-01-01.csv
  311_requests_2026-01-02.csv
  311_requests_2026-01-03.csv
  311_requests_2026-01-04.csv

Scanning many CSV files

pl.scan_csv(
    "311_requests_daily_csv/311_requests_2026*.csv",
    try_parse_dates=True,
).select("REPORTED_AT", "STATUS", "WARD").collect()
shape: (4, 3)
| REPORTED_AT         | STATUS      | WARD |
| ---                 | ---         | ---  |
| datetime[us]        | str         | i64  |
|---------------------|-------------|------|
| 2026-01-01 07:10:00 | In Progress | 46   |
| 2026-01-01 08:25:00 | Open        | 46   |
| 2026-01-04 07:45:00 | Open        | 12   |

Writing hive partitions

pl.read_csv(
    "311_Service_Requests.csv",
    try_parse_dates=True,
).write_parquet(
    "311_requests_parquet/",
    partition_by=["CREATED_DATE"],
)

Partitioned datasets

311_requests_parquet/
  CREATED_DATE=2025-12-31/
    data.parquet
  CREATED_DATE=2026-01-01/
    data.parquet

Scanning a hive-partitioned dataset

requests = pl.scan_parquet(
    "311_requests_parquet/",
    hive_partitioning=True,
)

  • Partition values in the folder names become regular columns

A partition-aware query

from datetime import date

requests.filter(
    pl.col("CREATED_DATE") >= date(2026, 1, 1)
).group_by("DEPARTMENT").len().collect()
shape: (2, 2)
| DEPARTMENT     | len |
| ---            | --- |
| str            | u32 |
|----------------|-----|
| Transportation | 208 |
| Sanitation     | 175 |

A partition-unaware query!

requests.filter(
    pl.col("YEAR") >= 2026
)

  • Polars can skip whole partitions only when the filter uses the partition column (CREATED_DATE) directly 💡
  • Filtering on a derived column such as YEAR does not trigger the optimization 💡

A schema drift problem

# 2026-01-01 daily file
REPORTED_AT,STATUS,WARD,BLOCK_CODE,REPORT_TEXT
2026-01-01 07:10:00,In Progress,46,0021,...
# 2026-01-02 daily file
REPORTED_AT,STATUS,WARD,REPORT_TEXT
2026-01-02 09:05:00,Open,29,...

Inserting missing columns

requests = pl.scan_csv(
    "311_requests_daily_csv/311_requests_2026*.csv",
    try_parse_dates=True,
    missing_columns="insert",
)

Checking the combined schema

requests.select(
    "REPORTED_AT", "STATUS", "WARD", "BLOCK_CODE"
).collect()
shape: (8, 4)
| REPORTED_AT         | STATUS      | WARD | BLOCK_CODE |
| ---                 | ---         | ---  | ---        |
| datetime[us]        | str         | i64  | i64        |
|---------------------|-------------|------|------------|
| 2026-01-01 07:10:00 | In Progress | 46   | 21         |
| 2026-01-01 08:25:00 | Open        | 46   | 21         |
| 2026-01-02 09:05:00 | Open        | 29   | null       |
| 2026-01-02 10:42:00 | Completed   | 29   | null       |

A dtype mismatch problem

# 2025-12-31 daily file
WARD
35

# 2026-01-03 daily file
WARD
"44"

Two scans with different dtypes

ward_int = pl.scan_csv(
    "311_requests_daily_csv/311_requests_2025-12-31.csv",
    try_parse_dates=True,
)

ward_str = pl.scan_csv(
    "311_requests_daily_csv/311_requests_2026-01-03.csv",
    try_parse_dates=True,
    schema_overrides={"WARD": pl.String},
)

Combining with vertical_relaxed

combined = pl.concat(
    [ward_int, ward_str],
    how="vertical_relaxed",
)

  • Supertype: a dtype that can hold values from both inputs
  • Here, String is the supertype: it can hold both the Int64 and the String values

Checking the relaxed concat

combined.collect()
shape: (4, 5)
| REPORTED_AT         | STATUS    | WARD | BLOCK_CODE | REPORT_TEXT                      |
| ---                 | ---       | ---  | ---        | ---                              |
| datetime[us]        | str       | str  | i64        | str                              |
|---------------------|-----------|------|------------|----------------------------------|
| 2025-12-31 16:20:00 | Open      | 35   | 17         | Massive pothole getting worse    |
| 2025-12-31 18:05:00 | Open      | 35   | 17         | Cyclists forced into traffic     |
| 2026-01-03 07:35:00 | Open      | 44   | 12         | How is this still not repaired   |
| 2026-01-03 08:15:00 | Completed | 44   | 12         | Every bus hit shakes the vehicle |

Let's practice!
