程式扎記: [ Learn Spark ] Ch5. Loading and Saving Your Data - Part1

Sunday, April 17, 2016

Introduction 
Both engineers and data scientists will find parts of this chapter useful. Engineers may wish to explore more output formats to see if there is something well suited to their intended downstream consumer. Data scientists can likely focus on the format that their data is already in. We've looked at a number of operations we can perform on our data once we have it distributed in Spark. So far our examples have loaded and saved all of their data from native collections and regular files, but odds are that your data doesn't fit on a single machine, so it's time to explore our options for loading and saving.

Spark supports a wide range of input and output sources, partly because it builds on the ecosystem available for Hadoop. In particular, Spark can access data through the InputFormat and OutputFormat interfaces used by Hadoop MapReduce, which are available for many common file formats and storage systems (e.g., S3, HDFS, Cassandra, HBase, etc.). The section "Hadoop Input and Output Formats" later on shows how to use these formats directly.

More commonly, though, you will want to use higher-level APIs built on top of these raw interfaces. Luckily, Spark and its ecosystem provide many options here. In this chapter, we will cover three common sets of data sources:
- File formats and filesystems 
For data stored in a local or distributed filesystem, such as NFS, HDFS, or Amazon S3, Spark can access a variety of file formats including text, JSON, SequenceFiles, and protocol buffers. We will show how to use several common formats, as well as how to point Spark to different filesystems and configure compression.

- Structured data sources through Spark SQL 
The Spark SQL module, covered in Chapter 9, provides a nicer and often more efficient API for structured data sources, including JSON and Apache Hive. We will briefly sketch how to use Spark SQL, but leave the bulk of the details to Chapter 9.

- Databases and key/value stores 
We will sketch built-in and third-party libraries for connecting to Cassandra, HBase, Elasticsearch, and JDBC databases.


We chose most of the methods here to be available in all of Spark's languages, but some libraries are still Java and Scala only. We will point out when that is the case.

File Formats 
Spark makes it very simple to load and save data in a large number of file formats. Formats range from unstructured, like text, to semistructured, like JSON, to structured, like SequenceFiles (see Table 5-1). Spark wraps a number of input formats through methods like textFile(), and as part of the wrapper, automatically handles compression based on the file extension.


In addition to the output mechanisms supported directly in Spark, we can use both Hadoop's new and old file APIs for keyed (or paired) data. We can use these only with key/value data, because the Hadoop interfaces require key/value data, even though some formats ignore the key. In cases where the format ignores the key, it is common to use a dummy key (such as null).

Text Files 
Text files are very simple to load from and save to with Spark. When we load a single text file as an RDD, each input line becomes an element in the RDD. We can also load multiple whole text files at the same time into a pair RDD, with the key being the name and the value being the contents of each file. 

Loading text files 
Loading a single text file is as simple as calling the textFile() function on our SparkContext with the path to the file, as shown in the example below. If we want to control the number of partitions, we can also specify minPartitions.
- Example 5-1. Loading a text file in Python 
>>> input = sc.textFile("/root/README.md")
>>> input.first()
u'# Apache Spark'

Multipart inputs in the form of a directory containing all of the parts can be handled in two ways. We can just use the same textFile() method and pass it a directory, and it will load all of the parts into our RDD. Sometimes it's important to know which file each piece of input came from (such as time data with the key in the filename), or we need to process an entire file at a time. If our files are small enough, we can use the SparkContext.wholeTextFiles() method and get back a pair RDD where the key is the name of the input file. SparkContext.wholeTextFiles() can be very useful when each file represents a certain time period's data. If we had files representing sales data from different periods, we could easily compute the average for each period, as shown in the example below:
- Example 5-4. Average values per file in Python 
>>> input = sc.wholeTextFiles("/root/datas/sales")
>>> input.collect()
[(u'file:/root/datas/sales/20160411.txt', u'100.0\n200.0\n300.0\n150.0\n'), (u'file:/root/datas/sales/20160412.txt', u'30.0\n500.0\n240.0\n10.0\n700.0\n')]
>>> nums = input.mapValues(lambda y: map(float, y.strip().split("\n")))  # Translate strings into numbers
>>> nums.collect()
[(u'file:/root/datas/sales/20160411.txt', [100.0, 200.0, 300.0, 150.0]), (u'file:/root/datas/sales/20160412.txt', [30.0, 500.0, 240.0, 10.0, 700.0])]
>>> average = nums.mapValues(lambda nums: sum(nums)/len(nums))  # Calculate the average of the numbers
>>> average.collect()
[(u'file:/root/datas/sales/20160411.txt', 187.5), (u'file:/root/datas/sales/20160412.txt', 296.0)]
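
The session above runs on Python 2, where map() returns a list. As a minimal sketch, the same per-file average can be written as a named function that also works on Python 3 and tolerates blank lines. The helper name average_of_file is hypothetical, returning None for an empty file is an assumption, and the Spark call appears only as a comment:

```python
def average_of_file(contents):
    """Return the mean of the numbers in one file's text, one number per line."""
    # Skip blank lines so a trailing newline does not break float().
    values = [float(line) for line in contents.splitlines() if line.strip()]
    if not values:
        return None  # assumption: an empty file has no average
    return sum(values) / len(values)


# With Spark (assumed usage): sc.wholeTextFiles(path).mapValues(average_of_file)
sample = "100.0\n200.0\n300.0\n150.0\n"
print(average_of_file(sample))  # 187.5
```

Keeping the logic in a plain function makes it easy to check locally before handing it to mapValues().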

Saving text files 
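Outputting text files is also quite simple. The method saveAsTextFile(), demonstrated in the example below, takes a path and outputs the contents of the RDD under it. Spark treats the path as a directory and writes one or more part files there, as the directory listing in the example shows.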
- Example 5-5. Saving as a text file in Python 
- /root/datas/people.json (input file for the JSON examples later in this post)
[{"name":"Peter","lovesPandas":false}]
[{"name":"Ken","lovesPandas":true}]
[{"name":"John","lovesPandas":true}]
[{"name":"Mary","lovesPandas":false}]
>>> average.saveAsTextFile('/root/datas/sales/average')
>>> exit()  # Exit the pyspark shell
# ls /root/datas/sales/average/
part-00000 _SUCCESS
# cat /root/datas/sales/average/part-00000
(u'file:/root/datas/sales/20160411.txt', 187.5)
(u'file:/root/datas/sales/20160412.txt', 296.0)
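
saveAsTextFile() writes the string form of each element, which is why the saved lines above look like Python tuples. If a downstream consumer needs something easier to parse, one option is to format each pair before saving. A minimal sketch, assuming tab-separated output is wanted; format_pair is a hypothetical helper and the Spark call appears only as a comment:

```python
def format_pair(pair):
    """Turn a (file name, average) pair into one tab-separated line."""
    name, value = pair
    return "{}\t{}".format(name, value)


# With Spark (assumed usage): average.map(format_pair).saveAsTextFile(path)
print(format_pair(("file:/root/datas/sales/20160411.txt", 187.5)))
```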

JSON 
JSON is a popular semistructured data format. The simplest way to load JSON data is by loading the data as a text file and then mapping over the values with a JSON parser. Likewise, we can use our preferred JSON serialization library to write out the values to strings, which we can then write out. In Java and Scala we can also work with JSON data using a custom Hadoop format. In a later section, we will show how to load JSON data with Spark SQL.

Loading JSON 
Loading the data as a text file and then parsing the JSON data is an approach that we can use in all of the supported languages. This works assuming that you have one JSON record per row; if you have multiline JSON files, you will instead have to load the whole file and then parse each file. If constructing a JSON parser is expensive in your language, you can use mapPartitions() to reuse the parser; see "Working on a Per-Partition Basis" in a later section.

There are a wide variety of JSON libraries available for the three languages (Python/Scala/Java) we are looking at, but for simplicity's sake we are considering only native libraries. In Python, we will use the built-in json package.
- Example 5-6. Loading unstructured JSON in Python 
>>> input = sc.textFile('/root/datas/foo.json')
>>> import json
>>> json.loads(input.collect()[0])
[u'foo', {u'bar': [u'baz', None, 1.0, 2]}]
>>> data = input.map(lambda x: json.loads(x))
>>> data.collect()
[[u'foo', {u'bar': [u'baz', None, 1.0, 2]}]]
>>> input.collect()
[u'["foo", {"bar":["baz", null, 1.0, 2]}]']
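
The mapPartitions() idea mentioned above can be sketched in plain Python. The function below builds one decoder per partition and skips lines that fail to parse; the name parse_json_partition is hypothetical, skipping malformed lines is an assumption rather than something the examples above do, and the Spark call appears only as a comment. In Python the decoder is cheap to build, so this mainly illustrates the pattern:

```python
import json


def parse_json_partition(lines):
    """Parse an iterator of JSON lines, reusing one decoder for the partition."""
    decoder = json.JSONDecoder()  # built once per partition, not once per line
    for line in lines:
        try:
            yield decoder.decode(line)
        except ValueError:
            # Assumption: malformed lines are skipped rather than failing the job.
            continue


# With Spark (assumed usage): sc.textFile(path).mapPartitions(parse_json_partition)
lines = ['["foo", {"bar": ["baz", null, 1.0, 2]}]', 'not json']
print(list(parse_json_partition(lines)))
```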

Saving JSON 
Writing out JSON files is much simpler than loading them, because we don't have to worry about incorrectly formatted data and we know the type of the data that we are writing out. We can use the same libraries we used to convert our RDD of strings into parsed JSON data and instead take our RDD of structured data and convert it into an RDD of strings, which we can then write out using Spark's text file API. Let's say we were running a promotion for people who love pandas. We can take our input from the first step and filter it for the people who love pandas, as shown in the example below:
- Example 5-9. Saving JSON in Python 
>>> input = sc.textFile('/root/datas/people.json')
>>> import json
>>> data = input.map(lambda x: json.loads(x))
>>> data.collect()
[[{u'name': u'Peter', u'lovesPandas': False}], [{u'name': u'Ken', u'lovesPandas': True}], [{u'name': u'John', u'lovesPandas': True}], [{u'name': u'Mary', u'lovesPandas': False}]]
>>> data_lovePanda = data.filter(lambda x: x[0]['lovesPandas'])
>>> data_lovePanda.collect()
[[{u'name': u'Ken', u'lovesPandas': True}], [{u'name': u'John', u'lovesPandas': True}]]
>>> data_lovePanda_str = data_lovePanda.map(lambda x: json.dumps(x))
>>> data_lovePanda_str.collect()
['[{"name": "Ken", "lovesPandas": true}]', '[{"name": "John", "lovesPandas": true}]']
>>> data_lovePanda_str.saveAsTextFile('/root/data/lovePandas.json')
>>> exit()
# ls /root/data/lovePandas.json/
part-00000 part-00001 _SUCCESS
# cat /root/data/lovePandas.json/part-00000
[{"name": "Ken", "lovesPandas": true}]
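
The filter and serialization steps above can also be written as small named functions, which are easier to test than inline lambdas. This is a sketch under two assumptions that go beyond the example: a record missing the lovesPandas key is treated as not loving pandas, and keys are sorted so the output is deterministic. The names loves_pandas and to_json_line are hypothetical:

```python
import json


def loves_pandas(record):
    """True when the single person dict wrapped in the record loves pandas."""
    return bool(record[0].get("lovesPandas", False))


def to_json_line(record):
    """Serialize one record to a single-line JSON string."""
    return json.dumps(record, sort_keys=True)


# With Spark (assumed usage):
#   data.filter(loves_pandas).map(to_json_line).saveAsTextFile(path)
records = [[{"name": "Peter", "lovesPandas": False}],
           [{"name": "Ken", "lovesPandas": True}]]
for line in map(to_json_line, filter(loves_pandas, records)):
    print(line)
```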

Comma-Separated Values and Tab-Separated Values 
Comma-separated value (CSV) files are supposed to contain a fixed number of fields per line, and the fields are separated by a comma (or a tab in the case of tab-separated value, or TSV, files). Records are often stored one per line, but this is not always the case, as records can sometimes span lines. CSV and TSV files can sometimes be inconsistent, most frequently with respect to handling newlines, escaping, and rendering non-ASCII characters or noninteger numbers. CSVs cannot handle nested field types natively, so we have to unpack and pack to specific fields manually.

Unlike with JSON fields, each record doesn't have field names associated with it; instead we get back row numbers. It is common practice in single CSV files to make the first row's column values the names of each field.

Loading CSV 
Loading CSV/TSV data is similar to loading JSON data in that we can first load it as text and then process it. The lack of a standardized format leads to different versions of the same library sometimes handling input in different ways. As with JSON, there are many different CSV libraries, but we will use only one for each language. Once again, in Python we use the included csv library. If your CSV data happens not to contain newlines in any of the fields, you can load your data with textFile() and parse it, as shown in the example below:
- /root/datas/test.csv 
Name,Age,Note
John,36,IBM
Ken,18,NTU
Mary,24,TrendMicro
- Example 5-12. Loading CSV with textFile() in Python 
>>> import csv
>>> import StringIO  # Python 2 module; on Python 3 use io.StringIO
>>> def loadRecord(line):
...     """Parse a CSV line"""
...     input = StringIO.StringIO(line)
...     reader = csv.DictReader(input, fieldnames=["Name", "Age", "Note"])
...     return reader.next()  # Python 2; on Python 3 use next(reader)
...
>>> input = sc.textFile('/root/datas/test.csv').map(loadRecord)
>>> input.collect()
[{'Note': 'Note', 'Age': 'Age', 'Name': 'Name'}, {'Note': 'IBM', 'Age': '36', 'Name': 'John'}, {'Note': 'NTU', 'Age': '18', 'Name': 'Ken'}, {'Note': 'TrendMicro', 'Age': '24', 'Name': 'Mary'}]
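
When fields may contain embedded newlines, parsing line by line no longer works, and one alternative is to read each file whole and parse it in one go. The sketch below is written for Python 3 (io.StringIO rather than the Python 2 StringIO module used above). The name load_records and the sample data are hypothetical, and the Spark call appears only as a comment:

```python
import csv
import io


def load_records(file_name_contents):
    """Parse one whole CSV file, given as a (file name, contents) pair."""
    _, contents = file_name_contents
    # DictReader takes field names from the first row, so the header is not
    # returned as a record, and quoted fields may contain newlines.
    reader = csv.DictReader(io.StringIO(contents, newline=""))
    return [dict(row) for row in reader]


# With Spark (assumed usage): sc.wholeTextFiles(path).flatMap(load_records)
contents = 'Name,Age,Note\nJohn,36,IBM\nKen,18,"NTU\nTaipei"\n'
print(load_records(("test.csv", contents)))
```

Unlike the per-line version above, this does not return the header row as a record, but it requires each file to fit in memory on a single worker.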

Saving CSV 
As with JSON data, writing out CSV/TSV data is quite simple and we can benefit from reusing the output encoding object. Since in CSV we don't output the field name with each record, to have a consistent output we need to create a mapping. One of the easy ways to do this is to just write a function that converts the fields to given positions in an array: 
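import csv
import StringIO  # Python 2 module, matching the shell session above

def writeRecords(records):
    """Write out CSV lines"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["Name", "Age", "Note"])
    for record in records:
        writer.writerow(record)
    # Return once per partition, after every record has been written
    return [output.getvalue()]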
In Python, if we are outputting dictionaries, the CSV writer can do this for us based on the order in which we provide the fieldnames when constructing the writer. The CSV libraries we are using output to files/writers, so we can use StringWriter/StringIO to allow us to put the result in our RDD, as you can see in the example below:
- Example 5-18. Writing CSV in Python 
>>> input.collect()
[{'Note': 'Note', 'Age': 'Age', 'Name': 'Name'}, {'Note': 'IBM', 'Age': '36', 'Name': 'John'}, {'Note': 'NTU', 'Age': '18', 'Name': 'Ken'}, {'Note': 'TrendMicro', 'Age': '24', 'Name': 'Mary'}]
>>> a = input.mapPartitions(writeRecords)
>>> a.collect()
['Name,Age,Note\r\nJohn,36,IBM\r\nKen,18,NTU\r\n', 'Mary,24,TrendMicro\r\n']
>>> a.saveAsTextFile('/root/datas/csvOutput')

After exiting the pyspark shell, let's check what we have output: 
# ls datas/csvOutput/
part-00000 part-00001 _SUCCESS
# cat datas/csvOutput/part-00000
Name,Age,Note
John,36,IBM
Ken,18,NTU

As you may have noticed, the preceding examples work only if we know all of the fields that we will be outputting. However, if some of the field names are determined at runtime from user input, we need to take a different approach. The simplest approach is to go over all of our data, extract the distinct keys, and then take another pass for output.
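
The two-pass approach can be sketched in plain Python as follows. This is an illustration under assumptions, not code from the book: the helper names are hypothetical, the column order is simply the sorted key names, missing fields are written as empty strings, and the Spark calls appear only as comments. It is written for Python 3 (io.StringIO):

```python
import csv
import io


def collect_fieldnames(records):
    """First pass: gather the distinct keys across all records, sorted."""
    names = set()
    for record in records:
        names.update(record.keys())
    return sorted(names)


def write_records(records, fieldnames):
    """Second pass: render records as CSV text with a fixed column order."""
    output = io.StringIO()
    # restval fills in fields that a given record does not have.
    writer = csv.DictWriter(output, fieldnames=fieldnames, restval="")
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]


# With Spark (assumed usage):
#   names = sorted(rdd.flatMap(lambda r: r.keys()).distinct().collect())
#   rdd.mapPartitions(lambda part: write_records(part, names)).saveAsTextFile(path)
records = [{"Name": "John", "Age": "36"}, {"Name": "Ken", "Note": "NTU"}]
names = collect_fieldnames(records)
print(names)
print(write_records(records, names))
```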

SequenceFiles 
SequenceFiles are a popular Hadoop format composed of flat files with key/value pairs. SequenceFiles have sync markers that allow Spark to seek to a point in the file and then resynchronize with the record boundaries. This allows Spark to efficiently read SequenceFiles in parallel from multiple nodes. SequenceFiles are a common input/output format for Hadoop MapReduce jobs as well, so if you are working with an existing Hadoop system there is a good chance your data will be available as a SequenceFile.

SequenceFiles consist of elements that implement Hadoop's Writable interface, as Hadoop uses a custom serialization framework. Table 5-2 lists some common types and their corresponding Writable class. The standard rule of thumb is to try adding the word Writable to the end of your class name and see if it is a known subclass of org.apache.hadoop.io.Writable. If you can't find a Writable for the data you are trying to write out (for example, a custom case class), you can go ahead and implement your own Writable class by overriding readFields and write from org.apache.hadoop.io.Writable.


In Spark 1.0 and earlier, SequenceFiles were available only in Java and Scala, but Spark 1.1 added the ability to load and save them in Python as well. Note that you will need to use Java or Scala to define custom Writable types, however. The Python Spark API knows only how to convert the basic Writables available in Hadoop to Python, and makes a best effort for other classes based on their available getter methods.

Saving SequenceFiles 
Writing data out to a SequenceFile is fairly simple; the description here follows the Scala API. First, because SequenceFiles are key/value pairs, we need a PairRDD with types that our SequenceFile can write out. Implicit conversions between Scala types and Hadoop Writables (org.apache.hadoop.io.Writable) exist for many native types, so if you are writing out a native type you can just save your PairRDD by calling saveAsSequenceFile(path), and it will write out the data for you. If there isn't an automatic conversion from our key and value to org.apache.hadoop.io.Writable, or we want to use variable-length types (e.g., VIntWritable), we can just map over the data and convert it before saving. Below is a simple example:
- Example 5-23. Saving a SequenceFile in Python 
>>> data = sc.parallelize([("John", 36), ("Mary", 28), ("Peter", 30)])
>>> data.collect()
[('John', 36), ('Mary', 28), ('Peter', 30)]
>>> data.saveAsSequenceFile('/root/datas/sequenceFiles')
>>> exit()  # Exit the pyspark shell
# ls sequenceFiles/
part-00000 part-00001 part-00002 part-00003 _SUCCESS


Loading SequenceFiles 
Spark has a specialized API for reading in SequenceFiles. On the SparkContext we can call sequenceFile(path, keyClass, valueClass, minPartitions). As mentioned earlier, SequenceFiles work with Writable classes, so our keyClass and valueClass will both have to be the correct Writable class. For simplicity, we'll work with the key as Text and the value as IntWritable in the example below (the PySpark call shown here omits the class arguments):
>>> data = sc.sequenceFile('/root/datas/sequenceFiles')
>>> data.collect()
[(u'John', 36), (u'Peter', 30), (u'Mary', 28)]


Object Files 
Object files are a deceptively simple wrapper around SequenceFiles that allows us to save our RDDs containing just values. Unlike with SequenceFiles, with object files the values are written out using Java Serialization. Using Java Serialization for object files has a number of implications. Unlike with normal SequenceFiles, the output will be different from what Hadoop would produce when outputting the same objects. Unlike the other formats, object files are mostly intended to be used for Spark jobs communicating with other Spark jobs. Java Serialization can also be quite slow.

Saving an object file is as simple as calling saveAsObjectFile on an RDD. Reading an object file back is also quite simple: the function objectFile() on the SparkContext takes in a path and returns an RDD. With all of these warnings about object files, you might wonder why anyone would use them. The primary reason to use object files is that they require almost no work to save arbitrary objects. 

Object files are not available in Python, but the Python RDDs and SparkContext support methods called saveAsPickleFile() and pickleFile() instead. These use Python's pickle serialization library. The same caveats for object files apply to pickle files, however: the pickle library can be slow, and old files may not be readable if you change your classes.
