Wednesday, December 10, 2014

[ In Action ] Ch4. Writing basic MapReduce programs (Part1)

Introduction: 
This chapter covers: 
■ Patent data as an example data set to process with Hadoop
■ Skeleton of a MapReduce program
■ Basic MapReduce programs to count statistics
■ Hadoop’s Streaming API for writing MapReduce programs using scripting languages
■ Combiner to improve performance

The MapReduce programming model is unlike most programming models you may have learned. It will take some time and practice to gain familiarity. To help develop your proficiency, we go through many example programs in the next couple of chapters. These examples illustrate various MapReduce programming techniques. By applying MapReduce in multiple ways you'll start to develop an intuition and a habit of "MapReduce thinking." The examples cover simple tasks as well as advanced uses. In one of the advanced applications we introduce the Bloom filter, a data structure not normally taught in the standard computer science curriculum. You'll see that processing large data sets, whether you're using Hadoop or not, often requires a rethinking of the underlying algorithms.

Getting the patent data set 
To do anything meaningful with Hadoop we need data. Many of our examples will use two patent data sets, both of which are available from the National Bureau of Economic Research (NBER) at http://www.nber.org/patents/. The data sets were originally compiled for the paper "The NBER Patent Citation Data File: Lessons, Insights and Methodological Tools."1 We use the citation data set cite75_99.txt and the patent description data set apat63_99.txt.
NOTE 
The data sets are approximately 250 MB each, which is small enough to make our examples runnable in Hadoop's standalone or pseudo-distributed mode. You can practice writing MapReduce programs with them even when you don't have access to a live cluster. The best part of Hadoop is that you can be fairly sure your MapReduce programs will run on clusters of machines processing data sets 100 or 1,000 times larger, with virtually no code changes.

The patent citation data 
The patent citation data set contains citations from U.S. patents issued between 1975 and 1999. It has more than 16 million rows and the first few lines resemble the following: 
"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384
3858241,3634889
3858242,1515701
3858242,3319261
3858242,3668705
3858242,3707004
...

The data set is in the standard comma-separated values (CSV) format, with the first line describing the columns. Each of the other lines records one particular citation. For example, the second line shows that patent 3858241 cites patent 956203. The file is sorted by the citing patent. We can see from the sample that patent 3858241 cites five patents. Analyzing the data more quantitatively will give us deeper insights into it.
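To make the record format concrete, here is a small sketch of our own (not from the book) showing how a single citation line splits into a citing/cited pair. Later in the chapter, KeyValueTextInputFormat performs this split for us once "," is configured as the separator.

// Illustration only: splitting one citation record by hand.
// In the actual job, KeyValueTextInputFormat with "," as the separator
// hands the mapper key = "3858241" and value = "956203" as Text objects.
public class CitationLineDemo {
    public static void main(String[] args) {
        String line = "3858241,956203";
        String[] parts = line.split(",", 2);
        String citing = parts[0];   // the patent doing the citing
        String cited = parts[1];    // the patent being cited
        System.out.println(citing + " cites " + cited);
    }
}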

The patent description data 
The other data set we use is the patent description data. It contains the patent number, the patent application year, the patent grant year, the number of claims, and other metadata about each patent. The first few lines of this data set look like a table in a relational database, but in CSV format. The data set has more than 2.9 million records. As in many real-world data sets, it has many missing values.
 

The first row contains the names of a couple dozen attributes, which are meaningful only to patent specialists. Even without understanding all the attributes, it's still useful to have some idea of a few of them. Table 4.1 describes the first ten.
 

Now that we have two patent data sets, let’s write Hadoop programs to process the data. 

Constructing the basic template of a MapReduce program (P67) 
We write most MapReduce programs briefly, as variations on a template. When writing a new MapReduce program, you generally take an existing MapReduce program and modify it until it does what you want. In this section we write our first MapReduce program and explain its different parts. This program can serve as a template for future MapReduce programs.

Our first program will take the patent citation data and invert it. For each patent, we want to find and group the patents that cite it. Our output should be similar to the following: 
1000051 4541310
1000054 4946631
1000065 4748968
1000067 5312208,4944640,5071294
1000070 4928425,5009029

We have discovered that patents 5312208, 4944640, and 5071294 cited patent 1000067. For this section we won't focus too much on the MapReduce data flow, which we already covered in chapter 3. Instead we focus on the structure of a MapReduce program. We need only one file for the entire program, as you can see in listing 4.1.
- Listing 4.1 Template for a typical Hadoop program 
package ch4;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {
    public static class MapClass extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input is "CITING","CITED"; invert it so the cited patent becomes the key
            context.write(value, key);
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Key is the cited patent; values are all patents that cite it
            List<String> citingList = new ArrayList<String>();
            for (Text citing : values) citingList.add(citing.toString());
            context.write(key, new Text(StringUtils.join(",", citingList)));
        }
    }

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("MyJob");
        job.setJarByClass(MyJob.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.getConfiguration().set("key.value.separator.in.input.line", ",");
        boolean success = job.waitForCompletion(true);
        return (success ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
}
Our convention is that a single class, called MyJob in this case, completely defines each MapReduce job. Hadoop requires the Mapper and the Reducer to be their own static classes. These classes are quite small, and our template includes them as inner classes of the MyJob class. The advantage is that everything fits in one file, simplifying code management. But keep in mind that these inner classes are independent and don't interact much with the MyJob class. During job execution, various nodes with different JVMs clone and run the Mapper and the Reducer, whereas the rest of the job class is executed only on the client machine.

The core of the skeleton is the run() method, also known as the driver. The driver instantiates, configures, and runs a Job object named job. The Job holds a Configuration object, which contains all the configuration parameters needed for the job to run. The driver can define its own set of command-line options and process the user arguments itself to let the user modify some of the configuration parameters. As this task is needed often, the Hadoop framework provides ToolRunner, Tool, and Configured to simplify it. When used together in the MyJob skeleton above, these classes enable our job to understand user-supplied options that are supported by GenericOptionsParser. For example, we previously executed the MyJob class using these command lines:
$ hadoop fs -put cite75_99.txt IA/cite75_99.txt # Upload data to HDFS
$ hadoop jar HadoopIA.jar ch4.MyJob IA/cite75_99.txt IA_Cite # Run MapReduce

Had we wanted to run the job only to see the mapper's output (which you may want to do for debugging purposes), we could set the number of reducers to zero with the option -D mapreduce.job.reduces=0 (see mapred-default.xml).
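The same effect can also be hard-coded in the driver for a debugging build. The following one-liner is our own sketch (not part of listing 4.1), placed inside run() before job.waitForCompletion():

// Hypothetical debugging tweak inside run(): with zero reducers the job becomes
// map-only, and the mapper's output is written directly to the output directory.
job.setNumReduceTasks(0);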

This works even though our program doesn't explicitly understand the -D option. By using ToolRunner, MyJob automatically supports the options in table 4.2.
 

The signatures for the Mapper class and the Reducer class are: 
public static class MapClass extends Mapper<K1, V1, K2, V2> {
    public void map(K1 key, V1 value, Context context)
            throws IOException, InterruptedException { }
}

public static class Reduce extends Reducer<K2, V2, K3, V3> {
    public void reduce(K2 key, Iterable<V2> values, Context context)
            throws IOException, InterruptedException { }
}
The center of action for the Mapper class is the map() method, and for the Reducer class the reduce() method. Each invocation of the map() method is given a key/value pair of types K1 and V1, respectively. The key/value pairs generated by the mapper are output via the write() method of the Context object, so somewhere in your map() method you need to call context.write().

Each invocation of the reduce() method at the reducer is given a key of type K2 and an Iterable of values of type V2. Note that these must be the same K2 and V2 types that the Mapper outputs. The reduce() method will likely have a loop to go through all the values of type V2.

Finally, all the key and value types must be subtypes of Writable, which gives Hadoop a serialization interface for sending data around the distributed cluster. In fact, the key types implement WritableComparable, a subinterface of Writable. The key types need to additionally support the compareTo() method, as keys are used for sorting in various places in the MapReduce framework.
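To make that contract concrete, here is a minimal sketch of our own (not from the book) of a hypothetical custom key type implementing WritableComparable; built-in types such as Text and IntWritable follow the same pattern.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical key type: a patent number wrapped as a WritableComparable.
public class PatentId implements WritableComparable<PatentId> {
    private long id;

    public PatentId() { }                      // Hadoop requires a no-arg constructor
    public PatentId(long id) { this.id = id; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(id);                     // serialization when data is shuffled between nodes
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readLong();                    // deserialization on the receiving node
    }

    @Override
    public int compareTo(PatentId other) {
        return Long.compare(id, other.id);     // ordering used by the framework's sort phase
    }

    @Override
    public String toString() {
        return Long.toString(id);              // TextOutputFormat calls toString() to write the key
    }
}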

Counting things 
Much of what the layperson thinks of as statistics is counting, and many basic Hadoop jobs involve counting. We’ve already seen the word count example in chapter 1. For the patent citation data, we may want the number of citations a patent has received. This too is counting. The desired output would look like this: 
1 2
10000 1
100000 1
1000006 1
1000007 1
1000011 1
1000017 1
...

In each record, a patent number is associated with the number of citations it has received. We can write a MapReduce program for this task. We already have a program that produces the inverted citation index, and we can modify it to output the count instead of the list of citing patents. The modification is needed only in the Reducer. If we choose to output the count as an IntWritable, we need to specify IntWritable in three places in the Reducer code (the type we called V3 in our earlier notation).
public static class Reduce extends Reducer<Text, Text, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Key is the cited patent; count how many patents cite it
        int cnt = 0;
        Iterator<Text> iter = values.iterator();
        while (iter.hasNext()) {
            iter.next();
            cnt++;
        }
        context.write(key, new IntWritable(cnt));
    }
}
You can use the following script to run the counting MapReduce job:
- runCount.sh 
#!/bin/sh
OUTPUT="IA_Count"
hadoop fs -rm -r $OUTPUT
hadoop jar HadoopIA.jar ch4.counting.CountCiting IA/cite75_99.txt $OUTPUT
hadoop fs -cat IA_Count/* | less
After running the previous example, we now have a data set that counts the number of citations for each patent. A neat exercise would be to count the counts: let's build a histogram of the citation counts. We expect a large number of patents to have been cited only once, while a small number may have been cited hundreds of times. It would be interesting to see the distribution of the citation counts.

The first step in writing a MapReduce program is to figure out the data flow. In this case, as the mapper reads a record, it ignores the patent number and outputs an intermediate key/value pair of <citation_count, 1>. The reducer then sums up the number of 1s for each citation count and outputs the total.

After figuring out the data flow, decide on the types for the key/value pairs—K1, V1, K2, V2, K3, and V3 for the input, intermediate, and output key/value pairs. Let's use the KeyValueTextInputFormat, which automatically breaks each input record into key/value pairs based on a separator character. This input format produces K1 and V1 as Text. We choose IntWritable for K2, V2, K3, and V3 because we know those values must be integers.

Based on the data flow and the data types, you’ll be able to see the final program shown in listing 4.2 and understand what it’s doing. You can see that it’s structurally similar to the other MapReduce programs we’ve seen so far. 
- Listing 4.2 CitationHistogram.java: count patents cited once, twice, and so on 
package ch4.counting;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CitationHistogram extends Configured implements Tool {
    public static class MapClass extends Mapper<Text, Text, IntWritable, IntWritable> {
        private final static IntWritable uno = new IntWritable(1);
        private IntWritable citationCount = new IntWritable();

        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input is <patent ID, citation count>; emit <citation count, 1>
            citationCount.set(Integer.valueOf(value.toString()));
            context.write(citationCount, uno);
        }
    }

    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Key is a citation count; count how many patents have that count
            int count = 0;
            Iterator<IntWritable> iter = values.iterator();
            while (iter.hasNext()) {
                iter.next();
                count++;
            }
            context.write(key, new IntWritable(count));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("CitationHistogram");
        job.setJarByClass(CitationHistogram.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // The default separator (tab) matches the output of the counting job,
        // so there is no need to set the key/value separator property here.
        boolean success = job.waitForCompletion(true);
        return (success ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new CitationHistogram(), args);
        System.exit(res);
    }
}
Below is the script to run the MapReduce job for the histogram data:
- runHistogram.sh 
#!/bin/sh
OUTPUT="IA_Histogram"
hadoop fs -rm -r $OUTPUT
hadoop jar HadoopIA.jar ch4.counting.CitationHistogram IA_Count $OUTPUT
hadoop fs -cat "${OUTPUT}"/* | less
Running the MapReduce job on the citation count data shows the following result. As we suspected, a large number of patents (more than 900,000) have only one citation, whereas some have hundreds of citations. The most-cited patent has 779 citations.
1 921128
2 552246
3 380319
4 278438
...
678 1
716 1
779 1

As this histogram output is only several hundred lines long, we can put it into a spreadsheet and plot it. Figure 4.2 shows the number of patents at various citation frequencies. The plot is on a log-log scale. When a distribution shows up as a line in a log-log plot, it's considered to be a power law distribution. The citation count histogram seems to fit the description, although its approximately parabolic curvature also suggests a lognormal distribution.
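As a side note of our own (not from the book), the reason a power law shows up as a straight line on a log-log plot is simple algebra. If the number of patents with citation count $c$ is roughly

$$ f(c) \approx C \, c^{-\alpha}, $$

then taking logarithms gives

$$ \log f(c) \approx \log C - \alpha \log c, $$

which is linear in $\log c$ with slope $-\alpha$. The parabolic bend we observe instead is the kind of curvature that hints at a lognormal fit.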
 

Supplement 
How to specify the KeyValueTextInputFormat separator in the Hadoop 0.20 API?
Listing current running Hadoop Jobs and Killing running Jobs
