Friday, December 26, 2014

[CCDH] Exercise7 - Using ToolRunner and Passing Parameters (P30)

Preface
Files and Directories Used in This Exercise (P30)
Eclipse project: toolrunner
Java files:
AverageReducer.java (Reducer from AverageWordLength)
LetterMapper.java (Mapper from AverageWordLength)
AvgWordLength.java (Driver from AverageWordLength)

Exercise directory: ~/workspace/toolrunner

In this Exercise, you will implement a driver using ToolRunner.

Follow the steps below: start with the Average Word Length program you wrote in an earlier exercise and modify the driver to use ToolRunner. Then modify the Mapper to reference a Boolean parameter called caseSensitive: if true, the mapper treats upper- and lower-case letters as different; if false or unset, all letters are converted to lower case.

Source Code
Driver
By using ToolRunner, you can easily pass arguments on the command line, and the Mapper/Reducer can change their behavior based on those arguments. This time your driver class should extend Configured and implement Tool:
- solution/AvgWordLength.java
package solution;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvgWordLength extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        /*
         * Uncomment the following line to set the caseSensitive value for
         * the job programmatically instead of from the command line.
         */
        //conf.setBoolean("caseSensitive", false);

        int exitCode = ToolRunner.run(conf, new AvgWordLength(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        /*
         * Validate that two arguments were passed from the command line.
         */
        if (args.length != 2) {
            System.out.printf("Usage: AvgWordLength <input dir> <output dir>\n");
            System.exit(-1);
        }

        /*
         * Instantiate a Job object for your job's configuration. getConf()
         * returns the Configuration that ToolRunner prepared, including any
         * -D options given on the command line.
         */
        Job job = new Job(getConf());

        /*
         * Specify the jar file that contains your driver, mapper, and reducer.
         * Hadoop will transfer this jar file to nodes in your cluster running
         * mapper and reducer tasks.
         */
        job.setJarByClass(AvgWordLength.class);

        /*
         * Specify an easily-decipherable name for the job. This job name will
         * appear in reports and logs.
         */
        job.setJobName("Average Word Length");

        /*
         * Specify the paths to the input and output data based on the
         * command-line arguments.
         */
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        /*
         * Specify the mapper and reducer classes.
         */
        job.setMapperClass(LetterMapper.class);
        job.setReducerClass(AverageReducer.class);

        /*
         * The input file and output files are text files, so there is no need
         * to call the setInputFormatClass and setOutputFormatClass methods.
         */

        /*
         * The mapper's output keys and values have different data types than
         * the reducer's output keys and values. Therefore, you must call the
         * setMapOutputKeyClass and setMapOutputValueClass methods.
         */
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        /*
         * Specify the job's output key and value classes.
         */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        /*
         * Start the MapReduce job and wait for it to finish. If it finishes
         * successfully, return 0. If not, return 1.
         */
        boolean success = job.waitForCompletion(true);
        return (success ? 0 : 1);
    }
}
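Note that ToolRunner delegates command-line parsing to GenericOptionsParser, which applies generic options such as -DcaseSensitive=false to the Configuration and passes only the remaining arguments on to run(). The following minimal sketch (a hypothetical standalone class, not part of the exercise) shows that split in isolation:
- ParserDemo.java (illustrative)
package solution;

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class ParserDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] cmdArgs = { "-DcaseSensitive=true", "shakespeare", "toolrunnerout" };

        // GenericOptionsParser is what ToolRunner.run() uses internally.
        GenericOptionsParser parser = new GenericOptionsParser(conf, cmdArgs);

        // The -D option has been applied to the Configuration...
        System.out.println(conf.getBoolean("caseSensitive", false)); // true

        // ...and only the positional arguments remain for run(String[]).
        System.out.println(Arrays.toString(parser.getRemainingArgs()));
        // prints: [shakespeare, toolrunnerout]
    }
}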
Mapper
The mapper reads the caseSensitive parameter in setup() and uses it to decide whether the first letter of each word keeps its case:
- solution/LetterMapper.java
package solution;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;

/**
 * To define a map function for your MapReduce job, subclass the Mapper class
 * and override the map method. The class definition requires four parameters:
 * the data type of the input key (LongWritable), the data type of the input
 * value (Text), the data type of the output key (Text), and the data type of
 * the output value (IntWritable).
 */
public class LetterMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    boolean caseSensitive = false;

    /**
     * The map method runs once for each line of text in the input file.
     * The method receives a key of type LongWritable, a value of type Text,
     * and a Context object.
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        /*
         * Convert the line, which is received as a Text object, to a String
         * object.
         */
        String line = value.toString();

        /*
         * The line.split("\\W+") call uses regular expressions to split the
         * line up by non-word characters. If you are not familiar with the use
         * of regular expressions in Java code, search the web for
         * "Java Regex Tutorial."
         */
        for (String word : line.split("\\W+")) {
            if (word.length() > 0) {

                /*
                 * Obtain the first letter of the word.
                 */
                String letter;

                if (caseSensitive)
                    letter = word.substring(0, 1);
                else
                    letter = word.substring(0, 1).toLowerCase();

                /*
                 * Call the write method on the Context object to emit a key and
                 * a value from the map method. The key is the letter the word
                 * starts with (lower-cased unless caseSensitive is true); the
                 * value is the word's length.
                 */
                context.write(new Text(letter), new IntWritable(word.length()));
            }
        }
    }

    /*
     * The setup method runs once per task, before any calls to map. It reads
     * the caseSensitive value from the job configuration.
     */
    @Override
    public void setup(Context context) {
        Configuration conf = context.getConfiguration();
        caseSensitive = conf.getBoolean("caseSensitive", false);
    }
}
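If you want to verify the caseSensitive switch without launching a full job, a unit test works well. Below is a minimal sketch using MRUnit (assuming the MRUnit and JUnit jars are on your test classpath; the test class and inputs are made up for illustration):
- LetterMapperTest.java (illustrative)
package solution;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class LetterMapperTest {

    @Test
    public void lowerCasesByDefault() throws Exception {
        // caseSensitive is unset, so "Hello" should be keyed under "h".
        MapDriver.newMapDriver(new LetterMapper())
                .withInput(new LongWritable(0), new Text("Hello"))
                .withOutput(new Text("h"), new IntWritable(5))
                .runTest();
    }

    @Test
    public void preservesCaseWhenEnabled() throws Exception {
        MapDriver<LongWritable, Text, Text, IntWritable> driver =
                MapDriver.newMapDriver(new LetterMapper());
        driver.getConfiguration().setBoolean("caseSensitive", true);

        // With caseSensitive=true, "Hello" keeps its upper-case "H" key.
        driver.withInput(new LongWritable(0), new Text("Hello"))
                .withOutput(new Text("H"), new IntWritable(5))
                .runTest();
    }
}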
Reducer
The reducer sums the word lengths it receives for each starting letter and outputs the average word length per letter:
- solution/AverageReducer.java
package solution;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * To define a reduce function for your MapReduce job, subclass the Reducer
 * class and override the reduce method. The class definition requires four
 * parameters: the data type of the input key (Text), the data type of the
 * input value (IntWritable), the data type of the output key (Text), and the
 * data type of the output value (DoubleWritable).
 */
public class AverageReducer extends
        Reducer<Text, IntWritable, Text, DoubleWritable> {

    /**
     * The reduce method runs once for each key received from the shuffle and
     * sort phase of the MapReduce framework. The method receives a key of
     * type Text, a set of values of type IntWritable, and a Context object.
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        long sum = 0, count = 0;

        /*
         * For each value in the set of values passed to us by the mapper:
         * add up the values and increment the count.
         */
        for (IntWritable value : values) {
            sum += value.get();
            count++;
        }

        if (count != 0) {

            /*
             * The average length is the sum of the values divided by the
             * count.
             */
            double result = (double) sum / (double) count;

            /*
             * Call the write method on the Context object to emit a key (the
             * words' starting letter) and a value (the average length of
             * words starting with that letter) from the reduce method.
             */
            context.write(key, new DoubleWritable(result));
        }
    }
}
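To make the arithmetic concrete: if the input contains the words "Hello", "hi", and "hadoop" (with caseSensitive unset), the mapper emits (h, 5), (h, 2), and (h, 6); the reducer then receives key "h" with values [5, 2, 6], computes sum = 13 and count = 3, and writes an average of about 4.33. A minimal standalone sketch of the same calculation (illustrative only; the words and class name are made up):
- AverageDemo.java (illustrative)
public class AverageDemo {
    public static void main(String[] args) {
        int[] lengths = { 5, 2, 6 }; // lengths of "Hello", "hi", "hadoop"
        long sum = 0, count = 0;
        for (int len : lengths) {
            sum += len;
            count++;
        }
        // Same formula the reducer uses: sum of lengths divided by count.
        double avg = (double) sum / (double) count;
        System.out.println("h\t" + avg); // prints: h	4.333333333333333
    }
}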
Lab Experiment
1. Build project and run MapReduce
$ ant -f build.xml # Build project
$ hadoop fs -rm -r toolrunnerout # Remove the toolrunnerout directory in HDFS in case it exists
$ hadoop jar toolrunner.jar solution.AvgWordLength -DcaseSensitive=false shakespeare toolrunnerout
# You can change the -DcaseSensitive argument to modify the behavior of the Mapper.

2. Review the result
$ hadoop fs -cat toolrunnerout/*
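
3. (Optional) Compare the case-sensitive behavior
$ hadoop jar toolrunner.jar solution.AvgWordLength -DcaseSensitive=true shakespeare toolrunnerout2
$ hadoop fs -cat toolrunnerout2/*
# Upper- and lower-case first letters now appear as separate keys. The output
# directory name toolrunnerout2 is just an example.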

