From b7528aba721a62b4284ed7e0bc6eb2ad87a25b7f Mon Sep 17 00:00:00 2001 From: JoeGruff Date: Thu, 16 May 2024 12:16:22 +0900 Subject: [PATCH] sync: Add rescan progress. --- asset/dcr/sync.go | 11 +++++--- asset/dcr/wallet.go | 2 -- cgo/addresses.go | 10 +++---- cgo/cgo.go | 1 - cgo/sync.go | 65 ++++++++++++++++++++++++++++++++++++--------- cgo/transactions.go | 16 +++++------ cgo/walletloader.go | 47 +++++++++++++++++++++++--------- 7 files changed, 109 insertions(+), 43 deletions(-) diff --git a/asset/dcr/sync.go b/asset/dcr/sync.go index 149625c..484cb32 100644 --- a/asset/dcr/sync.go +++ b/asset/dcr/sync.go @@ -7,6 +7,7 @@ import ( "decred.org/dcrwallet/v3/p2p" "decred.org/dcrwallet/v3/spv" + dcrwallet "decred.org/dcrwallet/v3/wallet" "github.com/decred/dcrd/addrmgr/v2" ) @@ -81,7 +82,11 @@ func (w *Wallet) IsSynced() bool { return false } -// RescanFromHeight rescans the wallet from the specified height. -func (w *Wallet) RescanFromHeight(ctx context.Context, startHeight int32) error { - return w.mainWallet.RescanFromHeight(ctx, w.syncer, startHeight) +// RescanProgressFromHeight rescans for relevant transactions in all blocks in +// the main chain starting at startHeight. Progress notifications and any +// errors are sent to the channel p. This function blocks until the rescan +// completes or ends in an error. p is closed before returning. +func (w *Wallet) RescanProgressFromHeight(ctx context.Context, + startHeight int32, p chan<- dcrwallet.RescanProgress) { + w.mainWallet.RescanProgressFromHeight(ctx, w.syncer, startHeight, p) } diff --git a/asset/dcr/wallet.go b/asset/dcr/wallet.go index f22b156..f4cedfa 100644 --- a/asset/dcr/wallet.go +++ b/asset/dcr/wallet.go @@ -79,8 +79,6 @@ func (w *Wallet) CloseWallet() error { } w.log.Info("Wallet closed") - w.mainWallet = nil - w.db = nil return nil } diff --git a/cgo/addresses.go b/cgo/addresses.go index 6060480..f847667 100644 --- a/cgo/addresses.go +++ b/cgo/addresses.go @@ -41,7 +41,7 @@ func newExternalAddress(cName *C.char) *C.char { return errCResponseWithCode(ErrCodeNotSynced, "newExternalAddress requested on an unsynced wallet") } - _, err := w.NewExternalAddress(ctx, udb.DefaultAccountNum) + _, err := w.NewExternalAddress(w.ctx, udb.DefaultAccountNum) if err != nil { return errCResponse("w.NewExternalAddress error: %v", err) } @@ -68,11 +68,11 @@ func signMessage(cName, cMessage, cAddress, cPassword *C.char) *C.char { return errCResponse("unable to decode address: %v", err) } - if err := w.MainWallet().Unlock(ctx, []byte(goString(cPassword)), nil); err != nil { + if err := w.MainWallet().Unlock(w.ctx, []byte(goString(cPassword)), nil); err != nil { return errCResponse("cannot unlock wallet: %v", err) } - sig, err := w.MainWallet().SignMessage(ctx, goString(cMessage), addr) + sig, err := w.MainWallet().SignMessage(w.ctx, goString(cMessage), addr) if err != nil { return errCResponse("unable to sign message: %v", err) } @@ -89,7 +89,7 @@ func addresses(cName *C.char) *C.char { return errCResponse("wallet with name %q is not loaded", goString(cName)) } - addrs, err := w.AddressesByAccount(ctx, defaultAccount) + addrs, err := w.AddressesByAccount(w.ctx, defaultAccount) if err != nil { return errCResponse("w.AddressesByAccount error: %v", err) } @@ -118,7 +118,7 @@ func defaultPubkey(cName *C.char) *C.char { return errCResponse("wallet with name %q is not loaded", goString(cName)) } - pubkey, err := w.AccountPubkey(ctx, defaultAccount) + pubkey, err := w.AccountPubkey(w.ctx, defaultAccount) if err != nil { return errCResponse("unable to get default pubkey: %v", err) } diff --git a/cgo/cgo.go b/cgo/cgo.go index 5585641..9ee2311 100644 --- a/cgo/cgo.go +++ b/cgo/cgo.go @@ -89,7 +89,6 @@ func shutdown() *C.char { logMtx.Lock() log.Debug("libwallet cgo shutdown") logBackend.Close() - log = nil logMtx.Unlock() initialized = false diff --git a/cgo/sync.go b/cgo/sync.go index c2a9d64..94630ce 100644 --- a/cgo/sync.go +++ b/cgo/sync.go @@ -7,6 +7,7 @@ import ( "strings" "decred.org/dcrwallet/v3/spv" + dcrwallet "decred.org/dcrwallet/v3/wallet" ) //export syncWallet @@ -42,6 +43,10 @@ func syncWallet(cName, cPeers *C.char) *C.char { }, FetchMissingCFiltersStarted: func() { w.syncStatusMtx.Lock() + if w.rescanning { + w.syncStatusMtx.Unlock() + return + } w.syncStatusCode = SSCFetchingCFilters w.syncStatusMtx.Unlock() w.log.Info("Fetching missing cfilters started.") @@ -57,6 +62,10 @@ func syncWallet(cName, cPeers *C.char) *C.char { }, FetchHeadersStarted: func() { w.syncStatusMtx.Lock() + if w.rescanning { + w.syncStatusMtx.Unlock() + return + } w.syncStatusCode = SSCFetchingHeaders w.syncStatusMtx.Unlock() w.log.Info("Fetching headers started.") @@ -72,6 +81,10 @@ func syncWallet(cName, cPeers *C.char) *C.char { }, DiscoverAddressesStarted: func() { w.syncStatusMtx.Lock() + if w.rescanning { + w.syncStatusMtx.Unlock() + return + } w.syncStatusCode = SSCDiscoveringAddrs w.syncStatusMtx.Unlock() w.log.Info("Discover addresses started.") @@ -81,6 +94,10 @@ func syncWallet(cName, cPeers *C.char) *C.char { }, RescanStarted: func() { w.syncStatusMtx.Lock() + if w.rescanning { + w.syncStatusMtx.Unlock() + return + } w.syncStatusCode = SSCRescanning w.syncStatusMtx.Unlock() w.log.Info("Rescan started.") @@ -95,7 +112,7 @@ func syncWallet(cName, cPeers *C.char) *C.char { w.log.Info("Rescan finished.") }, } - if err := w.StartSync(ctx, ntfns, peers...); err != nil { + if err := w.StartSync(w.ctx, ntfns, peers...); err != nil { return errCResponse(err.Error()) } return successCResponse("sync started") @@ -120,7 +137,7 @@ func syncWalletStatus(cName *C.char) *C.char { if !is { return errCResponse("backend is not an spv syncer") } - targetHeight := spvSyncer.EstimateMainChainTip(ctx) + targetHeight := spvSyncer.EstimateMainChainTip(w.ctx) // Sometimes it appears we miss a notification during start up. This is // a bandaid to put us as synced in that case. @@ -156,32 +173,56 @@ func syncWalletStatus(cName *C.char) *C.char { //export rescanFromHeight func rescanFromHeight(cName, cHeight *C.char) *C.char { - walletsMtx.Lock() - defer walletsMtx.Unlock() + height, err := strconv.ParseUint(goString(cHeight), 10, 32) + if err != nil { + return errCResponse("height is not an uint32: %v", err) + } name := goString(cName) - w, exists := wallets[name] + w, exists := loadedWallet(cName) if !exists { return errCResponse("wallet with name %q does not exist", name) } - height, err := strconv.ParseUint(goString(cHeight), 10, 32) - if err != nil { - return errCResponse("height is not an uint32: %v", err) + if !w.IsSynced() { + return errCResponseWithCode(ErrCodeNotSynced, "rescanFromHeight requested on an unsynced wallet") } - // We don't seem to get any feedback from wallet when doing rescans here. - // Just set status to rescanning and then to complete when done. w.syncStatusMtx.Lock() + if w.rescanning { + w.syncStatusMtx.Unlock() + return errCResponse("wallet %q already rescanning", name) + } w.syncStatusCode = SSCRescanning w.rescanning = true + w.rescanHeight = int(height) w.syncStatusMtx.Unlock() + w.Add(1) go func() { defer func() { w.syncStatusMtx.Lock() w.syncStatusCode = SSCComplete w.rescanning = false w.syncStatusMtx.Unlock() + w.Done() + }() + prog := make(chan dcrwallet.RescanProgress) + go func() { + w.RescanProgressFromHeight(w.ctx, int32(height), prog) }() - if err := w.RescanFromHeight(ctx, int32(height)); err != nil { - log.Errorf("rescan wallet %q error: %v", name, err) + for { + select { + case p, open := <-prog: + if !open { + return + } + if p.Err != nil { + log.Errorf("rescan wallet %q error: %v", name, err) + return + } + w.syncStatusMtx.Lock() + w.rescanHeight = int(p.ScannedThrough) + w.syncStatusMtx.Unlock() + case <-w.ctx.Done(): + return + } } }() return successCResponse("rescan from height %d for wallet %q started", height, name) diff --git a/cgo/transactions.go b/cgo/transactions.go index b520810..c29f9df 100644 --- a/cgo/transactions.go +++ b/cgo/transactions.go @@ -54,12 +54,12 @@ func createSignedTransaction(cName, cCreateSignedTxJSONReq *C.char) *C.char { ignoreInputs[i] = o } - if err := w.MainWallet().Unlock(ctx, []byte(req.Password), nil); err != nil { + if err := w.MainWallet().Unlock(w.ctx, []byte(req.Password), nil); err != nil { return errCResponse("cannot unlock wallet: %v", err) } defer w.MainWallet().Lock() - txBytes, txhash, fee, err := w.CreateSignedTransaction(ctx, outputs, inputs, ignoreInputs, uint64(req.FeeRate)) + txBytes, txhash, fee, err := w.CreateSignedTransaction(w.ctx, outputs, inputs, ignoreInputs, uint64(req.FeeRate)) if err != nil { return errCResponse("unable to sign send transaction: %v", err) } @@ -82,7 +82,7 @@ func sendRawTransaction(cName, cTxHex *C.char) *C.char { if !exists { return errCResponse("wallet with name %q does not exist", goString(cName)) } - txHash, err := w.SendRawTransaction(ctx, goString(cTxHex)) + txHash, err := w.SendRawTransaction(w.ctx, goString(cTxHex)) if err != nil { return errCResponse("unable to sign send transaction: %v", err) } @@ -95,7 +95,7 @@ func listUnspents(cName *C.char) *C.char { if !exists { return errCResponse("wallet with name %q does not exist", goString(cName)) } - res, err := w.MainWallet().ListUnspent(ctx, 1, math.MaxInt32, nil, defaultAccount) + res, err := w.MainWallet().ListUnspent(w.ctx, 1, math.MaxInt32, nil, defaultAccount) if err != nil { return errCResponse("unable to get unspents: %v", err) } @@ -107,7 +107,7 @@ func listUnspents(cName *C.char) *C.char { return errCResponse("unable to decode address: %v", err) } - ka, err := w.MainWallet().KnownAddress(ctx, addr) + ka, err := w.MainWallet().KnownAddress(w.ctx, addr) if err != nil { return errCResponse("unspent address is not known: %v", err) } @@ -139,7 +139,7 @@ func estimateFee(cName, cNBlocks *C.char) *C.char { if err != nil { return errCResponse("number of blocks is not a uint64: %v", err) } - txFee, err := w.FetchFeeFromOracle(ctx, nBlocks) + txFee, err := w.FetchFeeFromOracle(w.ctx, nBlocks) if err != nil { return errCResponse("unable to get fee from oracle: %v", err) } @@ -160,7 +160,7 @@ func listTransactions(cName, cFrom, cCount *C.char) *C.char { if err != nil { return errCResponse("count is not an int: %v", err) } - res, err := w.MainWallet().ListTransactions(ctx, int(from), int(count)) + res, err := w.MainWallet().ListTransactions(w.ctx, int(from), int(count)) if err != nil { return errCResponse("unable to get transactions: %v", err) } @@ -190,7 +190,7 @@ func bestBlock(cName *C.char) *C.char { if !exists { return errCResponse("wallet with name %q does not exist", goString(cName)) } - blockHash, blockHeight := w.MainWallet().MainChainTip(ctx) + blockHash, blockHeight := w.MainWallet().MainChainTip(w.ctx) res := &BestBlockRes{ Hash: blockHash.String(), Height: int(blockHeight), diff --git a/cgo/walletloader.go b/cgo/walletloader.go index 8dde575..7837568 100644 --- a/cgo/walletloader.go +++ b/cgo/walletloader.go @@ -2,6 +2,7 @@ package main import "C" import ( + "context" "encoding/json" "fmt" "sync" @@ -18,6 +19,10 @@ type wallet struct { *dcr.Wallet log slog.Logger + sync.WaitGroup + ctx context.Context + cancelCtx context.CancelFunc + syncStatusMtx sync.RWMutex syncStatusCode SyncStatusCode targetHeight, cfiltersHeight, headersHeight, rescanHeight, numPeers int @@ -66,15 +71,19 @@ func createWallet(cName, cDataDir, cNet, cPass, cMnemonic *C.char) *C.char { Birthday: birthday, } } + walletCtx, cancel := context.WithCancel(ctx) - w, err := dcr.CreateWallet(ctx, params, recoveryConfig) + w, err := dcr.CreateWallet(walletCtx, params, recoveryConfig) if err != nil { + cancel() return errCResponse(err.Error()) } wallets[name] = &wallet{ - Wallet: w, - log: logger, + Wallet: w, + log: logger, + ctx: walletCtx, + cancelCtx: cancel, } return successCResponse("wallet created") } @@ -108,14 +117,19 @@ func createWatchOnlyWallet(cName, cDataDir, cNet, cPub *C.char) *C.char { }, } - w, err := dcr.CreateWatchOnlyWallet(ctx, goString(cPub), params) + walletCtx, cancel := context.WithCancel(ctx) + + w, err := dcr.CreateWatchOnlyWallet(walletCtx, goString(cPub), params) if err != nil { + cancel() return errCResponse(err.Error()) } wallets[name] = &wallet{ - Wallet: w, - log: logger, + Wallet: w, + log: logger, + ctx: walletCtx, + cancelCtx: cancel, } return successCResponse("wallet created") } @@ -146,18 +160,25 @@ func loadWallet(cName, cDataDir, cNet *C.char) *C.char { DbDriver: "bdb", // use badgerdb for mobile! Logger: logger, } - w, err := dcr.LoadWallet(ctx, params) + + walletCtx, cancel := context.WithCancel(ctx) + + w, err := dcr.LoadWallet(walletCtx, params) if err != nil { + cancel() return errCResponse(err.Error()) } - if err = w.OpenWallet(ctx); err != nil { + if err = w.OpenWallet(walletCtx); err != nil { + cancel() return errCResponse(err.Error()) } wallets[name] = &wallet{ - Wallet: w, - log: logger, + Wallet: w, + log: logger, + ctx: walletCtx, + cancelCtx: cancel, } return successCResponse(fmt.Sprintf("wallet %q loaded", name)) } @@ -185,7 +206,7 @@ func walletBalance(cName *C.char) *C.char { } const confs = 1 - bals, err := w.AccountBalances(ctx, confs) + bals, err := w.AccountBalances(w.ctx, confs) if err != nil { return errCResponse("w.AccountBalances error: %v", err) } @@ -217,6 +238,8 @@ func closeWallet(cName *C.char) *C.char { if !exists { return errCResponse("wallet with name %q does not exist", name) } + w.cancelCtx() + w.Wait() if err := w.CloseWallet(); err != nil { return errCResponse("close wallet %q error: %v", name, err.Error()) } @@ -231,7 +254,7 @@ func changePassphrase(cName, cOldPass, cNewPass *C.char) *C.char { return errCResponse("wallet with name %q not loaded", goString(cName)) } - err := w.MainWallet().ChangePrivatePassphrase(ctx, []byte(goString(cOldPass)), + err := w.MainWallet().ChangePrivatePassphrase(w.ctx, []byte(goString(cOldPass)), []byte(goString(cNewPass))) if err != nil { return errCResponse("w.ChangePrivatePassphrase error: %v", err)