Posted By Jakub Nowacki, 05 January 2018
In the previous Dask post we looked into basic data extraction using Dask. In this post we'll follow up on that problem and show how to perform more complex tasks with Dask in a similar way as we would in Pandas, but on a larger data set.
First we need to import some Dask elements that we will use and load the data; see the previous Dask post for the data extraction details. Note that I set the index to id as we will use it later on.
import dask
import dask.multiprocessing
import dask.bag as db
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import pandas as pd
import re

# use the multiprocessing scheduler for all computations
dask.set_options(get=dask.multiprocessing.get)

%matplotlib inline
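As an aside, the set_options(get=...) call above uses the scheduler interface from the Dask version current at the time of writing; in newer Dask releases the configuration system is used instead. A rough equivalent (assuming a recent Dask version) is:

import dask

# Newer Dask API: pick the multiprocessing ("processes") scheduler globally.
dask.config.set(scheduler='processes')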
posts = dd.read_parquet('data/posts_tags.parq', engine='fastparquet')\
    .set_index('id')
posts
| | creation_date | score | tags | view_count |
|---|---|---|---|---|
| npartitions=100 | | | | |
| 4 | datetime64[ns] | int64 | object | int64 |
| 642315 | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| 45537494 | ... | ... | ... | ... |
| 45901673 | ... | ... | ... | ... |
First, let's see how many posts we have per day. Since Dask implements Pandas' timeseries features under the dt accessor, we can just extract the date from creation_date, which is a timestamp type, and count the number of occurrences.
posts_count = posts.creation_date.dt.date.value_counts()
posts_count
Dask Series Structure:
npartitions=1
int64
...
Name: creation_date, dtype: int64
Dask Name: value-counts-agg, 616 tasks
with ProgressBar():
    posts_count_df = posts_count.compute(get=dask.multiprocessing.get)

posts_count_df.head()
[########################################] | 100% Completed | 25.0s
2017-08-23 9531
2017-07-27 9450
2017-08-24 9366
2017-08-03 9345
2017-03-22 9342
Name: creation_date, dtype: int64
Note that this was quite quick despite the data volume; this is due to using Parquet, which stores statistics about the columns. Having the result back as a Pandas object, we can plot it.
posts_count_df.plot();
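As an aside, fastparquet lets you peek at the metadata that makes such scans cheap. A minimal sketch (assuming the same data/posts_tags.parq file and fastparquet's ParquetFile interface) could look like this:

from fastparquet import ParquetFile

# Open only the Parquet metadata; no column data is read at this point.
pf = ParquetFile('data/posts_tags.parq')
print(pf.columns)      # column names stored in the file
print(pf.dtypes)       # column types
print(pf.statistics)   # per-row-group min/max/null-count statistics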
Returning to the plot: it seems fine, but since we have about 9 years of posts, plotting every daily value is a bit messy. We can use Pandas' resampling features to aggregate the counts by month. To do that, we first need to cast the index to a DatetimeIndex, as extracting the date turned it into strings.
posts_count_df.index = pd.DatetimeIndex(posts_count_df.index)
Now we can resample the data by month (start) and sum all the daily counts; note that you could use count here as well, but that would count how many days with posts there are in a month, not the number of posts, as we have already performed that aggregation.
posts_by_month = posts_count_df.resample('MS').sum()\
    .rename('count').to_frame()
posts_by_month.head()
| | count |
|---|---|
| 2008-07-01 | 4 |
| 2008-08-01 | 3927 |
| 2008-09-01 | 14522 |
| 2008-10-01 | 14925 |
| 2008-11-01 | 12957 |
posts_by_month.plot();
The resulting plot is much more readable than the previous one. We can clearly see that Stack Overflow has become more popular over time.
We can also group by year, since we already have a timeseries index. This time we just access the index directly, as it already has a year property.
posts_count_df.groupby(posts_count_df.index.year).sum()\
    .rename('count').to_frame().plot.bar();
As expected, we get a similar but clearer plot for the year-to-year comparison. It is now more visible that, while the number of posts is still rising, the growth has been less sharp since 2013.
Let's now focus on the actual task we wanted to perform, i.e. looking into tag popularity. Since the number of posts is quite large, let's first look into the tags file Tags.xml. Since we didn't save that result, let's use a similar data extraction to the one we performed in the previous Dask post.
def extract_column_value(line, col_name, cast_type=str):
    pattern_tpl = r'{col}="([^"]*)"'
    pattern = pattern_tpl.format(col=col_name)
    match = re.search(pattern, line)
    if cast_type == int:
        null_value = 0
    else:
        null_value = None
    return cast_type(match[1]) if match is not None else null_value

def extract_tags_columns(line):
    row = {
        'tag_name': extract_column_value(line, 'TagName', str),
        'count': extract_column_value(line, 'Count', int),
    }
    return row
tags = db.read_text('data/Tags.xml')\
    .filter(lambda l: l.find('<row') > 0)\
    .map(extract_tags_columns)\
    .to_dataframe().set_index('tag_name').compute()
print(tags.shape)
tags.head(10)
(50812, 1)
| tag_name | count |
|---|---|
| .a | 104 |
| .app | 116 |
| .aspxauth | 50 |
| .bash-profile | 536 |
| .class-file | 212 |
| .d.ts | 5 |
| .doc | 116 |
| .emf | 59 |
| .git-folder | 6 |
| .git-info-grafts | 5 |
There is a large number of tags, namely over 50,000; hence, we'd better focus on a subset of them. Let's say we'd like to see how the popularity of programming languages grows with time. If more people use a language, there are likely more questions about it, so we can assume that the post count gives us a measure of a language's popularity.
Below, I selected a number of popular programming languages; note that this selection is rather subjective and could be replaced with a different one.
langs = ['c', 'c++', 'c#', 'java', 'javascript', 'python', 'r']
tags.loc[langs, :]
| tag_name | count |
|---|---|
| c | 266262 |
| c++ | 545591 |
| c# | 1161054 |
| java | 1347927 |
| javascript | 1519901 |
| python | 861621 |
| r | 213841 |
Just by looking into the tags file we can see that the tags indeed exist, so we can use them on the posts data.
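If you prefer a programmatic check instead of eyeballing the table, a small sketch along these lines (just a convenience, not part of the original analysis) does the same thing:

# Verify that every language tag we picked actually appears in Tags.xml.
missing = [t for t in langs if t not in tags.index]
print(missing)   # expected: an empty list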
Let's see how the tags are embedded in the posts DataFrame.
posts.head()
| id | creation_date | score | tags | view_count |
|---|---|---|---|---|
| 4 | 2008-07-31 21:42:52.667 | 506 | <c#><winforms><type-conversion><decimal><opacity> | 32399 |
| 6 | 2008-07-31 22:08:08.620 | 223 | <html><css><css3><internet-explorer-7> | 14997 |
| 9 | 2008-07-31 23:40:59.743 | 1546 | <c#><.net><datetime> | 399006 |
| 11 | 2008-07-31 23:55:37.967 | 1205 | <c#><datetime><time><datediff><relative-time-s... | 122973 |
| 13 | 2008-08-01 00:42:38.903 | 495 | <javascript><html><browser><timezone><timezone... | 141997 |
Since the tags are in a string format, we need to extract and flatten them to get a date-tag mapping. Fortunately, Dask implements the string accessor str known from Pandas. Here we use the extractall function, which lists all matches of a regular expression pattern.
post_tags = posts.tags.str.lower()\
    .str.extractall('<([^>]*)>')\
    .rename(columns={0: 'tag'})
post_tags.head()
| id | match | tag |
|---|---|---|
| 4 | 0 | c# |
| | 1 | winforms |
| | 2 | type-conversion |
| | 3 | decimal |
| | 4 | opacity |
Note that this gives us a MultiIndex, which currently has mixed support in Dask; see, for example, the issue with reset_index.
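If you do need a flat index despite that, one possible workaround (a sketch only, assuming you are happy to operate on the underlying Pandas partitions) is to reset the index partition by partition:

# Hypothetical workaround: reset the MultiIndex within each Pandas partition,
# since DataFrame-level reset_index on a MultiIndex has mixed support in Dask.
flat_tags = post_tags.map_partitions(lambda pdf: pdf.reset_index())

We don't need this here, though; the MultiIndex works for everything below.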
We only want to select a subset of tags, i.e. the tags from the previously defined language list. For this operation we can use the where method. Note that where replaces the rows where the condition is False with the other argument, NaN by default, so we also need to remove the NaN values using dropna. Since we will be using this Dask DataFrame a few times later, we can use the persist method to keep the result in memory and avoid recomputing it every time we want to use it.
with ProgressBar():
    post_known_tags = post_tags.where(post_tags.tag.isin(langs))\
        .dropna().persist()

post_known_tags.head()
[########################################] | 100% Completed | 8min 19.8s
| id | match | tag |
|---|---|---|
| 4 | 0 | c# |
| 9 | 0 | c# |
| 11 | 0 | c# |
| 13 | 0 | javascript |
| 16 | 0 | c# |
This operation takes about 8 minutes, but afterwards the resulting DataFrame at this node of the graph is kept in memory, so if we use it as a starting point, the following computations will not repeat this step. For example, let's count the tags.
with ProgressBar():
    known_tags_count = post_known_tags.tag.value_counts().to_frame().compute()

known_tags_count
[########################################] | 100% Completed | 24.3s
| | tag |
|---|---|
| javascript | 1457900 |
| java | 1303961 |
| c# | 1129772 |
| python | 806760 |
| c++ | 529922 |
| c | 257772 |
| r | 198930 |
You can see that this took only a few seconds, as the source DataFrame for the computation is already in memory. Otherwise, it would again take more than 8 minutes.
Now we have the tags but not the dates. To get this information we need to join the tags DataFrame back to the posts. Note that to perform a MultiIndex join, you need to have the index levels named, otherwise it will fail. We also perform two operations needed for the next step: we extract the date from the creation date, which is a timestamp, and turn the tags from a column of strings (object) into a categorical type.
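In our case extractall already named the index levels id and match, so the join just works; if the names had been lost along the way, a hypothetical fix at the Pandas-partition level could look like this:

# Hypothetical sketch: re-attach names to both index levels before joining.
post_known_tags = post_known_tags.map_partitions(
    lambda pdf: pdf.rename_axis(['id', 'match']))

With those requirements covered, the join together with the two helper operations looks as follows.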
post_tag_join = posts.join(post_known_tags, how='inner')\
    .assign(creation_date=lambda df: df.creation_date.dt.date)\
    .assign(tag=lambda df: df.tag.astype('category'))
post_tag_join
| | creation_date | score | tags | view_count | tag |
|---|---|---|---|---|---|
| npartitions=100 | | | | | |
| 4 | object | int64 | object | int64 | category[known] |
| 642315 | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... |
| 45537494 | ... | ... | ... | ... | ... |
| 45901673 | ... | ... | ... | ... | ... |
Again, we can persist it for convenience.
with ProgressBar():
    post_tag_join = post_tag_join.persist()
[########################################] | 100% Completed | 5min 44.8s
post_tag_join.head()
| id | match | creation_date | score | tags | view_count | tag |
|---|---|---|---|---|---|---|
| 4 | 0 | 2008-07-31 | 506 | <c#><winforms><type-conversion><decimal><opacity> | 32399 | c# |
| 9 | 0 | 2008-07-31 | 1546 | <c#><.net><datetime> | 399006 | c# |
| 11 | 0 | 2008-07-31 | 1205 | <c#><datetime><time><datediff><relative-time-s... | 122973 | c# |
| 13 | 0 | 2008-08-01 | 495 | <javascript><html><browser><timezone><timezone... | 141997 | javascript |
| 16 | 0 | 2008-08-01 | 94 | <c#><linq><web-services><.net-3.5> | 75421 | c# |
Now we can group the data by date and turn the tags into columns with post counts, i.e. perform an aggregating pivot. In both Dask and Pandas we have the pivot_table method for that. While in Pandas the method has very few constraints, in Dask we need to prepare the data as we did above: the date column must already be in the right format, and the column we are pivoting on needs to be a categorical with known categories; if the categories are unknown you can use cat.as_known. Since we already did this preparation, we can make the pivot as follows. Note that this is a hard computation and, thus, the DataFrame structure is not explicitly known, as you can see below.
post_pivot = post_tag_join.pivot_table(
    index='creation_date',
    columns='tag',
    values='score',
    aggfunc='count')

post_pivot
Dask DataFrame Structure: unknown (only the columns-level name, tag, is displayed)
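For completeness, if the tag column had not been converted to a known categorical earlier, a minimal fix before calling pivot_table (a sketch, assuming Dask's categorical accessor) would be:

# Hypothetical sketch: make the categories known so pivot_table can use them
# as column labels; only needed when the column dtype is category[unknown].
post_tag_join = post_tag_join.assign(tag=post_tag_join.tag.cat.as_known())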
We can compute this DataFrame to Pandas as the result is relatively small.
with ProgressBar():
    post_pivot_df = post_pivot.compute()

post_pivot_df.head()
[########################################] | 100% Completed | 1min 45.5s
| creation_date | c | c# | c++ | java | javascript | python | r |
|---|---|---|---|---|---|---|---|
| 2008-07-31 | 0.0 | 3.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 2008-08-01 | 2.0 | 8.0 | 2.0 | 2.0 | 1.0 | 0.0 | 0.0 |
| 2008-08-02 | 0.0 | 4.0 | 1.0 | 2.0 | 0.0 | 4.0 | 0.0 |
| 2008-08-03 | 0.0 | 5.0 | 2.0 | 0.0 | 1.0 | 5.0 | 0.0 |
| 2008-08-04 | 2.0 | 10.0 | 4.0 | 0.0 | 1.0 | 3.0 | 0.0 |
In the resulting DataFrame, the days without any mention of a given tag are filled with NaN.
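If you prefer explicit zeros instead, a one-liner like the following would do it (post_pivot_filled is just a hypothetical name; it is not needed below, since the monthly sums skip NaN values anyway):

# Optional: treat days with no posts for a given tag as zero counts.
post_pivot_filled = post_pivot_df.fillna(0)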
Having the result we can plot it easily with Pandas.
post_pivot_df.plot();
As before, the day-by-day plot is not very readable. We can again resample the data, aggregate the counts by month, and plot.
post_pivot_df.index = pd.DatetimeIndex(post_pivot_df.index)
tags_by_month = post_pivot_df.resample('MS').sum()
tags_by_month.head()
| creation_date | c | c# | c++ | java | javascript | python | r |
|---|---|---|---|---|---|---|---|
| 2008-07-01 | 0.0 | 3.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 2008-08-01 | 86.0 | 505.0 | 163.0 | 221.0 | 163.0 | 126.0 | 0.0 |
| 2008-09-01 | 330.0 | 1649.0 | 766.0 | 1146.0 | 640.0 | 546.0 | 6.0 |
| 2008-10-01 | 306.0 | 1994.0 | 815.0 | 1154.0 | 724.0 | 514.0 | 0.0 |
| 2008-11-01 | 257.0 | 1732.0 | 739.0 | 966.0 | 577.0 | 454.0 | 1.0 |
tags_by_month.plot();
Now the plot is much more readable and we can see how these languages have been gaining popularity. In these results I didn't discount for the growth of Stack Overflow's overall popularity, which would be a good idea if this were a real analysis. The results follow similar observations from the Stack Overflow Developer Survey Results 2017 and other analyses of the increase in Python's popularity.
I hope this post shows that Dask is an interesting, almost drop-in substitute for Pandas for biggish data. While it is not fully mature and sometimes requires a few more steps, it allows a Pandas user to quickly start working on bigger (or even big) data sets with a rather flat learning curve. Also, you don't need to start a cluster or jump into another programming language to perform the required analysis.
The notebook with the above computations is available for download here.