程式扎記: [CCDH] Exercise3 - Writing a MapReduce Java Program (P16)


Wednesday, December 3, 2014

[CCDH] Exercise3 - Writing a MapReduce Java Program (P16)

Preface 
Projects and Directories Used in this Exercise 
Eclipse project: averagewordlength
Java files:
AverageReducer.java (Reducer)
LetterMapper.java (Mapper)
AvgWordLength.java (driver)

Test data (HDFS):
shakespeare

Exercise directory: ~/workspace/averagewordlength

In this exercise you write a MapReduce job that reads any text input and computes the average length of all words that start with each character. For any text input, the job should report the average length of words that begin with 'a', 'b', and so forth. For example, for the input: 
  No now is definitely not the time  

The output would be: 
N 2.0
n 3.0
d 10.0
i 2.0
t 3.5
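These figures can be reproduced with a short piece of plain Java (no Hadoop required). This is only an illustrative sketch: the class name AvgCheck is made up here, and a TreeMap stands in for MapReduce's sorted keys:

```java
import java.util.Map;
import java.util.TreeMap;

public class AvgCheck {
    // Compute the per-first-letter average word length for one line of text,
    // using the same split-on-non-word-characters rule as the exercise.
    static Map<Character, Double> averages(String line) {
        Map<Character, double[]> stats = new TreeMap<>(); // letter -> {sum, count}
        for (String token : line.split("\\W")) {
            if (token.length() > 0) {
                double[] s = stats.computeIfAbsent(token.charAt(0), k -> new double[2]);
                s[0] += token.length(); // running total of word lengths
                s[1]++;                 // number of words seen
            }
        }
        Map<Character, Double> out = new TreeMap<>(); // keys come out sorted
        stats.forEach((k, v) -> out.put(k, v[0] / v[1]));
        return out;
    }

    public static void main(String[] args) {
        averages("No now is definitely not the time")
            .forEach((k, v) -> System.out.println(k + " " + v));
    }
}
```

For example, 't' averages to 3.5 because "the" (3) and "time" (4) both start with it.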

The Algorithm 
The algorithm for this program is a simple one-pass MapReduce program: 

The Mapper 
The Mapper receives a line of text for each input value. (Ignore the input key.) For each word in the line, it emits the first letter of the word as the key and the length of the word as the value. See the source code below: 
- LetterMapper.java 
package stubs;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LetterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    // Split the line on non-word characters; emit (first letter, word length).
    for (String token : value.toString().split("\\W")) {
      if (token.length() > 0) {
        context.write(new Text(String.valueOf(token.charAt(0))),
            new IntWritable(token.length()));
      }
    }
  }
}
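Note that split("\\W") can produce empty tokens when punctuation sits next to a space, which is why the mapper checks token.length() > 0 before writing. A quick standalone check (the class name SplitDemo is just for illustration):

```java
import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        // "\\W" splits on every non-word character, so ", " yields an
        // empty token between the comma and the space; Java drops the
        // trailing empty token after "!".
        String[] tokens = "No, now!".split("\\W");
        System.out.println(Arrays.toString(tokens)); // -> [No, , now]
    }
}
```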
The Reducer 
Thanks to the shuffle and sort phase built into MapReduce, the Reducer receives the keys in sorted order, and all the values for one key are grouped together. For the Mapper output above, the Reducer source code is as follows: 
- AverageReducer.java 
package stubs;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageReducer extends
    Reducer<Text, IntWritable, Text, DoubleWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Sum the word lengths for this key and divide by the number of words.
    double totalLength = 0.0;
    double count = 0.0;
    for (IntWritable wl : values) {
      totalLength += wl.get();
      count++;
    }
    context.write(key, new DoubleWritable(totalLength / count));
  }
}
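For the sample line above, the reducer invoked for key 't' receives the grouped values [3, 4] (the lengths of "the" and "time"). The same averaging loop can be sketched in plain Java without the Hadoop types (AverageDemo is an illustrative name, not part of the lab):

```java
import java.util.List;

public class AverageDemo {
    // The same loop as AverageReducer.reduce, minus the Writable wrappers.
    static double average(Iterable<Integer> values) {
        double sum = 0.0, count = 0.0;
        for (int v : values) {
            sum += v;   // accumulate word lengths
            count++;    // count the words
        }
        return sum / count;
    }

    public static void main(String[] args) {
        // Lengths of "the" and "time", as grouped for key 't'.
        System.out.println(average(List.of(3, 4))); // -> 3.5
    }
}
```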
The Driver 
The driver is almost the same as the one in WordCount. Its source code is shown below: 
- AvgWordLength.java 
package stubs;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvgWordLength {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.printf("Usage: AvgWordLength <input dir> <output dir>\n");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(AvgWordLength.class);
    job.setJobName("Average Word Length");

    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    job.setMapperClass(LetterMapper.class);
    job.setReducerClass(AverageReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    /*
     * Start the MapReduce job and wait for it to finish.
     * If it finishes successfully, return 0; if not, return 1.
     */
    boolean success = job.waitForCompletion(true);
    System.exit(success ? 0 : 1);
  }
}
Lab Experiment 
Under the path ~/workspace/averagewordlength, you should have a build.xml file that you can use with ant to build the project. 
1. Build the project 
$ ant -f build.xml # The build process will output 'averagewordlength.jar'

2. Run the MapReduce program 
$ hadoop jar averagewordlength.jar stubs.AvgWordLength shakespeare wordlengths
$ hadoop fs -ls wordlengths # The results will be inside wordlengths in HDFS
...
... 2014-12-03 06:09 wordlengths/part-r-00000

3. Review the results 
$ hadoop fs -cat wordlengths/*
...
s 4.327014649237208
t 3.733261651336357
u 4.4905590522028875
v 5.726228030644434
w 4.3475752474027844
y 3.5292446231858716
z 4.672727272727273

The file should list every digit and letter that appears in the data set, together with the average length of the words that start with it.
