Kubernetes Source Code Study - Controller - P5 - StatefulSet Controller

P5-StatefulSet Controller

Preface

In the previous articles, I started with a first look at the deployment controller:

Controller-P3-Deployment Controller

Strictly speaking, a deployment manages its pods through ReplicaSets, so the next article went deeper into the replicaSet controller:

Controller-P3-ReplicaSet Controller

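In this article, let's look at the controller behind another of the most commonly used workload objects built on top of pods: the StatefulSet Controller.

Basic characteristics of StatefulSet

Before reading the code, let's review how an sts (StatefulSet) behaves at runtime; reading the code with that behavior in mind goes more smoothly.

Creation

An sts is ordered: pod replicas are created serially, in order. With N replicas, pods are named {sts_name}-{0..N-1}; creation starts with the lowest ordinal ({sts_name}-0) and proceeds up to the last replica ({sts_name}-{N-1}).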
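
The naming rule can be illustrated with a minimal sketch. The helper names podName and creationOrder are hypothetical and exist only for this example; they are not functions from the Kubernetes source.

```go
package main

import "fmt"

// podName builds the name of the replica with the given ordinal,
// following the {sts_name}-{ordinal} convention (hypothetical helper).
func podName(setName string, ordinal int) string {
	return fmt.Sprintf("%s-%d", setName, ordinal)
}

// creationOrder lists pod names in the order an ordered rollout
// would create them: lowest ordinal first (hypothetical helper).
func creationOrder(setName string, replicas int) []string {
	names := make([]string, 0, replicas)
	for ordinal := 0; ordinal < replicas; ordinal++ {
		names = append(names, podName(setName, ordinal))
	}
	return names
}

func main() {
	fmt.Println(creationOrder("web", 3))
}
```

With 3 replicas the last pod has ordinal 2, which is why the range is 0..N-1.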

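Update

An sts has two update strategies:

  • RollingUpdateStatefulSetStrategyType, the default rolling update strategy. Pods are updated in reverse ordinal order: the pod with the highest ordinal is deleted and recreated first, down to the pod with the lowest ordinal. The pod count never exceeds the specified replica count during the update, because a replacement is created only after the old pod has been deleted. A partition parameter can also be set: only pods whose ordinal is greater than or equal to partition are updated, and pods whose ordinal is less than partition are left alone. For example, with 5 replicas and partition set to 2, updating the sts leaves pods 0 and 1 untouched while pods 2, 3 and 4 are updated and recreated; lowering partition to 0 afterwards updates and recreates pods 0 and 1 as well. The default partition is 0, meaning all pods are updated. This parameter is rarely used, but lowering partition step by step during a release gives a rolling canary release.

  • OnDeleteStatefulSetStrategyType. Under this strategy the controller does not update pods on its own; a pod is recreated from the new revision only after someone deletes the old pod manually.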
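
The effect of partition on a rolling update can be sketched as follows. This is only an illustration of the rule described above; ordinalsToUpdate is a hypothetical helper, not part of the controller.

```go
package main

import "fmt"

// ordinalsToUpdate returns the ordinals a rolling update would touch,
// highest ordinal first, skipping every ordinal below partition.
func ordinalsToUpdate(replicas, partition int) []int {
	var ordinals []int
	for ord := replicas - 1; ord >= partition && ord >= 0; ord-- {
		ordinals = append(ordinals, ord)
	}
	return ordinals
}

func main() {
	fmt.Println(ordinalsToUpdate(5, 2)) // pods 4, 3 and 2 are updated
	fmt.Println(ordinalsToUpdate(5, 0)) // every pod is updated, 4 down to 0
}
```

Lowering partition in steps (for example 4, then 2, then 0) widens the updated range one stage at a time, which is the canary pattern mentioned above.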

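Deletion

When deleting an sts, the cascade mode can be set with the --cascade=true flag. The default is true, which means deleting the sts also deletes the pods it manages. With false, deleting the sts does not affect the running pods, and a recreated sts can still be associated with the earlier pods (this approach may leave orphan pods behind).

Association

First, how an sts and its pods are associated: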

# sts
[root@008019 ~]# kubectl get sts deptest11dev
NAME READY AGE
deptest11dev 2/2 99d

# pod
[root@008019 ~]# kubectl get pods | grep deptest11dev
deptest11dev-0 1/1 Running 1 99d
deptest11dev-1 1/1 Running 0 3d17h

# edit pod
# The pod's ownerReferences field links it to the sts
ownerReferences:
- apiVersion: apps/v1
  blockOwnerDeletion: true
  controller: true
  kind: StatefulSet
  name: deptest11dev
  uid: 28ecf735-2ab4-11ea-afa8-1866daf0f324

# The pod's labels include an added controller-revision-hash label, which links it to a ControllerRevision
labels:
  app: deptest11dev
  controller-revision-hash: deptest11dev-587f8bd845
  statefulset.kubernetes.io/pod-name: deptest11dev-1

Next, how an sts and its ControllerRevisions are associated:

[root@008019 ~]# kubectl get sts deptest11dev
NAME READY AGE
deptest11dev 2/2 99d



[root@008019 ~]# kubectl get ControllerRevisions | grep deptest11dev
deptest11dev-587f8bd845 statefulset.apps/deptest11dev 1 99d

[root@008019 ~]# kubectl get ControllerRevisions deptest11dev-587f8bd845
NAME CONTROLLER REVISION AGE
deptest11dev-587f8bd845 statefulset.apps/deptest11dev 1 99d


# The ownerReferences field of the ControllerRevision links it back to the sts
ownerReferences:
- apiVersion: apps/v1
  blockOwnerDeletion: true
  controller: true
  kind: StatefulSet
  name: deptest11dev
  uid: 28ecf735-2ab4-11ea-afa8-1866daf0f324

# The sts status references ControllerRevisions through the currentRevision and updateRevision fields
status:
  collisionCount: 0
  currentReplicas: 2
  currentRevision: deptest11dev-587f8bd845
  observedGeneration: 3
  readyReplicas: 2
  replicas: 2
  updateRevision: deptest11dev-587f8bd845
  updatedReplicas: 2

# After sts.spec is changed in a way that recreates pods, the sts starts a rolling update and its status looks like this:
status:
  collisionCount: 0
  currentReplicas: 1
  currentRevision: deptest11dev-587f8bd845
  observedGeneration: 4
  readyReplicas: 2
  replicas: 2
  # updateRevision now points at the new revision, i.e. the revision of the most recent update
  updateRevision: deptest11dev-7487498978

# status while scaling the sts in or out:
status:
  collisionCount: 0
  currentReplicas: 3
  currentRevision: deptest11dev-7487498978
  observedGeneration: 5
  readyReplicas: 3
  replicas: 3
  # the revision does not change
  updateRevision: deptest11dev-7487498978
  updatedReplicas: 3

Keep these two-way associations in mind; they come up again below.

StatefulSet Controller

Initialization

cmd/kube-controller-manager/app/apps.go:59

func startStatefulSetController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}] {
		return nil, false, nil
	}
	go statefulset.NewStatefulSetController(
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.InformerFactory.Apps().V1().StatefulSets(),
		ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
		ctx.InformerFactory.Apps().V1().ControllerRevisions(),
		ctx.ClientBuilder.ClientOrDie("statefulset-controller"),
	).Run(1, ctx.Stop)
	return nil, true, nil
}

First, let's see what NewStatefulSetController does:

==> pkg/controller/statefulset/stateful_set.go:81

func NewStatefulSetController(
	// 1. StatefulSetController watches four resource types: Pod/Sts/PVC/ControllerRevision
	podInformer coreinformers.PodInformer,
	setInformer appsinformers.StatefulSetInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	revInformer appsinformers.ControllerRevisionInformer,
	kubeClient clientset.Interface,
) *StatefulSetController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})

	// 2. NewDefaultStatefulSetControl deserves a closer look
	ssc := &StatefulSetController{
		kubeClient: kubeClient,
		control: NewDefaultStatefulSetControl(
			NewRealStatefulPodControl(
				kubeClient,
				setInformer.Lister(),
				podInformer.Lister(),
				pvcInformer.Lister(),
				recorder),
			NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
			history.NewHistory(kubeClient, revInformer.Lister()),
			recorder,
		),
		pvcListerSynced: pvcInformer.Informer().HasSynced,
		queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
		podControl:      controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},

		revListerSynced: revInformer.Informer().HasSynced,
	}
	// Handlers for add/update/delete events on pods managed by an sts
	// (each one looks up the owning sts and pushes it onto the workqueue)
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		// lookup the statefulset and enqueue
		AddFunc: ssc.addPod,
		// lookup current and old statefulset if labels changed
		UpdateFunc: ssc.updatePod,
		// lookup statefulset accounting for deletion tombstones
		DeleteFunc: ssc.deletePod,
	})
	ssc.podLister = podInformer.Lister()
	ssc.podListerSynced = podInformer.Informer().HasSynced

	// Handlers for add/update/delete events on the sts itself (the sts is pushed onto the workqueue)
	setInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc: ssc.enqueueStatefulSet,
			UpdateFunc: func(old, cur interface{}) {
				oldPS := old.(*apps.StatefulSet)
				curPS := cur.(*apps.StatefulSet)
				if oldPS.Status.Replicas != curPS.Status.Replicas {
					klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
				}
				ssc.enqueueStatefulSet(cur)
			},
			DeleteFunc: ssc.enqueueStatefulSet,
		},
		statefulSetResyncPeriod,
	)
	ssc.setLister = setInformer.Lister()
	ssc.setListerSynced = setInformer.Informer().HasSynced

	// TODO: Watch volumes
	// Return ssc (the StatefulSetController)
	return ssc
}

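Start with comment 1: StatefulSetController watches four resource types, Pod/Sts/PVC/ControllerRevision. ControllerRevision is the less familiar one, so let's find its definition by following the references step by step: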

cmd/kube-controller-manager/app/apps.go:63

==> vendor/k8s.io/client-go/informers/apps/v1/interface.go:28

==>vendor/k8s.io/client-go/informers/apps/v1/controllerrevision.go:38

==> vendor/k8s.io/client-go/listers/apps/v1/controllerrevision.go:29

==> vendor/k8s.io/api/apps/v1/types.go:800

type ControllerRevision struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Data is the serialized representation of the state.
	Data runtime.RawExtension `json:"data,omitempty" protobuf:"bytes,2,opt,name=data"`

	// Revision indicates the revision of the state represented by Data.
	Revision int64 `json:"revision" protobuf:"varint,3,opt,name=revision"`
}

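The comment above this struct explains that ControllerRevision is used by DaemonSet and StatefulSet for updates and rollbacks. A ControllerRevision stores a snapshot of state data and is immutable once created; the caller (the controller using it) is responsible for serializing the data on write and deserializing it on read. The Revision (int64) field acts as the version number of the ControllerRevision, and the Data field holds the serialized data.

Aside: it is not hard to guess that sts updates and rollbacks (a rollback being a special kind of update) work by comparing old and new ControllerRevisions.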
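
To make that guess concrete, here is a minimal sketch of how a controller could choose a revision by comparing snapshots. It is an assumption-laden illustration, not the code from the Kubernetes history package: the revision type, pickUpdateRevision, and the rev-N naming are all hypothetical, and plain byte comparison stands in for whatever equality check the real code uses.

```go
package main

import (
	"bytes"
	"fmt"
)

// revision is a simplified, hypothetical stand-in for ControllerRevision.
type revision struct {
	Name     string
	Revision int64
	Data     []byte // serialized snapshot of the desired state
}

// pickUpdateRevision decides which revision describes the desired state.
// history must be sorted by Revision, ascending. It returns the chosen
// revision and the resulting history.
func pickUpdateRevision(history []revision, desired []byte) (revision, []revision) {
	next := int64(1)
	if n := len(history); n > 0 {
		next = history[n-1].Revision + 1
	}
	for i := len(history) - 1; i >= 0; i-- {
		if !bytes.Equal(history[i].Data, desired) {
			continue
		}
		if i == len(history)-1 {
			// Same as the newest snapshot: nothing changed.
			return history[i], history
		}
		// Same as an older snapshot: treat it as a rollback and move
		// that snapshot to the head with a new revision number.
		rolled := history[i]
		rolled.Revision = next
		out := append(append([]revision{}, history[:i]...), history[i+1:]...)
		return rolled, append(out, rolled)
	}
	// No snapshot matches: record a new one.
	created := revision{Name: fmt.Sprintf("rev-%d", next), Revision: next, Data: desired}
	return created, append(append([]revision{}, history...), created)
}

func main() {
	var history []revision
	var cur revision
	for _, spec := range []string{"image:v1", "image:v2", "image:v1"} {
		cur, history = pickUpdateRevision(history, []byte(spec))
		fmt.Println(cur.Name, cur.Revision)
	}
}
```

The third iteration goes back to an earlier spec, so the sketch reuses the first snapshot instead of creating a new one, which is the sense in which a rollback is a special kind of update.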

Now for comment 2, the NewDefaultStatefulSetControl function:

pkg/controller/statefulset/stateful_set.go:95

==> pkg/controller/statefulset/stateful_set_control.go:54

func NewDefaultStatefulSetControl(
	podControl StatefulPodControlInterface,
	statusUpdater StatefulSetStatusUpdaterInterface,
	controllerHistory history.Interface,
	recorder record.EventRecorder) StatefulSetControlInterface {
	return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
}

The defaultStatefulSetControl struct returned by NewDefaultStatefulSetControl implements the sts control logic. It bundles the interfaces used while reconciling an sts:

  1. podControl: the interface for managing the pods/PVCs of an sts, with the methods CreateStatefulPod/UpdateStatefulPod/DeleteStatefulPod, implemented by the realStatefulPodControl struct returned by NewRealStatefulPodControl.
  2. statusUpdater: the interface for updating the sts status, with the single method UpdateStatefulSetStatus, implemented by the realStatefulSetStatusUpdater struct returned by NewRealStatefulSetStatusUpdater.
  3. controllerHistory: the interface for managing ControllerRevision versions, with the methods ListControllerRevisions/CreateControllerRevision/DeleteControllerRevision/UpdateControllerRevision/AdoptControllerRevision/ReleaseControllerRevision, implemented by the realHistory struct returned by history.NewHistory.

Next, let's look at the Run function that drives ssc (StatefulSetController).

Workflow

The *StatefulSetController.Run() function:

func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer ssc.queue.ShutDown()

	klog.Infof("Starting stateful set controller")
	defer klog.Infof("Shutting down statefulset controller")

	if !controller.WaitForCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(ssc.worker, time.Second, stopCh)
	}

	<-stopCh
}

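The wait.Until timer was covered in an earlier article and is not repeated here. The key part is the ssc.worker function, which takes several jumps to follow: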

pkg/controller/statefulset/stateful_set.go:159

==>pkg/controller/statefulset/stateful_set.go:410

==> pkg/controller/statefulset/stateful_set.go:399

==>pkg/controller/statefulset/stateful_set.go:415

// sync syncs the given statefulset.
func (ssc *StatefulSetController) sync(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))
	}()
	// Example key: default/teststs. Split it to get the namespace and the sts name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	// Fetch the sts object
	set, err := ssc.setLister.StatefulSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.Infof("StatefulSet has been deleted %v", key)
		return nil
	}
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
		return err
	}

	// labelSelector
	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
		// This is a non-transient error, so don't retry.
		return nil
	}

	// Adopt orphaned revisions
	if err := ssc.adoptOrphanRevisions(set); err != nil {
		return err
	}
	// Get the pods managed by the sts
	pods, err := ssc.getPodsForStatefulSet(set, selector)
	if err != nil {
		return err
	}
	// syncStatefulSet performs the actual sts sync
	return ssc.syncStatefulSet(set, pods)
}
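
The hops between ssc.worker and sync are not reproduced in this article. As a rough picture, the sketch below shows the general shape of such a loop: take a key from a queue, call sync, and requeue the key when sync fails. Everything here is a simplified assumption: splitKey, processAll and maxRetries are hypothetical, the slice-based queue stands in for the client-go rate-limited workqueue, and the retry cap exists only so the example terminates.

```go
package main

import (
	"fmt"
	"strings"
)

// splitKey splits a "namespace/name" queue key. A key without a slash
// is treated as a name with an empty namespace (hypothetical helper).
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

// processAll drains the queue, calling sync for each key. A failed key
// is put back at the end of the queue until maxRetries is exhausted.
func processAll(keys []string, maxRetries int, sync func(key string) error) (synced, dropped []string) {
	retries := map[string]int{}
	queue := append([]string{}, keys...)
	for len(queue) > 0 {
		key := queue[0]
		queue = queue[1:]
		if err := sync(key); err != nil {
			if retries[key] < maxRetries {
				retries[key]++
				queue = append(queue, key) // requeue for another attempt
				continue
			}
			dropped = append(dropped, key)
			continue
		}
		synced = append(synced, key)
	}
	return synced, dropped
}

func main() {
	synced, dropped := processAll([]string{"default/teststs", "a/b/c"}, 1, func(key string) error {
		namespace, name, err := splitKey(key)
		if err != nil {
			return err
		}
		fmt.Printf("syncing %s in namespace %s\n", name, namespace)
		return nil
	})
	fmt.Println("synced:", synced, "dropped:", dropped)
}
```

Returning an error from sync is what sends a key back for another attempt, which is why sync above returns nil for the non-transient selector error.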

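Let's go through it step by step.

Adopting orphaned revisions

As noted above, an sts and its revisions reference each other explicitly through dedicated fields; with that in mind, this function is easy to follow.

Orphaned ControllerRevisions most likely appear when the sts was updated repeatedly in the meantime and stale data was produced in the gaps between updates.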

pkg/controller/statefulset/stateful_set.go:316

// adoptOrphanRevisions adopts any orphaned ControllerRevisions matched by set's Selector.
func (ssc *StatefulSetController) adoptOrphanRevisions(set *apps.StatefulSet) error {
	// List the revisions that belong to this sts
	revisions, err := ssc.control.ListRevisions(set)
	if err != nil {
		return err
	}
	hasOrphans := false
	for i := range revisions {
		// Look up the controller recorded on the revision. If it is nil, the
		// ControllerRevision is an orphan (no owner) and needs to be adopted
		if metav1.GetControllerOf(revisions[i]) == nil {
			hasOrphans = true
			break
		}
	}

	// Orphaned ControllerRevisions most likely mean the sts was updated repeatedly
	// in the meantime, so fetch the latest sts once more
	if hasOrphans {
		fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(set.Name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		// If the UID of the cached sts differs from the fresh one, the sts was probably
		// deleted and recreated in between; abort this pass and return an error
		if fresh.UID != set.UID {
			return fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
		}
		// Adopt the orphaned revisions by adding an ownerReference that points at this sts
		return ssc.control.AdoptOrphanRevisions(set, revisions)
	}
	return nil
}

Getting the pods managed by the sts

pkg/controller/statefulset/stateful_set.go:285

func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) {
	// List all pods to include the pods that don't match the selector anymore but
	// has a ControllerRef pointing to this StatefulSet.
	pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}

	// filter decides whether a pod belongs to the sts. The check is simple: the pod
	// name is split with a regular expression, the part before the last "-" is the
	// parent and the number after it is the ordinal. The pod belongs to the sts
	// (true) if parent equals the sts name, otherwise it does not (false)
	filter := func(pod *v1.Pod) bool {
		// Only claim if it matches our StatefulSet name. Otherwise release/ignore.
		return isMemberOf(set, pod)
	}

	// As with revisions, orphan pods have to be handled too: a pod that matches the
	// sts labels is adopted, a pod that no longer matches is released.
	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
		fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(set.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != set.UID {
			return nil, fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
		}
		return fresh, nil
	})

	cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, canAdoptFunc)
	// Run the claim/filter pass
	return cm.ClaimPods(pods, filter)
}
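
The name-based membership check described in the filter comment can be sketched like this. The body of isMemberOf is not shown in this article, so the code below is an assumption about its shape: parentAndOrdinal is a hypothetical helper, and this isMemberOf takes plain strings rather than the *apps.StatefulSet and *v1.Pod arguments of the real function.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// statefulPodRegex captures the parent name and the trailing ordinal
// of a pod name such as "deptest11dev-1".
var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$")

// parentAndOrdinal splits a pod name into its parent and ordinal.
// It returns ("", -1) when the name does not end in "-<number>".
func parentAndOrdinal(podName string) (string, int) {
	m := statefulPodRegex.FindStringSubmatch(podName)
	if len(m) < 3 {
		return "", -1
	}
	ordinal, err := strconv.Atoi(m[2])
	if err != nil {
		return "", -1
	}
	return m[1], ordinal
}

// isMemberOf reports whether the pod name was derived from the set name.
func isMemberOf(setName, podName string) bool {
	parent, ordinal := parentAndOrdinal(podName)
	return ordinal >= 0 && parent == setName
}

func main() {
	fmt.Println(parentAndOrdinal("deptest11dev-1"))
	fmt.Println(isMemberOf("web", "web-server-0"))
}
```

Because the first group is greedy, only the last "-<number>" suffix is treated as the ordinal, so a pod named web-server-0 belongs to a set named web-server and not to one named web.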

ClaimPods

pkg/controller/controller_ref_manager.go:171

func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
	var claimed []*v1.Pod
	var errlist []error

	match := func(obj metav1.Object) bool {
		pod := obj.(*v1.Pod)
		// Match the pod by labels first; only when the labels match are the extra
		// filters applied (for an sts this is the pod-name check described above)
		if !m.Selector.Matches(labels.Set(pod.Labels)) {
			return false
		}
		for _, filter := range filters {
			if !filter(pod) {
				return false
			}
		}
		return true
	}
	adopt := func(obj metav1.Object) error {
		// Adopt the pod (add the association), i.e. patch ownerReferences into pod.metadata.
		return m.AdoptPod(obj.(*v1.Pod))
	}
	release := func(obj metav1.Object) error {
		// Release the pod (drop the association), i.e. remove ownerReferences from pod.metadata.
		return m.ReleasePod(obj.(*v1.Pod))
	}

	for _, pod := range pods {
		// ClaimObject checks a single pod and adopts or releases it as needed
		ok, err := m.ClaimObject(pod, match, adopt, release)
		if err != nil {
			errlist = append(errlist, err)
			continue
		}
		if ok {
			claimed = append(claimed, pod)
		}
	}
	return claimed, utilerrors.NewAggregate(errlist)
}

ClaimObject

pkg/controller/controller_ref_manager.go:66

func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) {
	// 1 Get the controller ownerReference from pod.metadata
	controllerRef := metav1.GetControllerOf(obj)
	// 1-1 If the pod already has a controller ownerReference, go straight to the match check
	if controllerRef != nil {
		if controllerRef.UID != m.Controller.GetUID() {
			// Owned by someone else. Ignore.
			return false, nil
		}
		// 1-2 Return true on a match
		if match(obj) {
			return true, nil
		}

		if m.Controller.GetDeletionTimestamp() != nil {
			return false, nil
		}

		// 1-3 On a mismatch, release the pod (drop the ownerReference) and return false
		if err := release(obj); err != nil {
			// If the pod no longer exists, ignore the error.
			if errors.IsNotFound(err) {
				return false, nil
			}
			return false, err
		}
		// Successfully released.
		return false, nil
	}

	// 2 Orphan pod: decide whether to adopt it or leave it alone
	// 2-1 The sts is being deleted or the match rule fails: return false
	if m.Controller.GetDeletionTimestamp() != nil || !match(obj) {
		// Ignore if we're being deleted or selector doesn't match.
		return false, nil
	}
	if obj.GetDeletionTimestamp() != nil {
		// Ignore if the object is being deleted
		return false, nil
	}
	// Selector matches. Try to adopt.
	if err := adopt(obj); err != nil {
		// If the pod no longer exists, ignore the error.
		if errors.IsNotFound(err) {
			return false, nil
		}
		// Either someone else claimed it first, or there was a transient error.
		// The controller should requeue and try again if it's still orphaned.
		return false, err
	}
	// Adoption succeeded: return true
	return true, nil
}

At this point, all pods that should be managed by the sts (including adopted orphan pods) have been filtered out, and the real sts sync begins.
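
The branches of ClaimObject boil down to a small decision table. The sketch below restates that table as a pure function so the cases are easy to compare; the action and candidate types and the decide function are hypothetical names used only for this illustration, and error handling is left out.

```go
package main

import "fmt"

type action string

const (
	keep    action = "keep"    // already owned by this controller and still matching
	release action = "release" // owned by this controller but no longer matching
	adopt   action = "adopt"   // orphan that matches
	ignore  action = "ignore"  // nothing to do for this object
)

// candidate summarizes the facts ClaimObject looks at for one pod.
type candidate struct {
	OwnerUID string // empty means the pod has no controller ownerReference
	Matches  bool   // result of the match function (labels plus filters)
	Deleting bool   // the pod has a deletionTimestamp
}

// decide mirrors the branch order of ClaimObject.
func decide(controllerUID string, controllerDeleting bool, c candidate) action {
	if c.OwnerUID != "" {
		if c.OwnerUID != controllerUID {
			return ignore // owned by someone else
		}
		if c.Matches {
			return keep
		}
		if controllerDeleting {
			return ignore
		}
		return release
	}
	if controllerDeleting || !c.Matches || c.Deleting {
		return ignore
	}
	return adopt
}

func main() {
	fmt.Println(decide("sts-1", false, candidate{OwnerUID: "sts-1", Matches: true}))
	fmt.Println(decide("sts-1", false, candidate{Matches: true}))
	fmt.Println(decide("sts-1", false, candidate{OwnerUID: "sts-1"}))
}
```

Only the keep and adopt outcomes correspond to ClaimObject returning true, so those are the pods that end up in the claimed list.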

syncStatefulSet

With all managed pods found, the sts sync starts updating the sts and its pods. Back to here:

pkg/controller/statefulset/stateful_set.go:451

==> pkg/controller/statefulset/stateful_set.go:458

==> pkg/controller/statefulset/stateful_set_control.go:75

func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {

	// List all revisions of the sts and sort them
	revisions, err := ssc.ListRevisions(set)
	if err != nil {
		return err
	}
	history.SortControllerRevisions(revisions)

	// Get the current revision and the latest (update) revision
	currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
	if err != nil {
		return err
	}

	// Core method: operates on the pods
	status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
	if err != nil {
		return err
	}

	// Record the result in sts.status once the operations are done
	err = ssc.updateStatefulSetStatus(set, status)
	if err != nil {
		return err
	}

	klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
		set.Namespace,
		set.Name,
		status.Replicas,
		status.ReadyReplicas,
		status.CurrentReplicas,
		status.UpdatedReplicas)

	klog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s",
		set.Namespace,
		set.Name,
		status.CurrentRevision,
		status.UpdateRevision)

	// Maintain the revision history of the set
	return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
}

The core function here is updateStatefulSet; read on.

updateStatefulSet

This function is long, more than 200 lines of code; the points worth explaining are annotated as comments in the code below.

func (ssc *defaultStatefulSetControl) updateStatefulSet(
	set *apps.StatefulSet,
	currentRevision *apps.ControllerRevision,
	updateRevision *apps.ControllerRevision,
	collisionCount int32,
	pods []*v1.Pod) (*apps.StatefulSetStatus, error) {

	// Build currentSet from the current revision and updateSet from the revision to
	// update to. The intended effect of an update is:
	// 1. Rolling update without a partition: the pods of currentSet shrink to 0 and
	//    updateSet ends up with spec.replicas ready pods
	// 2. Rolling update with a partition: pods with ordinal >= partition move to
	//    updateSet, pods with ordinal < partition stay with currentSet
	// 3. OnDelete update: do nothing
	currentSet, err := ApplyRevision(set, currentRevision)
	if err != nil {
		return nil, err
	}

	updateSet, err := ApplyRevision(set, updateRevision)
	if err != nil {
		return nil, err
	}

	// set the generation, and revisions in the returned status
	// Recompute the status of the sts
	status := apps.StatefulSetStatus{}
	status.ObservedGeneration = set.Generation
	status.CurrentRevision = currentRevision.Name
	status.UpdateRevision = updateRevision.Name
	status.CollisionCount = new(int32)
	*status.CollisionCount = collisionCount

	replicaCount := int(*set.Spec.Replicas)
	// replicas holds the valid replicas: pods with 0 <= ordinal < sts.spec.replicas.
	// Every pod in this slice has to be brought to Ready
	replicas := make([]*v1.Pod, replicaCount)
	// condemned holds the invalid replicas: pods with ordinal >= sts.spec.replicas.
	// These pods are to be deleted (for example after a scale-down)
	condemned := make([]*v1.Pod, 0, len(pods))
	unhealthy := 0
	firstUnhealthyOrdinal := math.MaxInt32
	var firstUnhealthyPod *v1.Pod

	// First we partition pods into two lists valid replicas and condemned Pods
	for i := range pods {
		status.Replicas++

		// Count status.ReadyReplicas
		if isRunningAndReady(pods[i]) {
			status.ReadyReplicas++
		}

		if isCreated(pods[i]) && !isTerminating(pods[i]) {
			// Use the pod's controller-revision-hash label to tell whether it belongs
			// to currentSet or updateSet, and count each
			if getPodRevision(pods[i]) == currentRevision.Name {
				status.CurrentReplicas++
			}
			if getPodRevision(pods[i]) == updateRevision.Name {
				status.UpdatedReplicas++
			}
		}

		if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
			// Pods with 0 <= ordinal < sts.spec.replicas go into the replicas slice
			replicas[ord] = pods[i]

		} else if ord >= replicaCount {
			// Pods with ordinal >= sts.spec.replicas go into the condemned slice; they will be deleted.
			condemned = append(condemned, pods[i])
		}
	}

	// Any empty slot in the replicas slice has to be filled with a pod object.
	// currentSet.replicas, updateSet.replicas and partition decide whether the pod is
	// built from the current revision or from the update revision
	for ord := 0; ord < replicaCount; ord++ {
		if replicas[ord] == nil {
			replicas[ord] = newVersionedStatefulSetPod(
				currentSet,
				updateSet,
				currentRevision.Name,
				updateRevision.Name, ord)
		}
	}

	// Sort the condemned pods by ordinal, ascending; they are deleted later from the
	// end of the slice, i.e. highest ordinal first
	sort.Sort(ascendingOrdinal(condemned))

	// Count the unhealthy pods in both slices and remember the one with the lowest ordinal
	for i := range replicas {
		if !isHealthy(replicas[i]) {
			unhealthy++
			if ord := getOrdinal(replicas[i]); ord < firstUnhealthyOrdinal {
				firstUnhealthyOrdinal = ord
				firstUnhealthyPod = replicas[i]
			}
		}
	}

	for i := range condemned {
		if !isHealthy(condemned[i]) {
			unhealthy++
			if ord := getOrdinal(condemned[i]); ord < firstUnhealthyOrdinal {
				firstUnhealthyOrdinal = ord
				firstUnhealthyPod = condemned[i]
			}
		}
	}

	if unhealthy > 0 {
		klog.V(4).Infof("StatefulSet %s/%s has %d unhealthy Pods starting with %s",
			set.Namespace,
			set.Name,
			unhealthy,
			firstUnhealthyPod.Name)
	}

	// If the StatefulSet is being deleted, don't do anything other than updating
	// status.
	if set.DeletionTimestamp != nil {
		return &status, nil
	}

	monotonic := !allowsBurst(set)

	// Check and handle the pods one by one, in ordinal order.
	for i := range replicas {
		// Delete and recreate pods in the Failed state
		if isFailed(replicas[i]) {
			ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
				"StatefulSet %s/%s is recreating failed Pod %s",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
				return &status, err
			}
			if getPodRevision(replicas[i]) == currentRevision.Name {
				status.CurrentReplicas--
			}
			if getPodRevision(replicas[i]) == updateRevision.Name {
				status.UpdatedReplicas--
			}
			status.Replicas--
			replicas[i] = newVersionedStatefulSetPod(
				currentSet,
				updateSet,
				currentRevision.Name,
				updateRevision.Name,
				i)
		}
		// If the pod has not been created yet (possibly a slot filled in above), create it
		if !isCreated(replicas[i]) {
			if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
				return &status, err
			}
			status.Replicas++
			if getPodRevision(replicas[i]) == currentRevision.Name {
				status.CurrentReplicas++
			}
			if getPodRevision(replicas[i]) == updateRevision.Name {
				status.UpdatedReplicas++
			}

			// If burst is not allowed, return right away
			if monotonic {
				return &status, nil
			}
			// pod created, no more work possible for this round
			continue
		}
		// If burst is not allowed, do nothing while a pod is terminating; wait for it
		// to finish and handle it in the next pass.
		if isTerminating(replicas[i]) && monotonic {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to Terminate",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			return &status, nil
		}
		// If the pod is still starting (not Ready yet), do nothing either, because
		// creation has to proceed strictly in order
		if !isRunningAndReady(replicas[i]) && monotonic {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			return &status, nil
		}

		// If the pod's identity already matches the sts and its storage satisfies what
		// the sts and the pod require, the pod is fine; continue
		if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
			continue
		}

		// Make sure the pod carries the labels that tie it to the sts, and prepare the PVCs it needs
		replica := replicas[i].DeepCopy()
		if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
			return &status, err
		}
	}

	// With the valid replicas taken care of, delete the condemned pods in order of
	// ordinal, highest first
	for target := len(condemned) - 1; target >= 0; target-- {
		// A terminating pod needs no further action; in monotonic mode return and
		// wait for the next pass
		if isTerminating(condemned[target]) {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to Terminate prior to scale down",
				set.Namespace,
				set.Name,
				condemned[target].Name)
			// block if we are in monotonic mode
			if monotonic {
				return &status, nil
			}
			continue
		}

		// If this condemned pod is not Ready, burst is not allowed, and it is not the
		// first unhealthy pod, do nothing. In other words, even unhealthy pods are
		// deleted serially, highest ordinal first.
		if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to scale down",
				set.Namespace,
				set.Name,
				firstUnhealthyPod.Name)
			return &status, nil
		}
		// Delete this pod and update status
		klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down",
			set.Namespace,
			set.Name,
			condemned[target].Name)

		if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
			return &status, err
		}
		if getPodRevision(condemned[target]) == currentRevision.Name {
			status.CurrentReplicas--
		}
		if getPodRevision(condemned[target]) == updateRevision.Name {
			status.UpdatedReplicas--
		}
		if monotonic {
			return &status, nil
		}
	}

	// In OnDelete mode pods are not deleted automatically; deleting a pod manually triggers its update
	if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
		return &status, nil
	}

	// After all the filtering and preparation above, check the valid pods in replicas
	// against the update revision
	updateMin := 0
	if set.Spec.UpdateStrategy.RollingUpdate != nil {
		updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
	}
	// Check pods in descending ordinal order
	for target := len(replicas) - 1; target >= updateMin; target-- {

		// If the pod's revision differs from updateRevision, delete it so that it gets recreated
		if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
			klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update",
				set.Namespace,
				set.Name,
				replicas[target].Name)
			err := ssc.podControl.DeleteStatefulPod(set, replicas[target])
			status.CurrentReplicas--
			return &status, err
		}

		// Wait for an updated pod that has not reached Ready yet
		if !isHealthy(replicas[target]) {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to update",
				set.Namespace,
				set.Name,
				replicas[target].Name)
			return &status, nil
		}

	}
	return &status, nil
}

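Summary of the updateStatefulSet function

  1. When burst is not allowed (monotonic mode), each sync pass operates on at most one pod
  2. Pods are split by comparing their ordinals with sts.spec.replicas: one group is valid (kept or recreated), the other is condemned (deleted)
  3. Pods are also counted by revision: those on the current (old) revision versus those on the update (new) revision
  4. During an update, deletions and creations alike keep the pod count steady and move through the ordinals in order
  5. In the end, every pod whose ordinal is at or above partition belongs to the update revision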
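
Point 2 can be illustrated with a small sketch that works on ordinals only. splitByOrdinal is a hypothetical helper written for this example; the real function operates on pod objects and also fills the empty slots with newly built pods.

```go
package main

import (
	"fmt"
	"sort"
)

// splitByOrdinal takes the ordinals of the existing pods and the desired
// replica count (assumed to be non-negative). It returns the ordinals
// that are missing from the valid range [0, replicaCount) and the
// condemned ordinals (>= replicaCount), sorted ascending.
func splitByOrdinal(existing []int, replicaCount int) (missing, condemned []int) {
	present := make([]bool, replicaCount)
	for _, ord := range existing {
		switch {
		case ord >= 0 && ord < replicaCount:
			present[ord] = true
		case ord >= replicaCount:
			condemned = append(condemned, ord)
		}
	}
	for ord, ok := range present {
		if !ok {
			missing = append(missing, ord)
		}
	}
	sort.Ints(condemned)
	return missing, condemned
}

func main() {
	// Pods 0, 2, 3 and 4 exist while spec.replicas is 3.
	missing, condemned := splitByOrdinal([]int{0, 2, 4, 3}, 3)
	fmt.Println("to create:", missing)
	fmt.Println("to delete:", condemned)
}
```

In the controller code the condemned slice is walked from its end, so pod 4 would be deleted before pod 3.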

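Summary

statefulset differs from deployment in many design aspects, for example:

  • a deployment manages pods through ReplicaSets, while an sts manages its pods directly and tracks versions with ControllerRevisions;

  • a deployment creates, updates and deletes pods in no particular order, while an sts strictly guarantees ordered operations

  • an sts also has to check that storage matches

Reading the code with an understanding of how an sts manages and operates on its pods helps a great deal.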
