实现CRD控制逻辑

前言

上一篇已经设定了Unit所要实现的目标,完成了Unit结构体各子字段、ownResource字段的填充,为控制逻辑的实现做了基础铺垫。

本篇主要解决和实现的控制逻辑:

  • 如何管理Unit下属的own Resources
  • 如何使Unit和own Resources生命周期绑定
  • 删除Unit资源前能否做一些自定义操作

逐一来实现。

管理own Resources

如前文所说,一共设计了5种ownResource分别对应StatefulSet/Deployment/Ingress/Service/Ingress这5种资源,每一种的管理无一例外需要进行增删改查,管理方式大同小异,因此,提炼了一个通用的接口:

controllers/unit_controller.go:34

type OwnResource interface {
    // 根据Unit的指定,生成相应的各类own build-in资源对象,用作创建或更新
    MakeOwnResource(instance *extensionsv1beta1.Mycrd, logger logr.Logger, scheme *runtime.Scheme) (interface{}, error)

    // 判断此ownResource资源是否已存在
    OwnResourceExist(instance *extensionsv1beta1.Mycrd, client client.Client, logger logr.Logger) (bool, interface{}, error)

    // 获取对应的own build-in资源的状态,用来填充Mycrd的status字段
    UpdateOwnResourceStatus(instance *extensionsv1beta1.Mycrd, client client.Client, logger logr.Logger) (*extensionsv1beta1.Mycrd, error)

    // 创建/更新 Mycrd对应的own build-in资源
    ApplyOwnResource(instance *extensionsv1beta1.Mycrd, client client.Client, logger logr.Logger, scheme *runtime.Scheme) error
}

对应有4个接口方法,分别应用于:

  • MakeOwnResource() 根据Unit的指定,生成相应的各类own build-in资源对象,用作创建或更新
  • OwnResourceExist() 判断此ownResource资源是否已存在
  • UpdateOwnResourceStatus() 获取对应的own build-in资源的状态,用来填充Mycrd的status字段
  • ApplyOwnResource() 创建/更新 Mycrd对应的own build-in资源

也即是说,每种ownResource结构体都要实现这4个方法,同时要注意,这4种方法被用作CUR(不包括D),要求是幂等性的,多次执行结果一致。

篇幅有限,这里只列举ownStatefulSet的实现方法,其他的几种资源的实现可直接去github库里查看。

ownStatefulSet 的接口方法实现

MakeOwnResource

api/v1/own_statefulSet.go:22

type OwnStatefulSet struct {
    Spec appsv1.StatefulSetSpec
}

func (ownStatefulSet *OwnStatefulSet) MakeOwnResource(instance *Unit, logger logr.Logger,
    scheme *runtime.Scheme) (interface{}, error) {

    // new a StatefulSet object
    sts := &appsv1.StatefulSet{
        // metadata field inherited from owner Unit
        ObjectMeta: metav1.ObjectMeta{Name: instance.Name, Namespace:instance.Namespace, Labels: instance.Labels},
        Spec:       ownStatefulSet.Spec,
    }

    // add some customize envs, ignore this step if you don't need it
    customizeEnvs := []v1.EnvVar{
        {
            Name: "POD_NAME",
            ValueFrom: &v1.EnvVarSource{
                FieldRef: &v1.ObjectFieldSelector{
                    APIVersion: "v1",
                    FieldPath:  "metadata.name",
                },
            },
        },
        {
            Name:  "APPNAME",
            Value: instance.Name,
        },
    }

    var specEnvs []v1.EnvVar
    templateEnvs := sts.Spec.Template.Spec.Containers[0].Env
    for index := range templateEnvs {
        if templateEnvs[index].Name != "POD_NAME" && templateEnvs[index].Name != "APPNAME" {
            specEnvs = append(specEnvs, templateEnvs[index])
        }
    }

    sts.Spec.Template.Spec.Containers[0].Env = append(specEnvs, customizeEnvs...)

    // add ControllerReference for sts,the owner is Unit object
  // 这一步在下方会解释它的作用
    if err := controllerutil.SetControllerReference(instance, sts, scheme); err != nil {
        msg := fmt.Sprintf("set controllerReference for StatefulSet %s/%s failed", instance.Namespace, instance.Name)
        logger.Error(err, msg)
        return nil, err
    }

    return sts, nil
}

OwnResourceExist

// Check if the StatefulSet already exists
func (ownStatefulSet *OwnStatefulSet) OwnResourceExist(instance *Unit, client client.Client,
    logger logr.Logger) (bool, interface{}, error) {

    found := &appsv1.StatefulSet{}
    err := client.Get(context.TODO(), types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, found)
    if err != nil {
        if errors.IsNotFound(err) {
            return false, nil, nil
        }
        msg := fmt.Sprintf("StatefulSet %s/%s found, but with error: %s  \n", instance.Namespace, instance.Name)
        logger.Error(err, msg)
        return true, found, err
    }
    return true, found, nil
}

ApplyOwnResource

// apply this own resource, create or update
func (ownStatefulSet *OwnStatefulSet) ApplyOwnResource(instance *Unit, client client.Client,
    logger logr.Logger, scheme *runtime.Scheme) error {

    // assert if StatefulSet exist
    exist, found, err := ownStatefulSet.OwnResourceExist(instance, client, logger)
    if err != nil {
        return err
    }

    // make StatefulSet object
    sts, err := ownStatefulSet.MakeOwnResource(instance, logger, scheme)
    if err != nil {
        return err
    }
    newStatefulSet := sts.(*appsv1.StatefulSet)

    // apply the StatefulSet object just make
    if !exist {
        // if StatefulSet not exist,then create it
        msg := fmt.Sprintf("StatefulSet %s/%s not found, create it!", newStatefulSet.Namespace, newStatefulSet.Name)
        logger.Info(msg)
        if err := client.Create(context.TODO(), newStatefulSet); err != nil {
            return err
        }
        return nil

    } else {
        foundStatefulSet := found.(*appsv1.StatefulSet)

        // if StatefulSet exist with change,then try to update it
        if !reflect.DeepEqual(newStatefulSet.Spec, foundStatefulSet.Spec) {
            msg := fmt.Sprintf("Updating StatefulSet %s/%s", newStatefulSet.Namespace, newStatefulSet.Name)
            logger.Info(msg)
            return client.Update(context.TODO(), newStatefulSet)
        }
        return nil
    }
}

UpdateOwnResourceStatus

func (ownStatefulSet *OwnStatefulSet) UpdateOwnResourceStatus(instance *Unit, client client.Client,
    logger logr.Logger) (*Unit, error) {

    found := &appsv1.StatefulSet{}
    err := client.Get(context.TODO(), types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, found)
    if err != nil {
        return instance, err
    }
    instance.Status.BaseStatefulSet = found.Status
    instance.Status.LastUpdateTime = metav1.Now()
    return instance, nil
}

接口方法实现后,就要在Reconcile内调用了。

Apply OwnResources

getOwnResources

apply之前首先要生成所有的OwnResources对象,这个方法用来生成所有的ownResource对象放到一个array里。详情看注释

controllers/unit_controller.go:220

// 根据Unit.Spec生成其所有的own resource
func (r *UnitReconciler) getOwnResources(instance *customv1.Unit) ([]OwnResource, error) {
    var ownResources []OwnResource

    // Deployment 和StatefulSet 二者只能存在其一。由于可以动态选择,所以ownDeployment或ownStatefulSet在后端生成,不由前端指定
    if instance.Spec.Category == "Deployment" {
        ownDeployment := customv1.OwnDeployment{
            Spec: appsv1.DeploymentSpec{
                Replicas: instance.Spec.Replicas,
                Selector: instance.Spec.Selector,
                Template: instance.Spec.Template,
            },
        }
        ownDeployment.Spec.Template.Labels = instance.Spec.Selector.MatchLabels
        ownResources = append(ownResources, &ownDeployment)

    } else {
        ownStatefulSet := &customv1.OwnStatefulSet{
            Spec: appsv1.StatefulSetSpec{
                Replicas:    instance.Spec.Replicas,
                Selector:    instance.Spec.Selector,
                Template:    instance.Spec.Template,
                ServiceName: instance.Name,
            },
        }

        ownResources = append(ownResources, ownStatefulSet)
    }

    // 将关联的资源(svc/ing/pvc)加入ownResources中
    if instance.Spec.RelationResource.Service != nil {
        ownResources = append(ownResources, instance.Spec.RelationResource.Service)
    }
    if instance.Spec.RelationResource.Ingress != nil {
        ownResources = append(ownResources, instance.Spec.RelationResource.Ingress)
    }
    if instance.Spec.RelationResource.PVC != nil {
        ownResources = append(ownResources, instance.Spec.RelationResource.PVC)
    }
    return ownResources, nil
}

apply

    // 3. 创建或更新操作
    // 3.1 根据Unit.spec 生成Unit关联的所有own build-in resource
    ownResources, err := r.getOwnResources(instance)
    if err != nil {
        msg := fmt.Sprintf("%s %s Reconciler.getOwnResource() function error", instance.Namespace, instance.Name)
        r.Log.Error(err, msg)
        return ctrl.Result{}, err
    }

    // 3.2 判断各own resource 是否存在,不存在则创建,存在则判断spec是否有变化,有变化则更新
    success := true
    for _, ownResource := range ownResources {
        if err = ownResource.ApplyOwnResource(instance, r.Client, r.Log, r.Scheme); err != nil {
            fmt.Println("apply resource err:", err)
            success = false
        }
    }

这里就用到了ApplyOwnResource接口方法了。Apply完成后,下一步就需要更新状态了

Update OwnResourceStatus

controllers/unit_controller.go:152

    // 4. update Unit.status
    // 4.1 更新实例Unit.Status 字段
    updateInstance := instance.DeepCopy()
    for _, ownResource := range ownResources {
        updateInstance, err = ownResource.UpdateOwnResourceStatus(updateInstance, r.Client, r.Log)
        if err != nil {
            //fmt.Println("update Unit ownresource status error:", err)
            success = false
        }
    }

    // 4.2 apply update to apiServer if status changed
    if updateInstance != nil && !reflect.DeepEqual(updateInstance.Status, instance.Status) {
        if err := r.Status().Update(context.Background(), updateInstance); err != nil {
            r.Log.Error(err, "unable to update Unit status")
        }
    }

由于Status Update可能比较频繁,因此有两点值得一提:

  • 读请求走的是informer local storage的cache,而写请求是发起APIServer的直连,为了减轻APIServer的压力,要尽可能的减少写请求,所以这里设计为所有的OwnResouces更新完毕后才发起一次Update请求。
  • 为了与Spec的写操作区分开,这里使用的是r.Status().Update()的status局部更新方法,而不是r.Update()整体更新的方法,这样可以尽量避免写操作的并发冲突。
  • 如果连续多次Update,每次Update后Resource Object的Revision会更新,因此每次Update完成后,需要重新Get后才能再次Update,否则会报错。

生命周期绑定

上面描述了创建/更新Unit时如何同步Apply更新到own resources,那如何保证Unit生命周期结束时(删除),own resources跟随一起结束呢?

实现的方式非常简单,K8s已经替我们实现了,只需要给own resource加上一组特殊的标记即可。

还是以StatefulSet举例,回顾上面的OwnStatefulSet.MakeOwnResource()方法,你会看到这么几行代码:

func (ownStatefulSet *OwnStatefulSet) MakeOwnResource(instance *Unit, logger logr.Logger,
    scheme *runtime.Scheme) (interface{}, error) {
  ...

    // add ControllerReference for sts,the owner is Unit object
    if err := controllerutil.SetControllerReference(instance, sts, scheme); err != nil {
        msg := fmt.Sprintf("set controllerReference for StatefulSet %s/%s failed", instance.Namespace, instance.Name)
        logger.Error(err, msg)
        return nil, err
    }

    ...
}

controllerutil.SetControllerReference(OwnerObj, OwnObj, scheme)方法,可以为被管理的own resource实例加上控制来源描述,这样,它便可与所属对象实现生命周期的绑定了。

这个方法在K8s build-in资源中也多处使用,例如StatefulSet用来管理Pod。找一个被Sts管理的pod实例来看看:

加上这个ownerReferences描述之后,owner 删除前,也会同步删除own resources.

当然,如果在删除owner时希望非级联删除,可以在kubectl命令末尾追加--cascade=false参数。

PreDelete操作

在上面添加了SetControllerReference的步骤后,默认的PreDelete策略是: 在删除Owner之前,先确保删除所有的Own resources,如果放任删除干净了,即使是local cache也找不到资源的信息。如果需要在此之前实现自定义的删除前操作,添加额外的自定义PreDelete钩子,可以按下面的方式实现。

PreDelete钩子

使用controller GC的Finalizer终结器机制,可以在Delete之前加入PreDelete钩子:

Advanced topics-Finalizer

翻译一下,如果资源对象被直接删除,就无法再读取任何被删除对象的信息,这就会导致后续的清理工作因为信息不足无法进行,Finalizer字段设计来处理这种情况:

  • Finalizer本身只是一串随机字符串标识,controller负责添加它和删除它。
  • 添加了Finalizer之后,delete操作就会变成update操作,即为对象加上deletionTimestamp时间戳
  • Finalizer已主动清空(视为清理后续的任务已处理完成)之后,当前时间大于deletionTimestamp,就会开始执行gc

显而易见,PreDelete钩子要放在主动清除Finalizer之前。来看代码:

controllers/unit_controller.go:98

    // 2. 删除操作
    // 如果资源对象被直接删除,就无法再读取任何被删除对象的信息,这就会导致后续的清理工作因为信息不足无法进行,Finalizer字段设计来处理这种情况:
    // 2.1 当资源对象 Finalizer字段不为空时,delete操作就会变成update操作,即为对象加上deletionTimestamp时间戳
    // 2.2 当 当前时间在deletionTimestamp时间之后,且Finalizer已清空(视为清理后续的任务已处理完成)的情况下,就会gc此对象了

    myFinalizerName := "storage.finalizers.tutorial.kubebuilder.io"
    //orphanFinalizerName := "orphan"

    // 2.1 DeletionTimestamp 时间戳为空,代表着当前对象不处于被删除的状态,为了开启Finalizer机制,先给它加上一段Finalizers,内容随机非空字符串即可
    if instance.ObjectMeta.DeletionTimestamp.IsZero() {
        // The object is not being deleted, so if it does not have our finalizer,
        // then lets add the finalizer and update the object. This is equivalent
        // registering our finalizer.
        if !containsString(instance.ObjectMeta.Finalizers, myFinalizerName) {
            instance.ObjectMeta.Finalizers = append(instance.ObjectMeta.Finalizers, myFinalizerName)
            if err := r.Update(ctx, instance); err != nil {
                r.Log.Error(err, "Add Finalizers error", instance.Namespace, instance.Name)
                return ctrl.Result{}, err
            }
        }
    } else {
        // 2.2  DeletionTimestamp不为空,说明对象已经开始进入删除状态了,执行自己的删除步骤后续的逻辑,并清除掉自己的finalizer字段,等待自动gc
        if containsString(instance.ObjectMeta.Finalizers, myFinalizerName) {

            // 在删除owner resource之前,先执行自定义的预删除步骤,例如删除owner resource
            if err := r.PreDelete(instance); err != nil {
                // if fail to delete the external dependency here, return with error
                // so that it can be retried
                return ctrl.Result{}, err
            }

            // 移出掉自定义的Finalizers,这样当Finalizers为空时,gc就会正式开始了
            instance.ObjectMeta.Finalizers = removeString(instance.ObjectMeta.Finalizers, myFinalizerName)
            if err := r.Update(ctx, instance); err != nil {
                return ctrl.Result{}, err
            }
        }

        // Stop reconciliation as the item is being deleted
        return ctrl.Result{}, nil
    }


// Unit pre delete logic
func (r *UnitReconciler) PreDelete(instance *customv1.Unit) error {
    // 特别说明,own resource加上了ControllerReference之后,owner resource gc删除前,会先自动删除它的所有
    // own resources,因此绑定ControllerReference后无需再特别处理删除own resource。

    // 这里留空出来,是为了如果有自定义的pre delete逻辑的需要,可在这里实现。

    return nil
}

// Helper functions to check and remove string from a slice of strings.
func containsString(slice []string, s string) bool {
    for _, item := range slice {
        if item == s {
            return true
        }
    }
    return false
}

func removeString(slice []string, s string) (result []string) {
    for _, item := range slice {
        if item == s {
            continue
        }
        result = append(result, item)
    }
    return
}

如此,在PreDelete()方法内部,就可以实现自定义的PreDelete钩子的逻辑了。

总结

控制器逻辑的核心,还是围绕着owner和own resources之间来进行的,在这个过程中,尽量减少写请求的频率,同时可以充分利用k8s现有的各种生命周期管理机制。

results matching ""

    No results matching ""