This chapter covers obtaining the patent data sets, constructing the basic template of a MapReduce program, and counting things with MapReduce.
The MapReduce programming model is unlike most programming models you may have learned, and it takes some time and practice to become familiar with it. To help develop your proficiency, we go through many example programs in the next couple of chapters. These examples illustrate various MapReduce programming techniques; by applying MapReduce in multiple ways you'll start to develop an intuition and a habit of "MapReduce thinking." The examples range from simple tasks to advanced applications. In one of the advanced applications we introduce the Bloom filter, a data structure not normally taught in the standard computer science curriculum. You'll see that processing large data sets, whether with Hadoop or not, often requires rethinking the underlying algorithms.
Getting the patent data set
To do anything meaningful with Hadoop we need data. Many of our examples will use two patent data sets, both available from the National Bureau of Economic Research (NBER) at http://www.nber.org/patents/. The data sets were originally compiled for the paper "The NBER Patent Citation Data File: Lessons, Insights and Methodological Tools."1 We use the citation data set cite75_99.txt and the patent description data set apat63_99.txt.
The patent citation data
The patent citation data set contains citations from U.S. patents issued between 1975 and 1999. It has more than 16 million rows.
The data set is in the standard comma-separated values (CSV) format. The first line is a header naming the two columns, "CITING" and "CITED", and each subsequent line records one citation. For example, the second line shows that patent 3858241 cites patent 956203. The file is sorted by the citing patent; patent 3858241 cites five patents in total. Analyzing the data more quantitatively will give us deeper insight into it.
The patent description data
The other data set we use is the patent description data. It has the patent number, the patent application year, the patent grant year, the number of claims, and other metadata about each patent. A look at the first few lines shows that it resembles a table in a relational database, but in CSV format. This data set has more than 2.9 million records. As in many real-world data sets, it has many missing values.
The first row contains the names of a couple dozen attributes, which are meaningful only to patent specialists. Even without understanding all of them, it's still useful to have some idea of a few; Table 4.1 describes the first ten.
Now that we have two patent data sets, let’s write Hadoop programs to process the data.
Constructing the basic template of a MapReduce program
Most MapReduce programs are brief variations on a template. When writing a new MapReduce program, you generally take an existing MapReduce program and modify it until it does what you want. In this section, we write our first MapReduce program and explain its different parts. This program can serve as a template for future MapReduce programs.
Our first program will take the patent citation data and invert it. For each patent, we want to find and group the patents that cite it. Each output record pairs a cited patent with a comma-separated list of the patents citing it, similar to the following:

1000067	5312208,4944640,5071294

That is, patents 5312208, 4944640, and 5071294 cite patent 1000067. For this section we won't focus too much on the MapReduce data flow, which we already covered in chapter 3. Instead we focus on the structure of a MapReduce program. We need only one file for the entire program, as you can see in listing 4.1.
Listing 4.1 Template for a typical Hadoop program

package ch4;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.StringUtils;

public class MyJob extends Configured implements Tool {

    public static class MapClass extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // input record: "CITING","CITED" -- invert it to (cited, citing)
            context.write(value, key);
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // output record: "CITED", "CITING list"
            List<String> citingList = new ArrayList<String>();
            for (Text citing : values) citingList.add(citing.toString());
            context.write(key, new Text(StringUtils.join(",", citingList)));
        }
    }

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("MyJob");
        job.setJarByClass(MyJob.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.getConfiguration().set("key.value.separator.in.input.line", ",");
        boolean success = job.waitForCompletion(true);
        return(success ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
}
The core of the skeleton is the run() method, also known as the driver. The driver instantiates, configures, and runs a Job object named job, which holds a Configuration object containing all the configuration parameters the job needs. The driver could define its own set of command-line options and process the user arguments itself to let the user modify some of those configuration parameters. Because this task is needed so often, the Hadoop framework provides ToolRunner, Tool, and Configured to simplify it. Used together as in the MyJob skeleton above, these classes enable our job to understand the user-supplied options that GenericOptionsParser supports. For example, we execute the MyJob class with a command line of the form hadoop jar HadoopIA.jar ch4.MyJob <input path> <output path>.
Had we wanted to run the job only to see the mapper's output (which you may want to do for debugging purposes), we could set the number of reducers to zero with the generic option -D mapreduce.job.reduces=0 (the property is documented in mapred-default.xml).
It works even though our program doesn’t explicitly understand the -D option. By using ToolRunner, MyJob will automatically support the options in table 4.2.
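To make the mechanism concrete, the following minimal sketch (a hypothetical ShowConf class, not from the book) shows that a property passed with -D is already present in the Configuration returned by getConf(), because ToolRunner runs GenericOptionsParser before calling run():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ShowConf extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // Prints "0" when launched with: hadoop jar ... ShowConf -D mapreduce.job.reduces=0
        System.out.println(getConf().get("mapreduce.job.reduces"));
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner strips the generic options (-D, -conf, -fs, ...) from args
        // and merges them into the Configuration before run() is invoked.
        System.exit(ToolRunner.run(new Configuration(), new ShowConf(), args));
    }
}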
The signatures for the Mapper class and the Reducer class are:
public static class MapClass extends Mapper<K1, V1, K2, V2> {
    public void map(K1 key, V1 value, Context context)
            throws IOException, InterruptedException { }
}

public static class Reduce extends Reducer<K2, V2, K3, V3> {
    public void reduce(K2 key, Iterable<V2> values, Context context)
            throws IOException, InterruptedException { }
}
Each invocation of the reduce() method at the reducer is given a key of type K2 and a list of values of type V2. These must be the same K2 and V2 types that the Mapper outputs. The reduce() method will typically have a loop to go through all the values of type V2.
Finally, all the key and value types must be subtypes of Writable, which ensures a serialization interface for Hadoop to send the data around in a distributed cluster. In fact, the key types implement WritableComparable, a subinterface of Writable. The key types need to additionally support the compareTo() method, as keys are used for sorting in various places in the MapReduce framework.
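As a concrete illustration of this contract, here is a minimal sketch of a custom key type, a hypothetical PatentPair class not used elsewhere in this chapter. It serializes its fields in write()/readFields() and defines the sort order in compareTo():

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical composite key: a (citing, cited) pair of patent numbers.
public class PatentPair implements WritableComparable<PatentPair> {
    private int citing;
    private int cited;

    public PatentPair() { }                      // no-arg constructor required by Hadoop

    public void set(int citing, int cited) {
        this.citing = citing;
        this.cited = cited;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(citing);                    // serialization used when Hadoop
        out.writeInt(cited);                     // ships data around the cluster
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        citing = in.readInt();
        cited = in.readInt();
    }

    @Override
    public int compareTo(PatentPair other) {     // defines how keys are sorted
        int cmp = Integer.compare(citing, other.citing);
        return (cmp != 0) ? cmp : Integer.compare(cited, other.cited);
    }

    @Override
    public String toString() {                   // what TextOutputFormat will print
        return citing + "," + cited;
    }
}

In a real job such a key should also override hashCode() (the default HashPartitioner uses it to assign keys to reducers) and equals().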
Counting things
Much of what the layperson thinks of as statistics is counting, and many basic Hadoop jobs involve counting. We've already seen the word count example in chapter 1. For the patent citation data, we may want the number of citations each patent has received. This too is counting: in each output record, a patent number is associated with the number of citations it has received.
We can write a MapReduce program for this task. We already have a program that builds the inverted citation index, and we can modify it to output the count instead of the list of citing patents. The modification is needed only in the Reducer. If we choose to output the count as an IntWritable, we need to specify IntWritable wherever the output value type appears in the Reducer code; this is the type we called V3 in our earlier notation.
public static class Reduce extends Reducer<Text, Text, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // output record: "PTN_ID", "CITING_COUNT"
        int cnt = 0;
        Iterator<Text> iter = values.iterator();
        while (iter.hasNext()) {
            iter.next();    // we only need to count the citing patents, not read them
            cnt++;
        }
        context.write(key, new IntWritable(cnt));
    }
}
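The driver for this counting job isn't shown here. A minimal sketch of what its run() method could look like follows, assuming the job reuses the inverting MapClass from listing 4.1 and is packaged as the ch4.counting.CountCiting class that the run script below invokes. Compared with MyJob's driver, it declares the map output types (Text, Text) explicitly, because they now differ from the final output value type (IntWritable):

public int run(String[] args) throws Exception {
    Job job = new Job(getConf());
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setJobName("CountCiting");
    job.setJarByClass(CountCiting.class);
    job.setMapperClass(MapClass.class);           // same inverting mapper as in MyJob
    job.setReducerClass(Reduce.class);            // the counting reducer above
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setMapOutputKeyClass(Text.class);         // the mapper still emits (Text, Text)
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);   // the reducer now emits IntWritable counts
    job.getConfiguration().set("key.value.separator.in.input.line", ",");
    return job.waitForCompletion(true) ? 0 : 1;
}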
runCount.sh

#!/bin/sh
OUTPUT="IA_Count"
hadoop fs -rm -r $OUTPUT
hadoop jar HadoopIA.jar ch4.counting.CountCiting IA/cite75_99.txt $OUTPUT
hadoop fs -cat IA_Count/* | less
The first step in writing a MapReduce program is to figure out the data flow. In this case, as a mapper reads a record (a patent number and its citation count), it ignores the patent number and outputs an intermediate key/value pair of the citation count and the number 1.
After figuring out the data flow, decide on the types for the key/value pairs—K1, V1, K2, V2, K3, and V3 for the input, intermediate, and output key/value pairs. We use KeyValueTextInputFormat, which automatically breaks each input record into a key/value pair based on a separator character; because the input here is the tab-separated output of the citation-count job, the default tab separator works without extra configuration (which is why the separator setting is commented out in listing 4.2). The input format produces K1 and V1 as Text. We choose IntWritable for K2, V2, K3, and V3 because those values are all integers.
Based on the data flow and the data types, you’ll be able to see the final program shown in listing 4.2 and understand what it’s doing. You can see that it’s structurally similar to the other MapReduce programs we’ve seen so far.
Listing 4.2 CitationHistogram.java: count patents cited once, twice, and so on

package ch4.counting;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CitationHistogram extends Configured implements Tool {

    public static class MapClass extends Mapper<Text, Text, IntWritable, IntWritable> {
        private final static IntWritable uno = new IntWritable(1);
        private IntWritable citationCount = new IntWritable();

        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // input record: PatentID (key), CitingNumber (value)
            citationCount.set(Integer.valueOf(value.toString()));
            context.write(citationCount, uno);   // emit (citation count, 1)
        }
    }

    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // output record: CitingNumber, Count of patents with that citation count
            int count = 0;
            Iterator<IntWritable> iter = values.iterator();
            while (iter.hasNext()) {
                iter.next();
                count++;
            }
            context.write(key, new IntWritable(count));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("CitationHistogram");
        job.setJarByClass(CitationHistogram.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // The input is the tab-separated output of the counting job, so the
        // default separator of KeyValueTextInputFormat is fine:
        //job.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        boolean success = job.waitForCompletion(true);
        return(success ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new CitationHistogram(), args);
        System.exit(res);
    }
}
runHistogram.sh

#!/bin/sh
OUTPUT="IA_Histogram"
hadoop fs -rm -r $OUTPUT
hadoop jar HadoopIA.jar ch4.counting.CitationHistogram IA_Count $OUTPUT
hadoop fs -cat "${OUTPUT}"/* | less
As this histogram output is only several hundred lines long, we can put it into a spreadsheet and plot it. Figure 4.2 shows the number of patents at various citation frequencies, on a log-log scale. When a distribution appears as a straight line on a log-log plot, it's considered a power-law distribution (if y is proportional to x^(-k), then log y is a linear function of log x). The citation-count histogram seems to fit that description, although its approximately parabolic curvature also suggests a lognormal distribution.
Supplement
* How to specify the KeyValueTextInputFormat separator in the Hadoop 0.20 API?
* Listing current running Hadoop Jobs and Killing running Jobs