
Fix deadlock; circular buffer fail and DeleteRange while in async (needs tests)
banks committed Nov 10, 2023
1 parent 254fe55 commit 6a33ad1
Showing 1 changed file with 58 additions and 15 deletions.
73 changes: 58 additions & 15 deletions log_cache_async.go
@@ -102,21 +102,23 @@ func (c *LogCacheAsync) LastIndex() (uint64, error) {
 	return atomic.LoadUint64(&c.lastIndex), nil
 }
 
-// GetLog gets a log entry at a given index.
-func (c *LogCacheAsync) GetLog(index uint64, log *Log) error {
-	// Quick check to see if it's even possibly in the cache so we avoid locking
-	// at all in the case of scanning through old records.
+// minPossibleIdx is the lowest log we could possibly have cached. We might
+// not have that low because we just started or because we are currently
+// writing a batch over the top, but it's a lower bound.
+func (c *LogCacheAsync) minPossibleIdx() uint64 {
 	lastIdx := atomic.LoadUint64(&c.lastIndex)
-
-	// minPossibleIdx is the lowest log we could possibly have cached. We might
-	// not have that low because we just started or because we are currently
-	// writing a batch over the top but below this we know it won't be in cache so
-	// there is no need to lock at all.
 	minPossibleIdx := uint64(1)
 	if lastIdx > c.size {
 		minPossibleIdx = lastIdx - c.size
 	}
-	if index < minPossibleIdx {
+	return minPossibleIdx
+}
+
+// GetLog gets a log entry at a given index.
+func (c *LogCacheAsync) GetLog(index uint64, log *Log) error {
+	// Quick check to see if it's even possibly in the cache so we avoid locking
+	// at all in the case of scanning through old records.
+	if index < c.minPossibleIdx() {
 		return c.store.GetLog(index, log)
 	}
 
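The refactor above extracts the lower bound so that DeleteRange (below) can reuse it. The bound works because entries live at index & (size-1) in what appears to be a power-of-two ring buffer (the code indexes with c.sizeMask later), so only the most recent size contiguous indexes can possibly be resident. A self-contained sketch of that arithmetic, using hypothetical ringCache and entry types rather than the real ones:

package main

import "fmt"

type entry struct{ Index uint64 }

type ringCache struct {
	size    uint64 // assumed to be a power of two
	slots   []*entry
	lastIdx uint64
}

// minPossibleIdx mirrors the helper extracted above: with at most size
// contiguous recent entries resident, nothing below lastIdx-size survives.
func (r *ringCache) minPossibleIdx() uint64 {
	if r.lastIdx > r.size {
		return r.lastIdx - r.size
	}
	return 1
}

func (r *ringCache) get(idx uint64) *entry {
	if idx < r.minPossibleIdx() {
		return nil // caller would fall back to the underlying store
	}
	e := r.slots[idx&(r.size-1)] // masking works because size is a power of two
	if e != nil && e.Index == idx {
		return e
	}
	return nil // slot reused by a newer entry, or never filled
}

func main() {
	r := &ringCache{size: 8, slots: make([]*entry, 8)}
	for i := uint64(1); i <= 12; i++ {
		r.slots[i&(r.size-1)] = &entry{Index: i}
		r.lastIdx = i
	}
	fmt.Println(r.minPossibleIdx()) // 4: a lower bound; 1-3 cannot be resident
	fmt.Println(r.get(3) == nil)    // true: below the bound, skip straight to store
	fmt.Println(r.get(5).Index)     // 5: still resident in its slot
}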
@@ -170,15 +172,50 @@ func (c *LogCacheAsync) StoreLogs(logs []*Log) error {
 	return nil
 }
 
-// DeleteRange deletes a range of log entries. The range is inclusive. We only
-// support DeleteRange calls in Sync mode which makes reasoning about the cache
-// simple because the cache can't contain any un-flushed writes in sync mode.
+// DeleteRange deletes a range of log entries. The range is inclusive. We need
+// to support deletions in both sync and async mode since the leader needs to
+// be able to truncate logs while in async mode. In async mode though we can't
+// clear the cache as it may contain unflushed logs. It has always been OK to
+// call DeleteRange and StoreLogs concurrently, since Raft truncates in a
+// background goroutine, so we don't have to involve the flusher thread in
+// the delete. We do need to be mindful, though, that the underlying LogStore
+// probably uses a lock to serialise DeleteRange and StoreLogs, so our call
+// here could deadlock with the flusher thread if we delete while holding the
+// cache state lock.
 func (c *LogCacheAsync) DeleteRange(min uint64, max uint64) error {
 	c.state.Lock()
 
 	if c.state.completionCh != nil {
+		// ASYNC MODE!
+
+		// Release the state lock while we delete to reduce contention if this is
+		// slow.
 		c.state.Unlock()
-		return errors.New("call to sync DeleteRange when in async mode")
+		err := c.store.DeleteRange(min, max)
+		if err != nil {
+			return err
+		}
+
+		// First check if the truncate could have any impact on any of the cached
+		// records.
+		if max < c.minPossibleIdx() {
+			// The range deleted was entirely below the cached range so we are done.
+			return nil
+		}
+
+		// It's possible (but not certain) that some of the deleted range could be
+		// in cache. Check if any logs in the deleted range are present and remove
+		// them if they are.
+		c.state.Lock()
+		for idx := min; idx <= max; idx++ {
+			l := c.state.cache[idx&c.sizeMask]
+			if l != nil && l.Index == idx {
+				// Delete it!
+				c.state.cache[idx&c.sizeMask] = nil
+			}
+		}
+		c.state.Unlock()
+		return nil
 	}
 	// Invalidate the cache
 	c.state.cache = make([]*Log, c.size)
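The comment above describes a classic ABBA hazard: the flusher holds the store's internal lock (inside StoreLogs) and then wants the cache state lock, so DeleteRange must not call into the store while still holding the state lock. A minimal self-contained sketch of the safe ordering, reduced to just the async path, with stand-in types (fakeStore, asyncCache) rather than the real ones:

package main

import (
	"fmt"
	"sync"
)

// fakeStore stands in for a LogStore that serialises StoreLogs and
// DeleteRange with one internal mutex.
type fakeStore struct{ mu sync.Mutex }

func (s *fakeStore) DeleteRange(min, max uint64) error {
	s.mu.Lock() // the same lock the flusher would hold while in StoreLogs
	defer s.mu.Unlock()
	return nil
}

type logEntry struct{ Index uint64 }

type asyncCache struct {
	stateMu sync.Mutex
	slots   []*logEntry
	store   *fakeStore
}

// deleteRange follows the commit's ordering: call the store with no cache
// lock held, then take the state lock only briefly to scrub cached slots.
// Holding stateMu across store.DeleteRange could deadlock against a flusher
// that holds the store lock and is waiting for stateMu.
func (c *asyncCache) deleteRange(min, max uint64) error {
	if err := c.store.DeleteRange(min, max); err != nil {
		return err
	}
	c.stateMu.Lock()
	defer c.stateMu.Unlock()
	mask := uint64(len(c.slots) - 1)
	for idx := min; idx <= max; idx++ {
		if l := c.slots[idx&mask]; l != nil && l.Index == idx {
			c.slots[idx&mask] = nil
		}
	}
	return nil
}

func main() {
	c := &asyncCache{slots: make([]*logEntry, 8), store: &fakeStore{}}
	c.slots[5] = &logEntry{Index: 5}
	fmt.Println(c.deleteRange(4, 6), c.slots[5]) // <nil> <nil>
}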
@@ -289,12 +326,16 @@ func (c *LogCacheAsync) doFlush(logs []*Log, start time.Time) *LogWriteCompletion {
 	err := c.store.StoreLogs(logs)
 
 	lwc := LogWriteCompletion{
+		// Initialize this to the current persistentIndex in case we fail to write.
+		// We could Load it, but there's no need since we know it must be the one
+		// before the first log we are writing now.
 		PersistentIndex: logs[0].Index - 1,
 		Error:           err,
 		Duration:        time.Since(start),
 	}
 	if err == nil {
+		lwc.PersistentIndex = logs[len(logs)-1].Index
 		atomic.StoreUint64(&c.persistentIndex, logs[len(logs)-1].Index)
 	}
 	return &lwc
 }
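The net effect of the change: on a failed write the completion now reports the index just before this batch, meaning nothing new was persisted, rather than claiming the batch landed. A toy version of that reporting logic, with a simplified completion type standing in for the real LogWriteCompletion:

package main

import (
	"errors"
	"fmt"
)

type completion struct {
	PersistentIndex uint64
	Error           error
}

// complete mirrors doFlush's reporting: pessimistically assume nothing in
// the batch [firstIdx, lastIdx] was persisted, then advance only on success.
func complete(firstIdx, lastIdx uint64, err error) completion {
	lwc := completion{
		PersistentIndex: firstIdx - 1, // last index known durable before this batch
		Error:           err,
	}
	if err == nil {
		lwc.PersistentIndex = lastIdx
	}
	return lwc
}

func main() {
	fmt.Println(complete(101, 110, nil))                // {110 <nil>}
	fmt.Println(complete(101, 110, errors.New("boom"))) // {100 boom}
}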
@@ -347,8 +388,10 @@ func (c *LogCacheAsync) StoreLogsAsync(logs []*Log) error {
 	// this lets us get notified about when it's free. Note that even though we
 	// unlock and it's _possible_ for another StoreLogsAsync call to be made,
 	doneCh := make(chan struct{})
-	c.state.triggerChan <- syncRequest{startTime: start, doneCh: doneCh}
+	// Unlock before we send since we might block if the flusher is busy, but it
+	// won't be able to complete without the lock.
 	c.state.Unlock()
+	c.state.triggerChan <- syncRequest{startTime: start, doneCh: doneCh}
 	<-doneCh
 	c.state.Lock()
 	// Reload the indexes now sync is done so we can check if there is space
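The reorder matters because triggerChan is a rendezvous with the flusher, and the flusher needs the state lock to finish its current work before it can service the request. A compressed, self-contained illustration of the safe ordering (syncReq and the mutex here are stand-ins for the real fields, not the package's API):

package main

import (
	"fmt"
	"sync"
)

type syncReq struct{ doneCh chan struct{} }

func main() {
	var stateMu sync.Mutex
	trigger := make(chan syncReq) // unbuffered: a send blocks until received

	// Flusher: for each trigger it must take the state lock (e.g. to publish
	// results) before signalling completion. A sender that still held stateMu
	// while waiting on doneCh would therefore deadlock it.
	go func() {
		for req := range trigger {
			stateMu.Lock()
			stateMu.Unlock()
			close(req.doneCh)
		}
	}()

	stateMu.Lock()
	done := make(chan struct{})
	stateMu.Unlock()         // release first...
	trigger <- syncReq{done} // ...then hand off; the send may block
	<-done
	fmt.Println("sync completed without deadlock")
}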
