Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Commit

Permalink
feat: improved cache lookup
Browse files Browse the repository at this point in the history
use the fact that the executions are ordered and re-use still running queries
  • Loading branch information
johanneswuerbach committed Feb 21, 2022
1 parent 6999107 commit e356587
Show file tree
Hide file tree
Showing 4 changed files with 345 additions and 68 deletions.
124 changes: 78 additions & 46 deletions plugin/s3spanstore/athena_query_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,90 +2,122 @@ package s3spanstore

import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/athena"
"github.com/aws/aws-sdk-go-v2/service/athena/types"
"github.com/hashicorp/go-hclog"
"golang.org/x/sync/errgroup"
)

type AthenaQueryCache struct {
logger hclog.Logger
svc AthenaAPI
workGroup string
}

func NewAthenaQueryCache(svc AthenaAPI, workGroup string) *AthenaQueryCache {
return &AthenaQueryCache{svc: svc, workGroup: workGroup}
func NewAthenaQueryCache(logger hclog.Logger, svc AthenaAPI, workGroup string) *AthenaQueryCache {
return &AthenaQueryCache{logger: logger, svc: svc, workGroup: workGroup}
}

func (c *AthenaQueryCache) Lookup(ctx context.Context, key string, ttl time.Duration) (*types.QueryExecution, error) {
paginator := athena.NewListQueryExecutionsPaginator(c.svc, &athena.ListQueryExecutionsInput{
WorkGroup: &c.workGroup,
})
queryExecutionIds := []string{}
for paginator.HasMorePages() {
output, err := paginator.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get athena query result: %w", err)
}
ttlTime := time.Now().Add(-ttl)
queryExecutionIdChunks := make(chan []string, 3)

queryExecutionIds = append(queryExecutionIds, output.QueryExecutionIds...)
}
g, gCtx := errgroup.WithContext(ctx)
fetchCtx, fetchCancelFunc := context.WithCancel(gCtx)

queryExecutionIdChunks := chunks(queryExecutionIds, 50)
g, getQueryExecutionCtx := errgroup.WithContext(ctx)
// Page fetcher
g.Go(func() error {
paginator := athena.NewListQueryExecutionsPaginator(c.svc, &athena.ListQueryExecutionsInput{
WorkGroup: &c.workGroup,
MaxResults: aws.Int32(50),
})

ttlTime := time.Now().Add(-ttl)
var mu sync.Mutex
pages := 0
earlyExit := false
defer close(queryExecutionIdChunks)

Pages:
for paginator.HasMorePages() {
pages += 1
output, err := paginator.NextPage(fetchCtx)
if err != nil {
if errors.Is(err, context.Canceled) {
earlyExit = true
break Pages
}

return fmt.Errorf("failed to get athena query result: %w", err)
}

select {
case <-fetchCtx.Done():
earlyExit = true
break Pages
case queryExecutionIdChunks <- output.QueryExecutionIds:
}
}

c.logger.Debug("AthenaQueryCache/ListQueryExecutions finished", "pages", pages, "earlyExit", earlyExit)

return nil
})

// QueryExecutions lookup worker
var latestQueryExecution *types.QueryExecution
g.Go(func() error {
executionsFetched := 0
found := false

select {
case <-fetchCtx.Done():
break
case queryExecutionIds := <-queryExecutionIdChunks:
if len(queryExecutionIds) == 0 {
fetchCancelFunc() // Cancel search as results are ordered, so this is the most recent
break
}

for _, value := range queryExecutionIdChunks {
value := value
g.Go(func() error {
result, err := c.svc.BatchGetQueryExecution(getQueryExecutionCtx, &athena.BatchGetQueryExecutionInput{
QueryExecutionIds: value,
result, err := c.svc.BatchGetQueryExecution(gCtx, &athena.BatchGetQueryExecutionInput{
QueryExecutionIds: queryExecutionIds,
})
if err != nil {
return err
return fmt.Errorf("failed to get query executions: %w", err)
}

executionsFetched += len(result.QueryExecutions)
for _, v := range result.QueryExecutions {
// Different query
if !strings.Contains(*v.Query, key) {
continue
}

// Query didn't completed
if v.Status.CompletionDateTime == nil {
// Query already expired
if v.Status.SubmissionDateTime.Before(ttlTime) {
fetchCancelFunc() // Cancel search as results are ordered so no more recent query wll follow
continue
}

// Query already expired
if v.Status.CompletionDateTime.Before(ttlTime) {
// We don't want to match unsuccessful executions
if v.Status.State == "FAILED" || v.Status.State == "CANCELLED" {
continue
}

mu.Lock()

// Store the latest query result
if latestQueryExecution == nil {
// Matching query
if strings.Contains(*v.Query, key) {
found = true
latestQueryExecution = &v
} else {
if v.Status.CompletionDateTime.After(*latestQueryExecution.Status.CompletionDateTime) {
latestQueryExecution = &v
}
fetchCancelFunc() // Cancel search as results are ordered, so this is the most recent
break
}

mu.Unlock()
}
}

c.logger.Debug("AthenaQueryCache/BatchGetQueryExecution finished", "executionsFetched", executionsFetched, "found", found)

return nil
})

return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit e356587

Please sign in to comment.