1. 程式人生 > >透過 In-memory Channel 看 Knative Eventing 中 Broker/Trigger 工作機制

透過 In-memory Channel 看 Knative Eventing 中 Broker/Trigger 工作機制

In-memory Channel是當前Knative Eventing中預設的Channel, 也是一般剛接觸Knative Eventing首先了解到的Channel。本文通過分析 In-memory Channel 來進一步瞭解 Knative Eventing 中Broker/Trigger事件處理機制。

事件處理概覽

我們先整體看一下Knative Eventing 工作機制示意圖:

通過 namespace 建立預設 Broker 如果不指定Channel,會使用預設的 Inmemory Channel。

下面我們從資料平面開始分析Event事件是如何通過In-memory Channel分發到Knative Service

Ingress

Ingress是事件進入Channel前的第一級過濾,但目前的功能僅僅是接收事件然後轉發到Channel。過濾功能處理TODO狀態。

func (h *handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
    tctx := cloudevents.HTTPTransportContextFrom(ctx)
    if tctx.Method != http.MethodPost {
        resp.Status = http.StatusMethodNotAllowed
        return nil
    }

    // tctx.URI is actually the path...
    if tctx.URI != "/" {
        resp.Status = http.StatusNotFound
        return nil
    }

    ctx, _ = tag.New(ctx, tag.Insert(TagBroker, h.brokerName))
    defer func() {
        stats.Record(ctx, MeasureEventsTotal.M(1))
    }()

    send := h.decrementTTL(&event)
    if !send {
        ctx, _ = tag.New(ctx, tag.Insert(TagResult, "droppedDueToTTL"))
        return nil
    }

    // TODO Filter.

    ctx, _ = tag.New(ctx, tag.Insert(TagResult, "dispatched"))
    return h.sendEvent(ctx, tctx, event)
}

In-memory Channel

Broker 字面意思為代理者,那麼它代理的是誰呢?是Channel。為什麼要代理Channel呢,而不直接發給訪問Channel。這個其實涉及到Broker/Trigger設計的初衷:對事件過濾處理。我們知道Channel(訊息通道)負責事件傳遞,Subscription(訂閱)負責訂閱事件,通常這二者的模型如下:

這裡就涉及到訊息佇列和訂閱分發的實現。那麼在In-memory Channel中如何實現的呢?
其實 In-memory 的核心處理在Fanout Handler中,它負責將接收到的事件分發到不同的 Subscription。
In-memory Channel處理示意圖:

`

事件接收並分發核心程式碼如下:

func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {
    return func(_ provisioners.ChannelReference, m *provisioners.Message) error {
        if f.config.AsyncHandler {
            go func() {
                // Any returned error is already logged in f.dispatch().
                _ = f.dispatch(m)
            }()
            return nil
        }
        return f.dispatch(m)
    }
}

當前分發機制預設是非同步機制(可通過AsyncHandler引數控制分發機制)。

訊息分發機制:

// dispatch takes the request, fans it out to each subscription in f.config. If all the fanned out
// requests return successfully, then return nil. Else, return an error.
func (f *Handler) dispatch(msg *provisioners.Message) error {
    errorCh := make(chan error, len(f.config.Subscriptions))
    for _, sub := range f.config.Subscriptions {
        go func(s eventingduck.SubscriberSpec) {
            errorCh <- f.makeFanoutRequest(*msg, s)
        }(sub)
    }

    for range f.config.Subscriptions {
        select {
        case err := <-errorCh:
            if err != nil {
                f.logger.Error("Fanout had an error", zap.Error(err))
                return err
            }
        case <-time.After(f.timeout):
            f.logger.Error("Fanout timed out")
            return errors.New("fanout timed out")
        }
    }
    // All Subscriptions returned err = nil.
    return nil
}

通過這裡的程式碼,我們可以看到分發處理超時機制。預設為60s。也就是說如果分發的請求響應超過60s,那麼In-memory會報錯:Fanout timed out。

Filter

一般的訊息分發會將訊息傳送給訂閱的服務,但在 Broker/Trigger 模型中需要對事件進行過濾處理,這個處理的地方就是在Filter 中。

  • 根據請求獲取Trigger資訊。Filter中會根據請求的資訊拿到 Trigger 名稱,然後通過獲取Trigger對應的資源資訊拿到過濾規則
  • 根據Trigger 過濾規則進行事件的過濾處理
  • 最後將滿足過濾規則的分發到Kservice

其中過濾規則處理程式碼如下:

func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool {
    if ts.Filter == nil || ts.Filter.SourceAndType == nil {
        r.logger.Error("No filter specified")
        ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-fail"))
        return false
    }

    // Record event count and filtering time
    startTS := time.Now()
    defer func() {
        filterTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond)
        stats.Record(ctx, MeasureTriggerFilterTime.M(filterTimeMS))
    }()

    filterType := ts.Filter.SourceAndType.Type
    if filterType != eventingv1alpha1.TriggerAnyFilter && filterType != event.Type() {
        r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("event.Type()", event.Type()))
        ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail"))
        return false
    }
    filterSource := ts.Filter.SourceAndType.Source
    s := event.Context.AsV01().Source
    actualSource := s.String()
    if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != actualSource {
        r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", actualSource))
        ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail"))

        return false
    }

    ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass"))
    return true
}

當前的機制是所有的訂閱事件都會通過 Filter 集中進行事件過濾,如果一個Broker有大量的訂閱Trigger,是否會給Filter帶來效能上的壓力? 這個在實際場景 Broker/Trigger 的運用中需要考慮到這個問題。

結論

作為內建的預設Channel實現,In-memory 可以說很好的完成了事件接收並轉發的使命,並且 Knative Eventing 在接下來的迭代中會支援部署時指定設定預設的Channel。有興趣的同學可以持續關注一下。

作者:元毅

原文連結

本文為雲棲社群原創內容,未經