Skip to content

Commit

Permalink
Merge pull request #77 from nats-io/release/v2.1.0
Browse files Browse the repository at this point in the history
Release v2.1.0
  • Loading branch information
wallyqs authored Jun 9, 2022
2 parents 630cae5 + 81d31f9 commit 0c1966f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 30 deletions.
34 changes: 17 additions & 17 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
PATH
remote: .
specs:
nats-pure (2.0.0)
nats-pure (2.1.0)

GEM
remote: https://rubygems.org/
specs:
base32 (0.3.2)
benchmark-ips (2.7.2)
diff-lcs (1.3)
ed25519 (1.2.4)
base32 (0.3.4)
benchmark-ips (2.9.3)
diff-lcs (1.5.0)
ed25519 (1.3.0)
nkeys (0.1.0)
base32 (~> 0.3)
ed25519 (~> 1.2)
rake (13.0.1)
rspec (3.8.0)
rspec-core (~> 3.8.0)
rspec-expectations (~> 3.8.0)
rspec-mocks (~> 3.8.0)
rspec-core (3.8.1)
rspec-support (~> 3.8.0)
rspec-expectations (3.8.4)
rake (13.0.6)
rspec (3.11.0)
rspec-core (~> 3.11.0)
rspec-expectations (~> 3.11.0)
rspec-mocks (~> 3.11.0)
rspec-core (3.11.0)
rspec-support (~> 3.11.0)
rspec-expectations (3.11.0)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.8.0)
rspec-mocks (3.8.1)
rspec-support (~> 3.11.0)
rspec-mocks (3.11.0)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.8.0)
rspec-support (3.8.2)
rspec-support (~> 3.11.0)
rspec-support (3.11.0)

PLATFORMS
ruby
Expand Down
22 changes: 18 additions & 4 deletions lib/nats/io/js.rb
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ def add_consumer(stream, config, params={})
stream_name: stream,
config: config
}

result = api_request(req_subject, req.to_json, params)
JetStream::API::ConsumerInfo.new(result).freeze
end
Expand Down Expand Up @@ -503,6 +504,7 @@ def fetch(batch=1, params={})
synchronize do
unless @pending_queue.empty?
msg = @pending_queue.pop
@pending_size -= msg.data.size
# Check for a no msgs response status.
if JS.is_status_msg(msg)
case msg.header["Status"]
Expand All @@ -528,7 +530,12 @@ def fetch(batch=1, params={})
# Wait for result of fetch or timeout.
synchronize { wait_for_msgs_cond.wait(timeout) }

msgs << @pending_queue.pop unless @pending_queue.empty?
unless @pending_queue.empty?
msg = @pending_queue.pop
@pending_size -= msg.data.size

msgs << msg
end

duration = MonotonicTime.since(t)
if duration > timeout
Expand Down Expand Up @@ -557,6 +564,7 @@ def fetch(batch=1, params={})
if batch <= @pending_queue.size
batch.times do
msg = @pending_queue.pop
@pending_size -= msg.data.size

# Check for a no msgs response status.
if JS.is_status_msg(msg)
Expand All @@ -583,10 +591,15 @@ def fetch(batch=1, params={})
# Not receiving even one is a timeout.
start_time = MonotonicTime.now
msg = nil
synchronize {

synchronize do
wait_for_msgs_cond.wait(timeout)
msg = @pending_queue.pop unless @pending_queue.empty?
}

unless @pending_queue.empty?
msg = @pending_queue.pop
@pending_size -= msg.data.size
end
end

# Check if the first message was a response saying that
# there are no messages.
Expand Down Expand Up @@ -631,6 +644,7 @@ def fetch(batch=1, params={})
end
else
msg = @pending_queue.pop
@pending_size -= msg.data.size

if JS.is_status_msg(msg)
case msg.header[JS::Header::Status]
Expand Down
2 changes: 1 addition & 1 deletion lib/nats/io/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
module NATS
module IO
# VERSION is the version of the client announced on CONNECT to the server.
VERSION = "2.0.0".freeze
VERSION = "2.1.0".freeze

# LANG is the lang runtime of the client announced on CONNECT to the server.
LANG = "#{RUBY_ENGINE}#{RUBY_VERSION}".freeze
Expand Down
58 changes: 50 additions & 8 deletions spec/js_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
js.publish("hello", "1")
js.publish("world", "2")
js.publish("hello.world", "3")

sub = js.pull_subscribe("hello", "psub", config: { max_waiting: 30 })
info = sub.consumer_info
expect(info.config.max_waiting).to eql(30)
Expand Down Expand Up @@ -413,11 +413,11 @@
expect(e).to be_a(NATS::IO::Timeout)

# NOTE: After +2.7.1 info also resets the expired requests.
#
#
# resp = nc.request("$JS.API.CONSUMER.INFO.test.test")
# info = JSON.parse(resp.data, symbolize_names: true)
# expect(info[:num_waiting]).to be_between(1, 3)
#
#

# This should not cause 408 timeout errors.
10.times do
Expand Down Expand Up @@ -492,6 +492,48 @@
sub.unsubscribe
end.to raise_error(NATS::IO::BadSubscription)
end

it 'should account pending data' do
nc = NATS.connect(@s.uri)
nc2 = NATS.connect(@s.uri)
js = nc.jetstream
subject = "limits.test"

nc.on_error do |e|
puts e
end

js.add_stream(name: "limitstest", subjects: [subject])

# Continuously send messages until reaching pending bytes limit.
t = Thread.new do
payload = 'A' * 1024 * 1024
loop do
nc2.publish(subject, payload)
sleep 0.01
end
end

sub = js.pull_subscribe(subject, "test")
65.times do |i|
msgs = sub.fetch(1)
msgs.each do |msg|
msg.ack
end
end

sub = js.pull_subscribe(subject, "test")
65.times do |i|
msgs = sub.fetch(2)
msgs.each do |msg|
msg.ack
end
end

t.exit
nc.close
nc2.close
end
end

describe 'Push Subscribe' do
Expand Down Expand Up @@ -593,7 +635,7 @@
sub.unsubscribe
end

it "should create durable single subscribers" do
it "should create durable single subscribers" do
js = nc.jetstream
js.add_stream(name: "hello", subjects: ["hello", "world", "hello.>"])

Expand Down Expand Up @@ -621,7 +663,7 @@
expect do
js.subscribe("hello", durable: "first")
end.to raise_error(NATS::JetStream::Error)

info = sub.consumer_info
expect(info.num_ack_pending).to eql(2)
sub.unsubscribe
Expand Down Expand Up @@ -944,16 +986,16 @@
expect(resp.config.name).to eql('stream3')
expect(resp.config.num_replicas).to eql(1)

expect do
expect do
nc.jsm.add_stream(foo: "foo")
end.to raise_error(ArgumentError)

expect do
expect do
nc.jsm.add_stream(foo: "foo.*")
end.to raise_error(ArgumentError)

# Raise when stream names contain prohibited characters
expect do
expect do
nc.jsm.add_stream(name: "foo.bar*baz")
end.to raise_error(ArgumentError)
nc.close
Expand Down

0 comments on commit 0c1966f

Please sign in to comment.