实现CRD控制逻辑
前言
上一篇已经设定了Unit所要实现的目标,完成了Unit结构体各子字段、ownResource字段的填充,为控制逻辑的实现做了基础铺垫。
本篇主要解决和实现的控制逻辑:
- 如何管理Unit下属的own Resources
- 如何使Unit和own Resources生命周期绑定
- 删除Unit资源前能否做一些自定义操作
逐一来实现。
管理own Resources
如前文所说,一共设计了5种ownResource分别对应StatefulSet/Deployment/Ingress/Service/Ingress这5种资源,每一种的管理无一例外需要进行增删改查,管理方式大同小异,因此,提炼了一个通用的接口:
controllers/unit_controller.go:34
type OwnResource interface {
// 根据Unit的指定,生成相应的各类own build-in资源对象,用作创建或更新
MakeOwnResource(instance *extensionsv1beta1.Mycrd, logger logr.Logger, scheme *runtime.Scheme) (interface{}, error)
// 判断此ownResource资源是否已存在
OwnResourceExist(instance *extensionsv1beta1.Mycrd, client client.Client, logger logr.Logger) (bool, interface{}, error)
// 获取对应的own build-in资源的状态,用来填充Mycrd的status字段
UpdateOwnResourceStatus(instance *extensionsv1beta1.Mycrd, client client.Client, logger logr.Logger) (*extensionsv1beta1.Mycrd, error)
// 创建/更新 Mycrd对应的own build-in资源
ApplyOwnResource(instance *extensionsv1beta1.Mycrd, client client.Client, logger logr.Logger, scheme *runtime.Scheme) error
}
对应有4个接口方法,分别应用于:
- MakeOwnResource() 根据Unit的指定,生成相应的各类own build-in资源对象,用作创建或更新
- OwnResourceExist() 判断此ownResource资源是否已存在
- UpdateOwnResourceStatus() 获取对应的own build-in资源的状态,用来填充Mycrd的status字段
- ApplyOwnResource() 创建/更新 Mycrd对应的own build-in资源
也即是说,每种ownResource结构体都要实现这4个方法,同时要注意,这4种方法被用作CUR(不包括D),要求是幂等性的,多次执行结果一致。
篇幅有限,这里只列举ownStatefulSet的实现方法,其他的几种资源的实现可直接去github库里查看。
ownStatefulSet 的接口方法实现
MakeOwnResource
api/v1/own_statefulSet.go:22
type OwnStatefulSet struct {
Spec appsv1.StatefulSetSpec
}
func (ownStatefulSet *OwnStatefulSet) MakeOwnResource(instance *Unit, logger logr.Logger,
scheme *runtime.Scheme) (interface{}, error) {
// new a StatefulSet object
sts := &appsv1.StatefulSet{
// metadata field inherited from owner Unit
ObjectMeta: metav1.ObjectMeta{Name: instance.Name, Namespace:instance.Namespace, Labels: instance.Labels},
Spec: ownStatefulSet.Spec,
}
// add some customize envs, ignore this step if you don't need it
customizeEnvs := []v1.EnvVar{
{
Name: "POD_NAME",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.name",
},
},
},
{
Name: "APPNAME",
Value: instance.Name,
},
}
var specEnvs []v1.EnvVar
templateEnvs := sts.Spec.Template.Spec.Containers[0].Env
for index := range templateEnvs {
if templateEnvs[index].Name != "POD_NAME" && templateEnvs[index].Name != "APPNAME" {
specEnvs = append(specEnvs, templateEnvs[index])
}
}
sts.Spec.Template.Spec.Containers[0].Env = append(specEnvs, customizeEnvs...)
// add ControllerReference for sts,the owner is Unit object
// 这一步在下方会解释它的作用
if err := controllerutil.SetControllerReference(instance, sts, scheme); err != nil {
msg := fmt.Sprintf("set controllerReference for StatefulSet %s/%s failed", instance.Namespace, instance.Name)
logger.Error(err, msg)
return nil, err
}
return sts, nil
}
OwnResourceExist
// Check if the StatefulSet already exists
func (ownStatefulSet *OwnStatefulSet) OwnResourceExist(instance *Unit, client client.Client,
logger logr.Logger) (bool, interface{}, error) {
found := &appsv1.StatefulSet{}
err := client.Get(context.TODO(), types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, found)
if err != nil {
if errors.IsNotFound(err) {
return false, nil, nil
}
msg := fmt.Sprintf("StatefulSet %s/%s found, but with error: %s \n", instance.Namespace, instance.Name)
logger.Error(err, msg)
return true, found, err
}
return true, found, nil
}
ApplyOwnResource
// apply this own resource, create or update
func (ownStatefulSet *OwnStatefulSet) ApplyOwnResource(instance *Unit, client client.Client,
logger logr.Logger, scheme *runtime.Scheme) error {
// assert if StatefulSet exist
exist, found, err := ownStatefulSet.OwnResourceExist(instance, client, logger)
if err != nil {
return err
}
// make StatefulSet object
sts, err := ownStatefulSet.MakeOwnResource(instance, logger, scheme)
if err != nil {
return err
}
newStatefulSet := sts.(*appsv1.StatefulSet)
// apply the StatefulSet object just make
if !exist {
// if StatefulSet not exist,then create it
msg := fmt.Sprintf("StatefulSet %s/%s not found, create it!", newStatefulSet.Namespace, newStatefulSet.Name)
logger.Info(msg)
if err := client.Create(context.TODO(), newStatefulSet); err != nil {
return err
}
return nil
} else {
foundStatefulSet := found.(*appsv1.StatefulSet)
// if StatefulSet exist with change,then try to update it
if !reflect.DeepEqual(newStatefulSet.Spec, foundStatefulSet.Spec) {
msg := fmt.Sprintf("Updating StatefulSet %s/%s", newStatefulSet.Namespace, newStatefulSet.Name)
logger.Info(msg)
return client.Update(context.TODO(), newStatefulSet)
}
return nil
}
}
UpdateOwnResourceStatus
func (ownStatefulSet *OwnStatefulSet) UpdateOwnResourceStatus(instance *Unit, client client.Client,
logger logr.Logger) (*Unit, error) {
found := &appsv1.StatefulSet{}
err := client.Get(context.TODO(), types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, found)
if err != nil {
return instance, err
}
instance.Status.BaseStatefulSet = found.Status
instance.Status.LastUpdateTime = metav1.Now()
return instance, nil
}
接口方法实现后,就要在Reconcile内调用了。
Apply OwnResources
getOwnResources
apply之前首先要生成所有的OwnResources对象,这个方法用来生成所有的ownResource对象放到一个array里。详情看注释
controllers/unit_controller.go:220
// 根据Unit.Spec生成其所有的own resource
func (r *UnitReconciler) getOwnResources(instance *customv1.Unit) ([]OwnResource, error) {
var ownResources []OwnResource
// Deployment 和StatefulSet 二者只能存在其一。由于可以动态选择,所以ownDeployment或ownStatefulSet在后端生成,不由前端指定
if instance.Spec.Category == "Deployment" {
ownDeployment := customv1.OwnDeployment{
Spec: appsv1.DeploymentSpec{
Replicas: instance.Spec.Replicas,
Selector: instance.Spec.Selector,
Template: instance.Spec.Template,
},
}
ownDeployment.Spec.Template.Labels = instance.Spec.Selector.MatchLabels
ownResources = append(ownResources, &ownDeployment)
} else {
ownStatefulSet := &customv1.OwnStatefulSet{
Spec: appsv1.StatefulSetSpec{
Replicas: instance.Spec.Replicas,
Selector: instance.Spec.Selector,
Template: instance.Spec.Template,
ServiceName: instance.Name,
},
}
ownResources = append(ownResources, ownStatefulSet)
}
// 将关联的资源(svc/ing/pvc)加入ownResources中
if instance.Spec.RelationResource.Service != nil {
ownResources = append(ownResources, instance.Spec.RelationResource.Service)
}
if instance.Spec.RelationResource.Ingress != nil {
ownResources = append(ownResources, instance.Spec.RelationResource.Ingress)
}
if instance.Spec.RelationResource.PVC != nil {
ownResources = append(ownResources, instance.Spec.RelationResource.PVC)
}
return ownResources, nil
}
apply
// 3. 创建或更新操作
// 3.1 根据Unit.spec 生成Unit关联的所有own build-in resource
ownResources, err := r.getOwnResources(instance)
if err != nil {
msg := fmt.Sprintf("%s %s Reconciler.getOwnResource() function error", instance.Namespace, instance.Name)
r.Log.Error(err, msg)
return ctrl.Result{}, err
}
// 3.2 判断各own resource 是否存在,不存在则创建,存在则判断spec是否有变化,有变化则更新
success := true
for _, ownResource := range ownResources {
if err = ownResource.ApplyOwnResource(instance, r.Client, r.Log, r.Scheme); err != nil {
fmt.Println("apply resource err:", err)
success = false
}
}
这里就用到了ApplyOwnResource接口方法了。Apply完成后,下一步就需要更新状态了
Update OwnResourceStatus
controllers/unit_controller.go:152
// 4. update Unit.status
// 4.1 更新实例Unit.Status 字段
updateInstance := instance.DeepCopy()
for _, ownResource := range ownResources {
updateInstance, err = ownResource.UpdateOwnResourceStatus(updateInstance, r.Client, r.Log)
if err != nil {
//fmt.Println("update Unit ownresource status error:", err)
success = false
}
}
// 4.2 apply update to apiServer if status changed
if updateInstance != nil && !reflect.DeepEqual(updateInstance.Status, instance.Status) {
if err := r.Status().Update(context.Background(), updateInstance); err != nil {
r.Log.Error(err, "unable to update Unit status")
}
}
由于Status Update可能比较频繁,因此有两点值得一提:
- 读请求走的是informer local storage的cache,而写请求是发起APIServer的直连,为了减轻APIServer的压力,要尽可能的减少写请求,所以这里设计为所有的OwnResouces更新完毕后才发起一次Update请求。
- 为了与Spec的写操作区分开,这里使用的是
r.Status().Update()
的status局部更新方法,而不是r.Update()
整体更新的方法,这样可以尽量避免写操作的并发冲突。 - 如果连续多次Update,每次Update后Resource Object的Revision会更新,因此每次Update完成后,需要重新Get后才能再次Update,否则会报错。
生命周期绑定
上面描述了创建/更新Unit时如何同步Apply更新到own resources,那如何保证Unit生命周期结束时(删除),own resources跟随一起结束呢?
实现的方式非常简单,K8s已经替我们实现了,只需要给own resource加上一组特殊的标记即可。
还是以StatefulSet举例,回顾上面的OwnStatefulSet.MakeOwnResource()
方法,你会看到这么几行代码:
func (ownStatefulSet *OwnStatefulSet) MakeOwnResource(instance *Unit, logger logr.Logger,
scheme *runtime.Scheme) (interface{}, error) {
...
// add ControllerReference for sts,the owner is Unit object
if err := controllerutil.SetControllerReference(instance, sts, scheme); err != nil {
msg := fmt.Sprintf("set controllerReference for StatefulSet %s/%s failed", instance.Namespace, instance.Name)
logger.Error(err, msg)
return nil, err
}
...
}
controllerutil.SetControllerReference(OwnerObj, OwnObj, scheme)
方法,可以为被管理的own resource实例加上控制来源描述,这样,它便可与所属对象实现生命周期的绑定了。
这个方法在K8s build-in资源中也多处使用,例如StatefulSet用来管理Pod。找一个被Sts管理的pod实例来看看:
加上这个ownerReferences描述之后,owner 删除前,也会同步删除own resources.
当然,如果在删除owner时希望非级联删除,可以在kubectl命令末尾追加--cascade=false
参数。
PreDelete操作
在上面添加了SetControllerReference的步骤后,默认的PreDelete策略是: 在删除Owner之前,先确保删除所有的Own resources,如果放任删除干净了,即使是local cache也找不到资源的信息。如果需要在此之前实现自定义的删除前操作,添加额外的自定义PreDelete钩子,可以按下面的方式实现。
PreDelete钩子
使用controller GC的Finalizer终结器机制,可以在Delete之前加入PreDelete钩子:
翻译一下,如果资源对象被直接删除,就无法再读取任何被删除对象的信息,这就会导致后续的清理工作因为信息不足无法进行,Finalizer字段设计来处理这种情况:
- Finalizer本身只是一串随机字符串标识,controller负责添加它和删除它。
- 添加了Finalizer之后,delete操作就会变成update操作,即为对象加上deletionTimestamp时间戳
- Finalizer已主动清空(视为清理后续的任务已处理完成)之后,当前时间大于deletionTimestamp,就会开始执行gc
显而易见,PreDelete钩子要放在主动清除Finalizer之前。来看代码:
controllers/unit_controller.go:98
// 2. 删除操作
// 如果资源对象被直接删除,就无法再读取任何被删除对象的信息,这就会导致后续的清理工作因为信息不足无法进行,Finalizer字段设计来处理这种情况:
// 2.1 当资源对象 Finalizer字段不为空时,delete操作就会变成update操作,即为对象加上deletionTimestamp时间戳
// 2.2 当 当前时间在deletionTimestamp时间之后,且Finalizer已清空(视为清理后续的任务已处理完成)的情况下,就会gc此对象了
myFinalizerName := "storage.finalizers.tutorial.kubebuilder.io"
//orphanFinalizerName := "orphan"
// 2.1 DeletionTimestamp 时间戳为空,代表着当前对象不处于被删除的状态,为了开启Finalizer机制,先给它加上一段Finalizers,内容随机非空字符串即可
if instance.ObjectMeta.DeletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object. This is equivalent
// registering our finalizer.
if !containsString(instance.ObjectMeta.Finalizers, myFinalizerName) {
instance.ObjectMeta.Finalizers = append(instance.ObjectMeta.Finalizers, myFinalizerName)
if err := r.Update(ctx, instance); err != nil {
r.Log.Error(err, "Add Finalizers error", instance.Namespace, instance.Name)
return ctrl.Result{}, err
}
}
} else {
// 2.2 DeletionTimestamp不为空,说明对象已经开始进入删除状态了,执行自己的删除步骤后续的逻辑,并清除掉自己的finalizer字段,等待自动gc
if containsString(instance.ObjectMeta.Finalizers, myFinalizerName) {
// 在删除owner resource之前,先执行自定义的预删除步骤,例如删除owner resource
if err := r.PreDelete(instance); err != nil {
// if fail to delete the external dependency here, return with error
// so that it can be retried
return ctrl.Result{}, err
}
// 移出掉自定义的Finalizers,这样当Finalizers为空时,gc就会正式开始了
instance.ObjectMeta.Finalizers = removeString(instance.ObjectMeta.Finalizers, myFinalizerName)
if err := r.Update(ctx, instance); err != nil {
return ctrl.Result{}, err
}
}
// Stop reconciliation as the item is being deleted
return ctrl.Result{}, nil
}
// Unit pre delete logic
func (r *UnitReconciler) PreDelete(instance *customv1.Unit) error {
// 特别说明,own resource加上了ControllerReference之后,owner resource gc删除前,会先自动删除它的所有
// own resources,因此绑定ControllerReference后无需再特别处理删除own resource。
// 这里留空出来,是为了如果有自定义的pre delete逻辑的需要,可在这里实现。
return nil
}
// Helper functions to check and remove string from a slice of strings.
func containsString(slice []string, s string) bool {
for _, item := range slice {
if item == s {
return true
}
}
return false
}
func removeString(slice []string, s string) (result []string) {
for _, item := range slice {
if item == s {
continue
}
result = append(result, item)
}
return
}
如此,在PreDelete()方法内部,就可以实现自定义的PreDelete钩子的逻辑了。
总结
控制器逻辑的核心,还是围绕着owner和own resources之间来进行的,在这个过程中,尽量减少写请求的频率,同时可以充分利用k8s现有的各种生命周期管理机制。