Skip to content

Commit

Permalink
fix: Make transformation_service_endpoint configuration optional (#4880)
Browse files Browse the repository at this point in the history
* Make transformation_service_endpoint configuration optional

Signed-off-by: Dimitris Stafylarakis <[email protected]>

* Add custom error for transformation service, implement featurestore unit tests

Signed-off-by: Dimitris Stafylarakis <[email protected]>

---------

Signed-off-by: Dimitris Stafylarakis <[email protected]>
  • Loading branch information
xaniasd authored Jan 3, 2025
1 parent e04d7d5 commit c62377b
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 93 deletions.
22 changes: 22 additions & 0 deletions go/internal/feast/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package feast

import (
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type FeastTransformationServiceNotConfigured struct{}

func (FeastTransformationServiceNotConfigured) GRPCStatus() *status.Status {
errorStatus := status.New(codes.Internal, "No transformation service configured")
ds, err := errorStatus.WithDetails(&errdetails.LocalizedMessage{Message: "No transformation service configured, required for on-demand feature transformations"})
if err != nil {
return errorStatus
}
return ds
}

func (e FeastTransformationServiceNotConfigured) Error() string {
return e.GRPCStatus().Err().Error()
}
22 changes: 12 additions & 10 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package feast
import (
"context"
"errors"
"fmt"

"github.com/apache/arrow/go/v17/arrow/memory"

//"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"github.com/feast-dev/feast/go/internal/feast/model"
Expand Down Expand Up @@ -60,17 +61,14 @@ func NewFeatureStore(config *registry.RepoConfig, callback transformation.Transf
return nil, err
}

// Use a scalable transformation service like Python Transformation Service.
// Assume the user will define the "transformation_service_endpoint" in the feature_store.yaml file
// under the "feature_server" section.
transformationServerEndpoint, ok := config.FeatureServer["transformation_service_endpoint"]
if !ok {
fmt.Println("Errors while reading transformation_service_endpoint info")
panic("No transformation service endpoint provided in the feature_store.yaml file.")
var transformationService *transformation.GrpcTransformationService
if transformationServerEndpoint, ok := config.FeatureServer["transformation_service_endpoint"]; ok {
// Use a scalable transformation service like Python Transformation Service.
// Assume the user will define the "transformation_service_endpoint" in the feature_store.yaml file
// under the "feature_server" section.
transformationService, _ = transformation.NewGrpcTransformationService(config, transformationServerEndpoint.(string))
}

transformationService, _ := transformation.NewGrpcTransformationService(config, transformationServerEndpoint.(string))

return &FeatureStore{
config: config,
registry: registry,
Expand Down Expand Up @@ -112,6 +110,10 @@ func (fs *FeatureStore) GetOnlineFeatures(
return nil, err
}

if len(requestedOnDemandFeatureViews) > 0 && fs.transformationService == nil {
return nil, FeastTransformationServiceNotConfigured{}
}

entityNameToJoinKeyMap, expectedJoinKeysSet, err := onlineserving.GetEntityMaps(requestedFeatureViews, entities)
if err != nil {
return nil, err
Expand Down
277 changes: 197 additions & 80 deletions go/internal/feast/featurestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,124 +2,241 @@ package feast

import (
"context"
"log"
"os"
"path/filepath"
"runtime"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/feast-dev/feast/go/internal/feast/onlinestore"
"github.com/feast-dev/feast/go/internal/feast/registry"
"github.com/feast-dev/feast/go/internal/test"
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
)

// Return absolute path to the test_repo registry regardless of the working directory
func getRegistryPath() map[string]interface{} {
var featureRepoBasePath string
var featureRepoRegistryFile string

func TestMain(m *testing.M) {
// Get the file path of this source file, regardless of the working directory
_, filename, _, ok := runtime.Caller(0)
if !ok {
panic("couldn't find file path of the test file")
log.Print("couldn't find file path of the test file")
os.Exit(1)
}
registry := map[string]interface{}{
"path": filepath.Join(filename, "..", "..", "..", "feature_repo/data/registry.db"),
featureRepoBasePath = filepath.Join(filename, "..", "..", "test")
featureRepoRegistryFile = filepath.Join(featureRepoBasePath, "feature_repo", "data", "registry.db")
if err := test.SetupInitializedRepo(featureRepoBasePath); err != nil {
log.Print("Could not initialize test repo: ", err)
os.Exit(1)
}
return registry
os.Exit(m.Run())
}

func TestNewFeatureStore(t *testing.T) {
t.Skip("@todo(achals): feature_repo isn't checked in yet")
config := registry.RepoConfig{
Project: "feature_repo",
Registry: getRegistryPath(),
Provider: "local",
OnlineStore: map[string]interface{}{
"type": "redis",
},
}
fs, err := NewFeatureStore(&config, nil)
assert.Nil(t, err)
assert.IsType(t, &onlinestore.RedisOnlineStore{}, fs.onlineStore)

t.Run("valid config", func(t *testing.T) {
config := &registry.RepoConfig{
Project: "feature_repo",
Registry: getRegistryPath(),
Provider: "local",
OnlineStore: map[string]interface{}{
"type": "redis",
tests := []struct {
name string
config *registry.RepoConfig
expectOnlineStoreType interface{}
errMessage string
}{
{
name: "valid config",
config: &registry.RepoConfig{
Project: "feature_repo",
Registry: map[string]interface{}{
"path": featureRepoRegistryFile,
},
Provider: "local",
OnlineStore: map[string]interface{}{
"type": "redis",
},
},
FeatureServer: map[string]interface{}{
"transformation_service_endpoint": "localhost:50051",
expectOnlineStoreType: &onlinestore.RedisOnlineStore{},
},
{
name: "valid config with transformation service endpoint",
config: &registry.RepoConfig{
Project: "feature_repo",
Registry: map[string]interface{}{
"path": featureRepoRegistryFile,
},
Provider: "local",
OnlineStore: map[string]interface{}{
"type": "redis",
},
FeatureServer: map[string]interface{}{
"transformation_service_endpoint": "localhost:50051",
},
},
}
fs, err := NewFeatureStore(config, nil)
assert.Nil(t, err)
assert.NotNil(t, fs)
assert.IsType(t, &onlinestore.RedisOnlineStore{}, fs.onlineStore)
assert.NotNil(t, fs.transformationService)
})

t.Run("missing transformation service endpoint", func(t *testing.T) {
config := &registry.RepoConfig{
Project: "feature_repo",
Registry: getRegistryPath(),
Provider: "local",
OnlineStore: map[string]interface{}{
"type": "redis",
expectOnlineStoreType: &onlinestore.RedisOnlineStore{},
},
{
name: "invalid online store config",
config: &registry.RepoConfig{
Project: "feature_repo",
Registry: map[string]interface{}{
"path": featureRepoRegistryFile,
},
Provider: "local",
OnlineStore: map[string]interface{}{
"type": "invalid_store",
},
},
}
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
errMessage: "invalid_store online store type is currently not supported",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := NewFeatureStore(test.config, nil)
if test.errMessage != "" {
assert.Nil(t, got)
require.Error(t, err)
assert.ErrorContains(t, err, test.errMessage)

} else {
require.NoError(t, err)
assert.NotNil(t, got)
assert.IsType(t, test.expectOnlineStoreType, got.onlineStore)
}
}()
NewFeatureStore(config, nil)
})

t.Run("invalid online store config", func(t *testing.T) {
config := &registry.RepoConfig{
Project: "feature_repo",
Registry: getRegistryPath(),
Provider: "local",
OnlineStore: map[string]interface{}{
"type": "invalid_store",
})
}

}

type MockRedis struct {
mock.Mock
}

func (m *MockRedis) Destruct() {}
func (m *MockRedis) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]onlinestore.FeatureData, error) {
args := m.Called(ctx, entityKeys, featureViewNames, featureNames)
var fd [][]onlinestore.FeatureData
if args.Get(0) != nil {
fd = args.Get(0).([][]onlinestore.FeatureData)
}
return fd, args.Error(1)
}

func TestGetOnlineFeatures(t *testing.T) {
tests := []struct {
name string
config *registry.RepoConfig
fn func(*testing.T, *FeatureStore)
}{
{
name: "redis with simple features",
config: &registry.RepoConfig{
Project: "feature_repo",
Registry: map[string]interface{}{
"path": featureRepoRegistryFile,
},
Provider: "local",
OnlineStore: map[string]interface{}{
"type": "redis",
"connection_string": "localhost:6379",
},
},
FeatureServer: map[string]interface{}{
"transformation_service_endpoint": "localhost:50051",
fn: testRedisSimpleFeatures,
},
{
name: "redis with On-demand feature views, no transformation service endpoint",
config: &registry.RepoConfig{
Project: "feature_repo",
Registry: map[string]interface{}{
"path": featureRepoRegistryFile,
},
Provider: "local",
OnlineStore: map[string]interface{}{
"type": "redis",
"connection_string": "localhost:6379",
},
},
}
fs, err := NewFeatureStore(config, nil)
assert.NotNil(t, err)
assert.Nil(t, fs)
})
fn: testRedisODFVNoTransformationService,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {

fs, err := NewFeatureStore(test.config, nil)
require.Nil(t, err)
fs.onlineStore = new(MockRedis)
test.fn(t, fs)
})

}
}

func TestGetOnlineFeaturesRedis(t *testing.T) {
t.Skip("@todo(achals): feature_repo isn't checked in yet")
config := registry.RepoConfig{
Project: "feature_repo",
Registry: getRegistryPath(),
Provider: "local",
OnlineStore: map[string]interface{}{
"type": "redis",
"connection_string": "localhost:6379",
func testRedisSimpleFeatures(t *testing.T, fs *FeatureStore) {

featureNames := []string{"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
}
entities := map[string]*types.RepeatedValue{"driver_id": {Val: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1001}},
{Val: &types.Value_Int64Val{Int64Val: 1002}},
}}}

results := [][]onlinestore.FeatureData{
{
{
Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "conv_rate"},
Value: types.Value{Val: &types.Value_FloatVal{FloatVal: 12.0}},
},
{
Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "acc_rate"},
Value: types.Value{Val: &types.Value_FloatVal{FloatVal: 1.0}},
},
{
Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "avg_daily_trips"},
Value: types.Value{Val: &types.Value_Int64Val{Int64Val: 100}},
},
},
{

{
Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "conv_rate"},
Value: types.Value{Val: &types.Value_FloatVal{FloatVal: 24.0}},
},
{
Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "acc_rate"},
Value: types.Value{Val: &types.Value_FloatVal{FloatVal: 2.0}},
},
{
Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "avg_daily_trips"},
Value: types.Value{Val: &types.Value_Int64Val{Int64Val: 130}},
},
},
}
ctx := context.Background()
mr := fs.onlineStore.(*MockRedis)
mr.On("OnlineRead", ctx, mock.Anything, mock.Anything, mock.Anything).Return(results, nil)
response, err := fs.GetOnlineFeatures(ctx, featureNames, nil, entities, map[string]*types.RepeatedValue{}, true)
require.Nil(t, err)
assert.Len(t, response, 4) // 3 Features + 1 entity = 4 columns (feature vectors) in response
}

func testRedisODFVNoTransformationService(t *testing.T, fs *FeatureStore) {
featureNames := []string{"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
"transformed_conv_rate:conv_rate_plus_val1",
}
entities := map[string]*types.RepeatedValue{"driver_id": {Val: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1001}},
{Val: &types.Value_Int64Val{Int64Val: 1002}},
{Val: &types.Value_Int64Val{Int64Val: 1003}}}},
}

fs, err := NewFeatureStore(&config, nil)
assert.Nil(t, err)
ctx := context.Background()
response, err := fs.GetOnlineFeatures(
ctx, featureNames, nil, entities, map[string]*types.RepeatedValue{}, true)
assert.Nil(t, err)
assert.Len(t, response, 4) // 3 Features + 1 entity = 4 columns (feature vectors) in response
mr := fs.onlineStore.(*MockRedis)
mr.On("OnlineRead", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
response, err := fs.GetOnlineFeatures(ctx, featureNames, nil, entities, map[string]*types.RepeatedValue{}, true)
assert.Nil(t, response)
assert.ErrorAs(t, err, &FeastTransformationServiceNotConfigured{})

}
Loading

0 comments on commit c62377b

Please sign in to comment.