Skip to content

Commit

Permalink
Immediate retry logic for transient Mongo errors
Browse files Browse the repository at this point in the history
  • Loading branch information
prdoyle committed Jan 27, 2024
1 parent 4696d9e commit b5d7352
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.vena.bosk.drivers.mongo;

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoException;
import com.mongodb.ReadConcern;
import com.mongodb.WriteConcern;
import com.mongodb.client.FindIterable;
Expand Down Expand Up @@ -35,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL;
import static io.vena.bosk.drivers.mongo.Formatter.REVISION_ONE;
import static io.vena.bosk.drivers.mongo.Formatter.REVISION_ZERO;
import static io.vena.bosk.drivers.mongo.MappedDiagnosticContext.setupMDC;
Expand Down Expand Up @@ -525,12 +527,31 @@ private MDCScope beginDriverOperation(String description, Object... args) {

private <X extends Exception, Y extends Exception> void doRetryableDriverOperation(RetryableOperation<X,Y> operation, String description, Object... args) throws X,Y {
RetryableOperation<X,Y> operationInSession = () -> {
try (var session = collection.newSession()) {
operation.run();
session.commitTransactionIfAny();
} catch (FailedSessionException e) {
setDisconnectedDriver(e);
throw new DisconnectedException(e);
int immediateRetriesLeft = 2;
while (true) {
try (var session = collection.newSession()) {
operation.run();
session.commitTransactionIfAny();
} catch (FailedSessionException e) {
setDisconnectedDriver(e);
throw new DisconnectedException(e);
} catch (MongoException e) {
if (e.hasErrorLabel(TRANSIENT_TRANSACTION_ERROR_LABEL)) {
if (immediateRetriesLeft >= 1) {
immediateRetriesLeft--;
LOGGER.debug("Transient transaction error; retrying immediately", e);
continue;
} else {
LOGGER.warn("Exhausted immediate retry attempts for transient transaction error", e);
setDisconnectedDriver(e);
throw new DisconnectedException(e);
}
} else {
LOGGER.debug("MongoException is not recoverable; rethrowing", e);
throw e;
}
}
break;
}
};
try (var __ = beginDriverOperation(description, args)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.vena.bosk.drivers.mongo;

import com.mongodb.ClientSessionOptions;
import com.mongodb.MongoException;
import com.mongodb.MongoNamespace;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
Expand Down Expand Up @@ -117,9 +118,25 @@ public Session(boolean isReadOnly) throws FailedSessionException {
* would start a new transaction.
*/
public void commitTransactionIfAny() {
if (clientSession.hasActiveTransaction()) {
int retriesRemaining = 2;
while (clientSession.hasActiveTransaction()) {
LOGGER.debug("Commit transaction");
clientSession.commitTransaction();
try {
clientSession.commitTransaction();
} catch (MongoException e) {
if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
if (retriesRemaining >= 1) {
retriesRemaining--;
LOGGER.debug("Unknown transaction commit result; retrying the commit", e);
} else {
LOGGER.debug("Exhausted commit retry attempts", e);
throw e;
}
} else {
LOGGER.debug("Can't retry commit; rethrowing", e);
throw e;
}
}
}
}

Expand Down Expand Up @@ -169,6 +186,8 @@ public void ensureTransactionStarted() {
* this can be called first to ensure a successful commit before proceeding.
*/
public void commitTransaction() {
// TODO: Can we eliminate this and use commitTransactionIfAny exclusively?
// This one doesn't even have the retry logic.
LOGGER.debug("Commit transaction");
currentSession().commitTransaction();
}
Expand Down

0 comments on commit b5d7352

Please sign in to comment.