更好的閱讀體驗請移步至我的 blog:
Detect the Source Code of List Watch Between API Server and Etcd?littledriver.net
在上一篇文章中,我們通過 Kubernetes 的架構圖以及一個 Deployment 資源對象創建的過程大致了解了List-Watch機制在一個 Kubernetes 集群中所起的作用以及它所面臨的問題。本文我們將繼續深入List-Watch機制的實現原理,從源碼的角度再次探索它其中的奧秘。
List-Watch
通過上面的時序圖我們可以看到,在 Controller 和 API Server 交互之前,API Server 和 Kubectl 以及 etcd 還有一段交互的過程。這個過程對於整個 List-Watch 機制是非常重要的,因為它是 List-Watch 機制對外提供的數據的生產過程。所以,本文將對這一過程做出詳細的分析。
一個資源創建的起點是從 API Server 提供的 HTTP API 開始的。這裡之所以沒有提到時序圖中的 kubelet 是因為,除了使用 kubelet,我們還可以通過 client-go 或者直接發送 HTTP 請求的方式給 API Server 來創建資源。既然List-Watch機制中消息的發送端為 API Server,那麼它肯定就提供了相應的 List 和 Watch 的 HTTP。API。通過觀察 API Server 中註冊 HTTP API 的代碼邏輯:apiserver/installer.go at master · kubernetes/apiserver · GitHub, 我們可以發現它通過「類型轉換」構造了一個 Lister 對象還有一個 Watcher 對象:
// what verbs are supported by the storage, used to know what verbs we support per path creater, isCreater := storage.(rest.Creater) namedCreater, isNamedCreater := storage.(rest.NamedCreater) lister, isLister := storage.(rest.Lister) getter, isGetter := storage.(rest.Getter) getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions) gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter) collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter) updater, isUpdater := storage.(rest.Updater) patcher, isPatcher := storage.(rest.Patcher) watcher, isWatcher := storage.(rest.Watcher) connecter, isConnecter := storage.(rest.Connecter) storageMeta, isMetadata := storage.(rest.StorageMetadata)
順著 registerResourceHandlers函數的邏輯往下看我們可以知道,無論是 Lister 還是 Watcher,都是通過一個叫做restfulListResource的方法封裝了一下暴露給外部使用的:apiserver/installer.go at master · kubernetes/apiserver · GitHub。而通過進一步觀察這個函數的內部邏輯我們也可以看到,watcher 和 lister 最終在名為ListResource的方法內執行其內部真正的邏輯。具體當一個 GET 請求過來調用的是 Watch 還是 List 介面,是通過請求當中的一個參數來確定的:apiserver/get.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub。
registerResourceHandlers
restfulListResource
ListResource
假設目前的 HTTP 請求是 Watch。那麼在ListResource 的邏輯中就會走到apiserver/get.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub 這一步。它調用 Watcher 的 Watch 方法,創建了一個Watch-Interface類型的對象。然後將其傳遞至 serveWatch方法:apiserver/watch.go at 8a1312795085bced3bc5d4553b97c450a79fc420 · kubernetes/apiserver · GitHub。通過觀察 serveWatch方法的內部邏輯可知,之前創建的Watch-Interface的對象被塞入了 ServeHTTP中,然後利用這個對象內部一個用於傳遞「資源對象信息的 channel」中的消息來對外部的「Watch」 請求提供服務:apiserver/watch.go at 8a1312795085bced3bc5d4553b97c450a79fc420 · kubernetes/apiserver · GitHub
Watch-Interface
serveWatch
到目前為止,通過對 API Server 關於List-Watch 機制的源碼梳理,我們基本可以確定,API Server 獲取資源對象信息的邏輯主要是實現於Watch-Interface類型的對象以及向其內部的channel 傳遞消息的發送端。而在上面的描述中,我們還可以梳理出一條Watch-Interface類型的對象的創建鏈路:
既然 Watcher 對象是通過 Storage 對象進行轉換而來的,那麼就說明watch.Interface中的方法大概率也是在 Storage 類型的對象中實現的。在apiserver/store.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub 文件中,我們看到了 Storage類型實現了List 和 Watch 方法,繼續遞歸的跟進下面的邏輯發現,最終,在名為WatchPredicate的函數中,調用了名為 Storage 成員(與上面說的 Storage 類型的對象不是一個,它只是 struct 中其中一個 filed)的 Watch 方法,返回了類型為watch.Interface 的對象:apiserver/store.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub。而這個名為 Storage 成員的類型為DryRunnableStorage。通過查看 DryRunnableStorage這個類型的定義可知,其內部包含了一個類型為storage.Interface的對象,該 Interface 內部涵蓋了 Watch 和 List 方法,這兩個方法應該會被具體的某個資源實現,如 Pod, Deployment 等。
watch.Interface
WatchPredicate
DryRunnableStorage
storage.Interface
WARNING: 讀者閱讀到這裡想必有點頭暈,因為邏輯嵌套的層數太多,並且很多方法和成員的名字都是相同的。所以這裡建議大家根據我貼出的源碼的鏈接,按照 blog 中敘述的順序畫一個流程圖,會看的更清楚。
全局搜索一下創建Store類型對象的地方可知,幾乎存在於每一個資源的目錄下:
Store
我們挑選 Deployment 資源對象的創建Store類型對象的邏輯:
// NewREST returns a RESTStorage object that will work against deployments. func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST) { store := &genericregistry.Store{ NewFunc: func() runtime.Object { return &extensions.Deployment{} }, NewListFunc: func() runtime.Object { return &extensions.DeploymentList{} }, DefaultQualifiedResource: extensions.Resource("deployments"),
CreateStrategy: deployment.Strategy, UpdateStrategy: deployment.Strategy, DeleteStrategy: deployment.Strategy,
TableConvertor: printerstorage.TableConvertor{TablePrinter: printers.NewTablePrinter().With(printersinternal.AddHandlers)}, } options := &generic.StoreOptions{RESTOptions: optsGetter} if err := store.CompleteWithOptions(options); err != nil { panic(err) // TODO: Propagate error up }
statusStore := *store statusStore.UpdateStrategy = deployment.StatusStrategy return &REST{store, []string{"all"}}, &StatusREST{store: &statusStore}, &RollbackREST{store: store} }
可以看出在構造genericregistry.Store對象的時候,沒有有指定一個名為 Storage 的成員。在執行 return 語句之前僅僅只調用了一個store.CompleteWithOptions方法。跟進去之後,豁然開朗:apiserver/store.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub。名為 Storage 成員內部包含的類型為storage.Interface的對象最終是被一個名為Decorator的方法創建的。而這個方法和opts這個變數有關。順著這條線向上查找,我們最終發現,在 Deployment 代碼邏輯中有一個名為 NewStorage 的函數:kubernetes/storage.go at master · kubernetes/kubernetes · GitHub。opts 這個變數的值來自於這個函數的參數optsGetter。
genericregistry.Store
store.CompleteWithOptions
Decorator
opts
optsGetter
在向上尋找 opts 參數的過程中我們發現這條鏈路是比較長的,也是比較繞的,很難清晰的去定位到它第一次被創建的地方。所以,我們換一種思路:因為看到 opts 這個參數的類型為generic.RESTOptionsGetter,所以我們在全局可以搜一下,哪裡有對這個類型變數的賦值操作並且和 Storage 有關的。最後,我們定位到了這裡:apiserver/etcd.go at c53cd379d4b8e8acbe23a7a3b40c949687ba9926 · kubernetes/apiserver · GitHub。這是一段和 etcd 配置有關的邏輯。這段邏輯在 API Server 啟動構造其使用的配置的邏輯中有調用過:kubernetes/server.go at b1a52a38e9e3651680655416cc7afbec5e119854 · kubernetes/kubernetes · GitHub。buildGenericConfig函數構造的通用配置,最終賦值給了啟動 Master 節點所需要的配置集合:kubernetes/server.go at b1a52a38e9e3651680655416cc7afbec5e119854 · kubernetes/kubernetes · GitHub。而這部分配置最終被用於創建 API Server: kubernetes/server.go at b1a52a38e9e3651680655416cc7afbec5e119854 · kubernetes/kubernetes · GitHub。在 CreateKubeAPIServer函數內,我們可以看到,通過參數傳遞進來的 master.Config 最終調用了一個名為Complete的函數處理了一下相關配置,並且通過它的返回值調用了一個 New 函數。而在 New 函數的內部,也通過調用一個名為Install的函數,將restOptionsGetter參數傳了進去。
generic.RESTOptionsGetter
buildGenericConfig
CreateKubeAPIServer
Complete
Install
restOptionsGetter
此時,我們再次返回到Deployment 代碼邏輯中的 NewStorage 函數被調用的地方:kubernetes/storage_apps.go at 7f23a743e8c23ac6489340bbb34fa6f1d392db9d · kubernetes/kubernetes · GitHub,隨機選取一個版本的函數v1beta1Storage,它在當前文件的一個名為NewRestStorage函數中被調用:https://github.com/kubernetes/kubernetes/blob/7f23a743e8c23ac6489340bbb34fa6f1d392db9d/pkg/registry/apps/rest/storage_apps.go#L38。隨後,我們按照這條調用鏈路再繼續向上尋找restOptionsGetter參數被賦值位置。隨即定位到了kubernetes/master.go at ec2e767e59395376fa191d7c56a74f53936b7653 · kubernetes/kubernetes · GitHub中的 InstallAPIs函數。
v1beta1Storage
NewRestStorage
InstallAPIs
Bingo,此時我們可以將調用InstallAPIs函數的邏輯作為橋樑,將我們上面整個的查找流程鏈接起來。所以,當前我們已經可以確認的是,我們之前說的opts變數已經找到了出處。但實際上,在創建 Deployment storage 對象時,名為 Storage 成員內部包含的類型為storage.Interface的對象最終是被一個名為Decorator的方法創建的,這個Decorator方法來自於opts變數。再次查看Decorator方法的定義可知,它被包含在一個 Interface 內部的函數的返回值中,而這個函數正是創建opts這個變數所在的類必須要實現的:GetRESTOptions。所以,我們需要再次回到和 etcd 配置有關的邏輯:
GetRESTOptions
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error { if err := s.addEtcdHealthEndpoint(c); err != nil { return err } c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory} return nil }
type StorageFactoryRestOptionsFactory struct { Options EtcdOptions StorageFactory serverstorage.StorageFactory }
func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { storageConfig, err := f.StorageFactory.NewConfig(resource) if err != nil { return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error()) }
ret := generic.RESTOptions{ StorageConfig: storageConfig, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, EnableGarbageCollection: f.Options.EnableGarbageCollection, ResourcePrefix: f.StorageFactory.ResourcePrefix(resource), CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod, } if f.Options.EnableWatchCache { sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes) if err != nil { return generic.RESTOptions{}, err } cacheSize, ok := sizes[resource] if !ok { cacheSize = f.Options.DefaultWatchCacheSize } ret.Decorator = genericregistry.StorageWithCacher(cacheSize) }
return ret, nil }
可以看到GetRESTOptions最終被StorageFactoryRestOptionsFactory類實現。查看genericregistry.StorageWithCacher的定義,一路跟下去,就會發現,我們最終是創建了一個名為cacher類型為storage.Interface的變數,它將作為 Decorator 的值:apiserver/storage_factory.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub。如果再深入至cacher創建的邏輯,可以看到,它是實現了storage.Interface的全部介面的:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。
StorageFactoryRestOptionsFactory
genericregistry.StorageWithCacher
cacher
還記得我們當時在閱讀API Server中關於ListResource方法的時候,發現最終通過 ServeHTTP 函數對外提供服務的是一個storage.Interface類型對象調用了其 Watch 介面的返回值,即一個Watch-Interface類型的對象。在cacher的實現部分,我們同樣可以找到一個這樣的函數:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub,其內部將會為我們創建一個類型為cacheWatcher的對象apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。這個對象內部有一個非常重要的成員:result: make(chan watch.Event, chanSize),它是被一個名為ResultChan的函數暴露給外部使用的:
cacheWatcher
result: make(chan watch.Event, chanSize)
ResultChan
// Implements watch.Interface. func (c *cacheWatcher) ResultChan() <-chan watch.Event { return c.result }
這個函數相信你看到後會非常非常的熟悉,因為我們在ServeHTTP 函數中看到過它曾經被watcher.Watch()的返回值調用:apiserver/watch.go at 8a1312795085bced3bc5d4553b97c450a79fc420 · kubernetes/apiserver · GitHub。newCacheWatcher除了返回一個cacheWatcher類型的對象之外,還會啟動一個goroutine, 執行一個名為process的函數:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。當它從一個名為input的 channel 中監聽到有資源對象的信息發送過來的時候,就會通過sendWatchCacheEvent函數,將最新的 event 通過剛才提到的 Result Channel 發送給客戶端。並且由於客戶端和 API Server 之間是一個長連接,所以這個循環會一直執行。
watcher.Watch()
newCacheWatcher
process
input
sendWatchCacheEvent
如果讀者閱讀過前一篇過於 List-Watch 機制原理性的文章就可以知道,List-Watch 機制會通過 ResourceVersion 來保證發送報文的順序性。而這部分邏輯就是在 process內實現的。如果當前客戶端對 API Server 的 Watch 請求帶來的 ResourceVersion 為1,那麼 process 函數內的邏輯保證會返回給客戶端一個序號大於1的報文。
但是,截止到目前為止,我們只是看到了cacheWatcher為每一個請求啟動一個 goroutine 不斷的監聽資源對象信息的變化,如果有新的消息過來就返回給客戶端。那麼這個變化的消息是誰向inputchannel 傳遞過來的呢?回頭看下 cacher 的數據結構:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub,我們發現有如下幾個成員是值得注意的:
// Underlying storage.Interface. storage storage.Interface // "sliding window" of recent changes of objects and the current state. watchCache *watchCache reflector *cache.Reflector
storage 成員是在創建 Cacher 的時候就被傳遞進來了。根據這個順序向回查找,查看genericregistry.StorageWithCacher的定義apiserver/storage_factory.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub,可知 storage 是通過 NewRawStorage創建的。一路跟下去後。我們最後到了創建 etcdStorage 的函數里:kubernetes/etcd_helper.go at release-1.12 · kubernetes/kubernetes · GitHub。此時,你可以看下 NewEtcdStorage的返回值還有etcdHelper類實現的的 List 和 Watch 方法,就可以明白,整個 ListWatch 機制中,API Server 從 etcd 獲取資源對象信息所使用的 Watch 和 List 方法就是在這裡真正的被實現。而這也符合我們之前看的 Kubernetes 的架構圖中的一個細節:API Server 對外提供的一切信息都是從 etcd 而來的。如果你進入 etcd 實現的 Watch 方法中,稍微掃一眼就可以看到,它核心的邏輯就是啟動一個死循環,不斷的等待從 etcd 而來的有關資源對象的信息。
NewRawStorage
NewEtcdStorage
etcdHelper
for { resp, err := watcher.Next(w.ctx) if err != nil { w.etcdError <- err return } w.etcdIncoming <- resp }
除此之外,etcdHelper類還實現了很多和 etcd 相關的方法。那麼我們姑且可以認為,storage 成員是操作 etcd 的一個封裝。
apiserver/watch_cache.go at b080aefffce393d0aa75a2d3c62442b5515c8963 · kubernetes/apiserver · GitHub 通過觀察 watchCache 和數據結構以及它實現的方法,我猜測它應該是實現了一個資源對象信息的緩存。將通過和 etcd 通信而獲取到的資源對象的信息緩存在內存中。當資源對象長時間未發生變化的時候,如果再有 List 或者 Watch 請求該資源對象的信息,可以直接返回給它緩存中的內容,而不再去和 etcd 通信。
reflector 的創建一共需要兩個重要的對象作為參數:
其中 watchCache 我們上面已經提到過,那 listerWatcher 是什麼呢?在 cacher.go 文件中可以找到創建這個對象的方法的定義:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。瀏覽一下它實現的方法集合可知,它其實就是對上面我們說到的 storage 對象的一個封裝。實際上調用的還是 storage 內實現的一些方法。
通過觀察 Cacher 數據結構中幾個比較重要的成員的邏輯,我們現在確定了資源對象真正的數據來源,也了解了catchWatcher 啟動了一個goroutine 運行死循環等待著從inputchannel 發來的資源對象的信息。目前唯一缺少的就是數據生產者是如何將數據傳遞到input這個 channel 中的。
在創建了 Cacher 對象時候,我們緊接著運行了一個名為StartCaching 的方法:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。在它的邏輯內部,我們調用了 reflector 的 ListWatch 方法client-go/reflector.go at ee7a1ba5cdf1292b67a1fdf1fa28f90d2a7b0084 · kubernetes/client-go · GitHub。在這個方法中,它先通過 listerWatcher 封裝的 List 方法全量的獲取了一下 Kubernetes 集群中的資源,然後起了一個死循環,調用了 ListerWatcher 封裝的 Watch 方法,然後在 watchHandler 方法中,通過訪問一個阻塞的 channel,等待資源對象信息從 etcd 發過來。如果此時確實接收到了一個資源對象的信息,它會調用 watchCache.Add 方法,將其塞入緩存中:apiserver/watch_cache.go at b080aefffce393d0aa75a2d3c62442b5515c8963 · kubernetes/apiserver · GitHub。在 Add 的函數的邏輯中,會繼續調用 processEvent函數,在其內部我們發現,除了正常的更新 watchCache 的緩存之外,還執行了行邏輯:apiserver/watch_cache.go at b080aefffce393d0aa75a2d3c62442b5515c8963 · kubernetes/apiserver · GitHub。
StartCaching
processEvent
那麼這個 onEvent 函數究竟是什麼呢?返回 watchCache 被創建的邏輯的位置會發現,watchCache.onEvent 是被cacher.processEvent 賦值的。cacher.processEvent 函數的定義如下:
func (c *Cacher) processEvent(event *watchCacheEvent) { if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) { // Monitor if this gets backed up, and how much. glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen) } c.incoming <- *event }
再結合對比一下 watchCache.Add 函數的邏輯可知,從 etcd 發來的 event 將會通過 processEvent 函數傳遞至incoming 這個 channel。而 incoming 這個 channel 是在 cacher 對象的dispatchEvents 函數內被讀取的:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。跟著這個數據流動的邏輯繼續向下,我們發現這個 event 最終作為參數傳遞到了之前我們創建的watcher的 Add 方法內:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。進入這個 Add 函數的內部,你會發現一下子豁然開朗,因為這個 event 事件經過漫長的流程終於傳遞到了inputchannel: apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。而上面這些 watcher 是怎麼被收集進來的呢?通過 cacher 中 Watch 方法的邏輯可以發現,它來自於對 Watch 方法的調用:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。而這個方法,正是在我們前面說到的,API Server 在響應 Watch/List 相關的 HTTP 請求的時候,生成類型為watch.Interface且名為catcherWatcher對象時調用的。
incoming
watcher
catcherWatcher
至此,整個List-Watch機制中,資源對象的數據從 etcd 到 API Server HTTP API 之間的數據流動過程就都走通了。