Hi everyone, I'm 华仔.
I've been working with Kubernetes for more than four years, but mostly at the level of being able to use it; I'm not very familiar with its internals or source code, nor with the flow and logic behind the commands I run every day. So recently I decided to read through the source of the various k8s modules: first, to deepen my understanding of each module; second, so that when a problem comes up I can dig out its root cause and back my conclusions with evidence; and finally, so that I won't be stumped by interviewers' technical questions if I ever switch jobs. Today, let's take a quick look at the source code behind pod creation. If there are mistakes in this article, please point them out, and go easy on me. The k8s source is available on GitHub; the version I'm reading here is 1.21.3. Also, many of the translated comments are literal or machine translations, so please bear with me.
Main text
1. In the k8s source, the add/update/delete/query handling for pods happens in syncLoop() in /pkg/kubelet/kubelet.go, as shown below:
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    klog.InfoS("Starting kubelet main sync loop")
Inside syncLoop(), the concrete per-pod operations are carried out through kl.syncLoopIteration():
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
    break
}
2. syncLoopIteration takes a few important arguments, as shown below:
// Arguments:
// 1.  configCh:       a channel to read config events from
// 2.  handler:        the SyncHandler to dispatch pods to
// 3.  syncCh:         a channel to read periodic sync events from
// 4.  housekeepingCh: a channel to read housekeeping events from
// 5.  plegCh:         a channel to read PLEG updates from
//
// * configCh: dispatch the pods for the config change to the appropriate
//   handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * housekeepingCh: trigger cleanup of pods
// * health manager: sync pods that have failed or in which one or more
//   containers have failed health checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
            return false
        }
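To make the control flow easier to see, here is a minimal standalone sketch of the same pattern (not the kubelet's actual code): an outer loop that keeps calling an iteration function, and an iteration that blocks on a select over the different event channels and returns false only when the config channel closes. The PodUpdate type and the tickers below are simplified stand-ins I made up for the demo, not the real kubetypes.PodUpdate.

package main

import (
    "fmt"
    "time"
)

// PodUpdate is a made-up stand-in for kubetypes.PodUpdate: one change event from a config source.
type PodUpdate struct {
    Op   string
    Pods []string
}

// syncLoopIteration mirrors the shape of the kubelet function: block on a select over
// the event channels and return false only when the config channel is closed.
func syncLoopIteration(configCh <-chan PodUpdate, syncCh, housekeepingCh <-chan time.Time) bool {
    select {
    case u, open := <-configCh:
        if !open {
            fmt.Println("update channel is closed, exiting the sync loop")
            return false
        }
        fmt.Printf("dispatching %s for pods %v\n", u.Op, u.Pods)
    case <-syncCh:
        fmt.Println("periodic sync: sync all pods waiting for sync")
    case <-housekeepingCh:
        fmt.Println("housekeeping: clean up pods")
    }
    return true
}

// syncLoop is the outer loop: keep iterating until the iteration reports false.
func syncLoop(configCh <-chan PodUpdate) {
    syncTicker := time.NewTicker(100 * time.Millisecond)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(200 * time.Millisecond)
    defer housekeepingTicker.Stop()
    for {
        if !syncLoopIteration(configCh, syncTicker.C, housekeepingTicker.C) {
            break
        }
    }
}

func main() {
    configCh := make(chan PodUpdate, 1)
    go func() {
        configCh <- PodUpdate{Op: "ADD", Pods: []string{"nginx-0"}}
        time.Sleep(300 * time.Millisecond)
        close(configCh)
    }()
    syncLoop(configCh)
}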
SyncHandler is an interface containing methods for the common pod operations. The interface is implemented by the kubelet, as shown below:
// SyncHandler is an interface implemented by Kubelet, for testability
// Pod creation, update, removal, and so on.
type SyncHandler interface {
    HandlePodAdditions(pods []*v1.Pod)
    HandlePodUpdates(pods []*v1.Pod)
    HandlePodRemoves(pods []*v1.Pod)
    HandlePodReconcile(pods []*v1.Pod)
    HandlePodSyncs(pods []*v1.Pod)
    HandlePodCleanups() error
}
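The "for testability" remark is the interesting part: because dispatch goes through an interface, any implementation (including a test double) can be plugged in. Below is a tiny self-contained sketch of that idea; Pod and fakeHandler are made-up stand-ins, not the real *v1.Pod or Kubelet types.

package main

import "fmt"

// Pod is a made-up stand-in for *v1.Pod, just enough to show the dispatch shape.
type Pod struct{ Name string }

// SyncHandler mirrors a subset of the kubelet interface, over the toy Pod type.
type SyncHandler interface {
    HandlePodAdditions(pods []*Pod)
    HandlePodUpdates(pods []*Pod)
    HandlePodRemoves(pods []*Pod)
}

// fakeHandler plays the role of the Kubelet (or a test double) implementing SyncHandler.
type fakeHandler struct{}

func (f *fakeHandler) HandlePodAdditions(pods []*Pod) { logPods("ADD", pods) }
func (f *fakeHandler) HandlePodUpdates(pods []*Pod)   { logPods("UPDATE", pods) }
func (f *fakeHandler) HandlePodRemoves(pods []*Pod)   { logPods("REMOVE", pods) }

func logPods(op string, pods []*Pod) {
    for _, p := range pods {
        fmt.Println(op, p.Name)
    }
}

func main() {
    // The sync loop only depends on the interface, so any implementation can be swapped in.
    var handler SyncHandler = &fakeHandler{}
    handler.HandlePodAdditions([]*Pod{{Name: "nginx-0"}})
}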
3. The operations that can be performed on a pod are listed below; each operation has a corresponding handler method. ADD, for example, ends up calling HandlePodAdditions:
// These constants identify the PodOperations that can be made on a pod configuration.
const (
    // SET is the current pod configuration.
    SET PodOperation = iota
    // ADD signifies pods that are new to this source.
    ADD
    // DELETE signifies pods that are gracefully deleted from this source.
    DELETE
    // REMOVE signifies pods that have been removed from this source.
    REMOVE
    // UPDATE signifies pods have been updated in this source.
    UPDATE
    // RECONCILE signifies pods that have unexpected status in this source,
    // kubelet should reconcile status with this source.
    RECONCILE
)

switch u.Op {
case kubetypes.ADD:
    klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
    // After restarting, kubelet will get all existing pods through
    // ADD as if they are new pods. These pods will then go through the
    // admission process and *may* be rejected. This can be resolved
    // once we have checkpointing.
    handler.HandlePodAdditions(u.Pods)
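The operations are a plain iota enumeration, and syncLoopIteration simply switches on them to pick the handler method. A small standalone sketch of that shape, with made-up types rather than the real kubetypes package:

package main

import "fmt"

// PodOperation copies the iota-enum style of the kubelet constants (toy version).
type PodOperation int

const (
    SET PodOperation = iota // the current pod configuration
    ADD                     // pods that are new to this source
    DELETE                  // pods gracefully deleted from this source
    REMOVE                  // pods removed from this source
    UPDATE                  // pods updated in this source
    RECONCILE               // pods whose status should be reconciled
)

// dispatch mimics the switch in syncLoopIteration: pick a handler per operation.
func dispatch(op PodOperation, pods []string) {
    switch op {
    case ADD:
        fmt.Println("HandlePodAdditions:", pods)
    case UPDATE:
        fmt.Println("HandlePodUpdates:", pods)
    case REMOVE:
        fmt.Println("HandlePodRemoves:", pods)
    default:
        fmt.Println("operation", op, "for pods", pods)
    }
}

func main() {
    dispatch(ADD, []string{"nginx-0"})
    dispatch(REMOVE, []string{"nginx-0"})
}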
4. How does HandlePodAdditions actually create the pod? It mainly performs the following steps:
1. Sort the pods by creation time (see the standalone sketch after this list):

sort.Sort(sliceutils.PodsByCreationTime(pods))

2. Add the pod to the pod manager. The kubelet relies on the pod manager as the source of truth for the desired state: if a pod cannot be found in the pod manager, it has already been deleted in the apiserver and nothing (other than cleanup) needs to be done.

// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)

3. Check whether the pod is a static pod (fetch its mirror pod):

mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)

4. Dispatch the work through dispatchWork:

kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)

5. Add the pod to the probe manager, i.e. health checking, covering the startup, liveness and readiness probes:

kl.probeManager.AddPod(pod)
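Step 1's sort guarantees that older pods are admitted before newer ones. The same ordering can be expressed with sort.Slice from the standard library; the pod struct below is a made-up stand-in carrying only the creation timestamp, not the real *v1.Pod:

package main

import (
    "fmt"
    "sort"
    "time"
)

// pod is a made-up stand-in that carries only what the sort needs.
type pod struct {
    Name              string
    CreationTimestamp time.Time
}

func main() {
    now := time.Now()
    pods := []*pod{
        {Name: "new-pod", CreationTimestamp: now},
        {Name: "old-pod", CreationTimestamp: now.Add(-time.Hour)},
    }
    // Same ordering as sort.Sort(sliceutils.PodsByCreationTime(pods)):
    // oldest first, so pods created earlier get admitted first.
    sort.Slice(pods, func(i, j int) bool {
        return pods[i].CreationTimestamp.Before(pods[j].CreationTimestamp)
    })
    for _, p := range pods {
        fmt.Println(p.Name)
    }
}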
And what does dispatchWork do? As follows:
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
    Pod:        pod,
    MirrorPod:  mirrorPod,
    UpdateType: syncType,
    OnCompleteFunc: func(err error) {
        if err != nil {
            metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
        }
    },
})
So what does UpdatePod() do in turn?
// Creating a new pod worker either means this is a new pod, or that the
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
// comment in syncPod.
go func() {
    defer runtime.HandleCrash()
    p.managePodLoop(podUpdates)
}()
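Putting UpdatePod() and managePodLoop() together: the pod workers keep one goroutine plus one update channel per pod, and the OnCompleteFunc callback passed in from dispatchWork is invoked once a sync finishes. The sketch below is a heavily simplified, hypothetical version of that machinery (string pod names instead of real pod objects, and a fake sync in place of syncPodFn), not the kubelet's actual podWorkers code:

package main

import (
    "fmt"
    "sync"
    "time"
)

// UpdatePodOptions is a cut-down, made-up version of the options struct passed to UpdatePod.
type UpdatePodOptions struct {
    PodName        string
    UpdateType     string
    OnCompleteFunc func(err error)
}

// podWorkers keeps one goroutine and one update channel per pod, the same shape as the kubelet's.
type podWorkers struct {
    mu         sync.Mutex
    podUpdates map[string]chan UpdatePodOptions
    wg         sync.WaitGroup
}

func newPodWorkers() *podWorkers {
    return &podWorkers{podUpdates: map[string]chan UpdatePodOptions{}}
}

func (p *podWorkers) UpdatePod(opts UpdatePodOptions) {
    p.mu.Lock()
    ch, exists := p.podUpdates[opts.PodName]
    if !exists {
        // First update for this pod: create its channel and start its worker goroutine,
        // similar to what the kubelet does when it sees a new pod (or has just restarted).
        ch = make(chan UpdatePodOptions, 1)
        p.podUpdates[opts.PodName] = ch
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            p.managePodLoop(ch)
        }()
    }
    p.mu.Unlock()
    ch <- opts
}

func (p *podWorkers) managePodLoop(updates <-chan UpdatePodOptions) {
    for u := range updates {
        // A real worker would call syncPodFn here; we just simulate the work.
        time.Sleep(10 * time.Millisecond)
        fmt.Printf("synced %s (%s)\n", u.PodName, u.UpdateType)
        if u.OnCompleteFunc != nil {
            u.OnCompleteFunc(nil)
        }
    }
}

func main() {
    w := newPodWorkers()
    w.UpdatePod(UpdatePodOptions{
        PodName:    "nginx-0",
        UpdateType: "create",
        OnCompleteFunc: func(err error) {
            fmt.Println("sync completed, err:", err)
        },
    })
    close(w.podUpdates["nginx-0"])
    w.wg.Wait()
}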
managePodLoop() then performs the sync:
for update := range podUpdates {
    err := func() error {
        podUID := update.Pod.UID
        // This is a blocking call that would return only if the cache
        // has an entry for the pod that is newer than minRuntimeCache
        // Time. This ensures the worker doesn't start syncing until
        // after the cache is at least newer than the finished time of
        // the previous sync.
        status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
        if err != nil {
            // This is the legacy event thrown by manage pod loop
            // all other events are now dispatched from syncPodFn
            p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
            return err
        }
        // The actual sync happens here.
        err = p.syncPodFn(syncPodOptions{
            mirrorPod:      update.MirrorPod,
            pod:            update.Pod,
            podStatus:      status,
            killPodOptions: update.KillPodOptions,
            updateType:     update.UpdateType,
        })
        lastSyncTime = time.Now()
        return err
    }()
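The call worth pausing on is podCache.GetNewerThan(): it blocks until the runtime cache holds a status written after the previous sync finished, so a worker never syncs against stale state. Here is a minimal stand-in cache showing that blocking behaviour with sync.Cond; it is an illustration of the idea, not the kubelet's actual cache implementation:

package main

import (
    "fmt"
    "sync"
    "time"
)

// statusCache is a made-up, minimal cache: GetNewerThan blocks until the entry
// for the pod was written after the given time.
type statusCache struct {
    mu      sync.Mutex
    cond    *sync.Cond
    status  map[string]string
    modTime map[string]time.Time
}

func newStatusCache() *statusCache {
    c := &statusCache{
        status:  map[string]string{},
        modTime: map[string]time.Time{},
    }
    c.cond = sync.NewCond(&c.mu)
    return c
}

// Set stores a status and wakes up any waiters.
func (c *statusCache) Set(uid, s string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.status[uid] = s
    c.modTime[uid] = time.Now()
    c.cond.Broadcast()
}

// GetNewerThan blocks until the entry for uid was written after minTime,
// which is how a worker avoids re-syncing against stale runtime state.
func (c *statusCache) GetNewerThan(uid string, minTime time.Time) string {
    c.mu.Lock()
    defer c.mu.Unlock()
    for !c.modTime[uid].After(minTime) {
        c.cond.Wait()
    }
    return c.status[uid]
}

func main() {
    c := newStatusCache()
    lastSyncTime := time.Now()
    go func() {
        time.Sleep(50 * time.Millisecond)
        c.Set("pod-uid-1", "Running")
    }()
    // Blocks until the goroutine above writes a newer status.
    fmt.Println("status:", c.GetNewerThan("pod-uid-1", lastSyncTime))
}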
5. Finally, the call chain reaches SyncPod() in pkg/kubelet/kuberuntime/kuberuntime_manager.go, which actually creates the pod:
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod()

    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
    if podContainerChanges.CreateSandbox {
        ref, err := ref.GetReference(legacyscheme.Scheme, pod)
        if err != nil {
            klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
        }
        if podContainerChanges.SandboxID != "" {
            m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
        } else {
            klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
        }
    }

    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {

    // Step 3: kill any running containers in this pod which are not to keep.
    for containerID, containerInfo := range podContainerChanges.ContainersToKill {
        klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
        killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
        result.AddSyncResult(killContainerResult)
        if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
            killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
            klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
            return
        }

    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        var msg string
        var err error
        klog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod))
        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
        result.AddSyncResult(createSandboxResult)
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)

    // Step 5: start ephemeral containers
    // These are started "prior" to init containers to allow running ephemeral containers even when there
    // are errors starting an init container. In practice init containers will start first since ephemeral
    // containers cannot be specified on pod creation.
    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
        for _, idx := range podContainerChanges.EphemeralContainersToStart {
            start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
        }
    }

    // Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start("init container", containerStartSpec(container)); err != nil {
            return
        }
        // Successfully started the container; clear the entry in the failure
        klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
    }

    // Step 7: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        start("container", containerStartSpec(&pod.Spec.Containers[idx]))
    }
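Stripped of the details, SyncPod is an ordered sequence of steps where an early failure (for example, failing to create the sandbox or to start the next init container) ends this sync attempt and the remaining steps wait for the next one. A toy step runner illustrating that overall shape (the step names are just labels, and the abort-on-first-failure rule is a simplification, not the exact behaviour of every step):

package main

import (
    "errors"
    "fmt"
)

// step is one stage of a simplified SyncPod-like flow.
type step struct {
    name string
    run  func() error
}

// syncPod runs the stages in order, recording each result and stopping at the first failure.
func syncPod(steps []step) []string {
    var results []string
    for _, s := range steps {
        if err := s.run(); err != nil {
            results = append(results, fmt.Sprintf("%s: FAILED (%v)", s.name, err))
            return results
        }
        results = append(results, s.name+": ok")
    }
    return results
}

func main() {
    ok := func() error { return nil }
    steps := []step{
        {"compute sandbox and container changes", ok},
        {"create sandbox", ok},
        {"start ephemeral containers", ok},
        {"start init containers", func() error { return errors.New("image pull failed") }},
        {"start normal containers", ok}, // never reached once an earlier step fails
    }
    for _, r := range syncPod(steps) {
        fmt.Println(r)
    }
}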
6. In addition, the pod worker does the following preparation work (a small directory-creation sketch follows this list):
# Create the pod data directories, mount volumes, fetch the image pull secrets, and so on
newPodWorkers(klet.syncPod ---> pkg/kubelet/kubelet.go)  // via syncPod
kubetypes.SyncPodKill
kubetypes.SyncPodCreate
podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
runnable.Admit
kubetypes.IsStaticPod(pod)
kl.makePodDataDirs(pod)
kl.volumeManager.WaitForAttachAndMount(pod)
kl.getPullSecretsForPod(pod)
kl.containerRuntime.SyncPod (pkg/kubelet/container/runtime.go)
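As an illustration of the "create the pod data directories" part, here is a hedged sketch of what such preparation could look like. It only mimics the usual /var/lib/kubelet/pods/<pod-UID>/ layout and is not the kubelet's actual makePodDataDirs; the temp root directory and the subdirectory names are assumptions made so the demo can run anywhere:

package main

import (
    "fmt"
    "os"
    "path/filepath"
)

// makePodDataDirs ensures the per-pod directories exist under the kubelet-style root dir.
func makePodDataDirs(rootDir, podUID string) error {
    dirs := []string{
        filepath.Join(rootDir, "pods", podUID),
        filepath.Join(rootDir, "pods", podUID, "volumes"),
        filepath.Join(rootDir, "pods", podUID, "plugins"),
    }
    for _, dir := range dirs {
        if err := os.MkdirAll(dir, 0750); err != nil {
            return fmt.Errorf("creating %s: %w", dir, err)
        }
    }
    return nil
}

func main() {
    // Use a temp dir instead of /var/lib/kubelet so the sketch runs without privileges.
    rootDir, err := os.MkdirTemp("", "kubelet-demo")
    if err != nil {
        panic(err)
    }
    defer os.RemoveAll(rootDir)
    if err := makePodDataDirs(rootDir, "8f2a3d4e-0000-1111-2222-333344445555"); err != nil {
        panic(err)
    }
    fmt.Println("pod data dirs created under", rootDir)
}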