Skip to content

Commit

Permalink
Merge pull request #50 from nats-io/release/v0.7.0
Browse files Browse the repository at this point in the history
Release/v0.7.0
  • Loading branch information
wallyqs authored Aug 17, 2021
2 parents f726f5b + 5ad495f commit 935874e
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 10 deletions.
15 changes: 12 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

A thread safe [Ruby](http://ruby-lang.org) client for the [NATS messaging system](https://nats.io) written in pure Ruby.

[![License Apache 2.0](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)[![Build Status](https://travis-ci.org/nats-io/nats-pure.rb.svg)](http://travis-ci.org/nats-io/nats-pure.rb)[![Gem Version](https://d25lcipzij17d.cloudfront.net/badge.svg?id=rb&type=5&v=0.6.2)](https://rubygems.org/gems/nats-pure/versions/0.6.2)
[![License Apache 2.0](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)[![Build Status](https://travis-ci.org/nats-io/nats-pure.rb.svg)](http://travis-ci.org/nats-io/nats-pure.rb)[![Gem Version](https://d25lcipzij17d.cloudfront.net/badge.svg?id=rb&type=5&v=0.7.0)](https://rubygems.org/gems/nats-pure/versions/0.7.0)

## Getting Started

Expand Down Expand Up @@ -42,8 +42,8 @@ nats.unsubscribe(sid)
nats.request('help', 'please', max: 5) { |response| puts "Got a response: '#{response}'" }

# Replies
nats.subscribe('help') do |msg, reply, subject|
puts "Received on '#{subject}': '#{msg}'"
nats.subscribe('help') do |msg, reply, subject, headers|
puts "Received on '#{subject}': '#{msg}' with headers: #{headers}"
nats.publish(reply, "I'll help!")
end

Expand All @@ -55,6 +55,15 @@ rescue NATS::IO::Timeout
puts "nats: request timed out"
end

# Request using a message with headers
begin
msg = NATS::Msg.new(subject: "help", headers: {foo: 'bar'})
resp = nats.request_msg(msg)
puts "Received on '#{resp.subject}': #{resp.data}"
rescue NATS::IO::Timeout => e
puts "nats: request timed out: #{e}"
end

# Server roundtrip which fails if it does not happen within 500ms
begin
nats.flush(0.5)
Expand Down
24 changes: 17 additions & 7 deletions lib/nats/io/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -478,13 +478,18 @@ def request(subject, payload="", opts={}, &blk)

# Publish request and wait for reply.
publish(subject, payload, inbox)
with_nats_timeout(timeout) do
@resp_sub.synchronize do
future.wait(timeout)
begin
with_nats_timeout(timeout) do
@resp_sub.synchronize do
future.wait(timeout)
end
end
rescue NATS::IO::Timeout => e
synchronize { @resp_map.delete(token) }
raise e
end

# Check if there is a response already
# Check if there is a response already.
synchronize do
result = @resp_map[token]
response = result[:response]
Expand Down Expand Up @@ -527,10 +532,15 @@ def request_msg(msg, opts={})

# Publish request and wait for reply.
publish_msg(msg)
with_nats_timeout(timeout) do
@resp_sub.synchronize do
future.wait(timeout)
begin
with_nats_timeout(timeout) do
@resp_sub.synchronize do
future.wait(timeout)
end
end
rescue NATS::IO::Timeout => e
synchronize { @resp_map.delete(token) }
raise e
end

# Check if there is a response already.
Expand Down
16 changes: 16 additions & 0 deletions spec/client_v2_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@
resp = nc.request_msg(msg, timeout: 1)
end.to raise_error(NATS::IO::NoRespondersError)

result = nc.instance_variable_get(:@resp_map)
expect(result.keys.count).to eql(0)

expect(resp).to be_nil

nc.close
Expand All @@ -142,6 +145,19 @@

expect(resp).to be_nil

# Timed out requests should be cleaned up.
50.times do
nc.request("hi", "timeout", timeout: 0.001) rescue nil
end

msg = NATS::Msg.new(subject: "hi")
50.times do
nc.request_msg(msg, timeout: 0.001) rescue nil
end

result = nc.instance_variable_get(:@resp_map)
expect(result.keys.count).to eql(0)

resp = nil
expect do
msg = NATS::Msg.new(subject: "hi")
Expand Down

0 comments on commit 935874e

Please sign in to comment.