Skip to content

Commit

Permalink
* Skipped explicit Rollback of transaction on errors (server-side a…
Browse files Browse the repository at this point in the history
…utomatically rolled back transactions on errors)
  • Loading branch information
asmyasnikov committed May 16, 2024
1 parent 51e23f0 commit 006f25a
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
* Fixed race of stop internal processes on close topic writer
* Fixed goroutines leak within topic reader on network problems
* Skipped explicit `Rollback` of transaction on errors (server-side automatically rolled back transactions on errors)

## v3.67.0
* Added `ydb.WithNodeAddressMutator` experimental option for mutate node addresses from `discovery.ListEndpoints` response
Expand Down
14 changes: 0 additions & 14 deletions internal/table/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,20 +704,6 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
return xerrors.WithStackTrace(err)
}

defer func() {
if err != nil {
errRollback := tx.Rollback(ctx)
if errRollback != nil {
err = xerrors.NewWithIssues("",
xerrors.WithStackTrace(err),
xerrors.WithStackTrace(errRollback),
)
} else {
err = xerrors.WithStackTrace(err)
}
}
}()

err = func() error {
if panicCallback := c.config.PanicCallback(); panicCallback != nil {
defer func() {
Expand Down
18 changes: 18 additions & 0 deletions internal/table/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
var (
errTxAlreadyCommitted = xerrors.Wrap(fmt.Errorf("transaction already committed"))
errTxRollbackedEarly = xerrors.Wrap(fmt.Errorf("transaction rollbacked early"))
errTxFailedEarly = xerrors.Wrap(fmt.Errorf("transaction failed early"))
)

type txState struct {
Expand All @@ -42,6 +43,7 @@ const (
txStateInitialized txStateEnum = iota
txStateCommitted
txStateRollbacked
txStateFailed
)

type transaction struct {
Expand Down Expand Up @@ -73,11 +75,15 @@ func (tx *transaction) Execute(
switch tx.state.Load() {
case txStateCommitted:
return nil, xerrors.WithStackTrace(errTxAlreadyCommitted)
case txStateFailed:
return nil, xerrors.WithStackTrace(errTxFailedEarly)
case txStateRollbacked:
return nil, xerrors.WithStackTrace(errTxRollbackedEarly)
default:
_, r, err = tx.s.Execute(ctx, tx.control, query, parameters, opts...)
if err != nil {
tx.state.Store(txStateFailed)

return nil, xerrors.WithStackTrace(err)
}

Expand Down Expand Up @@ -115,11 +121,15 @@ func (tx *transaction) ExecuteStatement(
switch tx.state.Load() {
case txStateCommitted:
return nil, xerrors.WithStackTrace(errTxAlreadyCommitted)
case txStateFailed:
return nil, xerrors.WithStackTrace(errTxFailedEarly)
case txStateRollbacked:
return nil, xerrors.WithStackTrace(errTxRollbackedEarly)
default:
_, r, err = stmt.Execute(ctx, tx.control, parameters, opts...)
if err != nil {
tx.state.Store(txStateFailed)

return nil, xerrors.WithStackTrace(err)
}

Expand Down Expand Up @@ -148,6 +158,8 @@ func (tx *transaction) CommitTx(
switch tx.state.Load() {
case txStateCommitted:
return nil, xerrors.WithStackTrace(errTxAlreadyCommitted)
case txStateFailed:
return nil, xerrors.WithStackTrace(errTxFailedEarly)
case txStateRollbacked:
return nil, xerrors.WithStackTrace(errTxRollbackedEarly)
default:
Expand All @@ -174,6 +186,8 @@ func (tx *transaction) CommitTx(

response, err = tx.s.tableService.CommitTransaction(ctx, request)
if err != nil {
tx.state.Store(txStateFailed)

return nil, xerrors.WithStackTrace(err)
}

Expand Down Expand Up @@ -206,6 +220,8 @@ func (tx *transaction) Rollback(ctx context.Context) (err error) {
switch tx.state.Load() {
case txStateCommitted:
return nil // nop for committed tx
case txStateFailed:
return xerrors.WithStackTrace(errTxFailedEarly)
case txStateRollbacked:
return xerrors.WithStackTrace(errTxRollbackedEarly)
default:
Expand All @@ -222,6 +238,8 @@ func (tx *transaction) Rollback(ctx context.Context) (err error) {
},
)
if err != nil {
tx.state.Store(txStateFailed)

return xerrors.WithStackTrace(err)
}

Expand Down
16 changes: 3 additions & 13 deletions retry/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,26 +161,16 @@ func DoTx(ctx context.Context, db *sql.DB, op func(context.Context, *sql.Tx) err
}
err := Retry(ctx, func(ctx context.Context) (finalErr error) {
attempts++

tx, err := db.BeginTx(ctx, options.txOptions)
if err != nil {
return unwrapErrBadConn(xerrors.WithStackTrace(err))
}
defer func() {
if finalErr == nil {
return
}
errRollback := tx.Rollback()
if errRollback == nil {
return
}
finalErr = xerrors.NewWithIssues("",
xerrors.WithStackTrace(finalErr),
xerrors.WithStackTrace(fmt.Errorf("rollback failed: %w", errRollback)),
)
}()

if err = op(xcontext.MarkRetryCall(ctx), tx); err != nil {
return unwrapErrBadConn(xerrors.WithStackTrace(err))
}

if err = tx.Commit(); err != nil {
return unwrapErrBadConn(xerrors.WithStackTrace(err))
}
Expand Down

0 comments on commit 006f25a

Please sign in to comment.