In addition to using the command-line shell, you can access HDFS programmatically.
Beware that HDFS is not a general-purpose filesystem (files cannot be modified once they have been written, for example)! Hadoop provides the FileSystem abstract base class, which provides an API to generic filesystems:
The FileSystem API
In order to use the FileSystem API, retrieve an instance of it first:
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(conf);
- Path p = new Path("/path/to/my/file");
Directory Listing
Get a directory listing:
- Path p = new Path("/my/path");
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(conf);
- FileStatus[] fileStats = fs.listStatus(p);
- for(int i=0; i < fileStats.length; i++)
- {
- Path f = fileStats[i].getPath();
- // do something
- }
Writing Data to a File
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(conf);
- Path p = new Path("/my/path/foo");
- FSDataOutputStream out = fs.create(p, false);   // false: do not overwrite if the file already exists
- // write some raw bytes
- out.write(...);
- // write an int
- out.writeInt(...);
- ...
- out.close();
A common requirement is for a Mapper/Reducer to access some "side data":
One option is to read directly from HDFS in the setup method:
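A minimal sketch of that approach, assuming a hypothetical tab-delimited lookup file at /myapp/lookup.dat; the class and field names are illustrative:
- public class LookupMapper extends Mapper<LongWritable, Text, Text, Text> {
-   private Map<String, String> lookup = new HashMap<String, String>();
-
-   @Override
-   protected void setup(Context context) throws IOException, InterruptedException {
-     // Read the side data directly from HDFS before any map() calls
-     FileSystem fs = FileSystem.get(context.getConfiguration());
-     BufferedReader reader = new BufferedReader(
-         new InputStreamReader(fs.open(new Path("/myapp/lookup.dat"))));
-     String line;
-     while ((line = reader.readLine()) != null) {
-       String[] parts = line.split("\t");
-       lookup.put(parts[0], parts[1]);
-     }
-     reader.close();
-   }
-   ...
- }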
The Distributed Cache provides an API to push data to all slave nodes:
The Difficult Way
You can place the files into HDFS and configure the DistributedCache in your driver code:
- Configuration conf = new Configuration();
- DistributedCache.addCacheFile(new URI("/myapp/lookup.dat"), conf);
- DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), conf);
- DistributedCache.addCacheArchive(new URL("/myapp/map.zip", conf));
- DistributedCache.addCacheArchive(new URL("/myapp/map.tar", conf));
The Easy Way
If you are using ToolRunner, you can add files to the Distributed Cache directly from the command line when you run the job (no need to copy the files to HDFS first). Just use the -files option to add files. For example:
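A sketch of such a command, where the jar name, driver class, and paths are hypothetical:
- hadoop jar myjob.jar MyDriver -files /local/path/lookup.dat indir outdir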
The -archives flag adds archived files and automatically unarchives them on the destination machines. The -libjars flag adds jar files to the classpath.
Accessing Files in the Distributed Cache
Files added to the Distributed Cache are made available in your task's local working directory. You can access them from your Mapper or Reducer the way you would read any ordinary local file, like:
- File f = new File("file_name_here");
The org.apache.hadoop.mapreduce.lib.* packages contain a library of Mappers, Reducers, and Partitioners supporting the new API.
Example classes:
Practical Development Tips and Techniques
In this chapter you will learn
Introduction to Debugging
Debugging MapReduce code is difficult!
Very large volumes of data mean that unexpected input is likely to appear. Code that expects all data to be well-formed is likely to fail!
Common-Sense Debugging Tips
* Code defensively
* Start small and build incrementally. Make as much of your code as possible Hadoop-agnostic (this makes it easier to test).
* Write unit tests
* Test locally whenever possible, with a small amount of data
* Then test in pseudo-distributed mode
* Finally, test on the cluster.
Testing Strategies
When testing in pseudo-distributed mode, ensure that you are testing with a similar environment to that on the real cluster.
* Same amount of RAM allocated to the task JVMs
* Same version of Hadoop
* Same version of Java
* Same versions of third-party libraries
Testing MapReduce Code Locally Using LocalJobRunner
Hadoop can run MapReduce jobs in a single, local process which doesn't require any Hadoop daemons to be running. It uses the local filesystem instead of HDFS and is known as LocalJobRunner mode. This is a very useful way of quickly testing incremental changes to code.
To run in LocalJobRunner mode, add the following lines to the driver code:
- Configuration conf = new Configuration();
- conf.set("mapred.job.tracker", "local");
- conf.set("fs.default.name", "file:///");
For example:
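A minimal sketch of such a driver, with hypothetical Mapper, Reducer, and path names:
- Configuration conf = new Configuration();
- conf.set("mapred.job.tracker", "local");   // run the job in a single local JVM
- conf.set("fs.default.name", "file:///");   // use the local filesystem instead of HDFS
- Job job = new Job(conf, "local test");
- job.setJarByClass(MyDriver.class);
- job.setMapperClass(MyMapper.class);
- job.setReducerClass(MyReducer.class);
- FileInputFormat.addInputPath(job, new Path("localinput"));
- FileOutputFormat.setOutputPath(job, new Path("localoutput"));
- System.exit(job.waitForCompletion(true) ? 0 : 1);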
There are some limitations of LocalJobRunner mode:
LocalJobRunner Mode in Eclipse
Eclipse on the course VM runs Hadoop code in LocalJobRunner mode from within the IDE. This is Hadoop's default behavior when no configuration is provided. This allows rapid development iterations (Agile programming).
Writing and Viewing Log Files
Tried-and-true debugging technique: write to stdout or stderr. If running in LocalJobRunner mode, you will see the results of System.err.println() on your console. If running on a cluster, that output will not appear on your console; it is visible via Hadoop's Web UI.
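For instance, a single line inside map() is often enough to see what a task is actually receiving (illustrative only):
- System.err.println("map() received: " + value.toString());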
All Hadoop daemons contain a Web server that exposes information on a well-known port. The most important one for developers is the JobTracker Web UI:
Also useful is the NameNode Web UI:
Turning println statements on and off in your code is tedious and can lead to errors. Logging provides much finer-grained control over:
Hadoop uses Log4j to generate all its log files. Your Mapper/Reducer can also use Log4j; its initialization is handled by Hadoop. All you have to do is add the Log4j jar from your CDH distribution to your classpath when you reference the Log4j classes.
- import org.apache.log4j.Level;
- import org.apache.log4j.Logger;
- class FooMapper implements Mapper
- {
- private static final Logger LOGGER = Logger.getLogger(FooMapper.class.getName());
- ...
- }
- LOGGER.trace("message");
- LOGGER.debug("message");
- LOGGER.info("message");
- LOGGER.warn("message");
- LOGGER.error("message");
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Account info: "+acct.getReport());
- }
Node-wide configuration for Log4j is stored in $HADOOP_HOME/etc/hadoop/conf/log4j.properties. You can override settings for your application in your own log4j.properties file.
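For instance, a line like the following in that file (using the FooMapper logger from the example above) raises its level to DEBUG:
- log4j.logger.FooMapper=DEBUG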
Setting Logging Levels for a Job
You can tell Hadoop to set logging levels for a job using configuration properties. For example, you can set the logging level to DEBUG for the Mapper and to WARN for the Reducer, as in the sketch below.
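A sketch in the driver code, assuming the MR1 property names mapred.map.child.log.level and mapred.reduce.child.log.level (the names differ in newer Hadoop versions):
- conf.set("mapred.map.child.log.level", "DEBUG");     // Mapper tasks log at DEBUG
- conf.set("mapred.reduce.child.log.level", "WARN");   // Reducer tasks log at WARN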
Where Are Log Files Stored?
Log files are stored on the machine where the task attempt ran:
You may not have ssh access to a node to view its logs. It is much easier to use the JobTracker Web UI, which automatically retrieves and displays the log files.
You could throw exceptions if a particular condition is met (for example, if illegal data is found). Exceptions cause the task to fail. If a task fails four times, the entire job will fail! It is therefore suggested to wrap suspect code in try/catch and log only the offending record in the catch block, rather than letting a few bad records fail the entire job.
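A sketch of that pattern inside map(), where the comma-delimited parsing and the LOGGER field are illustrative assumptions:
- try {
-   String[] fields = value.toString().split(",");
-   int amount = Integer.parseInt(fields[1]);          // may throw on malformed input
-   context.write(new Text(fields[0]), new IntWritable(amount));
- } catch (Exception e) {
-   // Log only the offending record; do not let it kill the task
-   LOGGER.warn("Ignoring malformed record: " + value.toString());
- }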
Retrieving Job Information with Counters
Counters provide a way for Mappers/Reducers to pass aggregate values back to the driver after the job has completed. Their values are also visible in the JobTracker's Web UI and are reported on the console when the job ends.
Counters can be set and incremented via the method:
- context.getCounter(group, name).increment(amount);
- // e.g. increment a counter in the Mapper or Reducer:
- context.getCounter("RecordType", "A").increment(1);
- // retrieve the values in the driver once the job has completed:
- long typeARecords = job.getCounters().findCounter("RecordType","A").getValue();
- long typeBRecords = job.getCounters().findCounter("RecordType","B").getValue();
Reusing Objects
It is generally good practice to reuse objects instead of creating many new objects inside map() or reduce(), which are called once for every input record. The sketch below contrasts the two approaches.
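A minimal sketch, assuming a Mapper that emits (Text, IntWritable) pairs; the field names are illustrative:
- // Wasteful: a new Text and a new IntWritable are allocated for every input record
- public void map(LongWritable key, Text value, Context context)
-     throws IOException, InterruptedException {
-   context.write(new Text(value.toString()), new IntWritable(1));
- }
-
- // Better: create the output objects once as instance fields and reuse them
- private Text outKey = new Text();
- private IntWritable one = new IntWritable(1);
-
- public void map(LongWritable key, Text value, Context context)
-     throws IOException, InterruptedException {
-   outKey.set(value.toString());
-   context.write(outKey, one);
- }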
Creating Map-only MapReduce jobs
There are many types of job where only a Mapper is needed. For example:
To create a Map-only job, set the number of Reducers to 0 in the Driver code:
- job.setNumReduceTasks(0);
Supplement
* Exercise9 - Testing with LocalJobRunner (P34)
* Exercise10 - Logging (P38)
* Exercise11 - Using Counters and a Map-Only Job (P41)
* Blog: Hadoop configuration settings – mapred-site.xml