這裡要介紹基於 MapReduce 分布編程模型下的簡單 Inverted Index 的建立過程, Inverted Index 是 IR 系統中常見的數據結構, 被廣泛的應用於全文搜索引擎. 它主要用來存儲某個單詞 (或詞組) 在一個文檔或一組文檔中的存儲位置的映射, 即提供一種根據內容來查找文檔的方式. 由於不是根據文檔來確定文檔所包含的內容 (某個文檔包含哪些字詞), 而是進行反向的操作 (紀錄某個字詞出現在那些文檔中), 因而稱作為 Inverted Index. 下面簡單說明 Inverted Index 的流程:
設計的思路:
實現 "Inverted Index" 需要關注的資訊有 "單詞", "文檔" 與 "單詞出現頻率". 下面根據 MapReduce 的處理過程給出設計的想法:
1. Map 過程
首先使用默認的 TextInputFormat 類別對輸入的文件進行處理, 得到文本中每行的偏移量及其內容. 接著 Map 過程必須分析輸入的
這裡存在兩個問題: 第一,
這裡讓 單詞 與 對應文檔組成 key 值 (如 "MapReduce:file1.txt") , 並將詞頻作為 value, 這樣的好處是可以利用 MapReduce 框架自帶的 Map 端進行排序, 將同一文檔的相同單詞的詞頻組成列表, 傳遞給 Combine 過程, 實現類似於 WordCount 的功能.
2. Combine 過程
經過 Map 方法後, Combine 過程將 key 值相同的 value 值累加, 得到一個單詞在文檔中的詞頻. Combine 過程中的輸入/輸出如下圖所示. 如果將下圖輸出直接作為 Reduce 過程的輸入, 在 Shuffle 過程中將面臨一個問題: 所有具有相同單詞的紀錄 (由 單詞, 文檔名稱 與 詞頻 組成) 應該交由同一個 Reducer 處理, 但當前的 key 值無法保證這一點, 所以必須修改 key 值與 value 值. 這次將單詞作為 key 值, 文檔名稱與詞頻組成 value 值 (如 "file1.txt:1"). 這樣做的好處是可以利用 MapReduce 框架默認的 HashPartitioner 類完成 Shuffle 過程, 將相同的單詞的所有紀錄發送給同一個 Reducer 進行處理:
3. Reduce 過程
經過上面兩個過程, Reduce 過程只需將相同的 key 值的 value 值組合成 Inverted Index 文件所需的格式即可, 剩下的事情就交給 MapReduce 框架進行處理. Reduce 過程的輸入/輸出 如下所示:
實作代碼:
- Map 類別
- package demo.mapr.ii;
- import java.io.IOException;
- import java.util.StringTokenizer;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- public class Map extends Mapper
- Text keyInfo = new Text(); // Save Term+FileName
- Text valueInfo = new Text("1"); // Save Term frequency
- @Override
- public void map(Object key, Text value, Context context)
- throws IOException, InterruptedException {
- StringTokenizer iter = new StringTokenizer(value.toString());
- while(iter.hasMoreTokens())
- {
- String fn = ((FileSplit) context.getInputSplit()).getPath().getName();
- keyInfo.set(String.format("%s:%s", iter.nextToken(), fn));
- context.write(keyInfo, valueInfo);
- }
- }
- }
- package demo.mapr.ii;
- import java.io.IOException;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- public class Combine extends Reducer
{ - Text info = new Text();
- @Override
- public void reduce(Text key, Iterable
values, Context context) - throws IOException, InterruptedException {
- // 計算詞頻
- int sum = 0;
- for(Text value:values) sum+=Integer.valueOf(value.toString());
- // Term+FileName
- String items[] = key.toString().split(":");
- // 重新設置 value 值為由 文檔名稱+詞頻
- info.set(String.format("%s:%d", items[1], sum));
- // 重新設置 key 值為單詞
- key.set(items[0]);
- context.write(key, info);
- }
- }
- package demo.mapr.ii;
- import java.io.IOException;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- public class Reduce extends Reducer
- {
- Text result = new Text();
- @Override
- public void reduce(Text key, Iterable
values, Context context) - throws IOException, InterruptedException {
- StringBuffer fileList = new StringBuffer();
- for(Text value:values)
- {
- fileList.append(String.format("%s;", value));
- }
- result.set(fileList.toString());
- context.write(key, result);
- }
- }
- package demo.mapr.ii;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- public class Main {
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception{
- Configuration conf = new Configuration();
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- if (otherArgs.length != 2) {
- System.err.println("Usage: demo.mapr.ii.Main
" - System.exit(2);
- }
- Job job = new Job(conf, "Inverted Index");
- job.setJarByClass(Main.class);
- // 設置 Map,Combine 與 Reduce class
- job.setMapperClass(Map.class);
- job.setCombinerClass(Combine.class);
- job.setReducerClass(Reduce.class);
- // 設置 Map 輸出類型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
- // 設置 Reduce 輸出類型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- // 設置輸入與輸出目錄
- FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
- FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
- System.exit(job.waitForCompletion(true)?0:1);
- }
- }
首先來製作輸入的測試檔案:
建立 輸入/輸出 目錄並將測試檔案放到 /index_in 路徑裡:
接著便是來執行 MapReduce 並檢視結果:
Supplement:
* Hadoop: How to get file path of the input record being read in mapper?
* How-to: Include Third-Party Libraries in Your MapReduce Job
沒有留言:
張貼留言