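Kubernetes34--ReplicationController Source Code--slowStartBatch
阿新 • Published: 2018-12-26
While reconciling the replica count, the RC (ReplicationController) has to create some Pod objects, which it does by calling slowStartBatch: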
successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
	boolPtr := func(b bool) *bool { return &b }
	controllerRef := &metav1.OwnerReference{
		APIVersion:         rsc.GroupVersion().String(),
		Kind:               rsc.Kind,
		Name:               rs.Name,
		UID:                rs.UID,
		BlockOwnerDeletion: boolPtr(true),
		Controller:         boolPtr(true),
	}
	err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
	if err != nil && errors.IsTimeout(err) {
		// Pod is created but its initialization has timed out.
		// If the initialization is successful eventually, the
		// controller will observe the creation via the informer.
		// If the initialization fails, or if the pod keeps
		// uninitialized for a long time, the informer will not
		// receive any update, and the controller will create a new
		// pod when the expectation expires.
		return nil
	}
	return err
})
Take a look at the accompanying comment:
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would
// likely all fail with the same error. For example a project with a
// low quota that attempts to create a large number of pods will be
// prevented from spamming the API service with the pod create requests
// after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate.
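To keep a single shared error from causing a large number of Pod creations to fail, a slow-start dispatch strategy is used: it first issues a small batch of Pod creation requests and checks the results, and only if every request in the batch succeeds does the next iteration create a larger batch (twice the size).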
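To make the doubling concrete, the following sketch computes the batch sizes a slow-start loop would use when every call succeeds. batchSizes and minInt are hypothetical helpers written for this illustration; the loop header follows the one in slowStartBatch, with minInt in place of integer.IntMin.

```go
package main

import "fmt"

// minInt returns the smaller of a and b (local stand-in for integer.IntMin).
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// batchSizes lists the batch sizes of a slow-start run in which every call
// succeeds. Hypothetical helper, for illustration only.
func batchSizes(count, initialBatchSize int) []int {
	var sizes []int
	remaining := count
	for batchSize := minInt(remaining, initialBatchSize); batchSize > 0; batchSize = minInt(2*batchSize, remaining) {
		sizes = append(sizes, batchSize)
		remaining -= batchSize
	}
	return sizes
}

func main() {
	fmt.Println(batchSizes(10, 1)) // [1 2 4 3]
}
```

The last batch is capped by the number of calls still remaining, which is why it is 3 rather than 8.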
The implementation is as follows:
// slowStartBatch tries to call the provided function a total of 'count' times,
// starting slow to check for errors, then speeding up if calls succeed.
//
// It groups the calls into batches, starting with a group of initialBatchSize.
// Within each batch, it may call the function multiple times concurrently.
//
// If a whole batch succeeds, the next batch may get exponentially larger.
// If there are any failures in a batch, all remaining batches are skipped
// after waiting for the current batch to complete.
//
// It returns the number of successful calls to the function.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
fn: the function to execute
count: the total number of times fn should be called
initialBatchSize: the number of calls in the first batch
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
	remaining := count
	successes := 0
	for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		curSuccesses := batchSize - len(errCh)
		successes += curSuccesses
		if len(errCh) > 0 {
			return successes, <-errCh
		}
		remaining -= batchSize
	}
	return successes, nil
}
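The number of calls still to be made:
remaining := count
The total number of successful calls so far:
successes := 0
batchSize is the number of calls made in one iteration of the loop. After a fully successful batch it doubles, capped at the number of calls remaining:
for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
A sync.WaitGroup is used to launch the calls of a batch concurrently, one goroutine per call, and to wait until all of them have finished. Errors are collected in errCh, a channel buffered to batchSize so that no goroutine blocks when sending: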
var wg sync.WaitGroup
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
	go func() {
		defer wg.Done()
		if err := fn(); err != nil {
			errCh <- err
		}
	}()
}
wg.Wait()
if len(errCh) > 0 {
	return successes, <-errCh
}
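If any call in a batch fails, the function waits for that batch to finish, then returns the total number of successes so far together with one of the errors from the batch. The remaining batches are not run.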
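The early exit can be observed with a small self-contained program. This is a sketch under assumptions: slowStartBatch is copied from the listing above with a local minInt in place of integer.IntMin, and failAfter is a hypothetical callback factory that simulates a quota allowing only the first n calls to succeed.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// minInt is a local stand-in for integer.IntMin.
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// slowStartBatch follows the function quoted above.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
	remaining := count
	successes := 0
	for batchSize := minInt(remaining, initialBatchSize); batchSize > 0; batchSize = minInt(2*batchSize, remaining) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		successes += batchSize - len(errCh)
		if len(errCh) > 0 {
			return successes, <-errCh
		}
		remaining -= batchSize
	}
	return successes, nil
}

// failAfter returns a callback that succeeds for its first n calls and fails
// afterwards, plus a pointer to the call counter. Hypothetical test helper.
func failAfter(n int64) (func() error, *int64) {
	calls := new(int64)
	return func() error {
		if atomic.AddInt64(calls, 1) > n {
			return errors.New("quota exceeded")
		}
		return nil
	}, calls
}

func main() {
	fn, calls := failAfter(5)
	successes, err := slowStartBatch(100, 1, fn)
	// Batches of 1 and 2 succeed; the batch of 4 has 2 successes and 2 failures.
	fmt.Println(successes, err, atomic.LoadInt64(calls)) // 5 quota exceeded 7
}
```

Of the 100 requested calls only 7 are attempted, so the remaining 93 never reach the simulated API.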