Kubernetes源码分析-apiserver
名词解释
API Object:Kubernetes 内部管理的基本元素,是 K8S 在 ETCD 中信息存储单元。例如 Deployment,Pod,Service,都是 API Object。代码内部常用 API 来表示它们。
API Group:一组 API Object 组成的一个具有共有性质的对象集合。例如 apps group,由 Deployment,ReplicaSet,StatefulSet 等等 API Object 组成。
Legacy API Object:在 K8S 项目初始阶段所引入的 API Object 没有显式定义在 API Group 下面,例如 Pod,Event,Node 等等。在代码中有时也称他们为 core API object。
apiserver
关于如何源码调试 apiserver 的源码,请看附录
启动 apiserver 时会携带许多参数,但是大致可以分为如下几类:
- Generic flags:通用参数。
- Etcd flags:Etcd存储相关参数。
- Secure serving flags:HTTPS服务相关参数。
- Insecure serving flags:HTTP服务相关参数。
- Auditing flags:审计相关参数。
- Features flags:新特性相关参数。
- Authentication flags:认证相关参数。
- Authorization flags:授权相关参数。
- Cloud provider flags:云服务提供商相关参数。
- Api enablement flags:控制开启/禁用特定的资源版本或资源参数。
- Admission flags:准入控制器相关参数。
- Misc flags:其他参数。
- Global flags:全局参数。
启动流程
api-server 是控制平面的一个组件,对应启动的代码在
cmd/kube-apiserver/apiserver.go 文件里。本质上是一个 cobra
的 cli 应用程序,解析参数,配置 server 并启动。
|
options 是提供给用户来进行自定义配置的,基于 options 可以构建出一个
Config 来指定 server 的各种状态。config 构造完成之后,options
就没有用处了,用 config 来构造 server,对应方法
CreateServerChain。主要由四个组件,Aggregation
Server、Master、Extension Server、Not Found
Handler。构建过程是从右到左,请求流程是从左到右。
- Aggregation Server:负责转发请求到 Master 或 Custom API Server;
- Master:Kube API Server,负责 Build-in 的 API Object 相关处理;
- Extension Server:Customer Resource 的处理由它完成。包括 CR 和 CRD;
- Not Found Handler:找不到对应的 API Object 返回;
|
kubeAPIServer
kubeAPIServer 也就是 master server 来接受 RESTful 请求,构造处理一些
API Object 资源。config.KubeAPIs.New 就是在构建
kubeAPIServer,构建 StorageProviders,Install 对应的
APIInfo。这里就区分 internal API 和 Legacy API,对应两种不同的
StorageProviders,主要的职责是和 etcd
进行数据交互,并且处理对应的 RESTful API 请求。对每一个 Provider
都会生成一个 APIGroupInfo,最后 Install 整个
APIGroupInfo。这部分的内容和 Generic Server
有关,这里先大致叙述一下整个流程,不做详细描述。
func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
// ...
restStorageProviders, err := c.StorageProviders(client)
if err != nil {
return nil, err
}
if err := s.ControlPlane.InstallAPIs(restStorageProviders...); err != nil {
return nil, err
}
// ...
return s, nil
}
func (c CompletedConfig) StorageProviders(client *kubernetes.Clientset) ([]controlplaneapiserver.RESTStorageProvider, error) {
legacyRESTStorageProvider, err := corerest.New(corerest.Config{
GenericConfig: *c.ControlPlane.NewCoreGenericConfig(),
Proxy: corerest.ProxyConfig{
Transport: c.ControlPlane.Extra.ProxyTransport,
KubeletClientConfig: c.Extra.KubeletClientConfig,
},
Services: corerest.ServicesConfig{
ClusterIPRange: c.Extra.ServiceIPRange,
SecondaryClusterIPRange: c.Extra.SecondaryServiceIPRange,
NodePortRange: c.Extra.ServiceNodePortRange,
IPRepairInterval: c.Extra.RepairServicesInterval,
},
})
if err != nil {
return nil, err
}
// The order here is preserved in discovery.
// If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
// the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
// This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go
// with specific priorities.
// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
// handlers that we have.
return []controlplaneapiserver.RESTStorageProvider{
legacyRESTStorageProvider,
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.ControlPlane.Generic.Authentication.Authenticator, APIAudiences: c.ControlPlane.Generic.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.ControlPlane.Generic.Authorization.Authorizer, RuleResolver: c.ControlPlane.Generic.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.ControlPlane.Generic.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
svmrest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{InformerFactory: c.ControlPlane.Generic.SharedInformerFactory},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{Authorizer: c.ControlPlane.Generic.Authorization.Authorizer, DiscoveryClient: client.Discovery()},
eventsrest.RESTStorageProvider{TTL: c.ControlPlane.EventTTL},
resourcerest.RESTStorageProvider{NamespaceClient: client.CoreV1().Namespaces()},
}, nil
}
func (s *Server) InstallAPIs(restStorageProviders ...RESTStorageProvider) error {
nonLegacy := []*genericapiserver.APIGroupInfo{}
// used later in the loop to filter the served resource by those that have expired.
resourceExpirationEvaluatorOpts := genericapiserver.ResourceExpirationEvaluatorOptions{
CurrentVersion: s.GenericAPIServer.EffectiveVersion.EmulationVersion(),
Prerelease: s.GenericAPIServer.EffectiveVersion.BinaryVersion().PreRelease(),
EmulationForwardCompatible: s.GenericAPIServer.EmulationForwardCompatible,
RuntimeConfigEmulationForwardCompatible: s.GenericAPIServer.RuntimeConfigEmulationForwardCompatible,
}
resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluatorFromOptions(resourceExpirationEvaluatorOpts)
if err != nil {
return err
}
for _, restStorageBuilder := range restStorageProviders {
groupName := restStorageBuilder.GroupName()
apiGroupInfo, err := restStorageBuilder.NewRESTStorage(s.APIResourceConfigSource, s.RESTOptionsGetter)
if err != nil {
return fmt.Errorf("problem initializing API group %q: %w", groupName, err)
}
if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
// If we have no storage for any resource configured, this API group is effectively disabled.
// This can happen when an entire API group, version, or development-stage (alpha, beta, GA) is disabled.
klog.Infof("API group %q is not enabled, skipping.", groupName)
continue
}
// Remove resources that serving kinds that are removed or not introduced yet at the current version.
// We do this here so that we don't accidentally serve versions without resources or openapi information that for kinds we don't serve.
// This is a spot above the construction of individual storage handlers so that no sig accidentally forgets to check.
err = resourceExpirationEvaluator.RemoveUnavailableKinds(groupName, apiGroupInfo.Scheme, apiGroupInfo.VersionedResourcesStorageMap, s.APIResourceConfigSource)
if err != nil {
return err
}
if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
klog.V(1).Infof("Removing API group %v because it is time to stop serving it because it has no versions per APILifecycle.", groupName)
continue
}
klog.V(1).Infof("Enabling API group %q.", groupName)
if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
name, hook, err := postHookProvider.PostStartHook()
if err != nil {
return fmt.Errorf("error building PostStartHook: %w", err)
}
s.GenericAPIServer.AddPostStartHookOrDie(name, hook)
}
if len(groupName) == 0 {
// the legacy group for core APIs is special that it is installed into /api via this special install method.
if err := s.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering legacy API: %w", err)
}
} else {
// everything else goes to /apis
nonLegacy = append(nonLegacy, &apiGroupInfo)
}
}
if err := s.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil {
return fmt.Errorf("error in registering group versions: %w", err)
}
return nil
}
scheme
schema 是 kubernetes 资源管理的核心数据结构。由以前文章我们了解到 kubernetes 会将其管理的资源划分为 group/version/kind 的概念,scheme 可以利用进行 GVK 进行一下操作:
- 可以将资源在内部版本和其他版本中相互转化
- 可以序列化和反序列化的过程中识别资源类型,创建资源对象,设置默认值等等。
在 server.go 里没有看到 scheme 的构造,以及怎么把对应的
API Object 信息填入 scheme 中。在 server.go 中有一句
import:
|
在 pkg/controlplane/import_known_versions.go
中写了许多导包语句:
|
导入的都是对应 API Groups 的 install,比如 core/install
就是 Legacy API Groups 的 install。选取 apps/install
来看一下,install 的具体工作原理:
|
显而易见,就是自动执行了 Install
方法。AddToScheme 的作用就是把对应包下的所有 API Object
全都加到 scheme 当中。Must 方法比较简单,做了一个 error
的判断,如果发生了 error 就抛出一个 panic。
到目前为止还没有解释过 Scheme 是什么。Scheme 是一个结构体,内含处理内外部 Version 之间转换,GVK 和 Go Type 之间转换的数据和方法。主要作用:
- GVK 和 Go Type 之间转换:Scheme 内部有两张 map,分别对应 gvk 到 type 和 type 到 gvk;
- API Object 默认值:API Object 有许多属性,使用者在操作一个 Object 时,不太可能给出所有属性值;另外在 Object 从一个 Version 转换到另一个 Version 时也可能需要为不存在对应关系的字段赋值;
- 内外部 Version 之间转换:所有外部 Version 都会被转化为内部 Version,转换函数是记录在 scheme 内部;
|
来看一下 Internal Version 是怎么注册的。之前说到
install.go 文件用于执行每一个包下面的
AddToScheme 方法,该方法被定义在每个包下面的
register.go 文件下。以 apps 包为例:
|
Scheme 的构造遵循了设计模式中的 Builder
设计模式,register 本身的逻辑并不复杂。来看一下 external
version 有什么不同。以 v1 包为例:
|
这里注册的都是 API Object 的 Types,而我们说 Scheme 维护了内外部
version 的转变函数,这部分在 pkg/apis/app/v1/register.go
注册。也就是说 external version
由两部分注册,一部分是转变函数,另一部分是 Types。
整体逻辑看上去和 internal version
没有太大区别,注意下在这个文件夹下有一个自动生成的文件
zz_generated.deepcopy.go。每一个 external version
都需要进行转换,转为成为 internal version 才可以被 k8s
进行处理。那么不同版本的代码会有很多,每一个版本都需要做数据转换的逻辑。具体如何做代码的生成,在附录做具体的解读。
Generic Server
Generic Server 在 api-server 中最基础的一个功能,就是提供暴露 http 服务所需的基础设施。对外提供 restful 服务来操作 API Object。每个内部 Server 都是构建在 Generic Server 之上,把自己的内容填入 Generic Server。每个 Generic Server 最重要的输出是 Director,本质上是一个 mux 和 go container 的组合,所有的 http request 都是被这些 director 处理的。

之前聊过所有的 Server 的新建都是从 Config 的 New
方法出发,在这个方法里就对 GenericServer 做了初始化。
|
大部分都是从 Config 上做的 copy,有几个字段特别关注一下,一个是
apiServerHandler,这是提供 RESTful 接口重要的成员。另一个是
postStartHooks 和
preShutdownHooks。GenericServer 提供 Http 服务的方式和
go-rest 相同,构造 Mux、Container 来组装 director。用 director
构造处理链,最后处理链实现 Http 的接口。
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
gorestfulContainer := restful.NewContainer()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
a.FullHandlerChain.ServeHTTP(w, r)
}
在图中我们看到 APIServer 是一条处理链,来看一下这条链路是怎么构建的。我们知道 apiServerHandler 是通过这一行来实例化的。
|
其中 delegationTarget 就是 genericServer,就是
notFoundHandler,当前 director
无法处理了都会交给它,因此在装配的时候有这几行代码。
|
OpenAPI
OpenAPI 规范(OAS)是一种通用的、和编程语言无关的 API 描述规范,使人类和计算机都可以发现和理解服务的功能,而无需访问源代码、文档或针对接口进行嗅探。正确定义后,使用者可以使用最少的实现逻辑来理解远程服务并与之交互。
OpenAPI 是由 Swagger 发展而来的一个规范,一种形式化描述 Restful Service 的语言,便于消费者理解和使用一个 Service。通过 OpenAPI 规范,可以描述一个服务:
- 提供哪些 RESTful 服务;
- 各服务接收的输入以及输出对象格式;
- 支持的操作,如 post,get 等;
swagger.json 文件定义了 kubernetes 对外提供的 restful
service,客户端可以按照规定来向 api server 发 http
请求。该文件定义了绝大多数当前版本内建的 API Object,并且每个外部版本
API Object 的组合拥有 swagger 中的一套定义。
在 hack/update-openapi-spec.sh 中定义了
swagger.json 如何生成。具体的 json 文件在
api/openapi-spec/swagger.json。对于一个 API
Object,会生成一些 Defination 给 API Service
提供对外的服务,那这一步是如何自动生成的。
在每一个 API Group 下都有一个 doc.go,例如
staging/src/k8s.io/api/apps/v1/doc.go。会有一行注解来标志需不需要为该
group 下的 API Object 生成 openapi 相关的内容。
|
在 types.go 里会为每个 API Object 定义属性:
|
在 pkg/generated/openapi/zz_generated.openai.go 里生成
openapi definition。在这个文件中可以找到所有内建的 API Object 的 OpenAPI
definition,并且每个外部版本和 API Object 的组合会有一个
Definition。这里的 key 和 swagger.json 中的 definition id
有一对一映射关系。
API Object 装载
当有了 API Object 的定义,generic server 需要提供各种 API Object 的
RESTful 接口。例如我们在执行 kubectl apply -f nginx.yaml
时,需要识别这是要部署一个 deployment。

最后 Install 的调用对象是
APIInstaller,里面包含了许多信息,例如
Storage、Creater、Convertor。
registerResourceHandlers 方法很长,约 900
多行,这里大致叙述一下这个函数所作的工作:
- 获取被注册 object 的 group 与 version,确定是不是 subresource;
- 确定其区不区分 namespace;
- 根据传入的 storage 对象实现的接口,确定支持的各种操作(verbs);
- 创建 ListOptions,CreateOptions,PatchOptions,UpdateOptions 以及其他各种 Options;
- 生成 apiResource 并返回;
- 制作 actions lists,每个 resource 的每个 verb 一条记录;
- 决定放入 etcd 时使用的 version;以及从 etcd 取出时可以转化为的 version;
- 生成 ResourceInfo 并返回;
- 根据 Serializer 得出支持的 MediaTypes。从而设置 webservice 的 response 中属性;
- 把以上各个环节得到的信息,放入 reqScope 中;
- 视情况去计算 reqScope 的 FieldManager 属性;
- 逐个处理 Actions list 中的 action,基于 reqScope 等信息,为他们生成 route 并注册到 webservice 中;
- 更新 apiResource 并返回;
详细的代码这里不再记录。