Kubernetes1.5源码分析(四) apiServer资源的etcd接口实现


源码版本

Kubernetes v1.5.0

简介

k8s的各个组件与apiServer交互操作各种资源对象,最终都会落入到etcd中。
k8s为所有对外提供服务的Restful资源实现了一套通用的符合Restful要求的etcd操作接口,每个服务接口负责处理一类(Kind)资源对象。
这些资源对象包括pods、bindings、podTemplates、RC、Services等。

Storage创建

要了解etcd操作接口的实现,我们先需要了解下Master.GenericAPIServer.storage结构:
storage map[string]rest.Storage

该storage变量是个map,Key是REST API的path,Value是rest.Storage接口,该接口就是一个通用的符合Restful要求的资源存储接口。
核心组资源列表的创建要查看pkg/registry/core/rest/storage_core.go中的NewLegacyRESTStorage()接口:
接口调用流程: main --> App.Run --> config.Complete().New() --> m.InstallLegacyAPI() --> NewLegacyRESTStorage()
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter genericapiserver.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
....
// 创建podStorage
podStorage := podetcd.NewStorage(
    restOptionsGetter(api.Resource("pods")),
    nodeStorage.KubeletConnectionInfo,
    c.ProxyTransport,
    podDisruptionClient,
)

...
// 资源列表
restStorageMap := map[string]rest.Storage{
    "pods":             podStorage.Pod,
    "pods/attach":      podStorage.Attach,
    "pods/status":      podStorage.Status,
    "pods/log":         podStorage.Log,
    "pods/exec":        podStorage.Exec,
    "pods/portforward": podStorage.PortForward,
    "pods/proxy":       podStorage.Proxy,
    "pods/binding":     podStorage.Binding,
    "bindings":         podStorage.Binding,

    "podTemplates": podTemplateStorage,

    "replicationControllers":        controllerStorage.Controller,
    "replicationControllers/status": controllerStorage.Status,

    "services":        serviceRest.Service,
    "services/proxy":  serviceRest.Proxy,
    "services/status": serviceStatusStorage,

    "endpoints": endpointsStorage,

    ...

    "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
}
if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) {
    restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
}
if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1beta1"}) {
    restStorageMap["pods/eviction"] = podStorage.Eviction
}
apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap

return restStorage, apiGroupInfo, nil
}

该接口在ApiServer源码分析的第二章介绍资源注册的时候已经讲过,这里我们主要分析后端存储etcd操作接口的实现。
我们以Pod资源为例,进行介绍:
路径: pkg/registry/core/pod/etcd/etcd.go
func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
// 完成prefix
prefix := "/" + opts.ResourcePrefix

newListFunc := func() runtime.Object { return &api.PodList{} }
// 调用接口装饰器,返回该storage的etcd操作接口及资源delete接口
// 该opts传参进来的,需要到上一层查看master.go下的restOptionsFactory.NewFor
storageInterface, dFunc := opts.Decorator(
    opts.StorageConfig,
    // 这一下的参数都是用于开启cache时的接口使用
    cachesize.GetWatchCacheSizeByResource(cachesize.Pods),
    &api.Pod{},
    prefix,
    pod.Strategy,
    newListFunc,
    pod.NodeNameTriggerFunc,
)

store := &registry.Store{
    NewFunc:     func() runtime.Object { return &api.Pod{} },
    NewListFunc: newListFunc,
    KeyRootFunc: func(ctx api.Context) string {
        return registry.NamespaceKeyRootFunc(ctx, prefix)
    },
    KeyFunc: func(ctx api.Context, name string) (string, error) {
        return registry.NamespaceKeyFunc(ctx, prefix, name)
    },
    ObjectNameFunc: func(obj runtime.Object) (string, error) {
        return obj.(*api.Pod).Name, nil
    },
    PredicateFunc:           pod.MatchPod,
    QualifiedResource:       api.Resource("pods"),
    EnableGarbageCollection: opts.EnableGarbageCollection,
    DeleteCollectionWorkers: opts.DeleteCollectionWorkers,

    CreateStrategy:      pod.Strategy,
    UpdateStrategy:      pod.Strategy,
    DeleteStrategy:      pod.Strategy,
    ReturnDeletedObject: true,

    Storage:     storageInterface,
    DestroyFunc: dFunc,
}

statusStore := *store
statusStore.UpdateStrategy = pod.StatusStrategy

return PodStorage{
    Pod:         &REST{store, proxyTransport},
    Binding:     &BindingREST{store: store},
    Eviction:    newEvictionStorage(store, podDisruptionBudgetClient),
    Status:      &StatusREST{store: &statusStore},
    Log:         &podrest.LogREST{Store: store, KubeletConn: k},
    Proxy:       &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
    Exec:        &podrest.ExecREST{Store: store, KubeletConn: k},
    Attach:      &podrest.AttachREST{Store: store, KubeletConn: k},
    PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
}
}

该接口中调用了opts.Decorator()接口返回了关键的storage interface及清除操作资源的接口。
要看该接口的实现,我们得先从opts的创建开始。
restOptionsGetter(api.Resource("pods"))该步完成了opts的创建,api.Resource("pods")其实就是拼接了一个GroupResource的结构,我们需要从头开始介绍restOptionsGetter接口的由来。
路径:pkg/master/master.go
func (c completedConfig) New() (*Master, error) {
...
restOptionsFactory := restOptionsFactory{
    deleteCollectionWorkers: c.DeleteCollectionWorkers,
    enableGarbageCollection: c.GenericConfig.EnableGarbageCollection,
    storageFactory:          c.StorageFactory,
}

// 判断是否使能了用于Watch的Cache
// 有无cache赋值的是不同的接口实现
// restOptionsFactory.storageDecorator:是一个各个资源的REST interface(CRUD)装饰者
// 后面调用NewStorage()时会用到该接口,并输出对应的CRUD接口及销毁接口。
// 可以参考pkg/registry/core/pod/etcd/etcd.go中的NewStorage()
// 其实这里有无cache的接口差异就在于:有cache的话,就提供操作cache的接口;无cache的话,就提供直接操作etcd的接口
if c.EnableWatchCache {
    restOptionsFactory.storageDecorator = registry.StorageWithCacher
} else {
    restOptionsFactory.storageDecorator = generic.UndecoratedStorage
}
// install legacy rest storage
if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
    legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
        StorageFactory:       c.StorageFactory,
        ProxyTransport:       c.ProxyTransport,
        KubeletClientConfig:  c.KubeletClientConfig,
        EventTTL:             c.EventTTL,
        ServiceIPRange:       c.ServiceIPRange,
        ServiceNodePortRange: c.ServiceNodePortRange,
        LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
    }
    m.InstallLegacyAPI(c.Config, restOptionsFactory.NewFor, legacyRESTStorageProvider)
}
...
}

该接口初始化了一个restOptionsFactory变量,里面指定了最大的删除回收资源的协程数,是否使能GC和storageFactory,还根据是否使能了WatchCache来完成NewStorage()接口中调用的装饰器接口的赋值。
restOptionsFactory.NewForj接口一直被往下传,直到NewLegacyRESTStorage()接口中被调用然后创建了opts,我们看下该接口实现:
路径: pkg/master/master.go
type restOptionsFactory struct {
deleteCollectionWorkers int
enableGarbageCollection bool
storageFactory          genericapiserver.StorageFactory
storageDecorator        generic.StorageDecorator
}

func (f restOptionsFactory) NewFor(resource unversioned.GroupResource) generic.RESTOptions {
// 创建该资源的Storage Config
storageConfig, err := f.storageFactory.NewConfig(resource)
if err != nil {
    glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())
}
// 最终返回的就是RESTOptions, 就是前面的opts的类型
// 需要关注f.storageDecorator的由来
return generic.RESTOptions{
    // 用于生成Storage的config
    StorageConfig:           storageConfig,
    Decorator:               f.storageDecorator,
    DeleteCollectionWorkers: f.deleteCollectionWorkers,
    EnableGarbageCollection: f.enableGarbageCollection,
    ResourcePrefix:          f.storageFactory.ResourcePrefix(resource),
}
}

该接口比较简单,初始化了一个generic.RESTOptions变量,即opts。我们需要找出opts.Decorator的由来,就只需要看下上一个接口判断EnableWatchCache时就明白了。
opts.Decorator该接口最终返回了storage的interface和清除操作资源的接口。可以想一下带缓冲和不带缓冲的接口实现肯定不一致,所以这里需要进行区分:
- registry.StorageWithCacher:该接口是返回了操作cache的接口,和清除cache的操作接口
- generic.UndecoratedStorage: 该接口会根据你配置的后端类型(etcd2/etcd3等),来返回不同的etcd操作接口,其实是为所有的资源对象创建了etcd的链接,然后通过该链接发送不同的命令,最后还返回了断开该链接的接口。
所以实现完全不一样,一个操作cache,一个操作实际的etcd。

先看registry.StorageWithCacher()接口实现:
路径: pkg/registry/generic/registry/storage_factory.go
func StorageWithCacher(
storageConfig *storagebackend.Config,
capacity int,
objectType runtime.Object,
resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object,
triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {

// storageConfig是后端存储的config,定义了存储类型,存储服务器List,TLS证书信息,Cache大小等。
// 该接口就是generic.UndecoratedStorage()接口的实现,StorageWithCacher()接口就是多了下面的cacher操作
s, d := generic.NewRawStorage(storageConfig)
// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
// Currently it has two layers of same storage interface -- cacher and low level kv.
cacherConfig := storage.CacherConfig{
    CacheCapacity:        capacity,
    Storage:              s,
    Versioner:            etcdstorage.APIObjectVersioner{},
    Type:                 objectType,
    ResourcePrefix:       resourcePrefix,
    NewListFunc:          newListFunc,
    TriggerPublisherFunc: triggerFunc,
    Codec:                storageConfig.Codec,
}
// 根据是否有namespace来进行区分赋值
// KeyFunc函数用于获取该object的Key: 
// 有namespace的话,key的格式:prefix + "/" + Namespace + "/" + name
// 无namespace的话,key的格式:prefix + "/" + name
if scopeStrategy.NamespaceScoped() {
    cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
        return storage.NamespaceKeyFunc(resourcePrefix, obj)
    }
} else {
    cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
        return storage.NoNamespaceKeyFunc(resourcePrefix, obj)
    }
}
// 根据之前初始化的Cacher的config,进行cacher创建
// 比较关键,后面进行介绍
cacher := storage.NewCacherFromConfig(cacherConfig)
destroyFunc := func() {
    cacher.Stop()
    d()
}

return cacher, destroyFunc
}

先调用NewRawStorage()接口创建了一个存储后端,我们先看下这个接口实现:
路径: pkg/registry/generic/storage_decorator.go
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
s, d, err := factory.Create(*config)
if err != nil {
    glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
}
return s, d
}

没啥好说的,继续看Create():
路径: pkg/storage/storagebackend/factory/factory.go
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
// 判断下存储类型:etcd2 、etcd3
switch c.Type {
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2:
    return newETCD2Storage(c)
case storagebackend.StorageTypeETCD3:
    // TODO: We have the following features to implement:
    // - Support secure connection by using key, cert, and CA files.
    // - Honor "https" scheme to support secure connection in gRPC.
    // - Support non-quorum read.
    return newETCD3Storage(c)
default:
    return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}

挑个etcd2看下实现:
路径: pkg/storage/storagebackend/factory/etcd2.go
func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
// 根据配置的TLS证书信息创建http.Transport
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil {
    return nil, nil, err
}
// 创建etcd2 client,返回的是httpClusterClient结构
client, err := newETCD2Client(tr, c.ServerList)
if err != nil {
    return nil, nil, err
}
// 根据入参初始化一个实现了storage.Interface接口的etcdHelper变量
s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize)
// 返回etcdHelper变量,及关闭链接的函数
return s, tr.CloseIdleConnections, nil
}

前两步都是为了创建与etcd连接的client,后一步比较关键:
路径: pkg/storage/etcd/etcd_helper.go
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface {
return &etcdHelper{
    // 创建一个httpMembersAPI变量,附带很多方法
    etcdMembersAPI: etcd.NewMembersAPI(client),
    // 创建一个httpKeysAPI变量,同样附带各类方法
    etcdKeysAPI:    etcd.NewKeysAPI(client),
    // 编解码使用
    codec:          codec,
    versioner:      APIObjectVersioner{},
    // 用于序列化反序列化,版本间转换,兼容等
    copier:         api.Scheme,
    pathPrefix:     path.Join("/", prefix),
    quorum:         quorum,
    // 创建cache结构
    cache:          utilcache.NewCache(cacheSize),
}
}

该接口很简单的初始化,需要关注的是etcdHelper附带的通用的RESTFul 方法:

可以看到storage.Interface接口所需要的方法都实现了。

继续回到StorageWithCacher()接口,在往下走就是CacherConfig的初始化,就不介绍了,直接进入cacher的创建接口:
路径: pkg/storage/cacher.go
func NewCacherFromConfig(config CacherConfig) *Cacher {
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)

// Give this error when it is constructed rather than when you get the
// first watch item, because it's much easier to track down that way.
if obj, ok := config.Type.(runtime.Object); ok {
    if err := runtime.CheckCodec(config.Codec, obj); err != nil {
        panic("storage codec doesn't seem to match given type: " + err.Error())
    }
}

cacher := &Cacher{
    ready:       newReady(),
    storage:     config.Storage,
    objectType:  reflect.TypeOf(config.Type),
    watchCache:  watchCache,
    reflector:   cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
    versioner:   config.Versioner,
    triggerFunc: config.TriggerPublisherFunc,
    watcherIdx:  0,
    watchers: indexedWatchers{
        allWatchers:   make(map[int]*cacheWatcher),
        valueWatchers: make(map[string]watchersMap),
    },
    // TODO: Figure out the correct value for the buffer size.
    incoming: make(chan watchCacheEvent, 100),
    // We need to (potentially) stop both:
    // - wait.Until go-routine
    // - reflector.ListAndWatch
    // and there are no guarantees on the order that they will stop.
    // So we will be simply closing the channel, and synchronizing on the WaitGroup.
    stopCh: make(chan struct{}),
}
watchCache.SetOnEvent(cacher.processEvent)
go cacher.dispatchEvents()

stopCh := cacher.stopCh
cacher.stopWg.Add(1)
go func() {
    defer cacher.stopWg.Done()
    wait.Until(
        func() {
            if !cacher.isStopped() {
                cacher.startCaching(stopCh)
            }
        }, time.Second, stopCh,
    )
}()
return cacher
}

该接口主要用于开启cacher,而该cache只用于WATCH和LIST的request。
我们在看下Cacher结构体:


该接口必然也实现了storage.Interface接口所需要的方法。
因为该Cacher只用于WATCH和LIST的request,所以你可以看下cacher提供的API,除了WATCH和LIST相关的之外的接口都是调用了之前创建的storage的API。
查看下cacher.Create和Delete:
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return c.storage.Create(ctx, key, obj, out, ttl)
}

func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
return c.storage.Delete(ctx, key, out, preconditions)
}


到这里registry.StorageWithCacher()接口就结束了,我们继续回到前面讲的另外一个接口generic.UndecoratedStorage():
路径:pkg/registry/generic/storage_decorator.go
func UndecoratedStorage(
config *storagebackend.Config,
capacity int,
objectType runtime.Object,
resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object,
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
return NewRawStorage(config)
}

发现registry.StorageWithCacher()接口也是调用了NewRawStorage()接口,其实现就少了cache。

这里接触到了cache,下节会专门介绍该cache实现。

用户配置

1. --watch-cache: 该apiServer的参数默认就是true的,用于打开watch cache
2. --watch-cache-sizes: 既然有enable cache,那就少不了cache sizes,而且该size可以指定各类资源所使用的cache size。格式: resource#size
3. --storage-backend: 后端持久化存储类型,可选项为etcd2(默认)、etcd3

0 个评论

要回复文章请先登录注册