From 41bc94895367ed3a19703882d7a90d6f440ef9c2 Mon Sep 17 00:00:00 2001 From: Julia Egorova Date: Fri, 10 Jan 2025 21:01:02 +0300 Subject: [PATCH] Shutdown `subscription_executor` on close and reconnect (#155) * Shutdown subscription_executor on close and reconnect * Keep multiple versions of nats-server * Close dangling nats threads before #close and #reconnect test --- lib/nats/io/client.rb | 7 +++++ scripts/nats-server | 7 ++--- spec/client_drain_spec.rb | 5 +++- spec/client_reconnect_spec.rb | 36 ++++++++++++++++++++++++++ spec/client_spec.rb | 41 ++++++++++++++++++++++++++++++ spec/support/nats_server_helper.rb | 5 ++++ 6 files changed, 97 insertions(+), 4 deletions(-) diff --git a/lib/nats/io/client.rb b/lib/nats/io/client.rb index 2b640a7..a4f7635 100644 --- a/lib/nats/io/client.rb +++ b/lib/nats/io/client.rb @@ -325,6 +325,7 @@ def connect(uri = nil, opts = {}) opts[:max_outstanding_pings] = ENV["NATS_MAX_OUTSTANDING_PINGS"].to_i unless ENV["NATS_MAX_OUTSTANDING_PINGS"].nil? opts[:connect_timeout] ||= NATS::IO::DEFAULT_CONNECT_TIMEOUT opts[:drain_timeout] ||= NATS::IO::DEFAULT_DRAIN_TIMEOUT + opts[:close_timeout] ||= NATS::IO::DEFAULT_CLOSE_TIMEOUT @options = opts # Process servers in the NATS cluster and pick one to connect @@ -1298,6 +1299,8 @@ def process_op_error(e) @flusher_thread.exit if @flusher_thread.alive? @ping_interval_thread.exit if @ping_interval_thread.alive? + @subscription_executor.shutdown + attempt_reconnect rescue NATS::IO::NoServersError => e @last_err = e @@ -1568,6 +1571,9 @@ def close_connection(conn_status, do_cbs = true) @read_loop_thread.exit end + @subscription_executor&.shutdown + @subscription_executor&.wait_for_termination(options[:close_timeout]) + # TODO: Delete any other state which we are not using here too. synchronize do @pongs.synchronize do @@ -1857,6 +1863,7 @@ module IO DEFAULT_CONNECT_TIMEOUT = 2 DEFAULT_READ_WRITE_TIMEOUT = 2 DEFAULT_DRAIN_TIMEOUT = 30 + DEFAULT_CLOSE_TIMEOUT = 30 # Default Pending Limits DEFAULT_SUB_PENDING_MSGS_LIMIT = 65536 diff --git a/scripts/nats-server b/scripts/nats-server index 5829bb5..db92bea 100755 --- a/scripts/nats-server +++ b/scripts/nats-server @@ -6,12 +6,13 @@ export DEFAULT_NATS_SERVER_VERSION=latest export NATS_SERVER_VERSION="${NATS_SERVER_VERSION:=$DEFAULT_NATS_SERVER_VERSION}" platform=$(uname -s) +server_path=tmp/nats-server/nats-server-$platform-$NATS_SERVER_VERSION -if [ ! -f ./tmp/nats-server/nats-server-$platform ]; then +if [ ! -f ./$server_path ]; then echo "NATS server is not installed, downloading..." mkdir -p tmp/nats-server curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$NATS_SERVER_VERSION | PREFIX=$(pwd)/tmp/nats-server/ sh - mv tmp/nats-server/nats-server tmp/nats-server/nats-server-$platform + mv tmp/nats-server/nats-server $server_path fi -./tmp/nats-server/nats-server-$platform $@ +./$server_path $@ diff --git a/spec/client_drain_spec.rb b/spec/client_drain_spec.rb index 127d9d8..ae44c68 100644 --- a/spec/client_drain_spec.rb +++ b/spec/client_drain_spec.rb @@ -99,7 +99,7 @@ end it "should report drain timeout error" do - nc = NATS.connect(drain_timeout: 0.5) + nc = NATS.connect(drain_timeout: 0.5, close_timeout: 1) nc2 = NATS.connect future = Future.new @@ -157,5 +157,8 @@ result = future.wait_for(2) expect(result).to eql(:error) expect(errors.first).to be_a(NATS::IO::DrainTimeoutError) + + nc.close + nc2.close end end diff --git a/spec/client_reconnect_spec.rb b/spec/client_reconnect_spec.rb index 7f0a70d..fde414f 100644 --- a/spec/client_reconnect_spec.rb +++ b/spec/client_reconnect_spec.rb @@ -619,4 +619,40 @@ expect(nats.status).to eql(NATS::IO::CLOSED) end end + + describe "#process_op_error" do + # Close all dangling nats threads from previous tests + before do + Thread.list.each do |thread| + thread.exit if thread.name&.start_with?("nats:") + end + end + + let(:responder) { NATS.connect } + + let(:requester) do + NATS.connect( + reconnect: true, + reconnect_time_wait: 2, + max_reconnect_attempts: 1 + ) + end + + it "closes all its threads before reconnection" do + responder.subscribe("foo") { |msg| msg.respond("bar") } + requester.request("foo") + + nats_threads = Thread.list.select do |thread| + thread.name&.start_with?("nats:") + end + + @s.restart + sleep 2 + + expect(Thread.list & nats_threads).to be_empty + + responder.close + requester.close + end + end end diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 87cb649..c316601 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -557,4 +557,45 @@ end.to_not raise_error end end + + describe "#close" do + context "when client has established a connection" do + # Close all dangling nats threads from previous tests + before do + Thread.list.each do |thread| + thread.exit if thread.name&.start_with?("nats:") + end + end + + let(:responder) { NATS.connect(servers: [@s.uri]) } + let(:requester) { NATS.connect(servers: [@s.uri]) } + + it "closes all its threads" do + responder.subscribe("foo") { |msg| msg.respond("bar") } + requester.request("foo") + + nats_threads = Thread.list.select do |thread| + thread.name&.start_with?("nats:") + end + + requester.close + responder.close + + expect(Thread.list & nats_threads).to be_empty + end + end + + context "when client has not established a connection yet" do + let(:nats) { NATS::IO::Client.new } + + it "closes without a hitch" do + nats.close + expect(nats.closed?).to be_truthy + end + + it "closes all its threads" do + expect { nats.close }.not_to change { Thread.list.count } + end + end + end end diff --git a/spec/support/nats_server_helper.rb b/spec/support/nats_server_helper.rb index a8e64ea..95fe6c3 100644 --- a/spec/support/nats_server_helper.rb +++ b/spec/support/nats_server_helper.rb @@ -99,6 +99,11 @@ def kill_server end end + def restart + kill_server + start_server(true) + end + def wait_for_server(uri, max_wait = 5) # :nodoc: wait = max_wait.to_f loop do