Skip to content

Commit

Permalink
Merge pull request #49 from nats-io/headers
Browse files Browse the repository at this point in the history
Add support for Headers
  • Loading branch information
wallyqs authored Aug 17, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents e3377fe + 75fb753 commit f726f5b
Showing 10 changed files with 415 additions and 28 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -4,13 +4,14 @@ rvm:
- 2.5
- 2.6
- 2.7
- 3.0.2

cache:
directories:
- $HOME/nats-server

before_install:
- bash ./scripts/install_gnatsd.sh
- bash ./scripts/install_nats.sh

before_script:
- export PATH=$HOME/nats-server:$PATH
4 changes: 2 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
nats-pure (0.6.2)
nats-pure (0.7.0)

GEM
remote: https://rubygems.org/
@@ -39,4 +39,4 @@ DEPENDENCIES
rspec

BUNDLED WITH
2.0.1
2.1.4
173 changes: 157 additions & 16 deletions lib/nats/io/client.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2016-2018 The NATS Authors
# Copyright 2016-2021 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -64,6 +64,12 @@ module IO
PING_REQUEST = ("PING#{CR_LF}".freeze)
PONG_RESPONSE = ("PONG#{CR_LF}".freeze)

NATS_HDR_LINE = ("NATS/1.0#{CR_LF}".freeze)
STATUS_MSG_LEN = 3
STATUS_HDR = ("Status".freeze)
DESC_HDR = ("Description".freeze)
NATS_HDR_LINE_SIZE = (NATS_HDR_LINE.bytesize)

SUB_OP = ('SUB'.freeze)
EMPTY_MSG = (''.freeze)

@@ -91,6 +97,9 @@ class AuthError < ConnectError; end
# When we cannot connect since there are no servers available.
class NoServersError < ConnectError; end

# When there are no subscribers available to respond.
class NoRespondersError < ConnectError; end

# When the connection exhausts max number of pending pings replies.
class StaleConnectionError < Error; end

@@ -346,6 +355,36 @@ def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk)
@flush_queue << :pub if @flush_queue.empty?
end

# Publishes a NATS::Msg that may include headers.
def publish_msg(msg)
raise TypeError, "nats: expected NATS::Msg, got #{msg.class.name}" unless msg.is_a?(Msg)
raise BadSubject if !msg.subject or msg.subject.empty?

msg.reply ||= ''
msg.data ||= ''
msg_size = msg.data.bytesize

# Accounting
@stats[:out_msgs] += 1
@stats[:out_bytes] += msg_size

if msg.header
hdr = ''
hdr << NATS_HDR_LINE
msg.header.each do |k, v|
hdr << "#{k}: #{v}#{CR_LF}"
end
hdr << CR_LF
hdr_len = hdr.bytesize
total_size = msg_size + hdr_len
send_command("HPUB #{msg.subject} #{msg.reply} #{hdr_len} #{total_size}\r\n#{hdr}#{msg.data}\r\n")
else
send_command("PUB #{msg.subject} #{msg.reply} #{msg_size}\r\n#{msg.data}\r\n")
end

@flush_queue << :pub if @flush_queue.empty?
end

# Create subscription which is dispatched asynchronously
# messages to a callback.
def subscribe(subject, opts={}, &callback)
@@ -393,7 +432,8 @@ def subscribe(subject, opts={}, &callback)
when 0 then cb.call
when 1 then cb.call(msg.data)
when 2 then cb.call(msg.data, msg.reply)
else cb.call(msg.data, msg.reply, msg.subject)
when 3 then cb.call(msg.data, msg.reply, msg.subject)
else cb.call(msg.data, msg.reply, msg.subject, msg.header)
end
rescue => e
synchronize do
@@ -411,11 +451,12 @@ def subscribe(subject, opts={}, &callback)
# It times out in case the request is not retrieved within the
# specified deadline.
# If given a callback, then the request happens asynchronously.
def request(subject, payload, opts={}, &blk)
return unless subject
def request(subject, payload="", opts={}, &blk)
raise BadSubject if !subject or subject.empty?

# If a block was given then fallback to method using auto unsubscribe.
return old_request(subject, payload, opts, &blk) if blk
return old_request(subject, payload, opts) if opts[:old_style]

token = nil
inbox = nil
@@ -447,6 +488,61 @@ def request(subject, payload, opts={}, &blk)
synchronize do
result = @resp_map[token]
response = result[:response]
@resp_map.delete(token)
end

if response and response.header
status = response.header[STATUS_HDR]
raise NoRespondersError if status == "503"
end

response
end

# Makes a NATS request using a NATS::Msg that may include headers.
def request_msg(msg, opts={})
raise TypeError, "nats: expected NATS::Msg, got #{msg.class.name}" unless msg.is_a?(Msg)
raise BadSubject if !msg.subject or msg.subject.empty?

token = nil
inbox = nil
future = nil
response = nil
timeout = opts[:timeout] ||= 0.5
synchronize do
start_resp_mux_sub! unless @resp_sub_prefix

# Create token for this request.
token = @nuid.next
inbox = "#{@resp_sub_prefix}.#{token}"

# Create the a future for the request that will
# get signaled when it receives the request.
future = @resp_sub.new_cond
@resp_map[token][:future] = future
end
msg.reply = inbox
msg.data ||= ''
msg_size = msg.data.bytesize

# Publish request and wait for reply.
publish_msg(msg)
with_nats_timeout(timeout) do
@resp_sub.synchronize do
future.wait(timeout)
end
end

# Check if there is a response already.
synchronize do
result = @resp_map[token]
response = result[:response]
@resp_map.delete(token)
end

if response and response.header
status = response.header[STATUS_HDR]
raise NoRespondersError if status == "503"
end

response
@@ -464,11 +560,13 @@ def old_request(subject, payload, opts={}, &blk)
# the messages asynchronously and return the sid.
if blk
opts[:max] ||= 1
s = subscribe(inbox, opts) do |msg, reply|
s = subscribe(inbox, opts) do |msg, reply, subject, header|
case blk.arity
when 0 then blk.call
when 1 then blk.call(msg)
else blk.call(msg, reply)
when 2 then blk.call(msg, reply)
when 3 then blk.call(msg, reply, subject)
else blk.call(msg, reply, subject, header)
end
end
publish(subject, payload, inbox)
@@ -594,8 +692,7 @@ def process_err(err)
process_op_error(e)
end

def process_msg(subject, sid, reply, data)
# Accounting
def process_msg(subject, sid, reply, data, header)
@stats[:in_msgs] += 1
@stats[:in_bytes] += data.size

@@ -604,7 +701,7 @@ def process_msg(subject, sid, reply, data)
synchronize { sub = @subs[sid] }
return unless sub

sc = nil
err = nil
sub.synchronize do
sub.received += 1

@@ -625,7 +722,8 @@ def process_msg(subject, sid, reply, data)
# do so here already while holding the lock and return
if sub.future
future = sub.future
sub.response = Msg.new(subject, reply, data)
hdr = process_hdr(header)
sub.response = Msg.new(subject: subject, reply: reply, data: data, header: hdr)
future.signal

return
@@ -634,20 +732,54 @@ def process_msg(subject, sid, reply, data)
# and should be able to consume messages in parallel.
if sub.pending_queue.size >= sub.pending_msgs_limit \
or sub.pending_size >= sub.pending_bytes_limit then
sc = SlowConsumer.new("nats: slow consumer, messages dropped")
err = SlowConsumer.new("nats: slow consumer, messages dropped")
else
hdr = process_hdr(header)

# Only dispatch message when sure that it would not block
# the main read loop from the parser.
sub.pending_queue << Msg.new(subject, reply, data)
msg = Msg.new(subject: subject, reply: reply, data: data, header: hdr)
sub.pending_queue << msg
sub.pending_size += data.size
end
end
end

synchronize do
@last_err = sc
@err_cb.call(sc) if @err_cb
end if sc
@last_err = err
@err_cb.call(err) if @err_cb
end if err
end

def process_hdr(header)
hdr = nil
if header
hdr = {}
lines = header.lines

# Check if it is an inline status and description.
if lines.count <= 2
status_hdr = lines.first.rstrip
hdr[STATUS_HDR] = status_hdr.slice(NATS_HDR_LINE_SIZE-1, STATUS_MSG_LEN)

if NATS_HDR_LINE_SIZE+2 < status_hdr.bytesize
desc = status_hdr.slice(NATS_HDR_LINE_SIZE+STATUS_MSG_LEN, status_hdr.bytesize)
hdr[DESC_HDR] = desc unless desc.empty?
end
end
begin
lines.slice(1, header.size).each do |line|
line.rstrip!
next if line.empty?
key, value = line.strip.split(/\s*:\s*/, 2)
hdr[key] = value
end
rescue => e
err = e
end
end

hdr
end

def process_info(line)
@@ -831,6 +963,15 @@ def connect_command
cs[:sig] = @signature_cb.call(nonce)
end

if @server_info[:headers]
cs[:headers] = @server_info[:headers]
cs[:no_responders] = if @options[:no_responders] == false
@options[:no_responders]
else
@server_info[:headers]
end
end

"CONNECT #{cs.to_json}#{CR_LF}"
end

@@ -1537,7 +1678,7 @@ def connect_addrinfo(ai, port, timeout)
end
end

Msg = Struct.new(:subject, :reply, :data)
Msg = Struct.new(:subject, :reply, :data, :header, keyword_init: true)

class Subscription
include MonitorMixin
20 changes: 17 additions & 3 deletions lib/nats/io/parser.rb
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ module NATS
module Protocol

MSG = /\AMSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n/i
HMSG = /\AHMSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?([\d]+)\s+(\d+)\r\n/i
OK = /\A\+OK\s*\r\n/i
ERR = /\A-ERR\s+('.+')?\r\n/i
PING = /\APING\s*\r\n/i
@@ -49,6 +50,7 @@ def reset!
@sid = nil
@reply = nil
@needed = nil
@header_needed = nil
end

def parse(data)
@@ -61,6 +63,10 @@ def parse(data)
@buf = $'
@sub, @sid, @reply, @needed = $1, $2.to_i, $4, $5.to_i
@parse_state = AWAITING_MSG_PAYLOAD
when HMSG
@buf = $'
@sub, @sid, @reply, @header_needed, @needed = $1, $2.to_i, $4, $5.to_i, $6.to_i
@parse_state = AWAITING_MSG_PAYLOAD
when OK # No-op right now
@buf = $'
when ERR
@@ -89,9 +95,17 @@ def parse(data)

when AWAITING_MSG_PAYLOAD
return unless (@needed && @buf.bytesize >= (@needed + CR_LF_SIZE))
@nc.process_msg(@sub, @sid, @reply, @buf.slice(0, @needed))
@buf = @buf.slice((@needed + CR_LF_SIZE), @buf.bytesize)
@sub = @sid = @reply = @needed = nil
if @header_needed
hbuf = @buf.slice(0, @header_needed)
payload = @buf.slice(@header_needed, (@needed-@header_needed))
@nc.process_msg(@sub, @sid, @reply, payload, hbuf)
@buf = @buf.slice((@needed + CR_LF_SIZE), @buf.bytesize)
else
@nc.process_msg(@sub, @sid, @reply, @buf.slice(0, @needed), nil)
@buf = @buf.slice((@needed + CR_LF_SIZE), @buf.bytesize)
end

@sub = @sid = @reply = @needed = @header_needed = nil
@parse_state = AWAITING_CONTROL_LINE
@buf = nil if (@buf && @buf.empty?)
end
4 changes: 2 additions & 2 deletions lib/nats/io/version.rb
Original file line number Diff line number Diff line change
@@ -15,8 +15,8 @@
module NATS
module IO
# NOTE: These are all announced to the server on CONNECT
VERSION = "0.6.2"
LANG = "#{RUBY_ENGINE}2".freeze
VERSION = "0.7.0"
LANG = "#{RUBY_ENGINE}#{RUBY_VERSION}".freeze
PROTOCOL = 1
end
end
2 changes: 1 addition & 1 deletion scripts/install_gnatsd.sh → scripts/install_nats.sh
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@

set -e

export DEFAULT_NATS_SERVER_VERSION=v2.0.0
export DEFAULT_NATS_SERVER_VERSION=v2.3.4

export NATS_SERVER_VERSION="${NATS_SERVER_VERSION:=$DEFAULT_NATS_SERVER_VERSION}"

Loading

0 comments on commit f726f5b

Please sign in to comment.