Introduction to Spark SQL in Python
Mark Plutowski
Data Scientist
EXPLAIN SELECT * FROM table1
df = spark.read.load('/temp/df.parquet')
df.createOrReplaceTempView('df')
spark.sql('EXPLAIN SELECT * FROM df').first()
Row(plan='== Physical Plan ==\n
*FileScan parquet [word#1928,id#1929L,title#1930,part#1931]
Batched: true,
Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>')
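Printing the plan field of that Row renders the embedded \n characters, giving the readable layout shown below (a small sketch; the variable name result is ours, not from the slide):

result = spark.sql('EXPLAIN SELECT * FROM df').first()
print(result.plan)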
== Physical Plan ==
FileScan parquet [word#1928,id#1929L,title#1930,part#1931]
Batched: true,
Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
df.explain()
== Physical Plan ==
FileScan parquet [word#963,id#964L,title#965,part#966]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
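df.explain() prints only the physical plan. Passing extended=True also prints the parsed, analyzed, and optimized logical plans (a quick sketch, not shown on the slide):

df.explain(extended=True)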
spark.sql("SELECT * FROM df").explain()
== Physical Plan ==
FileScan parquet [word#712,id#713L,title#714,part#715]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
df.cache()
df.explain()
== Physical Plan ==
InMemoryTableScan [word#0, id#1L, title#2, part#3]
+- InMemoryRelation [word#0, id#1L, title#2, part#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan parquet [word#0,id#1L,title#2,part#3]
Batched: true, Format: Parquet, Location:
InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
spark.sql("SELECT * FROM df").explain()
== Physical Plan ==
InMemoryTableScan [word#0, id#1L, title#2, part#3]
+- InMemoryRelation [word#0, id#1L, title#2, part#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan parquet [word#0,id#1L,title#2,part#3]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
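With the DataFrame cached, both the dot-notation and SQL routes read from the in-memory copy instead of re-scanning the Parquet file. A quick way to confirm the caching state, using standard PySpark attributes (a small sketch):

df.cache()              # lazy: data is materialized in memory on the first action
print(df.is_cached)     # True
print(df.storageLevel)  # MEMORY_AND_DISK, matching the StorageLevel shown in the plan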
SELECT word, COUNT(*) AS count
FROM df
GROUP BY word
ORDER BY count DESC
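This query can also be submitted through spark.sql and its plan inspected with explain(); a minimal sketch using the query above:

spark.sql("""
    SELECT word, COUNT(*) AS count
    FROM df
    GROUP BY word
    ORDER BY count DESC
""").explain()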
Equivalent dot notation approach:
from pyspark.sql.functions import desc

df.groupBy('word') \
  .count() \
  .sort(desc('count')) \
  .explain()
== Physical Plan ==
*Sort [count#1040L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count#1040L DESC NULLS LAST, 200)
+- *HashAggregate(keys=[word#963], functions=[count(1)])
+- Exchange hashpartitioning(word#963, 200)
+- *HashAggregate(keys=[word#963], functions=[partial_count(1)])
+- InMemoryTableScan [word#963]
+- InMemoryRelation [word#963, id#964L, title#965, part#966],
true,10000, StorageLevel(disk, memory, deserialized,
1 replicas)
+- *FileScan parquet [word#963,id#964L,title#965,part#966]
Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet],
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<word:string,id:bigint,title:string,part:int>
FileScan parquet
InMemoryRelation
InMemoryTableScan
HashAggregate(keys=[word#963], ...)
HashAggregate(keys=[word#963], functions=[count(1)])
Sort [count#1040L DESC NULLS LAST]
== Physical Plan ==
*Sort [count#1160L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count#1160L DESC NULLS LAST, 200)
+- *HashAggregate(keys=[word#963], functions=[count(1)])
+- Exchange hashpartitioning(word#963, 200)
+- *HashAggregate(keys=[word#963], functions=[partial_count(1)])
+- *FileScan parquet [word#963] Batched: true, Format: Parquet,
Location: InMemoryFileIndex[file:/temp/df.parquet], PartitionFilters: [],
PushedFilters: [], ReadSchema: struct<word:string>
The previous plan had the following lines, which are missing from the plan above:
...
+- InMemoryTableScan [word#963]
+- InMemoryRelation [word#963, id#964L, title#965, part#966], true, 10000,
StorageLevel(disk, memory, deserialized, 1 replicas)
...
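One way the InMemoryRelation and InMemoryTableScan lines drop out of a plan is when the cached data has been released. The slide does not show the command that caused this, but a plausible sketch (an assumption, not taken from the slide):

df.unpersist()   # assumption: releases the cached copy, so later plans fall back to a FileScan
df.groupBy('word').count().sort(desc('count')).explain()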