Skip to content

Commit

Permalink
publish and request support headers as well
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <[email protected]>
  • Loading branch information
wallyqs committed Oct 27, 2021
1 parent 2a271e6 commit cd960e2
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 20 deletions.
15 changes: 11 additions & 4 deletions lib/nats/io/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,14 @@ class << self; alias_method :request, :old_request; end
self
end

def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk)
def publish(subject, msg=EMPTY_MSG, opt_reply=nil, **options, &blk)
raise NATS::IO::BadSubject if !subject or subject.empty?
msg_size = msg.bytesize
if options[:header]
return publish_msg(NATS::Msg.new(subject: subject, data: msg, reply: opt_reply, header: options[:header]))
end

# Accounting
msg_size = msg.bytesize
@stats[:out_msgs] += 1
@stats[:out_bytes] += msg_size

Expand Down Expand Up @@ -458,13 +461,17 @@ def subscribe(subject, opts={}, &callback)
# It times out in case the request is not retrieved within the
# specified deadline.
# If given a callback, then the request happens asynchronously.
def request(subject, payload="", opts={}, &blk)
def request(subject, payload="", **opts, &blk)
raise NATS::IO::BadSubject if !subject or subject.empty?

# If a block was given then fallback to method using auto unsubscribe.
return old_request(subject, payload, opts, &blk) if blk
return old_request(subject, payload, opts) if opts[:old_style]

if opts[:header]
return request_msg(NATS::Msg.new(subject: subject, data: payload, header: opts[:header]), **opts)
end

token = nil
inbox = nil
future = nil
Expand Down Expand Up @@ -512,7 +519,7 @@ def request(subject, payload="", opts={}, &blk)
end

# request_msg makes a NATS request using a NATS::Msg that may include headers.
def request_msg(msg, opts={})
def request_msg(msg, **opts)
raise TypeError, "nats: expected NATS::Msg, got #{msg.class.name}" unless msg.is_a?(Msg)
raise NATS::IO::BadSubject if !msg.subject or msg.subject.empty?

Expand Down
26 changes: 13 additions & 13 deletions lib/nats/io/js.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def initialize(conn, params={})
# @option params [String] :stream Expected Stream to which the message is being published.
# @raise [NATS::Timeout] When it takes too long to receive an ack response.
# @return [PubAck] The pub ack response.
def publish(subject, payload="", params={})
def publish(subject, payload="", **params)
params[:timeout] ||= @opts[:timeout]
if params[:stream]
params[:header] ||= {}
Expand All @@ -86,7 +86,7 @@ def publish(subject, payload="", params={})
header: params[:header])

begin
resp = @nc.request_msg(msg, params)
resp = @nc.request_msg(msg, **params)
result = JSON.parse(resp.data, symbolize_names: true)
rescue ::NATS::IO::NoRespondersError
raise JetStream::Error::NoStreamResponse.new("nats: no response from stream")
Expand Down Expand Up @@ -282,7 +282,7 @@ def find_stream_name_by_subject(subject, params={})
def api_request(req_subject, req="", params={})
params[:timeout] ||= @opts[:timeout]
result = begin
msg = @nc.request(req_subject, req, params)
msg = @nc.request(req_subject, req, **params)
JSON.parse(msg.data, symbolize_names: true)
rescue NATS::IO::NoRespondersError
raise JetStream::Error::ServiceUnavailable
Expand Down Expand Up @@ -557,11 +557,11 @@ def initialize(opts)
end

module AckMethods
def ack(params={})
def ack(**params)
ensure_is_acked_once!

resp = if params[:timeout]
@nc.request(@reply, Ack::Ack, params)
@nc.request(@reply, Ack::Ack, **params)
else
@nc.publish(@reply, Ack::Ack)
end
Expand All @@ -570,21 +570,21 @@ def ack(params={})
resp
end

def ack_sync(params={})
def ack_sync(**params)
ensure_is_acked_once!

params[:timeout] ||= 0.5
resp = @nc.request(@reply, Ack::Ack, params)
resp = @nc.request(@reply, Ack::Ack, **params)
@sub.synchronize { @ackd = true }

resp
end

def nak(params={})
def nak(**params)
ensure_is_acked_once!

resp = if params[:timeout]
@nc.request(@reply, Ack::Nak, params)
@nc.request(@reply, Ack::Nak, **params)
else
@nc.publish(@reply, Ack::Nak)
end
Expand All @@ -593,11 +593,11 @@ def nak(params={})
resp
end

def term(params={})
def term(**params)
ensure_is_acked_once!

resp = if params[:timeout]
@nc.request(@reply, Ack::Term, params)
@nc.request(@reply, Ack::Term, **params)
else
@nc.publish(@reply, Ack::Term)
end
Expand All @@ -606,8 +606,8 @@ def term(params={})
resp
end

def in_progress(params={})
params[:timeout] ? @nc.request(@reply, Ack::Progress, params) : @nc.publish(@reply, Ack::Progress)
def in_progress(**params)
params[:timeout] ? @nc.request(@reply, Ack::Progress, **params) : @nc.publish(@reply, Ack::Progress)
end

def metadata
Expand Down
11 changes: 9 additions & 2 deletions lib/nats/io/msg.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,16 @@ def initialize(opts={})
@meta = nil
end

def respond(data)
def respond(data='')
return unless @nc
@nc.publish(self.reply, data)
if self.header
dmsg = self.dup
dmsg.subject = self.reply
dmsg.data = data
@nc.publish_msg(dmsg)
else
@nc.publish(self.reply, data)
end
end

def respond_msg(msg)
Expand Down
2 changes: 2 additions & 0 deletions spec/client_cluster_reconnect_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@
end

it 'should reconnect to nodes discovered from seed server with single uri' do
skip 'FIXME: flaky test'

# Nodes join to cluster before we try to connect
[@s2, @s3].each do |s|
s.start_server(true)
Expand Down
43 changes: 42 additions & 1 deletion spec/client_v2_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,27 @@
end
sub1.unsubscribe

sub3 = nc.subscribe("quux")

# message with no headers
nc.publish("quux", "first")

# empty payload
nc.publish("quux", header: { "foo": "A"})

# payload and header
nc.publish("quux", "third", header: { "foo": "B"})
nc.flush

msg = sub3.next_msg
expect(msg.header).to be_nil

msg = sub3.next_msg
expect(msg.header["foo"]).to eql("A")

msg = sub3.next_msg
expect(msg.header["foo"]).to eql("B")

nc.close
end

Expand All @@ -114,7 +135,7 @@
end
nc.flush

1.upto(5) do |n|
1.upto(5) do |n|p
data = "hello world-#{'A' * n}"
msg = NATS::Msg.new(subject: 'hello',
data: data,
Expand All @@ -129,6 +150,26 @@
end
expect(msgs.count).to eql(5)

sub2 = nc.subscribe("quux")
Thread.new do
# Add some custom headers...
msg = sub2.next_msg
msg.header["reply"] = "ok"
msg.respond
end

msg = nc.request("quux", timeout: 2, header: { "one": "1" })
expect(msg.data).to eql('')
expect(msg.header).to eql({"one" => "1", "reply" => "ok"})

expect do
msg.respond_msg("foo")
end.to raise_error TypeError

expect do
nc.request("quux", timeout: 0.0001, header: { "one": "1" })
end.to raise_error NATS::Timeout

nc.close
end

Expand Down

0 comments on commit cd960e2

Please sign in to comment.