Skip to content

Commit

Permalink
- specs: more flaky/stuck tests
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Jan 9, 2025
1 parent abe484c commit ffd6218
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 145 deletions.
174 changes: 59 additions & 115 deletions spec/client_cluster_reconnect_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@
it "should connect to another server if possible before reconnect" do
@s3.kill_server

mon = Monitor.new
reconnected = mon.new_cond
reconnected = Future.new

nats = NATS.connect(servers: [@s1.uri, @s2.uri], dont_randomize_servers: true)

Expand All @@ -100,9 +99,7 @@
reconnects = 0
nats.on_reconnect do
reconnects += 1
mon.synchronize do
reconnected.signal
end
reconnected.set_result(:ok)
end

msgs = []
Expand All @@ -119,9 +116,7 @@
sleep 0.1
end

mon.synchronize do
reconnected.wait(1)
end
expect(reconnected.wait_for(1)).to eq :ok
expect(nats.connected_server).to eql(@s2.uri)
nats.close

Expand All @@ -133,8 +128,7 @@
it "should connect to another server if possible before reconnect using multiple uris" do
@s3.kill_server

mon = Monitor.new
reconnected = mon.new_cond
reconnected = Future.new

nats = NATS::IO::Client.new
nats.connect("nats://secret:password@127.0.0.1:4242,nats://secret:password@127.0.0.1:4243", dont_randomize_servers: true)
Expand All @@ -152,9 +146,7 @@
reconnects = 0
nats.on_reconnect do
reconnects += 1
mon.synchronize do
reconnected.signal
end
reconnected.set_result(:ok)
end

msgs = []
Expand All @@ -171,9 +163,7 @@
sleep 0.1
end

mon.synchronize do
reconnected.wait(1)
end
expect(reconnected.wait_for(1)).to eq :ok
expect(nats.connected_server.to_s).to eql(@s2.uri.to_s)
nats.close

Expand All @@ -185,8 +175,7 @@
it "should gracefully reconnect to another available server while publishing" do
@s3.kill_server

mon = Monitor.new
reconnected = mon.new_cond
reconnected = Future.new

nats = NATS::IO::Client.new
nats.connect({
Expand All @@ -207,56 +196,40 @@
reconnects = 0
nats.on_reconnect do |s|
reconnects += 1
mon.synchronize do
reconnected.signal
end
reconnected.set_result(:ok)
end

errors = []
nats.on_error do |e|
errors << e
end

msgs = []
msg_counter = 0
nats.subscribe("hello.*") do |msg|
msgs << msg
msg_counter += 1
if msg_counter == 100
@s1.kill_server
end
end
nats.flush
expect(nats.connected_server.to_s).to eql(@s1.uri.to_s)

msg_payload = "A" * 10_000
1000.times do |n|
# Receive 100 messages initially and then failover
if n == 100
nats.flush

# Wait a bit for all messages
sleep 0.5
expect(msgs.count).to eql(100)
@s1.kill_server
elsif n % 100 == 0
# yield a millisecond
sleep 0.001
end

# Messages sent here can be lost
msg_payload = "A" * 1_000
100.times do |n|
nats.publish("hello.#{n}", msg_payload)
end

# Flush everything we have sent so far
nats.flush(5)
errors = []
errors.each do |e|
errors << e
end
mon.synchronize { reconnected.wait(1) }

expect(reconnected.wait_for(2)).to eq :ok
expect(nats.connected_server).to eql(@s2.uri)
nats.close

expect(reconnects).to eql(1)
expect(disconnects).to eql(2)
expect(closes).to eql(1)
expect(errors).to be_empty
expect(errors.size).to eq(1)
end
end

Expand All @@ -267,18 +240,20 @@
end

after do
@s1.kill_server
[@s1, @s2, @s3].each do |s|
s.kill_server
end
end

it "should reconnect to nodes discovered from seed server" do
# Nodes join to cluster before we try to connect
[@s2, @s3].each do |s|
s.start_server(true)
context "with nodes joined before first connect" do
before do
[@s2, @s3].each do |s|
s.start_server(true)
end
end

begin
mon = Monitor.new
reconnected = mon.new_cond
it "should reconnect to nodes discovered from seed server" do
reconnected = Future.new

nats = NATS::IO::Client.new
disconnects = 0
Expand All @@ -294,9 +269,7 @@
reconnects = 0
nats.on_reconnect do
reconnects += 1
mon.synchronize do
reconnected.signal
end
reconnected.set_result(:ok)
end

errors = []
Expand All @@ -309,9 +282,8 @@
expect(nats.connected_server).to eql(@s1.uri)
@s1.kill_server
sleep 0.2
mon.synchronize do
reconnected.wait(5)
end

reconnected.wait_for(3)

# Reconnected...
# expect(nats.connected_server).to eql(@s2.uri)
Expand All @@ -325,23 +297,11 @@
expect(nats.last_error).to eql(nil)

nats.close
ensure
# Wrap up test
[@s2, @s3].each do |s|
s.kill_server
end
end
end

it "should reconnect to nodes discovered from seed server with single uri" do
skip "FIXME: flaky test"

# Nodes join to cluster before we try to connect
[@s2, @s3].each do |s|
s.start_server(true)
end
it "should reconnect to nodes discovered from seed server with single uri" do
skip "FIXME: flaky test"

begin
mon = Monitor.new
reconnected = mon.new_cond

Expand Down Expand Up @@ -390,17 +350,11 @@
expect(nats.last_error).to eql(nil)

nats.close
ensure
# Wrap up test
[@s2, @s3].each do |s|
s.kill_server
end
end
end

it "should reconnect to nodes discovered in the cluster after first connect" do
mon = Monitor.new
reconnected = mon.new_cond
reconnected = Future.new

nats = NATS::IO::Client.new
disconnects = 0
Expand All @@ -416,9 +370,7 @@
reconnects = 0
nats.on_reconnect do
reconnects += 1
mon.synchronize do
reconnected.signal
end
reconnected.set_result(:ok)
end

errors = []
Expand All @@ -436,42 +388,34 @@
})
expect(nats.connected_server).to eql(@s1.uri)

begin
# Couple of servers join...
[@s2, @s3].each do |s|
s.start_server(true)
end
nats.flush
# Couple of servers join...
[@s2, @s3].each do |s|
s.start_server(true)
end
nats.flush

# Wait for a bit before disconnecting from original server
nats.flush
@s1.kill_server
mon.synchronize do
reconnected.wait(3)
end
# Wait for a bit before disconnecting from original server
nats.flush
@s1.kill_server

# We still consider the original node and we have new ones
# which can be used to failover.
expect(nats.servers.count).to eql(3)
reconnected.wait_for(3)

# Only 2 new ones should be discovered servers even after reconnect
expect(nats.discovered_servers.count).to eql(2)
expect(nats.connected_server).to eql(@s2.uri)
expect(reconnects).to eql(1)
expect(disconnects).to eql(1)
expect(closes).to eql(0)
expect(errors.count).to eql(2)
expect(errors.first).to be_a(Errno::ECONNRESET)
expect(errors.last).to be_a(Errno::ECONNREFUSED)
expect(nats.last_error).to be_a(Errno::ECONNREFUSED)
# We still consider the original node and we have new ones
# which can be used to failover.
expect(nats.servers.count).to eql(3)

nats.close
ensure
# Wrap up test
[@s2, @s3].each do |s|
s.kill_server
end
end
# Only 2 new ones should be discovered servers even after reconnect
expect(nats.discovered_servers.count).to eql(2)
expect(nats.connected_server).to eql(@s2.uri)
expect(reconnects).to eql(1)
expect(disconnects).to eql(1)
expect(closes).to eql(0)
expect(errors.count).to eql(2)
expect(errors.first).to be_a(Errno::ECONNRESET)
expect(errors.last).to be_a(Errno::ECONNREFUSED)
expect(nats.last_error).to be_a(Errno::ECONNREFUSED)

nats.close
end
end
end
23 changes: 15 additions & 8 deletions spec/client_drain_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
wait_subs = Future.new
wait_pubs = Future.new
reqs_started = Queue.new
wait_reqs_start = Future.new
wait_reqs = Future.new

Thread.new do
wait_subs.wait_for(2)
40.times do |i|
("a".."b").each do
payload = "REQ:#{_1}:#{i}"
payload = "PUB:#{_1}:#{i}"
nc2.publish(_1, payload * 128)
sleep 0.01
end
Expand All @@ -43,9 +44,10 @@

("a".."b").map do |sub|
Thread.new do
wait_reqs_start.wait_for(5)
reqs_started << sub
payload = "REQ:#{sub}"
nc2.request(sub, payload)
nc2.request(sub, payload, timeout: 5)
end
end.each(&:join)

Expand Down Expand Up @@ -73,20 +75,25 @@
sub_queue.push(f1)
sub_queue.push(f2)

expect(f1.wait_for(1)).to eql(:ok)
expect(f2.wait_for(1)).to eql(:ok)
expect(f1.wait_for(2)).to eql(:ok)
expect(f2.wait_for(2)).to eql(:ok)

wait_pubs.wait_for(2)

wait_reqs_start.done

reqs_started.pop
reqs_started.pop

# sleep a bit to let requests initiate
sleep 2

# Start draining process asynchronously.
nc.drain

# Release the queue (we have 38 messages left)
# Release the queue
80.times { sub_queue.push(Future.new) }
result = future.wait_for(2)
result = future.wait_for(7)
expect(result).to eql(:closed)
expect(wait_reqs.wait_for(2)).to eql(:ok)
end
Expand Down Expand Up @@ -141,8 +148,8 @@
sub_queue.push(f1)
sub_queue.push(f2)

expect(f1.wait_for(1)).to eql(:ok)
expect(f2.wait_for(1)).to eql(:ok)
expect(f1.wait_for(2)).to eql(:ok)
expect(f2.wait_for(2)).to eql(:ok)

wait_pubs.wait_for(2)

Expand Down
Loading

0 comments on commit ffd6218

Please sign in to comment.