Spark Dataframes

Introduction

Dataframes represent a more structured form of Spark data. RDDs were essentially collections of things: the elements didn’t need to be the same kind of thing or have the same properties. It is obviously harder to work with a heterogeneous collection of things, but the point stands, you could if you wanted to.

Dataframes are more analogous to tables in a relational database. They have schemas and named columns, so your data should all have the same number of attributes, and each column has a single type; you cannot mix strings and integers in the same column. Also, dataframes are immutable: you cannot insert into or delete from a dataframe once it is created.

This obviously makes working with dataframes a bit easier; things tend to just work without much extra thought. It also makes getting data into a dataframe trickier, because if you haven’t standardized all your records in Python you can end up with problems like missing attributes or mismatched types.
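As a quick, entirely made-up illustration of the schema idea before we get into creation: a dataframe built from a handful of consistent Rows picks up a schema automatically, and you can inspect it with printSchema. (This is a minimal sketch; the names and values here are hypothetical.)

from pyspark.sql import Row

# Hypothetical records: every Row has the same two fields, so schema
# inference is straightforward.
people = spark.createDataFrame([Row(name='Ada', commits=10),
                                Row(name='Grace', commits=7)])
people.printSchema()  # prints the inferred column names and types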

Creating Dataframes

From Files

You can see the Spark documentation here on the various ways to load data into dataframes. If you have a properly formatted JSON or CSV file, this is a good way to go. The DataFrameReader can parse these without much issue; JSON files can be either one JSON object per line or one object per whole file. It can also load from other sources, but we generally didn’t cover them in this class so we won’t go over them here.
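For example, reading a CSV with a header row or a line-delimited JSON file looks roughly like this (the paths are placeholders, and the multiLine option assumes Spark 2.2 or newer):

# CSV with a header row; inferSchema asks Spark to guess column types.
csv_df = spark.read.csv('gs://some-bucket/commits.csv', header=True, inferSchema=True)

# JSON Lines: one JSON object per line, which is the reader's default expectation.
json_df = spark.read.json('gs://some-bucket/commits.json')

# A single JSON document spanning the whole file needs the multiLine option.
whole_file_df = spark.read.option('multiLine', True).json('gs://some-bucket/commit.json')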

From RDDs

If you can load from a file, great, go do that; it is better and easier. But what if you don’t have that luxury? For this module we will be working with the BigQuery public GitHub sample commits table. This table has around 2.5GB of GitHub commits. It has 12 properties, some of which have nested values.

Google provides an adaptor to load a BigQuery table into a Spark RDD but not a Spark Dataframe. This is where things get interesting.

To load the RDD into a Dataframe we need each record to be a pyspark.sql.Row, but we need to be careful about this. If an attribute is NULL in BigQuery (for example, if a commit does not have an encoding specified), then the property simply does not exist in the JSON object. The JSON spec does support null values, but the BigQuery adaptor does not leverage them.

So if you remember what we said earlier about this being a table with a fixed set of columns, you can see we might have a problem. Not every commit has an encoding specified, so some of the dicts parsed from the JSON will have 11 keys and others will have 12.

At this point you could go one of two ways: you could add None values to the dicts that are missing keys, or you could delete the extra keys from the dicts that have them.

Here is an example of a function that does both:

def normalizeKeys(commit):
    # The full set of keys we want every commit record to have.
    key_string = "author commit committer difference message parent repo_name subject trailer tree"
    wanted_keys = key_string.split()
    # Add any missing keys with a value of None.
    for k in wanted_keys:
        if k not in commit:
            commit[k] = None
    # Drop any keys we are not interested in. Copy the key list first,
    # since we mutate the dict while looping over it.
    for dict_key in list(commit.keys()):
        if dict_key not in wanted_keys:
            commit.pop(dict_key)
    return commit

The string represents all the keys we want in a particular commit record. If the commit is missing a key that is in the list, we add it and set the value to None. If it has a key that we are not interested in we pop it, removing it from the dict.

This makes sure that we have only the keys we are interested in and that every dict has a value associated with that key.
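To see what it does, here is a quick sketch using a trimmed-down fake commit dict (real records have more fields than this):

fake_commit = {'commit': 'abc123', 'repo_name': 'torvalds/linux', 'encoding': 'ISO-8859-1'}
normalizeKeys(fake_commit)
# 'encoding' is popped because it is not one of the wanted keys, while the
# other expected keys ('author', 'committer', 'message', ...) are added with
# a value of None. 'commit' and 'repo_name' pass through untouched.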

So we need to map that function over our RDD and then map the Row constructor over the result.

rows = json_commits.map(lambda x: normalizeKeys(x)).map(lambda x: Row(**x))

So what is that **x business? The Row constructor accepts keyword arguments; in other words, its signature looks roughly like def __new__(cls, **kwargs). What this means is that it can take an arbitrary number of named attributes, like Row(foo=1, bar=2, baz='apple'), and those can then be accessed like a dict inside the function.

The problem is we are already passing it a dict. So putting a ** in front of the dict ‘unpacks’ it into keyword arguments. So {'foo': 1, 'bar': 2} basically loses the braces and is passed to the function as foo=1, bar=2.

If that was too confusing, just remember when you are passing a dict to the Row constructor, put ** in front of it.
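Here is that idea in a couple of lines, using the same Row import as above:

d = {'foo': 1, 'bar': 2}
r = Row(**d)  # equivalent to writing Row(foo=1, bar=2)
r.foo         # 1 -- the fields are then accessible by name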

After those maps, we have an RDD of Rows.

df = spark.createDataFrame(rows)

gives us a DataFrame from that RDD.
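At this point it is worth sanity checking what you ended up with; printSchema and show are the usual first steps:

df.printSchema()  # the inferred schema, including the nested author and committer fields
df.show(5)        # the first five rows, truncated for display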

Dataframe Operations

Great, we have a dataframe, now what do we do with it? Well, we can do the typical SQL stuff to it. We can get the top 5 authors by number of commits, for example:

df.groupBy('author.name').count().orderBy(['count'], ascending=False).limit(5).collect()

It is a little messy, but it should be fairly clear what is going on. We are grouping results by author.name and using the count aggregate function to see how many commits each author has. We then order by the count column and limit the result set to 5. This is basically equivalent to

SELECT author.name, COUNT(*) AS count FROM df GROUP BY author.name ORDER BY count DESC LIMIT 5;
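If you would rather write that literally as SQL, you can register the dataframe as a temporary view and query it with spark.sql (the view name commits below is just a placeholder):

df.createOrReplaceTempView('commits')
spark.sql("""
    SELECT author.name, COUNT(*) AS count
    FROM commits
    GROUP BY author.name
    ORDER BY count DESC
    LIMIT 5
""").collect()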

We get back the following result set

[Row(name=u'Linus Torvalds', count=20776), Row(name=u'David S. Miller', count=7552), Row(name=u'Mark Brown', count=6466), Row(name=u'H Hartley Sweeten', count=5857), Row(name=u'Mark Otto', count=5823)]

We can see that this Linus Torvalds fellow seems to be fairly busy. This is expected, as he is the maintainer of the Linux kernel, so he basically oversees the code running the operating system deployed on millions of servers across the world.

Activity

The following code sample can be copied and pasted into the PySpark command line. At the end of the script you will have the df dataframe. You can then run some basic queries, like finding the list of people who have contributed to the repo named torvalds/linux or finding the 10 repos that have been committed to the most; a couple of example queries follow the script.

import json
from pyspark.sql import Row

bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
# This directory needs to be deleted between runs in Google Cloud Storage, the script does not do that for you.
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input_sql'.format(bucket)

conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'bigquery-public-data',
    'mapred.bq.input.dataset.id': 'github_repos',
    'mapred.bq.input.table.id': 'sample_commits',
}

table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

def normalizeKeys(commit):
    # The full set of keys we want every commit record to have.
    key_string = "author commit committer difference message parent repo_name subject trailer tree"
    wanted_keys = key_string.split()
    # Add any missing keys with a value of None.
    for k in wanted_keys:
        if k not in commit:
            commit[k] = None
    # Drop any keys we are not interested in. Copy the key list first,
    # since we mutate the dict while looping over it.
    for dict_key in list(commit.keys()):
        if dict_key not in wanted_keys:
            commit.pop(dict_key)
    return commit

rows = table_data.map(lambda kv: json.loads(kv[1])).map(lambda x: normalizeKeys(x)).map(lambda x: Row(**x))

df = spark.createDataFrame(rows)
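As a starting point for those queries (one possible approach, not the only one), something along these lines should work once df exists:

# People who have contributed to torvalds/linux.
df.filter(df.repo_name == 'torvalds/linux').select('author.name').distinct().collect()

# The 10 repos with the most commits in the sample.
df.groupBy('repo_name').count().orderBy(['count'], ascending=False).limit(10).collect()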

Review

This section goes over the basics of creating and using a dataframe to do SQL-like operations on Spark datasets. You can hopefully see that this isn’t a fit for all data. You need to be able to get your data into a tabular format, and the operations need to be ones that can be performed using SQL-like functions.

That still covers a very large amount of data, but just keep in mind that while this is a popular tool, it won’t be the best fit for every problem.