From 20e549596e994dcfe41c7aac20d895f8755b004d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Wed, 4 Dec 2024 16:30:45 +0100 Subject: [PATCH] [release-1.15] Make adapter pkg independent from brokerfilter package (#8373) Make adapter independent from brokerfilter package --- pkg/adapter/apiserver/adapter.go | 3 +- pkg/adapter/apiserver/adapter_test.go | 5 +- pkg/adapter/apiserver/delegate_test.go | 5 +- pkg/broker/filter/filter_handler.go | 69 +---------------- pkg/broker/filter/filter_handler_test.go | 2 +- pkg/eventfilter/subscriptionsapi/create.go | 87 ++++++++++++++++++++++ 6 files changed, 96 insertions(+), 75 deletions(-) create mode 100644 pkg/eventfilter/subscriptionsapi/create.go diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index aba3301306c..cda3e617e44 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -34,7 +34,6 @@ import ( "knative.dev/eventing/pkg/adapter/v2" v1 "knative.dev/eventing/pkg/apis/sources/v1" - brokerfilter "knative.dev/eventing/pkg/broker/filter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" ) @@ -73,7 +72,7 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er logger: a.logger, ref: a.config.EventMode == v1.ReferenceMode, apiServerSourceName: a.name, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...), } if a.config.ResourceOwner != nil { a.logger.Infow("will be filtered", diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 93b33df0bb0..8c931170f60 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -35,7 +35,6 @@ import ( kubetesting "k8s.io/client-go/testing" adaptertest "knative.dev/eventing/pkg/adapter/v2/test" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" - brokerfilter "knative.dev/eventing/pkg/broker/filter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" rectesting "knative.dev/eventing/pkg/reconciler/testing" "knative.dev/pkg/logging" @@ -299,7 +298,7 @@ func makeResourceAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEv source: "unit-test", apiServerSourceName: apiServerSourceNameTest, logger: logger, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), }, ce } @@ -313,6 +312,6 @@ func makeRefAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEventsC apiServerSourceName: apiServerSourceNameTest, logger: zap.NewExample().Sugar(), ref: true, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), }, ce } diff --git a/pkg/adapter/apiserver/delegate_test.go b/pkg/adapter/apiserver/delegate_test.go index 00fc9dfe691..d2d978b1f5a 100644 --- a/pkg/adapter/apiserver/delegate_test.go +++ b/pkg/adapter/apiserver/delegate_test.go @@ -22,7 +22,6 @@ import ( adaptertest "knative.dev/eventing/pkg/adapter/v2/test" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/sources" - brokerfilter "knative.dev/eventing/pkg/broker/filter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" ) @@ -87,7 +86,7 @@ func TestFilterFails(t *testing.T) { source: "unit-test", apiServerSourceName: apiServerSourceNameTest, logger: logger, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), filters)...), } delegate.Update(simplePod("unit", "test")) @@ -104,7 +103,7 @@ func TestEmptyFiltersList(t *testing.T) { source: "unit-test", apiServerSourceName: apiServerSourceNameTest, logger: logger, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), filters)...), } delegate.Update(simplePod("unit", "test")) diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 8ea2565ebfa..ab61e11f75e 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -108,7 +108,7 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT return } logger.Debug("Adding filter to filtersMap") - fm.Set(trigger, createSubscriptionsAPIFilters(logger, trigger)) + fm.Set(trigger, subscriptionsapi.CreateSubscriptionsAPIFilters(logger, trigger.Spec.Filters)) kncloudevents.AddOrUpdateAddressableHandler(clientConfig, duckv1.Addressable{ URL: trigger.Status.SubscriberURI, CACerts: trigger.Status.SubscriberCACerts, @@ -120,7 +120,7 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT return } logger.Debug("Updating filter in filtersMap") - fm.Set(trigger, createSubscriptionsAPIFilters(logger, trigger)) + fm.Set(trigger, subscriptionsapi.CreateSubscriptionsAPIFilters(logger, trigger.Spec.Filters)) kncloudevents.AddOrUpdateAddressableHandler(clientConfig, duckv1.Addressable{ URL: trigger.Status.SubscriberURI, CACerts: trigger.Status.SubscriberCACerts, @@ -581,70 +581,7 @@ func (h *Handler) filterEvent(ctx context.Context, trigger *eventingv1.Trigger, } func applySubscriptionsAPIFilters(ctx context.Context, trigger *eventingv1.Trigger, event cloudevents.Event) eventfilter.FilterResult { - return createSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trigger).Filter(ctx, event) -} - -func createSubscriptionsAPIFilters(logger *zap.Logger, trigger *eventingv1.Trigger) eventfilter.Filter { - if len(trigger.Spec.Filters) == 0 { - logger.Debug("Found no filters for trigger", zap.Any("trigger.Spec", trigger.Spec)) - return subscriptionsapi.NewNoFilter() - } - return subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, trigger.Spec.Filters)...) -} - -func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.SubscriptionsAPIFilter) eventfilter.Filter { - var materializedFilter eventfilter.Filter - var err error - switch { - case len(filter.Exact) > 0: - // The webhook validates that this map has only a single key:value pair. - materializedFilter, err = subscriptionsapi.NewExactFilter(filter.Exact) - if err != nil { - logger.Debug("Invalid exact expression", zap.Any("filters", filter.Exact), zap.Error(err)) - return nil - } - case len(filter.Prefix) > 0: - // The webhook validates that this map has only a single key:value pair. - materializedFilter, err = subscriptionsapi.NewPrefixFilter(filter.Prefix) - if err != nil { - logger.Debug("Invalid prefix expression", zap.Any("filters", filter.Exact), zap.Error(err)) - return nil - } - case len(filter.Suffix) > 0: - // The webhook validates that this map has only a single key:value pair. - materializedFilter, err = subscriptionsapi.NewSuffixFilter(filter.Suffix) - if err != nil { - logger.Debug("Invalid suffix expression", zap.Any("filters", filter.Exact), zap.Error(err)) - return nil - } - case len(filter.All) > 0: - materializedFilter = subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, filter.All)...) - case len(filter.Any) > 0: - materializedFilter = subscriptionsapi.NewAnyFilter(MaterializeFiltersList(logger, filter.Any)...) - case filter.Not != nil: - materializedFilter = subscriptionsapi.NewNotFilter(materializeSubscriptionsAPIFilter(logger, *filter.Not)) - case filter.CESQL != "": - if materializedFilter, err = subscriptionsapi.NewCESQLFilter(filter.CESQL); err != nil { - // This is weird, CESQL expression should be validated when Trigger's are created. - logger.Debug("Found an Invalid CE SQL expression", zap.String("expression", filter.CESQL)) - return nil - } - } - return materializedFilter -} - -// MaterialzieFilterList allows any component that supports `SubscriptionsAPIFilter` to process them -func MaterializeFiltersList(logger *zap.Logger, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter { - materializedFilters := make([]eventfilter.Filter, 0, len(filters)) - for _, f := range filters { - f := materializeSubscriptionsAPIFilter(logger, f) - if f == nil { - logger.Warn("Failed to parse filter. Skipping filter.", zap.Any("filter", f)) - continue - } - materializedFilters = append(materializedFilters, f) - } - return materializedFilters + return subscriptionsapi.CreateSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trigger.Spec.Filters).Filter(ctx, event) } func applyAttributesFilter(ctx context.Context, filter *eventingv1.TriggerFilter, event cloudevents.Event) eventfilter.FilterResult { diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index e220e401774..062355015e1 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -651,7 +651,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { trig.Status.SubscriberURI = url } triggerinformerfake.Get(ctx).Informer().GetStore().Add(trig) - filtersMap.Set(trig, createSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trig)) + filtersMap.Set(trig, subscriptionsapi.CreateSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trig.Spec.Filters)) // create the needed broker object b := &v1.Broker{ diff --git a/pkg/eventfilter/subscriptionsapi/create.go b/pkg/eventfilter/subscriptionsapi/create.go new file mode 100644 index 00000000000..3f56f6e7f0c --- /dev/null +++ b/pkg/eventfilter/subscriptionsapi/create.go @@ -0,0 +1,87 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package subscriptionsapi + +import ( + "go.uber.org/zap" + v1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/pkg/eventfilter" +) + +// MaterializeSubscriptionsAPIFilter materializes a SubscriptionsAPIFilter into a runnable Filter. +func MaterializeSubscriptionsAPIFilter(logger *zap.Logger, filter v1.SubscriptionsAPIFilter) eventfilter.Filter { + var materializedFilter eventfilter.Filter + var err error + switch { + case len(filter.Exact) > 0: + // The webhook validates that this map has only a single key:value pair. + materializedFilter, err = NewExactFilter(filter.Exact) + if err != nil { + logger.Debug("Invalid exact expression", zap.Any("filters", filter.Exact), zap.Error(err)) + return nil + } + case len(filter.Prefix) > 0: + // The webhook validates that this map has only a single key:value pair. + materializedFilter, err = NewPrefixFilter(filter.Prefix) + if err != nil { + logger.Debug("Invalid prefix expression", zap.Any("filters", filter.Exact), zap.Error(err)) + return nil + } + case len(filter.Suffix) > 0: + // The webhook validates that this map has only a single key:value pair. + materializedFilter, err = NewSuffixFilter(filter.Suffix) + if err != nil { + logger.Debug("Invalid suffix expression", zap.Any("filters", filter.Exact), zap.Error(err)) + return nil + } + case len(filter.All) > 0: + materializedFilter = NewAllFilter(MaterializeFiltersList(logger, filter.All)...) + case len(filter.Any) > 0: + materializedFilter = NewAnyFilter(MaterializeFiltersList(logger, filter.Any)...) + case filter.Not != nil: + materializedFilter = NewNotFilter(MaterializeSubscriptionsAPIFilter(logger, *filter.Not)) + case filter.CESQL != "": + if materializedFilter, err = NewCESQLFilter(filter.CESQL); err != nil { + // This is weird, CESQL expression should be validated when Trigger's are created. + logger.Debug("Found an Invalid CE SQL expression", zap.String("expression", filter.CESQL)) + return nil + } + } + return materializedFilter +} + +func CreateSubscriptionsAPIFilters(logger *zap.Logger, filters []v1.SubscriptionsAPIFilter) eventfilter.Filter { + if len(filters) == 0 { + logger.Debug("no filters provided") + return NewNoFilter() + } + return NewAllFilter(MaterializeFiltersList(logger, filters)...) +} + +// MaterialzieFilterList allows any component that supports `SubscriptionsAPIFilter` to process them +func MaterializeFiltersList(logger *zap.Logger, filters []v1.SubscriptionsAPIFilter) []eventfilter.Filter { + materializedFilters := make([]eventfilter.Filter, 0, len(filters)) + for _, f := range filters { + f := MaterializeSubscriptionsAPIFilter(logger, f) + if f == nil { + logger.Warn("Failed to parse filter. Skipping filter.", zap.Any("filter", f)) + continue + } + materializedFilters = append(materializedFilters, f) + } + return materializedFilters +}