Detect the source code of Scheduler

本文基于 Kubernetes 1.12 版本

写在前面

在上一篇关于 Scheduler 的文章中,我们主要从宏观的层面上了解了 Kubernetes 中调度的大概流程。本篇 blog,我将从源码级别分析一下 Scheduler 组件对 Pod 进行调度的过程。

调度过程

通过上面的两个图示我们可以看出,Scheduler 的核心逻辑就是Predicates 和 Priorities两个过程。其中 Predicate 先进行,Priority 后进行。在执行完这两个步骤之后,Scheduler 将在候选 Node 中选取一个「分数最高的」,作为 Pod 的调度 Node。

Scheduler 的启动和创建

在 Kubernetes 主项目的cmd/kube-scheduler/app/server.go文件下,我们不难找到 Scheduler 的创建和运行逻辑:kubernetes/server.go at release-1.12 · kubernetes/kubernetes · GitHub

创建

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Run runs the Scheduler.
func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
	// To help debugging, immediately log version
	glog.Infof(Version: %+v, version.Get())

	// Apply algorithms based on feature gates.
	// TODO: make configurable?
	algorithmprovider.ApplyFeatureGates()

	// Configz registration.
	if cz, err := configz.New(componentconfig); err == nil {
		cz.Set(c.ComponentConfig)
	} else {
		return fmt.Errorf(unable to register configz: %s, err)
	}

	// Build a scheduler config from the provided algorithm source.
	schedulerConfig, err := NewSchedulerConfig(c)
	if err != nil {
		return err
	}

	// Create the scheduler.
	sched := scheduler.NewFromConfig(schedulerConfig)
...

在 server.go 文件的内部,我们可以看到,二进制文件 scheduler 启动之后会调用一个Run 函数,在Run 函数的内部会调用NewSchedulerConfig来构造一份配置,并且通过这份配置调用scheduler.NewFromConfig创建一个 scheduler 的实例。

在创建了scheduler 实例之后,我们继续向下,看到了一段非常关键的逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Start all informers.
go c.PodInformer.Informer().Run(stopCh)
c.InformerFactory.Start(stopCh)

// Wait for all caches to sync before scheduling.
c.InformerFactory.WaitForCacheSync(stopCh)
controller.WaitForCacheSync(scheduler, stopCh, c.PodInformer.Informer().HasSynced)

// Prepare a reusable run function.
run := func(ctx context.Context) {
	sched.Run()
	<-ctx.Done()
}

在这段逻辑中,有我们非常熟悉的 Informer。并且在之前介绍 List-Watch 原理的时候,我们也提到过,scheduler 主要是依靠 PodInformer 来监听 Pod 的相关事件的。当它发现有一些待调度的 Pod 的时候,就会执行后面的调度逻辑。可以说,这里就是 scheduler 输入数据的源头。最终,我们将调用sched.Run 函数,执行 scheduler 内的调度逻辑。

那么,上面的 Informer 是在哪里生成的呢?顺着c这个对象向上寻找,我们先找到了NewSchedulerConfig函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// NewSchedulerConfig creates the scheduler configuration. This is exposed for use by tests.
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Config, error) {
	var storageClassInformer storageinformers.StorageClassInformer
	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		storageClassInformer = s.InformerFactory.Storage().V1().StorageClasses()
	}

	// Set up the configurator which can create schedulers from configs.
	configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
		SchedulerName:                  s.ComponentConfig.SchedulerName,
		Client:                         s.Client,
		NodeInformer:                   s.InformerFactory.Core().V1().Nodes(),
		PodInformer:                    s.PodInformer,
		PvInformer:                     s.InformerFactory.Core().V1().PersistentVolumes(),
		PvcInformer:                    s.InformerFactory.Core().V1().PersistentVolumeClaims(),
		ReplicationControllerInformer:  s.InformerFactory.Core().V1().ReplicationControllers(),
		ReplicaSetInformer:             s.InformerFactory.Apps().V1().ReplicaSets(),
		StatefulSetInformer:            s.InformerFactory.Apps().V1().StatefulSets(),
		ServiceInformer:                s.InformerFactory.Core().V1().Services(),
		PdbInformer:                    s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
		StorageClassInformer:           storageClassInformer,
		HardPodAffinitySymmetricWeight: s.ComponentConfig.HardPodAffinitySymmetricWeight,
		EnableEquivalenceClassCache:    utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
		DisablePreemption:              s.ComponentConfig.DisablePreemption,
		PercentageOfNodesToScore:       s.ComponentConfig.PercentageOfNodesToScore,
		BindTimeoutSeconds:             *s.ComponentConfig.BindTimeoutSeconds,
	})
	....
	// Additional tweaks to the config produced by the configurator.
	config.Recorder = s.Recorder

这个函数内部不单单有 PodInformer,还有 PvInformer 等。这些 Informer 的类型基本都是和 Pod 创建可能需要操作的资源对象有关。在函数的末尾,还有一个名为Recorder 的参数。通过查看它的定义可知,这个参数其实也是一个缓存形式的成员,它负责临时存储一些 Watch 到的,但是还没来得及处理的资源对象的事件:

1
2
// Recorder is the EventRecorder to use
Recorder record.EventRecorder

不难发现,PodInformer,Recorder,InformerFactory 三个对象并不在这个函数内部赋值,而是来源于参数:schedulerserverconfig.CompletedConfig。顺着该参数传递的顺序向上继续查找,我们发现,它来自于 Scheduler 二进制文件启动的时候调用的函数NewSchedulerCommand。此函数内部和我们上面说到的和 Config 有关的参数的逻辑如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
c, err := opts.Config()
if err != nil {
	fmt.Fprintf(os.Stderr, "%v\n", err)
	os.Exit(1)
}

stopCh := make(chan struct{})
if err := Run(c.Complete(), stopCh); err != nil {
	fmt.Fprintf(os.Stderr, "%v\n", err)
	os.Exit(1)
}

跟进 Complete 函数内并没有看到什么特别的逻辑,所以,上面三个成员基本上就是在opts.Config()函数中被创建和赋值的。

1
2
3
4
5
c.Client = client
c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
c.PodInformer = factory.NewPodInformer(client, 0)
c.EventClient = eventClient
c.Recorder = recorder

还是熟悉的味道,我们找到了InformerFactoryc.PodInformer被创建的位置。如果你想了解上述的 Informer 是如何被注册到InformerFactory的话,那么可以找这些 Informer 的同名函数 Informer 被调用的位置。其中,PodInformer 在 server.go 文件中的 Run 函数内被调用,而其余的 Informer 在NewSchedulerConfig 函数内的factory.NewConfigFactory中被调用。并且将处理相应资源对象的 Handler 函数也一起注册了进去。

加载调度策略

在阅读NewSchedulerConfig函数逻辑的时候,我们知道这是一个负责为 Scheduler 构造配置文件的函数。最终返回的配置信息,看起来是需要分情况处理的。一种是 Provider 不为 nil,另外一种是 Policy 不为 nil。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
source := s.ComponentConfig.AlgorithmSource
var config *scheduler.Config
switch {
case source.Provider != nil:
	// Create the config from a named algorithm provider.
	sc, err := configurator.CreateFromProvider(*source.Provider)
	if err != nil {
		return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
	}
	config = sc
case source.Policy != nil:
	// Create the config from a user specified policy source.
	policy := &schedulerapi.Policy{}
	switch {
	case source.Policy.File != nil:
		// Use a policy serialized in a file.
		...
	case source.Policy.ConfigMap != nil:
		// Use a policy serialized in a config map value.
		policyRef := source.Policy.ConfigMap
		...
	}
	sc, err := configurator.CreateFromConfig(*policy)
	if err != nil {
		return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
	}
	config = sc
default:
	return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}

分别查看 Provider 和 Policy 成员的定义:

1
2
3
4
5
6
7
8
// SchedulerAlgorithmSource is the source of a scheduler algorithm. One source
// field must be specified, and source fields are mutually exclusive.
type SchedulerAlgorithmSource struct {
	// Policy is a policy based algorithm source.
	Policy *SchedulerPolicySource
	// Provider is the name of a scheduling algorithm provider to use.
	Provider *string
}

通过注释和类型的定义可以发现,这两个成员来源于同一个 Struct,并且它们的作用是为 Scheduler 提供调度策略。两个成员必须指定其中一个。其中Policy内部包含一个 Path 和一个 ConfigMap 成员,前者可以通过指定一个按照特定格式书写的调度策略文件,后者可以直接将一些调度策略用到的信息存储在 ConfigMap 中。而 Provider 可以指定一个调度策略提供者的名称,scheduler 会自动从某个位置加载特定的调度策略集合。

无论是在 Provider 分支调用的 CreateFromProvider(providerName string) (*Config, error)函数还是在 Policy 分支调用的: CreateFromConfig(policy schedulerapi.Policy) (*Config, error) 函数,他们最后其实在内部都执行了同一个函数:CreateFromKeys。截取其中的部分逻辑如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
predicateFuncs, err := c.GetPredicates(predicateKeys)
if err != nil {
	return nil, err
}

priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
if err != nil {
	return nil, err
}

priorityMetaProducer, err := c.GetPriorityMetadataProducer()
if err != nil {
	return nil, err
}

predicateMetaProducer, err := c.GetPredicateMetadataProducer()
if err != nil {
	return nil, err
}

// Init equivalence class cache
if c.enableEquivalenceClassCache {
	c.equivalencePodCache = equivalence.NewCache()
	glog.Info("Created equivalence class cache")
}

algo := core.NewGenericScheduler(
	c.schedulerCache,
	c.equivalencePodCache,
	c.podQueue,
	predicateFuncs,
	predicateMetaProducer,
	priorityConfigs,
	priorityMetaProducer,
	extenders,
	c.volumeBinder,
	c.pVCLister,
	c.alwaysCheckAllPredicates,
	c.disablePreemption,
	c.percentageOfNodesToScore,
)

return &scheduler.Config{
	SchedulerCache: c.schedulerCache,
	Ecache:         c.equivalencePodCache,
	// The scheduler only needs to consider schedulable nodes.
	NodeLister:          &nodeLister{c.nodeLister},
	Algorithm:           algo,
	GetBinder:           c.getBinderFunc(extenders),
	PodConditionUpdater: &podConditionUpdater{c.client},
	PodPreemptor:        &podPreemptor{c.client},
	WaitForCacheSync: func() bool {
		return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
	},
	NextPod: func() *v1.Pod {
		return c.getNextPod()
	},
	Error:          c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
	StopEverything: c.StopEverything,
	VolumeBinder:   c.volumeBinder,
}, nil

对比在文章开头中我们给出的关于调度过程的流程图,不难发现,调度策略中包含的 predicate 和 predicate 在此函数都有涉及,它们被封装在一起,作为调度算法,赋值给 Config. Algorithm 成员。

除此之外,还有几个 Config 内的成员,看起来也和调度有关:

  • NextPod: 不断的从缓存中取出待调度的 Pod
  • Error:调度失败的 Pod 会按照一定的策略放入 Queue 中等待重试
  • SchedulerCache:正在调度中的 Pod 信息的缓存
  • GetBinder:将 Pod 调度的结果写回至 etcd

运行

进入 Scheduler 的 Run 函数,可以看到,它的运行逻辑非常简单:启动一个 goroutine,不断的运行sched.scheduleOne函数进行调度,直到 StopEverything 这个 channel 接收到一个让其退出的消息:

1
2
3
4
5
6
7
8
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
	if !sched.config.WaitForCacheSync() {
		return
	}

	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

对号入座

当我们进入sched.scheduleOne函数的时候,会看到很多在生成 Scheduler Config 时候创建的对象。整个 scheduleOne 的过程都可以用这些对象来进行描述:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (sched *Scheduler) scheduleOne() {
	// 1. Get pod from the podQueue
  pod := sched.config.NextPod()
	
  // 2. Schedule the pod 
  suggestedHost, err := sched.schedule(pod)

	// 3. Assume the pod to node
  err = sched.assume(assumedPod, suggestedHost)
	
 	// 4. Bind pod to the suggesstion Node
  err := sched.bind(assumedPod, &v1.Binding{
	ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
	Target: v1.ObjectReference{
		Kind: "Node",
		Name: suggestedHost,
	},
})
}
NextPod()

首先,我们将从 podQueue 中取出一个待调度的 Pod。它使用了在创建 Scheduler Config 的时候,调用的factory.NewConfigFactory函数内部生成的 Queue:

1
2
3
4
5
c := &configFactory{
	client:                         args.Client,
	podLister:                      schedulerCache,
	podQueue:                       core.NewSchedulingQueue(),
...

既然从队列中取出 Pod 信息的位置我们找到了,那么相应的我们需要找到在哪里向内部塞入 Pod。通过观察factor.NewConfigFactory 函数的内部逻辑,我们可以知道,它在 PodInformer 的 AddFunc 成员上注册了一个名为addPodToCache的函数,而这个函数内部会将 Watch 到的 Pod 信息塞入到队列中,并且会将其缓存起来,塞入 schedulerCache。

schedule(pod)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// schedule implements the scheduling algorithm and returns the suggested host.
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
	host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
	if err != nil {
		pod = pod.DeepCopy()
		sched.config.Error(pod, err)
		sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, FailedScheduling, %v, err)
		sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
			Type:    v1.PodScheduled,
			Status:  v1.ConditionFalse,
			Reason:  v1.PodReasonUnschedulable,
			Message: err.Error(),
		})
		return “”, err
	}
	return host, err
}

Schedule 函数的逻辑看起来也非常的清晰:

  1. 根据您我们之前已经生成好的调度策略对 Pod 进行调度
  2. 如果调度失败,调用之前在生成配置的时候创建的 Error 对象,它将根据一定策略将该调度失败的 Pod 塞入到队列中重试
  3. 将 Pod 调度失败的信息包装成一个 Event 对象先缓存到 Recorder 中
  4. 将本次 Pod 调度失败的信息通过 Update 函数更新至 etcd 中
assume()

assume 函数内仅是调用了一个名为AssumePod的函数。它会在其内部尝试将该 Pod 的信息插入到 SchedulerCache 维护的缓存中。若插入操作成功,那就证明这个 Pod 还没有被其他的调度过程占用,是可以被调度的,并且假设这个 Pod 的调度会成功。若插入失败,要么是这个 Pod 的信息根本无法被找到,要么就是这个 Pod 的信息已经存在于缓存中了,并且可能正在进行调度,不能立刻进行新的调度操作。所以,会将判定为调度失败的 Pod 重新加入到队列中等待重试。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil {
	glog.Errorf("scheduler cache AssumePod failed: %v", err)

	// This is most probably result of a BUG in retrying logic.
	// We report an error here so that pod scheduling can be retried.
	// This relies on the fact that Error will check if the pod has been bound
	// to a node and if so will not add it back to the unscheduled pods queue
	// (otherwise this would cause an infinite loop).
	sched.config.Error(assumed, err)
	sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePod failed: %v", err)
	sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
		Type:    v1.PodScheduled,
		Status:  v1.ConditionFalse,
		Reason:  "SchedulerError",
		Message: err.Error(),
	})
	return err
}
bind()

若 assume 函数执行成功,则 scheduler 接下来会调用 bind 函数。其实 bind 函数和 assume 函数的功能一样,都只是一个异步调度 Pod 至 Node 的操作,它们所做的除了想 etcd 写入 Pod 最新的信息之外,就是将当前 Pod 的状态写入至缓存中。因为最终创建 Pod 相关容器的工作是由 Kubelet 来完成的。所以,如果 Scheduler 部分的逻辑没有发生错误,那我们就姑且认为它是调度成功了。这样的话,即使 kubelet 在执行和该 Pod 相关工作的时候发生了错误,也自己会通过向 API Server 发请求来将调度失败的信息更新至 etcd 中。上述这些基本都是异步的操作,尤其是对于在 assume 函数内操作的缓存,我们将会定期清理其中的和 Pod 有关的信息。

关键步骤—Algorithm.Schedule

对于一个 Scheduler 来说,整个调度流程当中最重要的就是Algorithm.Schedule函数,即按照一定的调度策略为待调度 Pod 计算出它的目的 Node。根据我们之前所说的「加载调度策略」部分的内容,可以很容易的看出在CreateFromKeys 函数中我们调用core.NewGenericScheduler,将返回值赋值给了配置中的Algorithm成员,Algorithm的数据类型为genericScheduler

genericScheduler内部实现了一个叫 Schedule 的方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 1. Get all candidates of Node
nodes, err := nodeLister.List()
// 2. Update node info
err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
// 3. Find a fit Node through predicate
filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
// 4. Rank the candidates
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
// 5. Calculate the score and pick the Node
return g.selectHost(priorityList)

首先,我们获取所有可被调度的 Node 集合。然后根据 schedulerCache 中维护的关于 Node 信息的缓存,更新 cachedNodeInfoMapcachedNodeInfoMap内部也是维护了一个 NodeName->NodeInfo 的 Map。其中 NodeInfo 类型的对象中包含了很多 Node 的详细情况,如剩余的资源,已经被占用的资源等。这些信息最后会作为筛选调度节点的依据。

随后,我们先根据之前预设的 predicate 策略先将 Nodes 集合分为可调度和不可调度两部分:filteredNodesfailedPredicateMap。然后再根据 Priority 策略将筛选出来的节点按照优先级排序。最终,我们将根据这些备选 Node 的信息选出一个「评分最高」的 Node,作为最终调度的 Node。

g.findNodesThatFit (根据 Predicate 过滤节点)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
	var filtered []*v1.Node
	failedPredicateMap := FailedPredicateMap{}

	if len(g.predicates) == 0 {
		filtered = nodes
	} else {
		allNodes := int32(g.cache.NodeTree().NumNodes)
		numNodesToFind := g.numFeasibleNodesToFind(allNodes)

		// Create filtered list with enough space to avoid growing it
		// and allow assigning.
		filtered = make([]*v1.Node, numNodesToFind)
		errs := errors.MessageCountMap{}
		var (
			predicateResultLock sync.Mutex
			filteredLen         int32
			equivClass          *equivalence.Class
		)

		ctx, cancel := context.WithCancel(context.Background())

		// We can use the same metadata producer for all nodes.
		meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)

		if g.equivalenceCache != nil {
			// getEquivalenceClassInfo will return immediately if no equivalence pod found
			equivClass = equivalence.NewClass(pod)
		}

		checkNode := func(i int) {
			var nodeCache *equivalence.NodeCache
			nodeName := g.cache.NodeTree().Next()
			if g.equivalenceCache != nil {
				nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
			}
			fits, failedPredicates, err := podFitsOnNode(
				pod,
				meta,
				g.cachedNodeInfoMap[nodeName],
				g.predicates,
				g.cache,
				nodeCache,
				g.schedulingQueue,
				g.alwaysCheckAllPredicates,
				equivClass,
			)
			if err != nil {
				predicateResultLock.Lock()
				errs[err.Error()]++
				predicateResultLock.Unlock()
				return
			}
			if fits {
				length := atomic.AddInt32(&filteredLen, 1)
				if length > numNodesToFind {
					cancel()
					atomic.AddInt32(&filteredLen, -1)
				} else {
					filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
				}
			} else {
				predicateResultLock.Lock()
				failedPredicateMap[nodeName] = failedPredicates
				predicateResultLock.Unlock()
			}
		}
		// Stops searching for more nodes once the configured number of feasible nodes
		// are found.
		workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

筛选 Node 的逻辑虽然比较多,但是整体上看起来还是比较简洁的:

  1. 创建一个名为filtered的Slice,用于保存最终被筛选出来的可被调度的 Node
  2. 创建一个名为checkNode的函数,通过函数的名字就可以知道,它会被应用于每一个候选的 Node。真正的筛选 Node 的逻辑就是这一部分
  3. 以 16 个 Worker,并发的对每一个 Node 都运行 checkNode函数。满足条件的 Node 放入filtered内,否则放入failedPredicateMap
podFitsOnNode
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
for i := 0; i < 2; i++ {
for _, predicateKey := range predicates.Ordering() {
	//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
	if predicate, exist := predicateFuncs[predicateKey]; exist {
		if eCacheAvailable {
			fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
		} else {
			fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
		}
		if err != nil {
			return false, []algorithm.PredicateFailureReason{}, err
		}

		if !fit {
			// eCache is available and valid, and predicates result is unfit, record the fail reasons
			failedPredicates = append(failedPredicates, reasons...)
			// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
			if !alwaysCheckAllPredicates {
				glog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
					"evaluation is short circuited and there are chances " +
					"of other predicates failing as well.")
				break
			}
	}

}

在 checkNode 函数中,我们对每一个 Node 都会执行一个名为podFitsOnNode函数,尝试将这个 Pod 调度到此次检测的 Node 上。predicateFuncs这个 map 内存储了一些「筛选函数」,他们的函数签名如下:

1
type FitPredicate func(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)

看起来筛选 Node 的逻辑主要依赖两个参数:pod 和 nodeInfo。

k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go文件中,我们看到了很多被实现的,类型为 FitPredicate的筛选函数。如:

1
2
3
4
5
6
7
8
// NewPodAffinityPredicate creates a PodAffinityChecker.
func NewPodAffinityPredicate(info NodeInfo, podLister algorithm.PodLister) algorithm.FitPredicate {
	checker := &PodAffinityChecker{
		info:      info,
		podLister: podLister,
	}
	return checker.InterPodAffinityMatches
}

我们可以看到,这个函数主要是根据 Pod.Spec 指定的 InerPodAffinity 的属性生成一个筛选函数checker.InterPodAffinityMatches,最后它将被调用且接受 Info 参数传递进来的 Node 信息,对其进行筛选。

再来看一个通过检查 Node 内存负载来判定其是否适合被调度的策略函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func CheckNodeMemoryPressurePredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	var podBestEffort bool
	if predicateMeta, ok := meta.(*predicateMetadata); ok {
		podBestEffort = predicateMeta.podBestEffort
	} else {
		// We couldn't parse metadata - fallback to computing it.
		podBestEffort = isPodBestEffort(pod)
	}
	// pod is not BestEffort pod
	if !podBestEffort {
		return true, nil, nil
	}

	// check if node is under memory pressure
	if nodeInfo.MemoryPressureCondition() == v1.ConditionTrue {
		return false, []algorithm.PredicateFailureReason{ErrNodeUnderMemoryPressure}, nil
	}
	return true, nil, nil
}

该函数主要是通过检查被筛选的 Node 的内存资源是否已经处于一个压力较大的状态,来判断是否还应该向其调度新的 Pod。但是在这之前,它需要检查被调度的 Pod 是否对 CPU 和 MEM 等资源有硬性的要求。如果对资源要求的策略是「BestEffort」,那么就证明该 Pod 没有对其要使用的资源设置限制。换句话说,它可能用的多,超出此 Node 的剩余资源的量,也有可能用得少,不会超出。所以在这种情况下,不用在理会 Node 上的剩余资源量的问题。

PrioritizeNodes

理论上来说,通过 Predicate 策略的筛选之后,如果有余下的 Node,那么这些 Node 都是可被调度的。否则,将会返回一个错误,代表本次调度是失败的。这么看来,Predicate 策略应该是一种硬性的筛选规则,它会对候选的 Node 集合做减法。

被筛选出来的 Node 在 Kubernetes 中还会再经过一个策略的筛选:Priority。这是一个更为精细化的策略。如果说 Predicate 只是帮助 Pod 选择可以被调度的 Node 的话,那么 Priority 就是在此之上帮助 Kubernetes 集群做好资源的负载平衡,选出一个「最佳」的 Node。它更像是一个优化性质调度策略。

kubernetes/generic_scheduler.go at ec2e767e59395376fa191d7c56a74f53936b7653 · kubernetes/kubernetes · GitHub

Priority 的调度策略可以大致做如下概括:

  1. kubernetes/defaults.go at release-1.12 · kubernetes/kubernetes · GitHub 文件中,我们向 Scheduler 中用于保存调度算法的 AlgorithmFactory 对象注册了一些默认的调度策略函数。值得注意的是,每一个策略函数都有一个权重
  2. 分别使用每一个策略函数对每一个候选节点都计算一个临时的分数
  3. 遍历之前策略函数计算的临时分数,针对每一个临时分数都乘以其对应的策略函数的权重,得到一个节点的最终分数。每个 Node 的 Priority 分数都在0-10之前,0为不适合,10为最适合。一个 Node 在 Priority 策略下的最终分数的计算公式大概是这样:finalScoreNodeA = (weight1 * priorityFunc1) + (weight2 * priorityFunc2)

通过对 PrioritizeNodes 函数的逻辑了解可知,它大致包括以下几个核心的步骤:

  1. 循环遍历 priorityConfigs 中的用于计算 Node 优先级的函数,用它计算候选 Node 列表中各 Node 的分数。结果存于一个二维数组中。
  2. 通过 Map-Reduce 的方式对各个 Node 的分数再次进行计算,并且是通过并行的方式进行
  3. 对于每一个 Node 都根据我们上面提到的公式,综合多个策略函数和权重相乘的值计算出它的最终得分。且将他们塞回到一个 Slice 内,返回给调用者

计算最终得分的过程可以用一个表格来展示:

LeastRequestedPriority — 最小请求资源优先策略

LeastRequestedPriority 是默认的 Priority 策略之一,它主要用来平衡集群中各节点的资源用量情况。通过将一些「资源使用比」较大的 Node 降低优先级,「资源使用比」较小的 Node 提高优先级,来防止新的 Pod 继续调度到那些负载已经过高的 Node 上。它通过一个公式,分别计算一个 Node 的 CPU 和 MEM 的占比并将它们加在一起,来作为这个策略评估 Node 的分数。

1
(cpu((capacity-sum(requested))*10/capacity) + memory((capacity-sum(requested))*10/capacity))/2
BalancedResourceAllocation — 平衡节点资源策略

BalancedResourceAllocation 是默认的 Priority 策略之一,它会根据某个 Node 上面的 CPU 占比和 MEM 占比的差值来给 Node 评一个分数。差值越大的优先级越低,因为这代表在该 Node 上,CPU 和 MEM 资源的消耗差距过大,如果继续向其调度 Pod 的话,很可能导致 CPU 被消耗光了,但是 MEM 还有很多,到最后这个 Node 可能就会被判定为不可调度的节点。该策略会尽量选择资源消耗较为平衡的 Node。

但是,通过该策略在代码中相关的注释我们可以了解到,它不能单独使用,需要和 LeastRequestedPriority 策略一起配合使用。因为两者在解决「维护集群节点资源平衡」的问题上关注的点是不同的:前者更关注 Node 上 CPU 和 MEM 的总剩余量,后者则在剩余量没问题的情况下,关注 Node 上 CPU 和 MEM 的使用比差值。最后为 Pod 挑选出一个「资源剩余量」合适且「资源占比」平衡的 Node。

SelectHost

经过 Priority 策略的筛选,它不会丢掉任何的候选 Node。而是通过 SelectHost 函数,挑选一个分数最高的 Node。如果分数高的 Node 有很多的话,则通过随机策略选取一个作为最终的被调度节点。

自定义调度策略

在之前讨论的关于「加载调度策略」的部分,我们已经提到了,Scheduler 的调度策略可以通过两种方式来指定:

  1. Policy
    1. ConfigMap
    2. PolicyFile
  2. Provider
    1. Algorithm Name

Policy 指定的是调度策略的文件,而 Provider 可直接指定调度算法的提供者的名称。

Scheduler 为其使用者提供了非常大的自由度,若默认的调度策略不能满足我们的需求的话,允许我们自己定义调度策略。自定义的方式无非就是我们提到的两种:Policy 和 Provider。

PolicyFile

–policy-config-file

我们可以在 Scheduler 启动的时候通过--policy-config-file 参数传入一个 policy 文件的路径,这个文件按照 json 的格式写明了我们想启用的调度策略:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
    {"name" : "PodFitsHostPorts"},
    {"name" : "PodFitsResources"},
    {"name" : "NoDiskConflict"},
    {"name" : "NoVolumeZoneConflict"},
    {"name" : "MatchNodeSelector"},
    {"name" : "HostName"}
    ],
"priorities" : [
    {"name" : "LeastRequestedPriority", "weight" : 1},
    {"name" : "BalancedResourceAllocation", "weight" : 1},
    {"name" : "ServiceSpreadingPriority", "weight" : 1},
    {"name" : "EqualPriority", "weight" : 1}
    ],
"hardPodAffinitySymmetricWeight" : 10
}

调度函数的名称既可以是 Kubernetes 已经实现的,也可以是我们自己实现的。如何自己实现策略函数,我们放到后面来说。

–policy-configmap

除了指定一个 policy 文件的地址,我们还可以将 policy 文件的内容放入 configMap 中,并在 Scheduler 启动的时候指定这个 configMap 的名称。需要注意的是,这个 configMap 必须要在 Scheduler 启动之前就创建好。

自定义 Predicate 和 Policy 的策略函数

通过上面对 Predicate 和 Policy 两个调度策略的讨论,我们知道,Kubernetes 已经为我们默认实现了一些策略函数。这些函数可以写在上面所说的 Policy 文件中或者是 configMap 内。但是,当这些内置的函数不能满足我们需求的时候,我们可以自己实现一个。

实现一个自定义策略函数并让他成功生效,通常需要以下几个步骤:

  1. 按照 kubernetes/types.go at release-1.12 · kubernetes/kubernetes · GitHubkubernetes/types.go at release-1.12 · kubernetes/kubernetes · GitHub 以及 kubernetes/types.go at release-1.12 · kubernetes/kubernetes · GitHub 提供的函数原型,可以实现两种调度策略需要的策略函数。并将实现放于 pkg/scheduler/algorithm/priorities/pkg/scheduler/algorithm/predicates 目录下
  2. 根据 kubernetes/defaults.go at release-1.12 · kubernetes/kubernetes · GitHub algorithmprovider package 中的 Init 函数内将默认的策略函数注册到 Factory 的方式,我们也需要将自定义实现的函数注册到Factory 内

对于实现步骤来说,我觉得还是比较简单的,根据参数写策略逻辑即可。稍微迷惑的就是第二步的注册过程。由于这是属于调度策略的事情,我们要返回 Scheduler 的创建逻辑,看一下这个 Factory 是怎么和 Scheduler 结合在一起的。

在创建 Scheduler 的时候,有一个函数非常的重要:NewSchedulerConfig。该函数内的逻辑几乎创建了所有 Scheduler 在之后调度的过程中需要使用的各类对象。并且,最终使用的 config 生成我们之前也提到过,是和它的调度策略有关的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 1. Construct the factory
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
	SchedulerName:                  s.ComponentConfig.SchedulerName,
	Client:                         s.Client,
	NodeInformer:                   s.InformerFactory.Core().V1().Nodes(),
	PodInformer:                    s.PodInformer,
	PvInformer:                     s.InformerFactory.Core().V1().PersistentVolumes(),
	PvcInformer:                    s.InformerFactory.Core().V1().PersistentVolumeClaims(),
	ReplicationControllerInformer:  s.InformerFactory.Core().V1().ReplicationControllers(),
	ReplicaSetInformer:             s.InformerFactory.Apps().V1().ReplicaSets(),
	StatefulSetInformer:            s.InformerFactory.Apps().V1().StatefulSets(),
	ServiceInformer:                s.InformerFactory.Core().V1().Services(),
	PdbInformer:                    s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
	StorageClassInformer:           storageClassInformer,
	HardPodAffinitySymmetricWeight: s.ComponentConfig.HardPodAffinitySymmetricWeight,
	EnableEquivalenceClassCache:    utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
	DisablePreemption:              s.ComponentConfig.DisablePreemption,
	PercentageOfNodesToScore:       s.ComponentConfig.PercentageOfNodesToScore,
	BindTimeoutSeconds:             *s.ComponentConfig.BindTimeoutSeconds,
})

// 2. Create the config for scheduler according to the policy
sc, err := configurator.CreateFromConfig(*policy)
if err != nil {
	return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
}
config = sc

NewSchedulerConfig函数的逻辑跟我们这部分主题有关的内容有两处,第一个是创建 Factory ,第二个是根据 Policy 创建 config。因为 Policy 中也是从 ConfigMap 或者 Policy File 内取出和调度策略相关的信息,比如要使用那些策略函数。所以,真正注册那些策略函数的逻辑应该是在CreateFromConfig中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) {
	glog.V(2).Infof("Creating scheduler from configuration: %v", policy)

	// validate the policy configuration
	if err := validation.ValidatePolicy(policy); err != nil {
		return nil, err
	}

	predicateKeys := sets.NewString()
	if policy.Predicates == nil {
		glog.V(2).Infof("Using predicates from algorithm provider '%v'", DefaultProvider)
		provider, err := GetAlgorithmProvider(DefaultProvider)
		if err != nil {
			return nil, err
		}
		predicateKeys = provider.FitPredicateKeys
	} else {
		for _, predicate := range policy.Predicates {
			glog.V(2).Infof("Registering predicate: %s", predicate.Name)
			predicateKeys.Insert(RegisterCustomFitPredicate(predicate))
		}
	}

	priorityKeys := sets.NewString()
	if policy.Priorities == nil {
		glog.V(2).Infof("Using priorities from algorithm provider '%v'", DefaultProvider)
		provider, err := GetAlgorithmProvider(DefaultProvider)
		if err != nil {
			return nil, err
		}
		priorityKeys = provider.PriorityFunctionKeys
	} else {
		for _, priority := range policy.Priorities {
			glog.V(2).Infof("Registering priority: %s", priority.Name)
			priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
		}
	}
	return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)

通过上面截取的部分逻辑可以看出,在CreateFromConfig函数中,先查看了是否有提供的自定义的策略函数(包括 Predicate 和 Priority ),如果有的话,通过RegisterCustomFitPredicate进行注册,否则,使用默认的策略函数,通过GetAlgorithmProvider进行注册。

默认策略函数的注册

GetAlgorithmProvider 是从一个 Map 中返回了一个类型为AlgorithmProviderConfig的对象。通过查看和这个函数相对应的写入版本RegisterAlgorithmProvider,我们了解到,他在 https://github.com/kubernetes/kubernetes/blob/release-1.12/pkg/scheduler/algorithmprovider/defaults/defaults.go#L38 文件中的registerAlgorithmProvider函数内就被调用了。那么 Init 函数对应的 algorithmprovider package 是在哪里第一次被引用的呢?如果你此时返回到 schduler.go 文件中的 Run 函数处就可以知道,它在一开始运行的时候就调用了ApplyFeatureGates函数。而ApplyFeatureGates函数的定义就在 algorithmprovider package 中。

1
registerAlgorithmProvider(defaultPredicates(), defaultPriorities())

其中 defaultPredicatesdefaultPriorities函数,就是我们最开始在 https://github.com/kubernetes/kubernetes/blob/release-1.12/pkg/scheduler/algorithmprovider/defaults/defaults.go#L38 文件内看到的那些 Kubernetes 默认实现的策略函数。那也就是说,这个 Provider 其实是一个对象,里面包含了策略函数。 随机查看一下defaultPredicates函数的逻辑,可知它在内部将策略函数塞入了一个 Map 中,该 Map 的 Key 为 Policy 的名称,Value为 策略函数

1
	fitPredicateMap[name] = predicateFactory

不过,对于默认策略函数的注册,我们有一点需要注意:

1
2
3
// Apply algorithms based on feature gates.
// *TODO: make configurable?*
algorithmprovider.ApplyFeatureGates()

ApplyFeatureGates函数执行,即想要注册默认策略函数的时候,看起来是强制把所有的默认的策略函数都注册进去的。并且,通过注释也可以看出,这里 Kubernetes 官方也是想把它做成可配置的。所以,即使我们单独通过 policyFile 指定了默认策略函数的子集,看起来也并不会因此就只注册一部分默认策略函数。

自定义策略函数的注册

自定义函数的注册也是通过遍历 Priority 和 Predicate 两个策略来分别向之前我们所说的 Map 注册相应的策略函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string {
	var predicateFactory FitPredicateFactory
	var ok bool

	validatePredicateOrDie(policy)

	// generate the predicate function, if a custom type is requested
	if policy.Argument != nil {
		if policy.Argument.ServiceAffinity != nil {
			predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate {
				predicate, precomputationFunction := predicates.NewServiceAffinityPredicate(
					args.PodLister,
					args.ServiceLister,
					args.NodeInfo,
					policy.Argument.ServiceAffinity.Labels,
				)

				// Once we generate the predicate we should also Register the Precomputation
				predicates.RegisterPredicateMetadataProducer(policy.Name, precomputationFunction)
				return predicate
			}
		} else if policy.Argument.LabelsPresence != nil {
			predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate {
				return predicates.NewNodeLabelPredicate(
					policy.Argument.LabelsPresence.Labels,
					policy.Argument.LabelsPresence.Presence,
				)
			}
		}
	} else if predicateFactory, ok = fitPredicateMap[policy.Name]; ok {
		// checking to see if a pre-defined predicate is requested
		glog.V(2).Infof("Predicate type %s already registered, reusing.", policy.Name)
		return policy.Name
	}

	if predicateFactory == nil {
		glog.Fatalf("Invalid configuration: Predicate type not found for %s", policy.Name)
	}

	return RegisterFitPredicateFactory(policy.Name, predicateFactory)
}

注册自定义策略函数的逻辑比较奇怪。首先 policy.Argument!=nil 的分支临时生成并注册了两个特殊的策略函数,并不是我们想要的。其次,如果else iff分支还没命中的话,本次注册相当于失败了,因为predicateFactory没有被赋值。那也就是说,真正注册自定义函数的逻辑在执行RegisterCustomFitPredicate之前就应该被注册进去了。

最终,找了半天也没找到还有什么其他位置可以根据 policy 规则注册自定义函数。但是转念一想,即使是默认的策略函数也是靠直接调用defaultPredicates等函数注册到名为fitPredicateMap中的,并且在运行CreateFromConfig也没有再进行多余的操作。这种注册策略函数远在 Scheduler 实例被创建之前的情况可能同样适用于自定义策略函数。并且,如果注册自定义策略函数真想我们之前想的一样:通过 policy 给出的名字找到我们已经实现的自定义策略函数,然后注册到一个 map 中。那么又是怎么通过 policy 中的自定义策略函数的名字找到它的实现的呢?显然是不可能的。So,注册自定义策略函数可能需要在 ApplyFeatureGates函数被调用的位置处进行。

自定义的策略的实现其实包含两部分:1. 实现自定义策略函数 2. 将自定义策略函数的名称写入 Policy 文件内或者 Policy ConfigMap 内。完成了这两部,我们实现的自定义策略才会真正的被 Scheduler 执行。

对于 Scheduler 的一些思考

不光是 Kubernetes 中的 Scheduler,任何一个分布式系统中的调度器,我相信它的核心逻辑都是为了解决「资源管理」的问题,这里说的资源,不是 Kubernetes 中抽象的资源对象,如 Deployment 等。而是真实的一些存储资源和计算资源,如 CPU, MEM,Storage 等。

Scheduler 的实现都比较简单,但是区别一个好的 Scheduler,我觉得是取决于它的调度算法,也就是我们之前说的调度策略。一个好的调度策略应该尽可能的提高集群资源的利用率,并且在可调度资源数量和节点数量增多的时候,还可以保证一个良好的性能。甚至是能够更好的适应不同的业务场景。

由于最近这段时间接触了一个策略系统的项目openpolicyaget,所以我在想,在Kubernetes 后续的发展中,是否有可能单独的将调度策略抽象出来作为一个单独组件呢?这可能会比自定义 Policy 文件+自定义策略函数给用户提供的灵活性更大。