Skip to content

Commit

Permalink
Register event listeners in KafkaChannel controller and register cust…
Browse files Browse the repository at this point in the history
…om condition set (knative-extensions#1223)
  • Loading branch information
aliok authored Sep 8, 2021
1 parent f76d520 commit 737b6d2
Showing 1 changed file with 26 additions and 0 deletions.
26 changes: 26 additions & 0 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,22 @@ import (
"context"

"github.com/Shopify/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"

messagingv1beta "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
kafkachannelinformer "knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
kafkachannelreconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"

kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"

"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/resolver"

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
Expand All @@ -38,6 +44,8 @@ import (

func NewController(ctx context.Context, watcher configmap.Watcher, configs *config.Env) *controller.Impl {

messagingv1beta.RegisterAlternateKafkaChannelConditionSet(base.ConditionSet)

configmapInformer := configmapinformer.Get(ctx)

reconciler := &Reconciler{
Expand All @@ -57,8 +65,22 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
ConfigMapLister: configmapInformer.Lister(),
}

logger := logging.FromContext(ctx)

_, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx)
if err != nil {
logger.Fatal("Failed to get or create data plane config map",
zap.String("configmap", configs.DataPlaneConfigMapAsString()),
zap.Error(err),
)
}

impl := kafkachannelreconciler.NewImpl(ctx, reconciler)

channelInformer := kafkachannelinformer.Get(ctx)

channelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)

reconciler.SecretTracker = impl.Tracker
Expand All @@ -75,5 +97,9 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
),
))

channelInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: reconciler.OnDeleteObserver,
})

return impl
}

0 comments on commit 737b6d2

Please sign in to comment.