Preface
Files and Directories Used in this Exercise
In this exercise, you will add a Combiner to the WordCount program to reduce the amount of intermediate data sent from the Mapper to the Reducer. Because summing is associative and commutative, the same class can serve as both the Reducer and the Combiner.
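Conceptually, a combiner performs a local reduce on each mapper's output before anything crosses the network. The following Hadoop-free sketch illustrates that effect (the class and method names here are illustrative, not part of the exercise code): local summing shrinks eight (word, 1) pairs down to five (word, count) pairs for one input split.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hadoop-free sketch of why a combiner helps: instead of sending one
 * (word, 1) pair per token to the reducer, each mapper pre-sums its own
 * output, so far fewer pairs travel over the network.
 */
public class CombinerSketch {

    /** Without a combiner: one (word, 1) pair per token. */
    static List<Map.Entry<String, Integer>> mapOnly(String input) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : input.split("\\W+")) {
            if (word.length() > 0) {
                pairs.add(Map.entry(word, 1));
            }
        }
        return pairs;
    }

    /** With a combiner: pairs are summed locally before being sent. */
    static Map<String, Integer> mapThenCombine(String input) {
        Map<String, Integer> combined = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> pair : mapOnly(input)) {
            combined.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        String split = "the cat and the dog and the bird";
        System.out.println("pairs without combiner: " + mapOnly(split).size());        // 8
        System.out.println("pairs with combiner: " + mapThenCombine(split).size());    // 5
    }
}
```

This pre-aggregation is only safe because addition is associative and commutative, which is exactly why the same summing class can be reused as the combiner.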
Implement A Combiner
Mapper
The same as in the WordCount program:
- solution/WordMapper.java
package solution;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * This is the WordMapper class from the word count exercise.
 */
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Split the line into words and emit a (word, 1) pair for each one.
    String line = value.toString();
    for (String word : line.split("\\W+")) {
      if (word.length() > 0) {
        context.write(new Text(word), new IntWritable(1));
      }
    }
  }
}
Reducer
The same as in the WordCount program:
- solution/SumReducer.java
package solution;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * This is the SumReducer class from the word count exercise.
 */
public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Sum the counts for this word and emit the total.
    int wordCount = 0;
    for (IntWritable value : values) {
      wordCount += value.get();
    }
    context.write(key, new IntWritable(wordCount));
  }
}
Driver
In the WordCount case, the Reducer can also do the Combiner's job, so the driver calls the Job.setCombinerClass(Class theClass) API to specify the combiner class:
- solution/WordCountDriver.java
package solution;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * This driver class is called using the ToolRunner.run method
 * call in the main method (below). Extending the Configured class
 * enables the driver class to access Hadoop configuration options.
 */
public class WordCountDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.printf("Usage: WordCountDriver <input dir> <output dir>\n");
      return -1;
    }

    Job job = new Job(getConf());
    job.setJarByClass(WordCountDriver.class);
    job.setJobName("Word Count Driver");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(WordMapper.class);
    job.setReducerClass(SumReducer.class);

    /*
     * Specify SumReducer as the combiner class as well.
     */
    job.setCombinerClass(SumReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    if (job.getCombinerClass() == null) {
      throw new Exception("Combiner not set");
    }

    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }

  /*
   * The main method calls the ToolRunner.run method, which
   * calls an options parser that interprets Hadoop command-line
   * options and puts them into a Configuration object.
   */
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new WordCountDriver(), args);
    System.exit(exitCode);
  }
}
1. Build the project and run the MapReduce job.
2. Check the result.
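As a sketch, the two steps above might look like the following on a machine with the Hadoop client installed. The jar name, build directory, and HDFS paths are assumptions for illustration; adjust them to your environment.

```shell
# Build: compile against the Hadoop classpath and package a jar
# (build directory and jar name are examples)
mkdir -p build
javac -classpath "$(hadoop classpath)" -d build solution/*.java
jar cvf wordcount.jar -C build .

# Run: submit the job with input and output directories in HDFS
# (input_dir and output_dir are example paths)
hadoop jar wordcount.jar solution.WordCountDriver input_dir output_dir

# Check: reducer output lands in part-r-* files under the output directory
hadoop fs -cat output_dir/part-r-00000 | head
```

With the combiner enabled, the job counters should show far fewer reduce input records than map output records, which is the easiest way to confirm the combiner actually ran.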