程式扎記: [ In Action ] Ch3. Components of Hadoop

標籤

2013年11月26日 星期二

[ In Action ] Ch3. Components of Hadoop

Preface: 
This chapter convers: 
■ Managing files in HDFS
■ Analyzing components of the MapReduce framework
■ Reading and writing input and output data

We first cover HDFS, where you’ll store data that your Hadoop applications will process. Next we explain the MapReduce framework in more detail. In this chapter we get to know the Java classes and methods, as well as the underlying processing steps. We also learn how to read and write using different data formats. 

3.1 Working with files in HDFS 
HDFS is a filesystem designed for large-scale distributed data processing under frame­ works such as MapReduce. You can store a big data set of (say) 100 TB as a single file in HDFS , something that would overwhelm most other filesystems. We discussed in chapter 2 how to replicate the data for availability and distribute it over multiple ma­chines to enable parallel processing. HDFS abstracts these details away and gives you the illusion that you’re dealing with only a single file. 

Hadoop provides a set of command line utilities that work similarly to the Linux file commands. In the next section we’ll discuss those Hadoop file shell commands, which are your primary interface with the HDFS system. Section 3.1.2 covers Hadoop Java libraries for handling HDFS files programmatically. 

3.1.1 Basic file commands 
Hadoop file commands take the form of: 
$ hadoop fs -cmd args

where cmd is the specific file command and args is a variable number of arguments. The command cmd is usually named after the corresponding Unix equivalent. For example, the command for listing files is: 
$ hadoop fs -ls

Let’s look at the most common file management tasks in Hadoop, which include 
■ Adding files and directories: mkdirlsputlsr
■ Retrieving files: getcattail
■ Deleting files: rmrmr

For more, please refer to File System Shell

3.1.2 Reading and writing to HDFS programmatically 
To motivate an examination of the HDFS Java API, we’ll develop a PutMerge program for merging files while putting them into HDFS. The command line utilities don’t sup­port this operation; we’ll use the API. 

The motivation for this example came when we wanted to analyze Apache log files coming from many web servers. We can copy each log file into HDFS, but in general, Hadoop works more effectively with a single large file rather than a number of smaller ones. (“Smaller” is relative here as it can still be tens or hundreds of gigabytes.) Besides, for analytics purposes we think of the log data as one big file. That it’s spread over multiple files is an incidental result of the physical web server architecture. One solution is to merge all the files first and then copy the combined file into HDFS. Unfortunately, the file merging will require a lot of disk space in the local machine. It would be much easier if we could merge all the files on the fly as we copy them into HDFS. 

What we need is, therefore, a PutMerge-type of operation. Hadoop’s command line utilities include a getmerge command for merging a number of HDFS files before copying them onto the local machine. What we’re looking for is the exact opposite. This is not available in Hadoop’s file utilities. 

The main classes for file manipulation in Hadoop are in the package org.apache.hadoop.fs. Basic Hadoop file operations include the familiar open, read, write, and close. In fact, the Hadoop file API is generic and can be used for working with filesystems other than HDFS. For our PutMerge program, we’ll use the Hadoop file API to both read the local filesystem and write to HDFS. 

The starting point for the Hadoop file API is the FileSystem class . This is an abstract class for interfacing with the filesystem, and there are different concrete subclasses for handling HDFS and the local filesystem. You get the desired FileSystem instance by calling the factory method FileSystem.get(Configuration conf). The Configurationclass is a special class for holding key/value configuration parameters. Its default instantiation is based on the resource configuration for your HDFS system. We can get the FileSystem object to interface with HDFS by: 
  1. Configuration conf = new Configuration();  
  2. FileSystem hdfs = FileSystem.get(conf);  
To get a FileSystem object specifically for the local filesystem, there’s the FileSystem.getLocal(Configuration conf) factory method. 

Hadoop file API uses Path objects to encode file and directory names and FileStatus objects to store metadata for files and directories. Our PutMerge program will merge all files from a local directory. We use the FileSystem’s listStatus() method to get a list of files in a directory. 
  1. Path inputDir = new Path(args[0]);  
  2. FileStatus[] inputFiles = local.listStatus(inputDir);  
The length of the inputFiles array is the number of files in the specified directory. Each FileStatus object in inputFiles has metadata information such as file length, permissions, modification time, and others. Of interest to our PutMerge program is each file’s Path representation, inputFiles[i].getPath(). We can use this Path to request an FSDataInputStream object for reading in the file: 
  1. FSDataInputStream in = local.open(inputFiles[i].getPath());  
  2. byte buffer[] = new byte[256];  
  3. int bytesRead = 0;  
  4.   
  5. while( (bytesRead = in.read(buffer)) > 0)   
  6. {  
  7.     ...  
  8. }  
  9. in.close();  
FSDataInputStream is a subclass of Java’s standard java.io.DataInputStream with ad­ditional support for random access. For writing to a HDFS file, there’s the analogousFSDataOutputStream object: 
  1. Path hdfsFile = new Path(args[1]);  
  2. FSDataOutputStream out = hdfs.create(hdfsFile);  
  3. out.write(buffer, 0, bytesRead);  
  4. out.close();  
To complete the PutMerge program, we create a loop that goes through all the files in inputFiles as we read each one in and write it out to the destination HDFS file. You can see the complete program in listing 3.1. 
- Listing 3.1 A PutMerge program 
  1. package demo.hdfs;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.fs.FSDataInputStream;  
  7. import org.apache.hadoop.fs.FSDataOutputStream;  
  8. import org.apache.hadoop.fs.FileStatus;  
  9. import org.apache.hadoop.fs.FileSystem;  
  10. import org.apache.hadoop.fs.Path;  
  11.   
  12. public class PutMerge {  
  13.     public static void main(String args[]) throws IOException {  
  14.         Configuration conf = new Configuration();  
  15.         FileSystem hdfs = FileSystem.get(conf);  
  16.         FileSystem local = FileSystem.getLocal(conf);  
  17.         Path inputDir = new Path(args[0]); // Specify input directory  
  18.         Path hdfsFile = new Path(args[1]); // and output file  
  19.         try { // Get list of local files  
  20.             FileStatus[] inputFiles = local.listStatus(inputDir);   
  21.             FSDataOutputStream out = hdfs.create(hdfsFile);                                                           
  22.             for (int i = 0; i < inputFiles.length; i++) {  
  23.                 System.out.println(inputFiles[i].getPath().getName());  
  24.                 FSDataInputStream in = local.open(inputFiles[i].getPath());                                                                           
  25.                 byte buffer[] = new byte[256]; // input stream  
  26.                 int bytesRead = 0;  
  27.                 while ((bytesRead = in.read(buffer)) > 0) {  
  28.                     out.write(buffer, 0, bytesRead);  
  29.                 }  
  30.                 in.close();  
  31.             }  
  32.             out.close();  
  33.         } catch (IOException e) {  
  34.             e.printStackTrace();  
  35.         }  
  36.     }  
  37. }  
We have covered how to work with files in HDFS. You now know a few ways to put data into and out of HDFS. But merely having data isn’t terribly interesting. You want to process it, analyze it, and do other things. Let’s conclude our discussion of HDFS and move on to the other major component of Hadoop, the MapReduce framework, and how to program under it. 

3.2 Anatomy of a MapReduce program 
As we have mentioned before, a MapReduce program processes data by manipulating (key/value) pairs in the general form: 
map: (K1,V1) > list(K2,V2)
reduce: (K2,list(V2)) > list(K3,V3)

Not surprisingly, this is an overly generic representation of the data flow. In this section we learn more details about each stage in a typical MapReduce program. Figure 3.1 displays a high-level diagram of the entire process, and we further dissect each compo­nent as we step through the flow. 
 
Figure 3.1 The general MapReduce data flow. 

Before we analyze how data gets passed onto each individual stage, we should first fa­miliarize ourselves with the data types that Hadoop supports. 

3.2.1 Hadoop data types 
Despite our many discussions regarding keys and values, we have yet to mention their types. The MapReduce framework won’t allow them to be any arbitrary class. For ex­ample, although we can and often do talk about certain keys and values as integers, strings, and so on, they aren’t exactly standard Java classes, such as Integer, String, and so forth. This is because the MapReduce framework has a certain defined way of serializing the key/value pairs to move them across the cluster’s network, andonly classes that support this kind of serialization can function as keys or values in the framework

More specifically, classes that implement the Writable interface can be values, and classes that implement the WritableComparable interface can be either keys or values. Note that the WritableComparable interface is a combination of the Writable and java.lang.Comparable interfaces . We need the comparability requirement for keys because they will be sorted at the reduce stage, whereas values are simply passed through. 

Hadoop comes with a number of predefined classes that implement WritableComparable, including wrapper classes for all the basic data types, as seen in table 3.1. 
 

Keys and values can take on types beyond the basic ones which Hadoop natively sup­ports. You can create your own custom type as long as it implements the Writable(or WritableComparable) interface. Listing 3.2 shows a class that can represent edges in a network. This may represent a flight route between two cities: 
- Listing 3.2 An example class that implements the WritableComparable interface 
  1. package demo.io;  
  2.   
  3. import java.io.DataInput;  
  4. import java.io.DataOutput;  
  5. import java.io.IOException;  
  6.   
  7. import org.apache.hadoop.io.WritableComparable;  
  8.   
  9. public class Edge implements WritableComparable {  
  10.     private String departureNode;  
  11.     private String arrivalNode;  
  12.   
  13.     public String getDepartureNode() {  
  14.         return departureNode;  
  15.     }  
  16.   
  17.     @Override  
  18.     public void readFields(DataInput in) throws IOException {  
  19.         departureNode = in.readUTF();  
  20.         arrivalNode = in.readUTF();  
  21.     }  
  22.   
  23.     @Override  
  24.     public void write(DataOutput out) throws IOException {  
  25.         out.writeUTF(departureNode);  
  26.         out.writeUTF(arrivalNode);  
  27.     }  
  28.   
  29.     @Override  
  30.     public int compareTo(Edge o) {  
  31.         return (departureNode.compareTo(o.departureNode) != 0) ? departureNode  
  32.                 .compareTo(o.departureNode) : arrivalNode  
  33.                 .compareTo(o.arrivalNode);  
  34.     }  
  35. }  
With the data type interfaces now defined, we can proceed to the first stage of the data flow process as described in figure 3.1: the mapper. 

3.2.2 Mapper 
To serve as the mapper, a class implements from the Mapper interface and inherits the MapReduceBase class . The MapReduceBase class, not surprisingly, serves as the base class for both mappers and reducers. It includes two methods that effectively act as the constructor and destructor for the class: 
■ void configure( JobConf job) 
In this function you can extract the parameters set either by the configuration XML files or in the main class of your application. Call this function before any data processing begins.

■ void close () 
As the last action before the map task terminates, this function should wrap up any loose ends—database connections, open files, and so on.

The Mapper interface is responsible for the data processing step. It utilizes Java generics of the form Mapper where the key classes and value classes implement the WritableComparable and Writable interfaces, respectively. Its single method is to process an individual (key/value) pair
  1. void map( K1 key,  
  2.             V1 value,  
  3.             OutputCollector output,  
  4.             Reporter reporter  
  5.            ) throws IOException  
Or you can extends org.apache.hadoop.mapreduce.Mapper and overwrite its process method: 
  1. protected void map(KEYIN key,  
  2.                    VALUEIN value,  
  3.                    org.apache.hadoop.mapreduce.Mapper.Context context)  
  4.             throws IOException,  
  5.                    InterruptedException  
The function generates a (possibly empty) list of (K2, V2) pairs for a given (K1, V1) input pair. The OutputCollector receives the output of the mapping process, and the Reporter provides the option to record extra information about the mapper as the task progresses. 

Hadoop provides a few useful mapper implementations. You can see some of them in the table 3.2. 
 

As the MapReduce name implies, the major data flow operation after map is the re­duce phase, shown in the bottom part of figure 3.1. 

3.2.3 Reducer 
As with any mapper implementation, a reducer must first extend the MapReduce base class to allow for configuration and cleanup. In addition, it must also implement theReducer interface which has the following single method: 
  1. void reduce(K2 key,  
  2.             Iterator values,  
  3.             OutputCollector output,  
  4.             Reporter reporter)  
  5.             throws IOException  
When the reducer task receives the output from the various mappers, it sorts the incoming data on the key of the (key/value) pair and groups together all values of the same key. The reduce() function is then called, and it generates a (possibly empty) list of (K3, V3) pairs by iterating over the values associated with a given key. TheOutputCollector receives the output of the reduce process and writes it to an output file. The Reporter provides the option to record extra information about the reducer as the task progresses. 

Table 3.3 lists a couple of basic reducer implementations provided by Hadoop. 
 

Although we have referred to Hadoop programs as MapReduce applications, there is a vital step between the two stages: directing the result of the mappers to the different reducers. This is the responsibility of the partitioner. 

3.2.4 Partitioner— redirecting output from Mapper 
A common misconception for first-time MapReduce programmers is to use only a single reducer. After all, a single reducer sorts all of your data before processing— and who doesn’t like sorted data? Our discussions regarding MapReduce expose the folly of such thinking. We would have ignored the benefits of parallel com­putation . With one reducer, our compute cloud has been demoted to a compute raindrop. 

With multiple reducers, we need some way to determine the appropriate one to send a (key/value) pair outputted by a mapper. The default behavior is to hash the key to determine the reducer. Hadoop enforces this strategy by use of the HashPartitioner class . Sometimes the HashPartitioner will steer you awry. Let’s return to the Edgeclass introduced in section 3.2.1

Suppose you used the Edge class to analyze flight information data to determine the number of passengers departing from each airport. Such data may be: 
(San Francisco, Los Angeles) Chuck Lam
(San Francisco, Dallas) James Warren
...

If you used HashPartitioner, the two rows could be sent to different reducers. The number of departures would be processed twice and both times erroneously. How do we customize the partitioner for your applications? In this situation, we want all edges with a common departure point to be sent to the same reducer. This is done easily enough by hashing the departureNode member of the Edge
  1. package demo.io;  
  2.   
  3. import org.apache.hadoop.io.Writable;  
  4. import org.apache.hadoop.mapreduce.Partitioner;  
  5.   
  6. public class EdgePartitioner extends Partitioner {  
  7.     @Override  
  8.     public int getPartition(Edge edge, Writable w, int numPartitions) {  
  9.         return edge.getDepartureNode().hashCode() % numPartitions;  
  10.     }  
  11. }  
A custom partitioner only needs handle getPartition(). The former returns an integer between 0 and the number of reduce tasks indexing to which reducer the (key/value) pair will be sent. The exact mechanics of the partitioner may be difficult to follow. Figure 3.2 illustrates this for better understanding: 
 

Between the map and reduce stages, a MapReduce application must take the output from the mapper tasks and distribute the results among the reducer tasks. This process is typically called shuffling , because the output of a mapper on a single node may be sent to reducers across multiple nodes in the cluster. 

3.2.5 Combiner—local reduce 
In many situations with MapReduce applications, we may wish to perform a "local reduce" before we distribute the mapper results. Consider the WordCount example of chapter 1 once more. If the job processes a document containing the word “the” 574 times, it’s much more efficient to store and shuffle the pair (“the”, 574) once instead of the pair (“the”, 1) multiple times. This processing step is known as combining. We explain combiners in more depth in section 4.6

3.3 Reading and writing: 
Let’s see how MapReduce reads input data and writes output data and focus on the file formats it uses. To enable easy distributed processing, MapReduce makes certain assumptions about the data it’s processing. It also provides flexibility in dealing with a variety of data formats. 

Input data usually resides in large files, typically tens or hundreds of gigabytes or even more. One of the fundamental principles of MapReduce’s processing power is the splitting of the input data into chunks. You can process these chunks in parallel using multiple machines. In Hadoop terminology these chunks are called input splits

The size of each split should be small enough for a more granular parallelization . (If all the input data is in one split, then there is no parallelization.) On the other hand, each split shouldn’t be so small that the overhead of starting and stopping the processing of a split becomes a large fraction of execution time. 

HDFS stores files in blocks spread over multiple machines. Roughly speaking, each file block is a split. As different machines will likely have different blocks, parallelization is automatic if each split/block is processed by the machine that it’s residing at. Furthermore, as HDFS replicates blocks in multiple nodes for reliability, MapReduce can choose any of the nodes that have a copy of a split/block. 

You’ll recall that MapReduce works on key/value pairs. So far we’ve seen that Hadoop by default considers each line in the input file to be a record and the key/value pair is the byte offset (key) and content of the line (value), respectively. You may not have recorded all your data that way. Hadoop supports a few other data formats and allows you to define your own. 

3.3.1 InputFormat 
The way an input file is split up and read by Hadoop is defined by one of the imple­mentations of the InputFormat interface . TextInputFormat is the default InputFormat implementation, and it’s the data format we’ve been implicitly using up to now. It’s often useful for input data that has no definite key value, when you want to get the content one line at a time. 

Table 3.4 lists other popular implementations of InputFormat along with a descrip­tion of the key/value pair each one passes to the mapper. 
 

KeyValueTextInputFormat is used in the more structured input files where a pre­defined character, usually a tab (\t), separates the key and value of each line (record). For example, you may have a tab-separated data file of timestamps and URLs: 
17:16:18 http://hadoop.apache.org/core/docs/r0.19.0/api/index.html
17:16:19 http://hadoop.apache.org/core/docs/r0.19.0/mapred_tutorial.html
17:16:20 http://wiki.apache.org/hadoop/GettingStartedWithHadoop
17:16:20 http://www.maxim.com/hotties/2008/finalist_gallery.aspx
17:16:25 http://wiki.apache.org/hadoop/
...

You can set your JobConf object to use the KeyValueTextInputFormat class to read this file. 
  1. conf.setInputFormat(KeyValueTextInputFormat.class);  
In the newest version of Hadoop, JobConf class has been depreciated. Instead, the Job class replaces JobConf to setup the InputFormatFileInputFormat replacesKeyValueTextInputFormat. Below is the sample code: 
  1. Configuration conf = new Configuration();  
  2. Job job = new Job(conf, "word count");    
  3. job.setInputFormatClass(FileInputFormat.class);  
Given the preceding example file, the first record your mapper reads will have a key of "17:16:18” and a value of “http://hadoop.apache.org/core/docs/r0.19.0/api/index.html”. 

The input data to your MapReduce job does not necessarily have to be some external data. In fact it’s often the case that the input to one MapReduce job is the output of some other MapReduce job. As we’ll see, you can customize your output format too. The default output format writes the output in the same format thatKeyValueTextInputFormat can read back in (i.e., each line is a record with key and value separated by a tab character). Hadoop provides a much more efficient binary compressed file format called sequence file. This sequence file is optimized for Hadoop processing and should be the preferred format when chaining multiple MapReduce jobs. The InputFormat class to read sequence files is SequenceFileInputFormat. The object type for key and value in a sequence file are definable by the user. The output and the input type have to match, and your Mapper implementation and map() method have to take in the right input type. 

CREATING A CUSTOM INPUTFORMAT—INPUTSPLIT AND RECORDREADER 
Sometimes you may want to read input data in a way different from the standard InputFormat classes. In that case you’ll have to write your own custom InputFormat class. Let’s look at what it involves. InputFormat is an interface consisting of only two methods: 
* getSplits(JobConf job, int numSplits): Logically split the set of input files for the job.
* getRecordReader(InputSplit split, JobConf job, Reporter reporter): Get the RecordReader for the given InputSplit.

The two methods sum up the functions that InputFormat has to perform: 
* Identify all the files used as input data and divide them into input splits. Each map task is assigned one split.
* Provide an object (RecordReader) to iterate through records in a given split, and to parse each record into key and value of predefined types.

Who wants to worry about how files are divided into splits? In creating your own InputFormat class you should subclass the FileInputFormat class, which takes care of file splitting. In fact, all the InputFormat classes in table 3.4 subclass FileInputFormatFileInputFormat implements the getSplits() method but leaves getRecordReader()abstract for the subclass to fill out. FileInputFormat’s getSplits() implementation tries to divide the input data into roughly the number of splits specified in numSplits, subject to the constraints that each split must have more than mapred.min.split.size number of bytes but also be smaller than the block size of the filesystem. In practice, a split usually ends up being the size of a block, which defaults to 64 MB in HDFS

In using FileInputFormat you focus on customizing RecordReader, which is responsible for parsing an input split into records and then parsing each record into a key/value pair. Let’s look at the signature of this interface. 
  1. public interface RecordReader {  
  2. boolean next(K key, V value) throws IOException;  
  3. K createKey();  
  4. V createValue();  
  5. long getPos() throws IOException;  
  6. public void close() throws IOException;  
  7. float getProgress() throws IOException;  
  8. }  
Instead of writing our own RecordReader, we’ll again leverage existing classes pro­vided by Hadoop. For example, LineRecordReader implementsRecordReader. It’s used in TextInputFormat and reads one line at a time, with byte offset as key and line content as value. For the most part, your custom RecordReader will be a wrapper around an existing implementation, and most of the action will be in the next() method. 

One use case for writing your own custom InputFormat class is to read records in a specific type rather than the generic Text type. For example, we had previously usedKeyValueTextInputFormat to read a tab-separated data file of timestamps and URLs. The class ends up treating both the timestamp and the URL as Text type. For our illustration, let’s create a TimeUrlTextInputFormat that works exactly the same but treats the URL as a URLWritable type. As mentioned earlier, we create ourInputFormat class by extending FileInputFormat and implementing the factory method to return our RecordReader

One use case for writing your own custom InputFormat class is to read records in a specific type rather than the generic Text type. As mentioned earlier, we create ourInputFormat class by extending FileInputFormat and implementing the factory method to return our RecordReader
- TimeUrlTextInputFormat.java 
  1. package demo.io;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.io.Text;  
  6. import org.apache.hadoop.mapreduce.InputSplit;  
  7. import org.apache.hadoop.mapreduce.RecordReader;  
  8. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  10. import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;  
  11.   
  12. public class TimeUrlTextInputFormat extends FileInputFormat{  
  13.     @Override  
  14.     public RecordReader createRecordReader(InputSplit split,  
  15.             TaskAttemptContext context) throws IOException, InterruptedException {        
  16.         return new SequenceFileRecordReader();  
  17.     }  
  18. }  
Our URLWritable class is quite straightforward: 
- URLWritable.java 
  1. package demo.io;  
  2.   
  3. import java.io.DataInput;  
  4.   
  5. public class URLWritable implements Writable{  
  6.     protected URL url;   
  7.       
  8.     public URLWritable(){}  
  9.     public URLWritable(URL url){this.url = url;}  
  10.   
  11.     @Override  
  12.     public void readFields(DataInput input) throws IOException {  
  13.         url = new URL(input.readUTF());       
  14.     }  
  15.   
  16.     @Override  
  17.     public void write(DataOutput output) throws IOException {  
  18.         output.writeUTF(url.toString());          
  19.     }  
  20.   
  21.     public void set(String s) throws MalformedURLException {  
  22.          url = new URL(s);  
  23.     }  
  24. }  
3.3.2 OutputFormat 
MapReduce outputs data into files using the OutputFormat class , which is analogous to the InputFormat class. The output has no splits, as each reducer writes its output only to its own file. The output files reside in a common directory and are typically named part-nnnnn, where nnnnn is the partition ID of the reducer. RecordWriterobjects format the output and RecordReaders parse the format of the input. 

Hadoop provides several standard implementations of OutputFormat, as shown in table 3.5. Not surprisingly, almost all the ones we deal with inherit from theFileOutputFormat class. You specify the OutputFormat by calling setOutputFormat() of the JobConf object that holds the configuration of your MapReduce job. 
 

The default OutputFormat is TextOutputFormat, which writes each record as a line of text. Each record’s key and value are converted to strings through toString(), and a tab (\t) character separates them. The separator character can be changed in the mapred.textoutputformat.separator property. 

If you want to suppress the output completely, then you should use the NullOutputFormat. Suppressing the Hadoop output is useful if your reducer writes its output in its own way and doesn’t need Hadoop to write any additional files. Finally, SequenceFileOutputFormat writes the output in a sequence file format that can be read back in using SequenceFileInputFormat. It’s useful for writing intermediate data results when chaining MapReduce jobs.

沒有留言:

張貼留言

網誌存檔

關於我自己

我的相片
Where there is a will, there is a way!