2013年11月21日 星期四

[ 深入雲計算 ] 初識 MapReduce : 實例之 數據排序

Preface:
另一個常見的 MapReduce 的應用是對數據進行排序, 如學生的成績單評比, 數據建立索引等.

輸入數據描述:
在開始 MapReduce 編寫程序前, 一定要先了解輸入數據的格式. 這個範例的輸入文件中每行均為一個數字. 要求在輸出結果中進行排序, 輸出的每一行有兩個數字並以 Tab 鍵隔開. 第一個數字代表第二個數字在原始數據中排序的位置. 範例輸入如下:
- sort1.txt 內容
2
32
654
32
15
756
65223

- sort2.txt 內容
5956
22
650
92

- sort3.txt 內容
26
54
6

設計思路:
這個範例僅僅要求對輸入數據進行排序. 熟悉 MapReduce 過程的你很快能想到在 MapReduce 過程中就有排序的行為發生, 是否可以利用這個默認的排序, 而不需要自己去實現排序的代碼呢? 答案是肯定的. 但是在使用前要了解它的排序規則. 它是按照 key 值進行排序, 如果 key 是封裝成 int 的 IntWritable 類型, 那麼 MapReduce 按照數據大小對 key 排序 (預設排序行為是昇序-由小到大); 如果 key 封裝為 String 的 Text 類型, 那麼 MapReduce 會按照字典順序進行排序.

了解這個細節, 就應該知道要使用 IntWritable 類別封裝 key 值, 並將讀入的數據置放於 key 值中. reduce 拿到 之後, 再將輸入的 key 作為 value 輸出, 並使用一個全局變數 linenum紀錄當前的 key 值在原始數據中的排序位置.

程序代碼:
- Map 類別
  1. package demo.mapr.sort;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. public class Map extends Mapper{  
  6.     final static IntWritable one = new IntWritable(1);  
  7.     IntWritable data = new IntWritable();  
  8.       
  9.     @Override  
  10.     public void map(Object key, Text value, Context context)  
  11.             throws IOException, InterruptedException {  
  12.         data.set(Integer.valueOf(value.toString().trim()));  
  13.         context.write(data, one);  
  14.     }  
  15. }  
- Reduce 類別
  1. package demo.mapr.sort;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. public class Reduce extends Reducer{  
  6.     static IntWritable linenum = new IntWritable(1);  
  7.     @Override  
  8.     public void reduce(IntWritable key, Iterable values, Context context)  
  9.             throws IOException, InterruptedException {  
  10.         context.write(linenum, key);  
  11.         linenum.set(linenum.get()+1);  
  12.     }  
  13. }  
- Main 類別
  1. package demo.mapr.sort;  
  2.   
  3. import org.apache.hadoop.conf.Configuration;  
  4.   
  5.   
  6. public class Main {  
  7.     public static void main(String[] args) throws Exception{  
  8.         Configuration conf = new Configuration();             
  9.         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  10.         if (otherArgs.length != 2) {  
  11.           System.err.println("Usage: demo.mapr.sort.Main ");  
  12.           System.exit(2);  
  13.         }  
  14.         Job job = new Job(conf, "Sort");  
  15.         job.setJarByClass(Main.class);  
  16.           
  17.         // 設置 Map 與 Reduce class  
  18.         job.setMapperClass(Map.class);  
  19.         job.setReducerClass(Reduce.class);  
  20.                   
  21.         // 設置 Map 輸出類型  
  22.         job.setMapOutputKeyClass(IntWritable.class);  
  23.         job.setMapOutputValueClass(IntWritable.class);  
  24.           
  25.         // 設置 Reduce 輸出類型  
  26.         job.setOutputKeyClass(IntWritable.class);  
  27.         job.setOutputValueClass(IntWritable.class);  
  28.           
  29.         // 設置輸入與輸出目錄  
  30.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
  31.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
  32.         System.exit(job.waitForCompletion(true)?0:1);  
  33.   
  34.     }  
  35. }  
執行結果:
$ hadoop fs -mkdir /sort_in # 建立輸入目錄
$ hadoop fs -rmr /sort_out # 移除舊有結果, 如果有的話
$ hadoop fs -put sort1.txt sort2.txt sort3.txt /sort_in # 將 sorting files 從 local file system 放入 HDFS
$ hadoop jar MRTest.jar demo.mapr.sort.Main /sort_in /sort_out # 執行 MapReduce
$ hadoop fs -ls /sort_out # 檢視輸出
$ hadoop fs -cat /sort_out/part-r-00000
1 2
2 6
3 15
4 22
5 26
6 32
7 54
8 92
9 650
10 654
11 756
12 5956
13 65223


沒有留言:

張貼留言

[Git 常見問題] error: The following untracked working tree files would be overwritten by merge

  Source From  Here 方案1: // x -----删除忽略文件已经对 git 来说不识别的文件 // d -----删除未被添加到 git 的路径中的文件 // f -----强制运行 #   git clean -d -fx 方案2: 今天在服务器上  gi...