Kubernetes Source Code Analysis: apiserver

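Glossary

API Object: the basic element managed inside Kubernetes, and the unit in which K8S stores information in etcd. Deployment, Pod, and Service, for example, are all API Objects. In the code they are often simply called APIs.

API Group: a collection of API Objects that share common characteristics. For example, the apps group consists of Deployment, ReplicaSet, StatefulSet, and other API Objects.

Legacy API Object: API Objects introduced in the early days of the K8S project are not explicitly defined under any API Group, e.g. Pod, Event, and Node. In the code they are sometimes also called core API objects.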

apiserver

Tip: How to debug

For how to debug the apiserver source code, see the appendix.

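The apiserver is started with many flags, which fall roughly into the following categories:

  • Generic flags: general-purpose flags.
  • Etcd flags: etcd storage flags.
  • Secure serving flags: HTTPS serving flags.
  • Insecure serving flags: HTTP serving flags.
  • Auditing flags: audit flags.
  • Features flags: flags for new features.
  • Authentication flags: authentication flags.
  • Authorization flags: authorization flags.
  • Cloud provider flags: cloud provider flags.
  • Api enablement flags: flags that enable or disable specific resource versions or resources.
  • Admission flags: admission controller flags.
  • Misc flags: miscellaneous flags.
  • Global flags: global flags.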
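The categories above correspond to named groups of flags. kube-apiserver builds them with cobra/pflag and the NamedFlagSets type from k8s.io/component-base/cli/flag; the sketch below reproduces only the grouping idea with Go's standard flag package. The namedFlagSets type and its methods are hypothetical, not the real API.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// namedFlagSets groups flags by category ("etcd", "secure serving", ...).
// Hypothetical type: it only uses the standard flag package.
type namedFlagSets struct {
	order []string
	sets  map[string]*flag.FlagSet
}

// FlagSet returns the flag set for a category, creating it on first use.
func (n *namedFlagSets) FlagSet(name string) *flag.FlagSet {
	if n.sets == nil {
		n.sets = map[string]*flag.FlagSet{}
	}
	if fs, ok := n.sets[name]; ok {
		return fs
	}
	fs := flag.NewFlagSet(name, flag.ContinueOnError)
	n.sets[name] = fs
	n.order = append(n.order, name)
	return fs
}

// merged copies every categorized flag into one FlagSet so a single command
// line can be parsed; the flag.Value instances are shared, not copied.
func (n *namedFlagSets) merged(name string) *flag.FlagSet {
	all := flag.NewFlagSet(name, flag.ContinueOnError)
	for _, category := range n.order {
		n.sets[category].VisitAll(func(f *flag.Flag) {
			all.Var(f.Value, f.Name, f.Usage)
		})
	}
	return all
}

func main() {
	var fss namedFlagSets
	etcdServers := fss.FlagSet("etcd").String("etcd-servers", "", "list of etcd servers")
	securePort := fss.FlagSet("secure serving").Int("secure-port", 6443, "HTTPS port")

	fs := fss.merged("kube-apiserver-sketch")
	if err := fs.Parse([]string{"--etcd-servers=https://127.0.0.1:2379", "--secure-port=8443"}); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(*etcdServers, *securePort, fss.order)
}
```

Each category keeps its own FlagSet, which is what allows help output to be printed section by section, while the merged set shares the same flag.Value instances, so parsing it updates the per-category variables.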

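Startup flow

The api-server is a control-plane component. Its entry point is cmd/kube-apiserver/apiserver.go, and the Run function below lives in cmd/kube-apiserver/app/server.go. It is essentially a cobra CLI application: it parses the flags, configures the server, and starts it.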

// Run runs the specified APIServer.  This should never exit.
func Run(ctx context.Context, opts options.CompletedOptions) error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", utilversion.Get())

	klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

	config, err := NewConfig(opts)
	if err != nil {
		return err
	}
	completed, err := config.Complete()
	if err != nil {
		return err
	}
	server, err := CreateServerChain(completed)
	if err != nil {
		return err
	}

	prepared, err := server.PrepareRun()
	if err != nil {
		return err
	}

	return prepared.Run(ctx)
}

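Options are what the user customizes; from them a Config is built that specifies the server's various settings. Once the config is built, the options are no longer used, and the config is used to construct the server in CreateServerChain. The chain consists of four components: Aggregation Server, Master, Extension Server, and Not Found Handler. Construction runs from right to left (the Not Found Handler is created first and the Aggregation Server last), while requests flow from left to right (they enter at the Aggregation Server).

  • Aggregation Server: forwards requests to the Master or to a custom API server;
  • Master: the Kube API Server, which handles the built-in API Objects;
  • Extension Server: handles custom resources, including CRs and CRDs;
  • Not Found Handler: handles requests for which no corresponding API Object is found.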

func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
	notFoundHandler := notfoundhandler.New(config.KubeAPIs.ControlPlane.Generic.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
	apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
	if err != nil {
		return nil, err
	}
	crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))

	kubeAPIServer, err := config.KubeAPIs.New(apiExtensionsServer.GenericAPIServer)
	if err != nil {
		return nil, err
	}

	// aggregator comes last in the chain
	aggregatorServer, err := controlplaneapiserver.CreateAggregatorServer(config.Aggregator, kubeAPIServer.ControlPlane.GenericAPIServer, apiExtensionsServer.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdAPIEnabled, apiVersionPriorities)
	if err != nil {
		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
		return nil, err
	}

	return aggregatorServer, nil
}
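To see why construction runs right to left while requests flow left to right, here is a sketch of the delegation idea with plain net/http handlers. The delegatingServer type and the path prefixes it claims are hypothetical simplifications; the real aggregator, for instance, proxies APIService groups rather than matching fixed prefixes.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// delegatingServer is a hypothetical stand-in for one server in the chain:
// it serves the paths it owns and hands everything else to its delegate.
type delegatingServer struct {
	name     string
	prefixes []string
	delegate http.Handler
}

func (s *delegatingServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	for _, p := range s.prefixes {
		if strings.HasPrefix(r.URL.Path, p) {
			fmt.Fprintf(w, "handled by %s", s.name)
			return
		}
	}
	s.delegate.ServeHTTP(w, r)
}

// buildChain constructs the chain from right to left, like CreateServerChain.
func buildChain() http.Handler {
	notFound := http.NotFoundHandler()
	extensions := &delegatingServer{name: "apiextensions", prefixes: []string{"/apis/apiextensions.k8s.io"}, delegate: notFound}
	kube := &delegatingServer{name: "kube-apiserver", prefixes: []string{"/api/", "/apis/apps"}, delegate: extensions}
	aggregator := &delegatingServer{name: "aggregator", prefixes: []string{"/apis/apiregistration.k8s.io"}, delegate: kube}
	return aggregator // requests enter here and flow left to right
}

// get sends a GET request through h and returns the status code and body.
func get(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	chain := buildChain()
	for _, p := range []string{"/api/v1/pods", "/apis/apiextensions.k8s.io/v1/customresourcedefinitions", "/apis/unknown"} {
		code, body := get(chain, p)
		fmt.Println(p, code, strings.TrimSpace(body))
	}
}
```

Each server only needs a reference to the one built before it, which is why the innermost handler (not found) must exist first.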

kubeAPIServer

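kubeAPIServer, i.e. the master server, accepts RESTful requests and handles the built-in API Object resources. config.KubeAPIs.New builds the kubeAPIServer: it creates the StorageProviders and installs the corresponding APIGroupInfo. Here a distinction is made between the Legacy (core) API and the named API groups, which correspond to different StorageProviders: the legacy provider created by corerest.New, and one provider per named group. A provider's main responsibilities are exchanging data with etcd and serving the corresponding RESTful API requests. Each provider produces an APIGroupInfo, and finally every APIGroupInfo is installed: the legacy group under /api, all other groups under /apis. This part is tied to the Generic Server, so only the overall flow is outlined here without going into detail.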

func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
	// ...
	restStorageProviders, err := c.StorageProviders(client)
	if err != nil {
		return nil, err
	}

	if err := s.ControlPlane.InstallAPIs(restStorageProviders...); err != nil {
		return nil, err
	}

	// ...

	return s, nil
}
func (c CompletedConfig) StorageProviders(client *kubernetes.Clientset) ([]controlplaneapiserver.RESTStorageProvider, error) {
	legacyRESTStorageProvider, err := corerest.New(corerest.Config{
		GenericConfig: *c.ControlPlane.NewCoreGenericConfig(),
		Proxy: corerest.ProxyConfig{
			Transport:           c.ControlPlane.Extra.ProxyTransport,
			KubeletClientConfig: c.Extra.KubeletClientConfig,
		},
		Services: corerest.ServicesConfig{
			ClusterIPRange:          c.Extra.ServiceIPRange,
			SecondaryClusterIPRange: c.Extra.SecondaryServiceIPRange,
			NodePortRange:           c.Extra.ServiceNodePortRange,
			IPRepairInterval:        c.Extra.RepairServicesInterval,
		},
	})
	if err != nil {
		return nil, err
	}

	// The order here is preserved in discovery.
	// If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
	// the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
	// This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go
	// with specific priorities.
	// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
	// handlers that we have.
	return []controlplaneapiserver.RESTStorageProvider{
		legacyRESTStorageProvider,
		apiserverinternalrest.StorageProvider{},
		authenticationrest.RESTStorageProvider{Authenticator: c.ControlPlane.Generic.Authentication.Authenticator, APIAudiences: c.ControlPlane.Generic.Authentication.APIAudiences},
		authorizationrest.RESTStorageProvider{Authorizer: c.ControlPlane.Generic.Authorization.Authorizer, RuleResolver: c.ControlPlane.Generic.RuleResolver},
		autoscalingrest.RESTStorageProvider{},
		batchrest.RESTStorageProvider{},
		certificatesrest.RESTStorageProvider{},
		coordinationrest.RESTStorageProvider{},
		discoveryrest.StorageProvider{},
		networkingrest.RESTStorageProvider{},
		noderest.RESTStorageProvider{},
		policyrest.RESTStorageProvider{},
		rbacrest.RESTStorageProvider{Authorizer: c.ControlPlane.Generic.Authorization.Authorizer},
		schedulingrest.RESTStorageProvider{},
		storagerest.RESTStorageProvider{},
		svmrest.RESTStorageProvider{},
		flowcontrolrest.RESTStorageProvider{InformerFactory: c.ControlPlane.Generic.SharedInformerFactory},
		// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
		// See https://github.com/kubernetes/kubernetes/issues/42392
		appsrest.StorageProvider{},
		admissionregistrationrest.RESTStorageProvider{Authorizer: c.ControlPlane.Generic.Authorization.Authorizer, DiscoveryClient: client.Discovery()},
		eventsrest.RESTStorageProvider{TTL: c.ControlPlane.EventTTL},
		resourcerest.RESTStorageProvider{NamespaceClient: client.CoreV1().Namespaces()},
	}, nil
}
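The comment at the top of the returned list states that provider order decides which group an unqualified resource name prefers. A sketch of that first-match rule, with a hypothetical groupResources type standing in for the discovery data (how discovery clients actually consume the order is not shown here). Both the core group and events.k8s.io serve a resource called events, so listing the core provider first makes it win:

```go
package main

import "fmt"

// groupResources is a hypothetical view of one provider: its group name and
// the resources it serves. "" is the legacy core group.
type groupResources struct {
	group     string
	resources []string
}

// preferredGroup returns the first group, in list order, that serves the
// unqualified resource name.
func preferredGroup(ordered []groupResources, resource string) (string, bool) {
	for _, g := range ordered {
		for _, r := range g.resources {
			if r == resource {
				return g.group, true
			}
		}
	}
	return "", false
}

func main() {
	ordered := []groupResources{
		{group: "", resources: []string{"pods", "events"}},
		{group: "apps", resources: []string{"deployments"}},
		{group: "events.k8s.io", resources: []string{"events"}},
	}
	g, _ := preferredGroup(ordered, "events")
	fmt.Printf("%q\n", g) // prints "" because the core group is listed first
}
```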
func (s *Server) InstallAPIs(restStorageProviders ...RESTStorageProvider) error {
	nonLegacy := []*genericapiserver.APIGroupInfo{}

	// used later in the loop to filter the served resource by those that have expired.
	resourceExpirationEvaluatorOpts := genericapiserver.ResourceExpirationEvaluatorOptions{
		CurrentVersion:                          s.GenericAPIServer.EffectiveVersion.EmulationVersion(),
		Prerelease:                              s.GenericAPIServer.EffectiveVersion.BinaryVersion().PreRelease(),
		EmulationForwardCompatible:              s.GenericAPIServer.EmulationForwardCompatible,
		RuntimeConfigEmulationForwardCompatible: s.GenericAPIServer.RuntimeConfigEmulationForwardCompatible,
	}
	resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluatorFromOptions(resourceExpirationEvaluatorOpts)
	if err != nil {
		return err
	}

	for _, restStorageBuilder := range restStorageProviders {
		groupName := restStorageBuilder.GroupName()
		apiGroupInfo, err := restStorageBuilder.NewRESTStorage(s.APIResourceConfigSource, s.RESTOptionsGetter)
		if err != nil {
			return fmt.Errorf("problem initializing API group %q: %w", groupName, err)
		}
		if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
			// If we have no storage for any resource configured, this API group is effectively disabled.
			// This can happen when an entire API group, version, or development-stage (alpha, beta, GA) is disabled.
			klog.Infof("API group %q is not enabled, skipping.", groupName)
			continue
		}

		// Remove resources that serving kinds that are removed or not introduced yet at the current version.
		// We do this here so that we don't accidentally serve versions without resources or openapi information that for kinds we don't serve.
		// This is a spot above the construction of individual storage handlers so that no sig accidentally forgets to check.
		err = resourceExpirationEvaluator.RemoveUnavailableKinds(groupName, apiGroupInfo.Scheme, apiGroupInfo.VersionedResourcesStorageMap, s.APIResourceConfigSource)
		if err != nil {
			return err
		}
		if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
			klog.V(1).Infof("Removing API group %v because it is time to stop serving it because it has no versions per APILifecycle.", groupName)
			continue
		}

		klog.V(1).Infof("Enabling API group %q.", groupName)

		if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
			name, hook, err := postHookProvider.PostStartHook()
			if err != nil {
				return fmt.Errorf("error building PostStartHook: %w", err)
			}
			s.GenericAPIServer.AddPostStartHookOrDie(name, hook)
		}

		if len(groupName) == 0 {
			// the legacy group for core APIs is special that it is installed into /api via this special install method.
			if err := s.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
				return fmt.Errorf("error in registering legacy API: %w", err)
			}
		} else {
			// everything else goes to /apis
			nonLegacy = append(nonLegacy, &apiGroupInfo)
		}
	}

	if err := s.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil {
		return fmt.Errorf("error in registering group versions: %w", err)
	}
	return nil
}
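The loop in InstallAPIs boils down to three rules: skip groups that end up with no served versions, install the group with the empty name (the legacy core group) under /api, and collect everything else for /apis. A sketch of that partitioning with a hypothetical, stripped-down apiGroupInfo type; the real loop installs handlers rather than returning paths.

```go
package main

import "fmt"

// apiGroupInfo is a hypothetical stand-in for APIGroupInfo: the group name and
// the versions that still have storage after filtering.
type apiGroupInfo struct {
	group    string
	versions []string
}

// partition mimics InstallAPIs: empty groups are skipped, the legacy group ("")
// goes under /api, and every other group goes under /apis.
func partition(groups []apiGroupInfo) (legacy []string, nonLegacy []string) {
	for _, g := range groups {
		if len(g.versions) == 0 {
			continue // "API group is not enabled, skipping."
		}
		for _, v := range g.versions {
			if g.group == "" {
				legacy = append(legacy, "/api/"+v)
			} else {
				nonLegacy = append(nonLegacy, "/apis/"+g.group+"/"+v)
			}
		}
	}
	return legacy, nonLegacy
}

func main() {
	legacy, nonLegacy := partition([]apiGroupInfo{
		{group: "", versions: []string{"v1"}},
		{group: "apps", versions: []string{"v1"}},
		{group: "example.k8s.io"}, // hypothetical disabled group: no versions left
	})
	fmt.Println(legacy, nonLegacy) // [/api/v1] [/apis/apps/v1]
}
```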

scheme

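Scheme is the core data structure of Kubernetes resource management. As covered in an earlier article, Kubernetes organizes the resources it manages by group/version/kind (GVK). Using GVKs, the scheme can:

  • convert resources between the internal version and the other versions;
  • identify resource types, create resource objects, set default values, and so on during serialization and deserialization.

server.go neither constructs the scheme nor shows how API Object information is filled into it. It does, however, contain this import: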

import (
	// ...
	"k8s.io/kubernetes/pkg/controlplane"
	// ...
)

pkg/controlplane/import_known_versions.go contains a long list of blank imports:

import (
	// These imports are the API groups the API server will support.
	_ "k8s.io/kubernetes/pkg/apis/admission/install"
	_ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
	_ "k8s.io/kubernetes/pkg/apis/apiserverinternal/install"
	_ "k8s.io/kubernetes/pkg/apis/apps/install"
	_ "k8s.io/kubernetes/pkg/apis/authentication/install"
	_ "k8s.io/kubernetes/pkg/apis/authorization/install"
	_ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
	_ "k8s.io/kubernetes/pkg/apis/batch/install"
	_ "k8s.io/kubernetes/pkg/apis/certificates/install"
	_ "k8s.io/kubernetes/pkg/apis/coordination/install"
	_ "k8s.io/kubernetes/pkg/apis/core/install"
	_ "k8s.io/kubernetes/pkg/apis/discovery/install"
	_ "k8s.io/kubernetes/pkg/apis/events/install"
	_ "k8s.io/kubernetes/pkg/apis/extensions/install"
	_ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
	_ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
	_ "k8s.io/kubernetes/pkg/apis/networking/install"
	_ "k8s.io/kubernetes/pkg/apis/node/install"
	_ "k8s.io/kubernetes/pkg/apis/policy/install"
	_ "k8s.io/kubernetes/pkg/apis/rbac/install"
	_ "k8s.io/kubernetes/pkg/apis/resource/install"
	_ "k8s.io/kubernetes/pkg/apis/scheduling/install"
	_ "k8s.io/kubernetes/pkg/apis/storage/install"
	_ "k8s.io/kubernetes/pkg/apis/storagemigration/install"
)

Each of these imports the install package of one API group; for example, core/install installs the legacy (core) API group. Let's take apps/install to see how install works:

package install

import (
	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	"k8s.io/kubernetes/pkg/apis/apps"
	"k8s.io/kubernetes/pkg/apis/apps/v1"
	"k8s.io/kubernetes/pkg/apis/apps/v1beta1"
	"k8s.io/kubernetes/pkg/apis/apps/v1beta2"
)

func init() {
	Install(legacyscheme.Scheme)
}

// Install registers the API group and adds types to a scheme
func Install(scheme *runtime.Scheme) {
	utilruntime.Must(apps.AddToScheme(scheme))
	utilruntime.Must(v1beta1.AddToScheme(scheme))
	utilruntime.Must(v1beta2.AddToScheme(scheme))
	utilruntime.Must(v1.AddToScheme(scheme))
	utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v1beta2.SchemeGroupVersion, v1beta1.SchemeGroupVersion))
}

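As can be seen, importing the package automatically runs its init function, which calls Install on the global legacyscheme.Scheme. AddToScheme adds every API Object in the corresponding package to the scheme, and SetVersionPriority records that v1 is preferred over v1beta2, which in turn is preferred over v1beta1. Must is simple: it checks the error and panics if one occurred.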
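The mechanism relies on Go running every package's init function before main, so a blank import is enough to populate a global registry. A self-contained sketch in one file, where the map-based scheme, addToScheme, and must are hypothetical stand-ins for legacyscheme.Scheme, AddToScheme, and utilruntime.Must:

```go
package main

import (
	"fmt"
	"sort"
)

// scheme is a hypothetical stand-in for legacyscheme.Scheme: a process-wide
// registry that install packages populate from their init functions.
var scheme = map[string][]string{}

// addToScheme registers the kinds of one group/version; re-registering fails.
func addToScheme(gv string, kinds ...string) error {
	if _, exists := scheme[gv]; exists {
		return fmt.Errorf("group version %q already registered", gv)
	}
	scheme[gv] = kinds
	return nil
}

// must mirrors utilruntime.Must: panic on any error.
func must(err error) {
	if err != nil {
		panic(err)
	}
}

// In Kubernetes the equivalent init lives in pkg/apis/apps/install and runs
// because of the blank import of that package.
func init() {
	must(addToScheme("apps/__internal", "Deployment", "StatefulSet"))
	must(addToScheme("apps/v1", "Deployment", "StatefulSet"))
}

func main() {
	gvs := make([]string, 0, len(scheme))
	for gv := range scheme {
		gvs = append(gvs, gv)
	}
	sort.Strings(gvs)
	fmt.Println(gvs) // init has already run before main starts
}
```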

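So far we have not explained what a Scheme actually is. Scheme is a struct that holds the data and methods for converting between internal and external versions, and between GVKs and Go types. Its main responsibilities are:

  • Conversion between GVKs and Go types: the Scheme holds two maps, gvkToType (GVK to type) and typeToGVK (type to GVKs);
  • Default values for API Objects: an API Object has many fields, and a user operating on an object is unlikely to supply all of them; in addition, when an object is converted from one version to another, fields with no counterpart may need values;
  • Conversion between internal and external versions: every external version is converted to the internal version, and the conversion functions are recorded inside the scheme (in its converter).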

// staging/src/k8s.io/apimachinery/pkg/runtime/scheme.go
type Scheme struct {
	gvkToType                 map[schema.GroupVersionKind]reflect.Type
	typeToGVK                 map[reflect.Type][]schema.GroupVersionKind
	unversionedTypes          map[reflect.Type]schema.GroupVersionKind
	unversionedKinds          map[string]reflect.Type
	fieldLabelConversionFuncs map[schema.GroupVersionKind]FieldLabelConversionFunc
	defaulterFuncs            map[reflect.Type]func(interface{})
	validationFuncs           map[reflect.Type]func(ctx context.Context, op operation.Operation, object, oldObject interface{}) field.ErrorList
	converter                 *conversion.Converter
	versionPriority           map[string][]string
	observedVersions          []schema.GroupVersion
	schemeName                string
}
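The two central maps can be illustrated with a toy scheme. The sketch keeps only gvkToType and typeToGVK and derives the kind from the Go type name; GroupVersionKind, Scheme, and its methods are simplified local stand-ins whose signatures differ from the real runtime.Scheme.

```go
package main

import (
	"fmt"
	"reflect"
)

// GroupVersionKind is a local copy of the idea behind schema.GroupVersionKind.
type GroupVersionKind struct{ Group, Version, Kind string }

// Scheme keeps only the two maps: GVK -> Go type and Go type -> GVKs.
type Scheme struct {
	gvkToType map[GroupVersionKind]reflect.Type
	typeToGVK map[reflect.Type][]GroupVersionKind
}

func NewScheme() *Scheme {
	return &Scheme{
		gvkToType: map[GroupVersionKind]reflect.Type{},
		typeToGVK: map[reflect.Type][]GroupVersionKind{},
	}
}

// AddKnownTypes registers pointers to structs under group/version; the kind
// is taken from the struct's type name.
func (s *Scheme) AddKnownTypes(group, version string, objs ...interface{}) {
	for _, obj := range objs {
		t := reflect.TypeOf(obj).Elem()
		gvk := GroupVersionKind{Group: group, Version: version, Kind: t.Name()}
		s.gvkToType[gvk] = t
		s.typeToGVK[t] = append(s.typeToGVK[t], gvk)
	}
}

// New creates a pointer to a zero value of the type registered for gvk.
func (s *Scheme) New(gvk GroupVersionKind) (interface{}, error) {
	t, ok := s.gvkToType[gvk]
	if !ok {
		return nil, fmt.Errorf("no kind %q registered for %s/%s", gvk.Kind, gvk.Group, gvk.Version)
	}
	return reflect.New(t).Interface(), nil
}

// ObjectKinds returns every GVK registered for obj's Go type.
func (s *Scheme) ObjectKinds(obj interface{}) []GroupVersionKind {
	return s.typeToGVK[reflect.TypeOf(obj).Elem()]
}

type Deployment struct{ Replicas int32 }

func main() {
	s := NewScheme()
	s.AddKnownTypes("apps", "v1", &Deployment{})
	obj, err := s.New(GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"})
	fmt.Println(obj, err)                     // &{0} <nil>
	fmt.Println(s.ObjectKinds(&Deployment{})) // [{apps v1 Deployment}]
}
```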

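Now let's look at how the internal version is registered. As mentioned, the install package calls each package's AddToScheme, which is defined in that package's register.go. Taking the apps package (pkg/apis/apps/register.go) as an example: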

var (
	// SchemeBuilder stores functions to add things to a scheme.
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	// AddToScheme applies all stored functions to a scheme.
	AddToScheme = SchemeBuilder.AddToScheme
)

// GroupName is the group name used in this package
const GroupName = "apps"

// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal}

// Kind takes an unqualified kind and returns a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
	return SchemeGroupVersion.WithKind(kind).GroupKind()
}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

// Adds the list of known types to the given scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
	// TODO this will get cleaned up with the scheme types are fixed
	scheme.AddKnownTypes(SchemeGroupVersion,
		&DaemonSet{},
		&DaemonSetList{},
		&Deployment{},
		&DeploymentList{},
		&DeploymentRollback{},
		&autoscaling.Scale{},
		&StatefulSet{},
		&StatefulSetList{},
		&ControllerRevision{},
		&ControllerRevisionList{},
		&ReplicaSet{},
		&ReplicaSetList{},
	)
	return nil
}

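Scheme construction follows the Builder design pattern: SchemeBuilder collects registration functions, and AddToScheme applies them to a scheme. The register logic itself is not complicated. Now let's see how an external version differs, taking the v1 package as an example: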

// staging/src/k8s.io/api/apps/v1/register.go
const GroupName = "apps"

// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
	// TODO: move SchemeBuilder with zz_generated.deepcopy.go to k8s.io/api.
	// localSchemeBuilder and AddToScheme will stay in k8s.io/kubernetes.
	SchemeBuilder      = runtime.NewSchemeBuilder(addKnownTypes)
	localSchemeBuilder = &SchemeBuilder
	AddToScheme        = localSchemeBuilder.AddToScheme
)

// Adds the list of known types to the given scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(SchemeGroupVersion,
		&Deployment{},
		&DeploymentList{},
		&StatefulSet{},
		&StatefulSetList{},
		&DaemonSet{},
		&DaemonSetList{},
		&ReplicaSet{},
		&ReplicaSetList{},
		&ControllerRevision{},
		&ControllerRevisionList{},
	)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}

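Only the API Object types are registered here. The conversion functions between the internal and external versions, which the Scheme also maintains, are registered through pkg/apis/apps/v1/register.go. In other words, an external version is registered in two parts: the conversion functions on one side, and the types on the other.

The overall logic does not differ much from the internal version. Note the auto-generated file zz_generated.deepcopy.go in this directory. Every external version must be converted to the internal version before k8s can process it, and since there are many versions, each needing its own data-conversion logic, this code would be large and repetitive if written by hand; that is why it is generated (the conversion functions, for instance, live in generated zz_generated.conversion.go files under pkg/apis). How the code generation works is explained in the appendix.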
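Converting every external version through the internal version makes the internal version a hub: with N external versions, each needs only a conversion to and from the hub instead of one per pair of versions. A hand-written sketch with hypothetical Deployment types (in Kubernetes these functions are generated):

```go
package main

import "fmt"

// Hypothetical internal ("hub") version and two external versions of one kind.
type DeploymentInternal struct {
	Name     string
	Replicas int32
}

type DeploymentV1 struct {
	Name     string
	Replicas *int32 // optional (pointer) in this external version
}

type DeploymentV1beta1 struct {
	Name     string
	Replicas int32
}

// Each external version converts only to and from the internal version.
func v1ToInternal(in DeploymentV1) DeploymentInternal {
	out := DeploymentInternal{Name: in.Name}
	if in.Replicas != nil { // defaulting normally fills this in before conversion
		out.Replicas = *in.Replicas
	}
	return out
}

func internalToV1(in DeploymentInternal) DeploymentV1 {
	r := in.Replicas
	return DeploymentV1{Name: in.Name, Replicas: &r}
}

func v1beta1ToInternal(in DeploymentV1beta1) DeploymentInternal {
	return DeploymentInternal{Name: in.Name, Replicas: in.Replicas}
}

func internalToV1beta1(in DeploymentInternal) DeploymentV1beta1 {
	return DeploymentV1beta1{Name: in.Name, Replicas: in.Replicas}
}

func main() {
	// v1beta1 -> v1 goes through the internal version.
	old := DeploymentV1beta1{Name: "nginx", Replicas: 3}
	converted := internalToV1(v1beta1ToInternal(old))
	fmt.Println(converted.Name, *converted.Replicas) // nginx 3
}
```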

Generic Server

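The most basic function of the Generic Server in the api-server is to provide the infrastructure needed to expose HTTP services, serving RESTful endpoints for operating on API Objects. Every internal server is built on top of a Generic Server and fills its own content into it. The most important output of each Generic Server is its director, essentially a combination of a mux and a go-restful container; every HTTP request is ultimately handled by these directors.

As discussed earlier, every server is created starting from its Config's New method, and that method initializes the GenericAPIServer: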

s := &GenericAPIServer{
	discoveryAddresses: c.DiscoveryAddresses,
	LoopbackClientConfig: c.LoopbackClientConfig,
	legacyAPIGroupPrefixes: c.LegacyAPIGroupPrefixes,
	admissionControl: c.AdmissionControl,
	Serializer: c.Serializer,
	AuditBackend: c.AuditBackend,
	Authorizer: c.Authorization.Authorizer,
	delegationTarget: delegationTarget,
	EquivalentResourceRegistry: c.EquivalentResourceRegistry,
	NonLongRunningRequestWaitGroup: c.NonLongRunningRequestWaitGroup,
	WatchRequestWaitGroup: c.WatchRequestWaitGroup,
	Handler: apiServerHandler,
	UnprotectedDebugSocket: debugSocket,

	listedPathProvider: apiServerHandler,

	minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
	ShutdownTimeout: c.RequestTimeout,
	ShutdownDelayDuration: c.ShutdownDelayDuration,
	ShutdownWatchTerminationGracePeriod: c.ShutdownWatchTerminationGracePeriod,
	SecureServingInfo: c.SecureServing,
	ExternalAddress: c.ExternalAddress,

	openAPIConfig: c.OpenAPIConfig,
	openAPIV3Config: c.OpenAPIV3Config,
	skipOpenAPIInstallation: c.SkipOpenAPIInstallation,

	postStartHooks: map[string]postStartHookEntry{},
	preShutdownHooks: map[string]preShutdownHookEntry{},
	disabledPostStartHooks: c.DisabledPostStartHooks,

	healthzRegistry: healthCheckRegistry{path: "/healthz", checks: c.HealthzChecks},
	livezRegistry: healthCheckRegistry{path: "/livez", checks: c.LivezChecks, clock: clock.RealClock{}},
	readyzRegistry: healthCheckRegistry{path: "/readyz", checks: c.ReadyzChecks},
	livezGracePeriod: c.LivezGracePeriod,

	DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),

	maxRequestBodyBytes: c.MaxRequestBodyBytes,

	lifecycleSignals: c.lifecycleSignals,
	ShutdownSendRetryAfter: c.ShutdownSendRetryAfter,

	APIServerID: c.APIServerID,
	StorageReadinessHook: NewStorageReadinessHook(c.StorageInitializationTimeout),
	StorageVersionManager: c.StorageVersionManager,

	EffectiveVersion: c.EffectiveVersion,
	EmulationForwardCompatible: c.EmulationForwardCompatible,
	RuntimeConfigEmulationForwardCompatible: c.RuntimeConfigEmulationForwardCompatible,
	FeatureGate: c.FeatureGate,

	muxAndDiscoveryCompleteSignals: map[string]<-chan struct{}{},
}

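Most fields are copied directly from the Config. A few deserve special attention: Handler, set to apiServerHandler, which is the key member for serving the RESTful interface, and the postStartHooks and preShutdownHooks. GenericServer serves HTTP the same way go-restful does: it constructs a mux and a container and assembles them into a director. The director is then wrapped into a handler chain, and that chain is what implements the http.Handler interface.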

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	nonGoRestfulMux := mux.NewPathRecorderMux(name)
	if notFoundHandler != nil {
		nonGoRestfulMux.NotFoundHandler(notFoundHandler)
	}

	gorestfulContainer := restful.NewContainer()
	gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
	gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
		logStackOnRecover(s, panicReason, httpWriter)
	})
	gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
		serviceErrorHandler(s, serviceErr, request, response)
	})

	director := director{
		name:               name,
		goRestfulContainer: gorestfulContainer,
		nonGoRestfulMux:    nonGoRestfulMux,
	}

	return &APIServerHandler{
		FullHandlerChain:   handlerChainBuilder(director),
		GoRestfulContainer: gorestfulContainer,
		NonGoRestfulMux:    nonGoRestfulMux,
		Director:           director,
	}
}
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	a.FullHandlerChain.ServeHTTP(w, r)
}

As the figure shows, the APIServer is a handler chain. Let's see how the chain is built. apiServerHandler is instantiated by this line:

apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

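Here delegationTarget is the next server in the chain (for the Extension Server, the empty delegate that wraps the notFoundHandler). Its UnprotectedHandler() is passed in as the notFoundHandler, so every request the current director cannot handle is handed over to it. That is why the assembly contains these lines: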

if notFoundHandler != nil {
	nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}

OpenAPI

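Tip: OpenAPI

The OpenAPI Specification (OAS) is a general-purpose, programming-language-agnostic API description standard that allows both humans and computers to discover and understand a service's capabilities without access to source code or documentation, and without inspecting network traffic. When properly defined, a consumer can understand and interact with the remote service with a minimal amount of implementation logic.

OpenAPI evolved from Swagger. It is a language for formally describing RESTful services, making them easier for consumers to understand and use. With the OpenAPI specification you can describe:

  • which RESTful endpoints a service provides;
  • the input and output object formats of each endpoint;
  • the supported operations, such as POST, GET, and so on.

The swagger.json file defines the RESTful services that Kubernetes exposes, and clients can send HTTP requests to the api server according to it. It defines nearly all built-in API Objects of the current release, with one set of definitions in swagger.json for each combination of external version and API Object.

hack/update-openapi-spec.sh defines how swagger.json is generated, and the resulting file is api/openapi-spec/swagger.json. For each API Object, definitions are generated that the API service uses to expose it. The question is how this step is automated.

Each API group version has a doc.go, for example staging/src/k8s.io/api/apps/v1/doc.go, containing a comment tag that marks whether OpenAPI content should be generated for the API Objects in that package: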

// +k8s:openapi-gen=true

package v1
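The tag is an ordinary comment that the generator looks for. Purely to illustrate what the tag means, here is a hypothetical helper that scans comment lines for it; the real openapi-gen works on parsed Go packages rather than on raw text.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// openAPIGenEnabled is a hypothetical helper: it reports whether Go source
// text carries a "+k8s:openapi-gen=<value>" comment tag set to "true".
func openAPIGenEnabled(src string) bool {
	const prefix = "+k8s:openapi-gen="
	sc := bufio.NewScanner(strings.NewReader(src))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if !strings.HasPrefix(line, "//") {
			continue // only comment lines can carry tags
		}
		tag := strings.TrimSpace(strings.TrimPrefix(line, "//"))
		if strings.HasPrefix(tag, prefix) {
			return strings.TrimPrefix(tag, prefix) == "true"
		}
	}
	return false
}

func main() {
	docGo := "// +k8s:openapi-gen=true\n\npackage v1\n"
	fmt.Println(openAPIGenEnabled(docGo)) // true
}
```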

types.go defines the fields of each API Object:

type Deployment struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Specification of the desired behavior of the Deployment.
	// +optional
	Spec DeploymentSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

	// Most recently observed status of the Deployment.
	// +optional
	Status DeploymentStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

The OpenAPI definitions are generated into pkg/generated/openapi/zz_generated.openapi.go. This file contains the OpenAPI definition of every built-in API Object, one definition per combination of external version and API Object. Its keys map one-to-one to the definition IDs in swagger.json.
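The keys in zz_generated.openapi.go are Go type paths such as k8s.io/api/apps/v1.Deployment, while swagger.json uses ids such as io.k8s.api.apps.v1.Deployment. A sketch of the mapping, assuming the rule is simply to reverse the domain segment and join the path segments with dots; restFriendlyName is a hypothetical name here.

```go
package main

import (
	"fmt"
	"strings"
)

// restFriendlyName converts a Go type path (the key format in
// zz_generated.openapi.go) into the reverse-domain, dot-separated form used
// as a definition id in swagger.json. Assumed rule, not the generator's code.
func restFriendlyName(name string) string {
	parts := strings.Split(name, "/")
	if len(parts) > 0 && strings.Contains(parts[0], ".") {
		domain := strings.Split(parts[0], ".")
		for i, j := 0, len(domain)-1; i < j; i, j = i+1, j-1 {
			domain[i], domain[j] = domain[j], domain[i] // reverse the domain labels
		}
		parts[0] = strings.Join(domain, ".")
	}
	return strings.Join(parts, ".")
}

func main() {
	fmt.Println(restFriendlyName("k8s.io/api/apps/v1.Deployment")) // io.k8s.api.apps.v1.Deployment
}
```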

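Installing API Objects

Once the API Objects are defined, the generic server must provide RESTful endpoints for each of them. For example, when we run kubectl apply -f nginx.yaml, the server must recognize that the request is about deploying a Deployment.

Install is ultimately invoked on an APIInstaller, which carries a lot of information such as Storage, Creater, and Convertor.

The registerResourceHandlers method is very long, at over 900 lines. Roughly, it does the following: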

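  1. Get the group and version of the object being registered, and determine whether it is a subresource;
  2. Determine whether it is namespace-scoped;
  3. Based on the interfaces implemented by the passed-in storage object, determine the supported operations (verbs);
  4. Create ListOptions, CreateOptions, PatchOptions, UpdateOptions, and the other options;
  5. Generate the apiResource to be returned;
  6. Build the actions list, with one entry per verb of each resource;
  7. Decide the version used when writing to etcd, and the versions an object read from etcd can be converted to;
  8. Generate the ResourceInfo to be returned;
  9. Derive the supported MediaTypes from the Serializer, and set the webservice response properties accordingly;
  10. Put the information gathered in the steps above into reqScope;
  11. Compute the FieldManager of reqScope when applicable;
  12. Process the actions in the actions list one by one, generating a route for each based on reqScope and other information and registering it with the webservice;
  13. Update the apiResource and return it.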

The detailed code is not reproduced here.

Master Server