From 1c3163945d2899d75aca0f0ffe22a34fb3175a0a Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Tue, 28 Jan 2025 15:02:38 +0000 Subject: [PATCH 1/2] Order mixclient and syncer shutdown During clean shutdown, signal the mixing client to shutdown first, waiting for it to finish running, before closing the RPC and SPV syncers. Updates the mixing module to a version that supports continuing active mixes before terminating the mixing client. --- chain/sync.go | 40 +++++++++++++++++++++++++++------------- dcrwallet.go | 9 +++++++-- go.mod | 2 ++ go.sum | 4 ++-- spv/sync.go | 48 +++++++++++++++++++++++++++++++++--------------- 5 files changed, 71 insertions(+), 32 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index 0d45c3395..dc7aa4b4e 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -542,9 +542,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) { params := s.wallet.ChainParams() + ntfnCtx, ntfnCtxCancel := context.WithCancel(context.Background()) + defer ntfnCtxCancel() s.notifier = ¬ifier{ syncer: s, - ctx: ctx, + ctx: ntfnCtx, closed: make(chan struct{}), } addr, err := normalizeAddress(s.opts.Address, s.opts.DefaultPort) @@ -589,12 +591,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } opts = append(opts, wsrpc.WithTLSConfig(tc)) } - client, err := wsrpc.Dial(ctx, addr, opts...) + wsClient, err := wsrpc.Dial(ctx, addr, opts...) if err != nil { return err } - defer client.Close() - s.rpc = dcrd.New(client) + defer wsClient.Close() + s.rpc = dcrd.New(wsClient) // Verify that the server is running on the expected network. var netID wire.CurrencyNet @@ -723,10 +725,27 @@ func (s *Syncer) Run(ctx context.Context) (err error) { return err } + defer func() { + ntfnCtxCancel() + + select { + case <-ctx.Done(): + wsClient.Close() + default: + } + + // Wait for notifications to finish before returning + <-s.notifier.closed + }() + + // Ensure wallet.Run cleanly finishes/is canceled first when outer + // context is canceled. + walletCtx, walletCtxCancel := context.WithCancel(context.Background()) + defer walletCtxCancel() g.Go(func() error { // Run wallet background goroutines (currently, this just runs // mixclient). - return s.wallet.Run(ctx) + return s.wallet.Run(walletCtx) }) // Request notifications for mixing messages. @@ -739,18 +758,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) { log.Infof("Blockchain sync completed, wallet ready for general usage.") - // Wait for notifications to finish before returning - defer func() { - <-s.notifier.closed - }() - g.Go(func() error { select { case <-ctx.Done(): - client.Close() + walletCtxCancel() return ctx.Err() - case <-client.Done(): - return client.Err() + case <-wsClient.Done(): + return wsClient.Err() } }) return g.Wait() diff --git a/dcrwallet.go b/dcrwallet.go index 7013e9462..a0a656cfe 100644 --- a/dcrwallet.go +++ b/dcrwallet.go @@ -534,9 +534,10 @@ func spvLoop(ctx context.Context, w *wallet.Wallet) { for { err := syncer.Run(ctx) if done(ctx) { + loggers.SyncLog.Infof("SPV synchronization stopped") return } - log.Errorf("SPV synchronization ended: %v", err) + loggers.SyncLog.Errorf("SPV synchronization stopped: %v", err) } } @@ -571,7 +572,11 @@ func rpcSyncLoop(ctx context.Context, w *wallet.Wallet) { syncer := chain.NewSyncer(w, rpcOptions) err := syncer.Run(ctx) if err != nil { - loggers.SyncLog.Errorf("Wallet synchronization stopped: %v", err) + if errors.Is(err, context.Canceled) || ctx.Err() != nil { + loggers.SyncLog.Infof("RPC synchronization stopped") + return + } + loggers.SyncLog.Errorf("RPC synchronization stopped: %v", err) select { case <-ctx.Done(): return diff --git a/go.mod b/go.mod index bc9be06a5..23a6b4539 100644 --- a/go.mod +++ b/go.mod @@ -59,3 +59,5 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect lukechampine.com/blake3 v1.3.0 // indirect ) + +replace github.com/decred/dcrd/mixing => github.com/jrick/dcrd/mixing v0.0.0-20250123211715-8a5ce5c2063a diff --git a/go.sum b/go.sum index d882725a3..ee48009ec 100644 --- a/go.sum +++ b/go.sum @@ -50,8 +50,6 @@ github.com/decred/dcrd/gcs/v4 v4.1.0 h1:tpW7JW53yJZlgNwl/n2NL1b8NxHaIPRUyNuLMkB/ github.com/decred/dcrd/gcs/v4 v4.1.0/go.mod h1:nPTbGM/I3Ihe5KFvUmxZEqQP/jDZQjQ63+WEi/f4lqU= github.com/decred/dcrd/hdkeychain/v3 v3.1.2 h1:x25WuuE7zM/20EynuVMyOhL0K8BwGBBsexGq8xTiHFA= github.com/decred/dcrd/hdkeychain/v3 v3.1.2/go.mod h1:FnNJmZ7jqUDeAo6/c/xkQi5cuxh3EWtJeMmW6/Z8lcc= -github.com/decred/dcrd/mixing v0.4.2 h1:mpt2pNIFTI6L1hXrieAWJTQJv5t9WzHcNnhI+tnAG90= -github.com/decred/dcrd/mixing v0.4.2/go.mod h1:VF87lOn41kitgWVOwmXoB4qMYF7+bxItZXyw4JfW3EQ= github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.3.0 h1:l0DnCcILTNrpy8APF3FLN312ChpkQaAuW30aC/RgBaw= github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.3.0/go.mod h1:j+kkRPXPJB5S9VFOsx8SQLcU7PTFkPKRc1aCHN4ENzA= github.com/decred/dcrd/rpcclient/v8 v8.0.1 h1:hd81e4w1KSqvPcozJlnz6XJfWKDNuahgooH/N5E8vOU= @@ -78,6 +76,8 @@ github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LF github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= github.com/jrick/bitset v1.0.0 h1:Ws0PXV3PwXqWK2n7Vz6idCdrV/9OrBXgHEJi27ZB9Dw= github.com/jrick/bitset v1.0.0/go.mod h1:ZOYB5Uvkla7wIEY4FEssPVi3IQXa02arznRaYaAEPe4= +github.com/jrick/dcrd/mixing v0.0.0-20250123211715-8a5ce5c2063a h1:wGTjDa+kmjKBkhnc8BACcU+OreLC+gxTFHLz5t+AFkw= +github.com/jrick/dcrd/mixing v0.0.0-20250123211715-8a5ce5c2063a/go.mod h1:VF87lOn41kitgWVOwmXoB4qMYF7+bxItZXyw4JfW3EQ= github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/jrick/wsrpc/v2 v2.3.8 h1:9vfM8o9g00HXQb/3D6+Y9Cy1uybjD7K1272vtdXXBps= diff --git a/spv/sync.go b/spv/sync.go index 1cf9a57c3..10bca0ba7 100644 --- a/spv/sync.go +++ b/spv/sync.go @@ -321,7 +321,7 @@ func (s *Syncer) setRequiredHeight(tipHeight int32) { } // Run synchronizes the wallet, returning when synchronization fails or the -// context is cancelled. +// context is canceled. func (s *Syncer) Run(ctx context.Context) (err error) { s.doneMu.Lock() s.done = make(chan struct{}) @@ -367,23 +367,23 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } // Start background handlers to read received messages from remote peers - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { return s.receiveGetData(ctx) }) - g.Go(func() error { return s.receiveInv(ctx) }) - g.Go(func() error { return s.receiveHeadersAnnouncements(ctx) }) - g.Go(func() error { return s.receiveMixMsgs(ctx) }) + g, gctx := errgroup.WithContext(context.Background()) + g.Go(func() error { return s.receiveGetData(gctx) }) + g.Go(func() error { return s.receiveInv(gctx) }) + g.Go(func() error { return s.receiveHeadersAnnouncements(gctx) }) + g.Go(func() error { return s.receiveMixMsgs(gctx) }) s.lp.AddHandledMessages(p2p.MaskGetData | p2p.MaskInv) if len(s.persistentPeers) != 0 { for i := range s.persistentPeers { raddr := s.persistentPeers[i] - g.Go(func() error { return s.connectToPersistent(ctx, raddr) }) + g.Go(func() error { return s.connectToPersistent(gctx, raddr) }) } } else { - g.Go(func() error { return s.connectToCandidates(ctx) }) + g.Go(func() error { return s.connectToCandidates(gctx) }) } - g.Go(func() error { return s.handleMempool(ctx) }) + g.Go(func() error { return s.handleMempool(gctx) }) s.wallet.SetNetworkBackend(s) defer s.wallet.SetNetworkBackend(nil) @@ -392,7 +392,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { g.Go(func() error { // First step: fetch missing CFilters. progress := make(chan wallet.MissingCFilterProgress, 1) - go s.wallet.FetchMissingCFiltersWithProgress(ctx, s, progress) + go s.wallet.FetchMissingCFiltersWithProgress(gctx, s, progress) log.Debugf("Fetching missing CFilters...") s.fetchMissingCfiltersStart() @@ -408,14 +408,14 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // Next: fetch headers and cfilters up to mainchain tip. s.fetchHeadersStart() log.Debugf("Fetching headers and CFilters...") - err = s.initialSyncHeaders(ctx) + err = s.initialSyncHeaders(gctx) if err != nil { return err } s.fetchHeadersFinished() // Finally: Perform the initial rescan over the received blocks. - err = s.initialSyncRescan(ctx) + err = s.initialSyncRescan(gctx) if err != nil { return err } @@ -425,10 +425,28 @@ func (s *Syncer) Run(ctx context.Context) (err error) { return nil }) - // Run wallet background goroutines (currently, this just runs - // mixclient). + // Ensure wallet.Run cleanly finishes/is canceled first when outer + // context is canceled. + walletCtx, walletCtxCancel := context.WithCancel(context.Background()) + go func() { + select { + case <-ctx.Done(): + case <-gctx.Done(): + } + walletCtxCancel() + }() g.Go(func() error { - return s.wallet.Run(ctx) + // Run wallet background goroutines (currently, this just runs + // mixclient). + err := s.wallet.Run(walletCtx) + if err != nil { + return err + } + + // If gctx has not yet been canceled, do so here now. + // walletCtx is canceled after either ctx or gctx is canceled. + <-walletCtx.Done() + return walletCtx.Err() }) // Wait until cancellation or a handler errors. From f09ea4c1d6892ed294afe9ce0120134cea333c45 Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Tue, 28 Jan 2025 15:02:38 +0000 Subject: [PATCH 2/2] also pass walletCtx to initial sync --- spv/sync.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/spv/sync.go b/spv/sync.go index 10bca0ba7..41ced656a 100644 --- a/spv/sync.go +++ b/spv/sync.go @@ -388,11 +388,22 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.wallet.SetNetworkBackend(s) defer s.wallet.SetNetworkBackend(nil) + // Ensure initial sync and wallet.Run cleanly finish/are canceled + // first when outer context is canceled. + walletCtx, walletCtxCancel := context.WithCancel(context.Background()) + go func() { + select { + case <-ctx.Done(): + case <-gctx.Done(): + } + walletCtxCancel() + }() + // Perform the initial startup sync. g.Go(func() error { // First step: fetch missing CFilters. progress := make(chan wallet.MissingCFilterProgress, 1) - go s.wallet.FetchMissingCFiltersWithProgress(gctx, s, progress) + go s.wallet.FetchMissingCFiltersWithProgress(walletCtx, s, progress) log.Debugf("Fetching missing CFilters...") s.fetchMissingCfiltersStart() @@ -408,14 +419,14 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // Next: fetch headers and cfilters up to mainchain tip. s.fetchHeadersStart() log.Debugf("Fetching headers and CFilters...") - err = s.initialSyncHeaders(gctx) + err = s.initialSyncHeaders(walletCtx) if err != nil { return err } s.fetchHeadersFinished() // Finally: Perform the initial rescan over the received blocks. - err = s.initialSyncRescan(gctx) + err = s.initialSyncRescan(walletCtx) if err != nil { return err } @@ -425,16 +436,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { return nil }) - // Ensure wallet.Run cleanly finishes/is canceled first when outer - // context is canceled. - walletCtx, walletCtxCancel := context.WithCancel(context.Background()) - go func() { - select { - case <-ctx.Done(): - case <-gctx.Done(): - } - walletCtxCancel() - }() g.Go(func() error { // Run wallet background goroutines (currently, this just runs // mixclient).