Wednesday, December 24, 2014

[ In Action ] Ch6. Programming Practice: Tuning Performance (Part 3)

Preface (P152) 
After you have developed your MapReduce program and fully debugged it, you may want to start tuning it for performance. Before doing any optimization, note that one of the main attractions of Hadoop is its linear scalability. You can speed up many jobs simply by adding more machines, and this makes economic sense when you have a small cluster. Consider the value of the time it takes to optimize your program to gain a 10 percent improvement. For a 10-node cluster, you can get the same 10 percent performance gain by adding one machine (and this gain applies to all jobs on that cluster). The cost of your development time may well be higher than the cost of the additional computer. On the other hand, for a 1,000-node cluster, squeezing out a 10 percent improvement through hardware would take 100 new machines. At that scale the brute force approach of adding hardware to boost performance may be less cost-effective.

Hadoop has a number of specific levers and knobs for tuning performance, some of which boost the effectiveness of the cluster as a whole. We cover those in the next chapter when we discuss system administration issues. In this section we examine techniques that can be applied on a per-job basis. 

Reducing network traffic with combiner 
A combiner can reduce the amount of data shuffled between the map and reduce phases, and lower network traffic improves execution time. The details and the benefits of using a combiner are thoroughly described in section 4.6. We mention it here again for the sake of completeness.
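
As a quick refresher, a combiner is registered in the job driver. The sketch below is illustrative and assumes a word-count-style job in which the reducer class (here named IntSumReducer, a placeholder) is associative and commutative, so it can safely double as the combiner:

    import org.apache.hadoop.mapreduce.Job;

    Job job = new Job(conf, "word count");        // conf is an existing Configuration object
    job.setMapperClass(TokenizerMapper.class);    // TokenizerMapper is a placeholder mapper class
    job.setCombinerClass(IntSumReducer.class);    // aggregate map output locally before the shuffle
    job.setReducerClass(IntSumReducer.class);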

Reducing the amount of input data 
When processing large data sets, sometimes a nontrivial portion of the processing time is spent scanning data from disk. Reducing the number of bytes to read can enhance overall throughput. There are several ways to do this.

The simplest way to reduce the number of bytes processed is to process less data in the first place: we can choose to work on only a sampled subset of the input. This is a viable option for certain analytics applications. For those applications, sampling reduces precision but not accuracy, and their results remain useful for many decision support systems.
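
As an illustration (not from the book), a pass-through mapper can keep a random fraction of the records before any further processing; the 10 percent rate below is an arbitrary assumption:

    import java.io.IOException;
    import java.util.Random;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class SamplingMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        private final Random rand = new Random();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (rand.nextDouble() < 0.1) {   // keep roughly 10% of the input records
                context.write(key, value);
            }
        }
    }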

Often your MapReduce jobs don’t use all the information in the input data set. Recall our patent description data set from chapter 4. It has almost a couple dozen fields, yet most of our jobs access only a few common ones. It’s inefficient for every job on that data set to read the unused fields every time. One can “refactor” the input data into several smaller data sets, each having only the fields necessary for a particular type of data processing. The exact refactoring will be application dependent. This technique is similar in spirit to vertical partitioning and column-oriented databases in the relational database management system (RDBMS) world.
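
For instance, a one-time map-only job could project each wide comma-separated record down to the few columns that later jobs actually need. The sketch below is hypothetical; the field positions are assumptions, not the real patent data layout:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class ProjectionMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private final Text out = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length > 4) {
                // keep only the columns a particular family of jobs needs (indexes are illustrative)
                out.set(fields[0] + "," + fields[1] + "," + fields[4]);
                context.write(NullWritable.get(), out);
            }
        }
    }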

Finally, you can reduce the amount of disk and network I/O by compressing your data. You can apply this technique to the intermediate as well as output data sets. Hadoop has many options for data compression, and we devote the next subsection to this topic. 

Using compression 
Even with the use of a combiner, the output of the map phase can be large. This intermediate data has to be stored on disk and shuffled across the network. Compressing this intermediate data will improve performance for most MapReduce jobs, and it’s easy to do.

Hadoop has built-in support for compression and decompression. Enabling compression on the mapper’s output involves setting two configuration properties, as you can see in table 6.3.
Table 6.3 Properties for compressing mapper output

To enable compression on the mapper’s output, you set mapred.compress.map.output to true. In addition, you should set mapred.map.output.compression.codec to the appropriate codec class. All codec classes in Hadoop implement the CompressionCodec interface. Hadoop supports a number of compression codecs (see table 6.4). For example, to use GZIP compression, you can set the configuration object:
    conf.setBoolean("mapred.compress.map.output", true);
    conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
You can also use the convenience methods setCompressMapOutput() and setMapOutputCompressorClass() in JobConf instead of setting the properties directly.
Table 6.4 Compression codecs supported in Hadoop

Data output from the map phase of a job is used only internally to the job, so enabling compression for this intermediate data is transparent to the developer and is a no-brainer. As many MapReduce applications involve multiple jobs, it also makes sense for jobs to be able to output and input data in compressed form. It’s highly recommended that data passed between Hadoop jobs use the Hadoop-specific sequence file format.

A sequence file is a compressible binary file format for storing key/value pairs. It is designed to support compression while remaining splittable. Recall that one of the ways Hadoop achieves parallelism is by splitting an input file so that it can be read and processed by multiple map tasks. If the input file is in a compressed format, Hadoop must be able to split the file such that each split can be decompressed by the map tasks independently; otherwise parallelism is lost because Hadoop has to decompress the file as a whole first. Not all compressed file formats are designed for splitting and decompressing in chunks. Sequence files were specially developed to support this feature: the file format provides sync markers that tell Hadoop where it can split the file.

In addition to its compressibility and splittability, the sequence file format supports binary keys and values. Therefore, a sequence file is often used for processing binary documents, such as images, and it works well for text documents and other large key/value objects too. Each document is considered a record within the sequence file.
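
Outside of MapReduce, you can also pack documents into a sequence file directly through the SequenceFile API. The sketch below is illustrative only; the output path and the record contents are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path out = new Path("/tmp/docs.seq");              // placeholder output path

    // block compression packs many records together for a better compression ratio
    SequenceFile.Writer writer = SequenceFile.createWriter(
            fs, conf, out, Text.class, BytesWritable.class,
            SequenceFile.CompressionType.BLOCK);
    try {
        byte[] bytes = "example document contents".getBytes();   // stand-in for real binary data
        writer.append(new Text("doc-0001"), new BytesWritable(bytes));
    } finally {
        writer.close();
    }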

You can make a MapReduce job output a sequence file by setting its output format to SequenceFileOutputFormat. You’ll want to change its compression type from the default RECORD to BLOCK. With record compression, each record is compressed separately. With block compression, a block of records is compressed together, which achieves a higher compression ratio. Finally, you have to call the static methods setCompressOutput() and setOutputCompressorClass() in FileOutputFormat (or SequenceFileOutputFormat, which inherits those methods) to enable output compression using a specific codec. The supported codecs are the same as those given in table 6.4. You add these lines to the driver:
    /*
     * There is no need to call setInputFormatClass, because the input
     * file is a text file. However, the output file is a SequenceFile.
     * Therefore, we must call setOutputFormatClass.
     */
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    // Set the compression options.

    // Compress the output
    FileOutputFormat.setCompressOutput(job, true);

    // Use Snappy compression
    FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);

    // Use block compression
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
Table 6.5 lists the equivalent properties for configuring sequence file output. A Streaming program can output sequence files when given the following options:
-outputformat org.apache.hadoop.mapred.SequenceFileOutputFormat
-D mapred.output.compression.type=BLOCK
-D mapred.output.compress=true
-D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec

Table 6.5 Properties for configuring compressed sequence file output

To read a sequence file as input, set the input format to SequenceFileInputFormat. Use 
    job.setInputFormatClass(SequenceFileInputFormat.class);
or
-inputformat org.apache.hadoop.mapred.SequenceFileInputFormat

for Streaming. There’s no need to configure the compression type or codec class, as the SequenceFile.Reader class (used by SequenceFileRecordReader) will automatically determine those settings from the file header.
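
If you ever need to inspect a sequence file outside of a MapReduce job, the same header-driven behavior applies when reading it directly. A small sketch, assuming the file written in the earlier example (the path and record types are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("/tmp/docs.seq"), conf);
    try {
        Text key = new Text();
        BytesWritable value = new BytesWritable();
        while (reader.next(key, value)) {        // codec and compression type come from the file header
            System.out.println(key + " : " + value.getLength() + " bytes");
        }
    } finally {
        reader.close();
    }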

Reusing the JVM 
By default, the TaskTracker runs each Mapper and Reducer task in a separate JVM as a child process. This necessarily incurs the JVM start-up cost for each task. If the mapper does its own initialization, such as reading into memory a large data structure (see the example of joining using distributed cache in section 5.2.2), that initialization is part of the start-up cost as well. If each task runs only briefly, or if the mapper initialization takes a long time, then the start-up cost can be a significant portion of a task’s total run time.

Starting with version 0.19.0, Hadoop allows the reuse of a JVM across multiple tasks of the same job, so the start-up cost can be amortized across many tasks. A new property, mapred.job.reuse.jvm.num.tasks, specifies the maximum number of tasks (of the same job) a JVM can run. The default value is 1, meaning the JVM is not reused. You can enable JVM reuse by setting the property to a higher number. You can also set it to -1, which means there’s no limit to the number of tasks a JVM can be reused for. The JobConf object has a convenience method, setNumTasksToExecutePerJvm(int), to set the property for a job. This is summarized in table 6.6.
Table 6.6 Property for configuring JVM reuse
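
A short sketch of both ways to allow unlimited reuse for a job, assuming the old JobConf-based API (the driver class name is a placeholder):

    import org.apache.hadoop.mapred.JobConf;

    JobConf conf = new JobConf(MyJob.class);      // MyJob is a hypothetical driver class
    conf.setNumTasksToExecutePerJvm(-1);          // convenience method; -1 means no limit
    // or set the property directly:
    conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);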
 

Running with speculative execution 
One of the original design assumptions of MapReduce (as stated in the Google MapReduce paper) is that nodes are unreliable and the framework must handle the situation where some nodes fail in the middle of a job. Under this assumption, the original MapReduce framework specifies the map tasks and the reduce tasks to be idempotent. This means that when a task fails, Hadoop can restart that task and the overall job will end up with the same result. Hadoop can monitor the health of running nodes and restart tasks on failed nodes automatically. This makes fault tolerance transparent to the developer. 

Often nodes don’t suddenly fail but experience slowdown as I/O devices go bad. In such situations everything works but the tasks run slower. Sometimes tasks also run slow because of temporary congestion. This doesn’t affect the correctness of the running job but certainly affects its performance. Even one slow-running task will delay the completion of a MapReduce job. Until all mappers have finished, none of the reducers will start running. Similarly, a job is not considered finished until all the reducers have finished. 

Hadoop uses the idempotency property again to mitigate the slow-task problem. Instead of restarting a task only after it has failed, Hadoop will notice a slow-running task and schedule the same task to be run in another node in parallel. Idempotency guarantees the parallel task will generate the same output. Hadoop will monitor the parallel tasks. As soon as one finishes successfully, Hadoop will use its output and kill the other parallel tasks. This entire process is called speculative execution.

Note that speculative execution of map tasks takes place only after all map tasks have been scheduled to run, and only for map tasks that are making much less progress than the average of the other map tasks. The same holds for speculative execution of reduce tasks. Speculative execution does not “race” multiple copies of a task to get the best completion time; it only prevents slow tasks from dragging down the job’s completion time.

By default, speculative execution is enabled. You can turn it off for map tasks and reduce tasks separately by setting one or both of the properties in table 6.7 to false. They’re applied on a per-job basis, but you can also change the cluster-wide default by setting them in the cluster configuration file.
Table 6.7 Properties for enabling/disabling speculative execution

You should leave speculative execution on in general. The primary reason to turn it off is if your map tasks or reduce tasks have side effects and are therefore not idempotent. For example, if a task writes to external files, speculative execution can cause multiple copies of a task to collide in attempting to create the same external files. You can turn off speculative execution to ensure that only one copy of a task is being run at a time. 
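
For reference, a sketch of turning speculative execution off per job through the old JobConf-based API (the property names are the standard ones for this generation of Hadoop; the driver class name is a placeholder):

    import org.apache.hadoop.mapred.JobConf;

    JobConf conf = new JobConf(MyJob.class);      // MyJob is a hypothetical driver class
    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
    // JobConf also offers setMapSpeculativeExecution(false) and
    // setReduceSpeculativeExecution(false) as equivalents.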

Refactoring code and rewriting algorithms 
If you’re willing to rewrite your MapReduce programs to optimize performance, some straightforward techniques and some nontrivial, application-dependent rewritings can speed things up. 

One straightforward technique for a Streaming program is to rewrite it for Hadoop Java. Streaming is great for quickly creating a MapReduce job for ad hoc data analysis, but it doesn’t run as fast as Java under Hadoop. Streaming jobs that start out as one-off queries but end up being run frequently can gain from a Java re-implementation. 

If you have several jobs that run on the same input data, there are probably opportunities to rewrite them into fewer jobs. For example, if you’re computing the maximum as well as the minimum of a data set, you can write a single MapReduce job that computes both rather than compute them separately using two different jobs. This may sound obvious, but in practice many jobs are originally written to do one function well. This is a good design practice. A job’s conciseness makes it widely applicable to different data sets for different purposes. Only after some usage should you start looking for job groupings that you can rewrite to be faster.
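
To make the min/max example concrete, a single reducer can compute both statistics in one pass over the values. This sketch assumes the mappers emit a grouping key with a numeric LongWritable value (class and type choices are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class MinMaxReducer extends Reducer<Text, LongWritable, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long min = Long.MAX_VALUE;
            long max = Long.MIN_VALUE;
            for (LongWritable v : values) {       // one pass yields both statistics
                min = Math.min(min, v.get());
                max = Math.max(max, v.get());
            }
            context.write(key, new Text(min + "\t" + max));
        }
    }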

One of the most important things you can do to speed up a MapReduce program is to think hard about the underlying algorithm and see if a more efficient algorithm can compute the same results faster. This is true for any kind of programming, but it is more significant for MapReduce programs. Standard textbooks on algorithms and data structures (sorting, lists, maps, etc.) comprehensively cover the design choices for most traditional programming. Hadoop programs, on the other hand, tend to touch on “exotic” areas, such as distributed computing, functional programming, statistics, and data-intensive processing, where best practices are less well known to most programmers and there is still exciting research today exploring new approaches.

One example we’ve already seen that leverages a new data structure to speed up MapReduce programs is the use of Bloom filters in semijoins (section 5.3). The Bloom filter is well-known in the distributed computing community but relatively unknown outside of it. 
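
As a reminder of what that data structure looks like in Hadoop’s own library, here is a small, illustrative sketch using org.apache.hadoop.util.bloom.BloomFilter; the sizing parameters and key are arbitrary assumptions, not tuned values:

    import org.apache.hadoop.util.bloom.BloomFilter;
    import org.apache.hadoop.util.bloom.Key;
    import org.apache.hadoop.util.hash.Hash;

    // vector size and hash count are illustrative; real values depend on the expected
    // number of keys and the false-positive rate you can tolerate
    BloomFilter filter = new BloomFilter(10000, 6, Hash.MURMUR_HASH);

    filter.add(new Key("key-from-smaller-data-set".getBytes()));

    // membershipTest can return false positives but never false negatives,
    // which is what makes it safe for filtering in a semijoin
    boolean maybePresent = filter.membershipTest(new Key("key-from-smaller-data-set".getBytes()));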

Supplement 
Exercise 14 - Using SequenceFiles and File Compression (P49)
