Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #15 from johanneswuerbach/more-tests
Browse files Browse the repository at this point in the history
feat: improved cache lookup
  • Loading branch information
johanneswuerbach authored Feb 21, 2022
2 parents 2311752 + e356587 commit 91e6637
Show file tree
Hide file tree
Showing 5 changed files with 767 additions and 144 deletions.
126 changes: 126 additions & 0 deletions plugin/s3spanstore/athena_query_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package s3spanstore

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/athena"
"github.com/aws/aws-sdk-go-v2/service/athena/types"
"github.com/hashicorp/go-hclog"
"golang.org/x/sync/errgroup"
)

type AthenaQueryCache struct {
logger hclog.Logger
svc AthenaAPI
workGroup string
}

func NewAthenaQueryCache(logger hclog.Logger, svc AthenaAPI, workGroup string) *AthenaQueryCache {
return &AthenaQueryCache{logger: logger, svc: svc, workGroup: workGroup}
}

func (c *AthenaQueryCache) Lookup(ctx context.Context, key string, ttl time.Duration) (*types.QueryExecution, error) {
ttlTime := time.Now().Add(-ttl)
queryExecutionIdChunks := make(chan []string, 3)

g, gCtx := errgroup.WithContext(ctx)
fetchCtx, fetchCancelFunc := context.WithCancel(gCtx)

// Page fetcher
g.Go(func() error {
paginator := athena.NewListQueryExecutionsPaginator(c.svc, &athena.ListQueryExecutionsInput{
WorkGroup: &c.workGroup,
MaxResults: aws.Int32(50),
})

pages := 0
earlyExit := false
defer close(queryExecutionIdChunks)

Pages:
for paginator.HasMorePages() {
pages += 1
output, err := paginator.NextPage(fetchCtx)
if err != nil {
if errors.Is(err, context.Canceled) {
earlyExit = true
break Pages
}

return fmt.Errorf("failed to get athena query result: %w", err)
}

select {
case <-fetchCtx.Done():
earlyExit = true
break Pages
case queryExecutionIdChunks <- output.QueryExecutionIds:
}
}

c.logger.Debug("AthenaQueryCache/ListQueryExecutions finished", "pages", pages, "earlyExit", earlyExit)

return nil
})

// QueryExecutions lookup worker
var latestQueryExecution *types.QueryExecution
g.Go(func() error {
executionsFetched := 0
found := false

select {
case <-fetchCtx.Done():
break
case queryExecutionIds := <-queryExecutionIdChunks:
if len(queryExecutionIds) == 0 {
fetchCancelFunc() // Cancel search as results are ordered, so this is the most recent
break
}

result, err := c.svc.BatchGetQueryExecution(gCtx, &athena.BatchGetQueryExecutionInput{
QueryExecutionIds: queryExecutionIds,
})
if err != nil {
return fmt.Errorf("failed to get query executions: %w", err)
}

executionsFetched += len(result.QueryExecutions)
for _, v := range result.QueryExecutions {
// Query already expired
if v.Status.SubmissionDateTime.Before(ttlTime) {
fetchCancelFunc() // Cancel search as results are ordered so no more recent query wll follow
continue
}

// We don't want to match unsuccessful executions
if v.Status.State == "FAILED" || v.Status.State == "CANCELLED" {
continue
}

// Matching query
if strings.Contains(*v.Query, key) {
found = true
latestQueryExecution = &v
fetchCancelFunc() // Cancel search as results are ordered, so this is the most recent
break
}
}
}

c.logger.Debug("AthenaQueryCache/BatchGetQueryExecution finished", "executionsFetched", executionsFetched, "found", found)

return nil
})

if err := g.Wait(); err != nil {
return nil, err
}

return latestQueryExecution, nil
}
260 changes: 260 additions & 0 deletions plugin/s3spanstore/athena_query_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
package s3spanstore

import (
"context"
"os"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/athena"
"github.com/aws/aws-sdk-go-v2/service/athena/types"
"github.com/golang/mock/gomock"
"github.com/hashicorp/go-hclog"
"github.com/johanneswuerbach/jaeger-s3/plugin/s3spanstore/mocks"
"github.com/stretchr/testify/assert"
)

func NewTestAthenaQueryCache(mockSvc *mocks.MockAthenaAPI) *AthenaQueryCache {
loggerName := "jaeger-s3"

logLevel := os.Getenv("GRPC_STORAGE_PLUGIN_LOG_LEVEL")
if logLevel == "" {
logLevel = hclog.Debug.String()
}

logger := hclog.New(&hclog.LoggerOptions{
Level: hclog.LevelFromString(logLevel),
Name: loggerName,
JSONFormat: true,
})

return NewAthenaQueryCache(logger, mockSvc, "jaeger")
}

func TestNoResults(t *testing.T) {

ctrl := gomock.NewController(t)
defer ctrl.Finish()

assert := assert.New(t)
ctx := context.TODO()

mockSvc := mocks.NewMockAthenaAPI(ctrl)
mockSvc.EXPECT().ListQueryExecutions(gomock.Any(), gomock.Any()).
Return(&athena.ListQueryExecutionsOutput{}, nil)
cache := NewTestAthenaQueryCache(mockSvc)

cachedQuery, err := cache.Lookup(ctx, "test", time.Second*60)

assert.NoError(err)
assert.Nil(cachedQuery)
}

func TestOneFinishedResult(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

validQueryID := "get-services"
invalidQueryID := "different"

assert := assert.New(t)
ctx := context.TODO()

mockSvc := mocks.NewMockAthenaAPI(ctrl)
mockSvc.EXPECT().ListQueryExecutions(gomock.Any(), gomock.Any()).
Return(&athena.ListQueryExecutionsOutput{
QueryExecutionIds: []string{invalidQueryID, validQueryID},
}, nil)

mockSvc.EXPECT().BatchGetQueryExecution(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, input *athena.BatchGetQueryExecutionInput, _ ...func(*athena.Options)) (*athena.BatchGetQueryExecutionOutput, error) {
assert.Equal([]string{invalidQueryID, validQueryID}, input.QueryExecutionIds)

return &athena.BatchGetQueryExecutionOutput{
QueryExecutions: []types.QueryExecution{
{
Query: aws.String("asdas"),
QueryExecutionId: aws.String(invalidQueryID),
Status: &types.QueryExecutionStatus{
SubmissionDateTime: aws.Time(time.Now().UTC()),
},
},
{
Query: aws.String(`SELECT service_name, operation_name, span_kind FROM "jaeger" WHERE`),
QueryExecutionId: aws.String(validQueryID),
Status: &types.QueryExecutionStatus{
SubmissionDateTime: aws.Time(time.Now().UTC()),
CompletionDateTime: aws.Time(time.Now().UTC()),
},
},
},
}, nil
})

cache := NewTestAthenaQueryCache(mockSvc)

cachedQuery, err := cache.Lookup(ctx, "service_name, operation_name", time.Second*60)

assert.NoError(err)
assert.NotNil(cachedQuery)
assert.Equal(validQueryID, *cachedQuery.QueryExecutionId)
}

func TestOnePendingResult(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

validQueryID := "get-services"
validPreviousQueryID := "get-services-old"
invalidQueryID := "different"

assert := assert.New(t)
ctx := context.TODO()

mockSvc := mocks.NewMockAthenaAPI(ctrl)
mockSvc.EXPECT().ListQueryExecutions(gomock.Any(), gomock.Any()).
Return(&athena.ListQueryExecutionsOutput{
QueryExecutionIds: []string{invalidQueryID, validQueryID, validPreviousQueryID},
}, nil)

mockSvc.EXPECT().BatchGetQueryExecution(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, input *athena.BatchGetQueryExecutionInput, _ ...func(*athena.Options)) (*athena.BatchGetQueryExecutionOutput, error) {
assert.Equal([]string{invalidQueryID, validQueryID, validPreviousQueryID}, input.QueryExecutionIds)

return &athena.BatchGetQueryExecutionOutput{
QueryExecutions: []types.QueryExecution{
{
Query: aws.String("asdas"),
QueryExecutionId: aws.String(invalidQueryID),
Status: &types.QueryExecutionStatus{
SubmissionDateTime: aws.Time(time.Now().UTC()),
},
},
{
Query: aws.String(`SELECT service_name, operation_name, span_kind FROM "jaeger" WHERE`),
QueryExecutionId: aws.String(validQueryID),
Status: &types.QueryExecutionStatus{
CompletionDateTime: nil,
SubmissionDateTime: aws.Time(time.Now().UTC()),
},
},
{
Query: aws.String(`SELECT service_name, operation_name, span_kind FROM "jaeger" WHERE`),
QueryExecutionId: aws.String(validPreviousQueryID),
Status: &types.QueryExecutionStatus{
CompletionDateTime: aws.Time(time.Now().UTC()),
SubmissionDateTime: aws.Time(time.Now().UTC().Add(-10 * time.Second)),
},
},
},
}, nil
})

cache := NewTestAthenaQueryCache(mockSvc)

cachedQuery, err := cache.Lookup(ctx, "service_name, operation_name", time.Second*60)

assert.NoError(err)
assert.NotNil(cachedQuery)
assert.Equal(validQueryID, *cachedQuery.QueryExecutionId)
}

func TestOneStaleResult(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

expiredQueryID := "get-services"

assert := assert.New(t)
ctx := context.TODO()

mockSvc := mocks.NewMockAthenaAPI(ctrl)
mockSvc.EXPECT().ListQueryExecutions(gomock.Any(), gomock.Any()).
Return(&athena.ListQueryExecutionsOutput{
QueryExecutionIds: []string{expiredQueryID},
}, nil)

mockSvc.EXPECT().BatchGetQueryExecution(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, input *athena.BatchGetQueryExecutionInput, _ ...func(*athena.Options)) (*athena.BatchGetQueryExecutionOutput, error) {
assert.Equal([]string{expiredQueryID}, input.QueryExecutionIds)

return &athena.BatchGetQueryExecutionOutput{
QueryExecutions: []types.QueryExecution{
{
Query: aws.String(`SELECT service_name, operation_name, span_kind FROM "jaeger" WHERE`),
QueryExecutionId: aws.String(expiredQueryID),
Status: &types.QueryExecutionStatus{
CompletionDateTime: nil,
SubmissionDateTime: aws.Time(time.Now().UTC().Add(-time.Second * 90)),
},
},
},
}, nil
})

cache := NewTestAthenaQueryCache(mockSvc)

cachedQuery, err := cache.Lookup(ctx, "service_name, operation_name", time.Second*60)

assert.NoError(err)
assert.Nil(cachedQuery)
}

func TestEarlyExitWithMultiplePages(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

queryID := "get-services"

assert := assert.New(t)
ctx := context.TODO()

first := true

mockSvc := mocks.NewMockAthenaAPI(ctrl)
mockSvc.EXPECT().ListQueryExecutions(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, _ *athena.ListQueryExecutionsInput, _ ...func(*athena.Options)) (*athena.ListQueryExecutionsOutput, error) {
time.Sleep(time.Millisecond * 200)
if ctx.Err() != nil {
return nil, ctx.Err()
}

if first {
first = false
return &athena.ListQueryExecutionsOutput{
NextToken: aws.String("next"),
QueryExecutionIds: []string{queryID},
}, nil
} else {
return &athena.ListQueryExecutionsOutput{
QueryExecutionIds: []string{},
}, nil
}
}).Times(2)

mockSvc.EXPECT().BatchGetQueryExecution(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, input *athena.BatchGetQueryExecutionInput, _ ...func(*athena.Options)) (*athena.BatchGetQueryExecutionOutput, error) {
assert.Equal([]string{queryID}, input.QueryExecutionIds)

return &athena.BatchGetQueryExecutionOutput{
QueryExecutions: []types.QueryExecution{
{
Query: aws.String(`SELECT service_name, operation_name, span_kind FROM "jaeger" WHERE`),
QueryExecutionId: aws.String(queryID),
Status: &types.QueryExecutionStatus{
CompletionDateTime: nil,
SubmissionDateTime: aws.Time(time.Now().UTC()),
},
},
},
}, nil
})

cache := NewTestAthenaQueryCache(mockSvc)

cachedQuery, err := cache.Lookup(ctx, "service_name, operation_name", time.Second*60)

assert.NoError(err)
assert.NotNil(cachedQuery)
}
Loading

0 comments on commit 91e6637

Please sign in to comment.