2016年4月9日 星期六

[ Learn Spark ] Ch4. Working with Key/Value Pairs - Part3

Data Partitioning (Advanced) 
The final Spark feature we will discuss in this chapter is how to control a dataset's partitioning across nodes. In a distributed program, communication is very expensive, so laying out data to minimize network traffic can greatly improve performance. Much like how a single-node program needs to choose the right data structure for a collection of records, Spark programs can choose to control their RDDs' partitioning to reduce communication. Partitioning will not be helpful in all applications - for example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It is useful only when a dataset is reused multiple times in key-oriented operations such as joins. We will give some examples shortly. 

Spark's partitioning is available on all RDDs of key/value pairs, and causes the system to group elements based on a function of each key. Although Spark doesn't give explicit control of which worker node each key goes to (partly because the system is designed to work even if specific nodes fail), it lets the program ensure that a set of keys will appear together on some node. For example, you might choose to hash-partition an RDD into 100 partitions so that keys that have the same hash value modulo 100 appear on the same node. Or you might range-partition the RDD into sorted ranges of keys so that elements with keys in the same range appear on the same node. 
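As a quick sketch of these two schemes (not from the book; the pairs RDD below is a made-up example and sc is assumed to be in scope, as in the Spark shell), both partitioners can be requested through partitionBy(): 
  import org.apache.spark.{HashPartitioner, RangePartitioner}

  // Hypothetical pair RDD of (Int, String) records
  val pairs = sc.parallelize(1 to 1000).map(i => (i, i.toString))

  // Hash partitioning: keys with the same hash value modulo 100 land in the same partition
  val hashed = pairs.partitionBy(new HashPartitioner(100)).persist()

  // Range partitioning: keys are sampled and split into 100 sorted ranges,
  // so keys falling in the same range land in the same partition
  val ranged = pairs.partitionBy(new RangePartitioner(100, pairs)).persist()

  hashed.partitioner   // Some(HashPartitioner)
  ranged.partitioner   // Some(RangePartitioner)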

As a simple example, consider an application that keeps a large table of user information in memory - say, an RDD of (UserID, UserInfo) pairs, where UserInfo contains a list of topics the user is subscribed to. The application periodically combines this table with a smaller file representing events that happened in the past five minutes - say, a table of (UserID, LinkInfo) pairs for users who have clicked a link on a website in those five minutes. For example, we may wish to count how many users visited a link that was not to one of their subscribed topics. We can perform this combination with Spark's join() operation, which can be used to group the UserInfo and LinkInfo pairs for each UserID by key. Our application would look like the example below: 
- Example 4-22. Scala simple application 
  // Initialization code; we load the user info from a Hadoop SequenceFile on HDFS.
  // This distributes elements of userData by the HDFS block where they are found,
  // and doesn't provide Spark with any way of knowing in which partition a
  // particular UserID is located.
  val sc = new SparkContext()
  val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

  // Function called periodically to process a logfile of events in the past 5 minutes;
  // we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
  def processNewLogs(logFileName: String) {
    val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
    val joined = userData.join(events)  // RDD of (UserID, (UserInfo, LinkInfo)) pairs
    val offTopicVisits = joined.filter {
      case (userId, (userInfo, linkInfo)) =>  // Expand the tuple into its components
        !userInfo.topics.contains(linkInfo.topic)
    }.count()
    println("Number of visits to non-subscribed topics: " + offTopicVisits)
  }
This code will run fine as it is, but it will be inefficient. This is because the join() operation, called each time processNewLogs() is invoked, doesn't know anything about how the keys are partitioned in the datasets. By default, this operation will hash all the keys of both datasets, sending elements with the same key hash across the network to the same machine, and join together the elements with the same key on that machine (see Figure 4-4). Because we expect the userData table to be much larger than the small log of events seen every five minutes, this wastes a lot of work: the userData table is hashed and shuffled across the network on every call, even though it doesn't change. 
 

Fixing this is simple: just use the partitionBy() transformation on userData to hash-partition it at the start of the program. We do this by passing an org.apache.spark.HashPartitioner object to partitionBy(), as shown in the example below: 
- Example 4-23. Scala custom partitioner 
  import org.apache.spark.HashPartitioner

  val sc = new SparkContext(...)
  val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
                   .partitionBy(new HashPartitioner(100))   // Create 100 partitions
                   .persist()
The processNewLogs() method can remain unchanged: the events RDD is local to processNewLogs(), and is used only once within this method, so there is no advantage in specifying a partitioner for events. Because we called partitionBy() when building userData, Spark will know that it is hash-partitioned, and calls to join() on it will take advantage of this information. In particular, when we call userData.join(events), Spark will shuffle only the events RDD, sending events with each particular UserID to the machine that contains the corresponding hash partition of userData (see Figure 4-5). The result is that a lot less data is communicated over the network, and the program runs significantly faster. 
 

Note that partitionBy() is a transformation, so it always returns a new RDD - it does not change the original RDD in place. RDDs can never be modified once created. Therefore it is important to persist and save as userData the result of partitionBy(), not the original sequenceFile(). Also, the 100 passed to partitionBy() represents the number of partitions, which will control how many parallel tasks perform further operations on the RDD (e.g., joins); in general, make this at least as large as the number of cores in your cluster. 
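To make that point concrete, here is a small sketch (not from the book) contrasting the common mistake with the correct pattern from Example 4-23: 
  import org.apache.spark.HashPartitioner

  // Wrong: the RDD returned by partitionBy() is thrown away, so userData keeps
  // its original, unpartitioned lineage and every join still reshuffles it.
  val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
  userData.partitionBy(new HashPartitioner(100))

  // Right: keep (and persist) the partitioned RDD itself.
  val userDataPartitioned = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
                              .partitionBy(new HashPartitioner(100))
                              .persist()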

In fact, many other Spark operations automatically result in an RDD with known partitioning information, and many operations other than join() will take advantage of this information. For example, sortByKey() and groupByKey() will result in range-partitioned and hash-partitioned RDDs, respectively. On the other hand, operations like map() cause the new RDD to forget the parent's partitioning information, because such operations could theoretically modify the key of each record. The next few sections describe how to determine how an RDD is partitioned, and exactly how partitioning affects the various Spark operations. 

Determining an RDD's Partitioner 
In Scala and Java, you can determine how an RDD is partitioned using its partitioner property (or partitioner() method in Java). This returns a scala.Option object, which is a Scala class for a container that may or may not contain one item. You can call isDefined() on the Option to check whether it has a value, and get() to get this value. If present, the value will be a spark.Partitioner object. This is essentially a function telling the RDD which partition each key goes into; we'll talk more about this later. 

The partitioner property is a great way to test in the Spark shell how different Spark operations affect partitioning, and to check that the operations you want to do in your program will yield the right result (see Example 4-24). 
- Example 4-24. Determining partitioner of an RDD 
scala> val pairs = sc.parallelize(List((1,1), (2,2), (3,3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> pairs.partitioner
res0: Option[org.apache.spark.Partitioner] = None

scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner

scala> val partitioned = pairs.partitionBy(new HashPartitioner(2))
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:30

scala> partitioned.partitioner
res1: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

In this short session, we created an RDD of (Int, Int) pairs, which initially have no partitioning information (an Option with value None). We then created a second RDD by hash-partitioning the first. If we actually wanted to use partitioned in further operations, then we should have appended persist() to the third line of input, in which partitioned is defined. This is for the same reason that we needed persist() for userData in the previous example: without persist(), subsequent RDD actions will evaluate the entire lineage of partitioned, which will cause pairs to be hash-partitioned over and over. 

Operations That Benefit from Partitioning 
Many of Spark's operations involve shuffling data by key across the network. All of these will benefit from partitioning. As of Spark 1.0, the operations that benefit from partitioning are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup(). 

For operations that act on a single RDD, such as reduceByKey(), running on a pre-partitioned RDD will cause all the values for each key to be computed locally on a single machine, requiring only the final, locally reduced value to be sent from each worker node back to the master. For binary operations, such as cogroup() and join(), pre-partitioning will cause at least one of the RDDs (the one with the known partitioner) to not be shuffled. If both RDDs have the same partitioner, and if they are cached on the same machines (e.g., one was created using mapValues() on the other, which preserves keys and partitioning) or if one of them has not yet been computed, then no shuffling across the network will occur. 
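As a small sketch of both cases (with made-up data, assuming sc is in scope): once events is hash-partitioned and persisted, reduceByKey() combines values for each key inside each partition, and joining it with an RDD derived from it via mapValues() needs no further shuffle: 
  import org.apache.spark.HashPartitioner

  val events = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
                 .partitionBy(new HashPartitioner(4))
                 .persist()

  // Single RDD: all values for a given key already sit in one partition,
  // so the sums are computed locally within each partition, with no shuffle.
  val counts = events.reduceByKey(_ + _)

  // Two RDDs: mapValues() preserves the partitioner, so events and doubled are
  // co-partitioned and this join requires no shuffle across the network.
  val doubled = events.mapValues(_ * 2)
  val joined  = events.join(doubled)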

Operations That Affect Partitioning 
Spark knows internally how each of its operations affects partitioning, and automatically sets the partitioner on RDDs created by operations that partition the data. For example, suppose you called join() to join two RDDs; because the elements with the same key have been hashed to the same machine, Spark knows that the result is hash-partitioned, and operations like reduceByKey() on the join result are going to be significantly faster. The flipside, however, is that for transformations that cannot be guaranteed to produce a known partitioning, the output RDD will not have a partitioner set. For example, if you call map() on a hash-partitioned RDD of key/value pairs, the function passed to map() can in theory change the key of each element, so the result will not have a partitioner. Spark doesn't analyze your functions to check whether they retain the key. Instead, it provides two other operations, mapValues() and flatMapValues(), which guarantee that each tuple's key remains the same. 
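A short shell-style sketch of this behavior (the pairs RDD here is hypothetical): 
  import org.apache.spark.HashPartitioner

  val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3))).partitionBy(new HashPartitioner(2))

  pairs.mapValues(_ + 1).partitioner                    // Some(HashPartitioner) - keys untouched, partitioner kept
  pairs.map { case (k, v) => (k, v + 1) }.partitioner   // None - map() might have changed the keys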

All that said, here are all the operations that result in a partitioner being set on the output RDD: cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues() (if the parent RDD has a partitioner), flatMapValues() (if the parent RDD has a partitioner), and filter() (if the parent has a partitioner). All other operations will produce a result with no partitioner. 

Finally, for binary operations, which partitioner is set on the output depends on the parent RDDs' partitioners. By default, it is a hash partitioner, with the number of partitions set to the level of parallelism of the operation. However, if one of the parents has a partitioner set, it will be that partitioner; and if both parents have a partitioner set, it will be the partitioner of the first parent. 
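The following sketch (hypothetical RDDs, assuming a Spark shell) illustrates these rules: 
  import org.apache.spark.{HashPartitioner, RangePartitioner}

  val a = sc.parallelize(Seq((1, "a"), (2, "b")))
  val b = sc.parallelize(Seq((1, "x"), (2, "y")))

  // Neither parent has a partitioner: the join result gets a default HashPartitioner.
  a.join(b).partitioner                 // Some(HashPartitioner)

  // One parent has a partitioner: the result uses it.
  val aHashed = a.partitionBy(new HashPartitioner(8))
  aHashed.join(b).partitioner           // Some(HashPartitioner) with 8 partitions

  // Both parents have partitioners: here the result uses aHashed's partitioner.
  val bRanged = b.partitionBy(new RangePartitioner(4, b))
  aHashed.join(bRanged).partitioner     // Some(HashPartitioner) with 8 partitions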

Example: PageRank 
As an example of a more involved algorithm that can benefit from RDD partitioning, we consider PageRank. The PageRank algorithm, named after Google's Larry Page, aims to assign a measure of importance (a "rank") to each document in a set based on how many documents have links to it. It can be used to rank web pages, of course, but also scientific articles, or influential users in a social network. 

PageRank is an iterative algorithm that performs many joins, so it is a good use case for RDD partitioning. The algorithm maintains two datasets: one of (pageID, linkList) elements containing the list of neighbors of each page, and one of (pageID, rank) elements containing the current rank for each page. It proceeds as follows: 
1. Initialize each page's rank to 1.0.
2. On each iteration, have page p send a contribution of rank(p)/numNeighbors(p) to its neighbors (the pages it has links to).
3. Set each page's rank to 0.15 + 0.85 * contributionsReceived.

The last two steps repeat for several iterations, during which the algorithm will converge to the correct PageRank value for each page. In practice, it's typical to run about 10 iterations. The example below gives the code to implement PageRank:
- Example 4-25. Scala PageRank 
  // Assume that our neighbor list was saved as a Spark objectFile
  val links = sc.objectFile[(String, Seq[String])]("links")
                .partitionBy(new HashPartitioner(100))
                .persist()

  // Initialize each page's rank to 1.0; since we use mapValues, the resulting RDD
  // will have the same partitioner as links
  var ranks = links.mapValues(v => 1.0)

  // Run 10 iterations of PageRank
  for (i <- 0 until 10) {
    val contributions = links.join(ranks).flatMap {
      case (pageId, (pageLinks, rank)) =>
        pageLinks.map(dest => (dest, rank / pageLinks.size))
    }
    ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85 * v)
  }

  // Write out the final ranks
  ranks.saveAsTextFile("ranks")
That's it! The algorithm starts with a ranks RDD initialized at 1.0 for each element, and keeps updating the ranks variable on each iteration. The body of PageRank is pretty simple to express in Spark: it first does a join() between the current ranks RDD and the static links one, in order to obtain the link list and rank for each page ID together, then uses this in a flatMap to create "contribution" values to send to each of the page's neighbors. We then add up these values by page ID (i.e., by the page receiving the contribution) and set that page's rank to 0.15 + 0.85 x contributionsReceived. 

Although the code itself is simple, the example does several things to ensure that the RDDs are parallelized in an efficient way, and to minimize communication: 
1. Notice that the links RDD is joined against ranks on each iteration. Since links is a static dataset, we partition it at the start with partitionBy(), so that it does not need to be shuffled across the network. In practice, the links RDD is also likely to be much larger in terms of bytes than ranks, since it contains a list of neighbors for each page ID instead of just a Double, so this optimization saves considerable network traffic over a simple implementation of PageRank (e.g., in plain MapReduce).

2. For the same reason, we call persist() on links to keep it in RAM across iterations.

3. When we first create ranks, we use mapValues() instead of map() to preserve the partitioning of the parent RDD (links), so that our first join against it is cheap.

4. In the loop body, we follow our reduceByKey() with mapValues(); because the result of reduceByKey() is already hash-partitioned, this will make it more efficient to join the mapped result against links on the next iteration.


Custom Partitioners 
While Spark's HashPartitioner and RangePartitioner are well suited to many use cases, Spark also allows you to tune how an RDD is partitioned by providing a custom Partitioner object. This can help you further reduce communication by taking advantage of domain-specific knowledge. 

For example, suppose we wanted to run the PageRank algorithm in the previous section on a set of web pages. Here each page's ID (the key in our RDD) will be its URL. Using a simple hash function to do the partitioning, pages with similar URLs might be hashed to completely different nodes. However, we know that web pages within the same domain tend to link to each other a lot. Because PageRank needs to send a message from each page to each of its neighbors on each iteration, it helps to group these pages into the same partition. We can do this with a custom Partitioner that looks at just the domain name instead of the whole URL. 

To implement a custom partitioner, you need to subclass the org.apache.spark.Partitioner class and implement three methods: 
* numPartitions: Int 
Which returns the number of partitions you will create.

* getPartition(key: Any): Int 
Which returns the partition ID (0 to numPartitions - 1) for a given key.

* equals() 
The standard Java equality method. This is important to implement because Spark will need to test your Partitioner object against other instances of itself when it decides whether two of your RDDs are partitioned the same way!

One gotcha is that if you rely on Java's hashCode() method in your algorithm, it can return negative numbers. You need to be careful to ensure that getPartition() always returns a non-negative result. The example below shows how we would write the domain-name-based partitioner sketched previously, which hashes only the domain of each URL: 
- Example 4-26. Scala custom partitioner 
  import org.apache.spark.Partitioner

  class DomainNamePartitioner(numParts: Int) extends Partitioner {
    override def numPartitions: Int = numParts

    override def getPartition(key: Any): Int = {
      val domain = new java.net.URL(key.toString).getHost()
      val code = (domain.hashCode % numPartitions)
      if (code < 0) {
        code + numPartitions  // Make it non-negative
      } else {
        code
      }
    }

    // Java equals method to let Spark compare our Partitioner objects
    override def equals(other: Any): Boolean = other match {
      case dnp: DomainNamePartitioner =>
        dnp.numPartitions == numPartitions
      case _ =>
        false
    }
  }
Note that in the equals() method, we used Scala's pattern-matching operator (match) to test whether other is a DomainNamePartitioner, and cast it if so; this is the same as using instanceof in Java. Using a custom Partitioner is easy: just pass it to the partitionBy() method. Many of the shuffle-based methods in Spark, such as join() and groupByKey(), can also take an optional Partitioner object to control the partitioning of the output. In Python, you do not extend a Partitioner class, but instead pass a hash function as an additional argument to RDD.partitionBy(). The example below demonstrates: 
- Example 4-27. Python custom partitioner 
  import urlparse

  def hash_domain(url):
      return hash(urlparse.urlparse(url).netloc)

  rdd.partitionBy(20, hash_domain)  # Create 20 partitions
Note that the hash function you pass will be compared by identity to that of other RDDs. If you want to partition multiple RDDs with the same partitioner, pass the same function object (e.g., a global function) instead of creating a new lambda for each one! 

Below is a line-by-line demonstration of the PageRank code in Python (first iteration): 
>>> links = sc.parallelize([('A', []), ('B', ['C']), ('C', ['B']), ('D', ['A', 'B']), ('E', ['B', 'D', 'F']), ('F', ['B', 'E'])])
>>> links.collect()
[('A', []), ('B', ['C']), ('C', ['B']), ('D', ['A', 'B']), ('E', ['B', 'D', 'F']), ('F', ['B', 'E'])]
>>> ranks = links.mapValues(lambda t: 1.0)
>>> ranks.collect()
[('A', 1.0), ('B', 1.0), ('C', 1.0), ('D', 1.0), ('E', 1.0), ('F', 1.0)]
>>> contributions_t1 = links.join(ranks)
>>> contributions_t1.collect()
[('A', ([], 1.0)), ('C', (['B'], 1.0)), ('B', (['C'], 1.0)), ('E', (['B', 'D', 'F'], 1.0)), ('D', (['A', 'B'], 1.0)), ('F', (['B', 'E'], 1.0))]
>>> contributions_t2 = contributions_t1.flatMap(lambda t: map(lambda dest: (dest, t[1][1]/len(t[1][0])),t[1][0]))
>>> contributions_t2.collect()
[('B', 1.0), ('C', 1.0), ('B', 0.3333333333333333), ('D', 0.3333333333333333), ('F', 0.3333333333333333), ('A', 0.5), ('B', 0.5), ('B', 0.5), ('E', 0.5)]
>>> ranks_1 = contributions_t2.reduceByKey(lambda x, y: x + y)  # Sum up the contributions
>>> ranks_1.collect()
[('A', 0.5), ('C', 1.0), ('B', 2.333333333333333), ('E', 0.5), ('D', 0.3333333333333333), ('F', 0.3333333333333333)]
>>> ranks = ranks_1.mapValues(lambda v: 0.15 + 0.85 * v)  # Update the ranks with the formula: 0.15 + 0.85 x contributionsReceived
>>> ranks.collect()
[('A', 0.575), ('C', 1.0), ('B', 2.133333333333333), ('E', 0.575), ('D', 0.43333333333333335), ('F', 0.43333333333333335)]


Supplement 
Ch4. Working with Key/Value Pairs - Part1 
Ch4. Working with Key/Value Pairs - Part2 
Ch4. Working with Key/Value Pairs - Part3

