Skip to content

Commit

Permalink
Fix handling of GetNotifications() request when leader is not fully i…
Browse files Browse the repository at this point in the history
…nitialized (#437)

Fixes #430
  • Loading branch information
merlimat authored Feb 6, 2024
1 parent eca092f commit facc428
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
17 changes: 16 additions & 1 deletion server/leader_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,9 @@ func (lc *leaderController) GetNotifications(req *proto.NotificationsRequest, st
cancel()
return lc.ctx.Err()

case <-ctx.Done():
return ctx.Err()

case <-stream.Context().Done():
// The stream is getting closed
cancel()
Expand All @@ -810,7 +813,14 @@ func (lc *leaderController) dispatchNotifications(ctx context.Context, req *prot
if req.StartOffsetExclusive != nil {
offsetInclusive = *req.StartOffsetExclusive + 1
} else {
commitOffset := lc.quorumAckTracker.CommitOffset()
lc.Lock()
qat := lc.quorumAckTracker
lc.Unlock()

if qat == nil {
return errors.New("leader is not yet ready")
}
commitOffset := qat.CommitOffset()

// The client is creating a new notification stream and wants to receive the notification from the next
// entry that will be written.
Expand All @@ -833,6 +843,11 @@ func (lc *leaderController) dispatchNotifications(ctx context.Context, req *prot
offsetInclusive = commitOffset + 1
}

return lc.iterateOverNotifications(ctx, stream, offsetInclusive)
}

func (lc *leaderController) iterateOverNotifications(ctx context.Context, stream proto.OxiaClient_GetNotificationsServer, startOffsetInclusive int64) error {
offsetInclusive := startOffsetInclusive
for ctx.Err() == nil {
notifications, err := lc.db.ReadNextNotifications(ctx, offsetInclusive)
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions server/leader_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,28 @@ func TestLeaderController_NotificationsCloseLeader(t *testing.T) {
assert.NoError(t, walFactory.Close())
}

func TestLeaderController_NotificationsWhenNotReady(t *testing.T) {
var shard int64 = 1

kvFactory, _ := kv.NewPebbleKVFactory(testKVOptions)
walFactory := newTestWalFactory(t)

lc, _ := NewLeaderController(Config{}, common.DefaultNamespace, shard, newMockRpcClient(), walFactory, kvFactory)
_, _ = lc.NewTerm(&proto.NewTermRequest{ShardId: shard, Term: 1})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream := newMockGetNotificationsServer(ctx)

// Get notification should fail if the leader controller is not fully initialized
err := lc.GetNotifications(&proto.NotificationsRequest{ShardId: shard}, stream)
assert.ErrorIs(t, err, context.Canceled)

assert.NoError(t, lc.Close())
assert.NoError(t, kvFactory.Close())
assert.NoError(t, walFactory.Close())
}

func TestLeaderController_List(t *testing.T) {
var shard int64 = 1

Expand Down

0 comments on commit facc428

Please sign in to comment.