Skip to content

Commit

Permalink
Merge pull request #717 from rod-hynes/responder-sessions-concurrency
Browse files Browse the repository at this point in the history
Mitigate ResponderSessions lock contention and blocking
  • Loading branch information
rod-hynes authored Jan 7, 2025
2 parents d36db4f + 9d95e3c commit 20b5ec4
Showing 1 changed file with 43 additions and 4 deletions.
47 changes: 43 additions & 4 deletions psiphon/common/inproxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
lrucache "github.com/cognusion/go-cache-lru"
"github.com/flynn/noise"
"github.com/marusama/semaphore"
"golang.org/x/crypto/curve25519"
"golang.zx2c4.com/wireguard/replay"
)
Expand All @@ -50,6 +51,8 @@ const (

resetSessionTokenName = "psiphon-inproxy-session-reset-session-token"
resetSessionTokenNonceSize = 32

maxResponderConcurrentNewSessions = 32768
)

const (
Expand Down Expand Up @@ -450,6 +453,12 @@ func (s *InitiatorSessions) getSession(
s.mutex.Lock()
defer s.mutex.Unlock()

// Note: unlike in ResponderSessions.getSession, there is no indication,
// in profiling, of high lock contention and blocking here when holding
// the mutex lock while calling newSession. The lock is left in place to
// preserve the semantics of only one concurrent newSession call,
// particularly for brokers initiating new sessions with servers.

session, ok := s.sessions[publicKey]
if ok {
return session, false, session.isReadyToShare(nil), nil
Expand Down Expand Up @@ -860,8 +869,10 @@ type ResponderSessions struct {
obfuscationReplayHistory *obfuscationReplayHistory
expectedInitiatorPublicKeys *sessionPublicKeyLookup

mutex sync.Mutex
mutex sync.RWMutex
sessions *lrucache.Cache

concurrentNewSessions semaphore.Semaphore
}

// NewResponderSessions creates a new ResponderSessions which allows any
Expand All @@ -883,6 +894,7 @@ func NewResponderSessions(
applyTTL: true,
obfuscationReplayHistory: newObfuscationReplayHistory(),
sessions: lrucache.NewWithLRU(sessionsTTL, 1*time.Minute, sessionsMaxSize),
concurrentNewSessions: semaphore.New(maxResponderConcurrentNewSessions),
}, nil
}

Expand Down Expand Up @@ -1210,16 +1222,35 @@ func (s *ResponderSessions) touchSession(sessionID ID, session *session) {
// creates a new session, and places it in the cache, if not found.
func (s *ResponderSessions) getSession(sessionID ID) (*session, error) {

s.mutex.Lock()
defer s.mutex.Unlock()
// Concurrency: profiling indicates that holding the mutex lock here when
// calling newSession leads to high contention and blocking. Instead,
// release the lock after checking for an existing session, and then
// recheck -- using lrucache.Add, which fails if an entry exists -- when
// inserting.
//
// A read-only lock is obtained on the initial check, allowing for
// concurrent checks; however, note that lrucache has its own RWMutex and
// obtains a write lock in Get when LRU ejection may need to be performed.
//
// A semaphore is used to enforce a sanity check maximum number of
// concurrent newSession calls.
//
// TODO: add a timeout or stop signal to Acquire?

strSessionID := string(sessionID[:])

s.mutex.RLock()
entry, ok := s.sessions.Get(strSessionID)
s.mutex.RUnlock()

if ok {
return entry.(*session), nil
}

err := s.concurrentNewSessions.Acquire(context.Background(), 1)
if err != nil {
return nil, errors.Trace(err)
}
session, err := newSession(
false, // !isInitiator
s.privateKey,
Expand All @@ -1230,12 +1261,20 @@ func (s *ResponderSessions) getSession(sessionID ID) (*session, error) {
nil,
&sessionID,
s.expectedInitiatorPublicKeys)
s.concurrentNewSessions.Release(1)

if err != nil {
return nil, errors.Trace(err)
}

s.sessions.Set(
s.mutex.Lock()
err = s.sessions.Add(
strSessionID, session, lrucache.DefaultExpiration)
s.mutex.Unlock()

if err != nil {
return nil, errors.Trace(err)
}

return session, nil
}
Expand Down

0 comments on commit 20b5ec4

Please sign in to comment.