This chapter covers
■ Managing files in HDFS
■ The anatomy of a MapReduce program
■ Reading and writing different data formats

We first cover HDFS, where you’ll store data that your Hadoop applications will process. Next we explain the MapReduce framework in more detail, getting to know its Java classes and methods as well as the underlying processing steps. We also learn how to read and write data using different formats.
3.1 Working with files in HDFS
HDFS is a filesystem designed for large-scale distributed data processing under frameworks such as MapReduce. You can store a big data set of (say) 100 TB as a single file in HDFS, something that would overwhelm most other filesystems. We discussed in chapter 2 how HDFS replicates the data for availability and distributes it over multiple machines to enable parallel processing. HDFS abstracts these details away and gives you the illusion that you’re dealing with only a single file.
Hadoop provides a set of command line utilities that work similarly to the Linux file commands. In the next section we’ll discuss those Hadoop file shell commands, which are your primary interface with the HDFS system. Section 3.1.2 covers Hadoop Java libraries for handling HDFS files programmatically.
3.1.1 Basic file commands
Hadoop file commands take the form of

hadoop fs -cmd <args>
where cmd is the specific file command and args is a variable number of arguments. The command cmd is usually named after the corresponding Unix equivalent. For example, the command for listing files is

hadoop fs -ls
Let’s look at the most common file management tasks in Hadoop, which include adding files and directories, retrieving files, and deleting files.
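For example, the following commands walk through that basic lifecycle (the file and directory names here are only illustrative):

hadoop fs -mkdir /user/chuck               # create a directory in HDFS
hadoop fs -put example.txt /user/chuck     # copy a local file into HDFS
hadoop fs -ls /user/chuck                  # list the directory’s contents
hadoop fs -cat /user/chuck/example.txt     # print a file to the console
hadoop fs -get /user/chuck/example.txt .   # copy a file back to the local filesystem
hadoop fs -rm /user/chuck/example.txt      # delete a file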
For more, please refer to the File System Shell documentation.
3.1.2 Reading and writing to HDFS programmatically
To motivate an examination of the HDFS Java API, we’ll develop a PutMerge program for merging files while putting them into HDFS. The command line utilities don’t support this operation; we’ll use the API.
The motivation for this example came when we wanted to analyze Apache log files coming from many web servers. We can copy each log file into HDFS, but in general, Hadoop works more effectively with a single large file rather than a number of smaller ones. (“Smaller” is relative here, as it can still be tens or hundreds of gigabytes.) Besides, for analytics purposes we think of the log data as one big file; that it’s spread over multiple files is an incidental result of the physical web server architecture. One solution is to merge all the files first and then copy the combined file into HDFS. Unfortunately, the file merging would require a lot of disk space on the local machine. It would be much easier if we could merge all the files on the fly as we copy them into HDFS.
What we need is, therefore, a PutMerge-type of operation. Hadoop’s command line utilities include a getmerge command for merging a number of HDFS files before copying them onto the local machine. What we’re looking for is the exact opposite. This is not available in Hadoop’s file utilities.
The main classes for file manipulation in Hadoop are in the package org.apache.hadoop.fs. Basic Hadoop file operations include the familiar open, read, write, and close. In fact, the Hadoop file API is generic and can be used for working with filesystems other than HDFS. For our PutMerge program, we’ll use the Hadoop file API to both read the local filesystem and write to HDFS.
The starting point for the Hadoop file API is the FileSystem class. This is an abstract class for interfacing with the filesystem, and there are different concrete subclasses for handling HDFS and the local filesystem. You get the desired FileSystem instance by calling the factory method FileSystem.get(Configuration conf). The Configuration class is a special class for holding key/value configuration parameters. Its default instantiation is based on the resource configuration for your HDFS system. We can get the FileSystem object to interface with HDFS by:
Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
The Hadoop file API uses Path objects to encode file and directory names and FileStatus objects to store metadata for files and directories. Our PutMerge program will merge all files from a local directory, so we also need a FileSystem object for the local filesystem, obtained through the factory method FileSystem.getLocal(Configuration conf). We use the FileSystem’s listStatus() method to get a list of files in a directory:
Path inputDir = new Path(args[0]);
FileStatus[] inputFiles = local.listStatus(inputDir);

The getPath() method of FileStatus returns a file’s Path, which we can pass to the FileSystem’s open() method to get an FSDataInputStream for reading. FSDataInputStream is a subclass of Java’s standard java.io.DataInputStream, so we can read it with a familiar byte-buffer loop:

FSDataInputStream in = local.open(inputFiles[i].getPath());
byte buffer[] = new byte[256];
int bytesRead = 0;
while ((bytesRead = in.read(buffer)) > 0) {
    ...
}
in.close();

Writing to HDFS is analogous. The FileSystem’s create() method returns an FSDataOutputStream, to which we write the bytes we’ve read before closing the stream:

Path hdfsFile = new Path(args[1]);
FSDataOutputStream out = hdfs.create(hdfsFile);
out.write(buffer, 0, bytesRead);
out.close();

Listing 3.1 puts these pieces together into the complete PutMerge program.

Listing 3.1 A PutMerge program
package demo.hdfs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PutMerge {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FileSystem local = FileSystem.getLocal(conf);

        Path inputDir = new Path(args[0]);   // Specify input directory
        Path hdfsFile = new Path(args[1]);   // and output file

        try {
            FileStatus[] inputFiles = local.listStatus(inputDir);   // Get list of local files
            FSDataOutputStream out = hdfs.create(hdfsFile);         // Create HDFS output stream

            for (int i = 0; i < inputFiles.length; i++) {
                System.out.println(inputFiles[i].getPath().getName());
                FSDataInputStream in = local.open(inputFiles[i].getPath());   // Open local input stream
                byte buffer[] = new byte[256];
                int bytesRead = 0;
                while ((bytesRead = in.read(buffer)) > 0) {
                    out.write(buffer, 0, bytesRead);
                }
                in.close();
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
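To try the program out, you could package it into a jar and launch it with the hadoop command, passing a local input directory and an HDFS output path. The jar name and paths below are only illustrative:

hadoop jar putmerge.jar demo.hdfs.PutMerge /var/log/apache /user/chuck/merged-logs.txt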
3.2 Anatomy of a MapReduce program
As we have mentioned before, a MapReduce program processes data by manipulating (key/value) pairs in the general form

map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
Not surprisingly, this is an overly generic representation of the data flow. In this section we learn more details about each stage in a typical MapReduce program. Figure 3.1 displays a high-level diagram of the entire process, and we further dissect each component as we step through the flow.
Figure 3.1 The general MapReduce data flow.
Before we analyze how data gets passed onto each individual stage, we should first familiarize ourselves with the data types that Hadoop supports.
3.2.1 Hadoop data types
Despite our many discussions regarding keys and values, we have yet to mention their types. The MapReduce framework won’t allow them to be any arbitrary class. For example, although we can and often do talk about certain keys and values as integers, strings, and so on, they aren’t exactly standard Java classes, such as Integer, String, and so forth. This is because the MapReduce framework has a certain defined way of serializing the key/value pairs to move them across the cluster’s network, and only classes that support this kind of serialization can function as keys or values in the framework.
More specifically, classes that implement the Writable interface can be values, and classes that implement the WritableComparable<T> interface can be either keys or values. (WritableComparable<T> is a combination of the Writable and java.lang.Comparable<T> interfaces; keys need the comparability because the framework sorts them in the reduce stage.)
Hadoop comes with a number of predefined classes that implement WritableComparable, including wrapper classes for all the basic data types, as seen in table 3.1.
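To get a feel for these wrapper classes, here is a minimal sketch (not from the text) that exercises IntWritable and Text, two of the predefined WritableComparable types:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        IntWritable count = new IntWritable(42);   // wraps a Java int
        Text word = new Text("hadoop");            // a Writable, UTF-8-backed string

        // Both types are WritableComparable, so they can serve as keys;
        // the framework relies on compareTo() when sorting keys.
        System.out.println(count.compareTo(new IntWritable(7)));   // positive: 42 > 7
        System.out.println(word + " -> " + count.get());
    }
}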
Keys and values can take on types beyond the basic ones that Hadoop natively supports. You can create your own custom type as long as it implements the Writable (or WritableComparable<T>) interface. Listing 3.2 shows an example: an Edge class that represents an edge in a graph, such as a flight between a departure and an arrival airport.
Listing 3.2 An example class that implements the WritableComparable interface

package demo.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Edge implements WritableComparable<Edge> {

    private String departureNode;
    private String arrivalNode;

    public String getDepartureNode() {
        return departureNode;
    }

    @Override
    public void readFields(DataInput in) throws IOException {   // Deserialize the fields
        departureNode = in.readUTF();
        arrivalNode = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {      // Serialize the fields
        out.writeUTF(departureNode);
        out.writeUTF(arrivalNode);
    }

    @Override
    public int compareTo(Edge o) {                               // Order by departure node, then arrival node
        return (departureNode.compareTo(o.departureNode) != 0)
                ? departureNode.compareTo(o.departureNode)
                : arrivalNode.compareTo(o.arrivalNode);
    }
}
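The write() and readFields() methods must mirror each other exactly, because the framework reconstructs an Edge from the very bytes that another Edge instance wrote. The following sketch (not part of listing 3.2) imitates that round trip by hand with ordinary Java streams:

package demo.io;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class EdgeRoundTrip {
    public static void main(String[] args) throws IOException {
        // Write the two UTF fields in the same order that Edge.write() uses...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF("San Francisco");   // departureNode
        out.writeUTF("Los Angeles");     // arrivalNode

        // ...then let Edge.readFields() reconstruct the object from those bytes.
        Edge edge = new Edge();
        edge.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(edge.getDepartureNode());   // prints "San Francisco"
    }
}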
3.2.2 Mapper
To serve as the mapper, a class implements the Mapper interface and extends the MapReduceBase class. The MapReduceBase class, not surprisingly, serves as the base class for both mappers and reducers. It includes two methods that effectively act as the constructor and destructor for the class:
■ void configure(JobConf job): called before data processing starts; here you can read the job configuration and set up any state the task needs.
■ void close(): called once the task has finished processing; wrap up any loose ends here, such as open files or database connections.
The Mapper interface is responsible for the data processing step. It utilizes Java generics of the form Mapper<K1, V1, K2, V2>, where the key classes implement WritableComparable and the value classes implement Writable. Its single method processes an individual (key/value) pair:
void map(K1 key,
         V1 value,
         OutputCollector<K2, V2> output,
         Reporter reporter
        ) throws IOException

The method generates a (possibly empty) list of (K2, V2) pairs for a given (K1, V1) input pair. The OutputCollector receives these output pairs, and the Reporter provides the option to record extra information about the mapper as the task progresses.
In the newer org.apache.hadoop.mapreduce API, Mapper is instead an abstract class, and the equivalent method passes a single Context object that takes over the roles of OutputCollector and Reporter:

protected void map(KEYIN key,
                   VALUEIN value,
                   org.apache.hadoop.mapreduce.Mapper.Context context)
           throws IOException, InterruptedException
Hadoop provides a few useful mapper implementations. You can see some of them in table 3.2.
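To make the interface concrete, here is a minimal word-count-style mapper using the older org.apache.hadoop.mapred API described above (the class and package names are our own, not from the text):

package demo.mapred;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // key is the byte offset of the line; value is the line's content
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            output.collect(word, ONE);   // emit (word, 1) for every token
        }
    }
}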
As the MapReduce name implies, the major data flow operation after map is the reduce phase, shown in the bottom part of figure 3.1.
3.2.3 Reducer
As with any mapper implementation, a reducer must first extend the MapReduceBase class to allow for configuration and cleanup. In addition, it must also implement the Reducer interface, which has the following single method:
void reduce(K2 key,
            Iterator<V2> values,
            OutputCollector<K3, V3> output,
            Reporter reporter
           ) throws IOException

When a reducer task receives the output from the various mappers, it sorts the incoming data on the key and groups together all values belonging to the same key. The reduce() function is then called once per key, and it generates a (possibly empty) list of (K3, V3) pairs by iterating over the values associated with that key.
Table 3.3 lists a couple of basic reducer implementations provided by Hadoop.
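Continuing the word-count sketch from section 3.2.2 (again, the class name is our own), a matching reducer sums the counts it receives for each word:

package demo.mapred;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCountReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();   // add up all the 1s emitted for this word
        }
        output.collect(key, new IntWritable(sum));
    }
}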
Although we have referred to Hadoop programs as MapReduce applications, there is a vital step between the two stages: directing the result of the mappers to the different reducers. This is the responsibility of the partitioner.
3.2.4 Partitioner—redirecting output from Mapper
A common misconception for first-time MapReduce programmers is to use only a single reducer. After all, a single reducer sorts all of your data before processing, and who doesn’t like sorted data? Our discussions regarding MapReduce expose the folly of such thinking: we would have ignored the benefits of parallel computation. With one reducer, our compute cloud has been demoted to a compute raindrop.
With multiple reducers, we need some way to determine the appropriate one to send a (key/value) pair output by a mapper. The default behavior is to hash the key to determine the reducer, and Hadoop enforces this strategy through the HashPartitioner class. Sometimes the HashPartitioner will steer you awry. Let’s return to the Edge class introduced in section 3.2.1.
Suppose you used the Edge class to analyze flight information data to determine the number of passengers departing from each airport. Such data may look like the following (a hypothetical sample, pairing each Edge with a passenger count):
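(San Francisco -> Los Angeles)    45
(San Francisco -> Dallas)         23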
If you used HashPartitioner, the two rows could be sent to different reducers. The number of departures would be processed twice, and both times erroneously. How do you customize the partitioner for your application? In this situation, we want all edges with a common departure point to be sent to the same reducer. This is done easily enough by hashing the departureNode member of the Edge:
package demo.io;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;

public class EdgePartitioner extends Partitioner<Edge, Writable> {

    @Override
    public int getPartition(Edge edge, Writable w, int numPartitions) {
        // Mask off the sign bit so the partition number is never negative
        return (edge.getDepartureNode().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
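To put the custom partitioner to work, you register it on the job in your driver. A minimal sketch using the newer Job API (the job name is illustrative, and the rest of the job setup is omitted):

Configuration conf = new Configuration();
Job job = new Job(conf, "flight departures");
job.setPartitionerClass(EdgePartitioner.class);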
Between the map and reduce stages, a MapReduce application must take the output from the mapper tasks and distribute the results among the reducer tasks. This process is typically called shuffling, because the output of a mapper on a single node may be sent to reducers across multiple nodes in the cluster.
3.2.5 Combiner—local reduce
In many situations with MapReduce applications, we may wish to perform a "local reduce" before we distribute the mapper results. Consider the WordCount example of chapter 1 once more. If the job processes a document containing the word “the” 574 times, it’s much more efficient to store and shuffle the pair (“the”, 574) once instead of the pair (“the”, 1) multiple times. This processing step is known as combining. We explain combiners in more depth in section 4.6.
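Because a combiner performs a local reduce, you can often reuse your reducer class as the combiner, provided its operation is associative and it emits the same key/value types it consumes (true for a summing reducer such as the WordCountReducer sketched in section 3.2.3). With the older JobConf API this is a single call; the driver class name below is hypothetical:

JobConf conf = new JobConf(WordCountDriver.class);
conf.setCombinerClass(WordCountReducer.class);   // run a local reduce on each mapper's output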
3.3 Reading and writing
Let’s see how MapReduce reads input data and writes output data and focus on the file formats it uses. To enable easy distributed processing, MapReduce makes certain assumptions about the data it’s processing. It also provides flexibility in dealing with a variety of data formats.
Input data usually resides in large files, typically tens or hundreds of gigabytes or even more. One of the fundamental principles of MapReduce’s processing power is the splitting of the input data into chunks. You can process these chunks in parallel using multiple machines. In Hadoop terminology these chunks are called input splits.
The size of each split should be small enough for a more granular parallelization. (If all the input data is in one split, then there is no parallelization.) On the other hand, each split shouldn’t be so small that the overhead of starting and stopping the processing of a split becomes a large fraction of execution time.
HDFS stores files in blocks spread over multiple machines. Roughly speaking, each file block is a split. As different machines will likely have different blocks, parallelization is automatic if each split/block is processed by the machine on which it resides. Furthermore, as HDFS replicates blocks on multiple nodes for reliability, MapReduce can choose any of the nodes that have a copy of a split/block.
You’ll recall that MapReduce works on key/value pairs. So far we’ve seen that Hadoop by default considers each line in the input file to be a record and the key/value pair is the byte offset (key) and content of the line (value), respectively. You may not have recorded all your data that way. Hadoop supports a few other data formats and allows you to define your own.
3.3.1 InputFormat
The way an input file is split up and read by Hadoop is defined by one of the implementations of the InputFormat interface. TextInputFormat is the default InputFormat implementation, and it’s the data format we’ve been implicitly using up to now. It’s often useful for input data that has no inherent key/value structure, when you want to get the content one line at a time.
Table 3.4 lists other popular implementations of InputFormat along with a description of the key/value pair each one passes to the mapper.
KeyValueTextInputFormat is used in the more structured input files where a predefined character, usually a tab (\t), separates the key and value of each line (record). For example, you may have a tab-separated data file of timestamps and URLs (the sample rows below are made up):
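9:21:41	http://hadoop.apache.org
9:21:48	http://hadoop.apache.org/docs/current/
9:22:03	http://example.com/index.html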
You can set your job to use the KeyValueTextInputFormat class to read this file. With the older JobConf API:

conf.setInputFormat(KeyValueTextInputFormat.class);

With the newer org.apache.hadoop.mapreduce API, you call setInputFormatClass() on the Job object instead:

Configuration conf = new Configuration();
Job job = new Job(conf, "word count");
job.setInputFormatClass(KeyValueTextInputFormat.class);
The input data to your MapReduce job does not necessarily have to be external data. In fact, it’s often the case that the input to one MapReduce job is the output of some other MapReduce job. As we’ll see, you can customize your output format too. The default output format writes the output in the same format that KeyValueTextInputFormat can read back in (i.e., each line is a record with key and value separated by a tab character). Hadoop also provides a much more efficient binary compressed file format called a sequence file. The sequence file is optimized for Hadoop processing and should be the preferred format when chaining multiple MapReduce jobs. The InputFormat class to read sequence files is SequenceFileInputFormat. The key and value object types in a sequence file are definable by the user; the output and input types have to match, and your Mapper implementation and map() method have to take in the right input type.
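As a sketch of such chaining (the job names and intermediate path are illustrative, and the mapper/reducer setup is omitted), the first job writes a sequence file that the second job reads back in directly:

Configuration conf = new Configuration();

Job first = new Job(conf, "first pass");
// ... set mapper, reducer, and key/value classes for the first job ...
first.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(first, new Path("/tmp/intermediate"));

Job second = new Job(conf, "second pass");
// ... set mapper, reducer, and key/value classes for the second job ...
second.setInputFormatClass(SequenceFileInputFormat.class);
SequenceFileInputFormat.addInputPath(second, new Path("/tmp/intermediate"));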
CREATING A CUSTOM INPUTFORMAT—INPUTSPLIT AND RECORDREADER
Sometimes you may want to read input data in a way different from the standard InputFormat classes. In that case you’ll have to write your own custom InputFormat class. Let’s look at what that involves. InputFormat is an interface consisting of only two methods:

InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;

The two methods sum up the functions that InputFormat has to perform:
■ Identify all the files used as input data and divide them into input splits. Each map task is assigned one split.
■ Provide an object (RecordReader) to iterate through the records of a given split, parsing each record into a key/value pair.
Who wants to worry about how files are divided into splits? In creating your own InputFormat class you should subclass the FileInputFormat class, which takes care of file splitting. In fact, all the InputFormat classes in table 3.4 subclass FileInputFormat. FileInputFormat implements the getSplits() method but leaves getRecordReader() abstract for the subclass to fill out. FileInputFormat’s getSplits() implementation tries to divide the input data into roughly the number of splits specified in numSplits, subject to the constraints that each split must have at least mapred.min.split.size bytes but also be smaller than the block size of the filesystem. In practice, a split usually ends up being the size of a block, which defaults to 64 MB in HDFS.
In using FileInputFormat you focus on customizing RecordReader, which is responsible for parsing an input split into records and then parsing each record into a key/value pair. Let’s look at the signature of this interface.
public interface RecordReader<K, V> {
    boolean next(K key, V value) throws IOException;
    K createKey();
    V createValue();
    long getPos() throws IOException;
    void close() throws IOException;
    float getProgress() throws IOException;
}
One use case for writing your own custom InputFormat class is to read records in a specific type rather than the generic Text type. For example, we had previously used KeyValueTextInputFormat to read a tab-separated data file of timestamps and URLs. The class ends up treating both the timestamp and the URL as Text type. For our illustration, let’s create a TimeUrlTextInputFormat that works exactly the same but treats the URL as a URLWritable type. As mentioned earlier, we create our InputFormat class by extending FileInputFormat and implementing the factory method to return our RecordReader.
TimeUrlTextInputFormat.java

package demo.io;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;

public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable> {

    @Override
    public RecordReader<Text, URLWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        // The factory method returns the RecordReader used to parse each split
        return new SequenceFileRecordReader<Text, URLWritable>();
    }
}
URLWritable.java

package demo.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

import org.apache.hadoop.io.Writable;

public class URLWritable implements Writable {

    protected URL url;

    public URLWritable() {}

    public URLWritable(URL url) {
        this.url = url;
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        url = new URL(input.readUTF());
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeUTF(url.toString());
    }

    public void set(String s) throws MalformedURLException {
        url = new URL(s);
    }
}
MapReduce outputs data into files using the OutputFormat class, which is analogous to the InputFormat class. The output has no splits, as each reducer writes its output only to its own file. The output files reside in a common directory and are typically named part-nnnnn, where nnnnn is the partition ID of the reducer. RecordWriter objects format the output and RecordReader objects parse the format of the input.
Hadoop provides several standard implementations of OutputFormat, as shown in table 3.5. Not surprisingly, almost all the ones we deal with inherit from the FileOutputFormat class. You specify the OutputFormat by calling setOutputFormat() of the JobConf object that holds the configuration of your MapReduce job.
The default OutputFormat is TextOutputFormat, which writes each record as a line of text. Each record’s key and value are converted to strings through toString(), and a tab (\t) character separates them. The separator character can be changed by setting the mapred.textoutputformat.separator property.
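For instance, to emit comma-separated output instead (a sketch; conf here stands for your job’s configuration object), you would set that property before submitting the job:

conf.set("mapred.textoutputformat.separator", ",");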
If you want to suppress the output completely, then you should use the NullOutputFormat. Suppressing the Hadoop output is useful if your reducer writes its output in its own way and doesn’t need Hadoop to write any additional files. Finally, SequenceFileOutputFormat writes the output in a sequence file format that can be read back in using SequenceFileInputFormat. It’s useful for writing intermediate data results when chaining MapReduce jobs.