Skip to content

Commit

Permalink
Fetch blocks concurrently but send in order (#32)
Browse files Browse the repository at this point in the history
This PR rearchitects the main loop such that we can run multiple
goroutines to fetch blocks concurrently, but send them in order to the
API to ensure we don't get gaps. We produce block numbers to fetch on an
unbuffered channel (`ProduceBlockNumbers`), and each concurrent
`ConsumeBlock` goroutine gets a block number from that channel. The
`SendBlocks` goroutine receives all blocks on an unbuffered channel, but
buffers them in a map until they can be sent in order.

I deleted the test that asserted the exact number of blocks that were
sent. I couldn't manage to get it working without off by one errors
sometimes. Since the main use case is running it forever and not with a
given blocks, I figured this was an OK decision. Let me know.

Possibly problematic scenario: If block N - and only block N - is
somehow problematic from the RPC node, and must be retried ~forever,
what will happen? MaxBatchSize - 1 goroutines continue to fetch blocks
from the RPC node successfully, causing the buffer in SendBlocks to fill
up. The mechanism in this PR doesn't have a way to backpressure there.
Is that a problem? If memory footprint is a concern, we could possibly
compress it here: At this point, currently, the block response is
uncompressed. If we compress it first, it would take a lot more blocks
for this to consume a lot of memory.

Otherwise, if there are general problems with the RPC node such that all
requests take a long time or need to be retried, we'll get backpressured
because we limit the concurrent number of calls to the node to
MaxBatchSize.

FWIW I'm running this locally and getting 12 blocks per second with 5
workers!
  • Loading branch information
vegarsti authored Jun 21, 2024
1 parent f776e08 commit 34cc3e7
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 234 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func main() {
rpcClient,
duneClient,
ingester.Config{
MaxBatchSize: 1,
MaxBatchSize: cfg.Concurrency,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
Stack: cfg.RPCStack,
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Config struct {
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers"` // nolint:lll
}

func (c Config) HasError() error {
Expand Down
13 changes: 9 additions & 4 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@ type Ingester interface {
// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested
Run(ctx context.Context, startBlockNumber int64, maxCount int64) error

// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// ProduceBlockNumbers sends block numbers from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
// it will run continuously until the context is cancelled
ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber int64, endBlockNumber int64) error
ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber int64, endBlockNumber int64) error

// ConsumeBlocks fetches blocks sent on the channel and sends them on the other channel.
// It will run continuously until the context is cancelled, or the channel is closed.
// It can safely be run concurrently.
ConsumeBlocks(context.Context, chan int64, chan models.RPCBlock) error

// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// it will block until:
// - the context is cancelled
// - channel is closed
// - a fatal error occurs
SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error
SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startFrom int64) error

// This is just a placeholder for now
Info() Info
Expand All @@ -33,7 +38,7 @@ type Ingester interface {
}

const (
defaultMaxBatchSize = 1
defaultMaxBatchSize = 5
defaultPollInterval = 1 * time.Second
defaultReportProgressInterval = 30 * time.Second
)
Expand Down
222 changes: 138 additions & 84 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,54 +11,64 @@ import (
"golang.org/x/sync/errgroup"
)

// Run fetches blocks from a node RPC and sends them in order to the Dune API.
//
// ProduceBlockNumbers (blockNumbers channel) -> ConsumeBlocks (blocks channel) -> SendBlocks -> Dune
//
// We produce block numbers to fetch on an unbuffered channel (ProduceBlockNumbers),
// and each concurrent ConsumeBlock goroutine gets a block number from that channel.
// The SendBlocks goroutine receives all blocks on an unbuffered channel,
// but buffers them in a map until they can be sent in order.
func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
ctx, cancel := context.WithCancel(ctx)
errGroup, ctx := errgroup.WithContext(ctx)

inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize) // we close this after ConsumeBlocks has returned

// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever
endBlockNumber := startBlockNumber - 1 + maxCount

i.log.Info("Starting ingester",
"maxBatchSize", i.cfg.MaxBatchSize,
"startBlockNumber", startBlockNumber,
"endBlockNumber", endBlockNumber,
"maxCount", maxCount,
)
blockNumbers := make(chan int64)
defer close(blockNumbers)
blocks := make(chan models.RPCBlock)
defer close(blocks)

errGroup, ctx := errgroup.WithContext(ctx)
// Start MaxBatchSize goroutines to consume blocks concurrently
for range i.cfg.MaxBatchSize {
errGroup.Go(func() error {
return i.ConsumeBlocks(ctx, blockNumbers, blocks)
})
}
errGroup.Go(func() error {
return i.SendBlocks(ctx, inFlightChan)
return i.ReportProgress(ctx)
})
errGroup.Go(func() error {
return i.ReportProgress(ctx)
return i.SendBlocks(ctx, blocks, startBlockNumber)
})

err := i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, endBlockNumber)
close(inFlightChan)
cancel()
if err != nil {
if err := errGroup.Wait(); err != nil {
i.log.Error("errgroup wait", "error", err)
}
return errors.Errorf("consume blocks: %w", err)
}
// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever
endBlockNumber := startBlockNumber - 1 + maxCount
i.log.Info("Starting ingester",
"max_batch_size", i.cfg.MaxBatchSize,
"run_forever", maxCount <= 0,
"start_block_number", startBlockNumber,
"end_block_number", endBlockNumber,
"batch_size", i.cfg.MaxBatchSize,
)

if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks {
return err
}
// Produce block numbers in the main goroutine
err := i.ProduceBlockNumbers(ctx, blockNumbers, startBlockNumber, endBlockNumber)
i.log.Info("ProduceBlockNumbers is done", "error", err)
i.log.Info("Cancelling context")
cancel()

return nil
return errGroup.Wait()
}

var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")

// ConsumeBlocks from the NPC Node
func (i *ingester) ConsumeBlocks(
ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
// ProduceBlockNumbers to be consumed by multiple goroutines running ConsumeBlocks
func (i *ingester) ProduceBlockNumbers(
ctx context.Context, blockNumbers chan int64, startBlockNumber int64, endBlockNumber int64,
) error {
latestBlockNumber := i.tryUpdateLatestBlockNumber()

// Helper function
waitForBlock := func(ctx context.Context, blockNumber int64, latestBlockNumber int64) int64 {
for blockNumber > latestBlockNumber {
select {
Expand All @@ -77,82 +87,126 @@ func (i *ingester) ConsumeBlocks(

// Consume blocks forever if end is before start. This happens if Run is called with a maxCount of <= 0
dontStop := endBlockNumber < startBlockNumber

i.log.Info("Produce block numbers from", "startBlockNumber", startBlockNumber, "endBlockNumber", endBlockNumber)
for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber)
startTime := time.Now()

i.log.Info("Getting block by number", "blockNumber", blockNumber, "latestBlockNumber", latestBlockNumber)
block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("Context canceled, stopping..")
return ctx.Err()
}

i.log.Error("Failed to get block by number, continuing..",
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
"error", err,
)
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: blockNumber,
Error: err,
})

// TODO: should I sleep (backoff) here?
continue
}

atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
getBlockElapsed := time.Since(startTime)

select {
case <-ctx.Done():
i.log.Info("ProduceBlockNumbers: Context canceled, stopping")
return ctx.Err()
case outChan <- block:
case blockNumbers <- blockNumber:
}

distanceFromLatest := latestBlockNumber - block.BlockNumber
distanceFromLatest := latestBlockNumber - blockNumber
if distanceFromLatest > 0 {
// TODO: improve logs of processing speed and catchup estimated ETA
i.log.Info("We're behind, trying to catch up..",
"blockNumber", block.BlockNumber,
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
"distanceFromLatest", distanceFromLatest,
"getBlockElapsedMillis", getBlockElapsed.Milliseconds(),
"elapsedMillis", time.Since(startTime).Milliseconds(),
)
}
}
// Done consuming blocks, either because we reached the endBlockNumber or the context was canceled
i.log.Info("Finished consuming blocks", "latestBlockNumber", latestBlockNumber, "endBlockNumber", endBlockNumber)
i.log.Info("Finished producing block numbers")
return ErrFinishedConsumeBlocks
}

func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
for payload := range blocksCh {
// TODO: we should batch RCP blocks here before sending to Dune.
if err := i.dune.SendBlock(ctx, payload); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("Context canceled, stopping..")
// ConsumeBlocks from the RPC node. This can be run in multiple goroutines to parallelize block fetching.
func (i *ingester) ConsumeBlocks(
ctx context.Context, blockNumbers chan int64, blocks chan models.RPCBlock,
) error {
for {
select {
case <-ctx.Done():
i.log.Info("ConsumeBlocks: context is done")
return ctx.Err()
case blockNumber := <-blockNumbers:
startTime := time.Now()

i.log.Info("Getting block by number", "blockNumber", blockNumber)
block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("ConsumeBlocks: Context canceled, stopping")
return ctx.Err()
}

i.log.Error("Failed to get block by number, continuing..",
"blockNumber", blockNumber,
"error", err,
)
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: blockNumber,
Error: err,
})

// TODO: should we sleep (backoff) here?
continue
}

atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
getBlockElapsed := time.Since(startTime)
i.log.Info("Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed)
select {
case <-ctx.Done():
i.log.Info("ConsumeBlocks: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
return ctx.Err()
case blocks <- block:
i.log.Info("Sent block")
}
}
}
}

// SendBlocks to Dune. We receive blocks from the ConsumeBlocks goroutines, potentially out of order.
// We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune.
func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error {
i.log.Info("SendBlocks: Starting to receive blocks")
blockMap := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
next := startBlockNumber
for {
select {
case <-ctx.Done():
i.log.Info("SendBlocks: Context canceled, stopping")
return ctx.Err()
case block, ok := <-blocksCh:
if !ok {
i.log.Info("SendBlocks: Channel is closed, returning")
return nil
}

blockMap[block.BlockNumber] = block
i.log.Info("Received block", "blockNumber", block.BlockNumber)

// Send this block only if we have sent all previous blocks
for block, ok := blockMap[next]; ok; block, ok = blockMap[next] {
i.log.Info("SendBlocks: Sending block to DuneAPI", "blockNumber", block.BlockNumber)
if err := i.dune.SendBlock(ctx, block); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return ctx.Err()
}
// TODO: implement DeadLetterQueue
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: block.BlockNumber,
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
}

// We've sent block N, so increment the pointer
delete(blockMap, next)
next++
}
// TODO: implement DeadLetterQueue
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: payload.BlockNumber,
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", payload.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, payload.BlockNumber)
}
}
return ctx.Err() // channel closed
}

func (i *ingester) tryUpdateLatestBlockNumber() int64 {
Expand Down Expand Up @@ -183,7 +237,7 @@ func (i *ingester) ReportProgress(ctx context.Context) error {

blocksPerSec := float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds()
newDistance := latest - lastIngested
fallingBehind := newDistance > (previousDistance + 1) // TODO: make is more stable
fallingBehind := newDistance > (previousDistance + 1) // TODO: make this more stable

rpcErrors := len(i.info.RPCErrors)
duneErrors := len(i.info.DuneErrors)
Expand Down
Loading

0 comments on commit 34cc3e7

Please sign in to comment.