Kubernetes Project Exploration, Part 2 - kubectl/kube-apiserver mechanism analysis and source code walk-through


Introduction

kubectl and kube-apiserver together form the client-server communication model of Kubernetes. As the interface of k8s, they are the natural entry point for diving into the whole system.

kubectl

Kubectl, the command line client of k8s, is usually the first tool people use to get in touch with k8s. In essence it is just an HTTP client that calls different APIs and retrieves results from the k8s API server.

Project Structure

In the good old days, when the k8s project was relatively small, the kubectl source code lived directly inside the main kubernetes project.

To make the project more modular, kubectl was recently factored out of the main k8s repository and became a stand-alone project, kubernetes/kubectl. The kubernetes/pkg/kubectl/ directory now contains only code that forwards to the kubectl library.

The kubectl project structure is as follows:

staging/src/k8s.io/kubectl/pkg
├── apply
├── apps
├── cmd
├── describe
├── drain
├── explain
├── generate
├── generated
├── metricsutil
├── polymorphichelpers
├── proxy
├── rawhttp
├── scale
├── scheme
├── util
└── validation

We can see that the kubectl project consists mostly of the implementations of the various kubectl commands.

Process of “kubectl get”

Here is how the most common kubectl operation, get, works.

First, in the main k8s project, the root kubectl command registers NewCmdGet from the kubectl library:

// source: https://github.com/kubernetes/kubernetes/blob/1b32dfdafdcd6cce21415c75385970a9ae5b0f01/pkg/kubectl/cmd/cmd.go#L432
func NewKubectlCommand(in io.Reader, out, err io.Writer) *cobra.Command {
    ...
    groups := templates.CommandGroups{
		{
			Message: "Basic Commands (Intermediate):",
			Commands: []*cobra.Command{
				explain.NewCmdExplain("kubectl", f, ioStreams),
				get.NewCmdGet("kubectl", f, ioStreams),
				edit.NewCmdEdit(f, ioStreams),
				delete.NewCmdDelete(f, ioStreams),
			},
		},
    }
    ...
    templates.ActsAsRootCommand(cmds, filters, groups...)
    ...
    return cmds
}

get is a sub-command of kubectl, so its implementation follows the same pattern as NewKubectlCommand:

func NewCmdGet(parent string, f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
	o := NewGetOptions(parent, streams)
	cmd := &cobra.Command{
		Use:                   "get [(-o|--output=)json|yaml|wide|custom-columns=...|custom-columns-file=...|go-template=...|go-template-file=...|jsonpath=...|jsonpath-file=...] (TYPE[.VERSION][.GROUP] [NAME | -l label] | TYPE[.VERSION][.GROUP]/NAME ...) [flags]",
		DisableFlagsInUseLine: true,
		Short:                 i18n.T("Display one or many resources"),
		Long:                  getLong + "\n\n" + cmdutil.SuggestAPIResources(parent),
		Example:               getExample,
		Run: func(cmd *cobra.Command, args []string) {
			cmdutil.CheckErr(o.Complete(f, cmd, args))
			cmdutil.CheckErr(o.Validate(cmd))
			cmdutil.CheckErr(o.Run(f, cmd, args))
		},
		SuggestFor: []string{"list", "ps"},
	}
    ...
	return cmd
}
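
For readers unfamiliar with cobra, here is a minimal, self-contained sketch of how a root command and a sub-command fit together. It is not kubectl's actual wiring; mycli and its get command are made up for illustration:

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	// Root command, analogous to NewKubectlCommand.
	root := &cobra.Command{Use: "mycli"}

	// Sub-command, analogous to NewCmdGet: cobra parses the flags/args and
	// the Run callback does the actual work.
	get := &cobra.Command{
		Use:   "get [resource]",
		Short: "Display one or many resources",
		Run: func(cmd *cobra.Command, args []string) {
			fmt.Println("getting:", args)
		},
	}
	root.AddCommand(get)

	if err := root.Execute(); err != nil {
		fmt.Println(err)
	}
}

Running mycli get pods invokes the Run callback with args equal to ["pods"]; kubectl's NewCmdGet uses that callback to chain Complete, Validate, and Run, as shown above.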

The actual get operation happens inside the GetOptions.Run function:

//source: https://github.com/kubernetes/kubernetes/blob/b326948a9a317dbc17c6f32dfeea26e090bde3b0/staging/src/k8s.io/kubectl/pkg/cmd/get/get.go#L448
func (o *GetOptions) Run(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
	r := f.NewBuilder().
		Unstructured().
		NamespaceParam(o.Namespace).DefaultNamespace().AllNamespaces(o.AllNamespaces).
		FilenameParam(o.ExplicitNamespace, &o.FilenameOptions).
		LabelSelectorParam(o.LabelSelector).
		FieldSelectorParam(o.FieldSelector).
		RequestChunksOf(chunkSize).
		ResourceTypeOrNameArgs(true, args...).
		ContinueOnError().
		Latest().
		Flatten().
		TransformRequests(o.transformRequests).
		Do()
	...
	infos, err := r.Infos()
	...
	printer.PrintObj(info.Object, w)
	...
}

We can see that the builder pattern is used here: each command line option corresponds to one step of the build pipeline.
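
As a toy illustration of that chaining style (the method names mirror the real resource.Builder, but the implementation here is invented purely for demonstration):

package main

import "fmt"

// query is a toy stand-in for the state the real resource.Builder accumulates
// (namespace, selectors, resource types, ...).
type query struct {
	namespace string
	selector  string
	types     []string
}

// Builder collects options; every method returns the Builder so calls can chain.
type Builder struct{ q query }

func NewBuilder() *Builder { return &Builder{} }

func (b *Builder) NamespaceParam(ns string) *Builder {
	b.q.namespace = ns
	return b
}

func (b *Builder) LabelSelectorParam(s string) *Builder {
	b.q.selector = s
	return b
}

func (b *Builder) ResourceTypeOrNameArgs(args ...string) *Builder {
	b.q.types = append(b.q.types, args...)
	return b
}

// Do finalizes the pipeline, like resource.Builder.Do returning a *Result.
func (b *Builder) Do() query { return b.q }

func main() {
	q := NewBuilder().
		NamespaceParam("default").
		LabelSelectorParam("app=web").
		ResourceTypeOrNameArgs("pods").
		Do()
	fmt.Printf("%+v\n", q)
}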

In the Do function, the visitor pattern is used: a visitor is responsible for iterating over all the resources fetched from the API server.
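
The real types live in k8s.io/cli-runtime/pkg/resource and are considerably more elaborate; a stripped-down, self-contained sketch of the pattern looks roughly like this:

package main

import "fmt"

// Info is a simplified stand-in for resource.Info: it carries one fetched object.
type Info struct {
	Name string
}

// VisitorFunc is the callback applied to each Info, mirroring resource.VisitorFunc.
type VisitorFunc func(*Info, error) error

// Visitor walks a set of resources and calls fn on each one.
type Visitor interface {
	Visit(fn VisitorFunc) error
}

// infoListVisitor visits a static list of Infos, roughly like Result.sources.
type infoListVisitor struct {
	infos []*Info
}

func (v *infoListVisitor) Visit(fn VisitorFunc) error {
	for _, info := range v.infos {
		if err := fn(info, nil); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var v Visitor = &infoListVisitor{infos: []*Info{{Name: "pod-a"}, {Name: "pod-b"}}}
	// The caller supplies the behavior (here, printing); the visitor supplies iteration.
	_ = v.Visit(func(info *Info, err error) error {
		fmt.Println(info.Name)
		return nil
	})
}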

//source: https://github.com/kubernetes/kubernetes/blob/c386fb09a7bde5924a07bd271f6dbb5f4e698aa8/staging/src/k8s.io/cli-runtime/pkg/resource/builder.go#L919
func (b *Builder) visitByResource() *Result {
    ...
	// retrieve one client for each resource
	mappings, err := b.resourceTupleMappings()
	if err != nil {
		result.err = err
		return result
	}
	clients := make(map[string]RESTClient)
	for _, mapping := range mappings {
		s := fmt.Sprintf("%s/%s", mapping.GroupVersionKind.GroupVersion().String(), mapping.Resource.Resource)
		if _, ok := clients[s]; ok {
			continue
		}
		client, err := b.getClient(mapping.GroupVersionKind.GroupVersion())
		if err != nil {
			result.err = err
			return result
		}
		clients[s] = client
	}
	items := []Visitor{}
	for _, tuple := range b.resourceTuples {
		mapping, ok := mappings[tuple.Resource]
		if !ok {
			return result.withError(fmt.Errorf("resource %q is not recognized: %v", tuple.Resource, mappings))
		}
		s := fmt.Sprintf("%s/%s", mapping.GroupVersionKind.GroupVersion().String(), mapping.Resource.Resource)
		client, ok := clients[s]
		if !ok {
			return result.withError(fmt.Errorf("could not find a client for resource %q", tuple.Resource))
		}
		selectorNamespace := b.namespace
		info := &Info{
			Client:    client,
			Mapping:   mapping,
			Namespace: selectorNamespace,
			Name:      tuple.Name,
		}
		items = append(items, info)
	}
	...
	result.sources = items
	return result
}

First a RESTClient is retrieved for each resource mapping. The client is then used to fetch the resource, and the result is saved and returned.

RESTClient itself is implemented in the client-go library, which is also provided to developers who want to write their own customized clients.

//source: https://github.com/kubernetes/kubernetes/blob/b1098bd0d53658bfb945e485683d543ab7dc73ba/staging/src/k8s.io/client-go/rest/client.go#L107
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
	if len(config.ContentType) == 0 {
		config.ContentType = "application/json"
	}

	base := *baseURL
	if !strings.HasSuffix(base.Path, "/") {
		base.Path += "/"
	}
	base.RawQuery = ""
	base.Fragment = ""

	return &RESTClient{
		base:             &base,
		versionedAPIPath: versionedAPIPath,
		content:          config,
		createBackoffMgr: readExpBackoffConfig,
		rateLimiter:      rateLimiter,

		Client: client,
	}, nil
}

To build a RESTClient we need a URL (base + versionedAPIPath) and some configuration (content type, rate limiter, etc.). The client itself is built on top of Go's standard http.Client.
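
As a rough illustration of how client-go is used outside kubectl, here is a small example (assuming a recent client-go where List takes a context; error handling is kept minimal) that does more or less what kubectl get pods does:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load the same kubeconfig kubectl reads (~/.kube/config).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}

	// The typed clientset wraps RESTClients, one per API group/version.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Roughly what `kubectl get pods` boils down to:
	// GET /api/v1/namespaces/default/pods on the API server.
	pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, p := range pods.Items {
		fmt.Println(p.Name)
	}
}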

Summary

So that's it. We type a command, which is converted into REST API calls; the result is fetched, stored in resource structs, and finally printed to the terminal. This is the typical kubectl workflow.

kube-apiserver

kube-apiserver is the core communication gateway between clients and the k8s cluster, as well as among the components inside the cluster.

Overview

Let's have a quick review of what an HTTP server usually does:

  1. open a socket and listen for incoming requests;
  2. route each incoming request to the proper handler;
  3. let the handler process the request;
  4. read from / save results to persistent storage.

Kube-apiserver's core logic is no different.
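
Steps 1-3 map directly onto Go's standard net/http package; a minimal sketch (step 4 is where kube-apiserver talks to etcd instead):

package main

import (
	"fmt"
	"net/http"
)

func main() {
	mux := http.NewServeMux()

	// Steps 2 and 3: route the request to a handler, which processes it.
	// Step 4 is where kube-apiserver would read from / write to etcd.
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	})

	// Step 1: open a socket and listen for incoming requests.
	http.ListenAndServe(":8080", mux)
}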

Beyond this basic flow, kube-apiserver also provides detailed implementations of authentication, authorization, and admission control that regulate incoming requests.

In general, kube-apiserver's logic can be divided into two parts: setting up the server and running it. Let's take a look at each.

Server Setup

Before the API server starts serving requests, it has to be properly set up.

What setup does a server need? Recalling the behavior of well-known web servers like Nginx, there are two kinds of setup: request handlers and various server configurations.

Register Handler

Handlers are installed per API group. The following function installs all handlers of an API group:

//source: https://github.com/kubernetes/kubernetes/blob/4362d613f243a02558f03e90b8fcb58b4c6efb06/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go#L453
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
	...
	for _, apiGroupInfo := range apiGroupInfos {
		if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
			return fmt.Errorf("unable to install api resources: %v", err)
		}
	...
	}
	return nil
}

After several levels of decomposition, each specific type of API resource is bound to its storage via a handler. This is done through the following function:

//source: https://github.com/kubernetes/kubernetes/blob/c522ee08a3d248ec1097e3673119ffa7a4e1ef7b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go#L97
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
	var apiResources []metav1.APIResource
	var errors []error
	ws := a.newWebService()

	// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
	paths := make([]string, len(a.group.Storage))
	var i int = 0
	for path := range a.group.Storage {
		paths[i] = path
		i++
	}
	sort.Strings(paths)
	for _, path := range paths {
		apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
		if err != nil {
			errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
		}
		if apiResource != nil {
			apiResources = append(apiResources, *apiResource)
		}
	}
	return apiResources, ws, errors
}

The actual CRUD logic is implemented inside the registerResourceHandlers function:

//source: https://github.com/kubernetes/kubernetes/blob/c522ee08a3d248ec1097e3673119ffa7a4e1ef7b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go#L185
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
	...
	getter, isGetter := storage.(rest.Getter)
	...
	actions := []action{}
	actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
	actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
	actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
	...
	for _, action := range actions {
		...
		routes := []*restful.RouteBuilder{}
		switch action.Verb {
		case "GET":
			if isGetterWithOptions {
				handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
			} else {
				handler = restfulGetResource(getter, exporter, reqScope)
			}
			...
			route := ws.GET(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				Writes(producedObject)
			...
			routes = append(routes, route)
			...
	}
	...
}

Each CRUD action is registered with its own handler. Together the handlers form the routes that the server dispatches requests to.
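
To get a feel for what such a registration produces, here is a hedged, self-contained sketch using go-restful, the same routing library the apiserver uses. The path and the getPod handler are made up for illustration, and newer go-restful releases import as github.com/emicklei/go-restful/v3:

package main

import (
	"fmt"
	"net/http"

	"github.com/emicklei/go-restful"
)

// getPod is a made-up handler standing in for restfulGetResource; the real
// handler reads the object from the etcd-backed storage for that resource.
func getPod(req *restful.Request, resp *restful.Response) {
	ns := req.PathParameter("namespace")
	name := req.PathParameter("name")
	fmt.Fprintf(resp, "would return pod %s/%s\n", ns, name)
}

func main() {
	ws := new(restful.WebService)
	ws.Path("/api/v1")

	// The spirit of ws.GET(action.Path).To(handler) above.
	ws.Route(ws.GET("/namespaces/{namespace}/pods/{name}").To(getPod))

	restful.Add(ws)
	http.ListenAndServe(":8080", restful.DefaultContainer)
}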

Register Filter

Each piece of server-wide configuration (authentication, authorization, auditing, CORS, timeouts, and so on) is implemented as a filter. The filters are set up as follows:

//source: https://github.com/kubernetes/kubernetes/blob/d74ab9e1a4929be208d4529fd12b76d3fcd5d546/staging/src/k8s.io/apiserver/pkg/server/config.go#L671
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
	handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
	handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
	handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
	failedHandler := genericapifilters.Unauthorized(c.Serializer)
	failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
	handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
	handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
	handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyChecker)
	handler = genericapifilters.WithWarningRecorder(handler)
	handler = genericapifilters.WithCacheControl(handler)
	handler = genericapifilters.WithRequestReceivedTimestamp(handler)
	handler = genericfilters.WithPanicRecovery(handler)
	return handler
}

In general, configuration is done by chaining these filters together: each filter wraps the next handler, and the filter applied last ends up outermost, so it sees the request first.
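
The pattern itself is ordinary Go HTTP middleware. Here is a minimal sketch with two invented filters, withAuth and withLogging, which stand in for (but are not) the real implementations:

package main

import (
	"fmt"
	"net/http"
)

// withLogging and withAuth have the same shape as the genericapifilters /
// genericfilters functions above: take the next handler, return a new handler
// that wraps it.
func withLogging(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Printf("%s %s\n", r.Method, r.URL.Path)
		next.ServeHTTP(w, r)
	})
}

func withAuth(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Header.Get("Authorization") == "" {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		next.ServeHTTP(w, r)
	})
}

func main() {
	api := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	})

	// The filter applied last is outermost, so a request hits withAuth first,
	// then withLogging, then the API handler -- the same reason authentication
	// is applied after authorization in DefaultBuildHandlerChain yet runs first.
	var handler http.Handler = api
	handler = withLogging(handler)
	handler = withAuth(handler)
	http.ListenAndServe(":8080", handler)
}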

Serve

In essence, serving means producing a proper response for each incoming request. Let's see what the serve process actually looks like in kube-apiserver:

//source: https://github.com/kubernetes/kubernetes/blob/13b6a929bc945f2bb97dbf7cd7f0fdd02b49bc0f/cmd/kube-apiserver/app/server.go#L161
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
	server, err := CreateServerChain(completeOptions, stopCh)
	if err != nil {
		return err
	}

	prepared, err := server.PrepareRun()
	if err != nil {
		return err
	}

	return prepared.Run(stopCh)
}

The Run function is attached to the cobra.Command that forms the kube-apiserver binary; it contains the main serve logic.

The process is clear from the code: first do the necessary configuration (register handlers, filters, etc.), then some preparation work, and finally actually run the server.

The prepared server is run as follows:

//source: https://github.com/kubernetes/kubernetes/blob/4362d613f243a02558f03e90b8fcb58b4c6efb06/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go#L316
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
	...

	// close socket after delayed stopCh
	stoppedCh, err := s.NonBlockingRun(delayedStopCh)
	if err != nil {
		return err
	}

	<-stopCh

	// run shutdown hooks directly. This includes deregistering from the kubernetes endpoint in case of kube-apiserver.
	err = s.RunPreShutdownHooks()
	if err != nil {
		return err
	}
	...
	// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
	s.HandlerChainWaitGroup.Wait()

	return nil
}

We can see that the server is run in a non-blocking fashion. There are also post-start/pre-shutdown hooks that provide extra customizability, as well as code to ensure the server shuts down gracefully.
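
The same non-blocking-run plus graceful-shutdown structure can be sketched with the standard library alone; in this sketch an OS signal stands in for stopCh:

package main

import (
	"context"
	"net/http"
	"os"
	"os/signal"
	"time"
)

func main() {
	server := &http.Server{Addr: ":8080"}

	// Non-blocking run: serve on a separate goroutine, like NonBlockingRun.
	go func() {
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			panic(err)
		}
	}()

	// Block until a stop signal arrives, the moral equivalent of <-stopCh.
	stopCh := make(chan os.Signal, 1)
	signal.Notify(stopCh, os.Interrupt)
	<-stopCh

	// Graceful shutdown: stop accepting new connections and wait (bounded by a
	// timeout) for in-flight requests, analogous to HandlerChainWaitGroup.Wait().
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	server.Shutdown(ctx)
}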

Finally, the core serving logic:

//source: https://github.com/kubernetes/kubernetes/blob/2c3687c255c014f7049eed159de30a82082656b6/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go#L147
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
	if s.Listener == nil {
		return nil, fmt.Errorf("listener must not be nil")
	}

	tlsConfig, err := s.tlsConfig(stopCh)
	if err != nil {
		return nil, err
	}

	secureServer := &http.Server{
		Addr:           s.Listener.Addr().String(),
		Handler:        handler,
		MaxHeaderBytes: 1 << 20,
		TLSConfig:      tlsConfig,
	}
	...
	// use tlsHandshakeErrorWriter to handle messages of tls handshake error
	tlsErrorWriter := &tlsHandshakeErrorWriter{os.Stderr}
	tlsErrorLogger := log.New(tlsErrorWriter, "", 0)
	secureServer.ErrorLog = tlsErrorLogger

	klog.Infof("Serving securely on %s", secureServer.Addr)
	return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}
//source: https://github.com/kubernetes/kubernetes/blob/2c3687c255c014f7049eed159de30a82082656b6/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go#L207
func RunServer(
	server *http.Server,
	ln net.Listener,
	shutDownTimeout time.Duration,
	stopCh <-chan struct{},
) (<-chan struct{}, error) {
	...
	go func() {
		defer utilruntime.HandleCrash()

		var listener net.Listener
		listener = tcpKeepAliveListener{ln}
		if server.TLSConfig != nil {
			listener = tls.NewListener(listener, server.TLSConfig)
		}

		err := server.Serve(listener)

		msg := fmt.Sprintf("Stopped listening on %s", ln.Addr().String())
		select {
		case <-stopCh:
			klog.Info(msg)
		default:
			panic(fmt.Sprintf("%s due to error: %v", msg, err))
		}
	}()
	return stoppedCh, nil
}

Apart from auxiliary code such as TLS configuration and graceful shutdown, the core serving logic is straightforward. Go's standard http.Server is used to handle incoming requests: whenever there is a new connection, a new goroutine is created to serve it. No thread pool, no task queue, just plain concurrency.

Why so simple? Because goroutines hide most concurrency details from us. Unlike an OS thread, which is primitive and close to the metal, a goroutine is a user-space "green thread" backed by powerful runtime features: channels for communication, and a scheduler that multiplexes goroutines onto OS threads.
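
A tiny example of that style: each task runs in its own goroutine and a channel carries the results back, with no explicit locking:

package main

import "fmt"

func main() {
	results := make(chan string)

	// Each "request" gets its own goroutine; the channel carries results back,
	// with no locks, thread pool, or task queue in sight.
	for i := 0; i < 3; i++ {
		go func(id int) {
			results <- fmt.Sprintf("request %d served", id)
		}(i)
	}

	for i := 0; i < 3; i++ {
		fmt.Println(<-results)
	}
}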

Thus, users can write simple, clean concurrent code without getting trapped in the usual multi-threading mess. That may be why Go is so popular for distributed systems, and why cornerstone projects like Docker and Kubernetes are written in it.
