Thursday, March 24, 2016

[ Learn Spark ] Ch3. Programming with RDDs - Part2

Common Transformations and Actions 
In this chapter, we tour the most common transformations and actions in Spark. Additional operations are available on RDDs containing certain types of data - for example, statistical functions on RDDs of numbers, and key/value operations such as aggregating data by key on RDDs of key/value pairs. We cover converting between RDD types and these special operations in later sections. 

Basic RDDs 
We will begin by describing what transformations and actions we can perform on all RDDs regardless of the data. 

Element-wise transformations 
The two most common transformations you will likely be using are map() and filter() (see Figure 3-2). The map() transformation takes in a function and applies it to each element in the RDD with the result of the function being the new value of each element in the resulting RDD. The filter() transformation takes in a function and returns an RDD that only has elements that pass the filter() function. 
(Figure 3-2: map() and filter() applied to an RDD; image not shown)

We can use map() to do any number of things, from fetching the website associated with each URL in our collection to just squaring the numbers. It is useful to note that map()'s return type doesn't have to be the same as its input type, so if we had an RDD of strings and our map() function were to parse the strings and return a Double, our input RDD type would be RDD[String] and the resulting RDD type would be RDD[Double]. Let's look at a basic example of map() that squares all of the numbers in an RDD: 
- Example 3-26. Python squaring the values in an RDD 

nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print("%i" % (num))
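filter() is described above but not demonstrated in the book excerpt; below is a minimal sketch (assuming the same SparkContext sc as the other examples) that keeps only the even numbers:
>>> nums = sc.parallelize([1, 2, 3, 4])
>>> nums.filter(lambda x: x % 2 == 0).collect()  # keep elements for which the predicate is True
[2, 4]
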
Sometimes we want to produce multiple output elements for each input element. The operation to do this is called flatMap(). As with map(), the function we provide to flatMap() is called individually for each element in our input RDD. Instead of returning a single element, we return an iterator with our return values. Rather than producing an RDD of iterators, we get back an RDD that consists of the elements from all of the iterators. A simple usage of flatMap() is splitting up an input string into words, as shown in the example below: 
- Example 3-29. flatMap() in Python, splitting lines into words 
>>> lines = sc.parallelize(["hello world", "hi"])
>>> words = lines.flatMap(lambda line: line.split(" "))
>>> words.first()
'hello'
>>> words.collect()  # show all elements in the RDD
['hello', 'world', 'hi']

The figure below illustrates the difference between flatMap() and map(). You can think of flatMap() as "flattening" the iterators returned to it, so that instead of ending up with an RDD of lists, we have an RDD of the elements in those lists. 
(Figure: the difference between flatMap() and map(); image not shown)

Pseudo set operations 
RDDs support many of the operations of mathematical sets, such as union and intersection, even when the RDDs themselves are not properly sets. Four operations are shown in the figure below. It's important to note that all of these operations require that the RDDs being operated on are of the same type.
(Figure: pseudo set operations on two RDDs; image not shown)

The set property most frequently missing from our RDDs is the uniqueness of elements, as we often have duplicates. If we want only unique elements we can use the RDD.distinct() transformation to provide a new RDD with only distinct items. Note that this function is expensive, however, as it requires shuffling all the data over the network to ensure that we receive only one copy of each element. Shuffling, and how to avoid it, is discussed in more detail in Chapter 4. 

The simplest set operation is union(), which gives back an RDD consisting of the data from both sources. This can be useful in a number of use cases, such as processing logfiles from many sources. Unlike the mathematical union(), if there are duplicates in the input RDDs, the result of Spark's union() will contain duplicates. 

Spark also provides an intersection() method, which returns only elements in both RDDs. This function also removes all duplicates (including duplicates from a single RDD) while running. While intersection() and union() are two similar concepts, the performance of intersection() is much worse since it requires a shuffle over the network to identify common elements. 

Sometimes we need to remove some data from consideration. The subtract() function takes in another RDD and returns an RDD that has only values present in the first RDD and not the second RDD. Like intersection(), it performs a shuffle. 

We can also compute a Cartesian product between two RDDs, as shown in Figure 3-5. The cartesian() transformation returns all possible pairs of (a,b) where a is in the source RDD and b is in the other RDD. The Cartesian product can be useful when we wish to consider the similarity between all possible pairs, such as computing every user's expected interest in each offer. We can also take the Cartesian product of an RDD with itself, which can be useful for tasks like user similarity. Be warned, however, that the Cartesian product is very expensive for large RDDs. 
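To make these operations concrete, here is a hedged sketch over two small illustrative RDDs (the element values are made up for demonstration, and the ordering of results may vary between runs):
>>> rdd1 = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])
>>> rdd2 = sc.parallelize(["coffee", "monkey", "kitty"])
>>> rdd1.distinct().collect()            # duplicates removed; requires a shuffle
['coffee', 'panda', 'monkey', 'tea']
>>> rdd1.union(rdd2).collect()           # keeps duplicates from both inputs
['coffee', 'coffee', 'panda', 'monkey', 'tea', 'coffee', 'monkey', 'kitty']
>>> rdd1.intersection(rdd2).collect()    # only elements present in both RDDs
['coffee', 'monkey']
>>> rdd1.subtract(rdd2).collect()        # elements of rdd1 not present in rdd2
['panda', 'tea']
>>> rdd1.cartesian(rdd2).count()         # 5 * 3 = 15 pairs
15
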
(Figure 3-5: Cartesian product between two RDDs; image not shown)

Actions 
The most common action on basic RDDs you will likely use is reduce(), which takes a function that operates on two elements of the type in your RDD and returns a new element of the same type. A simple example of such a function is +, which we can use to sum our RDD. With reduce(), we can easily sum the elements of our RDD, count the number of elements, and perform other types of aggregations: 
- Example 3-32. reduce() in Python 

sum = rdd.reduce(lambda x, y: x + y)
Similar to reduce() is fold(), which also takes a function with the same signature as needed for reduce(), but in addition takes a "zero value" to be used for the initial call on each partition. The zero value you provide should be the identity element for your operation; that is, applying it multiple times with your function should not change the value (e.g., 0 for +, 1 for *, or an empty list for concatenation). 
Notes. 
You can minimize object creation in fold() by modifying and returning the first of the two parameters in place. However, you should not modify the second parameter.
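As a minimal sketch (not from the book), summing the RDD with fold() and a zero value of 0 looks like this; because 0 is the identity for addition, the extra per-partition applications of the zero value do not change the result:
>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd.fold(0, lambda x, y: x + y)  # zero value is used once per partition and once when merging
10
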

Both fold() and reduce() require that the return type of our result be the same type as that of the elements in the RDD we are operating over. This works well for operations like sum, but sometimes we want to return a different type. For example, when computing a running average, we need to keep track of both the running total and the number of elements seen so far, which requires us to return a pair. We could work around this by first using map() to transform every element into a pair of the element and the number 1, which is the type we want to return, so that the reduce() function can work on pairs. 
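A hedged sketch of that map()-then-reduce() workaround, computing the average of a small numeric RDD:
>>> nums = sc.parallelize([1, 2, 3, 4])
>>> sumCount = nums.map(lambda x: (x, 1)).reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
>>> sumCount[0] / float(sumCount[1])
2.5
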

The aggregate() function frees us from the constraint of having the return type be the same type as the RDD we are working on. With aggregate(), like fold(), we supply an initial zero value of the type we want to return. We then supply a function to combine the elements from our RDD with the accumulator. Finally, we need to supply a second function to merge two accumulators, given that each node accumulates its own results locally. We can use aggregate() to compute the average of an RDD, avoiding a map() before the fold(), as shown in the example below: 
- Example 3-35. aggregate() in Python 
sumCount = nums.aggregate((0, 0),
                          (lambda acc, value: (acc[0] + value, acc[1] + 1)),
                          (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
return sumCount[0] / float(sumCount[1])
A simple usage example: 
>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> sc.parallelize([1,2,3,4]).aggregate((0, 0), seqOp, combOp)
(10, 4)  # the sum of all elements is 10, and there are 4 elements


Some actions on RDDs return some or all of the data to our driver program in the form of a regular collection or value. The simplest and most common operation that returns data to our driver program is collect(), which returns the entire RDD's contents. This function is commonly used in unit tests where the entire contents of the RDD are expected to fit in memory, as that makes it easy to compare the value of our RDD with our expected result. However, this function suffers from the restriction that all of your data must fit on a single machine, as it all needs to be copied to the driver. 

take(n) returns n elements from the RDD and attempts to minimize the number of partitions it accesses, so it may represent a biased collection. It's important to note that these operations do not return the elements in the order you might expect. These operations are useful for unit tests and quick debugging, but may introduce bottlenecks when you're dealing with large amounts of data. 
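A minimal sketch (illustrative values; the exact elements returned depend on how the data is partitioned, not on any sort order):
>>> rdd = sc.parallelize([5, 1, 4, 2, 3])
>>> rdd.take(2)  # grabs elements from as few partitions as possible
[5, 1]
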

If there is an ordering defined on our data, we can also extract the top elements from an RDD using top(). This function will use the default ordering on the data, but we can supply our own comparison function to extract the top elements. Below is a simple usage example: 
>>> sc.parallelize([10, 4, 2, 12, 3]).top(1)  # descending order
[12]
>>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
[6, 5]
>>> sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)  # compare the numbers as strings
[4, 3, 2]

Sometimes we need a sample of our data in our driver program. The takeSample() function allows us to take a sample of our data either with or without replacement: 
>>> rdd = sc.parallelize(range(0, 10))
>>> rdd.takeSample(True, 5, 1)  # sampling with replacement (the third argument is the seed)
[2, 3, 4, 8, 7]
>>> len(rdd.takeSample(False, 15, 3))  # without replacement, we only have 10 elements!
10 

Sometimes it is useful to perform an action on all of the elements in the RDD, but without returning any result to the driver program. A good example of this would be posting JSON to a webserver or inserting records into a database. In either case, the foreach() action lets us perform computation on each element in the RDD without bringing it back locally. 
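A minimal sketch follows; the handle() function is a hypothetical placeholder for posting to a web service or writing to a database, and because it runs on the executors, any print output would appear in the worker logs in cluster mode rather than on the driver:
>>> def handle(record):
...     pass  # hypothetical side effect, e.g. send `record` to an external system
...
>>> sc.parallelize([1, 2, 3]).foreach(handle)
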

The remaining standard operations on a basic RDD all behave pretty much exactly as you would imagine from their names. count() returns a count of the elements, and countByValue() returns a map of each unique value to its count. For more APIs, please refer to the RDD documentation. 
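For example (a minimal sketch; in PySpark, countByValue() returns a dict-like object of value-to-count mappings):
>>> rdd = sc.parallelize([1, 2, 3, 3])
>>> rdd.count()
4
>>> dict(rdd.countByValue())
{1: 1, 2: 1, 3: 2}
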

Converting Between RDD Types 
Some functions are available only on certain types of RDDs, such as mean() and variance() on numeric RDDs or join() on key/value pair RDDs. We will cover these special functions for numeric data in Chapter 6 and pair RDDs in Chapter 4. In Scala and Java, these modules aren't defined on the standard RDD class, so to access this additional functionality, we have to make sure we get the correct specialized class. 

Java 
In Java the conversion between the specialized types of RDDs is a bit more explicit. In particular, there are special classes called JavaDoubleRDD and JavaPairRDD for RDDs of these types, with extra methods for these types of data. This has the benefit of giving you a greater understanding of what exactly is going on, but can be a bit more cumbersome. To construct RDDs of these special types, instead of always using the Function class, we will need to use specialized versions. If we want to create a JavaDoubleRDD from an RDD of type T, rather than using Function<T, Double>, we use DoubleFunction<T>. Table 3-5 shows the specialized functions and their uses. 
(Table 3-5: specialized functions and their uses; not shown)

We can modify Example 3-28, where we squared an RDD of numbers, to produce a JavaDoubleRDD, as shown in the example below. This gives us access to the additional JavaDoubleRDD-specific functions like mean() and variance(). 
- Example 3-38. Creating JavaDoubleRDD in Java 

JavaDoubleRDD result = rdd.mapToDouble(
    new DoubleFunction<Integer>() {
        public double call(Integer x) {
            return (double) x * x;
        }
    });
System.out.println(result.mean());
Python 
The Python API is structured differently from Java and Scala. In Python, all of these functions are implemented on the base RDD class but will fail at runtime if the type of data in the RDD is incorrect. 
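For instance, mean() can be called on any RDD object, but it only succeeds when the elements are actually numeric (a minimal sketch):
>>> sc.parallelize([1.0, 2.0, 3.0]).mean()
2.0
>>> # sc.parallelize(["a", "b"]).mean() would fail at runtime, since the elements are not numbers
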

Persistence (Caching) 
As discussed earlier, Spark RDDs are lazily evaluated, and sometimes we may wish to use the same RDD multiple times. If we do this naively, Spark will recompute the RDD and all of its dependencies each time we call an action on the RDD. This can be especially expensive for iterative algorithms, which look at the data many times. Another trivial example would be doing a count and then writing out the same RDD, as shown in the example below: 
- Example 3-39. Double execution in Scala 
val result = input.map(x => x * x)
println(result.count())
println(result.collect().mkString(","))
To avoid computing an RDD multiple times, we can ask Spark to persist the data. When we ask Spark to persist an RDD, the nodes that compute the RDD store their partitions. If a node that has data persisted on it fails, Spark will recompute the lost partitions of the data when needed. We can also replicate our data on multiple nodes if we want to be able to handle node failure without slowdown. 

Spark has many levels of persistence to choose from based on what our goals are, as you can see in Table 3-6. In Scala (Example 3-40) and Java, the default persist() will store the data in the JVM heap as unserialized objects. In Python, we always serialize the data that persist() stores, so the default is instead stored in the JVM heap as pickled objects. When we write data out to disk or off-heap storage, that data is also always serialized. 
(Table 3-6: persistence levels; not shown)

Notes. 

Off-heap caching is experimental and uses Tachyon. If you are interested in off-heap caching with Spark, take a look at the Running Spark on Tachyon guide.

- Example 3-40. persist() in Scala 
import org.apache.spark.storage.StorageLevel

val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
Notice that we called persist() on the RDD before the first action. The persist() call on its own doesn't force evaluation. If you attempt to cache too much data to fit in memory, Spark will automatically evict old partitions using a Least Recently Used (LRU) cache policy. For the memory-only storage levels, it will recompute these partitions the next time they are accessed, while for the memory-and-disk ones, it will write them out to disk. In either case, this means that you don't have to worry about your job breaking if you ask Spark to cache too much data. However, caching unnecessary data can lead to eviction of useful data and more recomputation time. Finally, RDDs come with a method called unpersist() that lets you manually remove them from the cache.
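
The same pattern in Python might look like the following hedged sketch, where input_rdd is a stand-in for whatever RDD you want to reuse:
from pyspark import StorageLevel

result = input_rdd.map(lambda x: x * x)             # input_rdd: hypothetical RDD to reuse
result.persist(StorageLevel.DISK_ONLY)              # mark for caching; nothing is computed yet
print(result.count())                               # first action materializes and caches the data
print(",".join(str(x) for x in result.collect()))   # second action reuses the cached partitions
result.unpersist()                                  # manually remove it from the cache when done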
