Skip to content

Commit

Permalink
[chore] prometheusremotewrite: simplify wal initialization (open-tele…
Browse files Browse the repository at this point in the history
…metry#30631)

In the only production code that calls the `newWal` we first verify that
the config is not nil then call into the `newWal` that returns error if
config is nil which means that code never runs in production, so better
to simplify the newWal and return nil if error is nil.

Alternative we can just remove the nil check in `newWal` and rely on the
caller to check for the nil config.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Jan 23, 2024
1 parent c69b447 commit f07bcda
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 30 deletions.
8 changes: 1 addition & 7 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,8 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err
SendMetadata: cfg.SendMetadata,
},
}
if cfg.WAL == nil {
return prwe, nil
}

prwe.wal, err = newWAL(cfg.WAL, prwe.export)
if err != nil {
return nil, err
}
prwe.wal = newWAL(cfg.WAL, prwe.export)
return prwe, nil
}

Expand Down
7 changes: 3 additions & 4 deletions exporter/prometheusremotewriteexporter/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func (wc *WALConfig) truncateFrequency() time.Duration {
return defaultWALTruncateFrequency
}

func newWAL(walConfig *WALConfig, exportSink func(context.Context, []*prompb.WriteRequest) error) (*prweWAL, error) {
func newWAL(walConfig *WALConfig, exportSink func(context.Context, []*prompb.WriteRequest) error) *prweWAL {
if walConfig == nil {
// There are cases for which the WAL can be disabled.
// TODO: Perhaps log that the WAL wasn't enabled.
return nil, errNilConfig
return nil
}

return &prweWAL{
Expand All @@ -72,7 +72,7 @@ func newWAL(walConfig *WALConfig, exportSink func(context.Context, []*prompb.Wri
stopChan: make(chan struct{}),
rWALIndex: &atomic.Uint64{},
wWALIndex: &atomic.Uint64{},
}, nil
}
}

func (wc *WALConfig) createWAL() (*wal.Log, string, error) {
Expand All @@ -90,7 +90,6 @@ func (wc *WALConfig) createWAL() (*wal.Log, string, error) {
var (
errAlreadyClosed = errors.New("already closed")
errNilWAL = errors.New("wal is nil")
errNilConfig = errors.New("expecting a non-nil configuration")
)

// retrieveWALIndices queries the WriteAheadLog for its current first and last indices.
Expand Down
31 changes: 12 additions & 19 deletions exporter/prometheusremotewriteexporter/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ func doNothingExportSink(_ context.Context, reqL []*prompb.WriteRequest) error {

func TestWALCreation_nilConfig(t *testing.T) {
config := (*WALConfig)(nil)
pwal, err := newWAL(config, doNothingExportSink)
require.Equal(t, err, errNilConfig)
pwal := newWAL(config, doNothingExportSink)
require.Nil(t, pwal)
}

func TestWALCreation_nonNilConfig(t *testing.T) {
config := &WALConfig{Directory: t.TempDir()}
pwal, err := newWAL(config, doNothingExportSink)
pwal := newWAL(config, doNothingExportSink)
require.NotNil(t, pwal)
assert.Nil(t, err)
assert.NoError(t, pwal.stop())
}

Expand Down Expand Up @@ -80,27 +78,24 @@ func TestWALStopManyTimes(t *testing.T) {
TruncateFrequency: 60 * time.Microsecond,
BufferSize: 1,
}
pwal, err := newWAL(config, doNothingExportSink)
require.Nil(t, err)
pwal := newWAL(config, doNothingExportSink)
require.NotNil(t, pwal)

// Ensure that invoking .stop() multiple times doesn't cause a panic, but actually
// First close should NOT return an error.
err = pwal.stop()
require.Nil(t, err)
require.NoError(t, pwal.stop())
for i := 0; i < 4; i++ {
// Every invocation to .stop() should return an errAlreadyClosed.
err = pwal.stop()
require.Equal(t, err, errAlreadyClosed)
require.ErrorIs(t, pwal.stop(), errAlreadyClosed)
}
}

func TestWAL_persist(t *testing.T) {
// Unit tests that requests written to the WAL persist.
config := &WALConfig{Directory: t.TempDir()}

pwal, err := newWAL(config, doNothingExportSink)
require.Nil(t, err)
pwal := newWAL(config, doNothingExportSink)
require.NotNil(t, pwal)

// 1. Write out all the entries.
reqL := []*prompb.WriteRequest{
Expand All @@ -127,27 +122,25 @@ func TestWAL_persist(t *testing.T) {
}

ctx := context.Background()
err = pwal.retrieveWALIndices()
require.Nil(t, err)
require.NoError(t, pwal.retrieveWALIndices())
t.Cleanup(func() {
assert.NoError(t, pwal.stop())
})

err = pwal.persistToWAL(reqL)
require.Nil(t, err)
require.NoError(t, pwal.persistToWAL(reqL))

// 2. Read all the entries from the WAL itself, guided by the indices available,
// and ensure that they are exactly in order as we'd expect them.
wal := pwal.wal
start, err := wal.FirstIndex()
require.Nil(t, err)
require.NoError(t, err)
end, err := wal.LastIndex()
require.Nil(t, err)
require.NoError(t, err)

var reqLFromWAL []*prompb.WriteRequest
for i := start; i <= end; i++ {
req, err := pwal.readPrompbFromWAL(ctx, i)
require.Nil(t, err)
require.NoError(t, err)
reqLFromWAL = append(reqLFromWAL, req)
}

Expand Down

0 comments on commit f07bcda

Please sign in to comment.