在雲原生時代和容器化浪潮中,容器的日誌採集是一個看起來不起眼卻又無法忽視的重要議題。對於容器日誌採集我們常用的工具有filebeat和fluentd,兩者對比各有優劣,相比基於ruby的fluentd,考慮到可定製性,我們一般默認選擇golang技術棧的filbeat作為主力的日誌採集agent。

相比較傳統的日誌採集方式,容器化下單節點會運行更多的服務,負載也會有更短的生命週期,而這些更容易對日誌採集agent造成壓力,雖然filebeat足夠輕量級和高性能,但如果不瞭解filebeat的機制,不合理的配置filebeat,實際的生產環境使用中可能也會給我們帶來意想不到的麻煩和難題。

整體架構

日誌採集的功能看起來不複雜,主要功能無非就是找到配置的日誌文件,然後讀取並處理,發送至相應的後端如elasticsearch,kafka等。

filebeat官網有張示意圖,如下所示:

針對每個日誌文件,filebeat都會啟動一個harvester協程,即一個goroutine,在該goroutine中不停的讀取日誌文件,直到文件的EOF末尾。一個最簡單的表示採集目錄的input配置大概如下所示:

filebeat.inputs:
- type: log
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /var/log/*.log

不同的harvester goroutine採集到的日誌數據都會發送至一個全局的隊列queue中,queue的實現有兩種:基於內存和基於磁碟的隊列,目前基於磁碟的隊列還是處於alpha階段,filebeat默認啟用的是基於內存的緩存隊列。

每當隊列中的數據緩存到一定的大小或者超過了定時的時間(默認1s),會被註冊的client從隊列中消費,發送至配置的後端。目前可以設置的client有kafka、elasticsearch、redis等。

雖然這一切看著挺簡單,但在實際使用中,我們還是需要考慮更多的問題,例如:

  • 日誌文件是如何被filbebeat發現又是如何被採集的?
  • filebeat是如何確保日誌採集發送到遠程的存儲中,不丟失一條數據的?
  • 如果filebeat掛掉,下次採集如何確保從上次的狀態開始而不會重新採集所有日誌?
  • filebeat的內存或者cpu佔用過多,該如何分析解決?
  • filebeat如何支持docker和kubernetes,如何配置容器化下的日誌採集?
  • 想讓filebeat採集的日誌發送至的後端存儲,如果原生不支持,怎樣定製化開發?

這些均需要對filebeat有更深入的理解,下面讓我們跟隨filebeat的源碼一起探究其中的實現機制。

一條日誌是如何被採集的

filebeat源碼歸屬於beats項目,而beats項目的設計初衷是為了採集各類的數據,所以beats抽象出了一個libbeat庫,基於libbeat我們可以快速的開發實現一個採集的工具,除了filebeat,還有像metricbeat、packetbeat等官方的項目也是在beats工程中。

如果我們大致看一下代碼就會發現,libbeat已經實現了內存緩存隊列memqueue、幾種output日誌發送客戶端,數據的過濾處理processor等通用功能,而filebeat只需要實現日誌文件的讀取等和日誌相關的邏輯即可。

從代碼的實現角度來看,filebeat大概可以分以下幾個模塊:

  • input: 找到配置的日誌文件,啟動harvester
  • harvester: 讀取文件,發送至spooler - spooler: 緩存日誌數據,直到可以發送至publisher
  • publisher: 發送日誌至後端,同時通知registrar
  • registrar: 記錄日誌文件被採集的狀態

1. 找到日誌文件

對於日誌文件的採集和生命週期管理,filebeat抽象出一個Crawler的結構體, 在filebeat啟動後,crawler會根據配置創建,然後遍歷並運行每個input:

for _, inputConfig := range c.inputConfigs {
err := c.startInput(pipeline, inputConfig, r.GetStates())
}

在每個input運行的邏輯裏,首先會根據配置獲取匹配的日誌文件,需要注意的是,這裡的匹配方式並非正則,而是採用linux glob的規則,和正則還是有一些區別。

matches, err := filepath.Glob(path)

獲取到了所有匹配的日誌文件之後,會經過一些複雜的過濾,例如如果配置了exclude_files則會忽略這類文件,同時還會查詢文件的狀態,如果文件的最近一次修改時間大於ignore_older的配置,也會不去採集該文件。

2. 讀取日誌文件

匹配到最終需要採集的日誌文件之後,filebeat會對每個文件啟動harvester goroutine,在該goroutine中不停的讀取日誌,並發送給內存緩存隊列memqueue。

(h *Harvester) Run()方法中,我們可以看到這麼一個無限循環,省略了一些邏輯的代碼如下所示:

for {
message, err := h.reader.Next()
if err != nil {
switch err {
case ErrFileTruncate:
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source)
h.state.Offset = 0
filesTruncated.Add(1)
case ErrRemoved:
logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source)
case ErrRenamed:
logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source)
case ErrClosed:
logp.Info("Reader was closed: %s. Closing.", h.state.Source)
case io.EOF:
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
case ErrInactive:
logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
default:
logp.Err("Read line error: %v; File: %v", err, h.state.Source)
}
return nil
}
...
if !h.sendEvent(data, forwarder) {
return nil
}
}

可以看到,reader.Next()方法會不停的讀取日誌,如果沒有返回異常,則發送日誌數據到緩存隊列中。

返回的異常有幾種類型,除了讀取到EOF外,還會有例如文件一段時間不活躍等情況發生會使harvester goroutine退出,不再採集該文件,並關閉文件句柄。 filebeat為了防止佔據過多的採集日誌文件的文件句柄,默認的close_inactive參數為5min,如果日誌文件5min內沒有被修改,上面代碼會進入ErrInactive的case,之後該harvester goroutine會被關閉。 這種場景下還需要注意的是,如果某個文件日誌採集中被移除了,但是由於此時被filebeat保持著文件句柄,文件佔據的磁碟空間會被保留直到harvester goroutine結束。

3. 緩存隊列

在memqueue被初始化時,filebeat會根據配置min_event是否大於1創建BufferingEventLoop或者DirectEventLoop,一般默認都是BufferingEventLoop,即帶緩衝的隊列。

type bufferingEventLoop struct {
broker *Broker

buf *batchBuffer
flushList flushList
eventCount int

minEvents int
maxEvents int
flushTimeout time.Duration

// active broker API channels
events chan pushRequest
get chan getRequest
pubCancel chan producerCancelRequest

// ack handling
acks chan int // ackloop -> eventloop : total number of events ACKed by outputs
schedACKS chan chanList // eventloop -> ackloop : active list of batches to be acked
pendingACKs chanList // ordered list of active batches to be send to the ackloop
ackSeq uint // ack batch sequence number to validate ordering

// buffer flush timer state
timer *time.Timer
idleC <-chan time.Time
}

BufferingEventLoop是一個實現了Broker、帶有各種channel的結構,主要用於將日誌發送至consumer消費。 BufferingEventLoop的run方法中,同樣是一個無限循環,這裡可以認為是一個日誌事件的調度中心。

for {
select {
case <-broker.done:
return
case req := <-l.events: // producer pushing new event
l.handleInsert(&req)
case req := <-l.get: // consumer asking for next batch
l.handleConsumer(&req)
case count := <-l.acks:
l.handleACK(count)
case <-l.idleC:
l.idleC = nil
l.timer.Stop()
if l.buf.length() > 0 {
l.flushBuffer()
}
}
}

上文中harvester goroutine每次讀取到日誌數據之後,最終會被發送至bufferingEventLoop中的events chan pushRequest channel,然後觸發上面req := <-l.events的case,handleInsert方法會把數據添加至bufferingEventLoop的buf中,buf即memqueue實際緩存日誌數據的隊列,如果buf長度超過配置的最大值或者bufferingEventLoop中的timer定時器觸發了case <-l.idleC,均會調用flushBuffer()方法。

flushBuffer()又會觸發req := <-l.get的case,然後運行handleConsumer方法,該方法中最重要的是這一句代碼:

req.resp <- getResponse{ackChan, events}

這裡獲取到了consumer消費者的response channel,然後發送數據給這個channel。真正到這,才會觸發consumer對memqueue的消費。所以,其實memqueue並非一直不停的在被consumer消費,而是在memqueue通知consumer的時候才被消費,我們可以理解為一種脈衝式的發送。

4. 消費隊列

實際上,早在filebeat初始化的時候,就已經創建了一個eventConsumer並在loop無限循環方法裏試圖從Broker中獲取日誌數據。

for {
if !paused && c.out != nil && consumer != nil && batch == nil {
out = c.out.workQueue
queueBatch, err := consumer.Get(c.out.batchSize)
...
batch = newBatch(c.ctx, queueBatch, c.out.timeToLive)
}
...
select {
case <-c.done:
return
case sig := <-c.sig:
handleSignal(sig)
case out <- batch:
batch = nil
}
}

上面consumer.Get就是消費者consumer從Broker中獲取日誌數據,然後發送至out的channel中被output client發送,我們看一下Get方法裏的核心代碼:

select {
case c.broker.requests <- getRequest{sz: sz, resp: c.resp}:
case <-c.done:
return nil, io.EOF
}

// if request has been send, we do have to wait for a response
resp := <-c.resp
return &batch{
consumer: c,
events: resp.buf,
ack: resp.ack,
state: batchActive,
}, nil

getRequest的結構如下:

type getRequest struct {
sz int // request sz events from the broker
resp chan getResponse // channel to send response to
}

getResponse的結構:

type getResponse struct {
ack *ackChan
buf []publisher.Event
}

getResponse裏包含了日誌的數據,而getRequest包含了一個發送至消費者的channel。

在上文bufferingEventLoop緩衝隊列的handleConsumer方法裏接收到的參數為getRequest,裡麪包含了consumer請求的getResponse channel。 如果handleConsumer不發送數據,consumer.Get方法會一直阻塞在select中,直到flushBuffer,consumer的getResponse channel才會接收到日誌數據。

5. 發送日誌

在創建beats時,會創建一個clientWorker,clientWorker的run方法中,會不停的從consumer發送的channel裏讀取日誌數據,然後調用client.Publish批量發送日誌。

func (w *clientWorker) run() {
for !w.closed.Load() {
for batch := range w.qu {
if err := w.client.Publish(batch); err != nil {
return
}
}
}
}

libbeats庫中包含了kafka、elasticsearch、logstash等幾種client,它們均實現了client介面:

type Client interface {
Close() error
Publish(publisher.Batch) error
String() string
}

當然最重要的是實現Publish介面,然後將日誌發送出去。

實際上,filebeat中日誌數據在各種channel裏流轉的設計還是比較複雜和繁瑣的,筆者也是研究了好久、畫了很長的架構圖才理清楚其中的邏輯。 這裡抽出了一個簡化後的圖以供參考:

如何保證at least once

filebeat維護了一個registry文件在本地的磁碟,該registry文件維護了所有已經採集的日誌文件的狀態。 實際上,每當日誌數據發送至後端成功後,會返回ack事件。filebeat啟動了一個獨立的registry協程負責監聽該事件,接收到ack事件後會將日誌文件的State狀態更新至registry文件中,State中的Offset表示讀取到的文件偏移量,所以filebeat會保證Offset記錄之前的日誌數據肯定被後端的日誌存儲接收到。

State結構如下所示:

type State struct {
Id string `json:"-"` // local unique id to make comparison more efficient
Finished bool `json:"-"` // harvester state
Fileinfo os.FileInfo `json:"-"` // the file info
Source string `json:"source"`
Offset int64 `json:"offset"`
Timestamp time.Time `json:"timestamp"`
TTL time.Duration `json:"ttl"`
Type string `json:"type"`
Meta map[string]string `json:"meta"`
FileStateOS file.StateOS
}

記錄在registry文件中的數據大致如下所示:

[{"source":"/tmp/aa.log","offset":48,"timestamp":"2019-07-03T13:54:01.298995+08:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":7048952,"device":16777220}}]

由於文件可能會被改名或移動,filebeat會根據inode和設備號來標誌每個日誌文件。

如果filebeat異常重啟,每次採集harvester啟動的時候都會讀取registry文件,從上次記錄的狀態繼續採集,確保不會從頭開始重複發送所有的日誌文件。

當然,如果日誌發送過程中,還沒來得及返回ack,filebeat就掛掉,registry文件肯定不會更新至最新的狀態,那麼下次採集的時候,這部分的日誌就會重複發送,所以這意味著filebeat只能保證at least once,無法保證不重複發送。 還有一個比較異常的情況是,linux下如果老文件被移除,新文件馬上創建,很有可能它們有相同的inode,而由於filebeat根據inode來標誌文件記錄採集的偏移,會導致registry裏記錄的其實是被移除的文件State狀態,這樣新的文件採集卻從老的文件Offset開始,從而會遺漏日誌數據。 為了盡量避免inode被複用的情況,同時防止registry文件隨著時間增長越來越大,建議使用clean_inactive和clean_remove配置將長時間未更新或者被刪除的文件State從registry中移除。

同時我們可以發現在harvester讀取日誌中,會更新registry的狀態處理一些異常場景。例如,如果一個日誌文件被清空,filebeat會在下一次Reader.Next方法中返回ErrFileTruncate異常,將inode標誌文件的Offset置為0,結束這次harvester,重新啟動新的harvester,雖然文件不變,但是registry中的Offset為0,採集會從頭開始。

特別注意的是,如果使用容器部署filebeat,需要將registry文件掛載到宿主機上,否則容器重啟後registry文件丟失,會使filebeat從頭開始重複採集日誌文件。

filebeat自動reload更新

目前filebeat支持reload input配置,module配置,但reload的機制只有定時更新。

在配置中打開reload.enable之後,還可以配置reload.period表示自動reload配置的時間間隔。 filebeat在啟動時,會創建一個專門用於reload的協程。對於每個正在運行的harvester,filebeat會將其加入一個全局的Runner列表,每次到了定時的間隔後,會觸發一次配置文件的diff判斷,如果是需要停止的加入stopRunner列表,然後逐個關閉,新的則加入startRunner列表,啟動新的Runner。

filebeat對kubernetes的支持

filebeat官方文檔提供了在kubernetes下基於daemonset的部署方式,最主要的一個配置如下所示:

- type: docker
containers.ids:
- "*"
processors:
- add_kubernetes_metadata:
in_cluster: true

即設置輸入input為docker類型。由於所有的容器的標準輸出日誌默認都在節點的/var/lib/docker/containers/<containerId>/*-json.log路徑,所以本質上採集的是這類日誌文件。

和傳統的部署方式有所區別的是,如果服務部署在kubernetes上,我們查看和檢索日誌的維度不能僅僅侷限於節點和服務,還需要有podName,containerName等,所以每條日誌我們都需要打標增加kubernetes的元信息才發送至後端。 filebeat會在配置中增加了add_kubernetes_metadata的processor的情況下,啟動監聽kubernetes的watch服務,監聽所有kubernetes pod的變更,然後將歸屬本節點的pod最新的事件同步至本地的緩存中。 節點上一旦發生容器的銷毀創建,/var/lib/docker/containers/下會有目錄的變動,filebeat根據路徑提取出containerId,再根據containerId從本地的緩存中找到pod信息,從而可以獲取到podName、label等數據,並加到日誌的元信息fields中。 filebeat還有一個beta版的功能autodiscover,autodiscover的目的是把分散到不同節點上的filebeat配置文件集中管理。目前也支持kubernetes作為provider,本質上還是監聽kubernetes事件然後採集docker的標準輸出文件。 大致架構如下所示:

但是在實際生產環境使用中,僅採集容器的標準輸出日誌還是遠遠不夠,我們往往還需要採集容器掛載出來的自定義日誌目錄,還需要控制每個服務的日誌採集方式以及更多的定製化功能。

在輕舟容器雲上,我們自研了一個監聽kubernetes事件自動生成filebeat配置的agent,通過CRD的方式,支持自定義容器內部日誌目錄、支持自定義fields、支持多行讀取等功能。同時可在kubernetes上統一管理各種日誌配置,而且無需用戶感知pod的創建銷毀和遷移,自動完成各種場景下的日誌配置生成和更新。

性能分析與調優

雖然beats系列主打輕量級,雖然用golang寫的filebeat的內存佔用確實比較基於jvm的logstash等好太多,但是事實告訴我們其實沒那麼簡單。

正常啟動filebeat,一般確實只會佔用3、40MB內存,但是在輕舟容器雲上偶發性的我們也會發現某些節點上的filebeat容器內存佔用超過配置的pod limit限制(一般設置為200MB),並且不停的觸發的OOM。 究其原因,一般容器化環境中,特別是裸機上運行的容器個數可能會比較多,導致創建大量的harvester去採集日誌。如果沒有很好的配置filebeat,會有較大概率導致內存急劇上升。 當然,filebeat內存佔據較大的部分還是memqueue,所有採集到的日誌都會先發送至memqueue聚集,再通過output發送出去。每條日誌的數據在filebeat中都被組裝為event結構,filebeat默認配置的memqueue緩存的event個數為4096,可通過queue.mem.events設置。默認最大的一條日誌的event大小限制為10MB,可通過max_bytes設置。4096 * 10MB = 40GB,可以想像,極端場景下,filebeat至少佔據40GB的內存。特別是配置了multiline多行模式的情況下,如果multiline配置有誤,單個event誤採集為上千條日誌的數據,很可能導致memqueue佔據了大量內存,致使內存爆炸。 所以,合理的配置日誌文件的匹配規則,限制單行日誌大小,根據實際情況配置memqueue緩存的個數,才能在實際使用中規避filebeat的內存佔用過大的問題。

如何對filebeat進行擴展開發

一般情況下filebeat可滿足大部分的日誌採集需求,但是仍然避免不了一些特殊的場景需要我們對filebeat進行定製化開發,當然filebeat本身的設計也提供了良好的擴展性。

beats目前只提供了像elasticsearch、kafka、logstash等幾類output客戶端,如果我們想要filebeat直接發送至其他後端,需要定製化開發自己的output。同樣,如果需要對日誌做過濾處理或者增加元信息,也可以自制processor插件。 無論是增加output還是寫個processor,filebeat提供的大體思路基本相同。一般來講有3種方式:

  1. 直接fork filebeat,在現有的源碼上開發。output或者processor都提供了類似Run、Stop等的介面,只需要實現該類介面,然後在init方法中註冊相應的插件初始化方法即可。當然,由於golang中init方法是在import包時才被調用,所以需要在初始化filebeat的代碼中手動import。
  2. 複製一份filebeat的main.go,import我們自研的插件庫,然後重新編譯。本質上和方式1區別不大。
  3. filebeat還提供了基於golang plugin的插件機制,需要把自研的插件編譯成.so共享鏈接庫,然後在filebeat啟動參數中通過-plugin指定庫所在路徑。不過實際上一方面golang plugin還不夠成熟穩定,一方面自研的插件依然需要依賴相同版本的libbeat庫,而且還需要相同的golang版本編譯,坑可能更多,不太推薦。

推薦閱讀:

相關文章