Window functions and streaming queries

Data Transformation with Spark SQL in Databricks

Disha Mukherjee

Lead Data Engineer

When groupBy isn't enough

[Image: a magnifying glass over rows of spreadsheet data with highlighted calculated values]

 

  • groupBy() → one row per group (totals, averages)
  • Window function → adds a column, keeps every row
  • Use case: running totals, rankings, row comparisons

Running total

from pyspark.sql import functions as F
from pyspark.sql.window import Window

window_spec = (
    Window.partitionBy("Customer_ID")
    .orderBy("Date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df_running = df_valid.withColumn(
    "running_total",
    F.round(F.sum("Transaction_Amount").over(window_spec), 2),
)
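What the window spec computes can be sketched in plain Python (a toy stand-in for the Spark logic, using the sample rows from the output below, not the engine's actual execution):

```python
# Sample rows: (customer_id, date, amount), already sorted by date.
rows = [
    ("CUST003", "2023-01-03", 5752.36),
    ("CUST003", "2023-02-14", 12340.00),
]

def running_totals(rows):
    """Cumulative sum per customer, mirroring
    rowsBetween(unboundedPreceding, currentRow)."""
    totals = {}
    out = []
    for cust, date, amount in rows:
        totals[cust] = round(totals.get(cust, 0.0) + amount, 2)
        out.append((cust, date, amount, totals[cust]))
    return out

print(running_totals(rows))
# Last row's running total: 5752.36 + 12340.00 = 18092.36
```

Note that every input row appears in the output with an extra column, which is exactly how a window function differs from groupBy().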

Running total

+-----------+-------------------+------------------+-------------+
|Customer_ID|Date               |Transaction_Amount|running_total|
+-----------+-------------------+------------------+-------------+
|CUST003    |2023-01-03 00:00:00|5752.36           |5752.36      |
|CUST003    |2023-02-14 00:00:00|12340.00          |18092.36     |
+-----------+-------------------+------------------+-------------+

Ranking customers

customer_totals = (
    df_valid.groupBy("Customer_ID")
    .agg(F.round(F.sum("Transaction_Amount"), 2).alias("total_revenue"))
)


rank_window = Window.orderBy(F.col("total_revenue").desc())
df_ranked = customer_totals.withColumn("revenue_rank", F.rank().over(rank_window))
+-----------+-------------+------------+
|Customer_ID|total_revenue|revenue_rank|
+-----------+-------------+------------+
|CUST1469   |99996.20     |1           |
|CUST19129  |99990.14     |2           |
|CUST39417  |99982.21     |3           |
+-----------+-------------+------------+
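One detail worth knowing about F.rank(): tied values share a rank, and the next rank skips ahead (1, 1, 3), unlike F.dense_rank(). A plain-Python sketch of that semantics, with hypothetical tied totals:

```python
def rank_desc(values):
    """Standard SQL RANK() over values sorted descending:
    ties share a rank, and the following rank skips ahead."""
    ordered = sorted(values, reverse=True)
    ranks = {}
    for i, v in enumerate(ordered):
        ranks.setdefault(v, i + 1)  # first (best) position wins for ties
    return [ranks[v] for v in values]

# Hypothetical revenue totals with a tie at the top.
print(rank_desc([99996.20, 99996.20, 99982.21]))  # → [1, 1, 3]
```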

What is streaming?

 

  • Batch - process a fixed dataset all at once
  • Streaming - process data incrementally as it arrives
  • Ideal for transaction feeds, logs, and event data

[Image: data flowing continuously as a stream into a processing engine]


File-based streaming


Stream directory:
  day_1.csv  (108 KB)
  day_2.csv  (108 KB)
  day_3.csv  (108 KB)
  day_4.csv  (108 KB)
  day_5.csv  (108 KB)

 

  • Directory of CSV files = streaming source
  • Each new file = one micro-batch
  • New files picked up automatically
  • Schema must be defined explicitly

Reading the stream

df_stream = (
    spark.readStream.format("csv")
    .option("header", "true")
    .schema(streaming_schema)
    .load(STREAM_DIR)
)

print(df_stream.isStreaming)
True

What are checkpoints?

[Image: a save-point marker in a flowing data pipeline, progress bookmarked]

 


  • Checkpoint = metadata directory on disk
  • Records which files have been processed
  • On restart, Spark resumes where it left off
  • Prevents duplicate processing
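The idea behind these bullets can be sketched as a toy checkpoint that remembers which file names were processed (Spark's real checkpoint stores offsets and metadata in its own format; this is only an illustration):

```python
import json
import os

def process_new_files(stream_dir_files, checkpoint_path):
    """Toy checkpoint: persist processed file names to disk,
    so a restart only picks up files not yet seen."""
    seen = set()
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as fh:
            seen = set(json.load(fh))
    new_files = [f for f in stream_dir_files if f not in seen]
    with open(checkpoint_path, "w") as fh:
        json.dump(sorted(seen | set(new_files)), fh)
    return new_files
```

Calling it twice with the same directory listing returns all files the first time and an empty list the second, which is the "no duplicate processing on restart" behavior the checkpoint provides.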

Writing the stream

query = (
    df_stream.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_DIR)
    .option("path", DELTA_PATH)
    .trigger(availableNow=True)
    .start()
)
query.awaitTermination()
Status:       Stopped
Rows written: 5,000

Monitoring: status and lastProgress

print(query.status)

progress = query.lastProgress
print(f"Rows processed: {progress['numInputRows']:,}")
print(f"Rows/sec:       {progress['processedRowsPerSecond']:.0f}")
{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}
Rows processed: 5,000
Rows/sec:       752
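One caveat: lastProgress is None until the first micro-batch completes, so production code should guard the lookup. A small hypothetical helper (the dict keys mirror the fields shown above):

```python
def summarize_progress(progress):
    """Return a one-line summary from a lastProgress dict,
    or a placeholder when no batch has completed yet."""
    if not progress:
        return "no progress yet"
    rows = progress.get("numInputRows", 0)
    rate = progress.get("processedRowsPerSecond", 0.0)
    return f"{rows:,} rows at {rate:.0f} rows/sec"

# Sample dict mirroring the fields shown above.
print(summarize_progress({"numInputRows": 5000, "processedRowsPerSecond": 752.3}))
# → 5,000 rows at 752 rows/sec
```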

Checkpoint recovery

query_restart = (
    df_stream.writeStream.format("delta")
    .option("checkpointLocation", CHECKPOINT_DIR)
    .option("path", DELTA_PATH)
    .trigger(availableNow=True)
    .start()
)
query_restart.awaitTermination()
Rows on restart: 0

Let's practice!
