diff --git a/inmem_store.go b/inmem_store.go
index fbeb3e85..88d177f6 100644
--- a/inmem_store.go
+++ b/inmem_store.go
@@ -5,6 +5,7 @@ package raft
 
 import (
 	"errors"
+	"fmt"
 	"sync"
 	"sync/atomic"
 )
@@ -13,7 +14,7 @@ import (
 // It should NOT EVER be used for production. It is used only for
 // unit tests. Use the MDBStore implementation instead.
 type InmemStore struct {
-	storeFail uint32 // accessed atomically as a bool 0/1
+	storeFail atomic.Bool
 
 	// storeSem lets the test control exactly when s StoreLog(s) call takes
 	// effect.
@@ -25,8 +26,14 @@ type InmemStore struct {
 	logs      map[uint64]*Log
 	kv        map[string][]byte
 	kvInt     map[string]uint64
+	monotonic bool
 }
 
+var (
+	_ LogStore          = (*InmemStore)(nil)
+	_ MonotonicLogStore = (*InmemStore)(nil)
+)
+
 // NewInmemStore returns a new in-memory backend. Do not ever
 // use for production. Only for testing.
 func NewInmemStore() *InmemStore {
@@ -57,7 +64,7 @@ func (i *InmemStore) BlockStore() func() {
 // FailNext signals that the next call to StoreLog(s) should return an error
 // without modifying the log contents. Subsequent calls will succeed again.
 func (i *InmemStore) FailNext() {
-	atomic.StoreUint32(&i.storeFail, 1)
+	i.storeFail.Store(true)
 }
 
 // FirstIndex implements the LogStore interface.
@@ -102,8 +109,8 @@ func (i *InmemStore) StoreLogs(logs []*Log) error {
 	}()
 
 	// Switch out fail if it is set so we only fail once
-	shouldFail := atomic.SwapUint32(&i.storeFail, 0)
-	if shouldFail == 1 {
+	shouldFail := i.storeFail.Swap(false)
+	if shouldFail {
 		return errors.New("IO error")
 	}
 
@@ -111,6 +118,10 @@ func (i *InmemStore) StoreLogs(logs []*Log) error {
 	defer i.l.Unlock()
 	for _, l := range logs {
+		if i.monotonic && l.Index != i.highIndex+1 {
+			return fmt.Errorf("non-monotonic write, log index: %d, last index %d, batch range: [%d, %d]",
+				l.Index, i.highIndex, logs[0].Index, logs[len(logs)-1].Index)
+		}
 		i.logs[l.Index] = l
 		if i.lowIndex == 0 {
 			i.lowIndex = l.Index
@@ -175,3 +186,17 @@ func (i *InmemStore) GetUint64(key []byte) (uint64, error) {
 	defer i.l.RUnlock()
 	return i.kvInt[string(key)], nil
 }
+
+// IsMonotonic implements MonotonicLogStore
+func (i *InmemStore) IsMonotonic() bool {
+	return i.monotonic
+}
+
+// SetMonotonic allows the test to choose if the store should enforce monotonic
+// writes. This is useful for testing the leader loop's handling of
+// non-monotonic log stores.
+func (i *InmemStore) SetMonotonic(v bool) {
+	i.l.Lock()
+	defer i.l.Unlock()
+	i.monotonic = v
+}
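
The monotonic mode added above makes the in-memory store reject any batch that does not extend the log by exactly one index at a time. As a rough illustration only (not part of the patch; the test name is hypothetical), a test sitting alongside the store in package raft could exercise the check directly like this:

package raft

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestInmemStoreMonotonicSketch(t *testing.T) {
	s := NewInmemStore()
	s.SetMonotonic(true)

	// Indexes 1 and 2 extend the log in sequence, so the batch is accepted.
	require.NoError(t, s.StoreLogs([]*Log{{Index: 1}, {Index: 2}}))

	// Index 4 would leave a gap after the current high index (2), so the
	// store rejects the whole batch.
	require.ErrorContains(t, s.StoreLogs([]*Log{{Index: 4}}), "non-monotonic write")

	// Re-writing an already-stored index is also rejected in monotonic mode.
	require.ErrorContains(t, s.StoreLogs([]*Log{{Index: 2}}), "non-monotonic write")
}
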
diff --git a/log_cache_async.go b/log_cache_async.go
index aaa5ebd6..528a3f62 100644
--- a/log_cache_async.go
+++ b/log_cache_async.go
@@ -245,16 +245,36 @@ func (c *LogCacheAsync) EnableAsync(cc chan<- LogWriteCompletion) {
 }
 
 func (c *LogCacheAsync) runFlusher() {
-	var batch []*Log
+	batch := make([]*Log, 0, 128)
+	consecutiveErrors := 0
+	var lastErrTime time.Time
 
 	for {
 		syncReq := <-c.state.triggerChan
 
+		// If we've had more than one error in a row, back off a little to avoid a
+		// hot loop. Note that we specifically don't wait after the first error in
+		// case it was a one-off, but we start backing off after 2 failures. The
+		// backoff is linear rather than exponential because storage should be
+		// fast; we're only protecting against spinning on a software bug or similar.
+		if consecutiveErrors > 1 {
+			waitTime := time.Duration(consecutiveErrors) * 10 * time.Millisecond
+			if waitTime > 100*time.Millisecond {
+				waitTime = 100 * time.Millisecond
+			}
+			// Only wait if we're being triggered faster than the backoff time anyway.
+			if time.Since(lastErrTime) < waitTime {
+				time.Sleep(waitTime - time.Since(lastErrTime))
+			}
+		}
+
 		// Load the state under lock
 		c.state.Lock()
 		persistedIdx := atomic.LoadUint64(&c.persistentIndex)
 		lastIdx := atomic.LoadUint64(&c.lastIndex)
+		// Make sure to reset batch!
+		batch = batch[:0]
 		for idx := persistedIdx + 1; idx <= lastIdx; idx++ {
 			batch = append(batch, c.state.cache[idx&c.sizeMask])
 		}
 
@@ -283,17 +303,31 @@
 		// Might be a no-op if batch is empty
 		lwc := c.doFlush(batch, syncReq.startTime)
 
-		// Note: if the flush failed we might retry it on the next loop. This is
-		// safe assuming that the LogStore is atomic and not left in an invalid
-		// state (which Raft assumes in general already). It might loop and retry
-		// the write of the same logs next time which may fail again or may even
-		// succeed before the leaderloop notices the error and steps down. But
-		// either way it's fine because we don't advance the persistedIndex if it
-		// fails so we'll keep trying to write the same logs at least not leave a
-		// gap. Actually if we do error, even if there is no immediate sync trigger
+		// Note: if the flush failed we will retry it on the next loop. This is safe
+		// assuming that the LogStore is atomic and not left in an invalid state
+		// (which Raft assumes in general already). It might loop and retry the
+		// write of the same logs next time, which may fail again or may even
+		// succeed before the leader loop notices the error and steps down. Either
+		// way it's fine: we don't advance the persistedIndex if the flush fails, so
+		// we'll keep trying to write the same logs and at least not leave a gap.
+		// Even if we do error when there is no immediate sync trigger
 		// waiting, the leader will step down and disable async which will mean we
 		// attempt to flush again anyway. If that fails though (in the stop case
-		// above) we won't keep retrying and will just re-report the error.
+		// above) we won't keep retrying and will just re-report the error. If the
+		// error keeps recurring and new logs keep piling up, we could end up in a
+		// hot loop until the leader steps down. That probably wouldn't take long,
+		// but we back off a little at the top of the loop to be defensive just in
+		// case.
+		if lwc != nil {
+			if lwc.Error != nil {
+				consecutiveErrors++
+				lastErrTime = time.Now()
+			} else {
+				// Note we only reset this if we actually just flushed something,
+				// not when lwc is nil.
+				consecutiveErrors = 0
+			}
+		}
 
 		// Need a lock to deliver the completion and update persistent index
 		c.state.Lock()
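
For reference, the wait the new flusher code computes is linear in the number of consecutive errors and capped: no delay after a single failure, then 10ms per consecutive error up to a 100ms ceiling. A small self-contained sketch of the same schedule (not part of the patch; the helper name is made up):

package main

import (
	"fmt"
	"time"
)

// flusherBackoff mirrors the wait computed at the top of runFlusher above:
// no delay for zero or one consecutive errors, then 10ms per error, capped
// at 100ms.
func flusherBackoff(consecutiveErrors int) time.Duration {
	if consecutiveErrors <= 1 {
		return 0
	}
	wait := time.Duration(consecutiveErrors) * 10 * time.Millisecond
	if wait > 100*time.Millisecond {
		wait = 100 * time.Millisecond
	}
	return wait
}

func main() {
	// Prints 0s for 0-1 errors, 20ms for 2, ... and 100ms from 10 onwards.
	for errs := 0; errs <= 12; errs++ {
		fmt.Printf("consecutive errors %2d -> backoff %v\n", errs, flusherBackoff(errs))
	}
}
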
diff --git a/log_cache_async_test.go b/log_cache_async_test.go
index ac568dd5..a29ab7d1 100644
--- a/log_cache_async_test.go
+++ b/log_cache_async_test.go
@@ -11,6 +11,10 @@ import (
 
 func TestLogCacheAsyncBasics(t *testing.T) {
 	underlying := NewInmemStore()
+	// Set it to behave monotonically to ensure we only ever write the necessary
+	// logs in the correct sequence and don't re-write old ones.
+	underlying.SetMonotonic(true)
+
 	c, err := NewLogCacheAsync(32, underlying)
 	require.NoError(t, err)
 
@@ -53,15 +57,9 @@
 	// Unblock the write on the underlying log
 	blockCancel()
 
-	// Wait for the completion event
-	select {
-	case lwc := <-compCh:
-		require.NoError(t, lwc.Error)
-		// Should get a single completion for all logs up to 10
-		require.Equal(t, 10, int(lwc.PersistentIndex))
-	case <-time.After(100 * time.Millisecond):
-		t.Fatal("timeout waiting for IO completion")
-	}
+	// Wait for the completion event (note that we might get one completion for
+	// both writes or separate ones depending on timing).
+	assertOKCompletions(t, compCh, 8, 10)
 
 	// Now the underlying should have all the logs
 	assertLogContents(t, underlying, 1, 10)
@@ -84,10 +82,60 @@
 	// Fail the underlying write
 	underlying.FailNext()
 	blockCancel()
+
+	// We should get the write error reported on the next completion.
+	lwc := expectCompletion(t, compCh)
+	require.ErrorContains(t, lwc.Error, "IO error")
+	// Persistent index should be unchanged since the flush failed
+	require.Equal(t, 10, int(lwc.PersistentIndex))
+
+	// But then eventually the write should succeed even if no more writes happen.
+	// We might see just one or both writes flush together.
+	assertOKCompletions(t, compCh, 12, 15)
+
+	assertLogContents(t, underlying, 1, 15)
 }
 
-// TODO:
-// yt
+func expectCompletion(t *testing.T, compCh <-chan LogWriteCompletion) LogWriteCompletion {
+	t.Helper()
+	select {
+	case lwc := <-compCh:
+		return lwc
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("timeout waiting for IO completion")
+	}
+	return LogWriteCompletion{}
+}
+
+// assertOKCompletions helps to assert that one or more non-error completions
+// are received. The max index written in each call to StoreLogsAsync should be
+// passed, since it's non-deterministic which ones are persisted together. They
+// must be in ascending order. We will assert that a completion arrives in a
+// timely manner, and that it's for a valid prefix of the batches written. If
+// it's not all of them, we keep waiting for the rest recursively.
+func assertOKCompletions(t *testing.T, compCh <-chan LogWriteCompletion, maxBatchIndexes ...int) {
+	t.Helper()
+	lwc := expectCompletion(t, compCh)
+	require.NoError(t, lwc.Error)
+
+	foundBatchIdx := -1
+	for i, idx := range maxBatchIndexes {
+		if int(lwc.PersistentIndex) == idx {
+			foundBatchIdx = i
+			break
+		}
+	}
+	require.GreaterOrEqual(t, foundBatchIdx, 0,
+		"unexpected persistent index in completion: %d, wanted one of %v",
+		lwc.PersistentIndex,
+		maxBatchIndexes,
+	)
+
+	if foundBatchIdx < len(maxBatchIndexes)-1 {
+		// We didn't get all the batches acknowledged yet, keep waiting.
+		assertOKCompletions(t, compCh, maxBatchIndexes[foundBatchIdx+1:]...)
+	}
+}
 
 func assertLogContents(t *testing.T, s LogStore, min, max int) {
 	t.Helper()
@@ -97,6 +145,14 @@
 	// check it matches expectations.
 	var expected, got []string
 
+	// Ensure that the min and max are the actual range the log contains!
+	first, err := s.FirstIndex()
+	require.NoError(t, err)
+	require.Equal(t, min, int(first))
+	last, err := s.LastIndex()
+	require.NoError(t, err)
+	require.Equal(t, max, int(last))
+
 	var log Log
 	for idx := min; idx <= max; idx++ {
 		expected = append(expected, fmt.Sprintf("%d => op-%d", idx, idx))
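
To make the new helper's contract concrete, here is a rough sketch (not part of the patch; the test name is hypothetical) of the two completion patterns assertOKCompletions accepts for the same pair of writes, assuming it lives in package raft next to the helpers above and that a zero Error in LogWriteCompletion means success:

package raft

import "testing"

func TestAssertOKCompletionsSketch(t *testing.T) {
	// One completion covering both batches (max indexes 8 and 10) is accepted.
	one := make(chan LogWriteCompletion, 1)
	one <- LogWriteCompletion{PersistentIndex: 10}
	assertOKCompletions(t, one, 8, 10)

	// Separate completions for each batch, delivered in order, are accepted too.
	two := make(chan LogWriteCompletion, 2)
	two <- LogWriteCompletion{PersistentIndex: 8}
	two <- LogWriteCompletion{PersistentIndex: 10}
	assertOKCompletions(t, two, 8, 10)
}
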