Skip to content

Commit

Permalink
Run cluster info requests on local node
Browse files Browse the repository at this point in the history
  • Loading branch information
nielsbauman committed Jan 17, 2025
1 parent 9782179 commit cebcff1
Show file tree
Hide file tree
Showing 18 changed files with 62 additions and 413 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ public void testLargeClusterStatePublishing() throws Exception {
MappingMetadata mappingMetadata = client.admin()
.indices()
.prepareGetMappings(TEST_REQUEST_TIMEOUT, "test")
.setLocal(true)
.get()
.getMappings()
.get("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Collections;
Expand All @@ -49,23 +48,13 @@ public class TransportGetIndexAction extends TransportClusterInfoAction<GetIndex
public TransportGetIndexAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
SettingsFilter settingsFilter,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService,
IndexScopedSettings indexScopedSettings
) {
super(
GetIndexAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
GetIndexRequest::new,
indexNameExpressionResolver,
GetIndexResponse::new
);
super(GetIndexAction.NAME, transportService, clusterService, actionFilters, GetIndexRequest::new, indexNameExpressionResolver);
this.indicesService = indicesService;
this.settingsFilter = settingsFilter;
this.indexScopedSettings = indexScopedSettings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@

package org.elasticsearch.action.admin.indices.mapping.get;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;

Expand All @@ -35,21 +33,6 @@ public GetMappingsResponse(Map<String, MappingMetadata> mappings) {
this.mappings = mappings;
}

GetMappingsResponse(StreamInput in) throws IOException {
super(in);
mappings = in.readImmutableMap(in.getTransportVersion().before(TransportVersions.V_8_0_0) ? i -> {
int mappingCount = i.readVInt();
assert mappingCount == 1 || mappingCount == 0 : "Expected 0 or 1 mappings but got " + mappingCount;
if (mappingCount == 1) {
String type = i.readString();
assert MapperService.SINGLE_MAPPING_NAME.equals(type) : "Expected type [_doc] but got [" + type + "]";
return new MappingMetadata(i);
} else {
return MappingMetadata.EMPTY_MAPPINGS;
}
} : i -> i.readBoolean() ? new MappingMetadata(i) : MappingMetadata.EMPTY_MAPPINGS);
}

public Map<String, MappingMetadata> mappings() {
return mappings;
}
Expand All @@ -58,6 +41,11 @@ public Map<String, MappingMetadata> getMappings() {
return mappings();
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
MappingMetadata.writeMappingMetadata(out, mappings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Map;
Expand All @@ -38,7 +37,6 @@ public class TransportGetMappingsAction extends TransportClusterInfoAction<GetMa
public TransportGetMappingsAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService
Expand All @@ -47,11 +45,9 @@ public TransportGetMappingsAction(
GetMappingsAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
GetMappingsRequest::new,
indexNameExpressionResolver,
GetMappingsResponse::new
indexNameExpressionResolver
);
this.indicesService = indicesService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected LocalClusterStateRequest(StreamInput in) throws IOException {
}

@Override
public final void writeTo(StreamOutput out) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
TransportAction.localOnly();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;

import java.io.IOException;

public abstract class ClusterInfoRequest<Request extends ClusterInfoRequest<Request>> extends MasterNodeReadRequest<Request>
public abstract class ClusterInfoRequest<Request extends ClusterInfoRequest<Request>> extends LocalClusterStateRequest
implements
IndicesRequest.Replaceable {

Expand All @@ -37,18 +37,11 @@ public ClusterInfoRequest(TimeValue masterTimeout, IndicesOptions indicesOptions
this.indicesOptions = indicesOptions;
}

@Deprecated(forRemoval = true)
public ClusterInfoRequest() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
}

// So subclasses can override the default indices options, if needed
@Deprecated(forRemoval = true)
protected ClusterInfoRequest(IndicesOptions indicesOptions) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
this.indicesOptions = indicesOptions;
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public ClusterInfoRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
Expand All @@ -58,16 +51,6 @@ public ClusterInfoRequest(StreamInput in) throws IOException {
indicesOptions = IndicesOptions.readIndicesOptions(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
if (out.getTransportVersion().before(TransportVersions.V_8_0_0)) {
out.writeStringArray(Strings.EMPTY_ARRAY);
}
indicesOptions.writeIndicesOptions(out);
}

@Override
@SuppressWarnings("unchecked")
public Request indices(String... indices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,17 @@
*/
package org.elasticsearch.action.support.master.info;

import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
import org.elasticsearch.client.internal.ElasticsearchClient;
import org.elasticsearch.common.util.ArrayUtils;

public abstract class ClusterInfoRequestBuilder<
Request extends ClusterInfoRequest<Request>,
Response extends ActionResponse,
Builder extends ClusterInfoRequestBuilder<Request, Response, Builder>> extends MasterNodeReadOperationRequestBuilder<
Request,
Response,
Builder> {
Builder extends ClusterInfoRequestBuilder<Request, Response, Builder>> extends ActionRequestBuilder<Request, Response> {

protected ClusterInfoRequestBuilder(ElasticsearchClient client, ActionType<Response> action, Request request) {
super(client, action, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,48 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

public abstract class TransportClusterInfoAction<Request extends ClusterInfoRequest<Request>, Response extends ActionResponse> extends
TransportMasterNodeReadAction<Request, Response> {
TransportLocalClusterStateAction<Request, Response> {

protected final IndexNameExpressionResolver indexNameExpressionResolver;

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
public TransportClusterInfoAction(
String actionName,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver,
Writeable.Reader<Response> response
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
super(actionName, actionFilters, transportService.getTaskManager(), clusterService, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.indexNameExpressionResolver = indexNameExpressionResolver;

transportService.registerRequestHandler(
actionName,
transportService,
clusterService,
threadPool,
actionFilters,
executor,
false,
true,
request,
indexNameExpressionResolver,
response,
threadPool.executor(ThreadPool.Names.MANAGEMENT)
(r, channel, task) -> executeDirect(task, r, new ChannelActionListener<>(channel))
);
}

Expand All @@ -55,7 +63,7 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
}

@Override
protected final void masterOperation(
protected final void localClusterStateOperation(
Task task,
final Request request,
final ClusterState state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
Expand All @@ -27,7 +28,6 @@

import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.HEAD;
import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;

/**
* The REST handler for get index and head index APIs.
Expand All @@ -48,10 +48,10 @@ public String getName() {
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final GetIndexRequest getIndexRequest = new GetIndexRequest(getMasterNodeTimeout(request));
final GetIndexRequest getIndexRequest = new GetIndexRequest(RestUtils.getMasterNodeTimeout(request));
getIndexRequest.indices(indices);
getIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, getIndexRequest.indicesOptions()));
getIndexRequest.local(request.paramAsBoolean("local", getIndexRequest.local()));
RestUtils.consumeDeprecatedLocalParameter(request);
getIndexRequest.humanReadable(request.paramAsBoolean("human", false));
getIndexRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
getIndexRequest.features(GetIndexRequest.Feature.fromRequest(request));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
Expand All @@ -25,7 +26,6 @@
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;

@ServerlessScope(Scope.PUBLIC)
public class RestGetMappingAction extends BaseRestHandler {
Expand All @@ -50,10 +50,10 @@ public String getName() {
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final GetMappingsRequest getMappingsRequest = new GetMappingsRequest(getMasterNodeTimeout(request));
final GetMappingsRequest getMappingsRequest = new GetMappingsRequest(RestUtils.getMasterNodeTimeout(request));
getMappingsRequest.indices(indices);
getMappingsRequest.indicesOptions(IndicesOptions.fromRequest(request, getMappingsRequest.indicesOptions()));
getMappingsRequest.local(request.paramAsBoolean("local", getMappingsRequest.local()));
RestUtils.consumeDeprecatedLocalParameter(request);
final HttpChannel httpChannel = request.getHttpChannel();
return channel -> new RestCancellableNodeClient(client, httpChannel).admin()
.indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ class TestTransportGetIndexAction extends TransportGetIndexAction {
super(
GetIndexActionTests.this.transportService,
GetIndexActionTests.this.clusterService,
GetIndexActionTests.this.threadPool,
settingsFilter,
new ActionFilters(emptySet()),
new GetIndexActionTests.Resolver(),
Expand Down
Loading

0 comments on commit cebcff1

Please sign in to comment.