Aggregating and joining data efficiently

Data Transformation with Spark SQL in Databricks

Disha Mukherjee

Lead Data Engineer

Three questions, one clean dataset

recraft: half: A data analyst reviewing colorful charts and business reports on multiple screens, isolated on transparent background

 


  • Which categories drive the most revenue?
  • Who are the top customers?
  • How do we enrich results with department context?

How groupBy() and agg() work

 


  • groupBy() - partitions rows by key column
  • agg() - applies functions to each group in parallel
  • Both are lazy - action triggers computation
  • One agg() call = one data scan

recraft: half: Colorful rows of data split into three separate labeled groups with a total value computed below each group, isolated on transparent background
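The split-then-aggregate semantics above can be sketched in plain Python, with no Spark required. The toy rows below are invented for illustration; Spark does the same thing per partition, in parallel:

```python
from collections import defaultdict

# Toy transactions: (category, amount) pairs -- invented sample data
rows = [("Clothing", 40.0), ("Dining", 25.0),
        ("Clothing", 60.0), ("Dining", 35.0)]

# groupBy(): partition rows by the key column
groups = defaultdict(list)
for category, amount in rows:
    groups[category].append(amount)

# agg(): apply aggregate functions to each group in one pass
summary = {
    cat: {
        "total": round(sum(vals), 2),
        "count": len(vals),
        "avg": round(sum(vals) / len(vals), 2),
    }
    for cat, vals in groups.items()
}
print(summary["Clothing"])  # {'total': 100.0, 'count': 2, 'avg': 50.0}
```

Note that all three aggregates come from a single walk over each group, which is why one `agg()` call means one data scan.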


Category revenue - code

from pyspark.sql import functions as F

category_revenue = (
    df_valid
    .groupBy("Category")
    .agg(
        F.round(F.sum("Transaction_Amount"), 2).alias("total_revenue"),
        F.count("Transaction_Amount").alias("transaction_count"),
        F.round(F.avg("Transaction_Amount"), 2).alias("avg_transaction"),
    )
    .orderBy(F.col("total_revenue").desc())
)

Category revenue - output

category_revenue.show(truncate=False)
+-----------+-----------------+-----------------+---------------+
|Category   |total_revenue    |transaction_count|avg_transaction|
+-----------+-----------------+-----------------+---------------+
|Clothing   |338266605.24     |6742             |50173.04       |
|Dining     |337631718.64     |6719             |50250.29       |
|Electronics|331102843.72     |6614             |50060.91       |
|Savings    |329346609.07     |6596             |49931.26       |
|Groceries  |329003451.57     |6537             |50329.43       |
|Unknown    |880591.27        |15               |58706.08       |
+-----------+-----------------+-----------------+---------------+

Enriching with a dimension table

recraft: half: Two database tables being connected by a glowing link or bridge, isolated on transparent background


+-----------+----------+
|Category   |Department|
+-----------+----------+
|Clothing   |Retail    |
|Dining     |Food      |
|Electronics|Tech      |
|Groceries  |Food      |
|Savings    |Finance   |
+-----------+----------+
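Left-join semantics against this dimension table can be sketched with a plain Python lookup (the `CUST999` fact row is invented to show the unmatched case; note `Unknown` has no entry, so its department comes back as null):

```python
# Dimension lookup built from the table above
dim = {"Clothing": "Retail", "Dining": "Food", "Electronics": "Tech",
       "Groceries": "Food", "Savings": "Finance"}

# Toy fact rows -- second row invented to show an unmatched key
facts = [("CUST003", "Electronics", 5752.36),
         ("CUST999", "Unknown", 880.59)]

# Left join: keep every fact row; unmatched keys get None for Department
joined = [(cid, cat, dim.get(cat), amt) for cid, cat, amt in facts]
print(joined[1])  # ('CUST999', 'Unknown', None, 880.59)
```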

Standard left join

df_joined = df_valid.join(df_dim, on="Category", how="left")

df_joined.select(
    "Customer_ID", "Category", "Department", "Transaction_Amount"
).show(5, truncate=False)
+-----------+-----------+----------+------------------+
|Customer_ID|Category   |Department|Transaction_Amount|
+-----------+-----------+----------+------------------+
|CUST003    |Electronics|Tech      |5752.36           |
|CUST009    |Clothing   |Retail    |28959.12          |
|CUST010    |Savings    |Finance   |72098.18          |
|CUST011    |Savings    |Finance   |49771.05          |
|CUST020    |Groceries  |Food      |69825.14          |
+-----------+-----------+----------+------------------+

The hidden cost - shuffle

recraft: half: Data packets flowing across a network of connected servers and nodes with arrows showing movement, isolated on transparent background

 

  • Matching keys must land on the same machine
  • Spark ships rows across the network to align them
  • Shuffle = bottleneck at scale
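The routing step behind the shuffle can be sketched with a toy stand-in for Spark's hash partitioner (this is not Spark's actual algorithm, just an illustration of why matching keys land together):

```python
# Matching keys from both sides must land in the same partition, so Spark
# routes each row by a deterministic function of its key.
NUM_PARTITIONS = 4

def partition_for(key: str) -> int:
    # Toy hash partitioner -- illustration only, not Spark's real hash
    return sum(ord(c) for c in key) % NUM_PARTITIONS

# Both sides of the join agree on each key's destination partition...
keys = ["Clothing", "Dining", "Electronics", "Groceries", "Savings"]
routing = {k: partition_for(k) for k in keys}
# ...but physically sending every row to its destination is the shuffle,
# and that network traffic is the bottleneck at scale.
```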

Spark UI

recraft: full: A dark analytics monitoring dashboard with colorful horizontal bar charts showing job stages, progress bars, and data throughput metrics, isolated on transparent background

 

  • Jobs and Stages - track what ran and how long
  • Shuffle Read/Write - see how much data moved
  • Useful for diagnosing pipeline bottlenecks

Reading the query plan with .explain()

df_joined.explain(mode="formatted")
...
+- PhotonBroadcastHashJoin LeftOuter (18)
   :- ...
   +- PhotonShuffleExchangeSource (17)
      +- PhotonShuffleMapStage (16)
         +- PhotonShuffleExchangeSink (15)
            +- LocalTableScan (13)
...
  • .explain() - inspect the execution plan
  • Nodes 15 & 16 - no routing Arguments logged
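The plan above already shows a `PhotonBroadcastHashJoin` because Spark can choose a broadcast join automatically when the small side falls under a size threshold. A hedged config sketch, assuming an active `SparkSession` named `spark` (the 10 MB figure is the common open-source Spark default; Databricks runtimes may set a different value):

```python
# Tables smaller than this threshold are auto-broadcast in joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)
```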

Broadcast join - the fix

# Wrap the small table in F.broadcast()
df_broadcast = df_valid.join(
    F.broadcast(df_dim),
    on="Category",
    how="left"
)

print(f"Broadcast joined rows: {df_broadcast.count():,}")
Broadcast joined rows: 33,223

Before and after - verifying the plan

df_broadcast.explain(mode="formatted")
+- PhotonBroadcastHashJoin LeftOuter (18)
   :- ...
   +- PhotonShuffleExchangeSource (17)
      +- PhotonShuffleMapStage (16)
         +- PhotonShuffleExchangeSink (15)
            +- LocalTableScan (13)
...            
(15) PhotonShuffleExchangeSink
Arguments: SinglePartition
(16) PhotonShuffleMapStage
Arguments: EXECUTOR_BROADCAST, [id=#11839]
  • Dim table has routing instructions
  • Large table df_valid never moves
  • One change = minutes to seconds at scale
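What the executors do under a broadcast join can be sketched in plain Python. The executor slices below are invented sample rows; the point is that each executor probes its own full copy of the dimension table, so no fact row crosses the network:

```python
# Small dimension table, copied ("broadcast") to every executor
dim = {"Clothing": "Retail", "Dining": "Food", "Electronics": "Tech",
       "Groceries": "Food", "Savings": "Finance"}

# Each executor holds a slice of the large fact table (toy rows)
executor_slices = [
    [("CUST010", "Savings", 72098.18)],
    [("CUST020", "Groceries", 69825.14)],
]

# Each executor joins locally against its broadcast copy of dim
results = []
for slice_rows in executor_slices:
    local_dim = dict(dim)  # the broadcast copy on this executor
    for cid, cat, amt in slice_rows:
        results.append((cid, cat, local_dim.get(cat), amt))
print(results[0])  # ('CUST010', 'Savings', 'Finance', 72098.18)
```

Only the small table moves (once, to each executor), which is why `F.broadcast()` removes the shuffle of `df_valid` entirely.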

Let's practice!

