Skip to content

Commit

Permalink
[fix] Support async initialization. Fixes #4 should help with towards #3
Browse files Browse the repository at this point in the history
 as well
  • Loading branch information
3rd-Eden committed Dec 3, 2014
1 parent b7d6520 commit 65c3c2d
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 62 deletions.
135 changes: 73 additions & 62 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ function Node(address, options) {
this.leader = ''; // Leader in our cluster.
this.term = 0; // Our current term.

if ('function' === this.type(this.initialize)) {
this.once('initialize', this.initialize);
}

this._initialize(options);
}

Expand Down Expand Up @@ -139,36 +135,38 @@ for (var s = 0; s < Node.states.length; s++) {
* @param {Object} options The configuration you passed in the constructor.
* @api private
*/
Node.prototype._initialize = function initialize(options) {
Node.prototype._initialize = function initializing(options) {
var node = this;

//
// Reset our vote as we're starting a new term. Votes only last one term.
//
this.on('term change', function change() {
this.votes.for = null;
this.votes.granted = 0;
node.on('term change', function change() {
node.votes.for = null;
node.votes.granted = 0;
});

//
// Reset our times and start the heartbeat again. If we're promoted to leader
// the heartbeat will automatically be broadcasted to users as well.
//
this.on('state change', function change(state) {
this.timers.clear('heartbeat, election');
this.heartbeat(Node.LEADER === this.state ? this.beat : this.timeout());
this.emit(Node.states[state].toLowerCase());
node.on('state change', function change(state) {
node.timers.clear('heartbeat, election');
node.heartbeat(Node.LEADER === node.state ? node.beat : node.timeout());
node.emit(Node.states[state].toLowerCase());
});

//
// Receive incoming messages and process them.
//
this.on('data', function incoming(packet, write) {
node.on('data', function incoming(packet, write) {
write = write || nope;
var reason;

if ('object' !== this.type(packet)) {
if ('object' !== node.type(packet)) {
reason = 'Invalid packet received';
this.emit('error', new Error(reason));
return write(this.packet('error', reason));
node.emit('error', new Error(reason));
return write(node.packet('error', reason));
}

//
Expand All @@ -181,16 +179,16 @@ Node.prototype._initialize = function initialize(options) {
// If the node receives a request with a stale term number it should be
// rejected.
//
if (packet.term > this.term) {
this.change({
leader: Node.LEADER === packet.state ? packet.address : packet.leader || this.leader,
if (packet.term > node.term) {
node.change({
leader: Node.LEADER === packet.state ? packet.address : packet.leader || node.leader,
state: Node.FOLLOWER,
term: packet.term
});
} else if (packet.term < this.term) {
reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ this.term;
this.emit('error', new Error(reason));
return write(this.packet('error', reason));
} else if (packet.term < node.term) {
reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ node.term;
node.emit('error', new Error(reason));
return write(node.packet('error', reason));
}

//
Expand All @@ -204,14 +202,14 @@ Node.prototype._initialize = function initialize(options) {
// would be changed or prevented above..
//
if (Node.LEADER === packet.state) {
if (Node.FOLLOWER !== this.state) this.change({ state: Node.FOLLOWER });
if (packet.address !== this.leader) this.change({ leader: packet.address });
if (Node.FOLLOWER !== node.state) node.change({ state: Node.FOLLOWER });
if (packet.address !== node.leader) node.change({ leader: packet.address });

//
// Always when we receive an message from the Leader we need to reset our
// heartbeat.
//
this.heartbeat(this.timeout());
node.heartbeat(node.timeout());
}

switch (packet.type) {
Expand All @@ -227,9 +225,9 @@ Node.prototype._initialize = function initialize(options) {
// The term of the vote is bigger then ours so we need to update it. If
// it's the same and we already voted, we need to deny the vote.
//
if (this.votes.for && this.votes.for !== packet.address) {
this.emit('vote', packet, false);
return write(this.packet('voted', { granted: false }));
if (node.votes.for && node.votes.for !== packet.address) {
node.emit('vote', packet, false);
return write(node.packet('voted', { granted: false }));
}

//
Expand All @@ -239,31 +237,31 @@ Node.prototype._initialize = function initialize(options) {
// @TODO point to index of last commit entry.
// @TODO point to term of last commit entry.
//
if (this.log && packet.last && (
this.log.index > packet.last.index
|| this.term > packet.last.term
if (node.log && packet.last && (
node.log.index > packet.last.index
|| node.term > packet.last.term
)) {
this.emit('vote', packet, false);
return write(this.packet('voted', { granted: false }));
node.emit('vote', packet, false);
return write(node.packet('voted', { granted: false }));
}

//
// We've made our decision, we haven't voted for this term yet and this
// candidate came in first so it gets our vote as all requirements are
// met.
//
this.votes.for = packet.address;
this.emit('vote', packet, true);
this.change({ leader: packet.address, term: packet.term });
write(this.packet('voted', { granted: true }));
node.votes.for = packet.address;
node.emit('vote', packet, true);
node.change({ leader: packet.address, term: packet.term });
write(node.packet('voted', { granted: true }));

//
// We've accepted someone as potential new leader, so we should reset
// our heartbeat to prevent this node from timing out after voting.
// Which would again increment the term causing us to be next CANDIDATE
// and invalidates the request we just got, so that's silly willy.
//
this.heartbeat(this.timeout());
node.heartbeat(node.timeout());
break;

//
Expand All @@ -273,29 +271,29 @@ Node.prototype._initialize = function initialize(options) {
//
// Only accepts votes while we're still in a CANDIDATE state.
//
if (Node.CANDIDATE !== this.state) {
return write(this.packet('error', 'No longer a candidate, ignoring vote'));
if (Node.CANDIDATE !== node.state) {
return write(node.packet('error', 'No longer a candidate, ignoring vote'));
}

//
// Increment our received votes when our voting request has been
// granted by the node that received the data.
//
if (packet.data.granted) {
this.votes.granted++;
node.votes.granted++;
}

//
// Check if we've received the minimal amount of votes required for this
// current voting round to be considered valid.
//
if (this.quorum(this.votes.granted)) {
this.change({ leader: this.address, state: Node.LEADER });
if (node.quorum(node.votes.granted)) {
node.change({ leader: node.address, state: Node.LEADER });

//
// Send a heartbeat message to all connected clients.
//
this.message(Node.FOLLOWER, this.packet('append'));
node.message(Node.FOLLOWER, node.packet('append'));
}

//
Expand All @@ -305,7 +303,7 @@ Node.prototype._initialize = function initialize(options) {
break;

case 'error':
this.emit('error', new Error(packet.data));
node.emit('error', new Error(packet.data));
break;

//
Expand All @@ -332,10 +330,10 @@ Node.prototype._initialize = function initialize(options) {
// return an error.
//
default:
if (this.listeners('rpc').length) {
this.emit('rpc', packet, write);
if (node.listeners('rpc').length) {
node.emit('rpc', packet, write);
} else {
write(this.packet('error', 'Unknown message type: '+ packet.type));
write(node.packet('error', 'Unknown message type: '+ packet.type));
}
}
});
Expand All @@ -344,27 +342,40 @@ Node.prototype._initialize = function initialize(options) {
// We do not need to execute the rest of the functionality below as we're
// currently running as "child" node of the cluster not as the "root" node.
//
if (Node.CHILD === this.state) return;
if (Node.CHILD === node.state) return node.emit('initialize');

//
// Setup the log & appends. Assume that if we're given a function log that it
// needs to be initialized as it requires access to our node instance so it
// can read our information like our leader, state, term etc.
//
if ('function' === this.type(this.Log)) {
this.log = new this.Log(this, options);
if ('function' === node.type(node.Log)) {
node.log = new node.Log(node, options);
}

//
// The node is now listening to events so we can start our heartbeat timeout.
// So that if we don't hear anything from a leader we can promote our selfs to
// a candidate state.
//
// We want to call the `initialize` event before starting a heartbeat so
// implementors have some time to start listening for incoming ping packets.
//
this.emit('initialize');
this.heartbeat(this.timeout());
/**
* The node is now listening to events so we can start our heartbeat timeout.
* So that if we don't hear anything from a leader we can promote our selfs to
* a candidate state.
*
* Start listening listening for heartbeats when implementors are also ready
* with setting up their code.
*
* @api private
*/
function initialize(err) {
if (err) return node.emit('error', err);

node.emit('initialize');
node.heartbeat(node.timeout());
}

if ('function' === node.type(node.initialize)) {
if (node.initialize.length > 1) return node.initialize(options, initialize);
node.initialize(options);
}

initialize();
};

/**
Expand Down
42 changes: 42 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,48 @@ describe('liferaft', function () {

new MyRaft();
});

it('async emits the initialize event once the initialize method is done', function (next) {
var ready = false;

var MyRaft = Raft.extend({
initialize: function initialize(options, init) {
assume(options.custom).equals('options');
assume(ready).is.false();

setTimeout(function () {
ready = true;
init();
}, 100);
}
});

var raft = new MyRaft('foobar', { custom: 'options' });

raft.on('initialize', function () {
assume(ready).is.true();

next();
});
});

it('emits error when the initialize fails', function (next) {
var MyRaft = Raft.extend({
initialize: function initialize(options, init) {
setTimeout(function () {
init(new Error('Failure'));
}, 100);
}
});

var raft = new MyRaft();

raft.on('error', function (err) {
assume(err.message).equals('Failure');

next();
});
});
});

describe('#indefinitely', function () {
Expand Down

0 comments on commit 65c3c2d

Please sign in to comment.