Skip to content

Commit

Permalink
feat: wait aggregator to start until data is ready
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Aug 22, 2024
1 parent 39ae053 commit 1cf353b
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 0 deletions.
11 changes: 11 additions & 0 deletions node/pkg/aggregator/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"bisonai.com/miko/node/pkg/db"

errorSentinel "bisonai.com/miko/node/pkg/error"
"bisonai.com/miko/node/pkg/utils/condition"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -153,6 +154,11 @@ func (a *App) startAggregator(ctx context.Context, aggregator *Aggregator) error
return nil
}

isReady := func() bool {
return a.isLocalAggregateReady(aggregator.ID)
}
condition.WaitForCondition(ctx, isReady)

nodeCtx, cancel := context.WithCancel(ctx)
aggregator.nodeCtx = nodeCtx
aggregator.nodeCancel = cancel
Expand Down Expand Up @@ -360,3 +366,8 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) {
return
}
}

func (a *App) isLocalAggregateReady(confidId int32) bool {
_, ok := a.LatestLocalAggregates.Load(confidId)
return ok
}
2 changes: 2 additions & 0 deletions node/pkg/error/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,6 @@ var (
ErrLogscribeConsumerServiceNotProvided = &CustomError{Service: LogscribeConsumer, Code: InvalidInputError, Message: "Service field not provided in logscribeconsumer"}
ErrLogscribeConsumerInvalidLevel = &CustomError{Service: LogscribeConsumer, Code: InvalidInputError, Message: "Invalid log level provided to logscribeconsumer"}
ErrLogscribeConsumerEndpointUnresponsive = &CustomError{Service: LogscribeConsumer, Code: NetworkError, Message: "Logscribe endpoint unresponsive"}

ErrConditionTimedOut = &CustomError{Service: Others, Code: InternalError, Message: "Condition timed out"}
)
41 changes: 41 additions & 0 deletions node/pkg/utils/condition/condition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package condition

import (
"context"
"time"

errorsentinel "bisonai.com/miko/node/pkg/error"
)

// can be blocking infinitely if condition is not met, use with caution
func WaitForCondition(ctx context.Context, condition func() bool) {
for {
if condition() {
return
}

select {
case <-ctx.Done():
return
default:
time.Sleep(500 * time.Millisecond)
}
}
}

func WaitForConditionWithTimeout(ctx context.Context, timeout time.Duration, condition func() bool) error {
for {
if condition() {
return nil
}

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(timeout):
return errorsentinel.ErrConditionTimedOut
default:
time.Sleep(500 * time.Millisecond)
}
}
}
41 changes: 41 additions & 0 deletions node/pkg/utils/tests/condition_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package tests

import (
"context"
"testing"
"time"

"bisonai.com/miko/node/pkg/utils/condition"
"github.com/stretchr/testify/assert"
)

func TestWaitForCondition_Success(t *testing.T) {
conditionMet := false

testCond := func() bool {
return conditionMet
}

go func() {
time.Sleep(100 * time.Millisecond)
conditionMet = true
}()

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

condition.WaitForCondition(ctx, testCond)

assert.True(t, conditionMet)
}

func TestWaitForCondition_Timeout(t *testing.T) {
testCond := func() bool {
return false
}

ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

condition.WaitForCondition(ctx, testCond)
}

0 comments on commit 1cf353b

Please sign in to comment.