Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify ES|QL execution info #120418

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number | Diff line number | Diff line change
Expand Up @@ -11,10 +11,10 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.compute.EsqlRefCountingListener;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
Expand All @@ -25,17 +25,16 @@
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.Configuration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

/**
* Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes.
Expand Down Expand Up @@ -63,47 +62,39 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
transportService.registerRequestHandler(ComputeService.CLUSTER_ACTION_NAME, esqlExecutor, ClusterComputeRequest::new, this);
}

void startComputeOnRemoteClusters(
void startComputeOnRemoteCluster(
String sessionId,
CancellableTask rootTask,
Configuration configuration,
PhysicalPlan plan,
ExchangeSourceHandler exchangeSource,
List<RemoteCluster> clusters,
ComputeListener computeListener
RemoteCluster cluster,
ActionListener<ComputeResponse> listener
) {
var queryPragmas = configuration.pragmas();
var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink());
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(linkExchangeListeners)) {
for (RemoteCluster cluster : clusters) {
final var childSessionId = computeService.newChildSession(sessionId);
ExchangeService.openExchange(
transportService,
listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
final var childSessionId = computeService.newChildSession(sessionId);
ExchangeService.openExchange(
transportService,
cluster.connection,
childSessionId,
queryPragmas.exchangeBufferSize(),
esqlExecutor,
listener.delegateFailureAndWrap((l, unused) -> {
var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection);
exchangeSource.addRemoteSink(remoteSink, true, queryPragmas.concurrentExchangeClients(), ActionListener.noop());
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan);
transportService.sendChildRequest(
cluster.connection,
childSessionId,
queryPragmas.exchangeBufferSize(),
esqlExecutor,
refs.acquire().delegateFailureAndWrap((l, unused) -> {
var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection);
exchangeSource.addRemoteSink(remoteSink, true, queryPragmas.concurrentExchangeClients(), ActionListener.noop());
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan);
var clusterListener = ActionListener.runBefore(
computeListener.acquireCompute(cluster.clusterAlias()),
() -> l.onResponse(null)
);
transportService.sendChildRequest(
cluster.connection,
ComputeService.CLUSTER_ACTION_NAME,
clusterRequest,
rootTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
);
})
ComputeService.CLUSTER_ACTION_NAME,
clusterRequest,
rootTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(l, ComputeResponse::new, esqlExecutor)
);
}
}
})
);
}

List<RemoteCluster> getRemoteClusters(
Expand Down Expand Up @@ -141,28 +132,16 @@ public void messageReceived(ClusterComputeRequest request, TransportChannel chan
listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + plan));
return;
}
String clusterAlias = request.clusterAlias();
/*
* This handler runs only on remote cluster coordinators, so it creates a new local EsqlExecutionInfo object to record
* execution metadata for ES|QL processing local to this cluster. The execution info will be copied into the
* ComputeResponse that is sent back to the primary coordinating cluster.
*/
EsqlExecutionInfo execInfo = new EsqlExecutionInfo(true);
execInfo.swapCluster(clusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(clusterAlias, Arrays.toString(request.indices())));
CancellableTask cancellable = (CancellableTask) task;
try (var computeListener = ComputeListener.create(clusterAlias, transportService, cancellable, execInfo, listener)) {
runComputeOnRemoteCluster(
clusterAlias,
request.sessionId(),
(CancellableTask) task,
request.configuration(),
(ExchangeSinkExec) plan,
Set.of(remoteClusterPlan.targetIndices()),
remoteClusterPlan.originalIndices(),
execInfo,
computeListener
);
}
runComputeOnRemoteCluster(
request.clusterAlias(),
request.sessionId(),
(CancellableTask) task,
request.configuration(),
(ExchangeSinkExec) plan,
Set.of(remoteClusterPlan.targetIndices()),
remoteClusterPlan.originalIndices(),
listener
);
}

/**
Expand All @@ -182,48 +161,59 @@ void runComputeOnRemoteCluster(
ExchangeSinkExec plan,
Set<String> concreteIndices,
OriginalIndices originalIndices,
EsqlExecutionInfo executionInfo,
ComputeListener computeListener
ActionListener<ComputeResponse> listener
) {
final var exchangeSink = exchangeService.getSinkHandler(globalSessionId);
parentTask.addListener(
() -> exchangeService.finishSinkHandler(globalSessionId, new TaskCancelledException(parentTask.getReasonCancelled()))
);
final String localSessionId = clusterAlias + ":" + globalSessionId;
final PhysicalPlan coordinatorPlan = ComputeService.reductionPlan(plan, true);
var exchangeSource = new ExchangeSourceHandler(
configuration.pragmas().exchangeBufferSize(),
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH),
computeListener.acquireAvoid()
);
try (Releasable ignored = exchangeSource.addEmptySink()) {
exchangeSink.addCompletionListener(computeListener.acquireAvoid());
computeService.runCompute(
parentTask,
new ComputeContext(
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
final long startTimeInNanos = System.nanoTime();
final Runnable cancelQueryOnFailure = computeService.cancelQueryOnFailure(parentTask);
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
final TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
final ComputeResponse r = finalResponse.get();
return new ComputeResponse(profiles, took, r.totalShards, r.successfulShards, r.skippedShards, r.failedShards);
}))) {
var exchangeSource = new ExchangeSourceHandler(
configuration.pragmas().exchangeBufferSize(),
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH),
computeListener.acquireAvoid()
);
try (Releasable ignored = exchangeSource.addEmptySink()) {
exchangeSink.addCompletionListener(computeListener.acquireAvoid());
computeService.runCompute(
parentTask,
new ComputeContext(
localSessionId,
clusterAlias,
List.of(),
configuration,
configuration.newFoldContext(),
exchangeSource,
exchangeSink
),
coordinatorPlan,
computeListener.acquireCompute()
);
dataNodeComputeHandler.startComputeOnDataNodes(
localSessionId,
clusterAlias,
List.of(),
parentTask,
configuration,
configuration.newFoldContext(),
plan,
concreteIndices,
originalIndices,
exchangeSource,
exchangeSink
),
coordinatorPlan,
computeListener.acquireCompute(clusterAlias)
);
dataNodeComputeHandler.startComputeOnDataNodes(
localSessionId,
clusterAlias,
parentTask,
configuration,
plan,
concreteIndices,
originalIndices,
exchangeSource,
executionInfo,
computeListener
);
cancelQueryOnFailure,
computeListener.acquireCompute().map(r -> {
finalResponse.set(r);
return r.getProfiles();
})
);
}
}
}

Expand Down
Loading