diff --git a/src/main/java/io/vertx/redis/client/impl/PooledRedisConnection.java b/src/main/java/io/vertx/redis/client/impl/PooledRedisConnection.java index 7fb9f2b8..793f7ea3 100644 --- a/src/main/java/io/vertx/redis/client/impl/PooledRedisConnection.java +++ b/src/main/java/io/vertx/redis/client/impl/PooledRedisConnection.java @@ -10,6 +10,7 @@ import io.vertx.redis.client.Response; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** * A pooled Redis connection @@ -20,6 +21,7 @@ public class PooledRedisConnection implements RedisConnection { private final RedisConnectionInternal connection; private final PoolMetrics metrics; private final Object metric; + private final AtomicBoolean ended = new AtomicBoolean(); public PooledRedisConnection(Lease lease, PoolMetrics poolMetrics, Object metric) { this.lease = lease; @@ -88,6 +90,9 @@ public RedisConnection endHandler(@Nullable Handler endHandler) { public Future close() { if (connection.reset()) { lease.recycle(); + } + + if (ended.compareAndSet(false, true)) { if (metrics != null) { metrics.end(metric); } diff --git a/src/test/java/io/vertx/tests/redis/client/RedisPoolMetricsTest.java b/src/test/java/io/vertx/tests/redis/client/RedisPoolMetricsTest.java index 8dc83717..48819922 100644 --- a/src/test/java/io/vertx/tests/redis/client/RedisPoolMetricsTest.java +++ b/src/test/java/io/vertx/tests/redis/client/RedisPoolMetricsTest.java @@ -97,6 +97,33 @@ public void simpleTest(TestContext should) { }); } + @Test + public void taintedConnection(TestContext test) { + Async async = test.async(); + + Redis client = Redis.createClient(rule.vertx(), new RedisOptions().setConnectionString(redis.getRedisUri())); + client.connect() + .compose(conn -> { + test.assertEquals(0, getMetrics().pending()); + test.assertEquals(1, getMetrics().inUse()); + + return conn.send(Request.cmd(Command.SELECT).arg(7)) // taints the connection + .compose(response -> { + test.assertEquals(0, getMetrics().pending()); + test.assertEquals(1, getMetrics().inUse()); + + return conn.close(); + }).onComplete(test.asyncAssertSuccess(ignored -> { + test.assertEquals(0, getMetrics().pending()); + test.assertEquals(0, getMetrics().inUse()); + })); + }) + .compose(ignored -> client.close()) + .onComplete(test.asyncAssertSuccess(ignored -> { + async.complete(); + })); + } + @Test public void testLifecycle(TestContext should) { final Async test = should.async();