写在前面

在上一篇blog 中,我们从源码的角度来分析了 Kubernetes 中的List-Watch机制的部分内容。它更注重于 API Server 和 etcd 之间的交互。通过下面的这幅 Kubernetes 的架构图和List-Watch机制的时序图我们可以知道,API Server 通过List-Watch机制获取到的资源对象的信息还将用于 Kubernetes 内置的 Controller 和我们自定义的 Controller,甚至是 Scheduler 和 Kubelet。

本文我们将通过对ReplicaSetController源码的分析来了解,API Server 保存的和资源对象有关的数据到底是怎么流动到 Controller(目的组件)中的,以及最终这些数据是如何被处理的。

ControllerManager

Kubernetes 中一个非常重要的组件就是:ControllerManager。它负责管理集群内部各个资源控制器(Controller) 的启停。通过阅读 ControManager 的代码可以很容易的发现,它在运行起来的时候会启动 Kubernetes 内置的各个 Controller。(下面的实例我截取了部分关键的逻辑,完整版的链接:kubernetes/controllermanager.go at f48e18faa4dc035cc927c6a2b34c83c8475b55fa · kubernetes/kubernetes · GitHub

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
	run := func(ctx context.Context) {
		...
		controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
		if err != nil {
			klog.Fatalf("error building controller context: %v", err)
		}
		...
		if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
			klog.Fatalf("error starting controllers: %v", err)
		}

		controllerContext.InformerFactory.Start(controllerContext.Stop)
		close(controllerContext.InformersStarted)

上述逻辑有几个值得注意的点:

  1. StartControllers
  2. NewControllerInitializers
  3. controllerContext
    1. controllerContext.InformerFactory.Start

StartControllers

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
...
	for controllerName, initFn := range controllers {
		if !ctx.IsControllerEnabled(controllerName) {
			glog.Warningf("%q is disabled", controllerName)
			continue
		}

time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

		glog.V(1).Infof("Starting %q", controllerName)
		debugHandler, started, err := initFn(ctx)

StartControllers 的逻辑比较简单,它遍历了一个存有 Controller 的 Map,依次调用 Controller 的启动函数initFn。这个 Map 是通过函数参数传递进来的,对这个参数进行赋值的位置也正是NewControllerInitializers函数被调用的位置。

NewControllerInitializers

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController

截取该函数的部分逻辑就可以看出,它构造了一个 key 为 Controller 名字,value 为 Controller 创建函数的 Map。

controllerContext

有了 NewControllerInitializersStartControllers的出现,按理说 Controller 就可以启动了。但是我们发现在StartControllers函数调用的位置,传递进去了一个比较特殊的参数:controllerContext,它是通过CreateControllerContext函数创建的。

查看CreateControllerContext函数逻辑后发现并没有什么特别的地方,唯一值得注意的就是一个名为sharedInformers的变量。因为他在 ControllerManager 中的 Run 函数内调用了自己的 Start 方法。

controllerContext.InformerFactory

controllerContext.InformerFactory被我们在上面说到sharedInformers赋值。根据创建shardInformers 的流程跟进下去,我们发现它的核心逻辑在这里:kubernetes/factory.go at 7f23a743e8c23ac6489340bbb34fa6f1d392db9d · kubernetes/kubernetes · GitHub。并且同时发现,它所实现的 Start 函数,其实是通过遍历其内部一个名为informers的Map,调用了每一个informer实现的 Run 函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply all options
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}

那么现在看起来,这个informer和其 Run 函数的实现就值得我们再去深入的了解一下了。

informer

Informer 的数据类型是cache.SharedIndexInformer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// SharedInformer has a shared data cache and is capable of distributing notifications for changes
// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
// one behavior change compared to a standard Informer.  When you receive a notification, the cache
// will be AT LEAST as fresh as the notification, but it MAY be more fresh.  You should NOT depend
// on the contents of the cache exactly matching the notification you've received in handler
// functions.  If there was a create, followed by a delete, the cache may NOT have your item.  This
// has advantages over the broadcaster since it allows us to share a common cache across many
// controllers. Extending the broadcaster would have required us keep duplicate caches for each
// watch.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period.  Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
	// specified resync period.  Events to a single handler are delivered sequentially, but there is
	// no coordination between different handlers.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the Store.
	GetStore() Store
	// GetController gives back a synthetic interface that "votes" to start the informer
	GetController() Controller
	// Run starts the shared informer, which will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has synced.
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
}

type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers add indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

通过它的数据结构的定义以及注释我们兴奋的发现,SharedIndexInformer类型的对象可能正是多个 Controller 从 API Server 接收资源对象信息的一个入口。总结下来,SharedIndexInformer有如下几个特点:

  1. 它和 API Server 一样,在内部会维护一个缓存。一旦缓存的内容有变化,它将通知给关心这些消息的对象。「关心」的方式很简单,就是自己实现这些消息的处理函数,并将它们注册到SharedIndexInformer类型的实例中

    1
    2
    3
    4
    5
    6
    7
    8
    
    // ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
    // as few of the notification functions as you want while still implementing
    // ResourceEventHandler.
    type ResourceEventHandlerFuncs struct {
    	AddFunc    func(obj interface{})
    	UpdateFunc func(oldObj, newObj interface{})
    	DeleteFunc func(obj interface{})
    }
  2. SharedIndexInformer 所维护的缓存将尽量保证其中的内容和实时的资源对象的信息一致。但是我们不能够依赖通过 event 形式发来的资源对象的信息和缓存内容之间的匹配关系。如过一个资源对象的创建和删除操作发生的间隔时间比较短,那么缓存中最终是没有这个资源对象的信息的。这非常符合 Kubernetes 的开发理念:保持数据的最终一致性。

Run

大概了解了informer 的实现原理后,我们来看下它的 Run 函数的逻辑: client-go/shared_informer.go at ee7a1ba5cdf1292b67a1fdf1fa28f90d2a7b0084 · kubernetes/client-go · GitHub

在去掉了一些干扰性的信息之后,我们发现,Run 函数做的核心工作有两点:

  1. 通过配置信息创建一个 controller 对象
  2. 调用这个 controller 对象的 Run 函数
controller.Run

在 controller 对象的 Run 函数中我们发现,它创建了一个让我们非常熟悉的对象:Reflector。 这个类型为 Reflector 的对象之前在 API Server 中也被创建过:Detect the Source Code of List Watch Between API Server and Etcd - LittleDriver。它有两个比较重要的功能:

  1. 封装了 List/Watch 等方法在 listerWatcher 成员内部,用于后期调用获取资源对象的数据
  2. 维护了一个缓存(以队列的形式实现),存储获取到的资源对象的数据

controller.Run 函数在末尾调用了 Reflector 对象的 Run 函数, 其内部的核心逻辑就是调用listerWatcher 成员的 ListWatch 方法,开始等待接受资源对象的信息。这部分和 API Server 等待接收来自 ectd 的消息原理是一样的。

除此之外,它还调用了一个名为processLoop的函数。这个函数的工作也很简单:不断的从 Reflector对象维护的资源对象信息的缓存中取出消息进行处理。

回到 Controller Manager

通过递归的观察 Controller Manager 的 Run 函数的逻辑,我们基本上可以确定下一阶段要关注的重点:informer 的创建者。因为这整段的逻辑看下来,最终通过List-Watch机制获取资源对象信息的是 informer。而 informer 的创建逻辑并没有在controllerContext.InformerFactory对象初始化的时候完成。那么也就是说,它是在之后的逻辑中对其内部的 informer 成员赋值的。

再次查看StartControllers的逻辑可知,持有controllerContext.InformerFactory对象的controllerContext,作为参数被传递到了每一个 Controller 的初始化函数。所以,看起来 informer 肯定是在各个 Controller 的逻辑中被赋值的。

ReplicaSetController

ReplicaSetController 的初始化函数如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
		return nil, false, nil
	}
	go replicaset.NewReplicaSetController(
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
		replicaset.BurstReplicas,
	).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
	return nil, true, nil
}

可以看出在一开始调用 ReplicaSetController 的创建函数的时候,我们就传递进去了两个 Informer。选择ReplicaSets函数跟进去发现,它返回是一个类型为replicaSetInformer的对象。而replicaSetInformer这个类实现了一个名为Informer的方法,它返回的正式一个我们前面说到的SharedIndexInformer类型的对象。我们之前要找的和各个 Controller 相关的 informer 对象也是SharedIndexInformer类型。通过查看Informer函数的逻辑,它一共涉及到以下三个函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// NewFilteredReplicaSetInformer constructs a new informer for ReplicaSet type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredReplicaSetInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().ReplicaSets(namespace).List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().ReplicaSets(namespace).Watch(options)
			},
		},
		&appsv1.ReplicaSet{},
		resyncPeriod,
		indexers,
	)
}

func (f *replicaSetInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredReplicaSetInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *replicaSetInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&appsv1.ReplicaSet{}, f.defaultInformer)
}

看起来,关键的部分大概有两个:

  1. 实际创建这个 Informer 对象的函数应该是NewFilteredReplicaSetInformer方法
  2. NewFilteredReplicaSetInformer方法作为参数传到了我们前面提到的controllerContext.InformerFactory对象的InformerFor方法中

最终,我们通过查看InformerFor方法的逻辑可知,NewFilteredReplicaSetInformer 最终在这里被调用,生成一个 informer 对象返回给外部。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}

Informer方法调用的位置是在ReplicaSetController 创建函数中:kubernetes/replica_set.go at a3ccea9d8743f2ff82e41b6c2af6dc2c41dc7b10 · kubernetes/kubernetes · GitHub。并且,同时还通过informer对象向controllerContext.InformerFactory注册了相应的 Handler 函数,以便之后处理接收到的资源对象的信息。

链接 ReplicaSetController 和 controllerContext.InformerFactory

之所以要看下 ReplicaSetControllercontrollerContext.InformerFactory是如何建立链接开始交互的,是因为 ReplicaSetController 最终还是要靠之前提到的Reflector对象调用 List/Watch方法从 API Server 接受资源对象的信息。而这个 Reflector对象是被实现在controllerContext.InformerFactory中的。

返回上面提到informer 对象调用Run的地方,我们发现,Reflector对象是借助了informer 对象的listWatcher成员创建的。而这个listWatcher成员是在informer对象被创建的时候就赋值进去了,赋值的位置就是NewFilteredReplicaSetInformer函数。

截止到目前为止,我们找到了资源对象信息的生产者:Reflector,也找到了这些信息的消费者:ReplicaSetController为其informer 注册的处理函数。那么这个消息是如何从生产者送到消费者的呢?

返回Reflector对象被创建的位置:client-go/controller.go at 03bfb9bdcfe5482795b999f39ca3ed9ad42ce5bb · kubernetes/client-go · GitHub。我们发现它除了执行生产消息塞入缓存的逻辑wg.StartWithChannel(stopCh, r.Run),还调用了一个名为processLoop的函数。跟进去之后发现,该函数的功能是不断的从缓存中取出有效的内容(资源对象的信息)并通过一个函数c.config.Process对其进行处理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == FIFOClosedError {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

函数c.config.Process是在 informer 对象 Run 起来的时候被创建的,也就是上面提到的 controller 对象被创建的地方:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

其中HandleDeltas便是函数c.config.Process的实际值。接下来,我们可以跟进到HandleDeltas函数的内部一探究竟:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

对于从缓存中取出的每一个 event(资源对象的信息),在HandleDeltas中都会被判定eventType,然后交由processor.distribute方法继续处理。而在processor.distribute方法的内部,我们发现 event 最终给了 processor 的 listener 成员:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

由于这个 Processor 对象是 informer 对象的一个内部成员,但是我在查找 ReplicaSet 的 informer 被创建的逻辑的时候并没有发现 Processor.Listeners 或 p.syncingListeners 被赋值。但是我找到了Processor 实现的一个名为 addListener的方法。它看起来是专门为了添加 listener 成员给存在的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

查看该函数的调用位置可知,仅有一处,在sharedIndexInformer类的AddEventHandlerWithResyncPeriod方法中。看着这个函数名我觉得非常的熟悉,并且它又是属于sharedIndexInformer类,很可能和 informer 对象有关。那也就是说,它可能和 ReplicaSetController 也有关系,毕竟 informer 对象就是在 ReplicaSetController 的 New 函数中被创建的。

此时,我返回ReplicaSetController的 New 函数。我发现它在创建了informer 对象之后还执行了一个名为 AddHandler 的函数,将处理 event 的函数通过informer对象向controllerContext.InformerFactory注册了相应的 Handler 函数。跟进 AddHandler 这个函数之后我们发现:

1
2
3
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

果然,我们上面提到的处理 event 的 listener 就是 ReplicaSetController 注册的 Handler 函数。

结束语

至此,从 ControllerManager 到 API Server 之间通过 List-Watch 机制处理资源对象信息的过程也就梳理完成了。如果你此时返回到本文头部给出的 List-Watch 机制时序图,就可以发现,其实我们这整个一篇 blog,都在说 List-Watch-1这个虚线框内,标号为 0, 4的过程。而标号为2,3的两个过程,真是我们昨天讲的,API Server 和 ectd 交互的过程。