万字长文:Kubernetes 创建 Pod 时,背后到底发生了什么?
时间:2022-10-23 08:00:00
关注「开源Linux」,选择星标
回复「学习」,有我为您筛选的学习资料~
全文大纲:
K8s 组件启动过程
kubectl(命令行客户端)
kube-apiserver
写入 etcd
Initializers
Control loops(控制循环)
Kubelet
本文试图回答以下问题:敲下kubectl run nginx --image=nginx --replicas=3
命令后,K8s 发生了什么事?
我们需要找出这个问题:
了解 K8s 几个核心组件的启动过程,他们分别做了什么,
请求从客户端开始 pod ready 整个过程。
0 K8s 组件启动过程
首先,看看几个核心组件的启动过程。
0.1 kube-apiserver 启动
调用栈
创建命令行(kube-apiserver
)入口:
main//cmd/kube-apiserver/apiserver.go |-cmd:=app.NewAPIServerCommand()//cmd/kube-apiserver/app/server.go ||-RunE:=func(){ |Complete() ||-ApplyAuthorization(s.Authorization) ||-ifTLS: |ServiceAccounts.KeyFiles=[]string{CertKey.KeyFile} |Validate() |Run(completedOptions,handlers)//核心逻辑 |} |-cmd.Execute()
kube-apiserver
启动后,将执行到其中Run()
方法:
Run()//cmd/kube-apiserver/app/server.go |-server=CreateServerChain() ||-CreateKubeAPIServerConfig() |||-buildGenericConfig ||||-genericapiserver.NewConfig()//staging/src/k8s.io/apiserver/pkg/server/config.go |||||-return&Config{ ||||Serializer:codecs, ||||BuildHandlerChainFunc:DefaultBuildHandlerChain,//注册handler ||||} |||| ||||-OpenAPIConfig=DefaultOpenAPIConfig()//OpenAPIschema ||||-kubeapiserver.NewStorageFactoryConfig()//etcd相关配置 ||||-APIResourceConfig=genericConfig.MergedResourceConfig ||||-storageFactoryConfig.Complete(s.Etcd) ||||-storageFactory=completedStorageFactoryConfig.New() ||nbsp; | |-s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig)
| | | |-BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
| | | |-pluginInitializers, admissionPostStartHook = admissionConfig.New()
| | |
| | |-capabilities.Initialize
| | |-controlplane.ServiceIPRange()
| | |-config := &controlplane.Config{}
| | |-AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook)
| | |-ServiceAccountIssuerURL = s.Authentication.ServiceAccounts.Issuer
| | |-ServiceAccountJWKSURI = s.Authentication.ServiceAccounts.JWKSURI
| | |-ServiceAccountPublicKeys = pubKeys
| |
| |-createAPIExtensionsServer
| |-CreateKubeAPIServer
| |-createAggregatorServer // cmd/kube-apiserver/app/aggregator.go
| | |-aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer) // staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
| | | |-apiGroupInfo := NewRESTStorage()
| | | |-GenericAPIServer.InstallAPIGroup(&apiGroupInfo)
| | | |-InstallAPIGroups
| | | |-openAPIModels := s.getOpenAPIModels(APIGroupPrefix, apiGroupInfos...)
| | | |-for apiGroupInfo := range apiGroupInfos {
| | | | s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels)
| | | | s.DiscoveryGroupManager.AddGroup(apiGroup)
| | | | s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
| | | |
| | | |-GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
| | | |-GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
| | | |-
| | |-
|-prepared = server.PrepareRun() // staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
| |-GenericAPIServer.AddPostStartHookOrDie
| |-GenericAPIServer.PrepareRun
| | |-routes.OpenAPI{}.Install()
| | |-registerResourceHandlers // staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
| | |-POST: XX
| | |-GET: XX
| |
| |-openapiaggregator.BuildAndRegisterAggregator()
| |-openapiaggregator.NewAggregationController()
| |-preparedAPIAggregator{}
|-prepared.Run() // staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
|-s.runnable.Run()
一些重要步骤
创建 server chain。Server aggregation(聚合)是一种支持多 apiserver 的方式,其中 包括了一个 generic apiserver[3],作为默认实现。
生成 OpenAPI schema,保存到 apiserver 的 Config.OpenAPIConfig 字段[4]。
遍历 schema 中的所有 API group,为每个 API group 配置一个 storage provider[5], 这是一个通用 backend 存储抽象层。
遍历每个 group 版本,为每个 HTTP route 配置 REST mappings[6]。稍后处理请求时,就能将 requests 匹配到合适的 handler。
0.2 controller-manager 启动
调用栈
NewDeploymentController
NewReplicaSetController
0.3 kubelet 启动
调用栈
main // cmd/kubelet/kubelet.go
|-NewKubeletCommand // cmd/kubelet/app/server.go
|-Run // cmd/kubelet/app/server.go
|-initForOS // cmd/kubelet/app/server.go
|-run // cmd/kubelet/app/server.go
|-initConfigz // cmd/kubelet/app/server.go
|-InitCloudProvider
|-NewContainerManager
|-ApplyOOMScoreAdj
|-PreInitRuntimeService
|-RunKubelet // cmd/kubelet/app/server.go
| |-k = createAndInitKubelet // cmd/kubelet/app/server.go
| | |-NewMainKubelet
| | | |-watch k8s Service
| | | |-watch k8s Node
| | | |-klet := &Kubelet{}
| | | |-init klet fields
| | |
| | |-k.BirthCry()
| | |-k.StartGarbageCollection()
| |
| |-startKubelet(k) // cmd/kubelet/app/server.go
| |-go k.Run() // -> pkg/kubelet/kubelet.go
| | |-go cloudResourceSyncManager.Run()
| | |-initializeModules
| | |-go volumeManager.Run()
| | |-go nodeLeaseController.Run()
| | |-initNetworkUtil() // setup iptables
| | |-go Until(PerformPodKillingWork, 1*time.Second, neverStop)
| | |-statusManager.Start()
| | |-runtimeClassManager.Start
| | |-pleg.Start()
| | |-syncLoop(updates, kl) // pkg/kubelet/kubelet.go
| |
| |-k.ListenAndServe
|
|-go http.ListenAndServe(healthz)
0.4 小结
以上核心组件启动完成后,就可以从命令行发起请求创建 pod 了。
1 kubectl(命令行客户端)
1.0 调用栈概览
NewKubectlCommand // staging/src/k8s.io/kubectl/pkg/cmd/cmd.go
|-matchVersionConfig = NewMatchVersionFlags()
|-f = cmdutil.NewFactory(matchVersionConfig)
| |-clientGetter = matchVersionConfig
|-NewCmdRun(f) // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
| |-Complete // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
| |-Run(f) // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
| |-validate parameters
| |-generators = GeneratorFn("run")
| |-runObj = createGeneratedObject(generators) // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
| | |-obj = generator.Generate() // -> staging/src/k8s.io/kubectl/pkg/generate/versioned/run.go
| | | |-get pod params
| | | |-pod = v1.Pod{params}
| | | |-return &pod
| | |-mapper = f.ToRESTMapper() // -> staging/src/k8s.io/cli-runtime/pkg/genericclioptions/config_flags.go
| | | |-f.clientGetter.ToRESTMapper() // -> staging/src/k8s.io/kubectl/pkg/cmd/util/factory_client_access.go
| | | |-f.Delegate.ToRESTMapper() // -> staging/src/k8s.io/kubectl/pkg/cmd/util/kubectl_match_version.go
| | | |-ToRESTMapper // -> staging/src/k8s.io/cli-runtime/pkg/resource/builder.go
| | | |-delegate() // staging/src/k8s.io/cli-runtime/pkg/resource/builder.go
| | |--actualObj = resource.NewHelper(mapping).XX.Create(obj)
| |-PrintObj(runObj.Object)
|
|-NewCmdEdit(f) // kubectl edit 命令
|-NewCmdScale(f) // kubectl scale 命令
|-NewCmdCordon(f) // kubectl cordon 命令
|-NewCmdUncordon(f)
|-NewCmdDrain(f)
|-NewCmdTaint(f)
|-NewCmdExecute(f)
|-...
1.1 参数验证(validation)和资源对象生成器(generator)
参数验证
敲下 kubectl
命令后,它首先会做一些客户端侧的验证。如果命令行参数有问题,例如,镜像名为空或格式不对[7], 这里会直接报错,从而避免了将明显错误的请求发给 kube-apiserver,减轻了后者的压力。
此外,kubectl 还会检查其他一些配置,例如
是否需要记录(record)这条命令(用于 rollout 或审计)
是否只是测试执行(
--dry-run
)
创建 HTTP 请求
所有查询或修改 K8s 资源的操作都需要与 kube-apiserver 交互,后者会进一步和 etcd 通信。
因此,验证通过之后,kubectl 接下来会创建发送给 kube-apiserver 的 HTTP 请求。
Generators
创建 HTTP 请求用到了所谓的 generator[8](文档[9]) ,它封装了资源的序列化(serialization)操作。例如,创建 pod 时用到的 generator 是 BasicPod[10]:
// staging/src/k8s.io/kubectl/pkg/generate/versioned/run.go
type BasicPod struct{}
func (BasicPod) ParamNames() []generate.GeneratorParam {
return []generate.GeneratorParam{
{Name: "labels", Required: false},
{Name: "name", Required: true},
{Name: "image", Required: true},
...
}
}
每个 generator 都实现了一个 Generate()
方法,用于生成一个该资源的运行时对象(runtime object)。对于 BasicPod
,其实现[11]为:
func (BasicPod) Generate(genericParams map[string]interface{}) (runtime.Object, error) {
pod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{ // metadata 字段
Name: name,
Labels: labels,
...
},
Spec: v1.PodSpec{ // spec 字段
ServiceAccountName: params["serviceaccount"],
Containers: []v1.Container{
{
Name: name,
Image: params["image"]
},
},
},
}
return &pod, nil
}
1.2 API group 和版本协商(version negotiation)
有了 runtime object 之后,kubectl 需要用合适的 API 将请求发送给 kube-apiserver。
API Group
K8s 用 API group 来管理 resource API。这是一种不同于 monolithic API(所有 API 扁平化)的 API 管理方式。
具体来说,同一资源的不同版本的 API,会放到一个 group 里面。例如 Deployment 资源的 API group 名为 apps
,最新的版本是 v1
。这也是为什么 我们在创建 Deployment 时,需要在 yaml 中指定 apiVersion: apps/v1
的原因。
版本协商
生成 runtime object 之后,kubectl 就开始搜索合适的 API group 和版本[12]:
// staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
obj := generator.Generate(params) // 创建运行时对象
mapper := f.ToRESTMapper() // 寻找适合这个资源(对象)的 API group
然后创建一个正确版本的客户端(versioned client)[13],
// staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
gvks, _ := scheme.Scheme.ObjectKinds(obj)
mapping := mapper.RESTMapping(gvks[0].GroupKind(), gvks[0].Version)
这个客户端能感知资源的 REST 语义。
以上过程称为版本协商。在实现上,kubectl 会扫描 kube-apiserver 的 /apis
路径(OpenAPI 格式的 schema 文档),获取所有的 API groups。
出于性能考虑,kubectl 会缓存这份 OpenAPI schema[14], 路径是 ~/.kube/cache/discovery
。想查看这个 API discovery 过程,可以删除这个文件, 然后随便执行一条 kubectl 命令,并指定足够大的日志级别(例如 kubectl get ds -v 10
)。
发送 HTTP 请求
现在有了 runtime object,也找到了正确的 API,因此接下来就是 将请求真正发送出去[15]:
// staging/src/k8s.io/kubectl/pkg/cmd/cmd.go
actualObj = resource.
NewHelper(client, mapping).
DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
WithFieldManager(o.fieldManager).
Create(o.Namespace, false, obj)
发送成功后,会以恰当的格式打印返回的消息。
1.3 客户端认证(client auth)
前面其实有意漏掉了一步:客户端认证。它发生在发送 HTTP 请求之前。
用户凭证(credentials)一般都放在 kubeconfig 文件中,但这个文件可以位于多个位置, 优先级从高到低:
命令行
--kubeconfig
环境变量
$KUBECONFIG
某些预定义的路径[16],例如
~/.kube
。
这个文件中存储了集群、用户认证等信息,如下面所示:
apiVersion: v1
clusters:
- cluster:
certificate-authority: /etc/kubernetes/pki/ca.crt
server: https://192.168.2.100:443
name: k8s-cluster-1
contexts:
- context:
cluster: k8s-cluster-1
user: default-user
name: default-context
current-context: default-context
kind: Config
preferences: {}
users:
- name: default-user
user:
client-certificate: /etc/kubernetes/pki/admin.crt
client-key: /etc/kubernetes/pki/admin.key
有了这些信息之后,客户端就可以组装 HTTP 请求的认证头了。支持的认证方式有几种:
X509 证书:放到 TLS[17] 中发送;
Bearer token:放到 HTTP
"Authorization"
头中发送[18];用户名密码:放到 HTTP basic auth 发送[19];
OpenID auth:需要先由用户手动处理,将其转成一个 token,然后和 bearer token 类似发送。
2 kube-apiserver
请求从客户端发出后,便来到服务端,也就是 kube-apiserver。
2.0 调用栈概览
buildGenericConfig
|-genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs) // cmd/kube-apiserver/app/server.go
NewConfig // staging/src/k8s.io/apiserver/pkg/server/config.go
|-return &Config{
Serializer: codecs,
BuildHandlerChainFunc: DefaultBuildHandlerChain,
} /
/
/
/
DefaultBuildHandlerChain // staging/src/k8s.io/apiserver/pkg/server/config.go
|-handler := filterlatency.TrackCompleted(apiHandler)
|-handler = genericapifilters.WithAuthorization(handler)
|-handler = genericapifilters.WithAudit(handler)
|-handler = genericapifilters.WithAuthentication(handler)
|-return handler
WithAuthentication
|-withAuthentication
|-resp, ok := AuthenticateRequest(req)
| |-for h := range authHandler.Handlers {
| resp, ok := currAuthRequestHandler.AuthenticateRequest(req)
| if ok {
| return resp, ok, err
| }
| }
| return nil, false, utilerrors.NewAggregate(errlist)
|
|-audiencesAreAcceptable(apiAuds, resp.Audiences)
|-req.Header.Del("Authorization")
|-req = req.WithContext(WithUser(req.Context(), resp.User))
|-return handler.ServeHTTP(w, req)
2.1 认证(Authentication)
kube-apiserver 首先会对请求进行认证(authentication),以确保用户身份是合法的(verify that the requester is who they say they are)。
具体过程:启动时,检查所有的命令行参数[20],组织成一个 authenticator list,例如,
如果指定了
--client-ca-file
,就会将 x509 证书加到这个列表;如果指定了
--token-auth-file
,就会将 token 加到这个列表;
不同 anthenticator 做的事情有所不同:
x509 handler[21] 验证该 HTTP 请求是用 TLS key 加密的,并且有 CA root 证书的签名。
bearer token handler[22] 验证请求中带的 token(HTTP Authorization 头中),在 apiserver 的 auth file 中是存在的(
--token-auth-file
)。basicauth handler[23] 对 basic auth 信息进行校验。
如果认证成功,就会将 Authorization
头从请求中删除,然后在上下文中加上用户信息[24]。这使得后面的步骤(例如鉴权和 admission control)能用到这里已经识别出的用户身份信息。
// staging/src/k8s.io/apiserver/pkg/endpoints/filters/authentication.go
// WithAuthentication creates an http handler that tries to authenticate the given request as a user, and then
// stores any such user found onto the provided context for the request.
// On success, "Authorization" header is removed from the request and handler
// is invoked to serve the request.
func WithAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler,
apiAuds authenticator.Audiences) http.Handler {
return withAuthentication(handler, auth, failed, apiAuds, recordAuthMetrics)
}
func withAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler,
apiAuds authenticator.Audiences, metrics recordMetrics) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
resp, ok := auth.AuthenticateRequest(req) // 遍历所有 authenticator,任何一个成功就返回 OK
if !ok {
return failed.ServeHTTP(w, req) // 所有认证方式都失败了
}
if !audiencesAreAcceptable(apiAuds, resp.Audiences) {
fmt.Errorf("unable to match the audience: %v , accepted: %v", resp.Audiences, apiAuds)
failed.ServeHTTP(w, req)
return
}
req.Header.Del("Authorization") // 认证成功后,这个 header 就没有用了,可以删掉
// 将用户信息添加到请求上下文中,供后面的步骤使用
req = req.WithContext(WithUser(req.Context(), resp.User))
handler.ServeHTTP(w, req)
})
}
AuthenticateRequest()
实现:遍历所有 authenticator,任何一个成功就返回 OK,
// staging/src/k8s.io/apiserver/pkg/authentication/request/union/union.go
func (authHandler *unionAuthRequestHandler) AuthenticateRequest(req) (*Response, bool) {
for currAuthRequestHandler := range authHandler.Handlers {
resp, ok := currAuthRequestHandler.AuthenticateRequest(req)
if ok {
return resp, ok, err
}
}
return nil, false, utilerrors.NewAggregate(errlist)
}
2.2 鉴权(Authorization)
发送者身份(认证)是一个问题,但他是否有权限执行这个操作(鉴权),是另一个问题。因此确认发送者身份之后,还需要进行鉴权。
鉴权的过程与认证非常相似,也是逐个匹配 authorizer 列表中的 authorizer:如果都失败了, 返回 Forbidden
并停止 进一步处理[25]。如果成功,就继续。
内置的 几种 authorizer 类型:
webhook[26]:与其他服务交互,验证是否有权限。
ABAC[27]:根据静态文件中规定的策略(policies)来进行鉴权。
RBAC[28]:根据 role 进行鉴权,其中 role 是 k8s 管理员提前配置的。
Node[29]:确保 node clients,例如 kubelet,只能访问本机内的资源。
要看它们的具体做了哪些事情,可以查看它们各自的 Authorize()
方法。
2.3 Admission control
至此,认证和鉴权都通过了。但这还没结束,K8s 中的其它组件还需要对请求进行检查, 其中就包括 admission controllers[30]。
与鉴权的区别
鉴权(authorization)在前面,关注的是用户是否有操作权限,
Admission controllers 在更后面,对请求进行拦截和过滤,确保它们符合一些更广泛的集群规则和限制, 是将请求对象持久化到 etcd 之前的最后堡垒。
工作方式
与认证和鉴权类似,也是遍历一个列表,
但有一点核心区别:任何一个 controller 检查没通过,请求就会失败。
设计:可扩展
每个 controller 作为一个 plugin 存放在 plugin/pkg/admission 目录[31],
设计时已经考虑,只需要实现很少的几个接口
但注意,admission controller 最终会编译到 k8s 的二进制文件(而非独立的 plugin binary)
类型
Admission controllers 通常按不同目的分类,包括:资源管理、安全管理、默认值管 理、引用一致性(referential consistency)等类型。
例如,下面是资源管理类的几个 controller:
InitialResources
:为容器设置默认的资源限制(基于过去的使用量);LimitRanger
:为容器的 requests and limits 设置默认值,或对特定资源设置上限(例如,内存默认 512MB,最高不超过 2GB)。ResourceQuota
:资源配额。
3 写入 etcd
至此,K8s 已经完成对请求的验证,允许它进行接下来的处理。
kube-apiserver 将对请求进行反序列化,构造 runtime objects( kubectl generator 的反过程),并将它们持久化到 etcd。下面详细 看这个过程。
3.0 调用栈概览
对于本文创建 pod 的请求,相应的入口是POST handler[32],它又会进一步将请求委托给一个创建具体资源的 handler。
registerResourceHandlers // staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
|-case POST:
// staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
switch () {
case "POST": // Create a resource.
var handler restful.RouteFunction
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, .., handler)
article := GetArticleForNoun(kind, " ")
doc := "create" + article + kind
if isSubresource {
doc = "create " + subresource + " of" + article + kind
}
route := ws.POST(action.Path).To(handler).
Doc(doc).
Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Returns(http.StatusCreated, "Created", producedObject).
Returns(http.StatusAccepted, "Accepted", producedObject).
Reads(defaultVersionedObject).
Writes(producedObject)
AddObjectParams(ws, route, versionedCreateOptions)
addParams(route, action.Params)
routes = append(routes, route)
}
for route := range routes {
route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
Group: reqScope.Kind.Group,
Version: reqScope.Kind.Version,
Kind: reqScope.Kind.Kind,
})
route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
ws.Route(route)
}
3.1 kube-apiserver 请求处理过程
从 apiserver 的请求处理函数开始:
// staging/src/k8s.io/apiserver/pkg/server/handler.go
func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
path := req.URL.Path
// check to see if our webservices want to claim this path
for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
switch {
case ws.RootPath() == "/apis":
if path == "/apis" || path == "/apis/" {
return d.goRestfulContainer.Dispatch(w, req)
}
case strings.HasPrefix(path, ws.RootPath()):
if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
return d.goRestfulContainer.Dispatch(w, req)
}
}
}
// if we didn't find a match, then we just skip gorestful altogether
d.nonGoRestfulMux.ServeHTTP(w, req)
}
如果能匹配到请求(例如匹配到前面注册的路由),它将分派给相应的 handler[33];否则,fall back 到path-based handler[34](GET /apis
到达的就是这里);
基于 path 的 handlers:
// staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder.go
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
return exactHandler.ServeHTTP(w, r)
}
for prefixHandler := range h.prefixHandlers {
if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
return prefixHandler.handler.ServeHTTP(w, r)
}
}
h.notFoundHandler.ServeHTTP(w, r)
}
如果还是没有找到路由,就会 fallback 到 non-gorestful handler,最终可能是一个 not found handler。
对于我们的场景,会匹配到一条已经注册的、名为`createHandler`[35]为的路由。
3.2 Create handler 处理过程
// staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go
func createHandler(r rest.NamedCreater, scope *RequestScope, admit Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
namespace, name := scope.Namer.Name(req) // 获取资源的 namespace 和 name(etcd item key)
s := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
body := limitedReadBody(req, scope.MaxRequestBodyBytes)
obj, gvk := decoder.Decode(body, &defaultGVK, original)
admit = admission.WithAudit(admit, ae)
requestFunc := func() (runtime.Object, error) {
return r.Create(
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
)
}
result := finishRequest(ctx, func() (runtime.Object, error) {
if scope.FieldManager != nil {
liveObj := scope.Creater.New(scope.Kind)
obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
admit = fieldmanager.NewManagedFieldsValidatingAdmissionController(admit)
}
admit.(admission.MutationInterface)
mutatingAdmission.Handles(admission.Create)
mutatingAdmission.Admit(ctx, admissionAttributes, scope)
return requestFunc()
})
code := http.StatusCreated
status, ok := result.(*metav1.Status)
transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
}
}
首先解析 HTTP request,然后执行基本的验证,例如保证 JSON 与 versioned API resource 期望的是一致的;
执行审计和最终 admission;
将资源最终写到 etcd[36], 这会进一步调用到 storage provider[37]。
etcd key 的格式一般是
(例如,/ default/nginx-0
),但这个也是可配置的。最后,storage provider 执行一次
get
操作,确保对象真的创建成功了。如果有额外的收尾任务(additional finalization),会执行 post-create handlers 和 decorators。返回 生成的[38]HTTP response。
以上过程可以看出,apiserver 做了大量的事情。
总结:至此我们的 pod 资源已经在 etcd 中了。但是,此时 kubectl get pods -n
还看不见它。
4 Initializers
对象持久化到 etcd 之后,apiserver 并未将其置位对外可见,它也不会立即就被调度, 而是要先等一些 initializers[39] 运行完成。
4.1 Initializer
Initializer 是与特定资源类型(resource type)相关的 controller,
负责在该资源对外可见之前对它们执行一些处理,
如果一种资源类型没有注册任何 initializer,这个步骤就会跳过,资源对外立即可见。
这是一种非常强大的特性,使得我们能执行一些通用的启动初始化(bootstrap)操作。例如,
向 Pod 注入 sidecar、暴露 80 端口,或打上特定的 annotation。
向某个 namespace 内的所有 pod 注入一个存放了测试证书(test certificates)的 volume。
禁止创建长度小于 20 个字符的 Secret (例如密码)。
4.2 InitializerConfiguration
可以用 InitializerConfiguration
声明对哪些资源类型(resource type)执行哪些 initializer。
例如,要实现所有 pod 创建时都运行一个自定义的 initializer custom-pod-initializer
, 可以用下面的 yaml:
apiVersion: admissionregistration.k8s.io/v1alpha1
kind: InitializerConfiguration
metadata:
name: custom-pod-initializer
initializers:
- name: podimage.example.com
rules:
- apiGroups:
- ""
apiVersions:
- v1
resources:
- pods
创建以上配置(kubectl create -f xx.yaml
)之后,K8s 会将custom-pod-initializer
追加到每个 pod 的 metadata.initializers.pending
字段。
在此之前需要启动 initializer controller,它会
定期扫描是否有新 pod 创建;
当检测到它的名字出现在 pod 的 pending 字段时,就会执行它的处理逻辑;
执行完成之后,它会将自己的名字从 pending list 中移除。
pending list 中的 initializers,每次只有第一个 initializer 能执行。当所有 initializer 执行完成,pending
字段为空之后,就认为这个对象已经完成初始化了(considered initialized)。
细心的同学可能会有疑问:前面说这个对象还没有对外可见,那用 户空间的 initializer controller 又是如何能检测并操作这个对象的呢?答案是:kube-apiserver 提供了一个 ?includeUninitialized
查询参数,它会返回所有对象, 包括那些还未完成初始化的(uninitialized ones)。
5 Control loops(控制循环)
至此,对象已经在 etcd 中了,所有的初始化步骤也已经完成了。下一步是设置资源拓扑(resource topology)。例如,一个 Deployment 其实就是一组 ReplicaSet,而一个 ReplicaSet 就是一组 Pod。K8s 是如何根据一个 HTTP 请求创建出这个层级关系的呢?靠的是 K8s 内置的控制器(controllers)。
K8s 中大量使用 "controllers",
一个 controller 就是一个异步脚本(an asynchronous script),
不断检查资源的当前状态(current state)和期望状态(desired state)是否一致,
如果不一致就尝试将其变成期望状态,这个过程称为 reconcile。
每个 controller 负责的东西都比较少,所有 controller 并行运行, 由 kube-controller-manager 统一管理。
5.1 Deployments controller
Deployments controller 启动
当一个 Deployment record 存储到 etcd 并(被 initializers)初始化之后, kube-apiserver 就会将其置为对外可见的。此后, Deployment controller 监听了 Deployment 资源的变动,因此此时就会检测到这个新创建的资源。
// pkg/controller/deployment/deployment_controller.go
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer DeploymentInformer, rsInformer ReplicaSetInformer,
podInformer PodInformer, client clientset.Interface) (*DeploymentController, error) {
dc := &DeploymentController{
client: client,
queue: workqueue.NewNamedRateLimitingQueue(),
}
dc.rsControl = controller.RealRSControl{ // ReplicaSet controller
KubeClient: client,
Recorder: dc.eventRecorder,
}
// 注册 Deployment 事件回调函数
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addDeployment, // 有 Deployment 创建时触发
UpdateFunc: dc.updateDeployment,
DeleteFunc: dc.deleteDeployment,
})
// 注册 ReplicaSet 事件回调函数
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addReplicaSet,
UpdateFunc: dc.updateReplicaSet,
&