Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Remove INDEX_REFRESH_BLOCK after index becomes searchable (#120807)" #121427

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions docs/changelog/120807.yaml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
Expand Down Expand Up @@ -72,7 +70,6 @@

import static org.apache.logging.log4j.Level.DEBUG;
import static org.apache.logging.log4j.Level.ERROR;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_REFRESH_BLOCK;
import static org.elasticsearch.cluster.service.MasterService.isPublishFailureException;
import static org.elasticsearch.core.Strings.format;

Expand Down Expand Up @@ -622,7 +619,6 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
List<TaskContext<StartedShardUpdateTask>> tasksToBeApplied = new ArrayList<>();
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(batchExecutionContext.taskContexts().size());
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
Set<Index> indicesWithUnpromotableShardsStarted = null;
final Map<Index, ClusterStateTimeRanges> updatedTimestampRanges = new HashMap<>();
final ClusterState initialState = batchExecutionContext.initialState();
for (var taskContext : batchExecutionContext.taskContexts()) {
Expand Down Expand Up @@ -741,14 +737,6 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
new ClusterStateTimeRanges(newTimestampMillisRange, newEventIngestedMillisRange)
);
}

if (matched.isPromotableToPrimary() == false
&& initialState.blocks().hasIndexBlock(index.getName(), INDEX_REFRESH_BLOCK)) {
if (indicesWithUnpromotableShardsStarted == null) {
indicesWithUnpromotableShardsStarted = new HashSet<>();
}
indicesWithUnpromotableShardsStarted.add(index);
}
}
}
}
Expand All @@ -772,10 +760,7 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
maybeUpdatedState = ClusterState.builder(maybeUpdatedState).metadata(metadataBuilder).build();
}

maybeUpdatedState = maybeRemoveIndexRefreshBlocks(maybeUpdatedState, indicesWithUnpromotableShardsStarted);

assert assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState);
assert assertRefreshBlockIsNotPresentWhenTheIndexIsSearchable(maybeUpdatedState);

for (final var taskContext : tasksToBeApplied) {
final var task = taskContext.getTask();
Expand All @@ -791,36 +776,6 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
return maybeUpdatedState;
}

/**
 * Drops the {@code INDEX_REFRESH_BLOCK} from each of the given indices whose routing table reports
 * {@code readyForSearch()} in the supplied cluster state.
 *
 * @param clusterState the cluster state to inspect; it must already include the newly STARTED unpromotable shards
 * @param indicesWithUnpromotableShardsStarted indices that currently carry the refresh block and had an unpromotable
 *                                             shard started, or {@code null} if there are none
 * @return the same {@code clusterState} instance when no block has to be removed, otherwise a new cluster state
 *         built from it with the updated blocks
 */
private static ClusterState maybeRemoveIndexRefreshBlocks(
    ClusterState clusterState,
    @Nullable Set<Index> indicesWithUnpromotableShardsStarted
) {
    if (indicesWithUnpromotableShardsStarted == null) {
        return clusterState;
    }

    // Created lazily so that an unchanged set of blocks never triggers a rebuild of the cluster state
    ClusterBlocks.Builder blocksBuilder = null;
    for (Index index : indicesWithUnpromotableShardsStarted) {
        final String name = index.getName();
        assert clusterState.blocks().hasIndexBlock(name, INDEX_REFRESH_BLOCK) : index;

        if (clusterState.routingTable().index(index).readyForSearch() == false) {
            // Not searchable yet: the block stays in place
            continue;
        }
        if (blocksBuilder == null) {
            blocksBuilder = ClusterBlocks.builder(clusterState.blocks());
        }
        blocksBuilder.removeIndexBlock(name, INDEX_REFRESH_BLOCK);
    }

    return blocksBuilder == null ? clusterState : ClusterState.builder(clusterState).blocks(blocksBuilder).build();
}

private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterState clusterState) {
for (Map.Entry<String, IndexRoutingTable> cursor : clusterState.getRoutingTable().getIndicesRouting().entrySet()) {
assert cursor.getValue().allPrimaryShardsActive() == false
Expand All @@ -844,16 +799,6 @@ private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterSt
return true;
}

/**
 * Assertion helper: for every index that has an {@code INDEX_REFRESH_BLOCK} in the given cluster state, asserts
 * that the index routing table does not report {@code readyForSearch()}.
 *
 * @param clusterState the cluster state to check
 * @return always {@code true}, so the method can be invoked from an {@code assert} statement
 */
private static boolean assertRefreshBlockIsNotPresentWhenTheIndexIsSearchable(ClusterState clusterState) {
    for (var blocksByIndex : clusterState.blocks().indices().entrySet()) {
        if (blocksByIndex.getValue().contains(INDEX_REFRESH_BLOCK) == false) {
            // Only indices that still carry the refresh block are of interest
            continue;
        }
        final String blockedIndex = blocksByIndex.getKey();
        assert clusterState.routingTable().index(blockedIndex).readyForSearch() == false
            : "Index [" + blockedIndex + "] is searchable but has an INDEX_REFRESH_BLOCK";
    }
    return true;
}

@Override
public void clusterStatePublished(ClusterState newClusterState) {
rerouteService.reroute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,34 @@
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardLongFieldRange;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithNoShard;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_REFRESH_BLOCK;
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;

public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestCase {
Expand Down Expand Up @@ -488,114 +479,6 @@ public void testExpandsTimestampRangeForReplica() throws Exception {
assertThat(latestIndexMetadata.getEventIngestedRange(), sameInstance(IndexLongFieldRange.UNKNOWN));
}

/**
 * Starts the unpromotable (search-only) replicas of an index one at a time, via {@link StartedShardUpdateTask}s,
 * until the {@code INDEX_REFRESH_BLOCK} disappears from the cluster state, then verifies that the index is
 * ready for search and that the block is gone.
 */
public void testIndexRefreshBlockIsClearedOnceTheIndexIsReadyToBeSearched() throws Exception {
final var indexName = "test";
final var numberOfShards = randomIntBetween(1, 4);
final var numberOfReplicas = randomIntBetween(1, 4);
// INDEX_ONLY primaries plus numberOfReplicas SEARCH_ONLY replicas per shard, all replicas initially UNASSIGNED
var clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicasWithState(
new String[] { indexName },
numberOfShards,
ShardRouting.Role.INDEX_ONLY,
IntStream.range(0, numberOfReplicas)
.mapToObj(unused -> Tuple.tuple(ShardRoutingState.UNASSIGNED, ShardRouting.Role.SEARCH_ONLY))
.toList()
);

// Record the currently active shard copies as in-sync and put the refresh block on the index
clusterState = ClusterState.builder(clusterState)
.metadata(Metadata.builder(clusterState.metadata()).put(withActiveShardsInSyncAllocationIds(clusterState, indexName)))
.blocks(ClusterBlocks.builder(clusterState.blocks()).addIndexBlock(indexName, INDEX_REFRESH_BLOCK))
.build();

// Each iteration moves at most one unassigned shard to INITIALIZING and then reports one initializing
// unpromotable shard as started; the loop ends once the executor has removed the block
while (clusterState.blocks().hasIndexBlock(indexName, INDEX_REFRESH_BLOCK)) {
clusterState = maybeInitializeUnassignedReplicaShard(clusterState);

final IndexMetadata indexMetadata = clusterState.metadata().index(indexName);

final var initializingReplicaShardOpt = clusterState.routingTable()
.allShards()
.filter(shardRouting -> shardRouting.isPromotableToPrimary() == false)
.filter(shardRouting -> shardRouting.state().equals(ShardRoutingState.INITIALIZING))
.findFirst();

// The full shard list is used as the failure message to ease debugging
assertThat(clusterState.routingTable().allShards().toList().toString(), initializingReplicaShardOpt.isPresent(), is(true));

var initializingReplicaShard = initializingReplicaShardOpt.get();

final var shardId = initializingReplicaShard.shardId();
final var primaryTerm = indexMetadata.primaryTerm(shardId.id());
final var replicaAllocationId = initializingReplicaShard.allocationId().getId();
final var task = new StartedShardUpdateTask(
new StartedShardEntry(
shardId,
replicaAllocationId,
primaryTerm,
"test",
ShardLongFieldRange.UNKNOWN,
ShardLongFieldRange.UNKNOWN
),
createTestListener()
);

// Starting a shard must always produce a new cluster state instance
final var resultingState = executeTasks(clusterState, List.of(task));
assertNotSame(clusterState, resultingState);

clusterState = resultingState;
}

// Once the block is gone, the index must be ready for search
var indexRoutingTable = clusterState.routingTable().index(indexName);
assertThat(indexRoutingTable.readyForSearch(), is(true));
for (int i = 0; i < numberOfShards; i++) {
var shardRoutingTable = indexRoutingTable.shard(i);
assertThat(shardRoutingTable, is(notNullValue()));
// Ensure that at least one unpromotable shard is either STARTED or RELOCATING
assertThat(shardRoutingTable.unpromotableShards().isEmpty(), is(false));
}
assertThat(clusterState.blocks().hasIndexBlock(indexName, INDEX_REFRESH_BLOCK), is(false));
}

/**
 * Moves the first UNASSIGNED shard found in the routing table (if any) to its initialized form, using a random
 * node id, and returns a cluster state whose routing table reflects that change.
 *
 * @param clusterState the cluster state to start from
 * @return the same {@code clusterState} instance when there is no UNASSIGNED shard, otherwise a new cluster state
 *         in which that one shard copy has been replaced by its initialized counterpart
 */
private static ClusterState maybeInitializeUnassignedReplicaShard(ClusterState clusterState) {
    final var firstUnassigned = clusterState.routingTable()
        .allShards()
        .filter(candidate -> candidate.state().equals(ShardRoutingState.UNASSIGNED))
        .findFirst();

    if (firstUnassigned.isPresent() == false) {
        return clusterState;
    }

    final var unassigned = firstUnassigned.get();
    final var initializing = unassigned.initialize(randomUUID(), null, 1);

    final RoutingTable currentRoutingTable = clusterState.routingTable();
    final IndexRoutingTable currentIndexRouting = currentRoutingTable.index(unassigned.getIndexName());

    // Rebuild the index routing table copy by copy, swapping in the initialized shard for the unassigned one
    final IndexRoutingTable.Builder rebuiltIndexRouting = IndexRoutingTable.builder(currentIndexRouting.getIndex());
    for (int shardNum = 0; shardNum < currentIndexRouting.size(); shardNum++) {
        final IndexShardRoutingTable copies = currentIndexRouting.shard(shardNum);
        for (int copyNum = 0; copyNum < copies.size(); copyNum++) {
            final ShardRouting existing = copies.shard(copyNum);
            // Identity comparison on purpose: only the exact instance picked above is replaced
            if (existing == unassigned) {
                rebuiltIndexRouting.addShard(initializing);
            } else {
                rebuiltIndexRouting.addShard(existing);
            }
        }
    }

    final RoutingTable updatedRoutingTable = RoutingTable.builder(currentRoutingTable).add(rebuiltIndexRouting).build();
    return ClusterState.builder(clusterState).routingTable(updatedRoutingTable).build();
}

/**
 * Creates an {@link IndexMetadata.Builder} from the metadata of the given index and, for every shard in the index
 * routing table, registers the allocation ids of its active shard copies as the in-sync allocation ids.
 *
 * @param clusterState the cluster state that supplies both the index metadata and the routing table
 * @param indexName    the name of the index to build metadata for
 * @return the populated index metadata builder
 */
private static IndexMetadata.Builder withActiveShardsInSyncAllocationIds(ClusterState clusterState, String indexName) {
    final IndexMetadata.Builder builder = new IndexMetadata.Builder(clusterState.metadata().index(indexName));
    final var shardTables = clusterState.routingTable().index(indexName).allShards().toList();
    for (IndexShardRoutingTable shardTable : shardTables) {
        final int shardNumber = shardTable.shardId().getId();
        final var activeAllocationIds = shardTable.activeShards()
            .stream()
            .map(ShardRouting::allocationId)
            .map(AllocationId::getId)
            .collect(Collectors.toSet());
        builder.putInSyncAllocationIds(shardNumber, activeAllocationIds);
    }
    return builder;
}

/**
 * Runs the given started-shard tasks against {@code state} using this test's executor, via
 * {@code ClusterStateTaskExecutorUtils.executeAndAssertSuccessful}.
 *
 * @param state the cluster state the tasks are applied to
 * @param tasks the started-shard update tasks to execute
 * @return the cluster state returned by the executor utility
 * @throws Exception if the execution utility throws
 */
private ClusterState executeTasks(final ClusterState state, final List<StartedShardUpdateTask> tasks) throws Exception {
    final ClusterState resultingState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(state, executor, tasks);
    return resultingState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,34 +363,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(
int numberOfShards,
List<ShardRouting.Role> replicaRoles
) {
return stateWithAssignedPrimariesAndReplicasWithState(
indices,
numberOfShards,
replicaRoles.stream().map(role -> Tuple.tuple(ShardRoutingState.STARTED, role)).toList()
);
}

/**
* Creates cluster state with several indexes, shards and replicas (with given roles and state) and all primary shards STARTED.
*/
public static ClusterState stateWithAssignedPrimariesAndReplicasWithState(
String[] indices,
int numberOfShards,
List<Tuple<ShardRoutingState, ShardRouting.Role>> replicaRoleAndStates
) {
return stateWithAssignedPrimariesAndReplicasWithState(indices, numberOfShards, ShardRouting.Role.DEFAULT, replicaRoleAndStates);
}

/**
* Creates cluster state with several indexes, shards and replicas (with given roles and state) and all primary shards STARTED
* with the given primary role.
*/
public static ClusterState stateWithAssignedPrimariesAndReplicasWithState(
String[] indices,
int numberOfShards,
ShardRouting.Role primaryRole,
List<Tuple<ShardRoutingState, ShardRouting.Role>> replicasStateAndRoles
) {
int numberOfDataNodes = replicasStateAndRoles.size() + 1;
int numberOfDataNodes = replicaRoles.size() + 1;
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
for (int i = 0; i < numberOfDataNodes + 1; i++) {
final DiscoveryNode node = newNode(i);
Expand All @@ -410,7 +383,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicasWithState(
for (String index : indices) {
IndexMetadata indexMetadata = IndexMetadata.builder(index)
.settings(
indexSettings(IndexVersion.current(), numberOfShards, replicasStateAndRoles.size()).put(
indexSettings(IndexVersion.current(), numberOfShards, replicaRoles.size()).put(
SETTING_CREATION_DATE,
System.currentTimeMillis()
)
Expand All @@ -424,19 +397,14 @@ public static ClusterState stateWithAssignedPrimariesAndReplicasWithState(
final ShardId shardId = new ShardId(index, "_na_", i);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = IndexShardRoutingTable.builder(shardId);
indexShardRoutingBuilder.addShard(
shardRoutingBuilder(index, i, newNode(0).getId(), true, ShardRoutingState.STARTED).withRole(primaryRole).build()
TestShardRouting.newShardRouting(index, i, newNode(0).getId(), null, true, ShardRoutingState.STARTED)
);
for (int replica = 0; replica < replicasStateAndRoles.size(); replica++) {
var replicaStateAndRole = replicasStateAndRoles.get(replica);
ShardRoutingState shardRoutingState = replicaStateAndRole.v1();
String currentNodeId = shardRoutingState.equals(ShardRoutingState.UNASSIGNED) ? null : newNode(replica + 1).getId();
var shardRoutingBuilder = shardRoutingBuilder(index, i, currentNodeId, false, shardRoutingState).withRole(
replicaStateAndRole.v2()
for (int replica = 0; replica < replicaRoles.size(); replica++) {
indexShardRoutingBuilder.addShard(
shardRoutingBuilder(index, i, newNode(replica + 1).getId(), false, ShardRoutingState.STARTED).withRole(
replicaRoles.get(replica)
).build()
);
if (shardRoutingState.equals(ShardRoutingState.RELOCATING)) {
shardRoutingBuilder.withRelocatingNodeId(DiscoveryNodeUtils.create("relocating_" + replica).getId());
}
indexShardRoutingBuilder.addShard(shardRoutingBuilder.build());
}
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder);
}
Expand Down