Use InMemoryNoOpCommitDirectory for archives indices only #121210

Merged: 4 commits, Feb 4, 2025
@@ -3209,7 +3209,12 @@ void checkIndex() throws IOException {
      try {
          doCheckIndex();
      } catch (IOException e) {
-         store.markStoreCorrupted(e);
+         if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class) != null) {
+             // Cache-based read operations on Lucene files can throw an AlreadyClosedException wrapped into an IOException in case
+             // of evictions. We don't want to mark the store as corrupted for this.
+         } else {
+             store.markStoreCorrupted(e);
+         }
          throw e;
      } finally {
          store.decRef();
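The guard above works by walking the exception's cause chain. The following standalone sketch illustrates the same pattern outside Elasticsearch; unwrap() here is a simplified stand-in for ExceptionsHelper.unwrap (the real helper accepts multiple classes), and ClosedChannelException stands in for Lucene's AlreadyClosedException, so all names are illustrative only.

import java.io.IOException;
import java.nio.channels.ClosedChannelException;

// Standalone sketch of the corruption guard above; names are illustrative only.
public class CorruptionGuardSketch {

    // Walk the cause chain; return the first cause of the given type, or null.
    static Throwable unwrap(Throwable t, Class<? extends Throwable> clazz) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (clazz.isInstance(cause)) {
                return cause;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // An IOException wrapping an "already closed" cause, as a cache eviction might produce.
        IOException eviction = new IOException("read failed", new ClosedChannelException());

        if (unwrap(eviction, ClosedChannelException.class) != null) {
            System.out.println("benign: directory was closed concurrently, do not mark the store corrupted");
        } else {
            System.out.println("unexpected I/O error: mark the store corrupted");
        }
    }
}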
@@ -8,9 +8,6 @@
package org.elasticsearch.xpack.searchablesnapshots;

import org.apache.lucene.search.TotalHits;
- import org.apache.lucene.store.ByteBuffersDirectory;
- import org.apache.lucene.store.Directory;
- import org.apache.lucene.store.FilterDirectory;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction;
@@ -72,14 +69,11 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_RECOVERY_STATE_FACTORY_KEY;
- import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
- import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
- import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.sameInstance;
@@ -258,22 +252,8 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception {

// the original shard size from the snapshot
final long originalSize = snapshotShards.get(shardRouting.getId()).getStats().getTotalSize();
-             totalExpectedSize += originalSize;
-
-             final Directory unwrappedDir = FilterDirectory.unwrap(
-                 internalCluster().getInstance(IndicesService.class, getDiscoveryNodes().resolveNode(shardRouting.currentNodeId()).getName())
-                     .indexServiceSafe(shardRouting.index())
-                     .getShard(shardRouting.getId())
-                     .store()
-                     .directory()
-             );
-             assertThat(shardRouting.toString(), unwrappedDir, notNullValue());
-             assertThat(shardRouting.toString(), unwrappedDir, instanceOf(ByteBuffersDirectory.class));
-
-             final ByteBuffersDirectory inMemoryDir = (ByteBuffersDirectory) unwrappedDir;
-             assertThat(inMemoryDir.listAll(), arrayWithSize(0));
-
+             assertThat(shardRouting.toString(), store.totalDataSetSizeInBytes(), equalTo(originalSize));
+             totalExpectedSize += originalSize;
}

final StoreStats store = indicesStatsResponse.getTotal().getStore();
@@ -23,6 +23,7 @@
import org.elasticsearch.blobcache.shared.SharedBlobCacheService;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.RecoverySource;
+ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
@@ -34,6 +35,7 @@
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexSettings;
+ import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
@@ -277,6 +279,10 @@ private BlobStoreIndexShardSnapshot.FileInfo fileInfo(final String name) throws
@Override
public final String[] listAll() {
ensureOpen();
+         return listAllFiles();
+     }
+
+     private String[] listAllFiles() {
return files().stream().map(BlobStoreIndexShardSnapshot.FileInfo::physicalName).sorted(String::compareTo).toArray(String[]::new);
}

Expand All @@ -288,42 +294,39 @@ public final long fileLength(final String name) throws IOException {

@Override
public Set<String> getPendingDeletions() {
-         throw unsupportedException();
+         throw unsupportedException("getPendingDeletions");
}

@Override
-     public void sync(Collection<String> names) {
-         throw unsupportedException();
-     }
+     public void sync(Collection<String> names) {}

Review comment (Contributor):
Just so I am sure I understand this correctly: we still call sync and syncMetadata but do not modify anything? I wonder why these are still called.

Reply (Member, author):
syncMetadata is called during peer recoveries by Store#cleanupAndVerify and, if I remember correctly, sync is called during snapshots.

Those methods are still called because peer recovery of a searchable snapshot works the same way as for a regular index, with the difference that the files on the replica "magically" appear before the recovery starts. We could change that, but it was a bit too much work when I tried: the code is intermingled and required adding many "if not a searchable snapshot, do this" conditions (and also because we would need to accommodate searchable snapshot indices created before #118606, when recovery still created an additional commit). It's feasible, but the code ended up much less readable.

So I think that relying on SearchableSnapshotDirectory throwing on file-change operations will help catch any place where we still change files, which after #118606 we should not.

Since I'm not fully confident that we're testing all places, I only merged this in 9.1.

A standalone sketch of this fail-fast directory pattern appears after this hunk.

@Override
-     public void syncMetaData() {
-         throw unsupportedException();
-     }
+     public void syncMetaData() {}

@Override
public void deleteFile(String name) {
-         throw unsupportedException();
+         throw unsupportedException("deleteFile(" + name + ')');
}

@Override
public IndexOutput createOutput(String name, IOContext context) {
-         throw unsupportedException();
+         throw unsupportedException("createOutput(" + name + ", " + context + ')');
}

@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
-         throw unsupportedException();
+         throw unsupportedException("createTempOutput(" + prefix + ", " + suffix + ", " + context + ')');
}

@Override
public void rename(String source, String dest) {
-         throw unsupportedException();
+         throw unsupportedException("rename(" + source + ", " + dest + ')');
}

-     private static UnsupportedOperationException unsupportedException() {
-         assert false : "this operation is not supported and should have not be called";
-         return new UnsupportedOperationException("Searchable snapshot directory does not support this operation");
+     private UnsupportedOperationException unsupportedException(String description) {
+         var message = "Searchable snapshot directory does not support the operation [" + description + ']';
+         assert false : message + ", current directory files: " + Strings.arrayToCommaDelimitedString(this.listAllFiles());
+         return new UnsupportedOperationException(message);
}

@Override
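To illustrate the fail-fast approach discussed in the review thread above, here is a hypothetical, self-contained variant of the pattern: a Lucene FilterDirectory that delegates reads, trips an assertion (and throws) on every file-changing operation, and tolerates sync/syncMetaData as no-ops because recovery and snapshot code paths still invoke them. This is a sketch of the technique, not the actual SearchableSnapshotDirectory implementation.

import java.util.Collection;
import java.util.Set;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;

// Hypothetical read-only wrapper: reads pass through to the wrapped directory,
// mutations fail fast so unexpected file changes surface in tests.
public class ReadOnlyDirectory extends FilterDirectory {

    public ReadOnlyDirectory(Directory in) {
        super(in);
    }

    private UnsupportedOperationException unsupported(String description) {
        String message = "read-only directory does not support [" + description + ']';
        assert false : message; // fail hard in tests, where assertions are enabled
        return new UnsupportedOperationException(message);
    }

    @Override
    public void deleteFile(String name) {
        throw unsupported("deleteFile(" + name + ')');
    }

    @Override
    public IndexOutput createOutput(String name, IOContext context) {
        throw unsupported("createOutput(" + name + ')');
    }

    @Override
    public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
        throw unsupported("createTempOutput(" + prefix + ", " + suffix + ')');
    }

    @Override
    public void rename(String source, String dest) {
        throw unsupported("rename(" + source + ", " + dest + ')');
    }

    @Override
    public Set<String> getPendingDeletions() {
        throw unsupported("getPendingDeletions");
    }

    @Override
    public void sync(Collection<String> names) {} // no-op, still called during snapshots

    @Override
    public void syncMetaData() {} // no-op, still called by Store#cleanupAndVerify during peer recovery
}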
@@ -612,24 +615,33 @@ public static Directory create(
final Path cacheDir = CacheService.getShardCachePath(shardPath).resolve(snapshotId.getUUID());
Files.createDirectories(cacheDir);

-         return new InMemoryNoOpCommitDirectory(
-             new SearchableSnapshotDirectory(
-                 blobContainerSupplier,
-                 lazySnapshot::getOrCompute,
-                 blobStoreCacheService,
-                 initialRepository.getMetadata().name(),
-                 snapshotId,
-                 indexId,
-                 shardPath.getShardId(),
-                 indexSettings.getSettings(),
-                 currentTimeNanosSupplier,
-                 cache,
-                 cacheDir,
-                 shardPath,
-                 threadPool,
-                 sharedBlobCacheService
-             )
-         );
+         final var dir = new SearchableSnapshotDirectory(
+             blobContainerSupplier,
+             lazySnapshot::getOrCompute,
+             blobStoreCacheService,
+             initialRepository.getMetadata().name(),
+             snapshotId,
+             indexId,
+             shardPath.getShardId(),
+             indexSettings.getSettings(),
+             currentTimeNanosSupplier,
+             cache,
+             cacheDir,
+             shardPath,
+             threadPool,
+             sharedBlobCacheService
+         );
+
+         // Archives indices mounted as searchable snapshots always require a writeable Lucene directory in order to rewrite the segments
+         // infos file to the latest Lucene version. Similarly, searchable snapshot indices created before 9.0.0 also require a writeable
+         // directory because in previous versions commits were executed during recovery (to associate translogs with Lucene indices),
+         // creating additional files that need to be sent and written to replicas during peer-recoveries. From 9.0.0 we merged a change to
+         // skip commits creation during recovery for searchable snapshots (see https://github.com/elastic/elasticsearch/pull/118606).
+         var version = IndexMetadata.SETTING_INDEX_VERSION_COMPATIBILITY.get(indexSettings.getSettings());
+         if (version.before(IndexVersions.UPGRADE_TO_LUCENE_10_0_0) || indexSettings.getIndexVersionCreated().isLegacyIndexVersion()) {
+             return new InMemoryNoOpCommitDirectory(dir);
+         }
+         return dir;
}

public static SearchableSnapshotDirectory unwrapDirectory(Directory dir) {
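The new tail of create() only wraps the directory when writes can still happen. The decision can be modeled in isolation as follows; the IndexVersion record and the UPGRADE_TO_LUCENE_10 constant are simplified, hypothetical stand-ins for the Elasticsearch types (org.elasticsearch.index.IndexVersion / IndexVersions), not the real API.

// Simplified model of the wrapping decision at the end of create().
record IndexVersion(int id) {
    boolean before(IndexVersion other) {
        return id < other.id;
    }
}

class DirectoryWrappingDecision {
    // Hypothetical stand-in for IndexVersions.UPGRADE_TO_LUCENE_10_0_0.
    static final IndexVersion UPGRADE_TO_LUCENE_10 = new IndexVersion(9_000_000);

    // True when the index still needs the writeable in-memory wrapper: archive
    // (legacy) indices must rewrite segments_N to the current Lucene version, and
    // pre-9.0.0 searchable snapshots created commits during recovery.
    static boolean needsWriteableWrapper(IndexVersion compatibilityVersion, boolean isLegacyIndexVersion) {
        return compatibilityVersion.before(UPGRADE_TO_LUCENE_10) || isLegacyIndexVersion;
    }

    public static void main(String[] args) {
        System.out.println(needsWriteableWrapper(new IndexVersion(8_500_000), false)); // true: wrap in InMemoryNoOpCommitDirectory
        System.out.println(needsWriteableWrapper(new IndexVersion(9_000_000), false)); // false: use the directory as-is
    }
}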