Kubernetes 31 -- ReplicationController Source Code -- The RC Execution Flow
Introduction to ReplicationController
A ReplicationController (RC) ensures that the number of Pod replicas defined by the user is maintained. If there are more pods than specified, the RC terminates the extras; if there are fewer, it creates new ones, so the actual count always stays at the defined target. For example, the RC recreates pods after node maintenance such as a kernel upgrade, and it replaces pods that are deleted or terminated for any reason, such as a node failure or disruptive node maintenance. For this reason, using a ReplicationController is recommended even if an application needs only a single pod. An RC supervises pods across multiple nodes. A ReplicaSet or Deployment can be used instead of an RC to provide the same functionality.
An example RC manifest:
apiVersion: v1
kind: ReplicationController
metadata:
  name: nginx
spec:
  replicas: 3
  selector:
    app: nginx
  template:
    metadata:
      name: nginx
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx
        ports:
        - containerPort: 80
The HPA policy changes the number of replicas by computing a target value and updating the Spec.replicas field, so it is worth examining how the RC keeps the actual number of pods consistent with Spec.replicas.
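To make this concrete, the following is a minimal client-go sketch (an illustration only; the real HPA goes through the scale subresource) that bumps Spec.replicas on the nginx RC. It assumes a recent client-go where requests take a context; the kubeconfig path and replica count are arbitrary.
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// scaleRC updates Spec.Replicas on a ReplicationController; the controller
// studied below then reconciles the running pods to match the new value.
func scaleRC(cs kubernetes.Interface, namespace, name string, replicas int32) error {
	rc, err := cs.CoreV1().ReplicationControllers(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	rc.Spec.Replicas = &replicas
	_, err = cs.CoreV1().ReplicationControllers(namespace).Update(context.TODO(), rc, metav1.UpdateOptions{})
	return err
}

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	cs, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	if err := scaleRC(cs, "default", "nginx", 5); err != nil {
		panic(err)
	}
	fmt.Println("nginx scaled to 5 replicas")
}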
The kube-controller-manager startup process
Code location: kubernetes/cmd/kube-controller-manager/controller-manager.go
command := app.NewControllerManagerCommand()
Configure the default controllers and the controllers that are disabled by default:
c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
func KnownControllers() []string {
ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))
The controller initialization function shows that replicationcontroller is among the registered controllers:
// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["replicationcontroller"] = startReplicationController
The function that starts the ReplicationController:
func startReplicationController(ctx ControllerContext) (http.Handler, bool, error) {
go replicationcontroller.NewReplicationManager(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().ReplicationControllers(),
ctx.ClientBuilder.ClientOrDie("replication-controller"),
replicationcontroller.BurstReplicas,
).Run(int(ctx.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Stop)
return nil, true, nil
}
// NewReplicationManager configures a replication manager with the specified event recorder
func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
return &ReplicationManager{
*replicaset.NewBaseController(informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas,
v1.SchemeGroupVersion.WithKind("ReplicationController"),
"replication_controller",
"replicationmanager",
podControlAdapter{controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}),
}},
),
}
}
Parameters: PodInformer provides access to the shared informer and lister for Pods.
// PodInformer provides access to a shared informer and lister for
// Pods.
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
ReplicationControllerInformer provides access to the shared informer and lister for ReplicationControllers.
// ReplicationControllerInformer provides access to a shared informer and lister for
// ReplicationControllers.
type ReplicationControllerInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.ReplicationControllerLister
}
The return value is a ReplicationManager, a wrapper around ReplicaSetController that is used to synchronize the RCs in the system.
// ReplicationManager is responsible for synchronizing ReplicationController objects stored
// in the system with actual running pods.
// It is actually just a wrapper around ReplicaSetController.
type ReplicationManager struct {
replicaset.ReplicaSetController
}
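The "wrapper" works purely through Go struct embedding: every method defined on the embedded ReplicaSetController (Run, worker, syncReplicaSet, and so on) is promoted onto ReplicationManager. A tiny standalone illustration of the mechanism (generic names, not Kubernetes code):
package main

import "fmt"

type base struct{ name string }

// Run is defined only on the embedded type.
func (b base) Run() { fmt.Println("running", b.name) }

// wrapper embeds base, so base's methods are promoted onto wrapper.
type wrapper struct{ base }

func main() {
	w := wrapper{base{name: "replication-controller"}}
	w.Run() // resolves to the promoted base.Run
}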
Among the parameters passed to the ReplicaSetController object, podControl is a controller.PodControlInterface, an interface for creating and deleting Pods:
// PodControlInterface is an interface that knows how to add or delete pods
// created as an interface to allow testing.
type PodControlInterface interface {
// CreatePods creates new pods according to the spec.
CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error
// CreatePodsOnNode creates a new pod according to the spec on the specified node,
// and sets the ControllerRef.
CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
// CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller.
CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
// DeletePod deletes the pod identified by podID.
DeletePod(namespace string, podID string, object runtime.Object) error
// PatchPod patches the pod.
PatchPod(namespace, name string, data []byte) error
}
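Because all pod creation and deletion goes through this interface, the reconcile logic can be exercised without an API server. A hedged sketch of a counting implementation for unit tests follows (the Kubernetes tree ships a similar FakePodControl; the signatures below simply mirror the interface above):
// countingPodControl satisfies PodControlInterface and only counts calls.
type countingPodControl struct {
	creates, deletes, patches int
}

func (c *countingPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error {
	c.creates++
	return nil
}

func (c *countingPodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
	c.creates++
	return nil
}

func (c *countingPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
	c.creates++
	return nil
}

func (c *countingPodControl) DeletePod(namespace string, podID string, object runtime.Object) error {
	c.deletes++
	return nil
}

func (c *countingPodControl) PatchPod(namespace, name string, data []byte) error {
	c.patches++
	return nil
}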
The NewBaseController function:
// NewBaseController is the implementation of NewReplicaSetController with additional injected
// parameters so that it can also serve as the implementation of NewReplicationController.
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
Construct the ReplicaSetController object:
rsc := &ReplicaSetController{
GroupVersionKind: gvk,
kubeClient: kubeClient,
podControl: podControl,
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
}
Register the event handlers:
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rsc.enqueueReplicaSet,
UpdateFunc: rsc.updateRS,
// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the replica set.
DeleteFunc: rsc.enqueueReplicaSet,
})
The SharedIndexInformer object returned by ReplicaSetInformer:
type SharedIndexInformer interface {
SharedInformer
// AddIndexers add indexers to the informer before it starts.
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
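For readers new to the informer machinery, here is a minimal usage sketch of the pattern the controller relies on: a shared informer factory, an event handler, a cache sync, and a lister read. It assumes an existing clientset (kubernetes.Interface) plus imports from k8s.io/client-go/informers, k8s.io/client-go/tools/cache, and k8s.io/apimachinery/pkg/labels; the 30-second resync period is arbitrary.
// Build informers from a shared factory, register handlers, start, and wait for sync.
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
podInformer := factory.Core().V1().Pods()
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { fmt.Println("pod added") },
	UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("pod updated") },
	DeleteFunc: func(obj interface{}) { fmt.Println("pod deleted") },
})
stopCh := make(chan struct{})
factory.Start(stopCh)
cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced)

// The Lister serves reads from the local cache instead of the API server.
pods, _ := podInformer.Lister().Pods("default").List(labels.Everything())
fmt.Println("cached pods:", len(pods))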
Back in startReplicationController: after the ReplicationManager is built, its Run method is invoked:
Run(int(ctx.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Stop)
The ConcurrentRCSyncs parameter defines how many RCs may be synced concurrently:
// ReplicationControllerConfiguration contains elements describing ReplicationController.
type ReplicationControllerConfiguration struct {
// concurrentRCSyncs is the number of replication controllers that are
// allowed to sync concurrently. Larger number = more responsive replica
// management, but more CPU (and network) load.
ConcurrentRCSyncs int32
}
The Run method:
// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer rsc.queue.ShutDown()
controllerName := strings.ToLower(rsc.Kind)
klog.Infof("Starting %v controller", controllerName)
defer klog.Infof("Shutting down %v controller", controllerName)
if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(rsc.worker, time.Second, stopCh)
}
<-stopCh
}
The WaitForCacheSync function is a wrapper around cache.WaitForCacheSync that adds logging around the sync:
// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
// indicating that the controller identified by controllerName is waiting for syncs, followed by
// either a successful or failed sync.
func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
klog.Infof("Waiting for caches to sync for %s controller", controllerName)
if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))
return false
}
klog.Infof("Caches are synced for %s controller", controllerName)
return true
}
A function in the shared informer package that waits for the caches to populate; it returns false if the controller is shutting down:
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shutdown
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
err := wait.PollUntil(syncedPollPeriod,
func() (bool, error) {
for _, syncFunc := range cacheSyncs {
if !syncFunc() {
return false, nil
}
}
return true, nil
},
stopCh)
if err != nil {
klog.V(2).Infof("stop requested")
return false
}
klog.V(4).Infof("caches populated")
return true
}
Several worker goroutines are started to sync RCs concurrently:
for i := 0; i < workers; i++ {
go wait.Until(rsc.worker, time.Second, stopCh)
}
The worker method runs a worker thread that dequeues items, processes them, and marks them done; it guarantees that syncHandler is never invoked concurrently with the same key:
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rsc *ReplicaSetController) worker() {
for rsc.processNextWorkItem() {
}
}
The processNextWorkItem method takes a key from the queue and processes it; if the sync fails, the key is re-added to the queue with rate limiting so it is retried after a delay:
func (rsc *ReplicaSetController) processNextWorkItem() bool {
key, quit := rsc.queue.Get()
if quit {
return false
}
defer rsc.queue.Done(key)
err := rsc.syncHandler(key.(string))
if err == nil {
rsc.queue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
rsc.queue.AddRateLimited(key)
return true
}
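The Get/Done/Forget/AddRateLimited contract can be seen in isolation in the generic loop below (illustrative only; doSync is a hypothetical sync function, and the queue comes from k8s.io/client-go/util/workqueue):
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
queue.Add("default/nginx")

processNext := func() bool {
	key, quit := queue.Get()
	if quit {
		return false
	}
	// Done must always be called so the key can be processed again later.
	defer queue.Done(key)
	if err := doSync(key.(string)); err != nil {
		// Failure: re-add with exponential back-off so it is retried later.
		queue.AddRateLimited(key)
		return true
	}
	// Success: clear the rate limiter's failure history for this key.
	queue.Forget(key)
	return true
}
// In the controller this loop runs until the queue is shut down.
for processNext() {
}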
The core method syncReplicaSet syncs a single key:
err := rsc.syncHandler(key.(string))
The handler is assigned in NewBaseController:
rsc.syncHandler = rsc.syncReplicaSet
It keeps the number of pods for the given RC key consistent with the specified replica count:
// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
Fetch the ReplicaSet object for the key:
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
List all Pods in the ReplicaSet's namespace (the label selector is applied later, in claimPods):
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
Filter out inactive Pods:
// Ignore inactive pods.
var filteredPods []*v1.Pod
for _, pod := range allPods {
if controller.IsPodActive(pod) {
filteredPods = append(filteredPods, pod)
}
}
The activity check is based on the Pod's phase, which can be Pending, Running, Succeeded, Failed, or Unknown:
func IsPodActive(p *v1.Pod) bool {
return v1.PodSucceeded != p.Status.Phase &&
v1.PodFailed != p.Status.Phase &&
p.DeletionTimestamp == nil
}
The active Pods are then processed further. Because filteredPods comes from the cache, it is checked again: whether a Pod has been deleted, whether some Pods no longer belong to this RS, and whether new Pods should be claimed by it.
filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
Check once more whether the owning object has been deleted by inspecting its DeletionTimestamp:
// RecheckDeletionTimestamp returns a CanAdopt() function to recheck deletion.
//
// The CanAdopt() function calls getObject() to fetch the latest value,
// and denies adoption attempts if that object has a non-nil DeletionTimestamp.
func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error {
return func() error {
obj, err := getObject()
if err != nil {
return fmt.Errorf("can't recheck DeletionTimestamp: %v", err)
}
if obj.GetDeletionTimestamp() != nil {
return fmt.Errorf("%v/%v has just been deleted at %v", obj.GetNamespace(), obj.GetName(), obj.GetDeletionTimestamp())
}
return nil
}
}
// ClaimPods tries to take ownership of a list of Pods.
//
// It will reconcile the following:
//   * Adopt orphans if the selector matches.
//   * Release owned objects if the selector no longer matches.
func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
Define the match function: does the Pod satisfy the RS's label selector (and any additional filters)?
match := func(obj metav1.Object) bool {
pod := obj.(*v1.Pod)
// Check selector first so filters only run on potentially matching Pods.
if !m.Selector.Matches(labels.Set(pod.Labels)) {
return false
}
for _, filter := range filters {
if !filter(pod) {
return false
}
}
return true
}
If a matching Pod is found, adopt it into the RS:
adopt := func(obj metav1.Object) error {
return m.AdoptPod(obj.(*v1.Pod))
}
// AdoptPod sends a patch to take control of the pod. It returns the error if
// the patching fails.
func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {
if err := m.CanAdopt(); err != nil {
return fmt.Errorf("can't adopt Pod %v/%v (%v): %v", pod.Namespace, pod.Name, pod.UID, err)
}
// Note that ValidateOwnerReferences() will reject this patch if another
// OwnerReference exists with controller=true.
addControllerPatch := fmt.Sprintf(
`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
m.controllerKind.GroupVersion(), m.controllerKind.Kind,
m.Controller.GetName(), m.Controller.GetUID(), pod.UID)
return m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(addControllerPatch))
}
If a Pod no longer belongs to the RS, release it:
release := func(obj metav1.Object) error {
return m.ReleasePod(obj.(*v1.Pod))
}
// ReleasePod sends a patch to free the pod from the control of the controller.
// It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error {
klog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s",
pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName())
deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), pod.UID)
err := m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(deleteOwnerRefPatch))
Iterate over every Pod:
for _, pod := range pods {
ok, err := m.ClaimObject(pod, match, adopt, release)
if err != nil {
errlist = append(errlist, err)
continue
}
if ok {
claimed = append(claimed, pod)
}
}
Get the Pod's owning controller reference:
controllerRef := metav1.GetControllerOf(obj)
If the Pod is already owned by some controller:
if controllerRef != nil {
if controllerRef.UID != m.Controller.GetUID() {
// Owned by someone else. Ignore.
return false, nil
}
if match(obj) {
// We already own it and the selector matches.
// Return true (successfully claimed) before checking deletion timestamp.
// We're still allowed to claim things we already own while being deleted
// because doing so requires taking no actions.
return true, nil
}
// Owned by us but selector doesn't match.
// Try to release, unless we're being deleted.
if m.Controller.GetDeletionTimestamp() != nil {
return false, nil
}
if err := release(obj); err != nil {
// If the pod no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
}
// Either someone else released it, or there was a transient error.
// The controller should requeue and try again if it's still stale.
return false, err
}
// Successfully released.
return false, nil
}
If the controller is being deleted, the selector does not match, or the Pod itself is being deleted, the orphan is ignored:
// It's an orphan.
if m.Controller.GetDeletionTimestamp() != nil || !match(obj) {
// Ignore if we're being deleted or selector doesn't match.
return false, nil
}
if obj.GetDeletionTimestamp() != nil {
// Ignore if the object is being deleted
return false, nil
}
If the selector matches and the Pod is not being deleted, adopt it into the RS:
// Selector matches. Try to adopt.
if err := adopt(obj); err != nil {
// If the pod no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
}
// Either someone else claimed it first, or there was a transient error.
// The controller should requeue and try again if it's still orphaned.
return false, err
}
// Successfully adopted.
return true, nil
A Pod may already be held by another controller, so before processing it the code checks whether it is owned by a different controller and whether its labels match the selector; the end result is that the RS holds ownership of its Pods and can proceed to the next step.
Back in the syncReplicaSet method:
If the RS needs a sync and has not been deleted, adjust the replica count:
if rsNeedsSync && rs.DeletionTimestamp == nil {
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
The core method manageReplicas:
// manageReplicas checks and updates replicas for the given ReplicaSet.
// Does NOT modify <filteredPods>.
// It will requeue the replica set in case of an error while creating/deleting pods.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
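The body of manageReplicas is not reproduced in full here; in essence it compares the number of active pods with Spec.Replicas and creates or deletes the difference through PodControlInterface, capped at burstReplicas per sync. A simplified, hedged sketch of that diff logic (not the upstream source, which additionally maintains expectations and uses slow-start batching for creates):
// manageReplicasSketch shows only the core create/delete decision.
func manageReplicasSketch(podControl controller.PodControlInterface, filteredPods []*v1.Pod,
	rs *apps.ReplicaSet, gvk schema.GroupVersionKind, burstReplicas int) error {
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	switch {
	case diff < 0:
		// Too few pods: create the missing ones, at most burstReplicas per sync.
		if diff = -diff; diff > burstReplicas {
			diff = burstReplicas
		}
		for i := 0; i < diff; i++ {
			if err := podControl.CreatePodsWithControllerRef(
				rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, gvk)); err != nil {
				return err
			}
		}
	case diff > 0:
		// Too many pods: delete the surplus, at most burstReplicas per sync.
		if diff > burstReplicas {
			diff = burstReplicas
		}
		for _, pod := range filteredPods[:diff] {
			if err := podControl.DeletePod(rs.Namespace, pod.Name, rs); err != nil {
				return err
			}
		}
	}
	return nil
}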
Compute the resulting status of the RS after this adjustment:
rs = rs.DeepCopy()
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
Update the RS status:
// Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
if err != nil {
// Multiple things could lead to this update failing. Requeuing the replica set ensures
// Returning an error causes a requeue without forcing a hotloop
return err
}
If all replicas are ready but not yet all available, re-enqueue the ReplicaSet after MinReadySeconds so that availability is re-checked:
// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}
Summary
The desired replica count can be declared with a ReplicationController, a ReplicaSet, or a Deployment. This article traced the replicationcontroller implementation; the overall flow is as follows:
kubernetes/cmd/kube-controller-manager/controller-manager.go
Start the ControllerManager
command := app.NewControllerManagerCommand()
Initialize the controllers
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
Start the ReplicationController
func startReplicationController(ctx ControllerContext) (http.Handler, bool, error) {
Build the ReplicationManager object
func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
Invoke Run
Run(int(ctx.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Stop)
Execute the worker method
func (rsc *ReplicaSetController) worker() {
Execute processNextWorkItem
func (rsc *ReplicaSetController) processNextWorkItem() bool {
Execute the sync handler
syncHandler func(rsKey string) error
The core method
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
Next, we will look more closely at the syncReplicaSet method, along with SharedInformer, the RateLimitingInterface interface, the cache-based notification mechanism, and the delaying work queue.