Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQP: Use exchange instead of queue for discovery (WIP) #27

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions spec/01transport.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,30 @@ transportTests = (type) ->
clientA.connect (err) ->
done err

describe 'sending participant registration message', ->
client = null
beforeEach (done) ->
client = transport.getClient address
return client.connect done
afterEach (done) ->
return client.disconnect done

it 'should be received by subscribed broker', (done) ->
definition =
id: '123'
role: 'role'
component: 'lib/Component'
inports: []
outports: []
onDiscover = (message) ->
got = message.data.payload
chai.expect(got).to.eql definition
return done()
broker.subscribeParticipantChange onDiscover, (err) ->
return done err if err
client.registerParticipant definition, (err) ->
return done err if err

describe 'outqueue without subscribers', ->
it 'sending should not error', (done) ->
payload = { foo: 'bar91' }
Expand Down
31 changes: 24 additions & 7 deletions src/amqp.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
debug = require('debug')('msgflo:amqp')
async = require 'async'
interfaces = require './interfaces'
uuid = require 'uuid'

try
amqp = require 'amqplib/callback_api'
Expand Down Expand Up @@ -144,10 +145,12 @@ class Client extends interfaces.MessagingClient
protocol: 'discovery'
command: 'participant'
payload: part
@channel.assertQueue 'fbp'
data = new Buffer JSON.stringify msg
@channel.sendToQueue 'fbp', data
return callback null
exchangeName = 'fbp'
@channel.assertExchange exchangeName, 'fanout', {}, (err) =>
return callback err if err
data = new Buffer JSON.stringify msg
@channel.publish exchangeName, '', data
return callback null

class MessageBroker extends Client
constructor: (address, options) ->
Expand Down Expand Up @@ -203,7 +206,12 @@ class MessageBroker extends Client
return callback null

# Participant registration
subscribeParticipantChange: (handler) ->
subscribeParticipantChange: (handler, callback) ->
defaultCallback = (err) ->
if err
console.err "Error in msgflo.amqp.subscribeParticipantChange, and no callback added", err
callback = defaultCallback if not callback

deserialize = (message) =>
debug 'receive on fbp', message.fields.deliveryTag
data = null
Expand All @@ -216,8 +224,17 @@ class MessageBroker extends Client
data: data
return handler out

@channel.assertQueue 'fbp'
@channel.consume 'fbp', deserialize
exchangeName = 'fbp'
@channel.assertExchange exchangeName, 'fanout', {}, (err) =>
return callback err if err
subscribeQueue = '.fbp-subscribe-' + uuid.v4()
@channel.assertQueue subscribeQueue, { persistent: false }, (err) =>
return callback err if err
@channel.bindQueue subscribeQueue, exchangeName, '', {}, (err) =>
return callback err if err
@channel.consume subscribeQueue, deserialize
debug 'subscribed to', subscribeQueue, exchangeName
return callback null

exports.Client = Client
exports.MessageBroker = MessageBroker
11 changes: 9 additions & 2 deletions src/direct.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,16 @@ class MessageBroker extends interfaces.MessageBroker
nackMessage: (message) ->
return

subscribeParticipantChange: (handler) ->
subscribeParticipantChange: (handler, callback) ->
defaultCallback = (err) ->
if err
console.err "Error in msgflo.direct.subscribeParticipantChange, and no callback added", err
callback = defaultCallback if not callback

@createQueue '', 'fbp', (err) =>
@subscribeToQueue 'fbp', handler, () ->
return callback err if err
@subscribeToQueue 'fbp', handler, (err) ->
return callback err

exports.MessageBroker = MessageBroker
exports.Client = Client
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class MessageBroker extends MessagingSystem
throw new Error 'Not Implemented'

# Participant registration
subscribeParticipantChange: (handler) ->
subscribeParticipantChange: (handler, callback) ->
throw new Error 'Not Implemented'

exports.MessageBroker = MessageBroker
11 changes: 9 additions & 2 deletions src/mqtt.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,16 @@ class MessageBroker extends Client
routing.binderMixin this

# Participant registration
subscribeParticipantChange: (handler) ->
subscribeParticipantChange: (handler, callback) ->
defaultCallback = (err) ->
if err
console.err "Error in msgflo.mqtt.subscribeParticipantChange, and no callback added", err
callback = defaultCallback if not callback

@createQueue '', 'fbp', (err) =>
@subscribeToQueue 'fbp', handler, () ->
return callback err if err
@subscribeToQueue 'fbp', handler, (err) ->
return callback err

exports.Client = Client
exports.MessageBroker = MessageBroker