Spark Rows, Views and Schemas

Introduction

While probably not something you are going to spend a whole lot of time on, rows and views are important things to know about in Spark. You briefly saw rows in the previous section, where we converted some data into rows to make a dataframe. We will talk a little bit more about how rows are made in this section.

Views are a way to let us access a dataframe using more or less direct SQL. They let us name a dataframe so that we can use the Spark session to write SQL queries, which are interpreted and run against that dataframe, without having to string together a long chain of dataframe functions ourselves.

Finally, schemas are part of the dataframe, but we didn't talk about them in the last section. The schema can often be inferred from the data: Spark takes a sample of rows and looks at the data types across them. If they all match, the type of that column can be inferred. One needs to be a little careful with sparse data; if an attribute is null 90% of the time, your sample might not contain any valid data to infer the type from.
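
To make that concrete, here is a minimal sketch of inference in action (it assumes a running Spark session named spark, which is the convention used throughout this section):

from pyspark.sql import Row

# Spark samples these rows and infers name as a string and age as a long
inferred_df = spark.createDataFrame([Row(name="Alice", age=43), Row(name="Mallory", age=35)])
inferred_df.printSchema()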

Views

Views are a little like aliases in SQL: they are just a way to assign a name to a dataframe so it can be used in SQL queries. The createOrReplaceTempView(name) method of a dataframe takes name as a string and registers it in the current Spark session. This is different from the Spark context, which is usually named sc; the Spark session is conventionally named spark. The session has a sql method which lets you run SQL queries directly against registered views. For example:

df.createOrReplaceTempView("commits")

spark.sql('SELECT author.name, COUNT(*) AS count FROM commits GROUP BY author.name ORDER BY count DESC')

This will get us the same result set as our query directly on the dataframe in the last section.
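
Since spark.sql hands back another dataframe, you can hold on to the result and keep working with it like any other dataframe. A small sketch, reusing the commits view registered above:

# the result of spark.sql is itself a dataframe
commit_count = spark.sql('SELECT COUNT(*) AS n FROM commits')
commit_count.show()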

Rows

Rows are really similar to dicts in Python; they are key-value pairs. Rows are constructed using the pyspark.sql.Row class, and they can be constructed in two ways: you can either specify all the keys and none of the values

r1 = pyspark.sql.Row("name", "address", "age")

Or you can specify every key and value

r2 = pyspark.sql.Row(name="Bob", address="123 1st St", age=24)

You cannot mix them, so you can't have Row('foo', bar=2). This will throw an error and fail.

That is really the only sticking point with rows. Remember that you can pass the Row constructor a dict if you preface it with **. So Row(**some_dict) will work; Row(some_dict), however, will not give you what you want, because the whole dict ends up as a single unnamed value instead of named fields.
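
Here is a short sketch of both construction patterns in use (the names and values are just placeholders):

# a keys-only Row acts like a template; call it with values to fill it in
Person = pyspark.sql.Row("name", "address", "age")
bob = Person("Bob", "123 1st St", 24)

# ** unpacks a dict into keyword arguments, one per field
some_dict = {"name": "Alice", "address": "456 2nd St", "age": 31}
alice = pyspark.sql.Row(**some_dict)

# fields can be read by attribute or by key, much like a dict
print(alice.name, alice["age"])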

Schemas

Schemas in dataframes are more complicated than the previous two topics. You can read about creating dataframes here. Note that you can optionally pass a schema. This is for situations where you don't have data to populate the dataframe with at first, or where Spark is unable to determine the data types automatically from your data set.

My preferred way of defining a schema is with the StructType class. You can pass it a list of StructField objects, which are named types that you can mark as nullable or not. The full list of data types can be found here.

A simple schema might look like this:

from pyspark.sql.types import *

my_schema = StructType([
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), True)
])
my_rdd = sc.parallelize([{'name': 'Alice', 'age': 43}, {'name': 'Mallory', 'age': 35}])
my_df = spark.createDataFrame(my_rdd, my_schema)
my_df.collect()

Here we defined a schema and then provided an RDD of appropriately named dicts to populate it. You need to be careful here, because if your RDD has an error where a type or a key isn't correct, you won't find out about it until you run an operation on the dataframe.

my_rdd = sc.parallelize([{'not_a_name': 3.14, 'age': 43}, {'name': 'Mallory', 'age': 35}])
my_df = spark.createDataFrame(my_rdd, my_schema)

The above code will run without an error despite having an incorrect key name and value type. But as soon as you run my_df.collect(), or do anything else that requires actually processing the dataframe, you will get an error, and it may not be totally clear what the issue is, because the genesis of the error was earlier in the code, when you first declared the schema and populated the dataframe.
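
There is no complete fix for this, but one habit that helps is to force a small action right after building the dataframe, so that bad rows surface near the code that produced them. A sketch of that idea:

# first() forces Spark to actually process at least one row, so schema
# mismatches show up here rather than deep inside a later pipeline;
# it only touches rows that get processed, so it is not a full validation
try:
    my_df.first()
except Exception as e:
    print("schema/data mismatch:", e)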

One last point: note that I ran from pyspark.sql.types import *. Generally this is frowned upon, and you probably should not do it in production code unless you have a style guide that says otherwise, but there are so many names needed from that module that importing * saves a lot of typing.
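
If your style guide does object, the alternative is simply to spell out the handful of names you actually use, which for the example above would be:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType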