Kubernetes Project Exploration, Part 3 - kubelet mechanism and source code analysis

Overview

Kubelet is the node agent running on each Kubernetes node. It is essentially a pod manager: it controls the lifecycle of the pods assigned to its node.

A PodSpec (usually written as a YAML file) is provided by the client and received through the API server; kubelet then creates or updates the pod on the node it has been scheduled to.
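
As an illustration (not part of the kubelet source), the same spec a client would submit as YAML can be expressed with the Go API types; the names and image below are hypothetical:

//illustrative example only; names and image are made up
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A minimal Pod with a single container, equivalent to a small YAML manifest.
	pod := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{Name: "app", Image: "nginx:1.21"},
			},
		},
	}
	fmt.Println(pod.Name, pod.Spec.Containers[0].Image)
}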

Kubelet acts as a gRPC client communicating with the container runtime, instructing the runtime to perform the actual container operations.

Following is the structure of kubelet:

[Figure: kubelet structure]

Project Architecture

kubelet is one of the core components of Kubernetes. Its source code sits directly inside the main project, under the pkg/kubelet folder.

The following are the important sub-folders of the kubelet project:

server

Kubelet communicates with the control plane through HTTP calls, so kubelet itself runs an HTTP server.
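
As a rough, hypothetical sketch of this role (the real server lives in pkg/kubelet/server), a node agent exposing health and pod-listing endpoints over HTTP could look like this:

//illustrative sketch only, not the actual kubelet server code
package main

import (
	"encoding/json"
	"log"
	"net/http"
)

func main() {
	mux := http.NewServeMux()

	// Health endpoint polled by the control plane and local tooling.
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("ok"))
	})

	// Debug endpoint returning the pods this agent currently manages.
	mux.HandleFunc("/pods", func(w http.ResponseWriter, r *http.Request) {
		json.NewEncoder(w).Encode([]string{"default/nginx", "kube-system/coredns"})
	})

	// 10250 is the port the real kubelet listens on.
	log.Fatal(http.ListenAndServe(":10250", mux))
}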

config

The main input of the kubelet server is the pod “config”. The config folder contains object-oriented abstractions of the different config sources (file, HTTP, API server).

pod/images/network/volumemanager

These folders contain the management code for different aspects of a Pod; together they determine how a Pod is run.

cm

One of the main goals of Kubernetes is resource management. Kubelet must be able to allocate the resources specified in the PodSpec.

In the kubelet project this is implemented inside the cm folder, short for container manager. Resources such as CPU, memory and devices are managed by the ContainerManager.
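
The resources the ContainerManager has to honor are declared per container in the PodSpec. A hedged illustration using the Go API types (the values are made up):

//illustrative example: the resource section of a PodSpec, values are made up
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Requests guide scheduling; limits are enforced on the node by the
	// container manager through cgroups.
	res := v1.ResourceRequirements{
		Requests: v1.ResourceList{
			v1.ResourceCPU:    resource.MustParse("250m"),
			v1.ResourceMemory: resource.MustParse("64Mi"),
		},
		Limits: v1.ResourceList{
			v1.ResourceCPU:    resource.MustParse("500m"),
			v1.ResourceMemory: resource.MustParse("128Mi"),
		},
	}
	fmt.Println(res.Limits.Cpu(), res.Limits.Memory())
}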

container/kuberuntime

These two folders together serve as the interface to the container runtime.

To operate on a container, kubelet calls functions inside the container folder, which in turn call functions inside the kuberuntime folder.

The kuberuntime folder contains the gRPC client code that talks to the underlying container runtime, such as containerd, through the Container Runtime Interface (CRI).

prober/stats/metrics/logs

kubelet provides various ways to increase the observability of managed pods. These folders contain the code that collects information which is later fetched by the control plane.
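
As a hypothetical illustration of what the prober consumes, a liveness probe is declared in the PodSpec like this (the endpoint and timings are made up); kubelet executes it periodically and reports the result as pod status:

//illustrative example: a liveness probe the prober manager would execute
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	probe := v1.Probe{
		InitialDelaySeconds: 5,
		PeriodSeconds:       10,
	}
	// The handler fields are promoted from the embedded handler struct.
	probe.HTTPGet = &v1.HTTPGetAction{Path: "/healthz", Port: intstr.FromInt(8080)}

	fmt.Println(probe.HTTPGet.Path, probe.PeriodSeconds)
}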

Process of creating a Pod

One of the typical use cases of Kubernetes is to start a Pod. The steps are:

  1. The user uses kubectl to ask the API server to create a resource;
  2. The scheduler schedules the Pod to a proper node;
  3. The Pod is started on that node by kubelet;

Kubelet is responsible for step 3 of pod creation. Let’s walk through the code to see how it is done.

start kubelet

//source: https://github.com/kubernetes/kubernetes/blob/75242fce7aa8a8f9e703b8602587900ca5aaf937/cmd/kubelet/app/server.go#L1178
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
	// start the kubelet
	go k.Run(podCfg.Updates())

	// start the kubelet server
	if enableServer {
		go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth,
			enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling, kubeCfg.EnableSystemLogHandler)

	}
	...
}

The main kubelet startup logic is clear; there are two things to do:

  1. Start a goroutine which handles pod updates;
  2. Start the kubelet server, which listens for incoming requests;

The following is the pod update handling logic:

//source: https://github.com/kubernetes/kubernetes/blob/75242fce7aa8a8f9e703b8602587900ca5aaf937/pkg/kubelet/kubelet.go#L1306
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.Fatal(err)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()

		// start syncing lease
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}

	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)

	// Start component sync loops.
	kl.statusManager.Start()
	kl.probeManager.Start()

	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	// Start the pod lifecycle event generator.
	kl.pleg.Start()
	kl.syncLoop(updates, kl)
}

We can see that it starts all kinds of “managers”, which handle concerns such as pod updates, volumes, status, logs, liveness probes, etc.

PodConfig

The specification of pods is stored in a PodConfig struct:

//source: https://github.com/kubernetes/kubernetes/blob/0ed41c3f1036785c6c86dd35d20412c8387cf382/pkg/kubelet/config/config.go#L56
type PodConfig struct {
	pods *podStorage
	mux  *config.Mux

	// the channel of denormalized changes passed to listeners
	updates chan kubetypes.PodUpdate

	// contains the list of all configured sources
	sourcesLock sync.Mutex
	sources     sets.String
}

It holds the current pods (podStorage) and an updates channel through which pod changes are delivered to listeners.

A pod update is represented as follows:

//source: https://github.com/kubernetes/kubernetes/blob/0ed41c3f1036785c6c86dd35d20412c8387cf382/pkg/kubelet/types/pod_update.go#L74
type PodUpdate struct {
	Pods   []*v1.Pod
	Op     PodOperation
	Source string
}

Whenever a Pod is updated, the change is sent through the updates channel and received by a handler, which applies the update on the node.
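
The mechanics can be illustrated with a small, self-contained sketch (simplified types, not the real kubelet code): updates arrive on a channel and a handler goroutine dispatches on the operation type:

//simplified sketch of the updates-channel pattern, not the real kubelet types
package main

import "fmt"

type PodOperation int

const (
	ADD PodOperation = iota
	UPDATE
	REMOVE
)

type PodUpdate struct {
	Pods   []string // the real type carries []*v1.Pod
	Op     PodOperation
	Source string
}

func main() {
	updates := make(chan PodUpdate)
	done := make(chan struct{})

	// Handler goroutine: receives changes and applies them on the node.
	go func() {
		defer close(done)
		for u := range updates {
			switch u.Op {
			case ADD:
				fmt.Printf("add %v from %s\n", u.Pods, u.Source)
			case UPDATE:
				fmt.Printf("update %v from %s\n", u.Pods, u.Source)
			case REMOVE:
				fmt.Printf("remove %v from %s\n", u.Pods, u.Source)
			}
		}
	}()

	updates <- PodUpdate{Pods: []string{"default/nginx"}, Op: ADD, Source: "api"}
	close(updates)
	<-done
}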

PodConfig synchronizer

Among all the handling logic, syncLoop is the main loop processing Pod updates. It drives the running state towards the desired state.

//source: https://github.com/kubernetes/kubernetes/blob/8c724d793370605d0c474eb6e4fb74779212ff1d/pkg/kubelet/kubelet.go#L1785
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.Info("Starting kubelet main sync loop.")
	// The syncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.Errorf("skipping pod synchronization - %v", err)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}

It is essentially an endless loop listening for config updates. It also wakes up periodically to sync the pods to the last known desired state, and backs off exponentially when the runtime reports errors.
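
The backoff in the loop above can be distilled into this small standalone pattern (an illustration, not the original code): every failed iteration doubles the wait, capped at a maximum, and a success resets it:

//standalone illustration of the backoff used in syncLoop
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	for i := 0; i < 8; i++ {
		fmt.Println("would sleep", duration)
		// double the delay on each consecutive failure, never exceeding max
		duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
	}
	// on a successful iteration the loop resets the delay
	duration = base
	fmt.Println("reset to", duration)
}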

Based on the type of Pod update, the event is dispatched to the appropriate handler:

//source: https://github.com/kubernetes/kubernetes/blob/8c724d793370605d0c474eb6e4fb74779212ff1d/pkg/kubelet/kubelet.go#L1859
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.Errorf("Kubelet does not support snapshot update")
		default:
			klog.Errorf("Invalid event type received: %d.", u.Op)
		}
	...
	}
	return true
}

Pod Update Handler

According to the above code, a pod-ADD operation is handled by HandlePodAdditions:

//source: https://github.com/kubernetes/kubernetes/blob/8c724d793370605d0c474eb6e4fb74779212ff1d/pkg/kubelet/kubelet.go#L2010
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
		existingPods := kl.podManager.GetPods()
		// Always add the pod to the pod manager. Kubelet relies on the pod
		// manager as the source of truth for the desired state. If a pod does
		// not exist in the pod manager, it means that it has been deleted in
		// the apiserver and no action (other than cleanup) is required.
		kl.podManager.AddPod(pod)
		...
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
		kl.probeManager.AddPod(pod)
	}
}

The pod is first added to the podManager, which records the desired state. The actual pod creation work is then dispatched to a pod worker and handled asynchronously.

The podWorkers struct has an UpdatePod function which spawns a goroutine per pod to handle the pod update work.

//source: https://github.com/kubernetes/kubernetes/blob/442a69c3bdf6fe8e525b05887e57d89db1e2f3a5/pkg/kubelet/pod_workers.go#L220
go func() {
	defer runtime.HandleCrash()
	p.managePodLoop(podUpdates)
}()
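
The per-pod worker pattern can be sketched in isolation (simplified, hypothetical types rather than the real podWorkers implementation): each pod UID gets its own update channel and goroutine, so pods are synced independently:

//simplified sketch of the per-pod worker pattern, not the real podWorkers code
package main

import (
	"fmt"
	"sync"
)

type workers struct {
	mu      sync.Mutex
	updates map[string]chan string // one channel per pod UID
	wg      sync.WaitGroup
}

// UpdatePod lazily starts a goroutine per pod and forwards the update to it.
func (w *workers) UpdatePod(uid, update string) {
	w.mu.Lock()
	ch, ok := w.updates[uid]
	if !ok {
		ch = make(chan string, 1)
		w.updates[uid] = ch
		w.wg.Add(1)
		go func() {
			defer w.wg.Done()
			for u := range ch {
				// the real worker calls syncPodFn here
				fmt.Println("syncing pod", uid, "->", u)
			}
		}()
	}
	w.mu.Unlock()
	ch <- update
}

func main() {
	w := &workers{updates: map[string]chan string{}}
	w.UpdatePod("uid-1", "create")
	w.UpdatePod("uid-1", "update")
	w.UpdatePod("uid-2", "create")
	w.mu.Lock()
	for _, ch := range w.updates {
		close(ch)
	}
	w.mu.Unlock()
	w.wg.Wait()
}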

A syncPodFn function is used to do the actual pod operation. It is configured when kubelet starts.

//source: https://github.com/kubernetes/kubernetes/blob/442a69c3bdf6fe8e525b05887e57d89db1e2f3a5/pkg/kubelet/pod_workers.go#L175
err = p.syncPodFn(syncPodOptions{
	mirrorPod:      update.MirrorPod,
	pod:            update.Pod,
	podStatus:      status,
	killPodOptions: update.KillPodOptions,
	updateType:     update.UpdateType,
})

//source: https://github.com/kubernetes/kubernetes/blob/8c724d793370605d0c474eb6e4fb74779212ff1d/pkg/kubelet/kubelet.go#L1438
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	...
	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
	...
}

//source: https://github.com/kubernetes/kubernetes/blob/e6c67c32e140f88c923499b2a35fb96b34fdfdd2/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L661
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	...
}

The pod operation is backed by the container runtime. The function that starts a container is as follows:

//source: https://github.com/kubernetes/kubernetes/blob/6d001ebb68efd8a499c07b37b9b59158ca6159c8/pkg/kubelet/kuberuntime/kuberuntime_container.go#L134
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
	container := spec.container

	// Step 1: pull the image.
	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
	if err != nil {
		s, _ := grpcstatus.FromError(err)
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
		return msg, err
	}

	// Step 2: create the container.
	// For a new container, the RestartCount should be 0
	...
	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
	if err != nil {
		s, _ := grpcstatus.FromError(err)
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
		return s.Message(), ErrCreateContainer
	}
	err = m.internalLifecycle.PreStartContainer(pod, container, containerID)

	// Step 3: start the container.
	err = m.runtimeService.StartContainer(containerID)
	if err != nil {
		s, _ := grpcstatus.FromError(err)
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", s.Message())
		return s.Message(), kubecontainer.ErrRunContainer
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, fmt.Sprintf("Started container %s", container.Name))

	// Step 4: execute the post start hook.
	...
}

This is the lowest layer of the pod creation logic inside kubelet. From here, kubelet uses a gRPC (CRI) client to communicate with the container runtime (e.g. containerd) and instructs it to perform the actual container operations.
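
To make that last hop concrete, here is a hedged sketch (not kubelet code) of talking to a CRI runtime directly over its gRPC socket. It assumes the CRI v1alpha2 API and a containerd socket at /run/containerd/containerd.sock, and only queries the runtime version; CreateContainer and StartContainer, as used by startContainer above, live on the same client.

//hedged sketch: a direct CRI client, assuming containerd's default socket path
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Dial the runtime's unix socket; kubelet does the equivalent internally.
	conn, err := grpc.DialContext(ctx, "unix:///run/containerd/containerd.sock",
		grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := runtimeapi.NewRuntimeServiceClient(conn)

	// Version is the simplest CRI call and verifies the runtime is reachable.
	resp, err := client.Version(ctx, &runtimeapi.VersionRequest{})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(resp.RuntimeName, resp.RuntimeVersion)
}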
