Dask - a new elephant in the room

Posted By Jakub Nowacki, 20 December 2017

Big Data has been with us for a while, long enough to become almost a cliché, but its world was long dominated by Java and related tools and languages. This became an entry barrier for many people not familiar with these technologies, which were mostly engineering-centric. The first glimpse of hope came with Hive’s SQL and Pig’s (pig) Latin. Later Spark came with Python and R support, an SQL interface, and DataFrames along the way. Now we see a rise of many new and useful Big Data processing technologies, often SQL-based, which make working with big data sets much easier.

Nonetheless, there are two major problems with these technologies: they require some form of infrastructure (even for the serverless ones you need to be in the cloud), and they bring along new interfaces that we need to master. While I am a great fan of Spark and other Big Data technologies, I can understand that they can sometimes be too heavy for people who do not know their way around the JVM, especially when something goes wrong, as the error messages often come straight from the JVM. Moreover, many companies are in fact dealing with a biggish data problem, as opposed to big data: they work with data sets of up to ca. 1 TB, often just tens of GBs, which are hard to process in the regular fashion but too small to justify getting new infrastructure and learning new tools. This is where Dask comes in.

Dask basics

According to its web page, Dask is a flexible parallel computing library for analytic computing written in Python. You may say that it is another Hadoop/Spark clone, but it is much more. First and foremost, it is a tool designed to work on a single machine, harnessing its full power and performing out-of-core computing, i.e. working on larger-than-memory data sets. It does that using an approach similar to Spark: it lazily constructs a directed acyclic graph (DAG) of tasks and splits the data into small portions called partitions. See the image below from Dask’s web page for illustration.

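It has three main interfaces that closely mimic ones already familiar to the Python audience: Array, which mimics NumPy arrays; Bag, which holds generic Python objects and is similar in spirit to Spark’s RDD; and DataFrame, which mimics Pandas DataFrames.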

While it can work on a distributed cluster, here we will focus mostly on its local abilities, as in my opinion they, along with the familiar interfaces, give Dask its competitive edge.

Also, a note for Windows users: while Spark can be set up on a Windows machine, as I’ve shown in the previous post, Dask is even easier to get, as it only relies on NumPy and Pandas, which you likely already have with your Python distribution; Anaconda also ships it as standard. Otherwise, you can use the usual pip (or conda) install for it:

pip install dask

Also, you may want to install graphviz if you want to see the task graph visualization, and fastparquet for saving data as Parquet files.

Stack Exchange data set

To have something larger to analyze, let’s get the Stack Exchange data set posted on Archive.org. The files are in XML format, compressed using 7-zip; see readme.txt for details. The problem with this compression is that Python’s standard library does not support 7-zip files, so you need to extract them manually. For now we’re going to need two files: Posts.xml and Tags.xml.

At the time of writing this post, the Posts.xml file is about 57GB.

Data extraction

Since the files are XML, they are not directly readable via Pandas or Dask in a tabular form. Thus, we use the bag interface to read them line by line as semi-structured text files. Let’s start with the Tags.xml file, as it is much smaller. Let’s read the file in and see how it looks.

import dask
import dask.bag as db
import dask.dataframe as dd
from dask.dot import dot_graph
from dask.diagnostics import ProgressBar

import pandas as pd
import re
import html
from datetime import datetime

# We set the multiprocessing scheduler globally to speed up the computations.
# This is not required but helps in this case.
# For details see: http://dask.pydata.org/en/latest/scheduler-overview.html
# Note: newer Dask releases configure this with
# dask.config.set(scheduler='processes') instead.
dask.set_options(get=dask.multiprocessing.get)

%matplotlib inline
tags_xml = db.read_text('data/Tags.xml', encoding='utf-8')
tags_xml.take(10)
('\ufeff<?xml version="1.0" encoding="utf-8"?>\n',
 '<tags>\n',
 '  <row Id="1" TagName=".net" Count="257092" ExcerptPostId="3624959" WikiPostId="3607476" />\n',
 '  <row Id="2" TagName="html" Count="683981" ExcerptPostId="3673183" WikiPostId="3673182" />\n',
 '  <row Id="3" TagName="javascript" Count="1457944" ExcerptPostId="3624960" WikiPostId="3607052" />\n',
 '  <row Id="4" TagName="css" Count="490198" ExcerptPostId="3644670" WikiPostId="3644669" />\n',
 '  <row Id="5" TagName="php" Count="1114030" ExcerptPostId="3624936" WikiPostId="3607050" />\n',
 '  <row Id="8" TagName="c" Count="257777" ExcerptPostId="3624961" WikiPostId="3607013" />\n',
 '  <row Id="9" TagName="c#" Count="1129802" ExcerptPostId="3624962" WikiPostId="3607007" />\n',
 '  <row Id="10" TagName="c++" Count="529933" ExcerptPostId="3624963" WikiPostId="3606997" />\n')

As we can see, we are only interested in the row lines, so we first drop everything else.

tags_rows = tags_xml.filter(lambda line: line.find('<row') >= 0)
tags_rows.take(10)
('  <row Id="1" TagName=".net" Count="257092" ExcerptPostId="3624959" WikiPostId="3607476" />\n',
 '  <row Id="2" TagName="html" Count="683981" ExcerptPostId="3673183" WikiPostId="3673182" />\n',
 '  <row Id="3" TagName="javascript" Count="1457944" ExcerptPostId="3624960" WikiPostId="3607052" />\n',
 '  <row Id="4" TagName="css" Count="490198" ExcerptPostId="3644670" WikiPostId="3644669" />\n',
 '  <row Id="5" TagName="php" Count="1114030" ExcerptPostId="3624936" WikiPostId="3607050" />\n',
 '  <row Id="8" TagName="c" Count="257777" ExcerptPostId="3624961" WikiPostId="3607013" />\n',
 '  <row Id="9" TagName="c#" Count="1129802" ExcerptPostId="3624962" WikiPostId="3607007" />\n',
 '  <row Id="10" TagName="c++" Count="529933" ExcerptPostId="3624963" WikiPostId="3606997" />\n',
 '  <row Id="12" TagName="ruby" Count="186002" ExcerptPostId="3624964" WikiPostId="3607043" />\n',
 '  <row Id="14" TagName="lisp" Count="5160" ExcerptPostId="3656743" WikiPostId="3656742" />\n')

Now that the lines are filtered, we can begin the extraction. The first function extracts the value of a given row attribute, i.e. a column value. Note that while float and str columns can hold null (None) values when transformed to a DataFrame, int has no notion of null, as NaN in NumPy is in fact a float. Thus, we set the value to 0 for int, which is rather safe in this case.

def extract_column_value(line, col_name, cast_type=str):
    pattern_tpl = r'{col}="([^"]*)"'
    pattern = pattern_tpl.format(col=col_name)
    match = re.search(pattern, line)

    if cast_type == int:
        null_value = 0
    else:
        null_value = None

    return cast_type(match[1]) if match is not None else null_value

test_tags_row = '<row Id="1" TagName=".net" Count="257092" ExcerptPostId="3624959" WikiPostId="3607476" />'
print(extract_column_value(test_tags_row, 'TagName'))
print(extract_column_value(test_tags_row, 'TagName'))
print(extract_column_value(test_tags_row, 'NotHere', str))
print(extract_column_value(test_tags_row, 'NotHere', int))
.net
.net
None
0
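
One caveat with the pattern above: it looks for the attribute name anywhere in the line, so a short name such as Id would also match the tail of PostTypeId if that attribute happened to come first. This does not bite in the rows shown here, where Id is the first attribute, but a word boundary makes the lookup stricter. The sketch below is a hypothetical variant (the name extract_attribute and the sample row are made up for illustration, not part of the original notebook).

```python
import re

def extract_attribute(line, col_name, cast_type=str):
    # \b is a word boundary, so 'Id' no longer matches inside 'PostTypeId';
    # re.escape guards against special characters in the attribute name.
    pattern = r'\b{col}="([^"]*)"'.format(col=re.escape(col_name))
    match = re.search(pattern, line)
    null_value = 0 if cast_type is int else None
    return cast_type(match.group(1)) if match is not None else null_value

# Made-up row in which PostTypeId precedes Id.
line = '<row PostTypeId="1" Id="4" />'
loose = re.search(r'Id="([^"]*)"', line).group(1)   # picks up PostTypeId
strict = extract_attribute(line, 'Id', int)          # picks up the real Id
print(loose, strict)
```

The behaviour for missing attributes is kept the same as before: 0 for int and None otherwise.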

The second function extracts specific columns from a row and creates a dictionary with column names and values.

def extract_tags_columns(line):
    row = {
        'id': extract_column_value(line, 'Id', int),
        'tag_name': extract_column_value(line, 'TagName', str),
        'count': extract_column_value(line, 'Count', int),
    }
    return row

extract_tags_columns(test_tags_row)
{'count': 257092, 'id': 1, 'tag_name': '.net'}
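
As an aside, each row line is itself a well-formed XML element, so Python’s standard xml.etree.ElementTree can parse a single line and return its attributes as a dictionary. The following is a minimal sketch of that alternative, not the approach used in this post; the helper name parse_tags_row is hypothetical.

```python
import xml.etree.ElementTree as ET

def parse_tags_row(line):
    # Parse one '<row ... />' line; the parser decodes XML entities
    # in attribute values for us.
    attrs = ET.fromstring(line.strip()).attrib
    return {
        'id': int(attrs.get('Id', 0)),
        'tag_name': attrs.get('TagName'),
        'count': int(attrs.get('Count', 0)),
    }

row = '  <row Id="1" TagName=".net" Count="257092" ExcerptPostId="3624959" WikiPostId="3607476" />\n'
parsed = parse_tags_row(row)
print(parsed)
```

The trade-off is that a full parser does more work per line than a targeted regular expression.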

This format will now allow us to construct a Dask DataFrame as follows.

tags = tags_rows.map(extract_tags_columns).to_dataframe()
tags
Dask DataFrame Structure:
               count     id tag_name
npartitions=1
               int64  int64   object
                 ...    ...      ...
Dask Name: to_dataframe, 3 tasks

All Dask objects have partitions inside; their number is available via the npartitions attribute.

tags.npartitions
1

Also, the Dask task DAG is available under the dask attribute.

tags.dask
{('bag-from-delayed-file_to_blocks-filter-lambda-list-map-extract_tags_columns-8ab64c29da24bc2e2d19d70b7c203708',
  0): (<function dask.bag.core.reify>,
  (<function dask.bag.core.map_chunk>,
   <function __main__.extract_tags_columns>,
   [(filter,
     <function __main__.<lambda>>,
     (<function dask.bag.text.file_to_blocks>,
      <dask.bytes.core.OpenFile at 0x1abe7dc2048>))],
   None,
   {})),
 ('map-extract_tags_columns-8ab64c29da24bc2e2d19d70b7c203708',
  0): ('bag-from-delayed-file_to_blocks-filter-lambda-list-map-extract_tags_columns-8ab64c29da24bc2e2d19d70b7c203708', 0),
 ('to_dataframe-90a9541780f3c0286889ab1884ce22e8',
  0): (<function dask.bag.core.to_dataframe>, ('map-extract_tags_columns-8ab64c29da24bc2e2d19d70b7c203708',
   0), ['count', 'id', 'tag_name'], {'count': dtype('int64'),
   'id': dtype('int64'),
   'tag_name': dtype('O')})}
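
The structure printed above is an ordinary dictionary: keys name intermediate results, and each value is either data, another key, or a task written as a tuple whose first element is a function and whose remaining elements are its arguments. A toy evaluator for such a dictionary fits in a few lines. This is only a sketch to show the idea, not Dask’s scheduler; toy_get and the small graph below are made up for illustration.

```python
from operator import add, mul

def toy_get(graph, key):
    """Evaluate `key` by recursively resolving the tasks it depends on."""
    def resolve(value):
        # A task is a tuple whose first element is callable.
        if isinstance(value, tuple) and value and callable(value[0]):
            func, *args = value
            return func(*(resolve(arg) for arg in args))
        # Lists are traversed so they may contain keys or tasks.
        if isinstance(value, list):
            return [resolve(item) for item in value]
        try:
            if value in graph:          # a reference to another key
                return resolve(graph[value])
        except TypeError:
            pass                        # unhashable, so it is plain data
        return value                    # plain data
    return resolve(graph[key])

graph = {
    'x': 1,
    'y': 2,
    'sum': (add, 'x', 'y'),
    'result': (mul, 'sum', 10),
    'total': (sum, ['x', 'y', 'sum']),
}
print(toy_get(graph, 'result'), toy_get(graph, 'total'))
```

Nothing is computed when the dictionary is built; work happens only when a key is requested, which is the lazy behaviour used throughout this post.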

We can visualize it using the available functions.

dot_graph(tags.dask)

[Figure: task graph of the tags DataFrame]

Though the file Tags.xml is quite small, you have likely noticed that we haven’t really done any calculations yet, but just lazily declared the next steps of the DAG. To actually perform the computations we need to invoke an action that asks for a result; to get a Pandas DataFrame out of the Dask one, we can use the compute method.

tags_df = tags.compute()
print(type(tags_df))
tags_df.head()
<class 'pandas.core.frame.DataFrame'>
     count  id    tag_name
0   257092   1        .net
1   683981   2        html
2  1457944   3  javascript
3   490198   4         css
4  1114030   5         php
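
The declare-first, compute-later pattern can be mimicked in plain Python with generators, which may help build intuition. The sketch below is only an analogy and does not use Dask; the function names and the four sample lines are made up.

```python
log = []

def read_lines(lines):
    # Generator: the body runs only when a consumer asks for items.
    for line in lines:
        log.append(line)
        yield line

def keep_rows(lines):
    # Lazily keep only the XML row lines.
    return (line for line in lines if '<row' in line)

source = ['<tags>', '<row Id="1" />', '<row Id="2" />', '</tags>']
pipeline = keep_rows(read_lines(source))  # only declares the steps
print(len(log))        # nothing has been read yet
rows = list(pipeline)  # consuming the pipeline plays the role of compute()
print(len(log), rows)
```

Building the pipeline touches no data; only the final list call pulls the lines through both steps.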

To get just a portion of the data and check that the computations are done correctly, we can use the head (or tail) method, as fetching the whole DataFrame may overload the memory.

tags.head(10)
     count  id    tag_name
0   257092   1        .net
1   683981   2        html
2  1457944   3  javascript
3   490198   4         css
4  1114030   5         php
5   257777   8           c
6  1129802   9          c#
7   529933  10         c++
8   186002  12        ruby
9     5160  14        lisp

Now let’s perform a similar extraction and cleaning for Posts.xml. We can also read it line by line and extract the data. Note that I use the blocksize option, which specifies the number of bytes to be read from the text file in one block; the blocks are split in a way that takes line endings into account.

posts_xml = db.read_text('data/Posts.xml', encoding='utf-8', blocksize=1e8)
posts_xml
dask.bag<bag-fro..., npartitions=582>
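
To see what splitting by bytes while respecting line endings can look like, here is a small plain-Python sketch. It is not Dask’s actual implementation, just one simple way to cut a byte string into blocks of roughly blocksize bytes that never break a line; the function name split_on_lines is hypothetical.

```python
def split_on_lines(data, blocksize):
    """Yield consecutive chunks of about `blocksize` bytes, each ending at a line break."""
    if blocksize < 1:
        raise ValueError('blocksize must be positive')
    start = 0
    while start < len(data):
        end = min(start + blocksize, len(data))
        # Extend the block to the end of the line it stops in, if any.
        newline = data.find(b'\n', end - 1)
        end = len(data) if newline == -1 else newline + 1
        yield data[start:end]
        start = end

data = b'aa\nbbbb\ncc\n'
blocks = list(split_on_lines(data, 4))
print(blocks)
```

Each block can then be handed to a separate task, and concatenating the blocks gives back the original bytes.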

We can inspect a sample of the lines as follows.

posts_xml.take(5)
('\ufeff<?xml version="1.0" encoding="utf-8"?>\r\n',
 '<posts>\r\n',
 '  <row Id="4" PostTypeId="1" AcceptedAnswerId="7" CreationDate="2008-07-31T21:42:52.667" Score="506" ViewCount="32399" Body="&lt;p&gt;I want to use a track-bar to change a form\'s opacity.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;This is my code:&lt;/p&gt;&#xA;&#xA;&lt;pre&gt;&lt;code&gt;decimal trans = trackBar1.Value / 5000;&#xA;this.Opacity = trans;&#xA;&lt;/code&gt;&lt;/pre&gt;&#xA;&#xA;&lt;p&gt;When I build the application, it gives the following error:&lt;/p&gt;&#xA;&#xA;&lt;blockquote&gt;&#xA;  &lt;p&gt;Cannot implicitly convert type \'decimal\' to \'double\'.&lt;/p&gt;&#xA;&lt;/blockquote&gt;&#xA;&#xA;&lt;p&gt;I tried using &lt;code&gt;trans&lt;/code&gt; and &lt;code&gt;double&lt;/code&gt; but then the control doesn\'t work. This code worked fine in a past VB.NET project. &lt;/p&gt;&#xA;" OwnerUserId="8" LastEditorUserId="126970" LastEditorDisplayName="Rich B" LastEditDate="2017-03-10T15:18:33.147" LastActivityDate="2017-03-10T15:18:33.147" Title="While applying opacity to a form should we use a decimal or double value?" Tags="&lt;c#&gt;&lt;winforms&gt;&lt;type-conversion&gt;&lt;decimal&gt;&lt;opacity&gt;" AnswerCount="13" CommentCount="5" FavoriteCount="37" CommunityOwnedDate="2012-10-31T16:42:47.213" />\r\n',
 '  <row Id="6" PostTypeId="1" AcceptedAnswerId="31" CreationDate="2008-07-31T22:08:08.620" Score="223" ViewCount="14997" Body="&lt;p&gt;I have an absolutely positioned &lt;code&gt;div&lt;/code&gt; containing several children, one of which is a relatively positioned &lt;code&gt;div&lt;/code&gt;. When I use a &lt;strong&gt;percentage-based width&lt;/strong&gt; on the child &lt;code&gt;div&lt;/code&gt;, it collapses to \'0\' width on &lt;a href=&quot;http://en.wikipedia.org/wiki/Internet_Explorer_7&quot; rel=&quot;noreferrer&quot;&gt;Internet&amp;nbsp;Explorer&amp;nbsp;7&lt;/a&gt;, but not on Firefox or Safari.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;If I use &lt;strong&gt;pixel width&lt;/strong&gt;, it works. If the parent is relatively positioned, the percentage width on the child works.&lt;/p&gt;&#xA;&#xA;&lt;ol&gt;&#xA;&lt;li&gt;Is there something I\'m missing here?&lt;/li&gt;&#xA;&lt;li&gt;Is there an easy fix for this besides the &lt;em&gt;pixel-based width&lt;/em&gt; on the&#xA;child?&lt;/li&gt;&#xA;&lt;li&gt;Is there an area of the CSS specification that covers this?&lt;/li&gt;&#xA;&lt;/ol&gt;&#xA;" OwnerUserId="9" LastEditorUserId="63550" LastEditorDisplayName="Rich B" LastEditDate="2016-03-19T06:05:48.487" LastActivityDate="2016-03-19T06:10:52.170" Title="Percentage width child element in absolutely positioned parent on Internet Explorer 7" Tags="&lt;html&gt;&lt;css&gt;&lt;css3&gt;&lt;internet-explorer-7&gt;" AnswerCount="5" CommentCount="0" FavoriteCount="8" />\r\n',
 '  <row Id="7" PostTypeId="2" ParentId="4" CreationDate="2008-07-31T22:17:57.883" Score="363" Body="&lt;p&gt;An explicit cast to double isn\'t necessary.&lt;/p&gt;&#xA;&#xA;&lt;pre&gt;&lt;code&gt;double trans = (double)trackBar1.Value / 5000.0;&#xA;&lt;/code&gt;&lt;/pre&gt;&#xA;&#xA;&lt;p&gt;Identifying the constant as &lt;code&gt;5000.0&lt;/code&gt; (or as &lt;code&gt;5000d&lt;/code&gt;) is sufficient:&lt;/p&gt;&#xA;&#xA;&lt;pre&gt;&lt;code&gt;double trans = trackBar1.Value / 5000.0;&#xA;double trans = trackBar1.Value / 5000d;&#xA;&lt;/code&gt;&lt;/pre&gt;&#xA;" OwnerUserId="9" LastEditorUserId="967315" LastEditDate="2012-10-14T11:50:16.703" LastActivityDate="2012-10-14T11:50:16.703" CommentCount="0" />\r\n')

Again, we filter out anything that is not an XML row.

posts_rows = posts_xml.filter(lambda line: line.find('<row') >= 0)

posts_rows.take(5)
('  <row Id="4" PostTypeId="1" AcceptedAnswerId="7" CreationDate="2008-07-31T21:42:52.667" Score="506" ViewCount="32399" Body="&lt;p&gt;I want to use a track-bar to change a form\'s opacity.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;This is my code:&lt;/p&gt;&#xA;&#xA;&lt;pre&gt;&lt;code&gt;decimal trans = trackBar1.Value / 5000;&#xA;this.Opacity = trans;&#xA;&lt;/code&gt;&lt;/pre&gt;&#xA;&#xA;&lt;p&gt;When I build the application, it gives the following error:&lt;/p&gt;&#xA;&#xA;&lt;blockquote&gt;&#xA;  &lt;p&gt;Cannot implicitly convert type \'decimal\' to \'double\'.&lt;/p&gt;&#xA;&lt;/blockquote&gt;&#xA;&#xA;&lt;p&gt;I tried using &lt;code&gt;trans&lt;/code&gt; and &lt;code&gt;double&lt;/code&gt; but then the control doesn\'t work. This code worked fine in a past VB.NET project. &lt;/p&gt;&#xA;" OwnerUserId="8" LastEditorUserId="126970" LastEditorDisplayName="Rich B" LastEditDate="2017-03-10T15:18:33.147" LastActivityDate="2017-03-10T15:18:33.147" Title="While applying opacity to a form should we use a decimal or double value?" Tags="&lt;c#&gt;&lt;winforms&gt;&lt;type-conversion&gt;&lt;decimal&gt;&lt;opacity&gt;" AnswerCount="13" CommentCount="5" FavoriteCount="37" CommunityOwnedDate="2012-10-31T16:42:47.213" />\r\n',
 '  <row Id="6" PostTypeId="1" AcceptedAnswerId="31" CreationDate="2008-07-31T22:08:08.620" Score="223" ViewCount="14997" Body="&lt;p&gt;I have an absolutely positioned &lt;code&gt;div&lt;/code&gt; containing several children, one of which is a relatively positioned &lt;code&gt;div&lt;/code&gt;. When I use a &lt;strong&gt;percentage-based width&lt;/strong&gt; on the child &lt;code&gt;div&lt;/code&gt;, it collapses to \'0\' width on &lt;a href=&quot;http://en.wikipedia.org/wiki/Internet_Explorer_7&quot; rel=&quot;noreferrer&quot;&gt;Internet&amp;nbsp;Explorer&amp;nbsp;7&lt;/a&gt;, but not on Firefox or Safari.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;If I use &lt;strong&gt;pixel width&lt;/strong&gt;, it works. If the parent is relatively positioned, the percentage width on the child works.&lt;/p&gt;&#xA;&#xA;&lt;ol&gt;&#xA;&lt;li&gt;Is there something I\'m missing here?&lt;/li&gt;&#xA;&lt;li&gt;Is there an easy fix for this besides the &lt;em&gt;pixel-based width&lt;/em&gt; on the&#xA;child?&lt;/li&gt;&#xA;&lt;li&gt;Is there an area of the CSS specification that covers this?&lt;/li&gt;&#xA;&lt;/ol&gt;&#xA;" OwnerUserId="9" LastEditorUserId="63550" LastEditorDisplayName="Rich B" LastEditDate="2016-03-19T06:05:48.487" LastActivityDate="2016-03-19T06:10:52.170" Title="Percentage width child element in absolutely positioned parent on Internet Explorer 7" Tags="&lt;html&gt;&lt;css&gt;&lt;css3&gt;&lt;internet-explorer-7&gt;" AnswerCount="5" CommentCount="0" FavoriteCount="8" />\r\n',
 '  <row Id="7" PostTypeId="2" ParentId="4" CreationDate="2008-07-31T22:17:57.883" Score="363" Body="&lt;p&gt;An explicit cast to double isn\'t necessary.&lt;/p&gt;&#xA;&#xA;&lt;pre&gt;&lt;code&gt;double trans = (double)trackBar1.Value / 5000.0;&#xA;&lt;/code&gt;&lt;/pre&gt;&#xA;&#xA;&lt;p&gt;Identifying the constant as &lt;code&gt;5000.0&lt;/code&gt; (or as &lt;code&gt;5000d&lt;/code&gt;) is sufficient:&lt;/p&gt;&#xA;&#xA;&lt;pre&gt;&lt;code&gt;double trans = trackBar1.Value / 5000.0;&#xA;double trans = trackBar1.Value / 5000d;&#xA;&lt;/code&gt;&lt;/pre&gt;&#xA;" OwnerUserId="9" LastEditorUserId="967315" LastEditDate="2012-10-14T11:50:16.703" LastActivityDate="2012-10-14T11:50:16.703" CommentCount="0" />\r\n',
 '  <row Id="9" PostTypeId="1" AcceptedAnswerId="1404" CreationDate="2008-07-31T23:40:59.743" Score="1546" ViewCount="399006" Body="&lt;p&gt;Given a &lt;code&gt;DateTime&lt;/code&gt; representing a person\'s birthday, how do I calculate their age in years?  &lt;/p&gt;&#xA;" OwnerUserId="1" LastEditorUserId="6025198" LastEditorDisplayName="Rich B" LastEditDate="2017-05-10T14:44:11.947" LastActivityDate="2017-08-01T00:04:18.453" Title="Calculate age in C#" Tags="&lt;c#&gt;&lt;.net&gt;&lt;datetime&gt;" AnswerCount="60" CommentCount="8" FavoriteCount="344" CommunityOwnedDate="2011-08-16T19:40:43.080" />\r\n',
 '  <row Id="11" PostTypeId="1" AcceptedAnswerId="1248" CreationDate="2008-07-31T23:55:37.967" Score="1205" ViewCount="122973" Body="&lt;p&gt;Given a specific &lt;code&gt;DateTime&lt;/code&gt; value, how do I display relative time, like:&lt;/p&gt;&#xA;&#xA;&lt;ul&gt;&#xA;&lt;li&gt;2 hours ago&lt;/li&gt;&#xA;&lt;li&gt;3 days ago&lt;/li&gt;&#xA;&lt;li&gt;a month ago&lt;/li&gt;&#xA;&lt;/ul&gt;&#xA;" OwnerUserId="1" LastEditorUserId="6479704" LastEditorDisplayName="user2370523" LastEditDate="2017-06-04T15:51:19.780" LastActivityDate="2017-06-14T11:20:47.150" Title="Calculate relative time in C#" Tags="&lt;c#&gt;&lt;datetime&gt;&lt;time&gt;&lt;datediff&gt;&lt;relative-time-span&gt;" AnswerCount="33" CommentCount="3" FavoriteCount="521" CommunityOwnedDate="2009-09-04T13:15:59.820" />\r\n')

We add a special casting function for string timestamps in ISO 8601 format with fractional seconds.

def str_to_datetime(timestamp):
    f = '%Y-%m-%dT%H:%M:%S.%f'
    return datetime.strptime(timestamp, f)

str_to_datetime('2008-07-31T23:55:37.967')
datetime.datetime(2008, 7, 31, 23, 55, 37, 967000)
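
For timestamps in exactly this shape, datetime.fromisoformat from the standard library should give the same result without a format string. A quick check, assuming Python 3.7 or later:

```python
from datetime import datetime

def str_to_datetime(timestamp):
    # Same parser as above, repeated so the snippet runs on its own.
    return datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%f')

stamp = '2008-07-31T23:55:37.967'
parsed = datetime.fromisoformat(stamp)
same = parsed == str_to_datetime(stamp)
print(parsed, same)
```

The explicit format string remains the safer choice on older Python versions.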

Now we can construct a similar extracting function that returns a row as a dictionary.

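def extract_posts_columns(line):
    # Extract from the raw line and unescape only the extracted value:
    # unescaping the whole line first would turn &quot; inside Body into
    # literal quotes, which could confuse the attribute pattern.
    row = {
        'id': extract_column_value(line, 'Id', int),
        'creation_date': extract_column_value(line, 'CreationDate', str_to_datetime),
        'score': extract_column_value(line, 'Score', int),
        'view_count': extract_column_value(line, 'ViewCount', int),
        'tags': extract_column_value(line, 'Tags', html.unescape),
    }
    return row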

test_posts_row = '  <row Id="4" PostTypeId="1" AcceptedAnswerId="7" CreationDate="2008-07-31T21:42:52.667" Score="506" ViewCount="32399" Body="&lt;p&gt;I want to use a track-bar to change a form\'s opacity.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;This is my code:&lt;/p&gt;&#xA;&#xA;&lt;pre&gt;&lt;code&gt;decimal trans = trackBar1.Value / 5000;&#xA;this.Opacity = trans;&#xA;&lt;/code&gt;&lt;/pre&gt;&#xA;&#xA;&lt;p&gt;When I build the application, it gives the following error:&lt;/p&gt;&#xA;&#xA;&lt;blockquote&gt;&#xA;  &lt;p&gt;Cannot implicitly convert type \'decimal\' to \'double\'.&lt;/p&gt;&#xA;&lt;/blockquote&gt;&#xA;&#xA;&lt;p&gt;I tried using &lt;code&gt;trans&lt;/code&gt; and &lt;code&gt;double&lt;/code&gt; but then the control doesn\'t work. This code worked fine in a past VB.NET project. &lt;/p&gt;&#xA;" OwnerUserId="8" LastEditorUserId="126970" LastEditorDisplayName="Rich B" LastEditDate="2017-03-10T15:18:33.147" LastActivityDate="2017-03-10T15:18:33.147" Title="While applying opacity to a form should we use a decimal or double value?" Tags="&lt;c#&gt;&lt;winforms&gt;&lt;type-conversion&gt;&lt;decimal&gt;&lt;opacity&gt;" AnswerCount="13" CommentCount="5" FavoriteCount="37" CommunityOwnedDate="2012-10-31T16:42:47.213" />\n'
extract_posts_columns(test_posts_row)
{'creation_date': datetime.datetime(2008, 7, 31, 21, 42, 52, 667000),
 'id': 4,
 'score': 506,
 'tags': '<c#><winforms><type-conversion><decimal><opacity>',
 'view_count': 32399}
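
The tags column is stored as a single string in which every tag is wrapped in angle brackets. If separate tag names are needed later, a regular expression is enough to split it. A minimal sketch, with split_tags being a hypothetical helper that is not part of the original notebook:

```python
import re

def split_tags(tags):
    # Return the tag names from a string like '<c#><winforms>';
    # None or an empty string gives an empty list.
    return re.findall(r'<([^<>]+)>', tags or '')

tag_list = split_tags('<c#><winforms><type-conversion><decimal><opacity>')
print(tag_list)
```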

We may now incorporate it into our computations and make a Dask DataFrame out of it. Note that we skip all the posts without tags.

posts = posts_rows.map(extract_posts_columns)\
    .filter(lambda r: r['tags'] is not None)\
    .to_dataframe()
posts
Dask DataFrame Structure:
                  creation_date     id  score    tags  view_count
npartitions=582
                 datetime64[ns]  int64  int64  object       int64
                            ...    ...    ...     ...         ...
...                         ...    ...    ...     ...         ...
                            ...    ...    ...     ...         ...
                            ...    ...    ...     ...         ...
Dask Name: to_dataframe, 1746 tasks

As usual, we can inspect the DataFrame. Note that using anything other than head or tail here can cause memory overload, as we read from a 57GB file.

posts.head()
             creation_date  id  score                                               tags  view_count
0  2008-07-31 21:42:52.667   4    506  <c#><winforms><type-conversion><decimal><opacity>       32399
1  2008-07-31 22:08:08.620   6    223             <html><css><css3><internet-explorer-7>       14997
2  2008-07-31 23:40:59.743   9   1546                               <c#><.net><datetime>      399006
3  2008-07-31 23:55:37.967  11   1205  <c#><datetime><time><datediff><relative-time-s...      122973
4  2008-08-01 00:42:38.903  13    495  <javascript><html><browser><timezone><timezone...      141997

Now that we are happy with the data cleaning and reshaping, we can save the result in a more convenient format. Parquet is ideal for the task, as it is a binary, column-oriented format with broad type support and built-in compression. Because of these features, and the fact that we stripped some data out of the original format, the result will be much smaller than the input, so we repartition the DataFrame, as the number of partitions determines how many files will be stored. Also, we use the fastparquet engine, as it is currently the default for Dask and has slightly better support. Nonetheless, the pyarrow engine, which uses Apache Arrow’s Parquet saving abilities, is getting much better with time as well and is also quite usable. To see the progress of the computation we use a built-in progress bar.

with ProgressBar():
    posts.repartition(npartitions=100)\
        .to_parquet('data/posts_tags.parq', engine='fastparquet', compression='snappy')
[########################################] | 100% Completed | 24min 54.7s

On my laptop (i7-6700HQ, 16GB RAM, HDD for data saving) the computations take about 25 min, which is quite nice considering the size of the file is 57GB. The result is just under 0.5GB, which will be much faster to read and process. Let’s now read in the saved file and see if it works fine.

posts = dd.read_parquet('data/posts_tags.parq', engine='fastparquet')
posts
Dask DataFrame Structure:
                  creation_date     id  score    tags  view_count
npartitions=100
                 datetime64[ns]  int64  int64  object       int64
                            ...    ...    ...     ...         ...
...                         ...    ...    ...     ...         ...
                            ...    ...    ...     ...         ...
                            ...    ...    ...     ...         ...
Dask Name: read-parquet, 100 tasks

Let’s now do a sanity check and see how many posts we have per year. We can do the grouping operation directly on the Dask DataFrame as we would in Pandas. Also, the time series functionality, i.e. the dt accessor, is present in Dask, so we can group by year as follows.

with ProgressBar():
    posts_count = posts.groupby(posts.creation_date.dt.year)\
        .id.count()\
        .to_frame()\
        .rename(columns={'id': 'count'})\
        .compute()
posts_count
[########################################] | 100% Completed |  1min 29.6s
                 count
creation_date
2008             58651
2009            344836
2010            697794
2011           1204848
2012           1651370
2013           2069635
2014           2169905
2015           2226568
2016           2338379
2017           1696961

posts_count.plot.bar();

[Figure: bar chart of the number of posts per year]

Note that in Dask it is even more important how you perform the computation. A much quicker alternative is the Series method value_counts, which gives the same counts, sorted by count rather than by year.

with ProgressBar():
    posts_count = posts.creation_date.dt.year.value_counts()\
        .to_frame()\
        .rename(columns={'creation_date': 'count'})\
        .compute()
posts_count
[########################################] | 100% Completed |  8.4s
        count
2016  2338379
2015  2226568
2014  2169905
2013  2069635
2017  1696961
2012  1651370
2011  1204848
2010   697794
2009   344836
2008    58651
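
A count like this can be computed one partition at a time and merged at the end, which is what makes it feasible on data that does not fit in memory. The plain-Python sketch below illustrates the idea with made-up dates and two made-up partitions; it does not reproduce Dask’s internals.

```python
from collections import Counter
from datetime import datetime

# Two hypothetical partitions of creation dates.
partitions = [
    [datetime(2008, 7, 31), datetime(2008, 8, 1), datetime(2009, 1, 5)],
    [datetime(2009, 3, 2), datetime(2010, 6, 9)],
]

# Count years inside each partition independently...
per_partition = [Counter(d.year for d in part) for part in partitions]
# ...then merge the small per-partition results.
total = sum(per_partition, Counter())
print(dict(total))
```

Only the small per-partition counters need to be held together at the end, never the full data set.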

Also, since the resulting Parquet files for posts are quite small, only 0.5GB, we could read them in as a Pandas DataFrame. Indeed, support for Parquet was added in Pandas version 0.21.0. But there are some differences. First, Pandas supports reading a single Parquet file, whereas Dask most often creates many files, one per partition. We can work around that by reading the files using Dask and calling the compute method to create a Pandas DataFrame directly. Note that the resulting object is quite large, about 2GB in this case, and some operations, especially in Jupyter, like code completion, take quite long as they have to traverse a large object. I also encountered some kernel deaths from time to time with large objects present. Nonetheless, with this size you can use Pandas as well.

I hope you’ve found Dask as interesting as I did. In the near future we will continue to show more Dask features and monitor its progress.

Notebook with the above computations is available for download here.
