因为 blog 贴过多的代码或者链接不太美观,也不易于阅读。所以,建议大家下载 Kubernetes 1.12 版本的源码再配合 blog 来看。blog 只能对各位了解 StatefulSet Controller 原理起到一个抛砖引玉的作用。如果想了解的再清楚一些,还是要仔细的看一下它的内部实现。

写在前面

StatefulSet 作为 Kubernetes 提供的一种用于管理有状态服务的资源对象是非常重要的,尤其是对于数据库应用来说。宏观上来说,StatefulSet 的结构是比较简单的,它不像 Deployment,和 pod 之间还隔着一个 ReplicaSet。StatefulSet 是直接管辖 Pod 的。那么 StatefulSet Controller 内部是如何实现上一篇文章中所提到的那些用于管理有状态服务的特性的呢?

创建

kubernetes/stateful_set.go at release-1.12 · kubernetes/kubernetes · GitHub

StatefulSet Controller 和其他资源的 Controller 启动的方式一样,都是在 ControllerManager 中被创建和启动的。对于 StatefulSet 来说,ControllerManager 调用了一个名为 NewStatefulSetController 的函数来创建其 Controller。StatefulSet Controller 的数据结构如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// StatefulSetController controls statefulsets.
type StatefulSetController struct {
	// client interface
	kubeClient clientset.Interface
	// control returns an interface capable of syncing a stateful set.
	// Abstracted out for testing.
	control StatefulSetControlInterface
	// podControl is used for patching pods.
	podControl controller.PodControlInterface
	// podLister is able to list/get pods from a shared informer's store
	podLister corelisters.PodLister
	// podListerSynced returns true if the pod shared informer has synced at least once
	podListerSynced cache.InformerSynced
	// setLister is able to list/get stateful sets from a shared informer's store
	setLister appslisters.StatefulSetLister
	// setListerSynced returns true if the stateful set shared informer has synced at least once
	setListerSynced cache.InformerSynced
	// pvcListerSynced returns true if the pvc shared informer has synced at least once
	pvcListerSynced cache.InformerSynced
	// revListerSynced returns true if the rev shared informer has synced at least once
	revListerSynced cache.InformerSynced
	// StatefulSets that need to be synced.
	queue workqueue.RateLimitingInterface
}

其中 kubeclient 是从外部通过参数传递进来的一个 client 对象,StatefulSet Controller 会用它来调用一些 API 和 API Server 进行通信。queue 则作为 StatefulSet Controller 接收 StatefulSet Event 的缓存。StatefulSet Controller 的本质就是根据队列中的 Event 将 StatefulSet 对象以及其所管辖的 Pod 驱动至用户的 Desire State。而剩余的成员大致可以分为以下两类:

  • Lister
  • control

Lister

StatefulSet Controller 关心两种资源

  1. StatefulSet
  2. Pod

所以,在 StatefulSet Controller 中就必须要通过 List-Watch 机制来收集这两种资源的 Event。既然提到了 List-Watch,那么在 Controller 和 API Server 之间就一定会有相应类型的 Informer 的存在。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	// lookup the statefulset and enqueue
	AddFunc: ssc.addPod,
	// lookup current and old statefulset if labels changed
	UpdateFunc: ssc.updatePod,
	// lookup statefulset accounting for deletion tombstones
	DeleteFunc: ssc.deletePod,
})
ssc.podLister = podInformer.Lister()
ssc.podListerSynced = podInformer.Informer().HasSynced

PodInformer 的创建是很简单的:通过对podInformer.Informer()的调用,向 SharedInformerFactory 中注册了一个属于 StatefulSet Controller 的 PodInformer。并且向这个 Informer 中注册了 Event 处理函数。podInformer.Lister()将会生成一个类型为PodLister的对象,它里面包含了一个 List 所有 ns 下 Pod 的函数以及一个可以只 List 特定 ns 下 Pod 的函数。该对象同样实现了 List 和 Get 接口。

1
2
3
4
5
6
7
8
// PodLister helps list Pods.
type PodLister interface {
	// List lists all Pods in the indexer.
	List(selector labels.Selector) (ret []*v1.Pod, err error)
	// Pods returns an object that can list and get Pods.
	Pods(namespace string) PodNamespaceLister
	PodListerExpansion
}

control

control 整个 StatefulSet Controller 最核心的成员。StatefulSet Controller 几乎所有的功能逻辑均由这个成员的方法来实现。它的数据结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// NewDefaultStatefulSetControl returns a new instance of the default implementation StatefulSetControlInterface that
// implements the documented semantics for StatefulSets. podControl is the PodControlInterface used to create, update,
// and delete Pods and to create PersistentVolumeClaims. statusUpdater is the StatefulSetStatusUpdaterInterface used
// to update the status of StatefulSets. You should use an instance returned from NewRealStatefulPodControl() for any
// scenario other than testing.
func NewDefaultStatefulSetControl(
	podControl StatefulPodControlInterface,
	statusUpdater StatefulSetStatusUpdaterInterface,
	controllerHistory history.Interface,
	recorder record.EventRecorder) StatefulSetControlInterface {
	return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
}

根据注释以及相应参数的定义可以了解到,podControl 主要用于操作 StatefulSet 管辖的 Pod,statusUpdater 主要用于更新 StatefulSet 的状态相关字段。

运行

运行流程

在 Kubernetes 中有很多的 Controller,StatefulSet Controller 只是其中的一种。但是这些 Controller 的执行流程基本上是一样的(如上图所示)。本质上就是一个生产者和消费者的模型,中间用队列进行解耦。

这里要对 RateLimitedQueue 多说一下。RateLimitedQueue 是 Kubernetes 自己实现的一个「延迟队列」。当一个包含了 Event 的 Task 的处理过程发生错误的时候,它没有立即将它加入队列,而是延迟一定的时间再加入。思考了一下,「延迟队列」的好处可能有以下几点:

  1. 若处理 Task 失败后立即将其塞入队列且异常情况短时间内无法被处理,Queue 中将会有大量的「会处理失败」的Task。这可能导致「会处理成功」的 Task 的处理效率降低,甚至出现饥饿和饿死的情况。并且,频繁的重试一定会失败的 Task 还会增加消费者的负载。
  2. 某些类型的任务的处理过程分为多个阶段。每个阶段之间都必须要等待一段时间才可以。比如 Pod 的删除,可以在Pod.Spec 中指定一个参数DeletionGracePeriodSeconds,以此来实现在 Pod 删除操作触发的一段时间之后再真正的执行 Pod 的删除逻辑(etcd 记录的删除,相应容器的销毁)。

Sync

通过上面的运行流程图可以看到,Worker 从 EventQueue 中取出 Event 包装成 Task,并且将 Task 交给了 Sync 函数来处理。Sync 函数的核心逻辑如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
	return err
}

set, err := ssc.setLister.StatefulSets(namespace).Get(name)
if errors.IsNotFound(err) {
	glog.Infof("StatefulSet has been deleted %v", key)
	return nil
}
if err != nil {
	utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
	return err
}

selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
	utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
	// This is a non-transient error, so don't retry.
	return nil
}

if err := ssc.adoptOrphanRevisions(set); err != nil {
	return err
}

pods, err := ssc.getPodsForStatefulSet(set, selector)
if err != nil {
	return err
}

return ssc.syncStatefulSet(set, pods)
  1. 通过 Task 的 Key,解压出其对应的 StatefulSet 的 Name 和 Namespace
  2. 从 StatefulSet Informer 的缓存中,通过 Name 拿到相应的 StatefulSet 对象的信息
  3. 对 Revision 资源对象尝试执行 Adoption 操作
  4. 获取 StatefulSet 管辖的 Pod,并对 Pod 执行 Adoption 操作
  5. 进入 ssc.syncStatefulSet,执行接下来的处理逻辑

大致看完 Sync 函数的实现之后发现,其内部并没有执行一些和创建,删除等关键操作相关的逻辑。没看到的一些实现细节可能在ssc.syncStatefulSet函数内。而ssc.syncStatefulSet函数最终也是调用了前面说到的control成员的UpdateStatefulSet方法。

但是 Sync 函数内仍然有一些细节值得去深入的了解:

  1. Revision 是什么资源?用来做什么的?
  2. Adoption 是什么操作?

首先,来说说 Adoption 操作。

Adoption

Adoption 是 Controller 一种处理 owner 和 denpendent 资源关系不正常的一种方式,这种关系是通过 dependent 对象中的 ownerReference 信息来建立的,通常将它称为「级联关系」。ownerReference 在 dependent 对象部分,使得 dependent 可以据此找到其 owner。而 owner 则是通过 label Selector 来找到其对应的 dependent 资源。当 owner 资源的 Controller 在处理 Owner 的时候发现,通过 label Selector 找到的 dependent 资源中的 OwnerReference 已经为空。就会执行一个名为 Adoption 的操作:根据 owner 资源的信息将正确的 OwnerReference 端重新写入进 dependent 对象中,以便修复 owner 和 denepedent 之间的级联关系。

级联关系主要用于「资源管理」这个问题域中的「资源清理」问题。Kubernetes 通过 GC 和级联关系一起实现了对集群中异常资源对象的清理。

Revision

按理来说,StatefulSet 是直接管理 Pod 的,所以 StatefulSet Controller 应该仅处理 StatefulSet 和 Pod 两种资源。即使要执行 Adoption 操作,也是在ssc.getPodsForStatefulSet内对 Pod 执行。那么为什么还要对 Revision 这个资源执行呢?而且 k8s 的文档上面也没有公布过这个资源的相关信息。

递归的跟入ssc.adoptOrphanRevisions函数,最终发现了这个资源的定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// ControllerRevision implements an immutable snapshot of state data. Clients
// are responsible for serializing and deserializing the objects that contain
// their internal state.
// Once a ControllerRevision has been successfully created, it can not be updated.
// The API Server will fail validation of all requests that attempt to mutate
// the Data field. ControllerRevisions may, however, be deleted. Note that, due to its use by both
// the DaemonSet and StatefulSet controllers for update and rollback, this object is beta. However,
// it may be subject to name and representation changes in future releases, and clients should not
// depend on its stability. It is primarily for internal use by controllers.
type ControllerRevision struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Data is the serialized representation of the state.
	Data runtime.RawExtension `json:"data,omitempty" protobuf:"bytes,2,opt,name=data"`

	// Revision indicates the revision of the state represented by Data.
	Revision int64 `json:"revision" protobuf:"varint,3,opt,name=revision"`
}

根据注释可以知道,ControllerRevision 这个资源主要用于 DaemonSet 和 StatefulSet 两种资源的更新和回滚。回想一下之前对于 StatefulSet 的介绍,StatefulSet 的 RollingUpdate 更新策略是允许多个不同版本的 Pod 存在的。而 Pod 对应了 StatefulSet,所以Pod 版本的不同就代表着 StatefulSet 也不同。这样一来。既然 ControllerRevision 代表的是 StatefulSet 的版本信息,那么它理应被 StatefulSetController 进行管理。

control.UpdateStatefulSet——创建 Pod

UpdateStatefulSet 函数会先获取 StatefulSet 的全部 revision,并将它们从小到大排序。然后会根据已经获取到的 revision 集合计算一下当前的 revision 和下一个版本的 revision。第一次创建的时候,updateRevision 和 currentRevision 都是相同的,为1。

然后,将会执行一个名为updateStatefulSet的函数。这个函数里包含了 Pod 的创建,删除,更新等操作的核心逻辑。

updateStatefulSet

假设现在有一个两副本的 StatefulSet 需要创建

Round 1

对于 StatefulSet 第一次创建来说,该函数的内很多逻辑都是不会执行的。这是因为通过参数传进来的 Pods 集合为空。所以,会首先创建指定数量的 Pod。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
for ord := 0; ord < replicaCount; ord++ {
	if replicas[ord] == nil {
		replicas[ord] = newVersionedStatefulSetPod(
			currentSet,
			updateSet,
			currentRevision.Name,
			updateRevision.Name, ord)
	}
}

通过调用newVersionedStatefulSetPod函数创建了即将要生成的 Pod 的实例,并且将其塞入到了一个 Slice 中。其中 index 为 Pod 的序号,value 为 Pod 的实例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Examine each replica with respect to its ordinal
for i := range replicas {
	// delete and recreate failed pods
	if isFailed(replicas[i]) {
		...
	}
	// If we find a Pod that has not been created we create the Pod
	if !isCreated(replicas[i]) {
		if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
			return &status, err
		}
		status.Replicas++
		if getPodRevision(replicas[i]) == currentRevision.Name {
			status.CurrentReplicas++
		}
		if getPodRevision(replicas[i]) == updateRevision.Name {
			status.UpdatedReplicas++
		}

		// if the set does not allow bursting, return immediately
		if monotonic {
			return &status, nil
		}
		// pod created, no more work possible for this round
		continue
	}
	// If we find a Pod that is currently terminating, we must wait until graceful deletion
	// completes before we continue to make progress.
	if isTerminating(replicas[i]) && monotonic {
		...
	}
	// If we have a Pod that has been created but is not running and ready we can not make progress.
	// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
	// ordinal, are Running and Ready.
	if !isRunningAndReady(replicas[i]) && monotonic {
		glog.V(4).Infof(
			"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
			set.Namespace,
			set.Name,
			replicas[i].Name)
		return &status, nil
	}
	// Enforce the StatefulSet invariants
	if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
		continue
	}
	// Make a deep copy so we don't mutate the shared cache
	replica := replicas[i].DeepCopy()
	if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
		return &status, err
	}
}

接下来,将会遍历刚刚生成的带有要创建 Pod 实例的 Slice。因为还没有创建过 Pod ,所以控制流会进入!isCreated()分支。在该分支内部,会通过调用 API Server 的 HTTP API 为 StatefulSet 创建 Pod。因为第一创建的时候 Pod 的 Revision 就是被 currentRevision.Name 赋值的,所以两者相等,相应的 CurrentReplicas 也需要加1。

Pod 的 Revision 被赋值的位置其实就是 Slice 中 Pod 实例被创建的位置。其中 currentRevision.Name 最终被写入到了 Pod 的 Label 内,key 为 ControllerRevisionHashLabelKey, Value 为 currentRevision.Name。

在创建了第一个 Pod 之后,发现它需要判断monotonic变量。看起来,如果这个条件为 True,就证明该 StatefulSet 不支持以 burst 形式创建 Pod。那么 burst 是个什么形式呢?

1
2
3
4
5
6
monotonic := !allowsBurst(set)

// allowsBurst is true if the alpha burst annotation is set.
func allowsBurst(set *apps.StatefulSet) bool {
	return set.Spec.PodManagementPolicy == apps.ParallelPodManagement
}

查看 monotonic被赋值的位置以及其函数定义可知,这个变量代表的是对「StatefulSet 是否允许并行操作 Pod」 条件的判断结果。根据上一篇关于 StatefulSet 行为介绍的文章内容,StatefulSet 默认情况下是需要按照顺序逐个操作 Pod 的。如果你想并行的去操作而不在意先后顺序,则需要为其指定管理策略:ParallelPodManagement。

所以,创建 Pod 的流程在这一步会被临时结束,返回一个 status 的对象。status 对象内部的信息展示了当前这个 StatefulSet 对象以及其管辖 Pod 的情况。

在第一次退出 updateStatefulSet 函数之后,control.UpdateStatefulSet 函数将会执行接下来的逻辑:向 API Server 更新 StatefulSet 的状态。

Round 2

再次进入 updateStatefulSet 函数的时候,pods 集合就不再为空了,它会包含之前创建好的 Pod。遍历 Pod 集合,会将「合法」的 Pod 放在上一次保存在一个 Slice 内。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// First we partition pods into two lists valid replicas and condemned Pods
for i := range pods {
	status.Replicas++

	// count the number of running and ready replicas
	if isRunningAndReady(pods[i]) {
		status.ReadyReplicas++
	}

	// count the number of current and update replicas
	if isCreated(pods[i]) && !isTerminating(pods[i]) {
		if getPodRevision(pods[i]) == currentRevision.Name {
			status.CurrentReplicas++
		}
		if getPodRevision(pods[i]) == updateRevision.Name {
			status.UpdatedReplicas++
		}
	}

	if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
		// if the ordinal of the pod is within the range of the current number of replicas,
		// insert it at the indirection of its ordinal
		replicas[ord] = pods[i]

	} else if ord >= replicaCount {
		// if the ordinal is greater than the number of replicas add it to the condemned list
		condemned = append(condemned, pods[i])
	}
	// If the ordinal could not be parsed (ord < 0), ignore the Pod.
}

因为在第一轮创建中,实际上只创建了一个 Pod,所以这一次要为第二个 Pod 再次创建一个 Pod 实例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
for ord := 0; ord < replicaCount; ord++ {
	if replicas[ord] == nil {
		replicas[ord] = newVersionedStatefulSetPod(
			currentSet,
			updateSet,
			currentRevision.Name,
			updateRevision.Name, ord)
	}
}

在 StatefulSet 没有被删除的情况下,将会和 Round1 开始遍历 replicas 这个 Slice。如果遍历到已经创建过的 Pod 实例,会进行如下的判断:

  1. 是否当前 Pod 已经处于 Running 状态。如果没有,那么退出此轮创建,继续等待
  2. 如果 Pod 已经正常运行,会检查他的 Name,ordinal Index 等信息是否合法。如果合法,那么继续处理下一个候选 Pod。否则,重新执行一次前面的「检测」逻辑,若有问题则更新这个 Pod,否则不做任何处理,退出此次创建。

当遍历到第二个 Pod 实例的时候,因为没有被创建过,所以会执行和 Round 1 同样的逻辑。直到两个 Pod 都创建完成。

Other Branch

在遍历 replicas 这个 Slice 的过程中,还有几个其他的分支也有必要说下。

  • isFailed: 如果某个 Pod 创建失败,那么 StatefulSet Controller 会删掉它,并且重新创建它。此时,它会继续执行 !isCreated 分支的内容。
  • isTerminating && monotonic: 如果 Pod 已经在终止的过程中且开启了「按顺序处理 Pod 」的特性,将会退出此次循环
  • isRunningAndReady && monotonic: 如果 Pod 已经被创建,但是还没有 Running && Ready,且开启了「按顺序处理 Pod 」的特性,将会退出此次循环,直到该 Pod 正常运行才继续操作。这是符合 StatefulSet 的预期行为的。

control.UpdateStatefulSet——终止 Pod

在了解了创建流程之后,如果你再观察 Sync 函数,就会发现,不管是 StatefulSet 的创建,删除,还是更新,都是在 UpdateStatefulSet 这个函数内处理的。而在UpdateStatefulSet 内也只执行了 updateStatefulSet 一个函数。

在 StatefulSet 中,知道它有一个很重要的功能就是「横向扩缩容」。可以动态的增加或者减少 StatefulSet 管辖的 Pod 数量。其中「扩容功能」仍然可以复用上面创建 Pod 的逻辑。因为决定是否要创建新的 Pod 的条件,就是 StatefulSet 对象中 Replica 字段的值。它代表了 StatefulSet 的副本数,即 Pod 数。

在 Replica 规定数量的 Pod 都被创建且成功运行之后,假设要进行「缩容」操作。updateStatefulSet 具体会为怎样处理呢?

修改 Replica

既然 Replica 是作为 StatefulSet 扩缩容判定的标准,想要触发「缩容」,就需要手动改小 Replica 的值。接着上面描述的例子:一个具有两副本的 StatefulSet。此时将 Replica 的值改为1。控制流会因为响应了 StatefulSet 的 Update Event,再一次进入到 updateStatefulSet 函数中。

Round 1

当 Pod 集合不为空时,首先要根据 Pod 的 Ordinal Index 和 Replica 的值进行比较。因为 Pod 的 Ordinal Index 是从 0 开始的,所以若发现 Ordinal Index 已经大于等于 Replica 的值的时候,那么这个 Pod 就是要被缩容。它将会被塞入一个名为 condemned 的 Slice 中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// slice that will contain all Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas
replicas := make([]*v1.Pod, replicaCount)
// slice that will contain all Pods such that set.Spec.Replicas <= getOrdinal(pod)
condemned := make([]*v1.Pod, 0, len(pods))

if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
	// if the ordinal of the pod is within the range of the current number of replicas,
	// insert it at the indirection of its ordinal
	replicas[ord] = pods[i]

} else if ord >= replicaCount {
	// if the ordinal is greater than the number of replicas add it to the condemned list
	condemned = append(condemned, pods[i])
}

因为在遍历 pods 集合的时候,无法确定 Pod 在 Slice 中的顺序是怎样的。而在缩容的过程中,又必须按照 Pod 的 Ordinal Index 逆序进行操作。所以,会对 condemned 进行一次排序。

1
sort.Sort(ascendingOrdinal(condemned))

然后,遍历 condemned 这个 Slice:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
for target := len(condemned) - 1; target >= 0; target {
	// wait for terminating pods to expire
	if isTerminating(condemned[target]) {
		glog.V(4).Infof(
			StatefulSet %s/%s is waiting for Pod %s to Terminate prior to scale down,
			set.Namespace,
			set.Name,
			condemned[target].Name)
		// block if we are in monotonic mode
		if monotonic {
			return &status, nil
		}
		continue
	}
	// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
	if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
		glog.V(4).Infof(
			StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to scale down,
			set.Namespace,
			set.Name,
			firstUnhealthyPod.Name)
		return &status, nil
	}
	glog.V(2).Infof(StatefulSet %s/%s terminating Pod %s for scale down,
		set.Namespace,
		set.Name,
		condemned[target].Name)

	if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
		return &status, err
	}
	if getPodRevision(condemned[target]) == currentRevision.Name {
		status.CurrentReplicas
	}
	if getPodRevision(condemned[target]) == updateRevision.Name {
		status.UpdatedReplicas
	}
	if monotonic {
		return &status, nil
	}
}

在第一轮执行这段逻辑的时候,若被缩容的 Pod 运行正常,会直接执行ssc.podControl.DeleteStatefulPod。该函数内部的逻辑就是调用 API Server 的 HTTP API 对该 Pod 资源进行删除。

若删除 Pod 的请求成功被 API Server 响应,StatefulSet Controller 就认为这次删除是成功的,因为即使之后的删除步骤失败了也不是它能够控制得了。随着删除操作的执行,「缩容」的问题显然已经演变成了一个「资源管理」的问题。GC 和 API Server 会帮来完成接下来的删除步骤。

既然删除 Pod 成功了,那么相应的就要修改 CurrentReplicas 和 UpdatedReplicas 字段的值。两者都标识由 StatefulSet 创建的 Pod 的数量。如果是以默认「按顺序操作 Pod」 的模式创建的 StatefulSet,那么这一轮的「缩容」操作就到此为止了。因为按照 Orderd 模式的规则,必须要等到这个 Pod 完全被中止之后,才能够进行下一个操作。

Round 2

如果上一轮被中止的 Pod 还没有完全被清理干净,控制流会再次回到遍历 condemned 逻辑的位置。如果该 Pod 仍然在中止的过程中且开启了 Orderd 模式,会直接退出此次缩容流程。不断的轮训,直到这个 Pod 消失。

control.UpdateStatefulSet——更新 Pod

更新的基础——资源版本机制

很多 Controller 在对它管辖的资源对象进行更新的时候,可能都会有一个相反的需求,就是回滚。那么对于对一个资源来说,它通常会有两个不同的版本:

  1. history
  2. current

之所以没有把 future 版本也列出来,是因为 future 版本信息并不会被保存下来。只要资源对象的信息发生修改,那么就会触发更新操作。之前的 current 版本会变为 history,而所谓的 future 会立刻变成 current 版本。

既然资源有了版本,那么随之而来的就是需要解决三个问题:

  1. 如何定义一个资源的版本?需要哪些必要的信息?
  2. 如何管理这些不同版本的资源?
  3. StatefulSet Controller 是如何利用版本机制去做回滚和更新的?

如何定义一个资源的版本?需要哪些必要的信息?

首先,「资源版本」这个概念在 Kubernetes 中,尤其是对于一个资源对象来说,它就是某一时间段内,用户对这个资源对象设定的的 desire state。这个 desire state 不仅仅可以描述用户对这个资源对象的期望,还可以作为创建这个资源对象的模板。在 StatefulSet Controller 中,它会操作两种资源:StatefulSet 和 Pod。其中 StatefulSet 和 Pod 之间也是存在层级关系的。所以,它集合两者定义了一个「版本」:

  • TargetState——StatefulSet
    • 当前有多少副本,即 Pod 数量 [0, replica-1]
    • 所有的 Pod 都必须处于 Target Object State
  • TargetObjectState——Pod
    • StatefulSet.Spec.template
    • StatefulSet.Spec.volumeClaimTemplates

如果你找到一个创建 StatefulSet 资源的 yaml 文件,再对比上面 TargetState 和 TargetObjectState 的定义,就可以看出:只要是有了这两个包含在「版本」内的模板信息,就可以构造出符合特定版本的 StatefulSet 对象。

StatefulSet 将资源的版本信息存储在了一个名为ControllerRevision的结构当中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
type ControllerRevision struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Data is the serialized representation of the state.
	Data runtime.RawExtension `json:"data,omitempty" protobuf:"bytes,2,opt,name=data"`

	// Revision indicates the revision of the state represented by Data.
	Revision int64 `json:"revision" protobuf:"varint,3,opt,name=revision"`
}

其中,Revision 成员按照递增的顺序保存资源的版本,从1到 N。Data 成员,则是把之前说到的 TargetState 和 TargetObjectState 信息经过 Encode 操作保存起来。其中 TargetState 和 TargetObjectState 所表示的信息都来自于 StatefulSet.Spec 的内容。

如何管理这些不同版本的资源?

在前面了解UpdateStatefulSet 函数内创建 Pod 流程的时候,暂时忽略了一些函数名带有 Revision 的逻辑。这些逻辑其实就是用来操作 ControllerRevision 的。

首先,通过ListRevisions函数想要获取和当前处理的 StatefulSet 对象相对应的 ControllerRevision 对象集合。它将根据以下几个筛选步骤,不断的缩小候选的 ControllerRevision 对象集合:

  1. 在 StatefulSet 对象所在的 NS 中,先获取到所有的 ControllerRevision 对象集合
  2. 根据 StatefulSet 对象中的 LabelSelector 对第一步的结果进行筛选。既然可以用LabelSelector,那么就证明在创建 ControllerRevision 对象的时候,是对其写入了特定的 label 的。ControllerRevision 对象的 Label 看起来和 Pod 的 Label 是一样的。
  3. 同一个 NS 下面且能够匹配 Label Selector 的 ControllerRevision 可能不止一个,甚至是有一部分并不是 StatefulSet Controller 来管理的。所以,将会通过 OwnerReference 对第二步的结果进行进一步的筛选。既然可以用 OwnerReference,就说明 ControllerRevision 对象在创建的时候,是和 StatefulSet Controller 建立过「从属关系」的。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (rh *realHistory) ListControllerRevisions(parent metav1.Object, selector labels.Selector) ([]*apps.ControllerRevision, error) {
	// List all revisions in the namespace that match the selector
	history, err := rh.lister.ControllerRevisions(parent.GetNamespace()).List(selector)
	if err != nil {
		return nil, err
	}
	var owned []*apps.ControllerRevision
	for i := range history {
		ref := metav1.GetControllerOf(history[i])
		if ref == nil || ref.UID == parent.GetUID() {
			owned = append(owned, history[i])
		}

	}
	return owned, err
}

根据上面举的例子:一个两副本的 StatefulSet 对象,可以分两种场景来进行讨论。

创建 StatefulSet 的第一个副本

在第一个Pod 创建的流程中,ListControllerRevisions 将会得到一个空的 Slice,因为此时没有一个副本被创建。之后,控制流程会进入 getStatefulSetRevisions函数。

1
2
3
4
5
// create a new revision from the current set
updateRevision, err := newRevision(set, nextRevision(revisions), &collisionCount)
if err != nil {
	return nil, nil, collisionCount, err
}

首先,会通过newRevision函数创建一个 ControllerRevision 对象,updateRevision.Revision 为 1,即初始版本。那么,在 newRevision 函数的内部,究竟是通过哪些信息来构造 ControllerRevision 对象的呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func newRevision(set *apps.StatefulSet, revision int64, collisionCount *int32) (*apps.ControllerRevision, error) {
	patch, err := getPatch(set)
	if err != nil {
		return nil, err
	}
	cr, err := history.NewControllerRevision(set,
		controllerKind,
		set.Spec.Template.Labels,
		runtime.RawExtension{Raw: patch},
		revision,
		collisionCount)
	if err != nil {
		return nil, err
	}
	if cr.ObjectMeta.Annotations == nil {
		cr.ObjectMeta.Annotations = make(map[string]string)
	}
	for key, value := range set.Annotations {
		cr.ObjectMeta.Annotations[key] = value
	}
	return cr, nil
}

跟进函数内部可以发现,ControllerRevision 对象最核心的两个成员:RevisionData 。一个通过 nextRevision 函数构造,另外一个通过 getPatch 函数构造。其中 nextRevision 函数的逻辑比较简单,就是在旧的 ControllerRevision 对象的 Revision 成员的基础上+1,由于本次是第一次创建,所以会默认赋值为1。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func getPatch(set *apps.StatefulSet) ([]byte, error) {
	str, err := runtime.Encode(patchCodec, set)
	if err != nil {
		return nil, err
	}
	var raw map[string]interface{}
	json.Unmarshal([]byte(str), &raw)
	objCopy := make(map[string]interface{})
	specCopy := make(map[string]interface{})
	spec := raw["spec"].(map[string]interface{})
	template := spec["template"].(map[string]interface{})
	specCopy["template"] = template
	template["$patch"] = "replace"
	objCopy["spec"] = specCopy
	patch, err := json.Marshal(objCopy)
	return patch, err
}

通过对 getPatch函数的逻辑观察可知,它首先将 StatefulSet 对象的信息解压到一个 Map 内。最终返回的 path 变量,其实是 StatefulSet.spec.template 的内容,也就是 StatefulSet 内 PodSpec 相关的信息。

1
2
map[template]=pod.Spec
map[spec]=map[template]

但是为什么在 Data 成员中没有把StatefulSet.Spec.volumeClaimTemplates 信息包含进来呢?这是因为 pod.Spec 中已经包含了 PVC 的名字了,而且 StatefulSet 删除或者 Pod 删除的情况下 PVC 和 PV 都不会被删除。如果想拿到 PVC 相关信息的时候,可以直接通过 PVC 的名字来找到相应的 PVC 资源对象。

创建了 ControllerRevision 对象之后,会将其和之前旧的 ControllerRevision 对象进行比对,看它们有无差别。如果有,那就证明 StatefulSet 有更新,接下来要进行一系列的更新操作。否则,什么都不用做。

判定是否相同的标准也很简单,就是对 ControllerRevision.Data 进行比对:

1
2
3
4
func EqualRevision(lhs *apps.ControllerRevision, rhs *apps.ControllerRevision) bool {
	...
	return bytes.Equal(lhs.Data.Raw, rhs.Data.Raw) && apiequality.Semantic.DeepEqual(lhs.Data.Object, rhs.Data.Object)
}

由于处于的是第一轮的创建流程,所以本次对 ControllerRevision 对象是否相等的判定会失败。因为没有相等的 ControllerRevision 对象,所以默认会创建一个新的:

1
2
3
4
5
//if there is no equivalent revision we create a new one
updateRevision, err = ssc.controllerHistory.CreateControllerRevision(set, updateRevision, &collisionCount)
if err != nil {
	return nil, nil, collisionCount, err
}

最终,getStatefulSetRevisions 函数返回的 currentRevision 和 updateRevision 都是同一个,即版本号为 1的 ControllerRevision 对象。

StatefulSet 所有副本已经创建完成

如果 StatefulSet 对象已经被创建完成。newRevision依然会照常执行。如果此时对 StatefulSet 对象没有任何更新的话,那么新生成的 ControllerRevision 对象除了 Revision 成员有变化之外,Data 成员的内容仍然是旧的。

接下来就是判断新的 ControllerRevision 是否和之前的 ControllerRevision 相同。如果相同的话,updateRevision 就会被更新为旧的且最近的 ControllerRevision 对象。最终,getStatefulSetRevisions 函数返回的 currentRevision 和 updateRevision 仍然是同一个,表示 StatefulSet 对象没有被修改过。

StatefulSet Controller 是如何利用版本机制去做回滚和更新的?

StatefulSet Controller 对于 Pod 或者 StatefulSet Update Event 的响应逻辑处于updateStatefulSet函数的尾部。之前也说过,创建和扩缩容等操作的响应逻辑也是在这里。

在处理 Update Event 的时候,StatefulSet Controller 会区分不同类型的更新策略进行处理。

OnDelete

如果更新策略是 OnDelete,响应 Update Event 的逻辑不会做任何处理。因为此更新策略本身就是放弃自动更新机制,用户手动删除 Pod 进行更新的。

1
2
3
4
// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
	return &status, nil
}
RollingUpdate

首先,将从 StatefulSet 对象的信息中获取set.Spec.UpdateStrategy.RollingUpdate.Partition字段。默认情况下如果不显示指定,Partition 为 0。Partition 这个字段的作用在上一篇文章中也提到过:

StatefulSet 在更新的时候,会先在其管辖的 Pod 集合内,通过 Partition 筛选 Ordinal Index 大于等于他的 Pod。然后再按照 Ordinal Index 逆序的顺序逐个更新剩余的 Pod。

1
2
3
4
updateMin := 0
if set.Spec.UpdateStrategy.RollingUpdate != nil {
	updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
}

更新 StatefulSet 的核心逻辑也很简单:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// we terminate the Pod with the largest ordinal that does not match the update revision.
for target := len(replicas) - 1; target >= updateMin; target-- {

	// delete the Pod if it is not already terminating and does not match the update revision.
	if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
		glog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update",
			set.Namespace,
			set.Name,
			replicas[target].Name)
		err := ssc.podControl.DeleteStatefulPod(set, replicas[target])
		status.CurrentReplicas--
		return &status, err
	}

...

按照 Ordinal Index 逆序的顺序逐个通过删除 Pod 的方式让它们重启,并应用最新版本的配置。

回滚操作其实和更新操作实现原理是一样的。你可以把回滚理解成一个反向的更新操作。通过对上面 StatefulSet Controller 对于 StatefulSet,Pod 各类 Event 的处理流程可以了解到,每一次版本的变更,都会产生一个新的 ControllerRevision 对象,它相当于之前版本的一个 snapshot。

假设现在有 3个旧的版本, A,B,C,当前版本为 D。如果此时我想回滚到 A 版本的话,我可以直接将 D 版本的 StatefulSet 对象的 template 部分改为和 A 版本一致的。此时,在getStatefulSetRevisions函数中,新生成的包含 A 版本内容的 E 版本,将会在「版本比对」的过程中发现和 A 版本相同。此时,StatefulSet Controller 会进行两个操作:

  1. 修改 A 版本的 Revision 字段为最新的版本号
  2. 使用 A 版本 ControllerRevision 对象保存的信息对 StatefulSet 管辖的 Pod 进行更新

最终,被删掉的 Pod 会重新创建起来。

总结

StatefulSet Controller 自身的创建,删除等逻辑并不复杂。稍微有些难度的地方就是更新操作,以及扩缩容操作,还有 ControllerRevision 对象的用法。 ControllerRevision 是一个 Kubernetes 内置的资源对象,它也有自己的 Controller。它被创建出来的最大的目的就是帮助我们在多版本的场景下,将恢复现场的必要信息以 CR 的形式保存起来。等到必要的时候,将这些 snapshot 取出来,重新创建一个 StatefulSet 对象。