Skip to content

Commit

Permalink
Simplify ES|QL execution info
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Jan 17, 2025
1 parent 53cca8e commit 0c24e2a
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 789 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.compute.EsqlRefCountingListener;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
Expand All @@ -25,17 +25,16 @@
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.Configuration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

/**
* Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes.
Expand Down Expand Up @@ -63,47 +62,39 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
transportService.registerRequestHandler(ComputeService.CLUSTER_ACTION_NAME, esqlExecutor, ClusterComputeRequest::new, this);
}

void startComputeOnRemoteClusters(
void startComputeOnRemoteCluster(
String sessionId,
CancellableTask rootTask,
Configuration configuration,
PhysicalPlan plan,
ExchangeSourceHandler exchangeSource,
List<RemoteCluster> clusters,
ComputeListener computeListener
RemoteCluster cluster,
ActionListener<ComputeResponse> listener
) {
var queryPragmas = configuration.pragmas();
var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink());
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(linkExchangeListeners)) {
for (RemoteCluster cluster : clusters) {
final var childSessionId = computeService.newChildSession(sessionId);
ExchangeService.openExchange(
transportService,
listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
final var childSessionId = computeService.newChildSession(sessionId);
ExchangeService.openExchange(
transportService,
cluster.connection,
childSessionId,
queryPragmas.exchangeBufferSize(),
esqlExecutor,
listener.delegateFailureAndWrap((l, unused) -> {
var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection);
exchangeSource.addRemoteSink(remoteSink, true, queryPragmas.concurrentExchangeClients(), ActionListener.noop());
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan);
transportService.sendChildRequest(
cluster.connection,
childSessionId,
queryPragmas.exchangeBufferSize(),
esqlExecutor,
refs.acquire().delegateFailureAndWrap((l, unused) -> {
var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection);
exchangeSource.addRemoteSink(remoteSink, true, queryPragmas.concurrentExchangeClients(), ActionListener.noop());
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan);
var clusterListener = ActionListener.runBefore(
computeListener.acquireCompute(cluster.clusterAlias()),
() -> l.onResponse(null)
);
transportService.sendChildRequest(
cluster.connection,
ComputeService.CLUSTER_ACTION_NAME,
clusterRequest,
rootTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
);
})
ComputeService.CLUSTER_ACTION_NAME,
clusterRequest,
rootTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(l, ComputeResponse::new, esqlExecutor)
);
}
}
})
);
}

List<RemoteCluster> getRemoteClusters(
Expand Down Expand Up @@ -141,28 +132,16 @@ public void messageReceived(ClusterComputeRequest request, TransportChannel chan
listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + plan));
return;
}
String clusterAlias = request.clusterAlias();
/*
* This handler runs only on remote cluster coordinators, so it creates a new local EsqlExecutionInfo object to record
* execution metadata for ES|QL processing local to this cluster. The execution info will be copied into the
* ComputeResponse that is sent back to the primary coordinating cluster.
*/
EsqlExecutionInfo execInfo = new EsqlExecutionInfo(true);
execInfo.swapCluster(clusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(clusterAlias, Arrays.toString(request.indices())));
CancellableTask cancellable = (CancellableTask) task;
try (var computeListener = ComputeListener.create(clusterAlias, transportService, cancellable, execInfo, listener)) {
runComputeOnRemoteCluster(
clusterAlias,
request.sessionId(),
(CancellableTask) task,
request.configuration(),
(ExchangeSinkExec) plan,
Set.of(remoteClusterPlan.targetIndices()),
remoteClusterPlan.originalIndices(),
execInfo,
computeListener
);
}
runComputeOnRemoteCluster(
request.clusterAlias(),
request.sessionId(),
(CancellableTask) task,
request.configuration(),
(ExchangeSinkExec) plan,
Set.of(remoteClusterPlan.targetIndices()),
remoteClusterPlan.originalIndices(),
listener
);
}

/**
Expand All @@ -182,48 +161,59 @@ void runComputeOnRemoteCluster(
ExchangeSinkExec plan,
Set<String> concreteIndices,
OriginalIndices originalIndices,
EsqlExecutionInfo executionInfo,
ComputeListener computeListener
ActionListener<ComputeResponse> listener
) {
final var exchangeSink = exchangeService.getSinkHandler(globalSessionId);
parentTask.addListener(
() -> exchangeService.finishSinkHandler(globalSessionId, new TaskCancelledException(parentTask.getReasonCancelled()))
);
final String localSessionId = clusterAlias + ":" + globalSessionId;
final PhysicalPlan coordinatorPlan = ComputeService.reductionPlan(plan, true);
var exchangeSource = new ExchangeSourceHandler(
configuration.pragmas().exchangeBufferSize(),
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH),
computeListener.acquireAvoid()
);
try (Releasable ignored = exchangeSource.addEmptySink()) {
exchangeSink.addCompletionListener(computeListener.acquireAvoid());
computeService.runCompute(
parentTask,
new ComputeContext(
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
final long startTimeInNanos = System.nanoTime();
final Runnable cancelQueryOnFailure = computeService.cancelQueryOnFailure(parentTask);
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
final TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
final ComputeResponse r = finalResponse.get();
return new ComputeResponse(profiles, took, r.totalShards, r.successfulShards, r.skippedShards, r.failedShards);
}))) {
var exchangeSource = new ExchangeSourceHandler(
configuration.pragmas().exchangeBufferSize(),
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH),
computeListener.acquireAvoid()
);
try (Releasable ignored = exchangeSource.addEmptySink()) {
exchangeSink.addCompletionListener(computeListener.acquireAvoid());
computeService.runCompute(
parentTask,
new ComputeContext(
localSessionId,
clusterAlias,
List.of(),
configuration,
configuration.newFoldContext(),
exchangeSource,
exchangeSink
),
coordinatorPlan,
computeListener.acquireCompute()
);
dataNodeComputeHandler.startComputeOnDataNodes(
localSessionId,
clusterAlias,
List.of(),
parentTask,
configuration,
configuration.newFoldContext(),
plan,
concreteIndices,
originalIndices,
exchangeSource,
exchangeSink
),
coordinatorPlan,
computeListener.acquireCompute(clusterAlias)
);
dataNodeComputeHandler.startComputeOnDataNodes(
localSessionId,
clusterAlias,
parentTask,
configuration,
plan,
concreteIndices,
originalIndices,
exchangeSource,
executionInfo,
computeListener
);
cancelQueryOnFailure,
computeListener.acquireCompute().map(r -> {
finalResponse.set(r);
return r.getProfiles();
})
);
}
}
}

Expand Down
Loading

0 comments on commit 0c24e2a

Please sign in to comment.