This article studies the kubelet source code based on Kubernetes v1.22.4.
1. How kubelet works
1) kubelet's core work
The core of kubelet's work is a control loop, SyncLoop (the big circle in the diagram). The events that drive this control loop include: Pod update events, Pod lifecycle changes, the execution period configured by kubelet itself, and periodic cleanup events.
kubelet also maintains many other sub control loops (the small circles in the diagram), named xxxManager. For example, probeManager periodically monitors the health of the containers in a Pod using probes such as livenessProbe and readinessProbe; statusManager maintains status information and updates the Pod status to the APIServer; containerRefManager manages container references and is used to report events such as container creation and failure.
2) CRI and the container runtime
When kubelet invokes the underlying container runtime, it does not call the Docker API directly; instead it goes through a set of gRPC interfaces called CRI (Container Runtime Interface). A CRI shim is responsible for responding to CRI requests and acts as the shim between kubelet and the container project: it implements every interface defined by CRI and translates concrete CRI requests into requests or operations against the backend container project.
Each container project can implement its own CRI shim and handle CRI requests itself. This gives Kubernetes a unified container abstraction layer, so that lower-level container runtimes can plug into Kubernetes freely.
If Docker is used, dockershim handles the CRI requests, assembles them into Docker API requests, and sends them to the Docker Daemon.
The CRI interfaces can be divided into two groups:
- RuntimeService: mainly container-related operations, such as creating, starting and deleting containers, and executing exec commands
- ImageManagerService: mainly container-image-related operations, such as pulling and deleting images
The core CRI methods are shown in the figure below. An important principle in the design of CRI is that the interface itself only concerns containers, not Pods: the CRI design has no interface that directly creates or starts a Pod.
PodSandboxManager contains the RunPodSandbox method. The PodSandbox here does not correspond to the Pod API object in Kubernetes; it only extracts the Pod fields that are relevant to the container runtime, such as HostName, DnsConfig and CgroupParent. In other words, PodSandbox describes the fields that Kubernetes needs in order to map the Pod concept onto the container runtime layer, i.e., a subset of the Pod object.
For example, suppose kubectl run creates a Pod named foo that contains two containers, A and B. With Docker as the runtime, dockershim creates an Infra container (the pause container) named foo, which holds the Pod's Network Namespace.
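To make the RuntimeService/ImageManagerService split concrete, here is a minimal, purely illustrative Go sketch. The interface and method names below are simplified stand-ins chosen for this example, not the actual CRI gRPC definitions (see k8s.io/cri-api for those):

package main

import "fmt"

// Illustrative only: a simplified stand-in for the two CRI service groups.
// RuntimeService-like: sandbox and container lifecycle, exec, etc.
type RuntimeService interface {
    RunPodSandbox(config PodSandboxConfig) (sandboxID string, err error)
    CreateContainer(sandboxID, name, image string) (containerID string, err error)
    StartContainer(containerID string) error
    RemoveContainer(containerID string) error
}

// ImageService-like: image pulls and removals.
type ImageService interface {
    PullImage(image string) error
    RemoveImage(image string) error
}

// PodSandboxConfig mirrors the idea that a sandbox only carries the
// runtime-relevant subset of Pod fields (hostname, DNS config, cgroup parent, ...).
type PodSandboxConfig struct {
    Hostname     string
    DNSConfig    []string
    CgroupParent string
}

func main() {
    fmt.Println("illustrative CRI-style interfaces; see k8s.io/cri-api for the real definitions")
}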
2. kubelet startup process
The Run() method in pkg/kubelet/kubelet.go starts the various kubelet modules. The code is as follows:
// pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    if kl.logServer == nil {
        kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    }
    if kl.kubeClient == nil {
        klog.InfoS("No API server defined - no node status update will be sent")
    }

    // Start the cloud provider sync manager
    if kl.cloudResourceSyncManager != nil {
        go kl.cloudResourceSyncManager.Run(wait.NeverStop)
    }

    // Start the modules that do not depend on the container runtime
    if err := kl.initializeModules(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
        klog.ErrorS(err, "Failed to initialize internal modules")
        os.Exit(1)
    }

    // Start volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

    if kl.kubeClient != nil {
        // Start syncing node status immediately, this may set up things the runtime needs to run.
        // Periodically sync the node status
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
        // Update the pod CIDR and runtime status, and perform the first node status sync
        go kl.fastStatusUpdateOnce()

        // start syncing lease
        // Start the node lease mechanism
        go kl.nodeLeaseController.Run(wait.NeverStop)
    }
    // Periodically update the runtime status
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // Set up iptables util rules
    if kl.makeIPTablesUtilChains {
        kl.initNetworkUtil()
    }

    // Start component sync loops.
    // Start the statusManager
    kl.statusManager.Start()

    // Start syncing RuntimeClasses if enabled.
    // Start the runtimeClassManager
    if kl.runtimeClassManager != nil {
        kl.runtimeClassManager.Start(wait.NeverStop)
    }

    // Start the pod lifecycle event generator.
    // Start pleg, which periodically refreshes the status of all containers from the container runtime
    kl.pleg.Start()
    // Start the kubelet event loop, which keeps watching external data changes and performs the corresponding pod operations
    kl.syncLoop(updates, kl)
}
The main logic of Run() is as follows:
- Call kl.initializeModules() to start the modules that do not depend on the container runtime
- Start the volume manager
- Periodically sync the node status
- Call kl.fastStatusUpdateOnce() to update the pod CIDR and runtime status and perform the first node status sync
- Start the node lease mechanism to sync node leases
- Periodically run kl.updateRuntimeUp() to update the runtime status
- Start statusManager and runtimeClassManager
- Call kl.pleg.Start() to start pleg, which periodically refreshes the status of all containers from the container runtime
- Call kl.syncLoop() to start the kubelet event loop, which keeps watching external data changes and performs the corresponding pod operations
1) The initializeModules() method
// pkg/kubelet/kubelet.go
func (kl *Kubelet) initializeModules() error {
    // Prometheus metrics.
    metrics.Register(
        collectors.NewVolumeStatsCollector(kl),
        collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
    )
    metrics.SetNodeName(kl.nodeName)
    servermetrics.Register()

    // Setup filesystem directories.
    if err := kl.setupDataDirs(); err != nil {
        return err
    }

    // If the container logs directory does not exist, create it.
    if _, err := os.Stat(ContainerLogsDir); err != nil {
        if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
            return fmt.Errorf("failed to create directory %q: %v", ContainerLogsDir, err)
        }
    }

    // Start the image manager.
    kl.imageManager.Start()

    // Start the certificate manager if it was enabled.
    if kl.serverCertificateManager != nil {
        kl.serverCertificateManager.Start()
    }

    // Start out of memory watcher.
    if kl.oomWatcher != nil {
        if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
            return fmt.Errorf("failed to start OOM watcher: %w", err)
        }
    }

    // Start resource analyzer, which refreshes volume stats into the cache
    kl.resourceAnalyzer.Start()

    return nil
}
The main logic of initializeModules() is as follows:
- Start imageManager (actually realImageGCManager)
- Start certificateManager (certificate related)
- Start oomWatcher
- Start the resource analyzer, which refreshes volume stats into the cache
The code of kl.imageManager.Start() is as follows:
// pkg/kubelet/images/image_gc_manager.go
func (im *realImageGCManager) Start() {
    go wait.Until(func() {
        // Initial detection make detected time "unknown" in the past.
        var ts time.Time
        if im.initialized {
            ts = time.Now()
        }
        // Find all images and delete the ones that are no longer in use
        _, err := im.detectImages(ts)
        if err != nil {
            klog.InfoS("Failed to monitor images", "err", err)
        } else {
            im.initialized = true
        }
    }, 5*time.Minute, wait.NeverStop)

    // Start a goroutine periodically updates image cache.
    // Update the image cache
    go wait.Until(func() {
        // Call the CRI interface to get the latest images
        images, err := im.runtime.ListImages()
        if err != nil {
            klog.InfoS("Failed to update image list", "err", err)
        } else {
            im.imageCache.set(images)
        }
    }, 30*time.Second, wait.NeverStop)
}
The Start() method of realImageGCManager starts two goroutines:
- Periodically call detectImages(), which finds all images in use and then deletes the images that are no longer used
- Periodically fetch the latest images and call imageCache.set() to update the image cache
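Both goroutines rely on wait.Until from k8s.io/apimachinery, which runs a function on a fixed period until a stop channel is closed. A small self-contained usage sketch of that pattern (the task body and intervals here are made up for illustration):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})

    // Run the closure every second until stopCh is closed,
    // the same pattern kubelet modules use with wait.NeverStop.
    go wait.Until(func() {
        fmt.Println("periodic task tick")
    }, 1*time.Second, stopCh)

    time.Sleep(3 * time.Second)
    close(stopCh) // stop the periodic task
    time.Sleep(500 * time.Millisecond)
}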
2) The fastStatusUpdateOnce() method
// pkg/kubelet/kubelet.go
func (kl *Kubelet) fastStatusUpdateOnce() {
    for {
        time.Sleep(100 * time.Millisecond)
        node, err := kl.GetNode()
        if err != nil {
            klog.ErrorS(err, "Error getting node")
            continue
        }
        if len(node.Spec.PodCIDRs) != 0 {
            podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
            if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
                klog.ErrorS(err, "Pod CIDR update failed", "CIDR", podCIDRs)
                continue
            }
            // Update the runtime status
            kl.updateRuntimeUp()
            // Sync the node status
            kl.syncNodeStatus()
            return
        }
    }
}
fastStatusUpdateOnce() starts a loop that tries to update the pod CIDR immediately. After updating the pod CIDR it updates the runtime status and syncs the node status. The method returns right after one successful node status sync, and it only runs during kubelet startup.
The code of kl.updateRuntimeUp() is as follows:
// pkg/kubelet/kubelet.go
// On the first run this initializes the runtime-dependent modules, then updates runtimeState
func (kl *Kubelet) updateRuntimeUp() {
    kl.updateRuntimeMux.Lock()
    defer kl.updateRuntimeMux.Unlock()

    // Get the container runtime status
    s, err := kl.containerRuntime.Status()
    if err != nil {
        klog.ErrorS(err, "Container runtime sanity check failed")
        return
    }
    if s == nil {
        klog.ErrorS(nil, "Container runtime status is nil")
        return
    }
    // Periodically log the whole runtime status for debugging.
    klog.V(4).InfoS("Container runtime status", "status", s)
    networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
    // Check whether the network and the runtime are ready
    if networkReady == nil || !networkReady.Status {
        klog.ErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
        kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
    } else {
        // Set nil if the container runtime network is ready.
        kl.runtimeState.setNetworkState(nil)
    }
    // information in RuntimeReady condition will be propagated to NodeReady condition.
    // Get the runtime status
    runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
    // If RuntimeReady is not set or is false, report an error.
    if runtimeReady == nil || !runtimeReady.Status {
        klog.ErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
        kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
        return
    }
    kl.runtimeState.setRuntimeState(nil)
    // Call kl.initializeRuntimeDependentModules to initialize the dependent modules
    kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
    kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
updateRuntimeUp() fetches the container runtime status, then checks from the returned status whether the network and the runtime are ready. It then calls kl.initializeRuntimeDependentModules() to initialize the dependent modules (this starts cadvisor, containerManager, evictionManager, containerLogManager, pluginManager and shutdownManager). Finally it sets the runtime sync time.
3) The syncLoop() method
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    klog.InfoS("Starting kubelet main sync loop")
    // The syncTicker wakes up kubelet to checks if there are any pod workers
    // that need to be sync'd. A one-second period is sufficient because the
    // sync interval is defaulted to 10s.
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    plegCh := kl.pleg.Watch()
    const (
        base   = 100 * time.Millisecond
        max    = 5 * time.Second
        factor = 2
    )
    duration := base
    // Responsible for checking limits in resolv.conf
    // The limits do not have anything to do with individual pods
    // Since this is called in syncLoop, we don't need to call it anywhere else
    if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
        kl.dnsConfigurer.CheckLimitsForResolvConf()
    }

    for {
        if err := kl.runtimeState.runtimeErrors(); err != nil {
            klog.ErrorS(err, "Skipping pod synchronization")
            // exponential backoff
            time.Sleep(duration)
            duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
            continue
        }
        // reset backoff if we have a success
        duration = base

        kl.syncLoopMonitor.Store(kl.clock.Now())
        // Call kl.syncLoopIteration
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}
syncLoop() keeps calling syncLoopIteration() in a loop.
The channels involved in syncLoopIteration() are described in detail later; for now we only look at the processing logic inside syncLoopIteration().
(1) configCh
// pkg/kubelet/kubelet.go
// This method watches multiple channels; whenever any channel has data it hands it to the handler, which dispatches work by calling dispatchWork
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // This channel watches pod changes from 3 different sources (file, http, apiServer).
        // Whenever a source's pod information changes (create/update/delete), the updated
        // pods and the concrete operation appear on this channel.
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", format.Pods(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", format.Pods(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", format.Pods(u.Pods))
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", format.Pods(u.Pods))
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.SET:
            // TODO: Do we want to support this?
            klog.ErrorS(nil, "Kubelet does not support snapshot update")
        default:
            klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
        }

        kl.sourcesReady.AddSource(u.Source)
    ...
    }
    return true
}
configCh is the channel from which configuration events are read. It watches pod changes from 3 different sources (file, http, APIServer); whenever a source's pod information changes (create/update/delete), the updated pods and the concrete operation appear on this channel.
(2) plegCh
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    ...
    case e := <-plegCh:
        if e.Type == pleg.ContainerStarted {
            // record the most recent time we observed a container start for this pod.
            // this lets us selectively invalidate the runtimeCache when processing a delete for this pod
            // to make sure we don't miss handling graceful termination for containers we reported as having started.
            kl.lastContainerStartedTime.Add(e.ID, time.Now())
        }
        if isSyncPodWorthy(e) {
            // PLEG event for a pod; sync it.
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                // If the pod no longer exists, ignore the event.
                klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
            }
        }

        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
    ...
    }
    return true
}
When kl.pleg.Start() is called, relist runs once per second; it generates PodLifecycleEvents from the latest PodStatus and stores them into plegCh.
syncLoop() calls kl.pleg.Watch() to obtain plegCh and passes it to syncLoopIteration(), which consumes the data in plegCh; the handler then dispatches work by calling dispatchWork.
(3) syncCh
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    ...
    case <-syncCh:
        // Sync pods waiting for sync
        podsToSync := kl.getPodsToSync()
        if len(podsToSync) == 0 {
            break
        }
        klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", format.Pods(podsToSync))
        // Sync the latest saved pod state
        handler.HandlePodSyncs(podsToSync)
    ...
    }
    return true
}
syncCh is a timer created inside syncLoop(): a value is pushed onto syncCh once per second, and this case syncs all pods that are waiting to be synced.
(4) kl.livenessManager.Updates(), kl.readinessManager.Updates(), kl.startupManager.Updates()
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    ...
    case update := <-kl.livenessManager.Updates():
        // If the liveness probe fails, the pod status needs to be updated
        if update.Result == proberesults.Failure {
            handleProbeSync(kl, update, handler, "liveness", "unhealthy")
        }
    case update := <-kl.readinessManager.Updates():
        // When the readiness result changes, update the container and pod status
        ready := update.Result == proberesults.Success
        kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

        status := ""
        if ready {
            status = "ready"
        }
        handleProbeSync(kl, update, handler, "readiness", status)
    case update := <-kl.startupManager.Updates():
        // When the startup result changes, update the container and pod status
        started := update.Result == proberesults.Success
        kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)

        status := "unhealthy"
        if started {
            status = "started"
        }
        handleProbeSync(kl, update, handler, "startup", status)
    ...
    }
    return true
}
(5) housekeepingCh
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    ...
    case <-housekeepingCh:
        if !kl.sourcesReady.AllReady() {
            // If the sources aren't ready or volume manager has not yet synced the states,
            // skip housekeeping, as we may accidentally delete pods from unready sources.
            klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
        } else {
            start := time.Now()
            klog.V(4).InfoS("SyncLoop (housekeeping)")
            // Perform cleanup work, including terminating pod workers, deleting unwanted pods,
            // and removing volumes and pod directories
            if err := handler.HandlePodCleanups(); err != nil {
                klog.ErrorS(err, "Failed cleaning pods")
            }
            duration := time.Since(start)
            if duration > housekeepingWarningDuration {
                klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
            }
            klog.V(4).InfoS("SyncLoop (housekeeping) end")
        }
    }
    return true
}
housekeepingCh is also created by syncLoop(); every two seconds it triggers cleanup work, including terminating pod workers, deleting unwanted pods, and removing volumes and pod directories.
syncLoopIteration() listens on the following channels and handles the events from each one differently:
- configCh: watches pod event updates from file, HTTP and APIServer
- plegCh: the pleg sub-module calls relist once per second, generates PodLifecycleEvents from the latest PodStatus and stores them into plegCh
- syncCh: a timer channel that syncs the latest saved pod state every second
- kl.livenessManager.Updates(): when a liveness probe fails, the pod status needs to be updated
- kl.readinessManager.Updates(): when the readiness result changes, the container and pod status are updated
- kl.startupManager.Updates(): when the startup result changes, the container and pod status are updated
- housekeepingCh: triggers pod cleanup work every two seconds
A minimal sketch of this multi-channel select loop is shown below.
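The following sketch is a self-contained, simplified imitation of that pattern. The channel names mirror the list above, but the event types and handling are stand-ins made up for illustration, not the real kubelet types:

package main

import (
    "fmt"
    "time"
)

// Simplified stand-ins for the real kubelet types.
type podUpdate struct{ op, pod string }
type plegEvent struct{ pod string }

// iteration mimics syncLoopIteration: select on several channels and
// dispatch each event to the right handler.
func iteration(configCh <-chan podUpdate, plegCh <-chan plegEvent, syncCh, housekeepingCh <-chan time.Time) bool {
    select {
    case u, open := <-configCh:
        if !open {
            return false // config channel closed: exit the loop
        }
        fmt.Println("config event:", u.op, u.pod)
    case e := <-plegCh:
        fmt.Println("pleg event for pod:", e.pod)
    case <-syncCh:
        fmt.Println("periodic sync tick")
    case <-housekeepingCh:
        fmt.Println("housekeeping tick")
    }
    return true
}

func main() {
    configCh := make(chan podUpdate, 1)
    plegCh := make(chan plegEvent, 1)
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(2 * time.Second)
    defer housekeepingTicker.Stop()

    configCh <- podUpdate{op: "ADD", pod: "default/foo"}
    plegCh <- plegEvent{pod: "default/foo"}

    for i := 0; i < 4; i++ {
        if !iteration(configCh, plegCh, syncTicker.C, housekeepingTicker.C) {
            break
        }
    }
}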
4) Summary
The kubelet startup process is shown in the figure below.
3. Channels involved in the syncLoopIteration() method
1) configCh
The call flow of the configCh-related code is shown in the figure above. The handling of configCh inside syncLoopIteration() was covered earlier; here we look at how kubelet watches the APIServer and writes Pod changes into configCh.
// pkg/kubelet/kubelet.go
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
    manifestURLHeader := make(http.Header)
    if len(kubeCfg.StaticPodURLHeader) > 0 {
        for k, v := range kubeCfg.StaticPodURLHeader {
            for i := range v {
                manifestURLHeader.Add(k, v[i])
            }
        }
    }

    // source of all configuration
    // Initialize config.PodConfig
    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

    // Add the three data sources: file, http and apiServer
    // define file config source
    if kubeCfg.StaticPodPath != "" {
        klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
        config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
    }

    // define url config source
    if kubeCfg.StaticPodURL != "" {
        klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
        config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
    }

    if kubeDeps.KubeClient != nil {
        klog.InfoS("Adding apiserver pod source")
        config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
    }
    return cfg, nil
}
makePodSourceConfig() first initializes config.PodConfig, then adds the three data sources: file, http and APIServer. Calling cfg.Channel() creates the corresponding channel for each source.
(1) NewSourceApiserver()
Let's first look at the part that watches the APIServer. The code of NewSourceApiserver() is as follows:
// pkg/kubelet/config/apiserver.go
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
    // Create a ListWatch that watches pod changes on the current node
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))

    // The Reflector responsible for watching pods at the apiserver should be run only after
    // the node sync with the apiserver has completed.
    klog.InfoS("Waiting for node sync before watching apiserver pods")
    go func() {
        for {
            if nodeHasSynced() {
                klog.V(4).InfoS("node sync completed")
                break
            }
            time.Sleep(WaitForAPIServerSyncPeriod)
            klog.V(4).InfoS("node sync has not completed yet")
        }
        klog.InfoS("Watching apiserver")
        // Once the node sync has completed, call newSourceApiserverFromLW
        newSourceApiserverFromLW(lw, updates)
    }()
}

func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
    send := func(objs []interface{}) {
        var pods []*v1.Pod
        for _, o := range objs {
            pods = append(pods, o.(*v1.Pod))
        }
        // When pod changes for the current node are observed from the apiServer, write them into
        // the channel; the listen() method later receives values from this channel
        updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
    }
    // Use the client-go API to create a reflector
    r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
    go r.Run(wait.NeverStop)
}
newSourceApiserverFromLW() uses the client-go API to create a Reflector; when pod changes for the current node are observed from the APIServer, they are written into the channel.
(2) cfg.Channel()
In makePodSourceConfig(), calling cfg.Channel() creates the channel for each source:
// pkg/kubelet/config/config.go
// Register a dedicated channel for each source
func (c *PodConfig) Channel(source string) chan<- interface{} {
    c.sourcesLock.Lock()
    defer c.sourcesLock.Unlock()
    c.sources.Insert(source)
    // Call c.mux.Channel
    return c.mux.Channel(source)
}

// pkg/util/config/config.go
func (m *Mux) Channel(source string) chan interface{} {
    if len(source) == 0 {
        panic("Channel given an empty name")
    }
    m.sourceLock.Lock()
    defer m.sourceLock.Unlock()
    channel, exists := m.sources[source]
    if exists {
        return channel
    }
    newChannel := make(chan interface{})
    m.sources[source] = newChannel
    // Also start a goroutine that listens for new data on this channel.
    // The channel created here is eventually passed into the send function defined in
    // newSourceApiserverFromLW; when pod changes for the current node are observed from the
    // apiServer they are written into this channel, and listen keeps receiving from it.
    go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
    return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
    for update := range listenChannel {
        // Call the Merge method
        m.merger.Merge(source, update)
    }
}
The channel created in Channel() is eventually passed into the send() function defined in newSourceApiserverFromLW(); when pod changes for the current node are observed from the APIServer, they are written into this channel. The listen() method keeps receiving data from this channel and calls Merge() to process the received data.
// pkg/kubelet/config/config.go
func (s *podStorage) Merge(source string, change interface{}) error {
    s.updateLock.Lock()
    defer s.updateLock.Unlock()

    seenBefore := s.sourcesSeen.Has(source)
    // Classify the pod changes by type
    adds, updates, deletes, removes, reconciles := s.merge(source, change)
    firstSet := !seenBefore && s.sourcesSeen.Has(source)

    // deliver update notifications
    switch s.mode {
    case PodConfigNotificationIncremental:
        // Finally push the pod changes into configCh
        if len(removes.Pods) > 0 {
            s.updates <- *removes
        }
        if len(adds.Pods) > 0 {
            s.updates <- *adds
        }
        if len(updates.Pods) > 0 {
            s.updates <- *updates
        }
        if len(deletes.Pods) > 0 {
            s.updates <- *deletes
        }
        if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
            // Send an empty update when first seeing the source and there are
            // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
            // the source is ready.
            s.updates <- *adds
        }
        // Only add reconcile support here, because kubelet doesn't support Snapshot update now.
        if len(reconciles.Pods) > 0 {
            s.updates <- *reconciles
        }
    ...
    }

    return nil
}
Merge() classifies the pod changes by type and finally pushes the pod change information into configCh. kl.syncLoopIteration() listens on configCh and hands the data to the handler, which dispatches work by calling dispatchWork.
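The PodConfig/Mux structure above is essentially a fan-in: each source gets its own channel, a goroutine per source reads from it, and the merger funnels everything into a single updates channel. A minimal, self-contained sketch of that pattern (the names are illustrative, not the real kubelet types):

package main

import (
    "fmt"
    "sync"
)

// mux gives every source its own channel and merges everything into out.
type mux struct {
    mu      sync.Mutex
    sources map[string]chan string
    out     chan string
}

func newMux() *mux {
    return &mux{sources: map[string]chan string{}, out: make(chan string, 16)}
}

// channelFor registers a dedicated channel for a source and starts a goroutine
// that forwards (source, update) pairs into the shared out channel.
func (m *mux) channelFor(source string) chan<- string {
    m.mu.Lock()
    defer m.mu.Unlock()
    if ch, ok := m.sources[source]; ok {
        return ch
    }
    ch := make(chan string)
    m.sources[source] = ch
    go func() {
        for update := range ch {
            m.out <- source + ": " + update // "merge" step
        }
    }()
    return ch
}

func main() {
    m := newMux()
    api := m.channelFor("api")
    file := m.channelFor("file")

    api <- "ADD pod default/foo"
    file <- "ADD static pod kube-system/etcd"

    for i := 0; i < 2; i++ {
        fmt.Println(<-m.out) // consumer side, analogous to configCh
    }
}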
The flow of writing data into configCh is shown in the figure below.
2) plegCh
The code that initializes and runs pleg is as follows:
// pkg/kubelet/pleg/generic.go
func (g *GenericPLEG) Start() {
    go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
Start() launches a goroutine that runs the relist() method once per second.
// pkg/kubelet/pleg/generic.go
func (g *GenericPLEG) relist() {
    klog.V(5).InfoS("GenericPLEG: Relisting")

    if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
        metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
    }

    timestamp := g.clock.Now()
    defer func() {
        metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
    }()

    // Get all the pods.
    // Call the runtime to get all pods and containers on the current node (eventually via the CRI interface)
    podList, err := g.runtime.GetPods(true)
    if err != nil {
        klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
        return
    }

    g.updateRelistTime(timestamp)

    pods := kubecontainer.Pods(podList)
    // update running pod and container count
    updateRunningPodAndContainerMetrics(pods)
    g.podRecords.setCurrent(pods)

    // Compare the old and the current pods, and generate events.
    eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
    for pid := range g.podRecords {
        oldPod := g.podRecords.getOld(pid)
        pod := g.podRecords.getCurrent(pid)
        // Get all containers in the old and the new pod.
        allContainers := getContainersFromPods(oldPod, pod)
        for _, container := range allContainers {
            // Check whether the container has changed; if so, generate a PodLifecycleEvent
            events := computeEvents(oldPod, pod, &container.ID)
            for _, e := range events {
                updateEvents(eventsByPodID, e)
            }
        }
    }

    var needsReinspection map[types.UID]*kubecontainer.Pod
    if g.cacheEnabled() {
        needsReinspection = make(map[types.UID]*kubecontainer.Pod)
    }

    // If there are events associated with a pod, we should update the
    // podCache.
    // Iterate over all pods that have events
    for pid, events := range eventsByPodID {
        pod := g.podRecords.getCurrent(pid)
        if g.cacheEnabled() {
            // updateCache() will inspect the pod and update the cache. If an
            // error occurs during the inspection, we want PLEG to retry again
            // in the next relist. To achieve this, we do not update the
            // associated podRecord of the pod, so that the change will be
            // detect again in the next relist.
            // TODO: If many pods changed during the same relist period,
            // inspecting the pod and getting the PodStatus to update the cache
            // serially may take a while. We should be aware of this and
            // parallelize if needed.
            if err := g.updateCache(pod, pid); err != nil {
                // Rely on updateCache calling GetPodStatus to log the actual error.
                klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))

                // make sure we try to reinspect the pod during the next relisting
                needsReinspection[pid] = pod

                continue
            } else {
                // this pod was in the list to reinspect and we did so because it had events, so remove it
                // from the list (we don't want the reinspection code below to inspect it a second time in
                // this relist execution)
                delete(g.podsToReinspect, pid)
            }
        }
        // Update the internal storage and send out the events.
        g.podRecords.update(pid)

        // Map from containerId to exit code; used as a temporary cache for lookup
        containerExitCode := make(map[string]int)

        // Iterate over all event changes of this pod
        for i := range events {
            // Filter out events that are not reliable and no other components use yet.
            if events[i].Type == ContainerChanged {
                continue
            }
            select {
            // Push the event into kubelet's plegCh
            case g.eventChannel <- events[i]:
            default:
                metrics.PLEGDiscardEvents.Inc()
                klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
            }
            // Log exit code of containers when they finished in a particular event
            if events[i].Type == ContainerDied {
                // Fill up containerExitCode map for ContainerDied event when first time appeared
                if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
                    // Get updated podStatus
                    status, err := g.cache.Get(pod.ID)
                    if err == nil {
                        for _, containerStatus := range status.ContainerStatuses {
                            containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
                        }
                    }
                }
                if containerID, ok := events[i].Data.(string); ok {
                    if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
                        klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
                    }
                }
            }
        }
    }

    if g.cacheEnabled() {
        // reinspect any pods that failed inspection during the previous relist
        if len(g.podsToReinspect) > 0 {
            klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
            for pid, pod := range g.podsToReinspect {
                if err := g.updateCache(pod, pid); err != nil {
                    // Rely on updateCache calling GetPodStatus to log the actual error.
                    klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
                    needsReinspection[pid] = pod
                }
            }
        }

        // Update the cache timestamp. This needs to happen *after*
        // all pods have been properly updated in the cache.
        g.cache.UpdateTime(timestamp)
    }

    // make sure we retain the list of pods that need reinspecting the next time relist is called
    g.podsToReinspect = needsReinspection
}
The main logic of relist() is as follows:
- Call the runtime to get all pods and containers on the current node (this eventually goes through the CRI interface)
- Iterate over all pods and check whether each container has changed; if so, generate a PodLifecycleEvent
- Iterate over all pods that have events, then iterate over each pod's event changes and push them into kubelet's plegCh
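The heart of relist() is the old-versus-current comparison that turns container state differences into lifecycle events. A toy, self-contained sketch of that idea (the state and event types below are simplified stand-ins for kubelet's podRecords and PodLifecycleEvent):

package main

import "fmt"

type containerState string

const (
    stateRunning containerState = "running"
    stateExited  containerState = "exited"
    stateUnknown containerState = "unknown"
)

type lifecycleEvent struct {
    containerID string
    eventType   string // e.g. "ContainerStarted", "ContainerDied"
}

// diffContainerStates compares the old and current state of every container and
// emits an event for each observed transition, similar in spirit to PLEG's relist.
func diffContainerStates(old, current map[string]containerState) []lifecycleEvent {
    var events []lifecycleEvent
    for id, cur := range current {
        prev, ok := old[id]
        if !ok {
            prev = stateUnknown
        }
        if prev == cur {
            continue // no change, no event
        }
        switch cur {
        case stateRunning:
            events = append(events, lifecycleEvent{id, "ContainerStarted"})
        case stateExited:
            events = append(events, lifecycleEvent{id, "ContainerDied"})
        }
    }
    return events
}

func main() {
    old := map[string]containerState{"c1": stateRunning}
    current := map[string]containerState{"c1": stateExited, "c2": stateRunning}

    for _, e := range diffContainerStates(old, current) {
        fmt.Printf("%s -> %s\n", e.containerID, e.eventType)
    }
}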
3) syncCh
Every Pod that enters syncLoopIteration() eventually ends up in managePodLoop(), and the Pod is then added to the workQueue:
// pkg/kubelet/pod_workers.go
func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
    // Requeue the last update if the last sync returned error.
    switch {
    case syncErr == nil:
        // No error; requeue at the regular resync interval.
        p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
    case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
        // Network is not ready; back off for short period of time and retry as network might be ready soon.
        p.workQueue.Enqueue(pod.UID, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
    default:
        // Error occurred during the sync; back off and then retry.
        p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
    }
    p.completeWorkQueueNext(pod.UID)
}
syncCh is a timer created inside syncLoop(); every second it calls getPodsToSync() to fetch the pods waiting to be synced from the workQueue and syncs them.
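completeWork() requeues each pod with wait.Jitter, which adds up to maxFactor times the base duration of random jitter so that resyncs do not all fire at the same instant. A small usage sketch (the interval and factor here are arbitrary examples):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    resyncInterval := 10 * time.Second

    // wait.Jitter(d, maxFactor) returns a duration in [d, d + maxFactor*d),
    // so with factor 0.5 the result falls between 10s and 15s.
    for i := 0; i < 3; i++ {
        d := wait.Jitter(resyncInterval, 0.5)
        fmt.Println("requeue after:", d)
    }
}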
4) Summary
The kubelet core flow is shown in the figure below.
References
11.深入k8s：kubelet工作原理及其初始化源码分析
45 | 幕后英雄：SIG-Node与CRI
46 | 解读 CRI 与 容器运行时
kubelet启动创建Pod源码分析
kubelet源码分析 syncLoopIteration（一）configCh
kubelet源码分析 syncLoopIteration（二）plegCh、syncCh、relist
kubelet源码分析 housekeeping 定时清理