Files and Directories Used in this Exercise
In this exercise you will practice reading and writing uncompressed and compressed SequenceFiles.
First, you will develop a MapReduce application to convert text data to a SequenceFile. Then you will modify the application to compress the SequenceFile using Snappy compression. When creating the SequenceFile, use the full access log file as input data.
After you have created the compressed SequenceFile, you will write a second MapReduce application to read the compressed SequenceFile and write a text file that contains the original log file text.
Lab Experiment
Write a MapReduce program to create sequence files from text files
1. Determine the number of HDFS blocks occupied by the access log file:
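One way to do this is with the fsck utility. The path below is a placeholder; substitute the actual location of the access log file in your cluster:

hadoop fsck /user/training/weblog -files -blocks

The output reports the number of blocks the file occupies.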
2. Refer to the solution in the createsequencefile project to read the access log file and create a SequenceFile. Records emitted to the SequenceFile can have any key you like, but the values should match the text in the access log file.
package solution;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CreateUncompressedSequenceFile extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.printf("Usage: CreateUncompressedSequenceFile <input dir> <output dir>\n");
      return -1;
    }

    Job job = new Job(getConf());
    job.setJarByClass(CreateUncompressedSequenceFile.class);
    job.setJobName("Create Uncompressed Sequence File");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    /*
     * There is no need to call setInputFormatClass, because the input
     * file is a text file. However, the output file is a SequenceFile.
     * Therefore, we must call setOutputFormatClass.
     */
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    /*
     * This is a map-only job that uses the default (identity) mapper, so
     * we do not need to set the mapper or reducer classes. We just need
     * to set the number of reducers to 0.
     */
    job.setNumReduceTasks(0);

    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new CreateUncompressedSequenceFile(), args);
    System.exit(exitCode);
  }
}
3. Build and test your solution so far. Use the access log as input data, and specify the uncompressdsf directory for output.
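One possible invocation, assuming you have packaged the solution into a JAR named createsequencefile.jar and the access log resides in a weblog directory in HDFS (both names are illustrative):

hadoop jar createsequencefile.jar solution.CreateUncompressedSequenceFile weblog uncompressdsf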
4. Examine the initial portion of the output SequenceFile using the following command:
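A typical command, assuming the output landed in uncompressdsf (the part file name may vary in your environment):

hadoop fs -cat uncompressdsf/part-m-00000 | head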
Some of the data in the SequenceFile is unreadable, but parts of it should be recognizable.
5. Verify that the number of files created by the job is equivalent to the number of blocks required to store the uncompressed SequenceFile.
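For example, list the job output and run fsck against it (paths are illustrative); the number of part files should match the block count fsck reports:

hadoop fs -ls uncompressdsf
hadoop fsck uncompressdsf -files -blocks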
Compress The Output
6. Modify the MapReduce job to compress the output SequenceFile. Add statements to your driver to configure the output as follows:
package solution;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CreateCompressedSequenceFile extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.printf("Usage: CreateCompressedSequenceFile <input dir> <output dir>\n");
      return -1;
    }

    Job job = new Job(getConf());
    job.setJarByClass(CreateCompressedSequenceFile.class);
    job.setJobName("Create Compressed Sequence File");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    /*
     * There is no need to call setInputFormatClass, because the input
     * file is a text file. However, the output file is a SequenceFile.
     * Therefore, we must call setOutputFormatClass.
     */
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    /*
     * Set the compression options.
     */

    /* Compress the output */
    FileOutputFormat.setCompressOutput(job, true);

    /* Use Snappy compression */
    FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);

    /* Use block compression */
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

    /*
     * This is a map-only job that uses the default (identity) mapper, so
     * we do not need to set the mapper or reducer classes. We just need
     * to set the number of reducers to 0.
     */
    job.setNumReduceTasks(0);

    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new CreateCompressedSequenceFile(), args);
    System.exit(exitCode);
  }
}
7. Build and test your solution. Use the access log as input data, and specify the compressdsf directory for output.
8. Examine the first portion of the output SequenceFile. Notice the differences between the uncompressed and compressed SequenceFiles:
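A typical command, assuming the compressed output is in compressdsf (the part file name may vary):

hadoop fs -cat compressdsf/part-m-00000 | head

You should see the codec class name (SnappyCodec) near the start of the file, and the log text itself should no longer be readable.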
9. Compare the file sizes of the uncompressed and compressed SequenceFiles in the uncompressdsf and compressdsf directories. The compressed SequenceFiles should be smaller.
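For example (paths are illustrative):

hadoop fs -ls uncompressdsf compressdsf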
Write Another MapReduce Program To Uncompress The Files
10. Write a MapReduce program to read the compressed SequenceFile and write a text file. This text file should have the same text data as the original log file, plus keys. The keys can contain any values you like.
package solution;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReadCompressedSequenceFile extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.printf("Usage: ReadCompressedSequenceFile <input dir> <output dir>\n");
      return -1;
    }

    Job job = new Job(getConf());
    job.setJarByClass(ReadCompressedSequenceFile.class);
    job.setJobName("Read Compressed Sequence File");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    /*
     * We are using a SequenceFile as the input file.
     * Therefore, we must call setInputFormatClass.
     * There is no need to call setOutputFormatClass, because the
     * application uses a text file on output.
     */
    job.setInputFormatClass(SequenceFileInputFormat.class);

    /*
     * There is no need to set compression options for the input file.
     * The compression implementation details are encoded within the
     * input SequenceFile.
     */

    /*
     * This is a map-only job that uses the default (identity) mapper, so
     * we do not need to set the mapper or reducer classes. We just need
     * to set the number of reducers to 0.
     */
    job.setNumReduceTasks(0);

    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new ReadCompressedSequenceFile(), args);
    System.exit(exitCode);
  }
}
11. Build and test your solution. Use the compressed SequenceFile in compressdsf as input data, and specify the compresseddsftotext directory for output.
12. Examine the first portion of the output in the compresseddsftotext directory. You should be able to read the textual log file entries.
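For example (the part file name may vary):

hadoop fs -cat compresseddsftotext/part-m-00000 | head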
Optional: Use Command Line Options To Control Compression
13. If you used ToolRunner for your driver, you can control compression using command-line arguments. Try commenting out the code in your driver where you call setCompressOutput. Then test setting the mapred.output.compressed option on the command line, e.g.:
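One illustrative invocation, reusing the JAR and input path from the earlier steps (all names are placeholders). ToolRunner parses the -D generic option and applies it to the job configuration:

hadoop jar createsequencefile.jar solution.CreateCompressedSequenceFile -Dmapred.output.compressed=true weblog compressdsf2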
14. Review the output to confirm the files are compressed.