diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go
index ef527bcace3..79af3702a8b 100644
--- a/cmd/broker/filter/main.go
+++ b/cmd/broker/filter/main.go
@@ -40,6 +40,7 @@ import (
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker/filter"
+ brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
"knative.dev/eventing/pkg/reconciler/names"
)
@@ -125,7 +126,7 @@ func main() {
// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)
- handler, err := filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), reporter, ctxFunc)
+ handler, err := filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), reporter, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
diff --git a/config/brokers/mt-channel-broker/roles/filter-clusterrole.yaml b/config/brokers/mt-channel-broker/roles/filter-clusterrole.yaml
index 99753d1e486..9075fbd57e6 100644
--- a/config/brokers/mt-channel-broker/roles/filter-clusterrole.yaml
+++ b/config/brokers/mt-channel-broker/roles/filter-clusterrole.yaml
@@ -22,6 +22,8 @@ rules:
- apiGroups:
- eventing.knative.dev
resources:
+ - brokers
+ - brokers/status
- triggers
- triggers/status
verbs:
diff --git a/config/channels/in-memory-channel/resources/in-memory-channel.yaml b/config/channels/in-memory-channel/resources/in-memory-channel.yaml
index a83d841ac09..ce9ddd6cb55 100644
--- a/config/channels/in-memory-channel/resources/in-memory-channel.yaml
+++ b/config/channels/in-memory-channel/resources/in-memory-channel.yaml
@@ -247,6 +247,9 @@ spec:
deadLetterSinkCACerts:
description: Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
type: string
+ deadLetterSinkAudience:
+ description: OIDC audience of the dead letter sink.
+ type: string
observedGeneration:
description: ObservedGeneration is the 'Generation' of the Service that was last processed by the controller.
type: integer
diff --git a/config/core/resources/broker.yaml b/config/core/resources/broker.yaml
index f5bb83aade1..365692f9727 100644
--- a/config/core/resources/broker.yaml
+++ b/config/core/resources/broker.yaml
@@ -165,6 +165,9 @@ spec:
deadLetterSinkCACerts:
description: Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
type: string
+ deadLetterSinkAudience:
+ description: OIDC audience of the dead letter sink.
+ type: string
observedGeneration:
description: ObservedGeneration is the 'Generation' of the Service that was last processed by the controller.
type: integer
diff --git a/config/core/resources/channel.yaml b/config/core/resources/channel.yaml
index f99cac516d6..1898a01e0e6 100644
--- a/config/core/resources/channel.yaml
+++ b/config/core/resources/channel.yaml
@@ -285,6 +285,9 @@ spec:
deadLetterSinkCACerts:
description: Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
type: string
+ deadLetterSinkAudience:
+ description: OIDC audience of the dead letter sink.
+ type: string
observedGeneration:
description: ObservedGeneration is the 'Generation' of the Service that was last processed by the controller.
type: integer
diff --git a/config/core/resources/subscription.yaml b/config/core/resources/subscription.yaml
index 0e32272b9a9..4e55ea0fdf3 100644
--- a/config/core/resources/subscription.yaml
+++ b/config/core/resources/subscription.yaml
@@ -207,6 +207,9 @@ spec:
deadLetterSinkCACerts:
description: Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
type: string
+ deadLetterSinkAudience:
+ description: OIDC audience of the dead letter sink.
+ type: string
replyUri:
description: ReplyURI is the fully resolved URI for the spec.reply.
type: string
diff --git a/config/core/resources/trigger.yaml b/config/core/resources/trigger.yaml
index dc05924f41c..515b1261981 100644
--- a/config/core/resources/trigger.yaml
+++ b/config/core/resources/trigger.yaml
@@ -186,6 +186,9 @@ spec:
deadLetterSinkCACerts:
description: Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
type: string
+ deadLetterSinkAudience:
+ description: OIDC audience of the dead letter sink.
+ type: string
observedGeneration:
description: ObservedGeneration is the 'Generation' of the Service that was last processed by the controller.
type: integer
diff --git a/docs/eventing-api.md b/docs/eventing-api.md
index 29a9767bcf8..b48faf9b2c3 100644
--- a/docs/eventing-api.md
+++ b/docs/eventing-api.md
@@ -453,6 +453,18 @@ string
according to https://www.rfc-editor.org/rfc/rfc7468.
+
+
+deadLetterSinkAudience
+
+string
+
+ |
+
+(Optional)
+ DeadLetterSinkAudience is the OIDC audience of the DeadLetterSink
+ |
+
Subscribable
diff --git a/pkg/apis/duck/v1/delivery_types.go b/pkg/apis/duck/v1/delivery_types.go
index 2c0306f2006..48048a09d6e 100644
--- a/pkg/apis/duck/v1/delivery_types.go
+++ b/pkg/apis/duck/v1/delivery_types.go
@@ -158,6 +158,9 @@ type DeliveryStatus struct {
// according to https://www.rfc-editor.org/rfc/rfc7468.
// +optional
DeadLetterSinkCACerts *string `json:"deadLetterSinkCACerts,omitempty"`
+ // DeadLetterSinkAudience is the OIDC audience of the DeadLetterSink
+ // +optional
+ DeadLetterSinkAudience *string `json:"deadLetterSinkAudience,omitempty"`
}
func (ds *DeliveryStatus) IsSet() bool {
@@ -166,14 +169,16 @@ func (ds *DeliveryStatus) IsSet() bool {
func NewDeliveryStatusFromAddressable(addr *duckv1.Addressable) DeliveryStatus {
return DeliveryStatus{
- DeadLetterSinkURI: addr.URL,
- DeadLetterSinkCACerts: addr.CACerts,
+ DeadLetterSinkURI: addr.URL,
+ DeadLetterSinkCACerts: addr.CACerts,
+ DeadLetterSinkAudience: addr.Audience,
}
}
func NewDestinationFromDeliveryStatus(status DeliveryStatus) duckv1.Destination {
return duckv1.Destination{
- URI: status.DeadLetterSinkURI,
- CACerts: status.DeadLetterSinkCACerts,
+ URI: status.DeadLetterSinkURI,
+ CACerts: status.DeadLetterSinkCACerts,
+ Audience: status.DeadLetterSinkAudience,
}
}
diff --git a/pkg/apis/duck/v1/zz_generated.deepcopy.go b/pkg/apis/duck/v1/zz_generated.deepcopy.go
index 9152b596fbe..eccdb7859f9 100644
--- a/pkg/apis/duck/v1/zz_generated.deepcopy.go
+++ b/pkg/apis/duck/v1/zz_generated.deepcopy.go
@@ -189,6 +189,11 @@ func (in *DeliveryStatus) DeepCopyInto(out *DeliveryStatus) {
*out = new(string)
**out = **in
}
+ if in.DeadLetterSinkAudience != nil {
+ in, out := &in.DeadLetterSinkAudience, &out.DeadLetterSinkAudience
+ *out = new(string)
+ **out = **in
+ }
return
}
diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go
index 4e5fe49e093..9acb91a3845 100644
--- a/pkg/broker/filter/filter_handler.go
+++ b/pkg/broker/filter/filter_handler.go
@@ -44,7 +44,7 @@ import (
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
- "knative.dev/eventing/pkg/broker"
+ eventingbroker "knative.dev/eventing/pkg/broker"
v1 "knative.dev/eventing/pkg/client/informers/externalversions/eventing/v1"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"
"knative.dev/eventing/pkg/eventfilter"
@@ -66,6 +66,7 @@ const (
defaultMaxIdleConnectionsPerHost = 100
FilterAudience = "mt-broker-filter"
+ skipTTL = -1
)
// Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber.
@@ -76,6 +77,7 @@ type Handler struct {
eventDispatcher *kncloudevents.Dispatcher
triggerLister eventinglisters.TriggerLister
+ brokerLister eventinglisters.BrokerLister
logger *zap.Logger
withContext func(ctx context.Context) context.Context
filtersMap *subscriptionsapi.FiltersMap
@@ -83,7 +85,7 @@ type Handler struct {
}
// NewHandler creates a new Handler and its associated EventReceiver.
-func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) {
+func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, brokerInformer v1.BrokerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) {
kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{
MaxIdleConns: defaultMaxIdleConnections,
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
@@ -134,6 +136,7 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT
reporter: reporter,
eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider),
triggerLister: triggerInformer.Lister(),
+ brokerLister: brokerInformer.Lister(),
logger: logger,
tokenVerifier: tokenVerifier,
withContext: wc,
@@ -141,13 +144,9 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT
}, nil
}
-// 1. validate request
-// 2. extract event from request
-// 3. get trigger from its trigger reference extracted from the request URI
-// 4. filter event
-// 5. send event to trigger's subscriber
-// 6. write the response
func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
+ ctx := h.withContext(request.Context())
+
writer.Header().Set("Allow", "POST")
if request.Method != http.MethodPost {
@@ -162,7 +161,12 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
return
}
- ctx := h.withContext(request.Context())
+ trigger, err := h.getTrigger(triggerRef)
+ if err != nil {
+ h.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef))
+ writer.WriteHeader(http.StatusBadRequest)
+ return
+ }
event, err := cehttp.NewEventFromHTTPRequest(request)
if err != nil {
@@ -171,6 +175,8 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
return
}
+ h.logger.Debug("Received message", zap.Any("trigger", triggerRef.NamespacedName), zap.Stringer("event", event))
+
ctx, span := trace.StartSpan(ctx, tracing.TriggerMessagingDestination(triggerRef.NamespacedName))
defer span.End()
@@ -184,30 +190,6 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
span.AddAttributes(opencensusclient.EventTraceAttributes(event)...)
}
- // Remove the TTL attribute that is used by the Broker.
- ttl, err := broker.GetTTL(event.Context)
- if err != nil {
- // Only messages sent by the Broker should be here. If the attribute isn't here, then the
- // event wasn't sent by the Broker, so we can drop it.
- h.logger.Warn("No TTL seen, dropping", zap.Any("triggerRef", triggerRef), zap.Any("event", event))
- // Return a BadRequest error, so the upstream can decide how to handle it, e.g. sending
- // the message to a Dead Letter Sink.
- writer.WriteHeader(http.StatusBadRequest)
- return
- }
- if err := broker.DeleteTTL(event.Context); err != nil {
- h.logger.Warn("Failed to delete TTL.", zap.Error(err))
- }
-
- h.logger.Debug("Received message", zap.Any("triggerRef", triggerRef))
-
- t, err := h.getTrigger(triggerRef)
- if err != nil {
- h.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef))
- writer.WriteHeader(http.StatusBadRequest)
- return
- }
-
features := feature.FromContext(ctx)
if features.IsOIDCAuthentication() {
h.logger.Debug("OIDC authentication is enabled")
@@ -227,14 +209,108 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
h.logger.Debug("Request contained a valid JWT. Continuing...")
}
+ if triggerRef.IsReply {
+ h.handleDispatchToReplyRequest(ctx, trigger, writer, request, event)
+ return
+ }
+
+ if triggerRef.IsDLS {
+ h.handleDispatchToDLSRequest(ctx, trigger, writer, request, event)
+ return
+ }
+
+ h.handleDispatchToSubscriberRequest(ctx, trigger, writer, request, event)
+}
+
+func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) {
+ broker, err := h.brokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker)
+ if err != nil {
+ h.logger.Info("Unable to get the Broker", zap.Error(err))
+ writer.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ target := broker.Status.Address
+
reportArgs := &ReportArgs{
- ns: t.Namespace,
- trigger: t.Name,
- broker: t.Spec.Broker,
- filterType: triggerFilterAttribute(t.Spec.Filter, "type"),
+ ns: trigger.Namespace,
+ trigger: trigger.Name,
+ broker: trigger.Spec.Broker,
+ requestType: "reply_forward",
}
- subscriberURI := t.Status.SubscriberURI
+ h.logger.Info("sending to reply", zap.Any("target", target))
+
+ // since the broker-filter acts here like a proxy, we don't filter headers
+ h.send(ctx, writer, request.Header, *target, reportArgs, event, trigger, skipTTL)
+}
+
+func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) {
+ broker, err := h.brokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker)
+ if err != nil {
+ h.logger.Info("Unable to get the Broker", zap.Error(err))
+ writer.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ var target *duckv1.Addressable
+ if trigger.Status.DeadLetterSinkURI != nil {
+ target = &duckv1.Addressable{
+ URL: trigger.Status.DeadLetterSinkURI,
+ CACerts: trigger.Status.DeadLetterSinkCACerts,
+ Audience: trigger.Status.DeadLetterSinkAudience,
+ }
+ } else if broker.Status.DeadLetterSinkURI != nil {
+ target = &duckv1.Addressable{
+ URL: broker.Status.DeadLetterSinkURI,
+ CACerts: broker.Status.DeadLetterSinkCACerts,
+ Audience: broker.Status.DeadLetterSinkAudience,
+ }
+ }
+
+ reportArgs := &ReportArgs{
+ ns: trigger.Namespace,
+ trigger: trigger.Name,
+ broker: trigger.Spec.Broker,
+ requestType: "dls_forward",
+ }
+
+ h.logger.Info("sending to dls", zap.Any("target", target))
+
+ // since the broker-filter acts here like a proxy, we don't filter headers
+ h.send(ctx, writer, request.Header, *target, reportArgs, event, trigger, skipTTL)
+}
+
+func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) {
+ triggerRef := types.NamespacedName{
+ Name: trigger.Name,
+ Namespace: trigger.Namespace,
+ }
+
+ // Remove the TTL attribute that is used by the Broker.
+ ttl, err := eventingbroker.GetTTL(event.Context)
+ if err != nil {
+ // Only messages sent by the Broker should be here. If the attribute isn't here, then the
+ // event wasn't sent by the Broker, so we can drop it.
+ h.logger.Warn("No TTL seen, dropping", zap.Any("triggerRef", triggerRef), zap.Any("event", event))
+ // Return a BadRequest error, so the upstream can decide how to handle it, e.g. sending
+ // the message to a Dead Letter Sink.
+ writer.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ if err := eventingbroker.DeleteTTL(event.Context); err != nil {
+ h.logger.Warn("Failed to delete TTL.", zap.Error(err))
+ }
+
+ reportArgs := &ReportArgs{
+ ns: trigger.Namespace,
+ trigger: trigger.Name,
+ broker: trigger.Spec.Broker,
+ filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"),
+ requestType: "filter",
+ }
+
+ subscriberURI := trigger.Status.SubscriberURI
if subscriberURI == nil {
// Record the event count.
writer.WriteHeader(http.StatusBadRequest)
@@ -243,8 +319,8 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
}
// Check if the event should be sent.
- ctx = logging.WithLogger(ctx, h.logger.Sugar().With(zap.String("trigger", fmt.Sprintf("%s/%s", t.GetNamespace(), t.GetName()))))
- filterResult := h.filterEvent(ctx, t, *event)
+ ctx = logging.WithLogger(ctx, h.logger.Sugar().With(zap.String("trigger", fmt.Sprintf("%s/%s", trigger.GetNamespace(), trigger.GetName()))))
+ filterResult := h.filterEvent(ctx, trigger, *event)
if filterResult == eventfilter.FailFilter {
// We do not count the event. The event will be counted in the broker ingress.
@@ -255,11 +331,12 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
h.reportArrivalTime(event, reportArgs)
target := duckv1.Addressable{
- URL: t.Status.SubscriberURI,
- CACerts: t.Status.SubscriberCACerts,
- Audience: t.Status.SubscriberAudience,
+ URL: trigger.Status.SubscriberURI,
+ CACerts: trigger.Status.SubscriberCACerts,
+ Audience: trigger.Status.SubscriberAudience,
}
- h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, t, ttl)
+
+ h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl)
}
func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) {
@@ -295,7 +372,7 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
writer.WriteHeader(dispatchInfo.ResponseCode)
// Read Response body to responseErr
- errExtensionInfo := broker.ErrExtensionInfo{
+ errExtensionInfo := eventingbroker.ErrExtensionInfo{
ErrDestination: target.URL,
ErrResponseBody: dispatchInfo.ResponseBody,
}
@@ -359,10 +436,12 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter,
return http.StatusBadGateway, err
}
- // Reattach the TTL (with the same value) to the response event before sending it to the Broker.
- if err := broker.SetTTL(event.Context, ttl); err != nil {
- writer.WriteHeader(http.StatusInternalServerError)
- return http.StatusInternalServerError, fmt.Errorf("failed to reset TTL: %w", err)
+ if ttl != skipTTL {
+ // Reattach the TTL (with the same value) to the response event before sending it to the Broker.
+ if err := eventingbroker.SetTTL(event.Context, ttl); err != nil {
+ writer.WriteHeader(http.StatusInternalServerError)
+ return http.StatusInternalServerError, fmt.Errorf("failed to reset TTL: %w", err)
+ }
}
eventResponse := binding.ToMessage(event)
@@ -384,7 +463,7 @@ func (h *Handler) reportArrivalTime(event *event.Event, reportArgs *ReportArgs)
// Record the event processing time. This might be off if the receiver and the filter pods are running in
// different nodes with different clocks.
var arrivalTimeStr string
- if extErr := event.ExtensionAs(broker.EventArrivalTime, &arrivalTimeStr); extErr == nil {
+ if extErr := event.ExtensionAs(eventingbroker.EventArrivalTime, &arrivalTimeStr); extErr == nil {
arrivalTime, err := time.Parse(time.RFC3339, arrivalTimeStr)
if err == nil {
_ = h.reporter.ReportEventProcessingTime(reportArgs, time.Since(arrivalTime))
diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go
index 4fec820722e..1444f643094 100644
--- a/pkg/broker/filter/filter_handler_test.go
+++ b/pkg/broker/filter/filter_handler_test.go
@@ -38,6 +38,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
+ v1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker"
@@ -46,6 +47,7 @@ import (
"knative.dev/pkg/logging"
reconcilertesting "knative.dev/pkg/reconciler/testing"
+ brokerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake"
triggerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"
// Fake injection client
@@ -416,6 +418,9 @@ func TestReceiver(t *testing.T) {
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
+ ctx = feature.ToContext(ctx, feature.Flags{
+ feature.OIDCAuthentication: feature.Enabled,
+ })
fh := fakeHandler{
failRequest: tc.requestFails,
@@ -433,8 +438,8 @@ func TestReceiver(t *testing.T) {
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)
- // Replace the SubscriberURI to point at our fake server.
for _, trig := range tc.triggers {
+ // Replace the SubscriberURI to point at our fake server.
if trig.Status.SubscriberURI != nil && trig.Status.SubscriberURI.String() == toBeReplaced {
url, err := apis.ParseURL(s.URL)
@@ -444,13 +449,24 @@ func TestReceiver(t *testing.T) {
trig.Status.SubscriberURI = url
}
triggerinformerfake.Get(ctx).Informer().GetStore().Add(trig)
+
+ // create the needed broker object
+ b := &v1.Broker{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: trig.Spec.Broker,
+ Namespace: trig.Namespace,
+ },
+ }
+ brokerinformerfake.Get(ctx).Informer().GetStore().Add(b)
}
+
reporter := &mockReporter{}
r, err := NewHandler(
logger,
oidcTokenVerifier,
oidcTokenProvider,
triggerinformerfake.Get(ctx),
+ brokerinformerfake.Get(ctx),
reporter,
func(ctx context.Context) context.Context {
return ctx
@@ -632,6 +648,15 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {
}
triggerinformerfake.Get(ctx).Informer().GetStore().Add(trig)
filtersMap.Set(trig, createSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trig))
+
+ // create the needed broker object
+ b := &v1.Broker{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: trig.Spec.Broker,
+ Namespace: trig.Namespace,
+ },
+ }
+ brokerinformerfake.Get(ctx).Informer().GetStore().Add(b)
}
reporter := &mockReporter{}
r, err := NewHandler(
@@ -639,6 +664,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {
oidcTokenVerifier,
oidcTokenProvider,
triggerinformerfake.Get(ctx),
+ brokerinformerfake.Get(ctx),
reporter,
func(ctx context.Context) context.Context {
return feature.ToContext(context.TODO(), feature.Flags{
diff --git a/pkg/broker/filter/stats_reporter.go b/pkg/broker/filter/stats_reporter.go
index 21e71637aef..246e4d52bbf 100644
--- a/pkg/broker/filter/stats_reporter.go
+++ b/pkg/broker/filter/stats_reporter.go
@@ -67,16 +67,18 @@ var (
// go.opencensus.io/tag/validate.go. Currently those restrictions are:
// - length between 1 and 255 inclusive
// - characters are printable US-ASCII
- triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType)
- responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
- responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
+ triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType)
+ triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type")
+ responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
+ responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
)
type ReportArgs struct {
- ns string
- trigger string
- broker string
- filterType string
+ ns string
+ trigger string
+ broker string
+ filterType string
+ requestType string
}
func init() {
@@ -114,19 +116,19 @@ func register() {
Description: eventCountM.Description(),
Measure: eventCountM,
Aggregation: view.Count(),
- TagKeys: []tag.Key{triggerFilterTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
+ TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: dispatchTimeInMsecM.Description(),
Measure: dispatchTimeInMsecM,
Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000
- TagKeys: []tag.Key{triggerFilterTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
+ TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: processingTimeInMsecM.Description(),
Measure: processingTimeInMsecM,
Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000
- TagKeys: []tag.Key{triggerFilterTypeKey, broker.UniqueTagKey, broker.ContainerTagKey},
+ TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
)
if err != nil {
@@ -187,6 +189,7 @@ func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.C
tag.Insert(broker.ContainerTagKey, r.container),
tag.Insert(broker.UniqueTagKey, r.uniqueName),
tag.Insert(triggerFilterTypeKey, valueOrAny(args.filterType)),
+ tag.Insert(triggerFilterRequestTypeKey, args.requestType),
)...)
return ctx, err
}
diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go
index bd919bdd313..b40894b6dfc 100644
--- a/pkg/reconciler/broker/broker.go
+++ b/pkg/reconciler/broker/broker.go
@@ -33,6 +33,7 @@ import (
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/utils/pointer"
"knative.dev/pkg/kmeta"
+ "knative.dev/pkg/resolver"
"knative.dev/pkg/apis"
duckapis "knative.dev/pkg/apis/duck"
@@ -74,6 +75,8 @@ type Reconciler struct {
channelableTracker ducklib.ListableTracker
+ uriResolver *resolver.URIResolver
+
// If specified, only reconcile brokers with these labels
brokerClass string
}
@@ -186,11 +189,16 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk
b.Status.PropagateIngressAvailability(ingressEndpoints)
if b.Spec.Delivery != nil && b.Spec.Delivery.DeadLetterSink != nil {
- if triggerChan.Status.DeliveryStatus.IsSet() {
- b.Status.MarkDeadLetterSinkResolvedSucceeded(triggerChan.Status.DeliveryStatus)
- } else {
- b.Status.MarkDeadLetterSinkResolvedFailed(fmt.Sprintf("Channel %s didn't set status.deadLetterSinkURI", triggerChan.Name), "")
+ deadLetterSinkAddr, err := r.uriResolver.AddressableFromDestinationV1(ctx, *b.Spec.Delivery.DeadLetterSink, b)
+ logging.FromContext(ctx).Errorw("broker has deliver spec set. Will use it to mark dls status", zap.Any("dls-addr", deadLetterSinkAddr), zap.Any("broker.spec.delivery", b.Spec.Delivery))
+ if err != nil {
+ b.Status.DeliveryStatus = duckv1.DeliveryStatus{}
+ logging.FromContext(ctx).Errorw("Unable to get the dead letter sink's URI", zap.Error(err))
+ b.Status.MarkDeadLetterSinkResolvedFailed("Unable to get the dead letter sink's URI", "%v", err)
+ return err
}
+ ds := duckv1.NewDeliveryStatusFromAddressable(deadLetterSinkAddr)
+ b.Status.MarkDeadLetterSinkResolvedSucceeded(ds)
} else {
b.Status.MarkDeadLetterSinkNotConfigured()
}
diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go
index b10a7891991..0bfe7b27e09 100644
--- a/pkg/reconciler/broker/broker_test.go
+++ b/pkg/reconciler/broker/broker_test.go
@@ -33,6 +33,7 @@ import (
clientgotesting "k8s.io/client-go/testing"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
+ v1addr "knative.dev/pkg/client/injection/ducks/duck/v1/addressable"
v1a1addr "knative.dev/pkg/client/injection/ducks/duck/v1alpha1/addressable"
v1b1addr "knative.dev/pkg/client/injection/ducks/duck/v1beta1/addressable"
"knative.dev/pkg/configmap"
@@ -40,6 +41,7 @@ import (
fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake"
logtesting "knative.dev/pkg/logging/testing"
"knative.dev/pkg/network"
+ "knative.dev/pkg/resolver"
"knative.dev/pkg/tracker"
"knative.dev/eventing/pkg/apis/eventing"
@@ -788,6 +790,7 @@ func TestReconcile(t *testing.T) {
ctx = channelable.WithDuck(ctx)
ctx = v1a1addr.WithDuck(ctx)
ctx = v1b1addr.WithDuck(ctx)
+ ctx = v1addr.WithDuck(ctx)
cm, err := listers.GetConfigMapLister().ConfigMaps(testNS).Get("config-features")
if err == nil {
@@ -806,6 +809,7 @@ func TestReconcile(t *testing.T) {
configmapLister: listers.GetConfigMapLister(),
secretLister: listers.GetSecretLister(),
channelableTracker: duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0)),
+ uriResolver: resolver.NewURIResolverFromTracker(ctx, tracker.New(func(types.NamespacedName) {}, 0)),
}
return broker.NewReconciler(ctx, logger,
fakeeventingclient.Get(ctx), listers.GetBrokerLister(),
diff --git a/pkg/reconciler/broker/controller.go b/pkg/reconciler/broker/controller.go
index 1e946b8846b..d581a90cdb2 100644
--- a/pkg/reconciler/broker/controller.go
+++ b/pkg/reconciler/broker/controller.go
@@ -30,6 +30,7 @@ import (
secretinformer "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
+ "knative.dev/pkg/resolver"
"knative.dev/pkg/system"
"knative.dev/pkg/tracing"
tracingconfig "knative.dev/pkg/tracing/config"
@@ -109,6 +110,7 @@ func NewController(
})
r.channelableTracker = duck.NewListableTrackerFromTracker(ctx, channelable.Get, impl.Tracker)
+ r.uriResolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)
brokerInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: brokerFilter,
diff --git a/pkg/reconciler/broker/resources/subscription.go b/pkg/reconciler/broker/resources/subscription.go
index 6db259df9c7..99246aa05a7 100644
--- a/pkg/reconciler/broker/resources/subscription.go
+++ b/pkg/reconciler/broker/resources/subscription.go
@@ -33,7 +33,7 @@ import (
// NewSubscription returns a placeholder subscription for trigger 't', from brokerTrigger to 'dest'
// replying to brokerIngress.
-func NewSubscription(t *eventingv1.Trigger, brokerTrigger, brokerRef *corev1.ObjectReference, dest *duckv1.Destination, delivery *eventingduckv1.DeliverySpec) *messagingv1.Subscription {
+func NewSubscription(t *eventingv1.Trigger, brokerTrigger *corev1.ObjectReference, dest, reply *duckv1.Destination, delivery *eventingduckv1.DeliverySpec) *messagingv1.Subscription {
return &messagingv1.Subscription{
ObjectMeta: metav1.ObjectMeta{
Namespace: t.Namespace,
@@ -50,15 +50,8 @@ func NewSubscription(t *eventingv1.Trigger, brokerTrigger, brokerRef *corev1.Obj
Name: brokerTrigger.Name,
},
Subscriber: dest,
- Reply: &duckv1.Destination{
- Ref: &duckv1.KReference{
- APIVersion: brokerRef.APIVersion,
- Kind: brokerRef.Kind,
- Name: brokerRef.Name,
- Namespace: brokerRef.Namespace,
- },
- },
- Delivery: delivery,
+ Reply: reply,
+ Delivery: delivery,
},
}
}
diff --git a/pkg/reconciler/broker/resources/subscription_test.go b/pkg/reconciler/broker/resources/subscription_test.go
index eaed2125cef..5002f24eb86 100644
--- a/pkg/reconciler/broker/resources/subscription_test.go
+++ b/pkg/reconciler/broker/resources/subscription_test.go
@@ -47,12 +47,6 @@ func TestNewSubscription(t *testing.T) {
Kind: "tc-kind",
APIVersion: "tc-apiVersion",
}
- brokerRef := &corev1.ObjectReference{
- Name: "broker-name",
- Namespace: "t-namespace",
- Kind: "broker-kind",
- APIVersion: "broker-apiVersion",
- }
delivery := &eventingduckv1.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
URI: apis.HTTP("dlc.example.com"),
@@ -61,7 +55,15 @@ func TestNewSubscription(t *testing.T) {
dest := &duckv1.Destination{
URI: apis.HTTP("example.com"),
}
- got := NewSubscription(trigger, triggerChannelRef, brokerRef, dest, delivery)
+ reply := &duckv1.Destination{
+ Ref: &duckv1.KReference{
+ Name: "broker-name",
+ Namespace: "t-namespace",
+ Kind: "broker-kind",
+ APIVersion: "broker-apiVersion",
+ },
+ }
+ got := NewSubscription(trigger, triggerChannelRef, dest, reply, delivery)
want := &messagingv1.Subscription{
ObjectMeta: metav1.ObjectMeta{
Namespace: "t-namespace",
diff --git a/pkg/reconciler/broker/trigger/trigger.go b/pkg/reconciler/broker/trigger/trigger.go
index c9071e3e2ee..69ca52104b6 100644
--- a/pkg/reconciler/broker/trigger/trigger.go
+++ b/pkg/reconciler/broker/trigger/trigger.go
@@ -199,7 +199,7 @@ func (r *Reconciler) resolveDeadLetterSink(ctx context.Context, b *eventingv1.Br
// subscribeToBrokerChannel subscribes service 'svc' to the Broker's channels.
func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1.Broker, t *eventingv1.Trigger, brokerTrigger *corev1.ObjectReference) (*messagingv1.Subscription, error) {
- var dest *duckv1.Destination
+ var dest, reply, dls *duckv1.Destination
featureFlags := feature.FromContext(ctx)
if featureFlags.IsPermissiveTransportEncryption() || featureFlags.IsStrictTransportEncryption() {
caCerts, err := r.getCaCerts()
@@ -215,6 +215,24 @@ func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1
},
CACerts: pointer.String(caCerts),
}
+
+ reply = &duckv1.Destination{
+ URI: &apis.URL{
+ Scheme: "https",
+ Host: network.GetServiceHostname("broker-filter", system.Namespace()),
+ Path: path.GenerateReply(t),
+ },
+ CACerts: pointer.String(caCerts),
+ }
+
+ dls = &duckv1.Destination{
+ URI: &apis.URL{
+ Scheme: "https",
+ Host: network.GetServiceHostname("broker-filter", system.Namespace()),
+ Path: path.GenerateDLS(t),
+ },
+ CACerts: pointer.String(caCerts),
+ }
} else {
dest = &duckv1.Destination{
URI: &apis.URL{
@@ -223,30 +241,57 @@ func (r *Reconciler) subscribeToBrokerChannel(ctx context.Context, b *eventingv1
Path: path.Generate(t),
},
}
- }
- if featureFlags.IsOIDCAuthentication() {
- dest.Audience = pointer.String(filter.FilterAudience)
- }
+ reply = &duckv1.Destination{
+ URI: &apis.URL{
+ Scheme: "http",
+ Host: network.GetServiceHostname("broker-filter", system.Namespace()),
+ Path: path.GenerateReply(t),
+ },
+ }
- // Note that we have to hard code the brokerGKV stuff as sometimes typemeta is not
- // filled in. So instead of b.TypeMeta.Kind and b.TypeMeta.APIVersion, we have to
- // do it this way.
- brokerObjRef := &corev1.ObjectReference{
- Kind: brokerGVK.Kind,
- APIVersion: brokerGVK.GroupVersion().String(),
- Name: b.Name,
- Namespace: b.Namespace,
+ dls = &duckv1.Destination{
+ URI: &apis.URL{
+ Scheme: "http",
+ Host: network.GetServiceHostname("broker-filter", system.Namespace()),
+ Path: path.GenerateDLS(t),
+ },
+ }
}
- delivery := t.Spec.Delivery
+ delivery := t.Spec.Delivery.DeepCopy() // copy object to avoid in-place update bugs
if delivery == nil {
- delivery = b.Spec.Delivery
+ delivery = b.Spec.Delivery.DeepCopy() // copy object to avoid in-place update bugs
}
recorder := controller.GetEventRecorder(ctx)
- expected := resources.NewSubscription(t, brokerTrigger, brokerObjRef, dest, delivery)
+ var expected *messagingv1.Subscription
+ if featureFlags.IsOIDCAuthentication() {
+ dest.Audience = pointer.String(filter.FilterAudience)
+ reply.Audience = pointer.String(filter.FilterAudience)
+ dls.Audience = pointer.String(filter.FilterAudience)
+
+ if delivery != nil && delivery.DeadLetterSink != nil {
+ delivery.DeadLetterSink = dls
+ }
+
+ expected = resources.NewSubscription(t, brokerTrigger, dest, reply, delivery)
+ } else {
+ // in case OIDC is not enabled, we don't need to route everything throuh
+ // broker-filter because we need it only then to add the token from the
+ // trigger OIDC service account
+ reply = &duckv1.Destination{
+ Ref: &duckv1.KReference{
+ APIVersion: brokerGVK.GroupVersion().String(),
+ Kind: brokerGVK.Kind,
+ Name: b.Name,
+ Namespace: b.Namespace,
+ },
+ }
+
+ expected = resources.NewSubscription(t, brokerTrigger, dest, reply, delivery)
+ }
sub, err := r.subscriptionLister.Subscriptions(t.Namespace).Get(expected.Name)
// If the resource doesn't exist, we'll create it.
diff --git a/pkg/reconciler/broker/trigger/trigger_test.go b/pkg/reconciler/broker/trigger/trigger_test.go
index 71906abf4f6..c7582c7ff44 100644
--- a/pkg/reconciler/broker/trigger/trigger_test.go
+++ b/pkg/reconciler/broker/trigger/trigger_test.go
@@ -42,6 +42,7 @@ import (
"knative.dev/pkg/network"
"knative.dev/pkg/ptr"
"knative.dev/pkg/resolver"
+ "knative.dev/pkg/system"
"knative.dev/pkg/tracker"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
@@ -299,7 +300,7 @@ func TestReconcile(t *testing.T) {
WithTriggerRetry(5, nil, nil)),
},
WantCreates: []runtime.Object{
- resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeBrokerRef(), makeServiceURI(), makeDelivery(nil, ptr.Int32(5), nil, nil)),
+ resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeServiceURI(), makeBrokerRef(), makeDelivery(nil, ptr.Int32(5), nil, nil)),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewTrigger(triggerName, testNS, brokerName,
@@ -333,7 +334,7 @@ func TestReconcile(t *testing.T) {
WithTriggerDeadLeaderSink(duckv1.Destination{URI: dlsURL})),
},
WantCreates: []runtime.Object{
- resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeBrokerRef(), makeServiceURI(), makeDelivery(&duckv1.Destination{URI: dlsURL}, nil, nil, nil)),
+ resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeServiceURI(), makeBrokerRef(), makeDelivery(&duckv1.Destination{URI: dlsURL}, nil, nil, nil)),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewTrigger(triggerName, testNS, brokerName,
@@ -377,8 +378,8 @@ func TestReconcile(t *testing.T) {
resources.NewSubscription(
makeTrigger(testNS),
createTriggerChannelRef(),
- makeBrokerRef(),
makeServiceURI(),
+ makeBrokerRef(),
makeDelivery(&duckv1.Destination{URI: dlsURL, CACerts: pointer.String(string(eventingtlstesting.CA))}, nil, nil, nil),
),
},
@@ -439,8 +440,8 @@ func TestReconcile(t *testing.T) {
resources.NewSubscription(
makeTrigger(testNS),
createTriggerChannelRef(),
- makeBrokerRef(),
makeServiceURIHTTPS(),
+ makeBrokerRef(),
makeDelivery(&duckv1.Destination{URI: dlsURL, CACerts: pointer.String(string(eventingtlstesting.CA))}, nil, nil, nil),
),
},
@@ -501,8 +502,8 @@ func TestReconcile(t *testing.T) {
resources.NewSubscription(
makeTrigger(testNS),
createTriggerChannelRef(),
- makeBrokerRef(),
makeServiceURIHTTPS(),
+ makeBrokerRef(),
makeDelivery(&duckv1.Destination{URI: dlsURL, CACerts: pointer.String(string(eventingtlstesting.CA))}, nil, nil, nil),
),
},
@@ -563,8 +564,8 @@ func TestReconcile(t *testing.T) {
resources.NewSubscription(
makeTrigger(testNS),
createTriggerChannelRef(),
- makeBrokerRef(),
makeServiceURI(),
+ makeBrokerRef(),
makeDelivery(&duckv1.Destination{URI: dlsURL, CACerts: pointer.String(string(eventingtlstesting.CA))}, nil, nil, nil),
),
},
@@ -962,8 +963,8 @@ func TestReconcile(t *testing.T) {
resources.NewSubscription(
makeTrigger(testNS),
createTriggerChannelRef(),
- makeBrokerRef(),
makeServiceURI(),
+ makeBrokerRef(),
makeDelivery(&dlsSVCDest, nil, nil, nil),
),
},
@@ -1029,8 +1030,8 @@ func TestReconcile(t *testing.T) {
resources.NewSubscription(
makeTrigger(testNS),
createTriggerChannelRef(),
- makeBrokerRef(),
makeServiceURI(),
+ makeBrokerRef(),
makeDelivery(&dlsSVCDest, nil, nil, nil),
),
},
@@ -1091,8 +1092,8 @@ func TestReconcile(t *testing.T) {
resources.NewSubscription(
makeTrigger(testNS),
createTriggerChannelRef(),
- makeBrokerRef(),
makeServiceURI(),
+ makeBrokerRef(),
makeDelivery(&dlsSVCDest, nil, nil, nil),
),
},
@@ -1154,8 +1155,8 @@ func TestReconcile(t *testing.T) {
resources.NewSubscription(
makeTrigger(testNS),
createTriggerChannelRef(),
- makeBrokerRef(),
makeServiceURI(),
+ makeBrokerRef(),
makeDelivery(&dlsSVCDest, nil, nil, nil),
),
},
@@ -1590,7 +1591,56 @@ func TestReconcile(t *testing.T) {
),
}},
WantCreates: []runtime.Object{
- resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeBrokerRef(), makeServiceURIWithAudience(), makeEmptyDelivery()),
+ resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeServiceURIWithAudience(), makeReplyDestinationViaBrokerFilter(), makeEmptyDelivery()),
+ },
+ WantDeletes: []clientgotesting.DeleteActionImpl{{
+ ActionImpl: clientgotesting.ActionImpl{
+ Namespace: testNS,
+ Resource: eventingduckv1.SchemeGroupVersion.WithResource("subscriptions"),
+ },
+ Name: subscriptionName,
+ }},
+ },
+ {
+ Name: "OIDC: Route reply & DLS via broker-filter",
+ Key: testKey,
+ Ctx: feature.ToContext(context.Background(), feature.Flags{
+ feature.OIDCAuthentication: feature.Enabled,
+ }),
+ Objects: allBrokerObjectsReadyPlus([]runtime.Object{
+ makeReadySubscription(testNS),
+ makeTriggerOIDCServiceAccount(),
+ NewTrigger(triggerName, testNS, brokerName,
+ WithTriggerUID(triggerUID),
+ WithTriggerSubscriberURI(subscriberURI),
+ WithInitTriggerConditions,
+ WithTriggerDeadLeaderSink(duckv1.Destination{URI: dlsURL}),
+ WithTriggerDeadLetterSinkResolvedSucceeded(),
+ WithTriggerStatusDeadLetterSinkURI(duckv1.Addressable{URL: dlsURL}),
+ )}...),
+ WantErr: false,
+ WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
+ Object: NewTrigger(triggerName, testNS, brokerName,
+ WithTriggerUID(triggerUID),
+ WithTriggerSubscriberURI(subscriberURI),
+ WithTriggerBrokerReady(),
+ // The first reconciliation will initialize the status conditions.
+ WithInitTriggerConditions,
+ WithTriggerDependencyReady(),
+ WithTriggerSubscribed(),
+ WithTriggerStatusSubscriberURI(subscriberURI),
+ WithTriggerSubscriberResolvedSucceeded(),
+ WithTriggerDeadLetterSinkNotConfigured(),
+ WithTriggerSubscriptionNotConfigured(),
+ WithTriggerOIDCIdentityCreatedSucceeded(),
+ WithTriggerOIDCServiceAccountName(makeTriggerOIDCServiceAccount().Name),
+ WithTriggerDeadLeaderSink(duckv1.Destination{URI: dlsURL}),
+ WithTriggerDeadLetterSinkResolvedSucceeded(),
+ WithTriggerStatusDeadLetterSinkURI(duckv1.Addressable{URL: dlsURL}),
+ ),
+ }},
+ WantCreates: []runtime.Object{
+ resources.NewSubscription(makeTrigger(testNS), createTriggerChannelRef(), makeServiceURIWithAudience(), makeReplyDestinationViaBrokerFilter(), makeDLSViaBrokerFilter()),
},
WantDeletes: []clientgotesting.DeleteActionImpl{{
ActionImpl: clientgotesting.ActionImpl{
@@ -1759,7 +1809,7 @@ func makeServiceURIHTTPS() *duckv1.Destination {
}
func makeFilterSubscription(subscriberNamespace string) *messagingv1.Subscription {
- return resources.NewSubscription(makeTrigger(subscriberNamespace), createTriggerChannelRef(), makeBrokerRef(), makeServiceURI(), makeEmptyDelivery())
+ return resources.NewSubscription(makeTrigger(subscriberNamespace), createTriggerChannelRef(), makeServiceURI(), makeBrokerRef(), makeEmptyDelivery())
}
func makeTrigger(subscriberNamespace string) *eventingv1.Trigger {
@@ -1790,12 +1840,25 @@ func makeTrigger(subscriberNamespace string) *eventingv1.Trigger {
}
}
-func makeBrokerRef() *corev1.ObjectReference {
- return &corev1.ObjectReference{
- APIVersion: "eventing.knative.dev/v1",
- Kind: "Broker",
- Namespace: testNS,
- Name: brokerName,
+func makeBrokerRef() *duckv1.Destination {
+ return &duckv1.Destination{
+ Ref: &duckv1.KReference{
+ APIVersion: "eventing.knative.dev/v1",
+ Kind: "Broker",
+ Namespace: testNS,
+ Name: brokerName,
+ },
+ }
+}
+
+func makeReplyDestinationViaBrokerFilter() *duckv1.Destination {
+ return &duckv1.Destination{
+ URI: &apis.URL{
+ Scheme: "http",
+ Host: network.GetServiceHostname("broker-filter", system.Namespace()),
+ Path: fmt.Sprintf("/triggers/%s/%s/%s/reply", testNS, triggerName, triggerUID),
+ },
+ Audience: pointer.String(filter.FilterAudience),
}
}
@@ -1813,6 +1876,20 @@ func makeDelivery(dls *duckv1.Destination, retry *int32, backoffPolicy *eventing
return ds
}
+func makeDLSViaBrokerFilter() *eventingduckv1.DeliverySpec {
+ ds := &eventingduckv1.DeliverySpec{
+ DeadLetterSink: &duckv1.Destination{
+ URI: &apis.URL{
+ Scheme: "http",
+ Host: network.GetServiceHostname("broker-filter", system.Namespace()),
+ Path: fmt.Sprintf("/triggers/%s/%s/%s/dls", testNS, triggerName, triggerUID),
+ },
+ Audience: pointer.String(filter.FilterAudience),
+ },
+ }
+ return ds
+}
+
func allBrokerObjectsReadyPlus(objs ...runtime.Object) []runtime.Object {
brokerObjs := []runtime.Object{
NewBroker(brokerName, testNS,
@@ -1861,6 +1938,7 @@ func makeReadySubscription(subscriberNamespace string) *messagingv1.Subscription
func makeReadySubscriptionWithAudience(subscriberNamespace string) *messagingv1.Subscription {
s := makeReadySubscription(subscriberNamespace)
s.Spec.Subscriber.Audience = ptr.String(filter.FilterAudience)
+ s.Spec.Reply = makeReplyDestinationViaBrokerFilter() // for OIDC the reply requests get routed via the filter
return s
}
diff --git a/pkg/reconciler/sugar/trigger/path/path.go b/pkg/reconciler/sugar/trigger/path/path.go
index c7114f17cae..0e4106f2541 100644
--- a/pkg/reconciler/sugar/trigger/path/path.go
+++ b/pkg/reconciler/sugar/trigger/path/path.go
@@ -18,6 +18,7 @@ package path
import (
"fmt"
+ "path"
"strings"
"k8s.io/apimachinery/pkg/types"
@@ -25,7 +26,9 @@ import (
)
const (
- prefix = "triggers"
+ prefix = "triggers"
+ replySuffix = "reply"
+ dlsSuffix = "dls"
)
// Generate generates the Path portion of a URI to send events to the given Trigger.
@@ -33,17 +36,27 @@ func Generate(t *v1.Trigger) string {
return fmt.Sprintf("/%s/%s/%s/%s", prefix, t.Namespace, t.Name, t.UID)
}
+func GenerateReply(t *v1.Trigger) string {
+ return path.Join(Generate(t), replySuffix)
+}
+
+func GenerateDLS(t *v1.Trigger) string {
+ return path.Join(Generate(t), dlsSuffix)
+}
+
type NamespacedNameUID struct {
types.NamespacedName
- UID types.UID
+ UID types.UID
+ IsReply bool
+ IsDLS bool
}
// Parse parses the Path portion of a URI to determine which Trigger the request corresponds to. It
-// is expected to be in the form "/triggers/namespace/name/uid".
+// is expected to be in the form "/triggers/namespace/name/uid" and eventually a "/reply" or "/dls" suffix.
func Parse(path string) (NamespacedNameUID, error) {
parts := strings.Split(path, "/")
- if len(parts) != 5 {
- return NamespacedNameUID{}, fmt.Errorf("incorrect number of parts in the path, expected 5, actual %d, '%s'", len(parts), path)
+ if len(parts) != 5 && len(parts) != 6 {
+ return NamespacedNameUID{}, fmt.Errorf("incorrect number of parts in the path, expected 5 or 6, actual %d, '%s'", len(parts), path)
}
if parts[0] != "" {
return NamespacedNameUID{}, fmt.Errorf("text before the first slash, actual '%s'", path)
@@ -51,11 +64,14 @@ func Parse(path string) (NamespacedNameUID, error) {
if parts[1] != prefix {
return NamespacedNameUID{}, fmt.Errorf("incorrect prefix, expected '%s', actual '%s'", prefix, path)
}
+
return NamespacedNameUID{
NamespacedName: types.NamespacedName{
Namespace: parts[2],
Name: parts[3],
},
- UID: types.UID(parts[4]),
+ UID: types.UID(parts[4]),
+ IsReply: len(parts) == 6 && parts[5] == replySuffix,
+ IsDLS: len(parts) == 6 && parts[5] == dlsSuffix,
}, nil
}
diff --git a/pkg/reconciler/sugar/trigger/path/path_test.go b/pkg/reconciler/sugar/trigger/path/path_test.go
new file mode 100644
index 00000000000..1215571dcbf
--- /dev/null
+++ b/pkg/reconciler/sugar/trigger/path/path_test.go
@@ -0,0 +1,83 @@
+/*
+Copyright 2023 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package path
+
+import (
+ "testing"
+
+ "github.com/google/go-cmp/cmp"
+ "k8s.io/apimachinery/pkg/types"
+)
+
+func TestParse(t *testing.T) {
+ tests := []struct {
+ name string
+ path string
+ want NamespacedNameUID
+ wantErr bool
+ }{
+ {
+ path: "/triggers/namespace/name/uid",
+ want: NamespacedNameUID{
+ NamespacedName: types.NamespacedName{
+ Name: "name",
+ Namespace: "namespace",
+ },
+ UID: "uid",
+ IsReply: false,
+ IsDLS: false,
+ },
+ },
+ {
+ path: "/triggers/namespace/name/uid/reply",
+ want: NamespacedNameUID{
+ NamespacedName: types.NamespacedName{
+ Name: "name",
+ Namespace: "namespace",
+ },
+ UID: "uid",
+ IsReply: true,
+ IsDLS: false,
+ },
+ },
+ {
+ path: "/triggers/namespace/name/uid/dls",
+ want: NamespacedNameUID{
+ NamespacedName: types.NamespacedName{
+ Name: "name",
+ Namespace: "namespace",
+ },
+ UID: "uid",
+ IsReply: false,
+ IsDLS: true,
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err := Parse(tt.path)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+
+ if diff := cmp.Diff(tt.want, got); diff != "" {
+ t.Error("unexpected diff (-want, +got) =", diff)
+ }
+ })
+ }
+}
diff --git a/test/auth/features/oidc/broker.go b/test/auth/features/oidc/broker.go
index 17d0c4b3750..7069d0c058d 100644
--- a/test/auth/features/oidc/broker.go
+++ b/test/auth/features/oidc/broker.go
@@ -18,16 +18,29 @@ package oidc
import (
"github.com/cloudevents/sdk-go/v2/test"
+ "github.com/google/uuid"
"knative.dev/eventing/test/rekt/resources/broker"
+ "knative.dev/eventing/test/rekt/resources/delivery"
"knative.dev/eventing/test/rekt/resources/trigger"
- v1 "knative.dev/pkg/apis/duck/v1"
+ duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/reconciler-test/pkg/eventshub"
eventassert "knative.dev/reconciler-test/pkg/eventshub/assert"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/resources/service"
)
-func BrokerSendEventWithOIDCToken() *feature.Feature {
+func BrokerSendEventWithOIDC() *feature.FeatureSet {
+ return &feature.FeatureSet{
+ Name: "Broker send events with OIDC support",
+ Features: []*feature.Feature{
+ BrokerSendEventWithOIDCTokenToSubscriber(),
+ BrokerSendEventWithOIDCTokenToReply(),
+ BrokerSendEventWithOIDCTokenToDLS(),
+ },
+ }
+}
+
+func BrokerSendEventWithOIDCTokenToSubscriber() *feature.Feature {
f := feature.NewFeatureNamed("Broker supports flow with OIDC tokens")
source := feature.MakeRandomK8sName("source")
@@ -53,7 +66,7 @@ func BrokerSendEventWithOIDCToken() *feature.Feature {
f.Setup("install trigger", trigger.Install(
triggerName,
brokerName,
- trigger.WithSubscriberFromDestination(&v1.Destination{
+ trigger.WithSubscriberFromDestination(&duckv1.Destination{
Ref: service.AsKReference(sink),
Audience: &sinkAudience,
}),
@@ -72,3 +85,110 @@ func BrokerSendEventWithOIDCToken() *feature.Feature {
return f
}
+
+func BrokerSendEventWithOIDCTokenToDLS() *feature.Feature {
+ f := feature.NewFeature()
+
+ brokerName := feature.MakeRandomK8sName("broker")
+ dls := feature.MakeRandomK8sName("dls")
+ triggerName := feature.MakeRandomK8sName("trigger")
+ source := feature.MakeRandomK8sName("source")
+ dlsAudience := "dls-audience"
+
+ event := test.FullEvent()
+ event.SetID(uuid.New().String())
+
+ // Install DLS sink
+ f.Setup("install dead letter sink", eventshub.Install(dls,
+ eventshub.OIDCReceiverAudience(dlsAudience),
+ eventshub.StartReceiver))
+
+ // Install broker with DLS config
+ brokerConfig := append(
+ broker.WithEnvConfig(),
+ delivery.WithDeadLetterSinkFromDestination(&duckv1.Destination{
+ Ref: service.AsKReference(dls),
+ Audience: &dlsAudience,
+ }),
+ )
+ f.Setup("install broker", broker.Install(brokerName, brokerConfig...))
+ f.Setup("Broker is ready", broker.IsReady(brokerName))
+
+ // Install Trigger
+ f.Setup("install trigger", trigger.Install(triggerName, brokerName,
+ trigger.WithSubscriber(nil, "bad://uri")))
+ f.Setup("trigger is ready", trigger.IsReady(triggerName))
+
+ // Send events after data plane is ready.
+ f.Requirement("install source", eventshub.Install(source,
+ eventshub.StartSenderToResource(broker.GVR(), brokerName),
+ eventshub.InputEvent(event),
+ ))
+
+ // Assert events ended up where we expected.
+ f.Stable("broker with DLS").
+ Must("deliver event to DLQ", eventassert.OnStore(dls).MatchReceivedEvent(test.HasId(event.ID())).AtLeast(1))
+
+ return f
+}
+
+func BrokerSendEventWithOIDCTokenToReply() *feature.Feature {
+ f := feature.NewFeature()
+
+ brokerName := feature.MakeRandomK8sName("broker")
+ subscriber := feature.MakeRandomK8sName("subscriber")
+ reply := feature.MakeRandomK8sName("reply")
+ triggerName := feature.MakeRandomK8sName("trigger")
+ helperTriggerName := feature.MakeRandomK8sName("helper-trigger")
+ source := feature.MakeRandomK8sName("source")
+
+ event := test.FullEvent()
+ event.SetID(uuid.New().String())
+
+ replyEventType := "reply-type"
+ replyEventSource := "reply-source"
+
+ // Install subscriber
+ f.Setup("install subscriber", eventshub.Install(subscriber,
+ eventshub.ReplyWithTransformedEvent(replyEventType, replyEventSource, ""),
+ eventshub.StartReceiver))
+
+ // Install sink for reply
+ // Hint: we don't need to require OIDC auth at the reply sink, because the
+ // actual reply is sent to the broker ingress, which must support OIDC. This
+ // reply sink is only to check that the reply as sent and routed correctly.
+ f.Setup("install sink for reply", eventshub.Install(reply,
+ eventshub.StartReceiver))
+
+ // Install broker
+ f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...))
+ f.Setup("Broker is ready", broker.IsReady(brokerName))
+
+ // Install Trigger
+ f.Setup("install trigger", trigger.Install(triggerName, brokerName,
+ trigger.WithSubscriber(service.AsKReference(subscriber), ""),
+ trigger.WithFilter(map[string]string{
+ "type": event.Type(),
+ })))
+ f.Setup("trigger is ready", trigger.IsReady(triggerName))
+
+ // Install helper trigger to route replys to reply-sink
+ f.Setup("install helper trigger", trigger.Install(helperTriggerName, brokerName,
+ trigger.WithSubscriber(service.AsKReference(reply), ""),
+ trigger.WithFilter(map[string]string{
+ "type": replyEventType,
+ })))
+ f.Setup("helper trigger is ready", trigger.IsReady(helperTriggerName))
+
+ // Send events after data plane is ready.
+ f.Requirement("install source", eventshub.Install(source,
+ eventshub.StartSenderToResource(broker.GVR(), brokerName),
+ eventshub.InputEvent(event),
+ ))
+
+ // Assert events ended up where we expected.
+ f.Stable("broker with reply").
+ Must("deliver event to reply sink", eventassert.OnStore(reply).MatchReceivedEvent(test.HasSource(replyEventSource)).AtLeast(1))
+
+ return f
+}
diff --git a/test/auth/oidc_test.go b/test/auth/oidc_test.go
index 5a6797d9b9a..218db1bdcac 100644
--- a/test/auth/oidc_test.go
+++ b/test/auth/oidc_test.go
@@ -57,7 +57,20 @@ func TestBrokerSupportsOIDC(t *testing.T) {
env.Prerequisite(ctx, t, brokerfeatures.GoesReady(name, broker.WithEnvConfig()...))
env.TestSet(ctx, t, oidc.AddressableOIDCConformance(broker.GVR(), "Broker", name, env.Namespace()))
- env.Test(ctx, t, oidc.BrokerSendEventWithOIDCToken())
+}
+
+func TestBrokerSendsEventsWithOIDCSupport(t *testing.T) {
+ t.Parallel()
+
+ ctx, env := global.Environment(
+ knative.WithKnativeNamespace(system.Namespace()),
+ knative.WithLoggingConfig,
+ knative.WithTracingConfig,
+ k8s.WithEventListener,
+ environment.Managed(t),
+ )
+
+ env.TestSet(ctx, t, oidc.BrokerSendEventWithOIDC())
}
func TestChannelImplSupportsOIDC(t *testing.T) {
diff --git a/test/rekt/resources/broker/broker.yaml b/test/rekt/resources/broker/broker.yaml
index a393aabe72f..ec73900e5bb 100644
--- a/test/rekt/resources/broker/broker.yaml
+++ b/test/rekt/resources/broker/broker.yaml
@@ -57,6 +57,13 @@ spec:
{{ if .delivery.deadLetterSink.uri }}
uri: {{ .delivery.deadLetterSink.uri }}
{{ end }}
+ {{ if .delivery.deadLetterSink.CACerts }}
+ CACerts: |-
+ {{ .delivery.deadLetterSink.CACerts }}
+ {{ end }}
+ {{ if .delivery.deadLetterSink.audience }}
+ audience: {{ .delivery.deadLetterSink.audience }}
+ {{ end }}
{{ end }}
{{ if .delivery.retry }}
retry: {{ .delivery.retry}}
diff --git a/test/rekt/resources/delivery/delivery.go b/test/rekt/resources/delivery/delivery.go
index a66d94cea50..8348e5647d7 100644
--- a/test/rekt/resources/delivery/delivery.go
+++ b/test/rekt/resources/delivery/delivery.go
@@ -17,6 +17,8 @@ limitations under the License.
package delivery
import (
+ "strings"
+
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/reconciler-test/pkg/manifest"
@@ -52,6 +54,51 @@ func WithDeadLetterSink(ref *duckv1.KReference, uri string) manifest.CfgFn {
}
}
+// WithDeadLetterSink adds the dead letter sink related config to the config.
+func WithDeadLetterSinkFromDestination(dest *duckv1.Destination) manifest.CfgFn {
+ return func(cfg map[string]interface{}) {
+ if _, set := cfg["delivery"]; !set {
+ cfg["delivery"] = map[string]interface{}{}
+ }
+
+ delivery := cfg["delivery"].(map[string]interface{})
+ if _, set := delivery["deadLetterSink"]; !set {
+ delivery["deadLetterSink"] = map[string]interface{}{}
+ }
+
+ uri := dest.URI
+ ref := dest.Ref
+
+ dls := delivery["deadLetterSink"].(map[string]interface{})
+ if uri != nil {
+ dls["uri"] = uri.String()
+ }
+
+ if ref != nil {
+ if _, set := dls["ref"]; !set {
+ dls["ref"] = map[string]interface{}{}
+ }
+ dref := dls["ref"].(map[string]interface{})
+ dref["apiVersion"] = ref.APIVersion
+ dref["kind"] = ref.Kind
+ if ref.Namespace != "" {
+ dref["namespace"] = ref.Namespace
+ }
+ dref["name"] = ref.Name
+ }
+
+ if dest.CACerts != nil {
+ // This is a multi-line string and should be indented accordingly.
+ // Replace "new line" with "new line + spaces".
+ dls["CACerts"] = strings.ReplaceAll(*dest.CACerts, "\n", "\n ")
+ }
+
+ if dest.Audience != nil {
+ dls["audience"] = *dest.Audience
+ }
+ }
+}
+
// WithRetry adds the retry related config to the config.
func WithRetry(count int32, backoffPolicy *eventingv1.BackoffPolicyType, backoffDelay *string) manifest.CfgFn {
return func(cfg map[string]interface{}) {
diff --git a/test/rekt/resources/delivery/delivery.yaml b/test/rekt/resources/delivery/delivery.yaml
index 1da57349cb8..06d9175ecf2 100644
--- a/test/rekt/resources/delivery/delivery.yaml
+++ b/test/rekt/resources/delivery/delivery.yaml
@@ -31,6 +31,13 @@ spec:
{{ if .delivery.deadLetterSink.uri }}
uri: {{ .delivery.deadLetterSink.uri }}
{{ end }}
+ {{ if .delivery.deadLetterSink.CACerts }}
+ CACerts: |-
+ {{ .delivery.deadLetterSink.CACerts }}
+ {{ end }}
+ {{ if .delivery.deadLetterSink.audience }}
+ audience: {{ .delivery.deadLetterSink.audience }}
+ {{ end }}
{{ end }}
{{ if .delivery.retry }}
retry: {{ .delivery.retry}}