Scaling and Optimizing Data Pipelines with Polars
Liam Brannigan
Data Scientist & Polars Contributor

# Open the 311 service-request Parquet file with scan_parquet, which builds a
# lazy query instead of loading the data straight away.
requests = pl.scan_parquet("311_Service_Requests.parquet")

# Start of the cleaning pipeline: no transformations have been chained yet.
clean_requests = (
    requests
)
# Open the 311 service-request Parquet file with scan_parquet, which builds a
# lazy query instead of loading the data straight away.
requests = pl.scan_parquet("311_Service_Requests.parquet")

# Cleaning step: CREATED_DATE and WARD are overwritten in place, and a new
# IS_COMPLETED column is appended.
clean_requests = requests.with_columns(
    # Reduce the creation timestamp to its calendar date.
    pl.col("CREATED_DATE").dt.date(),
    # Store the ward number as a 16-bit integer.
    pl.col("WARD").cast(pl.Int16),
    # Boolean flag: True where STATUS equals "Completed".
    (pl.col("STATUS") == "Completed").alias("IS_COMPLETED"),
)
# Run the query with the streaming engine, collect the result into a
# DataFrame, and only then write that DataFrame to a Parquet file.
clean_requests.collect(engine="streaming").write_parquet(
    "311_requests_clean.parquet"
)
# Execute the query and write the result straight to a Parquet file, with no
# intermediate collect() step.
clean_requests.sink_parquet("311_requests_clean.parquet")
# Same sink as before, with the Parquet writer options set explicitly.
clean_requests.sink_parquet(
    "311_requests_clean.parquet",
    compression="zstd",      # compress the written data with Zstandard
    row_group_size=100_000,  # row-group size used for the output file
)
# The same sink pattern for other output formats.
# NOTE(review): the sink_* methods belong to LazyFrame, so `df` is presumably
# a LazyFrame here despite its name — confirm against where `df` is defined.
df.sink_csv("output.csv")        # comma-separated values
df.sink_ndjson("output.ndjson")  # newline-delimited JSON
# Skeleton of a partitioned sink: a PartitionBy object takes the place of the
# single output path.
# NOTE(review): PartitionBy() has no arguments here — this looks like a
# placeholder; supply the output directory and partition key before running.
clean_requests.sink_parquet(
    pl.PartitionBy(),
)
# Write the result as a partitioned dataset instead of one file.
clean_requests.sink_parquet(
    pl.PartitionBy(
        "311_requests_clean/",         # base directory of the dataset
        key="CREATED_DATE",            # column whose values define the partitions
        max_rows_per_file=1_000_000,   # upper bound on rows per output file
    ),
)
# Write the result as a partitioned dataset, creating the output directories
# as needed.
clean_requests.sink_parquet(
    pl.PartitionBy(
        "311_requests_clean/",         # base directory of the dataset
        key="CREATED_DATE",            # column whose values define the partitions
        max_rows_per_file=1_000_000,   # upper bound on rows per output file
    ),
    mkdir=True,  # create missing directories in the output path
)
311_requests_clean/
    CREATED_DATE=2025-12-31/
        00000000.parquet
    CREATED_DATE=2026-01-01/
        00000000.parquet
# Partition the output on two columns by passing a list of keys.
clean_requests.sink_parquet(
    pl.PartitionBy(
        "311_requests_clean/",             # base directory of the dataset
        key=["STATUS", "CREATED_DATE"],    # partition on STATUS and CREATED_DATE
    ),
    mkdir=True,  # create missing directories in the output path
)
# lazy=True defers execution: the sink is returned as part of a lazy query and
# nothing is written until that query is collected.
internal_sink = clean_requests.sink_parquet(
    "311_requests_internal.parquet", lazy=True
)
# Internal output: the full cleaned dataset. lazy=True defers execution, so
# nothing is written until the returned query is collected.
internal_sink = clean_requests.sink_parquet(
    "311_requests_internal.parquet", lazy=True
)
# Public output: the same cleaned data with the REQUEST_ID and REPORTER columns
# dropped. lazy=True defers execution until the query is collected.
public_sink = (
    clean_requests
    .drop("REQUEST_ID", "REPORTER")
    .sink_parquet("311_requests_public.parquet", lazy=True)
)
# Execute both deferred sinks in a single collect_all call, so the two outputs
# are planned and run together.
pl.collect_all([internal_sink, public_sink])
311_requests_internal.parquet
311_requests_public.parquet
def send_batch(batch: pl.DataFrame) -> None:
    """Handle one batch of rows: report its size and serialize it to JSON.

    Args:
        batch: One chunk of the query result, passed as a DataFrame.
    """
    # Print the number of rows in this batch.
    print(batch.height)
    # Serialize the batch to a JSON string, ready to be used as a payload.
    batch_json = batch.write_json()
    # send batch_json to an API
# Stream the query result to send_batch, one batch at a time, requesting
# batches of 50,000 rows.
clean_requests.sink_batches(send_batch, chunk_size=50_000)
50000
50000
...
29268
- sink_parquet() to write large query results straight to disk
- PartitionBy() to split output into a partitioned dataset
- collect_all() to plan and execute several outputs together
- sink_batches() when each batch should go to a custom function
Scaling and Optimizing Data Pipelines with Polars