Skip to content

Commit

Permalink
Retry internally when CAS upload is throttled [GCS] (elastic#120250)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicktindall authored Jan 20, 2025
1 parent 1ba5d25 commit c02292f
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 80 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/120250.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 120250
summary: "Retry internally when CAS upload is throttled [GCS]"
area: Snapshot/Restore
type: enhancement
issues:
- 116546
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.repositories.azure;

import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

Expand All @@ -21,6 +22,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.http.ResponseInjectingHttpHandler;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.RepositoriesService;
Expand All @@ -46,7 +48,6 @@
import java.util.stream.IntStream;

import static org.elasticsearch.repositories.azure.AbstractAzureServerTestCase.randomBlobContent;
import static org.elasticsearch.repositories.azure.ResponseInjectingAzureHttpHandler.createFailNRequestsHandler;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -60,15 +61,15 @@ public class AzureBlobStoreRepositoryMetricsTests extends AzureBlobStoreReposito
);
private static final int MAX_RETRIES = 3;

private final Queue<ResponseInjectingAzureHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
private final Queue<ResponseInjectingHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();

@Override
protected Map<String, HttpHandler> createHttpHandlers() {
Map<String, HttpHandler> httpHandlers = super.createHttpHandlers();
assert httpHandlers.size() == 1 : "This assumes there's a single handler";
return httpHandlers.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new ResponseInjectingAzureHttpHandler(requestHandlers, e.getValue())));
.collect(Collectors.toMap(Map.Entry::getKey, e -> new ResponseInjectingHttpHandler(requestHandlers, e.getValue())));
}

/**
Expand Down Expand Up @@ -106,7 +107,7 @@ public void testThrottleResponsesAreCountedInMetrics() throws IOException {
// Queue up some throttle responses
final int numThrottles = randomIntBetween(1, MAX_RETRIES);
IntStream.range(0, numThrottles)
.forEach(i -> requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS)));
.forEach(i -> requestHandlers.offer(new ResponseInjectingHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS)));

// Check that the blob exists
blobContainer.blobExists(purpose, blobName);
Expand All @@ -132,11 +133,7 @@ public void testRangeNotSatisfiedAreCountedInMetrics() throws IOException {

// Queue up a range-not-satisfied error
requestHandlers.offer(
new ResponseInjectingAzureHttpHandler.FixedRequestHandler(
RestStatus.REQUESTED_RANGE_NOT_SATISFIED,
null,
GET_BLOB_REQUEST_PREDICATE
)
new ResponseInjectingHttpHandler.FixedRequestHandler(RestStatus.REQUESTED_RANGE_NOT_SATISFIED, null, GET_BLOB_REQUEST_PREDICATE)
);

// Attempt to read the blob
Expand Down Expand Up @@ -169,7 +166,7 @@ public void testErrorResponsesAreCountedInMetrics() throws IOException {
if (status == RestStatus.TOO_MANY_REQUESTS) {
throttles.incrementAndGet();
}
requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(status));
requestHandlers.offer(new ResponseInjectingHttpHandler.FixedRequestHandler(status));
});

// Check that the blob exists
Expand Down Expand Up @@ -265,7 +262,7 @@ public void testBatchDeleteFailure() throws IOException {
clearMetrics(dataNodeName);

// Handler will fail one or more of the batch requests
final ResponseInjectingAzureHttpHandler.RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);
final ResponseInjectingHttpHandler.RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);

// Exhaust the retries
IntStream.range(0, (numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1)))
Expand Down Expand Up @@ -308,6 +305,35 @@ private MetricsAsserter metricsAsserter(
return new MetricsAsserter(dataNodeName, operationPurpose, operation, repository);
}

/**
* Creates a {@link ResponseInjectingHttpHandler.RequestHandler} that will persistently fail the first <code>numberToFail</code>
* distinct requests it sees. Any other requests are passed through to the delegate.
*
* @param numberToFail The number of requests to fail
* @return the handler
*/
private static ResponseInjectingHttpHandler.RequestHandler createFailNRequestsHandler(int numberToFail) {
final List<String> requestsToFail = new ArrayList<>(numberToFail);
return (exchange, delegate) -> {
final Headers requestHeaders = exchange.getRequestHeaders();
final String requestId = requestHeaders.get("X-ms-client-request-id").get(0);
boolean failRequest = false;
synchronized (requestsToFail) {
if (requestsToFail.contains(requestId)) {
failRequest = true;
} else if (requestsToFail.size() < numberToFail) {
requestsToFail.add(requestId);
failRequest = true;
}
}
if (failRequest) {
exchange.sendResponseHeaders(500, -1);
} else {
delegate.handle(exchange);
}
};
}

private class MetricsAsserter {
private final String dataNodeName;
private final OperationPurpose purpose;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.http.ResponseInjectingHttpHandler;
import org.elasticsearch.rest.RestStatus;
import org.junit.Before;

Expand All @@ -34,14 +35,14 @@

public class AzureBlobContainerStatsTests extends AbstractAzureServerTestCase {

private final Queue<ResponseInjectingAzureHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
private final Queue<ResponseInjectingHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();

@SuppressForbidden(reason = "use a http server")
@Before
public void configureAzureHandler() {
httpServer.createContext(
"/",
new ResponseInjectingAzureHttpHandler(
new ResponseInjectingHttpHandler(
requestHandlers,
new AzureHttpHandler(ACCOUNT, CONTAINER, null, MockAzureBlobStore.LeaseExpiryPredicate.NEVER_EXPIRE)
)
Expand All @@ -61,7 +62,7 @@ public void testRetriesAndOperationsAreTrackedSeparately() throws IOException {
for (int i = 0; i < randomIntBetween(10, 50); i++) {
final boolean triggerRetry = randomBoolean();
if (triggerRetry) {
requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS));
requestHandlers.offer(new ResponseInjectingHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS));
}
final AzureBlobStore.Operation operation = randomFrom(supportedOperations);
switch (operation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
Expand Down Expand Up @@ -268,7 +269,8 @@ protected GoogleCloudStorageBlobStore createBlobStore() {
metadata.name(),
storageService,
bigArrays,
randomIntBetween(1, 8) * 1024
randomIntBetween(1, 8) * 1024,
BackoffPolicy.noBackoff()
) {
@Override
long getLargeBlobThresholdInBytes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
Expand All @@ -41,6 +42,7 @@
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -105,14 +107,16 @@ class GoogleCloudStorageBlobStore implements BlobStore {
private final GoogleCloudStorageOperationsStats stats;
private final int bufferSize;
private final BigArrays bigArrays;
private final BackoffPolicy casBackoffPolicy;

GoogleCloudStorageBlobStore(
String bucketName,
String clientName,
String repositoryName,
GoogleCloudStorageService storageService,
BigArrays bigArrays,
int bufferSize
int bufferSize,
BackoffPolicy casBackoffPolicy
) {
this.bucketName = bucketName;
this.clientName = clientName;
Expand All @@ -121,6 +125,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
this.bigArrays = bigArrays;
this.stats = new GoogleCloudStorageOperationsStats(bucketName);
this.bufferSize = bufferSize;
this.casBackoffPolicy = casBackoffPolicy;
}

private Storage client() throws IOException {
Expand Down Expand Up @@ -691,28 +696,46 @@ OptionalBytesReference compareAndExchangeRegister(
.setMd5(Base64.getEncoder().encodeToString(MessageDigests.digest(updated, MessageDigests.md5())))
.build();
final var bytesRef = updated.toBytesRef();
try {
SocketAccess.doPrivilegedVoidIOException(
() -> client().create(
blobInfo,
bytesRef.bytes,
bytesRef.offset,
bytesRef.length,
Storage.BlobTargetOption.generationMatch()
)
);
} catch (Exception e) {
final var serviceException = unwrapServiceException(e);
if (serviceException != null) {

final Iterator<TimeValue> retries = casBackoffPolicy.iterator();
BaseServiceException finalException = null;
while (true) {
try {
SocketAccess.doPrivilegedVoidIOException(
() -> client().create(
blobInfo,
bytesRef.bytes,
bytesRef.offset,
bytesRef.length,
Storage.BlobTargetOption.generationMatch()
)
);
return OptionalBytesReference.of(expected);
} catch (Exception e) {
final var serviceException = unwrapServiceException(e);
if (serviceException == null) {
throw e;
}
final var statusCode = serviceException.getCode();
if (statusCode == RestStatus.PRECONDITION_FAILED.getStatus() || statusCode == RestStatus.TOO_MANY_REQUESTS.getStatus()) {
if (statusCode == RestStatus.PRECONDITION_FAILED.getStatus()) {
return OptionalBytesReference.MISSING;
}
if (statusCode == RestStatus.TOO_MANY_REQUESTS.getStatus()) {
finalException = ExceptionsHelper.useOrSuppress(finalException, serviceException);
if (retries.hasNext()) {
try {
// noinspection BusyWait
Thread.sleep(retries.next().millis());
} catch (InterruptedException iex) {
Thread.currentThread().interrupt();
finalException.addSuppressed(iex);
}
} else {
throw finalException;
}
}
}
throw e;
}

return OptionalBytesReference.of(expected);
}

private static BaseServiceException unwrapServiceException(Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
Expand Down Expand Up @@ -56,10 +58,33 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
);
static final Setting<String> CLIENT_NAME = Setting.simpleString("client", "default");

/**
* We will retry CASes that fail due to throttling. We use an {@link BackoffPolicy#linearBackoff(TimeValue, int, TimeValue)}
* with the following parameters
*/
static final Setting<TimeValue> RETRY_THROTTLED_CAS_DELAY_INCREMENT = Setting.timeSetting(
"throttled_cas_retry.delay_increment",
TimeValue.timeValueMillis(100),
TimeValue.ZERO
);
static final Setting<Integer> RETRY_THROTTLED_CAS_MAX_NUMBER_OF_RETRIES = Setting.intSetting(
"throttled_cas_retry.maximum_number_of_retries",
2,
0
);
static final Setting<TimeValue> RETRY_THROTTLED_CAS_MAXIMUM_DELAY = Setting.timeSetting(
"throttled_cas_retry.maximum_delay",
TimeValue.timeValueSeconds(5),
TimeValue.ZERO
);

private final GoogleCloudStorageService storageService;
private final ByteSizeValue chunkSize;
private final String bucket;
private final String clientName;
private final TimeValue retryThrottledCasDelayIncrement;
private final int retryThrottledCasMaxNumberOfRetries;
private final TimeValue retryThrottledCasMaxDelay;

GoogleCloudStorageRepository(
final RepositoryMetadata metadata,
Expand All @@ -83,6 +108,9 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
this.chunkSize = getSetting(CHUNK_SIZE, metadata);
this.bucket = getSetting(BUCKET, metadata);
this.clientName = CLIENT_NAME.get(metadata.settings());
this.retryThrottledCasDelayIncrement = RETRY_THROTTLED_CAS_DELAY_INCREMENT.get(metadata.settings());
this.retryThrottledCasMaxNumberOfRetries = RETRY_THROTTLED_CAS_MAX_NUMBER_OF_RETRIES.get(metadata.settings());
this.retryThrottledCasMaxDelay = RETRY_THROTTLED_CAS_MAXIMUM_DELAY.get(metadata.settings());
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath(), chunkSize, isCompress());
}

Expand All @@ -105,7 +133,15 @@ private static Map<String, String> buildLocation(RepositoryMetadata metadata) {

@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bigArrays, bufferSize);
return new GoogleCloudStorageBlobStore(
bucket,
clientName,
metadata.name(),
storageService,
bigArrays,
bufferSize,
BackoffPolicy.linearBackoff(retryThrottledCasDelayIncrement, retryThrottledCasMaxNumberOfRetries, retryThrottledCasMaxDelay)
);
}

@Override
Expand Down
Loading

0 comments on commit c02292f

Please sign in to comment.