Writing a MapReduce Program Using Streaming
In this chapter you will learn how to write MapReduce programs using Hadoop Streaming.
The Streaming API
Many organizations have developers skilled in languages other than Java, such as Ruby, Python and Perl. The Streaming API allows developers to use any language they wish to write Mappers and Reducers as long as the selected language can read from standard input and write to standard output.
Advantages and Disadvantages
Advantages of the Streaming API:
* No Java knowledge is required; developers can write Mappers and Reducers in languages they already know.
* Existing scripts and command-line utilities can be reused without modification.
* Scripts can be tested locally with shell pipes before being run on the cluster, which speeds up development.
Disadvantages of the Streaming API:
* It is generally slower than native Java MapReduce, because every record is serialized as text and passed through the stdin/stdout of a separate process.
* Only the Mapper and Reducer can be written in another language; custom components such as the InputFormat, OutputFormat, and Partitioner must still be written in Java.
How Streaming Works
To implement streaming, write separate Mapper and Reducer programs in the language(s) of your choice; they will read their input from stdin and write their output to stdout. If TextInputFormat (the default) is used, the streaming Mapper simply receives each line of the file on stdin; no key is passed. Mapper and Reducer output should be written to stdout as key [tab] value [newline]. Separators other than tab can be specified.
Streaming Mapper Example
Taking WordCount as an example, you can write the Mapper in Python as follows:
- mapper.py
#!/usr/bin/env python
import re
import sys

# Split each input line on non-word characters and emit
# "word<TAB>1" for every word found.
NONALPHA = re.compile(r"\W")

for line in sys.stdin:
    for w in NONALPHA.split(line):
        if len(w) > 0:
            print(w + "\t" + "1")
Streaming Reducer Example
Recall that in Java, all the values associated with a given key are passed to the Reducer as an Iterable. Using Hadoop Streaming, the Reducer instead receives its input as one key/value pair per line, sorted by key. Your code has to keep track of the current key so that it can detect when the values for a new key start:
Example streaming wordcount Reducer:
- reducer.py
#!/usr/bin/env python
import sys

wordcount = 0
key = None

# Input arrives as "word<TAB>count" pairs, one per line, sorted by key.
for line in sys.stdin:
    parts = line.rstrip().split("\t")
    if len(parts) < 2:
        continue
    newkey = parts[0]
    if key is None:
        key = newkey
    if key != newkey:
        # All values for the previous key have been seen; emit its total.
        print(key + "\t" + str(wordcount))
        key = newkey
        wordcount = 0
    wordcount = wordcount + int(parts[1])

# Emit the count for the final key.
if key is not None:
    print(key + "\t" + str(wordcount))
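Before submitting anything to the cluster, the two scripts can be sanity-checked with an ordinary shell pipeline, where sort stands in for Hadoop's shuffle/sort phase. (This assumes the mapper.py and reducer.py shown above are in the current directory.)

```shell
# Local dry run of the streaming job; "sort" plays the role of
# Hadoop's shuffle/sort phase between the mapper and the reducer.
echo "the cat sat on the mat" | python mapper.py | sort | python reducer.py
```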
Launching a Streaming Job
To launch a Streaming job, use a command of the following form:
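The exact command depends on your installation: the path to the Streaming jar varies by Hadoop version and distribution, and the input/output paths below are placeholders. A typical invocation looks like this:

```shell
# Jar and HDFS paths are placeholders; adjust for your installation.
# The -file options ship the two scripts to the cluster nodes.
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -input myInputDir \
    -output myOutputDir \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py
```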
Many other command-line options are available; see the documentation for full details. Note that system commands can also be used as a Streaming Mapper or Reducer, for example awk, grep, sed, or wc.
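For instance, a job can be run with no custom code at all, using the identity mapper /bin/cat and /usr/bin/wc as the reducer (again, the jar and HDFS paths are placeholders):

```shell
# No custom code: /bin/cat passes records through unchanged and
# /usr/bin/wc reduces them to line/word/character counts.
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -input myInputDir \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /usr/bin/wc
```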
Unit Testing MapReduce Programs
In this chapter you will learn how to unit test MapReduce programs using JUnit and MRUnit.
Introduction to Unit Testing
A 'unit' is a small piece of your code that implements a single piece of functionality; a unit test verifies the correctness of that unit of code.
Unit testing verifies that your code functions correctly, and does so much faster than running your entire program after every change: even the fastest MapReduce job on a cluster takes many seconds, and even a job in LocalJobRunner mode takes several seconds. Unit tests therefore let you iterate faster during code development.
Why MRUnit?
JUnit is a popular Java unit testing framework. The problem with JUnit in our case is that it cannot be used directly to test Mappers and Reducers: unit tests require mocking up classes from the MapReduce framework. MRUnit is built on top of JUnit and works with the mockito framework to provide the required mock objects, allowing you to test your code from within an IDE.
JUnit Basic Annotations
JUnit marks test methods with annotations such as @Test and @Before, and provides assert methods such as assertEquals(), assertTrue(), assertNull(), and fail() to confirm test results.
Writing unit tests with MRUnit
MRUnit builds on top of JUnit and provides a mock InputSplit and other classes. It can test just the Mapper, just the Reducer, or the full MapReduce flow. Below is sample unit test code that tests the Mapper functionality, the Reducer functionality, and the complete MapReduce flow:
- TestWordCount.java
package stubs;

import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class TestWordCount {

  /*
   * Declare harnesses that let you test a mapper, a reducer, and
   * a mapper and a reducer working together.
   */
  MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
  ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
  MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

  /*
   * Set up the test. This method will be called before every test.
   */
  @Before
  public void setUp() {
    /*
     * Set up the mapper test harness.
     */
    WordMapper mapper = new WordMapper();
    mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>();
    mapDriver.setMapper(mapper);

    /*
     * Set up the reducer test harness.
     */
    SumReducer reducer = new SumReducer();
    reduceDriver = new ReduceDriver<Text, IntWritable, Text, IntWritable>();
    reduceDriver.setReducer(reducer);

    /*
     * Set up the mapper/reducer test harness.
     */
    mapReduceDriver = new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable>();
    mapReduceDriver.setMapper(mapper);
    mapReduceDriver.setReducer(reducer);
  }

  /*
   * Test the mapper.
   */
  @Test
  public void testMapper() {
    mapDriver.setInput(new LongWritable(1), new Text("a b c"));
    mapDriver.withOutput(new Text("a"), new IntWritable(1))
        .withOutput(new Text("b"), new IntWritable(1))
        .withOutput(new Text("c"), new IntWritable(1));
    mapDriver.runTest();
  }

  /*
   * Test the reducer.
   */
  @Test
  public void testReducer() {
    List<IntWritable> values = new ArrayList<IntWritable>();
    values.add(new IntWritable(1));
    values.add(new IntWritable(1));
    reduceDriver.setInput(new Text("cat"), values);
    reduceDriver.withOutput(new Text("cat"), new IntWritable(2));
    reduceDriver.runTest();
  }

  /*
   * Test the mapper and reducer working together.
   */
  @Test
  public void testMapReduce() {
    mapReduceDriver.addInput(new LongWritable(1), new Text("a a b c c c"));
    mapReduceDriver.withOutput(new Text("a"), new IntWritable(2))
        .withOutput(new Text("b"), new IntWritable(1))
        .withOutput(new Text("c"), new IntWritable(3));
    mapReduceDriver.runTest();
  }
}
MRUnit provides two ways to run a test, both defined in TestDriver: runTest() runs the test and verifies the output against the expected results, while run() runs the test and returns the actual output so you can inspect it yourself. If you call driver.runTest() or driver.run() multiple times, call driver.resetOutput() between each call. You can also execute the JUnit tests from the console:
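Assuming the compiled test classes and the JUnit, MRUnit, and Hadoop jars are available (the jar versions and paths below are placeholders), a console run might look like this:

```shell
# Classpath entries are placeholders; include JUnit, MRUnit,
# the Hadoop libraries, and your compiled test classes.
java -cp build/classes:lib/junit-4.11.jar:lib/mrunit-1.1.0-hadoop2.jar:$(hadoop classpath) \
    org.junit.runner.JUnitCore stubs.TestWordCount
```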
Supplement
* Unit Testing with JUnit - Tutorial
* Apache Hadoop 2.5.1 - Hadoop Streaming
* Maven repository - Apache Hadoop MapReduce Streaming Jar