Skip to content

Commit

Permalink
Provide a mechanism to modify config files in a running test cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
mark-vieira committed Dec 2, 2024
1 parent c54d4b6 commit 9d27436
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.test.cluster.util.ProcessUtils;
import org.elasticsearch.test.cluster.util.Retry;
import org.elasticsearch.test.cluster.util.Version;
import org.elasticsearch.test.cluster.util.resource.MutableResource;
import org.elasticsearch.test.cluster.util.resource.Resource;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
Expand Down Expand Up @@ -115,6 +117,9 @@ public static class Node {
private Version currentVersion;
private Process process = null;
private DistributionDescriptor distributionDescriptor;
private Set<String> extraConfigListeners = new HashSet<>();
private Set<String> keystoreFileListeners = new HashSet<>();
private Set<Resource> roleFileListeners = new HashSet<>();

public Node(Path baseWorkingDir, DistributionResolver distributionResolver, LocalNodeSpec spec) {
this(baseWorkingDir, distributionResolver, spec, null, false);
Expand Down Expand Up @@ -446,6 +451,14 @@ private void copyExtraConfigFiles() {
}
}
resource.writeTo(target);

// Register and update listener for this config file
if (resource instanceof MutableResource && extraConfigListeners.add(fileName)) {
((MutableResource) resource).addUpdateListener(updated -> {
LOGGER.info("Updating config file '{}'", fileName);
updated.writeTo(target);
});
}
});
}

Expand Down Expand Up @@ -485,29 +498,39 @@ private void addKeystoreSettings() {

private void addKeystoreFiles() {
spec.getKeystoreFiles().forEach((key, file) -> {
try {
Path path = Files.createTempFile(tempDir, key, null);
file.writeTo(path);

ProcessUtils.exec(
spec.getKeystorePassword(),
workingDir,
OS.conditional(
c -> c.onWindows(() -> distributionDir.resolve("bin").resolve("elasticsearch-keystore.bat"))
.onUnix(() -> distributionDir.resolve("bin").resolve("elasticsearch-keystore"))
),
getEnvironmentVariables(),
false,
"add-file",
key,
path.toString()
).waitFor();
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
addKeystoreFile(key, file);
if (file instanceof MutableResource && keystoreFileListeners.add(key)) {
((MutableResource) file).addUpdateListener(updated -> {
LOGGER.info("Updating keystore file '{}'", key);
addKeystoreFile(key, updated);
});
}
});
}

private void addKeystoreFile(String key, Resource file) {
try {
Path path = Files.createTempFile(tempDir, key, null);
file.writeTo(path);

ProcessUtils.exec(
spec.getKeystorePassword(),
workingDir,
OS.conditional(
c -> c.onWindows(() -> distributionDir.resolve("bin").resolve("elasticsearch-keystore.bat"))
.onUnix(() -> distributionDir.resolve("bin").resolve("elasticsearch-keystore"))
),
getEnvironmentVariables(),
false,
"add-file",
key,
path.toString()
).waitFor();
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
}
}

private void writeSecureSecretsFile() {
if (spec.getKeystoreFiles().isEmpty() == false) {
throw new IllegalStateException(
Expand Down Expand Up @@ -535,16 +558,20 @@ private void configureSecurity() {
if (spec.isSecurityEnabled()) {
if (spec.getUsers().isEmpty() == false) {
LOGGER.info("Setting up roles.yml for node '{}'", name);

Path destination = workingDir.resolve("config").resolve("roles.yml");
spec.getRolesFiles().forEach(rolesFile -> {
try (
Writer writer = Files.newBufferedWriter(destination, StandardOpenOption.APPEND);
Reader reader = new BufferedReader(new InputStreamReader(rolesFile.asStream()))
) {
reader.transferTo(writer);
} catch (IOException e) {
throw new UncheckedIOException("Failed to append roles file " + rolesFile + " to " + destination, e);
writeRolesFile();
spec.getRolesFiles().forEach(resource -> {
if (resource instanceof MutableResource && roleFileListeners.add(resource)) {
((MutableResource) resource).addUpdateListener(updated -> {
LOGGER.info("Updating roles.yml for node '{}'", name);
Path rolesFile = workingDir.resolve("config").resolve("roles.yml");
try {
Files.delete(rolesFile);
Files.copy(distributionDir.resolve("config").resolve("roles.yml"), rolesFile);
writeRolesFile();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
});
}
Expand Down Expand Up @@ -596,6 +623,20 @@ private void configureSecurity() {
}
}

private void writeRolesFile() {
Path destination = workingDir.resolve("config").resolve("roles.yml");
spec.getRolesFiles().forEach(rolesFile -> {
try (
Writer writer = Files.newBufferedWriter(destination, StandardOpenOption.APPEND);
Reader reader = new BufferedReader(new InputStreamReader(rolesFile.asStream()))
) {
reader.transferTo(writer);
} catch (IOException e) {
throw new UncheckedIOException("Failed to append roles file " + rolesFile + " to " + destination, e);
}
});
}

private void installPlugins() {
if (spec.getPlugins().isEmpty() == false) {
Pattern pattern = Pattern.compile("(.+)(?:-\\d+\\.\\d+\\.\\d+(-SNAPSHOT)?\\.zip)");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.test.cluster.util.resource;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
* A mutable version of {@link Resource}. Anywhere a {@link Resource} is accepted in the test clusters API a {@link MutableResource} can
* be supplied instead. Unless otherwise specified, when the {@link #update(Resource)} method is called, the backing configuration will
* be updated in-place.
*/
public class MutableResource implements Resource {
private final List<Consumer<? super Resource>> listeners = new ArrayList<>();
private Resource delegate;

private MutableResource(Resource delegate) {
this.delegate = delegate;
}

@Override
public InputStream asStream() {
return delegate.asStream();
}

public static MutableResource from(Resource delegate) {
return new MutableResource(delegate);
}

public void update(Resource delegate) {
this.delegate = delegate;
this.listeners.forEach(listener -> listener.accept(this));
}

/**
* Registers a listener that will be notified when any updates are made to this resource. This listener will receive a reference to
* the resource with the updated value.
*
* @param listener action to be called on update
*/
public void addUpdateListener(Consumer<? super Resource> listener) {
listeners.add(listener);
}
}

0 comments on commit 9d27436

Please sign in to comment.