Thursday, December 11, 2014

[ In Action ] Ch4. Writing basic MapReduce programs (Part2)

Streaming in Hadoop (P80)
We have been using Java to write all our Hadoop programs. Hadoop supports other languages via a generic API called Streaming. In practice, Streaming is most useful for writing simple, short MapReduce programs that are more rapidly developed in a scripting language that can take advantage of non-Java libraries.

Hadoop Streaming interacts with programs using the Unix streaming paradigm. Inputs come in through STDIN and outputs go to STDOUT. Data has to be text based, and each line is considered a record. Note that this is exactly how many Unix commands work, and Hadoop Streaming enables those commands to be used as mappers and reducers. If you're familiar with using Unix commands such as wc, cut, or uniq for data processing, you can apply them to large data sets using Hadoop Streaming.

The overall data flow in Hadoop Streaming is like a pipe where data streams through the mapper, the output of which is sorted and streamed through the reducer. In pseudo-code using Unix’s command line notation, it’s:
cat [input_file] | [mapper] | sort | [reducer] > [output_file]
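For instance, the first job in the next section can be prototyped entirely on your local machine with an ordinary pipeline (a sketch, assuming a local copy of the citation file and an illustrative output name):
cat cite_100000.txt | cut -f 2 -d , | sort | uniq > cited_patents.txt
Testing the mapper and reducer this way on a small local file is a quick sanity check before submitting the real Streaming job.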

The following examples will illustrate how to use Streaming with Unix commands.

Streaming with Unix commands
In the first example, let's get a list of cited patents from the citation data set cite75_99.txt (the input here, IA/cite_100000.txt, is a smaller sample of that file).
$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input IA/cite_100000.txt \
-output output \
-mapper 'cut -f 2 -d ,' \
-reducer 'uniq'

That’s it! It’s a one-line command. Let’s see what each part of the command does.

The first part of the command invokes the Streaming jar, and the -input and -output arguments specify the corresponding input file and output directory. For the mapper we use the Unix cut command to extract the second column, where columns are separated by commas. In the citation data set this column is the patent number of a cited patent. These patent numbers are then sorted and passed to the reducer. The uniq command at the reducer removes all duplicates in the sorted data. The output of this command is:
"CITED"
1
10000
100000
1000006
...
999977
999978
999983

After getting the list of cited patents, we may want to know how many there are. We can use Streaming to quickly get a count via the Unix command wc -l.
$ hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \
-D mapreduce.job.reduces=0 \
-input output \
-output output_a \
-mapper 'wc -l'

Here we use wc -l as the mapper to count the number of records in each split. Hadoop Streaming (since version 0.19.0) supports the GenericOptionsParser, and the -D argument is used for specifying configuration properties. We want the mapper to directly output the record count without any reducer, so we set mapreduce.job.reduces (see mapred-default.xml) to 0 and don't specify the -reducer option at all.

Streaming with scripts
We can use any executable script that processes a line-oriented data stream from STDIN and outputs to STDOUT with Hadoop Streaming. For example, the Python script in listing 4.4 randomly samples data from STDIN. For each line, we choose a random integer between 1 and 100 and check against the user-given argument (sys.argv[1]). The comparison determines whether to pass that line on to the output or ignore it. You can use the script in Unix to uniformly sample a line-oriented data file, for example:
cat input.txt | RandomSample.py 10 > sampled_output.txt

The preceding command calls the Python script with an argument of 10; sampled_output.txt will have (approximately) 10 percent of the records in input.txt. We can in fact specify any integer between 1 and 100 to get the corresponding percentage of data in the output.
- Listing 4.4 RandomSample.py: a Python script printing random lines from STDIN
  1. #!/usr/bin/python  
  2. import sys, random  
  3.   
  4. for line in sys.stdin:  
  5.     if (random.randint(1,100) <= int(sys.argv[1])):  
  6.         print line.strip()  
We can apply the same script in Hadoop to get a smaller sample of a data set. A sampled data set is often useful for development purposes, as you can run your Hadoop program on the sampled data in standalone or pseudo-distributed mode to quickly debug and iterate. Also, when you’re looking for some “descriptive” information about your data, the speed and convenience in processing a smaller data set generally outweigh any loss of precision. Finding data clusters is one example of such descriptive information. Optimized implementations of a variety of clustering algorithms are readily available in R, MATLAB, and other packages. It makes a lot more sense to sample down the data and apply some standard software package, instead of trying to process all data using some distributed clustering algorithms in Hadoop.

Running RandomSample.py using Streaming is like running Unix commands using Streaming, the difference being that Unix commands are already available on all nodes in the cluster, whereas RandomSample.py is not. Hadoop Streaming supports a -file option to package your executable file as part of the job submission. Our command to execute RandomSample.py is:
- runRandomSample.sh
  1. #!/bin/sh  
  2. hadoop fs -rm -r output  
  3. hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \  
  4. -D mapred.reduce.tasks=1 \  
  5. -input IA/cite_100000.txt \  
  6. -output output \  
  7. -mapper 'RandomSample.py 10' \  
  8. -file RandomSample.py  
In specifying the mapper to be 'RandomSample.py 10' we're sampling at 10 percent. Note that we've set the number of reducers (mapred.reduce.tasks) to 1. As we haven't specified any particular reducer, it will use the default IdentityReducer. As its name implies, IdentityReducer passes its input straight through to the output. In this case we can set the number of reducers to any nonzero value to control the exact number of output files. Alternatively, we can set the number of reducers to 0 and let the number of output files be the number of mappers. This is probably not ideal for the sampling task, as each mapper's output is only a small fraction of the input, and we may end up with a number of small files. We can easily correct that later using the HDFS shell command getmerge or other file manipulations to arrive at the right number of output files; which approach to use is more or less a personal preference.
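For example, the per-mapper part-* files in the output directory can be pulled down and concatenated into a single local file (the local filename here is only illustrative):
hadoop fs -getmerge output sampled_output.txt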

The random sampling scripts don’t require any custom reducer, but you can’t always write a Streaming program like that. As you’ll use Streaming quite often in practice, let’s see another exercise. This time we create a custom reducer.

Suppose we're interested in finding the largest number of claims in a single patent. In the patent description data set, the number of claims for a given patent is in the ninth column. Our task is to find the maximum value in the ninth column of the patent description data.

Under Streaming, each mapper sees the entire stream of data, and it's the mapper that takes on the responsibility of breaking the stream into (line-oriented) records. In the standard Java model, the framework itself breaks input data into records and gives the map() method only one record at a time. The Streaming model makes it easy to keep state information across records in a split, which we take advantage of in computing the maximum. The standard Java model, too, can keep track of state across records in a split, but it's more involved. We cover that in the next chapter.

In creating a Hadoop program for computing maximum, we take advantage of the distributive property of maximum. Given a data set divided into many splits, the global maximum is the maximum over the maxima of the splits. That sounded like a mouthful, but a simple example will make it clear. If we have four records X1, X2, X3, and X4, and they’re divided into two splits (X1, X2) and (X3, X4), we can find the maximum over all four records by looking at the maximum of each split, or:
max(X1,X2,X3,X4) = max(max(X1,X2), max(X3,X4))

Our strategy is to have each mapper calculate the maximum over its individual split. Each mapper will output a single value at the end. We have a single reducer that looks at all those values and outputs the global maximum. Listing 4.6 shows the Python script to compute the maximum over a split.
- Listing 4.6 AttributeMax.py: Python script to find maximum value of an attribute
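A minimal sketch of AttributeMax.py, consistent with how it's invoked below (the 0-based column index arrives as sys.argv[1], and the maximum over the split is printed once at the end); this illustrates the described behavior rather than reproducing the book's exact listing:

  #!/usr/bin/env python
  # Sketch of AttributeMax.py: print the maximum value found in one column
  # of a comma-separated stream; the column index is the first argument.
  import sys

  index = int(sys.argv[1])          # 0-based column to scan
  max_val = 0
  for line in sys.stdin:
      fields = line.strip().split(",")
      if fields[index].isdigit():   # skip the header row and missing values
          val = int(fields[index])
          if val > max_val:
              max_val = val
  print max_val                     # one value per split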

Given the parsimonious output of the mapper, we can use the default IdentityReducer to record the (sorted) output of the mappers.
- runAttributeMax.sh (1)
  1. #!/bin/sh  
  2. hadoop fs -rm -r output  
  3. hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \  
  4. -D mapred.reduce.tasks=1 \  
  5. -input IA/patn_100000.txt \  
  6. -output output \  
  7. -mapper 'AttributeMax.py 8' \  
  8. -file AttributeMax.py  
The mapper is 'AttributeMax.py 8'. It outputs the maximum of the ninth column in a split. The single reducer collects all the mapper outputs. Given seven mappers, the final output of the above command is this:
0
260
306
348
394
706
868

We see that our mapper is doing the right thing. We can now use a reducer that outputs the maximum over the values produced by the mappers. We have an interesting situation here: due to the distributive property of maximum, we can also use AttributeMax.py as the reducer, only now it's finding the maximum of the first column (index 0) of the mapper output.
- runAttributeMax.sh (2)
  1. #!/bin/sh  
  2. hadoop fs -rm -r output  
  3. hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \  
  4. -D mapred.reduce.tasks=1 \  
  5. -input IA/patn_100000.txt \  
  6. -output output \  
  7. -mapper 'AttributeMax.py 8' \  
  8. -reducer 'AttributeMax.py 0' \  
  9. -file AttributeMax.py  
The output of the above command should be a one-line file, and you’ll find the maximum number of claims in a patent to be 868.

Streaming with key/value pairs
At this point you may wonder what happened to the key/value pair way of encoding records. Our discussion on Streaming so far talks about each record as an atomic unit rather than as composed of a key and a value. The truth is that Streaming works on key/value pairs just like the standard Java MapReduce model. By default, Streaming uses the tab character to separate the key from the value in a record. When there’s no tab character, the entire record is considered the key and the value is empty text. For our data sets, which have no tab character, this provides the illusion that we’re processing each individual record as a whole unit. Furthermore, even if the records do have tab characters in them, the Streaming API will only shuffle and sort the records in a different order. As long as our mapper and reducer work in a record-oriented way, we can maintain the record-oriented illusion.
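To make the key/value convention concrete, the split your own Streaming code performs on each line it reads from STDIN looks roughly like this (an illustrative Python snippet with made-up values, not framework code; listing 4.8 later does essentially the same thing):

  # Text before the first tab is the key; the rest of the line is the value.
  line = "NL\t11"                              # hypothetical record: key "NL", value "11"
  parts = line.split("\t", 1)
  key = parts[0]
  value = parts[1] if len(parts) > 1 else ""   # no tab: whole line is the key, value is empty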

Working with key/value pairs allows us to take advantage of the key-based shuffling and sorting to create interesting data analyses. Let’s make this exercise more interesting by computing the average rather than finding the maximum. First, let’s examine how key/value pairs work in the Streaming API for each step of the MapReduce data flow.
1. As we’ve seen, the mapper under Streaming reads a split through STDIN and extracts each line as a record. Your mapper can choose to interpret each input record as a key/value pair or a line of text.

2. The Streaming API will interpret each line of your mapper's output as a key/value pair separated by tab. Similar to the standard MapReduce model, we apply the partitioner to the key to find the right reducer to shuffle the record to. All key/value pairs with the same key will end up at the same reducer.

3. At each reducer, key/value pairs are sorted according to the key by the Streaming API. Recall that in the Java model, all key/value pairs of the same key are grouped together into one key and a list of values. This group is then presented to the reduce() method. Under the Streaming API your reducer is responsible for performing the grouping. This is not too bad as the key/value pairs are already sorted by key. All records of the same key are in one contiguous chunk. Your reducer will read one line at a time from STDIN and will keep track of the new keys.

4. For all practical purposes, the output (STDOUT) of your reducer is written to a file directly. Technically, a no-op step is taken before the file write: the Streaming API breaks each line of the reducer's output at the tab character and feeds the key/value pair to the default TextOutputFormat, which by default re-inserts the tab character before writing the result to a file. Without tab characters in the reducer's output, the behavior is the same no-op. You can reconfigure the default behavior to do something different, but it makes sense to leave it as a no-op and push the processing into your reducer.

To understand the data flow better, let's write a Streaming program to compute the average number of claims for each country. The mapper will extract the country and the claims count for each patent and package them as a key/value pair. In accordance with the default Streaming convention, the mapper outputs this key/value pair separated by a tab character. The Streaming API will pick up the key, and the shuffling will guarantee that all claim counts of a country end up at the same reducer. In the Python code in listing 4.7, the mapper extracts, for each record, the country (fields[4][1:-1]) as the key and the claims count (fields[8]) as the value. An extra concern with our data set is that missing values do exist, so we've added a conditional statement to skip over records with missing claim counts.
- Listing 4.7 AverageByAttributeMapper.py: output country and claim count of patents
  1. #!/usr/bin/python  
  2. import sys  
  3.   
  4. for line in sys.stdin:  
  5.     fields = line.split(",")  
  6.     if (fields[8] and fields[8].isdigit()):  
  7.         print fields[4][1:-1] + "\t" + fields[8]  
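As a quick local smoke test (assuming a local copy of the input file and that the script has been made executable), you can run the mapper through a plain Unix pipe before involving the cluster:
cat patn_100000.txt | ./AverageByAttributeMapper.py | head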
Before writing the reducer, let's run the mapper in two situations: without any reducer, and with the default IdentityReducer. This is a useful approach for learning, as we can see exactly what's being output by the mapper (by using no reducer) and what's being fed into the reducer (by using IdentityReducer). You'll find this handy later when debugging your MapReduce programs: you can at least check whether the mapper is outputting the proper data and whether the proper data is being sent to the reducer. First let's run the mapper without any reducer.
- runAverageByAttributeMapper.sh
  1. #!/bin/sh  
  2. hadoop fs -rm -r output  
  3. hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \  
  4. -D mapreduce.job.reduces=0 \  
  5. -input IA/patn_100000.txt \  
  6. -output output \  
  7. -mapper 'AverageByAttributeMapper.py' \  
  8. -file AverageByAttributeMapper.py  
The output should consist of lines where a country code is followed by a tab followed by a numeric count. The order of the output records is not sorted by (the new) key. In fact, it’s in the same order as the order of the input records, although that’s not obvious from looking at the output.

The more interesting case is to use the IdentityReducer with a non-zero number of reducers. We see how the shuffled and sorted records are presented to the reducer. To keep it simple let’s try a single reducer by setting -D mapreduce.job.reduces=1 and see the first 32 records.


Under the Streaming API, the reducer sees this text data on STDIN. We have to code our reducer to recover the key/value pairs by breaking each line at the tab character. Sorting has “grouped” together records of the same key. As you read each line from STDIN, you're responsible for keeping track of the boundary between records of different keys. Note that although the keys are sorted, the values don't follow any particular order. Finally, the reducer must perform its stated computation, which in this case is calculating the average value for each key. Listing 4.8 gives the complete reducer in Python.
- Listing 4.8 AverageByAttributeReducer.py
  1. #!/usr/bin/python  
  2. import sys  
  3.   
  4. (last_key, sum, count) = (None, 0.0, 0)  
  5. for line in sys.stdin:  
  6.     (key, val) = line.split("\t")  
  7.     if last_key and last_key != key:  
  8.         print last_key + "\t" + str(sum / count)  
  9.         (sum, count) = (0.0, 0)  
  10.     last_key = key  
  11.     sum += float(val)  
  12.     count += 1  
  13. print last_key + "\t" + str(sum / count)  
The program keeps a running sum and count for each key. When it detects a new key in the input stream or the end of the file, it computes the average for the previous key and sends it to STDOUT. After running the entire MapReduce job, we can easily check the correctness of the first few results.
- runAverageByAttribute.sh
  1. #!/bin/sh  
  2. hadoop fs -rm -r output  
  3. hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \  
  4. -D mapreduce.job.reduces=1 \  
  5. -input IA/patn_100000.txt \  
  6. -output output \  
  7. -mapper 'AverageByAttributeMapper.py' \  
  8. -reducer 'AverageByAttributeReducer.py' \  
  9. -file AverageByAttributeMapper.py \  
  10. -file AverageByAttributeReducer.py  
Run the MapReduce job with the steps below:
$ ./runAverageByAttribute.sh # Run Streaming MapReduce
$ hadoop fs -ls output # Check output
...
... output/part-00000

$ hadoop fs -cat output/part-00000 # Check result
AR 5.1
AT 8.66666666667
AU 8.56
BB 6.0
...

Streaming with the Aggregate package
Hadoop includes a library package called Aggregate that simplifies obtaining aggregate statistics of a data set. This package can simplify the writing of Java statistics collectors, especially when used with Streaming, which is the focus of this section.

The Aggregate package under Streaming functions as a reducer that computes aggregate statistics. You only have to provide a mapper that processes records and sends out a specially formatted output. Each line of the mapper’s output looks like:
function:key\tvalue

The output string starts with the name of a value aggregator function (from the set of predefined functions available in the Aggregate package). A colon and a tab-separated key/value pair follows. The Aggregate reducer applies the function to the set of values for each key. For example, if the function is LongValueSum, then the output is the sum of values for each key. (As the function name implies, each value is treated as a Java long type.) If the function is LongValueMax, then the output is the maximum value for each key. You can see the list of aggregator functions supported in the Aggregate package in table 4.3.
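For example, revisiting the earlier claims exercise, a mapper along the lines of the sketch below would let the Aggregate reducer report the maximum claims count per country without any custom reducer (it reuses the column positions from listing 4.7; LongValueMax is one of the predefined aggregator names):

  #!/usr/bin/env python
  # Sketch: emit "LongValueMax:<country>\t<claims>" so the Aggregate reducer
  # keeps the largest claims count seen for each country key.
  import sys

  for line in sys.stdin:
      fields = line.split(",")
      if fields[8] and fields[8].isdigit():
          print "LongValueMax:" + fields[4][1:-1] + "\t" + fields[8]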


Let’s go through an exercise using the Aggregate package to see how easy it is. We want to count the number of patents granted each year. We can approach this problem in a way similar to the word counting example we saw in chapter 1. For each record, our mapper will output the grant year as the key and a “1” as the value. The reducer will sum up all the values (“1”s) to arrive at a count. Only now we’re using Streaming with the Aggregate package. Our result will be the simple mapper shown in listing 4.9.
- Listing 4.9 AttributeCount.py
  1. #!/usr/bin/python  
  2. import sys  
  3. index = int(sys.argv[1])  
  4. for line in sys.stdin:  
  5.     fields = line.split(",")  
  6.     print "LongValueSum:" + fields[index] + "\t" + "1"  
AttributeCount.py works for any CSV-formatted input file. The user only has to specify the column index to count the number of records for each attribute in that column. The print statement has the main “action” of this short program. It tells the Aggregate package to sum up all the values (of 1) for each key, defined as the user-specified column (index). To count the number of patents granted each year, we run this Streaming program with the Aggregate package, telling the mapper to use the second column (index = 1) of the input file as the attribute of interest.
- runAttributeCount.sh
  1. #!/bin/sh  
  2. hadoop fs -rm -r output  
  3. hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \  
  4. -input IA/patn_100000.txt \  
  5. -output output \  
  6. -mapper 'AttributeCount.py 1' \  
  7. -reducer aggregate \  
  8. -file AttributeCount.py  
You'll find most of the options for running this Streaming program familiar. The main thing to point out is that we've specified the reducer to be 'aggregate'. This is the signal to the Streaming API that we're using the Aggregate package. The output of the MapReduce job (after sorting) is:
"GYEAR" 1
1963    45679
1964    47375
1965    62857
...
1996    109645
1997    111983
1998    147519
1999    153486

As shown in figure 4.3, we can plot the data to visualize it better. You’ll see that it has a mostly steady upward trend.


Looking at the list of functions in the Aggregate package in table 4.3, you'll find that most of them are combinations of maximum, minimum, and sum for atomic data types. (For some reason DoubleValueMax and DoubleValueMin aren't supported; they would be trivial modifications of LongValueMax and LongValueMin.) UniqValueCount and ValueHistogram are slightly different, and we'll look at some examples of how to use them.

UniqValueCount gives the number of unique values for each key. For example, we may want to know whether more countries are participating in the U.S. patent system over time. We can examine this by looking at the number of countries with patents granted each year. We use a straightforward wrapper of UniqValueCount in listing 4.10 and apply it to the year and country columns of the patent description data, apat63_99.txt (column indexes 1 and 4, respectively).
- Listing 4.10 UniqueCount.py: a wrapper around the UniqValueCount function
  1. #!/usr/bin/env python  
  2. import sys  
  3. index1 = int(sys.argv[1])  
  4. index2 = int(sys.argv[2])  
  5. for line in sys.stdin:  
  6.     fields = line.split(",")  
  7.     print "UniqValueCount:" + fields[index1] + "\t" + fields[index2]  
- runUniqueCountx.sh
  1. #!/bin/sh  
  2. hadoop fs -rm -r output  
  3. hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \  
  4. -input IA/patn_100000.txt \  
  5. -output output \  
  6. -mapper 'UniqueCount.py 1 4' \  
  7. -reducer aggregate \  
  8. -file UniqueCount.py  
In the output we get one record for each year. Plotting it gives us figure 4.4. We can see that the increasing number of patents granted from 1960 to 1990 (figure 4.3) didn't come from more countries participating (figure 4.4); roughly the same set of countries simply filed more patents.


The aggregate function ValueHistogram is the most ambitious function in the Aggregate package. For each key, it outputs the following:
1. The number of unique values
2. The minimum count
3. The median count
4. The maximum count
5. The average count
6. The standard deviation

In its most general form, it expects the output of the mapper to have the form
ValueHistogram:key\tvalue\tcount

We specify the function ValueHistogram, followed by a colon, followed by a tab-separated key, value, and count triplet. The Aggregate reducer outputs the six statistics above for each key. Note that for everything except the first statistic (the number of unique values), the counts are summed over each key/value pair. Outputting two records from your mapper as:
ValueHistogram:key_a\tvalue_a\t10
ValueHistogram:key_a\tvalue_a\t20

is no different than outputting a single record with the sum:
ValueHistogram:key_a\tvalue_a\t30

A useful variation is for the mapper to output only the key and value, without the count and the tab character that goes with it. ValueHistogram automatically assumes a count of 1 in this case. Listing 4.11 shows a trivial wrapper around ValueHistogram.
- Listing 4.11 ValueHistogram.py: wrapper around Aggregate package’s ValueHistogram
  1. #!/usr/bin/env python  
  2. import sys  
  3. index1 = int(sys.argv[1])  
  4. index2 = int(sys.argv[2])  
  5. for line in sys.stdin:  
  6.     fields = line.split(",")  
  7.     print "ValueHistogram:" + fields[index1] + "\t" + fields[index2]  
We run the script below to find the distribution of countries with patents granted for each year.
- runValueHistogram.sh
  1. #!/bin/sh  
  2. hadoop fs -rm -r output  
  3. hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \  
  4. -input IA/patn_100000.txt \  
  5. -output output \  
  6. -mapper 'ValueHistogram.py 1 4' \  
  7. -reducer aggregate \  
  8. -file ValueHistogram.py  
The output is a tab-separated value (TSV) file with seven columns. The first column, the year the patent was granted, is the key. The other six columns are the six statistics ValueHistogram is set to compute. A partial view of the output is here:


The first column after the year is the number of unique values. This is exactly the same as the output of UniqValueCount. The second, third, and fourth columns are the minimum, median, and maximum counts, respectively.

We’ve seen how using the Aggregate package under Streaming is a simple way to get some popular metrics. It’s a great demonstration of Hadoop’s power in simplifying the analysis of large data sets.

Supplement
Ch4. Writing basic MapReduce programs (Part1)
Hadoop 2.6 FileSystem Shell
DevX.com - Introduction to Hadoop Streaming

