Word count in Spark SQL with a pinch of TF-IDF

Posted By Jakub Nowacki, 25 August 2017

So you’ve probably already done the hello world of distributed computing, which is word count. On the other hand, a lot of tutorials about Spark SQL (and SQL in general) deal mostly with structured data in tabular format. But not all data come with a structure right away; sometimes we need to create one. This is especially true for all forms of text documents.

Something to count

A typical example of a text document is a book. So let us get some data first to perform the analysis. A great source of data (and reading material) is Project Gutenberg. The code below fetches some well-known books and saves them in a local data folder.

import os
import urllib.request

# Create the local data folder if it does not exist yet
if not os.path.exists('data'):
    os.mkdir('data')

# Download a few well-known books from Project Gutenberg
urllib.request.urlretrieve('https://www.gutenberg.org/files/2591/2591-0.txt', 'data/grimms_fairy_tales.txt')
urllib.request.urlretrieve('http://www.gutenberg.org/cache/epub/345/pg345.txt', 'data/dracula.txt')
urllib.request.urlretrieve('http://www.gutenberg.org/cache/epub/84/pg84.txt', 'data/frankenstein.txt')
urllib.request.urlretrieve('http://www.gutenberg.org/files/2701/2701-0.txt', 'data/moby_dick.txt')
urllib.request.urlretrieve('http://www.gutenberg.org/files/74/74-0.txt', 'data/tom_sawyer.txt')
urllib.request.urlretrieve('http://www.gutenberg.org/files/2600/2600-0.txt', 'data/war_and_peace.txt');

The original (RDD) word count

We can now create the SparkSession as usual.

import pyspark
import pyspark.sql.functions as func

spark = pyspark.sql.SparkSession.builder\
    .appName('word_count_sql')\
    .getOrCreate()

For completeness’ sake, let’s first do a typical RDD word count, as we’ve seen many times before. The small extension is that we normalize all words to lower case. Note that we count words across all the books in the folder.

lines = spark.sparkContext.textFile('data/')

counts = lines.flatMap(lambda line: line.lower().split())\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda x, y: x + y)

counts.top(10, lambda t: t[1])
[('the', 72154),
 ('and', 45006),
 ('to', 32244),
 ('of', 31093),
 ('a', 23232),
 ('in', 18612),
 ('he', 17167),
 ('that', 15367),
 ('his', 14243),
 ('was', 14045)]

Since all the books are in English, the top results are, as expected, English stop words.
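If we wanted to look past them in this RDD version, we could filter the counts against a small hand-picked set of stop words; the list below is a minimal illustrative sketch, not a standard stop-word corpus.

# Filter out a small, hand-picked set of stop words (illustrative only)
stop_words = {'the', 'and', 'to', 'of', 'a', 'in', 'he', 'that', 'his', 'was',
              'i', 'it', 'with', 'had', 'as', 'you', 'not', 'for', 'at', 'is'}

counts.filter(lambda t: t[0] not in stop_words)\
    .top(10, lambda t: t[1])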

Word count in SQL

Now let’s go into the Spark SQL realm, where all the data are in tables. We use the DataFrameReader method text, which reads files line by line, similarly to the textFile we used before, except that we get a DataFrame (DF) in which each row is a line of a file.

Straight away we can make a small detour and create a column with the name of the book that each line is taken from. The simplest way is to use the function input_file_name. Thus, as a result of the code below, we get a DF with the lines in column value and the file URI in column file.

file_lines = spark.read.text('data/')\
    .withColumn('file', func.input_file_name())
file_lines.show()
+--------------------+--------------------+
|               value|                file|
+--------------------+--------------------+
|                    |file:///C:/Users/...|
|The Project Guten...|file:///C:/Users/...|
|                    |file:///C:/Users/...|
|This eBook is for...|file:///C:/Users/...|
|no restrictions w...|file:///C:/Users/...|
|it under the term...|file:///C:/Users/...|
|eBook or online a...|file:///C:/Users/...|
|                    |file:///C:/Users/...|
|                    |file:///C:/Users/...|
|Title: War and Peace|file:///C:/Users/...|
|                    |file:///C:/Users/...|
| Author: Leo Tolstoy|file:///C:/Users/...|
|                    |file:///C:/Users/...|
|Translators: Loui...|file:///C:/Users/...|
|                    |file:///C:/Users/...|
|Posting Date: Jan...|file:///C:/Users/...|
|                    |file:///C:/Users/...|
|Last Updated: Dec...|file:///C:/Users/...|
|                    |file:///C:/Users/...|
|   Language: English|file:///C:/Users/...|
+--------------------+--------------------+
only showing top 20 rows

To extract the book name we can use the function regexp_extract to get the last portion of the file URI; the file column is then dropped, as it will not be used later.

book_lines = file_lines \
    .withColumn('book', func.regexp_extract('file', r'^.*/(.*)\.txt$', 1)) \
    .drop('file')
book_lines.show()
+--------------------+-------------+
|               value|         book|
+--------------------+-------------+
|                    |war_and_peace|
|The Project Guten...|war_and_peace|
|                    |war_and_peace|
|This eBook is for...|war_and_peace|
|no restrictions w...|war_and_peace|
|it under the term...|war_and_peace|
|eBook or online a...|war_and_peace|
|                    |war_and_peace|
|                    |war_and_peace|
|Title: War and Peace|war_and_peace|
|                    |war_and_peace|
| Author: Leo Tolstoy|war_and_peace|
|                    |war_and_peace|
|Translators: Loui...|war_and_peace|
|                    |war_and_peace|
|Posting Date: Jan...|war_and_peace|
|                    |war_and_peace|
|Last Updated: Dec...|war_and_peace|
|                    |war_and_peace|
|   Language: English|war_and_peace|
+--------------------+-------------+
only showing top 20 rows

To check that the extraction works as expected, let’s count the number of lines for each book:

book_lines.groupBy('book').count().show()
+------------------+-----+
|              book|count|
+------------------+-----+
|         moby_dick|22659|
|     war_and_peace|66055|
|           dracula|15973|
|        tom_sawyer| 9209|
|grimms_fairy_tales| 9571|
|      frankenstein| 7653|
+------------------+-----+

Since this looks fine, the next step is to get the words out of the lines of the books. A convenient function for that is split, which splits the string in a given column using a regular expression pattern; in our case the delimiter is anything that is not a word character:

b = book_lines.select('book', func.split('value', r'[\W_]+').alias('line'))
b.printSchema()
b.show()
root
 |-- book: string (nullable = false)
 |-- line: array (nullable = true)
 |    |-- element: string (containsNull = true)

+-------------+--------------------+
|         book|                line|
+-------------+--------------------+
|war_and_peace|                  []|
|war_and_peace|[The, Project, Gu...|
|war_and_peace|                  []|
|war_and_peace|[This, eBook, is,...|
|war_and_peace|[no, restrictions...|
|war_and_peace|[it, under, the, ...|
|war_and_peace|[eBook, or, onlin...|
|war_and_peace|                  []|
|war_and_peace|                  []|
|war_and_peace|[Title, War, and,...|
|war_and_peace|                  []|
|war_and_peace|[Author, Leo, Tol...|
|war_and_peace|                  []|
|war_and_peace|[Translators, Lou...|
|war_and_peace|                  []|
|war_and_peace|[Posting, Date, J...|
|war_and_peace|                  []|
|war_and_peace|[Last, Updated, D...|
|war_and_peace|                  []|
|war_and_peace| [Language, English]|
+-------------+--------------------+
only showing top 20 rows

The result has the same number of rows, now holding arrays of strings that contain the words.

To get the individual words, we need the function explode, which returns a row for each element in the array; note that the values of the other columns, in our case book, are filled in automatically. In the same step, let’s normalize the words using the functions trim and lower.

book_words = book_lines\
    .select('book', func.explode(func.split('value', r'[\W_]+')).alias('word'))\
    .where(func.length('word') > 0)\
    .select('book', func.trim(func.lower(func.col('word'))).alias('word'))

book_words.show()    
+-------------+---------+
|         book|     word|
+-------------+---------+
|war_and_peace|      the|
|war_and_peace|  project|
|war_and_peace|gutenberg|
|war_and_peace|    ebook|
|war_and_peace|       of|
|war_and_peace|      war|
|war_and_peace|      and|
|war_and_peace|    peace|
|war_and_peace|       by|
|war_and_peace|      leo|
|war_and_peace|  tolstoy|
|war_and_peace|     this|
|war_and_peace|    ebook|
|war_and_peace|       is|
|war_and_peace|      for|
|war_and_peace|      the|
|war_and_peace|      use|
|war_and_peace|       of|
|war_and_peace|   anyone|
|war_and_peace| anywhere|
+-------------+---------+
only showing top 20 rows

The result is a DF with a book column and a word column, as shown above.

With this we can perform the typical word count very easily, without needing to add any new values. Note that the book column is not necessary for the word count, so it is not used.

book_words.groupBy('word')\
    .count()\
    .orderBy('count', ascending=False)\
    .show(10)
+----+-----+
|word|count|
+----+-----+
| the|73104|
| and|46591|
|  to|32937|
|  of|31333|
|   a|23792|
|  in|19267|
|  he|18453|
|that|17053|
|   i|16486|
| was|14467|
+----+-----+
only showing top 10 rows

As before, the top 10 results are English stop words.
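Since this section is about word count in SQL, it is worth noting that the same aggregation can also be written as a literal SQL query against a temporary view. The sketch below is our addition; the view name book_words is chosen for illustration.

# Register the DataFrame as a temporary view so it can be queried with SQL
book_words.createOrReplaceTempView('book_words')

# The same word count expressed as a literal SQL query
spark.sql("""
    SELECT word, COUNT(*) AS count
    FROM book_words
    GROUP BY word
    ORDER BY count DESC
    LIMIT 10
""").show()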

In the next blog post we will continue this example and calculate TF-IDF values to find the most characteristic words in each of the analyzed books.
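As a quick preview, here is a minimal sketch of how such a computation could look with the DataFrames built above. It assumes a simple raw-count term frequency and a log(N/df) inverse document frequency; the follow-up post may use a different formulation.

# Sketch of TF-IDF over book_words (assumption: raw-count TF, log(N/df) IDF)
n_books = book_words.select('book').distinct().count()

# Term frequency: how often each word appears in each book
tf = book_words.groupBy('book', 'word').count()\
    .withColumnRenamed('count', 'tf')

# Document frequency: in how many books each word appears
df = book_words.select('book', 'word').distinct()\
    .groupBy('word').count()\
    .withColumnRenamed('count', 'df')

# Combine the two into a TF-IDF score per (book, word) pair
tfidf = tf.join(df, 'word')\
    .withColumn('tf_idf', func.col('tf') * func.log(func.lit(n_books) / func.col('df')))

# The most characteristic words would be those with the highest tf_idf
tfidf.orderBy('tf_idf', ascending=False).show(10)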
