From dd886964e5a5421bcdce5b276e979e3deb2a85c0 Mon Sep 17 00:00:00 2001 From: Trang Vu Date: Wed, 27 Jul 2016 13:56:09 +0700 Subject: [PATCH] Add retry policy configuration --- .../cql/MapConfiguredCqlClientFactory.java | 9 +++++++++ .../cql/MapConfiguredCqlClientFactoryTest.java | 17 +++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/src/main/java/com/hmsonline/trident/cql/MapConfiguredCqlClientFactory.java b/src/main/java/com/hmsonline/trident/cql/MapConfiguredCqlClientFactory.java index 3e535d3..0468abb 100644 --- a/src/main/java/com/hmsonline/trident/cql/MapConfiguredCqlClientFactory.java +++ b/src/main/java/com/hmsonline/trident/cql/MapConfiguredCqlClientFactory.java @@ -8,6 +8,7 @@ import com.datastax.driver.core.SocketOptions; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +31,8 @@ public class MapConfiguredCqlClientFactory extends CqlClientFactory { public static final String TRIDENT_CASSANDRA_CONSISTENCY = "trident.cassandra.consistency"; public static final String TRIDENT_CASSANDRA_SERIAL_CONSISTENCY = "trident.cassandra.serial.consistency"; public static final String TRIDENT_CASSANDRA_QUERY_LOGGER_CONSTANT_THRESHOLD = "trident.cassandra.query.logger.constant.threshold"; + public static final String TRIDENT_CASSANDRA_RETRY_POLICY_ENABLE = "trident.cassandra.retry.policy.enable"; + public static final String TRIDENT_CASSANDRA_RETRY_POLICY = "trident.cassandra.retry.policy"; final Map configuration; @@ -103,6 +106,12 @@ private void configureOther() { if (StringUtils.isNotEmpty(nameConfiguration)) { builder = builder.withClusterName(nameConfiguration); } + + boolean retryPolicyEnabled = configuration.containsKey(TRIDENT_CASSANDRA_RETRY_POLICY_ENABLE) ? (boolean) configuration.get(TRIDENT_CASSANDRA_RETRY_POLICY_ENABLE) : false; + if (retryPolicyEnabled && configuration.containsKey(TRIDENT_CASSANDRA_RETRY_POLICY) && configuration.get(TRIDENT_CASSANDRA_RETRY_POLICY) != null) { + builder = builder.withRetryPolicy((RetryPolicy) configuration.get(TRIDENT_CASSANDRA_RETRY_POLICY)); + } + } private void configureLoadBalancingPolicy() { diff --git a/src/test/java/com/hmsonline/trident/cql/MapConfiguredCqlClientFactoryTest.java b/src/test/java/com/hmsonline/trident/cql/MapConfiguredCqlClientFactoryTest.java index 1a66a2c..0808a87 100644 --- a/src/test/java/com/hmsonline/trident/cql/MapConfiguredCqlClientFactoryTest.java +++ b/src/test/java/com/hmsonline/trident/cql/MapConfiguredCqlClientFactoryTest.java @@ -4,6 +4,8 @@ import java.util.HashMap; import java.util.Map; +import com.datastax.driver.core.policies.DefaultRetryPolicy; +import com.datastax.driver.core.policies.FallthroughRetryPolicy; import junit.framework.Assert; import org.junit.Test; @@ -22,6 +24,8 @@ public void testGetClusterBuilder() throws Exception { configuration.put(MapConfiguredCqlClientFactory.TRIDENT_CASSANDRA_LOCAL_DATA_CENTER_NAME, DATA_CENTER_NAME); configuration.put(MapConfiguredCqlClientFactory.TRIDENT_CASSANDRA_CONSISTENCY, DEFAULT_CONSISTENCY_LEVEL.name()); configuration.put(MapConfiguredCqlClientFactory.TRIDENT_CASSANDRA_SERIAL_CONSISTENCY, DEFAULT_SERIAL_CONSISTENCY_LEVEL.name()); + configuration.put(MapConfiguredCqlClientFactory.TRIDENT_CASSANDRA_RETRY_POLICY, FallthroughRetryPolicy.INSTANCE); + configuration.put(MapConfiguredCqlClientFactory.TRIDENT_CASSANDRA_RETRY_POLICY_ENABLE, true); final CqlClientFactory factory = new MapConfiguredCqlClientFactory(configuration); @@ -39,6 +43,19 @@ public void testGetClusterBuilder() throws Exception { Assert.assertEquals(DEFAULT_CONSISTENCY_LEVEL, clusterBuilder.getConfiguration().getQueryOptions().getConsistencyLevel()); Assert.assertEquals(DEFAULT_SERIAL_CONSISTENCY_LEVEL, clusterBuilder.getConfiguration().getQueryOptions().getSerialConsistencyLevel()); Assert.assertEquals(ProtocolOptions.Compression.NONE, clusterBuilder.getConfiguration().getProtocolOptions().getCompression()); + Assert.assertTrue(clusterBuilder.getConfiguration().getPolicies().getRetryPolicy() instanceof FallthroughRetryPolicy); + } + + @Test + public void testDisableRetryPolicy() { + final Map configuration = new HashMap(); + configuration.put(MapConfiguredCqlClientFactory.TRIDENT_CASSANDRA_CQL_HOSTS, HOSTS); + configuration.put(MapConfiguredCqlClientFactory.TRIDENT_CASSANDRA_RETRY_POLICY, FallthroughRetryPolicy.INSTANCE); + final CqlClientFactory factory = + new MapConfiguredCqlClientFactory(configuration); + + final Cluster.Builder clusterBuilder = factory.getClusterBuilder(); + Assert.assertTrue(clusterBuilder.getConfiguration().getPolicies().getRetryPolicy() instanceof DefaultRetryPolicy); } @Test