Skip to content

Release v2.0.0

Compare
Choose a tag to compare
@wallyqs wallyqs released this 07 Feb 21:37
· 168 commits to main since this release
bfbc762

Revamped version of the with more similar APIs to the Go client, and initial support for JetStream.

To install add the following to your Gemfile:

gem 'nats-pure', '2.0.0'
require 'nats'

# Connect to server that has JetStream support, e.g.
# 
#   nats-server -js
# 
nc = NATS.connect("localhost")

# Create JetStream context.
js = nc.jetstream

# Create Stream that will persist messages from foo subject.
begin
  info = js.add_stream(name: "sample-stream", subjects: ["foo"])
rescue => e
  puts "Error: #{e}"
end

# Send 10 messages and wait to get an ack that they have been persisted.
10.times do |i|
  ack = js.publish("foo", "hello world: #{i}", timeout: 2)
  puts "Published: #{ack.seq}"
end

# Create pull based consumer.
psub = js.pull_subscribe("foo", "psub")

# Fetch 3 messages from consumer.
msgs = psub.fetch(3)
msgs.each do |msg|
  puts "      ACK: Stream Seq: #{msg.metadata.sequence.stream} || Consumer Seq: #{msg.metadata.sequence.consumer}"
  msg.ack
end

# Get latest consumer info.
cinfo = psub.consumer_info
puts "Consumer '#{cinfo.name}' Pending Messages: #{cinfo.num_pending}"

# Subscribe is now dispatched a NATS::Msg that may include headers
nc.subscribe("hello") do |msg|
  puts "Received on '#{msg.subject}': Data: #{msg.data} || Header: #{msg.header}"
  msg.respond("OK") if msg.reply
end
sub = nc.subscribe("hello")

# Can use publish to send a message with headers.
nc.publish("hello", header: { 'quux': 'quuz'})
nc.publish_msg(NATS::Msg.new(subject: "hello", data: "world", header:{ 'foo': 'bar'}))

# Request also supports publishing with headers.
msg = nc.request("hello", header: { 'a': 'b'})
puts "Response #{msg.data}"

msg = nc.request_msg(NATS::Msg.new(subject: "hello", data: "world!!!", header:{ 'foo': 'bar'}))
puts "Response #{msg.data}"

# Can also use iterator style to consume messages now.
msg = sub.next_msg
puts "Received on '#{msg.subject}': Data: #{msg.data} || Header: #{msg.header}"

msg = sub.next_msg(timeout: 2)
puts "Received on '#{msg.subject}': Data: #{msg.data} || Header: #{msg.header}"

begin
  sub.next_msg(timeout: 1)
rescue NATS::Timeout => e
  # puts "Timeout since no new messages yet: #{e}"
end

nc.flush

nc.close