From 6af35b8eb10cacb17190e7c78cfbf77e5d2fff74 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 13 Jan 2025 10:08:51 +0100 Subject: [PATCH] fixup Signed-off-by: Matteo Collina --- Makefile | 2 +- lib/client.js | 15 ++++++++----- package-lock.json | 14 ++++++------ package.json | 2 +- test/consumer.spec.js | 9 ++++++-- test/kafka-consumer-worker.js | 16 ++++++++++++-- test/kafka-consumer.spec.js | 41 +++++++++++++++++++---------------- test/producer-worker.js | 29 ++++++++++++++++++++----- test/producer.spec.js | 13 +++++------ 9 files changed, 91 insertions(+), 50 deletions(-) diff --git a/Makefile b/Makefile index 0b6e990..c1d49fd 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ endif NODE ?= node CPPLINT ?= cpplint.py BUILDTYPE ?= Release -TESTS = "test/**/*.js" +TESTS = "test/**/*.spec.js" E2E_TESTS = $(wildcard e2e/*.spec.js) TEST_REPORTER = TEST_OUTPUT = diff --git a/lib/client.js b/lib/client.js index 354f544..5f50799 100644 --- a/lib/client.js +++ b/lib/client.js @@ -307,6 +307,16 @@ Client.prototype.getLastError = function() { Client.prototype.disconnect = function(cb) { var self = this; + if (cb) { + if (this._client) { + this.once('disconnected', function(metrics) { + cb(null, metrics); + }); + } else { + cb() + } + } + if (!this._isDisconnecting && this._client) { this._isDisconnecting = true; this._client.disconnect(function() { @@ -325,12 +335,7 @@ Client.prototype.disconnect = function(cb) { */ var metricsCopy = Object.assign({}, self.metrics); self.emit('disconnected', metricsCopy); - if (cb) { - cb(null, metricsCopy); - } - }); - } return self; diff --git a/package-lock.json b/package-lock.json index 5b47027..da89cf8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -47,13 +47,13 @@ } }, "node_modules/@babel/parser": { - "version": "7.26.3", - "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.26.3.tgz", - "integrity": "sha512-WJ/CvmY8Mea8iDXo6a7RK2wbmJITT5fN3BEkRuFlxVyNx8jOKIIhmC4fSkTcPcf8JyavbBwIe6OpiCOBXt/IcA==", + "version": "7.26.5", + "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.26.5.tgz", + "integrity": "sha512-SRJ4jYmXRqV1/Xc+TIVG84WjHBXKlxO9sHQnA2Pf12QQEAp1LOh6kDzNHXcUnbH1QI0FDoPPVOt+vyUDucxpaw==", "dev": true, "license": "MIT", "dependencies": { - "@babel/types": "^7.26.3" + "@babel/types": "^7.26.5" }, "bin": { "parser": "bin/babel-parser.js" @@ -63,9 +63,9 @@ } }, "node_modules/@babel/types": { - "version": "7.26.3", - "resolved": "https://registry.npmjs.org/@babel/types/-/types-7.26.3.tgz", - "integrity": "sha512-vN5p+1kl59GVKMvTHt55NzzmYVxprfJD+ql7U9NFIfKCBkYE55LYtS+WtPlaYOyzydrKI8Nezd+aZextrd+FMA==", + "version": "7.26.5", + "resolved": "https://registry.npmjs.org/@babel/types/-/types-7.26.5.tgz", + "integrity": "sha512-L6mZmwFDK6Cjh1nRCLXpa6no13ZIioJDz7mdkzHv399pThrTa/k0nUlNaenOeh2kWu/iaOQYElEpKPUswUa9Vg==", "dev": true, "license": "MIT", "dependencies": { diff --git a/package.json b/package.json index ad07eb2..9efcabf 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "main": "lib/index.js", "scripts": { "configure": "node-gyp configure", - "build": "node-gyp build", + "build": "node-gyp build --debug", "test": "make test", "install": "node-gyp rebuild", "prepack": "node ./ci/prepublish.js" diff --git a/test/consumer.spec.js b/test/consumer.spec.js index 4fc3bd5..748c56e 100644 --- a/test/consumer.spec.js +++ b/test/consumer.spec.js @@ -19,8 +19,13 @@ var defaultConfig = { module.exports = { 'Consumer': { - 'afterEach': function() { - client = null; + 'afterEach': function(cb) { + if (client) { + client.disconnect(cb); + client = null; + } else { + cb() + } }, 'cannot be set without a topic config': function() { t.throws(function() { diff --git a/test/kafka-consumer-worker.js b/test/kafka-consumer-worker.js index 20a8963..7aa9c64 100644 --- a/test/kafka-consumer-worker.js +++ b/test/kafka-consumer-worker.js @@ -5,8 +5,9 @@ if (WorkerThreads.isMainThread) { var worker = new WorkerThreads.Worker(__filename); var timeout = setTimeout(function() { + process._rawDebug('terminating worker'); worker.terminate(); - }, 1000); + }, 10000); worker.on('message', function(report) { console.log('received message', report); @@ -20,6 +21,10 @@ if (WorkerThreads.isMainThread) { return; } +const interval = setInterval(() => { + process._rawDebug('waiting for parent'); +}, 1000); + var stream = Kafka.KafkaConsumer.createReadStream({ 'metadata.broker.list': 'localhost:9092', 'client.id': 'kafka-mocha-consumer', @@ -32,8 +37,15 @@ var stream = Kafka.KafkaConsumer.createReadStream({ }); stream.on('data', function(message) { + process._rawDebug('received message', message); WorkerThreads.parentPort?.postMessage({ message }); stream.consumer.commitMessage(message); stream.consumer.disconnect(); - stream.close(); + stream.close(function () { + setTimeout(() => { + process._rawDebug('exiting'); + clearInterval(interval); + process.exit(0); + }, 1000); + }); }); diff --git a/test/kafka-consumer.spec.js b/test/kafka-consumer.spec.js index 0233087..67a152e 100644 --- a/test/kafka-consumer.spec.js +++ b/test/kafka-consumer.spec.js @@ -26,8 +26,13 @@ module.exports = { 'beforeEach': function() { client = new KafkaConsumer(defaultConfig, topicConfig); }, - 'afterEach': function() { - client = null; + 'afterEach': function(cb) { + if (client) { + client.disconnect(cb); + client = null; + } else { + cb() + } }, 'does not modify config and clones it': function () { t.deepStrictEqual(defaultConfig, { @@ -48,36 +53,34 @@ module.exports = { t.notEqual(topicConfig, client.topicConfig); }, 'does not crash in a worker': function (cb) { + this.timeout(10000); var consumer = new worker_threads.Worker( path.join(__dirname, 'kafka-consumer-worker.js') ); - var timeout = setTimeout(function() { - consumer.terminate(); - }, 1000); - consumer.on('message', function(msg) { - t.strictEqual(msg.value.toString(), 'my message'); - consumer.terminate(); + process._rawDebug('got message'); + t.strictEqual(Buffer.from(msg.message.value).toString(), 'my message'); }); + let stream + consumer.on('exit', function(code) { - clearTimeout(timeout); + process._rawDebug('exiting'); + stream.end(); t.strictEqual(code, 0); cb(); }); - consumer.on('online', function() { - const stream = KafkaProducer.createWriteStream({ - 'metadata.broker.list': 'localhost:9092', - 'client.id': 'kafka-mocha-producer', - 'dr_cb': true - }, {}, { - topic: 'topic' - }); - - stream.write(Buffer.from('my message')); + stream = KafkaProducer.createWriteStream({ + 'metadata.broker.list': 'localhost:9092', + 'client.id': 'kafka-mocha-producer', + 'dr_cb': true + }, {}, { + topic: 'topic' }); + + stream.write(Buffer.from('my message')); } }, }; diff --git a/test/producer-worker.js b/test/producer-worker.js index 677102c..160cd33 100644 --- a/test/producer-worker.js +++ b/test/producer-worker.js @@ -4,22 +4,32 @@ var Kafka = require('../'); if (WorkerThreads.isMainThread) { var worker = new WorkerThreads.Worker(__filename); - var timeout = setTimeout(function() { - worker.terminate(); - }, 1000); - worker.on('message', function(report) { console.log('delivery report', report); }); - worker.on('exit', function(code) { - clearTimeout(timeout); + worker.on('exit', function(code, ...args) { + process._rawDebug('exiting', code); process.exit(code); }); return; } +process.on('unhandledRejection', (reason, promise) => { + process._rawDebug('Unhandled Rejection at:', promise, 'reason:', reason); + process.exit(1); +}); + +process.on('uncaughtException', (err) => { + process._rawDebug('Uncaught Exception:', err); + process.exit(1); +}); + +const interval = setInterval(() => { + process._rawDebug('waiting for parent'); +}, 1000); + const stream = Kafka.Producer.createWriteStream({ 'metadata.broker.list': 'localhost:9092', 'client.id': 'kafka-mocha-producer', @@ -29,8 +39,15 @@ const stream = Kafka.Producer.createWriteStream({ }); stream.producer.on('delivery-report', function(err, report) { + process._rawDebug('delivery-report', report); WorkerThreads.parentPort?.postMessage(report); stream.producer.disconnect(); + stream.close(function() { + clearInterval(interval); + process.exit(0); + }); }); stream.write(Buffer.from('my message')); + +process._rawDebug('stream created'); diff --git a/test/producer.spec.js b/test/producer.spec.js index c860e9d..5b975a4 100644 --- a/test/producer.spec.js +++ b/test/producer.spec.js @@ -10,6 +10,7 @@ var Producer = require('../lib/producer'); var t = require('assert'); var worker_threads = require('worker_threads'); +var path = require('path'); // var Mock = require('./mock'); var client; @@ -97,21 +98,19 @@ module.exports = { client.disconnect(next); }, 'does not crash in a worker': function (cb) { + this.timeout(10000); + var producer = new worker_threads.Worker( path.join(__dirname, 'producer-worker.js') ); - var timeout = setTimeout(function() { - producer.terminate(); - }, 1000); - - consumer.on('message', (report) => { + producer.on('message', (report) => { + process._rawDebug('got message'); t.strictEqual(report.topic, 'topic'); - producer.terminate(); }); producer.on('exit', function(code) { - clearTimeout(timeout); + process._rawDebug('exiting'); t.strictEqual(code, 0); cb(); });