neutrino: Added sideload functionality to blockmanager
Signed-off-by: Maureen Ononiwu <[email protected]>
Chinwendu20 committed Aug 22, 2023
1 parent ae9fc8e commit 2d2048d
Showing 1 changed file with 232 additions and 10 deletions.
242 changes: 232 additions & 10 deletions blockmanager.go
@@ -21,6 +21,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/banman"
"github.com/lightninglabs/neutrino/blockntfns"
"github.com/lightninglabs/neutrino/chainDataLoader"
"github.com/lightninglabs/neutrino/chainsync"
"github.com/lightninglabs/neutrino/headerfs"
"github.com/lightninglabs/neutrino/headerlist"
@@ -45,6 +46,12 @@ const (
maxCFCheckptsPerQuery = wire.MaxCFHeadersPerMsg / wire.CFCheckptInterval
)

var (
// MaxHeaderWrite is the maximum number of block headers written to the
// header store at once during sideloading.
MaxHeaderWrite = 2000
)

// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
var zeroHash chainhash.Hash

@@ -109,6 +116,9 @@ type blockManagerCfg struct {
checkResponse func(sp *ServerPeer, resp wire.Message,
quit chan<- struct{}, peerQuit chan<- struct{}),
options ...QueryOption)

// sideLoad is the config used to enable non-P2P fetching of headers to speed up initial sync.
sideLoad SideLoadOpt
}
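
The SideLoadOpt type itself is not part of this diff. Based purely on the fields referenced in this file (Enabled, SourceType, Path, Verify), a minimal sketch of what it might look like follows; the field types and doc comments are assumptions, not the actual definition.

// SideLoadOpt configures non-P2P header sideloading. This is a sketch
// inferred from usage in this diff; the real definition may differ.
type SideLoadOpt struct {
	// Enabled indicates whether headers should be sideloaded before
	// the normal P2P header sync starts.
	Enabled bool

	// SourceType describes the kind of source headers are read from,
	// e.g. a flat file. (Assumed type name.)
	SourceType chainDataLoader.SourceType

	// Path is the location of the sideload source.
	Path string

	// Verify indicates whether sideloaded headers are checked for
	// sanity and against checkpoints before being written.
	Verify bool
}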

// blockManager provides a concurrency safe block manager for handling all
@@ -206,6 +216,9 @@ type blockManager struct { // nolint:maligned
minRetargetTimespan int64 // target timespan / adjustment factor
maxRetargetTimespan int64 // target timespan * adjustment factor
blocksPerRetarget int32 // target timespan / target time per block

// sideLoadReader is the reader used for non-P2P fetching of headers.
sideLoadReader chainDataLoader.Reader
}
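
The chainDataLoader package is introduced in a separate change. Judging only from the calls made in this file (NewReader, HeadersChain, EndHeight, SetHeight, NextHeader, ErrEndOfFile), its reader-side API could be sketched roughly as below; the exact names, return types, and signatures are assumptions based on this usage.

package chainDataLoader

import (
	"errors"

	"github.com/btcsuite/btcd/wire"
)

// ErrEndOfFile signals that the source has no more headers to supply.
var ErrEndOfFile = errors.New("end of file")

// SourceType identifies the kind of sideload source. (Assumed.)
type SourceType uint8

// ReaderConfig describes where headers are sideloaded from.
type ReaderConfig struct {
	SourceType SourceType
	Path       string
}

// Reader supplies block headers from a non-P2P source.
type Reader interface {
	// HeadersChain returns the bitcoin network the headers belong to.
	HeadersChain() wire.BitcoinNet

	// EndHeight returns the height of the last header in the source.
	EndHeight() uint32

	// SetHeight positions the reader so NextHeader returns the header
	// after the given height.
	SetHeight(height int64) error

	// NextHeader returns the next header, or ErrEndOfFile once the
	// source is exhausted.
	NextHeader() (*wire.BlockHeader, error)
}

// NewReader builds a Reader for the configured source.
func NewReader(cfg *ReaderConfig) (Reader, error) {
	// Implementation lives in the chainDataLoader package; this stub
	// only illustrates the assumed signature.
	return nil, errors.New("not implemented in this sketch")
}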

// newBlockManager returns a new bitcoin block manager. Use Start to begin
@@ -214,7 +227,6 @@ func newBlockManager(cfg *blockManagerCfg) (*blockManager, error) {
targetTimespan := int64(cfg.ChainParams.TargetTimespan / time.Second)
targetTimePerBlock := int64(cfg.ChainParams.TargetTimePerBlock / time.Second)
adjustmentFactor := cfg.ChainParams.RetargetAdjustmentFactor

bm := blockManager{
cfg: cfg,
peerChan: make(chan interface{}, MaxPeers*3),
@@ -279,6 +291,28 @@ func newBlockManager(cfg *blockManagerCfg) (*blockManager, error) {
}
bm.filterHeaderTipHash = fh.BlockHash()

// If sideloading is enabled, initialize the reader and assign it to the
// block manager. If the reader cannot be created, sideloading is
// disabled and the node falls back to P2P sync.
if bm.cfg.sideLoad.Enabled {
reader, err := chainDataLoader.NewReader(&chainDataLoader.ReaderConfig{
SourceType: bm.cfg.sideLoad.SourceType,
Path: bm.cfg.sideLoad.Path,
})
if err != nil {
bm.cfg.sideLoad.Enabled = false
log.Warnf("Side loading disabled: %v", err)
} else {
bm.sideLoadReader = reader
log.Infof("Side loading enabled")
}
}

return &bm, nil
}
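
For illustration only, a caller that wants to sideload headers would set the new config field roughly as follows; the blockHeaderStore variable and the chainDataLoader.FlatFile constant are hypothetical, and the remaining required blockManagerCfg fields are omitted.

cfg := &blockManagerCfg{
	ChainParams:  chaincfg.MainNetParams,
	BlockHeaders: blockHeaderStore, // hypothetical, set up elsewhere
	sideLoad: SideLoadOpt{
		Enabled:    true,
		SourceType: chainDataLoader.FlatFile, // assumed constant name
		Path:       "/path/to/headers.bin",
		Verify:     true,
	},
	// ...remaining required blockManagerCfg fields...
}

bm, err := newBlockManager(cfg)
if err != nil {
	log.Errorf("unable to create block manager: %v", err)
	return
}
_ = bm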

@@ -1981,6 +2015,15 @@ func checkCFCheckptSanity(cp map[string][]*chainhash.Hash,
func (b *blockManager) blockHandler() {
defer b.wg.Done()

// Attempt to sideload headers. If sideloading is not enabled, the call
// returns immediately.
b.sideLoadHeaders()

select {
case <-b.quit:
return
default:
}
candidatePeers := list.New()
out:
for {
@@ -2410,16 +2453,11 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
node := headerlist.Node{Header: *blockHeader}
prevNode := prevNodeEl
prevHash := prevNode.Header.BlockHash()
valid, err := b.verifyBlockHeader(blockHeader, *prevNode)
if valid {
if err != nil {
log.Warnf("Header failed verification: %v -- disconnecting peer", err)
hmsg.peer.Disconnect()
return
}
@@ -2547,6 +2585,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
reorgHeader, true,
int32(prevNodeHeight), prevNodeHeader,
)

if err != nil {
log.Warnf("Header doesn't pass sanity"+
" check: %s -- disconnecting "+
@@ -2754,6 +2793,7 @@ func areHeadersConnected(headers []*wire.BlockHeader) bool {
}

lastHeader = blockHash

}

return true
@@ -3043,4 +3083,186 @@ func (l *lightHeaderCtx) RelativeAncestorCtx(
return newLightHeaderCtx(
ancestorHeight, ancestor, l.store, l.headerList,
)

}

func (b *blockManager) sideLoadHeaders() {
// If sideloading is not enabled, return immediately.
if !b.cfg.sideLoad.Enabled {
return
}

// If the headers contained in the sideload source are for a different
// bitcoin network, return immediately.
if b.sideLoadReader.HeadersChain() != b.cfg.ChainParams.Net {
log.Errorf("headers from the sideload source are for network %v "+
"and are incompatible with neutrino's current bitcoin network "+
"-- skipping side loading", b.sideLoadReader.HeadersChain())

return
}

log.Infof("Side loading headers from %v to %v", b.headerTip,
b.sideLoadReader.EndHeight())

// Position the reader at the current header tip so it supplies only
// the headers the node still needs.
err := b.sideLoadReader.SetHeight(int64(b.headerTip))
if err != nil {
log.Errorf("error while setting height for sideload "+
"-- skipping sideloading: %v", err)

return
}
headerBatch := make([]headerfs.BlockHeader, 0, MaxHeaderWrite)
for {
// Request the next header.
header, headerErr := b.sideLoadReader.NextHeader()

// If any error occurs while fetching headers that does not indicate
// the end of the source, return immediately.
if headerErr != nil && headerErr != chainDataLoader.ErrEndOfFile {
log.Errorf("error while fetching headers "+
"-- skipping sideloading: %v", headerErr)

return
}

var (
node *headerlist.Node
prevNode *headerlist.Node
)

// Build the node and derive its height if a header was returned.
if header != nil {
// Ensure there is a previous header to compare against.
prevNodeEl := b.headerList.Back()
if prevNodeEl == nil {
log.Warnf("side load - header list does not contain a previous " +
"element as expected -- exiting side load")

return
}

node = &headerlist.Node{Header: *header}
prevNode = prevNodeEl
node.Height = prevNode.Height + 1
}

if b.cfg.sideLoad.Verify && headerErr == nil {
valid, err := b.verifyBlockHeader(header, *prevNode)
if err != nil || !valid {
log.Debugf("Side Load- Did not pass verification at height %v"+
"-- rolling back to last verified checkpoint and skipping sideload", node.Height)

prevCheckpoint := b.findPreviousHeaderCheckpoint(
node.Height,
)

log.Infof("Rolling back to previous validated "+
"checkpoint at height %d/hash %s",
prevCheckpoint.Height,
prevCheckpoint.Hash)

err = b.rollBackToHeight(uint32(prevCheckpoint.Height))
if err != nil {
// TODO: decide whether a failed rollback should panic here.
panic(fmt.Sprintf("Rollback failed: %s", err))
}
tipHeader, height, err := b.cfg.BlockHeaders.ChainTip()
if err != nil {
log.Errorf("error fetching chain tip after rollback: %v", err)
return
}
b.headerList.ResetHeaderState(headerlist.Node{
Height: int32(height),
Header: *tipHeader,
})
return
}

}

// Check the header against the next checkpoint, but only if verification is enabled.
if b.nextCheckpoint != nil && headerErr == nil && b.cfg.sideLoad.Verify &&
node.Height == b.nextCheckpoint.Height {

nodeHash := node.Header.BlockHash()
if nodeHash.IsEqual(b.nextCheckpoint.Hash) {

log.Infof("Verified downloaded block "+
"header against checkpoint at height "+
"%d/hash %s", node.Height, nodeHash)
} else {
log.Warnf("Checkpoint mismatch while side loading headers at "+
"height %d/hash %s -- exiting sideload", node.Height, nodeHash)
return
}

}

if header != nil {
headerBatch = append(headerBatch, headerfs.BlockHeader{
BlockHeader: header,
Height: uint32(node.Height),
})

}

// Write the batch once it has reached MaxHeaderWrite headers or we
// have reached the end of the source.
if len(headerBatch) >= MaxHeaderWrite || headerErr == chainDataLoader.ErrEndOfFile {
err = b.cfg.BlockHeaders.WriteHeaders(headerBatch...)
if err != nil {
log.Warnf("Error writing headers to the block header store: %v", err)
return
}
log.Debugf("Wrote header batch, new tip %v",
int(b.headerTip)+len(headerBatch))

if headerErr == chainDataLoader.ErrEndOfFile {
log.Infof("Successfully completed sideloading")
return
}

b.nextCheckpoint = b.findNextHeaderCheckpoint(node.Height)
log.Debugf("Checkpoint %v, at height %v",
b.nextCheckpoint.Height, node.Height)

// Update the header tip so the cfhandler can start fetching
// cfheaders.
b.newHeadersMtx.Lock()
b.headerTip = uint32(node.Height)
b.headerTipHash = node.Header.BlockHash()
b.newHeadersMtx.Unlock()
b.newHeadersSignal.Broadcast()

b.blkHeaderProgressLogger.LogBlockHeight(
header.Timestamp, node.Height,
)

headerBatch = make([]headerfs.BlockHeader, 0)

}

b.headerList.PushBack(*node)
// Check whether we should quit before processing the next header.
select {
case <-b.quit:
return
default:
}
}

}

// verifyBlockHeader checks that blockHeader connects to prevNode and passes
// the chain's header sanity checks. The returned bool reports whether the
// header connects to the previous node; a non-nil error means the connected
// header failed the sanity check.
func (b *blockManager) verifyBlockHeader(blockHeader *wire.BlockHeader,
prevNode headerlist.Node) (bool, error) {

prevNodeHeader := prevNode.Header
prevHash := prevNode.Header.BlockHash()
prevNodeHeight := prevNode.Height

if !prevHash.IsEqual(&blockHeader.PrevBlock) {
return false, nil
}

err := b.checkHeaderSanity(blockHeader, false, prevNodeHeight, &prevNodeHeader)
if err != nil {
return true, fmt.Errorf("did not pass sanity check: %w", err)
}

return true, nil
}
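
verifyBlockHeader returns a (bool, error) pair: the bool reports whether the header connects to the previous node, and a non-nil error means a connected header failed the sanity check. A hypothetical caller-side check that distinguishes the three outcomes:

valid, err := b.verifyBlockHeader(header, *prevNode)
switch {
case !valid:
	// Header does not connect to the previous node.
case err != nil:
	// Header connects but failed the sanity check; reject it.
default:
	// Header connects and passed the sanity check; accept it.
}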
