This chapter covers
As your data processing becomes more complex you’ll want to exploit different Hadoop features. This chapter will focus on some of these more advanced techniques. When handling advanced data processing, you’ll often find that you can’t program the process into a single MapReduce job. Hadoop supports chaining MapReduce programs together to form a bigger job.
You’ll also find that advanced data processing often involves more than one data set. We’ll explore various joining techniques in Hadoop for simultaneously processing multiple data sets. You can code certain data processing tasks more efficiently when processing a group of records at a time.
We’ve seen how Streaming natively supports the ability to process a whole split at a time, and the Streaming implementation of the maximum function takes advantage of this ability. We’ll see that the same is true for Java programs. We’ll discover the Bloom filter and implement it with a mapper that keeps state information across records.
Chaining MapReduce jobs
You’ve been doing data processing tasks which a single MapReduce job can accomplish. As you get more comfortable writing MapReduce programs and take on more ambitious data processing tasks, you’ll find that many complex tasks need to be broken down into simpler subtasks, each accomplished by an individual MapReduce job. For example, from the citation data set you may be interested in finding the ten most-cited patents. A sequence of two MapReduce jobs can do this. The first one creates the “inverted” citation data set and counts the number of citations for each patent, and the second job finds the top ten in that “inverted” data.
Chaining MapReduce jobs in a sequence
Though you can execute the two jobs manually one after the other, it’s more convenient to automate the execution sequence. You can chain MapReduce jobs to run sequentially, with the output of one MapReduce job being the input to the next. Chaining MapReduce jobs is analogous to Unix pipes .
Chaining MapReduce jobs sequentially is quite straightforward. The driver at each job will have to create a new Job object and set its input path to be the output path of the previous job. You can delete the intermediate data generated at each step of the chain at the end.
Chaining MapReduce jobs with complex dependency
Sometimes the subtasks of a complex data processing task don’t run sequentially, and their MapReduce jobs are therefore not chained in a linear fashion. For example, mapreduce1 may process one data set while mapreduce2 independently processes another data set. The third job, mapreduce3, performs an inner join of the first two jobs’ output. (We’ll discuss data joining in the next sections.) It’s dependent on the other two and can execute only after both mapreduce1 and mapreduce2 are completed. But mapreduce1 and mapreduce2 aren’t dependent on each other.
Hadoop has a mechanism to simplify the management of such (nonlinear) job dependencies via the JobControl class. A Job object is a representation of a MapReduce job while the JobControl class encapsulates a set of MapReduce jobs and its dependency:
So if we want to search top 10 patent with most citing count, we will have two MapReduce jobs:
The first job will invert key/value of input data to "CITED PATENT\tCITING COUNT".
- Mapper of Job1
- Descending IntWritable Comparator
- Mapper of job2
- Driver class to run job1/job2
A lot of data processing tasks involve record-oriented preprocessing and postprocessing. For example, in processing documents for information retrieval , you may have one step to remove stop words , and another step for stemming. You can write a separate MapReduce job for each of these pre- and postprocessing steps and chain them together, using IdentityReducer (or no reducer at all) for these steps. This approach is inefficient as each step in the chain takes up I/O and storage to process the intermediate results. Another approach is for you to write your mapper such that it calls all the preprocessing steps beforehand and the reducer to call all the postprocessing steps afterward. This forces you to architect the pre- and postprocessing steps in a modular and composable manner. Hadoop introduced the ChainMapper and the ChainReducer classes in version 0.19.0 to simplify the composition of pre- and postprocessing.
You can think of chaining MapReduce jobs, as explained in section 5.1.1, symbolically using the pseudo-regular expression:
where a reducer REDUCE comes after a mapper MAP, and this [MAP | REDUCE] sequence can repeat itself one or more times, one right after another. The analogous expression for a job using ChainMapper and ChainReducer would be:
The job runs multiple mappers in sequence to preprocess the data, and after running reduce it can optionally run multiple mappers in sequence to postprocess the data. The beauty of this mechanism is that you write the pre- and postprocessing steps as standard mappers. You can run each one of them individually if you want. (This is useful when you want to debug them individually.) You call the addMapper() method in ChainMapper and ChainReducer to compose the pre- and postprocessing steps, respectively. Running all the pre- and postprocessing steps in a single job leaves no intermediate file and there’s a dramatic reduction in I/O.
Consider the example where there are four mappers (Map1, Map2, Map3, and Map4) and one reducer (Reduce), and they’re chained into a single MapReduce job in this sequence:
You should think of Map2 and Reduce as the core of the MapReduce job, with the standard partitioning and shuffling applied between the mapper and reducer. You should consider Map1 as a preprocessing step and Map3 and Map4 as postprocessing steps. The number of processing steps can vary.
You can specify the composition of this sequence of mappers and reducer with the driver. See listing 5.1. You need to make sure the key and value outputs of one task have matching types (classes) with the inputs of the next task.
* slideshare - Hadoop Tutorial: Map-Reduce Part 8 -- MapReduce Workflows
* How to implement customized sort in Hadoop?