Skip to content

Commit

Permalink
Add selector syntax to index expressions (#118614)
Browse files Browse the repository at this point in the history
This PR introduces a new syntactical feature to index expression resolution: The selector.

Selectors, denoted with a :: followed by a recognized suffix will allow users to specify which component of
an index abstraction they would like to operate on within an API call. In this case, an index abstraction is a
concrete index, data stream, or alias; Any abstraction that can be resolved to a set of indices/shards. We
define a component of an index abstraction to be some searchable unit of the index abstraction.

(cherry picked from commit c3839e1)

# Conflicts:
#	modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java
#	server/src/main/java/org/elasticsearch/TransportVersions.java
#	server/src/test/java/org/elasticsearch/action/OriginalIndicesTests.java
  • Loading branch information
jbaiera committed Jan 9, 2025
1 parent b398448 commit 5482bc0
Show file tree
Hide file tree
Showing 73 changed files with 2,372 additions and 1,066 deletions.
1 change: 0 additions & 1 deletion docs/reference/ml/anomaly-detection/apis/put-job.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -557,4 +557,3 @@ The API returns the following results:
// TESTRESPONSE[s/"job_version" : "8.4.0"/"job_version" : $body.job_version/]
// TESTRESPONSE[s/1656087283340/$body.$_path/]
// TESTRESPONSE[s/"superuser"/"_es_test_root"/]
// TESTRESPONSE[s/"ignore_throttled" : true/"ignore_throttled" : true,"failure_store":"exclude"/]
13 changes: 13 additions & 0 deletions modules/data-streams/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ restResources {

dependencies {
testImplementation project(path: ':test:test-clusters')
testImplementation project(":modules:mapper-extras")
internalClusterTestImplementation project(":modules:mapper-extras")
}

Expand Down Expand Up @@ -71,4 +72,16 @@ tasks.named("yamlRestTestV7CompatTransform").configure({ task ->
task.skipTest("data_stream/200_rollover_failure_store/Lazily roll over a data stream's failure store after a shard failure", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/200_rollover_failure_store/Don't roll over a data stream's failure store when conditions aren't met", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/200_rollover_failure_store/Roll over a data stream's failure store with conditions", "Configuring the failure store via data stream templates is not supported anymore.")

task.skipTest("data_stream/200_rollover_failure_store/Rolling over a failure store on a data stream without the failure store enabled should work", "Rolling over a data stream using target_failure_store is no longer supported.")
task.skipTest("data_stream/200_rollover_failure_store/Rolling over an uninitialized failure store should initialize it", "Rolling over a data stream using target_failure_store is no longer supported.")

task.skipTest("data_stream/210_rollover_failure_store/A failure store marked for lazy rollover should only be rolled over when there is a failure", "Rolling over a data stream using target_failure_store is no longer supported.")
task.skipTest("data_stream/210_rollover_failure_store/Don't roll over a data stream's failure store when conditions aren't met", "Rolling over a data stream using target_failure_store is no longer supported.")
task.skipTest("data_stream/210_rollover_failure_store/Rolling over a failure store on a data stream without the failure store enabled should work", "Rolling over a data stream using target_failure_store is no longer supported.")
task.skipTest("data_stream/210_rollover_failure_store/Rolling over an uninitialized failure store should initialize it", "Rolling over a data stream using target_failure_store is no longer supported.")
task.skipTest("data_stream/210_rollover_failure_store/Roll over a data stream's failure store with conditions", "Rolling over a data stream using target_failure_store is no longer supported.")
task.skipTest("data_stream/210_rollover_failure_store/Lazily roll over a data stream's failure store after an ingest failure", "Rolling over a data stream using target_failure_store is no longer supported.")
task.skipTest("data_stream/210_rollover_failure_store/Lazily roll over a data stream's failure store after a shard failure", "Rolling over a data stream using target_failure_store is no longer supported.")
task.skipTest("data_stream/210_rollover_failure_store/Roll over a data stream's failure store without conditions", "Rolling over a data stream using target_failure_store is no longer supported.")
})
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.support.IndexComponentSelector;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAlias;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -132,10 +134,7 @@ public void setup() throws Exception {
assertTrue(response.isAcknowledged());

// Initialize the failure store.
RolloverRequest rolloverRequest = new RolloverRequest("with-fs", null);
rolloverRequest.setIndicesOptions(
IndicesOptions.builder(rolloverRequest.indicesOptions()).selectorOptions(IndicesOptions.SelectorOptions.FAILURES).build()
);
RolloverRequest rolloverRequest = new RolloverRequest("with-fs::failures", null);
response = client.execute(RolloverAction.INSTANCE, rolloverRequest).get();
assertTrue(response.isAcknowledged());

Expand Down Expand Up @@ -341,7 +340,7 @@ public void testFailureStoreSnapshotAndRestore() throws Exception {
.cluster()
.prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices(dataStreamName)
.setIndices(IndexNameExpressionResolver.combineSelector(dataStreamName, IndexComponentSelector.ALL_APPLICABLE))
.setIncludeGlobalState(false)
.get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.IndexComponentSelector;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -197,9 +198,9 @@ public void testRejectionFromFailureStore() throws IOException {
createDataStream();

// Initialize failure store.
var rolloverRequest = new RolloverRequest(dataStream, null);
rolloverRequest.setIndicesOptions(
IndicesOptions.builder(rolloverRequest.indicesOptions()).selectorOptions(IndicesOptions.SelectorOptions.FAILURES).build()
var rolloverRequest = new RolloverRequest(
IndexNameExpressionResolver.combineSelector(dataStream, IndexComponentSelector.FAILURES),
null
);
var rolloverResponse = client().execute(RolloverAction.INSTANCE, rolloverRequest).actionGet();
var failureStoreIndex = rolloverResponse.getNewIndex();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void setup() throws IOException {

assertOK(client().performRequest(new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME)));
// Initialize the failure store.
assertOK(client().performRequest(new Request("POST", DATA_STREAM_NAME + "/_rollover?target_failure_store")));
assertOK(client().performRequest(new Request("POST", DATA_STREAM_NAME + "::failures/_rollover")));
ensureGreen(DATA_STREAM_NAME);

final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME));
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.datastreams.DataStreamsStatsAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.IndexComponentSelector;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -102,10 +103,11 @@ protected ClusterBlockException checkRequestBlock(

@Override
protected String[] resolveConcreteIndexNames(ClusterState clusterState, DataStreamsStatsAction.Request request) {
return DataStreamsActionUtil.resolveConcreteIndexNames(
return DataStreamsActionUtil.resolveConcreteIndexNamesWithSelector(
indexNameExpressionResolver,
clusterState,
request.indices(),
IndexComponentSelector.ALL_APPLICABLE,
request.indicesOptions()
).toArray(String[]::new);
}
Expand Down Expand Up @@ -163,13 +165,17 @@ protected DataStreamsStatsAction.DataStreamShardStats readShardResult(StreamInpu
request.indicesOptions(),
request.indices()
);
for (String abstractionName : abstractionNames) {
IndexAbstraction indexAbstraction = indicesLookup.get(abstractionName);
for (String abstraction : abstractionNames) {
IndexAbstraction indexAbstraction = indicesLookup.get(abstraction);
assert indexAbstraction != null;
if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) {
DataStream dataStream = (DataStream) indexAbstraction;
AggregatedStats stats = aggregatedDataStreamsStats.computeIfAbsent(dataStream.getName(), s -> new AggregatedStats());
dataStream.getIndices().stream().map(Index::getName).forEach(index -> {
dataStream.getBackingIndices().getIndices().stream().map(Index::getName).forEach(index -> {
stats.backingIndices.add(index);
allBackingIndices.add(index);
});
dataStream.getFailureIndices().getIndices().stream().map(Index::getName).forEach(index -> {
stats.backingIndices.add(index);
allBackingIndices.add(index);
});
Expand Down
Loading

0 comments on commit 5482bc0

Please sign in to comment.