From 88912448df9ff9c1bf9d615c42a22ac92f440d54 Mon Sep 17 00:00:00 2001 From: Linden <70739041+linden@users.noreply.github.com> Date: Sun, 18 Feb 2024 14:37:51 -0800 Subject: [PATCH 1/5] feat: move to `tempdb` for tests bbolt has a heavy dependency on memory mapping and the file system which isn't yet supported in the browser wasm runtime, so instead tempdb keeps everything in-memory so we can support both browser and native targets while testing. --- bamboozle_unit_test.go | 5 ++--- banman/store_test.go | 27 +++++----------------- blockmanager_test.go | 4 +--- filterdb/db_test.go | 12 ++-------- go.mod | 8 ++++--- go.sum | 7 +++++- headerfs/index_test.go | 51 ++++++++++-------------------------------- headerfs/store_test.go | 11 +++------ sync_test.go | 6 ++--- 9 files changed, 38 insertions(+), 93 deletions(-) diff --git a/bamboozle_unit_test.go b/bamboozle_unit_test.go index e33eea644..7428fda6b 100644 --- a/bamboozle_unit_test.go +++ b/bamboozle_unit_test.go @@ -15,6 +15,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcwallet/walletdb" "github.com/lightninglabs/neutrino/headerfs" + _ "github.com/linden/tempdb" ) func decodeHashNoError(str string) *chainhash.Hash { @@ -525,9 +526,7 @@ func runCheckCFCheckptSanityTestCase(t *testing.T, testCase *cfCheckptTestCase) } defer os.RemoveAll(tempDir) - db, err := walletdb.Create( - "bdb", tempDir+"/weks.db", true, dbOpenTimeout, - ) + db, err := walletdb.Create("tempdb", "weks.db") if err != nil { t.Fatalf("Error opening DB: %s", err) } diff --git a/banman/store_test.go b/banman/store_test.go index e4b5f8c40..52b42f89c 100644 --- a/banman/store_test.go +++ b/banman/store_test.go @@ -1,46 +1,30 @@ package banman_test import ( - "io/ioutil" "net" - "os" - "path/filepath" "testing" "time" "github.com/btcsuite/btcwallet/walletdb" - _ "github.com/btcsuite/btcwallet/walletdb/bdb" "github.com/lightninglabs/neutrino/banman" + _ "github.com/linden/tempdb" ) // createTestBanStore creates a test Store backed by a boltdb instance. -func createTestBanStore(t *testing.T) (banman.Store, func()) { +func createTestBanStore(t *testing.T) banman.Store { t.Helper() - dbDir, err := ioutil.TempDir("", "") + db, err := walletdb.Create("tempdb", "test.db") if err != nil { - t.Fatalf("unable to create db dir: %v", err) - } - dbPath := filepath.Join(dbDir, "test.db") - - db, err := walletdb.Create("bdb", dbPath, true, time.Second*10) - if err != nil { - os.RemoveAll(dbDir) t.Fatalf("unable to create db: %v", err) } - cleanUp := func() { - db.Close() - os.RemoveAll(dbDir) - } - banStore, err := banman.NewStore(db) if err != nil { - cleanUp() t.Fatalf("unable to create ban store: %v", err) } - return banStore, cleanUp + return banStore } // TestBanStore ensures that the BanStore's state correctly reflects the @@ -50,8 +34,7 @@ func TestBanStore(t *testing.T) { // We'll start by creating our test BanStore backed by a boltdb // instance. - banStore, cleanUp := createTestBanStore(t) - defer cleanUp() + banStore := createTestBanStore(t) // checkBanStore is a helper closure to ensure to the IP network's ban // status is correctly reflected within the BanStore. diff --git a/blockmanager_test.go b/blockmanager_test.go index 85dae52bb..ef69a5000 100644 --- a/blockmanager_test.go +++ b/blockmanager_test.go @@ -59,9 +59,7 @@ func setupBlockManager(t *testing.T) (*blockManager, headerfs.BlockHeaderStore, // Set up the block and filter header stores. tempDir := t.TempDir() - db, err := walletdb.Create( - "bdb", tempDir+"/weks.db", true, dbOpenTimeout, - ) + db, err := walletdb.Create("tempdb", "weks.db") if err != nil { return nil, nil, nil, fmt.Errorf("error opening DB: %s", err) } diff --git a/filterdb/db_test.go b/filterdb/db_test.go index 6d444d557..4ba5db2ec 100644 --- a/filterdb/db_test.go +++ b/filterdb/db_test.go @@ -3,27 +3,19 @@ package filterdb import ( "math/rand" "testing" - "time" "github.com/btcsuite/btcd/btcutil/gcs" "github.com/btcsuite/btcd/btcutil/gcs/builder" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcwallet/walletdb" - _ "github.com/btcsuite/btcwallet/walletdb/bdb" + _ "github.com/linden/tempdb" "github.com/stretchr/testify/require" ) func createTestDatabase(t *testing.T) FilterDatabase { - tempDir := t.TempDir() - - db, err := walletdb.Create( - "bdb", tempDir+"/test.db", true, time.Second*10, - ) + db, err := walletdb.Create("tempdb", "test.db") require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, db.Close()) - }) filterDB, err := New(db, chaincfg.SimNetParams) require.NoError(t, err) diff --git a/go.mod b/go.mod index 9d5f1b921..1aeffc555 100644 --- a/go.mod +++ b/go.mod @@ -7,11 +7,12 @@ require ( github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f github.com/btcsuite/btcwallet/wallet/txauthor v1.2.3 - github.com/btcsuite/btcwallet/walletdb v1.3.5 + github.com/btcsuite/btcwallet/walletdb v1.4.0 github.com/btcsuite/btcwallet/wtxmgr v1.5.0 github.com/davecgh/go-spew v1.1.1 github.com/lightninglabs/neutrino/cache v1.1.0 github.com/lightningnetwork/lnd/queue v1.0.1 + github.com/linden/tempdb v0.0.0-20240218031655-83bc03e79f51 github.com/stretchr/testify v1.8.1 ) @@ -28,10 +29,11 @@ require ( github.com/lightningnetwork/lnd/clock v1.0.1 // indirect github.com/lightningnetwork/lnd/ticker v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50 // indirect golang.org/x/crypto v0.1.0 // indirect golang.org/x/sys v0.1.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) -go 1.18 +go 1.21.2 + +toolchain go1.21.5 diff --git a/go.sum b/go.sum index aec8781e4..7cbb6007a 100644 --- a/go.sum +++ b/go.sum @@ -27,8 +27,9 @@ github.com/btcsuite/btcwallet/wallet/txrules v1.2.0 h1:BtEN5Empw62/RVnZ0VcJaVtVl github.com/btcsuite/btcwallet/wallet/txrules v1.2.0/go.mod h1:AtkqiL7ccKWxuLYtZm8Bu8G6q82w4yIZdgq6riy60z0= github.com/btcsuite/btcwallet/wallet/txsizes v1.1.0 h1:wZnOolEAeNOHzHTnznw/wQv+j35ftCIokNrnOTOU5o8= github.com/btcsuite/btcwallet/wallet/txsizes v1.1.0/go.mod h1:pauEU8UuMFiThe5PB3EO+gO5kx87Me5NvdQDsTuq6cs= -github.com/btcsuite/btcwallet/walletdb v1.3.5 h1:SoxUPLgJUkyO1XqON6X7x+rjHJoIpRQov8o8X6gNoz8= github.com/btcsuite/btcwallet/walletdb v1.3.5/go.mod h1:oJDxAEUHVtnmIIBaa22wSBPTVcs6hUp5NKWmI8xDwwU= +github.com/btcsuite/btcwallet/walletdb v1.4.0 h1:/C5JRF+dTuE2CNMCO/or5N8epsrhmSM4710uBQoYPTQ= +github.com/btcsuite/btcwallet/walletdb v1.4.0/go.mod h1:oJDxAEUHVtnmIIBaa22wSBPTVcs6hUp5NKWmI8xDwwU= github.com/btcsuite/btcwallet/wtxmgr v1.5.0 h1:WO0KyN4l6H3JWnlFxfGR7r3gDnlGT7W2cL8vl6av4SU= github.com/btcsuite/btcwallet/wtxmgr v1.5.0/go.mod h1:TQVDhFxseiGtZwEPvLgtfyxuNUDsIdaJdshvWzR0HJ4= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd h1:R/opQEbFEy9JGkIguV40SvRY1uliPX8ifOvi6ICsFCw= @@ -83,6 +84,10 @@ github.com/lightningnetwork/lnd/queue v1.0.1 h1:jzJKcTy3Nj5lQrooJ3aaw9Lau3I0IwvQ github.com/lightningnetwork/lnd/queue v1.0.1/go.mod h1:vaQwexir73flPW43Mrm7JOgJHmcEFBWWSl9HlyASoms= github.com/lightningnetwork/lnd/ticker v1.0.0 h1:S1b60TEGoTtCe2A0yeB+ecoj/kkS4qpwh6l+AkQEZwU= github.com/lightningnetwork/lnd/ticker v1.0.0/go.mod h1:iaLXJiVgI1sPANIF2qYYUJXjoksPNvGNYowB8aRbpX0= +github.com/linden/tempdb v0.0.0-20231124230014-42fe18a60308 h1:3J67IzgcvBcl1UyzMuExSPmq7hejA1Vr1E7ixKqAUds= +github.com/linden/tempdb v0.0.0-20231124230014-42fe18a60308/go.mod h1:xR9HUmc4girdp/lNzw1jOt53GaCSmctyB8t+Q6EkWp8= +github.com/linden/tempdb v0.0.0-20240218031655-83bc03e79f51 h1:RfZREkHD3XpItaIN2I1/tb2hCzE2TN5e14OkTH6Sv74= +github.com/linden/tempdb v0.0.0-20240218031655-83bc03e79f51/go.mod h1:xR9HUmc4girdp/lNzw1jOt53GaCSmctyB8t+Q6EkWp8= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= diff --git a/headerfs/index_test.go b/headerfs/index_test.go index ebebfd4fc..edb5192aa 100644 --- a/headerfs/index_test.go +++ b/headerfs/index_test.go @@ -5,47 +5,29 @@ import ( "crypto/rand" "encoding/binary" "fmt" - "io/ioutil" mathRand "math/rand" - "os" "testing" - "time" "github.com/btcsuite/btcwallet/walletdb" - _ "github.com/btcsuite/btcwallet/walletdb/bdb" + _ "github.com/linden/tempdb" ) -func createTestIndex(t testing.TB) (func(), *headerIndex, error) { - tempDir, err := ioutil.TempDir("", "neutrino") +func createTestIndex(t testing.TB) (*headerIndex, error) { + db, err := walletdb.Create("tempdb", "test.db") if err != nil { - return nil, nil, err - } - - db, err := walletdb.Create( - "bdb", tempDir+"/test.db", true, time.Second*10, - ) - if err != nil { - return nil, nil, err - } - - cleanUp := func() { - _ = db.Close() - fi, _ := os.Stat(tempDir + "/test.db") - t.Logf("DB file size at cleanup: %d bytes\n", fi.Size()) - _ = os.RemoveAll(tempDir) + return nil, err } filterDB, err := newHeaderIndex(db, Block) if err != nil { - return nil, nil, err + return nil, err } - return cleanUp, filterDB, nil + return filterDB, nil } func TestAddHeadersIndexRetrieve(t *testing.T) { - cleanUp, hIndex, err := createTestIndex(t) - defer cleanUp() + hIndex, err := createTestIndex(t) if err != nil { t.Fatalf("unable to create test db: %v", err) } @@ -115,11 +97,10 @@ func TestAddHeadersIndexRetrieve(t *testing.T) { // location in the bbolt database for reduced memory consumption don't impact // existing users that already have entries in their database. func TestHeaderStorageFallback(t *testing.T) { - cleanUp, hIndex, err := createTestIndex(t) + hIndex, err := createTestIndex(t) if err != nil { t.Fatalf("unable to create test db: %v", err) } - defer cleanUp() // First, write some headers directly to the root index bucket manually // to simulate users with the old database format. @@ -238,7 +219,7 @@ func BenchmarkWriteHeadersSmallBatch(b *testing.B) { numBatches = 5000 ) for n := 0; n < b.N; n++ { - cleanUp, hIndex, err := createTestIndex(b) + hIndex, err := createTestIndex(b) if err != nil { b.Fatalf("unable to create test db: %v", err) } @@ -249,8 +230,6 @@ func BenchmarkWriteHeadersSmallBatch(b *testing.B) { b.Fatalf("error writing random batch: %v", err) } } - - cleanUp() } } @@ -262,7 +241,7 @@ func BenchmarkWriteHeadersMediumBatch(b *testing.B) { numBatches = 250 ) for n := 0; n < b.N; n++ { - cleanUp, hIndex, err := createTestIndex(b) + hIndex, err := createTestIndex(b) if err != nil { b.Fatalf("unable to create test db: %v", err) } @@ -273,8 +252,6 @@ func BenchmarkWriteHeadersMediumBatch(b *testing.B) { b.Fatalf("error writing random batch: %v", err) } } - - cleanUp() } } @@ -286,7 +263,7 @@ func BenchmarkWriteHeadersLargeBatch(b *testing.B) { numBatches = 50 ) for n := 0; n < b.N; n++ { - cleanUp, hIndex, err := createTestIndex(b) + hIndex, err := createTestIndex(b) if err != nil { b.Fatalf("unable to create test db: %v", err) } @@ -297,8 +274,6 @@ func BenchmarkWriteHeadersLargeBatch(b *testing.B) { b.Fatalf("error writing random batch: %v", err) } } - - cleanUp() } } @@ -306,7 +281,7 @@ func BenchmarkWriteHeadersLargeBatch(b *testing.B) { // index with a hash. func BenchmarkHeightLookupLatency(b *testing.B) { // Start by creating an index with 10k headers. - cleanUp, hIndex, err := createTestIndex(b) + hIndex, err := createTestIndex(b) if err != nil { b.Fatalf("unable to create test db: %v", err) } @@ -328,8 +303,6 @@ func BenchmarkHeightLookupLatency(b *testing.B) { b.Fatalf("error fetching height: %v", err) } } - - cleanUp() } // writeRandomBatch creates a random batch with numHeaders headers and writes it diff --git a/headerfs/store_test.go b/headerfs/store_test.go index 3c00736a5..c10032f34 100644 --- a/headerfs/store_test.go +++ b/headerfs/store_test.go @@ -6,7 +6,6 @@ import ( "io/ioutil" "math/rand" "os" - "path/filepath" "reflect" "testing" "time" @@ -16,6 +15,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcwallet/walletdb" "github.com/davecgh/go-spew/spew" + _ "github.com/linden/tempdb" ) func createTestBlockHeaderStore() (func(), walletdb.DB, string, @@ -26,10 +26,7 @@ func createTestBlockHeaderStore() (func(), walletdb.DB, string, return nil, nil, "", nil, err } - dbPath := filepath.Join(tempDir, "test.db") - db, err := walletdb.Create( - "bdb", dbPath, true, time.Second*10, - ) + db, err := walletdb.Create("tempdb", "test.db") if err != nil { return nil, nil, "", nil, err } @@ -41,7 +38,6 @@ func createTestBlockHeaderStore() (func(), walletdb.DB, string, cleanUp := func() { os.RemoveAll(tempDir) - db.Close() } return cleanUp, db, tempDir, hStore.(*blockHeaderStore), nil @@ -231,8 +227,7 @@ func createTestFilterHeaderStore() (func(), walletdb.DB, string, *FilterHeaderSt return nil, nil, "", nil, err } - dbPath := filepath.Join(tempDir, "test.db") - db, err := walletdb.Create("bdb", dbPath, true, time.Second*10) + db, err := walletdb.Create("tempdb", "test.db") if err != nil { return nil, nil, "", nil, err } diff --git a/sync_test.go b/sync_test.go index 20e77c347..83f9a1f28 100644 --- a/sync_test.go +++ b/sync_test.go @@ -27,10 +27,10 @@ import ( "github.com/btcsuite/btclog" "github.com/btcsuite/btcwallet/wallet/txauthor" "github.com/btcsuite/btcwallet/walletdb" - _ "github.com/btcsuite/btcwallet/walletdb/bdb" "github.com/lightninglabs/neutrino" "github.com/lightninglabs/neutrino/banman" "github.com/lightninglabs/neutrino/headerfs" + _ "github.com/linden/tempdb" ) var ( @@ -1140,9 +1140,7 @@ func TestNeutrinoSync(t *testing.T) { t.Fatalf("Failed to create temporary directory: %s", err) } defer os.RemoveAll(tempDir) - db, err := walletdb.Create( - "bdb", tempDir+"/weks.db", true, dbOpenTimeout, - ) + db, err := walletdb.Create("tempdb", "weks.db") if err != nil { t.Fatalf("Error opening DB: %s\n", err) } From 9b9791bb9a5637db1c4df8b3a740b407657a1ea7 Mon Sep 17 00:00:00 2001 From: Linden <70739041+linden@users.noreply.github.com> Date: Sun, 18 Feb 2024 14:43:42 -0800 Subject: [PATCH 2/5] fix(`headerfs`): use iteration-specific pointer when truncating --- headerfs/index_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/headerfs/index_test.go b/headerfs/index_test.go index edb5192aa..4737a0ac4 100644 --- a/headerfs/index_test.go +++ b/headerfs/index_test.go @@ -173,12 +173,18 @@ func TestHeaderStorageFallback(t *testing.T) { t.Fatalf("error setting new tip: %v", err) } for _, header := range newHeaderEntries { - if err := hIndex.truncateIndex(&header.hash, true); err != nil { + // Copy the header hash so we can create a iteration-specific pointer. + // https://github.com/golang/go/discussions/56010#discussion-4441437. + hash := header.hash + + if err := hIndex.truncateIndex(&hash, true); err != nil { t.Fatalf("error truncating tip: %v", err) } } for _, header := range oldHeaderEntries { - if err := hIndex.truncateIndex(&header.hash, true); err != nil { + hash := header.hash + + if err := hIndex.truncateIndex(&hash, true); err != nil { t.Fatalf("error truncating tip: %v", err) } } From bb4bacf4028242af3edeaf49531193950033e598 Mon Sep 17 00:00:00 2001 From: Linden <70739041+linden@users.noreply.github.com> Date: Sun, 18 Feb 2024 14:48:56 -0800 Subject: [PATCH 3/5] feat(`headerfs`): separate file-system dependencies in `headerStore` --- headerfs/file.go | 201 +++++++++++++- headerfs/store.go | 627 ++++++++++++++++--------------------------- headerfs/truncate.go | 4 +- mock_store.go | 4 + 4 files changed, 439 insertions(+), 397 deletions(-) diff --git a/headerfs/file.go b/headerfs/file.go index 61258b22b..84bc30e08 100644 --- a/headerfs/file.go +++ b/headerfs/file.go @@ -1,18 +1,211 @@ +//go:build !windows && !js && !wasm + package headerfs import ( "bytes" "fmt" "os" + "path/filepath" + "sync" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcwallet/walletdb" ) -// ErrHeaderNotFound is returned when a target header on disk (flat file) can't -// be found. -type ErrHeaderNotFound struct { - error +// headerBufPool is a pool of bytes.Buffer that will be re-used by the various +// headerStore implementations to batch their header writes to disk. By +// utilizing this variable we can minimize the total number of allocations when +// writing headers to disk. +var headerBufPool = sync.Pool{ + New: func() interface{} { return new(bytes.Buffer) }, +} + +// headerStore combines a on-disk set of headers within a flat file in addition +// to a database which indexes that flat file. Together, these two abstractions +// can be used in order to build an indexed header store for any type of +// "header" as it deals only with raw bytes, and leaves it to a higher layer to +// interpret those raw bytes accordingly. +// +// TODO(roasbeef): quickcheck coverage. +type headerStore struct { + mtx sync.RWMutex // nolint:structcheck // false positive because used as embedded struct only + + fileName string + + file *os.File + + hType HeaderType + + *headerIndex +} + +// newHeaderStore creates a new headerStore given an already open database, a +// target file path for the flat-file and a particular header type. The target +// file will be created as necessary. +func newHeaderStore(db walletdb.DB, filePath string, + hType HeaderType) (*headerStore, error) { + + var flatFileName string + switch hType { + case Block: + flatFileName = "block_headers.bin" + case RegularFilter: + flatFileName = "reg_filter_headers.bin" + default: + return nil, fmt.Errorf("unrecognized filter type: %v", hType) + } + + flatFileName = filepath.Join(filePath, flatFileName) + + // We'll open the file, creating it if necessary and ensuring that all + // writes are actually appends to the end of the file. + fileFlags := os.O_RDWR | os.O_APPEND | os.O_CREATE + headerFile, err := os.OpenFile(flatFileName, fileFlags, 0644) + if err != nil { + return nil, err + } + + // With the file open, we'll then create the header index so we can + // have random access into the flat files. + index, err := newHeaderIndex(db, hType) + if err != nil { + return nil, err + } + + return &headerStore{ + fileName: flatFileName, + file: headerFile, + hType: hType, + headerIndex: index, + }, nil +} + +// WriteHeaders writes a set of headers to disk and updates the index in a +// single atomic transaction. +// +// NOTE: Part of the BlockHeaderStore interface. +func (h *blockHeaderStore) WriteHeaders(hdrs ...BlockHeader) error { + // Lock store for write. + h.mtx.Lock() + defer h.mtx.Unlock() + + // First, we'll grab a buffer from the write buffer pool so we an + // reduce our total number of allocations, and also write the headers + // in a single swoop. + headerBuf := headerBufPool.Get().(*bytes.Buffer) + headerBuf.Reset() + defer headerBufPool.Put(headerBuf) + + // Next, we'll write out all the passed headers in series into the + // buffer we just extracted from the pool. + for _, header := range hdrs { + if err := header.Serialize(headerBuf); err != nil { + return err + } + } + + // With all the headers written to the buffer, we'll now write out the + // entire batch in a single write call. + if err := h.appendRaw(headerBuf.Bytes()); err != nil { + return err + } + + // Once those are written, we'll then collate all the headers into + // headerEntry instances so we can write them all into the index in a + // single atomic batch. + headerLocs := make([]headerEntry, len(hdrs)) + for i, header := range hdrs { + headerLocs[i] = header.toIndexEntry() + } + + return h.addHeaders(headerLocs) +} + +// WriteHeaders writes a batch of filter headers to persistent storage. The +// headers themselves are appended to the flat file, and then the index updated +// to reflect the new entires. +func (f *FilterHeaderStore) WriteHeaders(hdrs ...FilterHeader) error { + // Lock store for write. + f.mtx.Lock() + defer f.mtx.Unlock() + + // If there are 0 headers to be written, return immediately. This + // prevents the newTip assignment from panicking because of an index + // of -1. + if len(hdrs) == 0 { + return nil + } + + // First, we'll grab a buffer from the write buffer pool so we an + // reduce our total number of allocations, and also write the headers + // in a single swoop. + headerBuf := headerBufPool.Get().(*bytes.Buffer) + headerBuf.Reset() + defer headerBufPool.Put(headerBuf) + + // Next, we'll write out all the passed headers in series into the + // buffer we just extracted from the pool. + for _, header := range hdrs { + if _, err := headerBuf.Write(header.FilterHash[:]); err != nil { + return err + } + } + + // With all the headers written to the buffer, we'll now write out the + // entire batch in a single write call. + if err := f.appendRaw(headerBuf.Bytes()); err != nil { + return err + } + + // As the block headers should already be written, we only need to + // update the tip pointer for this particular header type. + newTip := hdrs[len(hdrs)-1].toIndexEntry().hash + return f.truncateIndex(&newTip, false) +} + +// Remove the file. +func (h *headerStore) Remove() error { + // Close the file before removing it. This is required by some + // OS, e.g., Windows. + if err := h.file.Close(); err != nil { + return err + } + if err := os.Remove(h.fileName); err != nil { + return err + } + + return nil +} + +// Calculate the current height. +func (h *headerStore) height() (uint32, bool, error) { + fileInfo, err := h.file.Stat() + if err != nil { + return 0, false, err + } + + size := fileInfo.Size() + + // Check if the file is empty. Fallback to a height of zero. + if size == 0 { + return 0, true, nil + } + + var fileHeight uint32 + + // Compute the size of the current file so we can + // calculate the latest header written to disk. + switch h.hType { + case Block: + fileHeight = uint32(size/80) - 1 + + case RegularFilter: + fileHeight = uint32(size/32) - 1 + } + + return fileHeight, false, nil } // appendRaw appends a new raw header to the end of the flat file. diff --git a/headerfs/store.go b/headerfs/store.go index b43320bda..7002d55a1 100644 --- a/headerfs/store.go +++ b/headerfs/store.go @@ -1,11 +1,7 @@ package headerfs import ( - "bytes" "fmt" - "os" - "path/filepath" - "sync" "time" "github.com/btcsuite/btcd/blockchain" @@ -16,6 +12,30 @@ import ( "github.com/btcsuite/btcwallet/walletdb" ) +// FilterHeader represents a filter header (basic or extended). The filter +// header itself is coupled with the block height and hash of the filter's +// block. +type FilterHeader struct { + // HeaderHash is the hash of the block header that this filter header + // corresponds to. + HeaderHash chainhash.Hash + + // FilterHash is the filter header itself. + FilterHash chainhash.Hash + + // Height is the block height of the filter header in the main chain. + Height uint32 +} + +// toIndexEntry converts the filter header into a index entry to be stored +// within the database. +func (f *FilterHeader) toIndexEntry() headerEntry { + return headerEntry{ + hash: f.HeaderHash, + height: f.Height, + } +} + // BlockStamp represents a block, identified by its height and time stamp in // the chain. We also lift the timestamp from the block header itself into this // struct as well. @@ -74,71 +94,33 @@ type BlockHeaderStore interface { // The information about the new header tip after truncation is // returned. RollbackLastBlock() (*BlockStamp, error) -} -// headerBufPool is a pool of bytes.Buffer that will be re-used by the various -// headerStore implementations to batch their header writes to disk. By -// utilizing this variable we can minimize the total number of allocations when -// writing headers to disk. -var headerBufPool = sync.Pool{ - New: func() interface{} { return new(bytes.Buffer) }, + // Close and delete the BlockHeaderStore. + Remove() error } -// headerStore combines a on-disk set of headers within a flat file in addition -// to a database which indexes that flat file. Together, these two abstractions -// can be used in order to build an indexed header store for any type of -// "header" as it deals only with raw bytes, and leaves it to a higher layer to -// interpret those raw bytes accordingly. -// -// TODO(roasbeef): quickcheck coverage. -type headerStore struct { - mtx sync.RWMutex // nolint:structcheck // false positive because used as embedded struct only - - fileName string - - file *os.File +// BlockHeader is a Bitcoin block header that also has its height included. +type BlockHeader struct { + *wire.BlockHeader - *headerIndex + // Height is the height of this block header within the current main + // chain. + Height uint32 } -// newHeaderStore creates a new headerStore given an already open database, a -// target file path for the flat-file and a particular header type. The target -// file will be created as necessary. -func newHeaderStore(db walletdb.DB, filePath string, - hType HeaderType) (*headerStore, error) { - - var flatFileName string - switch hType { - case Block: - flatFileName = "block_headers.bin" - case RegularFilter: - flatFileName = "reg_filter_headers.bin" - default: - return nil, fmt.Errorf("unrecognized filter type: %v", hType) - } - - flatFileName = filepath.Join(filePath, flatFileName) - - // We'll open the file, creating it if necessary and ensuring that all - // writes are actually appends to the end of the file. - fileFlags := os.O_RDWR | os.O_APPEND | os.O_CREATE - headerFile, err := os.OpenFile(flatFileName, fileFlags, 0644) - if err != nil { - return nil, err - } - - // With the file open, we'll then create the header index so we can - // have random access into the flat files. - index, err := newHeaderIndex(db, hType) - if err != nil { - return nil, err +// toIndexEntry converts the BlockHeader into a matching headerEntry. This +// method is used when a header is to be written to persistent storage. +func (b *BlockHeader) toIndexEntry() headerEntry { + return headerEntry{ + hash: b.BlockHash(), + height: b.Height, } +} - return &headerStore{ - fileName: flatFileName, - file: headerFile, - headerIndex: index, - }, nil +// ErrHeaderNotFound is returned when a target header on persistent storage (flat file) can't +// be found. +type ErrHeaderNotFound struct { + error } // blockHeaderStore is an implementation of the BlockHeaderStore interface, a @@ -153,86 +135,6 @@ type blockHeaderStore struct { // BlockHeaderStore interface. var _ BlockHeaderStore = (*blockHeaderStore)(nil) -// NewBlockHeaderStore creates a new instance of the blockHeaderStore based on -// a target file path, an open database instance, and finally a set of -// parameters for the target chain. These parameters are required as if this is -// the initial start up of the blockHeaderStore, then the initial genesis -// header will need to be inserted. -func NewBlockHeaderStore(filePath string, db walletdb.DB, - netParams *chaincfg.Params) (BlockHeaderStore, error) { - - hStore, err := newHeaderStore(db, filePath, Block) - if err != nil { - return nil, err - } - - // With the header store created, we'll fetch the file size to see if - // we need to initialize it with the first header or not. - fileInfo, err := hStore.file.Stat() - if err != nil { - return nil, err - } - - bhs := &blockHeaderStore{ - headerStore: hStore, - } - - // If the size of the file is zero, then this means that we haven't yet - // written the initial genesis header to disk, so we'll do so now. - if fileInfo.Size() == 0 { - genesisHeader := BlockHeader{ - BlockHeader: &netParams.GenesisBlock.Header, - Height: 0, - } - if err := bhs.WriteHeaders(genesisHeader); err != nil { - return nil, err - } - - return bhs, nil - } - - // As a final initialization step (if this isn't the first time), we'll - // ensure that the header tip within the flat files, is in sync with - // out database index. - tipHash, tipHeight, err := bhs.chainTip() - if err != nil { - return nil, err - } - - // First, we'll compute the size of the current file so we can - // calculate the latest header written to disk. - fileHeight := uint32(fileInfo.Size()/80) - 1 - - // Using the file's current height, fetch the latest on-disk header. - latestFileHeader, err := bhs.readHeader(fileHeight) - if err != nil { - return nil, err - } - - // If the index's tip hash, and the file on-disk match, then we're - // done here. - latestBlockHash := latestFileHeader.BlockHash() - if tipHash.IsEqual(&latestBlockHash) { - return bhs, nil - } - - // TODO(roasbeef): below assumes index can never get ahead? - // * we always update files _then_ indexes - // * need to dual pointer walk back for max safety - - // Otherwise, we'll need to truncate the file until it matches the - // current index tip. - for fileHeight > tipHeight { - if err := bhs.singleTruncate(); err != nil { - return nil, err - } - - fileHeight-- - } - - return bhs, nil -} - // FetchHeader attempts to retrieve a block header determined by the passed // block height. // @@ -249,7 +151,7 @@ func (h *blockHeaderStore) FetchHeader(hash *chainhash.Hash) (*wire.BlockHeader, return nil, 0, err } - // With the height known, we can now read the header from disk. + // With the height known, we can now read the header from persistent storage. header, err := h.readHeader(height) if err != nil { return nil, 0, err @@ -313,7 +215,7 @@ func (h *blockHeaderStore) HeightFromHash(hash *chainhash.Hash) (uint32, error) return h.heightFromHash(hash) } -// RollbackLastBlock rollsback both the index, and on-disk header file by a +// RollbackLastBlock rollsback both the index, and on-persistent storage header file by a // _single_ header. This method is meant to be used in the case of re-org which // disconnects the latest block header from the end of the main chain. The // information about the new header tip after truncation is returned. @@ -331,7 +233,7 @@ func (h *blockHeaderStore) RollbackLastBlock() (*BlockStamp, error) { } // With this height obtained, we'll use it to read the previous header - // from disk, so we can populate our return value which requires the + // from persistent storage, so we can populate our return value which requires the // prev header hash. prevHeader, err := h.readHeader(chainTipHeight - 1) if err != nil { @@ -357,65 +259,6 @@ func (h *blockHeaderStore) RollbackLastBlock() (*BlockStamp, error) { }, nil } -// BlockHeader is a Bitcoin block header that also has its height included. -type BlockHeader struct { - *wire.BlockHeader - - // Height is the height of this block header within the current main - // chain. - Height uint32 -} - -// toIndexEntry converts the BlockHeader into a matching headerEntry. This -// method is used when a header is to be written to disk. -func (b *BlockHeader) toIndexEntry() headerEntry { - return headerEntry{ - hash: b.BlockHash(), - height: b.Height, - } -} - -// WriteHeaders writes a set of headers to disk and updates the index in a -// single atomic transaction. -// -// NOTE: Part of the BlockHeaderStore interface. -func (h *blockHeaderStore) WriteHeaders(hdrs ...BlockHeader) error { - // Lock store for write. - h.mtx.Lock() - defer h.mtx.Unlock() - - // First, we'll grab a buffer from the write buffer pool so we an - // reduce our total number of allocations, and also write the headers - // in a single swoop. - headerBuf := headerBufPool.Get().(*bytes.Buffer) - headerBuf.Reset() - defer headerBufPool.Put(headerBuf) - - // Next, we'll write out all the passed headers in series into the - // buffer we just extracted from the pool. - for _, header := range hdrs { - if err := header.Serialize(headerBuf); err != nil { - return err - } - } - - // With all the headers written to the buffer, we'll now write out the - // entire batch in a single write call. - if err := h.appendRaw(headerBuf.Bytes()); err != nil { - return err - } - - // Once those are written, we'll then collate all the headers into - // headerEntry instances so we can write them all into the index in a - // single atomic batch. - headerLocs := make([]headerEntry, len(hdrs)) - for i, header := range hdrs { - headerLocs[i] = header.toIndexEntry() - } - - return h.addHeaders(headerLocs) -} - // blockLocatorFromHash takes a given block hash and then creates a block // locator using it as the root of the locator. We'll start by taking a single // step backwards, then keep doubling the distance until genesis after we get @@ -494,7 +337,7 @@ func (h *blockHeaderStore) BlockLocatorFromHash(hash *chainhash.Hash) ( return h.blockLocatorFromHash(hash) } -// CheckConnectivity cycles through all of the block headers on disk, from last +// CheckConnectivity cycles through all of the block headers on persistent storage, from last // to first, and makes sure they all connect to each other. Additionally, at // each block header, we also ensure that the index entry for that height and // hash also match up properly. @@ -536,12 +379,12 @@ func (h *blockHeaderStore) CheckConnectivity() error { // With the header retrieved, we'll now fetch the // height for this current header hash to ensure the - // on-disk state and the index matches up properly. + // on-persistent storage state and the index matches up properly. indexHeight, err := h.heightFromHashWithTx( tx, &newHeaderHash, ) if err != nil { - return fmt.Errorf("index and on-disk file "+ + return fmt.Errorf("index and on-persistent storage file "+ "out of sync at height: %v", height) } @@ -595,170 +438,91 @@ func (h *blockHeaderStore) ChainTip() (*wire.BlockHeader, uint32, error) { return &latestHeader, tipHeight, nil } -// FilterHeaderStore is an implementation of a fully fledged database for any -// variant of filter headers. The FilterHeaderStore combines a flat file to -// store the block headers with a database instance for managing the index into -// the set of flat files. -type FilterHeaderStore struct { - *headerStore -} - -// NewFilterHeaderStore returns a new instance of the FilterHeaderStore based -// on a target file path, filter type, and target net parameters. These -// parameters are required as if this is the initial start up of the -// FilterHeaderStore, then the initial genesis filter header will need to be -// inserted. -func NewFilterHeaderStore(filePath string, db walletdb.DB, - filterType HeaderType, netParams *chaincfg.Params, - headerStateAssertion *FilterHeader) (*FilterHeaderStore, error) { +// NewBlockHeaderStore creates a new instance of the blockHeaderStore based on +// a target file path, an open database instance, and finally a set of +// parameters for the target chain. These parameters are required as if this is +// the initial start up of the blockHeaderStore, then the initial genesis +// header will need to be inserted. +func NewBlockHeaderStore(filePath string, db walletdb.DB, + netParams *chaincfg.Params) (BlockHeaderStore, error) { - fStore, err := newHeaderStore(db, filePath, filterType) + hStore, err := newHeaderStore(db, filePath, Block) if err != nil { return nil, err } - // With the header store created, we'll fetch the fiie size to see if + // With the header store created, we'll fetch the file size to see if // we need to initialize it with the first header or not. - fileInfo, err := fStore.file.Stat() + height, genesis, err := hStore.height() if err != nil { return nil, err } - fhs := &FilterHeaderStore{ - fStore, + bhs := &blockHeaderStore{ + headerStore: hStore, } - // TODO(roasbeef): also reconsile with block header state due to way - // roll back works atm - // If the size of the file is zero, then this means that we haven't yet - // written the initial genesis header to disk, so we'll do so now. - if fileInfo.Size() == 0 { - var genesisFilterHash chainhash.Hash - switch filterType { - case RegularFilter: - basicFilter, err := builder.BuildBasicFilter( - netParams.GenesisBlock, nil, - ) - if err != nil { - return nil, err - } - - genesisFilterHash, err = builder.MakeHeaderForFilter( - basicFilter, - netParams.GenesisBlock.Header.PrevBlock, - ) - if err != nil { - return nil, err - } - - default: - return nil, fmt.Errorf("unknown filter type: %v", filterType) - } - - genesisHeader := FilterHeader{ - HeaderHash: *netParams.GenesisHash, - FilterHash: genesisFilterHash, - Height: 0, - } - if err := fhs.WriteHeaders(genesisHeader); err != nil { - return nil, err + // written the initial genesis header to persistent storage, so we'll do so now. + if genesis { + genesisHeader := BlockHeader{ + BlockHeader: &netParams.GenesisBlock.Header, + Height: 0, } - - return fhs, nil - } - - // If we have a state assertion then we'll check it now to see if we - // need to modify our filter header files before we proceed. - if headerStateAssertion != nil { - reset, err := fhs.maybeResetHeaderState( - headerStateAssertion, - ) - if err != nil { + if err := bhs.WriteHeaders(genesisHeader); err != nil { return nil, err } - // If the filter header store was reset, we'll re-initialize it - // to recreate our on-disk state. - if reset { - return NewFilterHeaderStore( - filePath, db, filterType, netParams, nil, - ) - } + return bhs, nil } - // As a final initialization step, we'll ensure that the header tip - // within the flat files, is in sync with out database index. - tipHash, tipHeight, err := fhs.chainTip() + // As a final initialization step (if this isn't the first time), we'll + // ensure that the header tip within the flat files, is in sync with + // out database index. + tipHash, tipHeight, err := bhs.chainTip() if err != nil { return nil, err } - // First, we'll compute the size of the current file so we can - // calculate the latest header written to disk. - fileHeight := uint32(fileInfo.Size()/32) - 1 + // Move back to the last header written. + height-- - // Using the file's current height, fetch the latest on-disk header. - latestFileHeader, err := fhs.readHeader(fileHeight) + // Using the current height, fetch the latest on-persistent storage header. + latestFileHeader, err := bhs.readHeader(height) if err != nil { return nil, err } - // If the index's tip hash, and the file on-disk match, then we're - // doing here. - if tipHash.IsEqual(latestFileHeader) { - return fhs, nil + // If the index's tip hash, and the file on-persistent storage match, then we're + // done here. + latestBlockHash := latestFileHeader.BlockHash() + if tipHash.IsEqual(&latestBlockHash) { + return bhs, nil } + // TODO(roasbeef): below assumes index can never get ahead? + // * we always update files _then_ indexes + // * need to dual pointer walk back for max safety + // Otherwise, we'll need to truncate the file until it matches the // current index tip. - for fileHeight > tipHeight { - if err := fhs.singleTruncate(); err != nil { + for height > tipHeight { + if err := bhs.singleTruncate(); err != nil { return nil, err } - fileHeight-- + height-- } - // TODO(roasbeef): make above into func - - return fhs, nil + return bhs, nil } -// maybeResetHeaderState will reset the header state if the header assertion -// fails, but only if the target height is found. The boolean returned indicates -// that header state was reset. -func (f *FilterHeaderStore) maybeResetHeaderState( - headerStateAssertion *FilterHeader) (bool, error) { - - // First, we'll attempt to locate the header at this height. If no such - // header is found, then we'll exit early. - assertedHeader, err := f.FetchHeaderByHeight( - headerStateAssertion.Height, - ) - if _, ok := err.(*ErrHeaderNotFound); ok { - return false, nil - } - if err != nil { - return false, err - } - - // If our on disk state and the provided header assertion don't match, - // then we'll purge this state so we can sync it anew once we fully - // start up. - if *assertedHeader != headerStateAssertion.FilterHash { - // Close the file before removing it. This is required by some - // OS, e.g., Windows. - if err := f.file.Close(); err != nil { - return true, err - } - if err := os.Remove(f.fileName); err != nil { - return true, err - } - return true, nil - } - - return false, nil +// FilterHeaderStore is an implementation of a fully fledged database for any +// variant of filter headers. The FilterHeaderStore combines a flat file to +// store the block headers with a database instance for managing the index into +// the set of flat files. +type FilterHeaderStore struct { + *headerStore } // FetchHeader returns the filter header that corresponds to the passed block @@ -810,70 +574,33 @@ func (f *FilterHeaderStore) FetchHeaderAncestors(numHeaders uint32, return headers, startHeight, nil } -// FilterHeader represents a filter header (basic or extended). The filter -// header itself is coupled with the block height and hash of the filter's -// block. -type FilterHeader struct { - // HeaderHash is the hash of the block header that this filter header - // corresponds to. - HeaderHash chainhash.Hash - - // FilterHash is the filter header itself. - FilterHash chainhash.Hash - - // Height is the block height of the filter header in the main chain. - Height uint32 -} - -// toIndexEntry converts the filter header into a index entry to be stored -// within the database. -func (f *FilterHeader) toIndexEntry() headerEntry { - return headerEntry{ - hash: f.HeaderHash, - height: f.Height, - } -} - -// WriteHeaders writes a batch of filter headers to persistent storage. The -// headers themselves are appended to the flat file, and then the index updated -// to reflect the new entries. -func (f *FilterHeaderStore) WriteHeaders(hdrs ...FilterHeader) error { - // Lock store for write. - f.mtx.Lock() - defer f.mtx.Unlock() +// maybeResetHeaderState will reset the header state if the header assertion +// fails, but only if the target height is found. The boolean returned indicates +// that header state was reset. +func (f *FilterHeaderStore) maybeResetHeaderState( + headerStateAssertion *FilterHeader) (bool, error) { - // If there are 0 headers to be written, return immediately. This - // prevents the newTip assignment from panicking because of an index - // of -1. - if len(hdrs) == 0 { - return nil + // First, we'll attempt to locate the header at this height. If no such + // header is found, then we'll exit early. + assertedHeader, err := f.FetchHeaderByHeight( + headerStateAssertion.Height, + ) + if _, ok := err.(*ErrHeaderNotFound); ok { + return false, nil } - - // First, we'll grab a buffer from the write buffer pool so we an - // reduce our total number of allocations, and also write the headers - // in a single swoop. - headerBuf := headerBufPool.Get().(*bytes.Buffer) - headerBuf.Reset() - defer headerBufPool.Put(headerBuf) - - // Next, we'll write out all the passed headers in series into the - // buffer we just extracted from the pool. - for _, header := range hdrs { - if _, err := headerBuf.Write(header.FilterHash[:]); err != nil { - return err - } + if err != nil { + return false, err } - // With all the headers written to the buffer, we'll now write out the - // entire batch in a single write call. - if err := f.appendRaw(headerBuf.Bytes()); err != nil { - return err + // If our persistent state and the provided header assertion don't match, + // then we'll purge this state so we can sync it anew once we fully + // start up. + if *assertedHeader != headerStateAssertion.FilterHash { + err = f.Remove() + return true, err } - // As the block headers should already be written, we only need to - // update the tip pointer for this particular header type. - newTip := hdrs[len(hdrs)-1].toIndexEntry().hash - return f.truncateIndex(&newTip, false) + return false, nil } // ChainTip returns the latest filter header and height known to the @@ -896,7 +623,7 @@ func (f *FilterHeaderStore) ChainTip() (*chainhash.Hash, uint32, error) { return latestHeader, tipHeight, nil } -// RollbackLastBlock rollsback both the index, and on-disk header file by a +// RollbackLastBlock rollsback both the index, and on-persistent storage header file by a // _single_ filter header. This method is meant to be used in the case of // re-org which disconnects the latest filter header from the end of the main // chain. The information about the latest header tip after truncation is @@ -913,7 +640,7 @@ func (f *FilterHeaderStore) RollbackLastBlock(newTip *chainhash.Hash) (*BlockSta } // With this height obtained, we'll use it to read what will be the new - // chain tip from disk. + // chain tip from persistent storage. newHeightTip := chainTipHeight - 1 newHeaderTip, err := f.readHeader(newHeightTip) if err != nil { @@ -935,3 +662,121 @@ func (f *FilterHeaderStore) RollbackLastBlock(newTip *chainhash.Hash) (*BlockSta Hash: *newHeaderTip, }, nil } + +// NewFilterHeaderStore returns a new instance of the FilterHeaderStore based +// on a target file path, filter type, and target net parameters. These +// parameters are required as if this is the initial start up of the +// FilterHeaderStore, then the initial genesis filter header will need to be +// inserted. +func NewFilterHeaderStore(filePath string, db walletdb.DB, + filterType HeaderType, netParams *chaincfg.Params, + headerStateAssertion *FilterHeader) (*FilterHeaderStore, error) { + + fStore, err := newHeaderStore(db, filePath, filterType) + if err != nil { + return nil, err + } + + // With the header store created, we'll fetch the fiie size to see if + // we need to initialize it with the first header or not. + height, genesis, err := fStore.height() + if err != nil { + return nil, err + } + + fhs := &FilterHeaderStore{ + fStore, + } + + // TODO(roasbeef): also reconsile with block header state due to way + // roll back works atm + + // If the size of the file is zero, then this means that we haven't yet + // written the initial genesis header to disk, so we'll do so now. + if genesis { + var genesisFilterHash chainhash.Hash + switch filterType { + case RegularFilter: + basicFilter, err := builder.BuildBasicFilter( + netParams.GenesisBlock, nil, + ) + if err != nil { + return nil, err + } + + genesisFilterHash, err = builder.MakeHeaderForFilter( + basicFilter, + netParams.GenesisBlock.Header.PrevBlock, + ) + if err != nil { + return nil, err + } + + default: + return nil, fmt.Errorf("unknown filter type: %v", filterType) + } + + genesisHeader := FilterHeader{ + HeaderHash: *netParams.GenesisHash, + FilterHash: genesisFilterHash, + Height: 0, + } + if err := fhs.WriteHeaders(genesisHeader); err != nil { + return nil, err + } + + return fhs, nil + } + + // If we have a state assertion then we'll check it now to see if we + // need to modify our filter header files before we proceed. + if headerStateAssertion != nil { + reset, err := fhs.maybeResetHeaderState( + headerStateAssertion, + ) + if err != nil { + return nil, err + } + + // If the filter header store was reset, we'll re-initialize it + // to recreate our on-disk state. + if reset { + return NewFilterHeaderStore( + filePath, db, filterType, netParams, nil, + ) + } + } + + // As a final initialization step, we'll ensure that the header tip + // within the flat files, is in sync with out database index. + tipHash, tipHeight, err := fhs.chainTip() + if err != nil { + return nil, err + } + + // Using the file's current height, fetch the latest on-disk header. + latestFileHeader, err := fhs.readHeader(height) + if err != nil { + return nil, err + } + + // If the index's tip hash, and the file on-disk match, then we're + // doing here. + if tipHash.IsEqual(latestFileHeader) { + return fhs, nil + } + + // Otherwise, we'll need to truncate the file until it matches the + // current index tip. + for height > tipHeight { + if err := fhs.singleTruncate(); err != nil { + return nil, err + } + + height-- + } + + // TODO(roasbeef): make above into func + + return fhs, nil +} diff --git a/headerfs/truncate.go b/headerfs/truncate.go index 4b1999cd8..8564286b4 100644 --- a/headerfs/truncate.go +++ b/headerfs/truncate.go @@ -1,5 +1,5 @@ -//go:build !windows -// +build !windows +//go:build !windows && !js && !wasm +// +build !windows,!js,!wasm package headerfs diff --git a/mock_store.go b/mock_store.go index a94a391c6..966ea512b 100644 --- a/mock_store.go +++ b/mock_store.go @@ -82,3 +82,7 @@ func (m *mockBlockHeaderStore) WriteHeaders(headers ...headerfs.BlockHeader) err return nil } + +func (m *mockBlockHeaderStore) Remove() error { + return nil +} From e6f5a70214deea3384d55f999ce85c4f774c76ac Mon Sep 17 00:00:00 2001 From: Linden <70739041+linden@users.noreply.github.com> Date: Sun, 18 Feb 2024 14:52:07 -0800 Subject: [PATCH 4/5] feat(`headerfs`): add indexeddb storage backend --- go.mod | 1 + go.sum | 4 +- headerfs/js.go | 402 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 405 insertions(+), 2 deletions(-) create mode 100644 headerfs/js.go diff --git a/go.mod b/go.mod index 1aeffc555..3318d491c 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/lightninglabs/neutrino/cache v1.1.0 github.com/lightningnetwork/lnd/queue v1.0.1 + github.com/linden/indexeddb v0.0.0-20240218035359-81389d584a5e github.com/linden/tempdb v0.0.0-20240218031655-83bc03e79f51 github.com/stretchr/testify v1.8.1 ) diff --git a/go.sum b/go.sum index 7cbb6007a..a63e56d93 100644 --- a/go.sum +++ b/go.sum @@ -84,8 +84,8 @@ github.com/lightningnetwork/lnd/queue v1.0.1 h1:jzJKcTy3Nj5lQrooJ3aaw9Lau3I0IwvQ github.com/lightningnetwork/lnd/queue v1.0.1/go.mod h1:vaQwexir73flPW43Mrm7JOgJHmcEFBWWSl9HlyASoms= github.com/lightningnetwork/lnd/ticker v1.0.0 h1:S1b60TEGoTtCe2A0yeB+ecoj/kkS4qpwh6l+AkQEZwU= github.com/lightningnetwork/lnd/ticker v1.0.0/go.mod h1:iaLXJiVgI1sPANIF2qYYUJXjoksPNvGNYowB8aRbpX0= -github.com/linden/tempdb v0.0.0-20231124230014-42fe18a60308 h1:3J67IzgcvBcl1UyzMuExSPmq7hejA1Vr1E7ixKqAUds= -github.com/linden/tempdb v0.0.0-20231124230014-42fe18a60308/go.mod h1:xR9HUmc4girdp/lNzw1jOt53GaCSmctyB8t+Q6EkWp8= +github.com/linden/indexeddb v0.0.0-20240218035359-81389d584a5e h1:6FTMUiW0wI+my/+7w4CEEYt5zvGmbAjugt2q/fCUyBM= +github.com/linden/indexeddb v0.0.0-20240218035359-81389d584a5e/go.mod h1:2AP38q2ks+pRwMc2EhmCexlyri456KCewavN30MZQ8Y= github.com/linden/tempdb v0.0.0-20240218031655-83bc03e79f51 h1:RfZREkHD3XpItaIN2I1/tb2hCzE2TN5e14OkTH6Sv74= github.com/linden/tempdb v0.0.0-20240218031655-83bc03e79f51/go.mod h1:xR9HUmc4girdp/lNzw1jOt53GaCSmctyB8t+Q6EkWp8= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= diff --git a/headerfs/js.go b/headerfs/js.go new file mode 100644 index 000000000..87ca34f2c --- /dev/null +++ b/headerfs/js.go @@ -0,0 +1,402 @@ +//go:build js && wasm + +package headerfs + +import ( + "bytes" + "errors" + "fmt" + "strconv" + "strings" + "sync" + "syscall/js" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcwallet/walletdb" + "github.com/linden/indexeddb" +) + +const headersStore = "headers" + +// headerStore combines a IndexedDB store of headers within a flat file in addition +// to a database which indexes that flat file. Together, these two abstractions +// can be used in order to build an indexed header store for any type of +// "header" as it deals only with raw bytes, and leaves it to a higher layer to +// interpret those raw bytes accordingly. +// +// TODO(roasbeef): quickcheck coverage. +type headerStore struct { + mtx sync.RWMutex // nolint:structcheck // false positive because used as embedded struct only + + idb *indexeddb.DB + + *headerIndex +} + +// readHeader reads a full block header from the flat-file. The header read is +// determined by the hight value. +func (h *headerStore) readHeader(height uint32) (wire.BlockHeader, error) { + // Create a new read/write transaction, scoped to headers. + tx, err := h.idb.NewTransaction([]string{headersStore}, indexeddb.ReadWriteMode) + if err != nil { + return wire.BlockHeader{}, err + } + + // Get the headers store. + hdrs := tx.Store(headersStore) + + // Get the raw header. + val, err := hdrs.Get(height) + + // check if the value was not found as that case is handled differently. + if errors.Is(err, indexeddb.ErrValueNotFound) { + return wire.BlockHeader{}, &ErrHeaderNotFound{err} + } + + if err != nil { + return wire.BlockHeader{}, err + } + + // Ensure the value is a string. + if val.Type() != js.TypeString { + return wire.BlockHeader{}, fmt.Errorf("unexpected type: %s", val.Type()) + } + + var header wire.BlockHeader + + // Unquote the value. + unquote, err := strconv.Unquote(val.String()) + if err != nil { + return wire.BlockHeader{}, err + } + + // Create a new reader from the raw header. + reader := strings.NewReader(unquote) + + // Finally, decode the bytes into a proper bitcoin header. + err = header.Deserialize(reader) + + return header, err +} + +func (h *headerStore) height() (uint32, bool, error) { + // Create a new read-only transaction. Scoped to headers. + tx, err := h.idb.NewTransaction([]string{headersStore}, indexeddb.ReadMode) + if err != nil { + return 0, false, err + } + + // Get the headers store. + hdrs := tx.Store(headersStore) + + // Count the amount of headers. + count, err := hdrs.Count() + + // Fallback to the zero value if no value is found. + if errors.Is(err, indexeddb.ErrValueNotFound) || count == 0 { + return 0, true, nil + } + + if err != nil { + return 0, false, err + } + + // Subtract one as the block height does not include the genesis block. + return uint32(count - 1), false, nil +} + +func (h *headerStore) singleTruncate() error { + // Get the current height. + height, genesis, err := h.height() + if err != nil { + return err + } + + // Create a write transaction. Scoped to height. + tx, err := h.idb.NewTransaction([]string{headersStore}, indexeddb.ReadWriteMode) + if err != nil { + return err + } + + // Get the height store. + hdrs := tx.Store(headersStore) + + // Delete the genesis block. + if genesis { + return hdrs.Delete("genesis") + } + + // Delete the header. + return hdrs.Delete(height + 1) +} + +// Remove every key/value and set the height to 0. +func (h *headerStore) Remove() error { + // Create a write transaction. Scoped to headers. + tx, err := h.idb.NewTransaction([]string{headersStore}, indexeddb.ReadWriteMode) + if err != nil { + return err + } + + // Get the headers store. + hdrs := tx.Store(headersStore) + + // Clear all the headers. + return hdrs.Clear() +} + +// newHeaderStore creates a new headerStore given an already open database, a +// target file path for the flat-file and a particular header type. The target +// file will be created as necessary. +func newHeaderStore(db walletdb.DB, filePath string, hType HeaderType) (*headerStore, error) { + var prefix string + switch hType { + case Block: + prefix = "blocks" + case RegularFilter: + prefix = "filters" + default: + return nil, fmt.Errorf("unrecognized filter type: %v", hType) + } + + // Prefix with the file path to prevent collisions. + prefix = filePath + "-" + prefix + + // Create the database. + idb, err := indexeddb.New(prefix, 1, func(up *indexeddb.Upgrade) error { + // Create the headers store. + up.CreateStore(headersStore) + + return nil + }) + if err != nil { + return nil, err + } + + // With the file open, we'll then create the header index so we can + // have random access into the flat files. + index, err := newHeaderIndex(db, hType) + if err != nil { + return nil, err + } + + return &headerStore{ + idb: idb, + headerIndex: index, + }, nil +} + +// readHeaderRange will attempt to fetch a series of block headers within the +// target height range. +// +// NOTE: The end height is _inclusive_ so we'll fetch all headers from the +// startHeight up to the end height, including the final header. +func (h *blockHeaderStore) readHeaderRange(startHeight uint32, + endHeight uint32) ([]wire.BlockHeader, error) { + var headers []wire.BlockHeader + + // Add headers from start height to end height. + for height := startHeight; height <= endHeight; height++ { + // Read the header. + header, err := h.readHeader(height) + if err != nil { + return nil, err + } + + // Append the header to the slice. + headers = append(headers, header) + } + + return headers, nil +} + +// WriteHeaders writes a set of headers to disk and updates the index in a +// single atomic transaction. +// +// NOTE: Part of the BlockHeaderStore interface. +func (h *blockHeaderStore) WriteHeaders(hdrs ...BlockHeader) error { + // Lock store for write. + h.mtx.Lock() + defer h.mtx.Unlock() + + height, genesis, err := h.height() + if err != nil { + return err + } + + tx, err := h.idb.NewTransaction([]string{headersStore}, indexeddb.ReadWriteMode) + if err != nil { + return err + } + + str := tx.Store(headersStore) + + // Next, we'll write out all the passed headers in series. + for _, header := range hdrs { + buf := new(bytes.Buffer) + + // Serialize the header. + if err := header.Serialize(buf); err != nil { + return err + } + + var key any + + if genesis { + key = height + genesis = false + } else { + // Add space for the genesis block. + key = height + 1 + } + + // Put the block header. + err = str.Put( + key, + // Quote the string so it is UTF-8 safe. + strconv.Quote(buf.String()), + ) + if err != nil { + return err + } + + height++ + + } + + // Once those are written, we'll then collate all the headers into + // headerEntry instances so we can write them all into the index in a + // single atomic batch. + headerLocs := make([]headerEntry, len(hdrs)) + for i, header := range hdrs { + headerLocs[i] = header.toIndexEntry() + } + + return h.addHeaders(headerLocs) +} + +// readHeader reads a single filter header at the specified height from the +// flat files on disk. +func (f *FilterHeaderStore) readHeader(height uint32) (*chainhash.Hash, error) { + // Create a new read-only transaction. + tx, err := f.idb.NewTransaction([]string{headersStore}, indexeddb.ReadMode) + if err != nil { + return nil, err + } + + // Get the headers store. + hdrs := tx.Store(headersStore) + + // Get the header. + val, err := hdrs.Get(height) + + // check if the value was not found as that case is handled differently. + if errors.Is(err, indexeddb.ErrValueNotFound) { + return nil, &ErrHeaderNotFound{err} + } + + if err != nil { + return nil, err + } + + // Ensure the value is a string. + if val.Type() != js.TypeString { + return nil, fmt.Errorf("unexpected type: %s", val.Type()) + } + + // Unquote the value. + unquote, err := strconv.Unquote(val.String()) + if err != nil { + return nil, err + } + + // Cast the hash to a chainhash. + return (*chainhash.Hash)([]byte(unquote)), nil +} + +// readHeaderRange will attempt to fetch a series of filter headers within the +// target height range. This method batches a set of reads into a single system +// call thereby increasing performance when reading a set of contiguous +// headers. +// +// NOTE: The end height is _inclusive_ so we'll fetch all headers from the +// startHeight up to the end height, including the final header. +func (f *FilterHeaderStore) readHeaderRange(startHeight uint32, + endHeight uint32) ([]chainhash.Hash, error) { + var headers []chainhash.Hash + + // Add headers from start height to end height. + for height := startHeight; height <= endHeight; height++ { + // Read the header. + header, err := f.readHeader(height) + if err != nil { + return nil, err + } + + // Append the header to the slice. + headers = append(headers, *header) + } + + return headers, nil +} + +// WriteHeaders writes a batch of filter headers to persistent storage. The +// headers themselves are appended to the flat file, and then the index updated +// to reflect the new entires. +func (f *FilterHeaderStore) WriteHeaders(hdrs ...FilterHeader) error { + // Lock store for write. + f.mtx.Lock() + defer f.mtx.Unlock() + + // If there are 0 headers to be written, return immediately. This + // prevents the newTip assignment from panicking because of an index + // of -1. + if len(hdrs) == 0 { + return nil + } + + height, genesis, err := f.height() + if err != nil { + return err + } + + // Create a new transaction. + tx, err := f.idb.NewTransaction([]string{headersStore}, indexeddb.ReadWriteMode) + if err != nil { + return err + } + + str := tx.Store(headersStore) + + // Next, we'll write out all the passed headers in series into the + // buffer we just extracted from the pool. + for _, header := range hdrs { + var key any + + if genesis { + key = height + genesis = false + } else { + // Add space for the genesis block. + key = height + 1 + } + + // Put the filter header. + err = str.Put( + key, + // Encode the filter hash. + strconv.Quote(string(header.FilterHash[:])), + ) + if err != nil { + return err + } + + height++ + } + + // As the block headers should already be written, we only need to + // update the tip pointer for this particular header type. + newTip := hdrs[len(hdrs)-1].toIndexEntry().hash + return f.truncateIndex(&newTip, false) +} From 6fba2171e09397c79f9c6bab5e050534c117a9cc Mon Sep 17 00:00:00 2001 From: Linden <70739041+linden@users.noreply.github.com> Date: Sun, 18 Feb 2024 15:07:24 -0800 Subject: [PATCH 5/5] fix: skip tests that require starting a process on `js` https://github.com/linden/wasmexec/issues/2 --- blockmanager_test.go | 7 +++++++ sync_test.go | 6 ++++++ 2 files changed, 13 insertions(+) diff --git a/blockmanager_test.go b/blockmanager_test.go index ef69a5000..a3a1ac7c0 100644 --- a/blockmanager_test.go +++ b/blockmanager_test.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "reflect" + "runtime" "strings" "testing" "time" @@ -874,6 +875,12 @@ func TestBlockManagerDetectBadPeers(t *testing.T) { func TestHandleHeaders(t *testing.T) { t.Parallel() + // skip this test because we can't spawn a process in the browser. + // https://github.com/linden/wasmexec/issues/2. + if runtime.GOOS == "js" { + t.Skip("start process is unsupported in the browser, skipping test.") + } + // First, we set up a block manager and a fake peer that will act as the // test's remote peer. bm, _, _, err := setupBlockManager(t) diff --git a/sync_test.go b/sync_test.go index 83f9a1f28..9884a8cf5 100644 --- a/sync_test.go +++ b/sync_test.go @@ -1035,6 +1035,12 @@ func testRandomBlocks(harness *neutrinoHarness, t *testing.T) { } func TestNeutrinoSync(t *testing.T) { + // skip this test because we can't spawn a process in the browser. + // https://github.com/linden/wasmexec/issues/2. + if runtime.GOOS == "js" { + t.Skip("start process is unsupported in the browser, skipping test.") + } + // Set up logging. logger := btclog.NewBackend(os.Stdout) chainLogger := logger.Logger("CHAIN")