2018年10月27日 星期六

[ Java 文章收集 ] Kafka 設計解析之 Kafka 背景及架構介紹

Source from here 
摘要 
Kafka 是由 LinkedIn 開發並開源的分布式消息系統,因其分布式及高吞吐率而被廣泛使用,現已與 Cloudera Hadoop,Apache Storm,Apache Spark 集成。本文介紹了 Kafka 的 創建背景,設計目標,使用消息系統的優勢以及目前流行的消息系統對比。並介紹了 Kafka 的架構,Producer 消息路由,Consumer Group 以及由其實現的不同消息分發方式,Topic & Partition,最後介紹了 Kafka Consumer 為何使用 pull 模式以及 Kafka 提供的三種 delivery guarantee。 

背景介紹 

Kafka 創建背景 
Kafka 是一個消息系統,原本開發自 LinkedIn,用作 LinkedIn 的活動流 (Activity Stream) 和運營數據處理管道 (Pipeline) 的基礎。現在它已被多家不同類型的公司 作為多種類型的數據管道和消息系統使用。 

活動流數據是幾乎所有站點在對其網站使用情況做報表時都要用到的數據中最常規的部分。活動數據包括頁面訪問量 (Page View)、被查看內容方面的信息以及搜索情況等內容。這種數據通常的處理方式是先把各種活動以日誌的形式寫入某種文件,然後周期性地對這些文件進行統計分析。運營數據指的是伺服器的性能數據 (CPU、IO使用率、請求時間、服務日誌等等數據)。運營數據的統計方法種類繁多。 

近年來,活動和運營數據處理已經成為了網站軟體產品特性中一個至關重要的組成部分,這就需要一套稍微更加複雜的基礎設施對其提供支持。 

Kafka 簡介 
Kafka是一種分布式的,基於發布/訂閱的消息系統。主要設計目標如下: 
* 以時間複雜度為 O(1) 的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間複雜度的訪問性能
* 高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒 100K 條以上消息的傳輸
* 支持 Kafka Server 間的消息分區,及分布式消費,同時保證每個 Partition 內的消息順序傳輸
* 同時支持離線數據處理和實時數據處理
* Scale out:支持在線水平擴展

為何使用消息系統 

解耦 
在項目啟動之初來預測將來項目會碰到什麼需求,是極其困難的。消息系統在處理過程中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。 

冗餘 
有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所採用的」插入-獲取-刪除」範式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。 

擴展性 
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。 

靈活性 & 峰值處理能力 
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見;如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。 

可恢復性 
系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復後被處理。 

順序保證 
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。Kafka 保證一個 Partition 內的消息的有序性。 

緩衝 
在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列通過一個緩衝層來幫助任務最高效率的執行———寫入隊列的處理會儘可能的快速。該緩衝有助於控制和優化數據流經過系統的速度。 

異步通信 
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然後在需要的時候再去處理它們。 

常用 Message Queue 對比 

RabbitMQ 
RabbitMQ 是使用 Erlang 編寫的一個開源的消息隊列,本身支持很多的協議:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量級,更適合於企業級的開發。同時實現了 Broker 構架,這意味著消息在發送給客戶端時先在中心隊列排隊。對路由,負載均衡或者數據持久化都有很好的支持。 

Redis 
Redis 是一個基於 Key-Value 對的 NoSQL 資料庫,開發維護很活躍。雖然它是一個 Key-Value 資料庫存儲系統,但它本身支持 MQ 功能,所以完全可以當做一個輕量級的隊列服務來使用。對於 RabbitMQ 和 Redis 的入隊和出隊操作,各執行100萬次,每10萬次記錄一次執行時間。測試數據分為128Bytes、512Bytes、1K和10K四個不同大小的數據。實驗表明:入隊時,當數據比較小時 Redis 的性能要高於 RabbitMQ,而如果數據大小超過了10K,Redis 則慢的無法忍受;出隊時,無論數據大小,Redis 都表現出非常好的性能,而 RabbitMQ 的出隊性能則遠低於 Redis。 

ZeroMQ 
ZeroMQ 號稱最快的消息隊列系統,尤其針對大吞吐量的需求場景。ZMQ 能夠實現 RabbitMQ 不擅長的高級/複雜的隊列,但是開發人員需要自己組合多種技術框架,技術上的複雜度是對這 MQ 能夠應用成功的挑戰。ZeroMQ 具有一個獨特的非中間件的模式,你不需要安裝和運行一個消息伺服器或中間件,因為你的應用程式將扮演這個伺服器角色。你只需要簡單的引用 ZeroMQ 程序庫,可以使用 NuGet 安裝,然後你就可以愉快的在應用程式之間發送消息了。但是 ZeroMQ 僅提供非持久性的隊列,也就是說如果宕機,數據將會丟失。其中,Twitter 的 Storm 0.9.0 以前的版本中默認使用 ZeroMQ 作為數據流的傳輸 (Storm 從 0.9 版本開始同時支持 ZeroMQ 和 Netty 作為傳輸模塊)。 

ActiveMQ 
ActiveMQ 是 Apache 下的一個子項目。 類似於 ZeroMQ,它能夠以代理人和點對點的技術實現隊列。同時類似於 RabbitMQ,它少量代碼就可以高效地實現高級應用場景。 

Kafka/Jafka 
Kafka 是 Apache下的一個子項目,是一個高性能跨語言分布式發布/訂閱消息隊列系統,而 Jafka 是在 Kafka 之上孵化而來的,即 Kafka 的一個升級版。具有以下特性:快速持久化,可以在 O(1) 的系統開銷下進行消息持久化; 高吞吐,在一台普通的伺服器上既可以達到 10W/s 的吞吐速率;完全的分布式系統,Broker、Producer、Consumer 都原生自動支持分布式,自動實現負載均衡;支持 Hadoop 數據並行加載,對於像 Hadoop 的一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka 通過 Hadoop 的並行加載機制統一了在線和離線的消息處理。Apache Kafka 相對於 ActiveMQ 是一個非常輕量級的消息系統,除了性能非常好之外,還是一個工作良好的分布式系統。 

Kafka 架構 

Terminology 

Broker 
Kafka集群包含一個或多個伺服器,這種伺服器被稱為 broker 

Topic 
每條發布到 Kafka 集群的消息都有一個類別,這個類別被稱為 Topic。(物理上不同 Topic 的消息分開存儲,邏輯上一個 Topic 的消息雖然保存於一個或多個 broker 上但用戶只需指定消息的 Topic 即可生產或消費數據而不必關心數據存於何處
 

Partition 
Parition 是物理上的概念,每個 Topic 包含一個或多個 Partition. 

Producer 
負責發布消息到 Kafka broker. 


Consumer 
消息消費者,向 Kafka broker 讀取消息的客戶端。 

Consumer Group 
每個 Consumer 屬於一個特定的 Consumer Group (可為每個 Consumer 指定 group name,若不指定 group name 則屬於默認的 group)。 

Kafka 拓撲結構 
 
(More

如上圖所示,一個典型的 Kafka 集群中包含若干 Producer (可以是 web 前端產生的 Page View,或者是伺服器日誌,系統 CPU、Memory 等),若干 broker (Kafka 支持水平擴展,一般 broker 數量越多,集群吞吐率越高),若干 Consumer Group,以及一個 Zookeeper 集群。Kafka 通過 Zookeeper 管理集群配置,選舉 leader,以及在 Consumer Group 發生變化時進行 rebalance。Producer 使用 push 模式將消息發布到 broker,Consumer 使用 pull 模式從 broker 訂閱並消費消息。 

Topic & Partition 
Topic 在邏輯上可以被認為是一個 queue,每條消費都必須指定它的 Topic,可以簡單理解為必須指明把這條消息放進哪個 queue 里為了使得 Kafka 的吞吐率可以線性提高,物理上把 Topic 分成一個或多個 Partition,每個 Partition 在物理上對應一個文件夾,該文件夾下存儲這個 Partition 的所有消息和索引文件。若創建 topic1 和 topic2 兩個 topic,且分別有 13 個和 19 個分區,則整個集群上會相應會生成共 32 個文件夾 (本文所用集群共8個節點,此處 topic1 和 topic2 replication-factor 均為1). 

每個日誌文件都是一個 log entrie 序列,每個 log entrie 包含一個 4 字節整型數值(值為 N+5),1 個字節的」magic value」,4 個字節的 CRC 校驗碼,其後跟 N 個字節的消息體。每條消息都有一個當前 Partition 下唯一的 64 字節的 offset,它指明了這條消息的起始位置。磁碟上存儲的消息格式如下: 
* message length : 4 bytes (value: 1+4+n)
*「magic」 value : 1 byte
* crc : 4 bytes
* payload : n bytes

這個 log entries 並非由一個文件構成,而是分成多個 segment,每個 segment 以該 segment 第一條消息的 offset 命名並以「.kafka」為後綴。另外會有一個索引文件,它標明了每個 segment 下包含的 log entry 的 offset 範圍,如下圖所示。 


因為每條消息都被 append 到該 Partition 中,屬於順序寫磁碟,因此效率非常高 (經驗證,順序寫磁碟效率比隨機寫內存還要高,這是 Kafka 高吞吐率的一個很重要的保證)。對於傳統的 message queue 而言,一般會刪除已經被消費的消息,而 Kafka 集群會保留所有的消息,無論其被消費與否。當然,因為磁碟限制,不可能永久保留所有數據 (實際上也沒必要),因此 Kafka 提供兩種策略刪除舊數據。一是基於時間,二是基於 Partition 文件大小。例如可以通過配置 $KAFKA_HOME/config/server.properties,讓 Kafka 刪除一周前的數據,也可在 Partition 文件超過 1GB 時刪除舊數據,配置如下所示。 
  1. # The minimum age of a log file to be eligible for deletion  
  2.   
  3. log.retention.hours=168  
  4.   
  5. # The maximum size of a log segment file. When this size is reached a new log segment will be created.  
  6.   
  7. log.segment.bytes=1073741824  
  8.   
  9. # The interval at which log segments are checked to see if they can be deleted according to the retention policies  
  10.   
  11. log.retention.check.interval.ms=300000  
  12.   
  13. # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.  
  14.   
  15. log.cleaner.enable=false  
這裡要注意,因為 Kafka 讀取特定消息的時間複雜度為 O(1),即與文件大小無關,所以這裡刪除過期文件與提高 Kafka 性能無關。選擇怎樣的刪除策略只與磁碟以及具體的需求有關。另外,Kafka 會為每一個 Consumer Group 保留一些 metadata 信息——當前消費的消息的 position,也即 offset。這個 offset 由 Consumer 控制。正常情況下 Consumer 會在消費完一條消息後遞增該 offset。當然,Consumer 也可將 offset 設成一個較小的值,重新消費一些消息。因為 offet 由 Consumer 控制,所以Kafka broker 是無狀態的,它不需要標記哪些消息被哪些消費過,也不需要通過 broker 去保證同一個 Consumer Group 只有一個 Consumer 能消費某一條消息,因此也就不需要鎖機制,這也為 Kafka 的高吞吐率提供了有力保障。 

Producer 消息路由 
Producer 發送消息到 broker 時,會根據 Paritition 機制選擇將其存儲到哪一個 Partition。如果 Partition 機制設置合理,所有消息可以均勻分布到不同的 Partition 里,這樣就實現了負載均衡。如果一個 Topic 對應一個文件,那這個文件所在的機器I/O將會成為這個 Topic 的性能瓶頸,而有了 Partition 後,不同的消息可以並行寫入不同 broker 的不同 Partition 里,極大的提高了吞吐率。可以在 $KAFKA_HOME/config/server.properties 中通過配置項 num.partitions 來指定新建 Topic 的默認 Partition 數量,也可在創建 Topic 時通過參數指定,同時也可以在 Topic 創建之後通過 Kafka 提供的工具修改。 

在發送一條消息時,可以指定這條消息的 key,Producer 根據這個 key 和 Partition 機制來判斷應該將這條消息發送到哪個 Parition。Paritition 機制可以通過指定 Producer 的 paritition.class 這一參數來指定,該 class 必須實現 kafka.producer.Partitioner 接口。本例中如果 key 可以被解析為整數則將對應的整數與 Partition 總數取余,該消息會被發送到該數對應的 Partition。(每個 Parition 都會有個序號, 序號從 0 開始) 
  1. package test;  
  2.   
  3. import kafka.producer.Partitioner;  
  4. import kafka.utils.VerifiableProperties;  
  5.   
  6. public class TestPartitioner implements Partitioner {  
  7.   
  8.     public TestPartitioner(VerifiableProperties verifiableProperties) {}  
  9.   
  10.     @Override  
  11.     public int partition(Object key, int numPartitions) {  
  12.   
  13.         try {  
  14.   
  15.             int partitionNum = Integer.parseInt((String) key);  
  16.             return Math.abs(Integer.parseInt((String) key) % numPartitions);  
  17.         } catch (Exception e) {  
  18.             return Math.abs(key.hashCode() % numPartitions);  
  19.         }  
  20.     }  
  21. }  
如果將上例中的類作為 TestPartitioner.class,並通過如下代碼發送 20 條消息(key 分別為 0,1,2,3) 至 topic3 (包含 4 個 Partition
  1. public void sendMessage(Producer producer) throws InterruptedException {  
  2.     for (int i = 1; i <= 5; i++) {  
  3.         List> messageList = new ArrayList>();  
  4.         for (int j = 0; j < 4; j++) {  
  5.             messageList.add(  
  6.                     new KeyedMessage(String.format("key-%d", j), String.format("Message-%d", j)));  
  7.         }  
  8.         producer.send(messageList);  
  9.     }  
  10.     producer.close();  
  11. }  
則 key 相同的消息會被發送並存儲到同一個 partition 里,而且 key 的序號正好和 Partition 序號相同。(Partition 序號從 0 開始,本例中的 key 也從 0 開始)。下圖所示是通過 Java 程序調用 Consumer 後列印出的消息列表。 


Consumer Group 
使用 Consumer high level API時,同一 Topic 的一條消息只能被同一個 Consumer Group 內的一個 Consumer 消費,但多個 Consumer Group 可同時消費這一消息。 


這是 Kafka 用來實現一個 Topic 消息的 廣播 (發給所有的Consumer) 和 單播 (發給某一個 Consumer)的手段。一個 Topic 可以對應多個 Consumer Group。如果需要實現廣播,只要每個 Consumer 有一個獨立的 Group 就可以了。要實現單播只要所有的 Consumer 在同一個Group 里。用 Consumer Group 還可以將 Consumer 進行自由的分組而不需要多次發送消息到不同的 Topic。實際上,Kafka 的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用 Storm 這種實時流處理系統對消息進行實時在線處理,同時使用 Hadoop 這種批處理系統進行離線處理,還可以同時將數據實時備份到另一個數據中心,只需要保證這三個操作所使用的 Consumer 屬於不同的 Consumer Group 即可。下圖是 Kafka 在 Linkedin 的一種簡化部署示意圖 


下面這個例子更清晰地展示了 Kafka Consumer Group 的特性。首先創建一個 Topic (名為 topic1,包含 3 個 Partition),然後創建一個屬於 group1 的 Consumer 實例,並創建三個屬於 group2 的Consumer實例,最後通過 Producer向 topic1 發送 key 分別為 1,2,3 的消息。結果發現屬於 group1 的Consumer收到了所有的這三條消息,同時 group2 中的 3 個 Consumer 分別收到了 key 為 1,2,3的消息。如下圖所示。 


Push vs. Pull 
作為一個消息系統,Kafka 遵循了傳統的方式,選擇由 Producer 向 broker push 消息並由 Consumer 從 broker pull 消息。一些 logging-centric system,比如 Facebook 的 Scribe 和 Cloudera 的 Flume,採用 push 模式。事實上,push 模式和 pull 模式各有優劣。 

push 模式很難適應消費速率不同的消費者,因為消息發送速率是由 broker 決定的。push 模式的目標是儘可能以最快速度傳遞消息,但是這樣很容易造成 Consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式則可以根據 Consumer 的消費能力以適當的速率消費消息。對於 Kafka 而言,pull 模式更合適。pull 模式可簡化 broker 的設計,Consumer 可自主控制消費消息的速率,同時 Consumer 可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。 

Kafka delivery guarantee 
有這麼幾種可能的delivery guarantee: 
* At most once 消息可能會丟,但絕不會重複傳輸
* At least one 消息絕不會丟,但可能會重複傳輸
* Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是用戶所想要的。

當 Producer 向 broker 發送消息時,一旦這條消息被 commit,因數 replication 的存在,它就不會丟。但是如果 Producer 發送數據給 broker 後,遇到網絡問題而造成通信中斷,那 Producer 就無法判斷該條消息是否已經 commit。雖然 Kafka 無法確定網絡故障期間發生了什麼,但是 Producer 可以生成一種類似於主鍵的東西,發生故障時冪等性的重試多次,這樣就做到了Exactly once。截止到目前 (Kafka 0.8.2版本,2015-03-04),這一 Feature 還並未實現,有希望在 Kafka 未來的版本中實現。(所以目前默認情況下一條消息從 Producer 到 broker 是確保了 At least once,可通過設置 Producer 異步發送實現 At most once)。 

接下來討論的是消息從 broker 到 Consumer 的 delivery guarantee 語義。(僅針對 Kafka consumer high level API)。Consumer 在從 broker 讀取消息後,可以選擇 commit,該操作會在 Zookeeper 中保存該 Consumer 在該 Partition 中讀取的消息的 offset。該 Consumer 下一次再讀該 Partition 時會從下一條開始讀取。如未 commit,下一次讀取的開始位置會跟上一次 commit 之後的開始位置相同。當然可以將 Consumer 設置為 autocommit,即 Consumer 一旦讀到數據立即自動 commit。如果只討論這一讀取消息的過程,那 Kafka 是確保了 Exactly once。但實際使用中應用程式並非在 Consumer 讀取完數據就結束了,而是要進行進一步處理,而數據處理與 commit 的順序在很大程度上決定了消息從 broker 和 consumer 的 delivery guarantee semantic。 

讀完消息先 commit 再處理消息。這種模式下,如果 Consumer 在 commit 後還沒來得及處理消息就 crash了,下次重新開始工作後就無法讀到剛剛已提交而未處理的消息,這就對應於 At most once 

讀完消息先處理再 commit。這種模式下,如果在處理完消息之後 commit 之前 Consumer crash 了,下次重新開始工作時還會處理剛剛未 commit 的消息,實際上該消息已經被處理過了。這就對應於 At least once。在很多使用場景下,消息都有一個主鍵,所以消息的處理往往具有冪等性,即多次處理這一條消息跟只處理一次是等效的,那就可以認為是 Exactly once。(筆者認為這種說法比較牽強,畢竟它不是Kafka本身提供的機制,主鍵本身也並不能完全保證操作的冪等性。而且實際上我們說 delivery guarantee 語義是討論被處理多少次,而非處理結果怎樣,因為處理方式多種多樣,我們不應該把處理過程的特性——如是否冪等性,當成 Kafka 本身的 Feature

如果一定要做到 Exactly once,就需要協調 offset 和實際操作的輸出。精典的做法是引入兩階段提交。如果能讓 offset 和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統可能不支持兩階段提交。比如,Consumer 拿到數據後可能把數據放到 HDFS,如果把最新的 offset 和數據本身一起寫到 HDFS,那就可以保證數據的輸出和 offset 的更新要麼都完成,要麼都不完成,間接實現 Exactly once。(目前就 high level API 而言,offset 是存於 Zookeeper 中的,無法存於 HDFS,而 low level API 的 offset 是由自己去維護的,可以將之存於 HDFS 中

總之,Kafka 默認保證 At least once,並且允許通過設置 Producer 異步提交來實現 At most once。而 Exactly once 要求與外部存儲系統協作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式。

4 則留言:

[Git 常見問題] error: The following untracked working tree files would be overwritten by merge

  Source From  Here 方案1: // x -----删除忽略文件已经对 git 来说不识别的文件 // d -----删除未被添加到 git 的路径中的文件 // f -----强制运行 #   git clean -d -fx 方案2: 今天在服务器上  gi...