From f8de9df6e5622d5f3cef97659228968209ffeb2e Mon Sep 17 00:00:00 2001 From: severindellsperger Date: Thu, 18 Jul 2024 11:38:24 +0200 Subject: [PATCH] test network processor --- pkg/processor/event_dispatcher.go | 7 +- pkg/processor/link_event_processor.go | 4 +- pkg/processor/network_processor.go | 55 ++-- pkg/processor/network_processor_test.go | 344 ++++++++++++++++++++++++ 4 files changed, 384 insertions(+), 26 deletions(-) create mode 100644 pkg/processor/network_processor_test.go diff --git a/pkg/processor/event_dispatcher.go b/pkg/processor/event_dispatcher.go index b1d9d72..74444ff 100644 --- a/pkg/processor/event_dispatcher.go +++ b/pkg/processor/event_dispatcher.go @@ -36,6 +36,9 @@ func NewEventDispatcher(nodeEventHandler *NodeEventProcessor, linkEventHandler * } func (dispatcher *EventDispatcher) Dispatch(event domain.NetworkEvent) bool { - handler := dispatcher.eventHandlers[reflect.TypeOf(event)] - return handler.HandleEvent(event) + if event != nil { + handler := dispatcher.eventHandlers[reflect.TypeOf(event)] + return handler.HandleEvent(event) + } + return false } diff --git a/pkg/processor/link_event_processor.go b/pkg/processor/link_event_processor.go index dfe1397..3f3dabc 100644 --- a/pkg/processor/link_event_processor.go +++ b/pkg/processor/link_event_processor.go @@ -127,7 +127,9 @@ func (processor *LinkEventProcessor) ProcessLinks(links []domain.Link) error { return err } } - processor.graph.UpdateSubGraphs() + if len(links) > 0 { + processor.graph.UpdateSubGraphs() + } return nil } diff --git a/pkg/processor/network_processor.go b/pkg/processor/network_processor.go index 6c3d2d3..a31cbaf 100644 --- a/pkg/processor/network_processor.go +++ b/pkg/processor/network_processor.go @@ -24,6 +24,7 @@ type NetworkProcessor struct { linkProcessor LinkProcessor prefixProcessor PrefixProcessor sidProcessor SidProcessor + mutexesLocked bool } type EventOptions struct { @@ -49,6 +50,7 @@ func NewNetworkProcessor(graph graph.Graph, cache cache.Cache, eventChan chan do prefixProcessor: eventOptions.PrefixEventProcessor, sidProcessor: eventOptions.SidEventProcessor, eventDispatcher: eventOptions.EventDispatcher, + mutexesLocked: false, } } @@ -68,41 +70,48 @@ func (processor *NetworkProcessor) ProcessSids(sids []domain.Sid) { processor.sidProcessor.ProcessSids(sids) } +func (processor *NetworkProcessor) dispatchEvent(event domain.NetworkEvent, timer *time.Timer, holdTime time.Duration) { + if !processor.mutexesLocked { + processor.log.Debugln("Locking cache and graph mutexes") + processor.cache.Lock() + processor.graph.Lock() + processor.mutexesLocked = true + } + if processor.eventDispatcher.Dispatch(event) { + processor.needsSubgraphUpdate = true + } + timer.Reset(holdTime) +} + +func (processor *NetworkProcessor) triggerUpdates() { + if processor.mutexesLocked { + processor.log.Debugln("Unlocking cache and graph mutexes") + processor.cache.Unlock() + processor.graph.Unlock() + processor.mutexesLocked = false + } + if processor.needsSubgraphUpdate { + processor.graph.UpdateSubGraphs() + processor.needsSubgraphUpdate = false + } + processor.updateChan <- struct{}{} +} + func (processor *NetworkProcessor) Start() { holdTime := helper.NetworkProcessorHoldTime processor.log.Infof("Starting processing network updates with hold time %s", holdTime.String()) timer := time.NewTimer(holdTime) defer timer.Stop() - mutexesLocked := false for { select { case event := <-processor.eventChan: - if !mutexesLocked { - processor.log.Debugln("Locking cache and graph mutexes") - processor.cache.Lock() - processor.graph.Lock() - mutexesLocked = true - } - if processor.eventDispatcher.Dispatch(event) { - processor.needsSubgraphUpdate = true - } - timer.Reset(holdTime) + processor.dispatchEvent(event, timer, holdTime) case <-timer.C: - if mutexesLocked { - processor.log.Debugln("Unlocking cache and graph mutexes") - processor.cache.Unlock() - processor.graph.Unlock() - mutexesLocked = false - } - if processor.needsSubgraphUpdate { - processor.graph.UpdateSubGraphs() - processor.needsSubgraphUpdate = false - } - processor.updateChan <- struct{}{} + processor.triggerUpdates() case <-processor.quitChan: - if mutexesLocked { + if processor.mutexesLocked { processor.cache.Unlock() processor.graph.Unlock() } diff --git a/pkg/processor/network_processor_test.go b/pkg/processor/network_processor_test.go new file mode 100644 index 0000000..2852a47 --- /dev/null +++ b/pkg/processor/network_processor_test.go @@ -0,0 +1,344 @@ +package processor + +import ( + "sync" + "testing" + "time" + + "github.com/hawkv6/hawkeye/pkg/cache" + "github.com/hawkv6/hawkeye/pkg/domain" + "github.com/hawkv6/hawkeye/pkg/graph" + "github.com/hawkv6/hawkeye/pkg/helper" + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + "google.golang.org/protobuf/proto" +) + +func TestNewNetworkProcessor(t *testing.T) { + tests := []struct { + name string + }{ + { + name: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + graphMock := graph.NewMockGraph(gomock.NewController(t)) + cache := cache.NewMockCache(gomock.NewController(t)) + nodeEventProcessor := NewNodeEventProcessor(graphMock, cache) + linkEventProcessor := NewLinkEventProcessor(graphMock, cache) + prefixEventProcessor := NewPrefixEventProcessor(graphMock, cache) + sidEventProcessor := NewSidEventProcessor(graphMock, cache) + eventDispatcher := NewEventDispatcher(nodeEventProcessor, linkEventProcessor, prefixEventProcessor, sidEventProcessor) + eventOptions := EventOptions{ + NodeEventProcessor: nodeEventProcessor, + LinkEventProcessor: linkEventProcessor, + PrefixEventProcessor: prefixEventProcessor, + SidEventProcessor: sidEventProcessor, + EventDispatcher: eventDispatcher, + } + networkProcessor := NewNetworkProcessor(graphMock, cache, nil, nil, eventOptions) + assert.NotNil(t, networkProcessor) + }) + } +} + +func TestNetworkProcessor_ProcessNodes(t *testing.T) { + tests := []struct { + name string + }{ + { + name: "TestNetworkProcessor_ProcessNodes", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + graphMock := graph.NewMockGraph(gomock.NewController(t)) + cache := cache.NewMockCache(gomock.NewController(t)) + nodeEventProcessor := NewNodeEventProcessor(graphMock, cache) + linkEventProcessor := NewLinkEventProcessor(graphMock, cache) + prefixEventProcessor := NewPrefixEventProcessor(graphMock, cache) + sidEventProcessor := NewSidEventProcessor(graphMock, cache) + eventDispatcher := NewEventDispatcher(nodeEventProcessor, linkEventProcessor, prefixEventProcessor, sidEventProcessor) + eventOptions := EventOptions{ + NodeEventProcessor: nodeEventProcessor, + LinkEventProcessor: linkEventProcessor, + PrefixEventProcessor: prefixEventProcessor, + SidEventProcessor: sidEventProcessor, + EventDispatcher: eventDispatcher, + } + networkProcessor := NewNetworkProcessor(graphMock, cache, nil, nil, eventOptions) + networkProcessor.ProcessNodes(nil) + }) + } +} + +func TestNetworkProcessor_ProcessLinks(t *testing.T) { + tests := []struct { + name string + }{ + { + name: "TestNetworkProcessor_ProcessLinks", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + graphMock := graph.NewMockGraph(gomock.NewController(t)) + cache := cache.NewMockCache(gomock.NewController(t)) + nodeEventProcessor := NewNodeEventProcessor(graphMock, cache) + linkEventProcessor := NewLinkEventProcessor(graphMock, cache) + prefixEventProcessor := NewPrefixEventProcessor(graphMock, cache) + sidEventProcessor := NewSidEventProcessor(graphMock, cache) + eventDispatcher := NewEventDispatcher(nodeEventProcessor, linkEventProcessor, prefixEventProcessor, sidEventProcessor) + eventOptions := EventOptions{ + NodeEventProcessor: nodeEventProcessor, + LinkEventProcessor: linkEventProcessor, + PrefixEventProcessor: prefixEventProcessor, + SidEventProcessor: sidEventProcessor, + EventDispatcher: eventDispatcher, + } + networkProcessor := NewNetworkProcessor(graphMock, cache, nil, nil, eventOptions) + assert.NoError(t, networkProcessor.ProcessLinks(nil)) + }) + } +} + +func TestNetworkProcessor_ProcessPrefixes(t *testing.T) { + tests := []struct { + name string + }{ + { + name: "TestNetworkProcessor_ProcessPrefixes", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + graphMock := graph.NewMockGraph(gomock.NewController(t)) + cache := cache.NewMockCache(gomock.NewController(t)) + nodeEventProcessor := NewNodeEventProcessor(graphMock, cache) + linkEventProcessor := NewLinkEventProcessor(graphMock, cache) + prefixEventProcessor := NewPrefixEventProcessor(graphMock, cache) + sidEventProcessor := NewSidEventProcessor(graphMock, cache) + eventDispatcher := NewEventDispatcher(nodeEventProcessor, linkEventProcessor, prefixEventProcessor, sidEventProcessor) + eventOptions := EventOptions{ + NodeEventProcessor: nodeEventProcessor, + LinkEventProcessor: linkEventProcessor, + PrefixEventProcessor: prefixEventProcessor, + SidEventProcessor: sidEventProcessor, + EventDispatcher: eventDispatcher, + } + networkProcessor := NewNetworkProcessor(graphMock, cache, nil, nil, eventOptions) + networkProcessor.ProcessPrefixes(nil) + }) + } +} + +func TestNetworkProcessor_ProcessSids(t *testing.T) { + tests := []struct { + name string + }{ + { + name: "TestNetworkProcessor_ProcessSids", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + graphMock := graph.NewMockGraph(gomock.NewController(t)) + cache := cache.NewMockCache(gomock.NewController(t)) + nodeEventProcessor := NewNodeEventProcessor(graphMock, cache) + linkEventProcessor := NewLinkEventProcessor(graphMock, cache) + prefixEventProcessor := NewPrefixEventProcessor(graphMock, cache) + sidEventProcessor := NewSidEventProcessor(graphMock, cache) + eventDispatcher := NewEventDispatcher(nodeEventProcessor, linkEventProcessor, prefixEventProcessor, sidEventProcessor) + eventOptions := EventOptions{ + NodeEventProcessor: nodeEventProcessor, + LinkEventProcessor: linkEventProcessor, + PrefixEventProcessor: prefixEventProcessor, + SidEventProcessor: sidEventProcessor, + EventDispatcher: eventDispatcher, + } + networkProcessor := NewNetworkProcessor(graphMock, cache, nil, nil, eventOptions) + networkProcessor.ProcessSids(nil) + }) + } +} + +func TestNetworkProcessor_dispatchEvent(t *testing.T) { + tests := []struct { + name string + }{ + { + name: "TestNetworkProcessor_dispatchEvent set update false", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + graphMock := graph.NewMockGraph(gomock.NewController(t)) + cacheMock := cache.NewMockCache(gomock.NewController(t)) + nodeEventProcessor := NewNodeEventProcessor(graphMock, cacheMock) + linkEventProcessor := NewLinkEventProcessor(graphMock, cacheMock) + prefixEventProcessor := NewPrefixEventProcessor(graphMock, cacheMock) + sidEventProcessor := NewSidEventProcessor(graphMock, cacheMock) + eventDispatcher := NewEventDispatcher(nodeEventProcessor, linkEventProcessor, prefixEventProcessor, sidEventProcessor) + eventOptions := EventOptions{ + NodeEventProcessor: nodeEventProcessor, + LinkEventProcessor: linkEventProcessor, + PrefixEventProcessor: prefixEventProcessor, + SidEventProcessor: sidEventProcessor, + EventDispatcher: eventDispatcher, + } + networkProcessor := NewNetworkProcessor(graphMock, cacheMock, nil, nil, eventOptions) + + cacheMock.EXPECT().Lock().Return().AnyTimes() + graphMock.EXPECT().Lock().Return().AnyTimes() + deleteNodeEvent := domain.NewDeleteNodeEvent("node key") + node, err := domain.NewDomainNode(proto.String("node key"), proto.String("igp router id"), proto.String("node name"), []uint32{}) + assert.Nil(t, err) + cacheMock.EXPECT().GetNodeByKey(gomock.Any()).Return(node).AnyTimes() + cacheMock.EXPECT().RemoveNode(gomock.Any()).Return().AnyTimes() + graphMock.EXPECT().GetNode(gomock.Any()).Return(nil).AnyTimes() + graphMock.EXPECT().NodeExists(gomock.Any()).Return(true).AnyTimes() + graphMock.EXPECT().DeleteNode(gomock.Any()).Return().AnyTimes() + + holdTime := helper.NetworkProcessorHoldTime + timer := time.NewTimer(holdTime) + networkProcessor.dispatchEvent(deleteNodeEvent, timer, holdTime) + }) + } +} + +func TestNetworkProcessor_triggerUpdates(t *testing.T) { + tests := []struct { + name string + needsSubgraphUpdate bool + }{ + { + name: "TestNetworkProcessor_triggerUpdates no subgraph update", + needsSubgraphUpdate: false, + }, + { + name: "TestNetworkProcessor_triggerUpdates subgraph update", + needsSubgraphUpdate: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + graphMock := graph.NewMockGraph(gomock.NewController(t)) + cacheMock := cache.NewMockCache(gomock.NewController(t)) + nodeEventProcessor := NewNodeEventProcessor(graphMock, cacheMock) + linkEventProcessor := NewLinkEventProcessor(graphMock, cacheMock) + prefixEventProcessor := NewPrefixEventProcessor(graphMock, cacheMock) + sidEventProcessor := NewSidEventProcessor(graphMock, cacheMock) + eventDispatcher := NewEventDispatcher(nodeEventProcessor, linkEventProcessor, prefixEventProcessor, sidEventProcessor) + eventOptions := EventOptions{ + NodeEventProcessor: nodeEventProcessor, + LinkEventProcessor: linkEventProcessor, + PrefixEventProcessor: prefixEventProcessor, + SidEventProcessor: sidEventProcessor, + EventDispatcher: eventDispatcher, + } + networkProcessor := NewNetworkProcessor(graphMock, cacheMock, nil, make(chan struct{}), eventOptions) + if tt.needsSubgraphUpdate { + networkProcessor.needsSubgraphUpdate = true + graphMock.EXPECT().UpdateSubGraphs().Return().AnyTimes() + } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + networkProcessor.triggerUpdates() + wg.Done() + }() + + <-networkProcessor.updateChan + wg.Wait() + + }) + } +} + +func TestNetworkProcessor_Start(t *testing.T) { + tests := []struct { + name string + }{ + { + name: "TestNetworkProcessor_Start", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + graphMock := graph.NewMockGraph(gomock.NewController(t)) + cacheMock := cache.NewMockCache(gomock.NewController(t)) + nodeEventProcessor := NewNodeEventProcessor(graphMock, cacheMock) + linkEventProcessor := NewLinkEventProcessor(graphMock, cacheMock) + prefixEventProcessor := NewPrefixEventProcessor(graphMock, cacheMock) + sidEventProcessor := NewSidEventProcessor(graphMock, cacheMock) + eventDispatcher := NewEventDispatcher(nodeEventProcessor, linkEventProcessor, prefixEventProcessor, sidEventProcessor) + eventOptions := EventOptions{ + NodeEventProcessor: nodeEventProcessor, + LinkEventProcessor: linkEventProcessor, + PrefixEventProcessor: prefixEventProcessor, + SidEventProcessor: sidEventProcessor, + EventDispatcher: eventDispatcher, + } + eventChan := make(chan domain.NetworkEvent) + networkProcessor := NewNetworkProcessor(graphMock, cacheMock, eventChan, make(chan struct{}), eventOptions) + cacheMock.EXPECT().Lock().Return().AnyTimes() + cacheMock.EXPECT().Unlock().Return().AnyTimes() + graphMock.EXPECT().Lock().Return().AnyTimes() + graphMock.EXPECT().Unlock().Return().AnyTimes() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + networkProcessor.Start() + wg.Done() + }() + eventChan <- nil + time.Sleep(helper.NetworkProcessorHoldTime) + networkProcessor.quitChan <- struct{}{} + wg.Wait() + }) + } +} + +func TestNetworkProcessor_Stop(t *testing.T) { + tests := []struct { + name string + }{ + { + name: "TestNetworkProcessor_Stop", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + graphMock := graph.NewMockGraph(gomock.NewController(t)) + cacheMock := cache.NewMockCache(gomock.NewController(t)) + nodeEventProcessor := NewNodeEventProcessor(graphMock, cacheMock) + linkEventProcessor := NewLinkEventProcessor(graphMock, cacheMock) + prefixEventProcessor := NewPrefixEventProcessor(graphMock, cacheMock) + sidEventProcessor := NewSidEventProcessor(graphMock, cacheMock) + eventDispatcher := NewEventDispatcher(nodeEventProcessor, linkEventProcessor, prefixEventProcessor, sidEventProcessor) + eventOptions := EventOptions{ + NodeEventProcessor: nodeEventProcessor, + LinkEventProcessor: linkEventProcessor, + PrefixEventProcessor: prefixEventProcessor, + SidEventProcessor: sidEventProcessor, + EventDispatcher: eventDispatcher, + } + eventChan := make(chan domain.NetworkEvent) + networkProcessor := NewNetworkProcessor(graphMock, cacheMock, eventChan, make(chan struct{}), eventOptions) + cacheMock.EXPECT().Lock().Return().AnyTimes() + cacheMock.EXPECT().Unlock().Return().AnyTimes() + graphMock.EXPECT().Lock().Return().AnyTimes() + graphMock.EXPECT().Unlock().Return().AnyTimes() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + networkProcessor.Start() + wg.Done() + }() + networkProcessor.Stop() + wg.Wait() + }) + } +}