Data Transformation with Spark SQL in Databricks
Disha Mukherjee
Lead Data Engineer

groupBy() → one row per group (totals, averages)
Window functions → every row kept, aggregated over its group

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Running total per customer: the frame spans from the first row
# of the partition up to the current row.
window_spec = (
    Window.partitionBy("Customer_ID")
    .orderBy("Date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df_running = df_valid.withColumn(
    "running_total",
    F.round(F.sum("Transaction_Amount").over(window_spec), 2),
)
+-----------+-------------------+------------------+-------------+
|Customer_ID|               Date|Transaction_Amount|running_total|
+-----------+-------------------+------------------+-------------+
|    CUST003|2023-01-03 00:00:00|           5752.36|      5752.36|
|    CUST003|2023-02-14 00:00:00|          12340.00|     18092.36|
+-----------+-------------------+------------------+-------------+
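The frame bounds are what make this a running total. Swapping unboundedPreceding for a fixed offset turns the same pattern into a trailing moving average; a minimal sketch reusing df_valid (moving_window and moving_avg_3 are illustrative names, not from the demo):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Frame covers the current row plus the two rows before it,
# per customer, ordered by Date: a 3-row trailing window.
moving_window = (
    Window.partitionBy("Customer_ID")
    .orderBy("Date")
    .rowsBetween(-2, Window.currentRow)
)

df_moving = df_valid.withColumn(
    "moving_avg_3",
    F.round(F.avg("Transaction_Amount").over(moving_window), 2),
)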
customer_totals = (
    df_valid.groupBy("Customer_ID")
    .agg(F.round(F.sum("Transaction_Amount"), 2).alias("total_revenue"))
)

# No partitionBy: ranking is global, so Spark moves all rows
# to a single partition to order them.
rank_window = Window.orderBy(F.col("total_revenue").desc())

df_ranked = customer_totals.withColumn("revenue_rank", F.rank().over(rank_window))
+-----------+-------------+------------+
|Customer_ID|total_revenue|revenue_rank|
+-----------+-------------+------------+
|   CUST1469|     99996.20|           1|
|  CUST19129|     99990.14|           2|
|  CUST39417|     99982.21|           3|
+-----------+-------------+------------+
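rank() leaves gaps after ties (1, 1, 3), dense_rank() does not (1, 1, 2), and row_number() numbers rows uniquely regardless of ties. A quick comparison sketch reusing customer_totals and rank_window from above (column names are illustrative):

df_rank_compare = (
    customer_totals
    .withColumn("rank", F.rank().over(rank_window))
    .withColumn("dense_rank", F.dense_rank().over(rank_window))
    .withColumn("row_number", F.row_number().over(rank_window))
)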

Stream directory:
day_1.csv (108 KB)
day_2.csv (108 KB)
day_3.csv (108 KB)
day_4.csv (108 KB)
day_5.csv (108 KB)
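File-based streaming sources require an explicit schema (inference is disabled for streams unless spark.sql.streaming.schemaInference is enabled). The streaming_schema referenced below isn't shown on the slide; a plausible sketch assuming the same columns as the batch data:

from pyspark.sql.types import (
    DoubleType, StringType, StructField, StructType, TimestampType,
)

# Assumed column set -- align with the actual CSV headers.
streaming_schema = StructType([
    StructField("Customer_ID", StringType(), True),
    StructField("Date", TimestampType(), True),
    StructField("Transaction_Amount", DoubleType(), True),
])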
df_stream = (
    spark.readStream.format("csv")
    .option("header", "true")
    .schema(streaming_schema)
    .load(STREAM_DIR)
)
print(df_stream.isStreaming)
True
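A streaming DataFrame accepts most of the same transformations as a batch one, so cleansing logic like the earlier df_valid filter can sit between the read and the write; a sketch (the condition is illustrative):

from pyspark.sql import functions as F

df_stream_valid = df_stream.filter(
    F.col("Transaction_Amount").isNotNull()
    & (F.col("Transaction_Amount") > 0)
)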

query = (
    df_stream.writeStream.format("delta")
    .outputMode("append")
    # Checkpoint tracks which source files have been ingested.
    .option("checkpointLocation", CHECKPOINT_DIR)
    .option("path", DELTA_PATH)
    # availableNow: process everything currently available, then stop.
    .trigger(availableNow=True)
    .start()
)
query.awaitTermination()
Status: Stopped
Rows written: 5,000
print(query.status)

progress = query.lastProgress
print(f"Rows processed: {progress['numInputRows']:,}")
print(f"Rows/sec: {progress['processedRowsPerSecond']:.0f}")
{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}
Rows processed: 5,000
Rows/sec: 752
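Once the trigger completes, the Delta output is just a table; reading it back as a batch DataFrame is a quick way to confirm the row count (a sketch using the same DELTA_PATH):

df_out = spark.read.format("delta").load(DELTA_PATH)
print(f"Rows in Delta table: {df_out.count():,}")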
# Same checkpoint location: the restarted query resumes from the
# recorded offsets instead of re-reading the five CSV files.
query_restart = (
    df_stream.writeStream.format("delta")
    .option("checkpointLocation", CHECKPOINT_DIR)
    .option("path", DELTA_PATH)
    .trigger(availableNow=True)
    .start()
)
query_restart.awaitTermination()
Rows on restart: 0
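Zero rows on restart is the checkpoint doing its job: the offsets under CHECKPOINT_DIR mark all five files as already ingested, so availableNow finds nothing new. To deliberately reprocess from scratch in a Databricks notebook (dbutils is notebook-provided), both the checkpoint and the output would have to go; a sketch:

# WARNING: discards stream progress and the written data.
dbutils.fs.rm(CHECKPOINT_DIR, recurse=True)
dbutils.fs.rm(DELTA_PATH, recurse=True)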