apiVersion: v1
kind: ReplicationController
metadata:
  name: nginx
spec:
  replicas: 3
  selector:
    app: nginx
  template:
    metadata:
      name: nginx
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx
        ports:
        - containerPort: 80
command := app.NewControllerManagerCommand()
c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
func KnownControllers() []string {
ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))
由 Controller 初始化方法可知,replicationcontroller 已被加入控制器列表:
// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["replicationcontroller"] = startReplicationController
func startReplicationController(ctx ControllerContext) (http.Handler, bool, error) {
go replicationcontroller.NewReplicationManager(
).Run(int(ctx.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Stop)
return nil, true, nil
// NewReplicationManager configures a replication manager with the specified event recorder
func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
return &ReplicationManager{
*replicaset.NewBaseController(informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas,
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}),
引數解釋:PodInformer 用來列出所有的 Pod
// PodInformer provides access to a shared informer and lister for
// Pods.
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
// ReplicationControllerInformer provides access to a shared informer and lister for
// ReplicationControllers.
type ReplicationControllerInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.ReplicationControllerLister
返回物件為 ReplicationManager,它是 ReplicaSetController 的包裝器,用來同步系統中的 RC 與實際執行的 Pod
// ReplicationManager is responsible for synchronizing ReplicationController objects stored
// in the system with actual running pods.
// It is actually just a wrapper around ReplicaSetController.
type ReplicationManager struct {
podControl controller.PodControlInterface
引數 podControl:增刪 Pod 的一系列介面
// PodControlInterface is an interface that knows how to add or delete pods
// created as an interface to allow testing.
type PodControlInterface interface {
// CreatePods creates new pods according to the spec.
CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error
// CreatePodsOnNode creates a new pod according to the spec on the specified node,
// and sets the ControllerRef.
CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
// CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller.
CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
// DeletePod deletes the pod identified by podID.
DeletePod(namespace string, podID string, object runtime.Object) error
// PatchPod patches the pod.
PatchPod(namespace, name string, data []byte) error
// NewBaseController is the implementation of NewReplicaSetController with additional injected
// parameters so that it can also serve as the implementation of NewReplicationController.
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
rsc := &ReplicaSetController{
GroupVersionKind: gvk,
kubeClient: kubeClient,
podControl: podControl,
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
AddFunc: rsc.enqueueReplicaSet,
UpdateFunc: rsc.updateRS,
// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the replica set.
DeleteFunc: rsc.enqueueReplicaSet,
type SharedIndexInformer interface {
// AddIndexers add indexers to the informer before it starts.
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
Run(int(ctx.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Stop)
// ReplicationControllerConfiguration contains elements describing ReplicationController.
type ReplicationControllerConfiguration struct {
// concurrentRCSyncs is the number of replication controllers that are
// allowed to sync concurrently. Larger number = more responsive replica
// management, but more CPU (and network) load.
ConcurrentRCSyncs int32
// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer rsc.queue.ShutDown()
controllerName := strings.ToLower(rsc.Kind)
klog.Infof("Starting %v controller", controllerName)
defer klog.Infof("Shutting down %v controller", controllerName)
if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
for i := 0; i < workers; i++ {
go wait.Until(rsc.worker, time.Second, stopCh)
WaitForCacheSync 方法是 cache.WaitForCacheSync 的包裝,額外輸出等待同步、同步成功或失敗的日誌
// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
// indicating that the controller identified by controllerName is waiting for syncs, followed by
// either a successful or failed sync.
func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
klog.Infof("Waiting for caches to sync for %s controller", controllerName)
if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))
return false
klog.Infof("Caches are synced for %s controller", controllerName)
return true
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shutdown
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
err := wait.PollUntil(syncedPollPeriod,
func() (bool, error) {
for _, syncFunc := range cacheSyncs {
if !syncFunc() {
return false, nil
return true, nil
if err != nil {
klog.V(2).Infof("stop requested")
return false
klog.V(4).Infof("caches populated")
return true
for i := 0; i < workers; i++ {
go wait.Until(rsc.worker, time.Second, stopCh)
worker 方法:啟動一個工作者執行緒,從佇列中取出元素、處理並標記完成,同時保證相同的 key 不會被併發處理
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rsc *ReplicaSetController) worker() {
for rsc.processNextWorkItem() {
processNextWorkItem方法 從佇列中取出一個key,等待執行完畢,如果執行失敗,則延時一段時間重新加入到佇列中
func (rsc *ReplicaSetController) processNextWorkItem() bool {
key, quit := rsc.queue.Get()
if quit {
return false
defer rsc.queue.Done(key)
err := rsc.syncHandler(key.(string))
if err == nil {
return true
utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
return true
err := rsc.syncHandler(key.(string))
rsc.syncHandler = rsc.syncReplicaSet
syncReplicaSet 方法:使給定 key 對應的 RC 的實際 Pod 數量與指定的副本數保持一致
// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
只保留 active 的 Pod(忽略非 active 的 Pod)
// Ignore inactive pods.
var filteredPods []*v1.Pod
for _, pod := range allPods {
if controller.IsPodActive(pod) {
filteredPods = append(filteredPods, pod)
判斷 Pod 是否為 active;Pod 狀態(Phase)包括 Pending、Running、Succeeded、Failed、Unknown
func IsPodActive(p *v1.Pod) bool {
return v1.PodSucceeded != p.Status.Phase &&
v1.PodFailed != p.Status.Phase &&
p.DeletionTimestamp == nil
filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
// RecheckDeletionTimestamp returns a CanAdopt() function to recheck deletion.
// The CanAdopt() function calls getObject() to fetch the latest value,
// and denies adoption attempts if that object has a non-nil DeletionTimestamp.
func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error {
return func() error {
obj, err := getObject()
if err != nil {
return fmt.Errorf("can't recheck DeletionTimestamp: %v", err)
if obj.GetDeletionTimestamp() != nil {
return fmt.Errorf("%v/%v has just been deleted at %v", obj.GetNamespace(), obj.GetName(), obj.GetDeletionTimestamp())
return nil
func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
// ClaimPods tries to take ownership of a list of Pods.
//
// It will reconcile the following:
//   * Adopt orphans if the selector matches.
//   * Release owned objects if the selector no longer matches.
定義 match 函式,判斷 Pod 是否符合該 RS 的標籤選擇器規則
match := func(obj metav1.Object) bool {
pod := obj.(*v1.Pod)
// Check selector first so filters only run on potentially matching Pods.
if !m.Selector.Matches(labels.Set(pod.Labels)) {
return false
for _, filter := range filters {
if !filter(pod) {
return false
return true
adopt := func(obj metav1.Object) error {
return m.AdoptPod(obj.(*v1.Pod))
// AdoptPod sends a patch to take control of the pod. It returns the error if
// the patching fails.
func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {
if err := m.CanAdopt(); err != nil {
return fmt.Errorf("can't adopt Pod %v/%v (%v): %v", pod.Namespace, pod.Name, pod.UID, err)
// Note that ValidateOwnerReferences() will reject this patch if another
// OwnerReference exists with controller=true.
addControllerPatch := fmt.Sprintf(
m.controllerKind.GroupVersion(), m.controllerKind.Kind,
m.Controller.GetName(), m.Controller.GetUID(), pod.UID)
return m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(addControllerPatch))
release := func(obj metav1.Object) error {
return m.ReleasePod(obj.(*v1.Pod))
// ReleasePod sends a patch to free the pod from the control of the controller.
// It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error {
klog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s",
pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName())
deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), pod.UID)
err := m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(deleteOwnerRefPatch))
for _, pod := range pods {
ok, err := m.ClaimObject(pod, match, adopt, release)
if err != nil {
errlist = append(errlist, err)
if ok {
claimed = append(claimed, pod)
controllerRef := metav1.GetControllerOf(obj)
if controllerRef != nil {
if controllerRef.UID != m.Controller.GetUID() {
// Owned by someone else. Ignore.
return false, nil
if match(obj) {
// We already own it and the selector matches.
// Return true (successfully claimed) before checking deletion timestamp.
// We're still allowed to claim things we already own while being deleted
// because doing so requires taking no actions.
return true, nil
// Owned by us but selector doesn't match.
// Try to release, unless we're being deleted.
if m.Controller.GetDeletionTimestamp() != nil {
return false, nil
if err := release(obj); err != nil {
// If the pod no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
// Either someone else released it, or there was a transient error.
// The controller should requeue and try again if it's still stale.
return false, err
// Successfully released.
return false, nil
// It's an orphan.
if m.Controller.GetDeletionTimestamp() != nil || !match(obj) {
// Ignore if we're being deleted or selector doesn't match.
return false, nil
if obj.GetDeletionTimestamp() != nil {
// Ignore if the object is being deleted
return false, nil
// Selector matches. Try to adopt.
if err := adopt(obj); err != nil {
// If the pod no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
// Either someone else claimed it first, or there was a transient error.
// The controller should requeue and try again if it's still orphaned.
return false, err
// Successfully adopted.
return true, nil
if rsNeedsSync && rs.DeletionTimestamp == nil {
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
// manageReplicas checks and updates replicas for the given ReplicaSet.
// Does NOT modify <filteredPods>.
// It will requeue the replica set in case of an error while creating/deleting pods.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
rs = rs.DeepCopy()
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
// Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
if err != nil {
// Multiple things could lead to this update failing. Requeuing the replica set ensures
// it is retried; returning an error causes that requeue without forcing a hotloop.
return err
// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
command := app.NewControllerManagerCommand()
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
func startReplicationController(ctx ControllerContext) (http.Handler, bool, error) {
func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
Run(int(ctx.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Stop)
func (rsc *ReplicaSetController) worker() {
func (rsc *ReplicaSetController) processNextWorkItem() bool {
syncHandler func(rsKey string) error
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {