Skip to content
This repository has been archived by the owner on Sep 10, 2022. It is now read-only.

make it possible to skip the handshake negotiation #87

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,26 @@ import (
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/transport"

ma "github.com/multiformats/go-multiaddr"
)

type multiaddrConn struct {
local, remote ma.Multiaddr
}

var _ network.ConnMultiaddrs = &multiaddrConn{}

func newMultiaddrConn(conn network.ConnMultiaddrs, proto ma.Protocol) network.ConnMultiaddrs {
return &multiaddrConn{
local: addSecurityProtocol(conn.LocalMultiaddr(), proto),
remote: addSecurityProtocol(conn.RemoteMultiaddr(), proto),
}
}

func (c *multiaddrConn) LocalMultiaddr() ma.Multiaddr { return c.local }
func (c *multiaddrConn) RemoteMultiaddr() ma.Multiaddr { return c.remote }

type transportConn struct {
mux.MuxedConn
network.ConnMultiaddrs
Expand Down Expand Up @@ -38,3 +56,7 @@ func (t *transportConn) String() string {
func (t *transportConn) Stat() network.Stat {
return t.stat
}

func addSecurityProtocol(addr ma.Multiaddr, proto ma.Protocol) ma.Multiaddr {
return addr.Encapsulate(ma.Cast(proto.VCode))
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go 1.16
require (
github.com/ipfs/go-log v1.0.4
github.com/jbenet/go-temp-err-catcher v0.1.0
github.com/libp2p/go-libp2p-core v0.10.0
github.com/libp2p/go-libp2p-core v0.10.1-0.20210921170543-f829c09c1ca0
github.com/libp2p/go-libp2p-mplex v0.4.1
github.com/libp2p/go-libp2p-pnet v0.2.0
github.com/multiformats/go-multiaddr v0.3.3
github.com/multiformats/go-multiaddr v0.4.1
github.com/stretchr/testify v1.7.0
)
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0=
github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.10.0 h1:jFy7v5Muq58GTeYkPhGzIH8Qq4BFfziqc0ixPd/pP9k=
github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-core v0.10.1-0.20210921170543-f829c09c1ca0 h1:W1EqIm0+QVnfQ2SV138//D9NyAhseDxqLMoWvvhYsQQ=
github.com/libp2p/go-libp2p-core v0.10.1-0.20210921170543-f829c09c1ca0/go.mod h1:KlkHsZ0nKerWsXLZJm3LfFQwusI5k3iN4BgtYTE4IYE=
github.com/libp2p/go-libp2p-mplex v0.4.1 h1:/pyhkP1nLwjG3OM+VuaNJkQT/Pqq73WzB3aDN3Fx1sc=
github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
Expand Down Expand Up @@ -115,8 +115,8 @@ github.com/multiformats/go-multiaddr v0.2.1/go.mod h1:s/Apk6IyxfvMjDafnhJgJ3/46z
github.com/multiformats/go-multiaddr v0.2.2/go.mod h1:NtfXiOtHvghW9KojvtySjH5y0u0xW5UouOmQQrn6a3Y=
github.com/multiformats/go-multiaddr v0.3.0/go.mod h1:dF9kph9wfJ+3VLAaeBqo9Of8x4fJxp6ggJGteB8HQTI=
github.com/multiformats/go-multiaddr v0.3.1/go.mod h1:uPbspcUPd5AfaP6ql3ujFY+QWzmBD8uLLL4bXW0XfGc=
github.com/multiformats/go-multiaddr v0.3.3 h1:vo2OTSAqnENB2rLk79pLtr+uhj+VAzSe3uef5q0lRSs=
github.com/multiformats/go-multiaddr v0.3.3/go.mod h1:lCKNGP1EQ1eZ35Za2wlqnabm9xQkib3fyB+nZXHLag0=
github.com/multiformats/go-multiaddr v0.4.1 h1:Pq37uLx3hsyNlTDir7FZyU8+cFCTqd5y1KiM2IzOutI=
github.com/multiformats/go-multiaddr v0.4.1/go.mod h1:3afI9HfVW8csiF8UZqtpYRiDyew8pRX7qLIGHu9FLuM=
github.com/multiformats/go-multiaddr-net v0.2.0/go.mod h1:gGdH3UXny6U3cKKYCvpXI5rnK7YaOIEOPVDI9tsJbEA=
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
Expand Down
9 changes: 9 additions & 0 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

logging "github.com/ipfs/go-log"
tec "github.com/jbenet/go-temp-err-catcher"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

Expand Down Expand Up @@ -157,6 +158,14 @@ func (l *listener) Accept() (transport.CapableConn, error) {
return nil, l.err
}

func (l *listener) Multiaddr() ma.Multiaddr {
secProto := l.upgrader.SecurityProtocol()
if secProto.Code == 0 {
return l.Listener.Multiaddr()
}
return addSecurityProtocol(l.Listener.Multiaddr(), secProto)
}

func (l *listener) String() string {
if s, ok := l.transport.(fmt.Stringer); ok {
return fmt.Sprintf("<stream.Listener[%s] %s>", s, l.Multiaddr())
Expand Down
52 changes: 43 additions & 9 deletions listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,60 @@ func createListener(t *testing.T, upgrader *st.Upgrader) transport.Listener {
return upgrader.UpgradeListener(nil, ln)
}

func getLastProtocol(addr ma.Multiaddr) ma.Protocol {
protos := addr.Protocols()
return protos[len(protos)-1]
}

func TestAcceptSingleConn(t *testing.T) {
require := require.New(t)

id, upgrader := createUpgrader(t)
id, upgrader := createUpgraderWithSecureMuxer(t)
ln := createListener(t, upgrader)
defer ln.Close()

cconn, err := dial(t, upgrader, ln.Multiaddr(), id)
require.NoError(err)
require.Equal(getLastProtocol(cconn.LocalMultiaddr()).Code, ma.P_TCP)
require.Equal(getLastProtocol(cconn.RemoteMultiaddr()).Code, ma.P_TCP)

sconn, err := ln.Accept()
require.NoError(err)
require.Equal(getLastProtocol(sconn.LocalMultiaddr()).Code, ma.P_TCP)
require.Equal(getLastProtocol(sconn.RemoteMultiaddr()).Code, ma.P_TCP)

testConn(t, cconn, sconn)
}

func TestAcceptSingleConnWithSecureTransport(t *testing.T) {
require := require.New(t)

id, upgrader := createUpgraderWithSecureTransport(t)
ln := createListener(t, upgrader)
defer ln.Close()

protos := ln.Multiaddr().Protocols()
if protos[len(protos)-1].Code != upgrader.SecurityProtocol().Code {
t.Fatalf("expected listener multiaddr to contain the security protocol, but got %s", ln.Multiaddr())
}

cconn, err := dial(t, upgrader, ln.Multiaddr(), id)
require.NoError(err)
require.Equal(getLastProtocol(cconn.LocalMultiaddr()).Code, upgrader.SecurityProtocol().Code)
require.Equal(getLastProtocol(cconn.RemoteMultiaddr()).Code, upgrader.SecurityProtocol().Code)

sconn, err := ln.Accept()
require.NoError(err)
require.Equal(getLastProtocol(sconn.LocalMultiaddr()).Code, upgrader.SecurityProtocol().Code)
require.Equal(getLastProtocol(sconn.RemoteMultiaddr()).Code, upgrader.SecurityProtocol().Code)

testConn(t, cconn, sconn)
}

func TestAcceptMultipleConns(t *testing.T) {
require := require.New(t)

id, upgrader := createUpgrader(t)
id, upgrader := createUpgraderWithSecureMuxer(t)
ln := createListener(t, upgrader)
defer ln.Close()

Expand Down Expand Up @@ -103,7 +137,7 @@ func TestConnectionsClosedIfNotAccepted(t *testing.T) {
transport.AcceptTimeout = timeout
t.Cleanup(func() { transport.AcceptTimeout = origAcceptTimeout })

id, upgrader := createUpgrader(t)
id, upgrader := createUpgraderWithSecureMuxer(t)
ln := createListener(t, upgrader)
defer ln.Close()

Expand Down Expand Up @@ -137,7 +171,7 @@ func TestConnectionsClosedIfNotAccepted(t *testing.T) {
func TestFailedUpgradeOnListen(t *testing.T) {
require := require.New(t)

id, upgrader := createUpgrader(t)
id, upgrader := createUpgraderWithSecureMuxer(t)
upgrader.Muxer = &errorMuxer{}
ln := createListener(t, upgrader)
defer ln.Close()
Expand All @@ -159,7 +193,7 @@ func TestFailedUpgradeOnListen(t *testing.T) {
func TestListenerClose(t *testing.T) {
require := require.New(t)

_, upgrader := createUpgrader(t)
_, upgrader := createUpgraderWithSecureMuxer(t)
ln := createListener(t, upgrader)

errCh := make(chan error)
Expand Down Expand Up @@ -190,7 +224,7 @@ func TestListenerClose(t *testing.T) {
func TestListenerCloseClosesQueued(t *testing.T) {
require := require.New(t)

id, upgrader := createUpgrader(t)
id, upgrader := createUpgraderWithSecureMuxer(t)
ln := createListener(t, upgrader)

var conns []transport.CapableConn
Expand Down Expand Up @@ -230,7 +264,7 @@ func TestListenerCloseClosesQueued(t *testing.T) {
func TestConcurrentAccept(t *testing.T) {
var num = 3 * st.AcceptQueueLength

id, upgrader := createUpgrader(t)
id, upgrader := createUpgraderWithSecureMuxer(t)
blockingMuxer := newBlockingMuxer()
upgrader.Muxer = blockingMuxer

Expand Down Expand Up @@ -280,7 +314,7 @@ func TestConcurrentAccept(t *testing.T) {
func TestAcceptQueueBacklogged(t *testing.T) {
require := require.New(t)

id, upgrader := createUpgrader(t)
id, upgrader := createUpgraderWithSecureMuxer(t)
ln := createListener(t, upgrader)
defer ln.Close()

Expand Down Expand Up @@ -318,7 +352,7 @@ func TestListenerConnectionGater(t *testing.T) {
require := require.New(t)

testGater := &testGater{}
id, upgrader := createUpgrader(t)
id, upgrader := createUpgraderWithSecureMuxer(t)
upgrader.ConnGater = testGater

ln := createListener(t, upgrader)
Expand Down
52 changes: 42 additions & 10 deletions upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
ipnet "github.com/libp2p/go-libp2p-core/pnet"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p-core/transport"

pnet "github.com/libp2p/go-libp2p-pnet"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

Expand All @@ -26,11 +28,15 @@ var AcceptQueueLength = 16

// Upgrader is a multistream upgrader that can upgrade an underlying connection
// to a full transport connection (secure and multiplexed).
// The Upgrader can act in two distinct modes:
// 1. Negotiating the security protocol using multistream: in this case, SecureMuxer must be set.
// 2. Using the security protocol encoded in the multiaddr: in this case, SecureTransport must be set.
type Upgrader struct {
PSK ipnet.PSK
Secure sec.SecureMuxer
Muxer mux.Multiplexer
ConnGater connmgr.ConnectionGater
PSK ipnet.PSK
SecureMuxer sec.SecureMuxer // only used for upgraders that handle addresses without the security protocol
SecureTransport sec.SecureTransport // only used for upgraders that handle addresses containing the security protocol
Muxer mux.Multiplexer
ConnGater connmgr.ConnectionGater
}

// UpgradeListener upgrades the passed multiaddr-net listener into a full libp2p-transport listener.
Expand All @@ -49,6 +55,17 @@ func (u *Upgrader) UpgradeListener(t transport.Transport, list manet.Listener) t
return l
}

// SecurityProtocol return the security protocol that this upgrader uses.
// Note that this function only makes sense when not using a SecureMuxer.
// If a SecureMuxer is used, the zero value of ma.Protocol is returned.
func (u *Upgrader) SecurityProtocol() ma.Protocol {
if u.SecureMuxer != nil {
var zero ma.Protocol
return zero
}
return u.SecureTransport.Protocol()
}

// UpgradeOutbound upgrades the given outbound multiaddr-net connection into a
// full libp2p-transport connection.
// Deprecated: use Upgrade instead.
Expand Down Expand Up @@ -108,21 +125,36 @@ func (u *Upgrader) Upgrade(ctx context.Context, t transport.Transport, maconn ma
return nil, fmt.Errorf("failed to negotiate stream multiplexer: %s", err)
}

tc := &transportConn{
var addrConn network.ConnMultiaddrs
addrConn = maconn
if u.SecureTransport != nil {
addrConn = newMultiaddrConn(maconn, u.SecurityProtocol())
}
return &transportConn{
MuxedConn: smconn,
ConnMultiaddrs: maconn,
ConnMultiaddrs: addrConn,
ConnSecurity: sconn,
transport: t,
stat: stat,
}
return tc, nil
}, nil
}

func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID, dir network.Direction) (sec.SecureConn, bool, error) {
marten-seemann marked this conversation as resolved.
Show resolved Hide resolved
if u.SecureMuxer != nil {
if dir == network.DirInbound {
return u.SecureMuxer.SecureInbound(ctx, conn, p)
}
return u.SecureMuxer.SecureOutbound(ctx, conn, p)
}

// When we're not using the security handshake negotiation,
// we've already dealt with TCP simultaneous open during the TCP handshake.
if dir == network.DirInbound {
return u.Secure.SecureInbound(ctx, conn, p)
sconn, err := u.SecureTransport.SecureInbound(ctx, conn, p)
return sconn, true, err
}
return u.Secure.SecureOutbound(ctx, conn, p)
sconn, err := u.SecureTransport.SecureOutbound(ctx, conn, p)
return sconn, false, err
}

func (u *Upgrader) setupMuxer(ctx context.Context, conn net.Conn, server bool) (mux.MuxedConn, error) {
Expand Down
23 changes: 17 additions & 6 deletions upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,25 @@ import (
"github.com/stretchr/testify/require"
)

func createUpgrader(t *testing.T) (peer.ID, *st.Upgrader) {
func createUpgraderWithSecureMuxer(t *testing.T) (peer.ID, *st.Upgrader) {
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
require.NoError(t, err)
id, err := peer.IDFromPrivateKey(priv)
require.NoError(t, err)
return id, &st.Upgrader{
Secure: &MuxAdapter{tpt: insecure.NewWithIdentity(id, priv)},
Muxer: &negotiatingMuxer{},
SecureMuxer: &MuxAdapter{tpt: insecure.NewWithIdentity(id, priv)},
Muxer: &negotiatingMuxer{},
}
}

func createUpgraderWithSecureTransport(t *testing.T) (peer.ID, *st.Upgrader) {
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
require.NoError(t, err)
id, err := peer.IDFromPrivateKey(priv)
require.NoError(t, err)
return id, &st.Upgrader{
SecureTransport: insecure.NewWithIdentity(id, priv),
Muxer: &negotiatingMuxer{},
}
}

Expand All @@ -51,7 +62,7 @@ func (m *negotiatingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, er
return mplex.DefaultTransport.NewConn(c, isServer)
}

// blockingMuxer blocks the muxer negotiation until the contain chan is closed
// blockingMuxer blocks the muxer negotiation until the contained chan is closed
type blockingMuxer struct {
unblock chan struct{}
}
Expand Down Expand Up @@ -112,12 +123,12 @@ func dial(t *testing.T, upgrader *st.Upgrader, raddr ma.Multiaddr, p peer.ID) (t
func TestOutboundConnectionGating(t *testing.T) {
require := require.New(t)

id, upgrader := createUpgrader(t)
id, upgrader := createUpgraderWithSecureMuxer(t)
ln := createListener(t, upgrader)
defer ln.Close()

testGater := &testGater{}
_, dialUpgrader := createUpgrader(t)
_, dialUpgrader := createUpgraderWithSecureMuxer(t)
dialUpgrader.ConnGater = testGater
conn, err := dial(t, dialUpgrader, ln.Multiaddr(), id)
require.NoError(err)
Expand Down