We saw in AverageByAttributeMapper.py and AverageByAttributeReducer.py (Part 2) how to compute the average for each attribute. The mapper reads each record and outputs a key/value pair made up of the record's attribute (the country) and its claim count. The framework shuffles the key/value pairs across the network, and the reducer computes the average for each key. In our example of computing the average number of claims for each country's patents, we see at least two efficiency bottlenecks: first, every key/value pair the mappers emit, one per patent record, is shuffled across the network, even though many of them could be summarized on the map side; second, the reducer must then process that same large number of records itself to compute each average.
Hadoop addresses these bottlenecks by extending the MapReduce framework with a combiner step between the mapper and reducer. You can think of the combiner as a helper for the reducer: its job is to whittle down the mapper's output and so lessen the load on the network and on the reducer. If we specify a combiner, the MapReduce framework may apply it zero, one, or more times to the intermediate data. For a combiner to work, it must be an equivalent transformation of the data with respect to the reducer: if we take the combiner out, the reducer's output must remain the same. Furthermore, this equivalence must hold when the combiner is applied to arbitrary subsets of the intermediate data.
If the reducer only performs a distributive function, such as maximum, minimum, or summation (counting), then we can use the reducer itself as the combiner. But many useful functions aren't distributive, and some of them, such as averaging, can be rewritten to take advantage of a combiner.
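To see why the averaging reducer can't simply be reused as its own combiner, take the values 4, 7, and 2: a summing reducer gives 13 no matter how the values are grouped, but avg(avg(4, 7), 2) = avg(5.5, 2) = 3.75, whereas avg(4, 7, 2) ≈ 4.33. Averaging partial averages gives the wrong answer.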
The averaging approach taken by AverageByAttributeMapper.py is to output one key/value pair per record; AverageByAttributeReducer.py then counts the number of pairs it receives and sums their values, so that a single final division computes the average. The main obstacle to using a combiner is the counting operation: the reducer assumes that the number of key/value pairs it receives equals the number of key/value pairs in the input data. We can remove that assumption by refactoring the MapReduce program to carry the count explicitly in each value, which turns the combiner into a simple summation function with the distributive property.
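To make the equivalence concrete, here is a minimal plain-Java sketch, independent of the Hadoop job itself; the claim values and the grouping below are made up for illustration. It shows that merging (sum, count) pairs in any grouping yields exactly the same average as dividing over all the values at once:

// Plain-Java sketch of the sum/count refactoring; not part of the Hadoop job.
// The claim values (4, 7, 2, 9) and the grouping are illustrative only.
public class SumCountSketch {
    public static void main(String[] args) {
        // Mapper view: each record becomes the pair (value, 1).
        // Combiner view: merge an arbitrary subset of pairs, say the first three.
        double partialSum = 4 + 7 + 2;   // 13
        int partialCount = 1 + 1 + 1;    // 3

        // Reducer view: merge the combiner's pair with the remaining record's pair,
        // then divide exactly once.
        double sum = partialSum + 9;     // 22
        int count = partialCount + 1;    // 4
        System.out.println(sum / count); // 5.5

        // Averaging all values directly gives the same answer, so the combiner
        // is an equivalent transformation regardless of how records are grouped.
        System.out.println((4 + 7 + 2 + 9) / 4.0); // 5.5
    }
}

Because only sums and counts are merged, it doesn't matter whether a given subset of pairs passes through the combiner zero, one, or several times before reaching the reducer.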
Let’s first refactor the mapper and reducer before writing the combiner, as the operation of the MapReduce job must be correct even without a combiner. We write the new averaging program in Java as the combiner must be a Java class.
Let's write a Java mapper (listing 4.12) that's analogous to AverageByAttributeMapper.py of listing 4.7; the refactored reducer and the new combiner follow in the complete program, AverageByAttribute.java.
Listing 4.12 Java equivalent of AverageByAttributeMapper.py

public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the CSV record; the negative limit keeps trailing empty fields.
        String[] fields = value.toString().split(",", -20);
        String country = fields[4];
        String numClaims = fields[8];
        // Skip empty claim counts and the quoted field in the header row.
        if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {
            // Emit "numClaims,1" so downstream stages can sum both values and counts.
            context.write(new Text(country),
                          new Text(String.format("%s,1", numClaims)));
        }
    }
}
AverageByAttribute.java

package ch4.combiner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AverageByAttribute extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the CSV record; the negative limit keeps trailing empty fields.
            String[] fields = value.toString().split(",", -20);
            String country = fields[4];
            String numClaims = fields[8];
            // Skip empty claim counts and the quoted field in the header row.
            if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {
                // Emit "numClaims,1" so downstream stages can sum both values and counts.
                context.write(new Text(country),
                              new Text(String.format("%s,1", numClaims)));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Merge the (possibly pre-combined) "sum,count" pairs and divide once.
            double sum = 0;
            int count = 0;
            for (Text value : values) {
                String[] fields = value.toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            context.write(key, new DoubleWritable(sum / count));
        }
    }

    public static class Combine extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Pre-aggregate "sum,count" pairs. Input and output types must match the
            // mapper's output types because the combiner may run zero or more times.
            double sum = 0;
            int count = 0;
            for (Text value : values) {
                String[] fields = value.toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            context.write(key, new Text(sum + "," + count));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJarByClass(AverageByAttribute.class);
        job.setJobName("AverageByAttribute");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setCombinerClass(Combine.class);   // the combiner is the only new wiring
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AverageByAttribute(), args);
        System.exit(res);
    }
}
A combiner doesn't necessarily improve performance. You should monitor the job's behavior to see whether the number of records output by the combiner is meaningfully less than the number of records going in; the reduction must justify the extra execution time of running a combiner. You can easily check this through the JobTracker's web UI, which we'll see in chapter 6.
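You can also read the same numbers programmatically once the job has finished. The helper below is only a sketch (the class name CombinerStats is made up), and it assumes Hadoop 2.x or later, where the combine counters are exposed through org.apache.hadoop.mapreduce.TaskCounter; on other versions the counter enum differs:

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

// Hypothetical helper; call it from the driver after job.waitForCompletion(true).
// Assumes Hadoop 2.x+, where TaskCounter exposes the combine record counters.
public class CombinerStats {
    public static void print(Job job) throws Exception {
        Counters counters = job.getCounters();
        long in = counters.findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
        long out = counters.findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
        // A worthwhile combiner emits far fewer records than it reads.
        System.out.println("Combine input records:  " + in);
        System.out.println("Combine output records: " + out);
    }
}

If the two numbers are close to each other, the combiner is doing little useful work and may not be worth its overhead.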
Looking at figure 4.5, note that in the map phase the combiner had 1,984,625 input records and only 1,063 output records; clearly the combiner has reduced the amount of intermediate data. Note also that the combiner is executed on the reduce side as well, although in this case the benefit there is negligible.