体育建设网站,想找做拼接屏的公司去哪个网站,做的网站 为什么百度搜不到,WordPress离线博客In-memory Channel是当前Knative Eventing中默认的Channel, 也是一般刚接触Knative Eventing首先了解到的Channel。本文通过分析 In-memory Channel 来进一步了解 Knative Eventing 中Broker/Trigger事件处理机制。
事件处理概览
我们先整体看一下Knative Eventing 工作机制示…In-memory Channel是当前Knative Eventing中默认的Channel, 也是一般刚接触Knative Eventing首先了解到的Channel。本文通过分析 In-memory Channel 来进一步了解 Knative Eventing 中Broker/Trigger事件处理机制。
事件处理概览
我们先整体看一下Knative Eventing 工作机制示意图 通过 namespace 创建默认 Broker 如果不指定Channel会使用默认的 Inmemory Channel。
下面我们从数据平面开始分析Event事件是如何通过In-memory Channel分发到Knative Service
Ingress
Ingress是事件进入Channel前的第一级过滤但目前的功能仅仅是接收事件然后转发到Channel。过滤功能处理TODO状态。
func (h *handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {tctx : cloudevents.HTTPTransportContextFrom(ctx)if tctx.Method ! http.MethodPost {resp.Status http.StatusMethodNotAllowedreturn nil}// tctx.URI is actually the path...if tctx.URI ! / {resp.Status http.StatusNotFoundreturn nil}ctx, _ tag.New(ctx, tag.Insert(TagBroker, h.brokerName))defer func() {stats.Record(ctx, MeasureEventsTotal.M(1))}()send : h.decrementTTL(event)if !send {ctx, _ tag.New(ctx, tag.Insert(TagResult, droppedDueToTTL))return nil}// TODO Filter.ctx, _ tag.New(ctx, tag.Insert(TagResult, dispatched))return h.sendEvent(ctx, tctx, event)
}
In-memory Channel
Broker 字面意思为代理者那么它代理的是谁呢是Channel。为什么要代理Channel呢而不直接发给访问Channel。这个其实涉及到Broker/Trigger设计的初衷对事件过滤处理。我们知道Channel消息通道负责事件传递Subscription订阅负责订阅事件通常这二者的模型如下
这里就涉及到消息队列和订阅分发的实现。那么在In-memory Channel中如何实现的呢 其实 In-memory 的核心处理在Fanout Handler中它负责将接收到的事件分发到不同的 Subscription。 In-memory Channel处理示意图
事件接收并分发核心代码如下
func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {return func(_ provisioners.ChannelReference, m *provisioners.Message) error {if f.config.AsyncHandler {go func() {// Any returned error is already logged in f.dispatch()._ f.dispatch(m)}()return nil}return f.dispatch(m)}
}
当前分发机制默认是异步机制可通过AsyncHandler参数控制分发机制。
消息分发机制
// dispatch takes the request, fans it out to each subscription in f.config. If all the fanned out
// requests return successfully, then return nil. Else, return an error.
func (f *Handler) dispatch(msg *provisioners.Message) error {errorCh : make(chan error, len(f.config.Subscriptions))for _, sub : range f.config.Subscriptions {go func(s eventingduck.SubscriberSpec) {errorCh - f.makeFanoutRequest(*msg, s)}(sub)}for range f.config.Subscriptions {select {case err : -errorCh:if err ! nil {f.logger.Error(Fanout had an error, zap.Error(err))return err}case -time.After(f.timeout):f.logger.Error(Fanout timed out)return errors.New(fanout timed out)}}// All Subscriptions returned err nil.return nil
}
通过这里的代码我们可以看到分发处理超时机制。默认为60s。也就是说如果分发的请求响应超过60s那么In-memory会报错Fanout timed out。
Filter
一般的消息分发会将消息发送给订阅的服务但在 Broker/Trigger 模型中需要对事件进行过滤处理这个处理的地方就是在Filter 中。
根据请求获取Trigger信息。Filter中会根据请求的信息拿到 Trigger 名称然后通过获取Trigger对应的资源信息拿到过滤规则根据Trigger 过滤规则进行事件的过滤处理最后将满足过滤规则的分发到Kservice
其中过滤规则处理代码如下
func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool {if ts.Filter nil || ts.Filter.SourceAndType nil {r.logger.Error(No filter specified)ctx, _ tag.New(ctx, tag.Upsert(TagFilterResult, empty-fail))return false}// Record event count and filtering timestartTS : time.Now()defer func() {filterTimeMS : int64(time.Now().Sub(startTS) / time.Millisecond)stats.Record(ctx, MeasureTriggerFilterTime.M(filterTimeMS))}()filterType : ts.Filter.SourceAndType.Typeif filterType ! eventingv1alpha1.TriggerAnyFilter filterType ! event.Type() {r.logger.Debug(Wrong type, zap.String(trigger.spec.filter.sourceAndType.type, filterType), zap.String(event.Type(), event.Type()))ctx, _ tag.New(ctx, tag.Upsert(TagFilterResult, fail))return false}filterSource : ts.Filter.SourceAndType.Sources : event.Context.AsV01().SourceactualSource : s.String()if filterSource ! eventingv1alpha1.TriggerAnyFilter filterSource ! actualSource {r.logger.Debug(Wrong source, zap.String(trigger.spec.filter.sourceAndType.source, filterSource), zap.String(message.source, actualSource))ctx, _ tag.New(ctx, tag.Upsert(TagFilterResult, fail))return false}ctx, _ tag.New(ctx, tag.Upsert(TagFilterResult, pass))return true
}
当前的机制是所有的订阅事件都会通过 Filter 集中进行事件过滤如果一个Broker有大量的订阅Trigger是否会给Filter带来性能上的压力 这个在实际场景 Broker/Trigger 的运用中需要考虑到这个问题。
结论
作为内置的默认Channel实现In-memory 可以说很好的完成了事件接收并转发的使命并且 Knative Eventing 在接下来的迭代中会支持部署时指定设置默认的Channel。有兴趣的同学可以持续关注一下。
原文链接 本文为云栖社区原创内容未经允许不得转载。