Data Transformation with Spark SQL in Databricks
Disha Mukherjee
Lead Data Engineer
path = "/Volumes/.../default/transactions.csv"
df = spark.read.csv(path, header=True, inferSchema=True)
df.show(5, truncate=False)

select() → choose columns
filter() → choose rows

df.select("ID", "Customer_ID", "Transaction_Amount", "Category").show(5)
+---+-----------+------------------+-----------+
| ID|Customer_ID|Transaction_Amount| Category|
+---+-----------+------------------+-----------+
| 1| CUST001| 27337.49|Electronics|
| 2| CUST002| 97716.6|Electronics|
| 3| CUST003| 5752.36|Electronics|
| 4| CUST004| 93443.22| Savings|
| 5| CUST005| 15109.98|Electronics|
+---+-----------+------------------+-----------+
from pyspark.sql import functions as F

filtered_df = df.filter(F.col("Transaction_Status") == "Completed")
filtered_df.show(5)
+---+-------------------+-----------+------------------+------------------+
| ID| Date|Customer_ID|Transaction_Amount|Transaction_Status|
+---+-------------------+-----------+------------------+------------------+
| 3|2023-01-03 00:00:00| CUST003| 5752.36| Completed|
| 9|2023-01-09 00:00:00| CUST009| 28959.12| Completed|
| 10|2023-01-10 00:00:00| CUST010| 72098.18| Completed|
| 11|2023-01-11 00:00:00| CUST011| 49771.05| Completed|
| 20|2023-01-20 00:00:00| CUST020| 69825.14| Completed|
+---+-------------------+-----------+------------------+------------------+
high_value_df = df.filter(
    (F.col("Transaction_Amount") > 20000) & (F.col("Transaction_Status") == "Completed")
)
high_value_df.show(3)
+---+-------------------+-----------+------------------+------------------+
| ID| Date|Customer_ID|Transaction_Amount|Transaction_Status|
+---+-------------------+-----------+------------------+------------------+
| 9|2023-01-09 00:00:00| CUST009| 28959.12| Completed|
| 10|2023-01-10 00:00:00| CUST010| 72098.18| Completed|
| 11|2023-01-11 00:00:00| CUST011| 49771.05| Completed|
+---+-------------------+-----------+------------------+------------------+
select(), filter()
df.createOrReplaceTempView("transactions")
%sql
SELECT Category, COUNT(*) AS transaction_count
FROM transactions
WHERE Transaction_Status = 'Completed'
GROUP BY Category
ORDER BY transaction_count DESC
+-----------+-----------------+
| Category|transaction_count|
+-----------+-----------------+
| Clothing| 6764|
| Dining| 6732|
|Electronics| 6640|
| Savings| 6629|
| Groceries| 6557|
+-----------+-----------------+
display(filtered_df)

filtered_df.explain()
== Physical Plan ==
*(1) Filter (Transaction_Status = Completed)
+- FileScan csv [ID, Date, Customer_ID, ...]
EXPLAIN SELECT ...

df.select("Account").show()
[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column with name `Account`
cannot be resolved. Did you mean one of the following?
[`Date`, `Location`, `Customer_ID`, `Transaction_Amount`]
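The error above is raised by Spark's analyzer during column resolution. A small defensive helper (hypothetical, not part of the PySpark API) can fail fast with the list of available columns before the query ever reaches the analyzer:

```python
def safe_select(df, *cols):
    """Select cols from df, raising early if any column does not exist.

    Works with any object exposing .columns and .select(),
    e.g. a Spark DataFrame.
    """
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise ValueError(
            f"Column(s) {missing} not found; available columns: {df.columns}"
        )
    return df.select(*cols)
```

Calling `safe_select(df, "Account")` on the transactions DataFrame would raise immediately with the real column names, instead of surfacing the UNRESOLVED_COLUMN error at analysis time.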