diff --git a/pkg/channel/fanout/fanout_event_handler.go b/pkg/channel/fanout/fanout_event_handler.go index 6f82030e4ea..ea22d0f17f6 100644 --- a/pkg/channel/fanout/fanout_event_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -46,10 +46,11 @@ const ( ) type Subscription struct { - Subscriber duckv1.Addressable - Reply *duckv1.Addressable - DeadLetter *duckv1.Addressable - RetryConfig *kncloudevents.RetryConfig + Subscriber duckv1.Addressable + Reply *duckv1.Addressable + DeadLetter *duckv1.Addressable + RetryConfig *kncloudevents.RetryConfig + ServiceAccount *types.NamespacedName } // Config for a fanout.EventHandler. @@ -130,15 +131,17 @@ func NewFanoutEventHandler( func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscription, error) { destination := duckv1.Addressable{ - URL: sub.SubscriberURI, - CACerts: sub.SubscriberCACerts, + URL: sub.SubscriberURI, + CACerts: sub.SubscriberCACerts, + Audience: sub.SubscriberAudience, } var reply *duckv1.Addressable if sub.ReplyURI != nil { reply = &duckv1.Addressable{ - URL: sub.ReplyURI, - CACerts: sub.ReplyCACerts, + URL: sub.ReplyURI, + CACerts: sub.ReplyCACerts, + Audience: sub.ReplyAudience, } } @@ -146,8 +149,9 @@ func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscript if sub.Delivery != nil && sub.Delivery.DeadLetterSink != nil && sub.Delivery.DeadLetterSink.URI != nil { // Subscription reconcilers resolves the URI. deadLetter = &duckv1.Addressable{ - URL: sub.Delivery.DeadLetterSink.URI, - CACerts: sub.Delivery.DeadLetterSink.CACerts, + URL: sub.Delivery.DeadLetterSink.URI, + CACerts: sub.Delivery.DeadLetterSink.CACerts, + Audience: sub.Delivery.DeadLetterSink.Audience, } } @@ -317,11 +321,18 @@ func (f *FanoutEventHandler) dispatch(ctx context.Context, subs []Subscription, // makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and // the `sink` portions of the subscription. func (f *FanoutEventHandler) makeFanoutRequest(ctx context.Context, event event.Event, additionalHeaders nethttp.Header, sub Subscription) (*kncloudevents.DispatchInfo, error) { - return f.eventDispatcher.SendEvent(ctx, event, sub.Subscriber, + dispatchOptions := []kncloudevents.SendOption{ kncloudevents.WithHeader(additionalHeaders), kncloudevents.WithReply(sub.Reply), kncloudevents.WithDeadLetterSink(sub.DeadLetter), - kncloudevents.WithRetryConfig(sub.RetryConfig)) + kncloudevents.WithRetryConfig(sub.RetryConfig), + } + + if sub.ServiceAccount != nil { + dispatchOptions = append(dispatchOptions, kncloudevents.WithOIDCAuthentication(sub.ServiceAccount)) + } + + return f.eventDispatcher.SendEvent(ctx, event, sub.Subscriber, dispatchOptions...) } type DispatchResult struct { diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index cb1024dec5c..30b494a3fc8 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -88,7 +88,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec return nil } - config, err := newConfigForInMemoryChannel(imc) + config, err := newConfigForInMemoryChannel(ctx, imc) if err != nil { logging.FromContext(ctx).Error("Error creating config for in memory channels", zap.Error(err)) return err @@ -209,11 +209,31 @@ func (r *Reconciler) patchSubscriberStatus(ctx context.Context, imc *v1.InMemory } // newConfigForInMemoryChannel creates a new Config for a single inmemory channel. -func newConfigForInMemoryChannel(imc *v1.InMemoryChannel) (*multichannelfanout.ChannelConfig, error) { +func newConfigForInMemoryChannel(ctx context.Context, imc *v1.InMemoryChannel) (*multichannelfanout.ChannelConfig, error) { + featureFlags := feature.FromContext(ctx) + if featureFlags.IsOIDCAuthentication() && len(imc.Spec.Subscribers) != len(imc.Status.Subscribers) { + return nil, fmt.Errorf("the number of subscribers in the status does not match the number of subscribers in the spec") + } subs := make([]fanout.Subscription, len(imc.Spec.Subscribers)) + // we will need to find the subscriber status for the service account, so let's make it easy to search for that + subStats := make(map[types.UID]eventingduckv1.SubscriberStatus, len(imc.Status.Subscribers)) + for _, sub := range imc.Status.Subscribers { + subStats[sub.UID] = sub + } + for i, sub := range imc.Spec.Subscribers { conf, err := fanout.SubscriberSpecToFanoutConfig(sub) + if featureFlags.IsOIDCAuthentication() { + if subStat, ok := subStats[sub.UID]; ok { + conf.ServiceAccount = &types.NamespacedName{ + Name: *subStat.Auth.ServiceAccountName, + Namespace: imc.Namespace, + } + } else { + return nil, fmt.Errorf("no matching subscriber status for a subscriber in the spec field") + } + } if err != nil { return nil, err }