Introduction
This chapter introduces Spark's core abstraction for working with data, the resilient distributed dataset (RDD). An RDD is simply a distributed collection of elements. In Spark, all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result. Under the hood, Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them. Both data scientists and engineers should read this chapter, as RDDs are the core concept in Spark. We highly recommend you try some of these examples in an interactive shell. In addition, all the code in this chapter is available in the book's GitHub repository.
RDD Basics
An RDD in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes. Users create RDDs in two ways: by loading an external dataset, or by distributing a collection of objects (e.g., a list or set) in their driver program. We have already seen loading a text file as an RDD of strings using SparkContext.textFile(), as shown in the example below:
- Example 3-1. Creating an RDD of strings with textFile() in Python
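- lines = sc.textFile("README.md")  # any text file path will do; the book uses its README.md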
Once created, RDDs offer two types of operations: transformations and actions. Transformations construct a new RDD from a previous one. For example, one common transformation is filtering data that matches a predicate. In our text file example, we can use this to create a new RDD holding just the strings that contain the word 'Python', as shown in the example below:
- Example 3-2. Calling the filter() transformation
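- pythonLines = lines.filter(lambda line: "Python" in line)  # a transformation: nothing is computed yet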
Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS). One example of an action we called earlier is first(), which returns the first element in an RDD and is demonstrated in the example below:
- Example 3-3. Calling the first() action
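- pythonLines.first()  # an action: returns the first matching line to the driver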
Transformations and actions are different because of the way Spark computes RDDs. Although you can define new RDDs any time, Spark computes them only in a lazy fashion - that is, the first time they are used in an action. This approach might seem unusual at first, but makes a lot of sense when you are working with big data. For instance, consider Example 3-2 and Example 3-3, where we defined a text file and then filtered the lines that include 'Python'. If Spark were to load and store all the lines in the file as soon as we wrote lines = sc.textFile(...), it would waste a lot of storage space, given that we then immediately filter out many lines. Instead, once Spark sees the whole chain of transformations, it can compute just the data needed for its result. In fact, for the first() action, Spark scans the file only until it finds the first matching line; it doesn't even read the whole file!
Finally, Spark's RDDs are by default recomputed each time you run an action on them. If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist(). We can ask Spark to persist our data in a number of different places, which will be covered later. After computing it the first time, Spark will store the RDD contents in memory (partitioned across the machines in your cluster), and reuse them in future actions. Persisting RDDs on disk instead of memory is also possible. The behavior of not persisting by default may again seem unusual, but it makes a lot of sense for big datasets: if you will not reuse the RDD, there's no reason to waste storage space when Spark could instead stream through the data once and just compute the result!
In practice, you will often use persist() to load a subset of your data into memory and query it repeatedly. For example, if we knew that we wanted to compute multiple results about the README lines that contain 'Python', we could write the script shown in the example below:
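- Example 3-4. Persisting an RDD in memory
- pythonLines.persist()  # the default storage level keeps the RDD in memory after the first computation
- pythonLines.count()
- pythonLines.first()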
To summarize, every Spark program and shell session will work as follows:
- Create some input RDDs from external data.
- Transform them using transformations like filter() to define new RDDs.
- Ask Spark to persist() any intermediate RDDs that will need to be reused.
- Launch actions such as count() and first() to kick off a parallel computation, which Spark then optimizes and executes.
In the rest of this chapter, we'll go through each of these steps in detail, and cover some of the most common RDD operations in Spark.
Creating RDDs
Spark provides two ways to create RDDs: loading an external dataset and parallelizing a collection in your driver program. The simplest way to create RDDs is to take an existing collection in your program and pass it to SparkContext's parallelize() method, as shown in Examples 3-5 through 3-7. This approach is very useful when you are learning Spark, since you can quickly create your own RDDs in the shell and perform operations on them. Keep in mind, however, that outside of prototyping and testing, this is not widely used since it requires that you have your entire dataset in memory on one machine.
- Example 3-5. parallelize() method in Python
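- lines = sc.parallelize(["pandas", "i like pandas"])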
- Example 3-6. parallelize() method in Scala
- val lines = sc.parallelize(List("pandas", "i like pandas"))
- Example 3-7. parallelize() method in Java
- JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
A more common way to create RDDs is to load data from external storage. Loading external datasets is covered in detail in Chapter 5. However, we already saw one method that loads a text file as an RDD of strings, SparkContext.textFile(), which is shown in Examples 3-8 through 3-10:
- Example 3-8. textFile() method in Python
- lines = sc.textFile("/path/to/README.md")
- Example 3-9. textFile() method in Scala
- val lines = sc.textFile("/path/to/README.md")
- Example 3-10. textFile() method in Java
- JavaRDD<String> lines = sc.textFile("/path/to/README.md");
As we've discussed, RDDs support two types of operations: transformations and actions. Transformations are operations on RDDs that return a new RDD, such as map() and filter(). Actions are operations that return a result to the driver program or write it to storage, and kick off a computation, such as count() and first(). Spark treats transformations and actions very differently, so understanding which type of operation you are performing is important. If you are ever confused about whether a given function is a transformation or an action, you can look at its return type: transformations return RDDs, whereas actions return some other data type.
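For instance, here is a minimal sketch of that return-type rule of thumb (assuming an existing SparkContext available as sc, as in the shell):
- nums = sc.parallelize([1, 2, 3, 4])  # an RDD built from a local collection
- squared = nums.map(lambda x: x * x)  # map() returns another RDD, so it is a transformation
- howMany = squared.count()  # count() returns a plain Python int, so it is an action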
Transformations
Transformations are operations on RDDs that return a new RDD. As discussed in "Lazy Evaluation" later in this chapter, transformed RDDs are computed lazily, only when you use them in an action. Many transformations are element-wise; that is, they work on one element at a time, but this is not true for all transformations. As an example, suppose that we have a logfile, log.txt, with a number of messages, and we want to select only the error messages. We can use the filter() transformation seen before. This time, though, we'll show a filter in all three of Spark's language APIs:
- Example 3-11. filter() transformation in Python
- inputRDD = sc.textFile("log.txt")
- errorsRDD = inputRDD.filter(lambda x: "error" in x)
- Example 3-12. filter() transformation in Scala
- val inputRDD = sc.textFile("log.txt")
- val errorsRDD = inputRDD.filter(line => line.contains("error"))
- Example 3-13. filter() transformation in Java
- JavaRDD<String> inputRDD = sc.textFile("log.txt");
- JavaRDD<String> errorsRDD = inputRDD.filter(
-     new Function<String, Boolean>() {
-         public Boolean call(String x) { return x.contains("error"); }
-     }
- );
Note that the filter() operation does not mutate the existing inputRDD; instead, it returns a pointer to an entirely new RDD. We can then use another transformation, union(), to combine the error and warning lines, as shown in the example below:
- Example 3-14. union() transformation in Python
- errorsRDD = inputRDD.filter(lambda x: "error" in x)
- warningRDD = inputRDD.filter(lambda x: "warning" in x)
- badLinesRDD = errorsRDD.union(warningRDD)
Actions
We've seen how to create RDDs from each other with transformations, but at some point, we'll want to actually do something with our dataset. Actions are the second type of RDD operations. They are the operations that return a final value to the driver program or write data to an external storage system. Actions force the evaluation of the transformations required for the RDD they were called on, since they need to actually produce output.
Continuing the log example from the previous section, we might want to print out some information about the badLinesRDD. To do that, we'll use two actions: count(), which returns the count as a number, and take(), which collects a number of elements from the RDD, as shown in the example below:
- Example 3-15. Python error count using actions
- print "Input had " + badLinesRDD.count() + " concerning lines"
- print "Here are 10 examples:"
- for line in badLinesRDD.take(10):
-     print line
In most cases RDDs can't just be collect()ed to the driver because they are too large. In these cases, it's common to write data out to a distributed storage system such as HDFS or Amazon S3. You can save the contents of an RDD using the saveAsTextFile() action, saveAsSequenceFile(), or any of a number of actions for various built-in formats. We will cover the different options for exporting data in Chapter 5. It is important to note that each time we call a new action, the entire RDD must be computed "from scratch". To avoid this inefficiency, users can persist intermediate results, as we will cover in "Persistence (Caching)" later.
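As a minimal sketch (the output directory name here is just an illustration), saving the bad lines instead of collecting them could look like this:
- badLinesRDD.saveAsTextFile("badLinesOutput")  # writes a directory containing one part-* file per partition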
Lazy Evaluation
As you read earlier, transformations on RDDs are lazily evaluated, meaning that Spark will not begin to execute until it sees an action. This can be somewhat counterintuitive for new users, but may be familiar to those who have used functional languages such as Haskell or LINQ-like data processing frameworks. Lazy evaluation means that when we call a transformation on an RDD (for instance, calling map()), the operation is not immediately performed. Instead, Spark internally records metadata to indicate that this operation has been requested. Rather than thinking of an RDD as containing specific data, it is best to think of each RDD as consisting of instructions on how to compute the data that we build up through transformations. Loading data into an RDD is lazily evaluated in the same way transformations are. So, when we call sc.textFile(), the data is not loaded until it is necessary. As with transformations, the operation (in this case, reading the data) can occur multiple times.
Spark uses lazy evaluation to reduce the number of passes it has to take over our data by grouping operations together. In systems like Hadoop MapReduce, developers often have to spend a lot of time considering how to group together operations to minimize the number of MapReduce passes. In Spark, there is no substantial benefit to writing a single complex map instead of chaining together many simple operations. Thus, users are free to organize their program into smaller, more manageable operations.
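As a small sketch (the nums RDD here is hypothetical), chaining a few simple operations still costs only one pass over the data, because Spark pipelines them when the action finally runs:
- nums = sc.parallelize(range(1000))
- bumpedEvens = nums.map(lambda x: x + 1).filter(lambda x: x % 2 == 0)  # two simple, readable steps; nothing runs yet
- total = bumpedEvens.count()  # one pass over the data, applying map and filter together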
Passing Functions to Spark
Most of Spark's transformations, and some of its actions, depend on passing in functions that are used by Spark to compute data. Each of the core languages has a slightly different mechanism for passing functions to Spark.
Python
In Python, we have three options for passing functions into Spark. For shorter functions, we can pass in lambda expressions, as we did in Example 3-2 and as Example 3-18 demonstrates. Alternatively, we can pass in top-level functions or locally defined functions, as shown in the example below:
- Example 3-18. Passing functions in Python
- word = rdd.filter(lambda s: "error" in s)
- def containsError(s):
- return "error" in s
- word = rdd.filter(containsError)
One issue to watch out for when passing functions is inadvertently serializing the object containing the function. When you pass a function that is a member of an object, or that contains references to fields in an object (e.g., self.query), Spark sends the entire object to the worker nodes, which can be much larger than the bit of information you actually need, as the next example shows.
- Example 3-19. Passing a function with field references (don't do this!)
- class SearchFunctions(object):
-     def __init__(self, query):
-         self.query = query
-     def isMatch(self, s):
-         return self.query in s
-     def getMatchesFunctionReference(self, rdd):
-         # Problem: references all of "self" in "self.isMatch"
-         return rdd.filter(self.isMatch)
-     def getMatchesMemberReference(self, rdd):
-         # Problem: references all of "self" in "self.query"
-         return rdd.filter(lambda x: self.query in x)
Instead, just extract the fields you need from your object into a local variable and pass that in, as shown in the example below:
- Example 3-20. Python function passing without field references
- class WordFunctions(object):
-     ...
-     def getMatchesNoReference(self, rdd):
-         # Safe: extract only the field we need into a local variable
-         query = self.query
-         return rdd.filter(lambda x: query in x)
For the Scala and Java ways of passing functions, please refer to the book Learning Spark.