From b2d36951efa095d5b2fa1be9af6860709774e230 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Tue, 20 Feb 2024 15:51:42 +0100 Subject: [PATCH] refactor(cluster): add explicit message types --- lib/cluster-adapter.ts | 149 +++++++++++++++++++++++++++++++---------- 1 file changed, 113 insertions(+), 36 deletions(-) diff --git a/lib/cluster-adapter.ts b/lib/cluster-adapter.ts index c2e2161..b9df678 100644 --- a/lib/cluster-adapter.ts +++ b/lib/cluster-adapter.ts @@ -1,4 +1,9 @@ -import { Adapter, BroadcastOptions, Room } from "./index"; +import { + Adapter, + type BroadcastFlags, + type BroadcastOptions, + type Room, +} from "./index"; import { debug as debugModule } from "debug"; import { randomBytes } from "crypto"; @@ -10,6 +15,10 @@ function randomId() { return randomBytes(8).toString("hex"); } +type DistributiveOmit = T extends any + ? Omit + : never; + export interface ClusterAdapterOptions { /** * The number of ms between two heartbeats. @@ -38,11 +47,50 @@ export enum MessageType { BROADCAST_ACK, } -export interface ClusterMessage { +export type ClusterMessage = { uid: string; - type: MessageType; - data?: Record; -} + nsp: string; +} & ( + | { + type: MessageType.INITIAL_HEARTBEAT | MessageType.HEARTBEAT; + } + | { + type: MessageType.BROADCAST; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + packet: unknown; + requestId?: string; + }; + } + | { + type: MessageType.SOCKETS_JOIN | MessageType.SOCKETS_LEAVE; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + rooms: string[]; + }; + } + | { + type: MessageType.DISCONNECT_SOCKETS; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + close?: boolean; + }; + } + | { + type: MessageType.FETCH_SOCKETS; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + requestId: string; + }; + } + | { + type: MessageType.SERVER_SIDE_EMIT; + data: { + requestId?: string; + packet: unknown; + }; + } +); interface ClusterRequest { type: MessageType; @@ -53,13 +101,39 @@ interface ClusterRequest { responses: any[]; } -interface ClusterResponse { - type: MessageType; - data: { - requestId: string; - [key: string]: unknown; - }; -} +export type ClusterResponse = { + uid: string; + nsp: string; +} & ( + | { + type: MessageType.FETCH_SOCKETS_RESPONSE; + data: { + requestId: string; + sockets: unknown[]; + }; + } + | { + type: MessageType.SERVER_SIDE_EMIT_RESPONSE; + data: { + requestId: string; + packet: unknown; + }; + } + | { + type: MessageType.BROADCAST_CLIENT_COUNT; + data: { + requestId: string; + clientCount: number; + }; + } + | { + type: MessageType.BROADCAST_ACK; + data: { + requestId: string; + packet: unknown; + }; + } +); interface ClusterAckRequest { clientCountCallback: (clientCount: number) => void; @@ -85,7 +159,7 @@ function decodeOptions(opts): BroadcastOptions { /** * A cluster-ready adapter. Any extending class must: * - * - implement {@link ClusterAdapter#publishMessage} and {@link ClusterAdapter#publishResponse} + * - implement {@link ClusterAdapter#doPublish} and {@link ClusterAdapter#doPublishResponse} * - call {@link ClusterAdapter#onMessage} and {@link ClusterAdapter#onResponse} */ export abstract class ClusterAdapter extends Adapter { @@ -125,7 +199,7 @@ export abstract class ClusterAdapter extends Adapter { this.publishResponse(message.uid, { type: MessageType.BROADCAST_CLIENT_COUNT, data: { - requestId: message.data.requestId as string, + requestId: message.data.requestId, clientCount, }, }); @@ -135,7 +209,7 @@ export abstract class ClusterAdapter extends Adapter { this.publishResponse(message.uid, { type: MessageType.BROADCAST_ACK, data: { - requestId: message.data.requestId as string, + requestId: message.data.requestId, packet: arg, }, }); @@ -153,23 +227,17 @@ export abstract class ClusterAdapter extends Adapter { } case MessageType.SOCKETS_JOIN: - super.addSockets( - decodeOptions(message.data.opts), - message.data.rooms as string[] - ); + super.addSockets(decodeOptions(message.data.opts), message.data.rooms); break; case MessageType.SOCKETS_LEAVE: - super.delSockets( - decodeOptions(message.data.opts), - message.data.rooms as string[] - ); + super.delSockets(decodeOptions(message.data.opts), message.data.rooms); break; case MessageType.DISCONNECT_SOCKETS: super.disconnectSockets( decodeOptions(message.data.opts), - message.data.close as boolean + message.data.close ); break; @@ -182,7 +250,7 @@ export abstract class ClusterAdapter extends Adapter { this.publishResponse(message.uid, { type: MessageType.FETCH_SOCKETS_RESPONSE, data: { - requestId: message.data.requestId as string, + requestId: message.data.requestId, sockets: localSockets.map((socket) => { // remove sessionStore from handshake, as it may contain circular references const { sessionStore, ...handshake } = socket.handshake; @@ -216,7 +284,7 @@ export abstract class ClusterAdapter extends Adapter { this.publishResponse(message.uid, { type: MessageType.SERVER_SIDE_EMIT_RESPONSE, data: { - requestId: message.data.requestId as string, + requestId: message.data.requestId, packet: arg, }, }); @@ -247,7 +315,7 @@ export abstract class ClusterAdapter extends Adapter { case MessageType.BROADCAST_CLIENT_COUNT: { this.ackRequests .get(requestId) - ?.clientCountCallback(response.data.clientCount as number); + ?.clientCountCallback(response.data.clientCount); break; } @@ -264,7 +332,7 @@ export abstract class ClusterAdapter extends Adapter { } request.current++; - (response.data.sockets as any[]).forEach((socket) => + response.data.sockets.forEach((socket) => request.responses.push(socket) ); @@ -295,6 +363,7 @@ export abstract class ClusterAdapter extends Adapter { } default: + // @ts-ignore debug("unknown response type: %s", response.type); } } @@ -537,11 +606,10 @@ export abstract class ClusterAdapter extends Adapter { }); } - protected publish(message: Omit) { - return this.publishMessage({ - uid: this.uid, - ...message, - }); + protected publish(message: DistributiveOmit) { + (message as ClusterMessage).uid = this.uid; + (message as ClusterMessage).nsp = this.nsp.name; + return this.doPublish(message as ClusterMessage); } /** @@ -551,7 +619,16 @@ export abstract class ClusterAdapter extends Adapter { * @protected * @return an offset, if applicable */ - protected abstract publishMessage(message: ClusterMessage): Promise; + protected abstract doPublish(message: ClusterMessage): Promise; + + protected publishResponse( + requesterUid: string, + response: Omit + ) { + (response as ClusterResponse).uid = this.uid; + (response as ClusterResponse).nsp = this.nsp.name; + return this.doPublishResponse(requesterUid, response as ClusterResponse); + } /** * Send a response to the given member of the cluster. @@ -560,7 +637,7 @@ export abstract class ClusterAdapter extends Adapter { * @param response * @protected */ - protected abstract publishResponse( + protected abstract doPublishResponse( requesterUid: string, response: ClusterResponse ); @@ -640,7 +717,7 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { return Promise.resolve(1 + this.nodesMap.size); } - override publish(message: Omit) { + override publish(message: DistributiveOmit) { this.scheduleHeartbeat(); return super.publish(message);