Monday, December 15, 2014

[ In Action ] Ch5. Advanced MapReduce (Part1)

Preface 
This chapter covers 
■ Chaining multiple MapReduce jobs
■ Performing joins of multiple data sets
■ Creating Bloom filters

As your data processing becomes more complex, you’ll want to exploit different Hadoop features. This chapter focuses on some of these more advanced techniques. When handling advanced data processing, you’ll often find that you can’t express the process as a single MapReduce job. Hadoop supports chaining MapReduce programs together to form a bigger job. 

You’ll also find that advanced data processing often involves more than one data set. We’ll explore various joining techniques in Hadoop for simultaneously processing multiple data sets. You can code certain data processing tasks more efficiently when processing a group of records at a time. 

We’ve seen how Streaming natively supports the ability to process a whole split at a time, and the Streaming implementation of the maximum function takes advantage of this ability. We’ll see that the same is true for Java programs. We’ll discover the Bloom filter and implement it with a mapper that keeps state information across records. 

Chaining MapReduce jobs 
So far you’ve been doing data processing tasks that a single MapReduce job can accomplish. As you get more comfortable writing MapReduce programs and take on more ambitious data processing tasks, you’ll find that many complex tasks need to be broken down into simpler subtasks, each accomplished by an individual MapReduce job. For example, from the citation data set you may be interested in finding the ten most-cited patents. A sequence of two MapReduce jobs can do this. The first one creates the “inverted” citation data set and counts the number of citations for each patent, and the second job finds the top ten in that “inverted” data. 

Chaining MapReduce jobs in a sequence 
Though you can execute the two jobs manually one after the other, it’s more convenient to automate the execution sequence. You can chain MapReduce jobs to run sequentially, with the output of one MapReduce job being the input to the next. Chaining MapReduce jobs is analogous to Unix pipes: 
mapreduce-1 | mapreduce-2 | mapreduce-3 | ...

Chaining MapReduce jobs sequentially is quite straightforward. The driver for each job has to create a new Job object and set its input path to the output path of the previous job. You can delete the intermediate data generated at each step of the chain at the end. 
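Below is a minimal sketch of this pattern. The class name SequentialChain, the job names, and the intermediate path chain-tmp are made up for illustration, and the mapper/reducer configuration of each step is omitted: 
- Sequential chaining sketch 
  package ch5;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;

  public class SequentialChain extends Configured implements Tool {
      @Override
      public int run(String[] args) throws Exception {
          Path in = new Path(args[0]);
          Path tmp = new Path("chain-tmp");   // intermediate output of job1, input of job2
          Path out = new Path(args[1]);

          Job job1 = new Job(getConf(), "step1");
          job1.setJarByClass(SequentialChain.class);
          FileInputFormat.setInputPaths(job1, in);
          FileOutputFormat.setOutputPath(job1, tmp);
          // ... set mapper/reducer/input/output classes for job1 here ...
          if (!job1.waitForCompletion(true)) return 1;   // block until job1 finishes

          Job job2 = new Job(getConf(), "step2");
          job2.setJarByClass(SequentialChain.class);
          FileInputFormat.setInputPaths(job2, tmp);      // job2 reads job1's output
          FileOutputFormat.setOutputPath(job2, out);
          // ... set mapper/reducer/input/output classes for job2 here ...
          int rc = job2.waitForCompletion(true) ? 0 : 1;

          FileSystem.get(getConf()).delete(tmp, true);   // drop the intermediate data
          return rc;
      }

      public static void main(String[] args) throws Exception {
          System.exit(ToolRunner.run(new Configuration(), new SequentialChain(), args));
      }
  }
Because waitForCompletion(true) blocks until the job finishes, job2 only starts after job1 has written its output. 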

Chaining MapReduce jobs with complex dependency 
Sometimes the subtasks of a complex data processing task don’t run sequentially, and their MapReduce jobs are therefore not chained in a linear fashion. For example, mapreduce1 may process one data set while mapreduce2 independently processes another data set. The third job, mapreduce3, performs an inner join of the first two jobs’ output. (We’ll discuss data joining in the next sections.) It’s dependent on the other two and can execute only after both mapreduce1 and mapreduce2 are completed. But mapreduce1 and mapreduce2 aren’t dependent on each other. 

Hadoop has a mechanism to simplify the management of such (nonlinear) job dependencies via the JobControl class. A Job object is a representation of a MapReduce job, while the JobControl class encapsulates a set of MapReduce jobs and their dependencies. The typical workflow (sketched right after this list, and shown in full in the driver class at the end of this section) is: 
1. Create a JobControl. It implements java.lang.Runnable, so it needs to execute within a Thread.
2. For each Job in the workflow, construct a ControlledJob that wraps the Job instance.
3. Add each ControlledJob to the JobControl.
4. Execute the JobControl in a Thread.
5. Wait for the JobControl to complete and report the results; clean up in case of failure.
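As a condensed skeleton of those steps (variable names are illustrative; imports are the same as in the full, runnable driver shown at the end of this section):
- JobControl skeleton 
  // Condensed skeleton: wrap each configured Job in a ControlledJob, declare
  // dependencies, and drive the JobControl from its own thread.
  private int runWorkflow(Job job1, Job job2) throws Exception {
      ControlledJob step1 = new ControlledJob(job1, null);                 // no dependency
      List<ControlledJob> deps = new ArrayList<ControlledJob>();
      deps.add(step1);
      ControlledJob step2 = new ControlledJob(job2, deps);                 // runs after step1

      JobControl jobControl = new JobControl("workflow");
      jobControl.addJob(step1);
      jobControl.addJob(step2);

      Thread t = new Thread(jobControl, "workflow-thread");                // JobControl is a Runnable
      t.setDaemon(true);
      t.start();
      while (!jobControl.allFinished()) { Thread.sleep(500); }             // poll until done

      return jobControl.getFailedJobList().isEmpty() ? 0 : 1;              // report the result
  }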

So if we want to find the top 10 patents with the highest citation counts, we need two MapReduce jobs: 
1. Invert the citation data to "CITED PATENT\tCITING COUNT" - Job1
2. Sort the output of Job1 by "CITING COUNT" in descending order and output the top 10 records as "CITED PATENT LIST\tCITING COUNT" - Job2

The first job inverts the key/value pairs of the input data and produces "CITED PATENT\tCITING COUNT". 
- Mapper of Job1 
  package ch5;

  import java.io.IOException;

  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  public class InvMapClass extends Mapper<Text, Text, Text, Text> {
      @Override
      public void map(Text key, Text value, Context context)
              throws IOException, InterruptedException {
          // Input:  "CITING", "CITED"
          // Output: "CITED", "CITING"
          context.write(value, key);
      }
  }
- Reducer of Job1 
  package ch5;

  import java.io.IOException;

  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  public class InvReduce extends Reducer<Text, Text, Text, Text>
  {
      @Override
      public void reduce(Text key, Iterable<Text> values, Context context)
              throws IOException, InterruptedException
      {
          // Output: "CITED", "CITING COUNT"
          int count = 0;
          for (Text citing : values)
          {
              count++;
          }
          context.write(key, new Text(String.valueOf(count)));
      }
  }
Job2 sorts the result of Job1. However, Hadoop's default sort order is ascending, while we want descending order, so we implement our own sort comparator by extending the WritableComparator class: 
- Descending IntWritable Comparator 
  package ch5;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.WritableComparable;
  import org.apache.hadoop.io.WritableComparator;

  public class DescendingKeyComparator extends WritableComparator {
      protected DescendingKeyComparator() {
          super(IntWritable.class, true);
      }

      @SuppressWarnings("rawtypes")
      @Override
      public int compare(WritableComparable w1, WritableComparable w2) {
          IntWritable key1 = (IntWritable) w1;
          IntWritable key2 = (IntWritable) w2;
          return -1 * key1.compareTo(key2);   // invert the natural (ascending) order
      }
  }
Then we can set the comparator on the ranking Job object through the setSortComparatorClass() API. Below are the mapper and reducer of Job2: 
- Mapper of job2 
  package ch5;

  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  public class RankMapClass extends Mapper<Text, Text, IntWritable, Text>
  {
      @Override
      public void map(Text key, Text value, Context context)
              throws IOException, InterruptedException
      {
          // Input: "CITED"\t"CITING COUNT"  ->  Output: "CITING COUNT", "CITED"
          context.write(new IntWritable(Integer.valueOf(value.toString())), key);
      }
  }
- Reducer of job2 
  package ch5;

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.util.StringUtils;

  public class RankReduce extends Reducer<IntWritable, Text, Text, IntWritable>
  {
      boolean reachTopN = false;
      int     count = 0;
      int     topN = 10;

      @Override
      public void reduce(IntWritable key, Iterable<Text> values, Context context)
              throws IOException, InterruptedException
      {
          // Keys arrive in descending citation-count order; emit records until topN is reached.
          if (!reachTopN)
          {
              List<String> ptnList = new ArrayList<String>();
              for (Text pid : values) ptnList.add(pid.toString());
              count += ptnList.size();
              context.write(new Text(StringUtils.join(",", ptnList)), key);
              if (count >= topN) reachTopN = true;
          }
      }
  }
Finally, here is the driver class: 
- Driver class to run job1/job2 
  package ch5;

  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
  import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
  import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;

  public class TopNCiting extends Configured implements Tool {
      @Override
      public int run(String[] args) throws Exception {
          JobControl jobControl = new JobControl("Test");
          String intermediateTmpDir = String.format("/user/training/%s-tmp", getClass().getSimpleName());
          Path intermediatePath = new Path(intermediateTmpDir);
          Path in = new Path(args[0]);
          Path out = new Path(args[1]);
          FileSystem fs = FileSystem.get(getConf());
          fs.deleteOnExit(intermediatePath);   // mark the intermediate dir for deletion on exit
          //fs.mkdirs(intermediatePath);

          // Job1: invert the citation data and count citations per cited patent.
          Job invJob = new Job(getConf());
          FileInputFormat.setInputPaths(invJob, in);
          FileOutputFormat.setOutputPath(invJob, intermediatePath);
          invJob.setJarByClass(TopNCiting.class);
          invJob.setMapperClass(InvMapClass.class);
          invJob.setReducerClass(InvReduce.class);
          invJob.setInputFormatClass(KeyValueTextInputFormat.class);
          invJob.setOutputFormatClass(TextOutputFormat.class);
          invJob.setMapOutputKeyClass(Text.class);
          invJob.setMapOutputValueClass(Text.class);
          invJob.setOutputKeyClass(Text.class);
          invJob.setOutputValueClass(Text.class);
          invJob.getConfiguration().set("key.value.separator.in.input.line", ",");
          ControlledJob step1 = new ControlledJob(invJob, null);

          // Job2: sort by citation count in descending order and keep the top 10.
          Job rankJob = new Job(getConf());
          FileInputFormat.setInputPaths(rankJob, intermediatePath);
          FileOutputFormat.setOutputPath(rankJob, out);
          rankJob.setJarByClass(TopNCiting.class);
          rankJob.setMapperClass(RankMapClass.class);
          rankJob.setReducerClass(RankReduce.class);
          rankJob.setInputFormatClass(KeyValueTextInputFormat.class);
          rankJob.setOutputFormatClass(TextOutputFormat.class);
          rankJob.setMapOutputKeyClass(IntWritable.class);
          rankJob.setMapOutputValueClass(Text.class);
          rankJob.setOutputKeyClass(Text.class);
          rankJob.setOutputValueClass(IntWritable.class);
          rankJob.setSortComparatorClass(DescendingKeyComparator.class);
          //rankJob.getConfiguration().set("key.value.separator.in.input.line", "\t");
          List<ControlledJob> dependencies = new ArrayList<ControlledJob>();
          dependencies.add(step1);
          ControlledJob step2 = new ControlledJob(rankJob, dependencies);

          jobControl.addJob(step1);
          jobControl.addJob(step2);

          Thread workThread = new Thread(jobControl, "WorkThread");
          workThread.setDaemon(true);
          workThread.start();

          while (!jobControl.allFinished()) { Thread.sleep(500); }
          List<ControlledJob> failedList = jobControl.getFailedJobList();
          if (failedList.size() > 0)
          {
              System.err.printf("\t[Error] Job fail (%d):\n", failedList.size());
              for (ControlledJob job : failedList) { System.err.printf("\t%s: %s\n", job.getJobName(), job.getMessage()); }
              System.out.println();
              return 1;
          }
          else
          {
              System.out.printf("\t[Info] Success! Workflow completed %d jobs.\n", jobControl.getSuccessfulJobList().size());
              return 0;
          }
      }

      public static void main(String[] args) throws Exception {
          int res = ToolRunner.run(new Configuration(), new TopNCiting(), args);
          System.exit(res);
      }
  }
The script below executes the MapReduce jobs: 
- chainJobs.sh 
  #!/bin/sh
  hadoop fs -rm -r output
  hadoop fs -rm -r TopNCiting-tmp
  hadoop jar HadoopIA.jar ch5.TopNCiting IA/cite_100000.txt output
  hadoop fs -cat output/* | less
Chaining preprocessing and postprocessing steps 
A lot of data processing tasks involve record-oriented preprocessing and postprocessing. For example, in processing documents for information retrieval, you may have one step to remove stop words, and another step for stemming. You can write a separate MapReduce job for each of these pre- and postprocessing steps and chain them together, using IdentityReducer (or no reducer at all) for these steps. This approach is inefficient, as each step in the chain takes up I/O and storage to process the intermediate results. Another approach is to write your mapper such that it calls all the preprocessing steps beforehand and your reducer such that it calls all the postprocessing steps afterward. This forces you to architect the pre- and postprocessing steps in a modular and composable manner. Hadoop introduced the ChainMapper and the ChainReducer classes in version 0.19.0 to simplify the composition of pre- and postprocessing. 

You can think of chaining MapReduce jobs, as explained in section 5.1.1, symbolically using the pseudo-regular expression: 
[MAP | REDUCE]+

where a reducer REDUCE comes after a mapper MAP, and this [MAP | REDUCE] sequence can repeat itself one or more times, one right after another. The analogous expression for a job using ChainMapper and ChainReducer would be: 
MAP+ | REDUCE | MAP*

The job runs multiple mappers in sequence to preprocess the data, and after running reduce it can optionally run multiple mappers in sequence to postprocess the data. The beauty of this mechanism is that you write the pre- and postprocessing steps as standard mappers. You can run each one of them individually if you want. (This is useful when you want to debug them individually.) You call the addMapper() method in ChainMapper and ChainReducer to compose the pre- and postprocessing steps, respectively. Running all the pre- and postprocessing steps in a single job leaves no intermediate file and there’s a dramatic reduction in I/O.

Consider the example where there are four mappers (Map1, Map2, Map3, and Map4) and one reducer (Reduce), and they’re chained into a single MapReduce job in this sequence: 
Map1 | Map2 | Reduce | Map3 | Map4

You should think of Map2 and Reduce as the core of the MapReduce job, with the standard partitioning and shuffling applied between the mapper and reducer. You should consider Map1 as a preprocessing step and Map3 and Map4 as postprocessing steps. The number of processing steps can vary. 

You can specify the composition of this sequence of mappers and the reducer in the driver, as shown in the sketch below (listing 5.1 in the book). You need to make sure the key and value output types of one task match the input types of the next task. 
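The sketch below is not the book's listing 5.1 verbatim: it assumes the new-API chain classes in org.apache.hadoop.mapreduce.lib.chain (added after the 0.19 release mentioned above), and Map1 through Map4, Reduce, and the key/value types are hypothetical placeholders to be replaced by your own classes: 
- ChainMapper/ChainReducer driver sketch 
  package ch5;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
  import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class ChainDriver {
      public static void main(String[] args) throws Exception {
          // Map1..Map4 and Reduce are placeholder mapper/reducer classes defined elsewhere.
          Configuration conf = new Configuration();
          Job job = new Job(conf, "ChainJob");
          job.setJarByClass(ChainDriver.class);
          FileInputFormat.setInputPaths(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));

          // Preprocessing mappers, run in sequence before the shuffle.
          ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,
                  Text.class, Text.class, new Configuration(false));
          ChainMapper.addMapper(job, Map2.class, Text.class, Text.class,
                  Text.class, Text.class, new Configuration(false));

          // The single reducer; standard partitioning and shuffling happen before it.
          ChainReducer.setReducer(job, Reduce.class, Text.class, Text.class,
                  Text.class, Text.class, new Configuration(false));

          // Postprocessing mappers, applied to the reducer output within the same job.
          ChainReducer.addMapper(job, Map3.class, Text.class, Text.class,
                  Text.class, Text.class, new Configuration(false));
          ChainReducer.addMapper(job, Map4.class, Text.class, Text.class,
                  Text.class, Text.class, new Configuration(false));

          System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
  }
On older releases the equivalent classes live in org.apache.hadoop.mapred.lib and are configured through JobConf instead of Job. 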

Supplement 
slideshare - Hadoop Tutorial: Map-Reduce Part 8 -- MapReduce Workflows 
How to implement customized sort in Hadoop?
