Kubernetes Project Exploration, Part 2 - kubectl/kube-apiserver mechanism analysis and source code walk-through
Introduction
kubectl and kube-apiserver together form the client-server communication model of Kubernetes. As the interface of k8s, they are a natural entry point for diving into the whole system.
kubectl
kubectl, the command-line client of k8s, is usually the first tool people use to get in touch with k8s. In essence it is just an HTTP client requesting different APIs and getting results from the k8s API server.
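For instance, listing the pods in the default namespace boils down to a single HTTP GET against the API server. The following is a minimal sketch of that request; the server address and token are illustrative placeholders, and in reality kubectl reads them from the kubeconfig and verifies the server certificate:
package main

import (
	"crypto/tls"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Illustrative values; kubectl loads these from the kubeconfig file.
	const apiServer = "https://127.0.0.1:6443"
	const token = "<bearer-token>"

	req, err := http.NewRequest("GET", apiServer+"/api/v1/namespaces/default/pods", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer "+token)

	// Skipping certificate verification only to keep the sketch short.
	client := &http.Client{Transport: &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // a JSON PodList, the same data kubectl renders as a table
}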
Project Structure
In the good old days, when the k8s project was relatively small, the kubectl source code lived directly inside the main kubernetes project.
To make the projects more modular, kubectl was recently factored out of the main k8s project and became a stand-alone project, kubernetes/kubectl. The kubernetes/pkg/kubectl/
directory now contains only code that forwards logic to the kubectl library.
The kubectl project structure is as follows:
staging/src/k8s.io/kubectl/pkg
├── apply
├── apps
├── cmd
├── describe
├── drain
├── explain
├── generate
├── generated
├── metricsutil
├── polymorphichelpers
├── proxy
├── rawhttp
├── scale
├── scheme
├── util
└── validation
We can see that the contents of the kubectl project are basically the implementations of the various kubectl commands.
Process of “kubectl get”
Here is what the most common kubectl operation, get, looks like.
First, in the k8s main project, NewCmdGet is called from the kubectl library:
// source: https://github.com/kubernetes/kubernetes/blob/1b32dfdafdcd6cce21415c75385970a9ae5b0f01/pkg/kubectl/cmd/cmd.go#L432
func NewKubectlCommand(in io.Reader, out, err io.Writer) *cobra.Command {
...
groups := templates.CommandGroups{
{
Message: "Basic Commands (Intermediate):",
Commands: []*cobra.Command{
explain.NewCmdExplain("kubectl", f, ioStreams),
get.NewCmdGet("kubectl", f, ioStreams),
edit.NewCmdEdit(f, ioStreams),
delete.NewCmdDelete(f, ioStreams),
},
},
}
...
templates.ActsAsRootCommand(cmds, filters, groups...)
...
return cmds
}
get is a sub-command of kubectl, so its implementation is similar to NewKubectlCommand:
func NewCmdGet(parent string, f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
o := NewGetOptions(parent, streams)
cmd := &cobra.Command{
Use: "get [(-o|--output=)json|yaml|wide|custom-columns=...|custom-columns-file=...|go-template=...|go-template-file=...|jsonpath=...|jsonpath-file=...] (TYPE[.VERSION][.GROUP] [NAME | -l label] | TYPE[.VERSION][.GROUP]/NAME ...) [flags]",
DisableFlagsInUseLine: true,
Short: i18n.T("Display one or many resources"),
Long: getLong + "\n\n" + cmdutil.SuggestAPIResources(parent),
Example: getExample,
Run: func(cmd *cobra.Command, args []string) {
cmdutil.CheckErr(o.Complete(f, cmd, args))
cmdutil.CheckErr(o.Validate(cmd))
cmdutil.CheckErr(o.Run(f, cmd, args))
},
SuggestFor: []string{"list", "ps"},
}
...
return cmd
}
The actual get operation happens inside the GetOptions.Run function:
//source: https://github.com/kubernetes/kubernetes/blob/b326948a9a317dbc17c6f32dfeea26e090bde3b0/staging/src/k8s.io/kubectl/pkg/cmd/get/get.go#L448
func (o *GetOptions) Run(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
r := f.NewBuilder().
Unstructured().
NamespaceParam(o.Namespace).DefaultNamespace().AllNamespaces(o.AllNamespaces).
FilenameParam(o.ExplicitNamespace, &o.FilenameOptions).
LabelSelectorParam(o.LabelSelector).
FieldSelectorParam(o.FieldSelector).
RequestChunksOf(chunkSize).
ResourceTypeOrNameArgs(true, args...).
ContinueOnError().
Latest().
Flatten().
TransformRequests(o.transformRequests).
Do()
...
infos, err := r.Infos()
...
printer.PrintObj(info.Object, w)
...
We can see that the builder pattern is used: each command-line option corresponds to one step of the build pipeline.
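As a toy illustration of that pattern (made-up types, not the actual cli-runtime code), each chained call records one option and Do finally assembles the request:
package main

import "fmt"

// queryBuilder is a toy stand-in for cli-runtime's resource.Builder,
// meant only to illustrate the chaining style; all names are made up.
type queryBuilder struct {
	namespace string
	selector  string
	types     []string
}

func newBuilder() *queryBuilder { return &queryBuilder{} }

func (b *queryBuilder) NamespaceParam(ns string) *queryBuilder { b.namespace = ns; return b }

func (b *queryBuilder) LabelSelectorParam(s string) *queryBuilder { b.selector = s; return b }

func (b *queryBuilder) ResourceTypeOrNameArgs(args ...string) *queryBuilder {
	b.types = append(b.types, args...)
	return b
}

// Do assembles everything collected so far.
func (b *queryBuilder) Do() string {
	return fmt.Sprintf("GET %v in namespace %q with selector %q", b.types, b.namespace, b.selector)
}

func main() {
	// Roughly what "kubectl get pods -n kube-system -l app=nginx" feeds into the builder.
	fmt.Println(newBuilder().
		NamespaceParam("kube-system").
		LabelSelectorParam("app=nginx").
		ResourceTypeOrNameArgs("pods").
		Do())
}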
In the Do function, the visitor pattern is used. A visitor is responsible for iterating over all resources fetched from the API server.
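The visitor abstraction in k8s.io/cli-runtime is roughly the following; this is a simplified sketch, as the real Info struct also carries the REST client, mapping and fetched object:
package main

import "fmt"

// Simplified from k8s.io/cli-runtime/pkg/resource: a VisitorFunc is applied
// to each resource Info, and a Visitor knows how to walk a set of Infos.
type Info struct {
	Namespace string
	Name      string
}

type VisitorFunc func(*Info, error) error

type Visitor interface {
	Visit(VisitorFunc) error
}

// infoList is a trivial Visitor over an in-memory slice; the Result returned
// by Builder.Do() composes visitors that fetch the Infos from the API server.
type infoList []*Info

func (l infoList) Visit(fn VisitorFunc) error {
	for _, info := range l {
		if err := fn(info, nil); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var r Visitor = infoList{{"default", "nginx"}, {"kube-system", "coredns"}}
	// r.Infos() and the printing step in kubectl boil down to a Visit call like this.
	_ = r.Visit(func(info *Info, err error) error {
		fmt.Printf("%s/%s\n", info.Namespace, info.Name)
		return nil
	})
}
With that shape in mind, here is how the builder constructs the visitors: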
//source: https://github.com/kubernetes/kubernetes/blob/c386fb09a7bde5924a07bd271f6dbb5f4e698aa8/staging/src/k8s.io/cli-runtime/pkg/resource/builder.go#L919
func (b *Builder) visitByResource() *Result {
...
// retrieve one client for each resource
mappings, err := b.resourceTupleMappings()
if err != nil {
result.err = err
return result
}
clients := make(map[string]RESTClient)
for _, mapping := range mappings {
s := fmt.Sprintf("%s/%s", mapping.GroupVersionKind.GroupVersion().String(), mapping.Resource.Resource)
if _, ok := clients[s]; ok {
continue
}
client, err := b.getClient(mapping.GroupVersionKind.GroupVersion())
if err != nil {
result.err = err
return result
}
clients[s] = client
}
items := []Visitor{}
for _, tuple := range b.resourceTuples {
mapping, ok := mappings[tuple.Resource]
if !ok {
return result.withError(fmt.Errorf("resource %q is not recognized: %v", tuple.Resource, mappings))
}
s := fmt.Sprintf("%s/%s", mapping.GroupVersionKind.GroupVersion().String(), mapping.Resource.Resource)
client, ok := clients[s]
if !ok {
return result.withError(fmt.Errorf("could not find a client for resource %q", tuple.Resource))
}
selectorNamespace := b.namespace
info := &Info{
Client: client,
Mapping: mapping,
Namespace: selectorNamespace,
Name: tuple.Name,
}
items = append(items, info)
}
...
result.sources = items
return result
}
First a RESTClient is retrieved, then this client is used to fetch the resource and save/return the result.
RESTClient is implemented in the client-go library, which is also provided to developers who want to write customized clients of their own.
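For example, a small custom client built on client-go typically looks like this (a sketch; the kubeconfig path is an illustrative assumption, and recent client-go versions take a context in List):
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load the same kubeconfig kubectl uses; the path here is illustrative.
	config, err := clientcmd.BuildConfigFromFlags("", "/home/user/.kube/config")
	if err != nil {
		panic(err)
	}

	// The typed clientset wraps a RESTClient per API group/version.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, p := range pods.Items {
		fmt.Println(p.Name)
	}
}
The RESTClient constructor itself looks like this: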
//source: https://github.com/kubernetes/kubernetes/blob/b1098bd0d53658bfb945e485683d543ab7dc73ba/staging/src/k8s.io/client-go/rest/client.go#L107
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
base := *baseURL
if !strings.HasSuffix(base.Path, "/") {
base.Path += "/"
}
base.RawQuery = ""
base.Fragment = ""
return &RESTClient{
base: &base,
versionedAPIPath: versionedAPIPath,
content: config,
createBackoffMgr: readExpBackoffConfig,
rateLimiter: rateLimiter,
Client: client,
}, nil
}
To build a RESTClient, we need a URL (base + versionedAPIPath) and some configuration (content, rateLimiter, etc.). The client itself is built on top of Go's standard http.Client.
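To make the URL part concrete, here is a small sketch of how base and versionedAPIPath compose into the final request path for a single pod (the host and resource names are illustrative):
package main

import (
	"fmt"
	"net/url"
	"path"
)

func main() {
	// Illustrative base URL and versioned API path, as passed to NewRESTClient.
	base, _ := url.Parse("https://10.0.0.1:6443/")
	versionedAPIPath := "/api/v1"

	// A GET for one pod appends the resource path segments to the versioned path,
	// mirroring how client-go's request builder composes URLs.
	u := *base
	u.Path = path.Join(base.Path, versionedAPIPath, "namespaces", "default", "pods", "nginx")
	fmt.Println(u.String())
	// https://10.0.0.1:6443/api/v1/namespaces/default/pods/nginx
}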
Summary
So that's it. We type a command, it is converted into REST API calls, the result is fetched into a resource struct, and finally it is printed to the terminal. This is the typical workflow of kubectl.
kube-apiserver
kube-apiserver is the core communication gateway between clients and the k8s cluster, as well as between the components inside the cluster.
Overview
Let's quickly review what an HTTP server usually does:
- open a socket and listen for incoming requests;
- route each incoming request to a proper handler;
- the handler processes the request;
- read results from, or save results to, persistent storage.
Kube-apiserver's core logic is no different.
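As a reminder, those steps look like the following with Go's standard net/http; this is a minimal sketch, not kube-apiserver code:
package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	mux := http.NewServeMux()

	// 2. route incoming requests to a proper handler
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		// 3. the handler processes the request
		// 4. a real server would read from / write to persistent storage here
		fmt.Fprintln(w, "ok")
	})

	// 1. open a socket and listen for incoming requests
	log.Fatal(http.ListenAndServe(":8080", mux))
}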
In addition to the above, kube-apiserver provides detailed implementations of authN/authZ/admission control that regulate incoming requests.
In general, kube-apiserver's logic can be divided into two parts: setting up the server and running it. Let's take a look at each:
Server Setup
Before the API server starts serving requests, it has to be properly set up.
What setup does a server need? If we recall the behavior of well-known web servers such as Nginx, there are two kinds of setup: request handlers and various server configurations.
Register Handler
Handlers are installed per API group. The following function installs all handlers of an API group:
//source: https://github.com/kubernetes/kubernetes/blob/4362d613f243a02558f03e90b8fcb58b4c6efb06/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go#L453
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
...
for _, apiGroupInfo := range apiGroupInfos {
if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
return fmt.Errorf("unable to install api resources: %v", err)
}
...
}
return nil
}
After several levels of decomposition, a specific type of API resource is bound to its storage via a handler. This is done in the following function:
//source: https://github.com/kubernetes/kubernetes/blob/c522ee08a3d248ec1097e3673119ffa7a4e1ef7b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go#L97
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
var apiResources []metav1.APIResource
var errors []error
ws := a.newWebService()
// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
paths := make([]string, len(a.group.Storage))
var i int = 0
for path := range a.group.Storage {
paths[i] = path
i++
}
sort.Strings(paths)
for _, path := range paths {
apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
if err != nil {
errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
}
if apiResource != nil {
apiResources = append(apiResources, *apiResource)
}
}
return apiResources, ws, errors
}
The actual CRUD logic is implemented inside the registerResourceHandlers function:
//source: https://github.com/kubernetes/kubernetes/blob/c522ee08a3d248ec1097e3673119ffa7a4e1ef7b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go#L185
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
...
getter, isGetter := storage.(rest.Getter)
...
actions := []action{}
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
...
for _, action := range actions {
...
routes := []*restful.RouteBuilder{}
switch action.Verb {
case "GET":
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
} else {
handler = restfulGetResource(getter, exporter, reqScope)
}
...
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)
...
routes = append(routes, route)
...
}
...
}
Different CRUD actions are registered with different handlers. Together the handlers form routes, which the server uses to dispatch incoming requests.
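The routing layer used here is the go-restful library. A stand-alone route registration with it looks roughly like this (a sketch; the import path and resource path are assumptions and may differ by version):
package main

import (
	"log"
	"net/http"

	restful "github.com/emicklei/go-restful/v3"
)

func getPod(req *restful.Request, resp *restful.Response) {
	// In kube-apiserver the handler produced by restfulGetResource reads the
	// object from storage; here we just echo the path parameter back.
	resp.WriteAsJson(map[string]string{"name": req.PathParameter("name")})
}

func main() {
	ws := new(restful.WebService)
	ws.Path("/api/v1")
	// Equivalent in spirit to ws.GET(action.Path).To(handler) above.
	ws.Route(ws.GET("/namespaces/{namespace}/pods/{name}").To(getPod))
	restful.Add(ws)
	log.Fatal(http.ListenAndServe(":8080", nil))
}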
Register Filter
Each piece of server configuration is defined as a filter. The filters are set up as follows:
//source: https://github.com/kubernetes/kubernetes/blob/d74ab9e1a4929be208d4529fd12b76d3fcd5d546/staging/src/k8s.io/apiserver/pkg/server/config.go#L671
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
failedHandler := genericapifilters.Unauthorized(c.Serializer)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyChecker)
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
handler = genericfilters.WithPanicRecovery(handler)
return handler
}
In general, configuration is done by chaining the various filters together, each one wrapping the next.
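Each filter is simply an http.Handler that wraps the next one. A minimal sketch of the same chaining pattern (with a made-up logging filter, not the real ones):
package main

import (
	"fmt"
	"log"
	"net/http"
)

// withLogging is a toy filter in the same style as WithAuthentication etc.:
// it takes the inner handler and returns a new handler that wraps it.
func withLogging(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		log.Printf("%s %s", r.Method, r.URL.Path)
		next.ServeHTTP(w, r)
	})
}

func main() {
	apiHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	})

	// Build the chain innermost-first, exactly as DefaultBuildHandlerChain does.
	var handler http.Handler = apiHandler
	handler = withLogging(handler)

	log.Fatal(http.ListenAndServe(":8080", handler))
}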
Serve
In essence, serving means providing a proper response to a given request. Let's see what the serving process actually looks like in kube-apiserver:
//source: https://github.com/kubernetes/kubernetes/blob/13b6a929bc945f2bb97dbf7cd7f0fdd02b49bc0f/cmd/kube-apiserver/app/server.go#L161
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
server, err := CreateServerChain(completeOptions, stopCh)
if err != nil {
return err
}
prepared, err := server.PrepareRun()
if err != nil {
return err
}
return prepared.Run(stopCh)
}
The Run function is associated with the cobra.Command that forms the kube-apiserver binary; it provides the main serving logic.
The process is clear from the code: first do the necessary configuration (register handlers, filters, etc.), then some preparation work, and finally run the server.
The prepared server is run as follows:
//source: https://github.com/kubernetes/kubernetes/blob/4362d613f243a02558f03e90b8fcb58b4c6efb06/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go#L316
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
...
// close socket after delayed stopCh
stoppedCh, err := s.NonBlockingRun(delayedStopCh)
if err != nil {
return err
}
<-stopCh
// run shutdown hooks directly. This includes deregistering from the kubernetes endpoint in case of kube-apiserver.
err = s.RunPreShutdownHooks()
if err != nil {
return err
}
...
// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
s.HandlerChainWaitGroup.Wait()
return nil
}
We can see that the server is run in a non-blocking fashion. There are also post-start/pre-shutdown hooks which provide extra customizability, as well as code that ensures a graceful shutdown of the server.
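The same non-blocking run plus graceful shutdown can be sketched with the standard library alone (simplified, without the hook machinery):
package main

import (
	"context"
	"net/http"
	"os"
	"os/signal"
	"time"
)

func main() {
	srv := &http.Server{Addr: ":8080"}

	// Non-blocking run: serve in a separate goroutine, like NonBlockingRun.
	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			panic(err)
		}
	}()

	// Block until a stop signal arrives (the stopCh equivalent).
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt)
	<-stop

	// Graceful shutdown: stop accepting new connections and wait for
	// in-flight requests, bounded by a timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	_ = srv.Shutdown(ctx)
}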
Finally, the core serving logic:
//source: https://github.com/kubernetes/kubernetes/blob/2c3687c255c014f7049eed159de30a82082656b6/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go#L147
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
if s.Listener == nil {
return nil, fmt.Errorf("listener must not be nil")
}
tlsConfig, err := s.tlsConfig(stopCh)
if err != nil {
return nil, err
}
secureServer := &http.Server{
Addr: s.Listener.Addr().String(),
Handler: handler,
MaxHeaderBytes: 1 << 20,
TLSConfig: tlsConfig,
}
...
// use tlsHandshakeErrorWriter to handle messages of tls handshake error
tlsErrorWriter := &tlsHandshakeErrorWriter{os.Stderr}
tlsErrorLogger := log.New(tlsErrorWriter, "", 0)
secureServer.ErrorLog = tlsErrorLogger
klog.Infof("Serving securely on %s", secureServer.Addr)
return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}
//source: https://github.com/kubernetes/kubernetes/blob/2c3687c255c014f7049eed159de30a82082656b6/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go#L207
func RunServer(
server *http.Server,
ln net.Listener,
shutDownTimeout time.Duration,
stopCh <-chan struct{},
) (<-chan struct{}, error) {
...
go func() {
defer utilruntime.HandleCrash()
var listener net.Listener
listener = tcpKeepAliveListener{ln}
if server.TLSConfig != nil {
listener = tls.NewListener(listener, server.TLSConfig)
}
err := server.Serve(listener)
msg := fmt.Sprintf("Stopped listening on %s", ln.Addr().String())
select {
case <-stopCh:
klog.Info(msg)
default:
panic(fmt.Sprintf("%s due to error: %v", msg, err))
}
}()
return stoppedCh, nil
}
Apart from auxiliary code such as the TLS configuration and graceful shutdown, we can see that the core serving logic is straightforward. Go's standard http.Server is used to handle incoming requests: whenever there is a new connection, a new goroutine is created to serve it. No thread pool, no task queue, just plain concurrency.
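Under the hood, the accept loop inside http.Server is essentially the following (a heavily simplified sketch that echoes one line per connection instead of speaking HTTP):
package main

import (
	"bufio"
	"fmt"
	"net"
)

func handle(conn net.Conn) {
	defer conn.Close()
	// Read one line and echo it back; a real HTTP server parses the request here.
	line, _ := bufio.NewReader(conn).ReadString('\n')
	fmt.Fprintf(conn, "you sent: %s", line)
}

func main() {
	ln, err := net.Listen("tcp", ":8080")
	if err != nil {
		panic(err)
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			continue
		}
		// One goroutine per connection: this is the whole "thread pool".
		go handle(conn)
	}
}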
Why so simple? Because goroutines hide most concurrency details from us. Unlike an OS thread, which is primitive and close to the metal, a goroutine is a user-space "green thread" with many powerful features, such as channels for communication and built-in scheduling and multiplexing.
Thus, users can write simple, clean concurrent code without getting trapped in multi-threading messes. That may be why Go is so popular for distributed systems, and why cornerstone projects like Docker and Kubernetes are written in Go.