程式扎記: [ In Action ] Ch5. Advanced MapReduce: Replicated joins using DistributedCache (Part3)

標籤

2014年12月19日 星期五

[ In Action ] Ch5. Advanced MapReduce: Replicated joins using DistributedCache (Part3)

Preface (P117)
The reduce-side join technique discussed in the last section is flexible, but it can also be quite inefficient. Joining doesn’t take place until the reduce phase. We shuffle all data across the network first, and in many situations we drop the majority of this data during the joining process. It would be more efficient if we eliminate the unnecessary data right in the map phase. Even better would be to perform the entire joining operation in the map phase.

The main obstacle to performing joins in the map phase is that a record being processed by a mapper may be joined with a record not easily accessible (or even located) by that mapper. If we can guarantee the accessibility of all the necessary data when joining a record, joining on the map side can work. For example, if we know that the two sources of data are partitioned into the same number of partitions and the partitions are all sorted on the key and the key is the desired join key, then each mapper (with the proper InputFormat and RecordReader) can deterministically locate and retrieve all the data necessary to perform joining. In fact, Hadoop’s org. apache.hadoop.mapred.join package contains helper classes to facilitate this map-side join . Unfortunately, situations where we can naturally apply this are limited, and running extra MapReduce jobs to repartition the data sources to be usable by this package seems to defeat the efficiency gain. Therefore, we’ll not pursue this package further.

All hope is not lost though. There’s another data pattern that occurs quite frequently that we can take advantage of. When joining big data, often only one of the sources is big; the second source may be orders of magnitude smaller. For example, a local phone company’s Customers data may have only tens of millions of records (each record containing basic information for one customer), but its transaction log can have billions of records containing detailed call history.When the smaller source can fit in memory of a mapper, we can achieve a tremendous gain in efficiency by copying the smaller source to all mappers and performing joining in the map phase. This is called replicated join in the database literature as one of the data tables is replicated across all nodes in the cluster. (The next section will cover the case when the smaller source doesn’t fit in memory.)

Replicated joins using DistributedCache
Hadoop has a mechanism called distributed cache that’s designed to distribute files to all nodes in a cluster. It’s normally used for distributing files containing “background” data needed by all mappers. For example, if you’re using Hadoop to classify documents , you may have a list of keywords for each class. (Or better yet, a probabilistic model for each class, but we digress…) You would use distributed cache to ensure all mappers have access to the lists of keywords, the “background” data. For executing replicated joins, we consider the smaller data source as background data.

Distributed cache is handled by the appropriately named class DistributedCache. There are two steps to using this class. First, when configuring a job, you call the static method DistributedCache.addCacheFile() to specify the files to be disseminated to all nodes. These files are specified as URI objects, and they default to HDFS unless a different filesystem is specified. The JobTracker will take this list of URIs and create a local copy of the files in all the TaskTrackers when it starts the job. In the second step, your mappers on each individual TaskTracker will call the static method DistributedCache.getLocalCacheFiles() to get an array of local file Paths where the local copy is located. At this point the mapper can use standard Java file I/O techniques to read the local copy.

Replicated joins using DistributedCache are simpler than reduce-side joins. Let’s begin with our standard Hadoop template.
  1. package ch5.joindc;  
  2.   
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.conf.Configured;  
  5. import org.apache.hadoop.io.Text;  
  6. import org.apache.hadoop.mapreduce.Mapper;  
  7. import org.apache.hadoop.mapreduce.Reducer;  
  8. import org.apache.hadoop.util.Tool;  
  9. import org.apache.hadoop.util.ToolRunner;  
  10.   
  11. public class DataJoinDC extends Configured implements Tool{  
  12.     public static class MapClass extends Mapper {  
  13.           
  14.     }  
  15.       
  16.     @Override  
  17.     public int run(String[] args) throws Exception {  
  18.         // TODO Auto-generated method stub  
  19.         return 0;  
  20.     }  
  21.   
  22.     public static void main(String[] args) throws Exception {  
  23.         int res = ToolRunner.run(new Configuration(), new DataJoinDC(), args);  
  24.         System.exit(res);  
  25.     }  
  26. }  
Note that we’ve taken out the Reduce class. We plan on performing the joining in the map phase and will configure this job to have no reducers. You’ll find our driver method familiar too.
  1. @Override  
  2. public int run(String[] args) throws Exception {  
  3.     Job job = new Job(getConf());  
  4.     DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());  
  5.     Path in = new Path(args[1]);  
  6.     Path out = new Path(args[2]);  
  7.       
  8.     FileInputFormat.setInputPaths(job, in);  
  9.     FileOutputFormat.setOutputPath(job, out);  
  10.     job.setJarByClass(DataJoinDC.class);  
  11.     job.setJobName("DataJoin with DistributedCache");  
  12.     job.setMapperClass(MapClass.class);  
  13.     job.setNumReduceTasks(0);  
  14.     job.setInputFormatClass(KeyValueTextInputFormat.class);  
  15.     job.setOutputFormatClass(TextOutputFormat.class);         
  16.     job.getConfiguration().set("key.value.separator.in.input.line"",");  
  17.     job.setOutputKeyClass(Text.class);  
  18.     job.setOutputValueClass(IntWritable.class);       
  19.     boolean success = job.waitForCompletion(true);    
  20.     return(success ? 0 : 1);   
  21. }  
The crucial addition here is where we take the file specified by the first argument and add it to DistributedCache. When we run the job, each node will create a local copy of that file from HDFS. The second and third arguments denote the input and output paths of the standard Hadoop job

Up to now our MapClass has only had to define one method, map(). In fact, the Mapper abstract class has two more interesting methods, setup() and cleanup(). The method setup() will be called when we first instantiate the MapClass, and the method cleanup() will be called when the mapper finishes processing its split. This way we can have the data available each time we call map() to process a new record.
  1. public static class MapClass extends Mapper {  
  2.     private static final Logger LOGGER = Logger.getLogger(MapClass.class);  
  3.     private Hashtable joinData = new Hashtable();  
  4.       
  5.     @Override  
  6.     protected void setup(Context context) {  
  7.         try {  
  8.             Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());  
  9.             if (cacheFiles != null && cacheFiles.length > 0) {  
  10.                 String line;  
  11.                 String[] tokens;  
  12.                 BufferedReader joinReader = new BufferedReader(  
  13.                         new FileReader(cacheFiles[0].toString()));  
  14.                 try {  
  15.                     while ((line = joinReader.readLine()) != null) {  
  16.                         tokens = line.split(","2);  
  17.                         joinData.put(tokens[0], tokens[1]);  
  18.                     }  
  19.                 } finally {  
  20.                     joinReader.close();  
  21.                 }  
  22.             }  
  23.         } catch (Exception e) {  
  24.             e.printStackTrace();  
  25.             LOGGER.error(String.format("Fail to read cache file: %s", e));  
  26.         }  
  27.     }  
  28.       
  29.     @Override  
  30.     public void map(Text key, Text value, Context context)  
  31.             throws IOException, InterruptedException {  
  32.         String joinValue = joinData.get(key.toString());  
  33.         if (joinValue != null) {  
  34.             context.write(key, new Text(value.toString() + "," + joinValue));  
  35.         }  
  36.     }  
  37. }  
During setup() in mapper, we get an array of file paths to the local copy of files pushed by DistributedCache. As our driver method has only pushed one file (given by our first argument) into DistributedCache, this should be an array of size one. We read that file using standard Java file I/O. For our purpose, the program assumes each line is a record, the key/value pair is comma separated, and the key is unique and will be used for joining. The program reads this source file into a Java Hashtable called joinData that’s available throughout the mapper’s lifespan.

The joining takes place in the map() method and is straightforward now that one of the sources resides in memory in the form of joinData. If we don’t find the join key in joinData, we drop the record. Otherwise, we match the (join) key to the value in joinDataand concatenate the values. The result is outputted directly into HDFS as we don’t have any reducer for further processing.

A not-infrequent situation in using DistributedCache is that the background data (the smaller data source in our data join case) is in the local filesystem of the client rather than stored in HDFS. One way to handle this is to add code to upload the local file on the client to HDFS before callingDistributedCache.addCacheFile(). Fortunately, this process is natively supported as one of the generic Hadoop command line arguments inGenericOptionsParser. The option is -files (generic options) and it automatically copies a comma-separated list of files to all the task nodes. Our command line statement is:
$ hadoop jar -files small_in.txt DataJoinDC.jar big_in.txt output

The full driver class code is shown below:
  1. package ch5.joindc;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.FileReader;  
  5. import java.io.IOException;  
  6. import java.util.Hashtable;  
  7.   
  8. import org.apache.hadoop.conf.Configuration;  
  9. import org.apache.hadoop.conf.Configured;  
  10. import org.apache.hadoop.filecache.DistributedCache;  
  11. import org.apache.hadoop.fs.Path;  
  12. import org.apache.hadoop.io.IntWritable;  
  13. import org.apache.hadoop.io.Text;  
  14. import org.apache.hadoop.mapreduce.Job;  
  15. import org.apache.hadoop.mapreduce.Mapper;  
  16. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  17. import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;  
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  19. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  20. import org.apache.hadoop.util.Tool;  
  21. import org.apache.hadoop.util.ToolRunner;  
  22. import org.apache.log4j.Logger;  
  23.   
  24.   
  25.   
  26. public class DataJoinDC extends Configured implements Tool{   
  27.     public static class MapClass extends Mapper {  
  28.         private static final Logger LOGGER = Logger.getLogger(MapClass.class);  
  29.         private Hashtable joinData = new Hashtable();  
  30.           
  31.         @Override  
  32.         protected void setup(Context context) {  
  33.             try {  
  34.                 Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());  
  35.                 if (cacheFiles != null && cacheFiles.length > 0) {  
  36.                     String line;  
  37.                     String[] tokens;  
  38.                     BufferedReader joinReader = new BufferedReader(  
  39.                             new FileReader(cacheFiles[0].toString()));  
  40.                     try {  
  41.                         while ((line = joinReader.readLine()) != null) {  
  42.                             tokens = line.split(","2);  
  43.                             joinData.put(tokens[0], tokens[1]);  
  44.                         }  
  45.                     } finally {  
  46.                         joinReader.close();  
  47.                     }  
  48.                 }  
  49.             } catch (Exception e) {  
  50.                 e.printStackTrace();  
  51.                 LOGGER.error(String.format("Fail to read cache file: %s", e));  
  52.             }  
  53.         }  
  54.           
  55.         @Override  
  56.         public void map(Text key, Text value, Context context)  
  57.                 throws IOException, InterruptedException {  
  58.             String joinValue = joinData.get(key.toString());  
  59.             if (joinValue != null) {  
  60.                 context.write(key, new Text(value.toString() + "," + joinValue));  
  61.             }  
  62.         }  
  63.     }  
  64.       
  65.     @Override  
  66.     public int run(String[] args) throws Exception {  
  67.         Job job = new Job(getConf());  
  68.         DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());  
  69.         Path in = new Path(args[1]);  
  70.         Path out = new Path(args[2]);  
  71.           
  72.         FileInputFormat.setInputPaths(job, in);  
  73.         FileOutputFormat.setOutputPath(job, out);  
  74.         job.setJarByClass(DataJoinDC.class);  
  75.         job.setJobName("DataJoin with DistributedCache");  
  76.         job.setMapperClass(MapClass.class);  
  77.         job.setNumReduceTasks(0);  
  78.         job.setInputFormatClass(KeyValueTextInputFormat.class);  
  79.         job.setOutputFormatClass(TextOutputFormat.class);         
  80.         job.getConfiguration().set("key.value.separator.in.input.line"",");  
  81.         job.setOutputKeyClass(Text.class);  
  82.         job.setOutputValueClass(IntWritable.class);       
  83.         boolean success = job.waitForCompletion(true);    
  84.         return(success ? 0 : 1);   
  85.     }  
  86.   
  87.     public static void main(String[] args) throws Exception {  
  88.         int res = ToolRunner.run(new Configuration(), new DataJoinDC(), args);  
  89.         System.exit(res);  
  90.     }  
  91. }  
Supplement
[CCDH] Class3 - Programming with Hadoop Core API
The Distributed Cache provides an API to push data to all slave nodes:
* Transfer happens behind the scenes before any task is executed
* Data is only transferred once to each node
* Distributed Cache is read-only
* Files in the Distributed Cache are automatically deleted from slave nodes when the job finishes.


沒有留言:

張貼留言

網誌存檔

關於我自己

我的相片
Where there is a will, there is a way!