Skip to content

Commit

Permalink
Pass Handlers to NewRegistry and remove AddHandlers
Browse files Browse the repository at this point in the history
This ensures all handlers are registered before starting the controller.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis authored and skitt committed Nov 22, 2023
1 parent 95059bb commit fd07457
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 51 deletions.
31 changes: 14 additions & 17 deletions pkg/event/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,25 @@ type Registry struct {

var logger = log.Logger{Logger: logf.Log.WithName("EventRegistry")}

// NewRegistry creates a new registry with the given name, typically referencing the owner, to manage event
// Handlers that match the given networkPlugin name.
func NewRegistry(name, networkPlugin string) *Registry {
return &Registry{
// NewRegistry creates a new registry with the given name, typically referencing the owner, to manage event
// Handlers that match the given networkPlugin name. The given event Handlers whose associated network plugin matches the given
// networkPlugin name are added. Non-matching Handlers are ignored. Handlers will be called in registration order.
func NewRegistry(name, networkPlugin string, eventHandlers ...Handler) (*Registry, error) {
r := &Registry{
name: name,
networkPlugin: strings.ToLower(networkPlugin),
eventHandlers: []Handler{},
remoteEndpointTimeStamp: map[string]v1.Time{},
}

for _, eventHandler := range eventHandlers {
err := r.addHandler(eventHandler)
if err != nil {
return nil, err
}
}

return r, nil
}

// GetName returns the name of the registry.
Expand Down Expand Up @@ -78,19 +88,6 @@ func (er *Registry) addHandler(eventHandler Handler) error {
return nil
}

// AddHandlers adds the given event Handlers whose associated network plugin matches the network plugin
// associated with this registry. Non-matching Handlers are ignored. Handlers will be called in registration order.
func (er *Registry) AddHandlers(eventHandlers ...Handler) error {
for _, eventHandler := range eventHandlers {
err := er.addHandler(eventHandler)
if err != nil {
return err
}
}

return nil
}

func (er *Registry) SetHandlerState(handlerState HandlerState) {
_ = er.invokeHandlers("SetHandlerState", func(h Handler) error {
h.SetState(handlerState)
Expand Down
10 changes: 5 additions & 5 deletions pkg/event/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ var _ = Describe("Event Registry", func() {

BeforeEach(func() {
allTestEvents = make(chan testing.TestEvent, 1000)
registry = event.NewRegistry("test-registry", npGenericKubeproxyIptables)

nonMatchingHandlers = []*testing.TestHandler{
testing.NewTestHandler("ovn-handler", cni.OVNKubernetes, allTestEvents),
Expand All @@ -57,8 +56,10 @@ var _ = Describe("Event Registry", func() {
testing.NewTestHandler("wildcard-handler", event.AnyNetworkPlugin, allTestEvents),
}

err := registry.AddHandlers(logger.NewHandler(), matchingHandlers[0], nonMatchingHandlers[0], matchingHandlers[1],
matchingHandlers[2])
var err error

registry, err = event.NewRegistry("test-registry", npGenericKubeproxyIptables, logger.NewHandler(), matchingHandlers[0],
nonMatchingHandlers[0], matchingHandlers[1], matchingHandlers[2])
Expect(err).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -144,8 +145,7 @@ var _ = Describe("Event Registry", func() {
When("SetHandlerState is called on the registry", func() {
It("should invoke SetState on the handlers", func() {
h := testing.NewTestHandler("test", event.AnyNetworkPlugin, nil)
registry := event.NewRegistry("test-registry", event.AnyNetworkPlugin)
err := registry.AddHandlers(h)
registry, err := event.NewRegistry("test-registry", event.AnyNetworkPlugin, h)
Expect(err).NotTo(HaveOccurred())

registry.SetHandlerState(&testing.TestHandlerState{Gateway: true})
Expand Down
4 changes: 2 additions & 2 deletions pkg/event/testing/controller_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func NewControllerSupport() *ControllerSupport {
func (c *ControllerSupport) Start(handler event.Handler) {
stopCh := make(chan struct{})

registry := event.NewRegistry("test-registry", handler.GetNetworkPlugins()[0])
Expect(registry.AddHandlers(handler)).To(Succeed())
registry, err := event.NewRegistry("test-registry", handler.GetNetworkPlugins()[0], handler)
Expect(err).To(Succeed())

config := controller.Config{
RestMapper: test.GetRESTMapperFor(&corev1.Node{}, &submV1.Endpoint{}),
Expand Down
38 changes: 11 additions & 27 deletions pkg/routeagent_driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,32 +90,22 @@ func main() {
var env environment.Specification

err := envconfig.Process("submariner", &env)
if err != nil {
logger.Fatalf("Error reading the environment variables: %s", err.Error())
}
logger.FatalOnError(err, "Error reading the environment variables")

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
logger.Fatalf("Error building kubeconfig: %s", err.Error())
}
logger.FatalOnError(err, "Error building kubeconfig")

k8sClientSet, err := kubernetes.NewForConfig(cfg)
if err != nil {
logger.Fatalf("Error building clientset: %s", err.Error())
}
logger.FatalOnError(err, "Error building clientset")

dynamicClientSet, err := dynamic.NewForConfig(cfg)
if err != nil {
logger.Fatalf("Error building dynamic client: %s", err.Error())
}
logger.FatalOnError(err, "Error building dynamic clientr")

err = v1.AddToScheme(scheme.Scheme)
logger.FatalOnError(err, "Error adding submariner to the scheme")

smClientset, err := submarinerClientset.NewForConfig(cfg)
if err != nil {
logger.Fatalf("Error building submariner clientset: %s", err.Error())
}
logger.FatalOnError(err, "Error building submariner clientset")

if env.WaitForNode {
waitForNodeReady(k8sClientSet)
Expand All @@ -131,8 +121,7 @@ func main() {

config := &watcher.Config{RestConfig: cfg}

registry := event.NewRegistry("routeagent_driver", np)
if err := registry.AddHandlers(
registry, err := event.NewRegistry("routeagent_driver", np,
eventlogger.NewHandler(),
kubeproxy.NewSyncHandler(env.ClusterCidr, env.ServiceCidr),
ovn.NewHandler(&ovn.HandlerConfig{
Expand All @@ -149,10 +138,9 @@ func main() {
cabledriver.NewXRFMCleanupHandler(),
cabledriver.NewVXLANCleanup(),
mtu.NewMTUHandler(env.ClusterCidr, len(env.GlobalCidr) != 0, getTCPMssValue(k8sClientSet)),
calico.NewCalicoIPPoolHandler(cfg),
); err != nil {
logger.Fatalf("Error registering the handlers: %s", err.Error())
}
calico.NewCalicoIPPoolHandler(cfg))

logger.FatalOnError(err, "Error registering the handlers")

if env.Uninstall {
uninstall(k8sClientSet, registry)
Expand All @@ -169,14 +157,10 @@ func main() {
MasterURL: masterURL,
Kubeconfig: kubeconfig,
})
if err != nil {
logger.Fatalf("Error creating controller for event handling %v", err)
}
logger.FatalOnError(err, "Error creating controller for event handling")

err = ctl.Start(stopCh)
if err != nil {
logger.Fatalf("Error starting controller: %v", err)
}
logger.FatalOnError(err, "Error starting controller")

<-stopCh
ctl.Stop()
Expand Down

0 comments on commit fd07457

Please sign in to comment.