Data Transformation with Spark SQL in Databricks
Disha Mukherjee
Lead Data Engineer

groupBy() - partitions rows by key column
agg() - applies functions to each group in parallel
one agg() call = one data scan
category_revenue = (
    df_valid
    .groupBy("Category")
    .agg(
        F.round(F.sum("Transaction_Amount"), 2).alias("total_revenue"),
        F.count("Transaction_Amount").alias("transaction_count"),
        F.round(F.avg("Transaction_Amount"), 2).alias("avg_transaction"),
    )
    .orderBy(F.col("total_revenue").desc())
)
category_revenue.show(truncate=False)
+-----------+-------------+-----------------+---------------+
|Category   |total_revenue|transaction_count|avg_transaction|
+-----------+-------------+-----------------+---------------+
|Clothing   |338266605.24 |6742             |50173.04       |
|Dining     |337631718.64 |6719             |50250.29       |
|Electronics|331102843.72 |6614             |50060.91       |
|Savings    |329346609.07 |6596             |49931.26       |
|Groceries  |329003451.57 |6537             |50329.43       |
|Unknown    |880591.27    |15               |58706.08       |
+-----------+-------------+-----------------+---------------+
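To make the "one agg() call = one data scan" point concrete, here is a minimal pure-Python sketch (not Spark, and with hypothetical sample values) of what the aggregation computes: a single pass over the rows accumulates sum and count per key, from which the average falls out.

```python
from collections import defaultdict

# Toy rows standing in for df_valid: (Category, Transaction_Amount)
rows = [
    ("Clothing", 120.0),
    ("Dining", 80.0),
    ("Clothing", 60.0),
    ("Dining", 40.0),
]

# One scan: accumulate (running_sum, running_count) per category,
# analogous to what agg() does for each group in parallel
acc = defaultdict(lambda: [0.0, 0])
for category, amount in rows:
    acc[category][0] += amount
    acc[category][1] += 1

# Derive all three metrics from the accumulated state -- no second scan
result = {
    cat: {
        "total_revenue": round(s, 2),
        "transaction_count": n,
        "avg_transaction": round(s / n, 2),
    }
    for cat, (s, n) in acc.items()
}
print(result)
```

Note that `avg` never requires its own pass over the data: it is computed from the same (sum, count) state the other two aggregates already maintain.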

df_dim - the Category-to-Department dimension table:
+-----------+----------+
|Category   |Department|
+-----------+----------+
|Clothing   |Retail    |
|Dining     |Food      |
|Electronics|Tech      |
|Groceries  |Food      |
|Savings    |Finance   |
+-----------+----------+
df_joined = df_valid.join(df_dim, on="Category", how="left")

df_joined.select(
    "Customer_ID", "Category", "Department", "Transaction_Amount"
).show(5, truncate=False)
+-----------+-----------+----------+------------------+
|Customer_ID|Category   |Department|Transaction_Amount|
+-----------+-----------+----------+------------------+
|CUST003    |Electronics|Tech      |5752.36           |
|CUST009    |Clothing   |Retail    |28959.12          |
|CUST010    |Savings    |Finance   |72098.18          |
|CUST011    |Savings    |Finance   |49771.05          |
|CUST020    |Groceries  |Food      |69825.14          |
+-----------+-----------+----------+------------------+
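The left-join semantics can be sketched in plain Python (not Spark, with hypothetical fact rows): the dimension table behaves like a lookup dict, every row from the left side survives, and a key with no match (such as the "Unknown" category) gets None where Spark would produce null.

```python
# Dimension table as a dict, mirroring df_dim (Category -> Department)
dim = {
    "Clothing": "Retail",
    "Dining": "Food",
    "Electronics": "Tech",
    "Groceries": "Food",
    "Savings": "Finance",
}

# Fact rows standing in for df_valid; "Unknown" has no dimension match
facts = [
    ("CUST003", "Electronics", 5752.36),
    ("CUST999", "Unknown", 100.00),
]

# Left join: keep every fact row; dim.get() returns None for missing keys,
# the analogue of null in the joined DataFrame
joined = [(cust, cat, dim.get(cat), amt) for cust, cat, amt in facts]
print(joined)
```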


df_joined.explain(mode="formatted")
...
+- PhotonBroadcastHashJoin LeftOuter (18)
:- ...
+- PhotonShuffleExchangeSource (17)
+- PhotonShuffleMapStage (16)
+- PhotonShuffleExchangeSink (15)
+- LocalTableScan (13)
...
.explain() - inspect the execution plan

# Wrap the small table in F.broadcast()
df_broadcast = df_valid.join(
    F.broadcast(df_dim), on="Category", how="left"
)
print(f"Broadcast joined rows: {df_broadcast.count():,}")
Broadcast joined rows: 33,223
df_broadcast.explain(mode="formatted")
+- PhotonBroadcastHashJoin LeftOuter (18)
:- ...
+- PhotonShuffleExchangeSource (17)
+- PhotonShuffleMapStage (16)
+- PhotonShuffleExchangeSink (15)
+- LocalTableScan (13)
...
(15) PhotonShuffleExchangeSink
Arguments: SinglePartition
(16) PhotonShuffleMapStage
Arguments: EXECUTOR_BROADCAST, [id=#11839]
df_valid never moves
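The mechanism behind that takeaway can be sketched in plain Python (hypothetical data, not Spark internals): the small dimension table is copied to every partition of the large table, so each partition performs a local hash join and the large side is never shuffled.

```python
# Partitions of the large fact table stay where they are
partitions = [
    [("CUST003", "Electronics"), ("CUST009", "Clothing")],
    [("CUST010", "Savings"), ("CUST020", "Groceries")],
]

# The small table is "broadcast": every partition gets its own full copy
dim = {
    "Electronics": "Tech",
    "Clothing": "Retail",
    "Savings": "Finance",
    "Groceries": "Food",
}

def join_partition(rows, lookup):
    # Local hash join: probe the broadcast copy, no data movement
    return [(cust, cat, lookup.get(cat)) for cust, cat in rows]

joined_partitions = [join_partition(p, dim) for p in partitions]
print(joined_partitions)
```

This is why the plan above shows a SinglePartition shuffle sink feeding an EXECUTOR_BROADCAST stage on the dimension side only: shipping the small table is cheap, while shuffling the 33,223-row fact table would not be.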