Skip to content

Commit

Permalink
Channel dispatcher authenticates the requests with OIDC
Browse files Browse the repository at this point in the history
Signed-off-by: Calum Murray <[email protected]>
  • Loading branch information
Cali0707 committed Nov 13, 2023
1 parent 7a645f8 commit 4f864c7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 14 deletions.
35 changes: 23 additions & 12 deletions pkg/channel/fanout/fanout_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ const (
)

type Subscription struct {
Subscriber duckv1.Addressable
Reply *duckv1.Addressable
DeadLetter *duckv1.Addressable
RetryConfig *kncloudevents.RetryConfig
Subscriber duckv1.Addressable
Reply *duckv1.Addressable
DeadLetter *duckv1.Addressable
RetryConfig *kncloudevents.RetryConfig
ServiceAccount *types.NamespacedName
}

// Config for a fanout.EventHandler.
Expand Down Expand Up @@ -130,24 +131,27 @@ func NewFanoutEventHandler(

func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscription, error) {
destination := duckv1.Addressable{
URL: sub.SubscriberURI,
CACerts: sub.SubscriberCACerts,
URL: sub.SubscriberURI,
CACerts: sub.SubscriberCACerts,
Audience: sub.SubscriberAudience,
}

var reply *duckv1.Addressable
if sub.ReplyURI != nil {
reply = &duckv1.Addressable{
URL: sub.ReplyURI,
CACerts: sub.ReplyCACerts,
URL: sub.ReplyURI,
CACerts: sub.ReplyCACerts,
Audience: sub.ReplyAudience,
}
}

var deadLetter *duckv1.Addressable
if sub.Delivery != nil && sub.Delivery.DeadLetterSink != nil && sub.Delivery.DeadLetterSink.URI != nil {
// Subscription reconcilers resolves the URI.
deadLetter = &duckv1.Addressable{
URL: sub.Delivery.DeadLetterSink.URI,
CACerts: sub.Delivery.DeadLetterSink.CACerts,
URL: sub.Delivery.DeadLetterSink.URI,
CACerts: sub.Delivery.DeadLetterSink.CACerts,
Audience: sub.Delivery.DeadLetterSink.Audience,
}
}

Expand Down Expand Up @@ -317,11 +321,18 @@ func (f *FanoutEventHandler) dispatch(ctx context.Context, subs []Subscription,
// makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and
// the `sink` portions of the subscription.
func (f *FanoutEventHandler) makeFanoutRequest(ctx context.Context, event event.Event, additionalHeaders nethttp.Header, sub Subscription) (*kncloudevents.DispatchInfo, error) {
return f.eventDispatcher.SendEvent(ctx, event, sub.Subscriber,
dispatchOptions := []kncloudevents.SendOption{
kncloudevents.WithHeader(additionalHeaders),
kncloudevents.WithReply(sub.Reply),
kncloudevents.WithDeadLetterSink(sub.DeadLetter),
kncloudevents.WithRetryConfig(sub.RetryConfig))
kncloudevents.WithRetryConfig(sub.RetryConfig),
}

if sub.ServiceAccount != nil {
dispatchOptions = append(dispatchOptions, kncloudevents.WithOIDCAuthentication(sub.ServiceAccount))
}

return f.eventDispatcher.SendEvent(ctx, event, sub.Subscriber, dispatchOptions...)
}

type DispatchResult struct {
Expand Down
24 changes: 22 additions & 2 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec
return nil
}

config, err := newConfigForInMemoryChannel(imc)
config, err := newConfigForInMemoryChannel(ctx, imc)
if err != nil {
logging.FromContext(ctx).Error("Error creating config for in memory channels", zap.Error(err))
return err
Expand Down Expand Up @@ -209,11 +209,31 @@ func (r *Reconciler) patchSubscriberStatus(ctx context.Context, imc *v1.InMemory
}

// newConfigForInMemoryChannel creates a new Config for a single inmemory channel.
func newConfigForInMemoryChannel(imc *v1.InMemoryChannel) (*multichannelfanout.ChannelConfig, error) {
func newConfigForInMemoryChannel(ctx context.Context, imc *v1.InMemoryChannel) (*multichannelfanout.ChannelConfig, error) {
featureFlags := feature.FromContext(ctx)
if featureFlags.IsOIDCAuthentication() && len(imc.Spec.Subscribers) != len(imc.Status.Subscribers) {
return nil, fmt.Errorf("the number of subscribers in the status does not match the number of subscribers in the spec")
}
subs := make([]fanout.Subscription, len(imc.Spec.Subscribers))

// we will need to find the subscriber status for the service account, so let's make it easy to search for that
subStats := make(map[types.UID]eventingduckv1.SubscriberStatus, len(imc.Status.Subscribers))
for _, sub := range imc.Status.Subscribers {
subStats[sub.UID] = sub
}

for i, sub := range imc.Spec.Subscribers {
conf, err := fanout.SubscriberSpecToFanoutConfig(sub)
if featureFlags.IsOIDCAuthentication() {
if subStat, ok := subStats[sub.UID]; ok {
conf.ServiceAccount = &types.NamespacedName{
Name: *subStat.Auth.ServiceAccountName,
Namespace: imc.Namespace,
}
} else {
return nil, fmt.Errorf("no matching subscriber status for a subscriber in the spec field")
}
}
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 4f864c7

Please sign in to comment.