Preface:
另一個常見的 MapReduce 的應用是對數據進行排序, 如學生的成績單評比, 數據建立索引等.
輸入數據描述:
在開始 MapReduce 編寫程序前, 一定要先了解輸入數據的格式. 這個範例的輸入文件中每行均為一個數字. 要求在輸出結果中進行排序, 輸出的每一行有兩個數字並以 Tab 鍵隔開. 第一個數字代表第二個數字在原始數據中排序的位置. 範例輸入如下:
- sort1.txt 內容
- sort2.txt 內容
- sort3.txt 內容
設計思路:
這個範例僅僅要求對輸入數據進行排序. 熟悉 MapReduce 過程的你很快能想到在 MapReduce 過程中就有排序的行為發生, 是否可以利用這個默認的排序, 而不需要自己去實現排序的代碼呢? 答案是肯定的. 但是在使用前要了解它的排序規則. 它是按照 key 值進行排序, 如果 key 是封裝成 int 的 IntWritable 類型, 那麼 MapReduce 按照數據大小對 key 排序 (預設排序行為是昇序-由小到大); 如果 key 封裝為 String 的 Text 類型, 那麼 MapReduce 會按照字典順序進行排序.
了解這個細節, 就應該知道要使用 IntWritable 類別封裝 key 值, 並將讀入的數據置放於 key 值中. reduce 拿到 之後, 再將輸入的 key 作為 value 輸出, 並使用一個全局變數 linenum紀錄當前的 key 值在原始數據中的排序位置.
程序代碼:
- Map 類別
- Reduce 類別
- Main 類別
執行結果:
另一個常見的 MapReduce 的應用是對數據進行排序, 如學生的成績單評比, 數據建立索引等.
輸入數據描述:
在開始 MapReduce 編寫程序前, 一定要先了解輸入數據的格式. 這個範例的輸入文件中每行均為一個數字. 要求在輸出結果中進行排序, 輸出的每一行有兩個數字並以 Tab 鍵隔開. 第一個數字代表第二個數字在原始數據中排序的位置. 範例輸入如下:
- sort1.txt 內容
- sort2.txt 內容
- sort3.txt 內容
設計思路:
這個範例僅僅要求對輸入數據進行排序. 熟悉 MapReduce 過程的你很快能想到在 MapReduce 過程中就有排序的行為發生, 是否可以利用這個默認的排序, 而不需要自己去實現排序的代碼呢? 答案是肯定的. 但是在使用前要了解它的排序規則. 它是按照 key 值進行排序, 如果 key 是封裝成 int 的 IntWritable 類型, 那麼 MapReduce 按照數據大小對 key 排序 (預設排序行為是昇序-由小到大); 如果 key 封裝為 String 的 Text 類型, 那麼 MapReduce 會按照字典順序進行排序.
了解這個細節, 就應該知道要使用 IntWritable 類別封裝 key 值, 並將讀入的數據置放於 key 值中. reduce 拿到
程序代碼:
- Map 類別
- package demo.mapr.sort;
- import java.io.IOException;
- public class Map extends Mapper
- final static IntWritable one = new IntWritable(1);
- IntWritable data = new IntWritable();
- @Override
- public void map(Object key, Text value, Context context)
- throws IOException, InterruptedException {
- data.set(Integer.valueOf(value.toString().trim()));
- context.write(data, one);
- }
- }
- package demo.mapr.sort;
- import java.io.IOException;
- public class Reduce extends Reducer
{ - static IntWritable linenum = new IntWritable(1);
- @Override
- public void reduce(IntWritable key, Iterable
values, Context context) - throws IOException, InterruptedException {
- context.write(linenum, key);
- linenum.set(linenum.get()+1);
- }
- }
- package demo.mapr.sort;
- import org.apache.hadoop.conf.Configuration;
- public class Main {
- public static void main(String[] args) throws Exception{
- Configuration conf = new Configuration();
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- if (otherArgs.length != 2) {
- System.err.println("Usage: demo.mapr.sort.Main
" - System.exit(2);
- }
- Job job = new Job(conf, "Sort");
- job.setJarByClass(Main.class);
- // 設置 Map 與 Reduce class
- job.setMapperClass(Map.class);
- job.setReducerClass(Reduce.class);
- // 設置 Map 輸出類型
- job.setMapOutputKeyClass(IntWritable.class);
- job.setMapOutputValueClass(IntWritable.class);
- // 設置 Reduce 輸出類型
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(IntWritable.class);
- // 設置輸入與輸出目錄
- FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
- FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
- System.exit(job.waitForCompletion(true)?0:1);
- }
- }
沒有留言:
張貼留言