Skip to content

Commit

Permalink
Shutdown subscription_executor on close and reconnect (#155)
Browse files Browse the repository at this point in the history
* Shutdown subscription_executor on close and reconnect

* Keep multiple versions of nats-server

* Close dangling nats threads before #close and #reconnect test
  • Loading branch information
vankiru authored Jan 10, 2025
1 parent 1ad40f5 commit 41bc948
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 4 deletions.
7 changes: 7 additions & 0 deletions lib/nats/io/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ def connect(uri = nil, opts = {})
opts[:max_outstanding_pings] = ENV["NATS_MAX_OUTSTANDING_PINGS"].to_i unless ENV["NATS_MAX_OUTSTANDING_PINGS"].nil?
opts[:connect_timeout] ||= NATS::IO::DEFAULT_CONNECT_TIMEOUT
opts[:drain_timeout] ||= NATS::IO::DEFAULT_DRAIN_TIMEOUT
opts[:close_timeout] ||= NATS::IO::DEFAULT_CLOSE_TIMEOUT
@options = opts

# Process servers in the NATS cluster and pick one to connect
Expand Down Expand Up @@ -1298,6 +1299,8 @@ def process_op_error(e)
@flusher_thread.exit if @flusher_thread.alive?
@ping_interval_thread.exit if @ping_interval_thread.alive?

@subscription_executor.shutdown

attempt_reconnect
rescue NATS::IO::NoServersError => e
@last_err = e
Expand Down Expand Up @@ -1568,6 +1571,9 @@ def close_connection(conn_status, do_cbs = true)
@read_loop_thread.exit
end

@subscription_executor&.shutdown
@subscription_executor&.wait_for_termination(options[:close_timeout])

# TODO: Delete any other state which we are not using here too.
synchronize do
@pongs.synchronize do
Expand Down Expand Up @@ -1857,6 +1863,7 @@ module IO
DEFAULT_CONNECT_TIMEOUT = 2
DEFAULT_READ_WRITE_TIMEOUT = 2
DEFAULT_DRAIN_TIMEOUT = 30
DEFAULT_CLOSE_TIMEOUT = 30

# Default Pending Limits
DEFAULT_SUB_PENDING_MSGS_LIMIT = 65536
Expand Down
7 changes: 4 additions & 3 deletions scripts/nats-server
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ export DEFAULT_NATS_SERVER_VERSION=latest
export NATS_SERVER_VERSION="${NATS_SERVER_VERSION:=$DEFAULT_NATS_SERVER_VERSION}"

platform=$(uname -s)
server_path=tmp/nats-server/nats-server-$platform-$NATS_SERVER_VERSION

if [ ! -f ./tmp/nats-server/nats-server-$platform ]; then
if [ ! -f ./$server_path ]; then
echo "NATS server is not installed, downloading..."
mkdir -p tmp/nats-server
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$NATS_SERVER_VERSION | PREFIX=$(pwd)/tmp/nats-server/ sh
mv tmp/nats-server/nats-server tmp/nats-server/nats-server-$platform
mv tmp/nats-server/nats-server $server_path
fi

./tmp/nats-server/nats-server-$platform $@
./$server_path $@
5 changes: 4 additions & 1 deletion spec/client_drain_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
end

it "should report drain timeout error" do
nc = NATS.connect(drain_timeout: 0.5)
nc = NATS.connect(drain_timeout: 0.5, close_timeout: 1)
nc2 = NATS.connect

future = Future.new
Expand Down Expand Up @@ -157,5 +157,8 @@
result = future.wait_for(2)
expect(result).to eql(:error)
expect(errors.first).to be_a(NATS::IO::DrainTimeoutError)

nc.close
nc2.close
end
end
36 changes: 36 additions & 0 deletions spec/client_reconnect_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -619,4 +619,40 @@
expect(nats.status).to eql(NATS::IO::CLOSED)
end
end

describe "#process_op_error" do
# Close all dangling nats threads from previous tests
before do
Thread.list.each do |thread|
thread.exit if thread.name&.start_with?("nats:")
end
end

let(:responder) { NATS.connect }

let(:requester) do
NATS.connect(
reconnect: true,
reconnect_time_wait: 2,
max_reconnect_attempts: 1
)
end

it "closes all its threads before reconnection" do
responder.subscribe("foo") { |msg| msg.respond("bar") }
requester.request("foo")

nats_threads = Thread.list.select do |thread|
thread.name&.start_with?("nats:")
end

@s.restart
sleep 2

expect(Thread.list & nats_threads).to be_empty

responder.close
requester.close
end
end
end
41 changes: 41 additions & 0 deletions spec/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -557,4 +557,45 @@
end.to_not raise_error
end
end

describe "#close" do
context "when client has established a connection" do
# Close all dangling nats threads from previous tests
before do
Thread.list.each do |thread|
thread.exit if thread.name&.start_with?("nats:")
end
end

let(:responder) { NATS.connect(servers: [@s.uri]) }
let(:requester) { NATS.connect(servers: [@s.uri]) }

it "closes all its threads" do
responder.subscribe("foo") { |msg| msg.respond("bar") }
requester.request("foo")

nats_threads = Thread.list.select do |thread|
thread.name&.start_with?("nats:")
end

requester.close
responder.close

expect(Thread.list & nats_threads).to be_empty
end
end

context "when client has not established a connection yet" do
let(:nats) { NATS::IO::Client.new }

it "closes without a hitch" do
nats.close
expect(nats.closed?).to be_truthy
end

it "closes all its threads" do
expect { nats.close }.not_to change { Thread.list.count }
end
end
end
end
5 changes: 5 additions & 0 deletions spec/support/nats_server_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ def kill_server
end
end

def restart
kill_server
start_server(true)
end

def wait_for_server(uri, max_wait = 5) # :nodoc:
wait = max_wait.to_f
loop do
Expand Down

0 comments on commit 41bc948

Please sign in to comment.