Posted By Jakub Nowacki, 25 August 2017
You have probably already done the hello-world of distributed computing, which is word count. On the other hand, many tutorials about Spark SQL (and SQL in general) deal mostly with structured data in tabular format. But not all data has structure right away; sometimes we need to create one. This is especially true for all forms of text documents.
Something to count
A typical example of a text document is a book. So let us first get some data to
perform the analysis on. A great source of data (and reading material) is
Project Gutenberg. The code below downloads some
well-known books and saves them in a local data
folder.
import os
import urllib.request

# Create a local data folder if it does not exist yet
if not os.path.exists('data'):
    os.mkdir('data')

# Download a few well-known books from Project Gutenberg
urllib.request.urlretrieve('https://www.gutenberg.org/files/2591/2591-0.txt', 'data/grimms_fairy_tales.txt')
urllib.request.urlretrieve('http://www.gutenberg.org/cache/epub/345/pg345.txt', 'data/dracula.txt')
urllib.request.urlretrieve('http://www.gutenberg.org/cache/epub/84/pg84.txt', 'data/frankenstein.txt')
urllib.request.urlretrieve('http://www.gutenberg.org/files/2701/2701-0.txt', 'data/moby_dick.txt')
urllib.request.urlretrieve('http://www.gutenberg.org/files/74/74-0.txt', 'data/tom_sawyer.txt')
urllib.request.urlretrieve('http://www.gutenberg.org/files/2600/2600-0.txt', 'data/war_and_peace.txt');
The original (RDD) word count
We can now create the Spark Session as usual.
import pyspark
import pyspark.sql.functions as func
spark = pyspark.sql.SparkSession.builder\
    .appName('word_count_sql')\
    .getOrCreate()
For completeness' sake, let's first do a typical word count, as we've seen many times before. The small extension is that we normalize the words to all lower case. Note that we count words for all the books in the folder.
lines = spark.sparkContext.textFile('data/')

counts = lines.flatMap(lambda line: line.lower().split())\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda x, y: x + y)
counts.top(10, lambda t: t[1])
[('the', 72154),
('and', 45006),
('to', 32244),
('of', 31093),
('a', 23232),
('in', 18612),
('he', 17167),
('that', 15367),
('his', 14243),
('was', 14045)]
Since all the books are in English, the top results are, as expected, English stop words.
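As a side note, if we wanted to look past the stop words already at this stage, we could simply filter them out before counting. Below is a minimal sketch; the stop_words set is just a small illustrative sample, not a complete stop-word list.
# A small, illustrative set of English stop words (not exhaustive)
stop_words = {'the', 'and', 'to', 'of', 'a', 'in', 'he', 'that', 'his', 'was', 'i', 'it'}

# Same RDD word count as above, but with stop words filtered out before counting
filtered = lines.flatMap(lambda line: line.lower().split())\
    .filter(lambda word: word not in stop_words)\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda x, y: x + y)

filtered.top(10, lambda t: t[1])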
Word count in SQL
Now let's move into the Spark SQL realm, where all the data are in tables. We use the
DataFrameReader method text, which reads files line by line,
similarly to the textFile we used before, except that we now get a DataFrame (DF)
whose rows are the lines of the file(s).
Straight away we can make a small detour and create a column with the name of the
book that each line is taken from. The simplest way is to use the function
input_file_name. Thus, as a result of the code below, we get
a DF with the lines in column value and the file URI in column file.
file_lines = spark.read.text('data/')\
    .withColumn('file', func.input_file_name())

file_lines.show()
+--------------------+--------------------+
| value| file|
+--------------------+--------------------+
| |file:///C:/Users/...|
|The Project Guten...|file:///C:/Users/...|
| |file:///C:/Users/...|
|This eBook is for...|file:///C:/Users/...|
|no restrictions w...|file:///C:/Users/...|
|it under the term...|file:///C:/Users/...|
|eBook or online a...|file:///C:/Users/...|
| |file:///C:/Users/...|
| |file:///C:/Users/...|
|Title: War and Peace|file:///C:/Users/...|
| |file:///C:/Users/...|
| Author: Leo Tolstoy|file:///C:/Users/...|
| |file:///C:/Users/...|
|Translators: Loui...|file:///C:/Users/...|
| |file:///C:/Users/...|
|Posting Date: Jan...|file:///C:/Users/...|
| |file:///C:/Users/...|
|Last Updated: Dec...|file:///C:/Users/...|
| |file:///C:/Users/...|
| Language: English|file:///C:/Users/...|
+--------------------+--------------------+
only showing top 20 rows
To extract the book name we can use the function regexp_extract
to get the last portion of the file URI; the file column is then dropped,
as it will not be used later.
book_lines = file_lines\
    .withColumn('book', func.regexp_extract('file', r'^.*/(.*)\.txt$', 1))\
    .drop('file')

book_lines.show()
+--------------------+-------------+
| value| book|
+--------------------+-------------+
| |war_and_peace|
|The Project Guten...|war_and_peace|
| |war_and_peace|
|This eBook is for...|war_and_peace|
|no restrictions w...|war_and_peace|
|it under the term...|war_and_peace|
|eBook or online a...|war_and_peace|
| |war_and_peace|
| |war_and_peace|
|Title: War and Peace|war_and_peace|
| |war_and_peace|
| Author: Leo Tolstoy|war_and_peace|
| |war_and_peace|
|Translators: Loui...|war_and_peace|
| |war_and_peace|
|Posting Date: Jan...|war_and_peace|
| |war_and_peace|
|Last Updated: Dec...|war_and_peace|
| |war_and_peace|
| Language: English|war_and_peace|
+--------------------+-------------+
only showing top 20 rows
To check if the data extraction works as expected, let’s count the number of lines for each book as follows:
book_lines.groupBy('book').count().show()
+------------------+-----+
| book|count|
+------------------+-----+
| moby_dick|22659|
| war_and_peace|66055|
| dracula|15973|
| tom_sawyer| 9209|
|grimms_fairy_tales| 9571|
| frankenstein| 7653|
+------------------+-----+
Since this looks fine, the next step is to get the words out of the lines of the
books. A convenient function for that is split, which splits
the string in a given column using a regular expression pattern; in our case we split on
everything that is not a word character:
b = book_lines.select('book', func.split('value', r'[\W_]+').alias('line'))

b.printSchema()
b.show()
root
|-- book: string (nullable = false)
|-- line: array (nullable = true)
| |-- element: string (containsNull = true)
+-------------+--------------------+
| book| line|
+-------------+--------------------+
|war_and_peace| []|
|war_and_peace|[The, Project, Gu...|
|war_and_peace| []|
|war_and_peace|[This, eBook, is,...|
|war_and_peace|[no, restrictions...|
|war_and_peace|[it, under, the, ...|
|war_and_peace|[eBook, or, onlin...|
|war_and_peace| []|
|war_and_peace| []|
|war_and_peace|[Title, War, and,...|
|war_and_peace| []|
|war_and_peace|[Author, Leo, Tol...|
|war_and_peace| []|
|war_and_peace|[Translators, Lou...|
|war_and_peace| []|
|war_and_peace|[Posting, Date, J...|
|war_and_peace| []|
|war_and_peace|[Last, Updated, D...|
|war_and_peace| []|
|war_and_peace| [Language, English]|
+-------------+--------------------+
only showing top 20 rows
The result has the same number of rows, but each line is now an array of strings containing the words of that line.
To get individual words, we need to use the function explode,
which returns a new row for each element in the array; note that the values of the
other columns, in our case book, are filled in automatically. In the same
step, let's normalize the words using the functions trim and lower.
book_words = book_lines\
    .select('book', func.explode(func.split('value', r'[\W_]+')).alias('word'))\
    .where(func.length('word') > 0)\
    .select('book', func.trim(func.lower(func.col('word'))).alias('word'))

book_words.show()
+-------------+---------+
| book| word|
+-------------+---------+
|war_and_peace| the|
|war_and_peace| project|
|war_and_peace|gutenberg|
|war_and_peace| ebook|
|war_and_peace| of|
|war_and_peace| war|
|war_and_peace| and|
|war_and_peace| peace|
|war_and_peace| by|
|war_and_peace| leo|
|war_and_peace| tolstoy|
|war_and_peace| this|
|war_and_peace| ebook|
|war_and_peace| is|
|war_and_peace| for|
|war_and_peace| the|
|war_and_peace| use|
|war_and_peace| of|
|war_and_peace| anyone|
|war_and_peace| anywhere|
+-------------+---------+
only showing top 20 rows
The result is a DF with a book column and a word column, as shown above.
With this we can perform the typical word count very easily, without needing to
introduce any new values. Note that the book column is not necessary for the word count,
so it is not used here.
book_words.groupBy('word')\
    .count()\
    .orderBy('count', ascending=False)\
    .show(10)
+----+-----+
|word|count|
+----+-----+
| the|73104|
| and|46591|
| to|32937|
| of|31333|
| a|23792|
| in|19267|
| he|18453|
|that|17053|
| i|16486|
| was|14467|
+----+-----+
only showing top 10 rows
As before, the top 10 results are English stop words.
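Although the book column was not needed for the global word count above, it becomes useful as soon as we want counts per book. Below is a minimal sketch of such a per-book count; it is only a small variation of the aggregation above and is not used further in this post.
# Count how many times each word appears in each book
book_word_counts = book_words.groupBy('book', 'word')\
    .count()\
    .orderBy('count', ascending=False)

book_word_counts.show(10)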
In the next blog post we will continue this example and calculate the TF-IDF measure to find the most characteristic words in each of the analyzed books.