Store active job cache pointer in Zookeeper #2681

Merged (3 commits) on Jan 14, 2025
2 changes: 2 additions & 0 deletions properties/default.properties
@@ -574,6 +574,8 @@ MAP_FILE_LOADER_EXTRA_ARGS=-ingestMetricsDisabled
JOB_OBSERVERS=
JOB_OBSERVER_EXTRA_OPTS=

ACTIVE_JOB_CACHE_PATH=/datawave/activeJobCache

# These should be set only if deploying on the CDH distro of Accumulo,
# otherwise leave them blank
WAREHOUSE_ACCUMULO_LIB=
13 changes: 13 additions & 0 deletions warehouse/ingest-core/pom.xml
@@ -10,6 +10,10 @@
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<dependencies>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
@@ -64,6 +68,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-jexl3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
@@ -207,6 +215,11 @@
<version>${version.accumulo}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
@@ -0,0 +1,54 @@
package datawave.ingest.jobcache;

import java.nio.charset.Charset;

import org.apache.curator.framework.CuratorFramework;

/**
* Sets the active job cache.
*/
public class ActiveSetter {

private final CuratorFramework zkClient;

public ActiveSetter(CuratorFramework zkClient) {
this.zkClient = zkClient;
}

/**
* Sets the active job cache in Zookeeper.
*
* @param path
* The ZK node to set
* @param activeJobCache
* The active job cache
* @throws Exception
* if the operation does not succeed
*/
public void set(String path, String activeJobCache) throws Exception {
if (path == null || path.isEmpty()) {
throw new IllegalArgumentException("path cannot be empty");
}
if (activeJobCache == null || activeJobCache.isEmpty()) {
throw new IllegalArgumentException("activeJobCache cannot be empty");
}

if (!zkPathExists(path)) {
createZkPath(path);
}

updateZkPath(path, activeJobCache);
}

private boolean zkPathExists(String path) throws Exception {
return zkClient.checkExists().forPath(path) != null;
}

private void createZkPath(String path) throws Exception {
zkClient.create().creatingParentsIfNeeded().forPath(path);
}

private void updateZkPath(String path, String value) throws Exception {
zkClient.setData().forPath(path, value.getBytes(Charset.defaultCharset()));
}
}
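
For reference, here is a minimal usage sketch of ActiveSetter outside of the CLI tool added below; the connect string localhost:2181 and the example paths are illustrative assumptions, not values from this PR.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import datawave.ingest.jobcache.ActiveSetter;

public class ActiveSetterExample {
    public static void main(String[] args) throws Exception {
        // Assumed ZooKeeper connect string, for illustration only.
        try (CuratorFramework zkClient = CuratorFrameworkFactory.newClient("localhost:2181",
                        new ExponentialBackoffRetry(1000, 3))) {
            zkClient.start();

            // Point the well-known ZK node at the job cache that should currently be used.
            new ActiveSetter(zkClient).set("/datawave/activeJobCache", "hdfs://ingest/data/jobCacheA");
        }
    }
}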
@@ -0,0 +1,16 @@
package datawave.ingest.jobcache;

import com.beust.jcommander.IParameterValidator;
import com.beust.jcommander.ParameterException;

/**
* Validates that a parameter is a non-empty String.
*/
public class NonEmptyStringValidator implements IParameterValidator {
@Override
public void validate(String parameter, String value) throws ParameterException {
if (value == null || value.isEmpty()) {
throw new ParameterException(parameter + " must be a non-empty String.");
}
}
}
@@ -0,0 +1,69 @@
package datawave.ingest.jobcache;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;

/**
* Command line tool to set the active job cache in Zookeeper.
*/
@Parameters(commandDescription = "Sets the active job cache in Zookeeper.")
public class SetActiveCommand {

private static final int ZK_NUM_RETRIES = 3;
private static final int ZK_RETRY_SLEEP_MS = 1000;

@Parameter(names = {"-z", "--zookeepers"}, description = "The zookeeper servers to update.", required = true, validateWith = NonEmptyStringValidator.class)
private String zookeepers;

@Parameter(names = {"-p", "--path"}, description = "The zookeeper path where the active job cache will be stored.", required = true,
validateWith = NonEmptyStringValidator.class)
private String zkPath;

@Parameter(names = {"-j", "--job-cache"}, description = "The full HDFS path to the active job cache (e.g. 'hdfs://ingest/data/jobCacheA').",
required = true, validateWith = NonEmptyStringValidator.class)
private String jobCache;

@Parameter(names = {"-h", "--help"}, description = "Prints the command usage.", help = true)
private boolean help;

public void run() {
try (CuratorFramework zkClient = CuratorFrameworkFactory.newClient(zookeepers, new ExponentialBackoffRetry(ZK_RETRY_SLEEP_MS, ZK_NUM_RETRIES))) {
zkClient.start();

new ActiveSetter(zkClient).set(zkPath, jobCache);

} catch (Exception e) {
throw new RuntimeException("Failed to update " + zkPath + " to " + jobCache + ". Try again.", e);
}
}

public boolean isHelp() {
return help;
}

public static void main(String[] args) {
SetActiveCommand tool = new SetActiveCommand();
JCommander jcommander = JCommander.newBuilder().addObject(tool).build();

try {
jcommander.parse(args);

if (tool.isHelp()) {
jcommander.usage();
} else {
tool.run();
}

} catch (ParameterException e) {
System.err.println(e.getMessage());
jcommander.usage();
System.exit(1);
}
}
}
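
On the read side (not part of this change), a consumer would resolve the pointer by reading the same ZK node. A minimal hypothetical sketch, assuming an already-started Curator client and the same default-charset encoding that ActiveSetter uses when writing the value:

import java.nio.charset.Charset;

import org.apache.curator.framework.CuratorFramework;

/**
 * Hypothetical reader-side counterpart to SetActiveCommand: resolves the HDFS path of the
 * currently active job cache from the ZK node that ActiveSetter writes.
 */
public class ActiveJobCacheReader {

    private final CuratorFramework zkClient;

    public ActiveJobCacheReader(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    /** Returns e.g. "hdfs://ingest/data/jobCacheA" if that is what was last set at the given path. */
    public String getActiveJobCache(String path) throws Exception {
        byte[] data = zkClient.getData().forPath(path);
        return new String(data, Charset.defaultCharset());
    }
}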
@@ -0,0 +1,83 @@
package datawave.ingest.jobcache;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class ActiveSetterTest {

private static final String ACTIVE_PATH = "/datawave/activeJobCache";

private static TestingServer ZK_SERVER;
private static CuratorFramework ZK_CLIENT;

private ActiveSetter activeSetter;

@BeforeClass
public static void setupAll() throws Exception {
ZK_SERVER = new TestingServer();
ZK_SERVER.start();

ZK_CLIENT = CuratorFrameworkFactory.newClient(ZK_SERVER.getConnectString(), new ExponentialBackoffRetry(1000, 3));
ZK_CLIENT.start();
}

@Before
public void setup() {
activeSetter = new ActiveSetter(ZK_CLIENT);
}

@Test
public void shouldSetActive() throws Exception {
String activeJobCache = "/data/jobCacheA";

activeSetter.set(ACTIVE_PATH, activeJobCache);

assertZkData(ACTIVE_PATH, activeJobCache);
}

@Test
public void shouldOverwrite() throws Exception {
String activeJobCache1 = "/data/jobCacheA";
String activeJobCache2 = "/data/jobCacheB";

activeSetter.set(ACTIVE_PATH, activeJobCache1);
activeSetter.set(ACTIVE_PATH, activeJobCache2);

assertZkData(ACTIVE_PATH, activeJobCache2);
}

@Test(expected = IllegalArgumentException.class)
public void shouldNotAcceptEmptyJobCache() throws Exception {
String emptyJobCache = "";

activeSetter.set(ACTIVE_PATH, emptyJobCache);
}

private void assertZkData(String path, String expectedValue) throws Exception {
String actualValue = new String(ZK_CLIENT.getData().forPath(path), Charset.defaultCharset());

assertEquals(expectedValue, actualValue);
}

@AfterClass
public static void tearDownAll() throws IOException {
if (ZK_CLIENT != null) {
ZK_CLIENT.close();
}

if (ZK_SERVER != null) {
ZK_SERVER.stop();
}
}
}
@@ -20,11 +20,11 @@ findAccumuloJar (){
ls -1 $WAREHOUSE_ACCUMULO_LIB/$1-[0-9]*.jar | sort | tail -1
}
findZookeeperJar(){
-result=$(ls -1 $ZOOKEEPER_HOME/zookeeper-*.jar 2>/dev/null | head -1)
-[[ -f $result ]] || result=$(ls -1 $ZOOKEEPER_HOME/lib/zookeeper-*.jar | head -1)
+result=$(ls -1 $ZOOKEEPER_HOME/$1-*.jar 2>/dev/null | head -1)
+[[ -f $result ]] || result=$(ls -1 $ZOOKEEPER_HOME/lib/$1-*.jar | head -1)
echo $result
}


CONF_DIR=../../config
DATAWAVE_INDEX_STATS_JAR=$(findJar datawave-index-stats)
DATAWAVE_INGEST_CSV_JAR=$(findJar datawave-ingest-csv)
@@ -94,7 +94,8 @@ INFINISPAN_CORE_JAR=$(findJar infinispan-core)
INFINISPAN_COMMONS_JAR=$(findJar infinispan-commons)
JBOSS_LOGGING_JAR=$(findJar jboss-logging)
JGROUPS_JAR=$(findJar jgroups)
-ZOOKEEPER_JAR=$(findZookeeperJar)
+ZOOKEEPER_JAR=$(findZookeeperJar zookeeper)
+ZOOKEEPER_JUTE_JAR=$(findZookeeperJar zookeeper-jute)
DATAWAVE_QUERY_CORE_JAR=$(findJar datawave-query-core)
COMMONS_JEXL_JAR=$(findJar commons-jexl3)
PROTOSTUFF_API_JAR=$(findJar protostuff-api)
@@ -195,6 +195,8 @@ HDFS_BASE_DIR="${HDFS_BASE_DIR}"
BASE_WORK_DIR="${BASE_WORK_DIR}"
BASE_WORK_DIR="${BASE_WORK_DIR:-/datawave/ingest/work}"

ACTIVE_JOB_CACHE_PATH="${ACTIVE_JOB_CACHE_PATH}"

HDFS_MONITOR_ARGS="${HDFS_MONITOR_ARGS}"

MONITOR_SERVER_HOST="${MONITOR_SERVER_HOST}"
@@ -68,6 +68,7 @@ CLASSPATH=${CLASSPATH}:${INFINISPAN_COMMONS_JAR}
CLASSPATH=${CLASSPATH}:${JBOSS_LOGGING_JAR}
CLASSPATH=${CLASSPATH}:${JGROUPS_JAR}
CLASSPATH=${CLASSPATH}:${ZOOKEEPER_JAR}
CLASSPATH=${CLASSPATH}:${ZOOKEEPER_JUTE_JAR}
CLASSPATH=${CLASSPATH}:${OPENCSV_JAR}
CLASSPATH=${CLASSPATH}:${STREAMLIB}
CLASSPATH=${CLASSPATH}:${JCOMMANDER_JAR}
@@ -12,6 +12,7 @@ THIS_DIR="${THIS_SCRIPT%/*}"
cd $THIS_DIR

. ../ingest/ingest-env.sh
. ../ingest/ingest-libs.sh
. ../ingest/job-cache-env.sh

# Check that there are no other instances of this script running
@@ -99,6 +100,25 @@ else
echo "Warehouse and ingest are one in the same. Assuming the warehouse job cache loading is sufficient"
fi

# Update Zookeeper if we have an active job cache path
if [[ -n "${ACTIVE_JOB_CACHE_PATH}" ]]; then
if ! java -cp ${CLASSPATH} datawave.ingest.jobcache.SetActiveCommand \
--zookeepers ${INGEST_ZOOKEEPERS} \
--path ${ACTIVE_JOB_CACHE_PATH} \
--job-cache "${INGEST_HDFS_NAME_NODE}${JOB_CACHE_DIR}"; then
echo "[ERROR] Failed to set active ingest job cache"
fi

if [[ "$WAREHOUSE_HDFS_NAME_NODE" != "$INGEST_HDFS_NAME_NODE" ]]; then
if ! java -cp ${CLASSPATH} datawave.ingest.jobcache.SetActiveCommand \
--zookeepers ${WAREHOUSE_ZOOKEEPERS} \
--path ${ACTIVE_JOB_CACHE_PATH} \
--job-cache "${WAREHOUSE_HDFS_NAME_NODE}${JOB_CACHE_DIR}"; then
echo "[ERROR] Failed to set active warehouse job cache"
fi
fi
fi

# Remove the prepared directory
rm -r -f $tmpdir
trap - INT TERM EXIT