Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(store): track store's contiguous head #239

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions p2p/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ func TestExchangeServer_handleRequestTimeout(t *testing.T) {
peer := createMocknet(t, 1)
s, err := store.NewStore[*headertest.DummyHeader](datastore.NewMapDatastore())
require.NoError(t, err)
head := headertest.RandDummyHeader(t)
head.HeightI %= 1000 // make it a bit lower
cristaloleg marked this conversation as resolved.
Show resolved Hide resolved
s.Init(context.Background(), head)
server, err := NewExchangeServer[*headertest.DummyHeader](
peer[0],
s,
Expand Down
6 changes: 5 additions & 1 deletion store/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ func (b *batch[H]) getByHeight(height uint64) H {
return zero
}

return b.headers[height-base-1]
h := b.headers[height-base-1]
if h.Height() == height {
cristaloleg marked this conversation as resolved.
Show resolved Hide resolved
return h
}
return zero
}

// Append appends new headers to the batch.
Expand Down
147 changes: 77 additions & 70 deletions store/heightsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,34 @@ type heightSub[H header.Header[H]] struct {
// height refers to the latest locally available header height
// that has been fully verified and inserted into the subjective chain
height atomic.Uint64
heightReqsLk sync.Mutex
heightReqs map[uint64]map[chan H]struct{}
heightSubsLk sync.Mutex
heightSubs map[uint64]*signalAndCounter
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}

type signalAndCounter struct {
signal chan struct{}
count int
}

// newHeightSub instantiates new heightSub.
func newHeightSub[H header.Header[H]]() *heightSub[H] {
return &heightSub[H]{
heightReqs: make(map[uint64]map[chan H]struct{}),
heightSubs: make(map[uint64]*signalAndCounter),
}
}

// Init the heightSub with a given height.
// Unblocks all awaiting [Wait] calls lower than height.
func (hs *heightSub[H]) Init(height uint64) {
hs.height.Store(height)

hs.heightSubsLk.Lock()
defer hs.heightSubsLk.Unlock()

for h := range hs.heightSubs {
if h < height {
hs.unblockHeight(h, true)
}
}
}

Expand All @@ -34,97 +54,84 @@ func (hs *heightSub[H]) Height() uint64 {
}

// SetHeight sets the new head height for heightSub.
// Unblocks all awaiting [Wait] calls in range from [heightSub.Height] to height.
func (hs *heightSub[H]) SetHeight(height uint64) {
cristaloleg marked this conversation as resolved.
Show resolved Hide resolved
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
hs.height.Store(height)
for {
curr := hs.height.Load()
if curr >= height {
return
}
if !hs.height.CompareAndSwap(curr, height) {
continue
}

hs.heightSubsLk.Lock()
defer hs.heightSubsLk.Unlock() //nolint:gocritic we have a return below

for ; curr <= height; curr++ {
hs.unblockHeight(curr, true)
}
return
}
}

// Sub subscribes for a header of a given height.
// It can return errElapsedHeight, which means a requested header was already provided
// Wait for a given height to be published.
// It can return errElapsedHeight, which means a requested height was already seen
// and caller should get it elsewhere.
func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {
var zero H
func (hs *heightSub[H]) Wait(ctx context.Context, height uint64) error {
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
if hs.Height() >= height {
return zero, errElapsedHeight
return errElapsedHeight
}

hs.heightReqsLk.Lock()
hs.heightSubsLk.Lock()
if hs.Height() >= height {
// This is a rare case we have to account for.
// The lock above can park a goroutine long enough for hs.height to change for a requested height,
// leaving the request never fulfilled and the goroutine deadlocked.
hs.heightReqsLk.Unlock()
return zero, errElapsedHeight
hs.heightSubsLk.Unlock()
return errElapsedHeight
}
resp := make(chan H, 1)
reqs, ok := hs.heightReqs[height]

sac, ok := hs.heightSubs[height]
if !ok {
reqs = make(map[chan H]struct{})
hs.heightReqs[height] = reqs
sac = &signalAndCounter{
signal: make(chan struct{}, 1),
}
hs.heightSubs[height] = sac
}
reqs[resp] = struct{}{}
hs.heightReqsLk.Unlock()
sac.count++
hs.heightSubsLk.Unlock()

select {
case resp := <-resp:
return resp, nil
case <-sac.signal:
return nil
case <-ctx.Done():
// no need to keep the request, if the op has canceled
hs.heightReqsLk.Lock()
delete(reqs, resp)
if len(reqs) == 0 {
delete(hs.heightReqs, height)
}
hs.heightReqsLk.Unlock()
return zero, ctx.Err()
hs.heightSubsLk.Lock()
hs.unblockHeight(height, false)
hs.heightSubsLk.Unlock()
return ctx.Err()
}
}

// Pub processes all the outstanding subscriptions matching the given headers.
// Pub is only safe when called from one goroutine.
// For Pub to work correctly, heightSub has to be initialized with SetHeight
// so that given headers are contiguous to the height on heightSub.
func (hs *heightSub[H]) Pub(headers ...H) {
ln := len(headers)
if ln == 0 {
return
}
// UnblockHeight and release the waiters in [Wait].
// Note: do not advance heightSub's height.
func (hs *heightSub[H]) UnblockHeight(height uint64) {
hs.heightSubsLk.Lock()
defer hs.heightSubsLk.Unlock()

height := hs.Height()
from, to := headers[0].Height(), headers[ln-1].Height()
if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1
log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from)
return
}
hs.SetHeight(to)

hs.heightReqsLk.Lock()
defer hs.heightReqsLk.Unlock()

// there is a common case where we Pub only header
// in this case, we shouldn't loop over each heightReqs
// and instead read from the map directly
if ln == 1 {
reqs, ok := hs.heightReqs[from]
if ok {
for req := range reqs {
req <- headers[0] // reqs must always be buffered, so this won't block
}
delete(hs.heightReqs, from)
}
hs.unblockHeight(height, true)
}

func (hs *heightSub[H]) unblockHeight(height uint64, all bool) {
sac, ok := hs.heightSubs[height]
if !ok {
return
}

// instead of looping over each header in 'headers', we can loop over each request
// which will drastically decrease idle iterations, as there will be less requests than headers
for height, reqs := range hs.heightReqs {
// then we look if any of the requests match the given range of headers
if height >= from && height <= to {
// and if so, calculate its position and fulfill requests
h := headers[height-from]
for req := range reqs {
req <- h // reqs must always be buffered, so this won't block
}
delete(hs.heightReqs, height)
}
sac.count--
if all || sac.count == 0 {
close(sac.signal)
delete(hs.heightSubs, height)
}
}
122 changes: 95 additions & 27 deletions store/heightsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@ func TestHeightSub(t *testing.T) {

// assert subscription returns nil for past heights
{
h := headertest.RandDummyHeader(t)
h.HeightI = 100
hs.SetHeight(99)
hs.Pub(h)
hs.Init(99)

h, err := hs.Sub(ctx, 10)
err := hs.Wait(ctx, 10)
assert.ErrorIs(t, err, errElapsedHeight)
assert.Nil(t, h)
}

// assert actual subscription works
Expand All @@ -34,52 +30,125 @@ func TestHeightSub(t *testing.T) {
// fixes flakiness on CI
time.Sleep(time.Millisecond)

h1 := headertest.RandDummyHeader(t)
h1.HeightI = 101
h2 := headertest.RandDummyHeader(t)
h2.HeightI = 102
hs.Pub(h1, h2)
hs.SetHeight(102)
}()

h, err := hs.Sub(ctx, 101)
err := hs.Wait(ctx, 101)
assert.NoError(t, err)
assert.NotNil(t, h)
}

// assert multiple subscriptions work
{
ch := make(chan error, 10)
for range cap(ch) {
go func() {
_, err := hs.Sub(ctx, 103)
err := hs.Wait(ctx, 103)
ch <- err
}()
}

time.Sleep(time.Millisecond * 10)

h3 := headertest.RandDummyHeader(t)
h3.HeightI = 103
hs.Pub(h3)
hs.SetHeight(103)

for range cap(ch) {
assert.NoError(t, <-ch)
}
}
}

func TestHeightSub_withWaitCancelled(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

hs := newHeightSub[*headertest.DummyHeader]()
hs.Init(10)

const waiters = 5

cancelChs := make([]chan error, waiters)
blockedChs := make([]chan error, waiters)
for i := range waiters {
cancelChs[i] = make(chan error, 1)
blockedChs[i] = make(chan error, 1)

go func() {
ctx, cancel := context.WithTimeout(ctx, time.Duration(i+1)*time.Millisecond)
defer cancel()

err := hs.Wait(ctx, 100)
cancelChs[i] <- err
}()

go func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

err := hs.Wait(ctx, 100)
blockedChs[i] <- err
}()
}

for i := range cancelChs {
err := <-cancelChs[i]
assert.ErrorIs(t, err, context.DeadlineExceeded)
}

for i := range blockedChs {
select {
case <-blockedChs[i]:
t.Error("channel should be blocked")
default:
}
}
}

// Test heightSub can accept non-adj headers without an error.
func TestHeightSubNonAdjacement(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

hs := newHeightSub[*headertest.DummyHeader]()
hs.Init(99)

go func() {
// fixes flakiness on CI
time.Sleep(time.Millisecond)

hs.SetHeight(300)
}()

err := hs.Wait(ctx, 200)
assert.NoError(t, err)
}

// Test heightSub's height cannot go down but only up.
func TestHeightSub_monotonicHeight(t *testing.T) {
hs := newHeightSub[*headertest.DummyHeader]()

hs.Init(99)
assert.Equal(t, int64(hs.height.Load()), int64(99))

hs.SetHeight(300)
assert.Equal(t, int64(hs.height.Load()), int64(300))

hs.SetHeight(120)
assert.Equal(t, int64(hs.height.Load()), int64(300))
}

func TestHeightSubCancellation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

h := headertest.RandDummyHeader(t)
h.HeightI %= 1000 // make it a bit lower
hs := newHeightSub[*headertest.DummyHeader]()

sub := make(chan *headertest.DummyHeader)
sub := make(chan struct{})
go func() {
// subscribe first time
h, _ := hs.Sub(ctx, h.HeightI)
sub <- h
hs.Wait(ctx, h.Height())
sub <- struct{}{}
}()

// give a bit time for subscription to settle
Expand All @@ -88,19 +157,18 @@ func TestHeightSubCancellation(t *testing.T) {
// subscribe again but with failed canceled context
canceledCtx, cancel := context.WithCancel(ctx)
cancel()
_, err := hs.Sub(canceledCtx, h.HeightI)
assert.Error(t, err)
err := hs.Wait(canceledCtx, h.Height())
assert.ErrorIs(t, err, context.Canceled)

// publish header
hs.Pub(h)
// update height
hs.SetHeight(h.Height())

// ensure we still get our header
select {
case subH := <-sub:
assert.Equal(t, h.HeightI, subH.HeightI)
case <-sub:
case <-ctx.Done():
t.Error(ctx.Err())
}
// ensure we don't have any active subscriptions
assert.Len(t, hs.heightReqs, 0)
assert.Len(t, hs.heightSubs, 0)
}
Loading
Loading