Thursday, May 5, 2016

[ Learn Spark ] Ch6. Advanced Spark Programming

Source From Here 
Introduction 
This chapter introduces a variety of advanced Spark programming features that we didn't get to cover in previous chapters. We introduce two types of shared variables: accumulators to aggregate information and broadcast variables to efficiently distribute large values. Building on our existing transformations on RDDs, we introduce batch operations for tasks with high setup costs, like querying a database. To expand the range of tools accessible to us, we cover Spark's methods for interacting with external programs such as scripts written in R. 

Throughout this chapter we build an example using ham radio operators' call logs as the input. These logs, at a minimum, include the call signs of the stations contacted. Call signs are assigned by country, and each country has its own range of call signs, so we can look up the countries involved. Some call logs also include the physical location of the operators, which we can use to determine the distance involved. A sample log entry is shown in the example below. The book's sample repo includes a list of call signs to look up the call logs for and process the results. 
- Example 6-1. Sample call log entry in JSON, with some fields removed 
  1. {"address":"address here""band":"40m","callsign":"KK6JLK","city":"SUNNYVALE",  
  2. "contactlat":"37.384733","contactlong":"-122.032164",  
  3. "county":"Santa Clara","dxcc":"291","fullname":"MATTHEW McPherrin",  
  4. "id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}  
The first set of Spark features we'll look at are shared variables, which are a special type of variable you can use in Spark tasks. In our example we use Spark's shared variables to count nonfatal error conditions and distribute a large lookup table. When our task involves a large setup time, such as creating a database connection or random-number generator, it is useful to share this setup work across multiple data items. Using a remote call sign lookup database, we examine how to reuse setup work by operating on a per-partition basis. 

In addition to the languages directly supported by Spark, the system can call into programs written in other languages. This chapter introduces how to use Spark's language-agnostic pipe() method to interact with other programs through standard input and output. We will use the pipe() method to access an R library for computing the distance of a ham radio operator's contacts. Finally, similar to its tools for working with key/value pairs, Spark has methods for working with numeric data. We demonstrate these methods by removing outliers from the distances computed with our ham radio call logs. 

Accumulators 
When we normally pass functions to Spark, such as a map() function or a condition for filter(), they can use variables defined outside them in the driver program, but each task running on the cluster gets a new copy of each variable, and updates from these copies are not propagated back to the driver. Spark's shared variables, accumulators and broadcast variables, relax this restriction for two common types of communication patterns: aggregation of results and broadcasts. 

Our first type of shared variable, accumulators, provides a simple syntax for aggregating values from worker nodes back to the driver program. One of the most common uses of accumulators is to count events that occur during job execution for debugging purposes. For example, say that we are loading a list of all of the call signs for which we want to retrieve logs from a file, but we are also interested in how many lines of the input file were blank (perhaps we do not expect to see many such lines in valid input). The example below demonstrates this scenario: 
>>> file = sc.textFile('/root/datas/foo.txt')
>>> blankLines = sc.accumulator(0)
>>> def extractCallSigns(line):
...     global blankLines
...     if (line == ''):
...         blankLines += 1
...     return line.split(" ")
...
>>> callSigns = file.flatMap(extractCallSigns)
>>> print "Blank lines: %d" % blankLines.value
Blank lines: 0
>>> callSigns.collect()
[u'This', u'is', u'for', u'testing', u'', u'This', u'is', u'for', u'testing', u'again', u'', u'Test', u'again']
>>> print "Blank lines: %d" % blankLines.value
Blank lines: 2

In these examples, we create an Accumulator[Int] called blankLines, and then add 1 to it whenever we see a blank line in the input. After evaluating the transformation, we print the value of the counter. Note that we will see the right count only after we run the collect() action, because the transformation above it, flatMap(), is lazy, so the side-effect incrementing of the accumulator happens only when the lazy flatMap() transformation is forced to occur by the collect() action. Of course, it is possible to aggregate values from an entire RDD back to the driver program using actions like reduce(), but sometimes we need a simple way to aggregate values that, in the process of transforming an RDD, are generated at a different scale or granularity than that of the RDD itself. In the previous example, accumulators let us count errors as we load the data, without doing a separate filter() or reduce(). 

To summarize, accumulators work as follows: 
* We create them in the driver by calling the SparkContext.accumulator(initialValue) method, which produces an accumulator holding an initial value. The return type is a pyspark.accumulators.Accumulator[T] object, where T is the type of initialValue.

* Worker code in Spark closures can add to the accumulator with its += method (or add() in Java).

* The driver program can call the value property on the accumulator to access its value (or call value() and setValue() in Java).

Note that tasks on worker nodes cannot access the accumulator's value - from the point of view of these tasks, accumulators are write-only variables. This allows accumulators to be implemented efficiently, without having to communicate every update. 

The type of counting shown here becomes especially handy when there are multiple values to keep track of, or when the same value needs to increase at multiple places in the parallel program (for example, you might be counting calls to a JSON parsing library throughout your program). For instance, we often expect some percentage of our data to be corrupted, or allow for the backend to fail some number of times. To prevent producing garbage output when there are too many errors, we can use a counter for valid records and a counter for invalid records. The value of our accumulators is available only in the driver program, so that is where we place our checks. 

Continuing from our last example, we can now validate the call signs and write the output only if most of the input is valid. The ham radio call sign format is specified and verified with the regular expression shown in the example below: 
- Example 6-5. Accumulator error count in Python 
import re

# Create Accumulators for validating call signs
validSignCount = sc.accumulator(0)
invalidSignCount = sc.accumulator(0)

def validateSign(sign):
    global validSignCount, invalidSignCount
    if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):
        validSignCount += 1
        return True
    else:
        invalidSignCount += 1
        return False

# Count the number of times we contacted each call sign
validSigns = callSigns.filter(validateSign)
contactCount = validSigns.map(lambda sign: (sign, 1)).reduceByKey(lambda x, y: x + y)

# Force evaluation so the counters are populated
contactCount.count()
if invalidSignCount.value < 0.1 * validSignCount.value:
    contactCount.saveAsTextFile(outputDir + "/contactCount")
else:
    print "Too many errors: %d, %d valid" % (invalidSignCount.value, validSignCount.value)
Accumulators and Fault Tolerance 
Spark automatically deals with failed or slow machines by re-executing failed or slow tasks. For example, if the node running a partition of a map() operation crashes, Spark will rerun it on another node; and even if the node does not crash but is simply much slower than other nodes, Spark can preemptively launch a "speculative" copy of the task on another node, and take its result if that finishes. Even if no nodes fail, Spark may have to rerun a task to rebuild a cached value that falls out of memory. The net result is therefore that the same function may run multiple times on the same data depending on what happens on the cluster. 

How does this interact with accumulators? The end result is that for accumulators used in actions, Spark applies each task's update to each accumulator only once. Thus, if we want a reliable absolute value counter, regardless of failures or multiple evaluations, we must put it inside an action like foreach(), as sketched below. 
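As a minimal sketch (reusing the file RDD from the earlier shell session), moving the accumulator update into foreach() means each task's increment is applied exactly once, even if a task is rerun: 
blankLines = sc.accumulator(0)

def countBlank(line):
    global blankLines
    if line == '':
        blankLines += 1

# foreach() is an action, so evaluation is forced here and each
# task's accumulator update is counted exactly once by the driver
file.foreach(countBlank)
print "Blank lines: %d" % blankLines.value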

For accumulators used in RDD transformations instead of actions, this guarantee does not exist. An accumulator update within a transformation can occur more than once. One such case of a probably unintended multiple update occurs when a cached but infrequently used RDD is first evicted from the LRU cache and is then subsequently needed. This forces the RDD to be recalculated from its lineage, with the unintended side effect that calls to update an accumulator within the transformations in that lineage are sent again to the driver. Within transformations, accumulators should, consequently, be used only for debugging purposes. 

While future versions of Spark may change this behavior to count the update only once, the current version (1.3.0) does have the multiple-update behavior, so accumulators in transformations are recommended only for debugging purposes. 

Custom Accumulators 
So far we've seen how to use one of Spark's built-in accumulator types: integers (Accumulator[Int]) with addition. Out of the box, Spark supports accumulators of type Double, Long, and Float. In addition to these, Spark also includes an API to define custom accumulator types and custom aggregation operations (e.g., finding the maximum of the accumulated values instead of adding them). Custom accumulators need to extend AccumulatorParam, which is covered in the Spark API documentation. Beyond adding to a numeric value, we can use any operation for add, provided that operation is commutative and associative. For example, instead of adding to track the total, we could keep track of the maximum value seen so far, as sketched below. 
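For instance, here is a minimal sketch (not from the book) of a max-tracking accumulator in Python, using PySpark's AccumulatorParam with its zero() and addInPlace() methods: 
from pyspark.accumulators import AccumulatorParam

class MaxAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        # starting value for the per-task copies of the accumulator
        return initialValue
    def addInPlace(self, v1, v2):
        # merge two values by keeping the maximum (commutative and associative)
        return max(v1, v2)

maxSeen = sc.accumulator(0, MaxAccumulatorParam())
sc.parallelize([3, 7, 2, 9, 4]).foreach(lambda x: maxSeen.add(x))
print maxSeen.value  # 9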

Broadcast Variables 
Spark's second type of shared variable, broadcast variables, allows the program to efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations. They come in handy, for example, if your application needs to send a large, read-only lookup table to all the nodes, or even a large feature vector in a machine learning algorithm. 

Recall that Spark automatically sends all variables referenced in your closures to the worker nodes. While this is convenient, it can also be inefficient because (1) the default task launching mechanism is optimized for small task sizes, and (2) you might, in fact, use the same variable in multiple parallel operations, but Spark will send it separately for each operation. As an example, say that we wanted to write a Spark program that looks up countries by their call signs by prefix matching in an array. This is useful for ham radio call signs, since each country gets its own prefix, although the prefixes are not uniform in length. If we wrote this naively in Spark, the code might look like the example below: 
- Example 6-6. Country lookup in Python 
# Look up the locations of the call signs on the
# RDD contactCounts. We load a list of call sign
# prefixes to country code to support this lookup.
signPrefixes = loadCallSignTable()

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts
                        .map(lambda sign_count: processSignCount(sign_count, signPrefixes))
                        .reduceByKey(lambda x, y: x + y))
This program would run, but if we had a large table (say, with IP addresses instead of call signs), the signPrefixes could easily be several megabytes in size, making it expensive to send that Array from the master alongside each task. In addition, if we used the same signPrefixes object later (maybe we next ran the same code on file2.txt), it would be sent again to each node. 

We can fix this by making signPrefixes a broadcast variable. A broadcast variable is simply an object of type spark.broadcast.Broadcast[T], which wraps a value of type T. We can access this value by calling value on the Broadcast object in our tasks. The value is sent to each node only once, using an efficient, BitTorrent-like communication mechanism. 

Using broadcast variables, our previous example looks like the example below: 
- Example 6-7. Country lookup with Broadcast variables in Python 
# Look up the locations of the call signs on the
# RDD contactCounts. We load a list of call sign
# prefixes to country code to support this lookup.
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts
                        .map(lambda sign_count: processSignCount(sign_count, signPrefixes))
                        .reduceByKey(lambda x, y: x + y))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
As shown in the example above, the process of using broadcast variables is simple: 
1. Create a Broadcast[T] by calling SparkContext.broadcast on an object of type T. Any type works as long as it is also Serializable.
2. Access its value with the value property (or value() method in Java).
3. The variable will be sent to each node only once, and should be treated as read-only (updates will not be propagated to other nodes).

The easiest way to satisfy the read-only requirement is to broadcast a primitive value or a reference to an immutable object. In such cases, you won't be able to change the value of the broadcast variable except within the driver code. However, sometimes it can be more convenient or more efficient to broadcast a mutable object. If you do that, it is up to you to maintain the read-only condition. As we did with our call sign prefix table of Array[String], we must make sure that the code we run on our worker nodes does not try to do something like val theArray = broadcastArray.value; theArray(0) = newValue. When run on a worker node, that line will assign newValue to the first array element only in the copy of the array local to the worker node running the code; it will not change the contents of broadcastArray.value on any of the other worker nodes. 
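As a small, self-contained sketch of the same pattern (with a made-up prefix table rather than the book's loadCallSignTable()), worker code only ever reads the broadcast value through value: 
# Hypothetical prefix-to-country table, sent to each worker only once
prefixToCountry = sc.broadcast({"KK": "USA", "VE": "Canada", "JA": "Japan"})

def countryFor(sign):
    # read-only access on the workers via .value; never mutate the dict here
    return prefixToCountry.value.get(sign[:2], "Unknown")

signs = sc.parallelize(["KK6JLK", "VE3ABC", "JA1XYZ", "ZZ9ZZZ"])
print signs.map(countryFor).collect()  # ['USA', 'Canada', 'Japan', 'Unknown']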

Optimizing Broadcasts 
When we are broadcasting large values, it is important to choose a data serialization format that is both fast and compact, because the time to send the value over the network can quickly become a bottleneck if it takes a long time to either serialize a value or to send the serialized value over the network. In particular, Java Serialization, the default serialization library used in Spark's Scala and Java APIs, can be very inefficient out of the box for anything except arrays of primitive types. You can optimize serialization by selecting a different serialization library using the spark.serializer property (see Spark Configuration - Compression & Serialization; Chapter 8 will describe how to use Kryo, a faster serialization library), or by implementing your own serialization routines for your data type (e.g., using the java.io.Externalizable interface for Java Serialization, or using the reduce() method to define custom serialization for Python's pickle library). 
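For example, a sketch of selecting Kryo through the spark.serializer property when constructing the context in a standalone application (this controls JVM-side serialization; PySpark still pickles Python objects regardless): 
from pyspark import SparkConf, SparkContext

# Use Kryo for JVM-side serialization of shuffled and broadcast data;
# the class name below is Spark's built-in Kryo serializer.
conf = (SparkConf()
        .setAppName("BroadcastExample")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
sc = SparkContext(conf=conf)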

Working on a Per-Partition Basis 
Working with data on a per-partition basis allows us to avoid redoing setup work for each data item. Operations like opening a database connection or creating a random-number generator are examples of setup steps that we wish to avoid doing for each element. Spark has per-partition versions of map and foreach to help reduce the cost of these operations by letting you run code only once for each partition of an RDD. Going back to our example with call signs, there is an online database of ham radio call signs we can query for a public list of their logged contacts. By using partition-based operations, we can share a connection pool to this database to avoid setting up many connections, and reuse our JSON parser. As the example below shows, we use the mapPartitions() function, which gives us an iterator of the elements in each partition of the input RDD and expects us to return an iterator of our results. 
- Example 6-10. Shared connection pool in Python 
import urllib3
import json

def processCallSigns(signs):
    """Lookup call signs using a connection pool"""
    # Create a connection pool
    http = urllib3.PoolManager()
    # the URL associated with each call sign record
    urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs)
    # create the requests (non-blocking)
    requests = map(lambda x: (x, http.request('GET', x)), urls)
    # fetch the results
    result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
    # remove any empty results and return
    return filter(lambda x: x[1] is not None, result)

def fetchCallSigns(input):
    """Fetch call signs"""
    return input.mapPartitions(lambda callSigns: processCallSigns(callSigns))

contactsContactList = fetchCallSigns(validSigns)
When operating on a per-partition basis, Spark gives our function an Iterator of the elements in that partition. To return values, we return an Iterable. In addition to mapPartitions(), Spark has a number of other per-partition operations, listed in below table: 
mapPartitions(f, preservesPartitioning=False): Return a new RDD by applying a function to each partition of this RDD. 
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]

mapPartitionsWithIndex(f, preservesPartitioning=False): Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition. 
>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(splitIndex, iterator): print("Split index=%d" % splitIndex); yield splitIndex
>>> rdd.mapPartitionsWithIndex(f).collect()
Split index=3
Split index=1
Split index=2
Split index=0
[0, 1, 2, 3]

foreachPartition(f): Applies a function to each partition of this RDD. 
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator):
...     sum = 0
...     for x in iterator:
...         sum = sum + x
...         print("x = %d; sum=%d" % (x, sum))
...
>>> rdd.foreachPartition(f)
x = 1; sum=1
x = 2; sum=3
x = 3; sum=3
x = 4; sum=7

In addition to avoiding setup work, we can sometimes use mapPartitions() to avoid object creation overhead. Sometimes we need to make an object for aggregating the result that is of a different type. Thinking back to Ch3, where we computed the average, one of the ways we did this was by converting our RDD of numbers to an RDD of tuples so we could track the number of elements processed in our reduce step. Instead of doing this for each element, we can instead create the tuple once per partition, as shown in the examples below: 
- Example 6-13. Average without mapPartitions() in Python 
def combineCtrs(c1, c2):
    return (c1[0] + c2[0], c1[1] + c2[1])

def basicAvg(nums):
    """ Compute the average """
    sumCount = nums.map(lambda num: (num, 1)).reduce(combineCtrs)
    return sumCount[0] / float(sumCount[1])
- Example 6-14. Average with mapPartitions() in Python 
def partitionCtr(nums):
    """ Compute sumCounter for partition """
    sumCount = [0, 0]
    for num in nums:
        sumCount[0] += num
        sumCount[1] += 1
    return [sumCount]

def fastAvg(nums):
    """ Compute the avg """
    sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)
    return sumCount[0] / float(sumCount[1])
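A quick sanity check of both versions in the shell (with hypothetical numbers) gives the same result: 
nums = sc.parallelize([1, 2, 3, 4, 5], 2)
print basicAvg(nums)  # 3.0
print fastAvg(nums)   # 3.0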
Piping to External Programs 
With three language bindings to choose from out of the box, you may have all the options you need for writing Spark applications. However, if none of Scala, Java, or Python does what you need, Spark provides a general mechanism to pipe data to programs in other languages, like R scripts. Spark provides a pipe() method on RDDs; pipe() lets us write parts of jobs using any language we want, as long as it can read and write to Unix standard streams. With pipe(), you can write a transformation of an RDD that reads each RDD element from standard input as a String, manipulates that String however you like, and then writes the result(s) as Strings to standard output. This interface and programming model is restrictive and limited, but sometimes it's just what you need to do something like make use of a native code function within a map or filter operation. 

Most likely, you'd want to pipe an RDD's content through some external program or script because you've already got complicated software built and tested that you'd like to reuse with Spark. A lot of data scientists have code in R, and we can interact with R programs using pipe(). In the example below, we use a Python script, wc_map.py, as the external program to calculate word counts for demonstration. 
- wc_map.py 
#!/usr/bin/env python
import sys

case_sensitive = True
if len(sys.argv) > 1:
    if int(sys.argv[1]) == 0:
        case_sensitive = False

# Input comes from STDIN
for line in sys.stdin:
    # Remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        if case_sensitive:
            print "%s\t%s" % (word, 1)
        else:
            print "%s\t%s" % (word.lower(), 1)
Then we can use this Python script to transform the text into (word, 1) tuples for further operations, as in the example below: 
- Example 6-16. Driver program using pipe() to calculate word count in Python 
>>> rdd = sc.parallelize(['This is for test', 'testing is again for future', 'for fun only'])
>>> rdd.collect()
['This is for test', 'testing is again for future', 'for fun only']
>>> mapScript = '/root/datas/wc_map.py'
>>> mapScriptName = 'wc_map.py'
>>> sc.addFile(mapScript)
>>> from pyspark import SparkFiles
>>> wc_tup = rdd.pipe(SparkFiles.get(mapScriptName))
>>> wc_tup.collect()
[u'This\t1', u'is\t1', u'for\t1', u'test\t1', u'testing\t1', u'is\t1', u'again\t1', u'for\t1', u'future\t1', u'for\t1', u'fun\t1', u'only\t1']
>>> def f(s):
...     t = s.split('\t')
...     return (t[0], t[1])
...
>>> wc = wc_tup.map(f)
>>> wc.collect()
[(u'This', u'1'), (u'is', u'1'), (u'for', u'1'), (u'test', u'1'), (u'testing', u'1'), (u'is', u'1'), (u'again', u'1'), (u'for', u'1'), (u'future', u'1'), (u'for', u'1'), (u'fun', u'1'), (u'only', u'1')]
>>> result = wc.reduceByKey(lambda x, y: int(x) + int(y))  # Reduce step in MapReduce
>>> result.collect()
[(u'This', u'1'), (u'test', u'1'), (u'fun', u'1'), (u'again', u'1'), (u'is', 2), (u'only', u'1'), (u'for', 3), (u'future', u'1'), (u'testing', u'1')]

With SparkContext.addFile(path), we can build up a list of files for each of the worker nodes to download with a Spark job. These files can come from the driver's local filesystem (as we did in the example above), from HDFS or other Hadoop-supported filesystems, or from an HTTP, HTTPS, or FTP URL. When an action is run in the job, the files will be downloaded by each of the nodes. The files can then be found on the worker nodes in SparkFiles.getRootDirectory(), or located with SparkFiles.get(filename). Of course, this is only one way to make sure that pipe() can find a script on each worker node. You could use another remote copying tool to place the script file in a knowable location on each node. 
Note. 
All the files added with SparkContext.addFile(path) are stored in the same directory, so it's important to use unique names.

Once the script is available, the pipe() method on RDDs makes it easy to pipe the elements of an RDD through the script. If you want to pass arguments to the script, either of the approaches below will do the job, although the first is preferred: 
* rdd.pipe(Seq(SparkFiles.get("wc_map.py"), ","))
* rdd.pipe(SparkFiles.get("wc_map.py") + " ,")


In the first option, we are passing the command invocation as a sequence of positional arguments (with the command itself at the zero-offset position); in the second, we're passing it as a single command string that Spark will then break down into positional arguments. We can also specify shell environment variables with pipe() if we desire. Simply pass in a map of environment variables to values as the second parameter to pipe(), and Spark will set those values, as sketched below.
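For instance, a sketch continuing the word-count shell session: we pass the script's optional case-sensitivity argument ("0" for lowercase) and set an illustrative environment variable (wc_map.py itself does not read any environment variables): 
# The second parameter of pipe() is a map of environment variables
# for the child process; WC_DEBUG is purely illustrative here.
wc_lower = rdd.pipe(SparkFiles.get('wc_map.py') + " 0", {"WC_DEBUG": "1"})
print wc_lower.collect()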

You should now at least have an understanding of how to use pipe() to process the elements of an RDD through an external command, and of how to distribute such command scripts to the cluster in a way that the worker nodes can find them. 

Numeric RDD Operations 
Spark provides several descriptive statistics operations on RDDs containing numeric data. These are in addition to the more complex statistical and machine learning methods we will describe later in Ch11. Spark's numeric operations are implemented with a streaming algorithm that allows for building up our model one element at a time. The descriptive statistics are all computed in a single pass over the data and returned as a StatsCounter object by calling stats(). The table below lists the methods available on the StatsCounter object: 
count(): Number of elements in the RDD 
mean(): Average of the elements 
sum(): Total 
max(): Maximum value 
min(): Minimum value 
variance(): Variance of the elements 
sampleVariance(): Variance of the elements, computed for a sample 
stdev(): Standard deviation 
sampleStdev(): Sample standard deviation 

If you want to compute only one of these statistics, you can also call the corresponding method directly on an RDD - for example, rdd.mean() or rdd.sum(). In the example below, we use summary statistics to remove some outliers from our data. Since we will be going over the same RDD twice (once to compute the summary statistics and once to remove the outliers), we may wish to cache the RDD. Going back to our call log example, we can remove the contact points from our call log that are too far away. 
- Example 6-19. Removing outliers in Python 
>>> rdd = sc.parallelize([1, 2, 2, 2, 5, 6, 4, 2, 3, 100, 50, 7, 8])
>>> stats = rdd.stats()
>>> stats.stdev()
27.568526776725903
>>> stats.mean()
14.76923076923077
>>> import math
>>> removeOutlier = rdd.filter(lambda x: math.fabs(x - stats.mean()) < 3 * stats.stdev())
>>> removeOutlier.collect()
[1, 2, 2, 2, 5, 6, 4, 2, 3, 50, 7, 8]  # 100 is an outlier and has been removed

Conclusion 
In this chapter, you have been introduced to some of the more advanced Spark programming features that you can use to make your programs more efficient or expressive. Subsequent chapters cover deploying and tuning Spark applications, as well as built-in libraries for SQL, streaming, and machine learning. We'll also start seeing more complex and more complete sample applications that make use of much of the functionality described so far, and that should help guide and inspire your own usage of Spark. 

Supplement 
Writing an Hadoop MapReduce Program in Python
