diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index 0f19e86d..690f7ff1 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -22,7 +22,7 @@ const ( // PollingRequester allows PollingProcessor to delegate fetching data to another component. // This is useful for testing the PollingProcessor without needing to set up a test HTTP server. type PollingRequester interface { - Request() (*fdv2proto.ChangeSet, error) + Request(context.Context) (*fdv2proto.ChangeSet, error) BaseURI() string FilterKey() string } @@ -79,10 +79,8 @@ func (pp *PollingProcessor) Name() string { } //nolint:revive // DataInitializer method. -func (pp *PollingProcessor) Fetch(_ context.Context) (*subsystems.Basis, error) { - //nolint:godox - // TODO(SDK-752): Plumb the context into the request method. - basis, err := pp.requester.Request() +func (pp *PollingProcessor) Fetch(ctx context.Context) (*subsystems.Basis, error) { + basis, err := pp.requester.Request(ctx) if err != nil { return nil, err } @@ -93,6 +91,13 @@ func (pp *PollingProcessor) Fetch(_ context.Context) (*subsystems.Basis, error) func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}) { pp.loggers.Infof("Starting LaunchDarkly polling with interval: %+v", pp.pollInterval) + // This process has a shared method serving both as an initializer and a synchronizer. + // + // The initializers currently provide a cancellable context throughout + // their call stack. Once we have done the same with the synchronizers, we + // can the TODO context with a real one. + ctx := context.TODO() + ticker := newTickerWithInitialTick(pp.pollInterval) go func() { @@ -112,7 +117,7 @@ func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}) { case <-pp.quit: return case <-ticker.C: - if err := pp.poll(); err != nil { + if err := pp.poll(ctx); err != nil { if hse, ok := err.(httpStatusError); ok { errorInfo := interfaces.DataSourceErrorInfo{ Kind: interfaces.DataSourceErrorKindErrorResponse, @@ -158,8 +163,8 @@ func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}) { }() } -func (pp *PollingProcessor) poll() error { - changeSet, err := pp.requester.Request() +func (pp *PollingProcessor) poll(ctx context.Context) error { + changeSet, err := pp.requester.Request(ctx) if err != nil { return err diff --git a/internal/datasourcev2/polling_http_request.go b/internal/datasourcev2/polling_http_request.go index 191ec7b7..49b329d3 100644 --- a/internal/datasourcev2/polling_http_request.go +++ b/internal/datasourcev2/polling_http_request.go @@ -1,6 +1,7 @@ package datasourcev2 import ( + "context" "encoding/json" "fmt" "io" @@ -67,12 +68,12 @@ func (r *pollingRequester) FilterKey() string { return r.filterKey } -func (r *pollingRequester) Request() (*fdv2proto.ChangeSet, error) { +func (r *pollingRequester) Request(ctx context.Context) (*fdv2proto.ChangeSet, error) { if r.loggers.IsDebugEnabled() { r.loggers.Debug("Polling LaunchDarkly for feature flag updates") } - body, cached, err := r.makeRequest(endpoints.PollingRequestPath) + body, cached, err := r.makeRequest(ctx, endpoints.PollingRequestPath) if err != nil { return nil, err } @@ -88,45 +89,50 @@ func (r *pollingRequester) Request() (*fdv2proto.ChangeSet, error) { changeSet := fdv2proto.NewChangeSetBuilder() for _, event := range payload.Events { - switch event.Name { - case fdv2proto.EventServerIntent: - var serverIntent fdv2proto.ServerIntent - err := json.Unmarshal(event.Data, &serverIntent) - if err != nil { - return nil, err + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + switch event.Name { + case fdv2proto.EventServerIntent: + var serverIntent fdv2proto.ServerIntent + err := json.Unmarshal(event.Data, &serverIntent) + if err != nil { + return nil, err + } + if serverIntent.Payload.Code == fdv2proto.IntentNone { + return changeSet.NoChanges(), nil + } + if err := changeSet.Start(serverIntent); err != nil { + return nil, err + } + case fdv2proto.EventPutObject: + var put fdv2proto.PutObject + if err := json.Unmarshal(event.Data, &put); err != nil { + return nil, err + } + changeSet.AddPut(put.Kind, put.Key, put.Version, put.Object) + case fdv2proto.EventDeleteObject: + var deleteObject fdv2proto.DeleteObject + if err := json.Unmarshal(event.Data, &deleteObject); err != nil { + return nil, err + } + changeSet.AddDelete(deleteObject.Kind, deleteObject.Key, deleteObject.Version) + case fdv2proto.EventPayloadTransferred: + var selector fdv2proto.Selector + if err := json.Unmarshal(event.Data, &selector); err != nil { + return nil, err + } + return changeSet.Finish(selector) } - if serverIntent.Payload.Code == fdv2proto.IntentNone { - return changeSet.NoChanges(), nil - } - if err := changeSet.Start(serverIntent); err != nil { - return nil, err - } - case fdv2proto.EventPutObject: - var put fdv2proto.PutObject - if err := json.Unmarshal(event.Data, &put); err != nil { - return nil, err - } - changeSet.AddPut(put.Kind, put.Key, put.Version, put.Object) - case fdv2proto.EventDeleteObject: - var deleteObject fdv2proto.DeleteObject - if err := json.Unmarshal(event.Data, &deleteObject); err != nil { - return nil, err - } - changeSet.AddDelete(deleteObject.Kind, deleteObject.Key, deleteObject.Version) - case fdv2proto.EventPayloadTransferred: - var selector fdv2proto.Selector - if err := json.Unmarshal(event.Data, &selector); err != nil { - return nil, err - } - return changeSet.Finish(selector) } } return nil, fmt.Errorf("didn't receive any known protocol events in polling payload") } -func (r *pollingRequester) makeRequest(resource string) ([]byte, bool, error) { - req, reqErr := http.NewRequest("GET", endpoints.AddPath(r.baseURI, resource), nil) +func (r *pollingRequester) makeRequest(ctx context.Context, resource string) ([]byte, bool, error) { + req, reqErr := http.NewRequestWithContext(ctx, "GET", endpoints.AddPath(r.baseURI, resource), nil) if reqErr != nil { reqErr = fmt.Errorf( "unable to create a poll request; this is not a network problem, most likely a bad base URI: %w",