Skip to content

Commit

Permalink
Merge pull request #1945 from OffchainLabs/improve-txstreamer-cacheclear
Browse files Browse the repository at this point in the history
Retain new feed messages when cache is cleared
  • Loading branch information
ganeshvanahalli authored Nov 2, 2023
2 parents a365513 + 8b0ac77 commit ed4ec22
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
var oldMsg *arbostypes.MessageWithMetadata
var lastDelayedRead uint64
var hasNewConfirmedMessages bool
var cacheClearLen int

messagesAfterPos := messageStartPos + arbutil.MessageIndex(len(messages))
broadcastStartPos := arbutil.MessageIndex(atomic.LoadUint64(&s.broadcasterQueuedMessagesPos))
Expand Down Expand Up @@ -724,10 +725,13 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
// Or no active broadcast reorg and broadcast messages start before or immediately after last L1 message
if messagesAfterPos >= broadcastStartPos {
broadcastSliceIndex := int(messagesAfterPos - broadcastStartPos)
messagesOldLen := len(messages)
if broadcastSliceIndex < len(s.broadcasterQueuedMessages) {
// Some cached feed messages can be used
messages = append(messages, s.broadcasterQueuedMessages[broadcastSliceIndex:]...)
}
// This calculation gives the exact length of cache which was appended to messages
cacheClearLen = broadcastSliceIndex + len(messages) - messagesOldLen
}

// L1 used or replaced broadcast cache items
Expand Down Expand Up @@ -800,8 +804,14 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
}

if clearQueueOnSuccess {
s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[:0]
atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, 0)
// Check if new messages were added at the end of cache, if they were, then dont remove those particular messages
if len(s.broadcasterQueuedMessages) > cacheClearLen {
s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[cacheClearLen:]
atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos)+uint64(cacheClearLen))
} else {
s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[:0]
atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, 0)
}
s.broadcasterQueuedMessagesActiveReorg = false
}

Expand Down

0 comments on commit ed4ec22

Please sign in to comment.