Kubernetes Project Exploration, Part 4 - Kubernetes device plugin framework and implementation of NVIDIA device plugin


Introduction

Kubernetes is largely a resource manager for clusters. Among all hardware resources, only CPU and memory are natively supported.

Since there are many hardware vendors, it is unrealistic to add each device's vendor-specific code to the Kubernetes main project.

Thus, a device plugin framework was developed to provide support for different devices. Hardware vendors implement a "driver" for their devices so that they can be used on a Kubernetes cluster.

Use of device plugin

A device plugin is deployed as a DaemonSet so that it runs on each node. It monitors the devices on its node and collaborates with the kubelet to run containers with those devices enabled.

A sample YAML file for the NVIDIA GPU device plugin is as follows:

apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  name: nvidia-device-plugin
spec:
  template:
    metadata:
      labels:
        name: device-plugin
    spec:
      containers:
        - name: device-plugin-ctr
          image: nvidia/device-plugin:1.0
          volumeMounts:
            - mountPath: /device-plugin
              name: device-plugin
      volumes:
        - name: device-plugin
          hostPath:
            path: /var/lib/kubelet/device-plugins

After the device plugin is deployed, a pod can request the device from the cluster, just like other types of resources. A sample YAML file is as follows:

apiVersion: v1
kind: Pod
metadata:
  name: demo-pod
spec:
  containers:
    - name: demo-container-1
      image: k8s.gcr.io/pause:2.0
      resources:
        limits:
          nvidia.com/gpu: 2

Device plugin framework

The device plugin framework is implemented in the kubelet to manage different device plugins. It does the following work:

  1. Register device plugins.
  2. Allocate devices to containers.

plugin registration

A device plugin has to be registered with the kubelet before it can be used.

The registration communication is done via gRPC. The plugin framework serves as the gRPC server and the device plugin is the client.

The registration service and its protobuf messages are as follows:

//source: https://github.com/kubernetes/kubernetes/blob/296f7c91bb52cd724ce6d6d120d5d41ed459d677/staging/src/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/api.proto#L23
service Registration {
	rpc Register(RegisterRequest) returns (Empty) {}
}

message RegisterRequest {
	// Version of the API the Device Plugin was built against
	string version = 1;
	// Name of the unix socket the device plugin is listening on
	// PATH = path.Join(DevicePluginPath, endpoint)
	string endpoint = 2;
	// Schedulable resource name. As of now it's expected to be a DNS Label
	string resource_name = 3;
	// Options to be communicated with Device Manager
	DevicePluginOptions options = 4;
}

From the RegisterRequest message we can see that an API version, a gRPC endpoint (a unix socket name), a resource name and some options are needed to represent a device plugin.
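From the plugin's side, registration is a single gRPC call to the kubelet's registration socket. Below is a minimal sketch assuming the v1beta1 API and the default socket paths; the function name and "nvidia.sock" endpoint are illustrative, not the NVIDIA plugin's actual code:

//minimal sketch of plugin-side registration (illustrative, v1beta1 API assumed)
package main

import (
	"context"
	"net"
	"time"

	"google.golang.org/grpc"
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

func registerWithKubelet() error {
	// Dial the kubelet's registration socket (a unix socket under /var/lib/kubelet/device-plugins/).
	conn, err := grpc.Dial(pluginapi.KubeletSocket, grpc.WithInsecure(), grpc.WithBlock(),
		grpc.WithTimeout(5*time.Second),
		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
			return net.DialTimeout("unix", addr, timeout)
		}))
	if err != nil {
		return err
	}
	defer conn.Close()

	client := pluginapi.NewRegistrationClient(conn)
	// Tell the kubelet which socket the plugin listens on and which resource it provides.
	_, err = client.Register(context.Background(), &pluginapi.RegisterRequest{
		Version:      pluginapi.Version,
		Endpoint:     "nvidia.sock", // resolved by the kubelet to path.Join(DevicePluginPath, endpoint)
		ResourceName: "nvidia.com/gpu",
	})
	return err
}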

On the kubelet side, registration handling is implemented in the plugin framework:

//source: https://github.com/kubernetes/kubernetes/blob/4eadf404480e0653e29a9367841080d94ea4017c/pkg/kubelet/cm/devicemanager/manager.go#L312
func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
	klog.V(2).Infof("Registering Plugin %s at endpoint %s", pluginName, endpoint)

	e, err := newEndpointImpl(endpoint, pluginName, m.callback)
	if err != nil {
		return fmt.Errorf("failed to dial device plugin with socketPath %s: %v", endpoint, err)
	}

	options, err := e.client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
	if err != nil {
		return fmt.Errorf("failed to get device plugin options: %v", err)
	}

	m.registerEndpoint(pluginName, options, e)
	go m.runEndpoint(pluginName, e)

	return nil
}

In general, an endpoint is registered and run for the plugin. The term endpoint represents a single registered device plugin. Its definition is as follows:

//source: https://github.com/kubernetes/kubernetes/blob/abf87c99c63984ba426239e0aed657bf9a8a9054/pkg/kubelet/cm/devicemanager/endpoint.go#L35
type endpoint interface {
	run()
	stop()
	allocate(devs []string) (*pluginapi.AllocateResponse, error)
	callback(resourceName string, devices []pluginapi.Device)
	...
}

type endpointImpl struct {
	client     pluginapi.DevicePluginClient
	clientConn *grpc.ClientConn

	socketPath   string
	resourceName string

	mutex sync.Mutex
	cb    monitorCallback
}

It contains the methods and fields the plugin framework needs to communicate with the plugin.

device discovery

After a plugin is registered, the kubelet starts watching for device changes. The logic is as follows:

//source: https://github.com/kubernetes/kubernetes/blob/abf87c99c63984ba426239e0aed657bf9a8a9054/pkg/kubelet/cm/devicemanager/endpoint.go#L96
func (e *endpointImpl) run() {
	stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
	for {
		response, err := stream.Recv()
		devs := response.Devices
		var newDevs []pluginapi.Device
		for _, d := range devs {
			newDevs = append(newDevs, *d)
		}

		e.callback(e.resourceName, newDevs)
	}
}

Devices are watched through the ListAndWatch gRPC call. Whenever the device state changes, the update is received by the kubelet and a callback is called to record the device info.
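The registered callback on the kubelet side is the device manager's genericDeviceUpdateCallback. Roughly, it sorts the reported devices into healthy and unhealthy sets and checkpoints the result; here is a simplified sketch, not the verbatim kubelet code:

//simplified sketch of the device manager's callback (not the verbatim kubelet code)
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
	m.mutex.Lock()
	m.healthyDevices[resourceName] = sets.NewString()
	m.unhealthyDevices[resourceName] = sets.NewString()
	for _, dev := range devices {
		// Split the reported devices by their health state.
		if dev.Health == pluginapi.Healthy {
			m.healthyDevices[resourceName].Insert(dev.ID)
		} else {
			m.unhealthyDevices[resourceName].Insert(dev.ID)
		}
	}
	m.mutex.Unlock()
	// Persist the device state so it survives kubelet restarts.
	m.writeCheckpoint()
}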

device allocation

The key purpose of a device plugin is to have devices allocated to a container.

This is done through the endpoint's allocate function:

// allocate issues Allocate gRPC call to the device plugin.
func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
	return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{
		ContainerRequests: []*pluginapi.ContainerAllocateRequest{
			{DevicesIDs: devs},
		},
	})
}

The function sends a gRPC request containing a ContainerAllocateRequest and returns a ContainerAllocateResponse. Both are protobuf messages, defined as follows:

// source: https://github.com/kubernetes/kubernetes/blob/296f7c91bb52cd724ce6d6d120d5d41ed459d677/staging/src/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/api.proto#L162
message ContainerAllocateRequest {
	repeated string devicesIDs = 1;
}
message ContainerAllocateResponse {
  	// List of environment variable to be set in the container to access one of more devices.
	map<string, string> envs = 1;
	// Mounts for the container.
	repeated Mount mounts = 2;
	// Devices for the container.
	repeated DeviceSpec devices = 3;
	// Container annotations to pass to the container runtime
	map<string, string> annotations = 4;
}

The request message is simple: if a container wants to use some devices, it just sends the device IDs to the plugin.

The response message indicates how to use these devices in the container.

DeviceSpec specifies core device attributes, including the host/container paths and permissions.

Mounts are needed so that device drivers and libraries can be mounted into the container.

Environment variables and annotations help with access to the devices inside the container as well.
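For illustration, a GPU plugin might fill in a response roughly like this. This is a sketch with made-up values; NVIDIA_VISIBLE_DEVICES is the variable the NVIDIA container runtime reads to decide which GPUs to expose:

//illustrative sketch of a filled ContainerAllocateResponse (values are made up)
func exampleGPUResponse() *pluginapi.ContainerAllocateResponse {
	return &pluginapi.ContainerAllocateResponse{
		Envs: map[string]string{
			// which GPU(s) the container should see
			"NVIDIA_VISIBLE_DEVICES": "GPU-5a1b2c3d",
		},
		Mounts: []*pluginapi.Mount{
			// driver libraries and binaries made visible to the container
			{ContainerPath: "/usr/local/nvidia", HostPath: "/usr/local/nvidia", ReadOnly: true},
		},
		Devices: []*pluginapi.DeviceSpec{
			// device nodes the runtime should create inside the container
			{ContainerPath: "/dev/nvidia0", HostPath: "/dev/nvidia0", Permissions: "rw"},
		},
		Annotations: map[string]string{},
	}
}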

nvidia device plugin

Introduction

NVIDIA's Kubernetes device plugin is crucial for bringing GPU workloads to Kubernetes. It serves a similar purpose to nvidia-docker.

what nvidia-device-plugin does

In general, nvidia-device-plugin is a GPU device manager for a Kubernetes cluster. It will:

  1. respond to gRPC requests from the plugin framework;
  2. monitor all GPUs on the node;
  3. return devices for allocation.

Project Architecture

server.go

nvidia-device-plugin is basically a gRPC server. server.go implements all RPC functions and the server-related logic.
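Roughly, the server setup amounts to listening on a unix socket under /var/lib/kubelet/device-plugins/ and registering the DevicePlugin service on it. The sketch below is simplified (the socket path is passed as a parameter here; the real server.go also handles restarts and connection checks):

//simplified sketch of the gRPC server setup (not the plugin's exact code)
func (m *NvidiaDevicePlugin) Serve(socket string) error {
	// Remove any stale socket left over from a previous run, then listen on it.
	os.Remove(socket)
	sock, err := net.Listen("unix", socket)
	if err != nil {
		return err
	}

	server := grpc.NewServer()
	// Expose ListAndWatch, Allocate, GetDevicePluginOptions, etc. to the kubelet.
	pluginapi.RegisterDevicePluginServer(server, m)

	return server.Serve(sock)
}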

nvidia.go

The GPU operations used by server.go are implemented in this file.

gpu-monitoring-tools

This project provides Go bindings for the lower-level management libraries and is used by the nvidia-device-plugin project.
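For example, enumerating the node's GPUs with these bindings looks roughly like this. It is a simplified sketch of what nvidia.go does, not its exact code:

//simplified sketch of GPU enumeration via the gpu-monitoring-tools NVML bindings
import (
	"log"

	"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/nvml"
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

func getDevices() []*pluginapi.Device {
	n, err := nvml.GetDeviceCount()
	if err != nil {
		log.Fatalf("failed to get device count: %v", err)
	}

	var devs []*pluginapi.Device
	for i := uint(0); i < n; i++ {
		d, err := nvml.NewDeviceLite(i)
		if err != nil {
			log.Fatalf("failed to query device %d: %v", i, err)
		}
		// Each GPU is reported to the kubelet by its UUID and assumed healthy at start.
		devs = append(devs, &pluginapi.Device{
			ID:     d.UUID,
			Health: pluginapi.Healthy,
		})
	}
	return devs
}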

NVML (NVIDIA Management Library)

This library provides a C API for monitoring and managing NVIDIA GPU devices. It is used by nvidia-smi and by other libraries, including gpu-monitoring-tools.

ListAndWatch

This function reports and monitors the GPU devices on the node.

//source: https://gitlab.com/nvidia/kubernetes/device-plugin/blob/4167bfd7fdfdbec6a5378af3589650714cf2ab3f/server.go#L218
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
    s.Send(&pluginapi.ListAndWatchResponse{Devices: m.apiDevices()})
	for {
		select {
		case <-m.stop:
			return nil
		case d := <-m.health:
			d.Health = pluginapi.Unhealthy
			log.Printf("'%s' device marked unhealthy: %s", m.resourceName, d.ID)
			s.Send(&pluginapi.ListAndWatchResponse{Devices: m.apiDevices()})
		}
	}
}

The logic is clear: the function first sends the full device list, then keeps watching device health. Whenever a device becomes unhealthy, an updated device list is sent to the kubelet through gRPC.

The health check is as follows:

//source: https://gitlab.com/nvidia/kubernetes/device-plugin/blob/4167bfd7fdfdbec6a5378af3589650714cf2ab3f/nvidia.go#L159
func checkHealth(stop <-chan interface{}, devices []*Device, unhealthy chan<- *Device) {
	...
	for _, d := range devices {
		gpu, _, _, err := nvml.ParseMigDeviceUUID(d.ID)
		if err != nil {
			gpu = d.ID
		}
		err = nvml.RegisterEventForDevice(eventSet, nvml.XidCriticalError, gpu)
		if err != nil && strings.HasSuffix(err.Error(), "Not Supported") {
			log.Printf("Warning: %s is too old to support healthchecking: %s. Marking it unhealthy.", d.ID, err)
			unhealthy <- d
			continue
		}
	}

	for {
		e, err := nvml.WaitForEvent(eventSet, 5000)
		if err != nil && e.Etype != nvml.XidCriticalError {
			continue
		}
		...
		for _, d := range devices {
			// Please see https://github.com/NVIDIA/gpu-monitoring-tools/blob/148415f505c96052cb3b7fdf443b34ac853139ec/bindings/go/nvml/nvml.h#L1424
			// for the rationale why gi and ci can be set as such when the UUID is a full GPU UUID and not a MIG device UUID.
			gpu, gi, ci, err := nvml.ParseMigDeviceUUID(d.ID)
			if err != nil {
				gpu = d.ID
				gi = 0xFFFFFFFF
				ci = 0xFFFFFFFF
			}

			if gpu == *e.UUID && gi == *e.GpuInstanceId && ci == *e.ComputeInstanceId {
				log.Printf("XidCriticalError: Xid=%d on Device=%s, the device will go unhealthy.", e.Edata, d.ID)
				unhealthy <- d
			}
		}
	}
}

GPU events are registered and monitored through NVML's binding functions. When a critical error (XidCriticalError) occurs on a device, that device is reported back to ListAndWatch through the unhealthy channel.

Allocate

Let's see what info is sent back to the kubelet in order for the container to use GPU devices:

//source: https://gitlab.com/nvidia/kubernetes/device-plugin/blob/4167bfd7fdfdbec6a5378af3589650714cf2ab3f/server.go#L265
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	responses := pluginapi.AllocateResponse{}
	for _, req := range reqs.ContainerRequests {
		for _, id := range req.DevicesIDs {
			if !m.deviceExists(id) {
				return nil, fmt.Errorf("invalid allocation request for '%s': unknown device: %s", m.resourceName, id)
			}
		}
		response := pluginapi.ContainerAllocateResponse{}
		if *deviceListStrategyFlag == DeviceListStrategyVolumeMounts {
			response.Envs = m.apiEnvs(m.deviceListEnvvar, []string{deviceListAsVolumeMountsContainerPathRoot})
			response.Mounts = m.apiMounts(req.DevicesIDs)
		}
		if *passDeviceSpecs {
			response.Devices = m.apiDeviceSpecs(req.DevicesIDs)
		}
		responses.ContainerResponses = append(responses.ContainerResponses, &response)
	}
	return &responses, nil
}

As the code indicates, the response consists of Envs, Mounts and Devices (DeviceSpec) entries, depending on the configured flags.

Here is how the DeviceSpec entries are built:

//source: https://gitlab.com/nvidia/kubernetes/device-plugin/blob/4167bfd7fdfdbec6a5378af3589650714cf2ab3f/server.go#L351
func (m *NvidiaDevicePlugin) apiDeviceSpecs(filter []string) []*pluginapi.DeviceSpec {
	var specs []*pluginapi.DeviceSpec

	paths := []string{
		"/dev/nvidiactl",
		"/dev/nvidia-uvm",
		"/dev/nvidia-uvm-tools",
		"/dev/nvidia-modeset",
	}

	for _, p := range paths {
		if _, err := os.Stat(p); err == nil {
			spec := &pluginapi.DeviceSpec{
				ContainerPath: p,
				HostPath:      p,
				Permissions:   "rw",
			}
			specs = append(specs, spec)
		}
	}
    ...
	return specs
}

We can see that all NVIDIA-related device paths are attached to the response. The kubelet passes these device specs to the container runtime so that the GPU devices become available inside the container.
