程式扎記 (Programming Notes): [CCDH] Exercise12 - Writing a Partitioner (P43)


Sunday, December 14, 2014


Preface
Files and Directories Used in this Exercise
Eclipse project: partitioner
Java files:
MonthPartitioner.java (Partitioner)
ProcessLogs.java (Driver)
CountReducer.java (Reducer)
LogMonthMapper.java (Mapper)

Test data (HDFS):
weblog (full web server access log)
testlog (partial data set for testing)

Exercise directory: ~/workspace/partitioner

In this Exercise, you will write a MapReduce job with multiple Reducers, and create a Partitioner to determine which Reducer each piece of Mapper output is sent to.

The Problem
In the "More Practice with Writing MapReduce Java Programs" exercise you did previously, you built the code in the log_file_analysis project. That program counted the number of hits for each IP address in a web log file. The final output was a file listing each IP address and the number of hits from that address.

This time, we want to perform a similar task, but the final output should consist of 12 files, one for each month of the year: January, February, and so on. Each file will contain a list of IP addresses and the number of hits from each address in that month.

We will accomplish this by having 12 Reducers, each of which is responsible for processing the data for a particular month. Reducer 0 processes January hits, Reducer 1 processes February hits, and so on.
Note: 
We are actually breaking the standard MapReduce paradigm here, which says that all the values for a particular key go to the same Reducer. In this example, which is a very common pattern when analyzing log files, values for the same key (the IP address) go to multiple Reducers, based on the month portion of the line.
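To make the contrast concrete, compare where records land under Hadoop's default HashPartitioner, which computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, with a month-based rule. The plain-Java sketch below mimics both rules without any Hadoop dependency. It uses String.hashCode() as a stand-in for Text.hashCode(), so the exact partition numbers Hadoop picks will differ, but the point holds: hashing sends every record for one IP to the same Reducer, while the month rule spreads them by month. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration (not part of the exercise): where records that share
// one IP key but differ in month would land under two partitioning rules.
class PartitionComparison {

    static final List<String> MONTHS = Arrays.asList(
            "Jan", "Feb", "Mar", "Apr", "May", "Jun",
            "Jul", "Aug", "Sep", "Oct", "Nov", "Dec");

    // Same formula as Hadoop's default HashPartitioner; String.hashCode()
    // stands in for Text.hashCode(), so the exact numbers differ from Hadoop's.
    static int hashPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Month rule: ignores the key entirely and uses the value (Jan -> 0).
    static int monthPartition(String month) {
        return MONTHS.indexOf(month);
    }

    public static void main(String[] args) {
        String ip = "96.7.4.14";
        for (String month : new String[] {"Jan", "Apr", "Dec"}) {
            // The hash column is identical on every line; the month column is not.
            System.out.printf("%s %s -> hash rule: %d, month rule: %d%n",
                    ip, month, hashPartition(ip, 12), monthPartition(month));
        }
    }
}
```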

Sample Code
The Mapper parses each log line, extracts the IP address and the month, and emits (key, value) = (IP address, month):
- Mapper
  package solution;

  import java.io.IOException;
  import java.util.Arrays;
  import java.util.List;

  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  public class LogMonthMapper extends Mapper<LongWritable, Text, Text, Text> {

    public static final List<String> months = Arrays.asList("Jan", "Feb", "Mar", "Apr",
        "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec");

    /**
     * Example input line:
     * 96.7.4.14 - - [24/Apr/2011:04:20:11 -0400] "GET /cat.jpg HTTP/1.1" 200 12433
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      /*
       * Split the input line into space-delimited fields.
       */
      String[] fields = value.toString().split(" ");

      if (fields.length > 3) {

        /*
         * Save the first field in the line as the IP address.
         */
        String ip = fields[0];

        /*
         * The fourth field looks like [dd/Mmm/yyyy:hh:mm:ss
         * (the closing ']' follows the time zone in the fifth field).
         * Split the fourth field into "/"-delimited fields.
         * The second of these contains the month.
         */
        String[] dtFields = fields[3].split("/");
        if (dtFields.length > 1) {
          String theMonth = dtFields[1];

          /* Check that it is a valid month; if so, write it out. */
          if (months.contains(theMonth)) {
            context.write(new Text(ip), new Text(theMonth));
          }
        }
      }
    }
  }
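The field-splitting logic in map() can be tried outside Hadoop. The sketch below copies the same steps (split on spaces, take field 0 as the IP, split field 3 on "/" and take the second piece as the month) into a hypothetical static helper, extractIpAndMonth, that returns null wherever the Mapper would emit nothing. It is only an aid for checking the parsing, not part of the exercise solution.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, Hadoop-free copy of LogMonthMapper's parsing steps.
class LogLineParser {

    static final List<String> MONTHS = Arrays.asList(
            "Jan", "Feb", "Mar", "Apr", "May", "Jun",
            "Jul", "Aug", "Sep", "Oct", "Nov", "Dec");

    // Returns {ip, month}, or null when the Mapper would skip the line.
    static String[] extractIpAndMonth(String line) {
        String[] fields = line.split(" ");
        if (fields.length <= 3) {
            return null;                        // too few fields
        }
        String ip = fields[0];
        // fields[3] looks like "[24/Apr/2011:04:20:11"
        String[] dtFields = fields[3].split("/");
        if (dtFields.length <= 1) {
            return null;                        // no "/" in the timestamp field
        }
        String month = dtFields[1];
        return MONTHS.contains(month) ? new String[] {ip, month} : null;
    }

    public static void main(String[] args) {
        String line = "96.7.4.14 - - [24/Apr/2011:04:20:11 -0400] "
                + "\"GET /cat.jpg HTTP/1.1\" 200 12433";
        String[] kv = extractIpAndMonth(line);
        System.out.println(kv[0] + "\t" + kv[1]);   // prints 96.7.4.14 and Apr, tab-separated
    }
}
```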
The partitioner extends the Partitioner class and implements getPartition(KEY key, VALUE value, int numPartitions). It also implements Configurable, so Hadoop passes it the job configuration through setConf():
- Month Partitioner
  package solution;

  import java.util.HashMap;

  import org.apache.hadoop.conf.Configurable;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Partitioner;

  public class MonthPartitioner extends Partitioner<Text, Text> implements
      Configurable {

    private Configuration configuration;
    HashMap<String, Integer> months = new HashMap<String, Integer>();

    /**
     * Set up the months hash map in the setConf method.
     * Hadoop calls setConf() on Configurable partitioners after creating them.
     */
    @Override
    public void setConf(Configuration configuration) {
      this.configuration = configuration;
      months.put("Jan", 0);
      months.put("Feb", 1);
      months.put("Mar", 2);
      months.put("Apr", 3);
      months.put("May", 4);
      months.put("Jun", 5);
      months.put("Jul", 6);
      months.put("Aug", 7);
      months.put("Sep", 8);
      months.put("Oct", 9);
      months.put("Nov", 10);
      months.put("Dec", 11);
    }

    /**
     * Implement the getConf method for the Configurable interface.
     */
    @Override
    public Configuration getConf() {
      return configuration;
    }

    /**
     * You must implement the getPartition method for a partitioner class.
     * This method receives the three-letter abbreviation for the month
     * as its value. (It is the output value from the mapper.)
     * It should return an integer representation of the month.
     * Note that January is represented as 0 rather than 1.
     *
     * For this partitioner to work, the job configuration must have been
     * set so that there are exactly 12 reducers.
     */
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
      return months.get(value.toString());
    }
  }
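Because getPartition() only looks at the value, its behavior can be checked without a cluster. The sketch below is a plain-Java model (hypothetical class name MonthPartitionTable) that builds the same month-to-index table once in a static initializer instead of in setConf(); that choice only works here because the table does not depend on the job Configuration. Unlike the listing, which would throw a NullPointerException when unboxing the lookup result for an unknown month, the sketch throws an IllegalArgumentException with a message. The Mapper already filters out invalid months, so that case should not arise in the real job.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Hadoop-free model of MonthPartitioner.getPartition().
class MonthPartitionTable {

    private static final String[] NAMES = {
            "Jan", "Feb", "Mar", "Apr", "May", "Jun",
            "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};

    // Same table that setConf() builds, but filled once in a static initializer.
    private static final Map<String, Integer> MONTHS = new HashMap<>();
    static {
        for (int i = 0; i < NAMES.length; i++) {
            MONTHS.put(NAMES[i], i);   // Jan -> 0 ... Dec -> 11
        }
    }

    // Mirrors getPartition(key, value, numReduceTasks): the key (IP) and
    // numReduceTasks are ignored, as in the listing; only the month matters.
    static int getPartition(String ip, String month, int numReduceTasks) {
        Integer partition = MONTHS.get(month);
        if (partition == null) {
            throw new IllegalArgumentException("Unknown month: " + month);
        }
        return partition;
    }

    public static void main(String[] args) {
        for (String month : NAMES) {
            System.out.println(month + " -> reducer " + getPartition("10.0.0.1", month, 12));
        }
    }
}
```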
The reducer is a simple count reducer:
- Reducer
  package solution;

  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  /* Counts the number of values associated with a key. */

  public class CountReducer extends Reducer<Text, Text, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {

      /*
       * Iterate over the values iterable and count the number
       * of values in it. Emit the key (unchanged) and an IntWritable
       * containing the number of values.
       */
      int count = 0;

      /*
       * Use a for loop to count the items in the iterable.
       * Ignore warnings that we don't use the value: in this case we only
       * need to count the values, not use them.
       */
      for (@SuppressWarnings("unused") Text value : values) {

        /*
         * For each item in the list, increment the count.
         */
        count++;
      }

      context.write(key, new IntWritable(count));
    }
  }
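Putting the Mapper output, the month partitioner, and CountReducer together, each Reducer ends up with a sorted list of IPs and their hit counts for one month. The in-memory sketch below (hypothetical class CountSimulation, toy input rather than real log data) models that flow: it routes each (ip, month) pair to a per-month map, and a TreeMap stands in for the sort that happens before reduce().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, in-memory model of the whole job: month partitioning, then a
// per-Reducer count of values for each IP (what CountReducer emits).
class CountSimulation {

    static final List<String> MONTHS = Arrays.asList(
            "Jan", "Feb", "Mar", "Apr", "May", "Jun",
            "Jul", "Aug", "Sep", "Oct", "Nov", "Dec");

    // pairs holds Mapper output as {ip, month}; returns one sorted map per Reducer.
    static List<Map<String, Integer>> run(List<String[]> pairs) {
        List<Map<String, Integer>> reducers = new ArrayList<>();
        for (int i = 0; i < MONTHS.size(); i++) {
            reducers.add(new TreeMap<>());   // sorted keys, as after the shuffle
        }
        for (String[] kv : pairs) {
            int partition = MONTHS.indexOf(kv[1]);                   // month partitioner
            reducers.get(partition).merge(kv[0], 1, Integer::sum);   // count++ per value
        }
        return reducers;
    }

    public static void main(String[] args) {
        // Toy input, not real log data.
        List<String[]> pairs = Arrays.asList(
                new String[] {"10.0.0.1", "Jul"},
                new String[] {"10.0.0.1", "Jul"},
                new String[] {"10.0.0.2", "Jul"},
                new String[] {"10.0.0.1", "Jan"});
        List<Map<String, Integer>> out = run(pairs);
        System.out.println("Reducer 0: " + out.get(0));   // {10.0.0.1=1}
        System.out.println("Reducer 6: " + out.get(6));   // {10.0.0.1=2, 10.0.0.2=1}
    }
}
```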
Finally, below is the driver class:
- Driver 
  package solution;

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class ProcessLogs {

    public static void main(String[] args) throws Exception {

      if (args.length != 2) {
        System.out.printf("Usage: ProcessLogs <input dir> <output dir>\n");
        System.exit(-1);
      }

      /* new Job() is deprecated in Hadoop 2 in favor of Job.getInstance(). */
      Job job = new Job();
      job.setJarByClass(ProcessLogs.class);
      job.setJobName("Process Logs");

      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      job.setMapperClass(LogMonthMapper.class);
      job.setReducerClass(CountReducer.class);

      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);

      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      /*
       * Set up the partitioner. Specify 12 reducers, one for each
       * month of the year. The partitioner class must have a
       * getPartition method that returns a number between 0 and 11.
       * This number will be used to assign the intermediate output
       * to one of the reducers.
       */
      job.setNumReduceTasks(12);

      /*
       * Specify the partitioner class.
       */
      job.setPartitionerClass(MonthPartitioner.class);

      boolean success = job.waitForCompletion(true);
      System.exit(success ? 0 : 1);
    }
  }
Remember to set the number of Reducers to 12 with setNumReduceTasks(12) and to register the partitioner with setPartitionerClass(MonthPartitioner.class).
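Both settings matter because getPartition() returns 0 through 11 no matter how many Reducers the job has. As far as I know, Hadoop rejects a partition number outside 0..numReduceTasks-1 while the map task runs, so a job configured with fewer than 12 Reducers would fail on the later months. The plain-Java sketch below (hypothetical names, no Hadoop dependency) lists which months would be out of range for a given Reducer count, and shows a fail-fast check that could be added to getPartition() to produce a clearer error message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helpers (not part of the exercise) showing why the reducer
// count must be 12 for a Jan=0 ... Dec=11 partitioner.
class ReducerCountCheck {

    static final List<String> MONTHS = Arrays.asList(
            "Jan", "Feb", "Mar", "Apr", "May", "Jun",
            "Jul", "Aug", "Sep", "Oct", "Nov", "Dec");

    // Months whose partition number would be out of range (>= numReduceTasks).
    static List<String> unreachableMonths(int numReduceTasks) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < MONTHS.size(); i++) {
            if (i >= numReduceTasks) {
                result.add(MONTHS.get(i));
            }
        }
        return result;
    }

    // Fail-fast variant of the month lookup with an explicit error message.
    static int checkedPartition(String month, int numReduceTasks) {
        if (numReduceTasks != MONTHS.size()) {
            throw new IllegalStateException("Expected 12 reduce tasks but got "
                    + numReduceTasks + "; call job.setNumReduceTasks(12)");
        }
        return MONTHS.indexOf(month);
    }

    public static void main(String[] args) {
        System.out.println(unreachableMonths(4));         // [May, Jun, Jul, Aug, Sep, Oct, Nov, Dec]
        System.out.println(checkedPartition("Jul", 12));  // 6
    }
}
```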

Lab Experiment
1. Compile the project and run the MapReduce job
$ ant -f build.xml # build the project and produce partitioner.jar
$ hadoop fs -rm -r output # remove output from a previous run
$ hadoop jar partitioner.jar solution.ProcessLogs testlog output # run the MapReduce job

2. Check the output
$ hadoop fs -ls output
...
... output/part-r-00000
...
... output/part-r-00011
# part-r-00000 through part-r-00011: one file per Reducer, i.e., one per month
$ hadoop fs -cat output/part-r-00006 # Reducer 6 = July
10.114.184.86 1
10.153.239.5 547
10.187.129.140 18
10.207.190.45 21
10.216.113.172 368
10.223.157.186 115
10.82.30.199 183
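Each Reducer writes its own part-r-NNNNN file, where NNNNN is the zero-padded partition number, so with this partitioner part-r-00006 above holds July's hits for the testlog data set. The hypothetical helper below maps between file names and months; it assumes the default part-r-NNNNN naming and exactly 12 Reducers.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: relates reducer output file names to months.
class PartFileMonth {

    static final List<String> MONTHS = Arrays.asList(
            "Jan", "Feb", "Mar", "Apr", "May", "Jun",
            "Jul", "Aug", "Sep", "Oct", "Nov", "Dec");

    // "output/part-r-00006" -> partition 6 -> "Jul"
    static String monthOf(String fileName) {
        String digits = fileName.substring(fileName.lastIndexOf('-') + 1);
        return MONTHS.get(Integer.parseInt(digits));
    }

    // "Jan" -> "part-r-00000"
    static String fileNameFor(String month) {
        return String.format("part-r-%05d", MONTHS.indexOf(month));
    }

    public static void main(String[] args) {
        System.out.println(monthOf("output/part-r-00006"));   // Jul
        System.out.println(fileNameFor("Dec"));               // part-r-00011
    }
}
```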

