Kubernetes1.5源码分析(一) apiServer启动分析


源码版本

Kubernetes v1.5.0

简介

apiserver是K8S最重要的组成部分,不论是命令操作还是通过remote API进行控制,实际都需要经过apiserver。
apiserver是k8s系统中所有对象的增删改查盯的http/restful式服务端,其中盯是指watch操作。数据最终存储在分布式一致的etcd存储内,apiserver本身是无状态的,提供了这些数据访问的认证鉴权、缓存、api版本适配转换等一系列的功能。

关键结构:

ServerRunOptions结构:
路径: cmd/kube-apiserver/app/options/options.go
type ServerRunOptions struct {
// 重名,下面称为GenericServerRunOptions
GenericServerRunOptions     *genericoptions.ServerRunOptions    // 服务器通用的运行参数
AllowPrivileged             bool  // 是否配置超级权限,即允许Pod中运行的容器拥有系统特权
EventTTL                    time.Duration  // 事件留存事件, 默认1h
KubeletConfig               kubeletclient.KubeletClientConfig  // K8S kubelet配置
MaxConnectionBytesPerSec    int64   // 每秒的最大连接数
// 指定的话,可以通过SSH指定的秘钥文件和用户名对Node进行访问
SSHKeyfile                  string
SSHUser                     string
// 包含PEM-encoded x509 RSA公钥和私钥的文件路径,用于验证Service Account的token
// 不指定的话,则使用--tls-private-key-file指定的文件
ServiceAccountKeyFile       string
// 设置为true时,系统会到etcd验证ServiceAccount token是否存在
ServiceAccountLookup        bool
WebhookTokenAuthnConfigFile string
WebhookTokenAuthnCacheTTL   time.Duration
}


ServerRunOptions结构:
路径: pkg/genericapiserver/options/server_run_options.go
type ServerRunOptions struct {
// 准入控制,如:"AlwaysAdmit","LimitRanger","ReousrceQuota"等
AdmissionControl           string   
// 准入控制的配置文件
AdmissionControlConfigFile string   
// 用于广播给集群的所有成员自己的IP地址,不指定的话就使用"--bind-address"的IP地址
AdvertiseAddress           net.IP   
// 安全访问的认证模式列表,以逗号分隔,包括:AlwaysAllow、AlwaysDeny、ABAC、Webhook、RBAC
AuthorizationMode                        string
// mode设置为ABAC时使用的csv格式的授权配置文件
AuthorizationPolicyFile                  string
// 下列跟mode配置成webhook有关
AuthorizationWebhookConfigFile           string
AuthorizationWebhookCacheAuthorizedTTL   time.Duration
AuthorizationWebhookCacheUnauthorizedTTL time.Duration
// mode设置为RBAC时使用的超级用户名,用该用户名进行RBAC认证
AuthorizationRBACSuperUser               string

AnonymousAuth                bool
// 使用http基本认证的方式访问API Server的安全端口
BasicAuthFile                string
// 默认"0.0.0.0",apiServer在该地址的6443端口上开启https服务
BindAddress                  net.IP
// TLS证书所在目录,默认"/var/run/kubernetes"
CertDirectory                string
// 指定的话,该客户端证书将用于认证过程
ClientCAFile                 string
// 下列的云服务商有关
CloudConfigFile              string
CloudProvider                string
// CORS 跨域资源共享
CorsAllowedOriginList        []string
// 默认的持久化存储格式,比如"application/json"
DefaultStorageMediaType      string
// 指定清理的工作线程数,可以提高清理namespace的效率,但是会增加系统资源的占用
DeleteCollectionWorkers      int
// 日志相关策略
AuditLogPath                 string
AuditLogMaxAge               int
AuditLogMaxBackups           int
AuditLogMaxSize              int
// 使能GC
EnableGarbageCollection      bool
// 打开性能分析,可以通过<host>:<port>/debug/pprof/地址来查看程序栈,线程等信息
EnableProfiling              bool
EnableContentionProfiling    bool
// 使能swaggerUI,访问地址<host>:<port>/swagger-ui
EnableSwaggerUI              bool
// 使能watch cache,对所有的watch操作进行缓存
EnableWatchCache             bool
// 按资源覆盖etcd服务的设置,以逗号分隔,比如group/resource#servers,其中servers为: http://ip:port
EtcdServersOverrides         []string
StorageConfig                storagebackend.Config
// 用于生成该master对外的URL地址
ExternalHost                 string
// 绑定的不安全地址,即8080端口绑定的地址
InsecureBindAddress          net.IP
// 非安全端口,默认8080
InsecurePort                 int
// 设置keystone鉴权插件地址
KeystoneURL                  string
KeystoneCAFile               string

KubernetesServiceNodePort    int
LongRunningRequestRE         string
// master数量
MasterCount                  int
// 设置master服务所在的namespace,默认为default
MasterServiceNamespace       string
// 同时处理的最大请求数,默认为400,超过该请求数将被拒绝。仅用于长时间执行的请求
MaxRequestsInFlight          int
// 最小请求处理超时时间,默认1800s,仅用于watch request
MinRequestTimeout            int
// 该文件内设置鉴权机构
OIDCCAFile                   string
OIDCClientID                 string
OIDCIssuerURL                string
OIDCUsernameClaim            string
OIDCGroupsClaim              string
RequestHeaderUsernameHeaders []string
RequestHeaderClientCAFile    string
RequestHeaderAllowedNames    []string
// 一组key=value用于运行时的配置信息。api/<groupVersion>/<resource>,用于打开或者关闭对某个API版本的支持
// api/all和api/legacy特别用于支持所有版本的API或支持旧版本的API
RuntimeConfig                config.ConfigurationMap
// https安全端口,默认6443;设置为0,表示不开启https
SecurePort                   int
// service的Cluster IP池
ServiceClusterIPRange        net.IPNet // TODO: make this a list
// service的NodePort模式下能使用的主机端口号范围,默认是30000--32767
ServiceNodePortRange         utilnet.PortRange
// 持久化存储的资源版本号,例如"group1/version1,group2/version2,..."
StorageVersions              string
// The default values for StorageVersions. StorageVersions overrides
// these; you can change this if you want to change the defaults (e.g.,
// for testing). This is not actually exposed as a flag.
DefaultStorageVersions string
TargetRAMMB            int
// TLS CA文件
TLSCAFile              string
// 包含x509证书的文件路径,用于https认证
TLSCertFile            string
// 包含x509与tls-cert-file对应的私钥文件路径
TLSPrivateKeyFile      string
SNICertKeys            []config.NamedCertKey
// 用于访问APIServer安全端口的token认证文件路径
TokenAuthFile          string
// 使能token
EnableAnyToken         bool
// 设置各资源对象watch缓存大小的列表,以逗号分隔,格式为resource#size
// 前提是EnableWatchCache为true
WatchCacheSizes        []string
}


ApiServer启动 ##

路径:kubernetes/cmd/kube-apiserver/apiserver.go

**入口main()函数: **
func main() {
rand.Seed(time.Now().UTC().UnixNano())

// 新建一个apiserver对象
s := options.NewServerRunOptions()
// 接受用户命令行输入,其实就是自定义上述apiserver对象 
s.AddFlags(pflag.CommandLine)
// 解析并格式化用户传入的参数,最后填充APIServer结构体的各成员
flag.InitFlags()
// 初始化log配置,包括log输出位置、log等级等。
logs.InitLogs()
// 保证了即使apiserver异常崩溃了也能将内存中的log信息保存到磁盘文件中。 
defer logs.FlushLogs()
// 如果用户只是想看apiserver的版本号而不是启动apiserver,则打印apiserver的版本号并退出。
verflag.PrintAndExitIfRequested()

// 将创建的apiserver对象传入app.Run()中,最终绑定本地端口并绑定本地端口并创建一个HTTP Server与一个HTTPS Server。
if err := app.Run(s); err != nil {
    fmt.Fprintf(os.Stderr, "%v\n", err)
    os.Exit(1)
}
}


新建一个apiserver对象---NewServerRunOptions():

func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
    // 初始化通用的apiserver运行参数,包括etcd后端存储参数
    GenericServerRunOptions: genericoptions.NewServerRunOptions().WithEtcdOptions(),
    // 事件的存储保留时间
    EventTTL:                1 * time.Hour,
    // Node上kubelet的客户端配置
    KubeletConfig: kubeletclient.KubeletClientConfig{
        // kubelet通信端口
        Port: ports.KubeletPort,
        PreferredAddressTypes: []string{
            string(api.NodeHostName),
            string(api.NodeInternalIP),
            string(api.NodeExternalIP),
            string(api.NodeLegacyHostIP),
        },
        // 是否开启https
        EnableHttps: true,
        // HTTP超时
        HTTPTimeout: time.Duration(5) * time.Second,
    },
    // 将webhook token authenticator返回的响应保存在缓存内的时间
    WebhookTokenAuthnCacheTTL: 2 * time.Minute,
}
return &s
}


上面的接口在初始化GenericServerRunOptions参数时又调用了genericoptions.NewServerRunOptions().WithEtcdOptions()接口,先来看下与上面接口名字一样的NewServerRunOptions():
func NewServerRunOptions() *ServerRunOptions {
return &ServerRunOptions{
    // 以逗号作为分隔符的Admission Control插件的排序列表
    AdmissionControl:                         "AlwaysAdmit",
    AnonymousAuth:                            false,
    // 授权模式
    AuthorizationMode:                        "AlwaysAllow",
    AuthorizationWebhookCacheAuthorizedTTL:   5 * time.Minute,
    AuthorizationWebhookCacheUnauthorizedTTL: 30 * time.Second,
    // apiserver绑定的网卡地址
    BindAddress:                              net.ParseIP("0.0.0.0"),
    // 证书目录
    CertDirectory:                            "/var/run/kubernetes",
    // 默认的对象存储类型
    DefaultStorageMediaType:                  "application/json",
    DefaultStorageVersions:                   registered.AllPreferredGroupVersions(),
    DeleteCollectionWorkers:                  1,
    EnableGarbageCollection:                  true,
    EnableProfiling:                          true,
    EnableContentionProfiling:                false,
    EnableWatchCache:                         true,
    // HTTP绑定的IP地址
    InsecureBindAddress:                      net.ParseIP("127.0.0.1"),
    // 不安全端口(HTTP)
    InsecurePort:                             8080,
    LongRunningRequestRE:                     DefaultLongRunningRequestRE,
    // Kubernetes系统中Master的数量
    MasterCount:                              1,
    MasterServiceNamespace:                   api.NamespaceDefault,
    MaxRequestsInFlight:                      400,
    MinRequestTimeout:                        1800,
    // k8s运行时环境配置
    RuntimeConfig:                            make(config.ConfigurationMap),
    // 安全端口
    SecurePort:                               6443,
    ServiceNodePortRange:                     DefaultServiceNodePortRange,
    StorageVersions:                          registered.AllPreferredGroupVersions(),
}
}


可以看到初始化的时候会有SecurePort、InsecurePort,实际就是对应HTTP、HTTPS的绑定端口。
我们可以看到这里的控制还是很全面的,包括安全控制(CertDirectory, HTTPS默认启动)、权限控制(AdmissionControl,AuthorizationMode)、服务限流控制(MaxRequestsInFlight)等。
具体的参数前面介绍结构体时基本都有提到。
继续后端存储etcd的配置初始化WithEtcdOptions():
func (o *ServerRunOptions) WithEtcdOptions() *ServerRunOptions {
o.StorageConfig = storagebackend.Config{
    // etcd的默认路径前缀:/registry
    Prefix: DefaultEtcdPathPrefix,
    // 反序列化cache,未设置的话,会根据apiServer的内存限制进行配置
    DeserializationCacheSize: 0,
}
return o
}


到这里apiServer的运行参数初始化关键性步骤基本结束,至于后面的s.AddFlags(pflag.CommandLine)就是获取命令行的输入信息,然后进行重新覆盖,这里就不讲了。
可以根据kube-apiserver进程的命令行信息,把命令行传参和结构配置进行对应:

/usr/bin/kube-apiserver --logtostderr=true --v=0 --etcd-servers=http://test-master:2379 --insecure-bind-address=0.0.0.0 --port=8080 --kubelet-port=10250 --allow-privileged=false --service-cluster-ip-range=10.254.0.0/16 --admission-control=NamespaceLifecycle,NamespaceExists,LimitRanger,SecurityContextDeny,ServiceAccount,ResourceQuota --service-account-key-file=/var/run/kubernetes/apiserver.key




初始化完成之后,最重要的任务就是启动实例了。
所有的操作都是在run函数中执行,app.run()接口实现在cmd/kube-apiserver/app/server.go。
RUN源码分析:

func Run(s *options.ServerRunOptions) error {
// 检查etcd后端存储相关参数的有效性
genericvalidation.VerifyEtcdServersList(s.GenericServerRunOptions)
// 检查一些运行参数的有效性,并会设置一些默认值
// 比如options.AdvertiseAddress参数没有设置,并且bind-address也没有设置,
// k8s将会获取默认网卡的地址给该成员
genericapiserver.DefaultAndValidateRunOptions(s.GenericServerRunOptions)
// 根据之前初始化的GenericServerRunOptions对象来初始化创建genericapiserver.config
// NewConfig()是初始化了一个默认的config,
// ApplyOptions()根据GenericServerRunOptions进行再一遍的初始化
// Complete()对一些没填充的字段,可以根据别的字段进行初始化
// 实际NewConfig()中也调用了ApplyOptions()接口,只是参数是default值
genericConfig := genericapiserver.NewConfig(). // create the new config
                        ApplyOptions(s.GenericServerRunOptions).
                        Complete()

// 根据ServiceClusterIPRange输入参数,获取IPRange和ServiceIP
serviceIPRange, apiServerServiceIP, err := genericapiserver.DefaultServiceIPRange(s.GenericServerRunOptions.ServiceClusterIPRange)
if err != nil {TargetRAMMB
    glog.Fatalf("Error determining service IP ranges: %v", err)
}
// 有需要的话生成证书
if err := genericConfig.MaybeGenerateServingCerts(apiServerServiceIP); err != nil {
    glog.Fatalf("Failed to generate service certificate: %v", err)
}

// 初始化能力集,多次运行apiserver也只会初始化一次
capabilities.Initialize(capabilities.Capabilities{
    // 是否有超级权限
    AllowPrivileged: s.AllowPrivileged,
    // TODO(vmarmol): Implement support for HostNetworkSources.
    PrivilegedSources: capabilities.PrivilegedSources{
        HostNetworkSources: []string{},
        HostPIDSources:     []string{},
        HostIPCSources:     []string{},
    },
    // 每个用户连接的最大值,字节数/秒。当前只适用于长时间运行的请求
    PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
})

// 有需要的话设置网络隧道
var tunneler genericapiserver.Tunneler
var proxyDialerFn apiserver.ProxyDialerFunc
// 如果运行在云平台中,则需要安装本机的SSH Key到Kubernetes集群中所有节点上
// 可以用于通过该用户名和私钥,SSH到node上
if len(s.SSHUser) > 0 {
    // Get ssh key distribution func, if supported
    var installSSH genericapiserver.InstallSSHKey
    cloud, err := cloudprovider.InitCloudProvider(s.GenericServerRunOptions.CloudProvider, s.GenericServerRunOptions.CloudConfigFile)
    if err != nil {
        glog.Fatalf("Cloud provider could not be initialized: %v", err)
    }
    if cloud != nil {
        if instances, supported := cloud.Instances(); supported {
            installSSH = instances.AddSSHKeyToAllInstances
        }
    }
    if s.KubeletConfig.Port == 0 {
        glog.Fatalf("Must enable kubelet port if proxy ssh-tunneling is specified.")
    }
    // Set up the tunneler
    // TODO(cjcullen): If we want this to handle per-kubelet ports or other
    // kubelet listen-addresses, we need to plumb through options.
    healthCheckPath := &url.URL{
        Scheme: "https",
        Host:   net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.Port), 10)),
        Path:   "healthz",
    }
    tunneler = genericapiserver.NewSSHTunneler(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSH)

    // Use the tunneler's dialer to connect to the kubelet
    s.KubeletConfig.Dial = tunneler.Dial
    // Use the tunneler's dialer when proxying to pods, services, and nodes
    proxyDialerFn = tunneler.Dial
}

// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}

// 后端存储etcd的反序列化缓存没有设置的话,根据TargetRAMMB值进行恰当的设置
// TargetRAMMB:用户手动输入的apiServer的内存限制(单位:MB)
// 小于1000MB的话按1000MB算
if s.GenericServerRunOptions.StorageConfig.DeserializationCacheSize == 0 {
    glog.V(2).Infof("Initalizing deserialization cache size based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)

    clusterSize := s.GenericServerRunOptions.TargetRAMMB / 60
    s.GenericServerRunOptions.StorageConfig.DeserializationCacheSize = 25 * clusterSize
    if s.GenericServerRunOptions.StorageConfig.DeserializationCacheSize < 1000 {
        s.GenericServerRunOptions.StorageConfig.DeserializationCacheSize = 1000
    }
}
// 存储组版本
storageGroupsToEncodingVersion, err := s.GenericServerRunOptions.StorageGroupsToEncodingVersion()
if err != nil {
    glog.Fatalf("error generating storage version map: %s", err)
}
// 创建api工厂,包括请求头、解析工具、编码格式、API配置
// 创建了一个DefaultStorageFactory对象
storageFactory, err := genericapiserver.BuildDefaultStorageFactory(
    s.GenericServerRunOptions.StorageConfig, s.GenericServerRunOptions.DefaultStorageMediaType, api.Codecs,
    genericapiserver.NewDefaultResourceEncodingConfig(), storageGroupsToEncodingVersion,
    // FIXME: this GroupVersionResource override should be configurable
    []unversioned.GroupVersionResource{batch.Resource("cronjobs").WithVersion("v2alpha1")},
    master.DefaultAPIResourceConfigSource(), s.GenericServerRunOptions.RuntimeConfig)
if err != nil {
    glog.Fatalf("error in initializing storage factory: %s", err)
}
// 添加jobs和HPA(水平自动扩容)的接口
storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs"))
storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers"))
// 根据用户输入的etcd-servers-overrides参数,设置对应groupResource对应的etcd地址
for _, override := range s.GenericServerRunOptions.EtcdServersOverrides {
    tokens := strings.Split(override, "#")
    if len(tokens) != 2 {
        glog.Errorf("invalid value of etcd server overrides: %s", override)
        continue
    }

    apiresource := strings.Split(tokens[0], "/")
    if len(apiresource) != 2 {
        glog.Errorf("invalid resource definition: %s", tokens[0])
        continue
    }
    group := apiresource[0]
    resource := apiresource[1]
    groupResource := unversioned.GroupResource{Group: group, Resource: resource}

    servers := strings.Split(tokens[1], ";")
    // 上面都是解析用户输入的字符串,并生成对应的groupResource
    // 设置对应groupResource的etcdLocation
    storageFactory.SetEtcdLocation(groupResource, servers)
}

// 授权认证有关
if len(s.ServiceAccountKeyFiles) == 0 && s.GenericServerRunOptions.TLSPrivateKeyFile != "" {
    if authenticator.IsValidServiceAccountKeyFile(s.GenericServerRunOptions.TLSPrivateKeyFile) {
        s.ServiceAccountKeyFiles = []string{s.GenericServerRunOptions.TLSPrivateKeyFile}
    } else {
        glog.Warning("No TLS key provided, service account token authentication disabled")
    }
}

var serviceAccountGetter serviceaccount.ServiceAccountTokenGetter
// 判断是否设置为true,是的话则创建接口用于从etcd验证ServiceAccount token是否存在
if s.ServiceAccountLookup {
    // If we need to look up service accounts and tokens,
    // go directly to etcd to avoid recursive auth insanity
    storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts"))
    if err != nil {
        glog.Fatalf("Unable to get serviceaccounts storage: %v", err)
    }
    serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
}

// 安全认证相关
apiAuthenticator, securityDefinitions, err := authenticator.New(authenticator.AuthenticatorConfig{
    Anonymous:                   s.GenericServerRunOptions.AnonymousAuth,
    AnyToken:                    s.GenericServerRunOptions.EnableAnyToken,
    // 指定basicauthfile文件所在的位置,当这个参数不为空的时候,
    // 会开启basicauth的认证方式,这是一个.csv文件,
    // 三列分别是password,username,useruid
    BasicAuthFile:               s.GenericServerRunOptions.BasicAuthFile,
    // 用于给客户端签名的根证书,当这个参数不为空的时候,
    // 会开启https的认证方式,会通过这个根证书对客户端的证书进行身份认证
    ClientCAFile:                s.GenericServerRunOptions.ClientCAFile,
    // 用于Token文件所在的位置,当这个参数不为空的时候,会采用token的认证方式,
    // token文件也是csv的格式,分别是“token,username,userid”
    TokenAuthFile:               s.GenericServerRunOptions.TokenAuthFile,
    OIDCIssuerURL:               s.GenericServerRunOptions.OIDCIssuerURL,
    OIDCClientID:                s.GenericServerRunOptions.OIDCClientID,
    OIDCCAFile:                  s.GenericServerRunOptions.OIDCCAFile,
    OIDCUsernameClaim:           s.GenericServerRunOptions.OIDCUsernameClaim,
    OIDCGroupsClaim:             s.GenericServerRunOptions.OIDCGroupsClaim,
    // 当不为空的时候,采用ServiceAccount的认证方式,这其实是一个公钥方式。
    // 发过来的信息是客户端使用对应的私钥加密,服务端使用指定的公钥来解密信息
    ServiceAccountKeyFiles:      s.ServiceAccountKeyFiles,
    // 默认为false。如果为true的话,就会从etcd中取出对应的ServiceAccount与
    // 传过来的信息进行对比验证,反之不会
    ServiceAccountLookup:        s.ServiceAccountLookup,
    ServiceAccountTokenGetter:   serviceAccountGetter,
    KeystoneURL:                 s.GenericServerRunOptions.KeystoneURL,
    KeystoneCAFile:              s.GenericServerRunOptions.KeystoneCAFile,
    WebhookTokenAuthnConfigFile: s.WebhookTokenAuthnConfigFile,
    WebhookTokenAuthnCacheTTL:   s.WebhookTokenAuthnCacheTTL,
    RequestHeaderConfig:         s.GenericServerRunOptions.AuthenticationRequestHeaderConfig(),
})

if err != nil {
    glog.Fatalf("Invalid Authentication Config: %v", err)
}

privilegedLoopbackToken := uuid.NewRandom().String()
selfClientConfig, err := s.GenericServerRunOptions.NewSelfClientConfig(privilegedLoopbackToken)
if err != nil {
    glog.Fatalf("Failed to create clientset: %v", err)
}
client, err := s.GenericServerRunOptions.NewSelfClient(privilegedLoopbackToken)
if err != nil {
    glog.Errorf("Failed to create clientset: %v", err)
}
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)

authorizationConfig := authorizer.AuthorizationConfig{
    PolicyFile:                  s.GenericServerRunOptions.AuthorizationPolicyFile,
    WebhookConfigFile:           s.GenericServerRunOptions.AuthorizationWebhookConfigFile,
    WebhookCacheAuthorizedTTL:   s.GenericServerRunOptions.AuthorizationWebhookCacheAuthorizedTTL,
    WebhookCacheUnauthorizedTTL: s.GenericServerRunOptions.AuthorizationWebhookCacheUnauthorizedTTL,
    RBACSuperUser:               s.GenericServerRunOptions.AuthorizationRBACSuperUser,
    InformerFactory:             sharedInformers,
}
authorizationModeNames := strings.Split(s.GenericServerRunOptions.AuthorizationMode, ",")
apiAuthorizer, err := authorizer.NewAuthorizerFromAuthorizationConfig(authorizationModeNames, authorizationConfig)
if err != nil {
    glog.Fatalf("Invalid Authorization Config: %v", err)
}

admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",")

// TODO(dims): We probably need to add an option "EnableLoopbackToken"
if apiAuthenticator != nil {
    var uid = uuid.NewRandom().String()
    tokens := make(map[string]*user.DefaultInfo)
    tokens[privilegedLoopbackToken] = &user.DefaultInfo{
        Name:   user.APIServerUser,
        UID:    uid,
        Groups: []string{user.SystemPrivilegedGroup},
    }

    tokenAuthenticator := authenticator.NewAuthenticatorFromTokens(tokens)
    apiAuthenticator = authenticatorunion.New(tokenAuthenticator, apiAuthenticator)

    tokenAuthorizer := authorizer.NewPrivilegedGroups(user.SystemPrivilegedGroup)
    apiAuthorizer = authorizerunion.New(tokenAuthorizer, apiAuthorizer)
}

pluginInitializer := admission.NewPluginInitializer(sharedInformers, apiAuthorizer)
// 准入控制器
admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile, pluginInitializer)
if err != nil {
    glog.Fatalf("Failed to initialize plugins: %v", err)
}

proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
    Dial:            proxyDialerFn,
    TLSClientConfig: proxyTLSClientConfig,
})
kubeVersion := version.Get()

// genericConfig在该接口最开始进行了创建并初始化
genericConfig.Version = &kubeVersion
genericConfig.LoopbackClientConfig = selfClientConfig
genericConfig.Authenticator = apiAuthenticator
genericConfig.Authorizer = apiAuthorizer
genericConfig.AdmissionControl = admissionController
genericConfig.APIResourceConfigSource = storageFactory.APIResourceConfigSource
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
genericConfig.OpenAPIConfig.Definitions = generatedopenapi.OpenAPIDefinitions
genericConfig.EnableOpenAPISupport = true
genericConfig.EnableMetrics = true
genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions

// master.Config配置初始化
config := &master.Config{
    GenericConfig: genericConfig.Config,

    StorageFactory:          storageFactory,
    EnableWatchCache:        s.GenericServerRunOptions.EnableWatchCache,
    EnableCoreControllers:   true,
    DeleteCollectionWorkers: s.GenericServerRunOptions.DeleteCollectionWorkers,
    EventTTL:                s.EventTTL,
    KubeletClientConfig:     s.KubeletConfig,
    EnableUISupport:         true,
    EnableLogsSupport:       true,
    ProxyTransport:          proxyTransport,

    Tunneler: tunneler,

    ServiceIPRange:       serviceIPRange,
    APIServerServiceIP:   apiServerServiceIP,
    APIServerServicePort: 443,

    ServiceNodePortRange:      s.GenericServerRunOptions.ServiceNodePortRange,
    KubernetesServiceNodePort: s.GenericServerRunOptions.KubernetesServiceNodePort,

    MasterCount: s.GenericServerRunOptions.MasterCount,
}
// 判断是否对watch cache进行了使能,默认是true
// 是true的话,会初始化watchCacheSize,然后设置各个resource的CacheSize
if s.GenericServerRunOptions.EnableWatchCache {
    glog.V(2).Infof("Initalizing cache sizes based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
    cachesize.InitializeWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
    cachesize.SetWatchCacheSizes(s.GenericServerRunOptions.WatchCacheSizes)
}
// 创建master
// Complete()完善了config的初始化
// New()进行resources的初始化及RESTful-api注册
m, err := config.Complete().New()
if err != nil {
    return err
}

sharedInformers.Start(wait.NeverStop)
// 运行HTTP/HTTPS服务
m.GenericAPIServer.PrepareRun().Run(wait.NeverStop)
return nil
}


该接口调用主要用于生成master实例对象,各种api的请求最后都是通过master对象来处理的。
在最后APIServer会启动HTTP/HTTPS服务。

基本的启动流程就介绍完了,这里不进入细讲,由于大致了解下启动流程。
后面会继续分章节介绍各个关键点。

0 个评论

要回复文章请先登录注册