diff --git a/properties/default.properties b/properties/default.properties
index 5f7cbbc485..104de4164b 100644
--- a/properties/default.properties
+++ b/properties/default.properties
@@ -574,6 +574,8 @@ MAP_FILE_LOADER_EXTRA_ARGS=-ingestMetricsDisabled
JOB_OBSERVERS=
JOB_OBSERVER_EXTRA_OPTS=
+ACTIVE_JOB_CACHE_PATH=/datawave/activeJobCache
+
# These should be set only if deploying on the CDH distro of Accumulo,
# otherwise leave them blank
WAREHOUSE_ACCUMULO_LIB=
diff --git a/warehouse/ingest-core/pom.xml b/warehouse/ingest-core/pom.xml
index e45839c5c0..c9ea5b9d58 100644
--- a/warehouse/ingest-core/pom.xml
+++ b/warehouse/ingest-core/pom.xml
@@ -10,6 +10,10 @@
jar
${project.artifactId}
+
+ com.beust
+ jcommander
+
com.clearspring.analytics
stream
@@ -64,6 +68,10 @@
org.apache.commons
commons-jexl3
+
+ org.apache.curator
+ curator-client
+
org.apache.hadoop
hadoop-annotations
@@ -207,6 +215,11 @@
${version.accumulo}
test
+
+ org.apache.curator
+ curator-test
+ test
+
org.javassist
javassist
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/jobcache/ActiveSetter.java b/warehouse/ingest-core/src/main/java/datawave/ingest/jobcache/ActiveSetter.java
new file mode 100644
index 0000000000..1ce6bf99a8
--- /dev/null
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/jobcache/ActiveSetter.java
@@ -0,0 +1,68 @@
+package datawave.ingest.jobcache;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.curator.framework.CuratorFramework;
+
+/**
+ * Publishes the location of the active job cache to a Zookeeper node.
+ */
+public class ActiveSetter {
+
+    private final CuratorFramework zkClient;
+
+    /**
+     * Creates a setter that performs all Zookeeper operations through the given client.
+     *
+     * @param zkClient
+     *            the Curator client to use; this class neither starts nor closes it
+     */
+    public ActiveSetter(CuratorFramework zkClient) {
+        this.zkClient = zkClient;
+    }
+
+    /**
+     * Sets the active job cache in Zookeeper. The value is stored as UTF-8 bytes. If the node does not exist it is created, along with any missing parents,
+     * already holding the value; otherwise its data is overwritten.
+     *
+     * @param path
+     *            The ZK node to set
+     * @param activeJobCache
+     *            The active job cache
+     * @throws IllegalArgumentException
+     *             if {@code path} or {@code activeJobCache} is null or empty
+     * @throws Exception
+     *             if the operation does not succeed
+     */
+    public void set(String path, String activeJobCache) throws Exception {
+        if (path == null || path.isEmpty()) {
+            throw new IllegalArgumentException("path cannot be empty");
+        }
+        if (activeJobCache == null || activeJobCache.isEmpty()) {
+            throw new IllegalArgumentException("activeJobCache cannot be empty");
+        }
+
+        // Use a fixed charset so the stored bytes do not depend on the platform default of the JVM that wrote them.
+        byte[] data = activeJobCache.getBytes(StandardCharsets.UTF_8);
+
+        if (zkPathExists(path)) {
+            updateZkPath(path, data);
+        } else {
+            // Create the node with its value in a single operation. Creating it without data first would leave it briefly holding the client's default
+            // data, which a concurrent reader could mistake for a job cache location.
+            createZkPath(path, data);
+        }
+    }
+
+    private boolean zkPathExists(String path) throws Exception {
+        return zkClient.checkExists().forPath(path) != null;
+    }
+
+    private void createZkPath(String path, byte[] data) throws Exception {
+        zkClient.create().creatingParentsIfNeeded().forPath(path, data);
+    }
+
+    private void updateZkPath(String path, byte[] data) throws Exception {
+        zkClient.setData().forPath(path, data);
+    }
+}
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/jobcache/NonEmptyStringValidator.java b/warehouse/ingest-core/src/main/java/datawave/ingest/jobcache/NonEmptyStringValidator.java
new file mode 100644
index 0000000000..317c8053c7
--- /dev/null
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/jobcache/NonEmptyStringValidator.java
@@ -0,0 +1,29 @@
+package datawave.ingest.jobcache;
+
+import com.beust.jcommander.IParameterValidator;
+import com.beust.jcommander.ParameterException;
+
+/**
+ * JCommander validator that rejects values that are {@code null} or have zero length.
+ */
+public class NonEmptyStringValidator implements IParameterValidator {
+
+    /**
+     * Fails when the supplied value has no content.
+     *
+     * @param parameter
+     *            identifier of the parameter being checked; used as the prefix of the error message
+     * @param value
+     *            the value to check
+     * @throws ParameterException
+     *             if {@code value} is {@code null} or empty
+     */
+    @Override
+    public void validate(String parameter, String value) throws ParameterException {
+        boolean hasContent = value != null && !value.isEmpty();
+        if (hasContent) {
+            return;
+        }
+        throw new ParameterException(parameter + " must be a non-empty String.");
+    }
+}
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/jobcache/SetActiveCommand.java b/warehouse/ingest-core/src/main/java/datawave/ingest/jobcache/SetActiveCommand.java
new file mode 100644
index 0000000000..270b7cae4b
--- /dev/null
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/jobcache/SetActiveCommand.java
@@ -0,0 +1,92 @@
+package datawave.ingest.jobcache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+
+/**
+ * Command line entry point that stores the active job cache location in a Zookeeper node.
+ */
+@Parameters(commandDescription = "Sets the active job cache in Zookeeper.")
+public class SetActiveCommand {
+
+    private static final int ZK_NUM_RETRIES = 3;
+    private static final int ZK_RETRY_SLEEP_MS = 1000;
+
+    @Parameter(names = {"-z", "--zookeepers"}, description = "The zookeeper servers to update.", required = true,
+                    validateWith = NonEmptyStringValidator.class)
+    private String zookeepers;
+
+    @Parameter(names = {"-p", "--path"}, description = "The zookeeper path where the active job cache will be stored.", required = true,
+                    validateWith = NonEmptyStringValidator.class)
+    private String zkPath;
+
+    @Parameter(names = {"-j", "--job-cache"}, description = "The full HDFS path to the active job cache (e.g. 'hdfs://ingest/data/jobCacheA').",
+                    required = true, validateWith = NonEmptyStringValidator.class)
+    private String jobCache;
+
+    @Parameter(names = {"-h", "--help"}, description = "Prints the command usage.", help = true)
+    private boolean help;
+
+    /**
+     * Starts a client for the configured zookeepers and writes the job cache location to the configured path. The client is closed before returning.
+     *
+     * @throws RuntimeException
+     *             wrapping any failure raised while creating the client or updating the node
+     */
+    public void run() {
+        try (CuratorFramework client = newZkClient()) {
+            client.start();
+
+            ActiveSetter setter = new ActiveSetter(client);
+            setter.set(zkPath, jobCache);
+
+        } catch (Exception cause) {
+            throw new RuntimeException("Failed to update " + zkPath + " to " + jobCache + ". Try again.", cause);
+        }
+    }
+
+    /** Builds the client for the configured zookeepers, using the retry sleep and retry count constants of this class. */
+    private CuratorFramework newZkClient() {
+        return CuratorFrameworkFactory.newClient(zookeepers, new ExponentialBackoffRetry(ZK_RETRY_SLEEP_MS, ZK_NUM_RETRIES));
+    }
+
+    /**
+     * @return the value of the help flag bound to {@code -h} / {@code --help}
+     */
+    public boolean isHelp() {
+        return help;
+    }
+
+    /**
+     * Parses the arguments, then either prints the usage (when help is requested) or performs the update. When parsing raises a
+     * {@link ParameterException}, its message is written to standard error, the usage is printed, and the JVM exits with status 1.
+     *
+     * @param args
+     *            the raw command line arguments
+     */
+    public static void main(String[] args) {
+        SetActiveCommand command = new SetActiveCommand();
+        JCommander parser = JCommander.newBuilder().addObject(command).build();
+
+        try {
+            parser.parse(args);
+
+            if (command.isHelp()) {
+                parser.usage();
+                return;
+            }
+            command.run();
+
+        } catch (ParameterException invalid) {
+            System.err.println(invalid.getMessage());
+            parser.usage();
+            System.exit(1);
+        }
+    }
+}
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/jobcache/ActiveSetterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/jobcache/ActiveSetterTest.java
new file mode 100644
index 0000000000..630111801f
--- /dev/null
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/jobcache/ActiveSetterTest.java
@@ -0,0 +1,104 @@
+package datawave.ingest.jobcache;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Exercises {@link ActiveSetter} against a Curator {@link TestingServer} and client that are shared by all tests in the class.
+ */
+public class ActiveSetterTest {
+
+    private static final String ACTIVE_PATH = "/datawave/activeJobCache";
+
+    private static TestingServer ZK_SERVER;
+    private static CuratorFramework ZK_CLIENT;
+
+    private ActiveSetter activeSetter;
+
+    @BeforeClass
+    public static void setupAll() throws Exception {
+        ZK_SERVER = new TestingServer();
+        ZK_SERVER.start();
+
+        ZK_CLIENT = CuratorFrameworkFactory.newClient(ZK_SERVER.getConnectString(), new ExponentialBackoffRetry(1000, 3));
+        ZK_CLIENT.start();
+    }
+
+    @Before
+    public void setup() {
+        activeSetter = new ActiveSetter(ZK_CLIENT);
+    }
+
+    @Test
+    public void shouldSetActive() throws Exception {
+        String activeJobCache = "/data/jobCacheA";
+
+        activeSetter.set(ACTIVE_PATH, activeJobCache);
+
+        assertZkData(ACTIVE_PATH, activeJobCache);
+    }
+
+    @Test
+    public void shouldOverwrite() throws Exception {
+        String activeJobCache1 = "/data/jobCacheA";
+        String activeJobCache2 = "/data/jobCacheB";
+
+        activeSetter.set(ACTIVE_PATH, activeJobCache1);
+        activeSetter.set(ACTIVE_PATH, activeJobCache2);
+
+        assertZkData(ACTIVE_PATH, activeJobCache2);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldNotAcceptEmptyPath() throws Exception {
+        String emptyPath = "";
+
+        activeSetter.set(emptyPath, "/data/jobCacheA");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldNotAcceptNullPath() throws Exception {
+        activeSetter.set(null, "/data/jobCacheA");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldNotAcceptEmptyJobCache() throws Exception {
+        String emptyJobCache = "";
+
+        activeSetter.set(ACTIVE_PATH, emptyJobCache);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldNotAcceptNullJobCache() throws Exception {
+        activeSetter.set(ACTIVE_PATH, null);
+    }
+
+    private void assertZkData(String path, String expectedValue) throws Exception {
+        String actualValue = new String(ZK_CLIENT.getData().forPath(path), Charset.defaultCharset());
+
+        assertEquals(expectedValue, actualValue);
+    }
+
+    @AfterClass
+    public static void tearDownAll() throws IOException {
+        if (ZK_CLIENT != null) {
+            ZK_CLIENT.close();
+        }
+
+        if (ZK_SERVER != null) {
+            // close() stops the server and also removes its temporary data directory, which stop() leaves behind
+            ZK_SERVER.close();
+        }
+    }
+}
diff --git a/warehouse/ingest-scripts/src/main/resources/bin/ingest/findJars.sh b/warehouse/ingest-scripts/src/main/resources/bin/ingest/findJars.sh
index 6afb4f20f7..4b20424ee0 100644
--- a/warehouse/ingest-scripts/src/main/resources/bin/ingest/findJars.sh
+++ b/warehouse/ingest-scripts/src/main/resources/bin/ingest/findJars.sh
@@ -20,11 +20,11 @@ findAccumuloJar (){
ls -1 $WAREHOUSE_ACCUMULO_LIB/$1-[0-9]*.jar | sort | tail -1
}
findZookeeperJar(){
- result=$(ls -1 $ZOOKEEPER_HOME/zookeeper-*.jar 2>/dev/null | head -1)
- [[ -f $result ]] || result=$(ls -1 $ZOOKEEPER_HOME/lib/zookeeper-*.jar | head -1)
+    result=$(ls -1 $ZOOKEEPER_HOME/$1-[0-9]*.jar 2>/dev/null | head -1) # $1 = artifact name; [0-9] anchors on the version so "zookeeper" cannot match zookeeper-jute-*.jar
+    [[ -f "$result" ]] || result=$(ls -1 $ZOOKEEPER_HOME/lib/$1-[0-9]*.jar | head -1) # fall back to the lib directory
+    echo "$result" # callers capture the jar path from stdout
}
-
CONF_DIR=../../config
DATAWAVE_INDEX_STATS_JAR=$(findJar datawave-index-stats)
DATAWAVE_INGEST_CSV_JAR=$(findJar datawave-ingest-csv)
@@ -94,7 +94,8 @@ INFINISPAN_CORE_JAR=$(findJar infinispan-core)
INFINISPAN_COMMONS_JAR=$(findJar infinispan-commons)
JBOSS_LOGGING_JAR=$(findJar jboss-logging)
JGROUPS_JAR=$(findJar jgroups)
-ZOOKEEPER_JAR=$(findZookeeperJar)
+ZOOKEEPER_JAR=$(findZookeeperJar zookeeper)
+ZOOKEEPER_JUTE_JAR=$(findZookeeperJar zookeeper-jute)
DATAWAVE_QUERY_CORE_JAR=$(findJar datawave-query-core)
COMMONS_JEXL_JAR=$(findJar commons-jexl3)
PROTOSTUFF_API_JAR=$(findJar protostuff-api)
diff --git a/warehouse/ingest-scripts/src/main/resources/bin/ingest/ingest-env.sh b/warehouse/ingest-scripts/src/main/resources/bin/ingest/ingest-env.sh
index 422c211baf..c1a411eef1 100755
--- a/warehouse/ingest-scripts/src/main/resources/bin/ingest/ingest-env.sh
+++ b/warehouse/ingest-scripts/src/main/resources/bin/ingest/ingest-env.sh
@@ -195,6 +195,8 @@ HDFS_BASE_DIR="${HDFS_BASE_DIR}"
BASE_WORK_DIR="${BASE_WORK_DIR}"
BASE_WORK_DIR="${BASE_WORK_DIR:-/datawave/ingest/work}"
+ACTIVE_JOB_CACHE_PATH="${ACTIVE_JOB_CACHE_PATH}"
+
HDFS_MONITOR_ARGS="${HDFS_MONITOR_ARGS}"
MONITOR_SERVER_HOST="${MONITOR_SERVER_HOST}"
diff --git a/warehouse/ingest-scripts/src/main/resources/bin/ingest/ingest-libs.sh b/warehouse/ingest-scripts/src/main/resources/bin/ingest/ingest-libs.sh
index 9cadcff87d..51a9509791 100755
--- a/warehouse/ingest-scripts/src/main/resources/bin/ingest/ingest-libs.sh
+++ b/warehouse/ingest-scripts/src/main/resources/bin/ingest/ingest-libs.sh
@@ -68,6 +68,7 @@ CLASSPATH=${CLASSPATH}:${INFINISPAN_COMMONS_JAR}
CLASSPATH=${CLASSPATH}:${JBOSS_LOGGING_JAR}
CLASSPATH=${CLASSPATH}:${JGROUPS_JAR}
CLASSPATH=${CLASSPATH}:${ZOOKEEPER_JAR}
+CLASSPATH=${CLASSPATH}:${ZOOKEEPER_JUTE_JAR}
CLASSPATH=${CLASSPATH}:${OPENCSV_JAR}
CLASSPATH=${CLASSPATH}:${STREAMLIB}
CLASSPATH=${CLASSPATH}:${JCOMMANDER_JAR}
diff --git a/warehouse/ingest-scripts/src/main/resources/bin/ingest/load-job-cache.sh b/warehouse/ingest-scripts/src/main/resources/bin/ingest/load-job-cache.sh
index 0761f4cb58..85236cc91c 100755
--- a/warehouse/ingest-scripts/src/main/resources/bin/ingest/load-job-cache.sh
+++ b/warehouse/ingest-scripts/src/main/resources/bin/ingest/load-job-cache.sh
@@ -12,6 +12,7 @@ THIS_DIR="${THIS_SCRIPT%/*}"
cd $THIS_DIR
. ../ingest/ingest-env.sh
+. ../ingest/ingest-libs.sh
. ../ingest/job-cache-env.sh
# Check that there are no other instances of this script running
@@ -99,6 +100,25 @@ else
echo "Warehouse and ingest are one in the same. Assuming the warehouse job cache loading is sufficient"
fi
+# Update Zookeeper if we have an active job cache path; arguments are quoted so an empty or space-containing value cannot swallow or split the next option
+if [[ -n "${ACTIVE_JOB_CACHE_PATH}" ]]; then
+    if ! java -cp "${CLASSPATH}" datawave.ingest.jobcache.SetActiveCommand \
+        --zookeepers "${INGEST_ZOOKEEPERS}" \
+        --path "${ACTIVE_JOB_CACHE_PATH}" \
+        --job-cache "${INGEST_HDFS_NAME_NODE}${JOB_CACHE_DIR}"; then
+        echo "[ERROR] Failed to set active ingest job cache"
+    fi
+
+    if [[ "$WAREHOUSE_HDFS_NAME_NODE" != "$INGEST_HDFS_NAME_NODE" ]]; then
+        if ! java -cp "${CLASSPATH}" datawave.ingest.jobcache.SetActiveCommand \
+            --zookeepers "${WAREHOUSE_ZOOKEEPERS}" \
+            --path "${ACTIVE_JOB_CACHE_PATH}" \
+            --job-cache "${WAREHOUSE_HDFS_NAME_NODE}${JOB_CACHE_DIR}"; then
+            echo "[ERROR] Failed to set active warehouse job cache"
+        fi
+    fi
+fi
+
# Remove the prepared directory
rm -r -f $tmpdir
trap - INT TERM EXIT