Monday, March 21, 2016

[ Learn Spark ] Ch3. Programming with RDDs - Part1


Introduction 
This chapter introduces Spark's core abstraction for working with data, the resilient distributed dataset (RDD). An RDD is simply a distributed collection of elements. In Spark, all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result. Under the hood, Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them. Both data scientists and engineers should read this chapter, as RDDs are the core concept in Spark. We highly recommend that you try some of these examples in an interactive shell. In addition, all code in this chapter is available in the book's GitHub repository.

RDD Basics 
An RDD in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes. Users create RDDs in two ways: by loading an external dataset, or by distributing a collection of objects (e.g., a list or set) in their driver program. We have already seen how to load a text file as an RDD of strings using SparkContext.textFile(), as shown in the example below: 
- Example 3-1. Creating an RDD of strings with textFile() in Python 
>>> lines = sc.textFile("README.md")

Once created, RDDs offer two types of operations: transformations and actions. Transformations construct a new RDD from a previous one. For example, one common transformation is filtering data that matches a predicate. In our text file example, we can use this to create a new RDD holding just the strings that contain the word 'Python', as shown in the example below: 
- Example 3-2. Calling the filter() transformation 
>>> pythonLines = lines.filter(lambda line: "Python" in line)

Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS). One example of an action we called earlier is first(), which returns the first element in an RDD, as demonstrated in the example below: 
- Example 3-3. Calling the first() action 
>>> pythonLines.first()
u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'

Transformations and actions are different because of the way Spark computes RDDs. Although you can define new RDDs any time, Spark computes them only in a lazy fashion - that is, the first time they are used in an action. This approach might seem unusual at first, but it makes a lot of sense when you are working with Big Data. For instance, consider Example 3-2 and Example 3-3, where we defined a text file and then filtered the lines that include 'Python'. If Spark were to load and store all the lines in the file as soon as we wrote lines = sc.textFile(...), it would waste a lot of storage space, given that we then immediately filter out many lines. Instead, once Spark sees the whole chain of transformations, it can compute just the data needed for its result. In fact, for the first() action, Spark scans the file only until it finds the first matching line; it doesn't even read the whole file! 
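
The effect of laziness on first() can be imitated with plain Python generators. The sketch below is only an analogy and not Spark code: read_lines, matching, and the sample lines are hypothetical names and data, and a list records how many lines were actually read before the first match was found.

```python
# Analogy in plain Python (no Spark): generators are lazy, like transformations.
SAMPLE_LINES = [
    "# Apache Spark",
    "Spark is a fast and general cluster computing system.",
    "high-level APIs in Scala, Java, Python, and R",
    "## Building Spark",
    "## Interactive Python Shell",
]

lines_read = []  # records every line the "file scan" actually touches


def read_lines(source):
    """Yield lines one at a time, noting each one that is read."""
    for line in source:
        lines_read.append(line)
        yield line


def matching(lines, word):
    """Lazily keep only the lines containing `word` (similar in spirit to filter())."""
    return (line for line in lines if word in line)


# Building the chain reads nothing yet.
python_lines = matching(read_lines(SAMPLE_LINES), "Python")
reads_before_action = len(lines_read)

# Asking for the first element plays the role of the first() action.
first_match = next(python_lines)
reads_after_action = len(lines_read)

print(reads_before_action)  # 0
print(first_match)
print(reads_after_action)   # 3: the scan stopped at the first match
```

The last two sample lines are never read, which mirrors the point above that first() does not need the whole file.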

Finally, Spark's RDDs are by default recomputed each time you run an action on them. If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist(). We can ask Spark to persist our data in a number of different places, which will be covered later. After computing it the first time, Spark will store the RDD contents in memory (partitioned across the machines in your cluster), and reuse them in future actions. Persisting RDDs on disk instead of memory is also possible. The behavior of not persisting by default may again seem unusual, but it makes a lot of sense for big datasets: if you will not reuse the RDD, there's no reason to waste storage space when Spark could instead stream through the data once and just compute the result.

In practice, you will often use persist() to load a subset of your data into memory and query it repeatedly. For example, if we knew that we wanted to compute multiple results about the README lines that contain 'Python', we could write the script shown in the example below: 
- Example 3-4. Persisting an RDD in memory 
>>> pythonLines.persist()

>>> pythonLines.count()
3
>>> pythonLines.first()
u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'
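
The difference between recomputing and persisting can be pictured with a small toy model. This is a sketch in plain Python under stated assumptions, not how Spark is implemented: ToyDataset, load_python_lines, and the sample data are all hypothetical, and the counter simply shows how often the data is rebuilt.

```python
# Toy model in plain Python (no Spark) of recomputation versus persistence.
class ToyDataset(object):
    """Hypothetical stand-in for an RDD defined by a compute function."""

    def __init__(self, compute):
        self._compute = compute      # how to rebuild the data from scratch
        self._persist = False
        self._cached = None
        self.times_computed = 0

    def persist(self):
        """Mark the dataset for caching; nothing is computed here."""
        self._persist = True
        return self

    def _materialize(self):
        if self._cached is not None:
            return self._cached
        self.times_computed += 1
        data = list(self._compute())
        if self._persist:
            self._cached = data
        return data

    def count(self):
        return len(self._materialize())

    def first(self):
        return self._materialize()[0]


def load_python_lines():
    """Hypothetical source: filter a tiny in-memory 'README'."""
    readme = ["Spark", "Python API", "Scala API", "Python shell"]
    return [line for line in readme if "Python" in line]


plain = ToyDataset(load_python_lines)
plain.count()
plain.first()
print(plain.times_computed)    # 2: each action recomputed the data

cached = ToyDataset(load_python_lines).persist()
cached.count()
cached.first()
print(cached.times_computed)   # 1: the second action reused the stored data
```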

To summarize, every Spark program and shell session will work as follows: 
1. Create some input RDDs from external data.
2. Transform them to define new RDDs using transformations like filter().
3. Ask Spark to persist() any intermediate RDDs that will need to be reused.
4. Launch actions such as count() and first() to kick off a parallel computation, which is then optimized and executed by Spark.

Notes. 
cache() is the same as calling persist() with the default storage level.

In the rest of this chapter, we'll go through each of these steps in detail, and cover some of the most common RDD operations in Spark. 

Creating RDDs 
Spark provides two ways to create RDDs: loading an external dataset and parallelizing a collection in your driver program. The simplest way to create RDDs is to take an existing collection in your program and pass it to SparkContext's parallelize() method, as shown in Examples 3-5 through 3-7. This approach is very useful when you are learning Spark, since you can quickly create your own RDDs in the shell and perform operations on them. Keep in mind, however, that outside of prototyping and testing, this is not widely used since it requires that you have your entire dataset in memory on one machine. 
- Example 3-5. parallelize() method in Python 
>>> lines = sc.parallelize(["pandas", "i like pandas"])

- Example 3-6. parallelize() method in Scala 
  val lines = sc.parallelize(List("pandas", "i like pandas"))
- Example 3-7. parallelize() method in Java 
  JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));

A more common way to create RDDs is to load data from external storage. Loading external datasets is covered in detail in Chapter 5. However, we already saw one method that loads a text file as an RDD of strings, SparkContext.textFile(), which is shown in Example 3-8 through Example 3-10: 
- Example 3-8. textFile() method in Python 
  lines = sc.textFile("/path/to/README.md")
- Example 3-9. textFile() method in Scala 
  val lines = sc.textFile("/path/to/README.md")
- Example 3-10. textFile() method in Java 
  JavaRDD<String> lines = sc.textFile("/path/to/README.md");

RDD Operations 
As we've discussed, RDDs support two types of operations: transformations and actions. Transformations are operations on RDDs that return a new RDD, such as map() and filter(). Actions are operations that return a result to the driver program or write it to storage, and kick off a computation, such as count() and first(). Spark treats transformations and actions very differently, so understanding which type of operation you are performing is important. If you are ever unsure whether a given function is a transformation or an action, you can look at its return type: transformations return RDDs, whereas actions return some other data type. 

Transformations 
Transformations are operations on RDDs that return a new RDD. As discussed in "Lazy Evaluation" below, transformed RDDs are computed lazily, only when you use them in an action. Many transformations are element-wise; that is, they work on one element at a time; but this is not true for all transformations. As an example, suppose that we have a logfile, log.txt, with a number of messages, and we want to select only the error messages. We can use the filter() transformation seen before. This time, though, we'll show a filter in all three of Spark's language APIs: 
- Example 3-11. filter() transformation in Python 
  inputRDD = sc.textFile("log.txt")
  errorsRDD = inputRDD.filter(lambda x: "error" in x)
- Example 3-12. filter() transformation in Scala 
  val inputRDD = sc.textFile("log.txt")
  val errorsRDD = inputRDD.filter(line => line.contains("error"))
- Example 3-13. filter() transformation in Java 
  JavaRDD<String> inputRDD = sc.textFile("log.txt");
  JavaRDD<String> errorsRDD = inputRDD.filter(
      new Function<String, Boolean>() {
          public Boolean call(String x) { return x.contains("error"); }
      }
  );
Note that the filter() operation does not mutate the existing inputRDD. Instead, it returns a pointer to an entirely new RDD. inputRDD can still be reused later in the program - for instance, to search for other words. In fact, let's use inputRDD again to search for lines with the word warning in them. Then, we'll use another transformation, union(), to build an RDD of the lines that contained either error or warning. We show Python in Example 3-14, but the union() function is identical in all three languages. 
- Example 3-14. union() transformation in Python 
  errorsRDD = inputRDD.filter(lambda x: "error" in x)
  warningRDD = inputRDD.filter(lambda x: "warning" in x)
  badLinesRDD = errorsRDD.union(warningRDD)
union() is a bit different than filter(), in that it operates on two RDDs instead of one. Transformations can actually operate on any number of input RDDs. 
Notes. 
A better way to accomplish the same result as in Example 3-14 would be to simply filter the inputRDD once, looking for either error or warning.
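
A minimal sketch of that single-pass alternative follows. The predicate is ordinary Python, so it is tried here on a local list rather than on an RDD; is_bad_line and sample_log are hypothetical names and data introduced for illustration.

```python
# The predicate is plain Python, so it can be checked on a local list first.
def is_bad_line(line):
    """Hypothetical helper: True for lines mentioning error or warning."""
    return "error" in line or "warning" in line


sample_log = [
    "info: started",
    "error: disk full",
    "warning: low memory",
    "error after warning: retrying",
]

# One pass with a combined predicate.
single_pass = [line for line in sample_log if is_bad_line(line)]

# Local imitation of filter + filter + union (concatenation keeps duplicates).
error_lines = [line for line in sample_log if "error" in line]
warning_lines = [line for line in sample_log if "warning" in line]
unioned = error_lines + warning_lines

print(len(single_pass))  # 3
print(len(unioned))      # 4: the line with both words appears twice
```

With Spark, the same function would be passed as inputRDD.filter(is_bad_line). Because union() keeps duplicates, a line containing both words is counted twice by the two-filter version but only once by the single filter.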

Finally, as you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. It uses this information to compute each RDD on demand and to recover lost data if part of a persistent RDD is lost. Figure 3-1 shows the lineage graph for Example 3-14: inputRDD feeds two filter() steps that produce errorsRDD and warningRDD, and union() combines those two into badLinesRDD. 
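The lineage for Example 3-14 can also be written down as a small tree. The sketch below is a toy tracker in plain Python, not Spark's internal representation: the Node class and its lineage() method are hypothetical, and only the RDD names come from the example. (In Spark itself, an RDD's toDebugString() method reports its lineage.)

```python
# Toy lineage tracker in plain Python (no Spark); Node is a hypothetical class.
class Node(object):
    def __init__(self, name, operation, parents=()):
        self.name = name
        self.operation = operation
        self.parents = list(parents)

    def lineage(self, depth=0):
        """Return an indented description of this node and its ancestors."""
        rows = ["  " * depth + "%s <- %s" % (self.name, self.operation)]
        for parent in self.parents:
            rows.extend(parent.lineage(depth + 1))
        return rows


input_rdd = Node("inputRDD", 'textFile("log.txt")')
errors_rdd = Node("errorsRDD", "filter(error)", [input_rdd])
warning_rdd = Node("warningRDD", "filter(warning)", [input_rdd])
bad_lines_rdd = Node("badLinesRDD", "union", [errors_rdd, warning_rdd])

# Walk from the final RDD back to its inputs.
print("\n".join(bad_lines_rdd.lineage()))
```

Each row names a dataset and the operation that produced it, with parents indented underneath; inputRDD appears twice because both filters depend on it.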

Actions 
We've seen how to create RDDs from each other with transformations, but at some point, we'll want to actually do something with our dataset. Actions are the second type of RDD operations. They are the operations that return a final value to the driver program or write data to an external storage system. Actions force the evaluation of the transformations required for the RDD they were called on, since they need to actually produce output. 

Continuing the log example from the previous section, we might want to print out some information about the badLinesRDD. To do that, we'll use two actions: count(), which returns the count as a number, and take(), which collects a number of elements from the RDD, as shown in the example below: 
- Example 3-15. Python error count using actions 
  # count() returns a number, so convert it before concatenating with strings
  print("Input had " + str(badLinesRDD.count()) + " concerning lines")
  print("Here are 10 examples:")
  for line in badLinesRDD.take(10):
      print(line)
In this example, we used take() to retrieve a small number of elements in the RDD at the driver program. We then iterate over them locally to print out information at the driver. RDDs also have a collect() function to retrieve the entire RDD. This can be useful if your program filters RDDs down to a very small size and you'd like to deal with it locally. Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it. 

In most cases RDDs can't just be collect()ed to the driver because they are too large. In these cases, it's common to write data out to a distributed storage system such as HDFS or Amazon S3. You can save the contents of an RDD using the saveAsTextFile() action, saveAsSequenceFile(), or any of a number of actions for various built-in formats. We will cover the different options for exporting data in Chapter 5. It is important to note that each time we call a new action, the entire RDD must be computed "from scratch". To avoid this inefficiency, users can persist intermediate results, as we will cover in "Persistence (Caching)" later. 

Lazy Evaluation 
As you read earlier, transformations on RDDs are lazily evaluated, meaning that Spark will not begin to execute until it sees an action. This can be somewhat counter-intuitive for new users, but may be familiar for those who have used functional languages such as Haskell or LINQ-like data processing frameworks. Lazy evaluation means that when we call a transformation on an RDD (for instance, calling map()), the operation is not immediately performed. Instead, Spark internally records metadata to indicate that this operation has been requested. Rather than thinking of an RDD as containing specific data, it is best to think of each RDD as consisting of instructions on how to compute the data that we build up through transformations. Loading data into an RDD is lazily evaluated in the same way transformations are. So, when we call sc.textFile(), the data is not loaded until it is necessary. As with transformations, the operations (in this case, reading the data) can occur multiple times. 
Notes. 
Although transformations are lazy, you can force Spark to execute them at any time by running an action, such as count(). This is an easy way to test out just part of your program.


Spark uses lazy evaluation to reduce the number of passes it has to take over our data by grouping operations together. In systems like Hadoop MapReduce, developers often have to spend a lot of time considering how to group together operations to minimize the number of MapReduce passes. In Spark, there is no substantial benefit to writing a single complex map instead of chaining together many simple operations. Thus, users are free to organize their program into smaller, more manageable operations. 
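
The idea of recording operations and then running them together in one pass can be sketched with a toy class. This is an illustration in plain Python under stated assumptions, not Spark's execution engine: LazyPipeline and its methods are hypothetical, and only the names map(), filter(), and collect() echo the RDD API.

```python
# Toy sketch in plain Python (no Spark): transformations are only recorded,
# and an action replays them in one pass over the data.
class LazyPipeline(object):
    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)   # recorded (kind, function) pairs

    def map(self, func):
        return LazyPipeline(self._source, self._steps + (("map", func),))

    def filter(self, func):
        return LazyPipeline(self._source, self._steps + (("filter", func),))

    def recorded_steps(self):
        return [kind for kind, _ in self._steps]

    def collect(self):
        """Action: run every recorded step, element by element."""
        results = []
        for item in self._source:
            keep = True
            for kind, func in self._steps:
                if kind == "map":
                    item = func(item)
                elif not func(item):
                    keep = False
                    break
            if keep:
                results.append(item)
        return results


numbers = LazyPipeline([1, 2, 3, 4, 5, 6])
pipeline = numbers.map(lambda x: x * 10).filter(lambda x: x > 20).map(lambda x: x + 1)

print(pipeline.recorded_steps())  # ['map', 'filter', 'map']
print(pipeline.collect())         # [31, 41, 51, 61]
```

Three small chained steps still cost a single loop over the source, which is why there is little reason to hand-merge them into one complex function.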

Passing Functions to Spark 
Most of Spark's transformations, and some of its actions, depend on passing in functions that are used by Spark to compute data. Each of the core languages has a slightly different mechanism for passing functions to Spark. 

Python 
In Python, we have three options for passing functions into Spark. For shorter functions, we can pass in lambda expressions, as we did in Example 3-2, and as Example 3-18 demonstrates. Alternatively, we can pass in top-level functions, or locally defined functions, as in the example below: 
- Example 3-18. Passing functions in Python 
  # Option 1: a lambda expression
  word = rdd.filter(lambda s: "error" in s)

  # Option 2: a named function
  def containsError(s):
      return "error" in s
  word = rdd.filter(containsError)
One issue to watch out for when passing functions is inadvertently serializing the object containing the function. When you pass a function that is the member of an object, or contains references to fields in an object (e.g., self.field), Spark sends the entire object to worker nodes, which can be much larger than the bit of information you need (see Example 3-19). Sometimes this can also cause your program to fail, if your class contains objects that Python can't figure out how to pickle.
- Example 3-19. Passing a function with field references (don't do this!) 
  class SearchFunctions(object):
      def __init__(self, query):
          self.query = query
      def isMatch(self, s):
          return self.query in s
      def getMatchesFunctionReference(self, rdd):
          # Problem: references all of "self" in "self.isMatch"
          return rdd.filter(self.isMatch)
      def getMatchesMemberReference(self, rdd):
          # Problem: references all of "self" in "self.query"
          return rdd.filter(lambda x: self.query in x)
Instead, just extract the fields you need from your object into a local variable and pass that in, like we do in Example 3-20: 
- Example 3-20. Python function passing without field references 
  class WordFunctions(object):
      ...
      def getMatchesNoReference(self, rdd):
          # Safe: extract only the field we need into a local variable
          query = self.query
          return rdd.filter(lambda x: query in x)
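
What each lambda actually holds on to can be inspected without Spark by looking at its closure. The sketch below rests on assumptions made for illustration: big_index, matcher_with_self, matcher_without_self, and captured_objects are hypothetical, and the code only shows which objects the function references, not what Spark's serializer would send.

```python
# Plain-Python inspection (no Spark) of what a lambda's closure references.
class SearchFunctions(object):
    def __init__(self, query):
        self.query = query
        self.big_index = list(range(100000))  # hypothetical large field

    def matcher_with_self(self):
        # The lambda refers to "self", so it holds the whole object.
        return lambda x: self.query in x

    def matcher_without_self(self):
        # Copy the field to a local first; the lambda holds only the string.
        query = self.query
        return lambda x: query in x


def captured_objects(func):
    """Return the values stored in a function's closure cells."""
    return [cell.cell_contents for cell in (func.__closure__ or ())]


search = SearchFunctions("error")
with_self = captured_objects(search.matcher_with_self())
without_self = captured_objects(search.matcher_without_self())

print(type(with_self[0]).__name__)  # SearchFunctions
print(without_self)                 # ['error']
```

The first lambda carries a reference to the whole SearchFunctions instance, including big_index, while the second carries only the query string.
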
Notes. 
In Java, both anonymous inner classes and lambda expressions can reference any final variables in the method enclosing them, so you can pass these variables to Spark just as in Python and Scala.


For Scala and Java, please refer to the book Learning Spark.
