Optimization and query plans

Scaling and Optimizing Data Pipelines with Polars

Liam Brannigan

Data Scientist & Polars Contributor

Introducing query optimization

Image of table with rows and columns

Image of running operations in serial

Image of running operations in parallel

Pipeline with duplicate operations carried out twice

Most request types by department

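import polars as pl

# Lazily scan the CSV so Polars can optimize the whole query
requests = pl.scan_csv("311_Service_Requests.csv")

department_request_types = (
    requests
    # Keep only completed requests
    .filter(pl.col("STATUS") == "Completed")
    # Count the distinct request types in each department
    .group_by("DEPARTMENT")
    .agg(pl.col("TYPE").n_unique().alias("n_request_types"))
    # Keep the 5 departments with the most request types
    .sort("n_request_types", descending=True)
    .head(5)
)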
  • Naive plan

Unoptimized plan

print(department_request_types)
SLICE[offset: 0, len: 5]
  SORT BY [descending: [true]] [col("n_request_types")]
    AGGREGATE[maintain_order: false]
      [col("TYPE").n_unique().alias("n_request_types")] BY [col("DEPARTMENT")]
      FROM
      FILTER [(col("STATUS")) == ("Completed")]
      FROM
        Csv SCAN [311_Service_Requests.csv]
        PROJECT */39 COLUMNS

Optimized plan

print(department_request_types.explain())
SORT BY [slice: (0, 5, ...), descending: [true]] [col("n_request_types")]
  FILTER col("n_request_types").dynamic_predicate() FROM
    AGGREGATE[maintain_order: false]
      [col("TYPE").n_unique().alias("n_request_types")] BY [col("DEPARTMENT")]
      FROM
      simple pi 2/2 ["TYPE", "DEPARTMENT"]
        Csv SCAN [311_Service_Requests.csv]
        PROJECT 3/39 COLUMNS
        SELECTION: [(col("STATUS")) == ("Completed")]
  • Find the top 5 rows by n_request_types
  • Filter out the rest
  • Sort the 5 rows
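
The three steps above describe a top-k selection: rather than sorting every row, only the rows that can make the cut are sorted. The plain-Python sketch below illustrates the idea only; it is not how Polars implements it, and the function name `top_k_sorted` and the counts are hypothetical.

```python
import heapq


def top_k_sorted(counts: dict[str, int], k: int) -> list[tuple[str, int]]:
    """Return the k largest (name, count) pairs, sorted descending by count."""
    if k <= 0 or not counts:
        return []
    # Step 1: find the k largest values; the smallest of them is the cut-off
    threshold = min(heapq.nlargest(k, counts.values()))
    # Step 2: filter out every row below the cut-off
    survivors = [(name, n) for name, n in counts.items() if n >= threshold]
    # Step 3: sort only the survivors, then trim any ties beyond k
    survivors.sort(key=lambda row: row[1], reverse=True)
    return survivors[:k]


# Hypothetical per-department counts
counts = {"Sanitation": 12, "Parks": 7, "Transport": 9, "Water": 3, "Housing": 5}

top_two = top_k_sorted(counts, k=2)
print(top_two)
```

The result matches what a full sort followed by a slice would give, but the sort only touches the rows that survive the filter.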

Optimized plan as a graph

department_request_types.show_graph()

Graph view of the optimized plan showing CSV scan with optimizations.


Further optimizations

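print(
    requests
    # Two separate filters: the optimizer merges them into one AND predicate
    .filter(pl.col("STATUS") == "Completed")
    .filter(pl.col("DEPARTMENT") == "Sanitation")
    # Two separate with_columns calls: the optimizer clusters them into one
    .with_columns(
        pl.col("TYPE").str.to_lowercase().alias("type_lower")
    )
    .with_columns(
        pl.col("STATUS").str.to_lowercase().alias("status_lower")
    )
    # Return the optimized plan as a string
    .explain()
)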

 WITH_COLUMNS:
 [col("TYPE").str.to_lowercase().alias("type_lower"), col("STATUS").str.to_lowercase().alias("status_lower")]
  Csv SCAN [311_Service_Requests.csv]
  PROJECT */39 COLUMNS
  SELECTION: [([(col("DEPARTMENT")) == ("Sanitation")]) & ([(col("STATUS")) == ("Completed")])]
  • Combined AND predicate
  • Clustered WITH_COLUMNS expressions

Let's practice!
