锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

万字长文:Kubernetes 创建 Pod 时,背后到底发生了什么?

时间:2022-10-23 08:00:00 l300继电器

关注「开源Linux」,选择星标

回复「学习」,有我为您筛选的学习资料~

全文大纲:

  • K8s 组件启动过程

  • kubectl(命令行客户端)

  • kube-apiserver

  • 写入 etcd

  • Initializers

  • Control loops(控制循环)

  • Kubelet


本文试图回答以下问题:敲下kubectl run nginx --image=nginx --replicas=3命令后K8s 发生了什么事?

我们需要找出这个问题:

  1. 了解 K8s 几个核心组件的启动过程,他们分别做了什么,

  2. 请求从客户端开始 pod ready 整个过程。

0 K8s 组件启动过程

首先,看看几个核心组件的启动过程。

0.1 kube-apiserver 启动

调用栈

创建命令行(kube-apiserver)入口:

main//cmd/kube-apiserver/apiserver.go |-cmd:=app.NewAPIServerCommand()//cmd/kube-apiserver/app/server.go ||-RunE:=func(){ |Complete() ||-ApplyAuthorization(s.Authorization) ||-ifTLS: |ServiceAccounts.KeyFiles=[]string{CertKey.KeyFile} |Validate() |Run(completedOptions,handlers)//核心逻辑 |} |-cmd.Execute() 

kube-apiserver启动后,将执行到其中Run()方法:

Run()//cmd/kube-apiserver/app/server.go |-server=CreateServerChain() ||-CreateKubeAPIServerConfig() |||-buildGenericConfig ||||-genericapiserver.NewConfig()//staging/src/k8s.io/apiserver/pkg/server/config.go |||||-return&Config{ ||||Serializer:codecs, ||||BuildHandlerChainFunc:DefaultBuildHandlerChain,//注册handler ||||} |||| ||||-OpenAPIConfig=DefaultOpenAPIConfig()//OpenAPIschema ||||-kubeapiserver.NewStorageFactoryConfig()//etcd相关配置 ||||-APIResourceConfig=genericConfig.MergedResourceConfig ||||-storageFactoryConfig.Complete(s.Etcd) ||||-storageFactory=completedStorageFactoryConfig.New() ||nbsp; |   |-s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig)
 |           |   |   |-BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
 |           |   |   |-pluginInitializers, admissionPostStartHook = admissionConfig.New()
 |           |   |
 |           |   |-capabilities.Initialize
 |           |   |-controlplane.ServiceIPRange()
 |           |   |-config := &controlplane.Config{}
 |           |   |-AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook)
 |           |   |-ServiceAccountIssuerURL = s.Authentication.ServiceAccounts.Issuer
 |           |   |-ServiceAccountJWKSURI = s.Authentication.ServiceAccounts.JWKSURI
 |           |   |-ServiceAccountPublicKeys = pubKeys
 |           |
 |           |-createAPIExtensionsServer
 |           |-CreateKubeAPIServer
 |           |-createAggregatorServer    // cmd/kube-apiserver/app/aggregator.go
 |           |   |-aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)   // staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
 |           |   |  |-apiGroupInfo := NewRESTStorage()
 |           |   |  |-GenericAPIServer.InstallAPIGroup(&apiGroupInfo)
 |           |   |  |-InstallAPIGroups
 |           |   |  |-openAPIModels := s.getOpenAPIModels(APIGroupPrefix, apiGroupInfos...)
 |           |   |  |-for apiGroupInfo := range apiGroupInfos {
 |           |   |  |   s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels)
 |           |   |  |   s.DiscoveryGroupManager.AddGroup(apiGroup)
 |           |   |  |   s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
 |           |   |  |
 |           |   |  |-GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
 |           |   |  |-GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
 |           |   |  |-
 |           |   |-
 |-prepared = server.PrepareRun()     // staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
 |            |-GenericAPIServer.AddPostStartHookOrDie
 |            |-GenericAPIServer.PrepareRun
 |            |  |-routes.OpenAPI{}.Install()
 |            |     |-registerResourceHandlers // staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
 |            |         |-POST: XX
 |            |         |-GET: XX
 |            |
 |            |-openapiaggregator.BuildAndRegisterAggregator()
 |            |-openapiaggregator.NewAggregationController()
 |            |-preparedAPIAggregator{}
 |-prepared.Run() // staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
    |-s.runnable.Run()

一些重要步骤

  1. 创建 server chain。Server aggregation(聚合)是一种支持多 apiserver 的方式,其中 包括了一个 generic apiserver[3],作为默认实现。

  2. 生成 OpenAPI schema,保存到 apiserver 的 Config.OpenAPIConfig 字段[4]

  3. 遍历 schema 中的所有 API group,为每个 API group 配置一个 storage provider[5], 这是一个通用 backend 存储抽象层。

  4. 遍历每个 group 版本,为每个 HTTP route 配置 REST mappings[6]。稍后处理请求时,就能将 requests 匹配到合适的 handler。

0.2 controller-manager 启动

调用栈

NewDeploymentController
NewReplicaSetController

0.3 kubelet 启动

调用栈

main                                                                            // cmd/kubelet/kubelet.go
 |-NewKubeletCommand                                                            // cmd/kubelet/app/server.go
   |-Run                                                                        // cmd/kubelet/app/server.go
      |-initForOS                                                               // cmd/kubelet/app/server.go
      |-run                                                                     // cmd/kubelet/app/server.go
        |-initConfigz                                                           // cmd/kubelet/app/server.go
        |-InitCloudProvider
        |-NewContainerManager
        |-ApplyOOMScoreAdj
        |-PreInitRuntimeService
        |-RunKubelet                                                            // cmd/kubelet/app/server.go
        | |-k = createAndInitKubelet                                            // cmd/kubelet/app/server.go
        | |  |-NewMainKubelet
        | |  |  |-watch k8s Service
        | |  |  |-watch k8s Node
        | |  |  |-klet := &Kubelet{}
        | |  |  |-init klet fields
        | |  |
        | |  |-k.BirthCry()
        | |  |-k.StartGarbageCollection()
        | |
        | |-startKubelet(k)                                                     // cmd/kubelet/app/server.go
        |    |-go k.Run()                                                       // -> pkg/kubelet/kubelet.go
        |    |  |-go cloudResourceSyncManager.Run()
        |    |  |-initializeModules
        |    |  |-go volumeManager.Run()
        |    |  |-go nodeLeaseController.Run()
        |    |  |-initNetworkUtil() // setup iptables
        |    |  |-go Until(PerformPodKillingWork, 1*time.Second, neverStop)
        |    |  |-statusManager.Start()
        |    |  |-runtimeClassManager.Start
        |    |  |-pleg.Start()
        |    |  |-syncLoop(updates, kl)                                         // pkg/kubelet/kubelet.go
        |    |
        |    |-k.ListenAndServe
        |
        |-go http.ListenAndServe(healthz)

0.4 小结

以上核心组件启动完成后,就可以从命令行发起请求创建 pod 了。

1 kubectl(命令行客户端)

1.0 调用栈概览

NewKubectlCommand                                    // staging/src/k8s.io/kubectl/pkg/cmd/cmd.go
 |-matchVersionConfig = NewMatchVersionFlags()
 |-f = cmdutil.NewFactory(matchVersionConfig)
 |      |-clientGetter = matchVersionConfig
 |-NewCmdRun(f)                                      // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
 |  |-Complete                                       // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
 |  |-Run(f)                                         // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
 |    |-validate parameters
 |    |-generators = GeneratorFn("run")
 |    |-runObj = createGeneratedObject(generators)   // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
 |    |           |-obj = generator.Generate()       // -> staging/src/k8s.io/kubectl/pkg/generate/versioned/run.go
 |    |           |        |-get pod params
 |    |           |        |-pod = v1.Pod{params}
 |    |           |        |-return &pod
 |    |           |-mapper = f.ToRESTMapper()        // -> staging/src/k8s.io/cli-runtime/pkg/genericclioptions/config_flags.go
 |    |           |  |-f.clientGetter.ToRESTMapper() // -> staging/src/k8s.io/kubectl/pkg/cmd/util/factory_client_access.go
 |    |           |     |-f.Delegate.ToRESTMapper()  // -> staging/src/k8s.io/kubectl/pkg/cmd/util/kubectl_match_version.go
 |    |           |        |-ToRESTMapper            // -> staging/src/k8s.io/cli-runtime/pkg/resource/builder.go
 |    |           |        |-delegate()              //    staging/src/k8s.io/cli-runtime/pkg/resource/builder.go
 |    |           |--actualObj = resource.NewHelper(mapping).XX.Create(obj)
 |    |-PrintObj(runObj.Object)
 |
 |-NewCmdEdit(f)      // kubectl edit   命令
 |-NewCmdScale(f)     // kubectl scale  命令
 |-NewCmdCordon(f)    // kubectl cordon 命令
 |-NewCmdUncordon(f)
 |-NewCmdDrain(f)
 |-NewCmdTaint(f)
 |-NewCmdExecute(f)
 |-...

1.1 参数验证(validation)和资源对象生成器(generator)

参数验证

敲下 kubectl 命令后,它首先会做一些客户端侧的验证。如果命令行参数有问题,例如,镜像名为空或格式不对[7], 这里会直接报错,从而避免了将明显错误的请求发给 kube-apiserver,减轻了后者的压力。

此外,kubectl 还会检查其他一些配置,例如

  • 是否需要记录(record)这条命令(用于 rollout 或审计)

  • 是否只是测试执行(--dry-run

创建 HTTP 请求

所有查询或修改 K8s 资源的操作都需要与 kube-apiserver 交互,后者会进一步和 etcd 通信。

因此,验证通过之后,kubectl 接下来会创建发送给 kube-apiserver 的 HTTP 请求

Generators

创建 HTTP 请求用到了所谓的 generator[8](文档[9]) ,它封装了资源的序列化(serialization)操作。例如,创建 pod 时用到的 generator 是 BasicPod[10]

// staging/src/k8s.io/kubectl/pkg/generate/versioned/run.go

type BasicPod struct{}

func (BasicPod) ParamNames() []generate.GeneratorParam {
    return []generate.GeneratorParam{
        {Name: "labels", Required: false},
        {Name: "name", Required: true},
        {Name: "image", Required: true},
        ...
    }
}

每个 generator 都实现了一个 Generate() 方法,用于生成一个该资源的运行时对象(runtime object)。对于 BasicPod,其实现[11]为:

func (BasicPod) Generate(genericParams map[string]interface{}) (runtime.Object, error) {
    pod := v1.Pod{
        ObjectMeta: metav1.ObjectMeta{  // metadata 字段
            Name:        name,
            Labels:      labels,
            ...
        },
        Spec: v1.PodSpec{               // spec 字段
            ServiceAccountName: params["serviceaccount"],
            Containers: []v1.Container{
                {
                    Name:            name,
                    Image:           params["image"]
                },
            },
        },
    }

    return &pod, nil
}

1.2 API group 和版本协商(version negotiation)

有了 runtime object 之后,kubectl 需要用合适的 API 将请求发送给 kube-apiserver。

API Group

K8s 用 API group 来管理 resource API。这是一种不同于 monolithic API(所有 API 扁平化)的 API 管理方式。

具体来说,同一资源的不同版本的 API,会放到一个 group 里面。例如 Deployment 资源的 API group 名为 apps,最新的版本是 v1。这也是为什么 我们在创建 Deployment 时,需要在 yaml 中指定 apiVersion: apps/v1 的原因。

版本协商

生成 runtime object 之后,kubectl 就开始搜索合适的 API group 和版本[12]

// staging/src/k8s.io/kubectl/pkg/cmd/run/run.go

    obj := generator.Generate(params) // 创建运行时对象
    mapper := f.ToRESTMapper()        // 寻找适合这个资源(对象)的 API group

然后创建一个正确版本的客户端(versioned client)[13]

// staging/src/k8s.io/kubectl/pkg/cmd/run/run.go

    gvks, _ := scheme.Scheme.ObjectKinds(obj)
    mapping := mapper.RESTMapping(gvks[0].GroupKind(), gvks[0].Version)

这个客户端能感知资源的 REST 语义。

以上过程称为版本协商。在实现上,kubectl 会扫描 kube-apiserver 的 /apis 路径(OpenAPI 格式的 schema 文档),获取所有的 API groups。

出于性能考虑,kubectl 会缓存这份 OpenAPI schema[14], 路径是 ~/.kube/cache/discovery想查看这个 API discovery 过程,可以删除这个文件, 然后随便执行一条 kubectl 命令,并指定足够大的日志级别(例如 kubectl get ds -v 10)。

发送 HTTP 请求

现在有了 runtime object,也找到了正确的 API,因此接下来就是 将请求真正发送出去[15]

// staging/src/k8s.io/kubectl/pkg/cmd/cmd.go

        actualObj = resource.
            NewHelper(client, mapping).
            DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
            WithFieldManager(o.fieldManager).
            Create(o.Namespace, false, obj)

发送成功后,会以恰当的格式打印返回的消息。

1.3 客户端认证(client auth)

前面其实有意漏掉了一步:客户端认证。它发生在发送 HTTP 请求之前。

用户凭证(credentials)一般都放在 kubeconfig 文件中,但这个文件可以位于多个位置, 优先级从高到低:

  • 命令行 --kubeconfig

  • 环境变量 $KUBECONFIG

  • 某些预定义的路径[16],例如 ~/.kube

这个文件中存储了集群、用户认证等信息,如下面所示:

apiVersion: v1
clusters:
- cluster:
    certificate-authority: /etc/kubernetes/pki/ca.crt
    server: https://192.168.2.100:443
  name: k8s-cluster-1
contexts:
- context:
    cluster: k8s-cluster-1
    user: default-user
  name: default-context
current-context: default-context
kind: Config
preferences: {}
users:
- name: default-user
  user:
    client-certificate: /etc/kubernetes/pki/admin.crt
    client-key: /etc/kubernetes/pki/admin.key

有了这些信息之后,客户端就可以组装 HTTP 请求的认证头了。支持的认证方式有几种:

  • X509 证书:放到 TLS[17] 中发送;

  • Bearer token:放到 HTTP "Authorization" 头中发送[18]

  • 用户名密码:放到 HTTP basic auth 发送[19]

  • OpenID auth:需要先由用户手动处理,将其转成一个 token,然后和 bearer token 类似发送。

2 kube-apiserver

请求从客户端发出后,便来到服务端,也就是 kube-apiserver。

2.0 调用栈概览


buildGenericConfig
  |-genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)  // cmd/kube-apiserver/app/server.go

NewConfig       // staging/src/k8s.io/apiserver/pkg/server/config.go
 |-return &Config{
      Serializer:             codecs,
      BuildHandlerChainFunc:  DefaultBuildHandlerChain,
   }                          /
                            /
                          /
                        /
DefaultBuildHandlerChain       // staging/src/k8s.io/apiserver/pkg/server/config.go
 |-handler := filterlatency.TrackCompleted(apiHandler)
 |-handler = genericapifilters.WithAuthorization(handler)
 |-handler = genericapifilters.WithAudit(handler)
 |-handler = genericapifilters.WithAuthentication(handler)
 |-return handler


WithAuthentication
 |-withAuthentication
    |-resp, ok := AuthenticateRequest(req)
    |  |-for h := range authHandler.Handlers {
    |      resp, ok := currAuthRequestHandler.AuthenticateRequest(req)
    |      if ok {
    |          return resp, ok, err
    |      }
    |    }
    |    return nil, false, utilerrors.NewAggregate(errlist)
    |
    |-audiencesAreAcceptable(apiAuds, resp.Audiences)
    |-req.Header.Del("Authorization")
    |-req = req.WithContext(WithUser(req.Context(), resp.User))
    |-return handler.ServeHTTP(w, req)

2.1 认证(Authentication)

kube-apiserver 首先会对请求进行认证(authentication),以确保用户身份是合法的(verify that the requester is who they say they are)。

具体过程:启动时,检查所有的命令行参数[20],组织成一个 authenticator list,例如,

  • 如果指定了 --client-ca-file,就会将 x509 证书加到这个列表;

  • 如果指定了 --token-auth-file,就会将 token 加到这个列表;

不同 anthenticator 做的事情有所不同:

  • x509 handler[21] 验证该 HTTP 请求是用 TLS key 加密的,并且有 CA root 证书的签名。

  • bearer token handler[22] 验证请求中带的 token(HTTP Authorization 头中),在 apiserver 的 auth file 中是存在的(--token-auth-file)。

  • basicauth handler[23] 对 basic auth 信息进行校验。

如果认证成功,就会将 Authorization 头从请求中删除,然后在上下文中加上用户信息[24]。这使得后面的步骤(例如鉴权和 admission control)能用到这里已经识别出的用户身份信息。

// staging/src/k8s.io/apiserver/pkg/endpoints/filters/authentication.go

// WithAuthentication creates an http handler that tries to authenticate the given request as a user, and then
// stores any such user found onto the provided context for the request.
// On success, "Authorization" header is removed from the request and handler
// is invoked to serve the request.
func WithAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler,
    apiAuds authenticator.Audiences) http.Handler {
    return withAuthentication(handler, auth, failed, apiAuds, recordAuthMetrics)
}

func withAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler,
    apiAuds authenticator.Audiences, metrics recordMetrics) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
        resp, ok := auth.AuthenticateRequest(req) // 遍历所有 authenticator,任何一个成功就返回 OK
        if !ok {
            return failed.ServeHTTP(w, req)       // 所有认证方式都失败了
        }

        if !audiencesAreAcceptable(apiAuds, resp.Audiences) {
            fmt.Errorf("unable to match the audience: %v , accepted: %v", resp.Audiences, apiAuds)
            failed.ServeHTTP(w, req)
            return
        }

        req.Header.Del("Authorization") // 认证成功后,这个 header 就没有用了,可以删掉

        // 将用户信息添加到请求上下文中,供后面的步骤使用
        req = req.WithContext(WithUser(req.Context(), resp.User))
        handler.ServeHTTP(w, req)
    })
}

AuthenticateRequest() 实现:遍历所有 authenticator,任何一个成功就返回 OK,

// staging/src/k8s.io/apiserver/pkg/authentication/request/union/union.go

func (authHandler *unionAuthRequestHandler) AuthenticateRequest(req) (*Response, bool) {
    for currAuthRequestHandler := range authHandler.Handlers {
        resp, ok := currAuthRequestHandler.AuthenticateRequest(req)
        if ok {
            return resp, ok, err
        }
    }

    return nil, false, utilerrors.NewAggregate(errlist)
}

2.2 鉴权(Authorization)

发送者身份(认证)是一个问题,但他是否有权限执行这个操作(鉴权),是另一个问题。因此确认发送者身份之后,还需要进行鉴权。

鉴权的过程与认证非常相似,也是逐个匹配 authorizer 列表中的 authorizer:如果都失败了, 返回 Forbidden 并停止 进一步处理[25]。如果成功,就继续。

内置的 几种 authorizer 类型

  • webhook[26]:与其他服务交互,验证是否有权限。

  • ABAC[27]:根据静态文件中规定的策略(policies)来进行鉴权。

  • RBAC[28]:根据 role 进行鉴权,其中 role 是 k8s 管理员提前配置的。

  • Node[29]:确保 node clients,例如 kubelet,只能访问本机内的资源。

要看它们的具体做了哪些事情,可以查看它们各自的 Authorize() 方法。

2.3 Admission control

至此,认证和鉴权都通过了。但这还没结束,K8s 中的其它组件还需要对请求进行检查, 其中就包括 admission controllers[30]

与鉴权的区别

  • 鉴权(authorization)在前面,关注的是用户是否有操作权限

  • Admission controllers 在更后面,对请求进行拦截和过滤,确保它们符合一些更广泛的集群规则和限制, 是将请求对象持久化到 etcd 之前的最后堡垒

工作方式

  • 与认证和鉴权类似,也是遍历一个列表,

  • 但有一点核心区别:任何一个 controller 检查没通过,请求就会失败

设计:可扩展

  • 每个 controller 作为一个 plugin 存放在 plugin/pkg/admission 目录[31],

  • 设计时已经考虑,只需要实现很少的几个接口

  • 但注意,admission controller 最终会编译到 k8s 的二进制文件(而非独立的 plugin binary)

类型

Admission controllers 通常按不同目的分类,包括:资源管理、安全管理、默认值管 理、引用一致性(referential consistency)等类型。

例如,下面是资源管理类的几个 controller:

  • InitialResources:为容器设置默认的资源限制(基于过去的使用量);

  • LimitRanger:为容器的 requests and limits 设置默认值,或对特定资源设置上限(例如,内存默认 512MB,最高不超过 2GB)。

  • ResourceQuota:资源配额。

3 写入 etcd

至此,K8s 已经完成对请求的验证,允许它进行接下来的处理。

kube-apiserver 将对请求进行反序列化,构造 runtime objects( kubectl generator 的反过程),并将它们持久化到 etcd。下面详细 看这个过程。

3.0 调用栈概览

对于本文创建 pod 的请求,相应的入口是POST handler[32],它又会进一步将请求委托给一个创建具体资源的 handler。

registerResourceHandlers // staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
 |-case POST:
// staging/src/k8s.io/apiserver/pkg/endpoints/installer.go

        switch () {
        case "POST": // Create a resource.
            var handler restful.RouteFunction
            if isNamedCreater {
                handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
            } else {
                handler = restfulCreateResource(creater, reqScope, admit)
            }

            handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, .., handler)
            article := GetArticleForNoun(kind, " ")
            doc := "create" + article + kind
            if isSubresource {
                doc = "create " + subresource + " of" + article + kind
            }

            route := ws.POST(action.Path).To(handler).
                Doc(doc).
                Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                Returns(http.StatusOK, "OK", producedObject).
                Returns(http.StatusCreated, "Created", producedObject).
                Returns(http.StatusAccepted, "Accepted", producedObject).
                Reads(defaultVersionedObject).
                Writes(producedObject)

            AddObjectParams(ws, route, versionedCreateOptions)
            addParams(route, action.Params)
            routes = append(routes, route)
        }

        for route := range routes {
            route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
                Group:   reqScope.Kind.Group,
                Version: reqScope.Kind.Version,
                Kind:    reqScope.Kind.Kind,
            })
            route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
            ws.Route(route)
        }

3.1 kube-apiserver 请求处理过程

从 apiserver 的请求处理函数开始:

// staging/src/k8s.io/apiserver/pkg/server/handler.go

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    path := req.URL.Path

    // check to see if our webservices want to claim this path
    for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
        switch {
        case ws.RootPath() == "/apis":
            if path == "/apis" || path == "/apis/" {
                return d.goRestfulContainer.Dispatch(w, req)
            }

        case strings.HasPrefix(path, ws.RootPath()):
            if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
                return d.goRestfulContainer.Dispatch(w, req)
            }
        }
    }

    // if we didn't find a match, then we just skip gorestful altogether
    d.nonGoRestfulMux.ServeHTTP(w, req)
}

如果能匹配到请求(例如匹配到前面注册的路由),它将分派给相应的 handler[33];否则,fall back 到path-based handler[34]GET /apis 到达的就是这里);

基于 path 的 handlers:

// staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder.go

func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
        return exactHandler.ServeHTTP(w, r)
    }

    for prefixHandler := range h.prefixHandlers {
        if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
            return prefixHandler.handler.ServeHTTP(w, r)
        }
    }

    h.notFoundHandler.ServeHTTP(w, r)
}

如果还是没有找到路由,就会 fallback 到 non-gorestful handler,最终可能是一个 not found handler。

对于我们的场景,会匹配到一条已经注册的、名为`createHandler`[35]为的路由。

3.2 Create handler 处理过程

// staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go

func createHandler(r rest.NamedCreater, scope *RequestScope, admit Interface, includeName bool) http.HandlerFunc {
    return func(w http.ResponseWriter, req *http.Request) {
        namespace, name := scope.Namer.Name(req) // 获取资源的 namespace 和 name(etcd item key)
        s := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)

        body := limitedReadBody(req, scope.MaxRequestBodyBytes)
        obj, gvk := decoder.Decode(body, &defaultGVK, original)

        admit = admission.WithAudit(admit, ae)

        requestFunc := func() (runtime.Object, error) {
            return r.Create(
                name,
                obj,
                rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
            )
        }

        result := finishRequest(ctx, func() (runtime.Object, error) {
            if scope.FieldManager != nil {
                liveObj := scope.Creater.New(scope.Kind)
                obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
                admit = fieldmanager.NewManagedFieldsValidatingAdmissionController(admit)
            }

            admit.(admission.MutationInterface)
            mutatingAdmission.Handles(admission.Create)
            mutatingAdmission.Admit(ctx, admissionAttributes, scope)

            return requestFunc()
        })

        code := http.StatusCreated
        status, ok := result.(*metav1.Status)
        transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
    }
}
  1. 首先解析 HTTP request,然后执行基本的验证,例如保证 JSON 与 versioned API resource 期望的是一致的;

  2. 执行审计和最终 admission;

  3. 将资源最终写到 etcd[36], 这会进一步调用到 storage provider[37]

    etcd key 的格式一般是 /(例如,default/nginx-0),但这个也是可配置的。

  4. 最后,storage provider 执行一次 get 操作,确保对象真的创建成功了。如果有额外的收尾任务(additional finalization),会执行 post-create handlers 和 decorators。

  5. 返回 生成的[38]HTTP response。

以上过程可以看出,apiserver 做了大量的事情。

总结:至此我们的 pod 资源已经在 etcd 中了。但是,此时 kubectl get pods -n  还看不见它。

4 Initializers

对象持久化到 etcd 之后,apiserver 并未将其置位对外可见,它也不会立即就被调度, 而是要先等一些 initializers[39] 运行完成。

4.1 Initializer

Initializer 是与特定资源类型(resource type)相关的 controller

  • 负责在该资源对外可见之前对它们执行一些处理

  • 如果一种资源类型没有注册任何 initializer,这个步骤就会跳过,资源对外立即可见

这是一种非常强大的特性,使得我们能执行一些通用的启动初始化(bootstrap)操作。例如,

  • 向 Pod 注入 sidecar、暴露 80 端口,或打上特定的 annotation。

  • 向某个 namespace 内的所有 pod 注入一个存放了测试证书(test certificates)的 volume。

  • 禁止创建长度小于 20 个字符的 Secret (例如密码)。

4.2 InitializerConfiguration

可以用 InitializerConfiguration 声明对哪些资源类型(resource type)执行哪些 initializer

例如,要实现所有 pod 创建时都运行一个自定义的 initializer custom-pod-initializer, 可以用下面的 yaml:

apiVersion: admissionregistration.k8s.io/v1alpha1
kind: InitializerConfiguration
metadata:
  name: custom-pod-initializer
initializers:
  - name: podimage.example.com
    rules:
      - apiGroups:
          - ""
        apiVersions:
          - v1
        resources:
          - pods

创建以上配置(kubectl create -f xx.yaml)之后,K8s 会将custom-pod-initializer 追加到每个 pod 的 metadata.initializers.pending 字段

在此之前需要启动 initializer controller,它会

  • 定期扫描是否有新 pod 创建;

  • 检测到它的名字出现在 pod 的 pending 字段时,就会执行它的处理逻辑;

  • 执行完成之后,它会将自己的名字从 pending list 中移除。

pending list 中的 initializers,每次只有第一个 initializer 能执行。当所有 initializer 执行完成,pending 字段为空之后,就认为这个对象已经完成初始化了(considered initialized)。

细心的同学可能会有疑问:前面说这个对象还没有对外可见,那用 户空间的 initializer controller 又是如何能检测并操作这个对象的呢?答案是:kube-apiserver 提供了一个 ?includeUninitialized 查询参数,它会返回所有对象, 包括那些还未完成初始化的(uninitialized ones)。

5 Control loops(控制循环)

至此,对象已经在 etcd 中了,所有的初始化步骤也已经完成了。下一步是设置资源拓扑(resource topology)。例如,一个 Deployment 其实就是一组 ReplicaSet,而一个 ReplicaSet 就是一组 Pod。K8s 是如何根据一个 HTTP 请求创建出这个层级关系的呢?靠的是 K8s 内置的控制器(controllers)。

K8s 中大量使用 "controllers",

  • 一个 controller 就是一个异步脚本(an asynchronous script),

  • 不断检查资源的当前状态(current state)和期望状态(desired state)是否一致,

  • 如果不一致就尝试将其变成期望状态,这个过程称为 reconcile

每个 controller 负责的东西都比较少,所有 controller 并行运行, 由 kube-controller-manager 统一管理

5.1 Deployments controller

Deployments controller 启动

当一个 Deployment record 存储到 etcd 并(被 initializers)初始化之后, kube-apiserver 就会将其置为对外可见的。此后, Deployment controller 监听了 Deployment 资源的变动,因此此时就会检测到这个新创建的资源。

// pkg/controller/deployment/deployment_controller.go

// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer DeploymentInformer, rsInformer ReplicaSetInformer,
    podInformer PodInformer, client clientset.Interface) (*DeploymentController, error) {

    dc := &DeploymentController{
        client:        client,
        queue:         workqueue.NewNamedRateLimitingQueue(),
    }
    dc.rsControl = controller.RealRSControl{ // ReplicaSet controller
        KubeClient: client,
        Recorder:   dc.eventRecorder,
    }

    // 注册 Deployment 事件回调函数
    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,    // 有 Deployment 创建时触发
        UpdateFunc: dc.updateDeployment,
        DeleteFunc: dc.deleteDeployment,
    })
    // 注册 ReplicaSet 事件回调函数
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
   &
锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章