Spark Partitions

Introduction

Partitions, generally speaking, are simply divisions of stuff. In Spark these are divisions of data that must be kept together. In other words, when the data set is split up and chunks of it are sent to workers, they are sent based on the partitions, one partition at a time. So at the absolute smallest, a partition needs to be able to hold one record from your data set. If your data set is people and their 2015 tax returns, you would not be able to split a person and part of their tax return across multiple partitions (or at least it would take a lot of work to do that). Instead you would want to make sure that a person and all of their tax return are kept together. And in all likelihood you would have many such records per partition.

RDDs and Partitions

RDDs are Resilient Distributed Datasets. They are created either by reading in data or by using the parallelize function of your Spark context. Typically you can control how many partitions are made when the RDD is created by passing in an additional parameter. sc.parallelize([1,2,3,4,5], 3) will split up the array among 3 partitions. Spark recommends having 2-4 partitions per CPU in your cluster. So if we had 5 CPUs available as workers then we would want to have between 10 and 20 partitions of data.
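
Here is a minimal sketch of that in PySpark (it assumes a SparkContext named sc is already set up, e.g. in a pyspark shell or notebook):

    # ask for 3 partitions when creating the RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    print(rdd.getNumPartitions())   # 3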

Assuming that we manage to randomly split up the data across these partitions, this helps ensure that no node simply gets one easy partition while a different node gets a difficult partition, forcing everyone to wait on the node that got the hard work. If every node works through a couple of partitions then the easy and hard partitions tend to balance out. But if we introduce too many partitions we start running into performance problems due to the overhead of distributing all of those partitions to the various workers.

A final piece to consider is how large the partitions are. You don't want a partition to exceed the working memory of your workers; that will cause a lot of issues. This is particularly relevant for small clusters with only a few workers. If we have 100GB of data but only 3 workers, following the 2-partitions-per-CPU guideline would give 6 partitions of roughly 16GB each. This may well exceed the worker memory, so we need more partitions even if that exceeds the recommended number per worker. That is preferable to either crashing or having to spill content from memory to disk while doing computations.
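
To make the arithmetic concrete, here is a rough back-of-the-envelope sketch; the 100GB data size and the 6GB-per-partition cap are just assumptions for illustration:

    import math

    data_size_gb = 100       # total size of the data set (assumed)
    cpus = 3                 # worker CPUs available (assumed)
    max_partition_gb = 6     # rough cap on data per partition (assumed)

    by_cpu_rule = cpus * 2                                   # low end of the 2-4 per CPU guideline
    by_memory = math.ceil(data_size_gb / max_partition_gb)   # partitions needed to stay under the cap
    num_partitions = max(by_cpu_rule, by_memory)
    print(num_partitions)    # 17 here, more than the 6-12 the CPU guideline alone suggests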

Shuffles

Shuffles are one of the major things we want to avoid, if possible, in our Spark programs. A shuffle happens when a worker needs data from more than one partition. The 'useful' work has to be put on hold while we find all the relevant data and send it over the network to the appropriate node. The places where shuffles happen are places where we need to group things that are not already grouped. Say we have data about city residents spread across partitions, and we want the average age of the population of each city. To do this we need to find all the people who live in Shelbyville across lots of partitions and collect that data in one place to compute the average. This is costly in terms of time.
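
For instance, the average-age calculation might look like the sketch below (the city/age records are made-up sample data); reduceByKey is the step that forces the shuffle, because the ages for each city have to be gathered onto one partition:

    # (city, age) records spread across 4 partitions
    people = sc.parallelize([
        ("Shelbyville", 34), ("Springfield", 52), ("Shelbyville", 27),
        ("Springfield", 41), ("Shelbyville", 63), ("Springfield", 19),
    ], 4)

    # the reduceByKey below shuffles matching cities onto the same partition
    sums = people.mapValues(lambda age: (age, 1)) \
                 .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    averages = sums.mapValues(lambda t: t[0] / t[1])
    print(averages.collect())   # e.g. [('Shelbyville', 41.33...), ('Springfield', 37.33...)]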

A lot of being ‘good’ at Spark deals with how to efficiently structure and organize data so that the number of shuffles can be minimized.

Repartitioning

Spark offers a couple of ways to fiddle with partitions after an RDD has been created. One is the repartition(n) method of an RDD. It will create a new RDD with n partitions, shuffling the existing elements roughly evenly across them. This generally does an OK job of splitting up large data sets, but if you have a small amount of data, say fewer than 1,000 elements, you need to check that it isn't leaving a lot of partitions empty.

Another is the coalesce(n) function, which is a bit like repartition but can only reduce the number of partitions. Because it merges existing partitions rather than doing a full shuffle, it can do this with less work.
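
A quick sketch of both, again assuming a SparkContext named sc:

    rdd = sc.parallelize(range(50), 4)

    more = rdd.repartition(8)         # full shuffle into 8 new partitions
    print(more.getNumPartitions())    # 8

    fewer = rdd.coalesce(2)           # merges existing partitions, avoiding a full shuffle
    print(fewer.getNumPartitions())   # 2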

These are the functions you would want to use if there isn’t really any sort of logical grouping of your data. They will do a reasonably good job of splitting up the data so that each worker gets its fair share of work.

But a lot of times you can do a bit better than this.

Consider playing a game of cards. There are 4 suits, clubs, diamonds, hearts and spades. Suppose your cards are randomly sorted. If you want to find the highest card of each suit you would need to operate on your entire hand to calculate every maximum. First you would go through looking for the highest club, then the highest diamond and so forth. These sorts of operations would be a lot easier if your cards were grouped by suit, which is what most card players would do. They would have all the clubs together in their hand, all the diamonds together and so forth. This makes it much easier to figure out the highest diamond or highest club.

In Spark we often want to do the same thing. We do it by working with key-value pairs. In PySpark these are represented as tuples of the form (key, value). The key is typically a simple type like an integer or a string. The value can be more or less anything. To get our data into key-value format we might use a map function. Say we had student grade objects and we wanted to use the student's id as the key; we might map a function like lambda x: (x['id'], x). This simply uses the object's id attribute as the key and the whole object as the value.
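
A small sketch of that mapping; the student dictionaries here are made-up sample records:

    students = sc.parallelize([
        {'id': 7,  'course': 'MATH101', 'grade': 3.7},
        {'id': 7,  'course': 'CS200',   'grade': 4.0},
        {'id': 12, 'course': 'MATH101', 'grade': 2.9},
    ])

    # key each record by the student's id, keeping the whole record as the value
    by_id = students.map(lambda x: (x['id'], x))
    print(by_id.first())   # (7, {'id': 7, 'course': 'MATH101', 'grade': 3.7})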

Once we do this we can use the partitionBy(n) function. This will create n partitions and assign elements to them by hashing the key. So in our case of student/grade values, maybe each student has 40 different grades from different classes. Using the partitionBy function we can make sure that all of the grades associated with a particular student end up on the same partition. So if we later want to calculate the student's GPA we will not need a shuffle to collect the data onto one worker.
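
Continuing the sketch above with the by_id pair RDD, partitioning by key and then computing a per-student average might look like this (the GPA here is just a plain average of the sample grades):

    # hash-partition by key: all of a student's grades land on the same partition
    partitioned = by_id.partitionBy(4)

    # per-student GPA; the records are already grouped by key, so no further
    # shuffle of the raw grade records is needed
    gpas = partitioned.mapValues(lambda rec: (rec['grade'], 1)) \
                      .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                      .mapValues(lambda t: t[0] / t[1])
    print(gpas.collect())   # e.g. [(12, 2.9), (7, 3.85)]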

Debugging/Exploring Partitions

So this is all great, but how do we actually see how many partitions we are working with? An RDD's getNumPartitions() method will return the number of partitions in the RDD. This is useful for sanity checking. Did the command to make more partitions actually work? Did my partitions persist after doing the previous map? It can answer those questions. But it is far from telling the whole story.
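
For example:

    rdd = sc.parallelize(range(30), 5)
    print(rdd.getNumPartitions())        # 5
    doubled = rdd.map(lambda x: x * 2)
    print(doubled.getNumPartitions())    # still 5; a plain map does not change the partition count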

The key bit of information it leaves out is the number of elements in each partition. The glom method is our friend here. It takes all the elements in a partition and converts them into a list. So if you had 10 partitions storing Student objects, glom will turn that into 10 lists of students, one per partition. At that point you can do something like myRdd.glom().map(len).collect() to get back a list of the number of elements in each partition. For very small sets, when you are just experimenting, you can simply call something like myRdd.glom().collect() to visually see which elements were mapped to which partition. Again, the collect method really does not work for large result sets, but for experimenting it is great.
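
Putting those together, a small sketch:

    rdd = sc.parallelize(range(10), 3)

    # number of elements in each partition
    print(rdd.glom().map(len).collect())   # e.g. [3, 3, 4]

    # the actual contents of each partition (only sensible for small RDDs)
    print(rdd.glom().collect())            # e.g. [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]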

Activity

Alright, let's get some practice with partitions. There are a handful of tasks you should be able to perform using some combination of the information above.

  • Make a list of 25 integers across 3 partitions.
  • Make a list of 50 integers across 4 partitions, efficiently convert it to 2 partitions.
  • Starting with a list of 26 integers 0 through 25 on 1 partition, end with a list of 26 integers split among two partitions, even numbers on one and odd on the other.
  • Starting with 20 strings split somewhat evenly across 3 partitions, end with 4 partitions with ALL of the strings stored in one and the other 3 empty.
  • Compare the results of using repartition(20) directly on an RDD containing the values 0 through 99 with the results of first making key-value pairs using the value as the key and then using partitionBy(20).

Review

This section should give you some further insight into the inner workings of Spark and how it splits up data. You probably saw some behavior that made a lot of sense, and maybe some behavior that didn't make so much sense (I am looking at you, repartition). Hopefully you took away the value of keys and can see how you might use keys on more complex data types to keep things grouped together in a more reasonable way.