Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Commit

Permalink
feat: cache services and dependencies queries
Browse files Browse the repository at this point in the history
  • Loading branch information
johanneswuerbach committed Feb 9, 2022
1 parent d844fd5 commit bd2d334
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 41 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ data:
outputLocation: s3://my-jaeger-s3-bucket-athena-results/
workGroup: jaeger
maxSpanAge: 336h
dependenciesQueryTtl: 6h
servicesQueryTtl: 60s
---
apiVersion: v1
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20211228015320-b4f792c43cd0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)

require (
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
12 changes: 7 additions & 5 deletions plugin/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ type S3 struct {
}

type Athena struct {
DatabaseName string
TableName string
WorkGroup string
OutputLocation string
MaxSpanAge string
DatabaseName string
TableName string
WorkGroup string
OutputLocation string
MaxSpanAge string
DependenciesQueryTTL string
ServicesQueryTTL string
}

type Configuration struct {
Expand Down
205 changes: 169 additions & 36 deletions plugin/s3spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/athena"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/johanneswuerbach/jaeger-s3/plugin/config"
"github.com/opentracing/opentracing-go"
"golang.org/x/sync/errgroup"
)

func NewReader(logger hclog.Logger, svc *athena.Client, cfg config.Athena) (*Reader, error) {
Expand All @@ -23,19 +25,33 @@ func NewReader(logger hclog.Logger, svc *athena.Client, cfg config.Athena) (*Rea
return nil, fmt.Errorf("failed to parse max timeframe: %w", err)
}

dependenciesQueryTTL, err := time.ParseDuration(cfg.DependenciesQueryTTL)
if err != nil {
return nil, fmt.Errorf("failed to parse dependencies query ttl: %w", err)
}

servicesQueryTTL, err := time.ParseDuration(cfg.ServicesQueryTTL)
if err != nil {
return nil, fmt.Errorf("failed to parse services query ttl: %w", err)
}

return &Reader{
svc: svc,
cfg: cfg,
logger: logger,
maxSpanAge: maxSpanAge,
svc: svc,
cfg: cfg,
logger: logger,
maxSpanAge: maxSpanAge,
dependenciesQueryTTL: dependenciesQueryTTL,
servicesQueryTTL: servicesQueryTTL,
}, nil
}

type Reader struct {
logger hclog.Logger
svc *athena.Client
cfg config.Athena
maxSpanAge time.Duration
logger hclog.Logger
svc *athena.Client
cfg config.Athena
maxSpanAge time.Duration
dependenciesQueryTTL time.Duration
servicesQueryTTL time.Duration
}

const (
Expand Down Expand Up @@ -87,18 +103,22 @@ func (s *Reader) GetServices(ctx context.Context) ([]string, error) {
otSpan, _ := opentracing.StartSpanFromContext(ctx, "GetServices")
defer otSpan.Finish()

conditions := []string{
fmt.Sprintf(`datehour BETWEEN '%s' AND '%s'`, s.DefaultMinTime().Format(PARTION_FORMAT), s.DefaultMaxTime().Format(PARTION_FORMAT)),
result, err := s.getServicesAndOperations(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query services and operations: %w", err)
}

result, err := s.queryAthena(ctx, fmt.Sprintf(`SELECT service_name FROM "%s" WHERE %s GROUP BY 1 ORDER BY 1`, s.cfg.TableName, strings.Join(conditions, " AND ")))
if err != nil {
return nil, fmt.Errorf("failed to query athena: %w", err)
serviceNameMap := map[string]bool{}
for _, v := range result {
serviceName := *v.Data[0].VarCharValue
if !serviceNameMap[serviceName] {
serviceNameMap[serviceName] = true
}
}

serviceNames := make([]string, len(result))
for i, v := range result {
serviceNames[i] = *v.Data[0].VarCharValue
serviceNames := make([]string, 0, len(serviceNameMap))
for serviceName, _ := range serviceNameMap {
serviceNames = append(serviceNames, serviceName)
}

return serviceNames, nil
Expand All @@ -109,30 +129,41 @@ func (s *Reader) GetOperations(ctx context.Context, query spanstore.OperationQue
span, _ := opentracing.StartSpanFromContext(ctx, "GetOperations")
defer span.Finish()

// TODO Prevent SQL injections
conditions := []string{
fmt.Sprintf(`datehour BETWEEN '%s' AND '%s'`, s.DefaultMinTime().Format(PARTION_FORMAT), s.DefaultMaxTime().Format(PARTION_FORMAT)),
fmt.Sprintf(`service_name = '%s'`, query.ServiceName),
result, err := s.getServicesAndOperations(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query services and operations: %w", err)
}

if query.SpanKind != "" {
conditions = append(conditions, fmt.Sprintf(`span_kind = '%s'`, query.SpanKind))
operations := []spanstore.Operation{}
for _, v := range result {
if query.ServiceName != *v.Data[0].VarCharValue {
continue
}

if query.SpanKind != "" && query.SpanKind != *v.Data[2].VarCharValue {
continue
}

operations = append(operations, spanstore.Operation{
Name: *v.Data[1].VarCharValue,
SpanKind: *v.Data[2].VarCharValue,
})
}

result, err := s.queryAthena(ctx, fmt.Sprintf(`SELECT operation_name, span_kind FROM "%s" WHERE %s GROUP BY 1, 2 ORDER BY 1, 2`, s.cfg.TableName, strings.Join(conditions, " AND ")))
if err != nil {
return nil, fmt.Errorf("failed to query athena: %w", err)
return operations, nil
}

func (r *Reader) getServicesAndOperations(ctx context.Context) ([]types.Row, error) {
conditions := []string{
fmt.Sprintf(`datehour BETWEEN '%s' AND '%s'`, r.DefaultMinTime().Format(PARTION_FORMAT), r.DefaultMaxTime().Format(PARTION_FORMAT)),
}

operations := make([]spanstore.Operation, len(result))
for i, v := range result {
operations[i] = spanstore.Operation{
Name: *v.Data[0].VarCharValue,
SpanKind: *v.Data[1].VarCharValue,
}
result, err := r.queryAthenaCached(ctx, fmt.Sprintf(`SELECT service_name, operation_name, span_kind FROM "%s" WHERE %s GROUP BY 1, 2, 3 ORDER BY 1, 2, 3`, r.cfg.TableName, strings.Join(conditions, " AND ")), r.servicesQueryTTL)
if err != nil {
return nil, fmt.Errorf("failed to query athena: %w", err)
}

return operations, nil
return result, nil
}

func (s *Reader) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
Expand Down Expand Up @@ -243,7 +274,7 @@ func (r *Reader) GetDependencies(ctx context.Context, endTs time.Time, lookback
fmt.Sprintf(`datehour BETWEEN '%s' AND '%s'`, startTs.Format(PARTION_FORMAT), endTs.Format(PARTION_FORMAT)),
}

result, err := r.queryAthena(ctx, fmt.Sprintf(`
result, err := r.queryAthenaCached(ctx, fmt.Sprintf(`
WITH spans_with_references AS (
SELECT
base.service_name,
Expand All @@ -260,7 +291,7 @@ func (r *Reader) GetDependencies(ctx context.Context, endTs time.Time, lookback
JOIN %s as jaeger ON spans_with_references.ref_trace_id = jaeger.trace_id AND spans_with_references.ref_span_id = jaeger.span_id
WHERE %s
GROUP BY 1, 2
`, r.cfg.TableName, r.cfg.TableName, strings.Join(conditions, " AND ")))
`, r.cfg.TableName, r.cfg.TableName, strings.Join(conditions, " AND ")), r.dependenciesQueryTTL)
if err != nil {
return nil, fmt.Errorf("failed to query athena: %w", err)
}
Expand All @@ -282,6 +313,85 @@ func (r *Reader) GetDependencies(ctx context.Context, endTs time.Time, lookback
return dependencyLinks, nil
}

func (r *Reader) queryAthenaCached(ctx context.Context, queryString string, ttl time.Duration) ([]types.Row, error) {
paginator := athena.NewListQueryExecutionsPaginator(r.svc, &athena.ListQueryExecutionsInput{
WorkGroup: &r.cfg.WorkGroup,
})
queryExecutionIds := []string{}
for paginator.HasMorePages() {
output, err := paginator.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get athena query result: %w", err)
}

queryExecutionIds = append(queryExecutionIds, output.QueryExecutionIds...)
}

queryExecutionIdChunks := chunks(queryExecutionIds, 50)
g, getQueryExecutionCtx := errgroup.WithContext(ctx)

ttlTime := time.Now().Add(-ttl)
var mu sync.Mutex

latestCompletionDateTime := time.Now()
latestQueryExecutionId := ""

for _, value := range queryExecutionIdChunks {
value := value
g.Go(func() error {
result, err := r.svc.BatchGetQueryExecution(getQueryExecutionCtx, &athena.BatchGetQueryExecutionInput{
QueryExecutionIds: value,
})
if err != nil {
return err
}

for _, v := range result.QueryExecutions {
// Different query
if *v.Query != queryString {
continue
}

// Query didn't completed
if v.Status.CompletionDateTime == nil {
continue
}

// Query already expired
if v.Status.CompletionDateTime.Before(ttlTime) {
continue
}

mu.Lock()

// Store the latest query result
if latestQueryExecutionId == "" {
latestQueryExecutionId = *v.QueryExecutionId
latestCompletionDateTime = *v.Status.CompletionDateTime
} else {
if v.Status.CompletionDateTime.After(latestCompletionDateTime) {
latestQueryExecutionId = *v.QueryExecutionId
latestCompletionDateTime = *v.Status.CompletionDateTime
}
}

mu.Unlock()
}

return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}

if latestQueryExecutionId != "" {
return r.fetchQueryResult(ctx, latestQueryExecutionId)
}

return r.queryAthena(ctx, queryString)
}

func (s *Reader) queryAthena(ctx context.Context, queryString string) ([]types.Row, error) {
output, err := s.svc.StartQueryExecution(ctx, &athena.StartQueryExecutionInput{
QueryString: &queryString,
Expand Down Expand Up @@ -313,9 +423,13 @@ func (s *Reader) queryAthena(ctx context.Context, queryString string) ([]types.R
time.Sleep(100 * time.Millisecond)
}

return s.fetchQueryResult(ctx, *output.QueryExecutionId)
}

func (r *Reader) fetchQueryResult(ctx context.Context, queryExecutionId string) ([]types.Row, error) {
// Get query results
paginator := athena.NewGetQueryResultsPaginator(s.svc, &athena.GetQueryResultsInput{
QueryExecutionId: output.QueryExecutionId,
paginator := athena.NewGetQueryResultsPaginator(r.svc, &athena.GetQueryResultsInput{
QueryExecutionId: &queryExecutionId,
})
rows := []types.Row{}
for paginator.HasMorePages() {
Expand All @@ -334,3 +448,22 @@ func (s *Reader) queryAthena(ctx context.Context, queryString string) ([]types.R

return rows, nil
}

// From https://stackoverflow.com/a/67011816/2148473
func chunks(xs []string, chunkSize int) [][]string {
if len(xs) == 0 {
return nil
}
divided := make([][]string, (len(xs)+chunkSize-1)/chunkSize)
prev := 0
i := 0
till := len(xs) - chunkSize
for prev < till {
next := prev + chunkSize
divided[i] = xs[prev:next]
prev = next
i++
}
divided[i] = xs[prev:]
return divided
}

0 comments on commit bd2d334

Please sign in to comment.