diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/AbstractConnection.java b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/AbstractConnection.java index 7b3ce9f65e7..082ac90b8e2 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/AbstractConnection.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/AbstractConnection.java @@ -17,13 +17,11 @@ package org.apache.hertzbeat.collector.collect.common.cache; -import lombok.extern.slf4j.Slf4j; - /** * AbstractConnection */ -@Slf4j -public abstract class AbstractConnection implements AutoCloseable { + +public abstract class AbstractConnection implements AutoCloseable { /** * @return Returns the connection. @@ -35,8 +33,15 @@ public abstract class AbstractConnection implements AutoCloseable { */ public abstract void closeConnection() throws Exception; + /** + * Check connection when get connection. + */ + public abstract void check() throws Exception; + @Override public void close() throws Exception{ - closeConnection(); + + this.closeConnection(); } + } diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/JdbcConnect.java b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/JdbcConnect.java index ad828871c2b..4f1a4d8065b 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/JdbcConnect.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/JdbcConnect.java @@ -18,6 +18,7 @@ package org.apache.hertzbeat.collector.collect.common.cache; import java.sql.Connection; +import java.sql.SQLException; import lombok.extern.slf4j.Slf4j; /** @@ -39,8 +40,24 @@ public void closeConnection() throws Exception { } } + @Override + public void check() throws SQLException { + + if (connection.isClosed()) { + throw new SQLException("Connection is closed"); + } + } + @Override public Connection getConnection() { + + try { + this.check(); + } + catch (SQLException e) { + log.error(e.getMessage()); + return null; + } return connection; } } diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/JmxConnect.java b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/JmxConnect.java index f0e03602c26..911ca6c2fbb 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/JmxConnect.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/JmxConnect.java @@ -40,8 +40,24 @@ public void closeConnection() throws Exception { } } + @Override + public void check() throws Exception { + + if (connection.getConnectionId().isEmpty()) { + throw new RuntimeException("connection is closed"); + } + } + @Override public JMXConnector getConnection() { + + try { + this.check(); + } + catch (Exception e) { + log.error(e.getMessage()); + return null; + } return connection; } } diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/MongodbConnect.java b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/MongodbConnect.java index 73e1e920863..0d5afc256da 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/MongodbConnect.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/MongodbConnect.java @@ -19,6 +19,7 @@ import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; +import org.bson.Document; /** * mongodb connect client @@ -38,8 +39,23 @@ public void closeConnection() throws Exception { } } + @Override + public void check() throws Exception { + + mongoClient.getDatabase("admin").runCommand(new Document("ping", 1)); + } + @Override public MongoClient getConnection() { + + try { + this.check(); + } + catch (Exception e) { + log.error(e.getMessage()); + return null; + } + return mongoClient; } } diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/RedfishConnect.java b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/RedfishConnect.java index b0750f72cf8..4b8233a8e93 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/RedfishConnect.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/RedfishConnect.java @@ -38,8 +38,24 @@ public void closeConnection() throws Exception { } } + @Override + public void check() throws Exception { + + if (!reddishConnectSession.isOpen()) { + throw new RuntimeException("Connection is closed"); + } + } + @Override public ConnectSession getConnection() { + + try { + this.check(); + } + catch (Exception e) { + log.error(e.getMessage()); + return null; + } return reddishConnectSession; } } diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/RedisConnect.java b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/RedisConnect.java index e24be18a542..db5f2e2d389 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/RedisConnect.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/RedisConnect.java @@ -39,8 +39,24 @@ public void closeConnection() throws Exception { } } + @Override + public void check() throws Exception { + + if (!connection.isOpen()) { + throw new RuntimeException("Connection is closed"); + } + } + @Override public StatefulConnection getConnection() { + + try { + this.check(); + } + catch (Exception e) { + log.error(e.getMessage()); + return null; + } return connection; } } diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/SshConnect.java b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/SshConnect.java index 5e05aada1ce..4d6ed801206 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/SshConnect.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/collect/common/cache/SshConnect.java @@ -38,7 +38,23 @@ public void closeConnection() throws Exception { } } + @Override + public void check() throws Exception { + + if (!clientSession.isOpen()) { + throw new Exception("ssh connection is not open"); + } + } + public ClientSession getConnection() { + + try { + this.check(); + } + catch (Exception e) { + log.error(e.getMessage()); + return null; + } return clientSession; } } diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/collect/redis/RedisCommonCollectImpl.java b/collector/src/main/java/org/apache/hertzbeat/collector/collect/redis/RedisCommonCollectImpl.java index 2c5a6d73f97..802485691dd 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/collect/redis/RedisCommonCollectImpl.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/collect/redis/RedisCommonCollectImpl.java @@ -236,22 +236,26 @@ private StatefulRedisClusterConnection getClusterConnection(Redi * @return connection */ private StatefulConnection getStatefulConnection(CacheIdentifier identifier) { - StatefulConnection connection = null; + Optional cacheOption = connectionCommonCache.getCache(identifier, true); + if (cacheOption.isPresent()) { RedisConnect redisConnect = cacheOption.get(); - connection = redisConnect.getConnection(); - if (!connection.isOpen()) { + + try { + return redisConnect.getConnection(); + } catch (RuntimeException e) { + log.info("The Redis connection from cache is invalid, closing and removing: {}", e.getMessage()); try { - connection.closeAsync(); - } catch (Exception e) { - log.info("The redis connect form cache, close error: {}", e.getMessage()); + redisConnect.getConnection().closeAsync(); + } catch (Exception closeException) { + log.info("Error closing Redis connection: {}", closeException.getMessage()); } - connection = null; connectionCommonCache.removeCache(identifier); } } - return connection; + + return null; } /** diff --git a/collector/src/test/java/org/apache/hertzbeat/collector/collect/common/cache/CommonCacheTest.java b/collector/src/test/java/org/apache/hertzbeat/collector/collect/common/cache/CommonCacheTest.java index 91bcb8e51a8..25f671b107b 100644 --- a/collector/src/test/java/org/apache/hertzbeat/collector/collect/common/cache/CommonCacheTest.java +++ b/collector/src/test/java/org/apache/hertzbeat/collector/collect/common/cache/CommonCacheTest.java @@ -45,6 +45,10 @@ public Object getConnection() { @Override public void closeConnection() throws Exception { } + + @Override + public void check() throws Exception { + } }; }