Data Transformation with Spark SQL in Databricks
Disha Mukherjee
Lead Data Engineer


# An explicit StructType pins the column types so the schema is identical
# on every run — no inference pass over the CSV.
from pyspark.sql import functions as F
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Date", TimestampType(), True),
    StructField("Transaction_Amount", DoubleType(), True),
    StructField("Transaction_Status", StringType(), True),
    # ... 6 more fields
])

# Read the raw transactions with the declared schema applied up front.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load("/Volumes/.../transactions.csv")
)
df.printSchema()
|-- ID: integer (nullable = true)
|-- Date: timestamp (nullable = true)
|-- Transaction_Amount: double (nullable = true)
|-- Transaction_Status: string (nullable = true)
...
null_counts = df.select([ F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns ])null_counts.show()
+-----------+------------------+--------+--------+
|Customer_ID|Transaction_Amount|Category|Location|
+-----------+------------------+--------+--------+
|30 |50 |40 |30 |
+-----------+------------------+--------+--------+
df_no_nulls = df.na.drop(subset=["Customer_ID"])df_filled = df_no_nulls.na.fill({ "Transaction_Amount": 0.0, "Category": "Unknown", "Location": "Unknown" })print(f"Before: {df.count()}") print(f"After: {df_filled.count()}")
Before: 100,150
After: 100,120
# Exact-duplicate check: rows identical across every column count once in
# distinct(), so total minus distinct is the number of duplicate rows.
total = df_filled.count()
distinct = df_filled.distinct().count()
print(f"Duplicates: {total - distinct}")
Duplicates: 149
df_deduped = df_filled.dropDuplicates( subset=["Customer_ID", "Date", "Transaction_Amount"] )print(f"Rows after dedup: {df_deduped.count()}")
Rows after dedup: 99,971
df_valid = df_deduped.filter(F.col("Transaction_Amount") > 0)df_valid = df_valid.filter( F.col("Transaction_Status") == "Completed" )print(f"Valid transactions: {df_valid.count()}")
Valid transactions: 33,223
df_enriched = df_valid.withColumn("Revenue_Band", F.when(F.col("Transaction_Amount") >= 50000, "High") .when(F.col("Transaction_Amount") >= 10000, "Medium").otherwise("Low"))df_enriched.select("Customer_ID", "Transaction_Amount", "Revenue_Band").show(3)
+-----------+------------------+------------+
|Customer_ID|Transaction_Amount|Revenue_Band|
+-----------+------------------+------------+
|CUST003 |5752.36 |Low |
|CUST009 |28959.12 |Medium |
|CUST010 |72098.18 |High |
+-----------+------------------+------------+
total_rows = df_enriched.count() distinct_rows = df_enriched.distinct().count()null_rate = ( df_enriched.filter(F.col("Customer_ID").isNull()) .count() / total_rows * 100 )print(f"Total rows: {total_rows}") print(f"Duplicates: {total_rows - distinct_rows}") print(f"Null rate: {null_rate:.2f}%")
Total rows: 33,223
Duplicates: 0
Null rate: 0.00%
Data Transformation with Spark SQL in Databricks