2011年10月20日 星期四

[ Java Crawler ] 設計爬蟲佇列

前言 :
透過 之前的介紹 可以看出, 網路爬蟲最關鍵的資料結構是 URL 佇列 - 通常我們稱之為爬蟲佇列. 之前一直使用記憶體來存放佇列的資料, 例如鏈結串列或是佇列來實現 URL 佇列. 但是網路中我們需要抓取的連結成千上萬, 在一些大型搜尋引擎中 (例如百度與Google), 大概都有幾十億的 URL 處理量. 因此記憶體並不使用於大型爬蟲. 可能的一種方法就是使用資料庫的方案. 這裡將介紹一種非常流行的資料庫 - Berkeley DB, 最後介紹一個成熟的開放原代碼爬蟲軟體 - Heritrix 是如何實現爬蟲佇列的.

爬蟲佇列 :
爬蟲佇列是用來儲存 URL 的資料結構. 在大型爬蟲應用中, 建構通用的, 可以伸縮的爬蟲佇列非常重要. 數以十億的 URL 位址使用記憶體的鏈結串列或者佇列來儲存顯然不夠, 因此需要一種資料結構而該結構具有以下幾個特點 : 

* 能儲存巨量資料, 當資料超過記憶體限制時, 能夠把它固定在硬碟上.
* 存取資料速度非常快.
* 能夠支援多執行緒存取 (多執行緒技術能大幅提升爬蟲的性能.)

結合上面 3 點對儲存速度的要求, 使 Hash 成為儲存結構的不二選擇. 通常在進行 Hash 儲存時候, key 值都選取 URL 字串, 但是為了更節省空間, 通常會對 URL 進行壓縮. 常用的壓縮演算法是 MD5 壓縮演算法. 而在 Java 中, java.security.MessageDigest 中已經定義了 MD5 的計算, 只需要簡單地呼叫即可得到 MD5 的 128 位元整數. 然後將此 128 位元 (16位元組) 轉換成 16 進位表示即可. 下面是一段 Java 實現 MD5 壓縮的程式碼 :
- MD5 壓縮演算法程式碼 :
  1. package john.test;  
  2.   
  3. public class MD5 {  
  4.     public static String getMD5(byte[] source)  
  5.     {  
  6.         String s = null;  
  7.         char hexDigits[] = {'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'}; // 用來將位元組傳換成十六進位表示的字元.  
  8.         try  
  9.         {  
  10.             java.security.MessageDigest md = java.security.MessageDigest.getInstance("MD5");  
  11.             md.update(source);  
  12.             byte tmp[] = md.digest();  
  13.             char str[] = new char[16*2]; // one byte consist of two hex.  
  14.             int k = 0;  
  15.               
  16.             for(int i=0; i<16; i++)  
  17.             {  
  18.                 byte b = tmp[i];  
  19.                 str[k++] = hexDigits[b>>>4 & 0xf]; // 取高位元組並進行邏輯右移, 並取得高位元組的 Hex string  
  20.                 str[k++] = hexDigits[b & 0xf]; // 取低位元組的 Hex string               
  21.             }  
  22.             s = new String(str);  
  23.         }   
  24.         catch(Exception e)  
  25.         {  
  26.             e.printStackTrace();  
  27.         }  
  28.         return s;  
  29.     }  
  30. }  

Hash 儲存的 Value 值通常會對 URL 和相關的資訊進行封裝, 封裝成為一個物件進行儲存. 綜合前面的分析, 選擇一個可以執行緒安全, 使用 Hash 儲存並且能夠面對巨量資料的資料庫方案是儲存 URL 最合適的資料結構. 因此由 Oracle 公司開發的資料庫產品 Berkeley DB 就成了下一節介紹的主題.

使用 Berkeley DB 建構爬蟲佇列 :
Berkeley DB 是一個嵌入式資料庫, 它適用於管理巨量, 簡單的資料. 例如 Google 用 Berkeley DB HA (High Availability) 來管理它們的帳戶資訊. Motorola 在它的無線產品用 Berkeley DB 追蹤移動單元. HP, Microsoft 等也都是它的大客戶. Berkeley DB 不能完全取代關聯式資料庫, 但在某些方面它卻令關聯式資料庫望塵莫及.

關鍵字/資料 (key/value) 是 Berkeley DB 用來進行資料庫管理的基礎. 每個 key/value 對構成一條紀錄. 而整個資料庫實際上就是由許多這樣的結構單元所構成. 透過這種方式, 開發人員在使用 Berkeley DB 提供的 API 存取資料庫時, 只需要提供關鍵字就能夠存取到相應的資料. 當然也可以提供 key 與部分 Data 來查詢符合條件的相近資料.

Berkeley DB 底層採用二元數, 可以看成能儲存大量資料的 HashMap. Berkeley DB 簡稱 BDB. C++ 的版本首先出現, 然後在此基礎上又實現的 Java 本機版本. Berkeley DB 是透過環境物件 EnvironmentConfig 來對資料庫進行管理, 每個 EnvironmentConfig 物件可以管理多個資料庫. 新增一個 EnvironmentConfig 的程式碼如下 :
  1. File envDir = new File("dbwrkdir");  
  2. EnvironmentConfig envConfig = new EnvironmentConfig();  
  3. envConfig.setTranctional(false);  
  4. envConfig.setAllowCreate(true);  
  5. Environment exampleEnv = new Environment(envDir, envConfig);  
其中 envDir 是使用者指定的一個目錄. 只要是同一個 Environment 指定的資料庫的資料檔案與紀錄檔案, 都會放在這個目錄下. Environment 也是一種資源, 使用完畢後需要關閉 :
  1. exampleEnv.sync();  
  2. exampleEnv.close();  
  3. exampleEnv = null;  
建立好環境後, 就可以開始用它建立資料庫了. 用 Berkeley DB 建立資料庫時, 需要指定資料庫的屬性, 就好比在 Oracle 中建立資料庫要指定 java_pool, buffer_size 等屬性一樣. 在 Berkeley DB 使用 DatabaseConfig 來管理一個實際的 Database :
  1. String databaseName = "ToDoTaskList.db";  
  2. DatabaseConfig dbConfig = new DatabaseConfig();  
  3. dbConfig.setAllowCreate(true);  
  4. dbConfig.setTransactional(false);  
  5.   
  6. // 開啟用來儲存資訊的資料庫  
  7. // 用來儲存資訊的資料庫不允許儲存重複的關鍵字  
  8. dbConfig.setSortedDuplicates(false);  
  9. Database myClassDb = exampleEnv.openDatabase(null"classDb", dbConfig);  
  10. // 初始化用來儲存序列化物件的 catalog 類別  
  11. catalog = new StoredClassCatalog(myClassDb);  
  12. TupleBinding keyBinding = TupleBinding.getPrimitiveBinding(String.class);  
  13. // 把 Value 作為物件的序列化方式儲存  
  14. SerialBinding valueBinding = new SerialBinding(catalog, NewsSource.class);  
  15. store = exampleEnv.openDatabase(null, databaseName, dbConfig);  
當資料庫建立起來後, 就要確定資料庫裡儲存的資料類型 (也就是確定 key 和 value 的值). Berkeley DB 資料類型是使用 EntryBinding 物件來確定 :
  1. EntryBinding keyBinding = new SerialBinding(javaCatalog, String.class);  
其中 SerialBinding 表示這個物件能夠序列化到磁碟上, 因此建構函數的第二個參數一定要是實現了序列化界面的物件. 最後我們來建立一個以 Berkeley DB 為底層的資料結構的 Map :
  1. // 建立資料儲存的映象檢視  
  2. this.map = new StoredStoredMap(store, keyBinding, valueBinding, true);   
使用 Berkeley DB 建立爬蟲佇列範例 :
上一節我們說明了 Berkeley DB 的基礎知識, 這一節要講述如何使用 Berkeley DB 連建立一個完整的爬蟲佇列. 首先 Berkeley DB 儲存是一個 key/value 的結構, 並且 key 和 value 物件都要實現 Java 序列化界面. 因此我們先來建構 value 物件, 即一個封裝很多重要屬性的 URL 類別 :
- CrawlUrl.java : Berkeley DB 中儲存的 Value 類別
  1. package john.crawler.entity;  
  2.   
  3. import java.io.Serializable;  
  4. import java.util.Date;  
  5.   
  6. import com.sleepycat.je.utilint.Timestamp;  
  7.   
  8. public class CrawlUrl implements Serializable{  
  9.     private static final long serialVersionUID = -9211654572201720122L;  
  10.       
  11.     private String                  oriUrl; // 原始 URL 的值, 主機部分是功能變數名稱  
  12.     private String                  url; // URL 的值, 主機部分是 IP, 為防止重複主機的出現.  
  13.     private int                     urlNo; // URL NUM  
  14.     private int                     statusCode; // URL 傳回結果碼  
  15.     private int                     hitNum; // 此 URL 被其他文章參考次數  
  16.     private String                  charSet; // 使用 chartset  
  17.     private String                  abstractText; // 文章摘要  
  18.     private String                  author; // 作者  
  19.     private int                     weight; // 文章權重  
  20.     private String                  description; // 文章描述  
  21.     private int                     fileSize; // 文章大小  
  22.     private Timestamp               lastUpdateTime; // 最後修改時間  
  23.     private Date                    timeToLive; // 過期時間  
  24.     private String                  title; // 文章名稱  
  25.     private String                  type; // 文章類型  
  26.     private String[]                urlReferences; // 參考連結  
  27.     private int                     layer; // 爬取的階層, 從種子開始依序為第0層, 第1層 ...  
  28.       
  29.     public CrawlUrl(){}  
  30.       
  31.     // Other Getter/Setter  
  32. }  

接著我們寫一個 TODO 表的介面 :
- Frontier.java :
  1. package john.crawler.entity;  
  2.   
  3. public interface Frontier {  
  4.     public CrawlUrl getNext() throws Exception;  
  5.     public boolean putUrl(CrawlUrl url) throws Exception;  
  6.     //public boolean visited(CrawlUrl url);  
  7. }  

使用一個抽象類別來封裝對 Berkeley DB 的操作 :
- AbstractFrontier.java :
  1. package john.crawler.bdb;  
  2.   
  3. import java.io.File;  
  4. import java.io.FileNotFoundException;  
  5.   
  6. import com.sleepycat.bind.serial.StoredClassCatalog;  
  7. import com.sleepycat.je.Database;  
  8. import com.sleepycat.je.DatabaseConfig;  
  9. import com.sleepycat.je.DatabaseException;  
  10. import com.sleepycat.je.Environment;  
  11. import com.sleepycat.je.EnvironmentConfig;  
  12.   
  13. public abstract class AbstractFrontier {  
  14.     private Environment env;  
  15.     private static final String CLASS_CATALOG = "java_class_catalog";  
  16.     protected StoredClassCatalog javaCatalog;  
  17.     protected Database catalogDatabase;  
  18.     protected Database database;  
  19.       
  20.     public AbstractFrontier(String homeDirectory) throws DatabaseException, FileNotFoundException  
  21.     {  
  22.         // 開啟 env  
  23.         System.out.printf("\t[AbstractFrontier] Open Environment in: %s", homeDirectory);  
  24.         EnvironmentConfig envConfig = new EnvironmentConfig();  
  25.         envConfig.setTransactional(true);  
  26.         envConfig.setAllowCreate(true);  
  27.         env = new Environment(new File(homeDirectory), envConfig);  
  28.         // 設定 DatabaseConfig  
  29.         DatabaseConfig dbConfig = new DatabaseConfig();  
  30.         dbConfig.setTransactional(true);  
  31.         dbConfig.setAllowCreate(true);  
  32.         // 開啟  
  33.         catalogDatabase = env.openDatabase(null, CLASS_CATALOG, dbConfig);  
  34.         javaCatalog = new StoredClassCatalog(catalogDatabase);  
  35.         // 設定 DatabaseConfig  
  36.         DatabaseConfig dbConfig0 = new DatabaseConfig();  
  37.         dbConfig0.setTransactional(true);  
  38.         dbConfig0.setAllowCreate(true);  
  39.         // 開啟  
  40.         database = env.openDatabase(null"URL", dbConfig0);  
  41.     }  
  42.       
  43.     // 關閉資料庫, 關閉環境  
  44.     public void close() throws DatabaseException  
  45.     {  
  46.         database.close();  
  47.         javaCatalog.close();  
  48.         env.close();  
  49.     }  
  50.   
  51.     // put 方法  
  52.     protected abstract void put(Object key, Object value);  
  53.     // get 方法  
  54.     protected abstract Object get(Object key);  
  55.     // delete 方法  
  56.     protected abstract Object delete(Object key);  
  57. }  

實現真正的 TODO 表 :
- BDBFrontier.java :
  1. package john.crawler.bdb;  
  2.   
  3. import java.io.FileNotFoundException;  
  4. import java.util.Map.Entry;  
  5. import java.util.Set;  
  6.   
  7. import john.crawler.Frontier;  
  8. import john.crawler.entity.CrawlUrl;  
  9.   
  10. import com.sleepycat.bind.EntryBinding;  
  11. import com.sleepycat.bind.serial.SerialBinding;  
  12. import com.sleepycat.collections.StoredMap;  
  13. import com.sleepycat.je.DatabaseException;  
  14.   
  15. public class BDBFrontier extends AbstractFrontier implements Frontier{  
  16.     private StoredMap pendingUrlDB = null;  
  17.       
  18.     public BDBFrontier(String homeDirectory) throws DatabaseException,  
  19.             FileNotFoundException {  
  20.         super(homeDirectory);  
  21.         EntryBinding keyBinding = new SerialBinding(javaCatalog, String.class);  
  22.         EntryBinding valueBinding = new SerialBinding(javaCatalog, CrawlUrl.class);  
  23.         pendingUrlDB = new StoredMap(database, keyBinding, valueBinding, true);  
  24.     }  
  25.   
  26.     @Override  
  27.     protected void put(Object key, Object value) {  
  28.         pendingUrlDB.put(key, value);         
  29.     }  
  30.   
  31.     @Override  
  32.     protected Object get(Object key) {  
  33.         return pendingUrlDB.get(key);  
  34.     }  
  35.   
  36.     @Override  
  37.     protected Object delete(Object key) {  
  38.         return pendingUrlDB.remove(key);  
  39.     }  
  40.   
  41.     @Override  
  42.     public CrawlUrl getNext() throws Exception {  
  43.         CrawlUrl result = null;  
  44.         if(!pendingUrlDB.isEmpty())  
  45.         {  
  46.             Set entrys = pendingUrlDB.entrySet();  
  47.             System.out.printf("\t[BDBFrontier] getNext()->%s\n", entrys);  
  48.             Entry entry = (Entry)pendingUrlDB.entrySet().iterator().next();  
  49.             result = entry.getValue();  
  50.             delete(entry.getKey());  
  51.         }  
  52.         return result;  
  53.     }  
  54.   
  55.     @Override  
  56.     public boolean putUrl(CrawlUrl url) throws Exception {  
  57.         put(url.getOriUrl(), url);  
  58.         return true;  
  59.     }  
  60.   
  61.     // 根據 URL 計算鍵值, 可以使用各種壓縮演算法, 如 MD5 etc.  
  62.     private String caculateUrl(String url){return url;}  
  63.       
  64.     // 測試函數  
  65.     public static void main(String args[])  
  66.     {  
  67.         BDBFrontier dbFrontier = null;  
  68.         try  
  69.         {  
  70.             dbFrontier = new BDBFrontier("dbwdir");  
  71.             CrawlUrl url = new CrawlUrl();  
  72.             url.setOriUrl("http://puremonkey2010.blogspot.com/");  
  73.             dbFrontier.putUrl(url);  
  74.             System.out.println(((CrawlUrl)dbFrontier.getNext()).getOriUrl());  
  75.               
  76.         }  
  77.         catch(Exception e)  
  78.         {  
  79.             e.printStackTrace();  
  80.         }  
  81.         finally  
  82.         {  
  83.             if(dbFrontier!=null)dbFrontier.close();  
  84.         }  
  85.     }  
  86. }  

以上就是一個使用 Berkeley DB 來實現 TODO 表的完整範例!

使用布隆過濾器建構 Visited 表 :
上一節內容介紹了如何實現 TODO 表, 這一節要來探討如何在一個企業級的搜尋引擎中實現 Visited 表. 在企業級搜尋引擎中, 常用一個稱為 布隆過濾器(Bloom Filter) 的演算法來實現對已經抓取過的 URL 進行過濾. 首先來介紹什麼叫布隆過濾器演算法.

在日常生活中, 包括在設計電腦軟體時, 經常要判斷一個元素是否在一個集合中, 例如在文字處理軟體中, 需要檢查一個英文單字是否拼字正確 (也就是要判斷它是否出現在字典裡); 在網路爬蟲裡, 我們需要知道一個網址是否曾經被存取過等. 最直接的辦法就是將集合中全部的元素存在電腦中, 遇到一個需要辨別元素時, 將它與集合中全部的元素直接比對即可. 一般來說電腦中的集合是用 雜湊表 (Hash Table) 來儲存. 它的好處是快速而準確, 缺點是費儲存空間. 當集合比較小時, 這個問題可以忽略, 但是集合巨大時, 雜湊表儲存效率低的問題就顯現出來了.

比如說一個像 Yahoo, Hotmail 和 Gmail 電子郵件供應商, 總是需要過濾來自發送垃圾郵件人(Spamer)的垃圾郵件. 一個辦法就是記錄下那些垃圾郵件的 E-mail 地址. 由於那些發送者不停註冊新的位址, 全世界少說也有幾十億的垃圾郵件地址, 將它們全部存取來則需要大量記憶體的服務器. 如果用雜湊表, 每儲存一億個 E-mail 位址就需要 1.6GB 的記憶體 (用雜湊表實現是將每一個 E-mail 位置對應成一個八位元組的資訊指紋, 然後將該資訊存入雜湊表, 由於雜湊表的儲存效率一般只有 50%, 因此一個 E-mail 位址需要占用 16 個位元組. 一億個位址大約要 1.6GB). 因此儲存幾十億的郵件地址可能需要上百 GB 的記憶體!

一種稱作布隆過濾器的數學工具, 它只需要雜湊表的1/8 到 1/4 的大小就能解決同樣的問題. 布隆過濾器是由巴頓.布隆於 1970 年提出的. 它實際是一個很長的二進位向量和一系列隨機映射函數. 我們透過上面的例子來說明其工作原理.

假設儲存一億個電子郵件地址, 先建立一個 16 億二進位常數, 即200MB向量, 然後將這16億個二進位位元全部設定為零. 對於每一個電子郵件地址 X, 用 8 個不同的亂數產生器 (F1,F2, ... , F8) 產生 8 個資訊指紋 (f1, f2, ..., f8). 再用一個亂數產生器 G 將這八個資訊指紋映射到 1 到 16 億中的 8 個自然數 g1,g2,...g8. 現在我們把這 8 個位置的二進位位元全部設為1. 當我們對這一億個 E-mail 位置都進行這樣的處理後, 一個針對這些 E-mail 位址的布隆過濾器就建成了, 如下圖所示 :


現在我們來看看布隆過濾器是如何檢測一個可以的電子郵件位址 Y 是否存在於黑名單中. 我們用 8 個亂數產生器 (F1,F2,...F8) 對這個位址產生 8 個資訊指紋 S1, S2, ... S8. 然後將這 8 個指紋對應到布隆過濾器的 8 個二進位位元, 分別是 T1,T2,...,T8. 如果 Y 在黑名單中, 顯然 T1,T2,...,T8 定位的八個二進位一定是1. 這樣在遇到任何黑名單的電子郵件地址時, 我們都能準確地發現.

布隆過濾器絕對不會漏掉任何一個在黑名單中可疑位址. 但是它有一條不足之處, 也就是它有極小可能將一個不在黑名單中的電子郵件地址誤判在黑名單中, 因為剛好有可能某個好的電子郵件正巧對應 8 個都被設定為 1 的二進位位元. 好在這種可能性很小. 我們把它稱為誤識機率. 在上面的例子中誤識機率大概在萬分之一以下. 常見的補救辦法是建立一個小的白名單, 儲存那些可能誤判的郵件地址. 下面是一個布隆過濾器 (Bloom Filter) 的實現 :
- BitSet.java : 位元操作類別
  1. package john.crawler;  
  2.   
  3. public class BitSet {  
  4.     private boolean bits[];  
  5.       
  6.     public BitSet(int len){bits = new boolean[len];}  
  7.       
  8.     public void set(int bit, boolean v){bits[bit]=v;}  
  9.     public boolean get(int bit){return bits[bit];}  
  10. }  

- SimpleBloomFilter.java : 簡易版 Bloom Filter 實現
  1. package john.crawler;  
  2.   
  3. import john.crawler.entity.CrawlUrl;  
  4.   
  5. public class SimpleBloomFilter {  
  6.     private static final int DEFAULT_SIZE = 2 << 24;  // 2^25=32M  
  7.     private static final int[] seeds = new int[]{7,11,13,31,37,61,79,88};  
  8.     private BitSet bits = new BitSet(DEFAULT_SIZE);  
  9.     private SimpleHash[] func = new SimpleHash[seeds.length];  
  10.       
  11.     public SimpleBloomFilter()  
  12.     {  
  13.         for(int i=0; inew SimpleHash(DEFAULT_SIZE, seeds[i]);  
  14.     }  
  15.       
  16.     public void add(CrawlUrl value)  
  17.     {  
  18.         if(value!=null)  
  19.         {  
  20.             add(value.getOriUrl());  
  21.         }  
  22.     }  
  23.       
  24.     public void add(String value)  
  25.     {  
  26.         for(SimpleHash f:func) bits.set(f.hash(value), true);  
  27.     }  
  28.       
  29.     public boolean contains(CrawlUrl value)  
  30.     {  
  31.         return contains(value.getOriUrl());  
  32.     }  
  33.       
  34.     public boolean contains(String value)  
  35.     {  
  36.         if(value==nullreturn false;  
  37.         boolean ret = true;  
  38.         for(SimpleHash f:func)   
  39.         {  
  40.             ret = ret && bits.get(f.hash(value));  
  41.             if(!ret) break;  
  42.         }  
  43.         return ret;  
  44.     }  
  45.       
  46.     public static void main(String args[])  
  47.     {  
  48.         String value = "john@yahoo.com.tw";  
  49.         String value1 = "peter@yahoo.com.tw";  
  50.         SimpleBloomFilter filter = new SimpleBloomFilter();  
  51.         System.out.println(filter.contains(value));  
  52.         filter.add(value);  
  53.         System.out.println(filter.contains(value));  
  54.         System.out.println(filter.contains(value1));  
  55.     }  
  56.       
  57.     public class SimpleHash{  
  58.         private int cap;  
  59.         private int seed;  
  60.           
  61.         public SimpleHash(int cap, int seed)  
  62.         {  
  63.             this.cap = cap;  
  64.             this.seed = seed;  
  65.         }  
  66.           
  67.         public int hash(String value)  
  68.         {  
  69.             int result = 0;  
  70.             int len = value.length();  
  71.             for(int i=0; i
  72.             {  
  73.                 result = seed * result+value.charAt(i);  
  74.             }  
  75.             return (cap-1) & result;  
  76.         }  
  77.     }  
  78. }  

如果想知道需要使用多少位元才能降低誤判機率, 可以從下表所示的儲存項目和位元數比率估計布隆過濾器的誤判率 :


為每個 URL 分配兩個位元組就可以達到千分之幾的誤判. 比較保守的實現是, 為每個 URL 分配 4 個位元組也就是 1:32, 誤判率為 0.00000021167340. 對於 5000 萬數量級的 URL, 布隆過濾器只占 200MB 的空間, 並且建立 Bloom Filter 速度超快, 一次下來不到兩分鐘.

沒有留言:

張貼留言

[Git 常見問題] error: The following untracked working tree files would be overwritten by merge

  Source From  Here 方案1: // x -----删除忽略文件已经对 git 来说不识别的文件 // d -----删除未被添加到 git 的路径中的文件 // f -----强制运行 #   git clean -d -fx 方案2: 今天在服务器上  gi...