Skip to content

Commit

Permalink
[SDK] DataProxy SDK supports elegant metadata updates
Browse files Browse the repository at this point in the history
  • Loading branch information
castorqin committed Oct 11, 2024
1 parent c177a67 commit daaab01
Showing 1 changed file with 61 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -527,33 +527,11 @@ private void updateAllConnection(List<HostInfo> hostInfos) {

try {
writeLock();
List<HostInfo> notExistHosts = new ArrayList<>();
if (!clientMap.isEmpty()) {
logger.info("ready to close not in new HostInfo connections!");
for (HostInfo hostInfo : clientMap.keySet()) {
if (hostInfo == null) {
continue;
}
Optional<HostInfo> optionalHostInfo =
hostInfos.stream().filter(hostInfo1 -> hostInfo1.equals(hostInfo))
.findFirst();
if (optionalHostInfo.isPresent()) {
continue;
}
NettyClient client = clientMap.get(hostInfo);
if (client != null && client.isActive()) {
sender.waitForAckForChannel(client.getChannel());
client.close();
clientList.remove(client);
sender.clearCallBackByChannel(client.getChannel());
}
logger.info("ready to close not in new HostInfo connections!");
notExistHosts.add(hostInfo);
}
}
removeNotExistHost(notExistHosts);

updateAndInitConnection(hostInfos);
List<HostInfo> unHealthyHostList = findUnHealthyHostList(hostInfos);
List<HostInfo> newlyAddList = findNewlyAddList(hostInfos);
logger.info("unhealthyHostList = {},newlyAddList = {}", unHealthyHostList, newlyAddList);
updateAndInitConnection(newlyAddList, unHealthyHostList.size());
removeUnHealthyHostList(unHealthyHostList);
} catch (Exception e) {
logger.error("update Connection error", e);
} finally {
Expand All @@ -562,20 +540,67 @@ private void updateAllConnection(List<HostInfo> hostInfos) {

}

private void removeNotExistHost(List<HostInfo> notExistHosts) {
for (HostInfo notExistHost : notExistHosts) {
private List<HostInfo> findUnHealthyHostList(List<HostInfo> hostInfos) {
List<HostInfo> unHealthyHostList = new ArrayList<>();
if (!clientMap.isEmpty()) {
logger.info("ready to close not in new HostInfo connections!");
for (HostInfo hostInfo : clientMap.keySet()) {
if (hostInfo == null) {
continue;
}
Optional<HostInfo> optionalHostInfo =
hostInfos.stream().filter(hostInfo1 -> hostInfo1.equals(hostInfo))
.findFirst();
NettyClient client = clientMap.get(hostInfo);
if (optionalHostInfo.isPresent() && client.isActive()) {
continue;
}
unHealthyHostList.add(hostInfo);
}
}
return unHealthyHostList;
}

clientMap.remove(notExistHost);
clientMapData.remove(notExistHost);
clientMapHB.remove(notExistHost);
private List<HostInfo> findNewlyAddList(List<HostInfo> hostInfos) {
List<HostInfo> newlyAddList = new ArrayList<>();
if (!clientMap.isEmpty()) {
for (HostInfo hostInfo : hostInfos) {
if (hostInfo == null) {
continue;
}
Optional<HostInfo> optionalHostInfo =
clientMap.keySet().stream().filter(hostInfo1 -> hostInfo1.equals(hostInfo))
.findFirst();
if (optionalHostInfo.isPresent()) {
continue;
}
newlyAddList.add(hostInfo);
}
}
return newlyAddList;
}

channelLoadMapData.remove(notExistHost);
channelLoadMapHB.remove(notExistHost);
private void removeUnHealthyHostList(List<HostInfo> unHealthyHostList) {
for (HostInfo unHealthyHost : unHealthyHostList) {
NettyClient client = clientMap.get(unHealthyHost);
logger.info("ready to close not in new HostInfo connections!");
if (client != null && client.isActive()) {
sender.waitForAckForChannel(client.getChannel());
sender.clearCallBackByChannel(client.getChannel());
boolean close = client.close();
clientList.remove(client);
logger.info("close connections! = {} for host = {}", close, unHealthyHost);
}
clientMap.remove(unHealthyHost);
clientMapData.remove(unHealthyHost);
clientMapHB.remove(unHealthyHost);
channelLoadMapData.remove(unHealthyHost);
channelLoadMapHB.remove(unHealthyHost);
}
}

private void updateAndInitConnection(List<HostInfo> hostInfos) {
int needSize = realSize - clientMap.size();
private void updateAndInitConnection(List<HostInfo> hostInfos, int unHealthySize) {
int needSize = realSize - (clientMap.size() - unHealthySize);
if (needSize > 0) {
/* Build new channels */
for (HostInfo hostInfo : hostInfos) {
Expand Down

0 comments on commit daaab01

Please sign in to comment.