Skip to content

Commit

Permalink
feat: make multipart heartbeat configurable (#1006)
Browse files Browse the repository at this point in the history
Previous work for multipart added a timer for regular heartbeat messages
which could be set to keep the subscription connection alive. We had
configured it to always be 5 seconds, but that caused friction in our
cosmo tests which wanted to verify that the heartbeat would be sent.
This PR allows for it to be configurable via options, so that we can
change it in our tests and save a few seconds.
  • Loading branch information
df-wg authored Dec 5, 2024
1 parent a521793 commit 7675b4b
Showing 1 changed file with 25 additions and 17 deletions.
42 changes: 25 additions & 17 deletions v2/pkg/engine/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

const (
HeartbeatInterval = 5 * time.Second
DefaultHeartbeatInterval = 5 * time.Second
)

var (
Expand Down Expand Up @@ -65,8 +65,9 @@ type Resolver struct {
reporter Reporter
asyncErrorWriter AsyncErrorWriter

propagateSubgraphErrors bool
propagateSubgraphStatusCodes bool
propagateSubgraphErrors bool
propagateSubgraphStatusCodes bool
multipartSubHeartbeatInterval time.Duration
}

func (r *Resolver) SetAsyncErrorWriter(w AsyncErrorWriter) {
Expand Down Expand Up @@ -142,6 +143,8 @@ type ResolverOptions struct {
ResolvableOptions ResolvableOptions
// AllowedCustomSubgraphErrorFields defines which fields are allowed in the subgraph error when in passthrough mode
AllowedSubgraphErrorFields []string
// MultipartSubHeartbeatInterval defines the interval in which a heartbeat is sent to all multipart subscriptions
MultipartSubHeartbeatInterval time.Duration
}

// New returns a new Resolver, ctx.Done() is used to cancel all active subscriptions & streams
Expand All @@ -151,6 +154,10 @@ func New(ctx context.Context, options ResolverOptions) *Resolver {
options.MaxConcurrency = 32
}

if options.MultipartSubHeartbeatInterval <= 0 {
options.MultipartSubHeartbeatInterval = DefaultHeartbeatInterval
}

// We transform the allowed fields into a map for faster lookups
allowedExtensionFields := make(map[string]struct{}, len(options.AllowedErrorExtensionFields))
for _, field := range options.AllowedErrorExtensionFields {
Expand All @@ -176,18 +183,19 @@ func New(ctx context.Context, options ResolverOptions) *Resolver {
}

resolver := &Resolver{
ctx: ctx,
options: options,
propagateSubgraphErrors: options.PropagateSubgraphErrors,
propagateSubgraphStatusCodes: options.PropagateSubgraphStatusCodes,
events: make(chan subscriptionEvent),
triggers: make(map[uint64]*trigger),
heartbeatSubscriptions: make(map[*Context]*sub),
reporter: options.Reporter,
asyncErrorWriter: options.AsyncErrorWriter,
triggerUpdateBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
allowedErrorExtensionFields: allowedExtensionFields,
allowedErrorFields: allowedErrorFields,
ctx: ctx,
options: options,
propagateSubgraphErrors: options.PropagateSubgraphErrors,
propagateSubgraphStatusCodes: options.PropagateSubgraphStatusCodes,
events: make(chan subscriptionEvent),
triggers: make(map[uint64]*trigger),
heartbeatSubscriptions: make(map[*Context]*sub),
reporter: options.Reporter,
asyncErrorWriter: options.AsyncErrorWriter,
triggerUpdateBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
allowedErrorExtensionFields: allowedExtensionFields,
allowedErrorFields: allowedErrorFields,
multipartSubHeartbeatInterval: options.MultipartSubHeartbeatInterval,
}
resolver.maxConcurrency = make(chan struct{}, options.MaxConcurrency)
for i := 0; i < options.MaxConcurrency; i++ {
Expand Down Expand Up @@ -358,7 +366,7 @@ func (r *Resolver) executeSubscriptionUpdate(ctx *Context, sub *sub, sharedInput

func (r *Resolver) handleEvents() {
done := r.ctx.Done()
heartbeat := time.NewTicker(HeartbeatInterval)
heartbeat := time.NewTicker(r.multipartSubHeartbeatInterval)
defer heartbeat.Stop()
for {
select {
Expand Down Expand Up @@ -407,7 +415,7 @@ func (r *Resolver) handleHeartbeat(data []byte) {
// check if the last write to the subscription was more than heartbeat interval ago
c, s := c, s
s.mux.Lock()
skipHeartbeat := now.Sub(s.lastWrite) < HeartbeatInterval
skipHeartbeat := now.Sub(s.lastWrite) < r.multipartSubHeartbeatInterval
s.mux.Unlock()
if skipHeartbeat {
continue
Expand Down

0 comments on commit 7675b4b

Please sign in to comment.