6.消息投遞

7.複製

複製日誌: 法定人數(Quorums),ISR,狀態機(State Machines)

Unclean leader election: What if they all die?

Availability and Durability Guarantees

複製管理

8.日誌壓縮

基本概念

日誌壓縮提供什麼保證?

日誌壓縮細節

配置日誌清理

kafka優秀還需要提的三點


6.消息投遞

我們已經瞭解了一些生產者和消費者是如何工作的,現在讓我們討論在生產者和消費者之間,kafka提供的語義上的保證,顯然kafka提供了多種可能的消息投遞保證:

  • At most once:最多一次,消息可能丟失,而且也不會被再次投遞;
  • At least once:最少一次,消息不會丟失,但是有可能會被再次投遞;
  • Exactly once:恰好一次,這是用戶實際需要的,每條消息會且僅會被投遞一次。

這會被分解爲兩個問題:發送消息時的持久化保證和消費消息時的保證。

許多系統申明提供了"恰好一次"投遞語義,但是注意閱讀細節,許多的這些申明是誤導(他們沒有考慮消費者或者生產者出錯的情況,也沒有考慮有多個消費者的情況,也沒有考慮數據寫到磁盤可能丟失的情況)。

kafka的語義非常直接,當發送消息的時候,我們有一個消息被"提交"到日誌中的概念,一旦一個發送的消息被提交,只要消息寫入的分區有一個broker存活,消息就不會丟失。

現在讓我們假設一個完美的,沒有任何損壞的broker,然後試圖理解對生產者和消費者的保證。如果一個生產者試圖發送一個消息並且碰到網絡錯誤,那麼它不能確認錯誤發生在消息提交前還是後。

在0.11.0.0版本之前,如果生產者沒有收到消息已提交的響應,它只能選擇重新發送消息。這就提供了"至少一次"投遞語義。因爲原來請求如果實際成功了,那麼重發可能將消息再次寫入日誌中。

從0.11.0.0開始,kafka生產者也支持冪等投遞選項,從而保證重發消息不會在日誌中重複寫入。爲了實現這種特性,broker分配給每個生產者一個ID,並且使用生產者發送每條消息的序列數字,實現重複消息刪除。

同時從0.11.0.0開始,生產者支持使用類事務語義向多個topic分區發送消息的能力:要麼所有消息成功寫入所有分區,要麼不寫入任何分區。這個特性的主要使用場景就是爲了kafka topic的"恰好一次"處理。

並不是我們所有的使用場景都需要如此強的保證,對那些延遲敏感的用戶,我們允許生產者指定它期望的持久化級別。生產者可以指定它們要等待消息提交,但是生產者也能指定執行完全異步發送,或者等待只要leader(而不是所有的副本)確認消息即可。

現在讓我們從消費者的角度描述這個語義。所有副本具有相同的offset,相同的日誌,消費者控制其在日誌中的位置。如果消費者從來沒有出問題,它能保存這個位置信息在內存中,但是如果消費者故障,我們要topic分區被另一個消費者接管,新的處理需要選擇一個開始處理的合適的位置,它有幾個處理消息並更新位置的選項:

  1. 它能讀取消息,然後在日誌中保存它的位置,最終處理消息。在這種情況下,消費者進程可能在保存位置之後,但在消息處理之前崩潰。這種情況下,進程接管並從已經保存的位置開始處理,即使這個位置之前少量消息還沒有被處理。這就是"最多一次"語義,如果消費故障,消息可能沒有被處理。
  2. 它能讀取消息,處理消息,最終在日誌中保存它的位置。在這種情況下,消費者進程在處理消息之後,但是在保存位置之前崩潰。這種場景,當新的進程接管並接受少量已經被處理過的消息。這就對應於"至少一次"的語義,許多情況下,消息有主鍵因此更新是冪等的(兩次收到相同的消息,只是用另一個自身的副本覆蓋記錄)。

那麼(用戶實際需要的)"恰好一次"語義呢?當從kafka的topic消費併產生到另一個topic時(即kafka stram應用場景),我們可以利用上面提到的0.11.0.0中的新的事務性的生產者的能力。

消費者的位置以topic的消息形式保存。因此我們能在接收topic處理數據結果的相同事務中把offset寫到kafka中。如果事務中止,消費者的位置將回到老的地方,在topic上生成的數據對其他消費者不可見,當然這取決於它們的隔離級別(參考參數:isolation.level)。默認是read_uncommitted隔離級別,即所有消息對所有消費者可見,即使它們是中止事務的一部分。但是如果設置read_committed,消費者僅能消費從已提交事務中返回的消息。

當寫到外部系統時,這限制是需要協調消費者的位置與實際存儲輸出。實現這個目標的經典方法就是在消費者位置存儲和消費者輸出存儲之間引入二階提交。但是可以通過讓消費者將offset存儲在與其輸出相同的位置來更簡單地處理,這是更好的,因爲消費者可能想要寫入的許多輸出系統可能不支持兩階段提交。 例如,一個Kafka Connect連接器,它填充HDFS中的數據以及它讀取的數據的偏移量,以確保數據和偏移都被更新或兩者都不更新。 我們遵循許多其他數據系統的類似模式,這些數據系統需要這些更強的語義,並且消息沒有主鍵以允許重複數據刪除。

因此,kafka在kafka stream中支持消息"恰好一次"投遞。事務性的生產者和消費者通常被用來在kafka topic之間傳輸數據或者處理數據時,提供"恰好一次"投遞。

其他目標系統中"恰好一次"投遞通常需要與此類系統合作,kafka提供offset,使得實現這點是可行的。因此,kafka默認保證至少一次投遞,並允許用戶通過關閉生產者的重試機制並在消費者處理一批消息之前提交offset來實現"最多一次"投遞。

7.複製

kafka通過服務器配置數值複製每個topic的分區日誌。當集羣中某臺服務器故障後,允許自動故障轉移到其他副本,因此碰到這種故障時,消息任然可用。

其他消息系統提供一些相關的複製特性,但是,在我們看來,沒有很大的用處,並且有很大的缺點:它們是不活動的,吞吐量嚴重受到影響,需要複雜的手動配置等。kafka默認具備複製功能,事實上我們把複製因子爲1的topic當做沒有複製的topic來實現的(這時候每個分區只有1個leader,沒有任何follower)。

複製單元是topic的分區。在沒有故障的情況下,每個kafka的分區有一個leader,以及0個或者多個follower。包括leader在內的總副本數就是副本因子(例如1個leader,2個follower,那麼副本因子爲3),所有的讀和寫在分區的leader上執行。整個kafka集羣通常有很多的分區,這些分區的leader均勻分佈在集羣的所有broker上。follower的日誌完全和leader的日誌一樣:有完全一樣的offset,消息順序也完全一樣(當然,某些時刻,leader可能有一些還沒有被複制到follower的消息在最新的日誌中,但是這些日誌對客戶端是不可見的)。

follower從leader消費消息,和一個普通的kafka消費者一樣。然後將消息保存在它們自己的日誌中。follower從leader拉取日誌,這是非常好的特性,這讓follower可以很自然的批量處理日誌。

和許多分佈式系統一樣,自動處理故障需要一個節點存活的明確的定義。對kafka節點來說,存活有兩個條件:

  1. 節點必須能通過zookeeper的心跳機制與其保持回話;
  2. 如果是follower,它必須複製發生在leader上的寫入,不能落後太多。

我們認爲節點滿足這兩個條件就是"In Sync"(處於同步中),從而避免混淆存活和故障兩個概念。In-Sync的副本集合被稱爲ISR,leader持有ISR信息。如果follower死亡,卡住,或者落後,leaer就會把他從ISR列表中移除。卡住和落後副本由配置參數replica.lag.time.max.ms決定。

在分佈式系統術語中,我們只嘗試處理真正有故障的“故障/恢復”模型,例如其中節點突然停止工作,然後恢復(可能不知道它們已經死亡)。 Kafka沒有處理所謂的“拜占庭(Byzantine)”故障,即節點產生任意或惡意的響應(可能是由於錯誤或犯規)。

拜占庭將軍問題:拜占庭是東羅馬帝國的首都。由於當時拜占庭羅馬帝國國土遼闊,爲了防禦目的,因此每個軍隊都分隔很遠,將軍與將軍之間只能靠信差傳消息。 在戰爭的時候,拜占庭軍隊內所有將軍和副官必須達成一致的共識,決定是否有贏的機會纔去攻打敵人的陣營。但是,在軍隊內有可能存有叛徒和敵軍的間諜,左右將軍們的決定又擾亂整體軍隊的秩序。在進行共識時,結果並不代表大多數人的意見。這時候,在已知有成員謀反的情況下,其餘忠誠的將軍在不受叛徒的影響下如何達成一致的協議,拜占庭問題就此形成。

我們現在能更加精確的定義消息被認爲提交,即當分區所有ISR都同步了該消息到日誌中。並且只有已經提交的消息纔會被投遞到消費者,這就意味着消費者不需要擔心可能看到那些如果leader故障而丟失的消息。反過來說,生產者可以決定是否等待消息被提交,這取決於它們在延遲和可靠性之間平衡的偏向性。這個偏向性通過生產者使用acks參數來控制。需要注意的是,topic還有一個ISR最小數量的設置(參數min.insync.replicas),用於檢查生產者請求確認一個消息已經被寫入ISR集合的最小數量,這個參數只有在acks=-1時才能生效。所以如果生產者請求一個不太嚴格的確認機制(例如acks=0,或者acks=1),即使ISR數量比參數指定的數量還低,消息仍能被提交,也能被消費。

kafka提供的保護機制的前提是在任何時候只要還有一個存活的ISR,那麼已經被提交的消息就不會丟失。

複製日誌: 法定人數(Quorums),ISR,狀態機(State Machines)

kafka分區的核心是複製日誌。在分佈式數據系統中,複製日誌是最基本的原語之一,也有許多方法可以實現一個。

複製日誌模擬了對一系列值的順序達成共識的過程(通常將日誌條目編號爲0,1,2,…),有許多方法實現它,但是最簡單和最快的方法是leader選擇提供給它值的順序。只要leader存在,所有副本只能拷貝leader的值和順序。

當然如果leader沒有故障,我們不需要複製。但是當leader死亡了,我們需要從follower中選擇一個新的leader。但是那些follower本身可能落後,或者故障。因此我們必須確保選擇一個最新的follower。一個日誌複製算法必須提供這樣的基本保護,如果我們告訴客戶端消息被提交了,然後leader故障,那麼我們選舉的新的leader必須有這個消息。這就帶來了一個trade-off:如果leader在申明消息已提交前,等待更多follower確認消息,那麼就會有更多潛在的leader。

這種權衡的一種通用做法就是用多數投票從而達成提交決定和leader選舉。這不是kafka的行爲,而是讓我們拋出這個問題來理解trade-off。假設我們有2n+1個副本,leader申明消息爲提交前必須有n+1個副本收到消息,如果我們選舉了一個新的leader,那麼它必須是n+1個副本中擁有最完整的日誌的follower。然後,只要不超過n個副本出現故障,就一定能保證leader有所有完整的已提交消息。

這是因爲在n+1個副本之間,一定有至少一個副本包含了所有已提交消息,這個副本的日誌是最完整的,因此將被選舉爲新的leader。每種選舉算法還有許多其他細節需要處理,我們現在先忽略它(因爲這不是這裏討論的重點)。

多數投票方法有一個非常好的屬性:延遲只取決於更快的服務器。意思是,如果副本因子爲3,延遲由更快的follower決定,而不是最慢的那個(比如有3個副本R1,R2和R3,三個響應時間分別是1ms,2ms,4ms,那麼只會延遲2ms,而不是4ms)。

這些實現有豐富的算法,例如zookeeper的Zab,Raft,和Viewstamped Replication。我們瞭解到的與Kafka實際實施的最相似的學術算法是微軟的PacificA 。感興趣的同學可以戳下面的鏈接鏈接瞭解這些方法更多的細節:

  • Zab:http://web.archive.org/web/20140602093727/http://www.stanford.edu/class/cs347/reading/zab.pdf
  • Raft:https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf
  • PacificA :http://research.microsoft.com/apps/pubs/default.aspx?id=66814
  • Viewstamped Replication:http://pmg.csail.mit.edu/papers/vr-revisited.pdf

多數投票的缺點是它不能承受許多故障,讓你選擇不出leader。容忍一個故障,需要3份數據拷貝。容忍2個故障,需要5份數據拷貝。根據我們的經驗,對於一個實際的系統來說,僅能承受1個故障是不夠的,但是每個都寫5次,需要5倍的磁盤空間,只有1/5的吞吐量,對大容量數據來說,又不太實用。這可能就是爲什麼法定人數仲裁算法更多的出現在分片的集羣配置上,例如ZooKeeper。而很少出現在主要數據存儲上。例如HDFS中,namenode的高可用特性是建立在基於多數投票算法,但是卻沒有用在數據本身(datanode)。ISR集合持久化保存在ZooKeeper中。正因爲如此,ISR中的任何副本都可以被選舉爲leader。

kafka採用了一個略有不同的方法選擇它的仲裁集合。沒有使用大多數投票,kafka動態維護了一個跟上leader的ISR集合,只有這個集合裏的成員能夠參與選舉。kafka分區的寫操作,必須直到所有ISR接收到了這個請求,才能認爲已提交。對於kafka用法而言,這是一個重要因素。因爲有許多分區,並確保leader均衡是非常重要的。有了ISR模型和n+1副本,kafka的topic能承受n個副本故障而不會丟失已經提交的消息(只要還有1個ISR就能正常運行)。

對於很多我們希望處理的使用場景,我們認爲trade-off是合理的。在實踐中,爲了容忍n個副本故障,大多數投票和ISR方法都要在提交消息之前等待相同數量副本確認(爲了一個節點故障後集羣還能正常工作,大多數法定人數需要3個副本和1個節點確認,而ISR方式需要2個副本和1個節點確認),提交能力與最慢服務器無關是大多數投票方法的優點。我們認爲ISR方法能通過允許客戶端選擇是否阻塞消息提交(acks參數)來改善,由於所需的複製因子較低,額外的吞吐量和磁盤空間是值得的。

另一個重要的設計是kafka不需要崩潰的節點在所有它們的數據完整的情況下才恢復。這裏有兩個主要的問題,首先,磁盤錯誤是持久化數據系統操作時最常見的問題,它們經常不會留下完整的數據。其次,即使這不是問題,但是我們不需要爲了我們一致性保證,每次寫操作都fsync,這可能降低2~3個數量級的性能。我們的協議是允許一個副本重新加入ISR,但是在重新加入前,需要確保它必須完全再次同步所有數據,即使在崩潰時丟失的沒有刷到磁盤上的數據。

Unclean leader election: What if they all die?

注意kafka保護數據不丟失是需要ISR中至少還有一個副本的前提。如果複製分區所有節點都死掉了,保護機制不再適用。

所以一個實際的系統當所有副本都死掉後需要做一些合理的事情,如果你恰好不幸碰到這種的事情,考慮將要發生的事情就非常重要了,這裏有兩種方法可以實施:

  1. 等待ISR中的一個副本活過來,並選擇這個副本爲leader(上帝保佑希望它有完整的數據)。
  2. 選擇第一個活過來的副本爲leader(不一定非要是ISR中的副本)。

這是非常簡單的在可用性和一致性之間權衡,如果我們等待ISR中的副本,那麼只要這些副本故障,我們就處於不可用狀態。如果這些副本被破壞,或者它們的數據丟失,那麼我們將永久不可用。如果一個非ISR副本活過來,我們也允許它成爲leader,那麼即使它不能保證有每一條已提交消息,但是所有副本恢復後還是得以它的日誌爲準。

kafka從0.11.0.0以後,默認選擇第一種策略,寧願等待一個一致性的副本。當然這個可以通過配置unclean.leader.election.enable改變這個行爲,爲了支持可用性比一致性更重要的用戶場景。

這個問題不止在kafka中才有,基於法定人數選舉方案也會遇到這樣的問題。例如,在一個多數投票方案中,如果大多數服務器永久性故障,那麼你也必須做出抉擇,如果選擇高可用,那麼數據就可能丟失。

Availability and Durability Guarantees

當消息寫入kafka時,生產者能選擇等待消息被0,1或者 all (-1)個副本確認。需要注意的是,被所有副本確認(acks=all)並不保證所有分配的副本收到消息,而是ISR都收到消息,這一點需要特別注意。

例如,如果一個topic創建時有兩個副本,然後其中一個故障了,這時候ISR只有一個副本,那麼即使指定acks=all,並且寫入成功。然而,如果這剩下的一個副本如果也出現故障,數據仍然可能丟失。

確保分區最大可用性,這種行爲可能不是那些把持久性(數據不能丟失)看的比可用性更重要的用戶所需要的。因此,kafka提供了兩個topic級別的配置,被用在消息持久性比可用性更重要的場景:

  • Disable unclean leader election - 關閉unclean leader選舉(unclean.leader.election.enable)。如果所有副本不可用,那麼分區也不可用,直到最後的leader再次恢復爲止,寧願不可用也不願冒消息丟失的風險。
  • Specify a minimum ISR size - 指定ISR最小數(min.insync.replicas)。只有ISR數量超過這個最小數(可以等於),分區才能寫入成功。爲了防止消息只寫入一個副本而導致數據丟失,這個配置只有在生產者配置acks=all的情況下才生效,從而保證消息將被ISR確認的最小數量(例如min.insync.replicas=2,那麼至少需要2個ISR確認,生產者纔會收到寫入成功的響應)。這個配置提供了在持久性和可用性之間的權衡取捨。設置更大的min.insync.replicas,能更好的保證一致性,因爲消息能被保證寫入更多的副本中,從而減少數據丟失的可能性。不過,它也會降低可用性,當ISR數量下降到低於min.insync.replicas的閾值時,這個分區將不再能寫入消息。

複製管理

上面關於複製日誌的討論實際上只涵蓋了一個日誌,即一個主題分區。然而kafka集羣一般會管理成百上千分區,我們試圖用round-robin方式平衡集羣裏的分區,避免高容量topic的所有分區集中在幾個節點上(而不是儘可能分佈在集羣各個節點上)。同樣的,我們試圖均衡leader,以便每個節點成爲其分區的比例份額的leader(假設集羣有3個broker,總計有2個topic,每個topic有3個分區,3個副本,那麼每個broker應該有2個leader)。

優化leader選舉過程非常重要,因爲這是分區不可用的關鍵窗口。一個幼稚的leader選舉行爲是leader故障後,每個分區都嘗試選舉。

相反,kafka的做法是選舉某個broker爲controller,controller發現broker級別故障後,承擔改變這個故障broker上所有受影響分區的leader。這樣做的結果是,我們能批量處理leader變更通知,即使很多分區我們的選舉過程代價也很低,而且更快。如果controller故障,某個存活的broker將成爲新的controller(這個競爭發生的概率很低,選舉代價不大,畢竟不可能broker經常故障)。

8.日誌壓縮

說明:kafka中的日誌壓縮和我們平常通過tar或者zip命令將一個afei.log日誌文件壓縮爲afei.log.tar.gz或者afei.log.zip是完全不一樣的原理。

日誌壓縮確保kafka總是保留單個topic分區的日誌數據中,每個消息key的最新的、最後已知的值。它對應的用戶場景是,應用崩潰或者系統故障後的狀態恢復,或者應用重啓後,重新加載緩存。接下來讓我們深入瞭解這些用戶場景更多的細節,然後描述kafka日誌壓縮是如何工作的。

到目前爲止,我們僅描述了更簡單的數據保留方法,其中舊日誌數據在固定的時間週期後或者當日誌達到某個預定大小時被丟棄。這適用於時間事件數據,例如每個獨立的日誌記錄。 但是,還有一類重要的數據流是對KEY的可變數據的更改日誌(例如,對數據庫表的更改)。

讓我們討論一個具體例子,我們有一個topic,包含了用戶的郵箱地址信息,每次用戶更新它們的郵箱地址,我們發送一個消息到這個topic上,並且這個消息的key是用戶ID。現在用戶ID爲123的用戶在一段時間內發送了下面這些消息,每條消息對應修改後新的郵箱地址(其他用戶ID修改的消息暫時忽略):

123 => [email protected]
.
.
.
123 => [email protected]
.
.
.
123 => [email protected]

kafka的日誌壓縮機制能提供給我們一個更細粒度的保留機制,因此我們能保證保留下每個KEY的最後一次更新(上面的例子就是[email protected])。

通過這樣做後,我們能保證日誌包含每個KEY的最終值的完整快照,而不只是最近改變的KEY。這意味着下游消費者能恢復它們的狀態,而並不需要保留所有改變的完整的日誌。讓我們從一些有用的用例開始,從而瞭解如何使用它。

  1. 數據庫變更訂閱。通常在多個數據系統中有一個數據集,並且這些系統中的某個系統就是一種數據庫(例如關係型數據庫系統,或者KV存儲系統)。例如,你可能有一個數據庫,一個緩存,一個搜索集羣,一個hadoop集羣。每一個數據庫的變更,需要反映到緩存 ,搜索集羣中,並最終反映到hadoop中。在只處理實時更新的情況下,只需要最近的日誌即可。 但是,如果您希望能夠重新加載緩存或恢復失敗的搜索節點,則可能需要完整的數據集。
  2. 事件源。這是一種應用程序設計風格,它使用變更日誌作爲應用程序的主存儲。
  3. 高可用日誌(Journaling )。 通過記錄對本地狀態的改變,本地計算過程能達到容錯的目的。如果它失敗的話,另一個進程可以重新加載這些更改並繼續執行。一個具體的例子是在流查詢系統中處理計數,聚合和其他分組處理。

這些用例中,每種場景都需要處理變化的實時反饋數據,但是偶爾一臺服務器崩潰,或者數據需要被重新加載,或者被重新處理,都需要完全加載。

一般的想法很簡單,如果我們需要保留無限多的日誌,我們記錄上面這些場景每次變更,然後,我們捕獲系統從一開始的每次狀態。用這些完整的日誌,我們通過應用日誌中前N條能恢復到任意點。這樣假象的完整的日誌,對系統來說不是很符合實際,對於一個穩定的數據集來說,多次更新一條記錄導致日誌增長而不受限制。簡單的日誌保留機制將拋棄舊的更新,從而限制空間。但是日誌不再能恢復當前狀態,因爲現在從日誌的開始位置恢復,可能不再能重新創建當前的狀態。

日誌壓縮是一個對於每條記錄來說更細粒度的保留機制,而不是基於時間的粗粒度的保留機制。這種選擇刪除記錄的想法,讓我們保留每個KEY的最近更新,日誌能保證至少有每個KEY的最新狀態。

保留策略能針對每個topic設置,因此單個集羣中,一些topic的保留策略是尺寸或者時間,而其他topic的保留策略可以是日誌壓縮。

這個功能的靈感來自一個LinkedIn的最古老,最成功的基礎架構,一個叫做databus的數據庫變更日誌緩存服務。不像許多日誌結構存儲系統系統,kafka爲了訂閱和組織數據而生,以便能更快速的線性讀和寫。也不像databus,kafka扮演了一個真實的存儲,所以即使在上游數據源無法重放的情況下它也很有用。

基本概念

下面是一張圖片,顯示了Kafka日誌的邏輯結構以及每條消息的偏移量:

kafka優秀還需要提的三點


日誌頭部和傳統的kafka日誌是相同的。它有密集且有序的offset,並保留所有消息。日誌壓縮提供了一個處理日誌尾部的選項。這張圖片展示了一個已壓縮尾部的日誌。需要注意的是,日誌尾部的消息保留了它們第一次寫入時的原始的offset,並且這個offset從來不會改變。並且所有offset都是日誌中有效的位置,即使消息已經被壓縮處理過的offset。例如,如上圖所示,36,37,38這三個offset是完全等價的位置,在這3個offset上的讀都會返回offset爲38位置的消息(即使36和37兩個offset指定的日誌已經被刪除)。

壓縮在後臺完成,通過週期性的重新複製日誌段。清理不會阻塞讀,並且爲了避免影響生產者和消費者,可以限制使用不超過配置的I / O吞吐量。

kafka壓縮一個日誌段的實際過程更像這樣:

kafka優秀還需要提的三點

log coompaction

日誌壓縮提供什麼保證?

日誌壓縮提供如下保證:

  1. 任何消費者都會看到所寫的每條消息,這些消息將具有連續的偏移量。 topic的參數min.compaction.lag.ms可用於保證消息寫入後必須經過的最小時間才能被壓縮。即 它提供了每條消息保留不壓縮狀態的時間下限。
  2. 消息順序總是不變,壓縮不會重新對消息排序,只是刪除一些消息。
  3. 消息的offset不會改變,offset永遠是日誌中位置的唯一標識符。

日誌壓縮細節

日誌壓縮被一個日誌清理程序控制,它是一個後臺線程池。它會重新拷貝日誌段文件,並刪除那麼KEY已經在日誌頭部中出現的記錄。每個壓縮線程按照如下方式工作:

  1. 選擇日誌頭與日誌尾比率最高的日誌;
  2. 創建一個簡單的摘要,日誌頭部中的每個KEY的最後一個偏移量。
  3. 從頭到尾重新拷貝日誌,並刪除那些在日誌更後面的地方出現過的KEY。新的,乾淨的段被立即交換到日誌中,因此所需的額外磁盤空間只是一個額外的日誌段,而不是日誌的完整副本。
  4. 日誌頭概要實際上只是一個空間緊湊的hash表,每個entry恰好用24個字節,這樣做的結果就是一個8G的清理緩衝區,一次迭代清理大概能清理366G的日誌頭(假設每個消息是1K)。

配置日誌清理

日誌清理默認被開啓,這將開啓一個清理線程池。爲了在一個特定的topic上打開日誌清理,你可以增加一些屬性:

  • log.cleanup.policy=compact


配置壓縮(compact)日誌的清理策略,還可以配置刪除(delete)日誌的清理策略,delete是默認策略;

  • log.cleaner.min.compaction.lag.ms=5000


其含義是,消息在日誌中保持不壓縮的最短時間,僅適用於正在被壓縮的日誌。如果沒有配置,則除了最新的日誌段(當前正被寫入,即活動段),其他所有日誌段都可以壓縮。活動段不能被壓縮,即使這個段裏所有的日誌比參數log.cleaner.min.compaction.lag.ms配置的更老。

相關文章