Monday, November 24, 2014

[CCDH] Class2 - Writing Basic MapReduce Programs (1)

Preface
In this chapter you will learn
* Basic MapReduce API concepts
* How to write MapReduce drivers, Mappers, and Reducers in Java
* The differences between the old and new MapReduce APIs

Basic MapReduce API Concepts
The MapReduce Flow


A Sample MapReduce Program - WordCount
This will demonstrate the Hadoop API


To investigate the API, we will dissect the WordCount program we covered in the previous chapter. It consists of three portions:
* The driver code - Code that runs on the client to configure and submit the job
* The Mapper 
* The Reducer

Getting Data to the Mapper
The data passed to the Mapper is specified by an InputFormat:
* Specified in the driver code
* Defines the location of the input data - Typically a file or directory
* Determines how to split the input data into input splits
* Creates a RecordReader object - The RecordReader parses the input data into key/value pairs to pass to the Mapper

TextInputFormat is the default InputFormat implementation. It creates a LineRecordReader object that treats each "\n" as a separator and breaks the file into lines. The key passed to the Mapper is the byte offset of the line within the file, and the value is the line itself.
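For example (a hypothetical two-line input file, not from the course material), TextInputFormat would produce these key/value pairs for the Mapper:

  Input file:
    the cat sat on the mat
    the aardvark sat on the sofa

  Key/value pairs passed to map():
    (0,  "the cat sat on the mat")
    (23, "the aardvark sat on the sofa")

The keys (0 and 23) are the byte offsets at which each line starts.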


There are other standard InputFormat implementations:
FileInputFormat - Abstract base class used for all file-based InputFormats
KeyValueTextInputFormat - Maps "\n"-terminated lines as 'key[separator]value'. By default, [separator] is a tab.
SequenceFileInputFormat - Binary file of (key, value) pairs with some additional metadata
SequenceFileAsTextInputFormat - Similar, but map(key.toString(), value.toString())
...

Keys and Values are Objects
* Keys and values in Hadoop are Java Objects - Not primitives
* Values are objects which implement Writable
* Keys are objects which implement WritableComparable
* Hadoop defines its own 'box classes' for strings, integers, and so on - IntWritable, LongWritable, Text, etc. (see the sketch below)
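As a quick illustration (a minimal sketch, not from the course material), the box classes wrap ordinary Java values and can be converted back with get()/toString():

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;

  Text word = new Text("hadoop");          // wraps a String
  IntWritable one = new IntWritable(1);    // wraps an int

  String s = word.toString();              // unbox back to a String
  int i = one.get();                       // unbox back to an int

  // Text implements WritableComparable, so it can be used as a key
  // and compared during the sort and shuffle phase.
  int cmp = word.compareTo(new Text("mapreduce"));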

Writing MapReduce Applications in Java
The Driver Code - Introduction
The driver code runs on the client machine. It configures the job, then submits it to the cluster.
- WordCount.java
  package solution;

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.mapreduce.Job;

  /*
   * MapReduce jobs are typically implemented by using a driver class.
   * The purpose of a driver class is to set up the configuration for the
   * MapReduce job and to run the job.
   * Typical requirements for a driver class include configuring the input
   * and output data formats, configuring the map and reduce classes,
   * and specifying intermediate data formats.
   *
   * The following is the code for the driver class:
   */
  public class WordCount {

    public static void main(String[] args) throws Exception {

      /*
       * The expected command-line arguments are the paths containing
       * input and output data. Terminate the job if the number of
       * command-line arguments is not exactly 2.
       */
      if (args.length != 2) {
        System.out.printf("Usage: WordCount <input dir> <output dir>\n");
        System.exit(-1);
      }

      /*
       * Instantiate a Job object for your job's configuration.
       */
      Job job = new Job();

      /*
       * Specify the jar file that contains your driver, mapper, and reducer.
       * Hadoop will transfer this jar file to nodes in your cluster running
       * mapper and reducer tasks.
       */
      job.setJarByClass(WordCount.class);

      /*
       * Specify an easily-decipherable name for the job.
       * This job name will appear in reports and logs.
       */
      job.setJobName("Word Count");

      /*
       * Specify the paths to the input and output data based on the
       * command-line arguments.
       */
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      /*
       * Specify the mapper and reducer classes.
       */
      job.setMapperClass(WordMapper.class);
      job.setReducerClass(SumReducer.class);

      /*
       * For the word count application, the input and output files
       * are in text format - the default format.
       *
       * In text format files, each record is a line delineated by a
       * line terminator.
       *
       * When you use other input formats, you must call the
       * setInputFormatClass method. When you use other
       * output formats, you must call the setOutputFormatClass method.
       */

      /*
       * For the word count application, the mapper's output keys and
       * values have the same data types as the reducer's output keys
       * and values: Text and IntWritable.
       *
       * When they are not the same data types, you must call the
       * setMapOutputKeyClass and setMapOutputValueClass
       * methods.
       */

      /*
       * Specify the job's output key and value classes.
       */
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      /*
       * Start the MapReduce job and wait for it to finish.
       * If it finishes successfully, return 0. If not, return 1.
       */
      boolean success = job.waitForCompletion(true);
      System.exit(success ? 0 : 1);
    }
  }
1. Creating a New Job Object
The Job class allows you to set configuration options for your MapReduce job
* The class to be used for your Mapper and Reducer
* The input/output directories
* Many other options, by passing a Configuration object to the constructor

Any options not explicitly set in your driver code will be read from your Hadoop configuration files ($HADOOP_HOME/etc/hadoop). Any options not specified in your configuration files will use Hadoop's default values. You can use the Job object to submit the job, control its execution, and query its state.
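As a minimal sketch (not from the course code; Job.getInstance() is the non-deprecated replacement for the Job constructor in Hadoop 2.x), options can be set programmatically through a Configuration object that is passed when the Job is created:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;

  // Build a Configuration and override an option before creating the Job.
  Configuration conf = new Configuration();
  conf.set("mapreduce.job.reduces", "2");   // e.g. request two reduce tasks

  // Equivalent to constructing the Job with this conf and then
  // calling setJobName("Word Count")
  Job job = Job.getInstance(conf, "Word Count");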

2. Specifying Input/Output Directories
  /*
   * Specify the paths to the input and output data based on the
   * command-line arguments.
   */
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
The default InputFormat (TextInputFormat) will be used unless you specify otherwise. To use an InputFormat other than the default, call e.g.
  job.setInputFormatClass(KeyValueTextInputFormat.class);
By default, FileInputFormat.setInputPaths() will read all files in the specified directory and send them to the Mappers, except for items whose names begin with a period (.) or an underscore (_). You can also use globs to restrict the input, for example /2010/*/01/*.

Alternatively, FileInputFormat.addInputPath() can be called multiple times, specifying a single file or directory each time. More advanced filtering can be performed by implementing a PathFilter - an interface with a method named accept that returns true or false depending on whether or not the file should be processed. A sketch of both techniques follows.
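A minimal sketch (the paths and the LogFileFilter class are hypothetical examples, not part of the course material):

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.PathFilter;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  // A PathFilter that accepts only files whose names end in ".log".
  public class LogFileFilter implements PathFilter {
    @Override
    public boolean accept(Path path) {
      return path.getName().endsWith(".log");
    }
  }

  // In the driver: add several inputs, then restrict them with the filter.
  FileInputFormat.addInputPath(job, new Path("/data/2010"));
  FileInputFormat.addInputPath(job, new Path("/data/2011/summary.txt"));
  FileInputFormat.setInputPathFilter(job, LogFileFilter.class);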

FileOutputFormat.setOutputPath() specifies the directory to which the Reducers will write their final output. The default output format writes plain text files; a different OutputFormat can be specified explicitly with job.setOutputFormatClass(...), for example:
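A hedged sketch (SequenceFileOutputFormat is one of the standard output formats shipped with Hadoop):

  import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

  // Write the job's output as a binary SequenceFile instead of plain text.
  job.setOutputFormatClass(SequenceFileOutputFormat.class);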

3. Specifying the Mapper/Reducer Classes
  /*
   * Specify the mapper and reducer classes.
   */
  job.setMapperClass(WordMapper.class);
  job.setReducerClass(SumReducer.class);
Setting the Mapper/Reducer classes is optional! If they are not set in your driver code, Hadoop uses its defaults, which simply pass each input key/value pair through unchanged (IdentityMapper and IdentityReducer in the old API; the base Mapper and Reducer classes in the new API):


4. Specifying Intermediate/Final Output Data Types
  /*
   * Specify the intermediate key/value classes.
   */
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);

  /*
   * Specify the job's output key and value classes.
   */
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
5. Running The Job
  /*
   * Start the MapReduce job and wait for it to finish.
   * If it finishes successfully, return 0. If not, return 1.
   */
  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0 : 1);
There are two ways to run your MapReduce job:
* job.waitForCompletion() - Blocks and waits for the job to complete before continuing
* job.submit() - Doesn't block; it returns as soon as the job has been submitted (see the sketch below)
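A minimal sketch of the non-blocking style, assuming the same job object built above (the polling interval is arbitrary):

  // Submit the job and return immediately.
  job.submit();

  // Poll the job's state while doing other work on the client.
  while (!job.isComplete()) {
    System.out.println("map " + (job.mapProgress() * 100) + "% / " +
                       "reduce " + (job.reduceProgress() * 100) + "%");
    Thread.sleep(5000);
  }

  System.exit(job.isSuccessful() ? 0 : 1);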

The client determines the proper division of the input data into InputSplits, and then sends the job information to the JobTracker daemon on the cluster.

WordCount Mapper Review

- WordMapper.java
  package solution;

  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  /*
   * To define a map function for your MapReduce job, subclass
   * the Mapper class and override the map method.
   * The class definition requires four parameters:
   *   The data type of the input key
   *   The data type of the input value
   *   The data type of the output key (which is the input key type
   *   for the reducer)
   *   The data type of the output value (which is the input value
   *   type for the reducer)
   */

  public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /*
     * The map method runs once for each line of text in the input file.
     * The method receives a key of type LongWritable, a value of type
     * Text, and a Context object.
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      /*
       * Convert the line, which is received as a Text object,
       * to a String object.
       */
      String line = value.toString();

      /*
       * The line.split("\\W+") call uses regular expressions to split the
       * line up by non-word characters.
       *
       * If you are not familiar with the use of regular expressions in
       * Java code, search the web for "Java Regex Tutorial."
       */
      for (String word : line.split("\\W+")) {
        if (word.length() > 0) {

          /*
           * Call the write method on the Context object to emit a key
           * and a value from the map method.
           */
          context.write(new Text(word), new IntWritable(1));
        }
      }
    }
  }
WordCount Reducer Review

- SumReducer.java
  package solution;

  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  /*
   * To define a reduce function for your MapReduce job, subclass
   * the Reducer class and override the reduce method.
   * The class definition requires four parameters:
   *   The data type of the input key (which is the output key type
   *   from the mapper)
   *   The data type of the input value (which is the output value
   *   type from the mapper)
   *   The data type of the output key
   *   The data type of the output value
   */
  public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /*
     * The reduce method runs once for each key received from
     * the shuffle and sort phase of the MapReduce framework.
     * The method receives a key of type Text, a set of values of type
     * IntWritable, and a Context object.
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int wordCount = 0;

      /*
       * For each value in the set of values passed to us by the mapper:
       */
      for (IntWritable value : values) {

        /*
         * Add the value to the word count counter for this key.
         */
        wordCount += value.get();
      }

      /*
       * Call the write method on the Context object to emit a key
       * and a value from the reduce method.
       */
      context.write(key, new IntWritable(wordCount));
    }
  }
Ensure "Types" Match
Mappers and Reducers declare their input and output types as generic type parameters when extending the Mapper and Reducer classes. These must match the types actually used in the map and reduce methods:


Output types must also match those set in the driver (a sketch follows):
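As an illustration of the matching rules (a schematic sketch using the WordCount classes above, not a complete listing):

  // The mapper's last two type parameters are its output key/value types ...
  public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable>

  // ... and must match the reducer's first two type parameters
  // (its input key/value types):
  public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable>

  // The reducer's last two type parameters must in turn match the
  // output classes set in the driver:
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);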


Supplement
Exercise2 - Running a MapReduce Job
Exercise3 - Writing a MapReduce Java Program
Exercise4 - More Practice With MapReduce Java Programs
Apache Hadoop 2.5.1 - MapReduce Tutorial
Hadoop 2.2.0 - Resolving the NativeLibraries error
$ export HADOOP_ROOT_LOGGER=DEBUG,console # See more debug information

