kubernetes1.9源码阅读 replication controller的Informer机制


replication controller是kube-controller-manager中一个重要的控制器,主要是rs进行控制,确保pods的数量恰好和rs的规定一致。因此replication controller主要对这两类进行watch,一类是replicationset,另一类是pods。本文是replication controller的源码阅读笔记,会包括client-go的Informer机制,希望帮助开始阅读kubernetes源码的小伙伴们,更希望与对kubernetes源码阅读感兴趣的小伙伴儿们交流,有错误的地方也希望能指出,共同进步。

入口程序

cmd/kube-controller-manager/controller-manager.go main

  1. 调用options.NewCMServer,构建CMServer;

  2. 调用app.Run方法,运行CMServer;


5-1.png


启动CMServer

cmd/kube-controller-manager/app/controllermanager.go Run

  1. 调用createClients, 创建apiserver客户端,通过REST方式访问APIserver提供的API服务;

  2. 启动协程调用go startHTTP,运行http Server;

  3. 调用record.NewBroadcaster,创建eventBroadcaster对象,接收EventBroadcaster发送的event,输出到logging中,并输出到EventSink,并使用recorder记录"controller-mananger"的事件;

  4. 调用CreateControllerContext,在CreateControllerContext方法中,会调用informers.NewSharedInformerFactory,创建sharedInformerFactory(client-go/informers/factory.go)对象;

  5. 调用StartControllers,启动Controllers;

  6. 调用ctx.InformerFactory.Start,在这里是调用sharedInformerFactory.start(client-go/informers/factory.go);


5-3.png

  1. saTokenControllerInitFunc和NewControllerInitializers定义了controllers的InitFunc;


5-4.png


startReplicationController

cmd/kube-controller-manager/app/core.go startReplicationController
  1. 协程启动调用replicationcontroller.NewReplicationManager,构建ReplicationManager;


(1) 调用ctx.InformerFactory.Core().V1().Pods(),这里调用sharedInformerFactory(client-go/informers/factory.go)对象的Core().V1().Pods()方法,将会构建PodInformer对象,

(2) 以此方式,创建ReplicationControllersInformer;
  1. 运行ReplicationManager;


5-5.png


NewReplicationManager

pkg/controller/replication/replication_set.go NewReplicationManager
  1. 在NewReplicationManager中,


(1) 首先,调用record.NewBroadcaster,创建eventBroadcaster对象,调用eventBroadcaster.StartLogging,接收EventBroadcaster发送的event,输出到logging中;调用 eventBroadcaster.StartRecordingToSink,event输出到EventSink,并调用eventBroadcaster.NewRecorder记录"replication-controller"的事件;

(2) 将调用NewBaseController;
  1. 在NewBaseController方法中,


(1) 构建ReplicaSetController对象,包括了podControl,它定义了对Pod的操作,是由RealPodControl去调用apiserver完成创建实现;

(2) 将调用 rsInformer.Informer().AddEventHandler,这将调用rsInformer的构造函数NewReplicaSetInformer,rsInformer将event handler包装成listerner,然后添加到s.processor.listeners中,并定义对象处理的回调函数AddFunc、UpdateFunc、DeleteFunc;

(3) 同时,调用rsInformer的Lister方法;

(4) 最后,调用rsInformer.Informer().HasSynced,判断是否缓存完成;

(5) 以此方式,调用podInformer.Informer().AddEventHandler、podInformer的Lister方法及podInformer.Informer().HasSynced;

(6) 设置rsc.syncHandler;syncHandler负责pod与rc的同步,确保Pod副本数与rc规定的相同;

5-6.png


PodInformer

client-go/informers/core/v1/pod.go NewPodInformer

  1. 构建cache.listWatch对象,定义了ListFunc和WatchFunc;

  2. 调用cache.NewSharedIndexInformer;


5-7.png


NewSharedIndexInformer

client-go/tools/cache/shared_informer.go NewSharedIndexInformer

5-9.png


运行ReplicationManager

pkg/controller/replicaset/replica_set.go Run

  1. 调用controller.WaitForCacheSync方法,在controller.WaitForCacheSync中,将调用ca che.WaitForCacheSync;

  2. 调用rsc.worker, 将启动workers调用rsc.syncHandler,syncHandler负责pod与rc的同步,确保Pod副本数与rc规定的相同;


5-8.png


sharedInformerFactory.Start

client-go/informers/factory.go Start
  1. 调用Informer.Run,这里调用SharedIndexInformer.Run;


5-10.png


SharedIndexInformer.Run

client-go/tools/cache/shared_informer.go Run

  1. 调用NewDeltaFIFO,创建queue;

  2. 定义Deltas处理函数s.HandleDeltas;

  3. 调用New(cfg),构建sharedIndexInformer的controller;

  4. 调用s.cacheMutationDetector.Run,检查缓存对象是否变化;

  5. 调用s.processor.run,将调用sharedProcessor.run,会调用Listener.run和Listener.pop,执行处理queue的函数;

  6. 调用s.controller.Run,构建Reflector,进行对etcd的缓存;


5-11.png


sharedIndexedInformer.controller.Run

client-go/tools/cache/controller.go Run
  1. 调用NewReflector,构建Reflector;


(1) Reflector对象,包括ListerWatcher、ObjectType、Queue、FullResyncPeriod;

  1. 调用r.run,将调用reflector.ListAndWatch,执行r.List、r.watch、r.watchHandler,进行对etcd的缓存;

  2. 调用c.processLoop,reflector向queue里面添加数据,processLoop会不停去消费这里这些数据;


5-12.png


controller.processLoop

client-go/tools/cache/controller.go processLoop
  1. cache.PopProcessFunc(c.config.Process)将前面Process函数传递进去;


5-13.png


DeltaFIFO.Pop

client-go/tools/cache/delta_fifo.go Pop
  1. 主要从f.items取出object,然后调用process函数进行处理;


5-14.png


处理DeltaFIFO

client-go/tools/cache/shared_informer.go HandleDeltas
  1. 调用s.process.distribute,将调用Listener.add,负责将watch的资源传到listener;


5-15.png


Listener.add/pop/run

client-go/tools/cache/shared_informer.go sharedProcessor.run/add/pop;

  1. listenser的add函数负责将notify装进pendingNotifications;

  2. pop函数取出pendingNotifications的第一个nofify,输出到nextCh channel;

  3. run函数则负责取出notify,然后根据notify的类型(增加、删除、更新)触发相应的处理函数,这些函数在ReplicaSetController注册,分别是:rsc.addPod、rsc.updatePod、rsc.deletePod、rsc.enqueueReplicaSet、rsc.updateRS、rsc.enqueueReplicaSet


5-16.png


rsc.addPod

pkg/controller/replicaset/replica_set.go addPod

  1. 首先会根据pod返回rc,当pod不属于任何rc时,则返回。找到rc以后,更新rm.expectations.CreationObserved这个rc的期望值,也就是假如一个rc有4个pod,现在检测到创建了一个pod,则会将这个rc的期望值减少,变为3。然后将这个rc放入队列;

  2. 调用rsc.enqueueReplicaSet,将调用rsc.queue.Add;


5-17.png


rsc.worker

pkg/controller/replicaset/replica_set.go worker()
  1. 调用rsc.syncHandler,这里会调用rsc.syncReplicaSet,syncReplicaSet负责pod与rc的同步,确保Pod副本数与rc规定的相同;


5-19.png


相关阅读:
1. http://licyhust.com/

0 个评论

要回复文章请先登录注册