diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java index 9c267afc31..4fcebff88b 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java @@ -527,33 +527,11 @@ private void updateAllConnection(List hostInfos) { try { writeLock(); - List notExistHosts = new ArrayList<>(); - if (!clientMap.isEmpty()) { - logger.info("ready to close not in new HostInfo connections!"); - for (HostInfo hostInfo : clientMap.keySet()) { - if (hostInfo == null) { - continue; - } - Optional optionalHostInfo = - hostInfos.stream().filter(hostInfo1 -> hostInfo1.equals(hostInfo)) - .findFirst(); - if (optionalHostInfo.isPresent()) { - continue; - } - NettyClient client = clientMap.get(hostInfo); - if (client != null && client.isActive()) { - sender.waitForAckForChannel(client.getChannel()); - client.close(); - clientList.remove(client); - sender.clearCallBackByChannel(client.getChannel()); - } - logger.info("ready to close not in new HostInfo connections!"); - notExistHosts.add(hostInfo); - } - } - removeNotExistHost(notExistHosts); - - updateAndInitConnection(hostInfos); + List unHealthyHostList = findUnHealthyHostList(hostInfos); + List newlyAddList = findNewlyAddList(hostInfos); + logger.info("unhealthyHostList = {},newlyAddList = {}", unHealthyHostList, newlyAddList); + updateAndInitConnection(newlyAddList, unHealthyHostList.size()); + removeUnHealthyHostList(unHealthyHostList); } catch (Exception e) { logger.error("update Connection error", e); } finally { @@ -562,20 +540,67 @@ private void updateAllConnection(List hostInfos) { } - private void removeNotExistHost(List notExistHosts) { - for (HostInfo notExistHost : notExistHosts) { + private List findUnHealthyHostList(List hostInfos) { + List unHealthyHostList = new ArrayList<>(); + if (!clientMap.isEmpty()) { + logger.info("ready to close not in new HostInfo connections!"); + for (HostInfo hostInfo : clientMap.keySet()) { + if (hostInfo == null) { + continue; + } + Optional optionalHostInfo = + hostInfos.stream().filter(hostInfo1 -> hostInfo1.equals(hostInfo)) + .findFirst(); + NettyClient client = clientMap.get(hostInfo); + if (optionalHostInfo.isPresent() && client.isActive()) { + continue; + } + unHealthyHostList.add(hostInfo); + } + } + return unHealthyHostList; + } - clientMap.remove(notExistHost); - clientMapData.remove(notExistHost); - clientMapHB.remove(notExistHost); + private List findNewlyAddList(List hostInfos) { + List newlyAddList = new ArrayList<>(); + if (!clientMap.isEmpty()) { + for (HostInfo hostInfo : hostInfos) { + if (hostInfo == null) { + continue; + } + Optional optionalHostInfo = + clientMap.keySet().stream().filter(hostInfo1 -> hostInfo1.equals(hostInfo)) + .findFirst(); + if (optionalHostInfo.isPresent()) { + continue; + } + newlyAddList.add(hostInfo); + } + } + return newlyAddList; + } - channelLoadMapData.remove(notExistHost); - channelLoadMapHB.remove(notExistHost); + private void removeUnHealthyHostList(List unHealthyHostList) { + for (HostInfo unHealthyHost : unHealthyHostList) { + NettyClient client = clientMap.get(unHealthyHost); + logger.info("ready to close not in new HostInfo connections!"); + if (client != null && client.isActive()) { + sender.waitForAckForChannel(client.getChannel()); + sender.clearCallBackByChannel(client.getChannel()); + boolean close = client.close(); + clientList.remove(client); + logger.info("close connections! = {} for host = {}", close, unHealthyHost); + } + clientMap.remove(unHealthyHost); + clientMapData.remove(unHealthyHost); + clientMapHB.remove(unHealthyHost); + channelLoadMapData.remove(unHealthyHost); + channelLoadMapHB.remove(unHealthyHost); } } - private void updateAndInitConnection(List hostInfos) { - int needSize = realSize - clientMap.size(); + private void updateAndInitConnection(List hostInfos, int unHealthySize) { + int needSize = realSize - (clientMap.size() - unHealthySize); if (needSize > 0) { /* Build new channels */ for (HostInfo hostInfo : hostInfos) {