Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

* Skipped explicit Rollback of transaction on errors (server-side a… #1230

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## v3.67.1
asmyasnikov marked this conversation as resolved.
Show resolved Hide resolved
* Fixed race of stop internal processes on close topic writer
* Fixed goroutines leak within topic reader on network problems
* Skipped explicit `Rollback` of transaction on errors (server-side automatically rolled back transactions on errors)
asmyasnikov marked this conversation as resolved.
Show resolved Hide resolved

## v3.67.0
* Added `ydb.WithNodeAddressMutator` experimental option for mutate node addresses from `discovery.ListEndpoints` response
Expand Down
49 changes: 33 additions & 16 deletions internal/table/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,9 +666,30 @@ func (c *Client) Do(ctx context.Context, op table.Operation, opts ...table.Optio
onDone(attempts, finalErr)
}()

err := do(ctx, c, c.config, op, func(err error) {
attempts++
}, config.RetryOptions...)
err := retryBackoff(ctx, c,
func(ctx context.Context, s table.Session) (err error) {
attempts++

err = func() error {
if panicCallback := c.config.PanicCallback(); panicCallback != nil {
defer func() {
if e := recover(); e != nil {
panicCallback(e)
}
}()
}

return op(xcontext.MarkRetryCall(ctx), s)
}()

if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
},
config.RetryOptions...,
)
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand All @@ -695,26 +716,17 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
onDone(attempts, finalErr)
}()

return retryBackoff(ctx, c,
func(ctx context.Context, s table.Session) (err error) {
err := retryBackoff(ctx, c,
func(ctx context.Context, s table.Session) (finalErr error) {
attempts++

tx, err := s.BeginTransaction(ctx, config.TxSettings)
if err != nil {
return xerrors.WithStackTrace(err)
}

defer func() {
if err != nil {
errRollback := tx.Rollback(ctx)
if errRollback != nil {
err = xerrors.NewWithIssues("",
xerrors.WithStackTrace(err),
xerrors.WithStackTrace(errRollback),
)
} else {
err = xerrors.WithStackTrace(err)
}
if finalErr != nil {
_ = tx.Rollback(ctx)
}
}()

Expand Down Expand Up @@ -743,6 +755,11 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
},
config.RetryOptions...,
)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
}

func (c *Client) internalPoolGCTick(ctx context.Context, idleThreshold time.Duration) {
Expand Down
40 changes: 0 additions & 40 deletions internal/table/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package table
import (
"context"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
Expand All @@ -22,44 +20,6 @@ type SessionProvider interface {
Put(ctx context.Context, s *session) (err error)
}

func do(
ctx context.Context,
c SessionProvider,
config *config.Config,
op table.Operation,
onAttempt func(err error),
opts ...retry.Option,
) (err error) {
return retryBackoff(ctx, c,
func(ctx context.Context, s table.Session) (err error) {
defer func() {
if onAttempt != nil {
onAttempt(err)
}
}()

err = func() error {
if panicCallback := config.PanicCallback(); panicCallback != nil {
defer func() {
if e := recover(); e != nil {
panicCallback(e)
}
}()
}

return op(xcontext.MarkRetryCall(ctx), s)
}()

if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
},
opts...,
)
}

func retryBackoff(
ctx context.Context,
p SessionProvider,
Expand Down
32 changes: 6 additions & 26 deletions internal/table/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
grpcCodes "google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xrand"
Expand Down Expand Up @@ -41,12 +40,10 @@ func TestRetryerBackoffRetryCancelation(t *testing.T) {
ctx, cancel := xcontext.WithCancel(context.Background())
results := make(chan error)
go func() {
err := do(ctx, p,
config.New(),
err := retryBackoff(ctx, p,
func(ctx context.Context, _ table.Session) error {
return testErr
},
nil,
retry.WithFastBackoff(
testutil.BackoffFunc(func(n int) <-chan time.Time {
ch := make(chan time.Time)
Expand Down Expand Up @@ -103,7 +100,7 @@ func TestRetryerBadSession(t *testing.T) {
sessions []table.Session
)
ctx, cancel := xcontext.WithCancel(context.Background())
err := do(ctx, p, config.New(),
err := retryBackoff(ctx, p,
func(ctx context.Context, s table.Session) error {
sessions = append(sessions, s)
i++
Expand All @@ -115,7 +112,6 @@ func TestRetryerBadSession(t *testing.T) {
xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION),
)
},
func(err error) {},
)
if !xerrors.Is(err, context.Canceled) {
t.Errorf("unexpected error: %v", err)
Expand Down Expand Up @@ -154,17 +150,13 @@ func TestRetryerSessionClosing(t *testing.T) {
}
var sessions []table.Session
for i := 0; i < 1000; i++ {
err := do(
context.Background(),
p,
config.New(),
err := retryBackoff(context.Background(), p,
func(ctx context.Context, s table.Session) error {
sessions = append(sessions, s)
s.(*session).SetStatus(table.SessionClosing)

return nil
},
nil,
)
if err != nil {
t.Errorf("unexpected error: %v", err)
Expand Down Expand Up @@ -208,14 +200,10 @@ func TestRetryerImmediateReturn(t *testing.T) {
p := SingleSession(
simpleSession(t),
)
err := do(
context.Background(),
p,
config.New(),
err := retryBackoff(context.Background(), p,
func(ctx context.Context, _ table.Session) error {
return testErr
},
nil,
retry.WithFastBackoff(
testutil.BackoffFunc(func(n int) <-chan time.Time {
panic("this code will not be called")
Expand Down Expand Up @@ -341,10 +329,7 @@ func TestRetryContextDeadline(t *testing.T) {
t.Run(fmt.Sprintf("Timeout=%v,Sleep=%v", timeout, sleep), func(t *testing.T) {
ctx, cancel := xcontext.WithTimeout(context.Background(), timeout)
defer cancel()
_ = do(
ctx,
p,
config.New(),
_ = retryBackoff(ctx, p,
func(ctx context.Context, _ table.Session) error {
select {
case <-ctx.Done():
Expand All @@ -353,7 +338,6 @@ func TestRetryContextDeadline(t *testing.T) {
return errs[r.Int(len(errs))]
}
},
nil,
)
})
}
Expand Down Expand Up @@ -442,10 +426,7 @@ func TestRetryWithCustomErrors(t *testing.T) {
i = 0
sessions = make(map[table.Session]int)
)
err := do(
ctx,
p,
config.New(),
err := retryBackoff(ctx, p,
func(ctx context.Context, s table.Session) (err error) {
sessions[s]++
i++
Expand All @@ -455,7 +436,6 @@ func TestRetryWithCustomErrors(t *testing.T) {

return nil
},
nil,
)
//nolint:nestif
if test.retriable {
Expand Down
18 changes: 18 additions & 0 deletions internal/table/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
var (
errTxAlreadyCommitted = xerrors.Wrap(fmt.Errorf("transaction already committed"))
errTxRollbackedEarly = xerrors.Wrap(fmt.Errorf("transaction rollbacked early"))
errTxFailedEarly = xerrors.Wrap(fmt.Errorf("transaction failed early"))
)

type txState struct {
Expand All @@ -42,6 +43,7 @@ const (
txStateInitialized txStateEnum = iota
txStateCommitted
txStateRollbacked
txStateFailed
)

type transaction struct {
Expand Down Expand Up @@ -73,11 +75,15 @@ func (tx *transaction) Execute(
switch tx.state.Load() {
case txStateCommitted:
return nil, xerrors.WithStackTrace(errTxAlreadyCommitted)
case txStateFailed:
return nil, xerrors.WithStackTrace(errTxFailedEarly)
case txStateRollbacked:
return nil, xerrors.WithStackTrace(errTxRollbackedEarly)
default:
_, r, err = tx.s.Execute(ctx, tx.control, query, parameters, opts...)
if err != nil {
tx.state.Store(txStateFailed)

return nil, xerrors.WithStackTrace(err)
}

Expand Down Expand Up @@ -115,11 +121,15 @@ func (tx *transaction) ExecuteStatement(
switch tx.state.Load() {
case txStateCommitted:
return nil, xerrors.WithStackTrace(errTxAlreadyCommitted)
case txStateFailed:
return nil, xerrors.WithStackTrace(errTxFailedEarly)
case txStateRollbacked:
return nil, xerrors.WithStackTrace(errTxRollbackedEarly)
default:
_, r, err = stmt.Execute(ctx, tx.control, parameters, opts...)
if err != nil {
tx.state.Store(txStateFailed)

return nil, xerrors.WithStackTrace(err)
}

Expand Down Expand Up @@ -148,6 +158,8 @@ func (tx *transaction) CommitTx(
switch tx.state.Load() {
case txStateCommitted:
return nil, xerrors.WithStackTrace(errTxAlreadyCommitted)
case txStateFailed:
return nil, xerrors.WithStackTrace(errTxFailedEarly)
case txStateRollbacked:
return nil, xerrors.WithStackTrace(errTxRollbackedEarly)
default:
Expand All @@ -174,6 +186,8 @@ func (tx *transaction) CommitTx(

response, err = tx.s.tableService.CommitTransaction(ctx, request)
if err != nil {
tx.state.Store(txStateFailed)

return nil, xerrors.WithStackTrace(err)
}

Expand Down Expand Up @@ -206,6 +220,8 @@ func (tx *transaction) Rollback(ctx context.Context) (err error) {
switch tx.state.Load() {
case txStateCommitted:
return nil // nop for committed tx
case txStateFailed:
return xerrors.WithStackTrace(errTxFailedEarly)
case txStateRollbacked:
return xerrors.WithStackTrace(errTxRollbackedEarly)
default:
Expand All @@ -222,6 +238,8 @@ func (tx *transaction) Rollback(ctx context.Context) (err error) {
},
)
if err != nil {
tx.state.Store(txStateFailed)

return xerrors.WithStackTrace(err)
}

Expand Down
15 changes: 5 additions & 10 deletions retry/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,26 +161,21 @@ func DoTx(ctx context.Context, db *sql.DB, op func(context.Context, *sql.Tx) err
}
err := Retry(ctx, func(ctx context.Context) (finalErr error) {
attempts++

tx, err := db.BeginTx(ctx, options.txOptions)
if err != nil {
return unwrapErrBadConn(xerrors.WithStackTrace(err))
}
defer func() {
if finalErr == nil {
return
}
errRollback := tx.Rollback()
if errRollback == nil {
return
if finalErr != nil {
_ = tx.Rollback()
}
finalErr = xerrors.NewWithIssues("",
xerrors.WithStackTrace(finalErr),
xerrors.WithStackTrace(fmt.Errorf("rollback failed: %w", errRollback)),
)
}()

if err = op(xcontext.MarkRetryCall(ctx), tx); err != nil {
return unwrapErrBadConn(xerrors.WithStackTrace(err))
}

if err = tx.Commit(); err != nil {
return unwrapErrBadConn(xerrors.WithStackTrace(err))
}
Expand Down
Loading