diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java index bdf64691ba8..6bf59c3b634 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java @@ -160,6 +160,10 @@ public void shutdown() { if (readClientHandler != null) { readClientHandler.close(); } + for (TransportClient client : currentClient.values()) { + client.close(); + } + currentClient.clear(); } public FlinkShuffleClientImpl( @@ -685,8 +689,9 @@ private boolean shouldRetry(Throwable e) { public void cleanup(int shuffleId, int mapId, int attemptId) { final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId); super.cleanup(shuffleId, mapId, attemptId); - if (currentClient != null) { - currentClient.remove(mapKey); + TransportClient client = currentClient.remove(mapKey); + if (client != null) { + client.close(); } }