Skip to content

Commit

Permalink
DBZ-7904 Enhancing the threads utility class for broader use
Browse files Browse the repository at this point in the history
Forced Shutdown for errored pubsub managed channel

Forced Shutdown for errored pubsub managed channel

DBZ-7904 Enhancing the threads utility class for broader use
  • Loading branch information
Ankur Gupta authored and jpechane committed Jun 3, 2024
1 parent bfe425b commit ccbcc14
Showing 1 changed file with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;

import io.debezium.util.Threads;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
Expand Down Expand Up @@ -131,6 +130,9 @@ public interface PublisherBuilder {
@ConfigProperty(name = PROP_PREFIX + "wait.message.delivery.timeout.ms", defaultValue = "30000")
Integer waitMessageDeliveryTimeout;

@ConfigProperty(name = PROP_PREFIX + "channel.shutdown.timeout.ms", defaultValue = "30000")
Integer channelShutdownTimeout;

@ConfigProperty(name = PROP_PREFIX + "address")
Optional<String> address;

Expand Down Expand Up @@ -213,12 +215,33 @@ void close() {
publisher.shutdown();
}
catch (Exception e) {
LOGGER.warn("Exception while closing publisher: {}", e);
LOGGER.warn("Exception while closing publisher: {}", e.getMessage(), e);
}
});
shutdownChannel(channel);
}

void shutdownChannel(ManagedChannel channel) {

if (channel != null && !channel.isShutdown()) {
channel.shutdown();
ExecutorService executor = Threads.newSingleThreadExecutor(this.getClass(), projectId, "managed-channel-shutdown-coordinator");

Future<?> future = executor.submit(() -> {
channel.shutdown();
try {
channel.awaitTermination(channelShutdownTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

try {
future.get(channelShutdownTimeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOGGER.warn("Exception while shutting down the managed channel {}", e.getMessage(), e);
} finally {
executor.shutdownNow();
}
}
}

Expand Down

0 comments on commit ccbcc14

Please sign in to comment.