Spark IO

Introduction

Reading text files is sort of Spark’s thing. At least it is one of Spark’s things, as much as a bunch of code can have its own thing… Anyway, we have a couple of ways to get data. One is newAPIHadoopRDD, which uses some underlying Hadoop magic along with some connectors to access data. That is what we did in the last module, where we connected to BigQuery. It abstracts a lot of stuff, and we don’t have access to the process in the same way we do if we load data directly.

The other ways we will look at are textFile and wholeTextFiles. These load files line by line or one whole file at a time, respectively. We will look at them in more detail, along with how to output files.

File Reading

As mentioned earlier, there are two forms of file input we can use: textFile and wholeTextFiles. The former operates line by line, the latter on the entire file.

textFile

SparkContext.textFile(name, minPartitions) reads files line by line, converting each line into a string in Spark. One thing you need to be careful of is that the file needs to be accessible by every node in the cluster. So if you simply make a file on the master node and try to load it, the load will fail.

As an example, let’s say we have this file:

foo, bar, baz
1, 2, 3

and it is uploaded to cloud storage at gs://cs512-bucket/file1.csv so that all the nodes have access to it. We can call myCSV = sc.textFile("gs://cs512-bucket/file1.csv") to load it into an RDD. Then we can call myCSV.collect() and see what we get:

[u'foo, bar, baz', u'1, 2, 3']

We need to be a little careful here. At first glance it looks like we may have a list of strings and numbers. But we actually have only two strings here. One string is foo, bar, baz and the other string is 1, 2, 3. Those are not individual strings or individual numbers. We would need to do further parsing on each string to get the values out, like this:

myCSV.map(lambda x: x.split(',')).collect()
[[u'foo', u' bar', u' baz'], [u'1', u' 2', u' 3']]  

For each string, we split on the comma. So now we have two lists of three strings each. Notice that the values after the first one still carry a leading space, because we split on the comma alone. If we had a big CSV file, this is the sort of thing we would do to split up our data. We would have one list per row, and that list would have a value for each column in the table. If we had more complex data, we might use a CSV reader instead of just the string’s split method. We would also need to be careful with header rows, either leaving them out of the files we read or finding and removing them somehow.
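As a rough sketch of the CSV reader idea, the helpers below parse one line with Python’s csv module and filter out a header line. The names parse_csv_line, drop_header and load_rows are made up for this illustration, sc is assumed to be an existing SparkContext as in the examples above, and the code was written with Python 3 in mind.

```python
import csv

def parse_csv_line(line):
    """Parse one line of CSV text into a list of stripped string values."""
    # csv.reader accepts any iterable of lines, so wrap the single line in a list.
    # skipinitialspace drops the blank that follows each comma.
    row = next(csv.reader([line], skipinitialspace=True))
    return [value.strip() for value in row]

def drop_header(lines, header):
    """Keep only the lines that differ from the given header line."""
    return lines.filter(lambda line: line != header)

def load_rows(sc, path):
    """Sketch: load CSV text with textFile, drop the header, parse the rest."""
    lines = sc.textFile(path)
    header = lines.first()  # assumption: the first line returned is the header
    return drop_header(lines, header).map(parse_csv_line)
```

Unlike a plain split, the reader keeps a quoted value such as "a, b" together as one column. Note that drop_header removes every line equal to the header, which is handy when several files share the same header but would also drop a data row that happened to look identical.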

Keep in mind that while the order of the values within a row will always match the order they had in the file, it is safest not to rely on the order of the rows themselves. The file might get read by many nodes at once, and once the data has been split up and processed, the order in which it is reassembled might not match the order of the rows in the file. So it is possible that we could get back:

[u'1, 2, 3', u'foo, bar, baz']

Notice that the order of the rows has changed: we got back the 2nd row first.

Along with this basic syntax for loading, we can also specify a directory or use wildcards. So we could call myCSVs = sc.textFile("gs://cs512-bucket/*.csv") to load all of the files ending with .csv in the directory. We get back a single RDD of lines, and again we cannot count on the order of the rows or tell which file a row came from. Finally, do not forget that you can specify the partition count.

sc.textFile("gs://cs512-bucket/*.csv").glom().collect()
[[u'foo, bar, baz', u'1, 2, 3'], [], [u'a,b,c', u'aa,bb,cc']] 

This defaulted to 3 partitions, but the way the input was split left one of them empty.

sc.textFile("gs://cs512-bucket/*.csv",6).glom().collect()
[[u'foo, bar, baz'], [], [u'1, 2, 3'], [], [u'a,b,c', u'aa,bb,cc'], [], []] 

This, on the other hand, created 7 partitions and left 4 of them empty. The 6 we asked for is only a minimum, and the underlying Hadoop input format is free to create more splits than that, which is presumably where the extra one came from. So we managed to use 3 partitions instead of 2. This looks a little silly when we are only dealing with 4 rows, but remember that this will usually be working on millions of rows.
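If you want to check how the data actually landed, something like the following sketch can help. The helper names partition_sizes and describe_partitions are invented for this example, and the usage lines assume an existing SparkContext called sc.

```python
def partition_sizes(rdd):
    """Return the number of elements held by each partition, in partition order."""
    # glom() turns each partition into one list; len() then counts its elements.
    return rdd.glom().map(len).collect()

def describe_partitions(sizes):
    """Summarize a list of partition sizes as (total, non_empty) counts."""
    non_empty = sum(1 for size in sizes if size > 0)
    return len(sizes), non_empty

# Hypothetical usage, assuming sc is an existing SparkContext:
#   rdd = sc.textFile("gs://cs512-bucket/*.csv", 6)
#   rdd.getNumPartitions()                     # how many partitions Spark created
#   describe_partitions(partition_sizes(rdd))  # (total, non-empty)
```

Collecting only the sizes is much cheaper than calling glom().collect() on the data itself, which matters once the RDD holds millions of rows.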

wholeTextFiles

SparkContext.wholeTextFiles(path, minPartitions) is the other option for loading text files. In this case, instead of each row being its own element, the entire file is loaded as a single element. So let’s look at the previous example. If we now call sc.wholeTextFiles("gs://cs512-bucket/file1.csv").collect() we get back:

[(u'gs://cs512-bucket/file1.csv', u'foo, bar, baz\n1, 2, 3\n')]

There are two major differences here. One is that we get back the whole contents of the file as a single string. So we get the newlines and everything and would have to parse it all ourselves. The other is that each file is returned as a tuple. We get the file path as the key to the tuple and the value is the contents of the file. The most common use case for this would probably be loading JSON files where the contents of the file are stored as a single JSON object.
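Parsing it ourselves does not have to be much work. Here is a minimal sketch that splits each file back into lines while keeping track of which file every line came from. The function name lines_with_source is made up for this example, and sc is again assumed to be an existing SparkContext.

```python
def lines_with_source(path_and_text):
    """Split one (path, contents) pair into a list of (path, line) pairs."""
    path, text = path_and_text
    # splitlines() drops the newline characters and does not leave an
    # empty entry behind when the file ends with a newline.
    return [(path, line) for line in text.splitlines()]

# Hypothetical usage, assuming sc is an existing SparkContext:
#   sc.wholeTextFiles("gs://cs512-bucket/*.csv").flatMap(lines_with_source)
```

Because each file is held as one string, this approach suits lots of small files better than a few huge ones.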

Let’s say we had a file student.json with the following contents:

{
"name":"Alice",
"id":123,
"standing":"Junior"
}

We could load it with sc.wholeTextFiles("gs://cs512-bucket/student.json") and then map json.loads over the file contents to get back an object:

import json
sc.wholeTextFiles("gs://cs512-bucket/student.json").mapValues(json.loads).collect()
[(u'gs://cs512-bucket/student.json', {u'standing': u'Junior', u'name': u'Alice', u'id': 123})]

If we had a directory of 1000 JSON files, this would work the same way. We would just call sc.wholeTextFiles("gs://cs512-bucket/*.json") instead (note the star). We would get back an RDD of tuples pairing each file name with its student.

So where previously we went through downloading the file, cleaning it with Dataprep, loading it into BigQuery and then finally getting it into Spark, we could conceivably just work directly on the JSON files without going through Dataprep at all. Or, if we did want to use Dataprep, we could have it output to CSV instead of BigQuery. But Spark is really well suited to the exact sort of work that Dataprep does, so that would probably be the place to save a bunch of time in this workflow.

Output

Great! We got all this data into Spark, so let’s get something out. While we loaded stuff using the SparkContext, that was mainly because we didn’t have an RDD yet. The context would give us one back to work with. Typically when we are writing stuff to disk we already have an RDD, and the RDD itself has the methods for writing.

collect

The most basic option is to just call collect() to gather up the whole set and write it to disk using any old method of writing a file in Python. This is problematic when dealing with large data sets for the same reason as before: collect() needs to fit the entire data set in memory on the driver.
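For a small RDD, the collect-and-write approach might look like this sketch. The helper name write_lines and the file name results.txt are placeholders invented for the example.

```python
def write_lines(items, local_path):
    """Write each item on its own line of a local text file; return the count."""
    count = 0
    with open(local_path, "w") as out:
        for item in items:
            out.write(str(item) + "\n")
            count += 1
    return count

# Hypothetical usage, assuming rdd is small enough to fit on the driver:
#   write_lines(rdd.collect(), "results.txt")
# Passing rdd.toLocalIterator() instead of rdd.collect() pulls the data over
# one partition at a time, which eases the memory pressure somewhat.
```

Either way the file ends up on the machine running the driver, not in cloud storage, so this is mostly useful for quick looks at small results.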

saveAsTextFile

RDD.saveAsTextFile(path) will take each element and save it as a line of text using its built-in string conversion. This means that it only works well with things that can easily be converted into string form and parsed back out of it. So you could convert items into strings by mapping something like json.dumps over the RDD and then use saveAsTextFile to save JSON representations of everything.

This is easily portable between applications but may not be the most space efficient. It also won’t work if you have more complex objects which cannot be represented as JSON.
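A minimal sketch of that JSON round trip is below. The function names are invented for this example, and sc, rdd and output_dir stand for a SparkContext, an RDD of JSON-friendly records and an output directory that you would supply yourself.

```python
import json

def to_json_line(record):
    """Serialize one record as a single-line JSON string."""
    # sort_keys keeps the output stable from run to run.
    return json.dumps(record, sort_keys=True)

def save_as_json_lines(rdd, output_dir):
    """Sketch: write every record of rdd as one JSON document per line."""
    # saveAsTextFile refuses to write into a directory that already exists.
    rdd.map(to_json_line).saveAsTextFile(output_dir)

def load_json_lines(sc, output_dir):
    """Sketch: read the directory back and rebuild the records."""
    return sc.textFile(output_dir).map(json.loads)
```

Since json.dumps without an indent never emits a newline, each record stays on one line, which is what lets textFile read the records back one at a time.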

saveAsPickleFile

RDD.saveAsPickleFile(path) is an alternative to saveAsTextFile. It will save the contents of the RDD as binary encoded objects. This means the output is not human readable. It also means you won’t be able to easily load it in anything other than a similar Python environment. So this is a good option when you need to store complex objects that will only be used in the Spark environment. It would not be a good thing to use if you are trying to use Spark to clean up data for other programs to use.
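Here is a small sketch of the pickle route. The first helper is only a local stand-in, using Python’s own pickle module to check that an object survives a round trip. The second shows the matching Spark calls, saveAsPickleFile to write and SparkContext.pickleFile to read back. Both function names are made up for this example.

```python
import pickle

def survives_pickling(obj):
    """Check that obj comes back unchanged after a pickle round trip."""
    return pickle.loads(pickle.dumps(obj)) == obj

def save_and_reload(sc, rdd, output_dir):
    """Sketch: save rdd with saveAsPickleFile, then load it with pickleFile."""
    rdd.saveAsPickleFile(output_dir)
    return sc.pickleFile(output_dir)
```

An object such as {"scores": (1, 2.5), "tags": {"a", "b"}} passes the local check even though json.dumps would reject the set and turn the tuple into a list, which is the kind of case where the pickle format earns its keep.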

File Output

In all of these you specify a directory as the output, not a file. Each partition is written as its own file in that directory, named part-n, where n is the zero-padded number of the partition it holds (part-00000, part-00001 and so on). You could manually combine these files using a different script, or you could leave them as is and load the entire directory next time you need to do work. In general, if there isn’t a need to combine them, leave them as separate files.
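If you do need a single file, a combining script can be quite short. This sketch assumes the output directory has already been copied to the local disk and holds text output. The function name combine_parts is invented for the example.

```python
import glob
import os

def combine_parts(output_dir, combined_path):
    """Concatenate the part-* files of a local output directory into one file."""
    # Sorting keeps the parts in partition order (part-00000, part-00001, ...).
    part_paths = sorted(glob.glob(os.path.join(output_dir, "part-*")))
    with open(combined_path, "w") as combined:
        for part_path in part_paths:
            with open(part_path) as part:
                combined.write(part.read())
    return len(part_paths)
```

The part-* pattern skips anything else that may sit in the directory, such as a _SUCCESS marker file.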

Activity

Upload a couple of CSV files containing data to Google Cloud Storage. Load them into Spark, parse them so that you can work on them, then write them back to Google Cloud Storage.

Do the same with JSON files loaded as whole files, but write them back as single-line JSON entries. (There isn’t a super easy way to write back in the whole file format.)

Review

This should get you to the point where you can, using a hosted Spark environment, load data from files, do work on it and write the results back to storage. That is more or less the whole lifespan of a Spark job. There are of course more ways you can hook Spark up to this, that or the other thing to read from and write to a database, but those are very implementation specific and the sort of thing a dedicated programmer would work with. If you are focused more on the data science side of things, you are either going to be working with files more directly or hopefully have programmers on your team who can manage writing or using the adapters for other tools.