It’s inevitable that you’ll come across data analyses where you need to pull in data from different sources. For example, given our patent data sets, you may want to find out if certain countries cite patents from another country. You’ll have to look at citation data (cite75_99.txt) as well as patent data for country information (apat63_99.txt). In the database world it would just be a matter of joining two tables, and most databases automagically take care of the join processing for you. Unfortunately, joining data in Hadoop is more involved, and there are several possible approaches with different trade-offs.
We use a couple of toy data sets to better illustrate joining in Hadoop. Let’s take a comma-separated Customers file where each record has three fields: Customer ID, Name, and Phone Number. We put four records in the file for illustration:
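The records themselves are not reproduced at this point; a plausible set of four records, with hypothetical names and phone numbers invented for illustration, might look like this:

```
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000
```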
Listing 5.2 Desired output of an inner join between Customers and Orders data
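The body of listing 5.2 is not shown here. Assuming hypothetical Customers records and an Orders file whose records hold a Customer ID, Order ID, Price, and Purchase Date (an assumption made only for illustration), the inner join output could look like:

```
1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009
```

Note that Customer ID 4 would not appear in an inner join if no order references it.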
Hadoop has a contrib package called datajoin that works as a generic framework for data joining in Hadoop. Its jar file is at contrib/datajoin/hadoop-*-datajoin.jar. To distinguish it from other joining techniques, it’s called the reduce-side join, as we do most of the processing on the reduce side. It’s also known as the repartitioned join (or the repartitioned sort-merge join), as it’s the same as the database technique of the same name. Although it’s not the most efficient joining technique, it’s the most general and forms the basis of some more advanced techniques (such as the semijoin).
Reduce-side join introduces some new terminology and concepts, namely, data source, tag, and group key. A data source is analogous to a table in relational databases. We have two data sources in our toy example: Customers and Orders. A data source can be a single file or multiple files. The important point is that all the records in a data source have the same structure, analogous to a schema.
The MapReduce paradigm calls for processing each record one at a time in a stateless manner. If we want some state information to persist, we have to tag the record with such state. For example, given our two files, a record may look to a mapper like this:
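The example record is missing at this point; a hypothetical Orders record illustrates the problem. To the mapper, the line is just

```
3,A,12.95,02-Jun-2008
```

and nothing in the line itself says which data source it came from.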
where the record type (Customers or Orders) is dissociated from the record itself. Tagging the record will ensure that specific metadata will always go along with the record. For the purpose of data joining, we want to tag each record with its data source.
The group key functions like a join key in a relational database. For our example, the group key is the Customer ID. As the datajoin package allows the group key to be any user-defined function, group key is more general than a join key in a relational database.
Before explaining how to use the contrib package, let’s go through all the major steps in a repartitioned sort-merge join of our toy data sets. After seeing how those steps fit together, we’ll see which steps are done by the datajoin package and which ones we program. We’ll then look at code to see the hooks for integrating our own code with the datajoin package.
DATA FLOW OF A REDUCE-SIDE JOIN
Figure 5.1 illustrates the data flow of a repartitioned join on the toy data sets Customers and Orders, up to the reduce stage. We’ll go into more details later to see what happens in the reduce stage.
Figure 5.1 In a repartitioned join, the mapper first wraps each record with a group key and a tag. The group key is the joining attribute, and the tag is the data source (table in SQL parlance) of the record. The partition and shuffle step will group all the records with the same group key together. The reducer is called on the set of records with the same group key.
The function reduce() will take its input and do a full cross-product on the values. It creates all combinations of the values, under the constraint that no combination contains two values with the same tag. When every value reduce() sees carries a distinct tag, the cross-product is a single combination consisting of all the values. In our example, this is the case for group keys 1, 2, and 4. Figure 5.2 illustrates the cross-product for group key 3. We have three values, one tagged with Customers and two tagged with Orders. The cross-product creates two combinations, each consisting of the Customers value and one of the Orders values.
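The constrained cross-product can be sketched in plain Java (no Hadoop) to make the rule concrete. This is an illustration of the dataflow, not the datajoin package’s actual code; class and method names are invented for the sketch:

```java
import java.util.*;

// Plain-Java sketch of the cross-product step the reducer performs for one
// group key: each value is tagged with its data source, and a combination
// picks exactly one value per tag.
public class CrossProductDemo {

    // A tagged value: the data-source tag plus the record itself.
    static class Tagged {
        final String tag, record;
        Tagged(String tag, String record) { this.tag = tag; this.record = record; }
        @Override public String toString() { return tag + ":" + record; }
    }

    // Build every combination that takes one value from each distinct tag.
    static List<List<Tagged>> crossProduct(List<Tagged> values) {
        // Group the values by tag, preserving tag order.
        Map<String, List<Tagged>> byTag = new LinkedHashMap<>();
        for (Tagged t : values) {
            byTag.computeIfAbsent(t.tag, k -> new ArrayList<>()).add(t);
        }
        List<List<Tagged>> combos = new ArrayList<>();
        combos.add(new ArrayList<>());
        for (List<Tagged> group : byTag.values()) {
            List<List<Tagged>> next = new ArrayList<>();
            for (List<Tagged> combo : combos) {
                for (Tagged t : group) {
                    List<Tagged> extended = new ArrayList<>(combo);
                    extended.add(t);
                    next.add(extended);
                }
            }
            combos = next;
        }
        return combos;
    }

    public static void main(String[] args) {
        // Group key 3: one Customers value and two Orders values
        // (record contents are hypothetical).
        List<Tagged> values = List.of(
            new Tagged("Customers", "3,Jose Madriz,281-330-8004"),
            new Tagged("Orders", "3,A,12.95,02-Jun-2008"),
            new Tagged("Orders", "3,D,25.02,22-Jan-2009"));
        for (List<Tagged> combo : crossProduct(values)) {
            System.out.println(combo);  // prints two combinations
        }
    }
}
```

With values of only distinct tags (group keys 1, 2, and 4), the loop produces a single combination containing all the values, matching the description above.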
Figure 5.2 The reduce side of a repartitioned join. For a given join key, the reduce task performs a full cross-product of values from different sources. It sends each combination to combine() to create an output record. The combine() function can choose to not output any particular combination.
It feeds each combination from the cross-product into a function called combine(). (Don’t confuse it with the combiners explained in section 4.5.) Due to the nature of the cross-product, combine() is guaranteed to see at most one record from each of the data sources (tags), and all the records it sees have the same join key. It’s the combine() function that determines whether the whole operation is an inner join, outer join, or another type of join. In an inner join, combine() drops all combinations where not all tags are present, such as our case with group key "4". Otherwise combine() merges the records from different sources into a single output record.
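An inner-join combine() can be sketched as follows. This is an illustrative fragment, assuming the Reduce class extends DataJoinReducerBase and that values are wrapped in the TaggedWritable subclass discussed below, whose getData() returns the original Text line; the exact implementation is ours, not prescribed by the datajoin package:

```java
// Inner join: a combination missing one of the two data sources is dropped.
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
    if (tags.length < 2) {
        return null;  // not all sources present for this join key
    }
    StringBuilder joined = new StringBuilder();
    for (int i = 0; i < values.length; i++) {
        if (i > 0) joined.append(",");
        TaggedWritable tw = (TaggedWritable) values[i];
        String line = ((Text) tw.getData()).toString();
        // strip the join key (everything up to the first comma);
        // the reducer outputs the join key separately
        joined.append(line.split(",", 2)[1]);
    }
    TaggedWritable ret = new TaggedWritable(new Text(joined.toString()));
    ret.setTag((Text) tags[0]);
    return ret;
}
```

Returning null tells the framework to skip the combination; an outer join would instead emit a record even when some sources are missing.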
Now you see why we call this joining process the repartitioned sort-merge join. The records in the original input sources can be in random order. They are repartitioned onto the reducers in the right grouping. The reducer can then merge records of the same join key together to create the desired join output.
IMPLEMENTING JOIN WITH THE DATAJOIN PACKAGE
Hadoop’s datajoin package implements the dataflow of a join as described previously. We have certain hooks to handle the details of our particular data structure and a special hook for us to define the exact function of combine().
Hadoop’s datajoin package has three abstract classes that we inherit and make concrete: DataJoinMapperBase, DataJoinReducerBase, and TaggedMapOutput. As the names suggest, our MapClass will extend DataJoinMapperBase, and our Reduce class will extend DataJoinReducerBase. The datajoin package has already implemented the map() and reduce() methods in these respective base classes to perform the join dataflow described in the last section. Our subclass will only have to implement a few new methods to configure the details.
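The overall shape of such a job can be sketched like this (a skeleton only; the class names MapClass, Reduce, and TaggedWritable are our own choices, and the listed hook methods are the abstract methods of the respective base classes):

```java
public class DataJoin extends Configured implements Tool {

    public static class MapClass extends DataJoinMapperBase {
        // implement generateInputTag(), generateGroupKey(),
        // and generateTaggedMapOutput() here
    }

    public static class Reduce extends DataJoinReducerBase {
        // implement combine() here
    }

    public static class TaggedWritable extends TaggedMapOutput {
        // wrap a Writable record; implement getData(),
        // readFields(), and write()
    }
}
```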
Before explaining how to use DataJoinMapperBase and DataJoinReducerBase, you need to understand a new abstract data type TaggedMapOutput that is used throughout the code. Recall from the dataflow description that the mapper outputs a package with a (group) key and a value that is a tagged record. The datajoin package specifies the (group) key to be of type Text and the value (i.e., the tagged record) to be of type TaggedMapOutput. TaggedMapOutput is a data type for wrapping our records with a Text tag. It trivially implements a getTag() and a setTag(Text tag) method. It specifies an abstract method getData(). Our subclass will implement that method to handle the type of the record. There’s no explicit requirement for the subclass to have a setData() method, but we must pass in the record data. The subclass can implement such a setData() method for the sake of symmetry or take in a record in the constructor. In addition, as the output of a mapper, TaggedMapOutput needs to be Writable. Therefore, our subclass has to implement the readFields() and write() methods. We created TaggedWritable, a simple subclass for handling any Writable record type.
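A minimal TaggedWritable can be sketched as follows, assuming the record is itself a Writable (Text in our example) and that TaggedMapOutput exposes its tag as a protected field; details such as the empty initial tag are our own choices:

```java
public static class TaggedWritable extends TaggedMapOutput {

    private Writable data;

    public TaggedWritable(Writable data) {
        this.tag = new Text("");  // the tag is filled in later via setTag()
        this.data = data;
    }

    public Writable getData() {
        return data;
    }

    // Serialize the tag followed by the record so the pair
    // travels through the shuffle as one Writable.
    public void write(DataOutput out) throws IOException {
        this.tag.write(out);
        this.data.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        this.tag.readFields(in);
        this.data.readFields(in);
    }
}
```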
The generateInputTag() method is called at the start of a map task to globally specify the tag for all the records this map task will process. The tag is defined to be of type Text. Note that generateInputTag() is called with the filename of the records. The mapper working on the Customers file will receive the string "Customers" as the argument to generateInputTag(). As we’re using the tag to signify the data source, and our filename is set up to reflect the data source, generateInputTag() is:
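A minimal implementation, assuming (as the example above does) that the input filename itself names the data source, could be:

```java
// The filename doubles as the data-source name, so the tag is just the
// filename. A real job might strip extensions or part-file suffixes first.
protected Text generateInputTag(String inputFile) {
    return new Text(inputFile);
}
```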
We give the combine() method one combination of the cross product of the tagged records with the same join (group) key. This may sound complicated, but recall from the dataflow diagrams in figures 5.1 and 5.2 that the cross-product is simple for the canonical case of two data sources. Each combination will have either two records (meaning there’s at least one record in each data source with the join key) or one (meaning only one data source has that join key).
Let’s look at the signature of combine():
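The abstract method declared by DataJoinReducerBase looks like this:

```java
protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);
```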
An array of tags and an array of values represent the combination. The size of those two arrays is guaranteed to be the same and equal to the number of tagged records in the combination. Furthermore, the tags are always in sorted order.
As tags correspond to the data sources, in the canonical case of joining two data sources, the tags array to combine() won’t be longer than two. Figure 5.2 shows combine() being called twice. For the left side, the tags and values arrays are like this:
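The array contents were not reproduced here; using the hypothetical records from our running example, they would be along these lines:

```
tags   = { "Customers", "Orders" }
values = { TaggedWritable("3,Jose Madriz,281-330-8004"),
           TaggedWritable("3,A,12.95,02-Jun-2008") }
```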
DataJoinReducerBase, like any reducer, outputs key/value pairs. For each legal combination, the key is always the join key and the value is the output of combine(). Note that the join key is still present in each element of the values array. The combine() method should strip the join key out of those elements before concatenating them. Otherwise the join key will appear multiple times in one output record.
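The stripping step is plain string manipulation and can be illustrated without Hadoop. The helper below (class and method names invented for the sketch) removes the join key, which sits before the first comma in each record, and concatenates the remainders; the reducer then emits the join key once, as the output key:

```java
// Plain-Java illustration of stripping the join key from each record of a
// combination before concatenation. The join key itself is emitted
// separately as the reducer's output key.
public class JoinKeyStrip {

    static String merge(String[] records) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < records.length; i++) {
            if (i > 0) out.append(",");
            // drop everything up to and including the first comma
            out.append(records[i].split(",", 2)[1]);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // hypothetical Customers and Orders records for join key 3
        String[] combo = {"3,Jose Madriz,281-330-8004", "3,A,12.95,02-Jun-2008"};
        System.out.println(merge(combo));
        // → Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
    }
}
```

Prefixing the result with the join key and a comma yields the kind of output record shown in listing 5.2.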