Wednesday, December 24, 2014

[ In Action ] Ch5. Advanced MapReduce: Creating a Bloom filter (Part5)

Preface 
If you use Hadoop for batch processing of large data sets, your data-intensive computing needs probably include transaction-style processing as well. We won’t cover all the techniques for running real-time distributed data processing (caching, sharding, etc.). They aren’t necessarily Hadoop-related and are well beyond the scope of this book. One lesser-known tool for real-time data processing is the Bloom filter, a compact summary of a data set that makes other data processing techniques more efficient. When that data set is big, Hadoop is often called in to generate the Bloom filter representation. As we mentioned earlier, a Bloom filter is also sometimes used for data joining within Hadoop itself. As a data processing expert, you’ll be well rewarded for having the Bloom filter in your bag of tricks. 

What does a Bloom filter do? 
At its most basic, a Bloom filter object supports two methods: add() and contains(). These two methods work much like their counterparts in the Java Set interface. The method add() adds an object to the set, and the method contains() returns a Boolean true/false value denoting whether an object is in the set or not. For a Bloom filter, though, contains() doesn’t always give an accurate answer. It has no false negatives: if contains() returns false, you can be sure that the set doesn’t contain the queried object. It does, however, have a small probability of false positives: contains() can return true for some objects not in the set. The probability of false positives depends on the number of elements in the set and on some configuration parameters of the Bloom filter itself. 

The major benefit of a Bloom filter is that its size, in number of bits, is constant and is set upon initialization. Adding more elements to a Bloom filter doesn’t increase its size. It only increases the false positive rate. A Bloom filter also has another configuration parameter to denote the number of hash functions it uses. We’ll discuss the reason for this parameter and how the hash functions are used later when we discuss the Bloom filter’s implementation. For now, its main implication is that it affects the false positive rate. The false positive rate is approximated by the equation
(1 – exp(-kn/m))^k

where k is the number of hash functions used, m is the number of bits used to store the Bloom filter, and n is the number of elements to be added to the Bloom filter. In practice, m and n are determined by the requirements of the system, so k is chosen to minimize the false positive rate given m and n, which (after a little calculus) is

k = (m/n) ln 2 ≈ 0.7 (m/n)

The false positive rate at this optimal k is 0.6185^(m/n), and because k has to be an integer, the achieved rate will only approximate it. From a design point of view, one should think in terms of m/n, the number of bits per element, rather than m alone. For example, suppose we have to store a set containing ten million URLs (n = 10,000,000). Allocating 8 bits per URL (m/n = 8) requires a 10 MB Bloom filter (m = 80,000,000 bits). This Bloom filter will have a false positive rate of (0.6185)^8, or about 2 percent. If we were to implement the Set class by storing the raw URLs, and the average URL length were 100 bytes, we would need 1 GB. The Bloom filter shrinks the storage requirement by two orders of magnitude, at the expense of only a 2 percent false positive rate! A slight increase in the storage allocated to the Bloom filter reduces the false positive rate further. At 10 bits per URL, the Bloom filter takes up only 12.5 MB and has a false positive rate of only 0.8 percent. 
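
As a quick check on this arithmetic, the short standalone snippet below (not from the book; the class and variable names are ours) computes the optimal k and the resulting false positive rate for the 10-million-URL example:

    public class BloomFilterSizing {
        public static void main(String[] args) {
            long n = 10000000L;              // number of elements (URLs)
            long m = 8 * n;                  // bits allocated, i.e., m/n = 8
            // optimal number of hash functions: k = (m/n) ln 2, rounded to an integer
            int k = (int) Math.round((double) m / n * Math.log(2));
            // false positive rate: (1 - exp(-kn/m))^k
            double fpRate = Math.pow(1.0 - Math.exp(-(double) k * n / m), k);
            System.out.printf("k = %d, false positive rate = %.4f%n", k, fpRate);
            // prints k = 6 and a rate of about 0.0216, i.e., roughly 2 percent as stated above
        }
    }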

In summary, the signature of our Bloom filter class will look like the following: 
    class BloomFilter<E> {
        public BloomFilter(int m, int k) { ... }
        public void add(E obj) { ... }
        public boolean contains(E obj) { ... }
    }
Implementing a Bloom filter 
Conceptually the implementation of a Bloom filter is quite straightforward. We describe its implementation in a single system first before implementing it using Hadoop in a distributed way. The internal representation of a Bloom filter is a bit array of size m. We have k independent hash functions, where each hash function takes an object as input and outputs an integer between 0 and m-1. We use the integer output as an index into the bit array. When we “add” an element to the Bloom filter, we use the hash functions to generate k indexes into the bit array. We set the k bits to 1. Figure 5.3 shows what happens when we add several objects (x, y, and z) over time, in a Bloom filter that uses three hash functions. Note that a bit will be set to 1 regardless of its previous state. The number of 1s in the bit array can only grow. 
[Figure 5.3 Adding objects x, y, and z over time to a Bloom filter that uses three hash functions]

When an object comes in and we want to check whether it has been previously added to the Bloom filter, we use the same k hash functions to generate the bit array indexes as we would when adding the object. Now we check whether all k of those bits in the bit array are 1s. If they are, we return true and claim that the Bloom filter contains the object. Otherwise we return false. If the object has in fact been added before, the Bloom filter will necessarily return true; there are no false negatives (returning false when the object is truly in the set). However, the k bits corresponding to the queried object can all be 1 even though the object has never been added, because adding other objects may have set those bits. That is what leads to false positives. 
    package ch5.bf;

    import java.util.BitSet;

    public class BloomFilter<E> {
        private BitSet bf;

        public void add(E obj) {
            int[] indexes = getHashIndexes(obj);
            for (int index : indexes) {
                bf.set(index);
            }
        }

        public boolean contains(E obj) {
            int[] indexes = getHashIndexes(obj);
            for (int index : indexes) {
                if (bf.get(index) == false) {
                    return false;
                }
            }
            return true;
        }

        protected int[] getHashIndexes(E obj) { return null; }  // stub; implemented in listing 5.4
    }
To implement getHashIndexes() such that it works truly as k independent hash functions is nontrivial. Instead, in our Bloom filter implementation in listing 5.4, we use a hack to generate k indexes that are roughly independent and uniformly distributed. The getHashIndexes() method seeds the Java Random number generator with an MD5 hash of the object and then takes k “random” numbers as indexes. The Bloom filter class would benefit from a more rigorous implementation of getHashIndexes(), but our hack suffices for illustration purposes. 
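
If you want something closer to k truly independent hash functions, one common alternative is double hashing: derive two base hashes h1 and h2 from the digest and use (h1 + i*h2) mod m as the i-th index. The following is only a sketch under our own naming (it is not the book’s code):

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public class DoubleHashingIndexes {
        // Split the 16-byte MD5 digest into two 8-byte base hashes and combine them
        // as h1 + i*h2 (mod bitArraySize) to approximate k independent hash functions.
        static int[] hashIndexes(Object obj, int numHashFunc, int bitArraySize) {
            try {
                byte[] d = MessageDigest.getInstance("MD5")
                        .digest(obj.toString().getBytes(StandardCharsets.UTF_8));
                long h1 = 0, h2 = 0;
                for (int i = 0; i < 8; i++) {
                    h1 = (h1 << 8) | (d[i] & 0xFF);
                    h2 = (h2 << 8) | (d[i + 8] & 0xFF);
                }
                int[] indexes = new int[numHashFunc];
                for (int i = 0; i < numHashFunc; i++) {
                    long mod = (h1 + (long) i * h2) % bitArraySize;
                    indexes[i] = (int) (mod < 0 ? mod + bitArraySize : mod);
                }
                return indexes;
            } catch (NoSuchAlgorithmException e) {
                throw new RuntimeException(e);  // MD5 is a standard algorithm, so this should not happen
            }
        }
    }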

An ingenious way of creating a Bloom filter for the union of two sets is to OR the bit arrays of the two sets’ individual Bloom filters. Since adding an object only sets certain bits in the bit array to 1, it’s easy to see why this union rule holds: 
    public void union(BloomFilter<E> other) {
        bf.or(other.bf);
    }
We’ll be exploiting this union trick to build Bloom filters in a distributed fashion. Each mapper will build a Bloom filter based on its own data split. We’ll send the Bloom filters to a single reducer, which will take a union of them and record the final output. 

Because the Bloom filter will be shuffled around as the mappers’ output, the BloomFilter class has to implement the Writable interface, which consists of the methods write() and readFields(). For our purposes these methods transform between the internal BitSet representation and a byte array, so that the data can be serialized to DataInput/DataOutput. The final code is in listing 5.4. 
- Listing 5.4 Basic Bloom filter implementation 
    package ch5.bf;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.BitSet;
    import java.util.Random;

    import org.apache.hadoop.io.Writable;

    public class BloomFilter<E> implements Writable {
        private BitSet bf;
        private int bitArraySize = 100000000;  // m: size of the bit array (a multiple of 8)
        private int numHashFunc = 6;           // k: number of hash functions

        public BloomFilter() {
            bf = new BitSet(bitArraySize);
        }

        public void add(E obj) {
            int[] indexes = getHashIndexes(obj);
            for (int index : indexes) {
                bf.set(index);
            }
        }

        public boolean contains(E obj) {
            int[] indexes = getHashIndexes(obj);
            for (int index : indexes) {
                if (bf.get(index) == false) {
                    return false;
                }
            }
            return true;
        }

        public void union(BloomFilter<E> other) {
            bf.or(other.bf);
        }

        // Hack: seed Java's Random with an MD5 hash of the object and draw k "random"
        // indexes. Not truly independent hash functions, but sufficient for illustration.
        protected int[] getHashIndexes(E obj) {
            int[] indexes = new int[numHashFunc];
            long seed = 0;
            byte[] digest;
            try {
                MessageDigest md = MessageDigest.getInstance("MD5");
                md.update(obj.toString().getBytes());
                digest = md.digest();
                for (int i = 0; i < 6; i++) {
                    // build a long seed from the first 6 bytes of the digest
                    seed = seed ^ (((long) digest[i] & 0xFF)) << (8 * i);
                }
            } catch (NoSuchAlgorithmException e) {
                e.printStackTrace();
            }
            Random gen = new Random(seed);
            for (int i = 0; i < numHashFunc; i++) {
                indexes[i] = gen.nextInt(bitArraySize);
            }
            return indexes;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // serialize the BitSet as a packed byte array
            int byteArraySize = bitArraySize / 8;
            byte[] byteArray = new byte[byteArraySize];
            for (int i = 0; i < byteArraySize; i++) {
                byte nextElement = 0;
                for (int j = 0; j < 8; j++) {
                    if (bf.get(8 * i + j)) {
                        nextElement |= 1 << j;
                    }
                }
                byteArray[i] = nextElement;
            }
            out.write(byteArray);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            int byteArraySize = bitArraySize / 8;
            byte[] byteArray = new byte[byteArraySize];
            in.readFully(byteArray);
            bf.clear();  // Hadoop reuses Writable instances, so reset before deserializing
            for (int i = 0; i < byteArraySize; i++) {
                byte nextByte = byteArray[i];
                for (int j = 0; j < 8; j++) {
                    if (((int) nextByte & (1 << j)) != 0) {
                        bf.set(8 * i + j);
                    }
                }
            }
        }
    }
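Before wiring this class into a MapReduce job, here is a minimal local usage sketch (our own, not from the book) that illustrates the no-false-negative behavior described earlier:

    package ch5.bf;

    public class BloomFilterLocalTest {
        public static void main(String[] args) {
            BloomFilter<String> filter = new BloomFilter<String>();
            filter.add("http://example.com/a");
            filter.add("http://example.com/b");
            System.out.println(filter.contains("http://example.com/a"));  // always true: no false negatives
            System.out.println(filter.contains("http://example.com/z"));  // false, except in the rare case of a false positive
        }
    }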
Next we’ll create the MapReduce program that builds a Bloom filter using Hadoop. As we said earlier, each mapper will instantiate a BloomFilter object and add the key of each record in its split into its BloomFilter instance. (We’re using the key of the record to follow our data joining example.) We’ll create a union of the mappers’ BloomFilters by collecting them into a single reducer. 

The driver for the MapReduce program is straightforward. Our mappers will output a key/value pair where the value is a BloomFilter instance. 
    job.setOutputValueClass(BloomFilter.class);
The output key will not matter in terms of partitioning because we only have a single reducer. 
    job.setNumReduceTasks(1);
We want our reducer to output the final BloomFilter as a binary file. Hadoop’s standard OutputFormats write text files or SequenceFiles and always assume key/value pairs. Our reducer therefore won’t use Hadoop’s MapReduce output mechanism; instead we’ll write the result out to a file ourselves. 
    job.setOutputFormatClass(NullOutputFormat.class);
Recall that our strategy for the mapper is to build a single Bloom filter over the entire split and output it at the end of the split to the reducer. Given that the map() method of the MapClass has no state information about which record in the split it’s processing, we output the BloomFilter in the cleanup() method to ensure that all the records in the split have been read. The MapClass looks like: 
    package ch5.bf;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MapClass extends Mapper<Text, Text, IntWritable, BloomFilter<String>> {
        BloomFilter<String> bmf = new BloomFilter<String>();
        IntWritable one = new IntWritable(1);

        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // add the record's key into this split's Bloom filter
            bmf.add(key.toString());
        }

        @Override
        protected void cleanup(Context context) {
            try {
                // emit the whole filter once, after every record in the split has been seen
                context.write(one, bmf);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
The BloomFilters generated by all the mappers are sent to a single reducer. The reduce() method in the Reduce class will do a Bloom filter union of all of them. 
- Reducer 
    package ch5.bf;

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class Reduce extends Reducer<IntWritable, BloomFilter<String>, Text, Text> {
        BloomFilter<String> bf = new BloomFilter<String>();

        @Override
        public void reduce(IntWritable key, Iterable<BloomFilter<String>> values, Context context)
                throws IOException, InterruptedException {
            // OR together the per-split filters produced by the mappers
            for (BloomFilter<String> b : values) {
                bf.union(b);
            }
        }

        @Override
        protected void cleanup(Context context) {
            try {
                // write the final filter to a file named "bloomfilter" in the job's output directory
                Path file = new Path(context.getConfiguration().get("mapred.output.dir") + "/bloomfilter");
                FSDataOutputStream out = file.getFileSystem(context.getConfiguration()).create(file);
                bf.write(out);
                out.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
As we mentioned earlier, we want the final BloomFilter to be written out in a file format of our own rather than through one of Hadoop’s OutputFormats. We already set the reducer’s output format to NullOutputFormat in the driver to turn off that output mechanism. The cleanup() method in the Reduce class uses Hadoop’s file I/O to write out our BloomFilter in binary to a file in HDFS. The complete code is in listing 5.5. 
- Listing 5.5 A MapReduce program (Driver class) 
    package ch5.bf;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class BloomFilterMR extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            Job job = new Job(getConf());
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);

            FileInputFormat.setInputPaths(job, in);
            FileOutputFormat.setOutputPath(job, out);
            job.setJarByClass(BloomFilterMR.class);
            job.setJobName("Bloom Filter");
            job.setMapperClass(MapClass.class);
            job.setReducerClass(Reduce.class);
            job.setNumReduceTasks(1);

            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(NullOutputFormat.class);
            // The mapper emits IntWritable keys, so declare the map output key class
            // explicitly; the map output value class defaults to the output value class below.
            job.setMapOutputKeyClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BloomFilter.class);
            job.getConfiguration().set("key.value.separator.in.input.line", ",");

            boolean success = job.waitForCompletion(true);
            return (success ? 0 : 1);
        }

        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new BloomFilterMR(), args);
            System.exit(res);
        }
    }
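For completeness, a downstream consumer (for example, the map-side filtering mapper from Part 4) could load the filter back from HDFS roughly as follows. This is only a sketch under our own naming; the one detail taken from the code above is the "/bloomfilter" file name written by Reduce.cleanup():

    package ch5.bf;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.Path;

    public class BloomFilterLoader {
        // Reads the serialized filter that Reduce.cleanup() wrote into the job's output directory.
        public static BloomFilter<String> load(Configuration conf, String outputDir) throws Exception {
            Path file = new Path(outputDir + "/bloomfilter");
            FSDataInputStream in = file.getFileSystem(conf).open(file);
            BloomFilter<String> filter = new BloomFilter<String>();
            filter.readFields(in);  // FSDataInputStream implements DataInput
            in.close();
            return filter;
        }
    }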
Bloom filter in Hadoop version 0.20+ 
Hadoop version 0.20 ships with its own Bloom filter class (org.apache.hadoop.util.bloom.BloomFilter). It plays a supporting role to some of the new classes introduced in version 0.20, and it will likely stay around in future versions as well. It functions much like our BloomFilter class in listing 5.4, although it’s much more rigorous in its implementation of the hash functions. As a built-in class, it can be a good choice for a semijoin within Hadoop. But it’s not easy to separate this class from the Hadoop framework to use it as a standalone class, so if you’re building a Bloom filter for non-Hadoop applications, Hadoop’s built-in BloomFilter may not be appropriate. 
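
Assuming the 0.20-era API, where the filter is constructed with a bit-vector size, a hash count, and a hash type, and where add() and membershipTest() take Key objects, basic usage looks roughly like the snippet below. Treat it as a sketch and check the Javadoc of your Hadoop distribution, since details may differ between versions:

    import org.apache.hadoop.util.bloom.BloomFilter;
    import org.apache.hadoop.util.bloom.Key;
    import org.apache.hadoop.util.hash.Hash;

    public class BuiltInBloomFilterExample {
        public static void main(String[] args) {
            // vectorSize = number of bits, nbHash = number of hash functions
            BloomFilter filter = new BloomFilter(100000000, 6, Hash.MURMUR_HASH);
            filter.add(new Key("http://example.com/a".getBytes()));
            System.out.println(filter.membershipTest(new Key("http://example.com/a".getBytes())));  // true
            System.out.println(filter.membershipTest(new Key("http://example.com/z".getBytes())));  // false, barring a false positive
        }
    }

Like our own class, Hadoop’s BloomFilter implements Writable, so it can be serialized and shuffled in the same way.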

Supplement 
Chaining MapReduce jobs (Part1) 
Joining data from different sources (Part2) 
Replicated joins using DistributedCache (Part3) 
Reduce-side join with map-side filtering (Part4) 
