Introduction to Spark SQL in Python
Mark Plutowski
Data Scientist
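These examples assume a SparkSession named spark and a DataFrame df built from a train schedule dataset, registered as a temporary view called schedule so it can also be queried with spark.sql. A minimal setup sketch (the file name schedule.csv is an assumption):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Load the schedule data; the file name is an assumption
df = spark.read.csv('schedule.csv', header=True)

# Register the DataFrame as a temporary view so spark.sql can query it
df.createOrReplaceTempView('schedule')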
df.columns
['train_id', 'station', 'time']
df.show(5)
+--------+-------------+-----+
|train_id| station| time|
+--------+-------------+-----+
| 324|San Francisco|7:59 |
| 324| 22nd Street|8:03 |
| 324| Millbrae|8:16 |
| 324| Hillsdale|8:24 |
| 324| Redwood City|8:31 |
+--------+-------------+-----+
df.select('train_id', 'station').show(5)
+--------+-------------+
|train_id| station|
+--------+-------------+
| 324|San Francisco|
| 324| 22nd Street|
| 324| Millbrae|
| 324| Hillsdale|
| 324| Redwood City|
+--------+-------------+
df.select('train_id', 'station')
df.select(df.train_id, df.station)
from pyspark.sql.functions import col
df.select(col('train_id'), col('station'))
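All three forms select the same columns. The col (and df.column) styles return Column expressions, which support operators and methods that plain strings do not; a small sketch, assuming train_id was read as a string column:

# Column expressions can be compared, combined, and transformed
df.filter(col('train_id') == '324').show(2)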
(df.select('train_id', 'station')
   .withColumnRenamed('train_id', 'train')
   .show(5))
+-----+-------------+
|train| station|
+-----+-------------+
| 324|San Francisco|
| 324| 22nd Street|
| 324| Millbrae|
| 324| Hillsdale|
| 324| Redwood City|
+-----+-------------+
df.select(col('train_id').alias('train'), 'station')
df.select('train_id', df.station, col('time'))
spark.sql('SELECT train_id AS train, station FROM schedule LIMIT 5').show()
+-----+-------------+
|train| station|
+-----+-------------+
| 324|San Francisco|
| 324| 22nd Street|
| 324| Millbrae|
| 324| Hillsdale|
| 324| Redwood City|
+-----+-------------+
(df.select(col('train_id').alias('train'), 'station')
   .limit(5)
   .show())
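Both versions should return the same five rows. A quick sanity check (note that LIMIT without an ORDER BY is not guaranteed deterministic in general, though it is stable on a small dataset like this):

sql_rows = spark.sql('SELECT train_id AS train, station FROM schedule LIMIT 5').collect()
dot_rows = df.select(col('train_id').alias('train'), 'station').limit(5).collect()
assert sql_rows == dot_rows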
query = """
SELECT *,
ROW_NUMBER() OVER(PARTITION BY train_id ORDER BY time) AS id
FROM schedule
"""
spark.sql(query).show(11)
+--------+-------------+-----+---+
|train_id| station| time| id|
+--------+-------------+-----+---+
| 217| Gilroy|6:06 | 1|
| 217| San Martin|6:15 | 2|
| 217| Morgan Hill|6:21 | 3|
| 217| Blossom Hill|6:36 | 4|
| 217| Capitol|6:42 | 5|
| 217| Tamien|6:50 | 6|
| 217| San Jose|6:59 | 7|
| 324|San Francisco|7:59 | 1|
| 324| 22nd Street|8:03 | 2|
| 324| Millbrae|8:16 | 3|
| 324| Hillsdale|8:24 | 4|
+--------+-------------+-----+---+
from pyspark.sql import Window
from pyspark.sql.functions import row_number

df.withColumn('id', row_number().over(
    Window.partitionBy('train_id').orderBy('time')
))
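Calling .show(11) on this expression reproduces the ROW_NUMBER output above: row_number().over(...) is the DataFrame counterpart of the OVER clause in the SQL query.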
pyspark.sql.functions.row_number
pyspark.sql.Window
pyspark.sql.Window.partitionBy
pyspark.sql.Window.orderBy

The over function in PySpark corresponds to an OVER clause in SQL. pyspark.sql.Window represents the inside of an OVER clause.

from pyspark.sql.functions import lead

window = Window.partitionBy('train_id').orderBy('time')
dfx = df.withColumn('next', lead('time', 1).over(window))
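lead('time', 1) pulls the time value from the following row within the same partition, so next holds each train's next scheduled stop time; the last stop of each train gets null because its partition has no following row. A quick look (a sketch):

dfx.select('train_id', 'station', 'time', 'next').show(5)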