大家好,我是乔克。
在Kubernetes中,Pod是最小的调度单元,它由各种各样的Controller管理,比如ReplicaSet Controller,Deployment Controller等。
Kubernetes内置了许多Controller,这些Controller能满足80%的业务需求,但是企业里也难免需要自定义Controller来适配自己的业务需求。
网上自定义Controller的文章很多,基本都差不多。俗话说:光说不练假把式,本篇文章主要是自己的一个实践归档总结,如果对你有帮助,可以一键三连!
本文主要从以下几个方面进行介绍,其中包括理论部分和具体实践部分。
Controller的实现逻辑
当我们向kube-apiserver提出创建一个Deployment需求的时候,首先是会把这个需求存储到Etcd中,如果这时候没有Controller的话,这条数据仅仅是存在Etcd中,并没有产生实际的作用。
所以就有了Deployment Controller,它实时监听kube-apiserver中的Deployment对象,如果对象有增加、删除、修改等变化,它就会做出相应的相应处理,如下
//pkg/controller/deployment/deployment_controller.go121行 ..... dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc:dc.addDeployment, UpdateFunc:dc.updateDeployment, //Thiswillenterthesyncloopandno-op,becausethedeploymenthasbeendeletedfromthestore. DeleteFunc:dc.deleteDeployment, }) ......
其实现的逻辑图如下(图片来自网络):
可以看到图的上半部分都由client-go实现了,下半部分才是我们具体需要去处理的。
client-go主要包含Reflector、Informer、Indexer三个组件。
- Reflector会List&Watch kube-apiserver中的特定资源,然后会把变化的资源放入Delta FIFO队列中。
- Informer会从Delta FIFO队列中拿取对象交给相应的HandleDeltas。
- Indexer会将对象存储到缓存中。
上面部分不需要我们去开发,我们主要关注下半部分。
当把数据交给Informer的回调函数HandleDeltas后,Distribute会将资源对象分发到具体的处理函数,这些处理函数通过一系列判断过后,把满足需求的对象放入Workqueue中,然后再进行后续的处理。
code-generator介绍
上一节说到我们只需要去实现具体的业务需求,这是为什么呢?主要是因为kubernetes为我们提供了code-generator【1】这样的代码生成器工具,可以通过它自动生成客户端访问的一些代码,比如Informer、ClientSet等。
code-generator提供了以下工具为Kubernetes中的资源生成代码:
- deepcopy-gen:生成深度拷贝方法,为每个 T 类型生成 func (t* T) DeepCopy() *T 方法,API 类型都需要实现深拷贝
- client-gen:为资源生成标准的 clientset
- informer-gen:生成 informer,提供事件机制来响应资源的事件
- lister-gen:生成 Lister**,**为 get 和 list 请求提供只读缓存层(通过 indexer 获取)
如果需要自动生成,就需要在代码中加入对应格式的配置,如
其中:
- // +genclient表示需要创建client
- // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object表示在需要实现k8s.io/apimachinery/pkg/runtime.Object这个接口
除此还有更多的用法,可以参考Kubernetes Deep Dive: Code Generation for CustomResources【2】进行学习。
CRD介绍
CRD全称CustomResourceDefinition,中文简称自定义资源,上面说的Controller主要就是用来管理自定义的资源。
我们可以通过下面命令来查看当前集群中使用了哪些CRD,如下:
#kubectlgetcrd NAMECREATEDAT ackalertrules.alert.alibabacloud.com2021-06-15T02:19:59Z alertmanagers.monitoring.coreos.com2019-12-12T12:50:00Z aliyunlogconfigs.log.alibabacloud.com2019-12-02T10:15:02Z apmservers.apm.k8s.elastic.co2020-09-14T01:52:53Z batchreleases.alicloud.com2019-12-02T10:15:53Z beats.beat.k8s.elastic.co2020-09-14T01:52:53Z chaosblades.chaosblade.io2021-06-15T02:30:54Z elasticsearches.elasticsearch.k8s.elastic.co2020-09-14T01:52:53Z enterprisesearches.enterprisesearch.k8s.elastic.co2020-09-14T01:52:53Z globaljobs.jobs.aliyun.com2020-04-26T14:40:53Z kibanas.kibana.k8s.elastic.co2020-09-14T01:52:54Z prometheuses.monitoring.coreos.com2019-12-12T12:50:01Z prometheusrules.monitoring.coreos.com2019-12-12T12:50:02Z servicemonitors.monitoring.coreos.com2019-12-12T12:50:03Z
但是仅仅是创建一个CRD对象是不够的,因为它是静态的,创建过后仅仅是保存在Etcd中,如果需要其有意义,就需要Controller配合。
创建CRD的例子如下:
apiVersion:apiextensions.k8s.io/v1 kind:CustomResourceDefinition metadata: #name必须匹配下面的spec字段:<plural>.<group> name:students.coolops.io spec: #group名用于RESTAPI中的定义:/apis/<group>/<version> group:coolops.io #列出自定义资源的所有API版本 versions: -name:v1#版本名称,比如v1、v1beta1 served:true#是否开启通过RESTAPIs访问`/apis/<group>/<version>/...` storage:true#必须将一个且只有一个版本标记为存储版本 schema:#定义自定义对象的声明规范 openAPIV3Schema: type:object properties: spec: type:object properties: name: type:string school: type:string scope:Namespaced#定义作用范围:Namespaced(命名空间级别)或者Cluster(整个集群) names: plural:students#plural名字用于RESTAPI中的定义:/apis/<group>/<version>/<plural> shortNames:#shortNames相当于缩写形式 -stu kind:Student#kind是sigular的一个驼峰形式定义,在资源清单中会使用 singular:student#singular名称用于CLI操作或显示的一个别名
具体演示
本来准备根据官方的demo【3】进行讲解,但是感觉有点敷衍,而且这类教程网上一大堆,所以就准备自己实现一个数据库管理的一个Controller。
因为是演示怎么开发Controller,所以功能不会复杂,主要的功能是:
- 创建数据库实例
- 删除数据库实例
- 更新数据库实例
开发环境说明
本次实验环境如下:
创建CRD
CRD是基础,Controller主要是为CRD服务的,所以我们要先定义好CRD资源,便于开发。
apiVersion:apiextensions.k8s.io/v1 kind:CustomResourceDefinition metadata: name:databasemanagers.coolops.cn spec: group:coolops.cn versions: -name:v1alpha1 served:true storage:true schema: openAPIV3Schema: type:object properties: spec: type:object properties: deploymentName: type:strin replicas: type:integer minimum:1 maximum:10 dbtype: type:string status: type:object properties: availableReplicas: type:integer names: kind:DatabaseManager plural:databasemanagers singular:databasemanager shortNames: -dm scope:Namespaced
创建CRD,检验是否能创建成功。
#kubectlapply-fcrd.yaml customresourcedefinition.apiextensions.k8s.io/databasemanagers.coolops.cncreated #kubectlgetcrd|grepdatabasemanagers databasemanagers.coolops.cn2021-11-22T02:31:29Z
自定义一个测试用例,如下:
apiVersion:coolops.cn/v1alpha1 kind:DatabaseManager metadata: name:example-mysql spec: dbtype:"mysql" deploymentName:"example-mysql" replicas:1
创建后进行查看:
#kubectlapply-fexample-mysql.yaml databasemanager.coolops.cn/example-mysqlcreated #kubectlgetdm NAMEAGE example-mysql9s
不过现在仅仅是创建了一个静态数据,并没有任何实际的应用,下面来编写Controller来管理这个CRD。
开发Controller
项目地址:https://gitee.com/coolops/database-manager-controller
自动生成代码
1、创建项目目录database-manager-controller,并进行go mod 初始化
#mkdirdatabase-manager-controller #cddatabase-manager-controller #gomodinit
2、创建源码包目录pkg/apis/databasemanager
#mkdirpkg/apis/databasemanager-p #cdpkg/apis/databasemanager
3、在pkg/apis/databasemanager目录下创建register.go文件,并写入一下内容
packagedatabasemanager //GroupNameisthegroupfordatabasemanager const( GroupName="coolops.cn" )
4、在pkg/apis/databasemanager目录下创建v1alpha1目录,进行版本管理
#mkdirv1alpha1 #cdv1alpha1
5、在v1alpha1目录下创建doc.go文件,并写入以下内容
//+k8s:deepcopy-gen=package //+groupName=coolops.cn //Packagev1alpha1isthev1alpha1versionoftheAPI packagev1alpha1
其中// +k8s:deepcopy-gen=package和// +groupName=coolops.cn都是为了自动生成代码而写的配置。
6、在v1alpha1目录下创建type.go文件,并写入以下内容
packagev1alpha1 importmetav1"k8s.io/apimachinery/pkg/apis/meta/v1" //+genclient //+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object typeDatabaseManagerstruct{ metav1.TypeMeta`json:",inline"` metav1.ObjectMeta`json:"metadata,omitempty"` SpecDatabaseManagerSpec`json:"spec"` StatusDatabaseManagerStatus`json:"status"` } //DatabaseManagerSpec期望状态 typeDatabaseManagerSpecstruct{ DeploymentNamestring`json:"deploymentName"` Replicas*int32`json:"replicas"` Dbtypestring`json:"dbtype"` } //DatabaseManagerStatus当前状态 typeDatabaseManagerStatusstruct{ AvailableReplicasint32`json:"availableReplicas"` } //+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object //DatabaseManagerListisalistofDatabaseManagerListresources typeDatabaseManagerListstruct{ metav1.TypeMeta`json:",inline"` metav1.ListMeta`json:"metadata"` Items[]DatabaseManager`json:"items"` }
type.go主要定义我们的资源类型。
7、在v1alpha1目录下创建register.go文件,并写入以下内容
packagev1alpha1 import( dbcontroller"database-manager-controller/pkg/apis/databasemanager" metav1"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" ) //SchemeGroupVersionisgroupversionusedtoregistertheseobjects varSchemeGroupVersion=schema.GroupVersion{Group:dbcontroller.GroupName,Version:dbcontroller.Version} //KindtakesanunqualifiedkindandreturnsbackaGroupqualifiedGroupKind funcKind(kindstring)schema.GroupKind{ returnSchemeGroupVersion.WithKind(kind).GroupKind() } //ResourcetakesanunqualifiedresourceandreturnsaGroupqualifiedGroupResource funcResource(resourcestring)schema.GroupResource{ returnSchemeGroupVersion.WithResource(resource).GroupResource() } var( //SchemeBuilderinitializesaschemebuilder SchemeBuilder=runtime.NewSchemeBuilder(addKnownTypes) //AddToSchemeisaglobalfunctionthatregistersthisAPIgroup&versiontoascheme AddToScheme=SchemeBuilder.AddToScheme ) //AddsthelistofknowntypestoScheme. funcaddKnownTypes(scheme*runtime.Scheme)error{ scheme.AddKnownTypes(SchemeGroupVersion, &DatabaseManager{}, &DatabaseManagerList{}, ) metav1.AddToGroupVersion(scheme,SchemeGroupVersion) returnnil }
register.go的作用是通过addKnownTypes方法使得client可以知道DatabaseManager类型的API对象。
至此,自动生成代码的准备工作完成了,目前的代码目录结构如下:
#tree. . ├──artifacts │└──database-manager │├──crd.yaml │└──example-mysql.yaml ├──go.mod ├──go.sum ├──LICENSE ├──pkg │└──apis │└──databasemanager │├──register.go │└──v1alpha1 │├──doc.go │├──register.go │└──type.go
接下里就使用code-generator进行代码自动生成了。
8、创建生成代码的脚本
以下代码主要参考sample-controller【3】
(1)在项目根目录下,创建hack目录,代码生成的脚本配置在该目录下
#mkdirhack&&cdhack
(2)创建tools.go文件,添加 code-generator 依赖
//go:buildtools //+buildtools //Thispackageimportsthingsrequiredbybuildscripts,toforce`gomod`toseethemasdependencies packagetools import_"k8s.io/code-generator"
(3)创建update-codegen.sh文件,用来生成代码
#!/usr/bin/envbash set-oerrexit set-onounset set-opipefail SCRIPT_ROOT=$(dirname"${BASH_SOURCE[0]}")/.. CODEGEN_PKG=${CODEGEN_PKG:-$(cd"${SCRIPT_ROOT}";ls-d-1./vendor/k8s.io/code-generator2>/dev/null||echo../code-generator)} #generatethecodewith: #--output-basebecausethisscriptshouldalsobeabletoruninsidethevendordirof #k8s.io/kubernetes.Theoutput-baseisneededforthegeneratorstooutputintothevendordir #insteadofthe$GOPATHdirectly.Fornormalprojectsthiscanbedropped. bash"${CODEGEN_PKG}"/generate-groups.sh"deepcopy,client,informer,lister"\ database-manager-controller/pkg/clientdatabase-manager-controller/pkg/apis\ databasemanager:v1alpha1\ --output-base"$(dirname"${BASH_SOURCE[0]}")/../.."\ --go-header-file"${SCRIPT_ROOT}"/hack/boilerplate.go.txt #Touseyourownboilerplatetextappend: #--go-header-file"${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt
其中以下代码段根据实际情况进行修改。
bash"${CODEGEN_PKG}"/generate-groups.sh"deepcopy,client,informer,lister"\ database-manager-controller/pkg/clientdatabase-manager-controller/pkg/apis\ databasemanager:v1alpha1\ --output-base"$(dirname"${BASH_SOURCE[0]}")/../.."\ --go-header-file"${SCRIPT_ROOT}"/hack/boilerplate.go.txt
(4)创建verify-codegen.sh文件,主要用于校验生成的代码是否为最新的
#!/usr/bin/envbash set-oerrexit set-onounset set-opipefail SCRIPT_ROOT=$(dirname"${BASH_SOURCE[0]}")/.. DIFFROOT="${SCRIPT_ROOT}/pkg" TMP_DIFFROOT="${SCRIPT_ROOT}/_tmp/pkg" _tmp="${SCRIPT_ROOT}/_tmp" cleanup(){ rm-rf"${_tmp}" } trap"cleanup"EXITSIGINT cleanup mkdir-p"${TMP_DIFFROOT}" cp-a"${DIFFROOT}"/*"${TMP_DIFFROOT}" "${SCRIPT_ROOT}/hack/update-codegen.sh" echo"diffing${DIFFROOT}againstfreshlygeneratedcodegen" ret=0 diff-Naupr"${DIFFROOT}""${TMP_DIFFROOT}"||ret=$? cp-a"${TMP_DIFFROOT}"/*"${DIFFROOT}" if[[$ret-eq0]] then echo"${DIFFROOT}uptodate." else echo"${DIFFROOT}isoutofdate.Pleaserunhack/update-codegen.sh" exit1 fi
(5)创建boilerplate.go.txt,主要用于为代码添加开源协议
/* CopyrightTheKubernetesAuthors. LicensedundertheApacheLicense,Version2.0(the"License"); youmaynotusethisfileexceptincompliancewiththeLicense. YoumayobtainacopyoftheLicenseat http://www.apache.org/licenses/LICENSE-2.0 Unlessrequiredbyapplicablelaworagreedtoinwriting,software distributedundertheLicenseisdistributedonan"ASIS"BASIS, WITHOUTWARRANTIESORCONDITIONSOFANYKIND,eitherexpressorimplied. SeetheLicenseforthespecificlanguagegoverningpermissionsand limitationsundertheLicense. */
(6)配置go vendor依赖目录
从update-codegen.sh脚本可以看到该代码生成脚本是利用vendor目录下的依赖进行的,我们项目本身没有配置,执行以下命令进行创建。
#gomodvendor
(7)在项目根目录下执行脚本生成代码
#chmod+xhack/update-codegen.sh #./hack/update-codegen.sh Generatingdeepcopyfuncs Generatingclientsetfordatabasemanager:v1alpha1atdatabase-manager-controller/pkg/client/clientset Generatinglistersfordatabasemanager:v1alpha1atdatabase-manager-controller/pkg/client/listers Generatinginformersfordatabasemanager:v1alpha1atdatabase-manager-controller/pkg/client/informers
然后新的目录结构如下:
#treepkg/ pkg/ ├──apis │└──databasemanager │├──register.go │└──v1alpha1 │├──doc.go │├──register.go │├──type.go │└──zz_generated.deepcopy.go └──client ├──clientset │└──versioned │├──clientset.go │├──doc.go │├──fake ││├──clientset_generated.go ││├──doc.go ││└──register.go │├──scheme ││├──doc.go ││└──register.go │└──typed │└──databasemanager │└──v1alpha1 │├──databasemanager_client.go │├──databasemanager.go │├──doc.go │├──fake ││├──doc.go ││├──fake_databasemanager_client.go ││└──fake_databasemanager.go │└──generated_expansion.go ├──informers │└──externalversions │├──databasemanager ││├──interface.go ││└──v1alpha1 ││├──databasemanager.go ││└──interface.go │├──factory.go │├──generic.go │└──internalinterfaces │└──factory_interfaces.go └──listers └──databasemanager └──v1alpha1 ├──databasemanager.go └──expansion_generated.go
Controller开发
上面已经完成了自动代码的生成,生成了informer、lister、clientset的代码,下面就开始编写真正的Controller功能了。
我们需要实现的功能是:
- 创建数据库实例
- 更新数据库实例
- 删除数据库实例
(1)在代码根目录创建controller.go文件,编写如下内容
packagemain import( "context" dbmanagerv1"database-manager-controller/pkg/apis/databasemanager/v1alpha1" clientset"database-manager-controller/pkg/client/clientset/versioned" dbmanagerscheme"database-manager-controller/pkg/client/clientset/versioned/scheme" informers"database-manager-controller/pkg/client/informers/externalversions/databasemanager/v1alpha1" listers"database-manager-controller/pkg/client/listers/databasemanager/v1alpha1" "fmt" "github.com/golang/glog" appsv1"k8s.io/api/apps/v1" corev1"k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" utilruntime"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" appsinformers"k8s.io/client-go/informers/apps/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1"k8s.io/client-go/kubernetes/typed/core/v1" appslisters"k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "time" ) constcontrollerAgentName="database-manager-controller" const( //SuccessSynced用来表示事件被成功同步 SuccessSynced="Synced" //MessageResourceSynced表示事件被触发时的消息信息 MessageResourceSynced="databasemanagersyncedsuccessfully" MessageResourceExists="Resource%qalreadyexistsandisnotmanagedbyDatabaseManager" ErrResourceExists="ErrResourceExists" ) typeControllerstruct{ //kubeclientset是kubernetes的clientset kubeclientsetkubernetes.Interface //dbmanagerclientset是自己定义的APIGroup的clientset dbmanagerclientsetclientset.Interface //deploymentsListerlistdeployment对象 deploymentsListerappslisters.DeploymentLister //deploymentsSynced同步deployment对象 deploymentsSyncedcache.InformerSynced //dbmanagerListerlistdatabasemanager对象 dbmanagerListerlisters.DatabaseManagerLister //dbmanagerSynced同步DatabaseManager对象 dbmanagerSyncedcache.InformerSynced //workqueue限速的队列 workqueueworkqueue.RateLimitingInterface //recorder事件记录器 recorderrecord.EventRecorder } //NewController初始化Controller funcNewController(kubeclientsetkubernetes.Interface,dbmanagerclientsetclientset.Interface, dbmanagerinformerinformers.DatabaseManagerInformer,deploymentInformerappsinformers.DeploymentInformer)*Controller{ utilruntime.Must(dbmanagerscheme.AddToScheme(scheme.Scheme)) glog.V(4).Info("Createeventbroadcaster") //创建eventBroadcaster eventBroadcaster:=record.NewBroadcaster() //保存events到日志 eventBroadcaster.StartLogging(glog.Infof) //上报events到APIServer eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface:kubeclientset.CoreV1().Events("")}) recorder:=eventBroadcaster.NewRecorder(scheme.Scheme,corev1.EventSource{Component:controllerAgentName}) //初始化Controller controller:=&Controller{ kubeclientset:kubeclientset, dbmanagerclientset:dbmanagerclientset, deploymentsLister:deploymentInformer.Lister(), deploymentsSynced:deploymentInformer.Informer().HasSynced, dbmanagerLister:dbmanagerinformer.Lister(), dbmanagerSynced:dbmanagerinformer.Informer().HasSynced, workqueue:workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),"DatabaseManagers"), recorder:recorder, } glog.Info("Startupeventhandlers") //注册EventHandler,分别对于添加、更新、删除事件,具体的操作由事件对应的API将其加入队列中 dbmanagerinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc:controller.enqueueDatabaseManager, UpdateFunc:func(oldObj,newObjinterface{}){ oldDBManager:=oldObj.(*dbmanagerv1.DatabaseManager) newDBManager:=newObj.(*dbmanagerv1.DatabaseManager) ifoldDBManager.ResourceVersion==newDBManager.ResourceVersion{ return } controller.enqueueDatabaseManager(newObj) }, DeleteFunc:controller.enqueueDatabaseManagerForDelete, }) //注册DeploymentEventHandler deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc:controller.handleObject, UpdateFunc:func(old,newinterface{}){ newDepl:=new.(*appsv1.Deployment) oldDepl:=old.(*appsv1.Deployment) ifnewDepl.ResourceVersion==oldDepl.ResourceVersion{ //如果没有改变,就返回 return } controller.handleObject(new) }, DeleteFunc:controller.handleObject, }) returncontroller } //Run启动入口 func(c*Controller)Run(threadinessint,stopCh<-chanstruct{})error{ deferutilruntime.HandleCrash() deferc.workqueue.ShuttingDown() glog.Info("startcontroller,cachesync") //同步缓存数据 ifok:=cache.WaitForCacheSync(stopCh,c.dbmanagerSynced);!ok{ returnfmt.Errorf("failedtowaitforcachestosync") } glog.Info("beginstartworkerthread") //开启work线程 fori:=0;i<threadiness;i++{ gowait.Until(c.runWorker,time.Second,stopCh) } glog.Info("workerthreadstarted!!!!!!") <-stopCh glog.Info("workerthreadstopped!!!!!!") returnnil } //runWorker是一个死循环,会一直调用processNextWorkItem从workqueue中取出数据 func(c*Controller)runWorker(){ forc.processNextWorkItem(){ } } //processNextWorkItem从workqueue中取出数据进行处理 func(c*Controller)processNextWorkItem()bool{ obj,shutdown:=c.workqueue.Get() ifshutdown{ returnfalse } //Wewrapthisblockinafuncsowecandeferc.workqueue.Done. err:=func(objinterface{})error{ deferc.workqueue.Done(obj) varkeystring varokbool ifkey,ok=obj.(string);!ok{ c.workqueue.Forget(obj) runtime.HandleError(fmt.Errorf("expectedstringinworkqueuebutgot%#v",obj)) returnnil } //在syncHandler中处理业务 iferr:=c.syncHandler(key);err!=nil{ returnfmt.Errorf("errorsyncing'%s':%s",key,err.Error()) } c.workqueue.Forget(obj) glog.Infof("Successfullysynced'%s'",key) returnnil }(obj) iferr!=nil{ runtime.HandleError(err) returntrue } returntrue } //syncHandler处理业务Handler func(c*Controller)syncHandler(keystring)error{ //通过split得到namespace和name namespace,name,err:=cache.SplitMetaNamespaceKey(key) iferr!=nil{ runtime.HandleError(fmt.Errorf("invalidresourcekey:%s",key)) returnnil } //从缓存中取对象 dbManager,err:=c.dbmanagerLister.DatabaseManagers(namespace).Get(name) iferr!=nil{ //如果DatabaseManager对象被删除了,就会走到这里 iferrors.IsNotFound(err){ glog.Infof("DatabaseManager对象被删除,请在这里执行实际的删除业务:%s/%s...",namespace,name) returnnil } runtime.HandleError(fmt.Errorf("failedtolistDatabaseManagerby:%s/%s",namespace,name)) returnerr } glog.Infof("这里是databasemanager对象的期望状态:%#v...",dbManager) //获取是否有deploymentName deploymentName:=dbManager.Spec.DeploymentName ifdeploymentName==""{ utilruntime.HandleError(fmt.Errorf("%s:deploymentName不能为空",key)) returnnil } //判断deployment是否在集群中存在 deployment,err:=c.deploymentsLister.Deployments(dbManager.Namespace).Get(deploymentName) iferrors.IsNotFound(err){ //如果没有找到,就创建 deployment,err=c.kubeclientset.AppsV1().Deployments(dbManager.Namespace).Create( context.TODO(),newDeployment(dbManager),metav1.CreateOptions{}) } //如果Create或者Get都出错,则返回 iferr!=nil{ returnerr } //如果这个deployment不是由DatabaseManager控制,应该报告这个事件 if!metav1.IsControlledBy(deployment,dbManager){ msg:=fmt.Sprintf(MessageResourceExists,deployment.Name) c.recorder.Event(dbManager,corev1.EventTypeWarning,ErrResourceExists,msg) returnfmt.Errorf("%s",msg) } //如果replicas和期望的不等,则更新deployment ifdbManager.Spec.Replicas!=nil&&*dbManager.Spec.Replicas!=*deployment.Spec.Replicas{ klog.V(4).Infof("DatabaseManager%sreplicas:%d,deploymentreplicas:%d",name,*dbManager.Spec.Replicas,*deployment.Spec.Replicas) deployment,err=c.kubeclientset.AppsV1().Deployments(dbManager.Namespace).Update(context.TODO(),newDeployment(dbManager),metav1.UpdateOptions{}) } iferr!=nil{ returnerr } //更新状态 err=c.updateDatabaseManagerStatus(dbManager,deployment) iferr!=nil{ returnerr } glog.Infof("实际状态是从业务层面得到的,此处应该去的实际状态,与期望状态做对比,并根据差异做出响应(新增或者删除)") c.recorder.Event(dbManager,corev1.EventTypeNormal,SuccessSynced,MessageResourceSynced) returnnil } //updateDatabaseManagerStatus更新DatabaseManager状态 func(c*Controller)updateDatabaseManagerStatus(dbmanager*dbmanagerv1.DatabaseManager,deployment*appsv1.Deployment)error{ dbmanagerCopy:=dbmanager.DeepCopy() dbmanagerCopy.Status.AvailableReplicas=deployment.Status.AvailableReplicas _,err:=c.dbmanagerclientset.CoolopsV1alpha1().DatabaseManagers(dbmanager.Namespace).Update(context.TODO(),dbmanagerCopy,metav1.UpdateOptions{}) returnerr } func(c*Controller)handleObject(objinterface{}){ varobjectmetav1.Object varokbool ifobject,ok=obj.(metav1.Object);!ok{ tombstone,ok:=obj.(cache.DeletedFinalStateUnknown) if!ok{ utilruntime.HandleError(fmt.Errorf("errordecodingobject,invalidtype")) return } object,ok=tombstone.Obj.(metav1.Object) if!ok{ utilruntime.HandleError(fmt.Errorf("errordecodingobjecttombstone,invalidtype")) return } klog.V(4).Infof("Recovereddeletedobject'%s'fromtombstone",object.GetName()) } klog.V(4).Infof("Processingobject:%s",object.GetName()) ifownerRef:=metav1.GetControllerOf(object);ownerRef!=nil{ //检查对象是否和DatabaseManager对象关联,如果不是就退出 ifownerRef.Kind!="DatabaseManager"{ return } dbmanage,err:=c.dbmanagerLister.DatabaseManagers(object.GetNamespace()).Get(ownerRef.Name) iferr!=nil{ klog.V(4).Infof("ignoringorphanedobject'%s'ofdatabaseManager'%s'",object.GetSelfLink(),ownerRef.Name) return } c.enqueueDatabaseManager(dbmanage) return } } funcnewDeployment(dbmanager*dbmanagerv1.DatabaseManager)*appsv1.Deployment{ varimagestring varnamestring switchdbmanager.Spec.Dbtype{ case"mysql": image="mysql:5.7" name="mysql" case"mariadb": image="mariadb:10.7.1" name="mariadb" default: image="mysql:5.7" name="mysql" } labels:=map[string]string{ "app":dbmanager.Spec.Dbtype, } return&appsv1.Deployment{ ObjectMeta:metav1.ObjectMeta{ Namespace:dbmanager.Namespace, Name:dbmanager.Name, OwnerReferences:[]metav1.OwnerReference{ *metav1.NewControllerRef(dbmanager,dbmanagerv1.SchemeGroupVersion.WithKind("DatabaseManager")), }, }, Spec:appsv1.DeploymentSpec{ Replicas:dbmanager.Spec.Replicas, Selector:&metav1.LabelSelector{MatchLabels:labels}, Template:corev1.PodTemplateSpec{ ObjectMeta:metav1.ObjectMeta{Labels:labels}, Spec:corev1.PodSpec{ Containers:[]corev1.Container{ { Name:name, Image:image, }, }, }, }, }, } } //数据先放入缓存,再入队列 func(c*Controller)enqueueDatabaseManager(objinterface{}){ varkeystring varerrerror //将对象放入缓存 ifkey,err=cache.MetaNamespaceKeyFunc(obj);err!=nil{ runtime.HandleError(err) return } //将key放入队列 c.workqueue.AddRateLimited(key) } //删除操作 func(c*Controller)enqueueDatabaseManagerForDelete(objinterface{}){ varkeystring varerrerror //从缓存中删除指定对象 key,err=cache.DeletionHandlingMetaNamespaceKeyFunc(obj) iferr!=nil{ runtime.HandleError(err) return } //再将key放入队列 c.workqueue.AddRateLimited(key) }
其主要逻辑和文章开头介绍的Controller实现逻辑一样,其中关键点在于:
- 在NewController方法中,定义了DatabaseManager和Deployment对象的Event Handler,除了同步缓存外,还将对应的Key放入queue中。
- 实际处理业务的方法是syncHandler,可以根据实际请求来编写代码以达到业务需求。
2、在项目根目录下创建main.go,编写入口函数
(1)编写处理系统信号量的Handler
这部分直接使用的demo中的代码【3】
(2)编写入口main函数
packagemain import( "flag" "time" kubeinformers"k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" clientset"database-manager-controller/pkg/client/clientset/versioned" informers"database-manager-controller/pkg/client/informers/externalversions" "database-manager-controller/pkg/signals" ) var( masterURLstring kubeconfigstring ) funcmain(){ //klog.InitFlags(nil) flag.Parse() //设置处理系统信号的Channel stopCh:=signals.SetupSignalHandler() //处理入参 cfg,err:=clientcmd.BuildConfigFromFlags(masterURL,kubeconfig) iferr!=nil{ klog.Fatalf("Errorbuildingkubeconfig:%s",err.Error()) } //初始化kubeClient kubeClient,err:=kubernetes.NewForConfig(cfg) iferr!=nil{ klog.Fatalf("Errorbuildingkubernetesclientset:%s",err.Error()) } //初始化dbmanagerClient dbmanagerClient,err:=clientset.NewForConfig(cfg) iferr!=nil{ klog.Fatalf("Errorbuildingexampleclientset:%s",err.Error()) } kubeInformerFactory:=kubeinformers.NewSharedInformerFactory(kubeClient,time.Second*30) dbmanagerInformerFactory:=informers.NewSharedInformerFactory(dbmanagerClient,time.Second*30) //初始化controller controller:=NewController(kubeClient,dbmanagerClient, dbmanagerInformerFactory.Coolops().V1alpha1().DatabaseManagers(),kubeInformerFactory.Apps().V1().Deployments()) //noticethatthereisnoneedtorunStartmethodsinaseparategoroutine.(i.e.gokubeInformerFactory.Start(stopCh) //Startmethodisnon-blockingandrunsallregisteredinformersinadedicatedgoroutine. kubeInformerFactory.Start(stopCh) dbmanagerInformerFactory.Start(stopCh) iferr=controller.Run(2,stopCh);err!=nil{ klog.Fatalf("Errorrunningcontroller:%s",err.Error()) } } funcinit(){ flag.StringVar(&kubeconfig,"kubeconfig","","Pathtoakubeconfig.Onlyrequiredifout-of-cluster.") flag.StringVar(&masterURL,"master","","TheaddressoftheKubernetesAPIserver.Overridesanyvalueinkubeconfig.Onlyrequiredifout-of-cluster.") }
测试Controller
1、在项目目录下添加一个Makefile
build: echo"builddatabasemanagercontroller" CGO_ENABLED=0GOOS=linuxGOARCH=amd64gobuild.
2、执行make build进行编译
#makebuild echo"builddatabasemanagercontroller" builddatabasemanagercontroller CGO_ENABLED=0GOOS=linuxGOARCH=amd64gobuild.
然后会输出database-manager-controller一个二进制文件。
3、运行controller
#chmod+xdatabase-manager-controller #./database-manager-controller-kubeconfig=$HOME/.kube/config-alsologtostderr=true I112309:52:41.59572629173controller.go:81]Startupeventhandlers I112309:52:41.59744829173controller.go:120]startcontroller,cachesync I112309:52:41.69971629173controller.go:125]beginstartworkerthread I112309:52:41.69973729173controller.go:130]workerthreadstarted!!!!!!
4、创建一个CRD测试用例,观察日志以及是否创建deployment
(1)测试样例如下
#catexample-mysql.yaml apiVersion:coolops.cn/v1alpha1 kind:DatabaseManager metadata: name:example-mysql spec: dbtype:"mysql" deploymentName:"mysql" replicas:1
(2)执行以下命令进行创建,观察日志
#kubectlapply-fexample-mysql.yaml databasemanager.coolops.cn/example-mysqlcreated
可以看到对于的deployment和pod已经创建,不过由于Deployment的配置没有配置完全,mysql没有正常启动。
我们其实是可以看到Controller获取到了事件。
如果我们删除对象,也可以从日志里正常看到响应。
总结
上面就是自定义Controller的整个开发过程,相对来说还是比较简单,大部分东西社区都做好了,我们只需要套模子,然后实现自己的逻辑就行。
整个过程主要是参考sample-controller【3】 ,现在简单整理如下:
- 确定好目的,然后创建CRD,定义需要的对象
- 按规定编写代码,定义好CRD所需要的type,然后使用code-generator进行代码自动生成,生成需要的informer、lister、clientset。
- 编写Controller,实现具体的业务逻辑
- 编写完成后就是验证,看看是否符合预期,根据具体情况再做进一步的调整
转载请注明:IT运维空间 » 运维技术 » Kubernetes中自定义Controller
发表评论