Data cleaning and quality checks

Data Transformation with Spark SQL in Databricks

Disha Mukherjee

Lead Data Engineer

Three problems in real data

  • Missing values in key columns
  • Duplicate rows from repeated ingestion
  • Logically invalid records (all three sketched below)
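A minimal sketch of what all three look like in a toy DataFrame; the rows and column names below are hypothetical, chosen to mirror the transactions dataset used in this lesson:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

rows = [
    ("CUST001", 120.0, "Completed"),
    ("CUST001", 120.0, "Completed"),  # duplicate from repeated ingestion
    (None,      75.5,  "Completed"),  # missing Customer_ID
    ("CUST002", -40.0, "Completed"),  # logically invalid: negative amount
]
demo = spark.createDataFrame(
    rows, ["Customer_ID", "Transaction_Amount", "Transaction_Status"]
)
demo.show()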

Why define a schema explicitly?

  • Spark can misread column types
  • Numeric columns can be inferred as strings
  • StructType defines every type explicitly, on every run (see the sketch below)
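To see why this matters, compare inference against the explicit schema on the next slides. Without a schema, Spark reads every CSV column as a string; with inferSchema, a single stray token such as "N/A" in a numeric column makes Spark infer the whole column as string. A minimal sketch, using the same elided path as the later slides:

# Default CSV behavior: every column arrives as a string
df_raw = spark.read.option("header", "true").csv("/Volumes/.../transactions.csv")

# inferSchema guesses from the data, so inferred types can change
# whenever the data changes; one bad value flips a column to string
df_inferred = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/Volumes/.../transactions.csv")
)
df_inferred.printSchema()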

Imports

from pyspark.sql.types import (
    StructType, StructField,
    StringType, DoubleType, IntegerType, TimestampType
)

from pyspark.sql import functions as F

Defining the schema

schema = StructType([
    StructField("ID",                 IntegerType(),   True),
    StructField("Date",               TimestampType(), True),
    StructField("Transaction_Amount", DoubleType(),    True),
    StructField("Transaction_Status", StringType(),    True),
    # ... 6 more fields
])

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load("/Volumes/.../transactions.csv")
)
df.printSchema()
 |-- ID: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Transaction_Amount: double (nullable = true)
 |-- Transaction_Status: string (nullable = true)
 ...
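One behavior worth knowing once a schema is enforced: under Spark's default PERMISSIVE read mode, CSV values that cannot be parsed into the declared type come through as null, which is part of why the null check on the next slide matters. DROPMALFORMED is a standard alternative that discards such rows instead; a hedged sketch:

df_strict = (
    spark.read.format("csv")
    .option("header", "true")
    .option("mode", "DROPMALFORMED")  # skip rows that violate the schema
    .schema(schema)
    .load("/Volumes/.../transactions.csv")
)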

Finding the nulls

null_counts = df.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c)
    for c in df.columns
])

null_counts.show()
+-----------+------------------+--------+--------+
|Customer_ID|Transaction_Amount|Category|Location|
+-----------+------------------+--------+--------+
|30         |50                |40      |30      |
+-----------+------------------+--------+--------+

Dropping and filling nulls

df_no_nulls = df.na.drop(subset=["Customer_ID"])

df_filled = df_no_nulls.na.fill({
    "Transaction_Amount": 0.0,
    "Category": "Unknown",
    "Location": "Unknown"
})

print(f"Before: {df.count():,}")
print(f"After:  {df_filled.count():,}")
Before: 100,150
After:  100,120

Handling duplicates

total = df_filled.count()
distinct = df_filled.distinct().count()

print(f"Duplicates: {total - distinct}")
Duplicates: 149

Handling duplicates

df_deduped = df_filled.dropDuplicates(
    subset=["Customer_ID", "Date", "Transaction_Amount"]
)

print(f"Rows after dedup: {df_deduped.count()}")
Rows after dedup: 99,971
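A caveat: dropDuplicates keeps an arbitrary row from each duplicate group, which can vary between runs on partitioned data. If the choice must be deterministic, a window function is one option; ordering by ID below assumes ID is unique per row, which this dataset's schema suggests but does not guarantee:

from pyspark.sql import Window

w = Window.partitionBy(
    "Customer_ID", "Date", "Transaction_Amount"
).orderBy("ID")

df_deduped_det = (
    df_filled
    .withColumn("rn", F.row_number().over(w))  # rank rows within each group
    .filter(F.col("rn") == 1)                  # keep exactly one per group
    .drop("rn")
)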

Filtering invalid records

df_valid = df_deduped.filter(F.col("Transaction_Amount") > 0)

df_valid = df_valid.filter(
    F.col("Transaction_Status") == "Completed"
)

print(f"Valid transactions: {df_valid.count():,}")
Valid transactions: 33,223
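The two filters are equivalent to a single call combined with &; each condition needs its own parentheses because & binds more tightly than the comparisons. Purely a style choice:

df_valid = df_deduped.filter(
    (F.col("Transaction_Amount") > 0)
    & (F.col("Transaction_Status") == "Completed")
)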

Creating derived columns

df_enriched = df_valid.withColumn(
    "Revenue_Band",
    F.when(F.col("Transaction_Amount") >= 50000, "High")
     .when(F.col("Transaction_Amount") >= 10000, "Medium")
     .otherwise("Low")
)

df_enriched.select("Customer_ID", "Transaction_Amount", "Revenue_Band").show(3)
+-----------+------------------+------------+
|Customer_ID|Transaction_Amount|Revenue_Band|
+-----------+------------------+------------+
|CUST003    |5752.36           |Low         |
|CUST009    |28959.12          |Medium      |
|CUST010    |72098.18          |High        |
+-----------+------------------+------------+

Data quality check

total_rows = df_enriched.count()
distinct_rows = df_enriched.distinct().count()

null_rate = (
    df_enriched.filter(F.col("Customer_ID").isNull()).count()
    / total_rows * 100
)

print(f"Total rows: {total_rows:,}")
print(f"Duplicates: {total_rows - distinct_rows}")
print(f"Null rate:  {null_rate:.2f}%")
Total rows:   33,223
Duplicates:   0
Null rate:    0.00%
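In a pipeline these checks are worth running after every stage, so wrapping them in a helper keeps notebooks tidy; quality_report is a hypothetical name, not part of the course code:

def quality_report(df, key_col="Customer_ID"):
    """Print row count, duplicates, and null rate for one key column."""
    total = df.count()
    duplicates = total - df.distinct().count()
    null_rate = df.filter(F.col(key_col).isNull()).count() / total * 100
    print(f"Total rows: {total:,}")
    print(f"Duplicates: {duplicates}")
    print(f"Null rate:  {null_rate:.2f}%")

quality_report(df_enriched)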

Let's practice!
