diff --git a/actions/deleteItem.js b/actions/deleteItem.js index b94e5c0..ed119ff 100644 --- a/actions/deleteItem.js +++ b/actions/deleteItem.js @@ -29,6 +29,7 @@ module.exports = function deleteItem(store, data, cb) { itemDb.del(key, function(err) { if (err) return cb(err) + store.emit(table, key, existingItem) cb(null, returnObj) }) }) diff --git a/actions/putItem.js b/actions/putItem.js index e299dc0..9bd573e 100644 --- a/actions/putItem.js +++ b/actions/putItem.js @@ -29,6 +29,7 @@ module.exports = function putItem(store, data, cb) { itemDb.put(key, data.Item, function(err) { if (err) return cb(err) + store.emit(table, key, existingItem, data.Item) cb(null, returnObj) }) }) diff --git a/actions/updateItem.js b/actions/updateItem.js index 72f7f7c..6fdd8ba 100644 --- a/actions/updateItem.js +++ b/actions/updateItem.js @@ -52,6 +52,7 @@ module.exports = function updateItem(store, data, cb) { itemDb.put(key, item, function(err) { if (err) return cb(err) + store.emit(table, key, oldItem, item) cb(null, returnObj) }) }) diff --git a/index.js b/index.js index a0f5da6..3ada9e7 100644 --- a/index.js +++ b/index.js @@ -33,7 +33,9 @@ function dynalite(options) { // Ensure we close DB when we're closing the server too var httpServerClose = server.close, httpServerListen = server.listen + var streamHandler = {}; server.close = function(cb) { + streamHandler = {}; store.db.close(function(err) { if (err) return cb(err) // Recreate the store if the user wants to listen again @@ -45,6 +47,56 @@ function dynalite(options) { }) } + store.emit = function(tableDef, meh, before, after) { + var cause = (Math.random()*100000).toString(16); + var table = tableDef.TableName; + var key = tableDef.KeySchema.map(function(ks) { return ks.AttributeName }).reduce(function(key, attr) { + key[attr] = (before || after)[attr]; + return key; + }, {}); + var payload = { + Records: [{ + dynamodb: { + Keys: key, + OldImage: before, + NewImage: after + }, + eventName: before === undefined ? 'INSERT' : (after === undefined ? 'REMOVE' : 'MODIFY') + }] + } + + var handlers = streamHandler[table] || []; + handlers.forEach(function(handler, idx) { + var id = (Math.random()*10000*(idx+1)).toString(16); + handler.pending++; + setTimeout(send, 0); + function send() { + if (handler.running) return setTimeout(send, 0); + handler.running = true; + handler.fn(payload, {}, function(err) { + if (err) throw err; + handler.pending--; + handler.running = false; + }); + } + }); + } + + server.pendingEvents = function() { + return Object.keys(streamHandler).reduce(function(memo, table) { + return memo + streamHandler[table].reduce(function(h) { return h.pending; }); + }, 0); + } + + server.stream = function(table, handler) { + streamHandler[table] = streamHandler[table] || [] + streamHandler[table].push({ + running: false, + pending: 0, + fn: handler + }); + } + return server }