写在前面

在上一篇文章中,我们通过 Kubernetes 的架构图以及一个 Deployment 资源对象创建的过程大致了解了List-Watch机制在一个 Kubernetes 集群中所起的作用以及它所面临的问题。本文我们将继续深入List-Watch机制的实现原理,从源码的角度再次探索它其中的奥秘。

List-Watch 机制时序图

通过上面的时序图我们可以看到,在 Controller 和 API Server 交互之前,API Server 和 Kubectl 以及 etcd 还有一段交互的过程。这个过程对于整个 List-Watch 机制是非常重要的,因为它是 List-Watch 机制对外提供的数据的生产过程。所以,本文将对这一过程做出详细的分析。

API Server

一个资源创建的起点是从 API Server 提供的 HTTP API 开始的。这里之所以没有提到时序图中的 kubelet 是因为,除了使用 kubelet,我们还可以通过 client-go 或者直接发送 HTTP 请求的方式给 API Server 来创建资源。既然List-Watch机制中消息的发送端为 API Server,那么它肯定就提供了相应的 List 和 Watch 的 HTTP。API。通过观察 API Server 中注册 HTTP API 的代码逻辑:apiserver/installer.go at master · kubernetes/apiserver · GitHub, 我们可以发现它通过「类型转换」构造了一个 Lister 对象还有一个 Watcher 对象:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
	// what verbs are supported by the storage, used to know what verbs we support per path
	creater, isCreater := storage.(rest.Creater)
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	lister, isLister := storage.(rest.Lister)
	getter, isGetter := storage.(rest.Getter)
	getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
	gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
	collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
	updater, isUpdater := storage.(rest.Updater)
	patcher, isPatcher := storage.(rest.Patcher)
	watcher, isWatcher := storage.(rest.Watcher)
	connecter, isConnecter := storage.(rest.Connecter)
	storageMeta, isMetadata := storage.(rest.StorageMetadata)

顺着 registerResourceHandlers函数的逻辑往下看我们可以知道,无论是 Lister 还是 Watcher,都是通过一个叫做restfulListResource的方法封装了一下暴露给外部使用的:apiserver/installer.go at master · kubernetes/apiserver · GitHub。而通过进一步观察这个函数的内部逻辑我们也可以看到,watcher 和 lister 最终在名为ListResource的方法内执行其内部真正的逻辑。具体当一个 GET 请求过来调用的是 Watch 还是 List 接口,是通过请求当中的一个参数来确定的:apiserver/get.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub

假设目前的 HTTP 请求是 Watch。那么在ListResource 的逻辑中就会走到apiserver/get.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub 这一步。它调用 Watcher 的 Watch 方法,创建了一个Watch-Interface类型的对象。然后将其传递至 serveWatch方法:apiserver/watch.go at 8a1312795085bced3bc5d4553b97c450a79fc420 · kubernetes/apiserver · GitHub。通过观察 serveWatch方法的内部逻辑可知,之前创建的Watch-Interface的对象被塞入了 ServeHTTP中,然后利用这个对象内部一个用于传递「资源对象信息的 channel」中的消息来对外部的「Watch」 请求提供服务:apiserver/watch.go at 8a1312795085bced3bc5d4553b97c450a79fc420 · kubernetes/apiserver · GitHub

到目前为止,通过对 API Server 关于List-Watch 机制的源码梳理,我们基本可以确定,API Server 获取资源对象信息的逻辑主要是实现于Watch-Interface类型的对象以及向其内部的channel 传递消息的发送端。而在上面的描述中,我们还可以梳理出一条Watch-Interface类型的对象的创建链路:

既然 Watcher 对象是通过 Storage 对象进行转换而来的,那么就说明watch.Interface中的方法大概率也是在 Storage 类型的对象中实现的。在apiserver/store.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub 文件中,我们看到了 Storage类型实现了List 和 Watch 方法,继续递归的跟进下面的逻辑发现,最终,在名为WatchPredicate的函数中,调用了名为 Storage 成员(与上面说的 Storage 类型的对象不是一个,它只是 struct 中其中一个 filed)的 Watch 方法,返回了类型为watch.Interface 的对象:apiserver/store.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub。而这个名为 Storage 成员的类型为DryRunnableStorage。通过查看 DryRunnableStorage这个类型的定义可知,其内部包含了一个类型为storage.Interface的对象,该 Interface 内部涵盖了 Watch 和 List 方法,这两个方法应该会被具体的某个资源实现,如 Pod, Deployment 等。

WARNING: 读者阅读到这里想必有点头晕,因为逻辑嵌套的层数太多,并且很多方法和成员的名字都是相同的。所以这里建议大家根据我贴出的源码的链接,按照 blog 中叙述的顺序画一个流程图,会看的更清楚。

全局搜索一下创建Store类型对象的地方可知,几乎存在于每一个资源的目录下:

我们挑选 Deployment 资源对象的创建Store类型对象的逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// NewREST returns a RESTStorage object that will work against deployments.
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST) {
	store := &genericregistry.Store{
		NewFunc:                  func() runtime.Object { return &extensions.Deployment{} },
		NewListFunc:              func() runtime.Object { return &extensions.DeploymentList{} },
		DefaultQualifiedResource: extensions.Resource("deployments"),

		CreateStrategy: deployment.Strategy,
		UpdateStrategy: deployment.Strategy,
		DeleteStrategy: deployment.Strategy,

		TableConvertor: printerstorage.TableConvertor{TablePrinter: printers.NewTablePrinter().With(printersinternal.AddHandlers)},
	}
	options := &generic.StoreOptions{RESTOptions: optsGetter}
	if err := store.CompleteWithOptions(options); err != nil {
		panic(err) // TODO: Propagate error up
	}

	statusStore := *store
	statusStore.UpdateStrategy = deployment.StatusStrategy
	return &REST{store, []string{"all"}}, &StatusREST{store: &statusStore}, &RollbackREST{store: store}
}

可以看出在构造genericregistry.Store对象的时候,没有有指定一个名为 Storage 的成员。在执行 return 语句之前仅仅只调用了一个store.CompleteWithOptions方法。跟进去之后,豁然开朗:apiserver/store.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub。名为 Storage 成员内部包含的类型为storage.Interface的对象最终是被一个名为Decorator的方法创建的。而这个方法和opts这个变量有关。顺着这条线向上查找,我们最终发现,在 Deployment 代码逻辑中有一个名为 NewStorage 的函数:kubernetes/storage.go at master · kubernetes/kubernetes · GitHubopts 这个变量的值来自于这个函数的参数optsGetter

在向上寻找 opts 参数的过程中我们发现这条链路是比较长的,也是比较绕的,很难清晰的去定位到它第一次被创建的地方。所以,我们换一种思路:因为看到 opts 这个参数的类型为generic.RESTOptionsGetter,所以我们在全局可以搜一下,哪里有对这个类型变量的赋值操作并且和 Storage 有关的。最后,我们定位到了这里:apiserver/etcd.go at c53cd379d4b8e8acbe23a7a3b40c949687ba9926 · kubernetes/apiserver · GitHub。这是一段和 etcd 配置有关的逻辑。这段逻辑在 API Server 启动构造其使用的配置的逻辑中有调用过:kubernetes/server.go at b1a52a38e9e3651680655416cc7afbec5e119854 · kubernetes/kubernetes · GitHubbuildGenericConfig函数构造的通用配置,最终赋值给了启动 Master 节点所需要的配置集合:kubernetes/server.go at b1a52a38e9e3651680655416cc7afbec5e119854 · kubernetes/kubernetes · GitHub。而这部分配置最终被用于创建 API Server: kubernetes/server.go at b1a52a38e9e3651680655416cc7afbec5e119854 · kubernetes/kubernetes · GitHub。在 CreateKubeAPIServer函数内,我们可以看到,通过参数传递进来的 master.Config 最终调用了一个名为Complete的函数处理了一下相关配置,并且通过它的返回值调用了一个 New 函数。而在 New 函数的内部,也通过调用一个名为Install的函数,将restOptionsGetter参数传了进去。

此时,我们再次返回到Deployment 代码逻辑中的 NewStorage 函数被调用的地方:kubernetes/storage_apps.go at 7f23a743e8c23ac6489340bbb34fa6f1d392db9d · kubernetes/kubernetes · GitHub,随机选取一个版本的函数v1beta1Storage,它在当前文件的一个名为NewRestStorage函数中被调用:https://github.com/kubernetes/kubernetes/blob/7f23a743e8c23ac6489340bbb34fa6f1d392db9d/pkg/registry/apps/rest/storage_apps.go#L38。随后,我们按照这条调用链路再继续向上寻找`restOptionsGetter`参数被赋值位置。随即定位到了[kubernetes/master.go at ec2e767e59395376fa191d7c56a74f53936b7653 · kubernetes/kubernetes · GitHub](https://github.com/kubernetes/kubernetes/blob/ec2e767e59395376fa191d7c56a74f53936b7653/pkg/master/master.go#L401)中的 InstallAPIs函数。

Bingo,此时我们可以将调用InstallAPIs函数的逻辑作为桥梁,将我们上面整个的查找流程链接起来。所以,当前我们已经可以确认的是,我们之前说的opts变量已经找到了出处。但实际上,在创建 Deployment storage 对象时,名为 Storage 成员内部包含的类型为storage.Interface的对象最终是被一个名为Decorator的方法创建的,这个Decorator方法来自于opts变量。再次查看Decorator方法的定义可知,它被包含在一个 Interface 内部的函数的返回值中,而这个函数正是创建opts这个变量所在的类必须要实现的:GetRESTOptions。所以,我们需要再次回到和 etcd 配置有关的逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
	if err := s.addEtcdHealthEndpoint(c); err != nil {
		return err
	}
	c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
	return nil
}

type StorageFactoryRestOptionsFactory struct {
	Options        EtcdOptions
	StorageFactory serverstorage.StorageFactory
}

func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	storageConfig, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
	}

	ret := generic.RESTOptions{
		StorageConfig:           storageConfig,
		Decorator:               generic.UndecoratedStorage,
		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
		EnableGarbageCollection: f.Options.EnableGarbageCollection,
		ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
		CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
	}
	if f.Options.EnableWatchCache {
		sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
		if err != nil {
			return generic.RESTOptions{}, err
		}
		cacheSize, ok := sizes[resource]
		if !ok {
			cacheSize = f.Options.DefaultWatchCacheSize
		}
		ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
	}

	return ret, nil
}

可以看到GetRESTOptions最终被StorageFactoryRestOptionsFactory类实现。查看genericregistry.StorageWithCacher的定义,一路跟下去,就会发现,我们最终是创建了一个名为cacher类型为storage.Interface的变量,它将作为 Decorator 的值:apiserver/storage_factory.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub。如果再深入至cacher创建的逻辑,可以看到,它是实现了storage.Interface的全部接口的:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub

还记得我们当时在阅读API Server中关于ListResource方法的时候,发现最终通过 ServeHTTP 函数对外提供服务的是一个storage.Interface类型对象调用了其 Watch 接口的返回值,即一个Watch-Interface类型的对象。在cacher的实现部分,我们同样可以找到一个这样的函数:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub,其内部将会为我们创建一个类型为cacheWatcher的对象apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。这个对象内部有一个非常重要的成员:result: make(chan watch.Event, chanSize),它是被一个名为ResultChan的函数暴露给外部使用的:

1
2
3
4
// Implements watch.Interface.
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
	return c.result
}

这个函数相信你看到后会非常非常的熟悉,因为我们在ServeHTTP 函数中看到过它曾经被watcher.Watch()的返回值调用:apiserver/watch.go at 8a1312795085bced3bc5d4553b97c450a79fc420 · kubernetes/apiserver · GitHubnewCacheWatcher除了返回一个cacheWatcher类型的对象之外,还会启动一个goroutine, 执行一个名为process的函数:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。当它从一个名为input的 channel 中监听到有资源对象的信息发送过来的时候,就会通过sendWatchCacheEvent函数,将最新的 event 通过刚才提到的 Result Channel 发送给客户端。并且由于客户端和 API Server 之间是一个长连接,所以这个循环会一直执行。

如果读者阅读过前一篇过于 List-Watch 机制原理性的文章就可以知道,List-Watch 机制会通过 ResourceVersion 来保证发送报文的顺序性。而这部分逻辑就是在 process内实现的。如果当前客户端对 API Server 的 Watch 请求带来的 ResourceVersion 为1,那么 process 函数内的逻辑保证会返回给客户端一个序号大于1的报文。

但是,截止到目前为止,我们只是看到了cacheWatcher为每一个请求启动一个 goroutine 不断的监听资源对象信息的变化,如果有新的消息过来就返回给客户端。那么这个变化的消息是谁向inputchannel 传递过来的呢?回头看下 cacher 的数据结构:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub,我们发现有如下几个成员是值得注意的:

1
2
3
4
5
	// Underlying storage.Interface.
	storage storage.Interface
	// "sliding window" of recent changes of objects and the current state.
	watchCache *watchCache
	reflector  *cache.Reflector

storage storage.Interface(资源对象数据真正的来源)

storage 成员是在创建 Cacher 的时候就被传递进来了。根据这个顺序向回查找,查看genericregistry.StorageWithCacher的定义apiserver/storage_factory.go at b8915a5609e4d7553d92f0d431ba04ecf9b52777 · kubernetes/apiserver · GitHub,可知 storage 是通过 NewRawStorage创建的。一路跟下去后。我们最后到了创建 etcdStorage 的函数里:kubernetes/etcd_helper.go at release-1.12 · kubernetes/kubernetes · GitHub。此时,你可以看下 NewEtcdStorage的返回值还有etcdHelper类实现的的 List 和 Watch 方法,就可以明白,整个 ListWatch 机制中,API Server 从 etcd 获取资源对象信息所使用的 Watch 和 List 方法就是在这里真正的被实现。而这也符合我们之前看的 Kubernetes 的架构图中的一个细节:API Server 对外提供的一切信息都是从 etcd 而来的。如果你进入 etcd 实现的 Watch 方法中,稍微扫一眼就可以看到,它核心的逻辑就是启动一个死循环,不断的等待从 etcd 而来的有关资源对象的信息。

1
2
3
4
5
6
7
8
	for {
		resp, err := watcher.Next(w.ctx)
		if err != nil {
			w.etcdError <- err
			return
		}
		w.etcdIncoming <- resp
	}

除此之外,etcdHelper类还实现了很多和 etcd 相关的方法。那么我们姑且可以认为,storage 成员是操作 etcd 的一个封装。

watchCache *watchCache(资源对象信息的缓存)

apiserver/watch_cache.go at b080aefffce393d0aa75a2d3c62442b5515c8963 · kubernetes/apiserver · GitHub 通过观察 watchCache 和数据结构以及它实现的方法,我猜测它应该是实现了一个资源对象信息的缓存。将通过和 etcd 通信而获取到的资源对象的信息缓存在内存中。当资源对象长时间未发生变化的时候,如果再有 List 或者 Watch 请求该资源对象的信息,可以直接返回给它缓存中的内容,而不再去和 etcd 通信。

reflector *cache.Reflector

reflector 的创建一共需要两个重要的对象作为参数:

  1. listerWatcher
  2. watchCache

其中 watchCache 我们上面已经提到过,那 listerWatcher 是什么呢?在 cacher.go 文件中可以找到创建这个对象的方法的定义:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。浏览一下它实现的方法集合可知,它其实就是对上面我们说到的 storage 对象的一个封装。实际上调用的还是 storage 内实现的一些方法。

通过观察 Cacher 数据结构中几个比较重要的成员的逻辑,我们现在确定了资源对象真正的数据来源,也了解了catchWatcher 启动了一个goroutine 运行死循环等待着从inputchannel 发来的资源对象的信息。目前唯一缺少的就是数据生产者是如何将数据传递到input这个 channel 中的。

在创建了 Cacher 对象时候,我们紧接着运行了一个名为StartCaching 的方法:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。在它的逻辑内部,我们调用了 reflector 的 ListWatch 方法client-go/reflector.go at ee7a1ba5cdf1292b67a1fdf1fa28f90d2a7b0084 · kubernetes/client-go · GitHub。在这个方法中,它先通过 listerWatcher 封装的 List 方法全量的获取了一下 Kubernetes 集群中的资源,然后起了一个死循环,调用了 ListerWatcher 封装的 Watch 方法,然后在 watchHandler 方法中,通过访问一个阻塞的 channel,等待资源对象信息从 etcd 发过来。如果此时确实接收到了一个资源对象的信息,它会调用 watchCache.Add 方法,将其塞入缓存中:apiserver/watch_cache.go at b080aefffce393d0aa75a2d3c62442b5515c8963 · kubernetes/apiserver · GitHub。在 Add 的函数的逻辑中,会继续调用 processEvent函数,在其内部我们发现,除了正常的更新 watchCache 的缓存之外,还执行了行逻辑:apiserver/watch_cache.go at b080aefffce393d0aa75a2d3c62442b5515c8963 · kubernetes/apiserver · GitHub

那么这个 onEvent 函数究竟是什么呢?返回 watchCache 被创建的逻辑的位置会发现,watchCache.onEvent 是被cacher.processEvent 赋值的。cacher.processEvent 函数的定义如下:

1
2
3
4
5
6
7
func (c *Cacher) processEvent(event *watchCacheEvent) {
	if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
		// Monitor if this gets backed up, and how much.
		glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
	}
	c.incoming <- *event
}

再结合对比一下 watchCache.Add 函数的逻辑可知,从 etcd 发来的 event 将会通过 processEvent 函数传递至incoming 这个 channel。而 incoming 这个 channel 是在 cacher 对象的dispatchEvents 函数内被读取的:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。跟着这个数据流动的逻辑继续向下,我们发现这个 event 最终作为参数传递到了之前我们创建的watcher的 Add 方法内:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。进入这个 Add 函数的内部,你会发现一下子豁然开朗,因为这个 event 事件经过漫长的流程终于传递到了inputchannel: apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。而上面这些 watcher 是怎么被收集进来的呢?通过 cacher 中 Watch 方法的逻辑可以发现,它来自于对 Watch 方法的调用:apiserver/cacher.go at cf5eff4f5e8f6019796cb18c69918b9f2f09e6db · kubernetes/apiserver · GitHub。而这个方法,正是在我们前面说到的,API Server 在响应 Watch/List 相关的 HTTP 请求的时候,生成类型为watch.Interface且名为catcherWatcher对象时调用的。

至此,整个List-Watch机制中,资源对象的数据从 etcd 到 API Server HTTP API 之间的数据流动过程就都走通了。