Merge remote-tracking branch 'origin/main' into hostrewritesgogo
mathetake committed Jan 21, 2025
2 parents (efe5260 + 36d2d76), commit 5070454
Showing 6 changed files with 401 additions and 250 deletions.
tests/extproc/custom_extproc_test.go (4 changes: 2 additions & 2 deletions)
@@ -11,7 +11,7 @@ import (
"testing"
"time"

openai "github.com/openai/openai-go"
"github.com/openai/openai-go"
"github.com/openai/openai-go/option"
"github.com/stretchr/testify/require"

@@ -21,7 +21,7 @@ import (
// TestExtProcCustomRouter tests examples/extproc_custom_router.
func TestExtProcCustomRouter(t *testing.T) {
requireBinaries(t)
requireRunEnvoy(t, "/dev/null", "dummy")
requireRunEnvoy(t, "/dev/null")
requireTestUpstream(t)
configPath := t.TempDir() + "/extproc-config.yaml"
requireWriteExtProcConfig(t, configPath, &filterconfig.Config{
tests/extproc/extproc_test.go (221 changes: 16 additions & 205 deletions)
@@ -3,11 +3,7 @@
package extproc

import (
"bufio"
"bytes"
"context"
_ "embed"
"encoding/json"
"fmt"
"io"
"os"
@@ -16,10 +12,7 @@ import (
"strconv"
"strings"
"testing"
"time"

openai "github.com/openai/openai-go"
"github.com/openai/openai-go/option"
"github.com/stretchr/testify/require"
"sigs.k8s.io/yaml"

@@ -36,178 +29,6 @@ var (
awsBedrockSchema = filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaAWSBedrock}
)

- // TestE2E tests the end-to-end flow of the external processor with Envoy.
- //
- // This requires the following environment variables to be set:
- // - TEST_AWS_ACCESS_KEY_ID
- // - TEST_AWS_SECRET_ACCESS_KEY
- // - TEST_OPENAI_API_KEY
- //
- // The test will be skipped if any of these are not set.
- func TestE2E(t *testing.T) {
- requireBinaries(t)
- accessLogPath := t.TempDir() + "/access.log"
- openAIAPIKey := getEnvVarOrSkip(t, "TEST_OPENAI_API_KEY")
- requireRunEnvoy(t, accessLogPath, openAIAPIKey)
- configPath := t.TempDir() + "/extproc-config.yaml"
-
- // Test with APIKey.
- apiKeyFilePath := t.TempDir() + "/open-ai-api-key"
- file, err := os.Create(apiKeyFilePath)
- require.NoError(t, err)
- defer func() { require.NoError(t, file.Close()) }()
- _, err = file.WriteString(openAIAPIKey)
- require.NoError(t, err)
- require.NoError(t, file.Sync())
-
- requireWriteExtProcConfig(t, configPath, &filterconfig.Config{
- MetadataNamespace: "ai_gateway_llm_ns",
- LLMRequestCosts: []filterconfig.LLMRequestCost{
- {MetadataKey: "used_token", Type: filterconfig.LLMRequestCostTypeInputToken},
- },
- Schema: openAISchema,
- // This can be any header key, but it must match the envoy.yaml routing configuration.
- SelectedBackendHeaderKey: "x-selected-backend-name",
- ModelNameHeaderKey: "x-model-name",
- Rules: []filterconfig.RouteRule{
- {
- Backends: []filterconfig.Backend{{Name: "openai", Schema: openAISchema, Auth: &filterconfig.BackendAuth{
- APIKey: &filterconfig.APIKeyAuth{Filename: apiKeyFilePath},
- }}},
- Headers: []filterconfig.HeaderMatch{{Name: "x-model-name", Value: "gpt-4o-mini"}},
- },
- {
- Backends: []filterconfig.Backend{
- {Name: "aws-bedrock", Schema: awsBedrockSchema, Auth: &filterconfig.BackendAuth{AWSAuth: &filterconfig.AWSAuth{}}},
- },
- Headers: []filterconfig.HeaderMatch{{Name: "x-model-name", Value: "us.meta.llama3-2-1b-instruct-v1:0"}},
- },
- },
- })
-
- requireExtProcWithAWSCredentials(t, configPath)
-
- t.Run("health-checking", func(t *testing.T) {
- client := openai.NewClient(option.WithBaseURL(listenerAddress + "/v1/"))
- for _, tc := range []struct {
- testCaseName,
- modelName string
- }{
- {testCaseName: "openai", modelName: "gpt-4o-mini"}, // This will go to "openai".
- {testCaseName: "aws-bedrock", modelName: "us.meta.llama3-2-1b-instruct-v1:0"}, // This will go to "aws-bedrock".
- } {
- t.Run(tc.modelName, func(t *testing.T) {
- require.Eventually(t, func() bool {
- chatCompletion, err := client.Chat.Completions.New(context.Background(), openai.ChatCompletionNewParams{
- Messages: openai.F([]openai.ChatCompletionMessageParamUnion{
- openai.UserMessage("Say this is a test"),
- }),
- Model: openai.F(tc.modelName),
- })
- if err != nil {
- t.Logf("error: %v", err)
- return false
- }
- nonEmptyCompletion := false
- for _, choice := range chatCompletion.Choices {
- t.Logf("choice: %s", choice.Message.Content)
- if choice.Message.Content != "" {
- nonEmptyCompletion = true
- }
- }
- return nonEmptyCompletion
- }, 10*time.Second, 1*time.Second)
- })
- }
- })
-
- // Read all access logs and check if the used token is logged.
- // If the used token is set correctly in the metadata, it should be logged in the access log.
- t.Run("check-used-token-metadata-access-log", func(t *testing.T) {
- // The access log might not be written immediately, so we wait for it to appear.
- require.Eventually(t, func() bool {
- accessLog, err := os.ReadFile(accessLogPath)
- require.NoError(t, err)
- // This should match the format of the access log in envoy.yaml.
- type lineFormat struct {
- UsedToken any `json:"used_token"`
- }
- scanner := bufio.NewScanner(bytes.NewReader(accessLog))
- for scanner.Scan() {
- line := scanner.Bytes()
- var l lineFormat
- if err = json.Unmarshal(line, &l); err != nil {
- t.Logf("error unmarshalling line: %v", err)
- continue
- }
- t.Logf("line: %s", line)
- // The access log formatter's behavior changed somewhere between Envoy 1.31 and the latest version,
- // so we need to check for both float64 and string.
- if num, ok := l.UsedToken.(float64); ok && num > 0 {
- return true
- } else if str, ok := l.UsedToken.(string); ok {
- if num, err := strconv.Atoi(str); err == nil && num > 0 {
- return true
- }
- }
- t.Log("cannot find used token in line")
- }
- return false
- }, 10*time.Second, 1*time.Second)
- })
-
- t.Run("streaming", func(t *testing.T) {
- client := openai.NewClient(option.WithBaseURL(listenerAddress + "/v1/"))
- for _, tc := range []struct {
- testCaseName,
- modelName string
- }{
- {testCaseName: "openai", modelName: "gpt-4o-mini"}, // This will go to "openai".
- {testCaseName: "aws-bedrock", modelName: "us.meta.llama3-2-1b-instruct-v1:0"}, // This will go to "aws-bedrock".
- } {
- t.Run(tc.modelName, func(t *testing.T) {
- require.Eventually(t, func() bool {
- stream := client.Chat.Completions.NewStreaming(context.Background(), openai.ChatCompletionNewParams{
- Messages: openai.F([]openai.ChatCompletionMessageParamUnion{
- openai.UserMessage("Say this is a test"),
- }),
- Model: openai.F(tc.modelName),
- })
- defer func() {
- _ = stream.Close()
- }()
-
- acc := openai.ChatCompletionAccumulator{}
-
- for stream.Next() {
- chunk := stream.Current()
- if !acc.AddChunk(chunk) {
- t.Log("error adding chunk")
- return false
- }
- }
-
- if err := stream.Err(); err != nil {
- t.Logf("error: %v", err)
- return false
- }
-
- nonEmptyCompletion := false
- for _, choice := range acc.Choices {
- t.Logf("choice: %s", choice.Message.Content)
- if choice.Message.Content != "" {
- nonEmptyCompletion = true
- }
- }
- return nonEmptyCompletion
- }, 10*time.Second, 1*time.Second)
- })
- }
- })
-
- // TODO: add more tests like updating the config, signal handling, etc.
- }

// requireExtProcWithAWSCredentials starts the external processor with the provided executable and configPath
// with additional environment variables for AWS credentials.
//
@@ -237,16 +58,16 @@ func requireExtProc(t *testing.T, stdout io.Writer, executable, configPath strin

func requireTestUpstream(t *testing.T) {
// Starts the test upstream server.
- envoyCmd := exec.Command(testUpstreamExecutablePath()) // #nosec G204
- envoyCmd.Stdout = os.Stdout
- envoyCmd.Stderr = os.Stderr
- envoyCmd.Env = []string{"TESTUPSTREAM_ID=extproc_test"}
- require.NoError(t, envoyCmd.Start())
- t.Cleanup(func() { _ = envoyCmd.Process.Signal(os.Interrupt) })
+ cmd := exec.Command(testUpstreamExecutablePath()) // #nosec G204
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ cmd.Env = []string{"TESTUPSTREAM_ID=extproc_test"}
+ require.NoError(t, cmd.Start())
+ t.Cleanup(func() { _ = cmd.Process.Kill() })
}

// requireRunEnvoy starts the Envoy proxy with the provided configuration.
- func requireRunEnvoy(t *testing.T, accessLogPath string, openAIAPIKey string) {
+ func requireRunEnvoy(t *testing.T, accessLogPath string) {
tmpDir := t.TempDir()
envoyYaml := strings.Replace(envoyYamlBase, "ACCESS_LOG_PATH", accessLogPath, 1)

@@ -255,35 +76,25 @@ func requireRunEnvoy(t *testing.T, accessLogPath string, openAIAPIKey string) {
require.NoError(t, os.WriteFile(envoyYamlPath, []byte(envoyYaml), 0o600))

// Starts the Envoy proxy.
envoyCmd := exec.Command("envoy",
cmd := exec.Command("envoy",
"-c", envoyYamlPath,
"--log-level", "warn",
"--concurrency", strconv.Itoa(max(runtime.NumCPU(), 2)),
)
- envoyCmd.Stdout = os.Stdout
- envoyCmd.Stderr = os.Stderr
- require.NoError(t, envoyCmd.Start())
- t.Cleanup(func() { _ = envoyCmd.Process.Signal(os.Interrupt) })
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ require.NoError(t, cmd.Start())
+ t.Cleanup(func() { _ = cmd.Process.Kill() })
}

- // requireBinaries requires Envoy to be present in the PATH as well as the Extproc binary in the out directory.
+ // requireBinaries requires Envoy to be present in the PATH as well as the Extproc and testupstream binaries in the out directory.
func requireBinaries(t *testing.T) {
_, err := exec.LookPath("envoy")
- if err != nil {
- t.Fatalf("envoy binary not found in PATH")
- }
-
- // Check if the Extproc binary is present in the root of the repository
+ require.NoError(t, err, "envoy binary not found in PATH")
_, err = os.Stat(extProcExecutablePath())
- if err != nil {
- t.Fatalf("%s binary not found in the root of the repository", extProcExecutablePath())
- }
-
- // Check if the TestUpstream binary is present in the root of the repository
+ require.NoErrorf(t, err, "extproc binary not found in the root of the repository")
_, err = os.Stat(testUpstreamExecutablePath())
- if err != nil {
- t.Fatalf("%s binary not found in the root of the repository", testUpstreamExecutablePath())
- }
+ require.NoErrorf(t, err, "testupstream binary not found in the root of the repository")
}

// getEnvVarOrSkip requires an environment variable to be set.
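The helper's body is collapsed in this view. As a minimal sketch of what such a guard conventionally looks like, consistent with the doc comment above and with the call sites in this diff (the parameter name and skip message are assumptions, not the repository's exact code):

package extproc

import (
	"os"
	"testing"
)

// getEnvVarOrSkip returns the value of the environment variable named by
// envVar, and skips the calling test when the variable is unset.
func getEnvVarOrSkip(t *testing.T, envVar string) string {
	value := os.Getenv(envVar)
	if value == "" {
		t.Skipf("environment variable %s is not set", envVar)
	}
	return value
}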
(The remaining 4 changed files are not shown in this view.)
