From c2dfff93fbdaba5363963ab5ae27e421a02de8bf Mon Sep 17 00:00:00 2001 From: Joakim Olsson Date: Wed, 9 Feb 2022 16:28:13 +0100 Subject: [PATCH] feat: add serviceRequestListener and requestResponseHandler --- lib/index.test.ts | 438 ++++++++++++++++++++++++++++++++++++++++++++- lib/index.ts | 51 ++++++ lib/naming.test.ts | 7 + lib/naming.ts | 4 + 4 files changed, 498 insertions(+), 2 deletions(-) diff --git a/lib/index.test.ts b/lib/index.test.ts index 79dfcfc..36e80fe 100644 --- a/lib/index.test.ts +++ b/lib/index.test.ts @@ -6,7 +6,9 @@ import { eventStreamListener, eventStreamPublisher, Publisher, + requestResponseHandler, servicePublisher, + serviceRequestListener, serviceResponseListener, transientEventStreamListener, useLogger, @@ -240,16 +242,86 @@ describe("Connection", () => { servicePublisher("target", svcPublisher), serviceResponseListener("target", "some.key", () => Promise.reject(new Error("handler error")) + ), + serviceRequestListener("email.send", () => + Promise.reject(new Error("handler error")) + ), + requestResponseHandler("sms.send", () => + Promise.reject(new Error("handler error")) ) ) ).resolves.toBeUndefined(); expect(logger).toHaveBeenCalledWith( "Successfully connected to dummy test-cluster 0.0.1" ); + expect(mockCreateChannel).toHaveBeenCalledTimes(1); expect(logger).toHaveBeenCalledWith("nodeamqp started"); expect(mockPrefetch).toHaveBeenCalledWith(100, true); expect(mockOn).toHaveBeenCalledWith("close", expect.any(Function)); - expect(mockAssertQueue).toHaveBeenCalledWith( + expect(mockAssertExchange).toHaveBeenCalledTimes(6); + expect(mockAssertExchange).toHaveBeenNthCalledWith( + 1, + "events.topic.exchange", + "topic", + { + autoDelete: false, + durable: true, + internal: false, + } + ); + expect(mockAssertExchange).toHaveBeenNthCalledWith( + 2, + "target.headers.exchange.response", + "headers", + { + autoDelete: false, + durable: true, + internal: false, + } + ); + expect(mockAssertExchange).toHaveBeenNthCalledWith( + 3, + "dummy.headers.exchange.response", + "headers", + { + autoDelete: false, + durable: true, + internal: false, + } + ); + expect(mockAssertExchange).toHaveBeenNthCalledWith( + 4, + "dummy.headers.exchange.response", + "headers", + { + autoDelete: false, + durable: true, + internal: false, + } + ); + expect(mockAssertExchange).toHaveBeenNthCalledWith( + 5, + "dummy.direct.exchange.request", + "direct", + { + autoDelete: false, + durable: true, + internal: false, + } + ); + expect(mockAssertExchange).toHaveBeenNthCalledWith( + 6, + "dummy.direct.exchange.request", + "direct", + { + autoDelete: false, + durable: true, + internal: false, + } + ); + expect(mockAssertQueue).toHaveBeenCalledTimes(3); + expect(mockAssertQueue).toHaveBeenNthCalledWith( + 1, "dummy.headers.exchange.response", { autoDelete: false, @@ -258,12 +330,67 @@ describe("Connection", () => { expires: 432000000, } ); - expect(mockBindQueue).toHaveBeenCalledWith( + expect(mockAssertQueue).toHaveBeenNthCalledWith( + 2, + "dummy.direct.exchange.request.queue", + { + autoDelete: false, + durable: true, + exclusive: false, + expires: 432000000, + } + ); + expect(mockAssertQueue).toHaveBeenNthCalledWith( + 3, + "dummy.direct.exchange.request.queue", + { + autoDelete: false, + durable: true, + exclusive: false, + expires: 432000000, + } + ); + expect(mockBindQueue).toHaveBeenCalledTimes(3); + expect(mockBindQueue).toHaveBeenNthCalledWith( + 1, "dummy.headers.exchange.response", "target.headers.exchange.response", "some.key", { service: "dummy" } ); + expect(mockBindQueue).toHaveBeenNthCalledWith( + 2, + "dummy.direct.exchange.request.queue", + "dummy.direct.exchange.request", + "email.send", + {} + ); + expect(mockBindQueue).toHaveBeenNthCalledWith( + 3, + "dummy.direct.exchange.request.queue", + "dummy.direct.exchange.request", + "sms.send", + {} + ); + expect(mockConsume).toHaveBeenCalledTimes(2); + expect(mockConsume).toHaveBeenNthCalledWith( + 1, + "dummy.headers.exchange.response", + expect.any(Function), + { + exclusive: false, + noLocal: false, + } + ); + expect(mockConsume).toHaveBeenNthCalledWith( + 2, + "dummy.direct.exchange.request.queue", + expect.any(Function), + { + exclusive: false, + noLocal: false, + } + ); }); it("should reject with error if any setup queue fails", async () => { @@ -693,4 +820,311 @@ describe("Connection", () => { })(msg); expect(mockAck).toHaveBeenCalledWith(msg, false); }); + + it("should do nothing and ack message on request response handler success without response", async () => { + const mockPrefetch = jest.fn(); + const mockAssertExchange = jest.fn(() => Promise.resolve({})); + const mockAssertQueue = jest.fn(() => Promise.resolve()); + const mockBindQueue = jest.fn(() => Promise.resolve()); + const mockConsume = jest.fn(() => Promise.resolve()); + const mockAck = jest.fn(); + const channel = { + prefetch: mockPrefetch, + assertExchange: mockAssertExchange, + assertQueue: mockAssertQueue, + bindQueue: mockBindQueue, + consume: mockConsume, + ack: mockAck, + }; + const mockCreateChannel = jest.fn(() => channel); + const consumeSpy = jest.spyOn(channel, "consume"); + (amqp.connect as jest.Mock).mockResolvedValue({ + connection: { + serverProperties: { + product: "dummy", + cluster_name: "test-cluster", + version: "0.0.1", + }, + }, + createChannel: mockCreateChannel, + }); + + const connection = new Connection("dummy", "amqp-url"); + + const logger = jest.fn(); + const msgLogger = jest.fn(); + await expect( + connection.start( + useLogger({ info: logger, error: logger, debug: logger }), + useMessageLogger(msgLogger), + requestResponseHandler("some.key", () => Promise.resolve()) + ) + ).resolves.toBeUndefined(); + + const msg: ConsumeMessage = { + content: Buffer.from('{"a":"b"}'), + fields: { + consumerTag: "", + exchange: "dummy", + routingKey: "some.key", + deliveryTag: 1, + redelivered: false, + }, + properties: { + headers: {}, + contentType: "application/json", + contentEncoding: "utf-8", + appId: "", + clusterId: "", + correlationId: "", + deliveryMode: "", + expiration: "", + type: "", + messageId: "", + replyTo: "", + priority: "", + userId: "", + timestamp: "", + }, + }; + + // @ts-ignore + const handler: (msg: any) => void = consumeSpy.mock.calls[0][1]; + await handler(msg); + expect(mockAck).toHaveBeenCalledWith(msg, false); + }); + + it("should nack message on request response handler success with response if service header is missing", async () => { + const mockPrefetch = jest.fn(); + const mockAssertExchange = jest.fn(() => Promise.resolve({})); + const mockAssertQueue = jest.fn(() => Promise.resolve()); + const mockBindQueue = jest.fn(() => Promise.resolve()); + const mockConsume = jest.fn(() => Promise.resolve()); + const mockNack = jest.fn(); + const channel = { + prefetch: mockPrefetch, + assertExchange: mockAssertExchange, + assertQueue: mockAssertQueue, + bindQueue: mockBindQueue, + consume: mockConsume, + nack: mockNack, + }; + const mockCreateChannel = jest.fn(() => channel); + const consumeSpy = jest.spyOn(channel, "consume"); + (amqp.connect as jest.Mock).mockResolvedValue({ + connection: { + serverProperties: { + product: "dummy", + cluster_name: "test-cluster", + version: "0.0.1", + }, + }, + createChannel: mockCreateChannel, + }); + + const connection = new Connection("dummy", "amqp-url"); + + const logger = jest.fn(); + const msgLogger = jest.fn(); + await expect( + connection.start( + useLogger({ info: logger, error: logger, debug: logger }), + useMessageLogger(msgLogger), + requestResponseHandler("some.key", () => + Promise.resolve("response-value") + ) + ) + ).resolves.toBeUndefined(); + + const msg: ConsumeMessage = { + content: Buffer.from('{"a":"b"}'), + fields: { + consumerTag: "", + exchange: "dummy", + routingKey: "some.key", + deliveryTag: 1, + redelivered: false, + }, + properties: { + headers: {}, + contentType: "application/json", + contentEncoding: "utf-8", + appId: "", + clusterId: "", + correlationId: "", + deliveryMode: "", + expiration: "", + type: "", + messageId: "", + replyTo: "", + priority: "", + userId: "", + timestamp: "", + }, + }; + + // @ts-ignore + const handler: (msg: any) => void = consumeSpy.mock.calls[0][1]; + await handler(msg); + expect(mockNack).toHaveBeenCalledWith(msg, false, true); + }); + + it("should nack message on request response handler success with response if publish fails", async () => { + const mockPrefetch = jest.fn(); + const mockAssertExchange = jest.fn(() => Promise.resolve({})); + const mockAssertQueue = jest.fn(() => Promise.resolve()); + const mockBindQueue = jest.fn(() => Promise.resolve()); + const mockConsume = jest.fn(() => Promise.resolve()); + const mockNack = jest.fn(); + const mockPublish = jest.fn(() => false); + const channel = { + prefetch: mockPrefetch, + assertExchange: mockAssertExchange, + assertQueue: mockAssertQueue, + bindQueue: mockBindQueue, + consume: mockConsume, + nack: mockNack, + publish: mockPublish, + }; + const mockCreateChannel = jest.fn(() => channel); + const consumeSpy = jest.spyOn(channel, "consume"); + (amqp.connect as jest.Mock).mockResolvedValue({ + connection: { + serverProperties: { + product: "dummy", + cluster_name: "test-cluster", + version: "0.0.1", + }, + }, + createChannel: mockCreateChannel, + }); + + const connection = new Connection("dummy", "amqp-url"); + + const logger = jest.fn(); + const msgLogger = jest.fn(); + await expect( + connection.start( + useLogger({ info: logger, error: logger, debug: logger }), + useMessageLogger(msgLogger), + requestResponseHandler("some.key", () => Promise.resolve({ a: true })) + ) + ).resolves.toBeUndefined(); + + const msg: ConsumeMessage = { + content: Buffer.from('{"a":"b"}'), + fields: { + consumerTag: "", + exchange: "dummy", + routingKey: "some.key", + deliveryTag: 1, + redelivered: false, + }, + properties: { + headers: { service: "caller" }, + contentType: "application/json", + contentEncoding: "utf-8", + appId: "", + clusterId: "", + correlationId: "", + deliveryMode: "", + expiration: "", + type: "", + messageId: "", + replyTo: "", + priority: "", + userId: "", + timestamp: "", + }, + }; + + // @ts-ignore + const handler: (msg: any) => void = consumeSpy.mock.calls[0][1]; + await handler(msg); + expect(mockNack).toHaveBeenCalledWith(msg, false, true); + expect(mockPublish).toHaveBeenCalledWith( + "dummy.headers.exchange.response", + "some.key", + Buffer.from('{"a":true}'), + { + contentType: "application/json", + headers: { service: "caller" }, + } + ); + }); + + it("should ack message on request response handler success with response if publish succeeds", async () => { + const mockPrefetch = jest.fn(); + const mockAssertExchange = jest.fn(() => Promise.resolve({})); + const mockAssertQueue = jest.fn(() => Promise.resolve()); + const mockBindQueue = jest.fn(() => Promise.resolve()); + const mockConsume = jest.fn(() => Promise.resolve()); + const mockAck = jest.fn(); + const mockPublish = jest.fn(() => true); + const channel = { + prefetch: mockPrefetch, + assertExchange: mockAssertExchange, + assertQueue: mockAssertQueue, + bindQueue: mockBindQueue, + consume: mockConsume, + ack: mockAck, + publish: mockPublish, + }; + const mockCreateChannel = jest.fn(() => channel); + const consumeSpy = jest.spyOn(channel, "consume"); + (amqp.connect as jest.Mock).mockResolvedValue({ + connection: { + serverProperties: { + product: "dummy", + cluster_name: "test-cluster", + version: "0.0.1", + }, + }, + createChannel: mockCreateChannel, + }); + + const connection = new Connection("dummy", "amqp-url"); + + const logger = jest.fn(); + const msgLogger = jest.fn(); + await expect( + connection.start( + useLogger({ info: logger, error: logger, debug: logger }), + useMessageLogger(msgLogger), + requestResponseHandler("some.key", () => Promise.resolve({ a: true })) + ) + ).resolves.toBeUndefined(); + + const msg: ConsumeMessage = { + content: Buffer.from('{"a":"b"}'), + fields: { + consumerTag: "", + exchange: "dummy", + routingKey: "some.key", + deliveryTag: 1, + redelivered: false, + }, + properties: { + headers: { service: "caller" }, + contentType: "application/json", + contentEncoding: "utf-8", + appId: "", + clusterId: "", + correlationId: "", + deliveryMode: "", + expiration: "", + type: "", + messageId: "", + replyTo: "", + priority: "", + userId: "", + timestamp: "", + }, + }; + + // @ts-ignore + const handler: (msg: any) => void = consumeSpy.mock.calls[0][1]; + await handler(msg); + expect(mockAck).toHaveBeenCalledWith(msg, false); + }); }); diff --git a/lib/index.ts b/lib/index.ts index 9bdf41e..7f75b16 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -10,6 +10,7 @@ import { serviceEventQueueName, serviceEventRandomQueueName, serviceRequestExchangeName, + serviceRequestQueueName, serviceResponseExchangeName, } from "./naming"; import { LIB_VERSION } from "./version"; @@ -355,6 +356,54 @@ function serviceResponseListener( }; } +function serviceRequestListener(routingKey: string, handler: Handler): Setup { + return async (conn: Connection) => { + const exchangeName = serviceRequestExchangeName(conn.serviceName); + const queueName = serviceRequestQueueName(conn.serviceName); + const resExchangeName = serviceResponseExchangeName(conn.serviceName); + await conn.exchangeDeclare(resExchangeName, "headers"); + return conn.messageHandlerBindQueueToExchange( + queueName, + exchangeName, + routingKey, + "direct", + handler, + {} + ); + }; +} + +function requestResponseHandler(routingKey: string, handler: Handler): Setup { + return async (conn: Connection) => { + return serviceRequestListener(routingKey, (msg, headers) => { + return handler(msg, headers).then((resp) => { + if (resp) { + const service = headers["service"]; + if (!service) { + return Promise.reject(new Error("failed to extract service name")); + } + const content = Buffer.from(JSON.stringify(resp)); + const success = conn.channel?.publish( + serviceResponseExchangeName(conn.serviceName), + routingKey, + content, + { + headers: { service: service }, + contentType: "application/json", + } + ); + if (success) { + return Promise.resolve(); + } + return Promise.reject(new Error("unable to publish message")); + } else { + return Promise.resolve(); + } + }); + })(conn); + }; +} + export { Connection, Publisher, @@ -365,6 +414,8 @@ export { eventStreamPublisher, servicePublisher, serviceResponseListener, + serviceRequestListener, + requestResponseHandler, useLogger, useMessageLogger, }; diff --git a/lib/naming.test.ts b/lib/naming.test.ts index fb4dd91..ddc54f2 100644 --- a/lib/naming.test.ts +++ b/lib/naming.test.ts @@ -5,6 +5,7 @@ import { serviceEventQueueName, serviceEventRandomQueueName, serviceRequestExchangeName, + serviceRequestQueueName, serviceResponseExchangeName, } from "./naming"; @@ -42,4 +43,10 @@ describe("naming", () => { "test.headers.exchange.response" ); }); + + it("serviceRequestQueueName", () => { + expect(serviceRequestQueueName("test")).toBe( + "test.direct.exchange.request.queue" + ); + }); }); diff --git a/lib/naming.ts b/lib/naming.ts index 48697c6..4c79af5 100644 --- a/lib/naming.ts +++ b/lib/naming.ts @@ -19,6 +19,9 @@ const serviceRequestExchangeName = (service: string): string => const serviceResponseExchangeName = (service: string): string => `${service}.headers.exchange.response`; +const serviceRequestQueueName = (service: string): string => + `${serviceRequestExchangeName(service)}.queue`; + export { exchangeName, eventsExchangeName, @@ -26,4 +29,5 @@ export { serviceEventRandomQueueName, serviceRequestExchangeName, serviceResponseExchangeName, + serviceRequestQueueName, };