From 51a1a909f7d1267af9a25f5170cbf59fd5b448fb Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 7 Dec 2024 12:53:16 +0800 Subject: [PATCH 01/10] quicreuse: define an interface for the quic.Listener --- p2p/transport/quicreuse/connmgr.go | 12 ++++++++++-- p2p/transport/quicreuse/listener.go | 2 +- p2p/transport/quicreuse/reuse.go | 12 ++++++++++-- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/p2p/transport/quicreuse/connmgr.go b/p2p/transport/quicreuse/connmgr.go index c3aa0fa046..43e43f8be5 100644 --- a/p2p/transport/quicreuse/connmgr.go +++ b/p2p/transport/quicreuse/connmgr.go @@ -15,6 +15,14 @@ import ( quicmetrics "github.com/quic-go/quic-go/metrics" ) +type QUICListener interface { + Accept(ctx context.Context) (quic.Connection, error) + Close() error + Addr() net.Addr +} + +var _ QUICListener = &quic.Listener{} + type ConnManager struct { reuseUDP4 *reuse reuseUDP6 *reuse @@ -201,7 +209,7 @@ func (c *ConnManager) transportForListen(association any, network string, laddr } return &singleOwnerTransport{ packetConn: conn, - Transport: quic.Transport{ + Transport: &quic.Transport{ Conn: conn, StatelessResetKey: &c.srk, TokenGeneratorKey: &c.tokenKey, @@ -279,7 +287,7 @@ func (c *ConnManager) TransportWithAssociationForDial(association any, network s if err != nil { return nil, err } - return &singleOwnerTransport{Transport: quic.Transport{Conn: conn, StatelessResetKey: &c.srk}, packetConn: conn}, nil + return &singleOwnerTransport{Transport: &quic.Transport{Conn: conn, StatelessResetKey: &c.srk}, packetConn: conn}, nil } func (c *ConnManager) Protocols() []int { diff --git a/p2p/transport/quicreuse/listener.go b/p2p/transport/quicreuse/listener.go index 4ee20042d3..42f1d00cef 100644 --- a/p2p/transport/quicreuse/listener.go +++ b/p2p/transport/quicreuse/listener.go @@ -29,7 +29,7 @@ type protoConf struct { } type quicListener struct { - l *quic.Listener + l QUICListener transport refCountedQuicTransport running chan struct{} addrs []ma.Multiaddr diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index c6fc611331..7865ffd2bf 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -25,11 +25,11 @@ type refCountedQuicTransport interface { IncreaseCount() Dial(ctx context.Context, addr net.Addr, tlsConf *tls.Config, conf *quic.Config) (quic.Connection, error) - Listen(tlsConf *tls.Config, conf *quic.Config) (*quic.Listener, error) + Listen(tlsConf *tls.Config, conf *quic.Config) (QUICListener, error) } type singleOwnerTransport struct { - quic.Transport + *quic.Transport // Used to write packets directly around QUIC. packetConn net.PacketConn @@ -54,6 +54,10 @@ func (c *singleOwnerTransport) WriteTo(b []byte, addr net.Addr) (int, error) { return c.Transport.WriteTo(b, addr) } +func (c *singleOwnerTransport) Listen(tlsConf *tls.Config, conf *quic.Config) (QUICListener, error) { + return c.Transport.Listen(tlsConf, conf) +} + // Constant. Defined as variables to simplify testing. var ( garbageCollectInterval = 30 * time.Second @@ -122,6 +126,10 @@ func (c *refcountedTransport) LocalAddr() net.Addr { return c.Transport.Conn.LocalAddr() } +func (c *refcountedTransport) Listen(tlsConf *tls.Config, conf *quic.Config) (QUICListener, error) { + return c.Transport.Listen(tlsConf, conf) +} + func (c *refcountedTransport) DecreaseCount() { c.mutex.Lock() c.refCount-- From f4e4e5361473f3ec74f01901c4955dbfab27fd22 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 7 Dec 2024 13:25:01 +0800 Subject: [PATCH 02/10] quicreuse: define an interface for the quic.Transport --- p2p/transport/quicreuse/connmgr.go | 30 ++++++++++++++++++++----- p2p/transport/quicreuse/connmgr_test.go | 14 +++++++----- p2p/transport/quicreuse/reuse.go | 19 +++++++++++----- 3 files changed, 47 insertions(+), 16 deletions(-) diff --git a/p2p/transport/quicreuse/connmgr.go b/p2p/transport/quicreuse/connmgr.go index 43e43f8be5..6249ee5904 100644 --- a/p2p/transport/quicreuse/connmgr.go +++ b/p2p/transport/quicreuse/connmgr.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "errors" + "io" "net" "sync" @@ -23,6 +24,14 @@ type QUICListener interface { var _ QUICListener = &quic.Listener{} +type QUICTransport interface { + Listen(tlsConf *tls.Config, conf *quic.Config) (QUICListener, error) + Dial(ctx context.Context, addr net.Addr, tlsConf *tls.Config, conf *quic.Config) (quic.Connection, error) + WriteTo(b []byte, addr net.Addr) (int, error) + ReadNonQUICPacket(ctx context.Context, b []byte) (int, net.Addr, error) + io.Closer +} + type ConnManager struct { reuseUDP4 *reuse reuseUDP6 *reuse @@ -209,10 +218,13 @@ func (c *ConnManager) transportForListen(association any, network string, laddr } return &singleOwnerTransport{ packetConn: conn, - Transport: &quic.Transport{ - Conn: conn, - StatelessResetKey: &c.srk, - TokenGeneratorKey: &c.tokenKey, + localAddr: conn.LocalAddr(), + Transport: &wrappedQUICTransport{ + &quic.Transport{ + Conn: conn, + StatelessResetKey: &c.srk, + TokenGeneratorKey: &c.tokenKey, + }, }, }, nil } @@ -287,7 +299,7 @@ func (c *ConnManager) TransportWithAssociationForDial(association any, network s if err != nil { return nil, err } - return &singleOwnerTransport{Transport: &quic.Transport{Conn: conn, StatelessResetKey: &c.srk}, packetConn: conn}, nil + return &singleOwnerTransport{Transport: &wrappedQUICTransport{&quic.Transport{Conn: conn, StatelessResetKey: &c.srk}}, packetConn: conn}, nil } func (c *ConnManager) Protocols() []int { @@ -307,3 +319,11 @@ func (c *ConnManager) Close() error { func (c *ConnManager) ClientConfig() *quic.Config { return c.clientConfig } + +type wrappedQUICTransport struct { + *quic.Transport +} + +func (t *wrappedQUICTransport) Listen(tlsConf *tls.Config, conf *quic.Config) (QUICListener, error) { + return t.Transport.Listen(tlsConf, conf) +} diff --git a/p2p/transport/quicreuse/connmgr_test.go b/p2p/transport/quicreuse/connmgr_test.go index 8e7da2cd7f..aeca87be4f 100644 --- a/p2p/transport/quicreuse/connmgr_test.go +++ b/p2p/transport/quicreuse/connmgr_test.go @@ -97,9 +97,10 @@ func TestConnectionPassedToQUICForListening(t *testing.T) { quicTr, err := cm.transportForListen(nil, netw, naddr) require.NoError(t, err) defer quicTr.Close() - if _, ok := quicTr.(*singleOwnerTransport).Transport.Conn.(quic.OOBCapablePacketConn); !ok { - t.Fatal("connection passed to quic-go cannot be type asserted to a *net.UDPConn") - } + // TODO: fix test + // if _, ok := quicTr.(*singleOwnerTransport).Transport.Conn.(quic.OOBCapablePacketConn); !ok { + // t.Fatal("connection passed to quic-go cannot be type asserted to a *net.UDPConn") + // } } func TestAcceptErrorGetCleanedUp(t *testing.T) { @@ -156,9 +157,10 @@ func TestConnectionPassedToQUICForDialing(t *testing.T) { require.NoError(t, err, "dial error") defer quicTr.Close() - if _, ok := quicTr.(*singleOwnerTransport).Transport.Conn.(quic.OOBCapablePacketConn); !ok { - t.Fatal("connection passed to quic-go cannot be type asserted to a *net.UDPConn") - } + // TODO: fix test + // if _, ok := quicTr.(*singleOwnerTransport).Transport.Conn.(quic.OOBCapablePacketConn); !ok { + // t.Fatal("connection passed to quic-go cannot be type asserted to a *net.UDPConn") + // } } func getTLSConfForProto(t *testing.T, alpn string) (peer.ID, *tls.Config) { diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index 7865ffd2bf..6e14a61cc5 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -29,19 +29,28 @@ type refCountedQuicTransport interface { } type singleOwnerTransport struct { - *quic.Transport + Transport QUICTransport + + localAddr net.Addr // Used to write packets directly around QUIC. packetConn net.PacketConn } +var _ QUICTransport = &singleOwnerTransport{} + func (c *singleOwnerTransport) IncreaseCount() {} -func (c *singleOwnerTransport) DecreaseCount() { - c.Transport.Close() +func (c *singleOwnerTransport) DecreaseCount() { c.Transport.Close() } +func (c *singleOwnerTransport) LocalAddr() net.Addr { + return c.localAddr } -func (c *singleOwnerTransport) LocalAddr() net.Addr { - return c.Transport.Conn.LocalAddr() +func (c *singleOwnerTransport) Dial(ctx context.Context, addr net.Addr, tlsConf *tls.Config, conf *quic.Config) (quic.Connection, error) { + return c.Transport.Dial(ctx, addr, tlsConf, conf) +} + +func (c *singleOwnerTransport) ReadNonQUICPacket(ctx context.Context, b []byte) (int, net.Addr, error) { + return c.Transport.ReadNonQUICPacket(ctx, b) } func (c *singleOwnerTransport) Close() error { From 53e9aa220662319c989abc2eebea771020a99b20 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 7 Dec 2024 17:50:29 +0800 Subject: [PATCH 03/10] reuse: add a ConnManager.AddTransport method --- p2p/transport/quicreuse/connmgr.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/p2p/transport/quicreuse/connmgr.go b/p2p/transport/quicreuse/connmgr.go index 6249ee5904..33abf13165 100644 --- a/p2p/transport/quicreuse/connmgr.go +++ b/p2p/transport/quicreuse/connmgr.go @@ -118,6 +118,31 @@ func (c *ConnManager) getReuse(network string) (*reuse, error) { } } +// TODO: think about error handling here. There shouldn't be any need to return an error +func (c *ConnManager) AddTransport(tr QUICTransport, conn net.PacketConn) error { + c.quicListenersMu.Lock() + defer c.quicListenersMu.Unlock() + + refCountedTr := &singleOwnerTransport{ + Transport: tr, + packetConn: conn, + localAddr: conn.LocalAddr(), + } + + // TODO: think about quic.Config handling + ln, err := newQuicListener(refCountedTr, c.serverConfig) + if err != nil { + return err + } + + key := conn.LocalAddr().String() + c.quicListeners[key] = quicListenerEntry{ + refCount: 1, + ln: ln, + } + return nil +} + func (c *ConnManager) ListenQUIC(addr ma.Multiaddr, tlsConf *tls.Config, allowWindowIncrease func(conn quic.Connection, delta uint64) bool) (Listener, error) { return c.ListenQUICAndAssociate(nil, addr, tlsConf, allowWindowIncrease) } From 3ea903bfb1e4e800599378f4fe5981ffad0d8489 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 15 Dec 2024 21:38:53 +0800 Subject: [PATCH 04/10] reuse: use a refcounted transport --- p2p/transport/quicreuse/connmgr.go | 23 +++++++++---- p2p/transport/quicreuse/nonquic_packetconn.go | 6 ++-- p2p/transport/quicreuse/reuse.go | 33 +++++++++++-------- 3 files changed, 38 insertions(+), 24 deletions(-) diff --git a/p2p/transport/quicreuse/connmgr.go b/p2p/transport/quicreuse/connmgr.go index 33abf13165..a1cd9013eb 100644 --- a/p2p/transport/quicreuse/connmgr.go +++ b/p2p/transport/quicreuse/connmgr.go @@ -118,16 +118,25 @@ func (c *ConnManager) getReuse(network string) (*reuse, error) { } } -// TODO: think about error handling here. There shouldn't be any need to return an error -func (c *ConnManager) AddTransport(tr QUICTransport, conn net.PacketConn) error { +func (c *ConnManager) AddTransport(network string, tr QUICTransport, conn net.PacketConn) error { c.quicListenersMu.Lock() defer c.quicListenersMu.Unlock() - refCountedTr := &singleOwnerTransport{ - Transport: tr, - packetConn: conn, - localAddr: conn.LocalAddr(), + refCountedTr := &refcountedTransport{ + QUICTransport: tr, + packetConn: conn, + refCount: 1, + } + + var reuse *reuse + reuse, err := c.getReuse(network) + if err != nil { + return err } + // TODO: this assumes that the connection is a listening on 0.0.0.0 + // We should generalize this to support listening on specific addresses + reuse.globalListeners[conn.LocalAddr().(*net.UDPAddr).Port] = refCountedTr + reuse.globalDialers[conn.LocalAddr().(*net.UDPAddr).Port] = refCountedTr // TODO: think about quic.Config handling ln, err := newQuicListener(refCountedTr, c.serverConfig) @@ -217,7 +226,7 @@ func (c *ConnManager) SharedNonQUICPacketConn(network string, laddr *net.UDPAddr ctx: ctx, ctxCancel: cancel, owningTransport: t, - tr: &t.Transport, + tr: t.QUICTransport, }, nil } return nil, errors.New("expected to be able to share with a QUIC listener, but the QUIC listener is not using a refcountedTransport. `DisableReuseport` should not be set") diff --git a/p2p/transport/quicreuse/nonquic_packetconn.go b/p2p/transport/quicreuse/nonquic_packetconn.go index 2f950e76a1..833bd5804a 100644 --- a/p2p/transport/quicreuse/nonquic_packetconn.go +++ b/p2p/transport/quicreuse/nonquic_packetconn.go @@ -4,8 +4,6 @@ import ( "context" "net" "time" - - "github.com/quic-go/quic-go" ) // nonQUICPacketConn is a net.PacketConn that can be used to read and write @@ -13,7 +11,7 @@ import ( // other transports like WebRTC. type nonQUICPacketConn struct { owningTransport refCountedQuicTransport - tr *quic.Transport + tr QUICTransport ctx context.Context ctxCancel context.CancelFunc readCtx context.Context @@ -32,7 +30,7 @@ func (n *nonQUICPacketConn) Close() error { // LocalAddr implements net.PacketConn. func (n *nonQUICPacketConn) LocalAddr() net.Addr { - return n.tr.Conn.LocalAddr() + return n.owningTransport.LocalAddr() } // ReadFrom implements net.PacketConn. diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index 6e14a61cc5..d21316fa45 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -74,7 +74,7 @@ var ( ) type refcountedTransport struct { - quic.Transport + QUICTransport // Used to write packets directly around QUIC. packetConn net.PacketConn @@ -123,20 +123,20 @@ func (c *refcountedTransport) IncreaseCount() { func (c *refcountedTransport) Close() error { // TODO(when we drop support for go 1.19) use errors.Join - c.Transport.Close() + c.QUICTransport.Close() return c.packetConn.Close() } func (c *refcountedTransport) WriteTo(b []byte, addr net.Addr) (int, error) { - return c.Transport.WriteTo(b, addr) + return c.QUICTransport.WriteTo(b, addr) } func (c *refcountedTransport) LocalAddr() net.Addr { - return c.Transport.Conn.LocalAddr() + return c.packetConn.LocalAddr() } func (c *refcountedTransport) Listen(tlsConf *tls.Config, conf *quic.Config) (QUICListener, error) { - return c.Transport.Listen(tlsConf, conf) + return c.QUICTransport.Listen(tlsConf, conf) } func (c *refcountedTransport) DecreaseCount() { @@ -319,11 +319,16 @@ func (r *reuse) transportForDialLocked(association any, network string, source * if err != nil { return nil, err } - tr := &refcountedTransport{Transport: quic.Transport{ - Conn: conn, - StatelessResetKey: r.statelessResetKey, - TokenGeneratorKey: r.tokenGeneratorKey, - }, packetConn: conn} + tr := &refcountedTransport{ + QUICTransport: &wrappedQUICTransport{ + Transport: &quic.Transport{ + Conn: conn, + StatelessResetKey: r.statelessResetKey, + TokenGeneratorKey: r.tokenGeneratorKey, + }, + }, + packetConn: conn, + } r.globalDialers[conn.LocalAddr().(*net.UDPAddr).Port] = tr return tr, nil } @@ -368,9 +373,11 @@ func (r *reuse) TransportForListen(network string, laddr *net.UDPAddr) (*refcoun } localAddr := conn.LocalAddr().(*net.UDPAddr) tr := &refcountedTransport{ - Transport: quic.Transport{ - Conn: conn, - StatelessResetKey: r.statelessResetKey, + QUICTransport: &wrappedQUICTransport{ + Transport: &quic.Transport{ + Conn: conn, + StatelessResetKey: r.statelessResetKey, + }, }, packetConn: conn, } From 0a09ad187411dbee1667ca50b2d4f8e5fb74bf29 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 16 Dec 2024 23:15:22 +0800 Subject: [PATCH 05/10] clean up code --- p2p/transport/quicreuse/connmgr.go | 13 ++++++++----- p2p/transport/quicreuse/reuse.go | 7 +++++-- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/p2p/transport/quicreuse/connmgr.go b/p2p/transport/quicreuse/connmgr.go index a1cd9013eb..da35ff6567 100644 --- a/p2p/transport/quicreuse/connmgr.go +++ b/p2p/transport/quicreuse/connmgr.go @@ -122,6 +122,11 @@ func (c *ConnManager) AddTransport(network string, tr QUICTransport, conn net.Pa c.quicListenersMu.Lock() defer c.quicListenersMu.Unlock() + localAddr, ok := conn.LocalAddr().(*net.UDPAddr) + if !ok { + return errors.New("expected a conn.LocalAddr() to return a *net.UDPAddr") + } + refCountedTr := &refcountedTransport{ QUICTransport: tr, packetConn: conn, @@ -133,12 +138,10 @@ func (c *ConnManager) AddTransport(network string, tr QUICTransport, conn net.Pa if err != nil { return err } - // TODO: this assumes that the connection is a listening on 0.0.0.0 - // We should generalize this to support listening on specific addresses - reuse.globalListeners[conn.LocalAddr().(*net.UDPAddr).Port] = refCountedTr - reuse.globalDialers[conn.LocalAddr().(*net.UDPAddr).Port] = refCountedTr + if err := reuse.AddTransport(refCountedTr, localAddr); err != nil { + return err + } - // TODO: think about quic.Config handling ln, err := newQuicListener(refCountedTr, c.serverConfig) if err != nil { return err diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index d21316fa45..8944e99a5e 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -382,13 +382,16 @@ func (r *reuse) TransportForListen(network string, laddr *net.UDPAddr) (*refcoun packetConn: conn, } tr.IncreaseCount() + return tr, r.AddTransport(tr, localAddr) +} +func (r *reuse) AddTransport(tr *refcountedTransport, localAddr *net.UDPAddr) error { // Deal with listen on a global address if localAddr.IP.IsUnspecified() { // The kernel already checked that the laddr is not already listen // so we need not check here (when we create ListenUDP). r.globalListeners[localAddr.Port] = tr - return tr, nil + return nil } // Deal with listen on a unicast address @@ -402,7 +405,7 @@ func (r *reuse) TransportForListen(network string, laddr *net.UDPAddr) (*refcoun // The kernel already checked that the laddr is not already listen // so we need not check here (when we create ListenUDP). r.unicast[localAddr.IP.String()][localAddr.Port] = tr - return tr, nil + return nil } func (r *reuse) Close() error { From e2e572919cc5585643548ae3db0ad8c8453f7a17 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 20 Dec 2024 12:18:19 +0800 Subject: [PATCH 06/10] quicreuse: fix adding of externally constructed transport We shouldn't create the listener entry. This happens once the libp2p QUIC transport starts listening. --- p2p/transport/quicreuse/connmgr.go | 16 +------- p2p/transport/quicreuse/connmgr_test.go | 50 +++++++++++++++++++++++++ p2p/transport/quicreuse/reuse.go | 23 +++++++++--- 3 files changed, 69 insertions(+), 20 deletions(-) diff --git a/p2p/transport/quicreuse/connmgr.go b/p2p/transport/quicreuse/connmgr.go index da35ff6567..7fbffb17eb 100644 --- a/p2p/transport/quicreuse/connmgr.go +++ b/p2p/transport/quicreuse/connmgr.go @@ -138,21 +138,7 @@ func (c *ConnManager) AddTransport(network string, tr QUICTransport, conn net.Pa if err != nil { return err } - if err := reuse.AddTransport(refCountedTr, localAddr); err != nil { - return err - } - - ln, err := newQuicListener(refCountedTr, c.serverConfig) - if err != nil { - return err - } - - key := conn.LocalAddr().String() - c.quicListeners[key] = quicListenerEntry{ - refCount: 1, - ln: ln, - } - return nil + return reuse.AddTransport(refCountedTr, localAddr) } func (c *ConnManager) ListenQUIC(addr ma.Multiaddr, tlsConf *tls.Config, allowWindowIncrease func(conn quic.Connection, delta uint64) bool) (Listener, error) { diff --git a/p2p/transport/quicreuse/connmgr_test.go b/p2p/transport/quicreuse/connmgr_test.go index aeca87be4f..63e518d1ae 100644 --- a/p2p/transport/quicreuse/connmgr_test.go +++ b/p2p/transport/quicreuse/connmgr_test.go @@ -259,3 +259,53 @@ func testListener(t *testing.T, enableReuseport bool) { checkClosed(t, cm) } + +func TestExternalTransport(t *testing.T) { + conn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4zero}) + require.NoError(t, err) + defer conn.Close() + port := conn.LocalAddr().(*net.UDPAddr).Port + tr := &quic.Transport{Conn: conn} + defer tr.Close() + + cm, err := NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{}) + require.NoError(t, err) + require.NoError(t, cm.AddTransport("udp4", &wrappedQUICTransport{tr}, conn)) + + // make sure this transport is used when listening on the same port + ln, err := cm.ListenQUICAndAssociate( + "quic", + ma.StringCast(fmt.Sprintf("/ip4/0.0.0.0/udp/%d", port)), + &tls.Config{NextProtos: []string{"libp2p"}}, + func(quic.Connection, uint64) bool { return false }, + ) + require.NoError(t, err) + defer ln.Close() + require.Equal(t, port, ln.Addr().(*net.UDPAddr).Port) + + // make sure this transport is used when dialing out + udpLn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)}) + require.NoError(t, err) + defer udpLn.Close() + addrChan := make(chan net.Addr, 1) + go func() { + _, addr, _ := udpLn.ReadFrom(make([]byte, 2000)) + addrChan <- addr + }() + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + _, err = cm.DialQUIC( + ctx, + ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/udp/%d/quic-v1", udpLn.LocalAddr().(*net.UDPAddr).Port)), + &tls.Config{NextProtos: []string{"libp2p"}}, + func(quic.Connection, uint64) bool { return false }, + ) + require.ErrorIs(t, err, context.DeadlineExceeded) + + select { + case addr := <-addrChan: + require.Equal(t, port, addr.(*net.UDPAddr).Port) + case <-time.After(time.Second): + t.Fatal("timeout") + } +} diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index 8944e99a5e..8191b0061b 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -3,6 +3,8 @@ package quicreuse import ( "context" "crypto/tls" + "errors" + "fmt" "net" "sync" "time" @@ -333,6 +335,20 @@ func (r *reuse) transportForDialLocked(association any, network string, source * return tr, nil } +func (r *reuse) AddTransport(tr *refcountedTransport, laddr *net.UDPAddr) error { + r.mutex.Lock() + defer r.mutex.Unlock() + + if !laddr.IP.IsUnspecified() { + return errors.New("adding transport for specific IP not supported") + } + if _, ok := r.globalDialers[laddr.Port]; ok { + return fmt.Errorf("already have global dialer for port %d", laddr.Port) + } + r.globalDialers[laddr.Port] = tr + return nil +} + func (r *reuse) TransportForListen(network string, laddr *net.UDPAddr) (*refcountedTransport, error) { r.mutex.Lock() defer r.mutex.Unlock() @@ -382,16 +398,13 @@ func (r *reuse) TransportForListen(network string, laddr *net.UDPAddr) (*refcoun packetConn: conn, } tr.IncreaseCount() - return tr, r.AddTransport(tr, localAddr) -} -func (r *reuse) AddTransport(tr *refcountedTransport, localAddr *net.UDPAddr) error { // Deal with listen on a global address if localAddr.IP.IsUnspecified() { // The kernel already checked that the laddr is not already listen // so we need not check here (when we create ListenUDP). r.globalListeners[localAddr.Port] = tr - return nil + return tr, nil } // Deal with listen on a unicast address @@ -405,7 +418,7 @@ func (r *reuse) AddTransport(tr *refcountedTransport, localAddr *net.UDPAddr) er // The kernel already checked that the laddr is not already listen // so we need not check here (when we create ListenUDP). r.unicast[localAddr.IP.String()][localAddr.Port] = tr - return nil + return tr, nil } func (r *reuse) Close() error { From 8db5f55ef95e7d55d01ed7e255a7ef808451e1fd Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 20 Dec 2024 12:22:03 +0800 Subject: [PATCH 07/10] quicreuse: prevent gc and shutdown of externally constructed transports --- p2p/transport/quicreuse/connmgr.go | 2 +- p2p/transport/quicreuse/reuse.go | 16 +++++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/p2p/transport/quicreuse/connmgr.go b/p2p/transport/quicreuse/connmgr.go index 7fbffb17eb..7632a07d6c 100644 --- a/p2p/transport/quicreuse/connmgr.go +++ b/p2p/transport/quicreuse/connmgr.go @@ -130,7 +130,7 @@ func (c *ConnManager) AddTransport(network string, tr QUICTransport, conn net.Pa refCountedTr := &refcountedTransport{ QUICTransport: tr, packetConn: conn, - refCount: 1, + isExternal: true, } var reuse *reuse diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index 8191b0061b..1ae0468b83 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -84,6 +84,7 @@ type refcountedTransport struct { mutex sync.Mutex refCount int unusedSince time.Time + isExternal bool // if the transport was created externally, it is neither gc-ed nor closed assocations map[any]struct{} } @@ -151,6 +152,9 @@ func (c *refcountedTransport) DecreaseCount() { } func (c *refcountedTransport) ShouldGarbageCollect(now time.Time) bool { + if c.isExternal { + return false + } c.mutex.Lock() defer c.mutex.Unlock() return !c.unusedSince.IsZero() && c.unusedSince.Add(maxUnusedDuration).Before(now) @@ -193,14 +197,20 @@ func (r *reuse) gc() { defer func() { r.mutex.Lock() for _, tr := range r.globalListeners { - tr.Close() + if !tr.isExternal { + tr.Close() + } } for _, tr := range r.globalDialers { - tr.Close() + if !tr.isExternal { + tr.Close() + } } for _, trs := range r.unicast { for _, tr := range trs { - tr.Close() + if !tr.isExternal { + tr.Close() + } } } r.mutex.Unlock() From 15da2723fccb3e5f27768b894e15aea2848b4d2b Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 9 Jan 2025 18:41:45 -0500 Subject: [PATCH 08/10] Fix todos --- p2p/transport/quicreuse/connmgr_test.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/p2p/transport/quicreuse/connmgr_test.go b/p2p/transport/quicreuse/connmgr_test.go index 63e518d1ae..9d0d4f7f70 100644 --- a/p2p/transport/quicreuse/connmgr_test.go +++ b/p2p/transport/quicreuse/connmgr_test.go @@ -97,10 +97,9 @@ func TestConnectionPassedToQUICForListening(t *testing.T) { quicTr, err := cm.transportForListen(nil, netw, naddr) require.NoError(t, err) defer quicTr.Close() - // TODO: fix test - // if _, ok := quicTr.(*singleOwnerTransport).Transport.Conn.(quic.OOBCapablePacketConn); !ok { - // t.Fatal("connection passed to quic-go cannot be type asserted to a *net.UDPConn") - // } + if _, ok := quicTr.(*singleOwnerTransport).packetConn.(quic.OOBCapablePacketConn); !ok { + t.Fatal("connection passed to quic-go cannot be type asserted to a *net.UDPConn") + } } func TestAcceptErrorGetCleanedUp(t *testing.T) { @@ -157,10 +156,9 @@ func TestConnectionPassedToQUICForDialing(t *testing.T) { require.NoError(t, err, "dial error") defer quicTr.Close() - // TODO: fix test - // if _, ok := quicTr.(*singleOwnerTransport).Transport.Conn.(quic.OOBCapablePacketConn); !ok { - // t.Fatal("connection passed to quic-go cannot be type asserted to a *net.UDPConn") - // } + if _, ok := quicTr.(*singleOwnerTransport).packetConn.(quic.OOBCapablePacketConn); !ok { + t.Fatal("connection passed to quic-go cannot be type asserted to a *net.UDPConn") + } } func getTLSConfForProto(t *testing.T, alpn string) (peer.ID, *tls.Config) { From d8cc67d7afadfedb9bb280644721de773649ff1a Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 10 Jan 2025 12:16:14 -0500 Subject: [PATCH 09/10] return a doneSignal --- p2p/transport/quicreuse/connmgr.go | 17 ++++++++------ p2p/transport/quicreuse/connmgr_test.go | 10 ++++++++- p2p/transport/quicreuse/reuse.go | 30 ++++++++++++------------- 3 files changed, 33 insertions(+), 24 deletions(-) diff --git a/p2p/transport/quicreuse/connmgr.go b/p2p/transport/quicreuse/connmgr.go index 7632a07d6c..26a3c74841 100644 --- a/p2p/transport/quicreuse/connmgr.go +++ b/p2p/transport/quicreuse/connmgr.go @@ -118,27 +118,30 @@ func (c *ConnManager) getReuse(network string) (*reuse, error) { } } -func (c *ConnManager) AddTransport(network string, tr QUICTransport, conn net.PacketConn) error { +// LendTransport is an advanced method used to lend an existing QUICTransport +// to the ConnManager. The ConnManager will close the returned channel when it +// is done with the transport, so that the owner may safely close the transport. +func (c *ConnManager) LendTransport(network string, tr QUICTransport, conn net.PacketConn) (<-chan struct{}, error) { c.quicListenersMu.Lock() defer c.quicListenersMu.Unlock() localAddr, ok := conn.LocalAddr().(*net.UDPAddr) if !ok { - return errors.New("expected a conn.LocalAddr() to return a *net.UDPAddr") + return nil, errors.New("expected a conn.LocalAddr() to return a *net.UDPAddr") } refCountedTr := &refcountedTransport{ - QUICTransport: tr, - packetConn: conn, - isExternal: true, + QUICTransport: tr, + packetConn: conn, + borrowDoneSignal: make(chan struct{}), } var reuse *reuse reuse, err := c.getReuse(network) if err != nil { - return err + return nil, err } - return reuse.AddTransport(refCountedTr, localAddr) + return refCountedTr.borrowDoneSignal, reuse.AddTransport(refCountedTr, localAddr) } func (c *ConnManager) ListenQUIC(addr ma.Multiaddr, tlsConf *tls.Config, allowWindowIncrease func(conn quic.Connection, delta uint64) bool) (Listener, error) { diff --git a/p2p/transport/quicreuse/connmgr_test.go b/p2p/transport/quicreuse/connmgr_test.go index 9d0d4f7f70..51646bac98 100644 --- a/p2p/transport/quicreuse/connmgr_test.go +++ b/p2p/transport/quicreuse/connmgr_test.go @@ -268,7 +268,8 @@ func TestExternalTransport(t *testing.T) { cm, err := NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{}) require.NoError(t, err) - require.NoError(t, cm.AddTransport("udp4", &wrappedQUICTransport{tr}, conn)) + doneWithTr, err := cm.LendTransport("udp4", &wrappedQUICTransport{tr}, conn) + require.NoError(t, err) // make sure this transport is used when listening on the same port ln, err := cm.ListenQUICAndAssociate( @@ -306,4 +307,11 @@ func TestExternalTransport(t *testing.T) { case <-time.After(time.Second): t.Fatal("timeout") } + + cm.Close() + select { + case <-doneWithTr: + default: + t.Fatal("doneWithTr not closed") + } } diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index 1ae0468b83..a3eff4c220 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -84,7 +84,11 @@ type refcountedTransport struct { mutex sync.Mutex refCount int unusedSince time.Time - isExternal bool // if the transport was created externally, it is neither gc-ed nor closed + + // Only set for transports we are borrowing. + // If set, we will _never_ close the underlying transport. We only close this + // channel to signal to the owner that we are done with it. + borrowDoneSignal chan struct{} assocations map[any]struct{} } @@ -125,9 +129,12 @@ func (c *refcountedTransport) IncreaseCount() { } func (c *refcountedTransport) Close() error { - // TODO(when we drop support for go 1.19) use errors.Join - c.QUICTransport.Close() - return c.packetConn.Close() + if c.borrowDoneSignal != nil { + close(c.borrowDoneSignal) + return nil + } + + return errors.Join(c.QUICTransport.Close(), c.packetConn.Close()) } func (c *refcountedTransport) WriteTo(b []byte, addr net.Addr) (int, error) { @@ -152,9 +159,6 @@ func (c *refcountedTransport) DecreaseCount() { } func (c *refcountedTransport) ShouldGarbageCollect(now time.Time) bool { - if c.isExternal { - return false - } c.mutex.Lock() defer c.mutex.Unlock() return !c.unusedSince.IsZero() && c.unusedSince.Add(maxUnusedDuration).Before(now) @@ -197,20 +201,14 @@ func (r *reuse) gc() { defer func() { r.mutex.Lock() for _, tr := range r.globalListeners { - if !tr.isExternal { - tr.Close() - } + tr.Close() } for _, tr := range r.globalDialers { - if !tr.isExternal { - tr.Close() - } + tr.Close() } for _, trs := range r.unicast { for _, tr := range trs { - if !tr.isExternal { - tr.Close() - } + tr.Close() } } r.mutex.Unlock() From cee80441859afc004078266674c3ebe218eca5ed Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 10 Jan 2025 12:25:08 -0500 Subject: [PATCH 10/10] avoid storing localAddr --- p2p/transport/quicreuse/connmgr.go | 1 - p2p/transport/quicreuse/reuse.go | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/p2p/transport/quicreuse/connmgr.go b/p2p/transport/quicreuse/connmgr.go index 26a3c74841..def7c5da56 100644 --- a/p2p/transport/quicreuse/connmgr.go +++ b/p2p/transport/quicreuse/connmgr.go @@ -244,7 +244,6 @@ func (c *ConnManager) transportForListen(association any, network string, laddr } return &singleOwnerTransport{ packetConn: conn, - localAddr: conn.LocalAddr(), Transport: &wrappedQUICTransport{ &quic.Transport{ Conn: conn, diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index a3eff4c220..e329ea49dd 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -33,8 +33,6 @@ type refCountedQuicTransport interface { type singleOwnerTransport struct { Transport QUICTransport - localAddr net.Addr - // Used to write packets directly around QUIC. packetConn net.PacketConn } @@ -44,7 +42,7 @@ var _ QUICTransport = &singleOwnerTransport{} func (c *singleOwnerTransport) IncreaseCount() {} func (c *singleOwnerTransport) DecreaseCount() { c.Transport.Close() } func (c *singleOwnerTransport) LocalAddr() net.Addr { - return c.localAddr + return c.packetConn.LocalAddr() } func (c *singleOwnerTransport) Dial(ctx context.Context, addr net.Addr, tlsConf *tls.Config, conf *quic.Config) (quic.Connection, error) {