Working with the streaming and GPU engines

Scaling and Optimizing Data Pipelines with Polars

Liam Brannigan

Data Scientist & Polars Contributor

Query execution engines

Flow diagram comparing a query going from the API, through the query optimizer to the in-memory, streaming and GPU engines.

A lazy request summary

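import polars as pl

# Lazily scan the Parquet file: no data is read until the query is collected
requests = pl.scan_parquet("311_Service_Requests.parquet")

# Count completed requests per department, largest first
query = (
    requests
    .filter(pl.col("STATUS") == "Completed")
    .group_by("DEPARTMENT")
    .len()
    .sort("len", descending=True)
)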

Using the default engine

query.collect()
shape: (5, 2)
| DEPARTMENT        | len     |
| ---               | ---     |
| str               | u32     |
|-------------------|---------|
| 311 City Services | 4859161 |
| Sanitation        | 3406631 |
| Aviation          | 2337842 |
| Transportation    | 1468240 |
| Water             | 416535  |

Targeting the streaming engine

query.collect(
    engine="streaming"
)
shape: (5, 2)
| DEPARTMENT        | len     |
| ---               | ---     |
| str               | u32     |
|-------------------|---------|
| 311 City Services | 4859161 |
| Sanitation        | 3406631 |
| Aviation          | 2337842 |
| Transportation    | 1468240 |
| Water             | 416535  |

Targeting the streaming engine

Image of large table being streamed.

  • Operations the streaming engine does not support automatically fall back to the default in-memory engine

Targeting the streaming engine

import polars as pl

# Prefer the streaming engine for every subsequent .collect() call
pl.Config.set_engine_affinity("streaming")

Targeting the GPU engine

query.collect(
    engine="gpu"
)

  • Requires NVIDIA GPU hardware and the GPU-enabled Polars install (pip install polars[gpu])

Choosing an engine

  • Start with the default engine; switch only if memory use or run time becomes a concern
  • Streaming for queries on data larger than memory
  • GPU for speed, but only if NVIDIA hardware is available
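The three guidelines above can be written as a small decision function. This is a hypothetical rule of thumb, not part of Polars. choose_engine, its parameters and the 0.5 memory headroom are all assumptions. The returned strings are the engine names that collect() accepts in recent Polars releases:

```python
def choose_engine(
    data_bytes: int,
    memory_bytes: int,
    has_nvidia_gpu: bool = False,
    need_more_speed: bool = False,
    headroom: float = 0.5,
) -> str:
    """Hypothetical rule of thumb for picking a Polars engine name.

    headroom is an assumed safety factor: data only counts as fitting in
    memory if it needs at most this fraction of the available RAM.
    """
    fits_in_memory = data_bytes <= headroom * memory_bytes
    if not fits_in_memory:
        # Larger-than-memory work: process it in a streaming fashion
        return "streaming"
    if need_more_speed and has_nvidia_gpu:
        # Only reach for the GPU when speed matters and the hardware exists
        return "gpu"
    # Otherwise stay with the default in-memory engine
    return "in-memory"


print(choose_engine(data_bytes=2 * 10**9, memory_bytes=16 * 10**9))
```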


Working in batches

completed_requests = (
    requests
    .filter(pl.col("STATUS") == "Completed")
    .select("SR_NUMBER", "TYPE", "DEPARTMENT", "CREATED_DATE")
)
Processing each batch

for batch in completed_requests.collect_batches(
    chunk_size=50_000,
):
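    # Each batch is an eager DataFrame with up to 50,000 rows
    print(batch.shape)
    # Serialize the batch to a JSON string, e.g. to send it to another system
    batch_json = batch.write_json()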
(50000, 4)
(50000, 4)
...
(50000, 4)
(12409, 4)

Let's practice!
