程式扎記: [ In Action ] Ch7. Cookbook: Partitioning into multiple output files (Part2)

Sunday, December 28, 2014

Preface 
Until now, all the MapReduce jobs we’ve seen output a single set of files. However, it’s often more convenient to output multiple sets of files, or to split one data set into several. A popular example is partitioning a large log file into a distinct set of log files for each day.

MultipleOutputFormat provides a simple way of grouping similar records into separate data sets. Before writing each output record, this OutputFormat class calls an internal method to determine the filename to write to. More specifically, you extend a particular subclass of MultipleOutputFormat and override its generateFileNameForKeyValue() method. The subclass you extend determines the output format: MultipleTextOutputFormat writes text files, whereas MultipleSequenceFileOutputFormat writes sequence files. In either case, you override the following method to return the filename for each output key/value pair:
    protected String generateFileNameForKeyValue(K key, V value, String name)

The default implementation returns the argument name, which is the leaf filename. You can make the method return a filename that depends on the content of the record. For our example here, we take the patent metadata and partition it by country. All patents from U.S. inventors go into one set of files, all patents from Japan into another, and so forth. The skeleton of this example program is a map-only job that takes its input and immediately outputs it. The main change is that we create our own subclass of MultipleTextOutputFormat called PartitionByCountryMTOF. (MTOF is an acronym for MultipleTextOutputFormat.) Our subclass stores each record to a location based on the inventing country listed in that record. Because the value returned by generateFileNameForKeyValue() is treated as a file path, we can create a subdirectory for each country by returning country + "/" + filename. See below:
- Customized MTOF
    package ch7.part2;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

    public class PartitionByCountryMTOF
            extends MultipleTextOutputFormat<NullWritable, Text> {
        @Override
        protected String generateFileNameForKeyValue(NullWritable key,
                                                     Text value,
                                                     String filename)
        {
            // Sample record:
            // 3070816,1963,1096,,"US","OK",,1,,114,5,55,,4,,0,,,,,,,
            // A limit of -1 keeps trailing empty fields.
            String[] arr = value.toString().split(",", -1);
            // Field 4 is the quoted country code, e.g. "US"; drop the quotes.
            String country = arr[4].substring(1, 3);
            // Returning a path creates one subdirectory per country.
            return country + "/" + filename;
        }
    }
- Driver Class
    package ch7.part2;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    // PartitionByCountryMTOF lives in this same package, so no import is needed.
    public class MultiFile extends Configured implements Tool {
        // Pass-through mapper: drops the byte-offset key and emits each line unchanged.
        public static class MapClass extends MapReduceBase implements
                Mapper<LongWritable, Text, NullWritable, Text> {
            @Override
            public void map(LongWritable key, Text value,
                    OutputCollector<NullWritable, Text> output, Reporter reporter)
                    throws IOException {
                output.collect(NullWritable.get(), value);
            }
        }

        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            JobConf job = new JobConf(conf, MultiFile.class);
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.setInputPaths(job, in);
            FileOutputFormat.setOutputPath(job, out);
            job.setJobName("MultiFile");
            job.setMapperClass(MapClass.class);
            job.setInputFormat(TextInputFormat.class);
            // The custom output format decides the file for each record.
            job.setOutputFormat(PartitionByCountryMTOF.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            // Map-only job: no reducers.
            job.setNumReduceTasks(0);
            JobClient.runJob(job);
            return 0;
        }

        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new MultiFile(), args);
            System.exit(res);
        }
    }
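The filename logic in PartitionByCountryMTOF can be tried without a cluster. The following is a minimal sketch in plain Java, assuming the same comma-separated layout with the quoted country code in the fifth field. The class name CountryFileNamer and the UNKNOWN fallback directory are hypothetical and not part of the original program; unlike the original substring(1,3) call, this version strips the quotes and tolerates short or empty fields.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper mirroring the filename logic of PartitionByCountryMTOF
// in plain Java, so it can be run without a Hadoop installation.
final class CountryFileNamer {
    // Assumed fallback directory for records without a usable country field.
    static final String UNKNOWN = "UNKNOWN";

    // Builds "<country>/<leafName>" from one comma-separated patent record.
    static String fileNameFor(String record, String leafName) {
        // A limit of -1 keeps trailing empty fields, as in the original code.
        String[] fields = record.split(",", -1);
        if (fields.length <= 4) {
            return UNKNOWN + "/" + leafName;
        }
        // The country is stored with surrounding quotes, e.g. "US".
        String country = fields[4].replace("\"", "").trim();
        if (country.isEmpty()) {
            return UNKNOWN + "/" + leafName;
        }
        return country + "/" + leafName;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
            "3070816,1963,1096,,\"US\",\"OK\",,1,,114,5,55,,4,,0,,,,,,,",
            "3104527,1963,1362,,\"AR\",\"\",,1,,60,4,45,,4,,0.5,,,,,,,");
        for (String record : records) {
            System.out.println(fileNameFor(record, "part-00000"));
        }
    }
}
```

Whether malformed records should go to a fallback directory or be dropped is a design choice; the original code would instead throw an exception on a line with a short or empty country field.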
Run the job with the following command:
$ hadoop jar HadoopIA.jar ch7.part2.MultiFile IA/patn_100000.txt IA/MultiFile

After executing the preceding program, we can see that the output directory now has a separate directory for each country. 
$ hadoop fs -ls IA/MultiFile
...
... 2014-12-26 03:36 IA/MultiFile/AR
...
... 2014-12-26 03:36 IA/MultiFile/ZA
... 2014-12-26 03:36 IA/MultiFile/ZW
... 2014-12-26 03:36 IA/MultiFile/_SUCCESS
... 2014-12-26 03:36 IA/MultiFile/_logs

Within each country’s directory are files containing only the records (patents) from that country.
$ hadoop fs -ls IA/MultiFile/AR
Found 1 items
-rw-r--r-- 1 training supergroup 1338 2014-12-26 03:36 IA/MultiFile/AR/part-00000

We’ve written this simple partitioning exercise as a map-only program. You can apply the same technique to the output of reducers as well. Be careful not to confuse this with the partitioner in the MapReduce framework. That partitioner looks at the keys of intermediate records and decides which reducer will process them. The partitioning we’re doing here looks at the key/value pair of the output and decides which file to store it in.
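To make the distinction concrete, here is a small plain-Java sketch that places the two decisions side by side. The class and method names are hypothetical. The reducer choice uses the same formula as Hadoop's default HashPartitioner, and the file choice mirrors the country + "/" + filename rule used in this post.

```java
// Plain-Java sketch contrasting the two kinds of "partitioning".
final class PartitionKinds {
    // Framework partitioner: picks a reducer from the intermediate key alone.
    // Masking with Integer.MAX_VALUE keeps the result non-negative.
    static int reducerFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Output-side partitioning: picks a file path for the record being written.
    static String outputFileFor(String country, String leafName) {
        return country + "/" + leafName;
    }

    public static void main(String[] args) {
        System.out.println("reducer index: " + reducerFor("US", 4));
        System.out.println("output file:   " + outputFileFor("US", "part-00000"));
    }
}
```

The first method answers "which reducer sees this key?", while the second answers "which file receives this output record?"; a job can use both independently.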

MultipleOutputFormat is simple, but it’s also limited. For example, we were able to split the input data by row, but what if we want to split by column? Let’s say we want to create two data sets from the patent metadata: one containing time-related information (e.g., publication date) for each patent and another one containing geographical information (e.g., country of invention). These two data sets may be of different output formats and different data types for the keys and values. We can look to MultipleOutputs, introduced in version 0.19 of Hadoop, for more powerful capabilities. 

The approach taken by MultipleOutputs is different from MultipleOutputFormat. Rather than asking for the filename to output each record, MultipleOutputs creates multiple OutputCollectors. Each OutputCollector can have its own OutputFormat and types for the key/value pair. Your MapReduce program will decide what to output to each OutputCollector. Listing 7.2 shows a program that takes our patent metadata and outputs two data sets. One has chronological information, such as issued date. The other data set has geographical information associated with each patent. This, too, is a map-only program, but you can apply the multiple output collectors to reducers in a straightforward way.
- Listing 7.2 Program to project different columns of input data to different files
    package ch7.part2;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.mapred.lib.MultipleOutputs;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class MultiFile2 extends Configured implements Tool {
        public static class MapClass extends MapReduceBase implements
                Mapper<LongWritable, Text, NullWritable, Text> {
            private MultipleOutputs mos;
            private OutputCollector<NullWritable, Text> collector;

            @Override
            public void configure(JobConf conf) {
                mos = new MultipleOutputs(conf);
            }

            // getCollector() returns a raw OutputCollector, hence the suppression.
            @Override
            @SuppressWarnings("unchecked")
            public void map(LongWritable key, Text value,
                    OutputCollector<NullWritable, Text> output, Reporter reporter)
                    throws IOException {
                // Sample record:
                // 3104527,1963,1362,,"AR","",,1,,60,4,45,,4,,0.5,,,,,,,
                String[] arr = value.toString().split(",", -1);
                // Patent number plus the two time-related columns.
                String chrono = arr[0] + "," + arr[1] + "," + arr[2];
                // Patent number plus country and state.
                String geo = arr[0] + "," + arr[4] + "," + arr[5];
                collector = mos.getCollector("chrono", reporter);
                collector.collect(NullWritable.get(), new Text(chrono));
                collector = mos.getCollector("geo", reporter);
                collector.collect(NullWritable.get(), new Text(geo));
            }

            @Override
            public void close() throws IOException {
                // Flushes and closes all named outputs.
                mos.close();
            }
        }

        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new MultiFile2(), args);
            System.exit(res);
        }

        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            // Use this class (not MultiFile) so the correct jar is located.
            JobConf job = new JobConf(conf, MultiFile2.class);
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.setInputPaths(job, in);
            FileOutputFormat.setOutputPath(job, out);
            job.setJobName("MultiFile2");
            job.setMapperClass(MapClass.class);
            job.setInputFormat(TextInputFormat.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            job.setNumReduceTasks(0);

            // Register one named output per data set.
            MultipleOutputs.addNamedOutput(job,
                                           "chrono",
                                           TextOutputFormat.class,
                                           NullWritable.class,
                                           Text.class);
            MultipleOutputs.addNamedOutput(job,
                                           "geo",
                                           TextOutputFormat.class,
                                           NullWritable.class,
                                           Text.class);
            JobClient.runJob(job);
            return 0;
        }
    }
To use MultipleOutputs, the driver of the MapReduce program must set up the output collectors it expects to use. Creating the collectors involves a call to MultipleOutputs’ static method addNamedOutput(). We’ve created one output collector called chrono and another one called geo. We’ve created them both to use TextOutputFormat and have the same key/value types, but we can choose to use different output formats or data types.
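The column projection performed in map() can also be tried outside Hadoop. Below is a minimal sketch in plain Java, assuming the same field positions as Listing 7.2. The class ColumnProjection and the map of lists standing in for the named collectors are hypothetical and are not part of the Hadoop API; skipping lines with fewer than six fields is an added assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the named collectors: each output name maps to
// the list of lines that would be written to that output.
final class ColumnProjection {
    static Map<String, List<String>> project(List<String> records) {
        Map<String, List<String>> outputs = new LinkedHashMap<>();
        outputs.put("chrono", new ArrayList<String>());
        outputs.put("geo", new ArrayList<String>());
        for (String record : records) {
            String[] f = record.split(",", -1);
            if (f.length < 6) {
                continue; // assumed policy: skip lines that are too short
            }
            // Same column choices as Listing 7.2.
            outputs.get("chrono").add(f[0] + "," + f[1] + "," + f[2]);
            outputs.get("geo").add(f[0] + "," + f[4] + "," + f[5]);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
            "3104527,1963,1362,,\"AR\",\"\",,1,,60,4,45,,4,,0.5,,,,,,,");
        for (Map.Entry<String, List<String>> e : project(records).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Each input row contributes one line to every named output, which is what distinguishes this column split from the row split done with MultipleOutputFormat.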

We have given a name to each output collector in MultipleOutputs, and MultipleOutputs will automatically generate the output filenames. We can look at the files written by our program to see how MultipleOutputs generates the output names:
$ hadoop jar HadoopIA.jar ch7.part2.MultiFile2 -fs=file:/// -jt=local ../../training_materials/patn_50000.txt MultiFile2
$ ls MultiFile2/
chrono-m-00000 geo-m-00000 part-00000 _SUCCESS

We have a set of files prefixed with chrono and another set of files prefixed with geo. Note that the program created the default output files part-* even though it wrote nothing to them explicitly. It’s entirely possible to write to these files using the original OutputCollector passed in through the map() method. In fact, if this were not a map-only program, records written to the original OutputCollector, and only those records, would be passed to the reducers for processing.

One of the trade-offs with MultipleOutputs is that it has a rigid naming structure compared to MultipleOutputFormat. Your output collector’s name cannot be part, because that’s already in use for the default. The output filename is also strictly defined as the output collector’s name followed by m or r, depending on whether the output was collected at the mapper or the reducer. It’s finally followed by a partition number.
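The naming rule can be written down as a small function. This is a sketch only, with a hypothetical class name; it does not call Hadoop. The five-digit zero padding is inferred from the chrono-m-00000 and geo-m-00000 files listed above, not taken from the MultipleOutputs source.

```java
import java.util.Locale;

// Hypothetical model of the MultipleOutputs file naming rule described above.
final class NamedOutputFileName {
    static String fileName(String namedOutput, boolean fromMapper, int partition) {
        if (namedOutput == null || namedOutput.isEmpty()) {
            throw new IllegalArgumentException("named output must not be empty");
        }
        // "part" is reserved for the job's default output files.
        if ("part".equals(namedOutput)) {
            throw new IllegalArgumentException("named output cannot be 'part'");
        }
        // m = collected at the mapper, r = collected at the reducer.
        String phase = fromMapper ? "m" : "r";
        // Assumed padding: five digits, as seen in the directory listing.
        return String.format(Locale.ROOT, "%s-%s-%05d", namedOutput, phase, partition);
    }

    public static void main(String[] args) {
        System.out.println(fileName("chrono", true, 0));
        System.out.println(fileName("geo", false, 3));
    }
}
```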
