Query Plans

Introduction to Spark SQL in Python

Mark Plutowski

Data Scientist

Explain

EXPLAIN SELECT * FROM table1
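
EXPLAIN can prefix any query; it returns the plan Spark would use instead of executing the query. EXPLAIN EXTENDED (also standard Spark SQL) additionally shows the parsed, analyzed, and optimized logical plans:

EXPLAIN EXTENDED SELECT * FROM table1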

Load dataframe and register

df = spark.read.load('/temp/df.parquet')  # load() defaults to the parquet format
df.registerTempTable('df')  # deprecated since Spark 2.0; prefer df.createOrReplaceTempView('df')
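
For reference, a self-contained version of this setup (a sketch: the builder call and app name are assumptions, and '/temp/df.parquet' is the example path used throughout):

from pyspark.sql import SparkSession

# Create (or reuse) the session the snippets refer to as `spark`
spark = SparkSession.builder.appName('query_plans').getOrCreate()

df = spark.read.load('/temp/df.parquet')  # load() defaults to the parquet format
df.createOrReplaceTempView('df')          # non-deprecated equivalent of registerTempTable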

Running an EXPLAIN query

spark.sql('EXPLAIN SELECT * FROM df').first()
Row(plan='== Physical Plan ==\n
*FileScan parquet [word#1928,id#1929L,title#1930,part#1931] 
  Batched: true, 
  Format: Parquet, 
  Location: InMemoryFileIndex[file:/temp/df.parquet], 
  PartitionFilters: [], 
  PushedFilters: [], 
  ReadSchema: struct<word:string,id:bigint,title:string,part:int>')
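
first() returns a Row, so the plan arrives as one long string with embedded '\n' escapes. Printing the field renders it as the readable multi-line layout interpreted below:

print(spark.sql('EXPLAIN SELECT * FROM df').first().plan)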


Interpreting an EXPLAIN query

== Physical Plan ==

  • FileScan parquet [word#1928,id#1929L,title#1930,part#1931] — the leaf step: scan the parquet files, reading these four columns
  • Batched: true — the columns are read in vectorized batches rather than row by row
  • Format: Parquet — the underlying file format
  • Location: InMemoryFileIndex[file:/temp/df.parquet] — the file listing Spark keeps in memory for this path
  • PartitionFilters: [] — no partition-pruning filters apply to this query
  • PushedFilters: [] — no row-level filters are pushed down into the scan
  • ReadSchema: struct<word:string,id:bigint,title:string,part:int> — the columns and types actually read
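
Both filter lists are empty because the query selects every row. Adding a WHERE clause on a data column populates PushedFilters; a quick check (the literal 'spark' is just an example value):

spark.sql("SELECT * FROM df WHERE word = 'spark'").explain()
# The FileScan line now reports something like:
#   PushedFilters: [IsNotNull(word), EqualTo(word,spark)]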

df.explain()

df.explain()
== Physical Plan ==
FileScan parquet [word#963,id#964L,title#965,part#966] 
Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/temp/df.parquet], 
PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
spark.sql("SELECT * FROM df").explain()
== Physical Plan ==
FileScan parquet [word#712,id#713L,title#714,part#715] 
Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/temp/df.parquet], 
PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
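
The two plans are identical (the #NNN suffixes are just plan-specific column IDs), confirming that the DataFrame API and the SQL interface compile to the same physical plan. Passing True prints the extended output, which also includes the parsed, analyzed, and optimized logical plans:

df.explain(True)  # equivalent to EXPLAIN EXTENDED on the SQL side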


df.explain() on a cached dataframe

df.cache()
df.explain()
== Physical Plan ==
InMemoryTableScan [word#0, id#1L, title#2, part#3]
   +- InMemoryRelation [word#0, id#1L, title#2, part#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- FileScan parquet [word#0,id#1L,title#2,part#3] 
         Batched: true, Format: Parquet, Location: 
         InMemoryFileIndex[file:/temp/df.parquet], 
         PartitionFilters: [], PushedFilters: [], 
         ReadSchema: struct<word:string,id:bigint,title:string,part:int>
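
df.cache() is shorthand for persist() with its default MEMORY_AND_DISK storage level, which is what the StorageLevel(disk, memory, deserialized, 1 replicas) annotation above reflects:

df.persist()  # no-argument persist() uses the same default storage level as cache()
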
spark.sql("SELECT * FROM df").explain()
== Physical Plan ==
InMemoryTableScan [word#0, id#1L, title#2, part#3]
   +- InMemoryRelation [word#0, id#1L, title#2, part#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
     +- FileScan parquet [word#0,id#1L,title#2,part#3] 
        Batched: true, Format: Parquet, 
        Location: InMemoryFileIndex[file:/temp/df.parquet], 
        PartitionFilters: [], PushedFilters: [], 
        ReadSchema: struct<word:string,id:bigint,title:string,part:int>
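
The cached plan is again the same through both APIs. Because df is registered as a table, the catalog can also confirm the caching:

spark.catalog.isCached('df')  # True once df has been cached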


Words sorted by frequency query

SELECT word, COUNT(*) AS count 
FROM df 
GROUP BY word 
ORDER BY count DESC
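
To see how this SQL version executes, pass it to spark.sql() and call explain() on the result:

spark.sql("""
    SELECT word, COUNT(*) AS count
    FROM df
    GROUP BY word
    ORDER BY count DESC
""").explain()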

Equivalent dot notation approach:

from pyspark.sql.functions import desc

df.groupBy('word') \
  .count() \
  .sort(desc('count')) \
  .explain()

Same query using dataframe dot notation

== Physical Plan ==
*Sort [count#1040L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count#1040L DESC NULLS LAST, 200)
   +- *HashAggregate(keys=[word#963], functions=[count(1)])
      +- Exchange hashpartitioning(word#963, 200)
         +- *HashAggregate(keys=[word#963], functions=[partial_count(1)])
            +- InMemoryTableScan [word#963]
                  +- InMemoryRelation [word#963, id#964L, title#965, part#966],
                     true,10000, StorageLevel(disk, memory, deserialized,
                     1 replicas)
                        +- *FileScan parquet [word#963,id#964L,title#965,part#966]
                           Batched: true, Format: Parquet,
                           Location: InMemoryFileIndex[file:/temp/df.parquet],
                           PartitionFilters: [], PushedFilters: [],
                           ReadSchema: struct<word:string,id:bigint,title:string,part:int>

Reading from bottom up

  • FileScan parquet — scan the parquet files (the leaf step)
  • InMemoryRelation — the cached copy of the dataframe created by df.cache()
  • InMemoryTableScan [word#963] — read just the word column from that cache
  • HashAggregate(keys=[word#963], functions=[partial_count(1)]) — count words within each partition
  • Exchange hashpartitioning(word#963, 200) — shuffle so all rows for a given word land in one partition
  • HashAggregate(keys=[word#963], functions=[count(1)]) — combine the partial counts into final counts
  • Sort [count#1040L DESC NULLS LAST] — after a range-partitioning exchange, sort by count, descending

Query plan

== Physical Plan ==
*Sort [count#1160L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count#1160L DESC NULLS LAST, 200)
   +- *HashAggregate(keys=[word#963], functions=[count(1)])
      +- Exchange hashpartitioning(word#963, 200)
         +- *HashAggregate(keys=[word#963], functions=[partial_count(1)])
            +- *FileScan parquet [word#963] Batched: true, Format: Parquet,
               Location: InMemoryFileIndex[file:/temp/df.parquet], PartitionFilters: [],
               PushedFilters: [], ReadSchema: struct<word:string>

The previous plan contained the following lines, which are missing from the plan above. Their absence means the query no longer reads from the cache; Spark scans the parquet files directly and prunes the read down to the single word column the query needs (ReadSchema: struct<word:string>):

...
            +- InMemoryTableScan [word#963]
                  +- InMemoryRelation [word#963, id#964L, title#965, part#966], true, 10000,
                     StorageLevel(disk, memory, deserialized, 1 replicas)
...
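
A quick way to reproduce this behavior (a sketch): drop the cached copy, then explain a query that touches only word; the plan reverts to a bare, column-pruned FileScan:

df.unpersist()               # remove the cached copy of the dataframe
df.select('word').explain()  # FileScan ... ReadSchema: struct<word:string>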

Let's practice
