From b2145820d2e967823e3861841fffc12f0e88f153 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Fri, 31 Jan 2025 16:29:58 +0100 Subject: [PATCH] Revert "Remove INDEX_REFRESH_BLOCK after index becomes searchable (#120807)" This reverts commit ae0f1a64b571c319e33a24bc8a05a1fa1d1668b9. The refresh block would be removed in a subsequent cluster state update instead of removing it immediately after an index is ready for searches. Closes ES-10697 --- docs/changelog/120807.yaml | 5 - .../action/shard/ShardStateAction.java | 55 -------- ...dStartedClusterStateTaskExecutorTests.java | 117 ------------------ .../ClusterStateCreationUtils.java | 48 ++----- 4 files changed, 8 insertions(+), 217 deletions(-) delete mode 100644 docs/changelog/120807.yaml diff --git a/docs/changelog/120807.yaml b/docs/changelog/120807.yaml deleted file mode 100644 index 02083be207846..0000000000000 --- a/docs/changelog/120807.yaml +++ /dev/null @@ -1,5 +0,0 @@ -pr: 120807 -summary: Remove INDEX_REFRESH_BLOCK after index becomes searchable -area: CRUD -type: enhancement -issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 388baca6c1048..ed6ca57d67b25 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -23,8 +23,6 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.NotMasterException; -import org.elasticsearch.cluster.block.ClusterBlock; -import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -72,7 +70,6 @@ import static org.apache.logging.log4j.Level.DEBUG; import static org.apache.logging.log4j.Level.ERROR; -import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_REFRESH_BLOCK; import static org.elasticsearch.cluster.service.MasterService.isPublishFailureException; import static org.elasticsearch.core.Strings.format; @@ -622,7 +619,6 @@ public ClusterState execute(BatchExecutionContext batchE List> tasksToBeApplied = new ArrayList<>(); List shardRoutingsToBeApplied = new ArrayList<>(batchExecutionContext.taskContexts().size()); Set seenShardRoutings = new HashSet<>(); // to prevent duplicates - Set indicesWithUnpromotableShardsStarted = null; final Map updatedTimestampRanges = new HashMap<>(); final ClusterState initialState = batchExecutionContext.initialState(); for (var taskContext : batchExecutionContext.taskContexts()) { @@ -741,14 +737,6 @@ public ClusterState execute(BatchExecutionContext batchE new ClusterStateTimeRanges(newTimestampMillisRange, newEventIngestedMillisRange) ); } - - if (matched.isPromotableToPrimary() == false - && initialState.blocks().hasIndexBlock(index.getName(), INDEX_REFRESH_BLOCK)) { - if (indicesWithUnpromotableShardsStarted == null) { - indicesWithUnpromotableShardsStarted = new HashSet<>(); - } - indicesWithUnpromotableShardsStarted.add(index); - } } } } @@ -772,10 +760,7 @@ public ClusterState execute(BatchExecutionContext batchE maybeUpdatedState = ClusterState.builder(maybeUpdatedState).metadata(metadataBuilder).build(); } - maybeUpdatedState = maybeRemoveIndexRefreshBlocks(maybeUpdatedState, indicesWithUnpromotableShardsStarted); - assert assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState); - assert assertRefreshBlockIsNotPresentWhenTheIndexIsSearchable(maybeUpdatedState); for (final var taskContext : tasksToBeApplied) { final var task = taskContext.getTask(); @@ -791,36 +776,6 @@ public ClusterState execute(BatchExecutionContext batchE return maybeUpdatedState; } - private static ClusterState maybeRemoveIndexRefreshBlocks( - ClusterState clusterState, - @Nullable Set indicesWithUnpromotableShardsStarted - ) { - // The provided cluster state must include the newly STARTED unpromotable shards - if (indicesWithUnpromotableShardsStarted == null) { - return clusterState; - } - - ClusterBlocks.Builder clusterBlocksBuilder = null; - for (Index indexWithUnpromotableShardsStarted : indicesWithUnpromotableShardsStarted) { - String indexName = indexWithUnpromotableShardsStarted.getName(); - assert clusterState.blocks().hasIndexBlock(indexName, INDEX_REFRESH_BLOCK) : indexWithUnpromotableShardsStarted; - - var indexRoutingTable = clusterState.routingTable().index(indexWithUnpromotableShardsStarted); - if (indexRoutingTable.readyForSearch()) { - if (clusterBlocksBuilder == null) { - clusterBlocksBuilder = ClusterBlocks.builder(clusterState.blocks()); - } - clusterBlocksBuilder.removeIndexBlock(indexName, INDEX_REFRESH_BLOCK); - } - } - - if (clusterBlocksBuilder == null) { - return clusterState; - } - - return ClusterState.builder(clusterState).blocks(clusterBlocksBuilder).build(); - } - private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterState clusterState) { for (Map.Entry cursor : clusterState.getRoutingTable().getIndicesRouting().entrySet()) { assert cursor.getValue().allPrimaryShardsActive() == false @@ -844,16 +799,6 @@ private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterSt return true; } - private static boolean assertRefreshBlockIsNotPresentWhenTheIndexIsSearchable(ClusterState clusterState) { - for (Map.Entry> indexBlock : clusterState.blocks().indices().entrySet()) { - if (indexBlock.getValue().contains(INDEX_REFRESH_BLOCK)) { - assert clusterState.routingTable().index(indexBlock.getKey()).readyForSearch() == false - : "Index [" + indexBlock.getKey() + "] is searchable but has an INDEX_REFRESH_BLOCK"; - } - } - return true; - } - @Override public void clusterStatePublished(ClusterState newClusterState) { rerouteService.reroute( diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java index 6f67898bbbdb0..ca7376a43d718 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -12,31 +12,24 @@ import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionTestUtils; -import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardUpdateTask; -import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.routing.AllocationId; -import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.Tuple; import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardLongFieldRange; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -44,11 +37,9 @@ import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithNoShard; -import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_REFRESH_BLOCK; import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestCase { @@ -488,114 +479,6 @@ public void testExpandsTimestampRangeForReplica() throws Exception { assertThat(latestIndexMetadata.getEventIngestedRange(), sameInstance(IndexLongFieldRange.UNKNOWN)); } - public void testIndexRefreshBlockIsClearedOnceTheIndexIsReadyToBeSearched() throws Exception { - final var indexName = "test"; - final var numberOfShards = randomIntBetween(1, 4); - final var numberOfReplicas = randomIntBetween(1, 4); - var clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicasWithState( - new String[] { indexName }, - numberOfShards, - ShardRouting.Role.INDEX_ONLY, - IntStream.range(0, numberOfReplicas) - .mapToObj(unused -> Tuple.tuple(ShardRoutingState.UNASSIGNED, ShardRouting.Role.SEARCH_ONLY)) - .toList() - ); - - clusterState = ClusterState.builder(clusterState) - .metadata(Metadata.builder(clusterState.metadata()).put(withActiveShardsInSyncAllocationIds(clusterState, indexName))) - .blocks(ClusterBlocks.builder(clusterState.blocks()).addIndexBlock(indexName, INDEX_REFRESH_BLOCK)) - .build(); - - while (clusterState.blocks().hasIndexBlock(indexName, INDEX_REFRESH_BLOCK)) { - clusterState = maybeInitializeUnassignedReplicaShard(clusterState); - - final IndexMetadata indexMetadata = clusterState.metadata().index(indexName); - - final var initializingReplicaShardOpt = clusterState.routingTable() - .allShards() - .filter(shardRouting -> shardRouting.isPromotableToPrimary() == false) - .filter(shardRouting -> shardRouting.state().equals(ShardRoutingState.INITIALIZING)) - .findFirst(); - - assertThat(clusterState.routingTable().allShards().toList().toString(), initializingReplicaShardOpt.isPresent(), is(true)); - - var initializingReplicaShard = initializingReplicaShardOpt.get(); - - final var shardId = initializingReplicaShard.shardId(); - final var primaryTerm = indexMetadata.primaryTerm(shardId.id()); - final var replicaAllocationId = initializingReplicaShard.allocationId().getId(); - final var task = new StartedShardUpdateTask( - new StartedShardEntry( - shardId, - replicaAllocationId, - primaryTerm, - "test", - ShardLongFieldRange.UNKNOWN, - ShardLongFieldRange.UNKNOWN - ), - createTestListener() - ); - - final var resultingState = executeTasks(clusterState, List.of(task)); - assertNotSame(clusterState, resultingState); - - clusterState = resultingState; - } - - var indexRoutingTable = clusterState.routingTable().index(indexName); - assertThat(indexRoutingTable.readyForSearch(), is(true)); - for (int i = 0; i < numberOfShards; i++) { - var shardRoutingTable = indexRoutingTable.shard(i); - assertThat(shardRoutingTable, is(notNullValue())); - // Ensure that at least one unpromotable shard is either STARTED or RELOCATING - assertThat(shardRoutingTable.unpromotableShards().isEmpty(), is(false)); - } - assertThat(clusterState.blocks().hasIndexBlock(indexName, INDEX_REFRESH_BLOCK), is(false)); - } - - private static ClusterState maybeInitializeUnassignedReplicaShard(ClusterState clusterState) { - var unassignedShardRoutingOpt = clusterState.routingTable() - .allShards() - .filter(shardRouting -> shardRouting.state().equals(ShardRoutingState.UNASSIGNED)) - .findFirst(); - - if (unassignedShardRoutingOpt.isEmpty()) { - return clusterState; - } - - var unassignedShardRouting = unassignedShardRoutingOpt.get(); - var initializedShard = unassignedShardRouting.initialize(randomUUID(), null, 1); - - RoutingTable routingTable = clusterState.routingTable(); - IndexRoutingTable indexRoutingTable = routingTable.index(unassignedShardRouting.getIndexName()); - IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); - for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) { - IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId); - for (int copy = 0; copy < shardRoutingTable.size(); copy++) { - ShardRouting shardRouting = shardRoutingTable.shard(copy); - newIndexRoutingTable.addShard(shardRouting == unassignedShardRouting ? initializedShard : shardRouting); - } - } - routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); - return ClusterState.builder(clusterState).routingTable(routingTable).build(); - } - - private static IndexMetadata.Builder withActiveShardsInSyncAllocationIds(ClusterState clusterState, String indexName) { - IndexMetadata.Builder indexMetadataBuilder = new IndexMetadata.Builder(clusterState.metadata().index(indexName)); - var indexRoutingTable = clusterState.routingTable().index(indexName); - for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable.allShards().toList()) { - indexMetadataBuilder.putInSyncAllocationIds( - indexShardRoutingTable.shardId().getId(), - indexShardRoutingTable.activeShards() - .stream() - .map(ShardRouting::allocationId) - .map(AllocationId::getId) - .collect(Collectors.toSet()) - ); - } - return indexMetadataBuilder; - } - private ClusterState executeTasks(final ClusterState state, final List tasks) throws Exception { return ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(state, executor, tasks); } diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index fa2247ddabea0..950c54ddb1d22 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -363,34 +363,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas( int numberOfShards, List replicaRoles ) { - return stateWithAssignedPrimariesAndReplicasWithState( - indices, - numberOfShards, - replicaRoles.stream().map(role -> Tuple.tuple(ShardRoutingState.STARTED, role)).toList() - ); - } - - /** - * Creates cluster state with several indexes, shards and replicas (with given roles and state) and all primary shards STARTED. - */ - public static ClusterState stateWithAssignedPrimariesAndReplicasWithState( - String[] indices, - int numberOfShards, - List> replicaRoleAndStates - ) { - return stateWithAssignedPrimariesAndReplicasWithState(indices, numberOfShards, ShardRouting.Role.DEFAULT, replicaRoleAndStates); - } - - /** - * Creates cluster state with several indexes, shards and replicas (with given roles and state) and all primary shards STARTED. - */ - public static ClusterState stateWithAssignedPrimariesAndReplicasWithState( - String[] indices, - int numberOfShards, - ShardRouting.Role primaryRole, - List> replicasStateAndRoles - ) { - int numberOfDataNodes = replicasStateAndRoles.size() + 1; + int numberOfDataNodes = replicaRoles.size() + 1; DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); for (int i = 0; i < numberOfDataNodes + 1; i++) { final DiscoveryNode node = newNode(i); @@ -410,7 +383,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicasWithState( for (String index : indices) { IndexMetadata indexMetadata = IndexMetadata.builder(index) .settings( - indexSettings(IndexVersion.current(), numberOfShards, replicasStateAndRoles.size()).put( + indexSettings(IndexVersion.current(), numberOfShards, replicaRoles.size()).put( SETTING_CREATION_DATE, System.currentTimeMillis() ) @@ -424,19 +397,14 @@ public static ClusterState stateWithAssignedPrimariesAndReplicasWithState( final ShardId shardId = new ShardId(index, "_na_", i); IndexShardRoutingTable.Builder indexShardRoutingBuilder = IndexShardRoutingTable.builder(shardId); indexShardRoutingBuilder.addShard( - shardRoutingBuilder(index, i, newNode(0).getId(), true, ShardRoutingState.STARTED).withRole(primaryRole).build() + TestShardRouting.newShardRouting(index, i, newNode(0).getId(), null, true, ShardRoutingState.STARTED) ); - for (int replica = 0; replica < replicasStateAndRoles.size(); replica++) { - var replicaStateAndRole = replicasStateAndRoles.get(replica); - ShardRoutingState shardRoutingState = replicaStateAndRole.v1(); - String currentNodeId = shardRoutingState.equals(ShardRoutingState.UNASSIGNED) ? null : newNode(replica + 1).getId(); - var shardRoutingBuilder = shardRoutingBuilder(index, i, currentNodeId, false, shardRoutingState).withRole( - replicaStateAndRole.v2() + for (int replica = 0; replica < replicaRoles.size(); replica++) { + indexShardRoutingBuilder.addShard( + shardRoutingBuilder(index, i, newNode(replica + 1).getId(), false, ShardRoutingState.STARTED).withRole( + replicaRoles.get(replica) + ).build() ); - if (shardRoutingState.equals(ShardRoutingState.RELOCATING)) { - shardRoutingBuilder.withRelocatingNodeId(DiscoveryNodeUtils.create("relocating_" + replica).getId()); - } - indexShardRoutingBuilder.addShard(shardRoutingBuilder.build()); } indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder); }