Skip to content

Commit

Permalink
Merge pull request #886 from ydb-platform/ctx-retry-call
Browse files Browse the repository at this point in the history
move retry call detect into xcontext package
  • Loading branch information
asmyasnikov authored Nov 8, 2023
2 parents a00eddd + 779e0ba commit 36f3718
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 45 deletions.
29 changes: 7 additions & 22 deletions internal/table/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
Expand All @@ -21,21 +22,6 @@ type SessionProvider interface {
Put(context.Context, *session) (err error)
}

type (
markRetryCallKey struct{}
)

func markRetryCall(ctx context.Context) context.Context {
return context.WithValue(ctx, markRetryCallKey{}, true)
}

func isRetryCalledAbove(ctx context.Context) bool {
if _, has := ctx.Value(markRetryCallKey{}).(bool); has {
return true
}
return false
}

func doTx(
ctx context.Context,
c SessionProvider,
Expand All @@ -47,7 +33,7 @@ func doTx(
opts.Trace = &trace.Table{}
}
attempts, onIntermediate := 0, trace.TableOnDoTx(opts.Trace, &ctx,
opts.Label, opts.Label, opts.Idempotent, isRetryCalledAbove(ctx),
opts.Label, opts.Label, opts.Idempotent, xcontext.IsNestedCall(ctx),
)
defer func() {
onIntermediate(err)(attempts, err)
Expand Down Expand Up @@ -87,7 +73,7 @@ func doTx(
}
}()
}
return op(ctx, tx)
return op(xcontext.MarkRetryCall(ctx), tx)
}()

if err != nil {
Expand Down Expand Up @@ -116,7 +102,7 @@ func do(
opts.Trace = &trace.Table{}
}
attempts, onIntermediate := 0, trace.TableOnDo(opts.Trace, &ctx,
opts.Label, opts.Label, opts.Idempotent, isRetryCalledAbove(ctx),
opts.Label, opts.Label, opts.Idempotent, xcontext.IsNestedCall(ctx),
)
defer func() {
onIntermediate(err)(attempts, err)
Expand All @@ -137,7 +123,7 @@ func do(
}
}()
}
return op(ctx, s)
return op(xcontext.MarkRetryCall(ctx), s)
}()

if err != nil {
Expand All @@ -156,7 +142,7 @@ func retryBackoff(
op table.Operation,
opts ...retry.Option,
) error {
return retry.Retry(markRetryCall(ctx),
return retry.Retry(ctx,
func(ctx context.Context) (err error) {
var s *session

Expand All @@ -169,8 +155,7 @@ func retryBackoff(
_ = p.Put(ctx, s)
}()

err = op(ctx, s)
if err != nil {
if err = op(ctx, s); err != nil {
s.checkError(err)
return xerrors.WithStackTrace(err)
}
Expand Down
18 changes: 18 additions & 0 deletions internal/xcontext/retry_call.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package xcontext

import "context"

type (
markRetryCallKey struct{}
)

func MarkRetryCall(ctx context.Context) context.Context {
return context.WithValue(ctx, markRetryCallKey{}, true)
}

func IsNestedCall(ctx context.Context) bool {
if _, has := ctx.Value(markRetryCallKey{}).(bool); has {
return true
}
return false
}
23 changes: 5 additions & 18 deletions retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,21 +191,6 @@ func WithPanicCallback(panicCallback func(e interface{})) panicCallbackOption {
return panicCallbackOption{callback: panicCallback}
}

type (
markRetryCallKey struct{}
)

func markRetryCall(ctx context.Context) context.Context {
return context.WithValue(ctx, markRetryCallKey{}, true)
}

func isRetryCalledAbove(ctx context.Context) bool {
if _, has := ctx.Value(markRetryCallKey{}).(bool); has {
return true
}
return false
}

// Retry provide the best effort fo retrying operation
//
// Retry implements internal busy loop until one of the following conditions is met:
Expand All @@ -228,7 +213,9 @@ func Retry(ctx context.Context, op retryOperation, opts ...Option) (finalErr err
opt.ApplyRetryOption(options)
}
}
ctx = xcontext.WithIdempotent(ctx, options.idempotent)
if options.idempotent {
ctx = xcontext.WithIdempotent(ctx, options.idempotent)
}
defer func() {
if finalErr != nil && options.stackTrace {
finalErr = xerrors.WithStackTrace(finalErr,
Expand All @@ -242,7 +229,7 @@ func Retry(ctx context.Context, op retryOperation, opts ...Option) (finalErr err

code = int64(0)
onIntermediate = trace.RetryOnRetry(options.trace, &ctx,
options.label, options.label, options.idempotent, isRetryCalledAbove(ctx),
options.label, options.label, options.idempotent, xcontext.IsNestedCall(ctx),
)
)
defer func() {
Expand Down Expand Up @@ -271,7 +258,7 @@ func Retry(ctx context.Context, op retryOperation, opts ...Option) (finalErr err
}
}()
}
return op(markRetryCall(ctx))
return op(ctx)
}()

if err == nil {
Expand Down
9 changes: 5 additions & 4 deletions retry/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)
Expand Down Expand Up @@ -36,7 +37,7 @@ func WithDoRetryOptions(opts ...Option) doRetryOptionsOption {
}

// Do is a retryer of database/sql Conn with fallbacks on errors
func Do(ctx context.Context, db *sql.DB, f func(ctx context.Context, cc *sql.Conn) error, opts ...doOption) error {
func Do(ctx context.Context, db *sql.DB, op func(ctx context.Context, cc *sql.Conn) error, opts ...doOption) error {
var (
options = doOptions{}
attempts = 0
Expand All @@ -62,7 +63,7 @@ func Do(ctx context.Context, db *sql.DB, f func(ctx context.Context, cc *sql.Con
defer func() {
_ = cc.Close()
}()
if err = f(ctx, cc); err != nil {
if err = op(xcontext.MarkRetryCall(ctx), cc); err != nil {
return unwrapErrBadConn(xerrors.WithStackTrace(err))
}
return nil
Expand Down Expand Up @@ -117,7 +118,7 @@ func WithTxOptions(txOptions *sql.TxOptions) txOptionsOption {
}

// DoTx is a retryer of database/sql transactions with fallbacks on errors
func DoTx(ctx context.Context, db *sql.DB, f func(context.Context, *sql.Tx) error, opts ...doTxOption) error {
func DoTx(ctx context.Context, db *sql.DB, op func(context.Context, *sql.Tx) error, opts ...doTxOption) error {
var (
options = doTxOptions{
retryOptions: []Option{},
Expand Down Expand Up @@ -159,7 +160,7 @@ func DoTx(ctx context.Context, db *sql.DB, f func(context.Context, *sql.Tx) erro
xerrors.WithStackTrace(fmt.Errorf("rollback failed: %w", errRollback)),
)
}()
if err = f(ctx, tx); err != nil {
if err = op(xcontext.MarkRetryCall(ctx), tx); err != nil {
return unwrapErrBadConn(xerrors.WithStackTrace(err))
}
if err = tx.Commit(); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion trace/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type (

Label string
Idempotent bool
NestedCall bool // flag when Retry called inside head Retry

NestedCall bool // a sign for detect Retry calls inside head Retry
}
RetryLoopIntermediateInfo struct {
Error error
Expand Down

0 comments on commit 36f3718

Please sign in to comment.