Shaping data with PySpark and SQL

Data Transformation with Spark SQL in Databricks

Disha Mukherjee

Lead Data Engineer

Shaping your data

path = "/Volumes/.../default/transactions.csv"
df = spark.read.csv(path, header=True, inferSchema=True)
df.show(5, truncate=False)

Previewing outputs in Databricks

  • select() → choose columns
  • filter() → choose rows

Selecting columns

df.select("ID", "Customer_ID", "Transaction_Amount", "Category").show(5)
+---+-----------+------------------+-----------+
| ID|Customer_ID|Transaction_Amount|   Category|
+---+-----------+------------------+-----------+
|  1|    CUST001|          27337.49|Electronics|
|  2|    CUST002|           97716.6|Electronics|
|  3|    CUST003|           5752.36|Electronics|
|  4|    CUST004|          93443.22|    Savings|
|  5|    CUST005|          15109.98|Electronics|
+---+-----------+------------------+-----------+

Filtering rows

from pyspark.sql import functions as F

filtered_df = df.filter(F.col("Transaction_Status") == "Completed")
filtered_df.show(5)
+---+-------------------+-----------+------------------+------------------+
| ID|               Date|Customer_ID|Transaction_Amount|Transaction_Status|
+---+-------------------+-----------+------------------+------------------+
|  3|2023-01-03 00:00:00|    CUST003|           5752.36|         Completed|
|  9|2023-01-09 00:00:00|    CUST009|          28959.12|         Completed|
| 10|2023-01-10 00:00:00|    CUST010|          72098.18|         Completed|
| 11|2023-01-11 00:00:00|    CUST011|          49771.05|         Completed|
| 20|2023-01-20 00:00:00|    CUST020|          69825.14|         Completed|
+---+-------------------+-----------+------------------+------------------+

Combining filter conditions

high_value_df = df.filter(
    (F.col("Transaction_Amount") > 20000) &
    (F.col("Transaction_Status") == "Completed")
)
high_value_df.show(3)
+---+-------------------+-----------+------------------+------------------+
| ID|               Date|Customer_ID|Transaction_Amount|Transaction_Status|
+---+-------------------+-----------+------------------+------------------+
|  9|2023-01-09 00:00:00|    CUST009|          28959.12|         Completed|
| 10|2023-01-10 00:00:00|    CUST010|          72098.18|         Completed|
| 11|2023-01-11 00:00:00|    CUST011|          49771.05|         Completed|
+---+-------------------+-----------+------------------+------------------+

Python methods or SQL

  • Python methods: select(), filter()
    • Build transformations step by step

  • SQL queries: Standard SQL syntax
    • More readable for aggregations and grouping

Python vs SQL


Creating a temporary view

df.createOrReplaceTempView("transactions")

  • Temporary view = a name that points to a DataFrame, so SQL queries can use it
  • Lasts only for the current Spark session

Querying with SQL

%sql
SELECT Category, COUNT(*) AS transaction_count
FROM transactions
WHERE Transaction_Status = 'Completed'
GROUP BY Category
ORDER BY transaction_count DESC
+-----------+-----------------+
|   Category|transaction_count|
+-----------+-----------------+
|   Clothing|             6764|
|     Dining|             6732|
|Electronics|             6640|
|    Savings|             6629|
|  Groceries|             6557|
+-----------+-----------------+

Validating with display()

display(filtered_df)

GIF showing interactive table with sorting and searching

  • Sort, search, and page through results
  • Spot nulls and outliers

Inspecting query plans

filtered_df.explain()
== Physical Plan ==
*(1) Filter (Transaction_Status = Completed)
+- FileScan csv [ID, Date, Customer_ID, ...]

  • Also works with SQL: EXPLAIN SELECT ...
  • Useful for diagnosing slow queries, e.g., checking where filters are applied

Reading Spark errors

df.select("Account").show()
[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column with name `Account` 
cannot be resolved. Did you mean one of the following? 
[`Date`, `Location`, `Customer_ID`, `Transaction_Amount`]

Let's practice!
