Sinking large queries to disk

Scaling and Optimizing Data Pipelines with Polars

Liam Brannigan

Data Scientist & Polars Contributor

A large output problem

Diagram showing a lazy query producing a large row-preserving extract that should be written directly to disk.

A cleaned request dataset

requests = pl.scan_parquet("311_Service_Requests.parquet")

clean_requests = (
    requests
    .with_columns(
        pl.col("CREATED_DATE").dt.date(),
        pl.col("WARD").cast(pl.Int16),
        (pl.col("STATUS") == "Completed").alias("IS_COMPLETED"),
    )
)

Collect then write

clean_requests.collect(
    engine="streaming"
).write_parquet(
    "311_requests_clean.parquet"
)

Sinking to Parquet

clean_requests.sink_parquet(
    "311_requests_clean.parquet",
)

Controlling the sink

clean_requests.sink_parquet(
    "311_requests_clean.parquet",
    compression="zstd",
    row_group_size=100_000,
)

Other sink methods

clean_requests.sink_csv("output.csv")
clean_requests.sink_ndjson("output.ndjson")

Partitioned output

clean_requests.sink_parquet(
    pl.PartitionBy(
        "311_requests_clean/",
        key="CREATED_DATE",
        max_rows_per_file=1_000_000,
    ),
    mkdir=True,
)

Partitioned output

311_requests_clean/
  CREATED_DATE=2025-12-31/
    00000000.parquet
  CREATED_DATE=2026-01-01/
    00000000.parquet

Multi-column partitions

clean_requests.sink_parquet(
    pl.PartitionBy(
        "311_requests_clean/",
        key=["STATUS", "CREATED_DATE"],
    ),
    mkdir=True,
)

Building lazy sinks

internal_sink = clean_requests.sink_parquet(
    "311_requests_internal.parquet",
    lazy=True,
)

public_sink = clean_requests.drop("REQUEST_ID", "REPORTER").sink_parquet(
    "311_requests_public.parquet",
    lazy=True,
)

Multiplexing sinks

pl.collect_all(
    [internal_sink, public_sink],
)

Written outputs

311_requests_internal.parquet
311_requests_public.parquet

Custom batch sinks

def send_batch(batch: pl.DataFrame) -> None:
    print(batch.height)
    batch_json = batch.write_json()
    # send batch_json to an API

Custom batch sinks

clean_requests.sink_batches(
    send_batch,
    chunk_size=50_000,
)
50000
50000
...
29268

Sink workflow

  • Use sink_parquet() to write large query results straight to disk
  • Use PartitionBy() to split output into a partitioned dataset
  • Use lazy sinks with collect_all() to plan and execute several outputs together
  • Use sink_batches() when each batch should go to a custom function

Isometric view of a data sorting system distributing data into organized partitioned drawers


Let's practice!
