Tuesday, November 25, 2014

[CCDH] Class2 - Writing Basic MapReduce Programs (2)

Writing a MapReduce Program Using Streaming
In this chapter you will learn how to write MapReduce programs using Hadoop Streaming.

The Streaming API
Many organizations have developers skilled in languages other than Java, such as Ruby, Python, and Perl. The Streaming API allows developers to write Mappers and Reducers in any language they wish, as long as the selected language can read from standard input and write to standard output.

Advantages and Disadvantages
Advantages of the Streaming API:
* No need for non-Java coders to learn Java
* Fast development time, since there is no need to learn a new language
* Ability to use existing code libraries in the selected language.

Disadvantages of the Streaming API:
* Performance overhead compared to native Java MapReduce.
* Primarily suited to handling data that can be represented as text.
* Streaming jobs can use excessive amounts of RAM or fork excessive numbers of processes.
* Although Mappers and Reducers can be written using the Streaming API, Partitioners, InputFormats, etc. must still be written in Java.

How Streaming Works
To implement streaming, write separate Mapper and Reducer programs in the language(s) of your choice:
* They will receive input via stdin
* They should write their output to stdout.

If TextInputFormat (the default) is used, the streaming Mapper just receives each line of the file on stdin; no key is passed. Mapper and Reducer output should be sent to stdout as: key [tab] value [newline]. Separators other than tab can be specified.

Streaming Mapper Example
Taking WordCount as an example, you can write the Mapper in Python as below:
- mapper.py
  #!/usr/bin/python

  import re
  import sys

  # Matches any non-word character; used to split each line into words
  NONALPHA = re.compile("\W")

  # Streaming passes each input line on stdin (no key is passed);
  # emit "word<TAB>1" for every word found
  for line in sys.stdin.readlines():
    for w in NONALPHA.split(line):
      if len(w) > 0:
        print w + '\t' + str(1)
Streaming Reducer Example
Recall that in Java, all the values associated with a key are passed to the Reducer as an Iterable. Using Hadoop Streaming, the Reducer receives its input as one key/value pair per line. Your code will have to keep track of the key so that it can detect when values from a new key start:


Example streaming wordcount Reducer:
- reducer.py
  #!/usr/bin/python

  import sys

  wordcount = 0
  key = None

  # Streaming delivers one "key<TAB>value" pair per line, sorted by key,
  # so a change of key means the previous key's values are complete
  for line in sys.stdin.readlines():
    line = line.rstrip()
    parts = line.split("\t")

    # Skip malformed lines
    if len(parts) < 2:
      continue

    newkey = parts[0]

    if not key:
      key = newkey

    # A new key: emit the count for the previous key and start over
    if key != newkey:
      print key + "\t" + str(wordcount)
      key = newkey
      wordcount = 0

    wordcount = wordcount + int(parts[1])

  # Emit the count for the final key
  if key != None:
    print key + "\t" + str(wordcount)
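Because both scripts simply read stdin and write stdout, you can sanity-check them locally before submitting a job. A minimal sketch, assuming a local file input.txt and using sort to stand in for the shuffle phase:

  cat input.txt | ./mapper.py | sort | ./reducer.py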
Launching a Streaming Job
To launch a Streaming job, use a command such as the sketch below.
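A minimal sketch, assuming Hadoop 2.x paths (the streaming jar location varies by version and distribution, and the input/output paths are placeholders):

  hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input myInputDir \
    -output myOutputDir \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py

The -file options ship the scripts to the cluster so that every node can execute them.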


Many other command-line options are available (see the documentation for full details). Note that system commands can be used as a Streaming Mapper or Reducer, for example awk, grep, sed, or wc, as in the sketch below.
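For instance, a sketch (same placeholder paths as above) that uses cat as the Mapper and wc as the Reducer, so the job's output is the line/word/byte counts of the reducer input:

  hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input myInputDir \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /usr/bin/wc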

Unit Testing MapReduce Programs
In this chapter you will learn
* What unit testing is, and why you should write unit tests
* What the JUnit testing framework is, and how MRUnit builds on the JUnit framework.
* How to write unit tests with MRUnit
* How to run unit tests.

Introduction to Unit Testing
A 'unit' is a small piece of your code that implements a small piece of functionality. The unit test verifies the correctness of that unit of code.
* A purist might say that in a well-written unit test, only a single 'thing' should be able to fail.
* Generally accepted rule of thumb: a unit test should take less than a second to complete.

Unit testing verifies that your code is functioning correctly, and it is much faster than testing your entire program each time you modify the code: even the fastest MapReduce job on a cluster takes many seconds, and even running in LocalJobRunner mode takes several seconds. Unit tests help you iterate faster in your code development.

Why MRUnit?
JUnit is a popular Java unit testing framework. The problem with JUnit in our case is that it cannot be used directly to test Mappers or Reducers: unit tests require mocking up classes in the MapReduce framework. MRUnit is built on top of JUnit and works with the Mockito framework to provide the required mock objects, which allows you to test your code from within an IDE.

JUnit Basic Annotations
@Test: Indicates that this method is a test which JUnit should execute as a test case.
@Before: Tells JUnit to call this method before every @Test method. (Two @Test methods would result in the @Before method being called twice.)
@After: This method is executed after each test. It is used to clean up the test environment.
@BeforeClass: This method is executed once, before the start of all tests. It is used to perform time-intensive activities, for example, to connect to a database.
@AfterClass: This method is executed once, after all tests have finished. It is used to perform clean-up activities, for example, to disconnect from a database.
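A minimal sketch showing when each annotation fires (the class, method names, and assertion contents are illustrative only):

  import org.junit.*;
  import static org.junit.Assert.*;

  public class LifecycleTest {

    @BeforeClass
    public static void setUpClass() {
      // Runs once before all tests, e.g. connect to a database
    }

    @Before
    public void setUp() {
      // Runs before every @Test method
    }

    @Test
    public void testAddition() {
      assertEquals(4, 2 + 2);
    }

    @After
    public void tearDown() {
      // Runs after every @Test method
    }

    @AfterClass
    public static void tearDownClass() {
      // Runs once after all tests, e.g. disconnect from the database
    }
  }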

JUnit uses the assert methods below to confirm test results:
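Commonly used methods from org.junit.Assert include:
* fail([message]): Fails the test immediately.
* assertTrue([message,] condition) / assertFalse([message,] condition): Check that a condition is true/false.
* assertEquals([message,] expected, actual): Checks that two values are equal.
* assertNull([message,] object) / assertNotNull([message,] object): Check that an object is (not) null.
* assertSame([message,] expected, actual) / assertNotSame([message,] expected, actual): Check whether two references point to the same object.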


Writing unit tests with MRUnit
MRUnit builds on top of JUnit and provides a mock InputSplit and other classes. It can help to test just the Mapper, just the Reducer, or the full MapReduce flow. Below is sample unit-test code that tests the Mapper functionality alone, the Reducer functionality alone, and the complete MapReduce flow:
- TestWordCount.java
  package stubs;

  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mrunit.mapreduce.MapDriver;
  import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
  import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
  import org.junit.Before;
  import org.junit.Test;

  public class TestWordCount {

    /*
     * Declare harnesses that let you test a mapper, a reducer, and
     * a mapper and a reducer working together.
     */
    MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
    ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
    MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

    /*
     * Set up the test. This method will be called before every test.
     */
    @Before
    public void setUp() {

      /*
       * Set up the mapper test harness.
       */
      WordMapper mapper = new WordMapper();
      mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>();
      mapDriver.setMapper(mapper);

      /*
       * Set up the reducer test harness.
       */
      SumReducer reducer = new SumReducer();
      reduceDriver = new ReduceDriver<Text, IntWritable, Text, IntWritable>();
      reduceDriver.setReducer(reducer);

      /*
       * Set up the mapper/reducer test harness.
       */
      mapReduceDriver =
          new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable>();
      mapReduceDriver.setMapper(mapper);
      mapReduceDriver.setReducer(reducer);
    }

    /*
     * Test the mapper.
     */
    @Test
    public void testMapper() {
      mapDriver.setInput(new LongWritable(1), new Text("a b c"));
      mapDriver.withOutput(new Text("a"), new IntWritable(1))
               .withOutput(new Text("b"), new IntWritable(1))
               .withOutput(new Text("c"), new IntWritable(1));
      mapDriver.runTest();
    }

    /*
     * Test the reducer.
     */
    @Test
    public void testReducer() {
      List<IntWritable> values = new ArrayList<IntWritable>();
      values.add(new IntWritable(1));
      values.add(new IntWritable(1));
      reduceDriver.setInput(new Text("cat"), values);
      reduceDriver.withOutput(new Text("cat"), new IntWritable(2));
      reduceDriver.runTest();
    }

    /*
     * Test the mapper and reducer working together.
     */
    @Test
    public void testMapReduce() {
      mapReduceDriver.addInput(new LongWritable(1), new Text("a a b c c c"));
      mapReduceDriver.withOutput(new Text("a"), new IntWritable(2))
                     .withOutput(new Text("b"), new IntWritable(1))
                     .withOutput(new Text("c"), new IntWritable(3));
      mapReduceDriver.runTest();
    }
  }
MRUnit has the methods below to run tests (defined in TestDriver):
* runTest: Runs the test and verifies the output.
* run: Runs the test and returns the result set. It ignores previous withOutput and addOutput calls.

If you are calling driver.runTest() or driver.run() multiple times, call driver.resetOutput() between each call, as in the sketch below. You can also execute JUnit from the console; a sample command follows the sketch.
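A minimal sketch of run() with resetOutput(), written as an extra test method for the TestWordCount class above (it additionally needs imports for java.io.IOException, org.apache.hadoop.mrunit.types.Pair, and the static org.junit.Assert.assertEquals):

  @Test
  public void testRunTwice() throws IOException {
    mapDriver.setInput(new LongWritable(1), new Text("a b"));
    // run() returns the output pairs instead of verifying them
    List<Pair<Text, IntWritable>> output = mapDriver.run();
    assertEquals(2, output.size());

    // Clear the collected output before driving the same harness again
    mapDriver.resetOutput();
    mapDriver.setInput(new LongWritable(1), new Text("c"));
    output = mapDriver.run();
    assertEquals(1, output.size());
  }

To run JUnit from the console, invoke the JUnitCore runner with your test class. The classpath below is a placeholder; it must include JUnit, MRUnit, Hadoop, and your compiled classes:

  java -cp <your-classpath> org.junit.runner.JUnitCore stubs.TestWordCount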


Supplement
Unit Testing with JUnit - Tutorial
Apache Hadoop 2.5.1 - Hadoop Streaming
Hadoop streaming is a utility that comes with the Hadoop distribution. The utility allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer...

Maven repository - Apache Hadoop MapReduce Streaming Jar

