在進行詳解之前,我想先聲明一下,本次我們進行講解說明的是 Kafka 消息存儲的信息文件內容,不是所謂的 Kafka 伺服器運行產生的日誌文件,這一點希望大家清楚。

Kafka 消息是以主題為單位進行歸類,各個主題之間是彼此獨立的,互不影響。每個主題又可以分為一個或多個分區。每個分區各自存在一個記錄消息數據的日誌文件。也就是該文要著重關注的內容。我們根據如下的圖進行進一步說明:

圖中,創建了一個 demo-topic 主題,其存在 7 個 Parition,對應的每個 Parition 下存在一個 [Topic-Parition] 命名的消息日誌文件。在理想情況下,數據流量分攤到各個 Parition 中,實現了負載均衡的效果。在分區日誌文件中,你會發現很多類型的文件,比如:.index、.timestamp、.log、.snapshot 等,其中,文件名一致的文件集合就稱為 LogSement。我們先留有這樣的一個整體的日誌結構概念,接下來我們一一的進行詳細的說明其中的設計。

LogSegment

我們已經知道分區日誌文件中包含很多的 LogSegment ,Kafka 日誌追加是順序寫入的,LogSegment 可以減小日誌文件的大小,進行日誌刪除的時候和數據查找的時候可以快速定位。同時,ActiveLogSegment 也就是活躍的日誌分段擁有文件擁有寫入許可權,其餘的 LogSegment 只有隻讀的許可權。

日誌文件存在多種後綴文件,重點需要關注 .index、.timestamp、.log 三種類型。其他的日誌類型功能作用,請查詢下面圖表:

類別作用.index偏移量索引文件.timestamp時間戳索引文件.log日誌文件.snaphot快照文件.deleted.cleaned日誌清理時臨時文件.swapLog Compaction 之後的臨時文件Leader-epoch-checkpoint

每個 LogSegment 都有一個基準偏移量,用來表示當前 LogSegment 中第一條消息的 offset。偏移量是一個 64 位的長整形數,固定是20位數字,長度未達到,用 0 進行填補,索引文件和日誌文件都由該作為文件名命名規則(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。特別說明一下,如果日誌文件名為 00000000000000000121.log ,則當前日誌文件的一條數據偏移量就是 121,偏移量是從 0 開始的。

如果想要查看相應文件內容可以通過 kafka-run-class.sh 腳本查看 .log

/data/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log

2.0 中可以使用 kafka-dump-log.sh 查 看.index 文件

/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index

日誌與索引文件

配置項默認值說明log.index.interval.bytes4096 (4K)增加索引項位元組間隔密度,會影響索引文件中的區間密度和查詢效率log.segment.bytes1073741824 (1G)日誌文件最大值log.roll.ms當前日誌分段中消息的最大時間戳與當前系統的時間戳的差值允許的最大範圍,毫秒維度log.roll.hours168 (7天)當前日誌分段中消息的最大時間戳與當前系統的時間戳的差值允許的最大範圍,小時維度log.index.size.max.bytes10485760 (10MB)觸發偏移量索引文件或時間戳索引文件分段位元組限額

偏移量索引文件用於記錄消息偏移量與物理地址之間的映射關係。時間戳索引文件則根據時間戳查找對應的偏移量。

Kafka 中的索引文件是以稀疏索引的方式構造消息的索引,他並不保證每一個消息在索引文件中都有對應的索引項。每當寫入一定量的消息時,偏移量索引文件和時間戳索引文件分別增加一個偏移量索引項和時間戳索引項,通過修改 log.index.interval.bytes 的值,改變索引項的密度。

切分文件

從上文中可知,日誌文件和索引文件都會存在多個文件,組成多個 SegmentLog,那麼其切分的規則是怎樣的呢?

當滿足如下幾個條件中的其中之一,就會觸發文件的切分:

  1. 當前日誌分段文件的大小超過了 broker 端參數 log.segment.bytes 配置的值。log.segment.bytes 參數的默認值為 1073741824,即 1GB。
  2. 當前日誌分段中消息的最大時間戳與當前系統的時間戳的差值大於 log.roll.mslog.roll.hours 參數配置的值。如果同時配置了 log.roll.mslog.roll.hours 參數,那麼 log.roll.ms 的優先順序高。默認情況下,只配置了 log.roll.hours 參數,其值為168,即 7 天。
  3. 偏移量索引文件或時間戳索引文件的大小達到 broker 端參數 log.index.size.max.bytes 配置的值。log.index.size.max.bytes 的默認值為 10485760,即 10MB。
  4. 追加的消息的偏移量與當前日誌分段的偏移量之間的差值大於 Integer.MAX_VALUE,即要追加的消息的偏移量不能轉變為相對偏移量。

為什麼是 Integer.MAX_VALUE

在偏移量索引文件中,每個索引項共佔用 8 個位元組,並分為兩部分。相對偏移量和物理地址。

相對偏移量:表示消息相對與基準偏移量的偏移量,占 4 個位元組

物理地址:消息在日誌分段文件中對應的物理位置,也占 4 個位元組

4 個位元組剛好對應 Integer.MAX_VALUE ,如果大於 Integer.MAX_VALUE ,則不能用 4 個位元組進行表示了。

索引文件切分過程

索引文件會根據 log.index.size.max.bytes 值進行預先分配空間,即文件創建的時候就是最大值,當真正的進行索引文件切分的時候,才會將其裁剪到實際數據大小的文件。這一點是跟日誌文件有所區別的地方。其意義降低了代碼邏輯的複雜性。

查找消息

offset 查詢

偏移量索引由相對偏移量和物理地址組成。

可以通過如下命令解析.index 文件

/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index
offset:0 position:0
offset:20 position:320
offset:43 position:1220

注意:offset 與 position 沒有直接關係哦,由於存在數據刪除和日誌清理。

在偏移量索引文件中,索引數據都是順序記錄 offset ,但時間戳索引文件中每個追加的索引時間戳必須大於之前追加的索引項,否則不予追加。在 Kafka 0.11.0.0 以後,消息信息中存在若干的時間戳信息。如果 broker 端參數 log.message.timestamp.type 設置為 LogAppendTIme ,那麼時間戳必定能保持單調增長。反之如果是 CreateTime 則無法保證順序。

注意:timestamp文件中的 offset 與 index 文件中的 relativeOffset 不是一一對應的哦。因為數據的寫入是各自追加。

e.g. 如何查看 偏移量為 23 的消息?

Kafka 中存在一個 ConcurrentSkipListMap 來保存在每個日誌分段,通過跳躍表方式,定位到在 00000000000000000000.index ,通過二分法在偏移量索引文件中找到不大於 23 的最大索引項,即 offset 20 那欄,然後從日誌分段文件中的物理位置為320 開始順序查找偏移量為 23 的消息。

時間戳方式查詢

在上文已經有所提及,通過時間戳方式進行查找消息,需要通過查找時間戳索引和偏移量索引兩個文件。

時間戳索引索引格式

e.g. 查找時間戳為 1557554753430 開始的消息?

  • 將 1557554753430 和每個日誌分段中最大時間戳 largestTimeStamp 逐一對比,直到找到不小於 1557554753430 所對應的日誌分段。日誌分段中的 largestTimeStamp 的計算是先查詢該日誌分段所對應時間戳索引文件,找到最後一條索引項,若最後一條索引項的時間戳欄位值大於 0 ,則取該值,否則去該日誌分段的最近修改時間。
  • 找到相應日誌分段之後,使用二分法進行定位,與偏移量索引方式類似,找到不大於 1557554753430 最大索引項,也就是 [1557554753420 430]。
  • 拿著偏移量為 430 到偏移量索引文件中使用二分法找到不大於 430 最大索引項,即 [20,320] 。
  • 日誌文件中從 320 的物理位置開始查找不小於 1557554753430 數據。

注意:timestamp文件中的 offset 與 index 文件中的 relativeOffset 不是一一對應的哦。因為數據的寫入是各自追加。

在偏移量索引文件中,索引數據都是順序記錄 offset ,但時間戳索引文件中每個追加的索引時間戳必須大於之前追加的索引項,否則不予追加。在 Kafka 0.11.0.0 以後,消息信息中存在若干的時間戳信息。如果 broker 端參數 log.message.timestamp.type 設置為 LogAppendTIme ,那麼時間戳必定能保持單調增長。反之如果是 CreateTime 則無法保證順序。

日誌清理

日誌清理,不是日誌刪除哦,這還是有所區別的,日誌刪除會在下文進行說明。

Kafka 提供兩種日誌清理策略:

日誌刪除:按照一定的刪除策略,將不滿足條件的數據進行數據刪除

日誌壓縮:針對每個消息的 Key 進行整合,對於有相同 Key 的不同 Value 值,只保留最後一個版本。

Kafka 提供 log.cleanup.policy 參數進行相應配置,默認值:delete,還可以選擇 compact。

是否支持針對具體的 Topic 進行配置?

答案是肯定的,主題級別的配置項是 cleanup.policy

日誌刪除

Kafka 會周期性根據相應規則進行日誌數據刪除,保留策略有 3 種:基於時間的保留策略、基於日誌大小的保留策略和基於日誌其實偏移量的保留策略。

基於時間

日誌刪除任務會根據 log.retention.hours/log.retention.minutes/log.retention.ms 設定日誌保留的時間節點。如果超過該設定值,就需要進行刪除。默認是 7 天,log.retention.ms 優先順序最高。

如何查找日誌分段文件中已經過去的數據呢?

Kafka 依據日誌分段中最大的時間戳進行定位,首先要查詢該日誌分段所對應的時間戳索引文件,查找時間戳索引文件中最後一條索引項,若最後一條索引項的時間戳欄位值大於 0,則取該值,否則取最近修改時間。

為什麼不直接選最近修改時間呢?

因為日誌文件可以有意無意的被修改,並不能真實的反應日誌分段的最大時間信息。

刪除過程

  1. 從日誌對象中所維護日誌分段的跳躍表中移除待刪除的日誌分段,保證沒有線程對這些日誌分段進行讀取操作。
  2. 這些日誌分段所有文件添加 上 .delete 後綴。
  3. 交由一個以 "delete-file" 命名的延遲任務來刪除這些 .delete 為後綴的文件。延遲執行時間可以通過 file.delete.delay.ms 進行設置

如果活躍的日誌分段中也存在需要刪除的數據時?

Kafka 會先切分出一個新的日誌分段作為活躍日誌分段,然後執行刪除操作。

基於日誌大小

日誌刪除任務會檢查當前日誌的大小是否超過設定值。設定項為 log.retention.bytes ,單個日誌分段的大小由 log.regment.bytes 進行設定。

刪除過程

  1. 計算需要被刪除的日誌總大小 (當前日誌文件大小-retention值)。
  2. 從日誌文件第一個 LogSegment 開始查找可刪除的日誌分段的文件集合。
  3. 執行刪除。

基於日誌起始偏移量

基於日誌起始偏移量的保留策略的判斷依據是某日誌分段的下一個日誌分段的起始偏移量是否大於等於日誌文件的起始偏移量,若是,則可以刪除此日誌分段。

注意:日誌文件的起始偏移量並不一定等於第一個日誌分段的基準偏移量,存在數據刪除,可能與之相等的那條數據已經被刪除了。

刪除過程

  • 從頭開始變了每一個日誌分段,日誌分段 1 的下一個日誌分段的起始偏移量為 11,小於 logStartOffset,將 日誌分段 1 加入到刪除隊列中
  • 日誌分段 2 的下一個日誌分段的起始偏移量為 23,小於 logStartOffset,將 日誌分段 2 加入到刪除隊列中
  • 日誌分段 3 的下一個日誌分段的起始偏移量為 30,大於 logStartOffset,則不進行刪除。

推薦閱讀:

相关文章