#### Blog

##### Posted By Jakub Nowacki, 05 January 2018

In the previous Dask post we’ve looked into basic data extraction using Dask. In this post we’ll follow up the problem and show how to perform more complex tasks with Dask in a similar way as we’d do in Pandas but on a larger data set.

First we need to import some Dask elements that we will use and load the data; see the previous Dask post for he data extraction details. Note that I set the index to id as we will use it in the later date.

import dask

import pandas as pd
import re

%matplotlib inline

posts = dd.read_parquet('data/posts_tags.parq', engine='fastparquet')\
.set_index('id')
posts

creation_date score tags view_count
npartitions=100
4 datetime64[ns] int64 object int64
642315 ... ... ... ...
... ... ... ... ...
45537494 ... ... ... ...
45901673 ... ... ... ...

First, lets see how many posts we have per day. Since Dask implements Pandas’ timeseries features under accessor dt, we can just extract date from creation_date, which is a timestamp type, and count the number of occurrences.

posts_count = posts.creation_date.dt.date.value_counts()
posts_count

Dask Series Structure:
npartitions=1
int64
...
Name: creation_date, dtype: int64

with ProgressBar():


[########################################] | 100% Completed | 25.0s

2017-08-23    9531
2017-07-27    9450
2017-08-24    9366
2017-08-03    9345
2017-03-22    9342
Name: creation_date, dtype: int64


Note, that this was quite quick despite the data volume. This is due to using Parquet, as it stores statistics about the columns. Having the result as Pandas DataFrame, we can plot it.

posts_count_df.plot();


The plot seems fine, but since we have about 9 years of posts, plotting every day value is a bit messy. We can use Pandas resampling features to aggregate counts by month. To do that, we need first to cast the index to DatetimeIndex as extracting date turns it into string.

posts_count_df.index = pd.DatetimeIndex(posts_count_df.index)


Now we can resample the data by month (start) and sum all the counts for the day; note that you may use count here as well but this will really count how many days in a month we have, not the count of posts, as we already performed this aggregate.

posts_by_month = posts_count_df.resample('MS').sum()\
.rename('count').to_frame()

count
2008-07-01 4
2008-08-01 3927
2008-09-01 14522
2008-10-01 14925
2008-11-01 12957
posts_by_month.plot();


The resulting plot is much more readable than the previous one. We can clearly see that Stack Overflow becomes more popular with time.

We can also perform a year grouping since we already have a timeseries index. This time we just access index directly and it has the year property already.

posts_count_df.groupby(posts_count_df.index.year).sum()\
.rename('count').to_frame().plot.bar();


As expected, we have similar but more clear plot for a year-to-year comparison. It is now more visible, that while the posts number is on the rise, it is less sharp since 2013.

Lets now focus on the actual task we wanted to perform, i.e. looking into tags popularity. Since the number of posts is quite large, lets first look into tags file Tags.xml. Since we didn’t save the result, lets use a similar data extraction we performed the previous Dask post.

def extract_column_value(line, col_name, cast_type=str):
pattern_tpl = r'{col}="([^"]*)"'
pattern = pattern_tpl.format(col=col_name)
match = re.search(pattern, line)

if cast_type == int:
null_value = 0
else:
null_value = None

return cast_type(match[1]) if match is not None else null_value

def extract_tags_columns(line):
row = {
'tag_name': extract_column_value(line, 'TagName', str),
'count': extract_column_value(line, 'Count', int),
}
return row

tags = db.read_text('data/Tags.xml')\
.filter(lambda l: l.find('<row') > 0)\
.map(extract_tags_columns)\
.to_dataframe().set_index('tag_name').compute()
print(tags.shape)

(50812, 1)

count
tag_name
.a 104
.app 116
.aspxauth 50
.bash-profile 536
.class-file 212
.d.ts 5
.doc 116
.emf 59
.git-folder 6
.git-info-grafts 5

There is a large number of tags, namely over 50 000, hence, we’d better focus on a subset of them. Lets say we’d like to see how the languages popularity grows with time. Since if more people uses the language, there is likely more questions about it, we can just assume, that the post count gives us the measure of popularity of a language.

Below, I selected a number of popular programming languages; note that this selection is rather subjective and can be changed with some other one.

langs = ['c', 'c++', 'c#', 'java', 'javascript', 'python', 'r']
tags.loc[langs, :]

count
tag_name
c 266262
c++ 545591
c# 1161054
java 1347927
javascript 1519901
python 861621
r 213841

Just by looking into the tags file we can see that the tags indeed exist, so we can use them on the posts data.

Lets see how the tags are embedded into posts DataFrame.

posts.head()

creation_date score tags view_count
id
4 2008-07-31 21:42:52.667 506 <c#><winforms><type-conversion><decimal><opacity> 32399
6 2008-07-31 22:08:08.620 223 <html><css><css3><internet-explorer-7> 14997
9 2008-07-31 23:40:59.743 1546 <c#><.net><datetime> 399006
11 2008-07-31 23:55:37.967 1205 <c#><datetime><time><datediff><relative-time-s... 122973
13 2008-08-01 00:42:38.903 495 <javascript><html><browser><timezone><timezone... 141997

Since tags are in a string format, we need to extract and flatten them, to have date-tag mapping. Fortunately, Dask implements the string accessor str, known from Pandas.

Here we use extractall function, which will list all the matches of the regular expression pattern.

post_tags = posts.tags.str.lower()\
.str.extractall('<([^>]*)>')\
.rename(columns={0: 'tag'})

tag
id match
4 0 c#
1 winforms
2 type-conversion
3 decimal
4 opacity

Note that this gives us MultiIndex index, which has currently mixed support in Dask; e.g. see the issue with reset_index.

Since we want only to select the subset of tags, i.e only the tags in the prevously defined language list. For this operation we can use where method. Note that this will replace the False conditions with other argument, NaN by default. Thus we need to also remove NaN values using dropna. Since we will be using this Dask DataFrame a few times later, we can use method persist to save the result in memory and not recompute it all the time we want to use it.

with ProgressBar():
post_known_tags = post_tags.where(post_tags.tag.isin(langs))\
.dropna().persist()

[########################################] | 100% Completed |  8min 19.8s

tag
id match
4 0 c#
9 0 c#
11 0 c#
13 0 javascript
16 0 c#

This operation takes about 8 min, but later the resulting DataFrame at this node of graph is already in memory, so if we use it as a starting point, the following computations will be not repeating this step. For example lets count the tags.

with ProgressBar():
known_tags_count = post_known_tags.tag.value_counts().to_frame().compute()
known_tags_count

[########################################] | 100% Completed | 24.3s

tag
javascript 1457900
java 1303961
c# 1129772
python 806760
c++ 529922
c 257772
r 198930

You can see that this took only few seconds, as the source DataFrame for computation is already in memory. Otherwise, it would again take more than 8 min.

Now we have tags but not the dates. To get this information we need to join the tags DataFrame back to posts. Note that to perform a MultiIndex join, you need to have the indices named, otherwise it’ll fail. Also we perform two operations needed for the next step. Namely, we extract the date from creation date, which is timestamp, and turn the tags from a column of strings (object) into categorical type.

post_tag_join = posts.join(post_known_tags, how='inner')\
.assign(creation_date=lambda df: df.creation_date.dt.date)\
.assign(tag=lambda df: df.tag.astype('category'))

post_tag_join

creation_date score tags view_count tag
npartitions=100
4 object int64 object int64 category[known]
642315 ... ... ... ... ...
... ... ... ... ... ...
45537494 ... ... ... ... ...
45901673 ... ... ... ... ...

Again, we can persist it for convenience.

with ProgressBar():
post_tag_join = post_tag_join.persist()

[########################################] | 100% Completed |  5min 44.8s

post_tag_join.head()

creation_date score tags view_count tag
id match
4 0 2008-07-31 506 <c#><winforms><type-conversion><decimal><opacity> 32399 c#
9 0 2008-07-31 1546 <c#><.net><datetime> 399006 c#
11 0 2008-07-31 1205 <c#><datetime><time><datediff><relative-time-s... 122973 c#
13 0 2008-08-01 495 <javascript><html><browser><timezone><timezone... 141997 javascript
16 0 2008-08-01 94 <c#><linq><web-services><.net-3.5> 75421 c#

Now we can group the data by date and turn tags into columns with post counts, i.e. performing aggregating pivot. In both Dask and Pandas we have method pivot_table for that. While in Pandas the method has very little constrains, in Dask we need to prepare the data as we did above. Namely, we need to have the date column already in the right format and the columns we are pivoting on need to be categorical of known category as above; if the categories are unknown you can use function cat.to_known. Since we did the computation already, we can make the pivot as follows. Note that this is a hard computation and, thus, the DataFrame structure is not explicitly known as you can see below.

post_pivot = post_tag_join.pivot_table(
index='creation_date',
columns='tag',
values='score',
aggfunc='count')

post_pivot

tag

We can compute this DataFrame to Pandas as the result is relatively small.

with ProgressBar():
post_pivot_df = post_pivot.compute()


[########################################] | 100% Completed |  1min 45.5s

tag c c# c++ java javascript python r
creation_date
2008-07-31 0.0 3.0 0.0 0.0 0.0 0.0 0.0
2008-08-01 2.0 8.0 2.0 2.0 1.0 0.0 0.0
2008-08-02 0.0 4.0 1.0 2.0 0.0 4.0 0.0
2008-08-03 0.0 5.0 2.0 0.0 1.0 5.0 0.0
2008-08-04 2.0 10.0 4.0 0.0 1.0 3.0 0.0

In the resulting DataFrame the days without any mentions of tags are filled with NaN.

Having the result we can plot it easily with Pandas.

post_pivot_df.plot();


As before, the by-day plot is not too readable. We can again resample the data and aggregate the counts by month and plot.

post_pivot_df.index = pd.DatetimeIndex(post_pivot_df.index)

tags_by_month = post_pivot_df.resample('MS').sum()

tag c c# c++ java javascript python r
creation_date
2008-07-01 0.0 3.0 0.0 0.0 0.0 0.0 0.0
2008-08-01 86.0 505.0 163.0 221.0 163.0 126.0 0.0
2008-09-01 330.0 1649.0 766.0 1146.0 640.0 546.0 6.0
2008-10-01 306.0 1994.0 815.0 1154.0 724.0 514.0 0.0
2008-11-01 257.0 1732.0 739.0 966.0 577.0 454.0 1.0
tags_by_month.plot();


Now the plot is much more readable and we can see how these languages are becoming popular. In the results I didn’t discount for the grow of Stack Overflow popularity, which would be a good idea if this was a real analysis. These results are following similar observations from Stack Overflow Developer Survey Results 2017 and other analysis of Python popularity increase.

I hope with this post you can see that Dask is interesting almost drop-in substitute of Pandas for biggish data. While it is not fully mature and sometimes requires a few more steps, it allows Pandas user to quickly start working on bigger (or even big) data sets with rather flat learning curve. Also, you don’t need to start a cluster or jump into other programming language to perform the required analysis.

# How can we help?

Drop us a line and we'll respond as soon as possible.

##### Call Us
(+48) 22 203 56 00
Nowogrodzka 62C, Warsaw, Poland
office@sigdelta.com
UTC / GMT +1