This chapter covers
As your data processing becomes more complex, you’ll want to exploit different Hadoop features. This chapter focuses on some of these more advanced techniques. When handling advanced data processing, you’ll often find that you can’t program the process into a single MapReduce job. Hadoop supports chaining MapReduce programs together to form a bigger job.
You’ll also find that advanced data processing often involves more than one data set. We’ll explore various joining techniques in Hadoop for simultaneously processing multiple data sets. You can code certain data processing tasks more efficiently when processing a group of records at a time.
We’ve seen how Streaming natively supports the ability to process a whole split at a time, and the Streaming implementation of the maximum function takes advantage of this ability. We’ll see that the same is true for Java programs. We’ll discover the Bloom filter and implement it with a mapper that keeps state information across records.
Chaining MapReduce jobs
You’ve been doing data processing tasks that a single MapReduce job can accomplish. As you get more comfortable writing MapReduce programs and take on more ambitious data processing tasks, you’ll find that many complex tasks need to be broken down into simpler subtasks, each accomplished by an individual MapReduce job. For example, from the citation data set you may be interested in finding the ten most-cited patents. A sequence of two MapReduce jobs can do this. The first one creates the “inverted” citation data set and counts the number of citations for each patent, and the second job finds the top ten in that “inverted” data.
Chaining MapReduce jobs in a sequence
Though you can execute the two jobs manually one after the other, it’s more convenient to automate the execution sequence. You can chain MapReduce jobs to run sequentially, with the output of one MapReduce job being the input to the next. Chaining MapReduce jobs is analogous to Unix pipes.
Chaining MapReduce jobs sequentially is quite straightforward. The driver for each job has to create a new Job object and set its input path to be the output path of the previous job. At the end of the chain you can delete the intermediate data generated at each step.
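For example, a driver along the following lines runs two jobs back to back: it waits for the first job to finish, points the second job’s input at the first job’s output directory, and deletes the intermediate directory afterwards. This is only a minimal sketch; identity Mapper and Reducer classes stand in for the real per-step logic, and the naming of the intermediate path is an assumption.

    // Minimal sketch of sequential job chaining: job2 reads job1's output directory,
    // and job2 is only submitted after job1 has completed.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SequentialChainDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path in = new Path(args[0]);
            Path intermediate = new Path(args[1] + "-tmp");  // job1's output, job2's input (assumed name)
            Path out = new Path(args[1]);

            Job job1 = new Job(conf, "step1");
            job1.setJarByClass(SequentialChainDriver.class);
            job1.setMapperClass(Mapper.class);    // identity mapper as a stand-in for real logic
            job1.setReducerClass(Reducer.class);  // identity reducer as a stand-in for real logic
            job1.setOutputKeyClass(LongWritable.class);
            job1.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job1, in);
            FileOutputFormat.setOutputPath(job1, intermediate);
            if (!job1.waitForCompletion(true)) System.exit(1);  // block until job1 finishes

            Job job2 = new Job(conf, "step2");
            job2.setJarByClass(SequentialChainDriver.class);
            job2.setMapperClass(Mapper.class);    // identity mapper as a stand-in for real logic
            job2.setReducerClass(Reducer.class);  // identity reducer as a stand-in for real logic
            job2.setOutputKeyClass(LongWritable.class);
            job2.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job2, intermediate);  // job2 consumes job1's output
            FileOutputFormat.setOutputPath(job2, out);
            boolean ok = job2.waitForCompletion(true);

            intermediate.getFileSystem(conf).delete(intermediate, true);  // drop intermediate data
            System.exit(ok ? 0 : 1);
        }
    }

Because waitForCompletion(true) blocks, the second job never starts until the first has written its output. JobControl, discussed in the next subsection, generalizes this pattern to jobs whose dependencies aren’t strictly linear.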
Chaining MapReduce jobs with complex dependency
Sometimes the subtasks of a complex data processing task don’t run sequentially, and their MapReduce jobs are therefore not chained in a linear fashion. For example, mapreduce1 may process one data set while mapreduce2 independently processes another data set. The third job, mapreduce3, performs an inner join of the first two jobs’ output. (We’ll discuss data joining in the next sections.) It’s dependent on the other two and can execute only after both mapreduce1 and mapreduce2 are completed. But mapreduce1 and mapreduce2 aren’t dependent on each other.
Hadoop has a mechanism to simplify the management of such (nonlinear) job dependencies via the JobControl class. A Job object is a representation of a MapReduce job, while the JobControl class encapsulates a set of MapReduce jobs and their dependencies.
So if we want to find the top ten patents with the highest citation counts, we need two MapReduce jobs.
The first job inverts the key/value pairs of the input data and counts the citations of each cited patent, producing output lines of the form "CITED PATENT\tCITING COUNT". The second job then sorts that output by citation count in descending order and emits only the top ten.
Mapper of Job1

    package ch5;

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class InvMapClass extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input:  "CITING","CITED"
            // Output: "CITED", "CITING"
            context.write(value, key);
        }
    }
Reducer of Job1

    package ch5;

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class InvReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Output: "CITED", "CITING COUNT"
            int count = 0;
            for (Text citing : values) {
                count++;
            }
            context.write(key, new Text(String.valueOf(count)));
        }
    }
Descending IntWritable Comparator

    package ch5;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class DescendingKeyComparator extends WritableComparator {
        protected DescendingKeyComparator() {
            super(IntWritable.class, true);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            // Reverse the natural ordering so that higher citation counts come first.
            IntWritable key1 = (IntWritable) w1;
            IntWritable key2 = (IntWritable) w2;
            return -1 * key1.compareTo(key2);
        }
    }
Mapper of Job2

    package ch5;

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class RankMapClass extends Mapper<Text, Text, IntWritable, Text> {
        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input:  "CITED"\t"CITING COUNT"
            // Output: CITING COUNT (as IntWritable), "CITED"
            context.write(new IntWritable(Integer.valueOf(value.toString())), key);
        }
    }
Reducer of Job2

    package ch5;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.util.StringUtils;

    public class RankReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
        boolean reachTopN = false;
        int count = 0;
        int topN = 10;

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Keys arrive in descending citation-count order (DescendingKeyComparator),
            // so we stop emitting once topN patents have been written.
            if (!reachTopN) {
                List<String> ptnList = new ArrayList<String>();
                for (Text pid : values) {
                    ptnList.add(pid.toString());
                }
                count += ptnList.size();
                context.write(new Text(StringUtils.join(",", ptnList)), key);
                if (count >= topN) {
                    reachTopN = true;
                }
            }
        }
    }
Driver class to run Job1/Job2

    package ch5;

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class TopNCiting extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            JobControl jobControl = new JobControl("Test");
            String intermediateTmpDir = String.format("/user/training/%s-tmp", getClass().getSimpleName());
            Path intermediatePath = new Path(intermediateTmpDir);
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileSystem fs = FileSystem.get(getConf());
            fs.deleteOnExit(intermediatePath);  // remove the intermediate output when the FileSystem closes
            //fs.mkdirs(intermediatePath);

            // Job1: invert (CITING, CITED) pairs and count the citations of each cited patent.
            Job invJob = new Job(getConf());
            FileInputFormat.setInputPaths(invJob, in);
            FileOutputFormat.setOutputPath(invJob, intermediatePath);
            invJob.setJarByClass(TopNCiting.class);
            invJob.setMapperClass(InvMapClass.class);
            invJob.setReducerClass(InvReduce.class);
            invJob.setInputFormatClass(KeyValueTextInputFormat.class);
            invJob.setOutputFormatClass(TextOutputFormat.class);
            invJob.setMapOutputKeyClass(Text.class);
            invJob.setMapOutputValueClass(Text.class);
            invJob.setOutputKeyClass(Text.class);
            invJob.setOutputValueClass(Text.class);
            invJob.getConfiguration().set("key.value.separator.in.input.line", ",");  // input fields are comma-separated
            ControlledJob step1 = new ControlledJob(invJob, null);

            // Job2: rank cited patents by citation count in descending order and keep the top ten.
            Job rankJob = new Job(getConf());
            FileInputFormat.setInputPaths(rankJob, intermediatePath);
            FileOutputFormat.setOutputPath(rankJob, out);
            rankJob.setJarByClass(TopNCiting.class);
            rankJob.setMapperClass(RankMapClass.class);
            rankJob.setReducerClass(RankReduce.class);
            rankJob.setInputFormatClass(KeyValueTextInputFormat.class);
            rankJob.setOutputFormatClass(TextOutputFormat.class);
            rankJob.setMapOutputKeyClass(IntWritable.class);
            rankJob.setMapOutputValueClass(Text.class);
            rankJob.setOutputKeyClass(Text.class);
            rankJob.setOutputValueClass(IntWritable.class);
            rankJob.setSortComparatorClass(DescendingKeyComparator.class);
            //rankJob.getConfiguration().set("key.value.separator.in.input.line", "\t");

            // Job2 may run only after Job1 has completed.
            ArrayList<ControlledJob> dependencies = new ArrayList<ControlledJob>();
            dependencies.add(step1);
            ControlledJob step2 = new ControlledJob(rankJob, dependencies);
            jobControl.addJob(step1);
            jobControl.addJob(step2);

            // JobControl runs in its own thread; poll until the whole workflow has finished.
            Thread workThread = new Thread(jobControl, "WorkThread");
            workThread.setDaemon(true);
            workThread.start();
            while (!jobControl.allFinished()) {
                Thread.sleep(500);
            }

            List<ControlledJob> failedList = jobControl.getFailedJobList();
            if (failedList.size() > 0) {
                System.err.printf("\t[Error] Job fail (%d):\n", failedList.size());
                for (ControlledJob job : failedList) {
                    System.err.printf("\t%s: %s\n", job.getJobName(), job.getMessage());
                }
                System.out.println();
                return 1;
            } else {
                System.out.printf("\t[Info] Success! Workflow completed %d jobs.\n",
                        jobControl.getSuccessfulJobList().size());
                return 0;
            }
        }

        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new TopNCiting(), args);
            System.exit(res);
        }
    }
chainJobs.sh

    #!/bin/sh
    hadoop fs -rm -r output
    hadoop fs -rm -r TopNCiting-tmp
    hadoop jar HadoopIA.jar ch5.TopNCiting IA/cite_100000.txt output
    hadoop fs -cat output/* | less
A lot of data processing tasks involve record-oriented preprocessing and postprocessing. For example, in processing documents for information retrieval, you may have one step to remove stop words and another step for stemming. You can write a separate MapReduce job for each of these pre- and postprocessing steps and chain them together, using IdentityReducer (or no reducer at all) for these steps. This approach is inefficient, as each step in the chain takes up I/O and storage to process the intermediate results. Another approach is to write your mapper so that it calls all the preprocessing steps beforehand and your reducer so that it calls all the postprocessing steps afterward. This forces you to architect the pre- and postprocessing steps in a modular and composable manner. Hadoop introduced the ChainMapper and the ChainReducer classes in version 0.19.0 to simplify the composition of pre- and postprocessing.
You can think of chaining MapReduce jobs, as explained in section 5.1.1, symbolically using the pseudo-regular expression:

    [MAP | REDUCE]+

where a reducer REDUCE comes after a mapper MAP, and this [MAP | REDUCE] sequence can repeat itself one or more times, one right after another. The analogous expression for a job using ChainMapper and ChainReducer would be:

    MAP+ | REDUCE | MAP*
The job runs multiple mappers in sequence to preprocess the data, and after running reduce it can optionally run multiple mappers in sequence to postprocess the data. The beauty of this mechanism is that you write the pre- and postprocessing steps as standard mappers. You can run each one of them individually if you want. (This is useful when you want to debug them individually.) You call the addMapper() method in ChainMapper and ChainReducer to compose the pre- and postprocessing steps, respectively. Running all the pre- and postprocessing steps in a single job leaves no intermediate file and there’s a dramatic reduction in I/O.
Consider the example where there are four mappers (Map1, Map2, Map3, and Map4) and one reducer (Reduce), and they’re chained into a single MapReduce job in this sequence:

    Map1 | Map2 | Reduce | Map3 | Map4
You should think of Map2 and Reduce as the core of the MapReduce job, with the standard partitioning and shuffling applied between the mapper and reducer. You should consider Map1 as a preprocessing step and Map3 and Map4 as postprocessing steps. The number of processing steps can vary.
You can specify the composition of this sequence of mappers and reducer with the driver. See listing 5.1. You need to make sure the key and value outputs of one task have matching types (classes) with the inputs of the next task.
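Listing 5.1 from the book isn’t reproduced here, but a driver along the following lines illustrates the idea using the newer org.apache.hadoop.mapreduce.lib.chain API. This is only a hedged sketch: Map1 through Map4 and Reduce are placeholder classes for the steps above, the key/value types are assumptions chosen so that each step’s output matches the next step’s input, and the usual input/output format and path settings are omitted.

    // Sketch: composing Map1 | Map2 | Reduce | Map3 | Map4 into one job with
    // ChainMapper/ChainReducer. Map1..Map4 and Reduce are hypothetical placeholder classes.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
    import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;

    public class ChainDriver {
        public static Job buildJob(Configuration conf) throws Exception {
            Job job = new Job(conf, "chain-example");
            job.setJarByClass(ChainDriver.class);

            // Preprocessing mappers run before the shuffle.
            ChainMapper.addMapper(job, Map1.class,
                    LongWritable.class, Text.class, Text.class, Text.class, new Configuration(false));
            ChainMapper.addMapper(job, Map2.class,
                    Text.class, Text.class, LongWritable.class, Text.class, new Configuration(false));

            // The reducer, followed by postprocessing mappers that run after the reduce.
            ChainReducer.setReducer(job, Reduce.class,
                    LongWritable.class, Text.class, Text.class, Text.class, new Configuration(false));
            ChainReducer.addMapper(job, Map3.class,
                    Text.class, Text.class, LongWritable.class, Text.class, new Configuration(false));
            ChainReducer.addMapper(job, Map4.class,
                    LongWritable.class, Text.class, LongWritable.class, Text.class, new Configuration(false));
            return job;
        }
    }

Each addMapper()/setReducer() call names the step’s input and output key/value classes, which is how the framework enforces the type-matching requirement described above.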
Supplement
* slideshare - Hadoop Tutorial: Map-Reduce Part 8 -- MapReduce Workflows
* How to implement customized sort in Hadoop?