Introduction to Spark SQL in Python
Mark Plutowski
Data Scientist
The Project Gutenberg eBook of The Adventures of Sherlock Holmes,
by Sir Arthur Conan Doyle.
Available from gutenberg.org
df = spark.read.text('sherlock.txt')
print(df.first())
Row(value='The Project Gutenberg EBook of The Adventures of Sherlock Holmes')
print(df.count())
5500
df1 = spark.read.load('sherlock.parquet')
df1.show(15, truncate=False)
+--------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------+
|The Project Gutenberg EBook of The Adventures of Sherlock Holmes |
|by Sir Arthur Conan Doyle |
|(#15 in our series by Sir Arthur Conan Doyle) |
| |
|Copyright laws are changing all over the world. Be sure to check the|
|copyright laws for your country before downloading or redistributing|
|this or any other Project Gutenberg eBook. |
| |
|This header should be the first thing seen when viewing this Project|
|Gutenberg file. Please do not remove it. Do not change or edit the|
|header without written permission. |
| |
|Please read the "legal small print," and other information about the|
|eBook and Project Gutenberg at the bottom of this file. Included is|
|important information about your specific rights and restrictions in|
+--------------------------------------------------------------------+
df = df1.select(lower(col('value')))
print(df.first())
Row(lower(value)=
'the project gutenberg ebook of the adventures of sherlock holmes')
df.columns
['lower(value)']
df = df1.select(lower(col('value')).alias('v'))
df.columns
['v']
df = df1.select(regexp_replace('value', 'Mr\.', 'Mr').alias('v'))
"Mr. Holmes." ==> "Mr Holmes."
df = df1.select(regexp_replace('value', 'don\'t', 'do not').alias('v'))
"don't know." ==> "do not know."
df = df2.select(split('v', '[ ]').alias('words'))
df.show(truncate=False)
+--------------------------------------------------------------------------------------+
|words |
+--------------------------------------------------------------------------------------+
|[the, project, gutenberg, ebook, of, the, adventures, of, sherlock, holmes] |
|[by, sir, arthur, conan, doyle] |
|[(#15, in, our, series, by, sir, arthur, conan, doyle)] |
|[] |
.
.
.
|[please, read, the, "legal, small, print,", and, other, information, about, the] |
.
.
.
|[**welcome, to, the, world, of, free, plain, vanilla, electronic, texts**] |
+--------------------------------------------------------------------------------------+
punctuation = "_|.\?\!\",\'\[\]\*()"
df3 = df2.select(split('v', '[ %s]' % punctuation).alias('words'))
df3.show(truncate=False)
+---------------------------------------------------------------------------------------+
|words |
+---------------------------------------------------------------------------------------+
|[the, project, gutenberg, ebook, of, the, adventures, of, sherlock, holmes] |
|[by, sir, arthur, conan, doyle] |
|[, #15, in, our, series, by, sir, arthur, conan, doyle, ] |
|[] |
.
.
.
|[please, read, the, , legal, small, print, , , and, other, information, about, the] |
.
.
.
|[, , welcome, to, the, world, of, free, plain, vanilla, electronic, texts, , ] |
+---------------------------------------------------------------------------------------+
df4 = df3.select(explode('words').alias('word'))
df4.show()
+----------+
| word|
+----------+
| the|
| project|
| gutenberg|
| ebook|
| of|
| the|
|adventures|
| of|
| sherlock|
| holmes|
| by|
| sir|
| arthur|
| conan|
| doyle|
+----------+
print(df3.count())
5500
print(df4.count())
131404
print(df.count())
131404
nonblank_df = df.where(length('word') > 0)
print(nonblank_df.count())
107320
df2 = df.select('word', monotonically_increasing_id().alias('id'))
df2.show()
+----------+---+
| word| id|
+----------+---+
| the| 0|
| project| 1|
| gutenberg| 2|
| ebook| 3|
| of| 4|
| the| 5|
|adventures| 6|
| of| 7|
| sherlock| 8|
| holmes| 9|
| by| 10|
| sir| 11|
| arthur| 12|
| conan| 13|
| doyle| 14|
| #15| 15|
+----------+---+
df2 = df.withColumn('title', when(df.id < 25000, 'Preface')
.when(df.id < 50000, 'Chapter 1')
.when(df.id < 75000, 'Chapter 2')
.otherwise('Chapter 3'))
df2 = df2.withColumn('part', when(df2.id < 25000, 0)
.when(df2.id < 50000, 1)
.when(df2.id < 75000, 2)
.otherwise(3))
df2.show()
+----------+---+------------+----+
|word |id |title |part|
+----------+---+------------+----+
|the |0 | Preface|0 |
|project |1 | Preface|0 |
|gutenberg |2 | Preface|0 |
|ebook |3 | Preface|0 |
|of |4 | Preface|0 |
|the |5 | Preface|0 |
|adventures|6 | Preface|0 |
|of |7 | Preface|0 |
|sherlock |8 | Preface|0 |
|holmes |9 | Preface|0 |
df2 = df.repartition(4, 'part')
print(df2.rdd.getNumPartitions())
4
$ ls sherlock_parts
sherlock_part0.txt
sherlock_part1.txt
sherlock_part2.txt
sherlock_part3.txt
sherlock_part4.txt
sherlock_part5.txt
sherlock_part6.txt
sherlock_part7.txt
sherlock_part8.txt
sherlock_part9.txt
sherlock_part10.txt
sherlock_part11.txt
sherlock_part12.txt
sherlock_part13.txt
df_parts = spark.read.text('sherlock_parts')
Introduction to Spark SQL in Python