Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NIT-2987] Store last message pruned in database #2838

Merged
merged 7 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 61 additions & 13 deletions arbnode/message_pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"

"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
Expand All @@ -23,15 +24,13 @@ import (

type MessagePruner struct {
stopwaiter.StopWaiter
transactionStreamer *TransactionStreamer
inboxTracker *InboxTracker
config MessagePrunerConfigFetcher
pruningLock sync.Mutex
lastPruneDone time.Time
cachedPrunedMessages uint64
cachedPrunedBlockHashesInputFeed uint64
cachedPrunedMessageResult uint64
cachedPrunedDelayedMessages uint64
transactionStreamer *TransactionStreamer
inboxTracker *InboxTracker
config MessagePrunerConfigFetcher
pruningLock sync.Mutex
lastPruneDone time.Time
cachedPrunedMessages uint64
cachedPrunedDelayedMessages uint64
}

type MessagePrunerConfig struct {
Expand Down Expand Up @@ -121,37 +120,51 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g
}

func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64) error {
prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messageResultPrefix, &m.cachedPrunedMessageResult, uint64(messageCount))
if m.cachedPrunedMessages == 0 {
m.cachedPrunedMessages = fetchLastPrunedKey(m.transactionStreamer.db, lastPrunedMessageKey)
}
if m.cachedPrunedDelayedMessages == 0 {
m.cachedPrunedDelayedMessages = fetchLastPrunedKey(m.inboxTracker.db, lastPrunedDelayedMessageKey)
}
lastPrunedMessage := m.cachedPrunedMessages
prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messageResultPrefix, &lastPrunedMessage, uint64(messageCount))
if err != nil {
return fmt.Errorf("error deleting message results: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned message results:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}

prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, blockHashInputFeedPrefix, &m.cachedPrunedBlockHashesInputFeed, uint64(messageCount))
lastPrunedMessage = m.cachedPrunedMessages
prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, blockHashInputFeedPrefix, &lastPrunedMessage, uint64(messageCount))
if err != nil {
return fmt.Errorf("error deleting expected block hashes: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned expected block hashes:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}

prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount))
lastPrunedMessage = m.cachedPrunedMessages
prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &lastPrunedMessage, uint64(messageCount))
if err != nil {
return fmt.Errorf("error deleting last batch messages: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned last batch messages:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}
insertLastPrunedKey(m.transactionStreamer.db, lastPrunedMessageKey, lastPrunedMessage)
m.cachedPrunedMessages = lastPrunedMessage

prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.inboxTracker.db, rlpDelayedMessagePrefix, &m.cachedPrunedDelayedMessages, delayedMessageCount)
lastPrunedDelayedMessage := m.cachedPrunedDelayedMessages
prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.inboxTracker.db, rlpDelayedMessagePrefix, &lastPrunedDelayedMessage, delayedMessageCount)
amsanghi marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("error deleting last batch delayed messages: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned last batch delayed messages:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}
insertLastPrunedKey(m.inboxTracker.db, lastPrunedDelayedMessageKey, lastPrunedMessage)
m.cachedPrunedDelayedMessages = lastPrunedDelayedMessage
return nil
}

Expand All @@ -177,3 +190,38 @@ func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, pref
}
return keys, err
}

func insertLastPrunedKey(db ethdb.Database, lastPrunedKey []byte, lastPrunedValue uint64) {
lastPrunedValueByte, err := rlp.EncodeToBytes(lastPrunedValue)
if err != nil {
log.Error("error encoding last pruned value: %w", err)
} else {
err = db.Put(lastPrunedKey, lastPrunedValueByte)
if err != nil {
log.Error("error saving last pruned value: %w", err)
}
}
}

func fetchLastPrunedKey(db ethdb.Database, lastPrunedKey []byte) uint64 {
hasKey, err := db.Has(lastPrunedKey)
if err != nil {
log.Warn("error checking for last pruned key: %w", err)
return 0
}
if !hasKey {
return 0
}
lastPrunedValueByte, err := db.Get(lastPrunedKey)
if err != nil {
log.Warn("error fetching last pruned key: %w", err)
return 0
}
var lastPrunedValue uint64
err = rlp.DecodeBytes(lastPrunedValueByte, &lastPrunedValue)
if err != nil {
log.Warn("error decoding last pruned value: %w", err)
return 0
}
return lastPrunedValue
}
10 changes: 6 additions & 4 deletions arbnode/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ var (
sequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
delayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count

messageCountKey []byte = []byte("_messageCount") // contains the current message count
delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count
sequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count
dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version
messageCountKey []byte = []byte("_messageCount") // contains the current message count
lastPrunedMessageKey []byte = []byte("_lastPrunedMessageKey") // contains the last pruned message key
lastPrunedDelayedMessageKey []byte = []byte("_lastPrunedDelayedMessageKey") // contains the last pruned RLP delayed message key
delayedMessageCountKey []byte = []byte("_delayedMessageCount") // contains the current delayed message count
sequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count
dbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version
)

const currentDbSchemaVersion uint64 = 1
Loading