Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <[email protected]>
  • Loading branch information
mcollina committed Jan 13, 2025
1 parent eeac565 commit 6af35b8
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ endif
NODE ?= node
CPPLINT ?= cpplint.py
BUILDTYPE ?= Release
TESTS = "test/**/*.js"
TESTS = "test/**/*.spec.js"
E2E_TESTS = $(wildcard e2e/*.spec.js)
TEST_REPORTER =
TEST_OUTPUT =
Expand Down
15 changes: 10 additions & 5 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,16 @@ Client.prototype.getLastError = function() {
Client.prototype.disconnect = function(cb) {
var self = this;

if (cb) {
if (this._client) {
this.once('disconnected', function(metrics) {
cb(null, metrics);
});
} else {
cb()
}
}

if (!this._isDisconnecting && this._client) {
this._isDisconnecting = true;
this._client.disconnect(function() {
Expand All @@ -325,12 +335,7 @@ Client.prototype.disconnect = function(cb) {
*/
var metricsCopy = Object.assign({}, self.metrics);
self.emit('disconnected', metricsCopy);
if (cb) {
cb(null, metricsCopy);
}

});

}

return self;
Expand Down
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"main": "lib/index.js",
"scripts": {
"configure": "node-gyp configure",
"build": "node-gyp build",
"build": "node-gyp build --debug",
"test": "make test",
"install": "node-gyp rebuild",
"prepack": "node ./ci/prepublish.js"
Expand Down
9 changes: 7 additions & 2 deletions test/consumer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ var defaultConfig = {

module.exports = {
'Consumer': {
'afterEach': function() {
client = null;
'afterEach': function(cb) {
if (client) {
client.disconnect(cb);
client = null;
} else {
cb()
}
},
'cannot be set without a topic config': function() {
t.throws(function() {
Expand Down
16 changes: 14 additions & 2 deletions test/kafka-consumer-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ if (WorkerThreads.isMainThread) {
var worker = new WorkerThreads.Worker(__filename);

var timeout = setTimeout(function() {
process._rawDebug('terminating worker');
worker.terminate();
}, 1000);
}, 10000);

worker.on('message', function(report) {
console.log('received message', report);
Expand All @@ -20,6 +21,10 @@ if (WorkerThreads.isMainThread) {
return;
}

const interval = setInterval(() => {
process._rawDebug('waiting for parent');
}, 1000);

var stream = Kafka.KafkaConsumer.createReadStream({
'metadata.broker.list': 'localhost:9092',
'client.id': 'kafka-mocha-consumer',
Expand All @@ -32,8 +37,15 @@ var stream = Kafka.KafkaConsumer.createReadStream({
});

stream.on('data', function(message) {
process._rawDebug('received message', message);
WorkerThreads.parentPort?.postMessage({ message });
stream.consumer.commitMessage(message);
stream.consumer.disconnect();
stream.close();
stream.close(function () {
setTimeout(() => {
process._rawDebug('exiting');
clearInterval(interval);
process.exit(0);
}, 1000);
});
});
41 changes: 22 additions & 19 deletions test/kafka-consumer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@ module.exports = {
'beforeEach': function() {
client = new KafkaConsumer(defaultConfig, topicConfig);
},
'afterEach': function() {
client = null;
'afterEach': function(cb) {
if (client) {
client.disconnect(cb);
client = null;
} else {
cb()
}
},
'does not modify config and clones it': function () {
t.deepStrictEqual(defaultConfig, {
Expand All @@ -48,36 +53,34 @@ module.exports = {
t.notEqual(topicConfig, client.topicConfig);
},
'does not crash in a worker': function (cb) {
this.timeout(10000);
var consumer = new worker_threads.Worker(
path.join(__dirname, 'kafka-consumer-worker.js')
);

var timeout = setTimeout(function() {
consumer.terminate();
}, 1000);

consumer.on('message', function(msg) {
t.strictEqual(msg.value.toString(), 'my message');
consumer.terminate();
process._rawDebug('got message');
t.strictEqual(Buffer.from(msg.message.value).toString(), 'my message');
});

let stream

consumer.on('exit', function(code) {
clearTimeout(timeout);
process._rawDebug('exiting');
stream.end();
t.strictEqual(code, 0);
cb();
});

consumer.on('online', function() {
const stream = KafkaProducer.createWriteStream({
'metadata.broker.list': 'localhost:9092',
'client.id': 'kafka-mocha-producer',
'dr_cb': true
}, {}, {
topic: 'topic'
});

stream.write(Buffer.from('my message'));
stream = KafkaProducer.createWriteStream({
'metadata.broker.list': 'localhost:9092',
'client.id': 'kafka-mocha-producer',
'dr_cb': true
}, {}, {
topic: 'topic'
});

stream.write(Buffer.from('my message'));
}
},
};
29 changes: 23 additions & 6 deletions test/producer-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,32 @@ var Kafka = require('../');
if (WorkerThreads.isMainThread) {
var worker = new WorkerThreads.Worker(__filename);

var timeout = setTimeout(function() {
worker.terminate();
}, 1000);

worker.on('message', function(report) {
console.log('delivery report', report);
});

worker.on('exit', function(code) {
clearTimeout(timeout);
worker.on('exit', function(code, ...args) {
process._rawDebug('exiting', code);
process.exit(code);
});

return;
}

process.on('unhandledRejection', (reason, promise) => {
process._rawDebug('Unhandled Rejection at:', promise, 'reason:', reason);
process.exit(1);
});

process.on('uncaughtException', (err) => {
process._rawDebug('Uncaught Exception:', err);
process.exit(1);
});

const interval = setInterval(() => {
process._rawDebug('waiting for parent');
}, 1000);

const stream = Kafka.Producer.createWriteStream({
'metadata.broker.list': 'localhost:9092',
'client.id': 'kafka-mocha-producer',
Expand All @@ -29,8 +39,15 @@ const stream = Kafka.Producer.createWriteStream({
});

stream.producer.on('delivery-report', function(err, report) {
process._rawDebug('delivery-report', report);
WorkerThreads.parentPort?.postMessage(report);
stream.producer.disconnect();
stream.close(function() {
clearInterval(interval);
process.exit(0);
});
});

stream.write(Buffer.from('my message'));

process._rawDebug('stream created');
13 changes: 6 additions & 7 deletions test/producer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
var Producer = require('../lib/producer');
var t = require('assert');
var worker_threads = require('worker_threads');
var path = require('path');
// var Mock = require('./mock');

var client;
Expand Down Expand Up @@ -97,21 +98,19 @@ module.exports = {
client.disconnect(next);
},
'does not crash in a worker': function (cb) {
this.timeout(10000);

var producer = new worker_threads.Worker(
path.join(__dirname, 'producer-worker.js')
);

var timeout = setTimeout(function() {
producer.terminate();
}, 1000);

consumer.on('message', (report) => {
producer.on('message', (report) => {
process._rawDebug('got message');
t.strictEqual(report.topic, 'topic');
producer.terminate();
});

producer.on('exit', function(code) {
clearTimeout(timeout);
process._rawDebug('exiting');
t.strictEqual(code, 0);
cb();
});
Expand Down

0 comments on commit 6af35b8

Please sign in to comment.