2013年11月21日 星期四

[ 深入雲計算 ] 初識 MapReduce : 實例之 Inverted Index

Preface: 
這裡要介紹基於 MapReduce 分布編程模型下的簡單 Inverted Index 的建立過程, Inverted Index 是 IR 系統中常見的數據結構, 被廣泛的應用於全文搜索引擎. 它主要用來存儲某個單詞 (或詞組) 在一個文檔或一組文檔中的存儲位置的映射, 即提供一種根據內容來查找文檔的方式. 由於不是根據文檔來確定文檔所包含的內容 (某個文檔包含哪些字詞), 而是進行反向的操作 (紀錄某個字詞出現在那些文檔中), 因而稱作為 Inverted Index. 下面簡單說明 Inverted Index 的流程: 
 

設計的思路: 
實現 "Inverted Index" 需要關注的資訊有 "單詞", "文檔" 與 "單詞出現頻率". 下面根據 MapReduce 的處理過程給出設計的想法: 

1. Map 過程 
首先使用默認的 TextInputFormat 類別對輸入的文件進行處理, 得到文本中每行的偏移量及其內容. 接著 Map 過程必須分析輸入的 對, 得到 Inverted index 中需要的三個資訊: 單詞, 文檔名稱 與 該詞頻. Map 的輸入與輸出範例如下: 
 

這裡存在兩個問題: 第一, 對只能有兩個值, 在使用 Hadoop 自定義數據類型的情況下, 需要根據情況將其中兩個值合併成一個值, 作為 key 或 value 值; 第二, 通過一個 Reduce 過程無法同時完成詞頻統計與生成文檔列表, 所以必須增加一個 Combine 過程完成詞頻統計. 

這裡讓 單詞 與 對應文檔組成 key 值 (如 "MapReduce:file1.txt") , 並將詞頻作為 value, 這樣的好處是可以利用 MapReduce 框架自帶的 Map 端進行排序, 將同一文檔的相同單詞的詞頻組成列表, 傳遞給 Combine 過程, 實現類似於 WordCount 的功能. 

2. Combine 過程 
經過 Map 方法後, Combine 過程將 key 值相同的 value 值累加, 得到一個單詞在文檔中的詞頻. Combine 過程中的輸入/輸出如下圖所示. 如果將下圖輸出直接作為 Reduce 過程的輸入, 在 Shuffle 過程中將面臨一個問題: 所有具有相同單詞的紀錄 (由 單詞, 文檔名稱 與 詞頻 組成) 應該交由同一個 Reducer 處理, 但當前的 key 值無法保證這一點, 所以必須修改 key 值與 value 值. 這次將單詞作為 key 值, 文檔名稱與詞頻組成 value 值 (如 "file1.txt:1"). 這樣做的好處是可以利用 MapReduce 框架默認的 HashPartitioner 類完成 Shuffle 過程, 將相同的單詞的所有紀錄發送給同一個 Reducer 進行處理
 

3. Reduce 過程 
經過上面兩個過程, Reduce 過程只需將相同的 key 值的 value 值組合成 Inverted Index 文件所需的格式即可, 剩下的事情就交給 MapReduce 框架進行處理. Reduce 過程的輸入/輸出 如下所示: 
 

實作代碼: 
- Map 類別 
  1. package demo.mapr.ii;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.StringTokenizer;  
  5.   
  6. import org.apache.hadoop.io.Text;  
  7. import org.apache.hadoop.mapreduce.Mapper;  
  8. import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  9.   
  10. public class Map extends Mapper{      
  11.     Text keyInfo = new Text();      // Save Term+FileName  
  12.     Text valueInfo = new Text("1"); // Save Term frequency        
  13.       
  14.     @Override  
  15.     public void map(Object key, Text value, Context context)  
  16.             throws IOException, InterruptedException {        
  17.         StringTokenizer iter = new StringTokenizer(value.toString());  
  18.         while(iter.hasMoreTokens())  
  19.         {  
  20.             String fn = ((FileSplit) context.getInputSplit()).getPath().getName();  
  21.             keyInfo.set(String.format("%s:%s", iter.nextToken(), fn));  
  22.             context.write(keyInfo, valueInfo);  
  23.         }  
  24.     }  
  25. }  
- Combine 類別 
  1. package demo.mapr.ii;  
  2.   
  3. import java.io.IOException;  
  4. import org.apache.hadoop.io.Text;  
  5. import org.apache.hadoop.mapreduce.Reducer;  
  6.   
  7. public class Combine extends Reducer{  
  8.     Text info = new Text();  
  9.       
  10.     @Override  
  11.     public void reduce(Text key, Iterable values, Context context)  
  12.             throws IOException, InterruptedException {  
  13.         // 計算詞頻  
  14.         int sum = 0;  
  15.         for(Text value:values) sum+=Integer.valueOf(value.toString());  
  16.   
  17.         // Term+FileName  
  18.         String items[] = key.toString().split(":");  
  19.           
  20.         // 重新設置 value 值為由 文檔名稱+詞頻  
  21.         info.set(String.format("%s:%d", items[1], sum));  
  22.           
  23.         // 重新設置 key 值為單詞  
  24.         key.set(items[0]);  
  25.         context.write(key, info);  
  26.     }  
  27. }  
- Reduce 類別 
  1. package demo.mapr.ii;  
  2.   
  3. import java.io.IOException;  
  4. import org.apache.hadoop.io.Text;  
  5. import org.apache.hadoop.mapreduce.Reducer;  
  6.   
  7. public class Reduce extends Reducer  
  8. {  
  9.     Text result = new Text();  
  10.       
  11.     @Override  
  12.     public void reduce(Text key, Iterable values, Context context)  
  13.             throws IOException, InterruptedException {  
  14.         StringBuffer fileList = new StringBuffer();  
  15.         for(Text value:values)  
  16.         {  
  17.             fileList.append(String.format("%s;", value));  
  18.         }  
  19.         result.set(fileList.toString());  
  20.         context.write(key, result);  
  21.     }  
  22. }  
- Main 類別 
  1. package demo.mapr.ii;  
  2.   
  3.   
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.fs.Path;  
  6. import org.apache.hadoop.io.IntWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.mapreduce.Job;  
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  10. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  11. import org.apache.hadoop.util.GenericOptionsParser;  
  12.   
  13. public class Main {  
  14.   
  15.     /** 
  16.      * @param args 
  17.      */  
  18.     public static void main(String[] args) throws Exception{  
  19.         Configuration conf = new Configuration();             
  20.         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  21.         if (otherArgs.length != 2) {  
  22.           System.err.println("Usage: demo.mapr.ii.Main ");  
  23.           System.exit(2);  
  24.         }  
  25.         Job job = new Job(conf, "Inverted Index");  
  26.         job.setJarByClass(Main.class);  
  27.           
  28.         // 設置 Map,Combine 與 Reduce class  
  29.         job.setMapperClass(Map.class);  
  30.         job.setCombinerClass(Combine.class);  
  31.         job.setReducerClass(Reduce.class);  
  32.                   
  33.         // 設置 Map 輸出類型  
  34.         job.setMapOutputKeyClass(Text.class);  
  35.         job.setMapOutputValueClass(Text.class);  
  36.           
  37.         // 設置 Reduce 輸出類型  
  38.         job.setOutputKeyClass(Text.class);  
  39.         job.setOutputValueClass(Text.class);  
  40.           
  41.         // 設置輸入與輸出目錄  
  42.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
  43.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
  44.         System.exit(job.waitForCompletion(true)?0:1);  
  45.     }  
  46.   
  47. }  
執行 Map/Reduce: 
首先來製作輸入的測試檔案: 
# 現在在 NameNode
$ echo "MapReduce is simple" > file1.txt
$ echo "MapReduce is powerful is simple" > file2.txt
$ echo "Hello MapReduce bye MapReduce" > file3.txt

建立 輸入/輸出 目錄並將測試檔案放到 /index_in 路徑裡: 
$ hadoop fs -mkdir /index_in
$ hadoop fs -rmr /index_out # 移除舊有結果
$ hadoop fs -put file1.txt file2.txt file3.txt /index_in
$ hadoop fs -ls /index_in
Found 3 items
-rw-r--r-- 1 john supergroup 20 2013-11-21 00:45 /index_in/file1.txt
-rw-r--r-- 1 john supergroup 32 2013-11-21 00:45 /index_in/file2.txt
-rw-r--r-- 1 john supergroup 30 2013-11-21 00:45 /index_in/file3.txt

接著便是來執行 MapReduce 並檢視結果: 
$ hadoop jar MRTest.jar demo.mapr.ii.Main /index_in /index_out # 前面撰寫的 Map/Combine/Reduce 被包裝在 MRTest.jar
...
13/11/21 01:01:37 INFO mapred.JobClient: map 0% reduce 0%
13/11/21 01:01:44 INFO mapred.JobClient: map 33% reduce 0%
...

$ hadoop fs -ls /index_out # 檢視輸出檔案
Found 2 items
drwxr-xr-x - john supergroup 0 2013-11-21 01:01 /index_out/_logs
-rw-r--r-- 1 john supergroup 165 2013-11-21 01:01 /index_out/part-r-00000
 # 輸出檔案
$ hadoop fs -cat /index_out/part-r-00000 # 檢視處理結果
Hello file3.txt:1;
MapReduce file3.txt:2;file2.txt:1;file1.txt:1;
bye file3.txt:1;
is file2.txt:2;file1.txt:1;
powerful file2.txt:1;
simple file2.txt:1;file1.txt:1;


Supplement: 
Hadoop: How to get file path of the input record being read in mapper? 
How-to: Include Third-Party Libraries in Your MapReduce Job

沒有留言:

張貼留言

[Git 常見問題] error: The following untracked working tree files would be overwritten by merge

  Source From  Here 方案1: // x -----删除忽略文件已经对 git 来说不识别的文件 // d -----删除未被添加到 git 的路径中的文件 // f -----强制运行 #   git clean -d -fx 方案2: 今天在服务器上  gi...