程式扎記: [ In Action ] Ch5. Advanced MapReduce: Joining data from different sources (Part2)


Wednesday, December 17, 2014


Preface (P107) 
It’s inevitable that you’ll come across data analyses where you need to pull in data from different sources. For example, given our patent data sets, you may want to find out if certain countries cite patents from another country. You’ll have to look at citation data (cite75_99.txt) as well as patent data for country information (apat63_99.txt). In the database world it would just be a matter of joining two tables, and most databases automagically take care of the join processing for you. Unfortunately, joining data in Hadoop is more involved, and there are several possible approaches with different trade-offs. 

We use a couple of toy data sets to better illustrate joining in Hadoop. Let’s take a comma-separated Customers file where each record has three fields: Customer ID, Name, and Phone Number. We put four records in the file for illustration: 
    1,Stephanie Leung,555-555-5555
    2,Edward Kim,123-456-7890
    3,Jose Madriz,281-330-8004
    4,David Stork,408-555-0000
We store customer orders in a separate file, called Orders. It’s in CSV format, with four fields: Customer ID, Order ID, Price, and Purchase Date: 
    3,A,12.95,02-Jun-2008
    1,B,88.25,20-May-2008
    2,C,32.00,30-Nov-2007
    3,D,25.02,22-Jan-2009
If we want an inner join of the two data sets above, the desired output would look like listing 5.2. 
- Listing 5.2 Desired output of an inner join between Customers and Orders data 
    1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
    2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
    3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
    3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009
Hadoop can also perform outer joins, although to simplify explanation we focus on inner joins. 
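
To make the target concrete before bringing Hadoop into the picture, here is a minimal single-machine sketch in plain Java that produces listing 5.2 from the two toy files. It is not how the datajoin package works internally; the class name InnerJoinSketch and the method innerJoin are made up for this illustration, and the final sort is only there so the result matches the listing's order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Hadoop: an in-memory inner join on the first CSV field.
class InnerJoinSketch {

    static List<String> innerJoin(List<String> customers, List<String> orders) {
        // Index customers by Customer ID; split(",", 2) keeps the rest of the record intact.
        Map<String, String> customerById = new HashMap<>();
        for (String customer : customers) {
            String[] parts = customer.split(",", 2);
            customerById.put(parts[0], parts[1]);
        }

        List<String> joined = new ArrayList<>();
        for (String order : orders) {
            String[] parts = order.split(",", 2);
            String customerRest = customerById.get(parts[0]);
            if (customerRest != null) { // inner join: skip orders without a customer
                joined.add(parts[0] + "," + customerRest + "," + parts[1]);
            }
        }
        Collections.sort(joined); // only to reproduce the ordering of listing 5.2
        return joined;
    }
}
```

Customer 4 (David Stork) has no orders, so an inner join leaves him out of the result.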

Reduce-side joining 
Hadoop has a contrib package called datajoin that works as a generic framework for data joining in Hadoop. Its jar file is at contrib/datajoin/hadoop-*-datajoin.jar. To distinguish it from other joining techniques, it’s called the reduce-side join, as we do most of the processing on the reduce side. It’s also known as the repartitioned join (or the repartitioned sort-merge join), as it’s the same as the database technique of the same name. Although it’s not the most efficient joining technique, it’s the most general and forms the basis of some more advanced techniques (such as the semijoin). 

Reduce-side join introduces some new terminology and concepts, namely, data source, tag, and group key. A data source is analogous to a table in relational databases. We have two data sources in our toy example: Customers and Orders. A data source can be a single file or multiple files. The important point is that all the records in a data source have the same structure, analogous to a schema. 

The MapReduce paradigm calls for processing each record one at a time in a stateless manner. If we want some state information to persist, we have to tag the record with such state. For example, given our two files, a record may look to a mapper like this: 
3,Jose Madriz,281-330-8004

Or 
3,A,12.95,02-Jun-2008

where the record type (Customers or Orders) is dissociated from the record itself. Tagging the record will ensure that specific metadata will always go along with the record. For the purpose of data joining, we want to tag each record with its data source. 

The group key functions like a join key in a relational database. For our example, the group key is the Customer ID. As the datajoin package allows the group key to be any user-defined function, group key is more general than a join key in a relational database. 
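
As a rough illustration of why a user-defined group key is more general than a plain join column, the sketch below models the group key as a function from a record to a string. Both functions and the class name GroupKeySketch are hypothetical and written only for this example; the second one assumes, purely for the sake of argument, that some files pad Customer IDs with spaces or leading zeros.

```java
import java.util.function.Function;

// Hypothetical illustration: the group key is just a function of the record.
class GroupKeySketch {

    // The plain case used in this chapter: the first CSV field is the key.
    static final Function<String, String> BY_FIRST_FIELD =
            record -> record.split(",", 2)[0];

    // A derived key: trim whitespace and drop leading zeros, so that
    // " 003" and "3" land in the same group. (Assumed scenario.)
    static final Function<String, String> BY_NORMALIZED_ID =
            record -> record.split(",", 2)[0].trim().replaceFirst("^0+(?=\\d)", "");

    public static void main(String[] args) {
        System.out.println(BY_FIRST_FIELD.apply("3,A,12.95,02-Jun-2008"));
        System.out.println(BY_NORMALIZED_ID.apply(" 003,A,12.95,02-Jun-2008"));
    }
}
```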

Before explaining how to use the contrib package, let’s go through all the major steps in a repartitioned sort-merge join of our toy datasets. After seeing how those steps fit together, we’ll see which steps are done by the datajoin package, and which ones we program. We’ll have code to see the hooks for integrating our code with the datajoin package. 

DATA FLOW OF A REDUCE-SIDE JOIN 
Figure 5.1 illustrates the data flow of a repartitioned join on the toy data sets Customers and Orders, up to the reduce stage. We’ll go into more details later to see what happens in the reduce stage. 
 
Figure 5.1 In a repartitioned join, the mapper first wraps each record with a group key and a tag. The group key is the joining attribute, and the tag is the data source (table in SQL parlance) of the record. The partition and shuffle step will group all the records with the same group key together. The reducer is called on the set of records with the same group key. 
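
The flow in figure 5.1 can be imitated on one machine to see what ends up in front of each reducer call. The following is a sketch only: the Tagged class and the mapAndShuffle method are invented for this example, and a sorted in-memory map stands in for Hadoop's partition and shuffle step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-machine imitation of the map and shuffle steps of a repartitioned join.
class RepartitionSketch {

    // A record wrapped with the name of its data source.
    static final class Tagged {
        final String tag;
        final String record;

        Tagged(String tag, String record) {
            this.tag = tag;
            this.record = record;
        }

        @Override
        public String toString() {
            return tag + ":" + record;
        }
    }

    // "Map": tag each record and take its first CSV field as the group key.
    // "Shuffle": the map collects all tagged records that share a group key.
    static void mapAndShuffle(String tag, List<String> records,
                              Map<String, List<Tagged>> groups) {
        for (String record : records) {
            String groupKey = record.split(",", 2)[0];
            groups.computeIfAbsent(groupKey, k -> new ArrayList<>())
                  .add(new Tagged(tag, record));
        }
    }

    public static void main(String[] args) {
        Map<String, List<Tagged>> groups = new TreeMap<>();
        mapAndShuffle("Customers", Arrays.asList(
                "1,Stephanie Leung,555-555-5555", "2,Edward Kim,123-456-7890",
                "3,Jose Madriz,281-330-8004", "4,David Stork,408-555-0000"), groups);
        mapAndShuffle("Orders", Arrays.asList(
                "3,A,12.95,02-Jun-2008", "1,B,88.25,20-May-2008",
                "2,C,32.00,30-Nov-2007", "3,D,25.02,22-Jan-2009"), groups);
        // Each entry corresponds to one reduce() call in the real job.
        groups.forEach((key, values) -> System.out.println(key + " -> " + values));
    }
}
```

Group key 3 ends up with three tagged records (one customer, two orders), while group key 4 has only the customer record.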

The function reduce() takes its input and does a full cross-product on the values. It creates all combinations of the values, with the constraint that no combination contains more than one value with the same tag. When every value reduce() sees has a distinct tag, the cross-product is simply the original set of values. In our example, this is the case for group keys 1, 2, and 4. Figure 5.2 illustrates the cross-product for group key 3. We have three values, one tagged with Customers and two tagged with Orders. The cross-product creates two combinations. Each combination consists of the Customers value and one of the Orders values. 
 
Figure 5.2 The reduce side of a repartitioned join. For a given join key, the reduce task performs a full cross-product of values from different sources. It sends each combination to combine() to create an output record. The combine() function can choose to not output any particular combination. 
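
The cross-product in figure 5.2 can be sketched in a few lines of plain Java. This is an illustration of the idea, not the datajoin package's own code; the class CrossProductSketch and its crossProduct method are hypothetical, and the input is assumed to be the values of one group key already bucketed by tag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the per-group-key cross-product across tags.
class CrossProductSketch {

    // valuesByTag maps each tag to the values carrying that tag for ONE group key.
    // Returns every combination that contains exactly one value per tag present.
    static List<List<String>> crossProduct(Map<String, List<String>> valuesByTag) {
        List<List<String>> combos = new ArrayList<>();
        combos.add(new ArrayList<>()); // start from a single empty combination
        for (List<String> values : valuesByTag.values()) {
            List<List<String>> extended = new ArrayList<>();
            for (List<String> combo : combos) {
                for (String value : values) {
                    List<String> next = new ArrayList<>(combo);
                    next.add(value); // extend the combination with one value of this tag
                    extended.add(next);
                }
            }
            combos = extended;
        }
        return combos;
    }
}
```

With a sorted map such as TreeMap, the tags are visited in sorted order, so for group key 3 the result is two combinations, each holding the Customers value followed by one Orders value. For group key 4 the result is a single combination holding only the Customers value.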

The reducer feeds each combination from the cross-product into a function called combine(). (Don’t confuse this with combiners as explained in section 4.5.) Due to the nature of the cross-product, combine() is guaranteed to see at most one record from each of the data sources (tags), and all the records it sees have the same join key. It’s the combine() function that determines whether the whole operation is an inner join, outer join, or another type of join. In an inner join, combine() drops all combinations where not all tags are present, such as our case with group key "4". Otherwise combine() merges the records from different sources into a single output record. 
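
To show how the join type comes down to which combinations are dropped, here is a small sketch with two policies side by side. The class and method names are hypothetical, strings stand in for Hadoop's tagged records, and the "|" separator is used only to keep the merged records visibly apart; it assumes the tags arrive in sorted order, so "Customers" comes before "Orders".

```java
// Hypothetical sketch: the join type is decided by which combinations are rejected.
class CombinePolicySketch {

    static final int EXPECTED_SOURCES = 2; // Customers and Orders

    // Inner join: keep a combination only if every data source contributed a record.
    static String innerCombine(String[] tags, String[] values) {
        if (tags.length < EXPECTED_SOURCES) {
            return null; // e.g. group key 4, which has no orders
        }
        return String.join("|", values);
    }

    // Left outer join on Customers: keep every customer, with or without orders,
    // but drop an order that has no matching customer.
    static String leftOuterCombine(String[] tags, String[] values) {
        if (!"Customers".equals(tags[0])) {
            return null; // tags are sorted, so a lone "Orders" tag means no customer
        }
        return String.join("|", values);
    }
}
```

For group key 4, innerCombine returns null while leftOuterCombine still returns the customer record.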

Now you see why we call this joining process the repartitioned sort-merge join. The records in the original input sources can be in random order. They are repartitioned onto the reducers in the right grouping. The reducer can then merge records of the same join key together to create the desired join output. 

IMPLEMENTING JOIN WITH THE DATAJOIN PACKAGE 
Hadoop’s datajoin package implements the dataflow of a join as described previously. We have certain hooks to handle the details of our particular data structure and a special hook for us to define the exact function of combine(). 

Hadoop’s datajoin package has three abstract classes that we inherit and make concrete: DataJoinMapperBase, DataJoinReducerBase, and TaggedMapOutput. As the names suggest, our MapClass will extend DataJoinMapperBase, and our Reduce class will extend DataJoinReducerBase. The datajoin package has already implemented the map() and reduce() methods in these respective base classes to perform the join dataflow described in the last section. Our subclass will only have to implement a few new methods to configure the details. 

Before explaining how to use DataJoinMapperBase and DataJoinReducerBase, you need to understand a new abstract data type TaggedMapOutput that is used throughout the code. Recall from the dataflow description that the mapper outputs a package with a (group) key and a value that is a tagged record. The datajoin package specifies the (group) key to be of type Text and the value (i.e., the tagged record) to be of type TaggedMapOutput. TaggedMapOutput is a data type for wrapping our records with a Text tag. It trivially implements a getTag() and a setTag(Text tag) method. It specifies an abstract method getData(). Our subclass will implement that method to handle the type of the record. There’s no explicit requirement for the subclass to have a setData() method but we must pass in the record data. The subclass can implement such a setData() method for the sake of symmetry or take in a record in the constructor. In addition, as the output of a mapper, TaggedMapOutput needs to be Writable. Therefore, our subclass has to implement the readFields() and write() methods. We created TaggedWritable, a simple subclass that wraps a Writable record (a Text record in our example). 
    package ch5.join;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class TaggedWritable extends TaggedMapOutput {
        private Writable data;

        // Hadoop creates Writables reflectively before calling readFields(),
        // so a no-argument constructor is required. It assumes Text records,
        // which is the only record type used in this example.
        public TaggedWritable() {
            this(new Text());
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        @Override
        public Writable getData() {
            return data;
        }

        // Serialize the tag first, then the wrapped record.
        @Override
        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
        }

        // Read the fields back in exactly the order write() emitted them,
        // so both the tag and the record survive the trip to the reducer.
        @Override
        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            this.data.readFields(in);
        }
    }
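
The essential contract for any such tagged record is that readFields() reads back exactly what write() wrote, in the same order, starting from an empty instance. The sketch below checks that contract using only java.io, so it runs without Hadoop on the classpath; TaggedRecordSketch and roundTrip are hypothetical names, and strings written with writeUTF stand in for Hadoop's Text.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a tagged Writable, using plain java.io only.
class TaggedRecordSketch {
    String tag = "";
    String data = "";

    // An empty instance is what a framework would create before calling readFields().
    TaggedRecordSketch() { }

    TaggedRecordSketch(String tag, String data) {
        this.tag = tag;
        this.data = data;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(tag);  // tag first
        out.writeUTF(data); // then the record
    }

    void readFields(DataInput in) throws IOException {
        tag = in.readUTF();  // same order as write()
        data = in.readUTF();
    }

    // Serialize to bytes and deserialize into a fresh instance.
    static TaggedRecordSketch roundTrip(TaggedRecordSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            TaggedRecordSketch copy = new TaggedRecordSketch();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the tag were not restored in readFields(), every record would reach the reducer with an empty tag and the reducer could no longer tell the data sources apart.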
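Recall from the join dataflow that the mapper’s main function is to package a record such that it goes to the same reducer as other records with the same join key. DataJoinMapperBase performs all the packaging, but the class specifies three abstract methods for our subclass to fill in: 
    protected abstract Text generateInputTag(String inputFile);
    protected abstract TaggedMapOutput generateTaggedMapOutput(Object value);
    protected abstract Text generateGroupKey(TaggedMapOutput aRecord);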

The generateInputTag() method is called at the start of a map task to globally specify the tag for all the records this map task will process. The tag is defined to be of type Text. Note that generateInputTag() is called with the filename of the records. The mapper working on the Customers file will receive the string "Customers" as the argument to generateInputTag(). As we’re using the tag to signify the data source, and our filename is set up to reflect the data source, generateInputTag() is: 
    @Override
    protected Text generateInputTag(String inputFile) {
        // The input filename doubles as the data source tag.
        return new Text(inputFile);
    }
We store the result of generateInputTag() in the DataJoinMapperBase object’s inputTag variable for later use. We can also store the filename in DataJoinMapperBase’s inputFile variable if we want to refer to it again. After the map task’s initialization, DataJoinMapperBase’s map() is called for each record. It calls the two abstract methods that we have yet to implement: 
    public void map(Object key, Object value,
      OutputCollector output, Reporter reporter) throws IOException
    {
        TaggedMapOutput aRecord = generateTaggedMapOutput(value);
        Text groupKey = generateGroupKey(aRecord);
        output.collect(groupKey, aRecord);
    }
The generateTaggedMapOutput() method wraps the record value into a TaggedMapOutput type. Recall the concrete implementation of TaggedMapOutput that we’re using is called TaggedWritable. The method generateTaggedMapOutput() can return a TaggedWritable with any Text tag that we want. In principle, the tag can even be different for different records in the same file. In the standard case, we want the tag to stand for the data source that our generateInputTag() had computed earlier and stored in this.inputTag: 
    @Override
    protected TaggedMapOutput generateTaggedMapOutput(Object value) {
        TaggedWritable retv = new TaggedWritable((Text) value);
        retv.setTag(this.inputTag);
        return retv;
    }
The generateGroupKey() method takes a tagged record (of type TaggedMapOutput) and returns the group key for joining. For our current purpose, we unwrap the tagged record and take the first field in the CSV-formatted value as the join key. 
    @Override
    protected Text generateGroupKey(TaggedMapOutput aRecord) {
        String line = ((Text) aRecord.getData()).toString();
        String[] tokens = line.split(",");
        String groupKey = tokens[0];
        return new Text(groupKey);
    }
DataJoinMapperBase is a simple class, and much of the mapper code is in our subclass. DataJoinReducerBase, on the other hand, is the workhorse of the datajoin package, and it simplifies our programming by performing a full outer join for us. Our reducer subclass only has to implement the combine() method to filter out unwanted combinations to get the desired join operation (inner join, left outer join, etc.). It’s also in the combine() method that we format the combination into the appropriate output format. 

We give the combine() method one combination of the cross product of the tagged records with the same join (group) key. This may sound complicated, but recall from the dataflow diagrams in figures 5.1 and 5.2 that the cross-product is simple for the canonical case of two data sources. Each combination will have either two records (meaning there’s at least one record in each data source with the join key) or one (meaning only one data source has that join key). 

Let’s look at the signature of combine(): 
    protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);

An array of tags and an array of values represent the combination. The size of those two arrays is guaranteed to be the same and equal to the number of tagged records in the combination. Furthermore, the tags are always in sorted order. 

As tags correspond to the data sources, in the canonical case of joining two data sources, the tags array to combine() won’t be longer than two. Figure 5.2 shows combine() being called twice. For the left side, the tags and values arrays are like this: 
    tags = {"Customers", "Orders"};
    values = {"3,Jose Madriz,281-330-8004", "3,A,12.95,02-Jun-2008"};
For an inner join, combine() will ignore combinations where not all tags are present. It does so by returning null. Given a legal combination, the role of combine() is to concatenate all the values into one single record for output. The order of concatenation is fully determined by combine(). In the case of an inner join, the length of values[] is always the number of data sources available (two in the canonical case), and the tags are always in sorted order. It’s a sensible choice to loop through the values[] array to get the default alphabetical ordering based on data source names. 

DataJoinReducerBase, like any reducer, outputs key/value pairs. For each legal combination, the key is always the join key and the value is the output of combine(). Note that the join key is still present in each element of the values[] array. The combine() method should strip out the join key in those elements before concatenating them. Otherwise the join key will be shown multiple times in one output record. 
    package ch5.join;

    import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
    import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
    import org.apache.hadoop.io.Text;

    public class Reduce extends DataJoinReducerBase {

        @Override
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            // Inner join: drop combinations that lack a record from either source.
            if (tags.length < 2)
                return null;
            StringBuilder joined = new StringBuilder();
            for (int i = 0; i < values.length; i++) {
                if (i > 0)
                    joined.append(",");
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                // Split into at most two parts: the join key and the rest of the record.
                String[] tokens = line.split(",", 2);
                // Keep only the rest; the join key is emitted as the output key.
                joined.append(tokens[1]);
            }
            TaggedWritable retv = new TaggedWritable(new Text(joined.toString()));
            retv.setTag((Text) tags[0]);
            return retv;
        }

    }
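
The key-stripping step is easy to check in isolation. The following plain-Java sketch (no Hadoop classes; StripJoinKeySketch and mergeValues are names made up for this example) applies the same split-with-limit idea to the values of group key 3 and shows the output line that results once the join key is put back in front with a comma separator.

```java
// Hypothetical, Hadoop-free check of the "strip the join key, then concatenate" step.
class StripJoinKeySketch {

    // Drops the leading join key from each value and joins the remainders with commas.
    static String mergeValues(String[] values) {
        StringBuilder merged = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                merged.append(',');
            }
            // limit 2: first part is the join key, second part is the rest of the record
            String[] keyAndRest = values[i].split(",", 2);
            merged.append(keyAndRest.length > 1 ? keyAndRest[1] : "");
        }
        return merged.toString();
    }

    public static void main(String[] args) {
        String[] values = {"3,Jose Madriz,281-330-8004", "3,A,12.95,02-Jun-2008"};
        // The output key (the join key) followed by the merged value.
        System.out.println("3" + "," + mergeValues(values));
    }
}
```

Without the stripping step, the same record would contain the Customer ID twice.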
Finally, here is the driver class: 
    package ch5.join;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class DataJoin extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();

            JobConf job = new JobConf(conf, DataJoin.class);
            // args[0]: input path holding both data sources; args[1]: output path.
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.setInputPaths(job, in);
            FileOutputFormat.setOutputPath(job, out);

            job.setJobName("DataJoin");
            job.setMapperClass(MapClass.class);
            job.setReducerClass(Reduce.class);
            job.setInputFormat(TextInputFormat.class);
            job.setOutputFormat(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(TaggedWritable.class);
            // Separate the join key from the joined record with a comma instead of a tab.
            job.set("mapred.textoutputformat.separator", ",");
            JobClient.runJob(job);
            return 0;
        }

        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new DataJoin(), args);
            System.exit(res);
        }
    }
Supplement 
How-to: Include Third-Party Libraries in Your MapReduce Job
