Streaming in Hadoop (P80)
We have been using Java to write all our Hadoop programs. Hadoop supports other languages via a generic API called Streaming. In practice, Streaming is most useful for writing simple, short MapReduce programs that are more rapidly developed in a scripting language that can take advantage of non-Java libraries.
Hadoop Streaming interacts with programs using the Unix streaming paradigm. Inputs come in through STDIN and outputs go to STDOUT. Data has to be text based, and each line is considered a record. Note that this is exactly how many Unix commands work, and Hadoop Streaming enables those commands to be used as mappers and reducers. If you're familiar with using Unix commands such as wc, cut, or uniq for data processing, you can apply them to large data sets using Hadoop Streaming.
The overall data flow in Hadoop Streaming is like a pipe where data streams through the mapper, the output of which is sorted and streamed through the reducer. In pseudo-code using Unix’s command line notation, it’s:
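- cat [input_file] | [mapper] | sort | [reducer] > [output_file]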
The following examples will illustrate how to use Streaming with Unix commands.
Streaming with Unix commands
In the first example, let’s get a list of cited patents in cite75_99.txt.
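Assuming the data has been uploaded to HDFS, and using the same streaming jar path as the scripts later in this post (adjust both the jar and input paths to your installation), the command looks something like this:
- hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar -input cite75_99.txt -output output -mapper 'cut -f 2 -d ,' -reducer 'uniq'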
That’s it! It’s a one-line command. Let’s see what each part of the command does.
The first part of the command and the -input and -output arguments specify that we're running a Streaming program with the corresponding input and output file/directory. For the mapper we use the Unix cut command to extract the second column, where columns are separated by commas; in the citation data set this column is the patent number of a cited patent. These patent numbers are then sorted and passed to the reducer. The uniq command at the reducer removes all duplicates in the sorted data, so the output is the sorted list of distinct cited patent numbers.
After getting the list of cited patents, we may want to know how many there are. We can use Streaming to quickly get a count via the Unix command wc -l.
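A sketch of the command, assuming the same jar path as before and that the input is the output directory of the previous job (the output directory name here is only an example):
- hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar -D mapreduce.job.reduces=0 -input output -output output_count -mapper 'wc -l'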
Here we use wc -l as the mapper to count the number of records in each split. Hadoop Streaming (since version 0.19.0) supports the GenericOptionsParser. The -D argument is used for specifying configuration properties. We want the mapper to directly output the record count without any reducer, so we set mapreduce.job.reduces (mapred-default.xml) to 0 and don't specify the -reducer option at all.
Streaming with scripts
We can use any executable script that processes a line-oriented data stream from STDIN and outputs to STDOUT with Hadoop Streaming. For example, the Python script in listing 4.4 randomly samples data from STDIN. For each line, we choose a random integer between 1 and 100 and check against the user-given argument (sys.argv[1]). The comparison determines whether to pass that line on to the output or ignore it. You can use the script in Unix to uniformly sample a line-oriented data file, for example:
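- cat input.txt | python RandomSample.py 10 > sampled_output.txt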
The preceding command calls the Python script with an argument of 10; sampled_output.txt will have (approximately) 10 percent of the records in input.txt. We can in fact specify any integer between 1 and 100 to get the corresponding percentage of data in the output.
- Listing 4.4 RandomSample.py: a Python script printing random lines from STDIN
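- #!/usr/bin/python
- # Emit each input line with probability sys.argv[1] percent.
- import sys, random
- for line in sys.stdin:
-     if random.randint(1, 100) <= int(sys.argv[1]):
-         print line.strip()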
We can apply the same script in Hadoop to get a smaller sample of a data set. A sampled data set is often useful for development purposes, as you can run your Hadoop program on the sampled data in standalone or pseudo-distributed mode to quickly debug and iterate. Also, when you’re looking for some “descriptive” information about your data, the speed and convenience in processing a smaller data set generally outweigh any loss of precision. Finding data clusters is one example of such descriptive information. Optimized implementations of a variety of clustering algorithms are readily available in R, MATLAB, and other packages. It makes a lot more sense to sample down the data and apply some standard software package, instead of trying to process all data using some distributed clustering algorithms in Hadoop.
Running RandomSample.py using Streaming is like running Unix commands using Streaming, the difference being that Unix commands are already available on all nodes in the cluster, whereas RandomSample.py is not. Hadoop Streaming supports a -file option to package your executable file as part of the job submission. Our command to execute RandomSample.py is:
- runRandomSample.sh
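- #!/bin/sh
- # Clear any previous output directory, then submit the Streaming job.
- hadoop fs -rm -r output
- hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \
-     -D mapred.reduce.tasks=1 \
-     -input IA/cite_100000.txt \
-     -output output \
-     -mapper 'RandomSample.py 10' \
-     -file RandomSample.py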
In specifying the mapper to be 'RandomSample.py 10' we're sampling at 10 percent. Note that we've set the number of reducers (mapred.reduce.tasks) to 1. As we haven't specified any particular reducer, it will use the default IdentityReducer. As its name implies, IdentityReducer passes its input straight to output. In this case we can set the number of reducers to any non-zero value to get an exact number of output files. Alternatively, we can set the number of reducers to 0, and let the number of output files be the number of mappers. This is probably not ideal for the sampling task, as each mapper's output is only a small fraction of the input, and we may end up with a number of small files. We can easily correct that later using the HDFS shell command getmerge or other file manipulations to arrive at the right number of output files. The approach to use is more or less a personal preference.
The random sampling scripts don’t require any custom reducer, but you can’t always write a Streaming program like that. As you’ll use Streaming quite often in practice, let’s see another exercise. This time we create a custom reducer.
Suppose we're interested in finding the largest number of claims in any single patent. In the patent description data set, the number of claims for a given patent is in the ninth column. Our task is to find the maximum value in the ninth column of the patent description data.
Under Streaming, each mapper sees the entire stream of data, and it’s the mapper that takes on the responsibility of breaking the stream into (line-oriented) records. In the standard Java model, the framework itself breaks input data into records, and gives the map() method only one record at a time. The Streaming model makes it easy to keep state information across records in a split, which we take advantage of in computing the maximum. The standard Java model, too, can keep track of state across records in a split, but it’s more involved. We cover that in the next chapter.
In creating a Hadoop program for computing maximum, we take advantage of the distributive property of maximum. Given a data set divided into many splits, the global maximum is the maximum over the maxima of the splits. That sounded like a mouthful, but a simple example will make it clear. If we have four records X1, X2, X3, and X4, and they’re divided into two splits (X1, X2) and (X3, X4), we can find the maximum over all four records by looking at the maximum of each split, or:
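max(X1, X2, X3, X4) = max(max(X1, X2), max(X3, X4))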
Our strategy is to have each mapper calculate the maximum over its own split. Each mapper will output a single value at the end. A single reducer then looks at all those values and outputs the global maximum. Listing 4.6 shows the Python script that computes the maximum over a split.
- Listing 4.6 AttributeMax.py: Python script to find maximum value of an attribute
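The listing itself isn't reproduced in this post; a minimal sketch of what AttributeMax.py needs to do, assuming the (0-based) column index is passed as the first argument the way the run scripts below call it, is:
- #!/usr/bin/python
- # Sketch: print the maximum value of one CSV column (index from argv[1])
- # seen on STDIN; print nothing if no numeric value is found.
- import sys
- index = int(sys.argv[1])
- max_val = None
- for line in sys.stdin:
-     fields = line.strip().split(",")
-     if fields[index].isdigit():
-         val = int(fields[index])
-         if max_val is None or val > max_val:
-             max_val = val
- if max_val is not None:
-     print max_val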
Given the parsimonious output of the mapper, we can use the default IdentityReducer to record the (sorted) output of the mappers.
- runAttributeMax.sh (1)
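- #!/bin/sh
- hadoop fs -rm -r output
- hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \
-     -D mapred.reduce.tasks=1 \
-     -input IA/patn_100000.txt \
-     -output output \
-     -mapper 'AttributeMax.py 8' \
-     -file AttributeMax.py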
The mapper is 'AttributeMax.py 8'. It outputs the maximum of the ninth column in its split. The single reducer collects all the mapper outputs, so with seven mappers the final output of the above command is seven values, one per-split maximum.
We see that our mapper is doing the right thing. We can use a reducer that outputs the maximum over the values outputted by the mappers. We have an interesting situation here, due to the distributive property of maximum, where we can also use AttributeMax.py as the reducer. Only now the reducer is trying to find the maximum in the “first” column.
- runAttributeMax.sh (2)
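- #!/bin/sh
- hadoop fs -rm -r output
- hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \
-     -D mapred.reduce.tasks=1 \
-     -input IA/patn_100000.txt \
-     -output output \
-     -mapper 'AttributeMax.py 8' \
-     -reducer 'AttributeMax.py 0' \
-     -file AttributeMax.py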
The output of the above command should be a one-line file, and you’ll find the maximum number of claims in a patent to be 868.
Streaming with key/value pairs
At this point you may wonder what happened to the key/value pair way of encoding records. Our discussion on Streaming so far talks about each record as an atomic unit rather than as composed of a key and a value. The truth is that Streaming works on key/value pairs just like the standard Java MapReduce model. By default, Streaming uses the tab character to separate the key from the value in a record. When there’s no tab character, the entire record is considered the key and the value is empty text. For our data sets, which have no tab character, this provides the illusion that we’re processing each individual record as a whole unit. Furthermore, even if the records do have tab characters in them, the Streaming API will only shuffle and sort the records in a different order. As long as our mapper and reducer work in a record-oriented way, we can maintain the record-oriented illusion.
Working with key/value pairs allows us to take advantage of the key-based shuffling and sorting to create interesting data analyses. Let’s make this exercise more interesting by computing the average rather than finding the maximum. First, let’s examine how key/value pairs work in the Streaming API for each step of the MapReduce data flow.
To understand the data flow better, we write a Streaming program to compute the average number of claims for each country. The mapper will extract the country and the claims count for each patent and package them as a key/value pair. In accord with the default Streaming convention, the mapper outputs this key/value pair with a tab character to separate them. The Streaming API will pick up the key, and the shuffling will guarantee that all claim counts of a country end up at the same reducer. In the Python code in listing 4.7, the mapper extracts, for each record, the country (fields[4][1:-1]) as the key and the claims count (fields[8]) as the value. An extra concern with our data set is that missing values do exist, so we've added a conditional statement to skip over records with missing claim counts.
- Listing 4.7 AverageByAttributeMapper.py: output country and claim count of patents
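- #!/usr/bin/python
- # Output country (key) and claim count (value), tab-separated, skipping
- # records with a missing or non-numeric claim count.
- import sys
- for line in sys.stdin:
-     fields = line.split(",")
-     if fields[8] and fields[8].isdigit():
-         print fields[4][1:-1] + "\t" + fields[8]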
Before writing the reducer, let's run the mapper in two situations: without any reducer, and with the default IdentityReducer. It's a useful approach for learning, as we can see exactly what's being output by the mapper (by using no reducer) and what's being fed into the reducer (by using IdentityReducer). You'll find this handy later when debugging your MapReduce programs. You can at least check whether the mapper is outputting the proper data and whether the proper data is being sent to the reducer. First let's run the mapper without any reducer.
- runAverageByAttributeMapper.sh
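- #!/bin/sh
- hadoop fs -rm -r output
- hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \
-     -D mapreduce.job.reduces=0 \
-     -input IA/patn_100000.txt \
-     -output output \
-     -mapper 'AverageByAttributeMapper.py' \
-     -file AverageByAttributeMapper.py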
The output should consist of lines where a country code is followed by a tab followed by a numeric count. The order of the output records is not sorted by (the new) key. In fact, it's in the same order as the order of the input records, although that's not obvious from looking at the output.
The more interesting case is to use the IdentityReducer with a non-zero number of reducers. We see how the shuffled and sorted records are presented to the reducer. To keep it simple let’s try a single reducer by setting -D mapreduce.job.reduces=1 and see the first 32 records.
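One way to do this, assuming the same jar and input as the scripts above and the default output file name (typically part-00000), is to rerun the mapper with one reducer and no -reducer option (Streaming then falls back to IdentityReducer), and peek at the head of the result:
- hadoop fs -rm -r output
- hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \
-     -D mapreduce.job.reduces=1 \
-     -input IA/patn_100000.txt \
-     -output output \
-     -mapper 'AverageByAttributeMapper.py' \
-     -file AverageByAttributeMapper.py
- hadoop fs -cat output/part-00000 | head -32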
Under the Streaming API, the reducer will see this text data on STDIN. We have to code our reducer to recover the key/value pairs by breaking each line at the tab character. Sorting has “grouped” together records of the same key. As you read each line from STDIN, you’ll be responsible for keeping track of the boundary between records of different keys. Note that although the keys are sorted, the values don’t follow any particular order. Finally, the reducer must perform its stated computation, which in this case is calculating the average value for each key. Listing 4.8 gives the complete reducer in Python.
- Listing 4.8 AverageByAttributeReducer.py
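- #!/usr/bin/python
- import sys
- (last_key, sum, count) = (None, 0.0, 0)
- for line in sys.stdin:
-     (key, val) = line.split("\t")
-     # A new key means we've seen all the values for the previous key.
-     if last_key and last_key != key:
-         print last_key + "\t" + str(sum / count)
-         (sum, count) = (0.0, 0)
-     last_key = key
-     sum += float(val)
-     count += 1
- # Don't forget the last key in the stream.
- print last_key + "\t" + str(sum / count)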
The program keeps a running sum and count for each key. When it detects a new key in the input stream or the end of the file, it computes the average for the previous key and sends it to STDOUT. After running the entire MapReduce job, we can easily check the correctness of the first few results.
Run the full MapReduce job with the script below:
- runAverageByAttribute.sh
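- #!/bin/sh
- hadoop fs -rm -r output
- hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \
-     -D mapreduce.job.reduces=1 \
-     -input IA/patn_100000.txt \
-     -output output \
-     -mapper 'AverageByAttributeMapper.py' \
-     -reducer 'AverageByAttributeReducer.py' \
-     -file AverageByAttributeMapper.py \
-     -file AverageByAttributeReducer.py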
Streaming with the Aggregate package
Hadoop includes a library package called Aggregate that simplifies obtaining aggregate statistics of a data set. This package can simplify the writing of Java statistics collectors, especially when used with Streaming, which is the focus of this section.
The Aggregate package under Streaming functions as a reducer that computes aggregate statistics. You only have to provide a mapper that processes records and sends out a specially formatted output. Each line of the mapper’s output looks like:
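- function:key\tvalue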
The output string starts with the name of a value aggregator function (from the set of predefined functions available in the Aggregate package). A colon and a tab-separated key/value pair follows. The Aggregate reducer applies the function to the set of values for each key. For example, if the function is LongValueSum, then the output is the sum of values for each key. (As the function name implies, each value is treated as a Java long type.) If the function is LongValueMax, then the output is the maximum value for each key. You can see the list of aggregator functions supported in the Aggregate package in table 4.3.
Let’s go through an exercise using the Aggregate package to see how easy it is. We want to count the number of patents granted each year. We can approach this problem in a way similar to the word counting example we saw in chapter 1. For each record, our mapper will output the grant year as the key and a “1” as the value. The reducer will sum up all the values (“1”s) to arrive at a count. Only now we’re using Streaming with the Aggregate package. Our result will be the simple mapper shown in listing 4.9.
- Listing 4.9 AttributeCount.py
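- #!/usr/bin/python
- # For each record, ask the Aggregate reducer to sum a 1 for the value in
- # the chosen column (the grant year when called with index 1).
- import sys
- index = int(sys.argv[1])
- for line in sys.stdin:
-     fields = line.split(",")
-     print "LongValueSum:" + fields[index] + "\t" + "1"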
AttributeCount.py works for any CSV-formatted input file. The user only has to specify the column index to count the number of records for each attribute in that column. The print statement has the main “action” of this short program. It tells the Aggregate package to sum up all the values (of 1) for each key, defined as the user-specified column (index). To count the number of patents granted each year, we run this Streaming program with the Aggregate package, telling the mapper to use the second column (index = 1) of the input file as the attribute of interest.
- runAttributeCount.sh
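- #!/bin/sh
- hadoop fs -rm -r output
- hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \
-     -input IA/patn_100000.txt \
-     -output output \
-     -mapper 'AttributeCount.py 1' \
-     -reducer aggregate \
-     -file AttributeCount.py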
You’ll find most of the options for running the Streaming program familiar. The main thing to point out is that we’ve specified the reducer to be 'aggregate'. This is the signal to the Streaming API that we’re using the Aggregate package. The output of the MapReduce job (after sorting) is a count of granted patents for each year.
As shown in figure 4.3, we can plot the data to visualize it better. You’ll see that it has a mostly steady upward trend.
Looking at the list of functions in the Aggregate package in table 4.3, you’ll find that most of them are combinations of maximum, minimum, and sum for atomic data types. (For some reason DoubleValueMax and DoubleValueMin aren’t supported. They would be trivial modifications of LongValueMax and LongValueMin, and a welcome addition.) UniqValueCount and ValueHistogram are slightly different, and we look at some examples of how to use them.
UniqValueCount gives the number of unique values for each key. For example, we may want to know whether more countries are participating in the U.S. patent system over time. We can examine this by looking at the number of countries with patents granted each year. We use a straightforward wrapper of UniqValueCount in listing 4.10 and apply it to the year and country columns of apat63_99.txt (column index of 1 and 4, respectively).
- Listing 4.10 UniqueCount.py: a wrapper around the UniqValueCount function
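- #!/usr/bin/env python
- # Key column index1, value column index2; the Aggregate reducer then
- # counts the number of unique values per key.
- import sys
- index1 = int(sys.argv[1])
- index2 = int(sys.argv[2])
- for line in sys.stdin:
-     fields = line.split(",")
-     print "UniqValueCount:" + fields[index1] + "\t" + fields[index2]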
- runUniqueCount.sh
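- #!/bin/sh
- hadoop fs -rm -r output
- hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \
-     -input IA/patn_100000.txt \
-     -output output \
-     -mapper 'UniqueCount.py 1 4' \
-     -reducer aggregate \
-     -file UniqueCount.py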
In the output we get one record for each year. Plotting it gives us figure 4.4. We can see that the increasing number of patents granted from 1960 to 1990 (seen in figure 4.3) didn’t come from more countries participating (figure 4.4); roughly the same number of countries were simply filing more patents.
The aggregate function ValueHistogram is the most ambitious function in the Aggregate package. For each key, it outputs six statistics: the number of unique values, and the minimum, median, maximum, average, and standard deviation of the counts of those values.
In its most general form, it expects each line of the mapper’s output to be the function name ValueHistogram, followed by a colon, followed by a tab-separated key, value, and count triplet (ValueHistogram:key\tvalue\tcount). The Aggregate reducer outputs the six statistics above for each key. Note that for everything except the first statistic (the number of unique values), the counts are summed over each key/value pair. Outputting two records from your mapper with counts of, say, 2 and 3 for the same key and value is no different than outputting a single record for that key and value with a count of 5.
A useful variation is for the mapper to output only the key and value, without the count and the tab character that goes with it. ValueHistogram automatically assumes a count of 1 in this case. Listing 4.11 shows a trivial wrapper around ValueHistogram.
- Listing 4.11 ValueHistogram.py: wrapper around Aggregate package’s ValueHistogram
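- #!/usr/bin/env python
- # Key column index1, value column index2; each output line implies a
- # count of 1 for that key/value pair.
- import sys
- index1 = int(sys.argv[1])
- index2 = int(sys.argv[2])
- for line in sys.stdin:
-     fields = line.split(",")
-     print "ValueHistogram:" + fields[index1] + "\t" + fields[index2]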
We run the script below to find the distribution of countries with patents granted for each year.
- runValueHistogram.sh
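- #!/bin/sh
- hadoop fs -rm -r output
- hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar \
-     -input IA/patn_100000.txt \
-     -output output \
-     -mapper 'ValueHistogram.py 1 4' \
-     -reducer aggregate \
-     -file ValueHistogram.py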
The output is a tab-separated value (TSV) file with seven columns. The first column, the year the patent was granted, is the key. The other six columns are the six statistics ValueHistogram computes. The first column after the year is the number of unique values; this is exactly the same as the output of UniqValueCount. The second, third, and fourth columns are the minimum, median, and maximum, respectively.
We’ve seen how using the Aggregate package under Streaming is a simple way to get some popular metrics. It’s a great demonstration of Hadoop’s power in simplifying the analysis of large data sets.
Supplement
* Ch4. Writing basic MapReduce programs (Part1)
* Hadoop 2.6 FileSystem Shell
* DevX.com - Introduction to Hadoop Streaming