Skip to content

Commit

Permalink
Run cluster info requests on local node
Browse files Browse the repository at this point in the history
  • Loading branch information
nielsbauman committed Jan 17, 2025
1 parent 9782179 commit c1aed7c
Show file tree
Hide file tree
Showing 17 changed files with 68 additions and 300 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ public void testLargeClusterStatePublishing() throws Exception {
MappingMetadata mappingMetadata = client.admin()
.indices()
.prepareGetMappings(TEST_REQUEST_TIMEOUT, "test")
.setLocal(true)
.get()
.getMappings()
.get("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public TransportGetIndexAction(
threadPool,
actionFilters,
GetIndexRequest::new,
indexNameExpressionResolver,
GetIndexResponse::new
indexNameExpressionResolver
);
this.indicesService = indicesService;
this.settingsFilter = settingsFilter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@

package org.elasticsearch.action.admin.indices.mapping.get;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;

Expand All @@ -35,21 +33,6 @@ public GetMappingsResponse(Map<String, MappingMetadata> mappings) {
this.mappings = mappings;
}

GetMappingsResponse(StreamInput in) throws IOException {
super(in);
mappings = in.readImmutableMap(in.getTransportVersion().before(TransportVersions.V_8_0_0) ? i -> {
int mappingCount = i.readVInt();
assert mappingCount == 1 || mappingCount == 0 : "Expected 0 or 1 mappings but got " + mappingCount;
if (mappingCount == 1) {
String type = i.readString();
assert MapperService.SINGLE_MAPPING_NAME.equals(type) : "Expected type [_doc] but got [" + type + "]";
return new MappingMetadata(i);
} else {
return MappingMetadata.EMPTY_MAPPINGS;
}
} : i -> i.readBoolean() ? new MappingMetadata(i) : MappingMetadata.EMPTY_MAPPINGS);
}

public Map<String, MappingMetadata> mappings() {
return mappings;
}
Expand All @@ -58,6 +41,11 @@ public Map<String, MappingMetadata> getMappings() {
return mappings();
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
MappingMetadata.writeMappingMetadata(out, mappings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public TransportGetMappingsAction(
threadPool,
actionFilters,
GetMappingsRequest::new,
indexNameExpressionResolver,
GetMappingsResponse::new
indexNameExpressionResolver
);
this.indicesService = indicesService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected LocalClusterStateRequest(StreamInput in) throws IOException {
}

@Override
public final void writeTo(StreamOutput out) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
TransportAction.localOnly();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;

import java.io.IOException;

public abstract class ClusterInfoRequest<Request extends ClusterInfoRequest<Request>> extends MasterNodeReadRequest<Request>
public abstract class ClusterInfoRequest<Request extends ClusterInfoRequest<Request>> extends LocalClusterStateRequest
implements
IndicesRequest.Replaceable {

Expand All @@ -37,18 +37,11 @@ public ClusterInfoRequest(TimeValue masterTimeout, IndicesOptions indicesOptions
this.indicesOptions = indicesOptions;
}

@Deprecated(forRemoval = true)
public ClusterInfoRequest() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
}

// So subclasses can override the default indices options, if needed
@Deprecated(forRemoval = true)
protected ClusterInfoRequest(IndicesOptions indicesOptions) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
this.indicesOptions = indicesOptions;
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public ClusterInfoRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
Expand All @@ -58,16 +51,6 @@ public ClusterInfoRequest(StreamInput in) throws IOException {
indicesOptions = IndicesOptions.readIndicesOptions(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
if (out.getTransportVersion().before(TransportVersions.V_8_0_0)) {
out.writeStringArray(Strings.EMPTY_ARRAY);
}
indicesOptions.writeIndicesOptions(out);
}

@Override
@SuppressWarnings("unchecked")
public Request indices(String... indices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,17 @@
*/
package org.elasticsearch.action.support.master.info;

import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
import org.elasticsearch.client.internal.ElasticsearchClient;
import org.elasticsearch.common.util.ArrayUtils;

public abstract class ClusterInfoRequestBuilder<
Request extends ClusterInfoRequest<Request>,
Response extends ActionResponse,
Builder extends ClusterInfoRequestBuilder<Request, Response, Builder>> extends MasterNodeReadOperationRequestBuilder<
Request,
Response,
Builder> {
Builder extends ClusterInfoRequestBuilder<Request, Response, Builder>> extends ActionRequestBuilder<Request, Response> {

protected ClusterInfoRequestBuilder(ElasticsearchClient client, ActionType<Response> action, Request request) {
super(client, action, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,56 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

public abstract class TransportClusterInfoAction<Request extends ClusterInfoRequest<Request>, Response extends ActionResponse> extends
TransportMasterNodeReadAction<Request, Response> {
TransportLocalClusterStateAction<Request, Response> {

protected final IndexNameExpressionResolver indexNameExpressionResolver;

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
public TransportClusterInfoAction(
String actionName,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver,
Writeable.Reader<Response> response
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
actionName,
transportService,
clusterService,
threadPool,
actionFilters,
request,
indexNameExpressionResolver,
response,
transportService.getTaskManager(),
clusterService,
threadPool.executor(ThreadPool.Names.MANAGEMENT)
);
this.indexNameExpressionResolver = indexNameExpressionResolver;

transportService.registerRequestHandler(
actionName,
executor,
false,
true,
request,
(r, channel, task) -> executeDirect(task, r, new ChannelActionListener<>(channel))
);
}

@Override
Expand All @@ -55,7 +70,7 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
}

@Override
protected final void masterOperation(
protected final void localClusterStateOperation(
Task task,
final Request request,
final ClusterState state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
Expand All @@ -27,7 +28,6 @@

import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.HEAD;
import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;

/**
* The REST handler for get index and head index APIs.
Expand All @@ -48,10 +48,10 @@ public String getName() {
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final GetIndexRequest getIndexRequest = new GetIndexRequest(getMasterNodeTimeout(request));
final GetIndexRequest getIndexRequest = new GetIndexRequest(RestUtils.getMasterNodeTimeout(request));
getIndexRequest.indices(indices);
getIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, getIndexRequest.indicesOptions()));
getIndexRequest.local(request.paramAsBoolean("local", getIndexRequest.local()));
RestUtils.consumeDeprecatedLocalParameter(request);
getIndexRequest.humanReadable(request.paramAsBoolean("human", false));
getIndexRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
getIndexRequest.features(GetIndexRequest.Feature.fromRequest(request));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
Expand All @@ -25,7 +26,6 @@
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;

@ServerlessScope(Scope.PUBLIC)
public class RestGetMappingAction extends BaseRestHandler {
Expand All @@ -50,10 +50,10 @@ public String getName() {
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final GetMappingsRequest getMappingsRequest = new GetMappingsRequest(getMasterNodeTimeout(request));
final GetMappingsRequest getMappingsRequest = new GetMappingsRequest(RestUtils.getMasterNodeTimeout(request));
getMappingsRequest.indices(indices);
getMappingsRequest.indicesOptions(IndicesOptions.fromRequest(request, getMappingsRequest.indicesOptions()));
getMappingsRequest.local(request.paramAsBoolean("local", getMappingsRequest.local()));
RestUtils.consumeDeprecatedLocalParameter(request);
final HttpChannel httpChannel = request.getHttpChannel();
return channel -> new RestCancellableNodeClient(client, httpChannel).admin()
.indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
package org.elasticsearch.action.admin.indices.mapping.get;

import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;

import java.util.HashMap;
Expand All @@ -23,30 +22,20 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class GetMappingsResponseTests extends AbstractWireSerializingTestCase<GetMappingsResponse> {
public class GetMappingsResponseTests extends ESTestCase {

public void testCheckEqualsAndHashCode() {
GetMappingsResponse resp = createTestInstance();
GetMappingsResponse resp = new GetMappingsResponse(Map.of("index-" + randomAlphaOfLength(5), createMappingsForIndex()));
EqualsHashCodeTestUtils.checkEqualsAndHashCode(resp, r -> new GetMappingsResponse(r.mappings()), GetMappingsResponseTests::mutate);
}

@Override
protected Writeable.Reader<GetMappingsResponse> instanceReader() {
return GetMappingsResponse::new;
}

private static GetMappingsResponse mutate(GetMappingsResponse original) {
Map<String, MappingMetadata> builder = new HashMap<>(original.mappings());
String indexKey = original.mappings().keySet().iterator().next();
builder.put(indexKey + "1", createMappingsForIndex());
return new GetMappingsResponse(builder);
}

@Override
protected GetMappingsResponse mutateInstance(GetMappingsResponse instance) {
return mutate(instance);
}

public static MappingMetadata createMappingsForIndex() {
Map<String, Object> mappings = new HashMap<>();
if (rarely() == false) { // rarely have no fields
Expand All @@ -60,13 +49,6 @@ public static MappingMetadata createMappingsForIndex() {
return new MappingMetadata(MapperService.SINGLE_MAPPING_NAME, mappings);
}

@Override
protected GetMappingsResponse createTestInstance() {
GetMappingsResponse resp = new GetMappingsResponse(Map.of("index-" + randomAlphaOfLength(5), createMappingsForIndex()));
logger.debug("--> created: {}", resp);
return resp;
}

public void testChunking() {
AbstractChunkedSerializingTestCase.assertChunkCount(
new GetMappingsResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.info.ClusterInfoRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -32,19 +32,17 @@ public ExplainLifecycleRequest(TimeValue masterTimeout) {
super(masterTimeout);
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public ExplainLifecycleRequest(StreamInput in) throws IOException {
super(in);
onlyErrors = in.readBoolean();
onlyManaged = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(onlyErrors);
out.writeBoolean(onlyManaged);
}

public boolean onlyErrors() {
return onlyErrors;
}
Expand Down
Loading

0 comments on commit c1aed7c

Please sign in to comment.