kavin

Kubernetes中自定义Controller

kavin 运维技术 2022-11-18 476浏览 0

Kubernetes中自定义Controller

大家好,我是乔克。

在Kubernetes中,Pod是最小的调度单元,它由各种各样的Controller管理,比如ReplicaSet Controller,Deployment Controller等。

Kubernetes内置了许多Controller,这些Controller能满足80%的业务需求,但是企业里也难免需要自定义Controller来适配自己的业务需求。

网上自定义Controller的文章很多,基本都差不多。俗话说:光说不练假把式,本篇文章主要是自己的一个实践归档总结,如果对你有帮助,可以一键三连!

本文主要从以下几个方面进行介绍,其中包括理论部分和具体实践部分。

Kubernetes中自定义Controller

Controller的实现逻辑

当我们向kube-apiserver提出创建一个Deployment需求的时候,首先是会把这个需求存储到Etcd中,如果这时候没有Controller的话,这条数据仅仅是存在Etcd中,并没有产生实际的作用。

所以就有了Deployment Controller,它实时监听kube-apiserver中的Deployment对象,如果对象有增加、删除、修改等变化,它就会做出相应的相应处理,如下

//pkg/controller/deployment/deployment_controller.go121行
.....
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc:dc.addDeployment,
UpdateFunc:dc.updateDeployment,
//Thiswillenterthesyncloopandno-op,becausethedeploymenthasbeendeletedfromthestore.
DeleteFunc:dc.deleteDeployment,
})
......

其实现的逻辑图如下(图片来自网络):

Kubernetes中自定义Controller

可以看到图的上半部分都由client-go实现了,下半部分才是我们具体需要去处理的。

client-go主要包含Reflector、Informer、Indexer三个组件。

  • Reflector会List&Watch kube-apiserver中的特定资源,然后会把变化的资源放入Delta FIFO队列中。
  • Informer会从Delta FIFO队列中拿取对象交给相应的HandleDeltas。
  • Indexer会将对象存储到缓存中。

上面部分不需要我们去开发,我们主要关注下半部分。

当把数据交给Informer的回调函数HandleDeltas后,Distribute会将资源对象分发到具体的处理函数,这些处理函数通过一系列判断过后,把满足需求的对象放入Workqueue中,然后再进行后续的处理。

code-generator介绍

上一节说到我们只需要去实现具体的业务需求,这是为什么呢?主要是因为kubernetes为我们提供了code-generator【1】这样的代码生成器工具,可以通过它自动生成客户端访问的一些代码,比如Informer、ClientSet等。

code-generator提供了以下工具为Kubernetes中的资源生成代码:

  • deepcopy-gen:生成深度拷贝方法,为每个 T 类型生成 func (t* T) DeepCopy() *T 方法,API 类型都需要实现深拷贝
  • client-gen:为资源生成标准的 clientset
  • informer-gen:生成 informer,提供事件机制来响应资源的事件
  • lister-gen:生成 Lister**,**为 get 和 list 请求提供只读缓存层(通过 indexer 获取)

如果需要自动生成,就需要在代码中加入对应格式的配置,如

Kubernetes中自定义Controller

其中:

  • // +genclient表示需要创建client
  • // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object表示在需要实现k8s.io/apimachinery/pkg/runtime.Object这个接口

除此还有更多的用法,可以参考Kubernetes Deep Dive: Code Generation for CustomResources【2】进行学习。

CRD介绍

CRD全称CustomResourceDefinition,中文简称自定义资源,上面说的Controller主要就是用来管理自定义的资源。

我们可以通过下面命令来查看当前集群中使用了哪些CRD,如下:

#kubectlgetcrd
NAMECREATEDAT
ackalertrules.alert.alibabacloud.com2021-06-15T02:19:59Z
alertmanagers.monitoring.coreos.com2019-12-12T12:50:00Z
aliyunlogconfigs.log.alibabacloud.com2019-12-02T10:15:02Z
apmservers.apm.k8s.elastic.co2020-09-14T01:52:53Z
batchreleases.alicloud.com2019-12-02T10:15:53Z
beats.beat.k8s.elastic.co2020-09-14T01:52:53Z
chaosblades.chaosblade.io2021-06-15T02:30:54Z
elasticsearches.elasticsearch.k8s.elastic.co2020-09-14T01:52:53Z
enterprisesearches.enterprisesearch.k8s.elastic.co2020-09-14T01:52:53Z
globaljobs.jobs.aliyun.com2020-04-26T14:40:53Z
kibanas.kibana.k8s.elastic.co2020-09-14T01:52:54Z
prometheuses.monitoring.coreos.com2019-12-12T12:50:01Z
prometheusrules.monitoring.coreos.com2019-12-12T12:50:02Z
servicemonitors.monitoring.coreos.com2019-12-12T12:50:03Z

但是仅仅是创建一个CRD对象是不够的,因为它是静态的,创建过后仅仅是保存在Etcd中,如果需要其有意义,就需要Controller配合。

创建CRD的例子如下:

apiVersion:apiextensions.k8s.io/v1
kind:CustomResourceDefinition
metadata:
#name必须匹配下面的spec字段:<plural>.<group>
name:students.coolops.io
spec:
#group名用于RESTAPI中的定义:/apis/<group>/<version>
group:coolops.io
#列出自定义资源的所有API版本
versions:
-name:v1#版本名称,比如v1、v1beta1
served:true#是否开启通过RESTAPIs访问`/apis/<group>/<version>/...`
storage:true#必须将一个且只有一个版本标记为存储版本
schema:#定义自定义对象的声明规范
openAPIV3Schema:
type:object
properties:
spec:
type:object
properties:
name:
type:string
school:
type:string
scope:Namespaced#定义作用范围:Namespaced(命名空间级别)或者Cluster(整个集群)
names:
plural:students#plural名字用于RESTAPI中的定义:/apis/<group>/<version>/<plural>
shortNames:#shortNames相当于缩写形式
-stu
kind:Student#kind是sigular的一个驼峰形式定义,在资源清单中会使用
singular:student#singular名称用于CLI操作或显示的一个别名

具体演示

本来准备根据官方的demo【3】进行讲解,但是感觉有点敷衍,而且这类教程网上一大堆,所以就准备自己实现一个数据库管理的一个Controller。

因为是演示怎么开发Controller,所以功能不会复杂,主要的功能是:

  • 创建数据库实例
  • 删除数据库实例
  • 更新数据库实例

开发环境说明

本次实验环境如下:

Kubernetes中自定义Controller

创建CRD

CRD是基础,Controller主要是为CRD服务的,所以我们要先定义好CRD资源,便于开发。

apiVersion:apiextensions.k8s.io/v1
kind:CustomResourceDefinition
metadata:
name:databasemanagers.coolops.cn
spec:
group:coolops.cn
versions:
-name:v1alpha1
served:true
storage:true
schema:
openAPIV3Schema:
type:object
properties:
spec:
type:object
properties:
deploymentName:
type:strin
replicas:
type:integer
minimum:1
maximum:10
dbtype:
type:string
status:
type:object
properties:
availableReplicas:
type:integer
names:
kind:DatabaseManager
plural:databasemanagers
singular:databasemanager
shortNames:
-dm
scope:Namespaced

创建CRD,检验是否能创建成功。

#kubectlapply-fcrd.yaml
customresourcedefinition.apiextensions.k8s.io/databasemanagers.coolops.cncreated
#kubectlgetcrd|grepdatabasemanagers
databasemanagers.coolops.cn2021-11-22T02:31:29Z

自定义一个测试用例,如下:

apiVersion:coolops.cn/v1alpha1
kind:DatabaseManager
metadata:
name:example-mysql
spec:
dbtype:"mysql"
deploymentName:"example-mysql"
replicas:1

创建后进行查看:

#kubectlapply-fexample-mysql.yaml
databasemanager.coolops.cn/example-mysqlcreated
#kubectlgetdm
NAMEAGE
example-mysql9s

不过现在仅仅是创建了一个静态数据,并没有任何实际的应用,下面来编写Controller来管理这个CRD。

开发Controller

项目地址:https://gitee.com/coolops/database-manager-controller

自动生成代码

1、创建项目目录database-manager-controller,并进行go mod 初始化

#mkdirdatabase-manager-controller
#cddatabase-manager-controller
#gomodinit

2、创建源码包目录pkg/apis/databasemanager

#mkdirpkg/apis/databasemanager-p
#cdpkg/apis/databasemanager

3、在pkg/apis/databasemanager目录下创建register.go文件,并写入一下内容

packagedatabasemanager

//GroupNameisthegroupfordatabasemanager
const(
GroupName="coolops.cn"
)

4、在pkg/apis/databasemanager目录下创建v1alpha1目录,进行版本管理

#mkdirv1alpha1
#cdv1alpha1

5、在v1alpha1目录下创建doc.go文件,并写入以下内容

//+k8s:deepcopy-gen=package
//+groupName=coolops.cn

//Packagev1alpha1isthev1alpha1versionoftheAPI
packagev1alpha1

其中// +k8s:deepcopy-gen=package和// +groupName=coolops.cn都是为了自动生成代码而写的配置。

6、在v1alpha1目录下创建type.go文件,并写入以下内容

packagev1alpha1

importmetav1"k8s.io/apimachinery/pkg/apis/meta/v1"

//+genclient
//+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

typeDatabaseManagerstruct{
metav1.TypeMeta`json:",inline"`
metav1.ObjectMeta`json:"metadata,omitempty"`
SpecDatabaseManagerSpec`json:"spec"`
StatusDatabaseManagerStatus`json:"status"`
}

//DatabaseManagerSpec期望状态
typeDatabaseManagerSpecstruct{
DeploymentNamestring`json:"deploymentName"`
Replicas*int32`json:"replicas"`
Dbtypestring`json:"dbtype"`
}

//DatabaseManagerStatus当前状态
typeDatabaseManagerStatusstruct{
AvailableReplicasint32`json:"availableReplicas"`
}

//+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

//DatabaseManagerListisalistofDatabaseManagerListresources
typeDatabaseManagerListstruct{
metav1.TypeMeta`json:",inline"`
metav1.ListMeta`json:"metadata"`

Items[]DatabaseManager`json:"items"`
}

type.go主要定义我们的资源类型。

7、在v1alpha1目录下创建register.go文件,并写入以下内容

packagev1alpha1

import(
dbcontroller"database-manager-controller/pkg/apis/databasemanager"
metav1"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

//SchemeGroupVersionisgroupversionusedtoregistertheseobjects
varSchemeGroupVersion=schema.GroupVersion{Group:dbcontroller.GroupName,Version:dbcontroller.Version}

//KindtakesanunqualifiedkindandreturnsbackaGroupqualifiedGroupKind
funcKind(kindstring)schema.GroupKind{
returnSchemeGroupVersion.WithKind(kind).GroupKind()
}

//ResourcetakesanunqualifiedresourceandreturnsaGroupqualifiedGroupResource
funcResource(resourcestring)schema.GroupResource{
returnSchemeGroupVersion.WithResource(resource).GroupResource()
}

var(
//SchemeBuilderinitializesaschemebuilder
SchemeBuilder=runtime.NewSchemeBuilder(addKnownTypes)
//AddToSchemeisaglobalfunctionthatregistersthisAPIgroup&versiontoascheme
AddToScheme=SchemeBuilder.AddToScheme
)

//AddsthelistofknowntypestoScheme.
funcaddKnownTypes(scheme*runtime.Scheme)error{
scheme.AddKnownTypes(SchemeGroupVersion,
&DatabaseManager{},
&DatabaseManagerList{},
)
metav1.AddToGroupVersion(scheme,SchemeGroupVersion)
returnnil
}

register.go的作用是通过addKnownTypes方法使得client可以知道DatabaseManager类型的API对象。

至此,自动生成代码的准备工作完成了,目前的代码目录结构如下:

#tree.
.
├──artifacts
│└──database-manager
│├──crd.yaml
│└──example-mysql.yaml
├──go.mod
├──go.sum
├──LICENSE
├──pkg
│└──apis
│└──databasemanager
│├──register.go
│└──v1alpha1
│├──doc.go
│├──register.go
│└──type.go

接下里就使用code-generator进行代码自动生成了。

8、创建生成代码的脚本

以下代码主要参考sample-controller【3】

(1)在项目根目录下,创建hack目录,代码生成的脚本配置在该目录下

#mkdirhack&&cdhack

(2)创建tools.go文件,添加 code-generator 依赖

//go:buildtools
//+buildtools

//Thispackageimportsthingsrequiredbybuildscripts,toforce`gomod`toseethemasdependencies
packagetools

import_"k8s.io/code-generator"

(3)创建update-codegen.sh文件,用来生成代码

#!/usr/bin/envbash

set-oerrexit
set-onounset
set-opipefail

SCRIPT_ROOT=$(dirname"${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(cd"${SCRIPT_ROOT}";ls-d-1./vendor/k8s.io/code-generator2>/dev/null||echo../code-generator)}

#generatethecodewith:
#--output-basebecausethisscriptshouldalsobeabletoruninsidethevendordirof
#k8s.io/kubernetes.Theoutput-baseisneededforthegeneratorstooutputintothevendordir
#insteadofthe$GOPATHdirectly.Fornormalprojectsthiscanbedropped.
bash"${CODEGEN_PKG}"/generate-groups.sh"deepcopy,client,informer,lister"\
database-manager-controller/pkg/clientdatabase-manager-controller/pkg/apis\
databasemanager:v1alpha1\
--output-base"$(dirname"${BASH_SOURCE[0]}")/../.."\
--go-header-file"${SCRIPT_ROOT}"/hack/boilerplate.go.txt

#Touseyourownboilerplatetextappend:
#--go-header-file"${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt

其中以下代码段根据实际情况进行修改。

bash"${CODEGEN_PKG}"/generate-groups.sh"deepcopy,client,informer,lister"\
database-manager-controller/pkg/clientdatabase-manager-controller/pkg/apis\
databasemanager:v1alpha1\
--output-base"$(dirname"${BASH_SOURCE[0]}")/../.."\
--go-header-file"${SCRIPT_ROOT}"/hack/boilerplate.go.txt

(4)创建verify-codegen.sh文件,主要用于校验生成的代码是否为最新的

#!/usr/bin/envbash

set-oerrexit
set-onounset
set-opipefail

SCRIPT_ROOT=$(dirname"${BASH_SOURCE[0]}")/..

DIFFROOT="${SCRIPT_ROOT}/pkg"
TMP_DIFFROOT="${SCRIPT_ROOT}/_tmp/pkg"
_tmp="${SCRIPT_ROOT}/_tmp"

cleanup(){
rm-rf"${_tmp}"
}
trap"cleanup"EXITSIGINT

cleanup

mkdir-p"${TMP_DIFFROOT}"
cp-a"${DIFFROOT}"/*"${TMP_DIFFROOT}"

"${SCRIPT_ROOT}/hack/update-codegen.sh"
echo"diffing${DIFFROOT}againstfreshlygeneratedcodegen"
ret=0
diff-Naupr"${DIFFROOT}""${TMP_DIFFROOT}"||ret=$?
cp-a"${TMP_DIFFROOT}"/*"${DIFFROOT}"
if[[$ret-eq0]]
then
echo"${DIFFROOT}uptodate."
else
echo"${DIFFROOT}isoutofdate.Pleaserunhack/update-codegen.sh"
exit1
fi

(5)创建boilerplate.go.txt,主要用于为代码添加开源协议

/*
CopyrightTheKubernetesAuthors.

LicensedundertheApacheLicense,Version2.0(the"License");
youmaynotusethisfileexceptincompliancewiththeLicense.
YoumayobtainacopyoftheLicenseat

http://www.apache.org/licenses/LICENSE-2.0

Unlessrequiredbyapplicablelaworagreedtoinwriting,software
distributedundertheLicenseisdistributedonan"ASIS"BASIS,
WITHOUTWARRANTIESORCONDITIONSOFANYKIND,eitherexpressorimplied.
SeetheLicenseforthespecificlanguagegoverningpermissionsand
limitationsundertheLicense.
*/

(6)配置go vendor依赖目录

从update-codegen.sh脚本可以看到该代码生成脚本是利用vendor目录下的依赖进行的,我们项目本身没有配置,执行以下命令进行创建。

#gomodvendor

(7)在项目根目录下执行脚本生成代码

#chmod+xhack/update-codegen.sh
#./hack/update-codegen.sh
Generatingdeepcopyfuncs
Generatingclientsetfordatabasemanager:v1alpha1atdatabase-manager-controller/pkg/client/clientset
Generatinglistersfordatabasemanager:v1alpha1atdatabase-manager-controller/pkg/client/listers
Generatinginformersfordatabasemanager:v1alpha1atdatabase-manager-controller/pkg/client/informers

然后新的目录结构如下:

#treepkg/
pkg/
├──apis
│└──databasemanager
│├──register.go
│└──v1alpha1
│├──doc.go
│├──register.go
│├──type.go
│└──zz_generated.deepcopy.go
└──client
├──clientset
│└──versioned
│├──clientset.go
│├──doc.go
│├──fake
││├──clientset_generated.go
││├──doc.go
││└──register.go
│├──scheme
││├──doc.go
││└──register.go
│└──typed
│└──databasemanager
│└──v1alpha1
│├──databasemanager_client.go
│├──databasemanager.go
│├──doc.go
│├──fake
││├──doc.go
││├──fake_databasemanager_client.go
││└──fake_databasemanager.go
│└──generated_expansion.go
├──informers
│└──externalversions
│├──databasemanager
││├──interface.go
││└──v1alpha1
││├──databasemanager.go
││└──interface.go
│├──factory.go
│├──generic.go
│└──internalinterfaces
│└──factory_interfaces.go
└──listers
└──databasemanager
└──v1alpha1
├──databasemanager.go
└──expansion_generated.go

Controller开发

上面已经完成了自动代码的生成,生成了informer、lister、clientset的代码,下面就开始编写真正的Controller功能了。

我们需要实现的功能是:

  • 创建数据库实例
  • 更新数据库实例
  • 删除数据库实例

(1)在代码根目录创建controller.go文件,编写如下内容

packagemain

import(
"context"
dbmanagerv1"database-manager-controller/pkg/apis/databasemanager/v1alpha1"
clientset"database-manager-controller/pkg/client/clientset/versioned"
dbmanagerscheme"database-manager-controller/pkg/client/clientset/versioned/scheme"
informers"database-manager-controller/pkg/client/informers/externalversions/databasemanager/v1alpha1"
listers"database-manager-controller/pkg/client/listers/databasemanager/v1alpha1"
"fmt"
"github.com/golang/glog"
appsv1"k8s.io/api/apps/v1"
corev1"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
utilruntime"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsinformers"k8s.io/client-go/informers/apps/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1"k8s.io/client-go/kubernetes/typed/core/v1"
appslisters"k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"time"
)

constcontrollerAgentName="database-manager-controller"

const(
//SuccessSynced用来表示事件被成功同步
SuccessSynced="Synced"
//MessageResourceSynced表示事件被触发时的消息信息
MessageResourceSynced="databasemanagersyncedsuccessfully"
MessageResourceExists="Resource%qalreadyexistsandisnotmanagedbyDatabaseManager"
ErrResourceExists="ErrResourceExists"
)

typeControllerstruct{
//kubeclientset是kubernetes的clientset
kubeclientsetkubernetes.Interface
//dbmanagerclientset是自己定义的APIGroup的clientset
dbmanagerclientsetclientset.Interface

//deploymentsListerlistdeployment对象
deploymentsListerappslisters.DeploymentLister
//deploymentsSynced同步deployment对象
deploymentsSyncedcache.InformerSynced

//dbmanagerListerlistdatabasemanager对象
dbmanagerListerlisters.DatabaseManagerLister
//dbmanagerSynced同步DatabaseManager对象
dbmanagerSyncedcache.InformerSynced

//workqueue限速的队列
workqueueworkqueue.RateLimitingInterface
//recorder事件记录器
recorderrecord.EventRecorder
}

//NewController初始化Controller
funcNewController(kubeclientsetkubernetes.Interface,dbmanagerclientsetclientset.Interface,
dbmanagerinformerinformers.DatabaseManagerInformer,deploymentInformerappsinformers.DeploymentInformer)*Controller{

utilruntime.Must(dbmanagerscheme.AddToScheme(scheme.Scheme))
glog.V(4).Info("Createeventbroadcaster")
//创建eventBroadcaster
eventBroadcaster:=record.NewBroadcaster()
//保存events到日志
eventBroadcaster.StartLogging(glog.Infof)
//上报events到APIServer
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface:kubeclientset.CoreV1().Events("")})
recorder:=eventBroadcaster.NewRecorder(scheme.Scheme,corev1.EventSource{Component:controllerAgentName})

//初始化Controller
controller:=&Controller{
kubeclientset:kubeclientset,
dbmanagerclientset:dbmanagerclientset,
deploymentsLister:deploymentInformer.Lister(),
deploymentsSynced:deploymentInformer.Informer().HasSynced,
dbmanagerLister:dbmanagerinformer.Lister(),
dbmanagerSynced:dbmanagerinformer.Informer().HasSynced,
workqueue:workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),"DatabaseManagers"),
recorder:recorder,
}

glog.Info("Startupeventhandlers")

//注册EventHandler,分别对于添加、更新、删除事件,具体的操作由事件对应的API将其加入队列中
dbmanagerinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc:controller.enqueueDatabaseManager,
UpdateFunc:func(oldObj,newObjinterface{}){
oldDBManager:=oldObj.(*dbmanagerv1.DatabaseManager)
newDBManager:=newObj.(*dbmanagerv1.DatabaseManager)
ifoldDBManager.ResourceVersion==newDBManager.ResourceVersion{
return
}
controller.enqueueDatabaseManager(newObj)
},
DeleteFunc:controller.enqueueDatabaseManagerForDelete,
})

//注册DeploymentEventHandler
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc:controller.handleObject,
UpdateFunc:func(old,newinterface{}){
newDepl:=new.(*appsv1.Deployment)
oldDepl:=old.(*appsv1.Deployment)
ifnewDepl.ResourceVersion==oldDepl.ResourceVersion{
//如果没有改变,就返回
return
}
controller.handleObject(new)
},
DeleteFunc:controller.handleObject,
})

returncontroller
}

//Run启动入口
func(c*Controller)Run(threadinessint,stopCh<-chanstruct{})error{
deferutilruntime.HandleCrash()
deferc.workqueue.ShuttingDown()

glog.Info("startcontroller,cachesync")
//同步缓存数据
ifok:=cache.WaitForCacheSync(stopCh,c.dbmanagerSynced);!ok{
returnfmt.Errorf("failedtowaitforcachestosync")
}

glog.Info("beginstartworkerthread")
//开启work线程
fori:=0;i<threadiness;i++{
gowait.Until(c.runWorker,time.Second,stopCh)
}

glog.Info("workerthreadstarted!!!!!!")
<-stopCh
glog.Info("workerthreadstopped!!!!!!")
returnnil
}

//runWorker是一个死循环,会一直调用processNextWorkItem从workqueue中取出数据
func(c*Controller)runWorker(){
forc.processNextWorkItem(){

}
}

//processNextWorkItem从workqueue中取出数据进行处理
func(c*Controller)processNextWorkItem()bool{
obj,shutdown:=c.workqueue.Get()

ifshutdown{
returnfalse
}

//Wewrapthisblockinafuncsowecandeferc.workqueue.Done.
err:=func(objinterface{})error{
deferc.workqueue.Done(obj)
varkeystring
varokbool

ifkey,ok=obj.(string);!ok{
c.workqueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expectedstringinworkqueuebutgot%#v",obj))
returnnil
}
//在syncHandler中处理业务
iferr:=c.syncHandler(key);err!=nil{
returnfmt.Errorf("errorsyncing'%s':%s",key,err.Error())
}

c.workqueue.Forget(obj)
glog.Infof("Successfullysynced'%s'",key)
returnnil
}(obj)

iferr!=nil{
runtime.HandleError(err)
returntrue
}

returntrue
}

//syncHandler处理业务Handler
func(c*Controller)syncHandler(keystring)error{
//通过split得到namespace和name
namespace,name,err:=cache.SplitMetaNamespaceKey(key)
iferr!=nil{
runtime.HandleError(fmt.Errorf("invalidresourcekey:%s",key))
returnnil
}

//从缓存中取对象
dbManager,err:=c.dbmanagerLister.DatabaseManagers(namespace).Get(name)
iferr!=nil{
//如果DatabaseManager对象被删除了,就会走到这里
iferrors.IsNotFound(err){
glog.Infof("DatabaseManager对象被删除,请在这里执行实际的删除业务:%s/%s...",namespace,name)
returnnil
}

runtime.HandleError(fmt.Errorf("failedtolistDatabaseManagerby:%s/%s",namespace,name))

returnerr
}

glog.Infof("这里是databasemanager对象的期望状态:%#v...",dbManager)

//获取是否有deploymentName
deploymentName:=dbManager.Spec.DeploymentName

ifdeploymentName==""{
utilruntime.HandleError(fmt.Errorf("%s:deploymentName不能为空",key))
returnnil
}
//判断deployment是否在集群中存在
deployment,err:=c.deploymentsLister.Deployments(dbManager.Namespace).Get(deploymentName)
iferrors.IsNotFound(err){
//如果没有找到,就创建
deployment,err=c.kubeclientset.AppsV1().Deployments(dbManager.Namespace).Create(
context.TODO(),newDeployment(dbManager),metav1.CreateOptions{})
}

//如果Create或者Get都出错,则返回
iferr!=nil{
returnerr
}

//如果这个deployment不是由DatabaseManager控制,应该报告这个事件
if!metav1.IsControlledBy(deployment,dbManager){
msg:=fmt.Sprintf(MessageResourceExists,deployment.Name)
c.recorder.Event(dbManager,corev1.EventTypeWarning,ErrResourceExists,msg)
returnfmt.Errorf("%s",msg)
}

//如果replicas和期望的不等,则更新deployment
ifdbManager.Spec.Replicas!=nil&&*dbManager.Spec.Replicas!=*deployment.Spec.Replicas{
klog.V(4).Infof("DatabaseManager%sreplicas:%d,deploymentreplicas:%d",name,*dbManager.Spec.Replicas,*deployment.Spec.Replicas)
deployment,err=c.kubeclientset.AppsV1().Deployments(dbManager.Namespace).Update(context.TODO(),newDeployment(dbManager),metav1.UpdateOptions{})
}

iferr!=nil{
returnerr
}

//更新状态
err=c.updateDatabaseManagerStatus(dbManager,deployment)
iferr!=nil{
returnerr
}

glog.Infof("实际状态是从业务层面得到的,此处应该去的实际状态,与期望状态做对比,并根据差异做出响应(新增或者删除)")

c.recorder.Event(dbManager,corev1.EventTypeNormal,SuccessSynced,MessageResourceSynced)
returnnil
}

//updateDatabaseManagerStatus更新DatabaseManager状态
func(c*Controller)updateDatabaseManagerStatus(dbmanager*dbmanagerv1.DatabaseManager,deployment*appsv1.Deployment)error{
dbmanagerCopy:=dbmanager.DeepCopy()
dbmanagerCopy.Status.AvailableReplicas=deployment.Status.AvailableReplicas
_,err:=c.dbmanagerclientset.CoolopsV1alpha1().DatabaseManagers(dbmanager.Namespace).Update(context.TODO(),dbmanagerCopy,metav1.UpdateOptions{})
returnerr
}

func(c*Controller)handleObject(objinterface{}){
varobjectmetav1.Object
varokbool
ifobject,ok=obj.(metav1.Object);!ok{
tombstone,ok:=obj.(cache.DeletedFinalStateUnknown)
if!ok{
utilruntime.HandleError(fmt.Errorf("errordecodingobject,invalidtype"))
return
}
object,ok=tombstone.Obj.(metav1.Object)
if!ok{
utilruntime.HandleError(fmt.Errorf("errordecodingobjecttombstone,invalidtype"))
return
}
klog.V(4).Infof("Recovereddeletedobject'%s'fromtombstone",object.GetName())
}
klog.V(4).Infof("Processingobject:%s",object.GetName())
ifownerRef:=metav1.GetControllerOf(object);ownerRef!=nil{
//检查对象是否和DatabaseManager对象关联,如果不是就退出
ifownerRef.Kind!="DatabaseManager"{
return
}

dbmanage,err:=c.dbmanagerLister.DatabaseManagers(object.GetNamespace()).Get(ownerRef.Name)
iferr!=nil{
klog.V(4).Infof("ignoringorphanedobject'%s'ofdatabaseManager'%s'",object.GetSelfLink(),ownerRef.Name)
return
}

c.enqueueDatabaseManager(dbmanage)
return
}
}

funcnewDeployment(dbmanager*dbmanagerv1.DatabaseManager)*appsv1.Deployment{
varimagestring
varnamestring
switchdbmanager.Spec.Dbtype{
case"mysql":
image="mysql:5.7"
name="mysql"
case"mariadb":
image="mariadb:10.7.1"
name="mariadb"
default:
image="mysql:5.7"
name="mysql"
}

labels:=map[string]string{
"app":dbmanager.Spec.Dbtype,
}
return&appsv1.Deployment{
ObjectMeta:metav1.ObjectMeta{
Namespace:dbmanager.Namespace,
Name:dbmanager.Name,
OwnerReferences:[]metav1.OwnerReference{
*metav1.NewControllerRef(dbmanager,dbmanagerv1.SchemeGroupVersion.WithKind("DatabaseManager")),
},
},
Spec:appsv1.DeploymentSpec{
Replicas:dbmanager.Spec.Replicas,
Selector:&metav1.LabelSelector{MatchLabels:labels},
Template:corev1.PodTemplateSpec{
ObjectMeta:metav1.ObjectMeta{Labels:labels},
Spec:corev1.PodSpec{
Containers:[]corev1.Container{
{
Name:name,
Image:image,
},
},
},
},
},
}
}

//数据先放入缓存,再入队列
func(c*Controller)enqueueDatabaseManager(objinterface{}){
varkeystring
varerrerror
//将对象放入缓存
ifkey,err=cache.MetaNamespaceKeyFunc(obj);err!=nil{
runtime.HandleError(err)
return
}

//将key放入队列
c.workqueue.AddRateLimited(key)
}

//删除操作
func(c*Controller)enqueueDatabaseManagerForDelete(objinterface{}){
varkeystring
varerrerror
//从缓存中删除指定对象
key,err=cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
iferr!=nil{
runtime.HandleError(err)
return
}
//再将key放入队列
c.workqueue.AddRateLimited(key)
}

其主要逻辑和文章开头介绍的Controller实现逻辑一样,其中关键点在于:

  • 在NewController方法中,定义了DatabaseManager和Deployment对象的Event Handler,除了同步缓存外,还将对应的Key放入queue中。
  • 实际处理业务的方法是syncHandler,可以根据实际请求来编写代码以达到业务需求。

2、在项目根目录下创建main.go,编写入口函数

(1)编写处理系统信号量的Handler

这部分直接使用的demo中的代码【3】

(2)编写入口main函数

packagemain

import(
"flag"
"time"

kubeinformers"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"

clientset"database-manager-controller/pkg/client/clientset/versioned"
informers"database-manager-controller/pkg/client/informers/externalversions"
"database-manager-controller/pkg/signals"
)

var(
masterURLstring
kubeconfigstring
)

funcmain(){
//klog.InitFlags(nil)
flag.Parse()

//设置处理系统信号的Channel
stopCh:=signals.SetupSignalHandler()

//处理入参
cfg,err:=clientcmd.BuildConfigFromFlags(masterURL,kubeconfig)
iferr!=nil{
klog.Fatalf("Errorbuildingkubeconfig:%s",err.Error())
}

//初始化kubeClient
kubeClient,err:=kubernetes.NewForConfig(cfg)
iferr!=nil{
klog.Fatalf("Errorbuildingkubernetesclientset:%s",err.Error())
}

//初始化dbmanagerClient
dbmanagerClient,err:=clientset.NewForConfig(cfg)
iferr!=nil{
klog.Fatalf("Errorbuildingexampleclientset:%s",err.Error())
}

kubeInformerFactory:=kubeinformers.NewSharedInformerFactory(kubeClient,time.Second*30)
dbmanagerInformerFactory:=informers.NewSharedInformerFactory(dbmanagerClient,time.Second*30)

//初始化controller
controller:=NewController(kubeClient,dbmanagerClient,
dbmanagerInformerFactory.Coolops().V1alpha1().DatabaseManagers(),kubeInformerFactory.Apps().V1().Deployments())

//noticethatthereisnoneedtorunStartmethodsinaseparategoroutine.(i.e.gokubeInformerFactory.Start(stopCh)
//Startmethodisnon-blockingandrunsallregisteredinformersinadedicatedgoroutine.
kubeInformerFactory.Start(stopCh)
dbmanagerInformerFactory.Start(stopCh)

iferr=controller.Run(2,stopCh);err!=nil{
klog.Fatalf("Errorrunningcontroller:%s",err.Error())
}
}

funcinit(){
flag.StringVar(&kubeconfig,"kubeconfig","","Pathtoakubeconfig.Onlyrequiredifout-of-cluster.")
flag.StringVar(&masterURL,"master","","TheaddressoftheKubernetesAPIserver.Overridesanyvalueinkubeconfig.Onlyrequiredifout-of-cluster.")
}

测试Controller

1、在项目目录下添加一个Makefile

build:
echo"builddatabasemanagercontroller"
CGO_ENABLED=0GOOS=linuxGOARCH=amd64gobuild.

2、执行make build进行编译

#makebuild
echo"builddatabasemanagercontroller"
builddatabasemanagercontroller
CGO_ENABLED=0GOOS=linuxGOARCH=amd64gobuild.

然后会输出database-manager-controller一个二进制文件。

3、运行controller

#chmod+xdatabase-manager-controller
#./database-manager-controller-kubeconfig=$HOME/.kube/config-alsologtostderr=true
I112309:52:41.59572629173controller.go:81]Startupeventhandlers
I112309:52:41.59744829173controller.go:120]startcontroller,cachesync
I112309:52:41.69971629173controller.go:125]beginstartworkerthread
I112309:52:41.69973729173controller.go:130]workerthreadstarted!!!!!!

4、创建一个CRD测试用例,观察日志以及是否创建deployment

(1)测试样例如下

#catexample-mysql.yaml
apiVersion:coolops.cn/v1alpha1
kind:DatabaseManager
metadata:
name:example-mysql
spec:
dbtype:"mysql"
deploymentName:"mysql"
replicas:1

(2)执行以下命令进行创建,观察日志

#kubectlapply-fexample-mysql.yaml
databasemanager.coolops.cn/example-mysqlcreated

可以看到对于的deployment和pod已经创建,不过由于Deployment的配置没有配置完全,mysql没有正常启动。

Kubernetes中自定义Controller

我们其实是可以看到Controller获取到了事件。

Kubernetes中自定义Controller

如果我们删除对象,也可以从日志里正常看到响应。

Kubernetes中自定义Controller

总结

上面就是自定义Controller的整个开发过程,相对来说还是比较简单,大部分东西社区都做好了,我们只需要套模子,然后实现自己的逻辑就行。

整个过程主要是参考sample-controller【3】 ,现在简单整理如下:

  • 确定好目的,然后创建CRD,定义需要的对象
  • 按规定编写代码,定义好CRD所需要的type,然后使用code-generator进行代码自动生成,生成需要的informer、lister、clientset。
  • 编写Controller,实现具体的业务逻辑
  • 编写完成后就是验证,看看是否符合预期,根据具体情况再做进一步的调整

继续浏览有关 系统运维 的文章
发表评论