Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Beemer <[email protected]>
  • Loading branch information
beeme1mr committed Jan 7, 2025
1 parent 36ec82a commit e6653d9
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 94 deletions.
8 changes: 5 additions & 3 deletions libs/providers/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ In the above example, the provider expects flagd to be available at `localhost:8

Alternatively, you can use socket paths to connect to flagd.

```
```ts
OpenFeature.setProvider(new FlagdProvider({
socketPath: "/tmp/flagd.socks",
}))
Expand All @@ -72,7 +72,7 @@ Alternatively, you can use socket paths to connect to flagd.
This mode performs flag evaluations locally (in-process).
Flag configurations for evaluation are obtained via gRPC protocol using [sync protobuf schema](https://buf.build/open-feature/flagd/file/main:sync/v1/sync_service.proto) service definition.

```
```ts
OpenFeature.setProvider(new FlagdProvider({
resolverType: 'in-process',
}))
Expand All @@ -83,7 +83,7 @@ In the above example, the provider expects a flag sync service implementation to
In-process resolver can also work in an offline mode.
To enable this mode, you should provide a valid flag configuration file with the option `offlineFlagSourcePath`.

```
```ts
OpenFeature.setProvider(new FlagdProvider({
resolverType: 'in-process',
offlineFlagSourcePath: './flags.json',
Expand All @@ -107,6 +107,8 @@ For general information on events, see the [official documentation](https://open

### Flag Metadata

TODO: Add message about flag metadata and how it's merged

| Field | Type | Value |
| ------- | ------ | ------------------------------------------------- |
| `scope` | string | "selector" set for the associated source in flagd |
Expand Down
3 changes: 2 additions & 1 deletion libs/providers/flagd/package.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
{
"name": "@openfeature/flagd-provider",
"version": "0.13.1",
"license": "Apache-2.0",
"scripts": {
"publish-if-not-exists": "cp $NPM_CONFIG_USERCONFIG .npmrc && if [ \"$(npm show $npm_package_name@$npm_package_version version)\" = \"$(npm run current-version -s)\" ]; then echo 'already published, skipping'; else npm publish --access public; fi",
"current-version": "echo $npm_package_version"
},
"peerDependencies": {
"@grpc/grpc-js": "~1.8.0 || ~1.9.0 || ~1.10.0 || ~1.11.0 || ~1.12.0",
"@openfeature/server-sdk": "^1.13.0"
"@openfeature/server-sdk": "^1.17.0"
}
}
2 changes: 0 additions & 2 deletions libs/providers/flagd/src/lib/constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
export const BASE_EVENT_STREAM_RETRY_BACKOFF_MS = 1000;
export const DEFAULT_MAX_EVENT_STREAM_RETRIES = Infinity;
export const EVENT_CONFIGURATION_CHANGE = 'configuration_change';
export const EVENT_PROVIDER_READY = 'provider_ready';
export const DEFAULT_MAX_CACHE_SIZE = 1000;
28 changes: 13 additions & 15 deletions libs/providers/flagd/src/lib/flagd-provider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
FlagMetadata,
OpenFeature,
ProviderEvents,
ProviderStatus,
StandardResolutionReasons,
} from '@openfeature/server-sdk';
import type { UnaryCall } from '@protobuf-ts/runtime-rpc';
Expand All @@ -25,7 +24,6 @@ import {
ResolveStringResponse,
ServiceClient,
} from '../proto/ts/flagd/evaluation/v1/evaluation';
import { EVENT_CONFIGURATION_CHANGE, EVENT_PROVIDER_READY } from './constants';
import { FlagdProvider } from './flagd-provider';
import { FlagChangeMessage, GRPCService } from './service/grpc/grpc-service';
import { ConnectivityState } from '@grpc/grpc-js/build/src/connectivity-state';
Expand Down Expand Up @@ -77,7 +75,7 @@ describe(FlagdProvider.name, () => {
return {
on: jest.fn((event: string, callback: (message: unknown) => void) => {
if (event === 'data') {
callback({ type: EVENT_PROVIDER_READY });
callback({ type: ProviderEvents.Ready });
}
}),
cancel: jest.fn(),
Expand Down Expand Up @@ -245,8 +243,8 @@ describe(FlagdProvider.name, () => {
});

describe('streaming', () => {
const STATIC_BOOLEAN_KEY_1 = 'staticBoolflagOne';
const STATIC_BOOLEAN_KEY_2 = 'staticBoolflagTwo';
const STATIC_BOOLEAN_KEY_1 = 'staticBoolFlagOne';
const STATIC_BOOLEAN_KEY_2 = 'staticBoolFlagTwo';
const TARGETING_MATCH_BOOLEAN_KEY = 'targetingMatchBooleanKey';

// ref to callback to fire to fake error messages to flagd
Expand Down Expand Up @@ -319,7 +317,7 @@ describe(FlagdProvider.name, () => {
}
});
// fire message saying provider is ready
registeredOnMessageCallback({ type: EVENT_PROVIDER_READY, data: {} });
registeredOnMessageCallback({ type: ProviderEvents.Ready, data: {} });
});
});

Expand All @@ -336,7 +334,7 @@ describe(FlagdProvider.name, () => {
),
);
// fire message saying provider is ready
registeredOnMessageCallback({ type: EVENT_PROVIDER_READY, data: {} });
registeredOnMessageCallback({ type: ProviderEvents.Ready, data: {} });
client = OpenFeature.getClient('change events test');
});

Expand Down Expand Up @@ -368,7 +366,7 @@ describe(FlagdProvider.name, () => {
};

// mock change event from flagd
registeredOnMessageCallback({ type: EVENT_CONFIGURATION_CHANGE, data });
registeredOnMessageCallback({ type: ProviderEvents.ConfigurationChanged, data });
});
});

Expand All @@ -385,7 +383,7 @@ describe(FlagdProvider.name, () => {
),
);
// fire message saying provider is ready
registeredOnMessageCallback({ type: EVENT_PROVIDER_READY, data: {} });
registeredOnMessageCallback({ type: ProviderEvents.Ready, data: {} });
client = OpenFeature.getClient('streaming test');
});

Expand Down Expand Up @@ -417,7 +415,7 @@ describe(FlagdProvider.name, () => {
),
);
// fire message saying provider is ready
registeredOnMessageCallback({ type: EVENT_PROVIDER_READY, data: {} });
registeredOnMessageCallback({ type: ProviderEvents.Ready, data: {} });
client = OpenFeature.getClient('cache invalidation');
});

Expand Down Expand Up @@ -448,7 +446,7 @@ describe(FlagdProvider.name, () => {
},
};
registeredOnMessageCallback({
type: EVENT_CONFIGURATION_CHANGE,
type: ProviderEvents.ConfigurationChanged,
data: message,
});

Expand Down Expand Up @@ -481,7 +479,7 @@ describe(FlagdProvider.name, () => {
},
};
registeredOnMessageCallback({
type: EVENT_CONFIGURATION_CHANGE,
type: ProviderEvents.ConfigurationChanged,
data: message,
});

Expand Down Expand Up @@ -512,7 +510,7 @@ describe(FlagdProvider.name, () => {
registeredOnErrorCallback();

// status should be ERROR
expect(provider.status).toEqual(ProviderStatus.ERROR);
// expect(provider.status).toEqual(ProviderStatus.ERROR);
expect(streamingServiceClientMock.getChannel().getConnectivityState).toHaveBeenCalledWith(true);
expect(streamingServiceClientMock.getChannel().watchConnectivityState).toHaveBeenCalled();
});
Expand All @@ -529,7 +527,7 @@ describe(FlagdProvider.name, () => {
return {
on: jest.fn((event: string, callback: (message: unknown) => void) => {
if (event === 'data') {
callback({ type: EVENT_PROVIDER_READY });
callback({ type: ProviderEvents.Ready });
}
}),
cancel: jest.fn(),
Expand Down Expand Up @@ -652,7 +650,7 @@ describe(FlagdProvider.name, () => {
return {
on: jest.fn((event: string, callback: (message: unknown) => void) => {
if (event === 'data') {
callback({ type: EVENT_PROVIDER_READY });
callback({ type: ProviderEvents.Ready });
}
}),
cancel: cancelMock,
Expand Down
70 changes: 22 additions & 48 deletions libs/providers/flagd/src/lib/flagd-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
OpenFeatureEventEmitter,
Provider,
ProviderEvents,
ProviderStatus,
ResolutionDetails,
} from '@openfeature/server-sdk';
import { FlagdProviderOptions, getConfig } from './configuration';
Expand All @@ -15,22 +14,13 @@ import { InProcessService } from './service/in-process/in-process-service';

export class FlagdProvider implements Provider {
metadata = {
name: 'flagd Provider',
name: 'flagd',
};

readonly runsOn = 'server';

get status() {
return this._status;
}

get events() {
return this._events;
}
readonly events = new OpenFeatureEventEmitter();

private readonly _service: Service;
private _status = ProviderStatus.NOT_READY;
private _events = new OpenFeatureEventEmitter();

/**
* Construct a new flagd provider.
Expand All @@ -53,19 +43,19 @@ export class FlagdProvider implements Provider {
: new GRPCService(config, undefined, logger);
}

initialize(): Promise<void> {
return this._service
.connect(this.handleReconnect.bind(this), this.handleChanged.bind(this), this.handleError.bind(this))
.then(() => {
this.logger?.debug(`${this.metadata.name}: ready`);
this._status = ProviderStatus.READY;
})
.catch((err) => {
this._status = ProviderStatus.ERROR;
this.logger?.error(`${this.metadata.name}: error during initialization: ${err.message}`);
this.logger?.debug(err);
throw err;
});
async initialize(): Promise<void> {
try {
await this._service.connect(
this.handleReconnect.bind(this),
this.handleChanged.bind(this),
this.handleError.bind(this),
);
this.logger?.debug(`${this.metadata.name}: ready`);
} catch (err) {
this.logger?.error(`${this.metadata.name}: error during initialization: ${(err as Error)?.message}`);
this.logger?.debug(err);
throw err;
}
}

onClose(): Promise<void> {
Expand All @@ -79,9 +69,7 @@ export class FlagdProvider implements Provider {
transformedContext: EvaluationContext,
logger: Logger,
): Promise<ResolutionDetails<boolean>> {
return this._service
.resolveBoolean(flagKey, defaultValue, transformedContext, logger)
.catch((err) => this.logRejected(err, flagKey, logger));
return this._service.resolveBoolean(flagKey, defaultValue, transformedContext, logger);
}

resolveStringEvaluation(
Expand All @@ -90,9 +78,7 @@ export class FlagdProvider implements Provider {
transformedContext: EvaluationContext,
logger: Logger,
): Promise<ResolutionDetails<string>> {
return this._service
.resolveString(flagKey, defaultValue, transformedContext, logger)
.catch((err) => this.logRejected(err, flagKey, logger));
return this._service.resolveString(flagKey, defaultValue, transformedContext, logger);
}

resolveNumberEvaluation(
Expand All @@ -101,9 +87,7 @@ export class FlagdProvider implements Provider {
transformedContext: EvaluationContext,
logger: Logger,
): Promise<ResolutionDetails<number>> {
return this._service
.resolveNumber(flagKey, defaultValue, transformedContext, logger)
.catch((err) => this.logRejected(err, flagKey, logger));
return this._service.resolveNumber(flagKey, defaultValue, transformedContext, logger);
}

resolveObjectEvaluation<T extends JsonValue>(
Expand All @@ -112,28 +96,18 @@ export class FlagdProvider implements Provider {
transformedContext: EvaluationContext,
logger: Logger,
): Promise<ResolutionDetails<T>> {
return this._service
.resolveObject<T>(flagKey, defaultValue, transformedContext, logger)
.catch((err) => this.logRejected(err, flagKey, logger));
return this._service.resolveObject<T>(flagKey, defaultValue, transformedContext, logger);
}

logRejected = (err: Error, flagKey: string, logger: Logger) => {
logger.error(`Error resolving flag ${flagKey}: ${err?.message}`);
logger.error(err?.stack);
throw err;
};

private handleReconnect(): void {
this._status = ProviderStatus.READY;
this._events.emit(ProviderEvents.Ready);
this.events.emit(ProviderEvents.Ready);
}

private handleError(message: string): void {
this._status = ProviderStatus.ERROR;
this._events.emit(ProviderEvents.Error, { message });
this.events.emit(ProviderEvents.Error, { message });
}

private handleChanged(flagsChanged: string[]): void {
this._events.emit(ProviderEvents.ConfigurationChanged, { flagsChanged });
this.events.emit(ProviderEvents.ConfigurationChanged, { flagsChanged });
}
}
7 changes: 4 additions & 3 deletions libs/providers/flagd/src/lib/service/grpc/grpc-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
JsonValue,
Logger,
ParseError,
ProviderEvents,
ResolutionDetails,
StandardResolutionReasons,
TypeMismatchError,
Expand All @@ -29,7 +30,7 @@ import {
ServiceClient,
} from '../../../proto/ts/flagd/evaluation/v1/evaluation';
import { Config } from '../../configuration';
import { DEFAULT_MAX_CACHE_SIZE, EVENT_CONFIGURATION_CHANGE, EVENT_PROVIDER_READY } from '../../constants';
import { DEFAULT_MAX_CACHE_SIZE } from '../../constants';
import { FlagdProvider } from '../../flagd-provider';
import { Service } from '../service';
import { closeStreamIfDefined } from '../common';
Expand Down Expand Up @@ -163,15 +164,15 @@ export class GRPCService implements Service {
this.handleError(reconnectCallback, changedCallback, disconnectCallback);
});
stream.on('data', (message) => {
if (message.type === EVENT_PROVIDER_READY) {
if (message.type === ProviderEvents.Ready) {
this.logger?.debug(`${FlagdProvider.name}: streaming connection established with flagd`);
// if resolveConnect is undefined, this is a reconnection; we only want to fire the reconnect callback in that case
if (resolveConnect) {
resolveConnect();
} else {
reconnectCallback();
}
} else if (message.type === EVENT_CONFIGURATION_CHANGE) {
} else if (message.type === ProviderEvents.ConfigurationChanged) {
this.handleFlagsChanged(message, changedCallback);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,16 @@ export class GrpcFetch implements DataFetch {
this._request = { providerId: '', selector: selector ? selector : '' };
}

connect(
async connect(
dataCallback: (flags: string) => string[],
reconnectCallback: () => void,
changedCallback: (flagsChanged: string[]) => void,
disconnectCallback: (message: string) => void,
): Promise<void> {
return new Promise<void>((resolve, reject) =>
await new Promise<void>((resolve, reject) =>
this.listen(dataCallback, reconnectCallback, changedCallback, disconnectCallback, resolve, reject),
).then(() => {
this._initialized = true;
});
);
this._initialized = true;
}

async disconnect() {
Expand Down
Loading

0 comments on commit e6653d9

Please sign in to comment.