diff --git a/pkg/event/registry.go b/pkg/event/registry.go index 3f6b996ae..1d20474cd 100644 --- a/pkg/event/registry.go +++ b/pkg/event/registry.go @@ -40,15 +40,25 @@ type Registry struct { var logger = log.Logger{Logger: logf.Log.WithName("EventRegistry")} -// NewRegistry creates a new registry with the given name, typically referencing the owner, to manage event -// Handlers that match the given networkPlugin name. -func NewRegistry(name, networkPlugin string) *Registry { - return &Registry{ +// NewRegistry creates a new registry with the given name, typically referencing the owner, to manage event +// Handlers that match the given networkPlugin name. The given event Handlers whose associated network plugin matches the given +// networkPlugin name are added. Non-matching Handlers are ignored. Handlers will be called in registration order. +func NewRegistry(name, networkPlugin string, eventHandlers ...Handler) (*Registry, error) { + r := &Registry{ name: name, networkPlugin: strings.ToLower(networkPlugin), eventHandlers: []Handler{}, remoteEndpointTimeStamp: map[string]v1.Time{}, } + + for _, eventHandler := range eventHandlers { + err := r.addHandler(eventHandler) + if err != nil { + return nil, err + } + } + + return r, nil } // GetName returns the name of the registry. @@ -78,19 +88,6 @@ func (er *Registry) addHandler(eventHandler Handler) error { return nil } -// AddHandlers adds the given event Handlers whose associated network plugin matches the network plugin -// associated with this registry. Non-matching Handlers are ignored. Handlers will be called in registration order. -func (er *Registry) AddHandlers(eventHandlers ...Handler) error { - for _, eventHandler := range eventHandlers { - err := er.addHandler(eventHandler) - if err != nil { - return err - } - } - - return nil -} - func (er *Registry) SetHandlerState(handlerState HandlerState) { _ = er.invokeHandlers("SetHandlerState", func(h Handler) error { h.SetState(handlerState) diff --git a/pkg/event/registry_test.go b/pkg/event/registry_test.go index 0cefdd14a..7117e2286 100644 --- a/pkg/event/registry_test.go +++ b/pkg/event/registry_test.go @@ -45,7 +45,6 @@ var _ = Describe("Event Registry", func() { BeforeEach(func() { allTestEvents = make(chan testing.TestEvent, 1000) - registry = event.NewRegistry("test-registry", npGenericKubeproxyIptables) nonMatchingHandlers = []*testing.TestHandler{ testing.NewTestHandler("ovn-handler", cni.OVNKubernetes, allTestEvents), @@ -57,8 +56,10 @@ var _ = Describe("Event Registry", func() { testing.NewTestHandler("wildcard-handler", event.AnyNetworkPlugin, allTestEvents), } - err := registry.AddHandlers(logger.NewHandler(), matchingHandlers[0], nonMatchingHandlers[0], matchingHandlers[1], - matchingHandlers[2]) + var err error + + registry, err = event.NewRegistry("test-registry", npGenericKubeproxyIptables, logger.NewHandler(), matchingHandlers[0], + nonMatchingHandlers[0], matchingHandlers[1], matchingHandlers[2]) Expect(err).NotTo(HaveOccurred()) }) @@ -144,8 +145,7 @@ var _ = Describe("Event Registry", func() { When("SetHandlerState is called on the registry", func() { It("should invoke SetState on the handlers", func() { h := testing.NewTestHandler("test", event.AnyNetworkPlugin, nil) - registry := event.NewRegistry("test-registry", event.AnyNetworkPlugin) - err := registry.AddHandlers(h) + registry, err := event.NewRegistry("test-registry", event.AnyNetworkPlugin, h) Expect(err).NotTo(HaveOccurred()) registry.SetHandlerState(&testing.TestHandlerState{Gateway: true}) diff --git a/pkg/event/testing/controller_support.go b/pkg/event/testing/controller_support.go index 414f33177..415c24c3a 100644 --- a/pkg/event/testing/controller_support.go +++ b/pkg/event/testing/controller_support.go @@ -62,8 +62,8 @@ func NewControllerSupport() *ControllerSupport { func (c *ControllerSupport) Start(handler event.Handler) { stopCh := make(chan struct{}) - registry := event.NewRegistry("test-registry", handler.GetNetworkPlugins()[0]) - Expect(registry.AddHandlers(handler)).To(Succeed()) + registry, err := event.NewRegistry("test-registry", handler.GetNetworkPlugins()[0], handler) + Expect(err).To(Succeed()) config := controller.Config{ RestMapper: test.GetRESTMapperFor(&corev1.Node{}, &submV1.Endpoint{}), diff --git a/pkg/routeagent_driver/main.go b/pkg/routeagent_driver/main.go index dc68c5803..95f3fdc33 100644 --- a/pkg/routeagent_driver/main.go +++ b/pkg/routeagent_driver/main.go @@ -90,32 +90,22 @@ func main() { var env environment.Specification err := envconfig.Process("submariner", &env) - if err != nil { - logger.Fatalf("Error reading the environment variables: %s", err.Error()) - } + logger.FatalOnError(err, "Error reading the environment variables") cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) - if err != nil { - logger.Fatalf("Error building kubeconfig: %s", err.Error()) - } + logger.FatalOnError(err, "Error building kubeconfig") k8sClientSet, err := kubernetes.NewForConfig(cfg) - if err != nil { - logger.Fatalf("Error building clientset: %s", err.Error()) - } + logger.FatalOnError(err, "Error building clientset") dynamicClientSet, err := dynamic.NewForConfig(cfg) - if err != nil { - logger.Fatalf("Error building dynamic client: %s", err.Error()) - } + logger.FatalOnError(err, "Error building dynamic clientr") err = v1.AddToScheme(scheme.Scheme) logger.FatalOnError(err, "Error adding submariner to the scheme") smClientset, err := submarinerClientset.NewForConfig(cfg) - if err != nil { - logger.Fatalf("Error building submariner clientset: %s", err.Error()) - } + logger.FatalOnError(err, "Error building submariner clientset") if env.WaitForNode { waitForNodeReady(k8sClientSet) @@ -131,8 +121,7 @@ func main() { config := &watcher.Config{RestConfig: cfg} - registry := event.NewRegistry("routeagent_driver", np) - if err := registry.AddHandlers( + registry, err := event.NewRegistry("routeagent_driver", np, eventlogger.NewHandler(), kubeproxy.NewSyncHandler(env.ClusterCidr, env.ServiceCidr), ovn.NewHandler(&ovn.HandlerConfig{ @@ -149,10 +138,9 @@ func main() { cabledriver.NewXRFMCleanupHandler(), cabledriver.NewVXLANCleanup(), mtu.NewMTUHandler(env.ClusterCidr, len(env.GlobalCidr) != 0, getTCPMssValue(k8sClientSet)), - calico.NewCalicoIPPoolHandler(cfg), - ); err != nil { - logger.Fatalf("Error registering the handlers: %s", err.Error()) - } + calico.NewCalicoIPPoolHandler(cfg)) + + logger.FatalOnError(err, "Error registering the handlers") if env.Uninstall { uninstall(k8sClientSet, registry) @@ -169,14 +157,10 @@ func main() { MasterURL: masterURL, Kubeconfig: kubeconfig, }) - if err != nil { - logger.Fatalf("Error creating controller for event handling %v", err) - } + logger.FatalOnError(err, "Error creating controller for event handling") err = ctl.Start(stopCh) - if err != nil { - logger.Fatalf("Error starting controller: %v", err) - } + logger.FatalOnError(err, "Error starting controller") <-stopCh ctl.Stop()