Thursday, November 27, 2014

[CCDH] Class3 - Programming with Hadoop Core API (1)

Preface 
In this chapter you will learn (p.255):
* How to use the ToolRunner class
* How to decrease the amount of intermediate data with Combiners
* How to set up and tear down Mappers and Reducers using the setup and cleanup methods
* How to access HDFS programmatically
* How to use the distributed cache
* How to use the Hadoop API's library of Mappers, Reducers, and Partitioners

Using the ToolRunner Class 
Using ToolRunner in your MapReduce driver class is not required, but it is a best practice. ToolRunner uses the GenericOptionsParser class internally, which: 
* Allows you to specify configuration options from the command line.
* Allows you to specify items for the Distributed Cache from the command line.

Implement ToolRunner - Imports 
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.DoubleWritable;  // needed for the job's output values below
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;
Implement ToolRunner - Driver Class Definition 
The driver class implements the Tool interface and extends the Configured class: 
  public class AvgWordLength extends Configured implements Tool {

      public static void main(String[] args) throws Exception {...}

      @Override
      public int run(String[] args) throws Exception {...}
  }
Implement ToolRunner - Main Method 
The driver's main method calls ToolRunner.run:

  ...
  public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      int exitCode = ToolRunner.run(conf, new AvgWordLength(), args);
      System.exit(exitCode);
  }
  ...
Implement ToolRunner - Run Method 
The driver's run method creates, configures, and submits the job: 

  @Override
  public int run(String[] args) throws Exception {
      /*
       * Validate that two arguments were passed from the command line.
       */
      if (args.length != 2) {
          System.out.printf("Usage: AvgWordLength <input dir> <output dir>\n");
          System.exit(-1);
      }

      /*
       * Instantiate a Job object for your job's configuration.
       */
      Job job = new Job(getConf());

      /*
       * Specify the jar file that contains your driver, mapper, and reducer.
       * Hadoop will transfer this jar file to nodes in your cluster running
       * mapper and reducer tasks.
       */
      job.setJarByClass(AvgWordLength.class);

      /*
       * Specify an easily-decipherable name for the job. This job name will
       * appear in reports and logs.
       */
      job.setJobName("Average Word Length");

      /*
       * Specify the paths to the input and output data based on the
       * command-line arguments.
       */
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      /*
       * Specify the mapper and reducer classes.
       */
      job.setMapperClass(LetterMapper.class);
      job.setReducerClass(AverageReducer.class);

      /*
       * The input and output files are text files, so there is no need
       * to call the setInputFormatClass and setOutputFormatClass methods.
       */

      /*
       * The mapper's output keys and values have different data types than
       * the reducer's output keys and values. Therefore, you must call the
       * setMapOutputKeyClass and setMapOutputValueClass methods.
       */
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(IntWritable.class);

      /*
       * Specify the job's output key and value classes.
       */
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(DoubleWritable.class);

      /*
       * Start the MapReduce job and wait for it to finish. If it finishes
       * successfully, return 0. If not, return 1.
       */
      boolean success = job.waitForCompletion(true);
      return success ? 0 : 1;
  }
ToolRunner allows the user to specify configuration options on the command line, most commonly to set Hadoop properties with the -D flag. A property set this way overrides any default or site-specific value in the configuration. For example: 
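A hypothetical invocation (the jar name avgwordlength.jar and the directory arguments are placeholders), setting the number of reduce tasks to 2 for this run only:

  $ hadoop jar avgwordlength.jar AvgWordLength -D mapred.reduce.tasks=2 input_dir output_dir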
 

Note that -D options must appear before any additional program arguments, as in the invocation above. In addition, you can: 
* Specify an XML configuration file with -conf
* Specify the default filesystem with -fs uri - a shortcut for -D fs.default.name=uri
* ...
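For instance, hypothetical invocations (the configuration file name and filesystem URI are placeholders):

  $ hadoop jar avgwordlength.jar AvgWordLength -conf my-conf.xml input_dir output_dir
  $ hadoop jar avgwordlength.jar AvgWordLength -fs hdfs://localhost:8020 input_dir output_dir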

Setting Up and Tearing Down Mappers and Reducers 
The setup Method 
It is common to want your Mapper or Reducer to execute some code before the map or reduce method is called for the first time (see the sketch below), typically to: 
* Initialize data structures
* Read data from an external file
* Set parameters
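A minimal sketch of overriding setup in a Mapper, assuming a hypothetical boolean job property named "caseSensitive" (how the driver sets it is shown in the Pass Parameters section below):

  private boolean caseSensitive;

  @Override
  public void setup(Context context) {
      // Runs once, before the first call to map(). Fetch a parameter from
      // the job configuration; "caseSensitive" is a hypothetical name.
      Configuration conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("caseSensitive", true);
  }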

 

The cleanup Method 
Similarly, you may wish to perform some action(s) after all the records have been processed by your Mapper or Reducer. The cleanup method is called before the Mapper or Reducer terminates. 
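A minimal sketch of overriding cleanup in a Reducer, assuming a hypothetical totalRecords field that reduce has been accumulating:

  @Override
  public void cleanup(Context context) throws IOException, InterruptedException {
      // Runs once, after the last call to reduce(). Emit one final record;
      // totalRecords is a hypothetical field updated inside reduce().
      context.write(new Text("Total records"), new IntWritable(totalRecords));
  }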
 

Pass Parameters 
You can set parameters in the driver, and the Mapper or Reducer can fetch them to customize MapReduce behavior: 
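A minimal sketch of the driver side, reusing the hypothetical "caseSensitive" property from the setup sketch above:

  // In the driver's run() method, before the Job is created
  // ("caseSensitive" is a hypothetical property name):
  Configuration conf = getConf();
  conf.setBoolean("caseSensitive", false);
  Job job = new Job(conf);
  // The Mapper or Reducer then reads the value back via
  // context.getConfiguration() in its setup() method, as shown earlier.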
 

Decreasing the Amount of Intermediate Data with Combiners 
Often, Mappers produce large amounts of intermediate data that must be passed to the Reducers, which can result in a lot of network traffic. It is often possible to specify a Combiner, which acts like a 'mini-Reducer' and runs locally on a single Mapper's output.

Combiner and Reducer code are often identical. Technically, reusing the Reducer as the Combiner is possible if the operation performed is commutative and associative. The input and output data types of the Combiner and the Reducer must be identical.
 

WordCount Revisited 
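In the standard WordCount job, every Mapper emits a (word, 1) pair for each word in its input, and all of those pairs are shuffled across the network to the Reducers.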
 

WordCount with Combiner 
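With a Combiner, each Mapper's output is pre-aggregated locally before the shuffle. For example, if one Mapper's input contains the word "the" three times, it emits (the, 1) three times, but the Combiner collapses these into a single (the, 3), so only one record crosses the network for that word.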
 

Writing a Combiner 
The Combiner uses the same signature as the Reducer: 
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
      ....
  }
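For reference, a minimal sketch of a SumReducer that can serve as both the Reducer and the Combiner (class and variable names are illustrative):

  import java.io.IOException;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

      @Override
      public void reduce(Text key, Iterable<IntWritable> values, Context context)
              throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable value : values) {
              sum += value.get();
          }
          // Summing is commutative and associative, so partial sums emitted
          // by the Combiner can safely be summed again by the Reducer.
          context.write(key, new IntWritable(sum));
      }
  }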
Combiners and Reducers 
Some Reducers may be used as Combiners, if the operation performed is associative and commutative (e.g., SumReducer).
 

Some Reducers cannot be used as Combiners (e.g., AverageReducer), because the operation is not associative: averaging partial averages does not give the overall average. For example, avg(1, 2, 3) = 2, but avg(avg(1, 2), 3) = avg(1.5, 3) = 2.25.
 

Specifying a Combiner 
Specify the Combiner class to be used in your MapReduce job in the driver via the setCombinerClass method: 

  job.setCombinerClass(SumReducer.class);

The input and output data types of the Combiner and the Reducer for a job must be identical. Also note that the Combiner may run once, more than once, or not at all on the output from any given Mapper, so do not put code in the Combiner that could influence your results if it runs more than once.

Supplement 
Exercise 7 - Using ToolRunner and Passing Parameters 
Exercise 8 - Writing a Partitioner
