refactor: add fallback/recovery additions [prototype] #223

Closed · wants to merge 7 commits
5 changes: 1 addition & 4 deletions internal/datasourcev2/polling_http_request.go
@@ -72,13 +72,10 @@ func (r *pollingRequester) Request() (*fdv2proto.ChangeSet, error) {
r.loggers.Debug("Polling LaunchDarkly for feature flag updates")
}

body, cached, err := r.makeRequest(endpoints.PollingRequestPath)
@cwaldren-ld (Contributor, Author) commented on Dec 11, 2024:

This is a hack, needed because our current data sources are started/stopped rather than re-instantiated. The issue: if we had an error response (say the payload was malformed, or it was just an HTTP error) and the next poll serves the same cached response, we'd get back an empty changeset (.NoChanges()) with no error (nil).

This means that if the previous Data Source Status was something like VALID (set by a previous synchronizer), and we then start up polling and it receives the same response as last time, we would never update the state to INTERRUPTED based on this error.

If the data source were instead re-instantiated, there would be no "previous state" for the new run. We'd get the error, update the status, then get the error again (cached) and not update the status - which would be correct from the data system's point of view, since nothing has changed.
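
As an illustration of one conceivable alternative to this hack (a hedged sketch with hypothetical names, not the SDK's actual requester): instead of dropping the cached short-circuit entirely, the requester could remember the outcome of its last non-cached poll and replay it whenever the HTTP layer reports a cached response, so a persistent error keeps reaching the status reporter even though the data source is only started/stopped.

package main

import (
	"errors"
	"fmt"
)

// lastOutcome records the result of the most recent non-cached poll.
// All names here are hypothetical; this is not the SDK's real requester.
type lastOutcome struct {
	body []byte
	err  error
}

// replayingPoller wraps a raw request function (analogous to makeRequest) and,
// when the HTTP layer reports a cached response, replays the previous outcome
// instead of silently reporting "no changes". A repeated error therefore keeps
// surfacing on every poll, even though the poller is never re-instantiated.
type replayingPoller struct {
	request func() (body []byte, cached bool, err error)
	last    *lastOutcome
}

func (p *replayingPoller) Poll() ([]byte, error) {
	body, cached, err := p.request()
	if cached && p.last != nil {
		return p.last.body, p.last.err
	}
	p.last = &lastOutcome{body: body, err: err}
	return body, err
}

func main() {
	calls := 0
	p := &replayingPoller{request: func() ([]byte, bool, error) {
		calls++
		if calls == 1 {
			// First poll fails (e.g. malformed payload / HTTP error).
			return nil, false, errors.New("malformed payload")
		}
		// Subsequent polls return the identical, cached response.
		return nil, true, nil
	}}
	for i := 0; i < 3; i++ {
		if _, err := p.Poll(); err != nil {
			fmt.Printf("poll %d: error surfaced again: %v\n", i, err)
		}
	}
}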

body, _, err := r.makeRequest(endpoints.PollingRequestPath)
if err != nil {
return nil, err
}
if cached {
return fdv2proto.NewChangeSetBuilder().NoChanges(), nil
}

var payload fdv2proto.PollingPayload
if err = json.Unmarshal(body, &payload); err != nil {
1 change: 1 addition & 0 deletions internal/datasourcev2/streaming_data_source.go
@@ -287,6 +287,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<
sp.setInitializedAndNotifyClient(true, closeWhenReady)

default:
processedEvent = false
@cwaldren-ld (Contributor, Author):

This seems incorrect in both the fdv1 and fdv2 sources. If we get an unrecognized event, then we don't want to set the data source state to valid. That would clear any existing error.

Member:

I'm wondering if this shouldn't be a tri-bool, with slightly different handling, where:

  1. We only set it to true if we have processed a valid event.
  2. Failure to decode an expected event would set it to false.
  3. An unknown event type would be ignored, with no change to this value, as a sort of forward-compatible guard against new event types we may want to add.
  4. We only execute line 294 if it is true, avoiding both the false state and the new third (null?) state.

@cwaldren-ld (Contributor, Author):

Seems like a good idea.

enum ProcessingState {
    EVENT_DECODED,
    EVENT_IGNORED,
    EVENT_MALFORMED
}

or similar.
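
For reference, a Go rendering of that tri-state idea might look like the following (a hedged sketch; names and event types are illustrative placeholders, not the SDK's actual implementation):

package datasourcev2

// ProcessingState classifies what happened to a single stream event.
type ProcessingState int

const (
	// EventDecoded: a recognized event was decoded successfully; the data
	// source may be reported as valid.
	EventDecoded ProcessingState = iota
	// EventIgnored: an unknown event type was skipped, as a forward-compatible
	// guard; the current data source status is left untouched.
	EventIgnored
	// EventMalformed: a recognized event failed to decode; the status should
	// be reported as interrupted with an error.
	EventMalformed
)

// classifyEvent is a stand-in for the switch in consumeStream. The event
// names below are placeholders, not the real fdv2 protocol events.
func classifyEvent(eventType string, decode func() error) ProcessingState {
	switch eventType {
	case "example-put", "example-delete":
		if err := decode(); err != nil {
			return EventMalformed
		}
		return EventDecoded
	default:
		return EventIgnored
	}
}

The switch in consumeStream would then mark the source valid only for EventDecoded, report an error for EventMalformed, and do nothing for EventIgnored.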

sp.loggers.Infof("Unexpected event found in stream: %s", event.Event())
}

140 changes: 114 additions & 26 deletions internal/datasystem/fdv2_datasystem.go
@@ -6,8 +6,6 @@
"sync"
"time"

"github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto"

"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/internal"
@@ -72,6 +70,9 @@
// Protects status.
mu sync.Mutex
status interfaces.DataSourceStatus

fallbackCond func() bool
recoveryCond func() bool
}

// NewFDv2 creates a new instance of the FDv2 data system. The first argument indicates if the system is enabled or
@@ -111,6 +112,24 @@
fdv2.primarySync = cfg.Synchronizers.Primary
fdv2.secondarySync = cfg.Synchronizers.Secondary
fdv2.disabled = disabled
@cwaldren-ld (Contributor, Author):

The power of conditions is the chaining I've shown here. We can have an arbitrary number of conditions and hook them up, which I can see being useful in the (far) future. For now, it'd probably be fine to define some preset conditions with a couple of knobs.

fdv2.fallbackCond = func() bool {
status := fdv2.getStatus()
fdv2.loggers.Debugf("Status: %s", status.String())
interruptedAtRuntime := status.State == interfaces.DataSourceStateInterrupted && time.Since(status.StateSince) > 1*time.Minute
cannotInitialize := status.State == interfaces.DataSourceStateInitializing && time.Since(status.StateSince) > 10*time.Second
healthyForTooLong := status.State == interfaces.DataSourceStateValid && time.Since(status.StateSince) > 30*time.Second
@cwaldren-ld (Contributor, Author):

healthyForTooLong is an interesting one. We want it in the recoveryCond in order to prevent the SDK from using the secondary for too long - presumably we want to switch back to the primary because it is more efficient/better for [reasons].

I put it in the fallbackCond to cause a flip-flop pattern for demo purposes; we probably wouldn't actually want that condition in there. It could be useful in a chaos-monkey sense, though: every so often, check that your backup is functioning.


return interruptedAtRuntime || cannotInitialize || healthyForTooLong
}
fdv2.recoveryCond = func() bool {
status := fdv2.getStatus()
fdv2.loggers.Debugf("Status: %s", status.String())

interruptedAtRuntime := status.State == interfaces.DataSourceStateInterrupted && time.Since(status.StateSince) > 1*time.Minute
healthyForTooLong := status.State == interfaces.DataSourceStateValid && time.Since(status.StateSince) > 5*time.Minute
cannotInitialize := status.State == interfaces.DataSourceStateInitializing && time.Since(status.StateSince) > 10*time.Second
return interruptedAtRuntime || healthyForTooLong || cannotInitialize
}
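
Following the first comment on this hunk, the two closures above could be collapsed into the "preset conditions with a couple of knobs" idea. A hedged sketch (helper names are hypothetical; only the status fields already used in this diff are assumed):

package datasystem

import (
	"time"

	"github.com/launchdarkly/go-server-sdk/v7/interfaces"
)

// stateFor returns a condition that is true once the data source has been in
// the given state for at least the given duration. getStatus would be
// fdv2.getStatus in this prototype.
func stateFor(getStatus func() interfaces.DataSourceStatus,
	state interfaces.DataSourceState, d time.Duration) func() bool {
	return func() bool {
		s := getStatus()
		return s.State == state && time.Since(s.StateSince) > d
	}
}

// anyOf chains conditions together: the result is true if any of them is true.
func anyOf(conds ...func() bool) func() bool {
	return func() bool {
		for _, c := range conds {
			if c() {
				return true
			}
		}
		return false
	}
}

fallbackCond could then be expressed as anyOf(stateFor(fdv2.getStatus, interfaces.DataSourceStateInterrupted, time.Minute), stateFor(fdv2.getStatus, interfaces.DataSourceStateInitializing, 10*time.Second)), with the durations as the knobs.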

if cfg.Store != nil && !disabled {
// If there's a persistent Store, we should provide a status monitor and inform Store that it's present.
@@ -158,16 +177,19 @@
return len(f.initializers) > 0 || f.primarySync != nil
}

func (f *FDv2) run(ctx context.Context, closeWhenReady chan struct{}) {

Check failure on line 180 in internal/datasystem/fdv2_datasystem.go (GitHub Actions, all workflow jobs): unnecessary leading newline (whitespace)
selector := f.runInitializers(ctx, closeWhenReady)

f.UpdateStatus(interfaces.DataSourceStateInitializing, interfaces.DataSourceErrorInfo{})

f.runInitializers(ctx, closeWhenReady)

if f.hasDataSources() && f.dataStoreStatusProvider.IsStatusMonitoringEnabled() {
f.launchTask(func() {
f.runPersistentStoreOutageRecovery(ctx, f.dataStoreStatusProvider.AddStatusListener())
})
}

f.runSynchronizers(ctx, closeWhenReady, selector)
f.runSynchronizers(ctx, closeWhenReady)
}

func (f *FDv2) runPersistentStoreOutageRecovery(ctx context.Context, statuses <-chan interfaces.DataStoreStatus) {
@@ -189,12 +211,12 @@
}
}

func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) fdv2proto.Selector {
func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) {
for _, initializer := range f.initializers {
f.loggers.Infof("Attempting to initialize via %s", initializer.Name())
basis, err := initializer.Fetch(ctx)
if errors.Is(err, context.Canceled) {
return fdv2proto.NoSelector()
return
}
if err != nil {
f.loggers.Warnf("Initializer %s failed: %v", initializer.Name(), err)
@@ -205,35 +227,97 @@
f.readyOnce.Do(func() {
close(closeWhenReady)
})
return basis.Selector
}
return fdv2proto.NoSelector()
}

func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}, selector fdv2proto.Selector) {
// If the SDK was configured with no synchronizer, then (assuming no initializer succeeded), we should
// trigger the ready signal to let the call to MakeClient unblock immediately.
// The Go SDK is architected in such a way that a "closeWhenReady" channel is passed into the Data System by means
// of MakeClient. MakeClient is able to block the user until the client reaches a terminal state in regard to
// its initialization status - either initialized with data, or run out of time initializing, or some fatal error.
//
// By closing this channel, the SDK's data sources indicate that MakeClient should unblock.
//
// In the FDv2 world, we have the possibility that a synchronizer fails or we fall back to a secondary synchronizer.
// Perhaps we've already closed the channel, and now a new synchronizer is attempting to do the same.
//
// In that case, we need to guarantee that the channel is closed only once. To do this, we "wrap" channel that is passed
Member:

Suggested change:
- // In that case, we need to guarantee that the channel is closed only once. To do this, we "wrap" channel that is passed
+ // In that case, we need to guarantee that the channel is closed only once. To do this, we "wrap" the channel that is passed

// directly into the synchronizer with another channel. The wrapper is responsible for actually closing the underlying
// closeWhenReady, and it uses a sync.Once to ensure that happens only once.
//
// This design could be improved significantly. Some ideas:
// 1) Refactor the SDK to listen to Data Source status rather than the special closeWhenReady channel. It's unclear why
// this isn't currently the case, but there may be good reasons.
// 2) Use callbacks. Somewhat equivalent to (1), but instead of needing to wrap the channel, we just proxy directly to
// calling the sync.Once.
// 3) Make the channel multi-use. Instead of the contract being "close when ready", have it be "send a value when ready".
func (f *FDv2) closeChannelWrapper(ctx context.Context, closeWhenReady chan struct{}) chan struct{} {
ch := make(chan struct{})
go func() {
select {
case <-ctx.Done():
return
case <-ch:
f.readyOnce.Do(func() {
close(closeWhenReady)
})
}
}()
return ch
}
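
As an aside on idea (3) in the comment above ("send a value when ready"): a buffered channel plus a non-blocking send would also give the signal-only-once guarantee without a wrapper or sync.Once. A minimal hypothetical sketch:

package main

import "fmt"

// signalReady performs a non-blocking send on a buffered readiness channel.
// Because the channel has capacity 1 and the signal is dropped if it is full,
// any number of data sources can call this without panicking or blocking,
// which removes the need for the closeChannelWrapper/sync.Once pattern.
func signalReady(ready chan<- struct{}) {
	select {
	case ready <- struct{}{}:
	default: // someone already signaled; nothing to do
	}
}

func main() {
	ready := make(chan struct{}, 1)

	// Several synchronizers (or initializers) may all report readiness.
	signalReady(ready)
	signalReady(ready)
	signalReady(ready)

	<-ready // MakeClient-style wait unblocks exactly once
	fmt.Println("client ready")
}

The trade-off is that, unlike close, a sent value wakes only one waiter, so this only works if exactly one goroutine (the MakeClient path) consumes the readiness signal.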

func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}) {
// If the SDK was configured with no synchronizer, then (assuming no initializer succeeded, which would have
// already closed the channel), we should close it now so that MakeClient unblocks.
if f.primarySync == nil {
f.readyOnce.Do(func() {
close(closeWhenReady)
})
return
}

// We can't pass closeWhenReady to the data source, because it might have already been closed.
// Instead, create a "proxy" channel just for the data source; if that is closed, we close the real one
// using the sync.Once.
ready := make(chan struct{})
f.primarySync.Sync(ready, selector)
f.launchTask(func() {
for {
f.loggers.Debugf("Primary synchronizer %s is starting", f.primarySync.Name())
f.primarySync.Sync(f.closeChannelWrapper(ctx, closeWhenReady), f.store.Selector())
if err := f.evaluateCond(ctx, f.fallbackCond); errors.Is(err, context.Canceled) {
return
}
if err := f.primarySync.Close(); err != nil {
f.loggers.Errorf("Primary synchronizer %s failed to gracefully close: %v", f.primarySync.Name(), err)
}
f.loggers.Debugf("Fallback condition met")
f.loggers.Debugf("Secondary synchronizer %s is starting", f.secondarySync.Name())

f.UpdateStatus(interfaces.DataSourceStateInterrupted, interfaces.DataSourceErrorInfo{})

f.secondarySync.Sync(f.closeChannelWrapper(ctx, closeWhenReady), f.store.Selector())
if err := f.evaluateCond(ctx, f.recoveryCond); errors.Is(err, context.Canceled) {
return
}
if err := f.secondarySync.Close(); err != nil {
f.loggers.Errorf("Secondary synchronizer %s failed to gracefully close: %v", f.secondarySync.Name(), err)
}
f.loggers.Debugf("Recovery condition met")
select {
case <-ctx.Done():
return
default:
}
}
})
}

func (f *FDv2) evaluateCond(ctx context.Context, cond func() bool) error {
ticker := time.NewTicker(10 * time.Second)
Member:

This hard-coded 10-second timer is what limits the resolution of the fallback/recovery conditions, right?

@cwaldren-ld (Contributor, Author) replied on Dec 12, 2024:

Correct. I could see an alternative where this is event-triggered: we'd have a "timer event" and a "data source status event" (and anything else that can be used as a condition).

But then we'd need to hold a map of timers and hook into the data source status broadcasters... it just doesn't seem worth the complexity compared to a predictable "tick" that polls whatever data is needed.

defer ticker.Stop()
for {
select {
case <-ready:
f.readyOnce.Do(func() {
close(closeWhenReady)
})
case <-ctx.Done():
return
return ctx.Err()
case <-ticker.C:
if cond() {
return nil
}
f.loggers.Debugf("Condition check succeeded, continue with current synchronizer")
}
}
}
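
For comparison with the thread above about the hard-coded ticker, a hedged sketch of the event-triggered variant described (and rejected) there. statusChanged is a hypothetical channel that would be fed by the data source status broadcaster:

package datasystem

import (
	"context"
	"time"
)

// evaluateCondEventDriven re-checks the condition whenever the data source
// status changes, while a coarser ticker still covers the purely time-based
// parts of the condition (e.g. "interrupted for more than a minute").
func evaluateCondEventDriven(ctx context.Context, cond func() bool,
	statusChanged <-chan struct{}) error {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-statusChanged:
			// A status transition may satisfy the condition immediately.
		case <-ticker.C:
			// Time passing may satisfy a duration-based condition.
		}
		if cond() {
			return nil
		}
	}
}
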
@@ -302,13 +386,17 @@
}

//nolint:revive // DataSourceStatusReporter method.
func (f *FDv2) UpdateStatus(status interfaces.DataSourceState, err interfaces.DataSourceErrorInfo) {
func (f *FDv2) UpdateStatus(state interfaces.DataSourceState, err interfaces.DataSourceErrorInfo) {
@cwaldren-ld (Contributor, Author):

The equivalent of this function in fdv1 is here: https://github.com/launchdarkly/go-server-sdk/blob/v7/internal/datasource/data_source_update_sink_impl.go#L157

The minimal implementation I wrote here is for demo purposes. We may need to adopt the other one to be backwards-compatible.

f.mu.Lock()
defer f.mu.Unlock()
f.status = interfaces.DataSourceStatus{
State: status,
LastError: err,
StateSince: time.Now(),

if state != f.status.State {
f.status.State = state
f.status.StateSince = time.Now()
}

if err != f.status.LastError {
f.status.LastError = err
}
}
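
On the backwards-compatibility note above: one behavior the linked fdv1 sink is believed to implement (consistent with the documented DataSourceState semantics) is that an Interrupted report arriving while the source is still Initializing stays Initializing. A hypothetical sketch of layering that onto the minimal UpdateStatus above, meant to live alongside it in fdv2_datasystem.go:

// updateStatusCompat is a sketch (hypothetical name): an Interrupted report
// that arrives before the source has ever become Valid is kept as
// Initializing, mirroring the fdv1 sink's handling.
func (f *FDv2) updateStatusCompat(state interfaces.DataSourceState, err interfaces.DataSourceErrorInfo) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if state == interfaces.DataSourceStateInterrupted &&
		f.status.State == interfaces.DataSourceStateInitializing {
		// The source has not produced valid data yet, so it is still initializing.
		state = interfaces.DataSourceStateInitializing
	}

	if state != f.status.State {
		f.status.State = state
		f.status.StateSince = time.Now()
	}
	if err != f.status.LastError {
		f.status.LastError = err
	}
}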
