更好的閱讀體驗請移步至我的 blog:

Detect the Source Code of List Watch Between API Server and Etcd?

littledriver.net圖標

寫在前面

在上一篇文章中,我們通過 Kubernetes 的架構圖以及一個 Deployment 資源對象創建的過程大致了解了List-Watch機制在一個 Kubernetes 集群中所起的作用以及它所面臨的問題。本文我們將繼續深入List-Watch機制的實現原理,從源碼的角度再次探索它其中的奧秘。

List-Watch 機制時序圖

通過上面的時序圖我們可以看到,在 Controller 和 API Server 交互之前,API Server 和 Kubectl 以及 etcd 還有一段交互的過程。這個過程對於整個 List-Watch 機制是非常重要的,因為它是 List-Watch 機制對外提供的數據的生產過程。所以,本文將對這一過程做出詳細的分析。

API Server

一個資源創建的起點是從 API Server 提供的 HTTP API 開始的。這裡之所以沒有提到時序圖中的 kubelet 是因為,除了使用 kubelet,我們還可以通過 client-go 或者直接發送 HTTP 請求的方式給 API Server 來創建資源。既然List-Watch機制中消息的發送端為 API Server,那麼它肯定就提供了相應的 List 和 Watch 的 HTTP。API。通過觀察 API Server 中註冊 HTTP API 的代碼邏輯:apiserver/installer.go at master · kubernetes/apiserver · GitHub, 我們可以發現它通過「類型轉換」構造了一個 Lister 對象還有一個 Watcher 對象:

// what verbs are supported by the storage, used to know what verbs we support per path
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata)

順著 registerResourceHandlers函數的邏輯往下看我們可以知道,無論是 Lister 還是 Watcher,都是通過一個叫做restfulListResource的方法封裝了一下暴露給外部使用的:apiserver/installer.go at master · kubernetes/apiserver · GitHub。而通過進一步觀察這個函數的內部邏輯我們也可以看到,watcher 和 lister 最終在名為ListResource的方法內執行其內部真正的邏輯。具體當一個 GET 請求過來調用的是 Watch 還是 List 介面,是通過請求當中的一個參數來確定的:apiserver/get.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub。

假設目前的 HTTP 請求是 Watch。那麼在ListResource 的邏輯中就會走到apiserver/get.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub 這一步。它調用 Watcher 的 Watch 方法,創建了一個Watch-Interface類型的對象。然後將其傳遞至 serveWatch方法:apiserver/watch.go at 8a1312795085bced3bc5d4553b97c450a79fc420 · kubernetes/apiserver · GitHub。通過觀察 serveWatch方法的內部邏輯可知,之前創建的Watch-Interface的對象被塞入了 ServeHTTP中,然後利用這個對象內部一個用於傳遞「資源對象信息的 channel」中的消息來對外部的「Watch」 請求提供服務:apiserver/watch.go at 8a1312795085bced3bc5d4553b97c450a79fc420 · kubernetes/apiserver · GitHub

到目前為止,通過對 API Server 關於List-Watch 機制的源碼梳理,我們基本可以確定,API Server 獲取資源對象信息的邏輯主要是實現於Watch-Interface類型的對象以及向其內部的channel 傳遞消息的發送端。而在上面的描述中,我們還可以梳理出一條Watch-Interface類型的對象的創建鏈路:

既然 Watcher 對象是通過 Storage 對象進行轉換而來的,那麼就說明watch.Interface中的方法大概率也是在 Storage 類型的對象中實現的。在apiserver/store.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub 文件中,我們看到了 Storage類型實現了List 和 Watch 方法,繼續遞歸的跟進下面的邏輯發現,最終,在名為WatchPredicate的函數中,調用了名為 Storage 成員(與上面說的 Storage 類型的對象不是一個,它只是 struct 中其中一個 filed)的 Watch 方法,返回了類型為watch.Interface 的對象:apiserver/store.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub。而這個名為 Storage 成員的類型為DryRunnableStorage。通過查看 DryRunnableStorage這個類型的定義可知,其內部包含了一個類型為storage.Interface的對象,該 Interface 內部涵蓋了 Watch 和 List 方法,這兩個方法應該會被具體的某個資源實現,如 Pod, Deployment 等。

WARNING: 讀者閱讀到這裡想必有點頭暈,因為邏輯嵌套的層數太多,並且很多方法和成員的名字都是相同的。所以這裡建議大家根據我貼出的源碼的鏈接,按照 blog 中敘述的順序畫一個流程圖,會看的更清楚。

全局搜索一下創建Store類型對象的地方可知,幾乎存在於每一個資源的目錄下:

我們挑選 Deployment 資源對象的創建Store類型對象的邏輯:

// NewREST returns a RESTStorage object that will work against deployments.
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST) {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &extensions.Deployment{} },
NewListFunc: func() runtime.Object { return &extensions.DeploymentList{} },
DefaultQualifiedResource: extensions.Resource("deployments"),

CreateStrategy: deployment.Strategy,
UpdateStrategy: deployment.Strategy,
DeleteStrategy: deployment.Strategy,

TableConvertor: printerstorage.TableConvertor{TablePrinter: printers.NewTablePrinter().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
if err := store.CompleteWithOptions(options); err != nil {
panic(err) // TODO: Propagate error up
}

statusStore := *store
statusStore.UpdateStrategy = deployment.StatusStrategy
return &REST{store, []string{"all"}}, &StatusREST{store: &statusStore}, &RollbackREST{store: store}
}

可以看出在構造genericregistry.Store對象的時候,沒有有指定一個名為 Storage 的成員。在執行 return 語句之前僅僅只調用了一個store.CompleteWithOptions方法。跟進去之後,豁然開朗:apiserver/store.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub。名為 Storage 成員內部包含的類型為storage.Interface的對象最終是被一個名為Decorator的方法創建的。而這個方法和opts這個變數有關。順著這條線向上查找,我們最終發現,在 Deployment 代碼邏輯中有一個名為 NewStorage 的函數:kubernetes/storage.go at master · kubernetes/kubernetes · GitHub。opts 這個變數的值來自於這個函數的參數optsGetter

在向上尋找 opts 參數的過程中我們發現這條鏈路是比較長的,也是比較繞的,很難清晰的去定位到它第一次被創建的地方。所以,我們換一種思路:因為看到 opts 這個參數的類型為generic.RESTOptionsGetter,所以我們在全局可以搜一下,哪裡有對這個類型變數的賦值操作並且和 Storage 有關的。最後,我們定位到了這裡:apiserver/etcd.go at c53cd379d4b8e8acbe23a7a3b40c949687ba9926 · kubernetes/apiserver · GitHub。這是一段和 etcd 配置有關的邏輯。這段邏輯在 API Server 啟動構造其使用的配置的邏輯中有調用過:kubernetes/server.go at b1a52a38e9e3651680655416cc7afbec5e119854 · kubernetes/kubernetes · GitHub。buildGenericConfig函數構造的通用配置,最終賦值給了啟動 Master 節點所需要的配置集合:kubernetes/server.go at b1a52a38e9e3651680655416cc7afbec5e119854 · kubernetes/kubernetes · GitHub。而這部分配置最終被用於創建 API Server: kubernetes/server.go at b1a52a38e9e3651680655416cc7afbec5e119854 · kubernetes/kubernetes · GitHub。在 CreateKubeAPIServer函數內,我們可以看到,通過參數傳遞進來的 master.Config 最終調用了一個名為Complete的函數處理了一下相關配置,並且通過它的返回值調用了一個 New 函數。而在 New 函數的內部,也通過調用一個名為Install的函數,將restOptionsGetter參數傳了進去。

此時,我們再次返回到Deployment 代碼邏輯中的 NewStorage 函數被調用的地方:kubernetes/storage_apps.go at 7f23a743e8c23ac6489340bbb34fa6f1d392db9d · kubernetes/kubernetes · GitHub,隨機選取一個版本的函數v1beta1Storage,它在當前文件的一個名為NewRestStorage函數中被調用:github.com/kubernetes/k。隨後,我們按照這條調用鏈路再繼續向上尋找restOptionsGetter參數被賦值位置。隨即定位到了kubernetes/master.go at ec2e767e59395376fa191d7c56a74f53936b7653 · kubernetes/kubernetes · GitHub中的 InstallAPIs函數。

Bingo,此時我們可以將調用InstallAPIs函數的邏輯作為橋樑,將我們上面整個的查找流程鏈接起來。所以,當前我們已經可以確認的是,我們之前說的opts變數已經找到了出處。但實際上,在創建 Deployment storage 對象時,名為 Storage 成員內部包含的類型為storage.Interface的對象最終是被一個名為Decorator的方法創建的,這個Decorator方法來自於opts變數。再次查看Decorator方法的定義可知,它被包含在一個 Interface 內部的函數的返回值中,而這個函數正是創建opts這個變數所在的類必須要實現的:GetRESTOptions。所以,我們需要再次回到和 etcd 配置有關的邏輯:

func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
return nil
}

type StorageFactoryRestOptionsFactory struct {
Options EtcdOptions
StorageFactory serverstorage.StorageFactory
}

func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig, err := f.StorageFactory.NewConfig(resource)
if err != nil {
return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
}

ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
}
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
cacheSize, ok := sizes[resource]
if !ok {
cacheSize = f.Options.DefaultWatchCacheSize
}
ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
}

return ret, nil
}

可以看到GetRESTOptions最終被StorageFactoryRestOptionsFactory類實現。查看genericregistry.StorageWithCacher的定義,一路跟下去,就會發現,我們最終是創建了一個名為cacher類型為storage.Interface的變數,它將作為 Decorator 的值:apiserver/storage_factory.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub。如果再深入至cacher創建的邏輯,可以看到,它是實現了storage.Interface的全部介面的:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。

還記得我們當時在閱讀API Server中關於ListResource方法的時候,發現最終通過 ServeHTTP 函數對外提供服務的是一個storage.Interface類型對象調用了其 Watch 介面的返回值,即一個Watch-Interface類型的對象。在cacher的實現部分,我們同樣可以找到一個這樣的函數:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub,其內部將會為我們創建一個類型為cacheWatcher的對象apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。這個對象內部有一個非常重要的成員:result: make(chan watch.Event, chanSize),它是被一個名為ResultChan的函數暴露給外部使用的:

// Implements watch.Interface.
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
return c.result
}

這個函數相信你看到後會非常非常的熟悉,因為我們在ServeHTTP 函數中看到過它曾經被watcher.Watch()的返回值調用:apiserver/watch.go at 8a1312795085bced3bc5d4553b97c450a79fc420 · kubernetes/apiserver · GitHub。newCacheWatcher除了返回一個cacheWatcher類型的對象之外,還會啟動一個goroutine, 執行一個名為process的函數:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。當它從一個名為input的 channel 中監聽到有資源對象的信息發送過來的時候,就會通過sendWatchCacheEvent函數,將最新的 event 通過剛才提到的 Result Channel 發送給客戶端。並且由於客戶端和 API Server 之間是一個長連接,所以這個循環會一直執行。

如果讀者閱讀過前一篇過於 List-Watch 機制原理性的文章就可以知道,List-Watch 機制會通過 ResourceVersion 來保證發送報文的順序性。而這部分邏輯就是在 process內實現的。如果當前客戶端對 API Server 的 Watch 請求帶來的 ResourceVersion 為1,那麼 process 函數內的邏輯保證會返回給客戶端一個序號大於1的報文。

但是,截止到目前為止,我們只是看到了cacheWatcher為每一個請求啟動一個 goroutine 不斷的監聽資源對象信息的變化,如果有新的消息過來就返回給客戶端。那麼這個變化的消息是誰向inputchannel 傳遞過來的呢?回頭看下 cacher 的數據結構:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub,我們發現有如下幾個成員是值得注意的:

// Underlying storage.Interface.
storage storage.Interface
// "sliding window" of recent changes of objects and the current state.
watchCache *watchCache
reflector *cache.Reflector

storage storage.Interface(資源對象數據真正的來源)

storage 成員是在創建 Cacher 的時候就被傳遞進來了。根據這個順序向回查找,查看genericregistry.StorageWithCacher的定義apiserver/storage_factory.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub,可知 storage 是通過 NewRawStorage創建的。一路跟下去後。我們最後到了創建 etcdStorage 的函數里:kubernetes/etcd_helper.go at release-1.12 · kubernetes/kubernetes · GitHub。此時,你可以看下 NewEtcdStorage的返回值還有etcdHelper類實現的的 List 和 Watch 方法,就可以明白,整個 ListWatch 機制中,API Server 從 etcd 獲取資源對象信息所使用的 Watch 和 List 方法就是在這裡真正的被實現。而這也符合我們之前看的 Kubernetes 的架構圖中的一個細節:API Server 對外提供的一切信息都是從 etcd 而來的。如果你進入 etcd 實現的 Watch 方法中,稍微掃一眼就可以看到,它核心的邏輯就是啟動一個死循環,不斷的等待從 etcd 而來的有關資源對象的信息。

for {
resp, err := watcher.Next(w.ctx)
if err != nil {
w.etcdError <- err
return
}
w.etcdIncoming <- resp
}

除此之外,etcdHelper類還實現了很多和 etcd 相關的方法。那麼我們姑且可以認為,storage 成員是操作 etcd 的一個封裝。

watchCache *watchCache(資源對象信息的緩存)

apiserver/watch_cache.go at b080aefffce393d0aa75a2d3c62442b5515c8963 · kubernetes/apiserver · GitHub 通過觀察 watchCache 和數據結構以及它實現的方法,我猜測它應該是實現了一個資源對象信息的緩存。將通過和 etcd 通信而獲取到的資源對象的信息緩存在內存中。當資源對象長時間未發生變化的時候,如果再有 List 或者 Watch 請求該資源對象的信息,可以直接返回給它緩存中的內容,而不再去和 etcd 通信。

reflector *cache.Reflector

reflector 的創建一共需要兩個重要的對象作為參數:

  1. listerWatcher
  2. watchCache

其中 watchCache 我們上面已經提到過,那 listerWatcher 是什麼呢?在 cacher.go 文件中可以找到創建這個對象的方法的定義:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。瀏覽一下它實現的方法集合可知,它其實就是對上面我們說到的 storage 對象的一個封裝。實際上調用的還是 storage 內實現的一些方法。

通過觀察 Cacher 數據結構中幾個比較重要的成員的邏輯,我們現在確定了資源對象真正的數據來源,也了解了catchWatcher 啟動了一個goroutine 運行死循環等待著從inputchannel 發來的資源對象的信息。目前唯一缺少的就是數據生產者是如何將數據傳遞到input這個 channel 中的。

在創建了 Cacher 對象時候,我們緊接著運行了一個名為StartCaching 的方法:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。在它的邏輯內部,我們調用了 reflector 的 ListWatch 方法client-go/reflector.go at ee7a1ba5cdf1292b67a1fdf1fa28f90d2a7b0084 · kubernetes/client-go · GitHub。在這個方法中,它先通過 listerWatcher 封裝的 List 方法全量的獲取了一下 Kubernetes 集群中的資源,然後起了一個死循環,調用了 ListerWatcher 封裝的 Watch 方法,然後在 watchHandler 方法中,通過訪問一個阻塞的 channel,等待資源對象信息從 etcd 發過來。如果此時確實接收到了一個資源對象的信息,它會調用 watchCache.Add 方法,將其塞入緩存中:apiserver/watch_cache.go at b080aefffce393d0aa75a2d3c62442b5515c8963 · kubernetes/apiserver · GitHub。在 Add 的函數的邏輯中,會繼續調用 processEvent函數,在其內部我們發現,除了正常的更新 watchCache 的緩存之外,還執行了行邏輯:apiserver/watch_cache.go at b080aefffce393d0aa75a2d3c62442b5515c8963 · kubernetes/apiserver · GitHub。

那麼這個 onEvent 函數究竟是什麼呢?返回 watchCache 被創建的邏輯的位置會發現,watchCache.onEvent 是被cacher.processEvent 賦值的。cacher.processEvent 函數的定義如下:

func (c *Cacher) processEvent(event *watchCacheEvent) {
if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
// Monitor if this gets backed up, and how much.
glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
}
c.incoming <- *event
}

再結合對比一下 watchCache.Add 函數的邏輯可知,從 etcd 發來的 event 將會通過 processEvent 函數傳遞至incoming 這個 channel。而 incoming 這個 channel 是在 cacher 對象的dispatchEvents 函數內被讀取的:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。跟著這個數據流動的邏輯繼續向下,我們發現這個 event 最終作為參數傳遞到了之前我們創建的watcher的 Add 方法內:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。進入這個 Add 函數的內部,你會發現一下子豁然開朗,因為這個 event 事件經過漫長的流程終於傳遞到了inputchannel: apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。而上面這些 watcher 是怎麼被收集進來的呢?通過 cacher 中 Watch 方法的邏輯可以發現,它來自於對 Watch 方法的調用:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。而這個方法,正是在我們前面說到的,API Server 在響應 Watch/List 相關的 HTTP 請求的時候,生成類型為watch.Interface且名為catcherWatcher對象時調用的。

至此,整個List-Watch機制中,資源對象的數據從 etcd 到 API Server HTTP API 之間的數據流動過程就都走通了。


推薦閱讀:
相关文章