程式扎記: [CCDH] Class3 - Programming with Hadoop Core API (2)

標籤

2014年12月1日 星期一

[CCDH] Class3 - Programming with Hadoop Core API (2)

Accessing HDFS programmatically 
In addition to using the command-line shell, you can access HDFS programmatically. 
* Useful if your code needs to read or write 'side data' in addition to the standard MapReduce inputs and outputs
* Or for programs outside of Hadoop which need to read the results of MapReduce jobs

Beware that HDFS is not a general-purpose filesystem (Files cannot be modified once they have been written, for example)! Hadoop provides the FileSystemabstract base class which provides an API to generic file sytems: 
* Could be HDFS
* Could be your local file system
* Could even be, for example, Amazon S3

The FileSystem API 
In order to use the FileSystem API, retrieve an instance of it first: 
  1. Configuration conf = new Configuration();  
  2. FileSystem fs = FileSystem.get(conf);  
The conf object has read in the Hadoop configuration files, and therefore knows the address of the NameNode. A file in HDFS is represented by a Path object. 
  1. Path p = new Path("/path/to/my/file");  
There are still some useful API methods: 
FSDataInputStream create(...) - Provide methods for writing primitives, raw bytes etc.
FSDataInputStream open(...) - Provide methods for reading primitives, raw bytes etc.
* boolean delete(...) - Delete a file
* boolean mkdirs(...)
* void copyFromLocalFile(...)
* void copyToLocalFile(...)
FileStatus[] listStatus(...)

Directory Listing 
Get a directory listing: 
  1. Path p = new Path("/my/path");  
  2. Configuration conf = new Configuration();  
  3. FileSystem fs = FileSystem.get(conf);  
  4. FileStatus[] fileStats = fs.listStatus(p);  
  5.   
  6. for(int i=0; i < fileStats.length; i++)  
  7. {  
  8.     Path f = fileStats[i].getPath();  
  9.     // do something   
  10. }  
Writing Data 
Write data to a file 
  1. Configuration conf = new Configuration();  
  2. FileSystem fs = FileSystem.get(conf);  
  3.   
  4. Path p = new Path("/my/path/foo");  
  5.   
  6. FSDataDataputStream out = fs.create(p, false); false means not overwrite   
  7.   
  8. // write some raw bytes  
  9. out.write(...);  
  10.   
  11. // write an int  
  12. out.writeInt(...);  
  13.   
  14. ...  
  15. out.close();  
Using the Distributed Cache (P289) 
A common requirement is for a Mapper/Reducer to access some "side data": 
* Lookup tables
* Distionaries
* Standard configuration values

One option is to read directly from HDFS in the setup method: 
* Using the API seen in the previous section
* Works, but is not scalable

The Distributed Cache provides an API to push data to all slave nodes: 
* Transfer happens behind the scenes before any task is executed
* Data is only transferred once to each node
* Distributed Cache is read-only
* Files in the Distributed Cache are automatically deleted from slave nodes when the job finishes.

The Difficulty Way 
You can place the files into HDFS and configure the DistributedCache in your driver code: 
  1. Configuration conf = new Configuration();  
  2. DistributedCache.addCacheFile(new URI("/myapp/lookup.dat"), conf);  
  3. DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), conf);  
  4. DistributedCache.addCacheArchive(new URL("/myapp/map.zip", conf));  
  5. DistributedCache.addCacheArchive(new URL("/myapp/map.tar", conf));  

* .jar files added with addFileToClassPath will be added to your Mapper or Reducer's classpath
* Files added with addCacheArchive will automatically be dearchived/decompressed

The Easy Way 
If you are using ToolRunner, you can add files to the Distributed Cache directly from the command line when you run the job (No need to copy the files to HDFS first). Just using the -files option to add files. For example: 
$ hadoop jar myjar.jar MyDriver -files file1, file2, file3, ...

The -archives flag adds archived files, and automatically unarchives them on the destination machines. The -libjars flags adds jar files to the classpath. 

Accessing Files in the Distributed Cache 
Files added to the Distributed Cache are made available in your task's local working directory. You can access them from your Mapper or Reducer the way you would read any ordinary local file likes: 
  1. File f = new File("file_name_here");  
Reusable Classes for the New API 
The org.apache.hadoop.madpreduce.lib.* packages contain a library of Mappers/Reducers, and Partitioners supporting the new API. 
* org.apache.hadoop.mapreduce.lib.aggregate 
* org.apache.hadoop.mapreduce.lib.chain 
* org.apache.hadoop.mapreduce.lib.db
* org.apache.hadoop.mapreduce.lib.fieldsel
* org.apache.hadoop.mapreduce.lib.input 
* org.apache.hadoop.mapreduce.lib.jobcontrol
* org.apache.hadoop.mapreduce.lib.join
* org.apache.hadoop.mapreduce.lib.map
* org.apache.hadoop.mapreduce.lib.output 
* org.apache.hadoop.mapreduce.lib.partition
* org.apache.hadoop.mapreduce.lib.reduce

Example classes: 
InverseMapper - Swaps keys and values
RegexMapper - Extract text based on a regular expression
IntSumReducerLongSumReducer - Add up all values for a key
TotalOrderPartitioner - Reads a previously-created partition files and partitions based on the data from that file which allow you to partition your data into n partitions without hardcoding the partitioning information.

Practical Development Tips and Techniques 
In this chapter you will learn 
* Strategies for debugging MapReduce code
* How to test MapReduce code locally using LocalJobRunner
* How to write and view log files
* How to retrieve job information with counters
* Why reusing objects is a best practice
* How to create Map-only MapReduce jobs.

Introduction to Debugging 
Debugging MapReduce code is difficult! 
* Each instance of a Mapper runs as a separate task - Often on a different machine
* Difficult to attach a debugger to the process
* Difficult to cache 'edge cases'

Very large volumes of data mean that unexpected input is likely to appear. Code with expects all data to be well-formed is likely to fail! 

Common-Sense Debugging Tips 
* Code defensively 
- Ensure that input data is in the expected format
- Expect things to go wrong
- Cache exceptions

* Start small and build incrementally. Make as much of your code as possible Hadoop-agnostic (Make it easier to test). 
* Write unit tests 
* Test locally whenever possible - With small amount of data 
* Then test in pseudo-distributed mode 
* Finally, test on the cluster. 

Testing Strategies 
When testing in pseudo-distributed mode, ensure that you are testing with a similar environment to that on the real cluster. 
* Same amount of RAM allocated to the tasks JVM. 
* Same version of Hadoop 
* Same version of Java 
* Same versions of third-party libraries 

Testing MapReduce Code Locally Using LocalJobRunner 
Hadoop can run MapReducer in a single, local process which doesn't require any Hadoop daemons to be running. It uses the local filesystem instead of HDFS and known as LocalJobRunner mode. This is a very useful way of quickly testing incremental changes to code. 

To run in LocalJobRunner mode, add the following lines to the driver code: 
  1. Configuration conf = new Configuration();  
  2. conf.set("mapred.job.tracker""local");  
  3. conf.set("fs.default.name""file:///");  
Or set these options on the command line if your driver uses ToolRunner: 
-fs is equivalent to -D fs.default.name (core-default.xml)
-jt is equivalent to -D mapred.job.tracker (mapred-default.xml)

For example: 
$ hadoop jar myjar.jar MyDriver -fs=file:/// -jt=local indir outdir

There are some limitations of LocalJobRunner mode: 
* Distributed Cache doesn't work!
* The job can only specify a single Reducer
* Some 'beginner' mistakes may not be caught - For example, attempting to share data between Mappers will work!

LocalJobRunner Mode in Eclipse 
Eclipse on the course VM runs Hadoop code in LocalJobRunner mode from within the IDE. This is Hadoop's default behavior when no configuration is provided. This allows rapid development iterations (Agile programming). 

Writing and Viewing Log Files 
Tried-and-true debugging technique: write to stdout or stderr. If running in LocalJobRunner mode, you will see the results of System.err.println(). If running on a cluster, that output will not appear on your console. (Output is visible via Hadoop's Web UI

All Hadoop daemons contain a Web server to expose information on a well-known port. Most important for developers is JobTracker: 
http://:50030/
http://localhost:50030/ (Pseudo-distributed mode)

Also useful for the NameNode Web UI: 
http://:50070

Turning println statements on and off in your code is tedious, and leads to errors. Logging provides much finer-grained control over: 
* What gets logged.
* When something gets logged.
* How something is logged.

Hadoop use Log4j to generate all its log files. Your Mapper/Reducer can also use Log4j which the initialization is handled by Hadoop. All you have to do is to add the Log4j jar library from your CDH distribution to your classpath when you reference the log4j classes. 
  1. import org.apache.log4j.Level;  
  2. import org.apache.log4j.Logger;  
  3.   
  4. class FooMapper implements Mapper   
  5. {  
  6.     private static final Logger LOGGER = Logger.getLogger(FooMapper.class.getName());  
  7.     ...  
  8. }  
Simply send strings to loggers tagged with different levels as you want: 
  1. LOGGER.trace("message");  
  2. LOGGER.debug("message");  
  3. LOGGER.info("message");  
  4. LOGGER.warn("message");  
  5. LOGGER.error("message");  
Beware expensive operations like concatenation. To avoid performance penalty, make it conditional likes this: 
  1. if (LOGGER.isDebugEnabled())  
  2. {  
  3.     LOGGER.debug("Account info: "+acct.getReport());  
  4. }  
Log4j Configuration 
Node-wide configuration for Log4j is stored in $HADOOP_HOME/etc/hadoop/conf/log4j.properties. You can override setting for your application in your own log4j.properties. 
* Can change global log settings with hadoop.root.log property
* Can override log level on a per-class basis, e.g.

* Or set the level programmatically:
  1. LOGGER.setLevel(Level.WARN);  

Setting Logging Levels for a Job 
You can tell Hadoop to set logging levels for a job using configuration properties 
* mapred.map.child.log.level
* mapred.reduce.child.log.level

For examples, set the logging level to DEBUG for the Mapper: 
$ hadoop jar myjob.jar MyDriver \
-Dmapred.map.child.log.level=DEBUG indir outdir

Set the logging level to WARN for the Reducer: 
$ hadoop jar myjob.jar MyDriver \
-Dmapred.reduce.child.log.level=WARN indir outdir

Where Are Log Files Stored? 
Log files are stored on the machine where the task attempt ran: 
* Location is configurable
* By default: /var/log/hadoop-xxx-mapreduce/userlogs/${task.id}/syslog

You my not have ssh access to a node to view its logs. Much easier to use the JobTracker Web UI which automatically retrieves and displays the log files. 

You could throw exceptions if a particular condition is met (For example, if illegal data is found). Exception cause the task to fail. If a task fails four times, the entire job will fail! And it it suggested to use try/catch to output the data catched in catch block rather than the entire data. 

Retrieving Job Information with Counters 
Counters provides a way for Mappers/Reducers to pass aggregate values back to the driver after the job has completed. Their values are also visible from the JobTracker's Web UI and are reported on the console when the job ends. 
* Very basic: just have a name and a value - Value can be incremented within the code
* Counters are collected into Groups - Within the group, each Counter has a name
* Example: A group of Counters called RecordType - Appropriate Counter can be incremented as each record is read in the Mapper.

Counters can be set and incremented via the method: 
  1. # context.getCounter(group, name).increment(amount);  
  2. context.getCounter("RecordType""A").increment(1);  
To retrieve Counters in the Driver code after the job is complete, use code like below in the driver: 
  1. long typeARecords = job.getCounters().findCounter("RecordType","A").getValue();  
  2. long typeBRecords = job.getCounters().findCounter("RecordType","B").getValue();  
Don't rely on a counter's value from the Web UI while a job is running due to possible speculative execution, a counter's value could appear larger than the actual final value. Modifications to counters from subsequently killed/failed tasks will be removed from the final count. 

Reusing Objects 
It is generally good practice to reuse objects - Instead of creating many new objects. For example: 
 

Instead, it is better this way: 
 

Creating Map-only MapReduce jobs 
There are many types of job where only a Mapper is needed. For example: 
* Image processing
* File format conversion
* Input data sampling
* ETL

To create a Map-only job, set the number of Reducers to 0 in the Driver code: 
  1. job.setNumReducerTasks(0);  
Call the Job.setOutputKeyClass and Job.setOutputValueClass methods to specify the output types. Anything written using the Context.write method in the Mapper will be written to HDFS rather written as intermediate data. One file per Mapper will be written

Supplement 
Exercise9 - Testing with LocalJobRunner (P34) 
Exercise10 - Logging (P38) 
Exercise11 - Using Counters and a Map-Only Job (P41) 
Blog: Hadoop 參數設定 – mapred-site.xml

沒有留言:

張貼留言

網誌存檔

關於我自己

我的相片
Where there is a will, there is a way!