diff --git a/psiphon/common/inproxy/session.go b/psiphon/common/inproxy/session.go index 2be89053e..44cb936d3 100644 --- a/psiphon/common/inproxy/session.go +++ b/psiphon/common/inproxy/session.go @@ -37,6 +37,7 @@ import ( "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol" lrucache "github.com/cognusion/go-cache-lru" "github.com/flynn/noise" + "github.com/marusama/semaphore" "golang.org/x/crypto/curve25519" "golang.zx2c4.com/wireguard/replay" ) @@ -50,6 +51,8 @@ const ( resetSessionTokenName = "psiphon-inproxy-session-reset-session-token" resetSessionTokenNonceSize = 32 + + maxResponderConcurrentNewSessions = 32768 ) const ( @@ -450,6 +453,12 @@ func (s *InitiatorSessions) getSession( s.mutex.Lock() defer s.mutex.Unlock() + // Note: unlike in ResponderSessions.getSession, there is no indication, + // in profiling, of high lock contention and blocking here when holding + // the mutex lock while calling newSession. The lock is left in place to + // preserve the semantics of only one concurrent newSession call, + // particularly for brokers initiating new sessions with servers. + session, ok := s.sessions[publicKey] if ok { return session, false, session.isReadyToShare(nil), nil @@ -860,8 +869,10 @@ type ResponderSessions struct { obfuscationReplayHistory *obfuscationReplayHistory expectedInitiatorPublicKeys *sessionPublicKeyLookup - mutex sync.Mutex + mutex sync.RWMutex sessions *lrucache.Cache + + concurrentNewSessions semaphore.Semaphore } // NewResponderSessions creates a new ResponderSessions which allows any @@ -883,6 +894,7 @@ func NewResponderSessions( applyTTL: true, obfuscationReplayHistory: newObfuscationReplayHistory(), sessions: lrucache.NewWithLRU(sessionsTTL, 1*time.Minute, sessionsMaxSize), + concurrentNewSessions: semaphore.New(maxResponderConcurrentNewSessions), }, nil } @@ -1210,16 +1222,35 @@ func (s *ResponderSessions) touchSession(sessionID ID, session *session) { // creates a new session, and places it in the cache, if not found. func (s *ResponderSessions) getSession(sessionID ID) (*session, error) { - s.mutex.Lock() - defer s.mutex.Unlock() + // Concurrency: profiling indicates that holding the mutex lock here when + // calling newSession leads to high contention and blocking. Instead, + // release the lock after checking for an existing session, and then + // recheck -- using lrucache.Add, which fails if an entry exists -- when + // inserting. + // + // A read-only lock is obtained on the initial check, allowing for + // concurrent checks; however, note that lrucache has its own RWMutex and + // obtains a write lock in Get when LRU ejection may need to be performed. + // + // A semaphore is used to enforce a sanity check maximum number of + // concurrent newSession calls. + // + // TODO: add a timeout or stop signal to Acquire? strSessionID := string(sessionID[:]) + s.mutex.RLock() entry, ok := s.sessions.Get(strSessionID) + s.mutex.RUnlock() + if ok { return entry.(*session), nil } + err := s.concurrentNewSessions.Acquire(context.Background(), 1) + if err != nil { + return nil, errors.Trace(err) + } session, err := newSession( false, // !isInitiator s.privateKey, @@ -1230,12 +1261,20 @@ func (s *ResponderSessions) getSession(sessionID ID) (*session, error) { nil, &sessionID, s.expectedInitiatorPublicKeys) + s.concurrentNewSessions.Release(1) + if err != nil { return nil, errors.Trace(err) } - s.sessions.Set( + s.mutex.Lock() + err = s.sessions.Add( strSessionID, session, lrucache.DefaultExpiration) + s.mutex.Unlock() + + if err != nil { + return nil, errors.Trace(err) + } return session, nil }