Thursday, December 4, 2014

[CCDH] Exercise 8 - Using a Combiner (P33)

Preface
Files and Directories Used in this Exercise
Eclipse project: combiner

Java files:
WordCountDriver.java (Driver from WordCount)
WordMapper.java (Mapper from WordCount)
SumReducer.java (Reducer from WordCount)

Exercise directory: ~/workspace/combiner

In this exercise, you will add a Combiner to the WordCount program to reduce the amount of intermediate data sent from the Mapper to the Reducer. Because summing is associative and commutative, the same class can be used for both the Reducer and the Combiner.
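To see why this is safe, here is a minimal sketch in plain Java (no Hadoop dependencies; the class name and values are hypothetical, for illustration only) showing that summing locally pre-summed counts yields the same total as summing the raw counts:

import java.util.Arrays;
import java.util.List;

public class CombinerMath {
  public static void main(String[] args) {
    // Counts emitted for the key "the" by two hypothetical mappers.
    List<Integer> mapper1 = Arrays.asList(1, 1, 1);
    List<Integer> mapper2 = Arrays.asList(1, 1);

    // Without a Combiner, the Reducer sees all five 1s.
    int direct = sum(mapper1) + sum(mapper2); // 5

    // With a Combiner, each mapper pre-sums locally and the Reducer
    // sums the two partial results (3 and 2): same answer, fewer
    // records shuffled across the network.
    int combined = sum(Arrays.asList(sum(mapper1), sum(mapper2))); // 5

    System.out.println(direct == combined); // true
  }

  static int sum(List<Integer> values) {
    int total = 0;
    for (int v : values) total += v;
    return total;
  }
}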

Implement a Combiner
Mapper
The same as in the WordCount program
- solution/WordMapper.java
package solution;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * This is the WordMapper class from the word count exercise.
 */
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();

    // Emit (word, 1) for every word in the line.
    for (String word : line.split("\\W+")) {
      if (word.length() > 0) {
        context.write(new Text(word), new IntWritable(1));
      }
    }
  }
}
Reducer
The same as in the WordCount project
- solution/SumReducer.java
package solution;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * This is the SumReducer class from the word count exercise.
 * Because it simply sums its input values, it can also serve
 * as the Combiner.
 */
public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int wordCount = 0;
    for (IntWritable value : values) {
      wordCount += value.get();
    }
    context.write(key, new IntWritable(wordCount));
  }
}
Driver
In the WordCount case, the Reducer can also do the Combiner's job, so we use the setCombinerClass(Class theClass) method on the Job object to register the Combiner class:
- solution/WordCountDriver.java
package solution;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * This driver class is called using the ToolRunner.run method
 * call in the main method (below). Extending the Configured class
 * enables the driver class to access Hadoop configuration options.
 */
public class WordCountDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {

    if (args.length != 2) {
      System.out.printf("Usage: WordCountDriver <input dir> <output dir>\n");
      return -1;
    }

    Job job = new Job(getConf());
    job.setJarByClass(WordCountDriver.class);
    job.setJobName("Word Count Driver");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(WordMapper.class);
    job.setReducerClass(SumReducer.class);

    /*
     * Specify SumReducer as the combiner class.
     */
    job.setCombinerClass(SumReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Sanity check: fail fast if the combiner was not registered.
    if (job.getCombinerClass() == null) {
      throw new Exception("Combiner not set");
    }

    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }

  /*
   * The main method calls the ToolRunner.run method, which
   * calls an options parser that interprets Hadoop command-line
   * options and puts them into a Configuration object.
   */
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new WordCountDriver(), args);
    System.exit(exitCode);
  }
}
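Note that Hadoop treats the Combiner as an optimization: the framework may run it zero, one, or several times on the map output, so the job must produce correct results either way, and when the Reducer is reused this way its input and output types must both match the Mapper's output types (Text and IntWritable here). As a counter-example (hypothetical, not part of this exercise), a non-associative operation such as averaging could not reuse its Reducer as a Combiner:

public class AverageIsNotCombinable {
  public static void main(String[] args) {
    // Values for one key, split across two hypothetical mappers.
    double[] mapper1 = {1, 1, 1}; // local average = 1.0
    double[] mapper2 = {4};       // local average = 4.0

    double overallAverage = (1 + 1 + 1 + 4) / 4.0; // 1.75
    double averageOfAverages = (1.0 + 4.0) / 2.0;  // 2.5, wrong

    System.out.println(overallAverage + " vs " + averageOfAverages);
  }
}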
Lab Experiment
1. Build the project and run the MapReduce job
$ ant -f build.xml # build project
$ hadoop fs -rm -r combiner_out # Clean result from previous run
$ hadoop jar combiner.jar solution.WordCountDriver shakespeare combiner_out # Run MapReduce

2. Check result
$ hadoop fs -cat combiner_out/*
...
yore 1
you 12697
young 430
younger 33
youngest 23
...
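To verify that the Combiner actually ran, check the counters the job prints when it completes (or the JobTracker web UI). In the "Map-Reduce Framework" counter group, "Combine input records" should be close to "Map output records", while "Combine output records" (and therefore "Reduce input records") should be much smaller, confirming that less intermediate data crossed the network.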

