Simplify and speed up handling of AliasFilter logic in search #120446

Draft · wants to merge 3 commits into main

@@ -769,7 +769,7 @@ public final void execute(Runnable command) {
      * tiebreak results with identical sort values
      */
     protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt, int shardIndex) {
-        AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
+        AliasFilter filter = aliasFilter.getOrDefault(shardIt.shardId().getIndex().getUUID(), AliasFilter.EMPTY);
         assert filter != null;
         float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
         ShardSearchRequest shardRequest = new ShardSearchRequest(
@@ -376,16 +376,13 @@ private void finishPhase() {
     private static final float DEFAULT_INDEX_BOOST = 1.0f;

     public CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator shardIt) {
-        AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
-        assert filter != null;
-        float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
         int shardRequestIndex = shardItIndexMap.get(shardIt);
         return new CanMatchNodeRequest.Shard(
             shardIt.getOriginalIndices().indices(),
             shardIt.shardId(),
             shardRequestIndex,
-            filter,
-            indexBoost,
+            aliasFilter.getOrDefault(shardIt.shardId().getIndex().getUUID(), AliasFilter.EMPTY),
+            concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST),
             shardIt.getSearchContextId(),
             shardIt.getSearchContextKeepAlive(),
             ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex)
@@ -259,14 +259,17 @@ private static boolean hasDataStreamRef(ClusterState clusterState, Set<String> i
     Map<String, AliasFilter> buildIndexAliasFilters(
         ClusterState clusterState,
         Set<ResolvedExpression> indicesAndAliases,
-        Index[] concreteIndices
+        Index[] concreteIndices,
+        boolean filterEmpty
     ) {
         final Map<String, AliasFilter> aliasFilterMap = new HashMap<>();
         for (Index index : concreteIndices) {
             clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index.getName());
             AliasFilter aliasFilter = searchService.buildAliasFilter(clusterState, index.getName(), indicesAndAliases);
             assert aliasFilter != null;
-            aliasFilterMap.put(index.getUUID(), aliasFilter);
+            if (filterEmpty == false || aliasFilter != AliasFilter.EMPTY) {
+                aliasFilterMap.put(index.getUUID(), aliasFilter);
+            }
         }
         return aliasFilterMap;
     }
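Taken together, the hunks above and below follow one pattern: when the new filterEmpty flag is true, per-index entries whose alias filter is AliasFilter.EMPTY are no longer stored in the map, and readers fall back to AliasFilter.EMPTY via Map.getOrDefault instead of Map.get plus a null assert. Below is a minimal, self-contained sketch of that pattern in plain Java; the Filter record and the putFilter helper are illustrative stand-ins, not Elasticsearch classes.

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the "skip empty entries, default on read" pattern used in this PR.
// Filter stands in for AliasFilter; nothing here is Elasticsearch code.
public class EmptyFilterDefaultSketch {

    record Filter(String query) {
        static final Filter EMPTY = new Filter(null);
    }

    // Mirrors the filterEmpty branch in buildIndexAliasFilters: empty filters are not stored.
    static void putFilter(Map<String, Filter> map, String indexUuid, Filter filter, boolean filterEmpty) {
        if (filterEmpty == false || filter != Filter.EMPTY) {
            map.put(indexUuid, filter);
        }
    }

    public static void main(String[] args) {
        Map<String, Filter> filtersByIndexUuid = new HashMap<>();
        putFilter(filtersByIndexUuid, "uuid-1", new Filter("{\"term\":{\"env\":\"prod\"}}"), true);
        putFilter(filtersByIndexUuid, "uuid-2", Filter.EMPTY, true); // skipped, map stays smaller

        // Readers never see null: missing entries resolve to the shared EMPTY sentinel,
        // which is what the getOrDefault(..., AliasFilter.EMPTY) call sites rely on.
        Filter f1 = filtersByIndexUuid.getOrDefault("uuid-1", Filter.EMPTY);
        Filter f2 = filtersByIndexUuid.getOrDefault("uuid-2", Filter.EMPTY);
        System.out.println(f1 + " / " + f2);
    }
}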
@@ -1087,7 +1090,7 @@ static List<SearchShardIterator> getRemoteShardsIterator(
                 // add the cluster name to the remote index names for indices disambiguation
                 // this ends up in the hits returned with the search response
                 ShardId shardId = searchShardsGroup.shardId();
-                AliasFilter aliasFilter = aliasFilterMap.get(shardId.getIndex().getUUID());
+                AliasFilter aliasFilter = aliasFilterMap.getOrDefault(shardId.getIndex().getUUID(), AliasFilter.EMPTY);
                 String[] aliases = aliasFilter.getAliases();
                 String clusterAlias = entry.getKey();
                 String[] finalIndices = aliases.length == 0 ? new String[] { shardId.getIndexName() } : aliases;
@@ -1247,7 +1250,7 @@ private void executeSearch(
                 clusterState,
                 searchRequest.indices()
             );
-            aliasFilter = buildIndexAliasFilters(clusterState, indicesAndAliases, indices);
+            aliasFilter = buildIndexAliasFilters(clusterState, indicesAndAliases, indices, true);
             aliasFilter.putAll(remoteAliasMap);
             localShardIterators = getLocalShardsIterator(
                 clusterState,
@@ -135,7 +135,8 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
         final Map<String, AliasFilter> aliasFilters = transportSearchAction.buildIndexAliasFilters(
             clusterState,
             indicesAndAliases,
-            concreteIndices
+            concreteIndices,
+            false
         );
         String[] concreteIndexNames = Arrays.stream(concreteIndices).map(Index::getName).toArray(String[]::new);
         GroupShardsIterator<SearchShardIterator> shardIts = GroupShardsIterator.sortAndCreate(
server/src/main/java/org/elasticsearch/indices/IndicesService.java (87 changes: 45 additions & 42 deletions)
@@ -34,11 +34,9 @@
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.DataStream;
-import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
-import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -49,6 +47,7 @@
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
@@ -1717,52 +1716,56 @@ interface IndexDeletionAllowedPredicate {
     private final IndexDeletionAllowedPredicate ALWAYS_TRUE = (Index index, IndexSettings indexSettings) -> true;

     public AliasFilter buildAliasFilter(ClusterState state, String index, Set<ResolvedExpression> resolvedExpressions) {
-        /* Being static, parseAliasFilter doesn't have access to whatever guts it needs to parse a query. Instead of passing in a bunch
-         * of dependencies we pass in a function that can perform the parsing. */
-        CheckedFunction<BytesReference, QueryBuilder, IOException> filterParser = bytes -> {
-            try (
-                XContentParser parser = XContentHelper.createParserNotCompressed(parserConfig, bytes, XContentHelper.xContentType(bytes))
-            ) {
-                return parseTopLevelQuery(parser);
-            }
-        };
         String[] aliases = indexNameExpressionResolver.filteringAliases(state, index, resolvedExpressions);
         if (aliases == null) {
             return AliasFilter.EMPTY;
         }

-        Metadata metadata = state.metadata();
-        IndexAbstraction ia = state.metadata().getIndicesLookup().get(index);
-        DataStream dataStream = ia.getParentDataStream();
-        if (dataStream != null) {
-            String dataStreamName = dataStream.getName();
-            List<QueryBuilder> filters = Arrays.stream(aliases)
-                .map(name -> metadata.dataStreamAliases().get(name))
-                .filter(dataStreamAlias -> dataStreamAlias.getFilter(dataStreamName) != null)
-                .map(dataStreamAlias -> {
-                    try {
-                        return filterParser.apply(dataStreamAlias.getFilter(dataStreamName).uncompressed());
-                    } catch (IOException e) {
-                        throw new UncheckedIOException(e);
-                    }
-                })
-                .toList();
-            if (filters.isEmpty()) {
-                return AliasFilter.of(null, aliases);
-            } else {
-                if (filters.size() == 1) {
-                    return AliasFilter.of(filters.get(0), aliases);
-                } else {
-                    BoolQueryBuilder bool = new BoolQueryBuilder();
-                    for (QueryBuilder filter : filters) {
-                        bool.should(filter);
-                    }
-                    return AliasFilter.of(bool, aliases);
-                }
-            }
-        } else {
-            IndexMetadata indexMetadata = metadata.index(index);
-            return AliasFilter.of(ShardSearchRequest.parseAliasFilter(filterParser, indexMetadata, aliases), aliases);
-        }
+        return doBuildAliasFilter(state, index, aliases);
+    }
+
+    private AliasFilter doBuildAliasFilter(ClusterState state, String index, String[] aliases) {
+        DataStream dataStream = state.metadata().getIndicesLookup().get(index).getParentDataStream();
+        if (dataStream == null) {
+            return AliasFilter.of(ShardSearchRequest.parseAliasFilter(this::parseFilter, state.metadata().index(index), aliases), aliases);
+        }
+        var dataStreamAliases = state.metadata().dataStreamAliases();
+        String dataStreamName = dataStream.getName();
+        List<QueryBuilder> filters = Arrays.stream(aliases)
+            .map(dataStreamAliases::get)
+            .filter(dataStreamAlias -> dataStreamAlias.getFilter(dataStreamName) != null)
+            .map(dataStreamAlias -> {
+                try {
+                    return parseFilter(dataStreamAlias.getFilter(dataStreamName));
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            })
+            .toList();
+
+        if (filters.isEmpty()) {
+            return AliasFilter.of(null, aliases);
+        }
+        if (filters.size() == 1) {
+            return AliasFilter.of(filters.get(0), aliases);
+        }
+        BoolQueryBuilder bool = new BoolQueryBuilder();
+        for (QueryBuilder filter : filters) {
+            bool.should(filter);
+        }
+        return AliasFilter.of(bool, aliases);
+    }
+
+    private QueryBuilder parseFilter(CompressedXContent bytes) throws IOException {
+        var uncompressed = bytes.uncompressed();
+        try (
+            XContentParser parser = XContentHelper.createParserNotCompressed(
+                parserConfig,
+                uncompressed,
+                XContentHelper.xContentType(uncompressed)
+            )
+        ) {
+            return parseTopLevelQuery(parser);
+        }
     }

@@ -23,6 +23,7 @@
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.hash.MessageDigests;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -621,7 +622,7 @@ public Rewriteable rewrite(QueryRewriteContext ctx) throws IOException {
      * Returns {@code null} if no filtering is required.</p>
      */
     public static QueryBuilder parseAliasFilter(
-        CheckedFunction<BytesReference, QueryBuilder, IOException> filterParser,
+        CheckedFunction<CompressedXContent, QueryBuilder, IOException> filterParser,
         IndexMetadata metadata,
         String... aliasNames
     ) {
@@ -635,7 +636,7 @@ public static QueryBuilder parseAliasFilter(
                 return null;
             }
             try {
-                return filterParser.apply(alias.filter().uncompressed());
+                return filterParser.apply(alias.filter());
             } catch (IOException ex) {
                 throw new AliasFilterParsingException(index, alias.getAlias(), "Invalid alias filter", ex);
             }
@@ -210,7 +210,7 @@ private IndexMetadata add(IndexMetadata indexMetadata, String alias, @Nullable C
     public QueryBuilder aliasFilter(IndexMetadata indexMetadata, String... aliasNames) {
         return ShardSearchRequest.parseAliasFilter(bytes -> {
             try (
-                InputStream inputStream = bytes.streamInput();
+                InputStream inputStream = bytes.uncompressed().streamInput();
                 XContentParser parser = XContentFactory.xContentType(inputStream)
                     .xContent()
                     .createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputStream)