diff --git a/pkg/netceptor/netceptor.go b/pkg/netceptor/netceptor.go index 27839cf19..ffdacf840 100644 --- a/pkg/netceptor/netceptor.go +++ b/pkg/netceptor/netceptor.go @@ -798,6 +798,7 @@ func (s *Netceptor) monitorConnectionAging() { for conn := range timedOut { s.Logger.Warning("Timing out connection %s, idle for the past %s\n", conn, s.maxConnectionIdleTime) timedOut[conn]() + s.removeConnection(conn) } case <-s.context.Done(): return @@ -1844,6 +1845,24 @@ func (s *Netceptor) sendAndLogConnectionRejection(remoteNodeID string, ci *connI return fmt.Errorf("%s: rejected connection with node %s because %s", s.nodeID, remoteNodeID, reason) } +func (s *Netceptor) removeConnection(remoteNodeID string) { + if remoteNodeID != "" { + s.connLock.Lock() + delete(s.connections, remoteNodeID) + s.connLock.Unlock() + s.knownNodeLock.Lock() + _, ok := s.knownConnectionCosts[remoteNodeID] + if ok { + delete(s.knownConnectionCosts[remoteNodeID], s.nodeID) + } + _, ok = s.knownConnectionCosts[s.nodeID] + if ok { + delete(s.knownConnectionCosts[s.nodeID], remoteNodeID) + } + s.knownNodeLock.Unlock() + } +} + // Main Netceptor protocol loop. func (s *Netceptor) runProtocol(ctx context.Context, sess BackendSession, bi *BackendInfo) error { if bi.connectionCost <= 0.0 { @@ -1856,14 +1875,7 @@ func (s *Netceptor) runProtocol(ctx context.Context, sess BackendSession, bi *Ba defer func() { _ = sess.Close() if established { - s.connLock.Lock() - delete(s.connections, remoteNodeID) - s.connLock.Unlock() - s.knownNodeLock.Lock() - delete(s.knownConnectionCosts[remoteNodeID], s.nodeID) - delete(s.knownConnectionCosts[s.nodeID], remoteNodeID) - s.knownNodeLock.Unlock() - + s.removeConnection(remoteNodeID) select { case s.sendRouteFloodChan <- 0: case <-ctx.Done(): // ctx is a child of s.context