Skip to content

Commit

Permalink
[release-1.15] Make adapter pkg independent from brokerfilter package (
Browse files Browse the repository at this point in the history
…#8373)

Make adapter independent from brokerfilter package
  • Loading branch information
creydr authored Dec 4, 2024
1 parent 329aebc commit 20e5495
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 75 deletions.
3 changes: 1 addition & 2 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (

"knative.dev/eventing/pkg/adapter/v2"
v1 "knative.dev/eventing/pkg/apis/sources/v1"
brokerfilter "knative.dev/eventing/pkg/broker/filter"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
)

Expand Down Expand Up @@ -73,7 +72,7 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er
logger: a.logger,
ref: a.config.EventMode == v1.ReferenceMode,
apiServerSourceName: a.name,
filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...),
filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...),
}
if a.config.ResourceOwner != nil {
a.logger.Infow("will be filtered",
Expand Down
5 changes: 2 additions & 3 deletions pkg/adapter/apiserver/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
kubetesting "k8s.io/client-go/testing"
adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
brokerfilter "knative.dev/eventing/pkg/broker/filter"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
rectesting "knative.dev/eventing/pkg/reconciler/testing"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -299,7 +298,7 @@ func makeResourceAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEv
source: "unit-test",
apiServerSourceName: apiServerSourceNameTest,
logger: logger,
filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
}, ce
}

Expand All @@ -313,6 +312,6 @@ func makeRefAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEventsC
apiServerSourceName: apiServerSourceNameTest,
logger: zap.NewExample().Sugar(),
ref: true,
filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
}, ce
}
5 changes: 2 additions & 3 deletions pkg/adapter/apiserver/delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/sources"
brokerfilter "knative.dev/eventing/pkg/broker/filter"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
)

Expand Down Expand Up @@ -87,7 +86,7 @@ func TestFilterFails(t *testing.T) {
source: "unit-test",
apiServerSourceName: apiServerSourceNameTest,
logger: logger,
filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...),
filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), filters)...),
}

delegate.Update(simplePod("unit", "test"))
Expand All @@ -104,7 +103,7 @@ func TestEmptyFiltersList(t *testing.T) {
source: "unit-test",
apiServerSourceName: apiServerSourceNameTest,
logger: logger,
filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...),
filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), filters)...),
}

delegate.Update(simplePod("unit", "test"))
Expand Down
69 changes: 3 additions & 66 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT
return
}
logger.Debug("Adding filter to filtersMap")
fm.Set(trigger, createSubscriptionsAPIFilters(logger, trigger))
fm.Set(trigger, subscriptionsapi.CreateSubscriptionsAPIFilters(logger, trigger.Spec.Filters))
kncloudevents.AddOrUpdateAddressableHandler(clientConfig, duckv1.Addressable{
URL: trigger.Status.SubscriberURI,
CACerts: trigger.Status.SubscriberCACerts,
Expand All @@ -120,7 +120,7 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT
return
}
logger.Debug("Updating filter in filtersMap")
fm.Set(trigger, createSubscriptionsAPIFilters(logger, trigger))
fm.Set(trigger, subscriptionsapi.CreateSubscriptionsAPIFilters(logger, trigger.Spec.Filters))
kncloudevents.AddOrUpdateAddressableHandler(clientConfig, duckv1.Addressable{
URL: trigger.Status.SubscriberURI,
CACerts: trigger.Status.SubscriberCACerts,
Expand Down Expand Up @@ -581,70 +581,7 @@ func (h *Handler) filterEvent(ctx context.Context, trigger *eventingv1.Trigger,
}

func applySubscriptionsAPIFilters(ctx context.Context, trigger *eventingv1.Trigger, event cloudevents.Event) eventfilter.FilterResult {
return createSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trigger).Filter(ctx, event)
}

func createSubscriptionsAPIFilters(logger *zap.Logger, trigger *eventingv1.Trigger) eventfilter.Filter {
if len(trigger.Spec.Filters) == 0 {
logger.Debug("Found no filters for trigger", zap.Any("trigger.Spec", trigger.Spec))
return subscriptionsapi.NewNoFilter()
}
return subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, trigger.Spec.Filters)...)
}

func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.SubscriptionsAPIFilter) eventfilter.Filter {
var materializedFilter eventfilter.Filter
var err error
switch {
case len(filter.Exact) > 0:
// The webhook validates that this map has only a single key:value pair.
materializedFilter, err = subscriptionsapi.NewExactFilter(filter.Exact)
if err != nil {
logger.Debug("Invalid exact expression", zap.Any("filters", filter.Exact), zap.Error(err))
return nil
}
case len(filter.Prefix) > 0:
// The webhook validates that this map has only a single key:value pair.
materializedFilter, err = subscriptionsapi.NewPrefixFilter(filter.Prefix)
if err != nil {
logger.Debug("Invalid prefix expression", zap.Any("filters", filter.Exact), zap.Error(err))
return nil
}
case len(filter.Suffix) > 0:
// The webhook validates that this map has only a single key:value pair.
materializedFilter, err = subscriptionsapi.NewSuffixFilter(filter.Suffix)
if err != nil {
logger.Debug("Invalid suffix expression", zap.Any("filters", filter.Exact), zap.Error(err))
return nil
}
case len(filter.All) > 0:
materializedFilter = subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, filter.All)...)
case len(filter.Any) > 0:
materializedFilter = subscriptionsapi.NewAnyFilter(MaterializeFiltersList(logger, filter.Any)...)
case filter.Not != nil:
materializedFilter = subscriptionsapi.NewNotFilter(materializeSubscriptionsAPIFilter(logger, *filter.Not))
case filter.CESQL != "":
if materializedFilter, err = subscriptionsapi.NewCESQLFilter(filter.CESQL); err != nil {
// This is weird, CESQL expression should be validated when Trigger's are created.
logger.Debug("Found an Invalid CE SQL expression", zap.String("expression", filter.CESQL))
return nil
}
}
return materializedFilter
}

// MaterialzieFilterList allows any component that supports `SubscriptionsAPIFilter` to process them
func MaterializeFiltersList(logger *zap.Logger, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter {
materializedFilters := make([]eventfilter.Filter, 0, len(filters))
for _, f := range filters {
f := materializeSubscriptionsAPIFilter(logger, f)
if f == nil {
logger.Warn("Failed to parse filter. Skipping filter.", zap.Any("filter", f))
continue
}
materializedFilters = append(materializedFilters, f)
}
return materializedFilters
return subscriptionsapi.CreateSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trigger.Spec.Filters).Filter(ctx, event)
}

func applyAttributesFilter(ctx context.Context, filter *eventingv1.TriggerFilter, event cloudevents.Event) eventfilter.FilterResult {
Expand Down
2 changes: 1 addition & 1 deletion pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {
trig.Status.SubscriberURI = url
}
triggerinformerfake.Get(ctx).Informer().GetStore().Add(trig)
filtersMap.Set(trig, createSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trig))
filtersMap.Set(trig, subscriptionsapi.CreateSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trig.Spec.Filters))

// create the needed broker object
b := &v1.Broker{
Expand Down
87 changes: 87 additions & 0 deletions pkg/eventfilter/subscriptionsapi/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright 2024 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package subscriptionsapi

import (
"go.uber.org/zap"
v1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/eventfilter"
)

// MaterializeSubscriptionsAPIFilter materializes a SubscriptionsAPIFilter into a runnable Filter.
func MaterializeSubscriptionsAPIFilter(logger *zap.Logger, filter v1.SubscriptionsAPIFilter) eventfilter.Filter {
var materializedFilter eventfilter.Filter
var err error
switch {
case len(filter.Exact) > 0:
// The webhook validates that this map has only a single key:value pair.
materializedFilter, err = NewExactFilter(filter.Exact)
if err != nil {
logger.Debug("Invalid exact expression", zap.Any("filters", filter.Exact), zap.Error(err))
return nil
}
case len(filter.Prefix) > 0:
// The webhook validates that this map has only a single key:value pair.
materializedFilter, err = NewPrefixFilter(filter.Prefix)
if err != nil {
logger.Debug("Invalid prefix expression", zap.Any("filters", filter.Exact), zap.Error(err))
return nil
}
case len(filter.Suffix) > 0:
// The webhook validates that this map has only a single key:value pair.
materializedFilter, err = NewSuffixFilter(filter.Suffix)
if err != nil {
logger.Debug("Invalid suffix expression", zap.Any("filters", filter.Exact), zap.Error(err))
return nil
}
case len(filter.All) > 0:
materializedFilter = NewAllFilter(MaterializeFiltersList(logger, filter.All)...)
case len(filter.Any) > 0:
materializedFilter = NewAnyFilter(MaterializeFiltersList(logger, filter.Any)...)
case filter.Not != nil:
materializedFilter = NewNotFilter(MaterializeSubscriptionsAPIFilter(logger, *filter.Not))
case filter.CESQL != "":
if materializedFilter, err = NewCESQLFilter(filter.CESQL); err != nil {
// This is weird, CESQL expression should be validated when Trigger's are created.
logger.Debug("Found an Invalid CE SQL expression", zap.String("expression", filter.CESQL))
return nil
}
}
return materializedFilter
}

func CreateSubscriptionsAPIFilters(logger *zap.Logger, filters []v1.SubscriptionsAPIFilter) eventfilter.Filter {
if len(filters) == 0 {
logger.Debug("no filters provided")
return NewNoFilter()
}
return NewAllFilter(MaterializeFiltersList(logger, filters)...)
}

// MaterialzieFilterList allows any component that supports `SubscriptionsAPIFilter` to process them
func MaterializeFiltersList(logger *zap.Logger, filters []v1.SubscriptionsAPIFilter) []eventfilter.Filter {
materializedFilters := make([]eventfilter.Filter, 0, len(filters))
for _, f := range filters {
f := MaterializeSubscriptionsAPIFilter(logger, f)
if f == nil {
logger.Warn("Failed to parse filter. Skipping filter.", zap.Any("filter", f))
continue
}
materializedFilters = append(materializedFilters, f)
}
return materializedFilters
}

0 comments on commit 20e5495

Please sign in to comment.