A Walkthrough of the Pod Creation Source Code

kavin · Ops Technology · 2022-11-17 · 685 views

Hi everyone, I'm Huazai.

I have been working with Kubernetes for more than four years, but mostly just at the level of using it; I am not very familiar with its internals or source code, and I have never been quite clear about the flow and logic behind the commands I run every day. So I recently decided to read through the source code of the various k8s modules: first, to deepen my understanding of each module; second, so that when problems come up I can analyze their root cause with evidence in hand, which is far more convincing; and third, so that I will not be stumped by an interviewer's technical questions if I ever change jobs. So today, let's take a simple look at the source code path for pod creation. Please point out any mistakes, and go easy on me. The k8s source code is available on GitHub; the version used here is 1.21.3.

Main text

1. In the k8s source, the add/update/delete/query handling for pods is driven by syncLoop() in /pkg/kubelet/kubelet.go, as shown below:

// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.InfoS("Starting kubelet main sync loop")

Inside syncLoop(), the actual per-pod work is carried out by kl.syncLoopIteration():

kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
	break
}
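The shape of this loop can be illustrated with a self-contained sketch: a select over event channels, returning false when the config channel closes. The types and names here (PodUpdate, syncLoopIteration, the handler callback) are simplified stand-ins for the real kubelet API, not the actual implementation:

```go
// A minimal sketch of the select-based sync loop. Toy types only:
// the real kubelet also reads housekeeping and PLEG channels.
package main

import "fmt"

// PodUpdate is a toy stand-in for kubetypes.PodUpdate.
type PodUpdate struct {
	Op   string
	Pods []string
}

// syncLoopIteration reads one event and dispatches it to the handler.
// It returns false when the config channel is closed, which is the
// kubelet's signal to exit the sync loop.
func syncLoopIteration(configCh <-chan PodUpdate, syncCh <-chan struct{}, handle func(PodUpdate)) bool {
	select {
	case u, open := <-configCh:
		if !open {
			fmt.Println("update channel is closed, exiting the sync loop")
			return false
		}
		handle(u)
	case <-syncCh:
		// Periodic sync tick: re-sync all pods waiting for sync.
		handle(PodUpdate{Op: "SYNC"})
	}
	return true
}

func main() {
	configCh := make(chan PodUpdate, 1)
	syncCh := make(chan struct{})

	configCh <- PodUpdate{Op: "ADD", Pods: []string{"nginx"}}
	syncLoopIteration(configCh, syncCh, func(u PodUpdate) {
		fmt.Printf("SyncLoop %s: %v\n", u.Op, u.Pods)
	})

	// Closing the config channel ends the loop, as in the kubelet.
	close(configCh)
	for syncLoopIteration(configCh, syncCh, func(PodUpdate) {}) {
	}
}
```

Note that a receive from a closed channel returns immediately with `open == false`, which is why the kubelet can detect a closed update channel inside select.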

2. syncLoopIteration takes several important arguments, as shown below:

// Arguments:
// 1.  configCh:       a channel to read config events from
// 2.  handler:        the SyncHandler to dispatch pods to
// 3.  syncCh:         a channel to read periodic sync events from
// 4.  housekeepingCh: a channel to read housekeeping events from
// 5.  plegCh:         a channel to read PLEG updates from

// * configCh: dispatch the pods for the config change to the appropriate
//   handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * housekeepingCh: trigger cleanup of pods
// * health manager: sync pods that have failed or in which one or more
//   containers have failed health checks

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}

SyncHandler is an interface containing methods for the common pod operations; it is implemented by the kubelet itself, as shown below:

// SyncHandler is an interface implemented by Kubelet, for testability
// pod create, update, delete, ...
type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups() error
}
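This is ordinary Go interface dispatch. A toy stand-in (with a simplified Pod type and only two of the methods) shows the shape; the fakeKubelet below is purely illustrative, since in the real code the Kubelet struct itself is the implementer:

```go
// A toy illustration of the SyncHandler pattern: an interface with
// per-operation callbacks, implemented here by a stub that records calls.
package main

import "fmt"

// Pod is a toy stand-in for v1.Pod.
type Pod struct{ Name string }

// SyncHandler mirrors the shape of the kubelet's interface (simplified).
type SyncHandler interface {
	HandlePodAdditions(pods []*Pod)
	HandlePodRemoves(pods []*Pod)
}

// fakeKubelet records which handler ran, standing in for the real Kubelet.
type fakeKubelet struct{ log []string }

func (k *fakeKubelet) HandlePodAdditions(pods []*Pod) {
	for _, p := range pods {
		k.log = append(k.log, "add:"+p.Name)
	}
}

func (k *fakeKubelet) HandlePodRemoves(pods []*Pod) {
	for _, p := range pods {
		k.log = append(k.log, "remove:"+p.Name)
	}
}

func main() {
	var h SyncHandler = &fakeKubelet{}
	h.HandlePodAdditions([]*Pod{{Name: "nginx"}})
	fmt.Println(h.(*fakeKubelet).log)
}
```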

3. The operations that can be performed on a pod are listed below; each operation maps to a handler method. ADD, for example, is handled by HandlePodAdditions.

// These constants identify the PodOperations that can be made on a pod configuration.
const (
	// SET is the current pod configuration.
	SET PodOperation = iota
	// ADD signifies pods that are new to this source.
	ADD
	// DELETE signifies pods that are gracefully deleted from this source.
	DELETE
	// REMOVE signifies pods that have been removed from this source.
	REMOVE
	// UPDATE signifies pods have been updated in this source.
	UPDATE
	// RECONCILE signifies pods that have unexpected status in this source,
	// kubelet should reconcile status with this source.
	RECONCILE
)


switch u.Op {
case kubetypes.ADD:
	klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
	// After restarting, kubelet will get all existing pods through
	// ADD as if they are new pods. These pods will then go through the
	// admission process and *may* be rejected. This can be resolved
	// once we have checkpointing.
	handler.HandlePodAdditions(u.Pods)
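The iota-based operation enum plus the switch on u.Op is a common Go pattern, and can be reproduced in a self-contained sketch. The types and the dispatch function below are simplified stand-ins, not the real kubetypes API:

```go
// A sketch of an iota enum driving handler dispatch, analogous to the
// kubelet's switch on u.Op.
package main

import "fmt"

type PodOperation int

const (
	SET PodOperation = iota // 0
	ADD                     // 1
	DELETE                  // 2
	REMOVE                  // 3
	UPDATE                  // 4
	RECONCILE               // 5
)

// String gives the enum a printable name.
func (op PodOperation) String() string {
	switch op {
	case SET:
		return "SET"
	case ADD:
		return "ADD"
	case DELETE:
		return "DELETE"
	case REMOVE:
		return "REMOVE"
	case UPDATE:
		return "UPDATE"
	case RECONCILE:
		return "RECONCILE"
	}
	return "UNKNOWN"
}

// dispatch routes an operation to the matching handler name, as
// syncLoopIteration does for each PodUpdate it receives.
func dispatch(op PodOperation, pods []string) string {
	switch op {
	case ADD:
		return fmt.Sprintf("HandlePodAdditions(%v)", pods)
	case UPDATE:
		return fmt.Sprintf("HandlePodUpdates(%v)", pods)
	case DELETE, REMOVE:
		return fmt.Sprintf("HandlePodRemoves(%v)", pods)
	default:
		return "no-op"
	}
}

func main() {
	fmt.Println(ADD, dispatch(ADD, []string{"nginx"}))
}
```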

4. How does HandlePodAdditions actually go about creating a pod? Its main steps are:

1. Sort the pods by creation time:
sort.Sort(sliceutils.PodsByCreationTime(pods))

2. Add the pod to the pod manager. The kubelet relies on the pod manager as
the source of truth for the desired state; if a pod cannot be found in the
pod manager, it has already been deleted in the apiserver and no action
(other than cleanup) is required.
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)

3. Check whether the pod is a static pod:
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)

4. Dispatch the work via dispatchWork:
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)

5. Add the pod to the probe manager for health checking: startup probe,
liveness probe, and readiness probe:
kl.probeManager.AddPod(pod)

And what does dispatchWork do? As follows:

// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
	Pod:        pod,
	MirrorPod:  mirrorPod,
	UpdateType: syncType,
	OnCompleteFunc: func(err error) {
		if err != nil {
			metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
		}
	},
})

And what does UpdatePod() do in turn?

// Creating a new pod worker either means this is a new pod, or that the
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
// comment in syncPod.
go func() {
	defer runtime.HandleCrash()
	p.managePodLoop(podUpdates)
}()

managePodLoop() then performs the sync:

for update := range podUpdates {
	err := func() error {
		podUID := update.Pod.UID
		// This is a blocking call that would return only if the cache
		// has an entry for the pod that is newer than minRuntimeCache-
		// Time. This ensures the worker doesn't start syncing until
		// after the cache is at least newer than the finished time of
		// the previous sync.
		status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
		if err != nil {
			// This is the legacy event thrown by manage pod loop
			// all other events are now dispatched from syncPodFn
			p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
			return err
		}
		// The sync itself happens here
		err = p.syncPodFn(syncPodOptions{
			mirrorPod:      update.MirrorPod,
			pod:            update.Pod,
			podStatus:      status,
			killPodOptions: update.KillPodOptions,
			updateType:     update.UpdateType,
		})
		lastSyncTime = time.Now()
		return err
	}()
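The essence of the pattern above is that each pod gets its own update channel drained by one goroutine, so updates to the same pod are processed serially while different pods sync in parallel. A self-contained sketch (all types and names simplified stand-ins, not the real podWorkers API):

```go
// A minimal sketch of the per-pod worker pattern used by podWorkers /
// managePodLoop: one goroutine and one channel per pod UID.
package main

import (
	"fmt"
	"sync"
)

type podWorkers struct {
	mu      sync.Mutex
	updates map[string]chan string // pod UID -> that pod's update channel
	wg      sync.WaitGroup
	results sync.Map // pod UID -> last processed result
}

func newPodWorkers() *podWorkers {
	return &podWorkers{updates: make(map[string]chan string)}
}

// UpdatePod lazily starts a worker goroutine the first time a pod is seen,
// then queues the update on that pod's channel.
func (p *podWorkers) UpdatePod(uid, update string) {
	p.mu.Lock()
	ch, ok := p.updates[uid]
	if !ok {
		ch = make(chan string, 10)
		p.updates[uid] = ch
		p.wg.Add(1)
		go p.managePodLoop(uid, ch)
	}
	p.mu.Unlock()
	ch <- update
}

// managePodLoop drains the pod's channel, "syncing" each update in order.
func (p *podWorkers) managePodLoop(uid string, updates <-chan string) {
	defer p.wg.Done()
	count := 0
	for u := range updates {
		count++
		p.results.Store(uid, fmt.Sprintf("last=%s n=%d", u, count))
	}
}

// Shutdown closes all channels and waits for the workers to drain them.
func (p *podWorkers) Shutdown() {
	p.mu.Lock()
	for _, ch := range p.updates {
		close(ch)
	}
	p.mu.Unlock()
	p.wg.Wait()
}

func main() {
	w := newPodWorkers()
	w.UpdatePod("pod-1", "create")
	w.UpdatePod("pod-1", "update")
	w.Shutdown()
	v, _ := w.results.Load("pod-1")
	fmt.Println(v)
}
```

The serialization per pod is what lets the real code track lastSyncTime and block on podCache.GetNewerThan safely: there is never more than one sync in flight for a given pod.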

5. Finally, SyncPod() in pkg/kubelet/kuberuntime/kuberuntime_manager.go is called to actually create the pod:

// SyncPod syncs the running pod into the desired pod by executing following steps:
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod()

// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(pod, podStatus)
klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
if podContainerChanges.CreateSandbox {
	ref, err := ref.GetReference(legacyscheme.Scheme, pod)
	if err != nil {
		klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
	}
	if podContainerChanges.SandboxID != "" {
		m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
	} else {
		klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
	}
}

// Step 2: Kill the pod if the sandbox has changed.
if podContainerChanges.KillPod {

// Step 3: kill any running containers in this pod which are not to keep.
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
	klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
	killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
	result.AddSyncResult(killContainerResult)
	if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
		killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
		klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
		return
	}

// Step 4: Create a sandbox for the pod if necessary.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
	var msg string
	var err error

	klog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod))
	createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
	result.AddSyncResult(createSandboxResult)
	podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)

// Step 5: start ephemeral containers
// These are started "prior" to init containers to allow running ephemeral containers even when there
// are errors starting an init container. In practice init containers will start first since ephemeral
// containers cannot be specified on pod creation.
if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
	for _, idx := range podContainerChanges.EphemeralContainersToStart {
		start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
	}
}

// Step 6: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
	// Start the next init container.
	if err := start("init container", containerStartSpec(container)); err != nil {
		return
	}

	// Successfully started the container; clear the entry in the failure
	klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
}

// Step 7: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
	start("container", containerStartSpec(&pod.Spec.Containers[idx]))
}
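The decision logic behind step 1 can be pictured as a pure function: compare the desired pod against the observed status and decide whether the sandbox must be (re)created and which containers need starting. The sketch below is my own simplified reading of computePodActions, with toy types; the real function also handles restart policies, init-container sequencing, and sandbox attempt counts:

```go
// A toy model of the "compute pod actions" step of SyncPod.
package main

import "fmt"

// podActions is a simplified stand-in for kuberuntime's podActions struct.
type podActions struct {
	CreateSandbox     bool
	KillPod           bool
	ContainersToStart []string
}

// computePodActions: if no healthy sandbox exists, everything must be
// (re)created; otherwise only containers missing from the running set start.
func computePodActions(desired []string, sandboxReady bool, running map[string]bool) podActions {
	a := podActions{}
	if !sandboxReady {
		a.CreateSandbox = true
		a.KillPod = true // leftover containers die with the old sandbox
		a.ContainersToStart = desired
		return a
	}
	for _, c := range desired {
		if !running[c] {
			a.ContainersToStart = append(a.ContainersToStart, c)
		}
	}
	return a
}

func main() {
	// Sandbox is healthy and "init" already runs, so only "app" starts.
	a := computePodActions([]string{"init", "app"}, true, map[string]bool{"init": true})
	fmt.Printf("createSandbox=%v start=%v\n", a.CreateSandbox, a.ContainersToStart)
}
```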

6. In addition, the pod worker path takes care of the following:

// create the pod data directories, mount volumes, fetch image pull secrets, ...
newPodWorkers(klet.syncPod ---> pkg/kubelet/kubelet.go)  // via syncPod
kubetypes.SyncPodKill
kubetypes.SyncPodCreate
podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
runnable.Admit
kubetypes.IsStaticPod(pod)
kl.makePodDataDirs(pod)
kl.volumeManager.WaitForAttachAndMount(pod)
kl.getPullSecretsForPod(pod)
kl.containerRuntime.SyncPod (pkg/kubelet/container/runtime.go)
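The pre-start sequence listed above (make data dirs, wait for volumes, fetch pull secrets) is essentially a short-circuiting pipeline: each step must succeed before the runtime's SyncPod is finally called. A toy sketch of that shape, with stand-in step functions (the names are illustrative, not kubelet APIs):

```go
// A short-circuiting pipeline of named pre-start steps.
package main

import (
	"errors"
	"fmt"
)

var errNotReady = errors.New("volume not ready")

type step struct {
	name string
	run  func() error
}

// runPreStartSteps executes steps in order and stops at the first failure,
// returning the name of the step that failed.
func runPreStartSteps(steps []step) (string, error) {
	for _, s := range steps {
		if err := s.run(); err != nil {
			return s.name, err
		}
	}
	return "", nil
}

func main() {
	ok := func() error { return nil }
	steps := []step{
		{"makePodDataDirs", ok},
		{"waitForAttachAndMount", func() error { return errNotReady }},
		{"getPullSecretsForPod", ok}, // never reached in this run
	}
	failed, err := runPreStartSteps(steps)
	fmt.Println(failed, err)
}
```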
