作者:明成 AS6咪咕二級用戶中心 開發工程師
來源:AS6的課程筆記


消息中間件kafka源碼剖析及探究(一)


Kafka是最初由Linkedin公司開發,是一個分佈式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分佈式消息系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成爲頂級開源項目。

  • 相關名詞解釋

Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集羣。

Topic:一類消息,消息存放的目錄即主題,例如page view日誌、click日誌等都可以以topic的形式存在,Kafka集羣能夠同時負責多個topic的分發。

Partition:topic物理上的分組,一個topic可以分爲多個partition,每個partition是一個有序的隊列。

Segment:partition物理上由多個segment組成,每個Segment存着message信息。

Producer : 生產message發送到topic。

Consumer : 訂閱topic消費message, consumer作爲一個線程來消費。


消息中間件kafka源碼剖析及探究(一)


Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置文件中配置好的。各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group ) 中的一個consumer(consumer 線程 )消費,如果一個message可以被多個consumer(consumer 線程 ) 消費的話,那麼這些consumer必須在不同的組。Kafka不支持一個partition中的message由兩個或兩個以上的consumer thread來處理,即便是來自不同的consumer group的也不行。它不能像AMQ那樣可以多個BET作爲consumer去處理message,這是因爲多個BET去消費一個Queue中的數據的時候,由於要保證不能多個線程拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的性能下降,吞吐量不夠。而kafka爲了保證吞吐量,只允許一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴展,那麼再加新的consumer thread去消費。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就形成了分佈式消費的概念。

  • Kafka的特性

- 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作。

- 可擴展性:kafka集羣支持熱擴展。

- 持久性、可靠性:消息被持久化到本地磁盤,並且支持數據備份防止數據丟失。

- 容錯性:允許集羣中節點失敗(若副本數量爲n,則允許n-1個節點失敗)

- 高併發:支持數千個客戶端同時讀寫。

  • Kafka的使用場景

- 日誌收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。

- 消息系統:解耦和生產者和消費者、緩存消息等。

- 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發佈到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。

- 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分佈式應用的數據,生產各種操作的集中反饋,比如報警和報告。

  • Kakfa的設計思想


消息中間件kafka源碼剖析及探究(一)


- Kakfa Broker Leader的選舉:Kakfa Broker集羣受Zookeeper管理。所有的Kafka Broker節點一起去Zookeeper上註冊一個臨時節點,因爲只有一個Kafka Broker會註冊成功,其他的都會失敗,所以這個成功在Zookeeper上註冊臨時節點的這個Kafka Broker會成爲Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper註冊Watch)。這個Controller會監聽其他的Kafka Broker的所有信息,如果這個Kafka Broker Controller宕機了,在Zookeeper上面的那個臨時節點就會消失,此時所有的Kafka Broker又會一起去Zookeeper上註冊一個臨時節點,因爲只有一個Kafka Broker會註冊成功,其他的都會失敗,所以這個成功在Zookeeper上註冊臨時節點的這個Kafka Broker會成爲Kafka Broker Controller,其他的Kafka broker叫Kafka Broker Follower。例如:一旦有一個broker宕機了,這個kafka broker controller會讀取該宕機broker上所有的partition在zookeeper上的狀態,並選取ISR列表中的一個replica作爲partition leader(如果ISR列表中的replica全掛,選一個倖存的replica作爲leader; 如果該partition的所有的replica都宕機了,則將新的leader設置爲-1,等待恢復,等待ISR中的任一個Replica“活”過來,並且選它作爲Leader;或選擇第一個“活”過來的Replica(不一定是ISR中的)作爲Leader),這個broker宕機的事情,kafka controller也會通知zookeeper,zookeeper就會通知其他的kafka broker。


消息中間件kafka源碼剖析及探究(一)


- Consumer Group:當啓動一個consumer group去消費一個topic的時候,無論topic裏面有多個少個partition,無論我們consumer group裏面配置了多少個consumer thread,這個consumer group下面的所有consumer thread一定會消費全部的partition;即便這個consumer group下只有一個consumer thread,那麼這個consumer thread也會去消費所有的partition。因此,最優的設計就是,consumer group下的consumer thread的數量等於partition數量,這樣效率是最高的。

同一partition的一條message只能被同一個Consumer Group內的一個Consumer消費。不能夠一個consumer group的多個consumer同時消費一個partition。

一個consumer group下,無論有多少個consumer,這個consumer group一定回去把這個topic下所有的partition都消費了。當consumer group裏面的consumer數量小於這個topic下的partition數量的時候,如下圖groupA,groupB,就會出現一個conusmer thread消費多個partition的情況,總之是這個topic下的partition都會被消費。如果consumer group裏面的consumer數量等於這個topic下的partition數量的時候,如下圖groupC,此時效率是最高的,每個partition都有一個consumer thread去消費。當consumer group裏面的consumer數量大於這個topic下的partition數量的時候,如下圖GroupD,就會有一個consumer thread空閒。因此,我們在設定consumer group的時候,只需要指明裏面有幾個consumer數量即可,無需指定對應的消費partition序號,consumer會自動進行rebalance。

多個Consumer Group下的consumer可以消費同一條message,但是這種消費也是以O(1)的方式順序的讀取message去消費,,所以一定會重複消費這批message的,不能向AMQ那樣多個BET作爲consumer消費(對message加鎖,消費的時候不能重複消費message)。


消息中間件kafka源碼剖析及探究(一)


- Topic & Partition:Topic相當於傳統消息系統MQ中的一個隊列queue,producer端發送的message必須指定是發送到哪個topic,但是不需要指定topic下的哪個partition,因爲kafka會把收到的message進行load balance,均勻的分佈在這個topic下的不同的partition上( hash(message) % [broker數量] )。物理上存儲上,這個topic會分成一個或多個partition,每個partiton相當於是一個子queue。在物理結構上,每個partition對應一個物理的目錄(文件夾),文件夾命名是[topicname]_[partition]_[序號],一個topic可以有無數多的partition,根據業務需求和數據量來設置。在kafka配置文件中可隨時更高num.partitions參數來配置更改topic的partition數量,在創建Topic時通過參數指定parittion數量。Topic創建之後通過Kafka提供的工具也可以修改partiton數量。

一般來說,(1)一個Topic的Partition數量大於等於Broker的數量,可以提高吞吐率。(2)同一個Partition的Replica儘量分散到不同的機器,高可用。

當add a new partition的時候,partition裏面的message不會重新進行分配,原來的partition裏面的message數據不會變,新加的這個partition剛開始是空的,隨後進入這個topic的message就會重新參與所有partition的load balance。

- Partition Replica:每個partition可以在其他的kafka broker節點上存副本,以便某個kafka broker節點宕機不會影響這個kafka集羣。存replica副本的方式是按照kafka broker的順序存。例如有5個kafka broker節點,某個topic有3個partition,每個partition存2個副本,那麼partition1存broker1,broker2,partition2存broker2,broker3,以此類推(replica副本數目不能大於kafka broker節點的數目,否則報錯。這裏的replica數其實就是partition的副本總數,其中包括一個leader,其他的就是copy副本)。這樣如果某個broker宕機,其實整個kafka內數據依然是完整的。但是,replica副本數越高,系統雖然越穩定,但是回來帶資源和性能上的下降;replica副本少的話,也會造成系統丟數據的風險。

- Consumer Rebalance的觸發條件:

(1)Consumer增加或刪除會觸發 Consumer Group的Rebalance

(2)Broker的增加或者減少都會觸發 Consumer Rebalance。

- Consumer: Consumer處理partition裏面的message的時候是O(1)順序讀取的。所以必須維護着上一次讀到哪裏的offset信息。high level API,offset存於Zookeeper中,low level API的offset由自己維護。一般來說都是使用high level api的。Consumer的delivery gurarantee,默認是讀完message先commmit再處理message,autocommit默認是true,這時候先commit就會更新offsite+1,一旦處理失敗,offset已經+1,這個時候就會丟message;也可以配置成讀完消息處理再commit,這種情況下consumer端的響應就會比較慢的,需要等處理完纔行。

一般情況下,一定是一個consumer group處理一個topic的message,Best Practice是這個consumer group裏面consumer的數量等於topic裏面partition的數量,這樣效率是最高的。一個consumer thread處理一個partition。如果這個consumer group裏面consumer的數量小於topic裏面partition的數量,就會有consumer thread同時處理多個partition(這個是kafka自動的機制,我們不用指定),但是總之這個topic裏面的所有partition都會被處理到的。如果這個consumer group裏面consumer的數量大於topic裏面partition的數量,多出的consumer thread就會閒着啥也不幹,剩下的是一個consumer thread處理一個partition,這就造成了資源的浪費,因爲一個partition不可能被兩個consumer thread去處理。

34張架構史上最全技術知識圖譜

程序員專屬手機壁紙來了。。。

相關文章