Skip to content

Commit

Permalink
test network processor
Browse files Browse the repository at this point in the history
  • Loading branch information
severindellsperger committed Jul 18, 2024
1 parent c90596a commit f8de9df
Show file tree
Hide file tree
Showing 4 changed files with 384 additions and 26 deletions.
7 changes: 5 additions & 2 deletions pkg/processor/event_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ func NewEventDispatcher(nodeEventHandler *NodeEventProcessor, linkEventHandler *
}

func (dispatcher *EventDispatcher) Dispatch(event domain.NetworkEvent) bool {
handler := dispatcher.eventHandlers[reflect.TypeOf(event)]
return handler.HandleEvent(event)
if event != nil {
handler := dispatcher.eventHandlers[reflect.TypeOf(event)]
return handler.HandleEvent(event)
}
return false
}
4 changes: 3 additions & 1 deletion pkg/processor/link_event_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ func (processor *LinkEventProcessor) ProcessLinks(links []domain.Link) error {
return err
}
}
processor.graph.UpdateSubGraphs()
if len(links) > 0 {
processor.graph.UpdateSubGraphs()
}
return nil
}

Expand Down
55 changes: 32 additions & 23 deletions pkg/processor/network_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type NetworkProcessor struct {
linkProcessor LinkProcessor
prefixProcessor PrefixProcessor
sidProcessor SidProcessor
mutexesLocked bool
}

type EventOptions struct {
Expand All @@ -49,6 +50,7 @@ func NewNetworkProcessor(graph graph.Graph, cache cache.Cache, eventChan chan do
prefixProcessor: eventOptions.PrefixEventProcessor,
sidProcessor: eventOptions.SidEventProcessor,
eventDispatcher: eventOptions.EventDispatcher,
mutexesLocked: false,
}
}

Expand All @@ -68,41 +70,48 @@ func (processor *NetworkProcessor) ProcessSids(sids []domain.Sid) {
processor.sidProcessor.ProcessSids(sids)
}

func (processor *NetworkProcessor) dispatchEvent(event domain.NetworkEvent, timer *time.Timer, holdTime time.Duration) {
if !processor.mutexesLocked {
processor.log.Debugln("Locking cache and graph mutexes")
processor.cache.Lock()
processor.graph.Lock()
processor.mutexesLocked = true
}
if processor.eventDispatcher.Dispatch(event) {
processor.needsSubgraphUpdate = true
}
timer.Reset(holdTime)
}

func (processor *NetworkProcessor) triggerUpdates() {
if processor.mutexesLocked {
processor.log.Debugln("Unlocking cache and graph mutexes")
processor.cache.Unlock()
processor.graph.Unlock()
processor.mutexesLocked = false
}
if processor.needsSubgraphUpdate {
processor.graph.UpdateSubGraphs()
processor.needsSubgraphUpdate = false
}
processor.updateChan <- struct{}{}
}

func (processor *NetworkProcessor) Start() {
holdTime := helper.NetworkProcessorHoldTime
processor.log.Infof("Starting processing network updates with hold time %s", holdTime.String())

timer := time.NewTimer(holdTime)
defer timer.Stop()
mutexesLocked := false

for {
select {
case event := <-processor.eventChan:
if !mutexesLocked {
processor.log.Debugln("Locking cache and graph mutexes")
processor.cache.Lock()
processor.graph.Lock()
mutexesLocked = true
}
if processor.eventDispatcher.Dispatch(event) {
processor.needsSubgraphUpdate = true
}
timer.Reset(holdTime)
processor.dispatchEvent(event, timer, holdTime)
case <-timer.C:
if mutexesLocked {
processor.log.Debugln("Unlocking cache and graph mutexes")
processor.cache.Unlock()
processor.graph.Unlock()
mutexesLocked = false
}
if processor.needsSubgraphUpdate {
processor.graph.UpdateSubGraphs()
processor.needsSubgraphUpdate = false
}
processor.updateChan <- struct{}{}
processor.triggerUpdates()
case <-processor.quitChan:
if mutexesLocked {
if processor.mutexesLocked {
processor.cache.Unlock()
processor.graph.Unlock()
}
Expand Down
Loading

0 comments on commit f8de9df

Please sign in to comment.