Up till now, all the MapReduce jobs we’ve seen output a single set of files. There are often cases, however, where it’s more convenient to output multiple sets of files, or to split a data set into multiple data sets. A popular example is partitioning a large log file into distinct sets of log files, one for each day.
MultipleOutputFormat provides a simple way of grouping similar records into separate data sets. Before writing each output record, this OutputFormat class calls an internal method to determine the filename to write to. More specifically, you extend a particular subclass of MultipleOutputFormat and implement its generateFileNameForKeyValue() method. The subclass you extend determines the output format: MultipleTextOutputFormat outputs text files, whereas MultipleSequenceFileOutputFormat outputs sequence files. In either case, you override the following method to return the filename for each output key/value pair:
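protected String generateFileNameForKeyValue(K key, V value, String name) {
    return name;
}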
The default implementation returns the argument name, which is the leaf filename. You can make the method return a filename that depends on the content of the record. For our example here, we take the patent metadata and partition it by country. All patents from U.S. inventors will go into one set of files, all patents from Japan into another pile, and so forth. The skeleton of this example program is a map-only job that takes its input and immediately outputs it. The main change we’ve made is to create our own subclass of MultipleTextOutputFormat called PartitionByCountryMTOF. (Note that MTOF is an acronym for MultipleTextOutputFormat.) Our subclass will store each record in a location based on the inventing country listed in that record. Because the value returned by generateFileNameForKeyValue() is treated as a file path, we’re able to create a subdirectory for each country by returning country + "/" + filename. See below:
Customized MTOF

package ch7.part2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class PartitionByCountryMTOF
    extends MultipleTextOutputFormat<NullWritable, Text> {

    @Override
    protected String generateFileNameForKeyValue(NullWritable key,
                                                 Text value,
                                                 String filename) {
        // Sample record:
        // 3070816,1963,1096,,"US","OK",,1,,114,5,55,,4,,0,,,,,,,
        String[] arr = value.toString().split(",", -1);
        String country = arr[4].substring(1, 3);   // strip the surrounding quotes
        return country + "/" + filename;           // e.g., US/part-00000
    }
}
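The driver below wires PartitionByCountryMTOF into a map-only job as its output format: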
package ch7.part2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultiFile extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, NullWritable, Text> {

        // Identity mapper: pass each record through unchanged
        public void map(LongWritable key, Text value,
                        OutputCollector<NullWritable, Text> output,
                        Reporter reporter) throws IOException {
            output.collect(NullWritable.get(), value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, MultiFile.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("MultiFile");
        job.setMapperClass(MapClass.class);

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(PartitionByCountryMTOF.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setNumReduceTasks(0);   // map-only job
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MultiFile(), args);
        System.exit(res);
    }
}
After executing the preceding program, we can see that the output directory now has a separate subdirectory for each country, and each country’s subdirectory holds only the records (patents) invented in that country.
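For instance, a run over the patent metadata would produce a layout along these lines (the country codes shown are illustrative, and the part-* numbers depend on how many map tasks ran):

output/AR/part-00000
output/JP/part-00000
output/US/part-00000
output/US/part-00001
...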
We’ve written this simple partitioning exercise as a map-only program, but you can apply the same technique to the output of reducers as well. Be careful not to confuse this with the partitioner in the MapReduce framework: that partitioner looks at the keys of intermediate records and decides which reducer will process them, whereas the partitioning we’re doing here looks at the key/value pair of the final output and decides which file to write it to.
MultipleOutputFormat is simple, but it’s also limited. For example, we were able to split the input data by row, but what if we want to split it by column? Say we want to create two data sets from the patent metadata: one containing time-related information (e.g., publication date) for each patent and another containing geographical information (e.g., country of invention). These two data sets may have different output formats and different data types for their keys and values. For more powerful capabilities, we can look to MultipleOutputs, introduced in version 0.19 of Hadoop.
The approach taken by MultipleOutputs is different from MultipleOutputFormat’s. Rather than asking for the filename for each output record, MultipleOutputs creates multiple OutputCollectors. Each OutputCollector can have its own OutputFormat and its own types for the key/value pair; your MapReduce program decides what to write to each one. Listing 7.2 shows a program that takes our patent metadata and outputs two data sets: one holds chronological information, such as the issue date, and the other holds the geographical information associated with each patent. This, too, is a map-only program, but you can apply multiple output collectors in reducers just as easily.
Listing 7.2 Program to project different columns of input data to different files

package ch7.part2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultiFile2 extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, NullWritable, Text> {

        private MultipleOutputs mos;
        private OutputCollector<NullWritable, Text> collector;

        @Override
        public void configure(JobConf conf) {
            mos = new MultipleOutputs(conf);
        }

        @Override
        public void map(LongWritable key, Text value,
                        OutputCollector<NullWritable, Text> output,
                        Reporter reporter) throws IOException {
            // Sample record:
            // 3104527,1963,1362,,"AR","",,1,,60,4,45,,4,,0.5,,,,,,,
            String[] arr = value.toString().split(",", -1);
            String chrono = arr[0] + "," + arr[1] + "," + arr[2];
            String geo = arr[0] + "," + arr[4] + "," + arr[5];

            collector = mos.getCollector("chrono", reporter);
            collector.collect(NullWritable.get(), new Text(chrono));
            collector = mos.getCollector("geo", reporter);
            collector.collect(NullWritable.get(), new Text(geo));
        }

        @Override
        public void close() throws IOException {
            mos.close();   // flush and close all named outputs
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, MultiFile2.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("MultiFile2");
        job.setMapperClass(MapClass.class);

        job.setInputFormat(TextInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);

        // Register the two named outputs with their formats and key/value types
        MultipleOutputs.addNamedOutput(job,
                                       "chrono",
                                       TextOutputFormat.class,
                                       NullWritable.class,
                                       Text.class);
        MultipleOutputs.addNamedOutput(job,
                                       "geo",
                                       TextOutputFormat.class,
                                       NullWritable.class,
                                       Text.class);

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MultiFile2(), args);
        System.exit(res);
    }
}
We have given a name to each output collector in MultipleOutputs, and MultipleOutputs automatically generates the output filenames. We can look at the files our job produces to see how MultipleOutputs names them:
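A run with two map tasks, for example, would leave something like the following in the output directory (the partition numbers depend on the number of map tasks):

chrono-m-00000
chrono-m-00001
geo-m-00000
geo-m-00001
part-00000
part-00001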
We have a set of files prefixed with chrono and another set prefixed with geo. Note that the program also created the default output files part-* even though it wrote nothing to them explicitly. It’s entirely possible to write to these files using the original OutputCollector passed in through the map() method. In fact, if this were not a map-only program, records written to the original OutputCollector, and only those records, would be passed on to the reducers for processing.
One of the trade-offs of MultipleOutputs is its rigid naming structure compared to MultipleOutputFormat. Your output collector’s name cannot be part, because that name is already in use for the default output. The output filename is also strictly defined: it is the output collector’s name, followed by m or r depending on whether the output was collected at the mapper or the reducer, and finally a partition number.