程式扎記: [CCDH] Exercise14 - Using SequenceFiles and File Compression (P49)

標籤

2014年12月21日 星期日

[CCDH] Exercise14 - Using SequenceFiles and File Compression (P49)

Preface 
Files and Directories Used in this Exercise 
Eclipse project: createsequencefile
Java files:
CreateSequenceFile.java (A driver that converts a text file to a sequence file)
ReadCompressedSequenceFile.java (A driver that converts a compressed sequence file to text)

Test data (HDFS):
weblog (full web server access log)

Exercise directory: ~/workspace/createsequencefile

In this exercise you will practice reading and writing uncompressed and compress SequenceFiles. 

First, you will develop a MapReduce application to convert text data to a SequenceFile. Then you will modify the application to compress the SequenceFile using Snappy file compression. When creating the SequenceFile, use the full access log file for input data. 

After you have created the compressed SequenceFile, you will write a second MapReduce application to read the compressed SequenceFile and write a text file that contains the original log file text. 

Lab Experiment 
Write a MapReduce program to create sequence files from text files 
1. Determine the number of HDFS blocks occupied by the access log file: 
a. In a browser window, start the Name Node Web UI - http://localhost:50070
b. Click "Browse the filesystem"
c. Navigate to the /user/training/weblog/access_log file.
d. Scroll down to the bottom of the page. The total number of blocks occupied by the access log file appears in the browser window.

 

2. Refer to the solution in the createsequencefile project to read the access log file and create a SequenceFile. Records emitted to theSequenceFile can have any key you like, but the values should match the text in the access log file. 
  1. package solution;  
  2.   
  3. import org.apache.hadoop.fs.Path;  
  4. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  5. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  6. import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;  
  7. import org.apache.hadoop.mapreduce.Job;  
  8.   
  9. import org.apache.hadoop.conf.Configured;  
  10. import org.apache.hadoop.conf.Configuration;  
  11. import org.apache.hadoop.util.Tool;  
  12. import org.apache.hadoop.util.ToolRunner;  
  13.   
  14. public class CreateUncompressedSequenceFile extends Configured implements Tool {  
  15.   
  16.   @Override  
  17.   public int run(String[] args) throws Exception {  
  18.   
  19.     if (args.length != 2) {  
  20.       System.out.printf("Usage: CreateUncompressedSequenceFile \n");  
  21.       return -1;  
  22.     }  
  23.   
  24.     Job job = new Job(getConf());  
  25.     job.setJarByClass(CreateUncompressedSequenceFile.class);  
  26.     job.setJobName("Create Uncompressed Sequence File");  
  27.   
  28.     FileInputFormat.setInputPaths(job, new Path(args[0]));  
  29.     FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  30.   
  31.     /* 
  32.      * There is no need to call setInputFormatClass, because the input 
  33.      * file is a text file. However, the output file is a SequenceFile. 
  34.      * Therefore, we must call setOutputFormatClass. 
  35.      */  
  36.     job.setOutputFormatClass(SequenceFileOutputFormat.class);  
  37.   
  38.     /* 
  39.      * This is a map-only job that uses the default (identity mapper), so we do not need to set 
  40.      * the mapper or reducer classes.  We just need to set the number of reducers to 0. 
  41.      */  
  42.     job.setNumReduceTasks(0);  
  43.   
  44.     boolean success = job.waitForCompletion(true);  
  45.     return success ? 0 : 1;  
  46.   }  
  47.   
  48.   public static void main(String[] args) throws Exception {  
  49.     int exitCode = ToolRunner.run(new Configuration(), new CreateUncompressedSequenceFile(), args);  
  50.     System.exit(exitCode);  
  51.   }  
  52. }  

3. Build and test your solution so far, Use the access log as input data, and specify the uncompressdsf directory for output. 
$ ant -f build.xml # Build the project and output createsequencefile.jar
$ hadoop jar createsequencefile.jar solution.CreateUncompressedSequenceFile weblog uncompressdsf
$ hadoop fs -ls uncompressedsf # 8 part files should be generated from 8 mapper output
...
-rw-r--r-- 1 training supergroup 77517687 2014-12-21 02:58 uncompressedsf/part-m-00000
-rw-r--r-- 1 training supergroup 77517464 2014-12-21 02:58 uncompressedsf/part-m-00001
-rw-r--r-- 1 training supergroup 77448148 2014-12-21 02:59 uncompressedsf/part-m-00002
-rw-r--r-- 1 training supergroup 77286206 2014-12-21 02:59 uncompressedsf/part-m-00003
-rw-r--r-- 1 training supergroup 77366617 2014-12-21 03:00 uncompressedsf/part-m-00004
-rw-r--r-- 1 training supergroup 77465310 2014-12-21 03:00 uncompressedsf/part-m-00005
-rw-r--r-- 1 training supergroup 77424243 2014-12-21 03:01 uncompressedsf/part-m-00006
-rw-r--r-- 1 training supergroup 40614390 2014-12-21 03:01 uncompressedsf/part-m-00007

4. Examine the initial portion of the output SequenceFile using the following command: 
$ hadoop fs -cat uncompressedsf/part-m-00000 | less

Some of the data in the SequenceFile is unreadable, but parts of them should be recognizable: 
* The string SEQ, which appears at the beginning of a SequenceFile.
* The Java classes for the keys and values
* Text from the access log file.

5. Verify that the number of files created by the job is equivalent to the number of blocks required to store the uncompressed SequenceFile. 

Compress The Output 
6. Modify the MapRedece job to compress the output SequenceFile. Add statements to your driver to configure the output as follows: 
* Compress the output file
* Use block compression.
* Use the Snappy compression codec.

  1. package solution;  
  2.   
  3. import org.apache.hadoop.fs.Path;  
  4. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  5. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  6. import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;  
  7. import org.apache.hadoop.io.SequenceFile.CompressionType;  
  8. import org.apache.hadoop.io.compress.SnappyCodec;  
  9. import org.apache.hadoop.mapreduce.Job;  
  10.   
  11. import org.apache.hadoop.conf.Configured;  
  12. import org.apache.hadoop.conf.Configuration;  
  13. import org.apache.hadoop.util.Tool;  
  14. import org.apache.hadoop.util.ToolRunner;  
  15.   
  16. public class CreateCompressedSequenceFile extends Configured implements Tool {  
  17.   
  18.   @Override  
  19.   public int run(String[] args) throws Exception {  
  20.   
  21.     if (args.length != 2) {  
  22.       System.out.printf("Usage: CreateCompressedSequenceFile \n");  
  23.       return -1;  
  24.     }  
  25.   
  26.     Job job = new Job(getConf());  
  27.     job.setJarByClass(CreateCompressedSequenceFile.class);  
  28.     job.setJobName("Create Compressed Sequence File");  
  29.   
  30.     FileInputFormat.setInputPaths(job, new Path(args[0]));  
  31.     FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  32.   
  33.     /* 
  34.      * There is no need to call setInputFormatClass, because the input 
  35.      * file is a text file. However, the output file is a SequenceFile. 
  36.      * Therefore, we must call setOutputFormatClass. 
  37.      */  
  38.     job.setOutputFormatClass(SequenceFileOutputFormat.class);  
  39.   
  40.     /* 
  41.      * Set the compression options. 
  42.      */  
  43.       
  44.     /* 
  45.      * Compress the output 
  46.      */  
  47.     FileOutputFormat.setCompressOutput(job, true);  
  48.       
  49.     /* 
  50.      * Use Snappy compression 
  51.      */  
  52.     FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);  
  53.     /* 
  54.      * Use block compression 
  55.      */  
  56.     SequenceFileOutputFormat.setOutputCompressionType(job,  
  57.         CompressionType.BLOCK);  
  58.   
  59.     /* 
  60.      * This is a map-only job that uses the default (identity mapper), so we do not need to set 
  61.      * the mapper or reducer classes.  We just need to set the number of reducers to 0. 
  62.      */  
  63.     job.setNumReduceTasks(0);  
  64.   
  65.     boolean success = job.waitForCompletion(true);  
  66.     return success ? 0 : 1;  
  67.   }  
  68.   
  69.   public static void main(String[] args) throws Exception {  
  70.     int exitCode = ToolRunner.run(new Configuration(), new CreateCompressedSequenceFile(), args);  
  71.     System.exit(exitCode);  
  72.   }  
  73. }  
7. Compile the code and run your modified MapReduce job. For the MapReduce output, specify the compressdsf directory. 
$ hadoop jar createsequencefile.jar solution.CreateCompressedSequenceFile weblog compressdsf
$ hadoop fs -ls compressdsf
...
-rw-r--r-- 1 training supergroup 16820906 2014-12-21 05:44 compressdsf/part-m-00000
...

8. Examine the first portion of the output SequenceFile. Notice the differences between the uncompressed and compressed SequenceFiles: 
* The compressed SequenceFile specifies the org.apache.hadoop.io.compress.SnappyCodec compression codec in its header.
* You cannot read the log file text in the compressed file.

9. Compare the file size of the uncompressed and compressed SequenceFiles in the uncompressdsf and compressdsf directories. The compresssed SequenceFiles should be smaller. 

Write Another MapReduce Program To UnCompress The Files 
10. Write a MapReduce to read the compressed log file and write a text file. This text file should have the same text data as the log file, plus keys. The keys can contain any values you like. 
  1. package solution;  
  2.   
  3. import org.apache.hadoop.fs.Path;  
  4. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  5. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  6. import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;  
  7. import org.apache.hadoop.mapreduce.Job;  
  8.   
  9. import org.apache.hadoop.conf.Configured;  
  10. import org.apache.hadoop.conf.Configuration;  
  11. import org.apache.hadoop.util.Tool;  
  12. import org.apache.hadoop.util.ToolRunner;  
  13.   
  14. public class ReadCompressedSequenceFile extends Configured implements Tool {  
  15.   
  16.   @Override  
  17.   public int run(String[] args) throws Exception {  
  18.   
  19.     if (args.length != 2) {  
  20.       System.out  
  21.           .printf("Usage: ReadCompressedSequenceFile \n");  
  22.       return -1;  
  23.     }  
  24.   
  25.     Job job = new Job(getConf());  
  26.     job.setJarByClass(ReadCompressedSequenceFile.class);  
  27.     job.setJobName("Read Compressed Sequence File");  
  28.   
  29.     FileInputFormat.setInputPaths(job, new Path(args[0]));  
  30.     FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  31.   
  32.     /* 
  33.      * We are using a SequenceFile as the input file. 
  34.      * Therefore, we must call setInputFormatClass. 
  35.      * There is no need to call setOutputFormatClass, because the 
  36.      * application uses a text file on output. 
  37.      */  
  38.     job.setInputFormatClass(SequenceFileInputFormat.class);  
  39.   
  40.     /* 
  41.      * There is no need to set compression options for the input file. 
  42.      * The compression implementation details are encoded within the 
  43.      * input SequenceFile.     
  44.      */  
  45.   
  46.     /* 
  47.      * This is a map-only job that uses the default (identity mapper), so we do not need to set 
  48.      * the mapper or reducer classes.  We just need to set the number of reducers to 0. 
  49.      */  
  50.     job.setNumReduceTasks(0);  
  51.   
  52.     boolean success = job.waitForCompletion(true);  
  53.     return success ? 0 : 1;  
  54.   }  
  55.   
  56.   public static void main(String[] args) throws Exception {  
  57.     int exitCode = ToolRunner.run(new Configuration(), new ReadCompressedSequenceFile(), args);  
  58.     System.exit(exitCode);  
  59.   }  
  60. }  
11. Compile the code and run your MapReduce job. For the MapReduce input, specify the compressdsf directory in which you created the compressed SequenceFile in the previous section. For the MapReduce output, specify the compresseddsftotext directory: 
$ hadoop jar createsequencefile.jar solution.ReadCompressedSequenceFile compressdsf compresseddsftotext

12. Examine the first portion of the output in the compresseddsftotext directory. You should be able to read the texual log file entries. 
$ hadoop fs -cat compresseddsftotext/part-m-00000 | less

Optional: Use Command Line Options To Control Compression 
13. If you used ToolRunner for your driver, you can control compressing using command line arguments. Try commenting out the code in your driver where you can setCompressOutput. Then test setting the mapred.output.compressed option on the command line, e.g.: 
$ hadoop jar createsequencefile.jar solution.CreateUncompressedSequenceFile \
-Dmapred.output.compressed=true \
weblog outdir

14. Review the output to confirm the files are compressed.

沒有留言:

張貼留言

網誌存檔