Skip to content

Commit

Permalink
chore: Plumb context.Context throughout polling datasourcev2 (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
keelerm84 authored Jan 7, 2025
1 parent d4f25e4 commit f61835f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 42 deletions.
21 changes: 13 additions & 8 deletions internal/datasourcev2/polling_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
// PollingRequester allows PollingProcessor to delegate fetching data to another component.
// This is useful for testing the PollingProcessor without needing to set up a test HTTP server.
type PollingRequester interface {
Request() (*fdv2proto.ChangeSet, error)
Request(context.Context) (*fdv2proto.ChangeSet, error)
BaseURI() string
FilterKey() string
}
Expand Down Expand Up @@ -79,10 +79,8 @@ func (pp *PollingProcessor) Name() string {
}

//nolint:revive // DataInitializer method.
func (pp *PollingProcessor) Fetch(_ context.Context) (*subsystems.Basis, error) {
//nolint:godox
// TODO(SDK-752): Plumb the context into the request method.
basis, err := pp.requester.Request()
func (pp *PollingProcessor) Fetch(ctx context.Context) (*subsystems.Basis, error) {
basis, err := pp.requester.Request(ctx)
if err != nil {
return nil, err
}
Expand All @@ -93,6 +91,13 @@ func (pp *PollingProcessor) Fetch(_ context.Context) (*subsystems.Basis, error)
func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}) {
pp.loggers.Infof("Starting LaunchDarkly polling with interval: %+v", pp.pollInterval)

// This process has a shared method serving both as an initializer and a synchronizer.
//
// The initializers currently provide a cancellable context throughout
// their call stack. Once we have done the same with the synchronizers, we
// can the TODO context with a real one.
ctx := context.TODO()

ticker := newTickerWithInitialTick(pp.pollInterval)

go func() {
Expand All @@ -112,7 +117,7 @@ func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}) {
case <-pp.quit:
return
case <-ticker.C:
if err := pp.poll(); err != nil {
if err := pp.poll(ctx); err != nil {
if hse, ok := err.(httpStatusError); ok {
errorInfo := interfaces.DataSourceErrorInfo{
Kind: interfaces.DataSourceErrorKindErrorResponse,
Expand Down Expand Up @@ -158,8 +163,8 @@ func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}) {
}()
}

func (pp *PollingProcessor) poll() error {
changeSet, err := pp.requester.Request()
func (pp *PollingProcessor) poll(ctx context.Context) error {
changeSet, err := pp.requester.Request(ctx)

if err != nil {
return err
Expand Down
74 changes: 40 additions & 34 deletions internal/datasourcev2/polling_http_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datasourcev2

import (
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -67,12 +68,12 @@ func (r *pollingRequester) FilterKey() string {
return r.filterKey
}

func (r *pollingRequester) Request() (*fdv2proto.ChangeSet, error) {
func (r *pollingRequester) Request(ctx context.Context) (*fdv2proto.ChangeSet, error) {
if r.loggers.IsDebugEnabled() {
r.loggers.Debug("Polling LaunchDarkly for feature flag updates")
}

body, cached, err := r.makeRequest(endpoints.PollingRequestPath)
body, cached, err := r.makeRequest(ctx, endpoints.PollingRequestPath)
if err != nil {
return nil, err
}
Expand All @@ -88,45 +89,50 @@ func (r *pollingRequester) Request() (*fdv2proto.ChangeSet, error) {
changeSet := fdv2proto.NewChangeSetBuilder()

for _, event := range payload.Events {
switch event.Name {
case fdv2proto.EventServerIntent:
var serverIntent fdv2proto.ServerIntent
err := json.Unmarshal(event.Data, &serverIntent)
if err != nil {
return nil, err
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
switch event.Name {
case fdv2proto.EventServerIntent:
var serverIntent fdv2proto.ServerIntent
err := json.Unmarshal(event.Data, &serverIntent)
if err != nil {
return nil, err
}
if serverIntent.Payload.Code == fdv2proto.IntentNone {
return changeSet.NoChanges(), nil
}
if err := changeSet.Start(serverIntent); err != nil {
return nil, err
}
case fdv2proto.EventPutObject:
var put fdv2proto.PutObject
if err := json.Unmarshal(event.Data, &put); err != nil {
return nil, err
}
changeSet.AddPut(put.Kind, put.Key, put.Version, put.Object)
case fdv2proto.EventDeleteObject:
var deleteObject fdv2proto.DeleteObject
if err := json.Unmarshal(event.Data, &deleteObject); err != nil {
return nil, err
}
changeSet.AddDelete(deleteObject.Kind, deleteObject.Key, deleteObject.Version)
case fdv2proto.EventPayloadTransferred:
var selector fdv2proto.Selector
if err := json.Unmarshal(event.Data, &selector); err != nil {
return nil, err
}
return changeSet.Finish(selector)
}
if serverIntent.Payload.Code == fdv2proto.IntentNone {
return changeSet.NoChanges(), nil
}
if err := changeSet.Start(serverIntent); err != nil {
return nil, err
}
case fdv2proto.EventPutObject:
var put fdv2proto.PutObject
if err := json.Unmarshal(event.Data, &put); err != nil {
return nil, err
}
changeSet.AddPut(put.Kind, put.Key, put.Version, put.Object)
case fdv2proto.EventDeleteObject:
var deleteObject fdv2proto.DeleteObject
if err := json.Unmarshal(event.Data, &deleteObject); err != nil {
return nil, err
}
changeSet.AddDelete(deleteObject.Kind, deleteObject.Key, deleteObject.Version)
case fdv2proto.EventPayloadTransferred:
var selector fdv2proto.Selector
if err := json.Unmarshal(event.Data, &selector); err != nil {
return nil, err
}
return changeSet.Finish(selector)
}
}

return nil, fmt.Errorf("didn't receive any known protocol events in polling payload")
}

func (r *pollingRequester) makeRequest(resource string) ([]byte, bool, error) {
req, reqErr := http.NewRequest("GET", endpoints.AddPath(r.baseURI, resource), nil)
func (r *pollingRequester) makeRequest(ctx context.Context, resource string) ([]byte, bool, error) {
req, reqErr := http.NewRequestWithContext(ctx, "GET", endpoints.AddPath(r.baseURI, resource), nil)
if reqErr != nil {
reqErr = fmt.Errorf(
"unable to create a poll request; this is not a network problem, most likely a bad base URI: %w",
Expand Down

0 comments on commit f61835f

Please sign in to comment.