Thursday, December 11, 2014

[ In Action ] Ch4. Writing basic MapReduce programs (Part3)

Improving performance with combiners (P95) 
We saw in AverageByAttributeMapper.py and AverageByAttributeReducer.py (Part 2) how to compute the average for each attribute. The mapper reads each record and outputs a key/value pair with the record's attribute (the country) as the key and the value to be averaged (the claim count) as the value. The framework shuffles the key/value pairs across the network, and the reducer computes the average for each key. In our example of computing the average number of claims for each country's patents, we see at least two efficiency bottlenecks:
1. If we have 1 billion input records, the mappers will generate 1 billion key/value pairs that will be shuffled across the network. If we were computing a function such as maximum, it's obvious that each mapper only has to output the maximum for each key it has seen. Doing so would reduce network traffic and increase performance. For a function such as average, it's a bit more complicated, but we can still redefine the algorithm such that each mapper shuffles only one record per key (see the sketch after this list).

2. Using country from the patent data set as the key illustrates data skew. The data is far from uniformly distributed, as a significant majority of the records have U.S. as the key. Not only does every key/value pair in the input map to a key/value pair in the intermediate data, but most of the intermediate key/value pairs also end up at a single reducer, overwhelming it.
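
To make the first point concrete, here is a minimal sketch of how a mapper could pre-aggregate locally so that only one partial "sum,count" record per key leaves each map task. This is not the approach the rest of this post takes (which uses Hadoop's combiner mechanism instead), and the class name LocalAggregatingMapper is purely illustrative:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch: keep a per-task (sum, count) per country and emit one record per key in cleanup()
public static class LocalAggregatingMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Map<String, double[]> partials = new HashMap<String, double[]>();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",", -20);
        String country = fields[4];
        String numClaims = fields[8];
        if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {
            double[] sumAndCount = partials.get(country);
            if (sumAndCount == null) {
                sumAndCount = new double[] { 0, 0 };
                partials.put(country, sumAndCount);
            }
            sumAndCount[0] += Double.parseDouble(numClaims);  // running sum
            sumAndCount[1] += 1;                              // running count
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Only one "sum,count" record per key per map task is shuffled across the network
        for (Map.Entry<String, double[]> e : partials.entrySet()) {
            context.write(new Text(e.getKey()),
                          new Text(e.getValue()[0] + "," + (long) e.getValue()[1]));
        }
    }
}

A reducer like the Reduce class shown later in this post, which sums the partial sums and counts before dividing, could consume this output unchanged.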

Hadoop solves these bottlenecks by extending the MapReduce framework with a combiner step in between the mapper and reducer. You can think of the combiner as a helper for the reducer. It’s supposed to whittle down the output of the mapper to lessen the load on the network and on the reducer. If we specify a combiner, the MapReduce framework may apply it zero, one, or more times to the intermediate data. In order for a combiner to work, it must be an equivalent transformation of the data with respect to the reducer. If we take out the combiner, the reducer’s output will remain the same. Furthermore, the equivalent transformation property must hold when the combiner is applied to arbitrary subsets of the intermediate data. 

If the reducer only performs a distributive function, such as maximum, minimum, or summation (counting), then we can use the reducer itself as the combiner. But many useful functions aren't distributive. We can rewrite some of them, such as averaging, to take advantage of a combiner.
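
To see why averaging is not distributive, split the values 1, 2, 3, 4, 5 across two mappers as {1, 2} and {3, 4, 5}: averaging the per-mapper averages gives (1.5 + 4.0) / 2 = 2.75, not the true average 3. Carrying (sum, count) pairs instead composes correctly, which is exactly the rewrite used below. A tiny standalone check (hypothetical class name) of that arithmetic:

// Averaging per-partition averages is wrong; combining (sum, count) pairs is not.
public class AverageIsNotDistributive {
    public static void main(String[] args) {
        double[] partA = {1, 2};           // values seen by one mapper
        double[] partB = {3, 4, 5};        // values seen by another mapper

        // Wrong: average of averages = (1.5 + 4.0) / 2 = 2.75
        double avgOfAvgs = (avg(partA) + avg(partB)) / 2;

        // Right: combine partial (sum, count) pairs, divide once at the end
        double sum = sum(partA) + sum(partB);        // 3 + 12 = 15
        double count = partA.length + partB.length;  // 5
        System.out.println(avgOfAvgs + " vs " + (sum / count));  // prints 2.75 vs 3.0
    }

    static double sum(double[] xs) {
        double s = 0;
        for (double x : xs) s += x;
        return s;
    }

    static double avg(double[] xs) {
        return sum(xs) / xs.length;
    }
}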

The averaging approach taken by AverageByAttributeMapper.py is to output one key/value pair per record. AverageByAttributeReducer.py counts the number of key/value pairs it receives and sums up their values, so that a single final division computes the average. The main obstacle to using a combiner is the counting operation, as the reducer assumes the number of key/value pairs it receives equals the number of key/value pairs in the input data. We can refactor the MapReduce program to track that count explicitly. The combiner then becomes a simple summation function, which has the distributive property.

Let’s first refactor the mapper and reducer before writing the combiner, as the operation of the MapReduce job must be correct even without a combiner. We write the new averaging program in Java as the combiner must be a Java class. 
NOTE. 
The Streaming API allows you to specify a combiner using the -combiner option. For versions up to at least 0.20, the combiner must still be a Java class, so it's best to write your mapper and reducer in Java as well. Fortunately, support for native Streaming scripts as combiners is on the Hadoop roadmap. In practice, you can get the equivalent of a combiner by setting the mapper to a Unix pipe such as 'mapper.py | sort | combiner.py'. In addition, if you're using the Aggregate package, each value aggregator already has a built-in (Java) combiner, and the Aggregate package will use these combiners automatically.

Let's write a Java mapper (listing 4.12) that's analogous to AverageByAttributeMapper.py of listing 4.7.
- Listing 4.12 Java equivalent of AverageByAttributeMapper.py 
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
    {
        String[] fields = value.toString().split(",", -20);
        String country = fields[4];
        String numClaims = fields[8];
        if (numClaims.length() > 0 && !numClaims.startsWith("\""))
        {
            context.write(new Text(country),
                          new Text(String.format("%s,1", numClaims)));
        }
    }
}
At the reducer, the list of values for each key is parsed. The total sum and count are computed by summation and divided at the end to get the average.
public static class Reduce extends Reducer<Text, Text, Text, DoubleWritable>
{
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException
    {
        double sum = 0;
        int count = 0;
        for (Text value : values)
        {
            String[] fields = value.toString().split(",");
            sum += Double.parseDouble(fields[0]);
            count += Integer.parseInt(fields[1]);
        }
        context.write(key, new DoubleWritable(sum / count));
    }
}
Programmatically, the combiner must extend the Reducer class. The combiner's reduce() method performs the combining operation. This may seem like a bad naming scheme, but recall that for the important class of distributive functions, the combiner and the reducer perform the same operation. Therefore, the combiner adopts the reducer's signature to simplify its reuse. In the end, we've created a Combine class that looks similar to the Reduce class, except that it only outputs the (partial) sum and count, whereas the reducer computes the final average.
public static class Combine extends Reducer<Text, Text, Text, Text>
{
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException
    {
        double sum = 0;
        int count = 0;
        for (Text value : values)
        {
            String[] fields = value.toString().split(",");
            sum += Double.parseDouble(fields[0]);
            count += Integer.parseInt(fields[1]);
        }
        context.write(key, new Text(sum + "," + count));
    }
}
To enable the combiner, the driver must specify the combiner's class to the Job object. You can do this through the setCombinerClass() method. The driver sets the mapper, combiner, and reducer. Below is the complete code:
- AverageByAttribute.java 
package ch4.combiner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AverageByAttribute extends Configured implements Tool {
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException
        {
            String[] fields = value.toString().split(",", -20);
            String country = fields[4];
            String numClaims = fields[8];
            if (numClaims.length() > 0 && !numClaims.startsWith("\""))
            {
                context.write(new Text(country),
                              new Text(String.format("%s,1", numClaims)));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, DoubleWritable>
    {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException
        {
            double sum = 0;
            int count = 0;
            for (Text value : values)
            {
                String[] fields = value.toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            context.write(key, new DoubleWritable(sum / count));
        }
    }

    public static class Combine extends Reducer<Text, Text, Text, Text>
    {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException
        {
            double sum = 0;
            int count = 0;
            for (Text value : values)
            {
                String[] fields = value.toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            context.write(key, new Text(sum + "," + count));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJarByClass(AverageByAttribute.class);
        job.setJobName("AverageByAttribute");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setCombinerClass(Combine.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        boolean success = job.waitForCompletion(true);
        return (success ? 0 : 1);
    }

    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new Configuration(), new AverageByAttribute(), args);
        System.exit(res);
    }
}
You can run the program with the following steps:
$ hadoop jar HadoopIA.jar ch4.combiner.AverageByAttribute IA/patn_100000.txt output_avg
$ hadoop fs -ls output_avg
...
-rw-r--r-- 1 training supergroup 794 2014-12-11 20:40 output_avg/part-r-00000

$ hadoop fs -cat output_avg/part-r-00000
"AR" 5.1
"AT" 8.666666666666666
"AU" 8.56
"BB" 6.0
...

A combiner doesn't necessarily improve performance. You should monitor the job's behavior to see whether the number of records output by the combiner is meaningfully less than the number of records going in. The reduction must justify the extra execution time of running the combiner. You can easily check this through the JobTracker's Web UI, which we'll see in chapter 6.

Looking at figure 4.5, note that in the map phase the combiner has 1,984,625 input records and only 1,063 output records. Clearly the combiner has reduced the amount of intermediate data. Note that the reduce side also executes the combiner, though the benefit is negligible in this case.
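
If you prefer not to use the Web UI, the same counters can be read programmatically once the job finishes. The lines below are a sketch you could append to run() after waitForCompletion(); they assume a Hadoop release that exposes the new-API org.apache.hadoop.mapreduce.TaskCounter and Counters classes (older 0.20-era releases keep the equivalent counters under org.apache.hadoop.mapred.Task.Counter), so adapt the class names and add the corresponding imports for your version:

// Sketch: compare combiner input/output record counts after the job completes
Counters counters = job.getCounters();
long combineIn  = counters.findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
long combineOut = counters.findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();

// A combiner is only worthwhile when combineOut is much smaller than combineIn
System.out.println("Combine input records:  " + combineIn);
System.out.println("Combine output records: " + combineOut);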
