Fix CCS exchange when multi cluster aliases point to same cluster (elastic#117297)

[esql] > Unexpected error from Elasticsearch: illegal_state_exception - sink exchanger for id [ruxoDDxXTGW55oIPHoCT-g:964613010] already exists.

This issue occurs when two or more clusterAliases point to the same 
physical remote cluster. The exchange service assumes the destination is
unique, which is not true in this topology. This PR addresses the
problem by appending a suffix derived from a monotonically increasing
counter, ensuring that a distinct exchange is created in such cases.
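
A minimal sketch of the id collision and of the counter-based fix, using illustrative names rather than the real ExchangeService/ComputeService code:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only; these are not the actual Elasticsearch classes.
class ExchangeIdSketch {
    // Sink registry on the receiving cluster: one entry per exchange id.
    private final Map<String, Object> sinks = new ConcurrentHashMap<>();
    private final AtomicLong childSessionIdGenerator = new AtomicLong();

    // Mirrors the failure mode: registering an id that already exists throws.
    void createSinkHandler(String exchangeId) {
        if (sinks.putIfAbsent(exchangeId, new Object()) != null) {
            throw new IllegalStateException("sink exchanger for id [" + exchangeId + "] already exists.");
        }
    }

    // The fix, in miniature: derive a unique child id per outgoing exchange from the
    // parent session id, so two aliases that resolve to the same physical cluster no
    // longer register the same id on that cluster.
    String newChildSession(String sessionId) {
        return sessionId + "/" + childSessionIdGenerator.incrementAndGet();
    }
}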

Another issue arising from this behavior is that data on a remote 
cluster is processed multiple times, leading to incorrect results. I can
work on the fix for this once we agree that this is an issue.
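
As a concrete illustration, grounded in the new testSameRemoteClusters test below: with numDocs documents in the remote test index and moreClusters additional aliases pointing at the same physical cluster, FROM *:test | STATS total=sum(const) currently returns numDocs * (moreClusters + 1) instead of numDocs, because each alias triggers another pass over the same data.
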
dnhatn authored Nov 22, 2024
1 parent 8f943a6 commit 5d4072d
Showing 6 changed files with 93 additions and 17 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/117297.yaml
@@ -0,0 +1,5 @@
pr: 117297
summary: Fix CCS exchange when multi cluster aliases point to same cluster
area: ES|QL
type: bug
issues: []
@@ -17,6 +17,7 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Strings;
import org.elasticsearch.plugins.Plugin;
@@ -44,6 +45,7 @@

import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING;
import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;
@@ -149,19 +151,23 @@ public static void stopClusters() throws IOException {
}

protected void disconnectFromRemoteClusters() throws Exception {
Settings.Builder settings = Settings.builder();
final Set<String> clusterAliases = clusterGroup.clusterAliases();
for (String clusterAlias : clusterAliases) {
if (clusterAlias.equals(LOCAL_CLUSTER) == false) {
settings.putNull("cluster.remote." + clusterAlias + ".seeds");
settings.putNull("cluster.remote." + clusterAlias + ".mode");
settings.putNull("cluster.remote." + clusterAlias + ".proxy_address");
removeRemoteCluster(clusterAlias);
}
}
}

protected void removeRemoteCluster(String clusterAlias) throws Exception {
Settings.Builder settings = Settings.builder();
settings.putNull("cluster.remote." + clusterAlias + ".seeds");
settings.putNull("cluster.remote." + clusterAlias + ".mode");
settings.putNull("cluster.remote." + clusterAlias + ".proxy_address");
client().admin().cluster().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(settings).get();
assertBusy(() -> {
for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) {
assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), empty());
assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), not(contains(clusterAlias)));
}
});
}
@@ -178,12 +184,17 @@ protected void configureAndConnectsToRemoteClusters() throws Exception {
}

protected void configureRemoteCluster(String clusterAlias, Collection<String> seedNodes) throws Exception {
final String remoteClusterSettingPrefix = "cluster.remote." + clusterAlias + ".";
Settings.Builder settings = Settings.builder();
final List<String> seedAddresses = seedNodes.stream().map(node -> {
final var seedAddresses = seedNodes.stream().map(node -> {
final TransportService transportService = cluster(clusterAlias).getInstance(TransportService.class, node);
return transportService.boundAddress().publishAddress().toString();
return transportService.boundAddress().publishAddress();
}).toList();
configureRemoteClusterWithSeedAddresses(clusterAlias, seedAddresses);
}

protected void configureRemoteClusterWithSeedAddresses(String clusterAlias, Collection<TransportAddress> seedNodes) throws Exception {
final String remoteClusterSettingPrefix = "cluster.remote." + clusterAlias + ".";
Settings.Builder settings = Settings.builder();
final List<String> seedAddresses = seedNodes.stream().map(TransportAddress::toString).toList();
boolean skipUnavailable = skipUnavailableForRemoteClusters().containsKey(clusterAlias)
? skipUnavailableForRemoteClusters().get(clusterAlias)
: DEFAULT_SKIP_UNAVAILABLE;
@@ -40,6 +40,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

@@ -339,6 +340,10 @@ public boolean isEmpty() {
return sinks.isEmpty();
}

public Set<String> sinkKeys() {
return sinks.keySet();
}

@Override
protected void doStart() {

@@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -15,6 +16,7 @@
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
@@ -27,8 +29,10 @@
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.plugin.ComputeService;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.junit.Before;

@@ -40,8 +44,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase.randomPragmas;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;

@@ -189,4 +195,44 @@ public void testCancel() throws Exception {
Exception error = expectThrows(Exception.class, requestFuture::actionGet);
assertThat(error.getMessage(), containsString("proxy timeout"));
}

public void testSameRemoteClusters() throws Exception {
TransportAddress address = cluster(REMOTE_CLUSTER).getInstance(TransportService.class).getLocalNode().getAddress();
int moreClusters = between(1, 5);
for (int i = 0; i < moreClusters; i++) {
String clusterAlias = REMOTE_CLUSTER + "-" + i;
configureRemoteClusterWithSeedAddresses(clusterAlias, List.of(address));
}
int numDocs = between(10, 100);
createRemoteIndex(numDocs);
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
request.query("FROM *:test | STATS total=sum(const) | LIMIT 1");
request.pragmas(randomPragmas());
ActionFuture<EsqlQueryResponse> future = client().execute(EsqlQueryAction.INSTANCE, request);
try {
try {
assertBusy(() -> {
List<TaskInfo> tasks = client(REMOTE_CLUSTER).admin()
.cluster()
.prepareListTasks()
.setActions(ComputeService.CLUSTER_ACTION_NAME)
.get()
.getTasks();
assertThat(tasks, hasSize(moreClusters + 1));
});
} finally {
PauseFieldPlugin.allowEmitting.countDown();
}
try (EsqlQueryResponse resp = future.actionGet(30, TimeUnit.SECONDS)) {
// TODO: This produces incorrect results because data on the remote cluster is processed multiple times.
long expectedCount = numDocs * (moreClusters + 1L);
assertThat(getValuesList(resp), equalTo(List.of(List.of(expectedCount))));
}
} finally {
for (int i = 0; i < moreClusters; i++) {
String clusterAlias = REMOTE_CLUSTER + "-" + i;
removeRemoteCluster(clusterAlias);
}
}
}
}
@@ -413,7 +413,8 @@ protected void doRun() throws Exception {
});
sessionId = foundTasks.get(0).taskId().toString();
assertTrue(fetchingStarted.await(1, TimeUnit.MINUTES));
ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(sessionId);
String exchangeId = exchangeService.sinkKeys().stream().filter(s -> s.startsWith(sessionId)).findFirst().get();
ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(exchangeId);
waitedForPages = randomBoolean();
if (waitedForPages) {
// do not fail exchange requests until we have some pages
@@ -82,6 +82,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;

@@ -101,6 +102,7 @@ public class ComputeService {
private final EnrichLookupService enrichLookupService;
private final LookupFromIndexService lookupFromIndexService;
private final ClusterService clusterService;
private final AtomicLong childSessionIdGenerator = new AtomicLong();

public ComputeService(
SearchService searchService,
@@ -167,7 +169,7 @@ public void execute(
return;
}
var computeContext = new ComputeContext(
sessionId,
newChildSession(sessionId),
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
List.of(),
configuration,
@@ -330,22 +332,23 @@ private void startComputeOnDataNodes(
// the new remote exchange sink, and initialize the computation on the target node via data-node-request.
for (DataNode node : dataNodeResult.dataNodes()) {
var queryPragmas = configuration.pragmas();
var childSessionId = newChildSession(sessionId);
ExchangeService.openExchange(
transportService,
node.connection,
sessionId,
childSessionId,
queryPragmas.exchangeBufferSize(),
esqlExecutor,
refs.acquire().delegateFailureAndWrap((l, unused) -> {
var remoteSink = exchangeService.newRemoteSink(parentTask, sessionId, transportService, node.connection);
var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, node.connection);
exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients());
ActionListener<ComputeResponse> computeResponseListener = computeListener.acquireCompute(clusterAlias);
var dataNodeListener = ActionListener.runBefore(computeResponseListener, () -> l.onResponse(null));
transportService.sendChildRequest(
node.connection,
DATA_ACTION_NAME,
new DataNodeRequest(
sessionId,
childSessionId,
configuration,
clusterAlias,
node.shardIds,
@@ -378,17 +381,18 @@ private void startComputeOnRemoteClusters(
var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink());
try (RefCountingListener refs = new RefCountingListener(linkExchangeListeners)) {
for (RemoteCluster cluster : clusters) {
final var childSessionId = newChildSession(sessionId);
ExchangeService.openExchange(
transportService,
cluster.connection,
sessionId,
childSessionId,
queryPragmas.exchangeBufferSize(),
esqlExecutor,
refs.acquire().delegateFailureAndWrap((l, unused) -> {
var remoteSink = exchangeService.newRemoteSink(rootTask, sessionId, transportService, cluster.connection);
var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection);
exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients());
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, sessionId, configuration, remotePlan);
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan);
var clusterListener = ActionListener.runBefore(
computeListener.acquireCompute(cluster.clusterAlias()),
() -> l.onResponse(null)
@@ -912,4 +916,8 @@ public List<SearchExecutionContext> searchExecutionContexts() {
return searchContexts.stream().map(ctx -> ctx.getSearchExecutionContext()).toList();
}
}

private String newChildSession(String session) {
return session + "/" + childSessionIdGenerator.incrementAndGet();
}
}
