The final Spark feature we will discuss in this chapter is how to control a dataset's partitioning across nodes. In a distributed program, communication is very expensive, so laying out data to minimize network traffic can greatly improve performance. Much like how a single-node program needs to choose the right data structure for a collection of records, Spark programs can choose to control their RDDs' partitioning to reduce communication. Partitioning will not be helpful in all applications - for example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It is useful only when a dataset is reused multiple times in key-oriented operations such as joins. We will give some examples shortly.
Spark's partitioning is available on all RDDs of key/value pairs, and causes the system to group elements based on a function of each key. Although Spark doesn't give explicit control of which worker node each key goes to (partly because the system is designed to work even if specific nodes fail), it lets the program ensure that a set of keys will appear together on some node. For example, you might choose to hash-partition an RDD into 100 partitions so that keys that have the same hash value modulo 100 appear on the same node. Or you might range-partition the RDD into sorted ranges of keys so that elements with keys in the same range appear on the same node.
As a simple example, consider an application that keeps a large table of user information in memory - say, an RDD of (UserID, UserInfo) pairs, where UserInfo contains a list of topics the user is subscribed to. The application periodically combines this table with a smaller file representing events that happened in the past five minutes - say, a table of (UserID, LinkInfo) pairs for users who have clicked a link on a website in those five minutes. For example, we may wish to count how many users visited a link that was not to one of their subscribed topics. We can perform this combination with Spark's join() operation, which can be used to group the UserInfo and LinkInfo pairs for each UserID by key. Our application would look like Example 4-22:
- Example 4-22. Scala simple application
- // Initialization code; we load the user info from a Hadoop SequenceFile on HDFS.
- // This distributes elements of userData by the HDFS block where they are found,
- // and doesn't provide Spark with any way of knowing in which partition a
- // particular UserID is located.
- val sc = new SparkContext()
- val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
- // Function called periodically to process a logfile of events in the past 5 minutes;
- // we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
- def processNewLogs(logFileName: String) {
-   val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
-   val joined = userData.join(events) // RDD of (UserID, (UserInfo, LinkInfo)) pairs
-   val offTopicVisits = joined.filter {
-     case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
-       !userInfo.topics.contains(linkInfo.topic)
-   }.count()
-   println("Number of visits to non-subscribed topics: " + offTopicVisits)
- }
Fixing this is simple: just use the partitionBy() transformation on userData to hash-partition it at the start of the program. We do this by passing an org.apache.spark.HashPartitioner object to partitionBy(), as shown in Example 4-23:
- Example 4-23. Scala partitioning with partitionBy()
- val sc = new SparkContext(...)
- val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
- .partitionBy(new HashPartitioner(100)) // Create 100 partitions
- .persist()
Note that partitionBy() is a transformation, so it always returns a new RDD - it does not change the original RDD in place. RDDs can never be modified once created. Therefore, it is important to persist and save as userData the result of partitionBy(), not the original sequenceFile(). Also, the 100 passed to partitionBy() represents the number of partitions, which will control how many parallel tasks perform further operations on the RDD (e.g., joins); in general, make this at least as large as the number of cores in your cluster.
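Because partitionBy() returns a new RDD rather than modifying its input, a common mistake is to call it and then keep using the original, unpartitioned RDD. A minimal sketch of the difference (the variable names here are illustrative, reusing the UserID and UserInfo types from Example 4-22):
- // Pitfall: the returned RDD is discarded, so nothing is gained
- val original = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
- original.partitionBy(new HashPartitioner(100)) // original itself is unchanged
- // Correct: keep and persist the RDD returned by partitionBy()
- val partitionedData = original.partitionBy(new HashPartitioner(100)).persist()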
In fact, many other Spark operations automatically result in an RDD with known partitioning information, and many operations other than join() will take advantage of this information. For example, sortByKey() and groupByKey() will result in range-partitioned and hash-partitioned RDDs, respectively. On the other hand, operations like map() cause the new RDD to forget the parent's partitioning information, because such operations could theoretically modify the key of each record. The next few sections describe how to determine how an RDD is partitioned, and exactly how partitioning affects the various Spark operations.
Determining an RDD's Partitioner
In Scala and Java, you can determine how an RDD is partitioned using its partitioner property (or partitioner() method in Java). This returns a scala.Option object, which is a Scala class for a container that may or may not contain one item. You can call isDefined() on the Option to check whether it has a value, and get() to get this value. If present, the value will be a spark.Partitioner object. This is essentially a function telling the RDD which partition each key goes into; we'll talk more about this later.
The partitioner property is a great way to test in the Spark shell how different Spark operations affect partitioning, and to check that the operations you want to do in your program will yield the right result (see Example 4-24).
- Example 4-24. Determining partitioner of an RDD
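A minimal spark-shell session matching the description below (the pair values and the partition count of 2 are illustrative) would look roughly like this:
- scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
- pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] ...
- scala> pairs.partitioner
- res0: Option[org.apache.spark.Partitioner] = None
- scala> val partitioned = pairs.partitionBy(new org.apache.spark.HashPartitioner(2))
- partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] ...
- scala> partitioned.partitioner
- res1: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@...)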
In this short session, we created an RDD of (Int, Int) pairs, which initially have no partitioning information (an Option with value None). We then created a second RDD by hash-partitioning the first. If we actually wanted to use partitioned in further operations, then we should have appended persist() to the third line of input, in which partitioned is defined. This is for the same reason that we needed persist() for userData in the previous example: without persist(), subsequent RDD actions will evaluate the entire lineage of partitioned, which will cause pairs to be hash-partitioned over and over.
Operations That Benefit from Partitioning
Many of Spark's operations involve shuffling data by key across the network. All of these will benefit from partitioning. As of Spark 1.0, the operations that benefit from partitioning are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup().
For operations that act on a single RDD, such as reduceByKey(), running on a pre-partitioned RDD will cause all the values for each key to be computed locally on a single machine, requiring only the final, locally reduced value to be sent from each worker node back to the master. For binary operations, such as cogroup() and join(), pre-partitioning will cause at least one of the RDDs (the one with the known partitioner) to not be shuffled. If both RDDs have the same partitioner, and if they are cached on the same machines (e.g., one was created using mapValues() on the other, which preserves keys and partitioning) or if one of them has not yet been computed, then no shuffling across the network will occur.
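As an illustrative sketch (the data and partition count are made up, and org.apache.spark.HashPartitioner is assumed to be in scope), reducing a pre-partitioned RDD aggregates each key locally, and joining against it only shuffles the other side:
- val userVisits = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
-   .partitionBy(new HashPartitioner(4))
-   .persist()
- val visitCounts = userVisits.reduceByKey(_ + _) // values for each key are already co-located; userVisits is not shuffled
- val labels = sc.parallelize(Seq(("a", "label-a"), ("b", "label-b")))
- val joined = userVisits.join(labels)            // only labels is shuffled, to userVisits' partitions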
Operations That Affect Partitioning
Spark knows internally how each of its operations affects partitioning, and automatically sets the partitioner on RDDs created by operations that partition the data. For example, suppose you called join() to join two RDDs; because the elements with the same key have been hashed to the same machine, Spark knows that the result is hash-partitioned, and operations like reduceByKey() on the join result are going to be significantly faster. The flip side, however, is that for transformations that cannot be guaranteed to produce a known partitioning, the output RDD will not have a partitioner set. For example, if you call map() on a hash-partitioned RDD of key/value pairs, the function passed to map() can in theory change the key of each element, so the result will not have a partitioner. Spark doesn't analyze your functions to check whether they retain the key. Instead, it provides two other operations, mapValues() and flatMapValues(), which guarantee that each tuple's key remains the same.
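A brief illustrative check of this difference (the data is made up):
- val partitionedPairs = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(new HashPartitioner(4))
- partitionedPairs.map { case (k, v) => (k, v.toUpperCase) }.partitioner // None: map() might have changed the keys
- partitionedPairs.mapValues(_.toUpperCase).partitioner                  // Some(HashPartitioner): keys are guaranteed unchanged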
All that said, here are all the operations that result in a partitioner being set on the output RDD: cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues() (if the parent RDD has a partitioner), flatMapValues() (if the parent has a partitioner), and filter() (if the parent has a partitioner). All other operations will produce a result with no partitioner.
Finally, for binary operations, which partitioner is set on the output depends on the parent RDDs' partitioners. By default, it is a hash partitioner, with the number of partitions set to the level of parallelism of the operation. However, if one of the parents has a partitioner set, it will be that partitioner; and if both parents have a partitioner set, it will be the partitioner of the first parent.
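As an illustrative sketch of these rules (partition counts and data are made up):
- val withPartitioner = sc.parallelize(Seq((1, "x"))).partitionBy(new HashPartitioner(4))
- val withoutPartitioner = sc.parallelize(Seq((1, "y")))
- withPartitioner.join(withoutPartitioner).partitioner    // Some(HashPartitioner) with 4 partitions, taken from withPartitioner
- withoutPartitioner.join(withoutPartitioner).partitioner // Some(HashPartitioner) sized by the operation's default parallelism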
Example: PageRank
As an example of a more involved algorithm that can benefit from RDD partitioning, we consider PageRank. The PageRank algorithm, named after Google's Larry Page, aims to assign a measure of importance (a "rank") to each document in a set based on how many documents have links to it. It can be used to rank web pages, of course, but also scientific articles, or influential users in a social network.
PageRank is an iterative algorithm that performs many joins, so it is a good use case for RDD partitioning. The algorithm maintains two datasets: one of (pageID, linkList) elements containing the list of neighbors of each page, and one of (pageID, rank) elements containing the current rank for each page. It proceeds as follows:
1. Initialize each page's rank to 1.0.
2. On each iteration, have page p send a contribution of rank(p)/numNeighbors(p) to its neighbors (the pages it has links to).
3. Set each page's rank to 0.15 + 0.85 * contributionsReceived.
The last two steps repeat for several iterations, during which the algorithm converges to the correct PageRank value for each page. In practice, it's typical to run about 10 iterations. Example 4-25 gives the code to implement PageRank:
- Example 4-25. Scala PageRank
- // Assume that our neighbor list was saved as a Spark objectFile
- val links = sc.objectFile[(String, Seq[String])]("links")
-   .partitionBy(new HashPartitioner(100))
-   .persist()
- // Initialize each page's rank to 1.0; since we use mapValues, the resulting RDD
- // will have the same partitioner as links
- var ranks = links.mapValues(v => 1.0)
- // Run 10 iterations of PageRank
- for (i <- 0 until 10) {
-   val contributions = links.join(ranks).flatMap {
-     case (pageId, (pageLinks, rank)) =>
-       pageLinks.map(dest => (dest, rank / pageLinks.size))
-   }
-   ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85 * v)
- }
- // Write out the final ranks
- ranks.saveAsTextFile("ranks")
Although the code itself is simple, the example does several things to ensure that the RDDs are partitioned in an efficient way, and to minimize communication:
* links is joined against ranks on every iteration. Since links is a static dataset, we partition it with partitionBy() at the start, so it does not need to be shuffled across the network on each join; we also persist() it to keep it in RAM across iterations.
* When we first create ranks, we use mapValues() instead of map() to preserve the partitioner of the parent RDD (links), so that our first join against it is cheap.
* In the loop body, we follow reduceByKey() with mapValues(); because the result of reduceByKey() is already hash-partitioned, this keeps the new ranks partitioned as well, making it efficient to join against links on the next iteration.
Custom Partitioners
While Spark's HashPartitioner and RangePartitioner are well suited to many use cases, Spark also allows you to tune how an RDD is partitioned by providing a custom Partitioner object. This can help you further reduce communication by taking advantage of domain-specific knowledge.
For example, suppose we wanted to run the PageRank algorithm in the previous section on a set of web pages. Here each page's ID (the key in our RDD) will be its URL. Using a simple hash function to do the partitioning, pages with similar URLs might be hashed to completely different nodes. However, we know that web pages within the same domain tend to link to each other a lot. Because PageRank needs to send a message from each page to each of its neighbors on each iteration, it helps to group these pages into the same partition. We can do this with a custom Partitioner that looks at just the domain name instead of the whole URL.
To implement a custom partitioner, you need to subclass the org.apache.spark.Partitioner class and implement three methods:
* numPartitions: Int, which returns the number of partitions you will create.
* getPartition(key: Any): Int, which returns the partition ID (0 to numPartitions - 1) for a given key.
* equals(), the standard Java equality method. Spark needs this to test your Partitioner object against other instances when it decides whether two RDDs are partitioned the same way.
One gotcha is that if you rely on Java's hashCode() method in your algorithm, it can return negative numbers. You need to be careful to ensure that getPartition() always returns a non-negative result. Example 4-26 shows how we would write the domain-name-based partitioner sketched previously, which hashes only the domain name of each URL:
- Example 4-26. Scala custom partitioner
- class DomainNamePartitioner(numParts: Int) extends Partitioner {
-   override def numPartitions: Int = numParts
-   override def getPartition(key: Any): Int = {
-     val domain = new java.net.URL(key.toString).getHost()
-     val code = domain.hashCode % numPartitions
-     if (code < 0) {
-       code + numPartitions // Make it non-negative
-     } else {
-       code
-     }
-   }
-   // Java equals method to let Spark compare our Partitioner objects
-   override def equals(other: Any): Boolean = other match {
-     case dnp: DomainNamePartitioner =>
-       dnp.numPartitions == numPartitions
-     case _ =>
-       false
-   }
- }
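To use the custom partitioner, pass an instance to partitionBy(); as an illustrative sketch, repartitioning the links RDD from the PageRank example by domain (the partition count of 20 is arbitrary):
- // Group pages from the same domain into the same partition
- val linksByDomain = links.partitionBy(new DomainNamePartitioner(20)).persist()
Many shuffle-based operations, such as reduceByKey() and join(), also accept a Partitioner argument directly.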
- Example 4-27. Python custom partitioner
- import urlparse
- def hash_domain(url):
- return hash(urlparse.urlparse(url).netloc)
- rdd.partitionBy(20, hash_domain) # Create 20 partitions
Below is a line-by-line demonstration of the PageRank code in Python (first iteration):
Supplement
* Ch4. Working with Key/Value Pairs - Part1
* Ch4. Working with Key/Value Pairs - Part2
* Ch4. Working with Key/Value Pairs - Part3