Skip to content

Commit

Permalink
refactor(cluster): add explicit message types
Browse files Browse the repository at this point in the history
  • Loading branch information
darrachequesne committed Feb 20, 2024
1 parent b157e9e commit b2d3695
Showing 1 changed file with 113 additions and 36 deletions.
149 changes: 113 additions & 36 deletions lib/cluster-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { Adapter, BroadcastOptions, Room } from "./index";
import {
Adapter,
type BroadcastFlags,
type BroadcastOptions,
type Room,
} from "./index";
import { debug as debugModule } from "debug";
import { randomBytes } from "crypto";

Expand All @@ -10,6 +15,10 @@ function randomId() {
return randomBytes(8).toString("hex");
}

type DistributiveOmit<T, K extends keyof any> = T extends any
? Omit<T, K>
: never;

export interface ClusterAdapterOptions {
/**
* The number of ms between two heartbeats.
Expand Down Expand Up @@ -38,11 +47,50 @@ export enum MessageType {
BROADCAST_ACK,
}

export interface ClusterMessage {
export type ClusterMessage = {
uid: string;
type: MessageType;
data?: Record<string, unknown>;
}
nsp: string;
} & (
| {
type: MessageType.INITIAL_HEARTBEAT | MessageType.HEARTBEAT;
}
| {
type: MessageType.BROADCAST;
data: {
opts: { rooms: string[]; except: string[]; flags: BroadcastFlags };
packet: unknown;
requestId?: string;
};
}
| {
type: MessageType.SOCKETS_JOIN | MessageType.SOCKETS_LEAVE;
data: {
opts: { rooms: string[]; except: string[]; flags: BroadcastFlags };
rooms: string[];
};
}
| {
type: MessageType.DISCONNECT_SOCKETS;
data: {
opts: { rooms: string[]; except: string[]; flags: BroadcastFlags };
close?: boolean;
};
}
| {
type: MessageType.FETCH_SOCKETS;
data: {
opts: { rooms: string[]; except: string[]; flags: BroadcastFlags };
requestId: string;
};
}
| {
type: MessageType.SERVER_SIDE_EMIT;
data: {
requestId?: string;
packet: unknown;
};
}
);

interface ClusterRequest {
type: MessageType;
Expand All @@ -53,13 +101,39 @@ interface ClusterRequest {
responses: any[];
}

interface ClusterResponse {
type: MessageType;
data: {
requestId: string;
[key: string]: unknown;
};
}
export type ClusterResponse = {
uid: string;
nsp: string;
} & (
| {
type: MessageType.FETCH_SOCKETS_RESPONSE;
data: {
requestId: string;
sockets: unknown[];
};
}
| {
type: MessageType.SERVER_SIDE_EMIT_RESPONSE;
data: {
requestId: string;
packet: unknown;
};
}
| {
type: MessageType.BROADCAST_CLIENT_COUNT;
data: {
requestId: string;
clientCount: number;
};
}
| {
type: MessageType.BROADCAST_ACK;
data: {
requestId: string;
packet: unknown;
};
}
);

interface ClusterAckRequest {
clientCountCallback: (clientCount: number) => void;
Expand All @@ -85,7 +159,7 @@ function decodeOptions(opts): BroadcastOptions {
/**
* A cluster-ready adapter. Any extending class must:
*
* - implement {@link ClusterAdapter#publishMessage} and {@link ClusterAdapter#publishResponse}
* - implement {@link ClusterAdapter#doPublish} and {@link ClusterAdapter#doPublishResponse}
* - call {@link ClusterAdapter#onMessage} and {@link ClusterAdapter#onResponse}
*/
export abstract class ClusterAdapter extends Adapter {
Expand Down Expand Up @@ -125,7 +199,7 @@ export abstract class ClusterAdapter extends Adapter {
this.publishResponse(message.uid, {
type: MessageType.BROADCAST_CLIENT_COUNT,
data: {
requestId: message.data.requestId as string,
requestId: message.data.requestId,
clientCount,
},
});
Expand All @@ -135,7 +209,7 @@ export abstract class ClusterAdapter extends Adapter {
this.publishResponse(message.uid, {
type: MessageType.BROADCAST_ACK,
data: {
requestId: message.data.requestId as string,
requestId: message.data.requestId,
packet: arg,
},
});
Expand All @@ -153,23 +227,17 @@ export abstract class ClusterAdapter extends Adapter {
}

case MessageType.SOCKETS_JOIN:
super.addSockets(
decodeOptions(message.data.opts),
message.data.rooms as string[]
);
super.addSockets(decodeOptions(message.data.opts), message.data.rooms);
break;

case MessageType.SOCKETS_LEAVE:
super.delSockets(
decodeOptions(message.data.opts),
message.data.rooms as string[]
);
super.delSockets(decodeOptions(message.data.opts), message.data.rooms);
break;

case MessageType.DISCONNECT_SOCKETS:
super.disconnectSockets(
decodeOptions(message.data.opts),
message.data.close as boolean
message.data.close
);
break;

Expand All @@ -182,7 +250,7 @@ export abstract class ClusterAdapter extends Adapter {
this.publishResponse(message.uid, {
type: MessageType.FETCH_SOCKETS_RESPONSE,
data: {
requestId: message.data.requestId as string,
requestId: message.data.requestId,
sockets: localSockets.map((socket) => {
// remove sessionStore from handshake, as it may contain circular references
const { sessionStore, ...handshake } = socket.handshake;
Expand Down Expand Up @@ -216,7 +284,7 @@ export abstract class ClusterAdapter extends Adapter {
this.publishResponse(message.uid, {
type: MessageType.SERVER_SIDE_EMIT_RESPONSE,
data: {
requestId: message.data.requestId as string,
requestId: message.data.requestId,
packet: arg,
},
});
Expand Down Expand Up @@ -247,7 +315,7 @@ export abstract class ClusterAdapter extends Adapter {
case MessageType.BROADCAST_CLIENT_COUNT: {
this.ackRequests
.get(requestId)
?.clientCountCallback(response.data.clientCount as number);
?.clientCountCallback(response.data.clientCount);
break;
}

Expand All @@ -264,7 +332,7 @@ export abstract class ClusterAdapter extends Adapter {
}

request.current++;
(response.data.sockets as any[]).forEach((socket) =>
response.data.sockets.forEach((socket) =>
request.responses.push(socket)
);

Expand Down Expand Up @@ -295,6 +363,7 @@ export abstract class ClusterAdapter extends Adapter {
}

default:
// @ts-ignore
debug("unknown response type: %s", response.type);
}
}
Expand Down Expand Up @@ -537,11 +606,10 @@ export abstract class ClusterAdapter extends Adapter {
});
}

protected publish(message: Omit<ClusterMessage, "uid">) {
return this.publishMessage({
uid: this.uid,
...message,
});
protected publish(message: DistributiveOmit<ClusterMessage, "nsp" | "uid">) {
(message as ClusterMessage).uid = this.uid;
(message as ClusterMessage).nsp = this.nsp.name;
return this.doPublish(message as ClusterMessage);
}

/**
Expand All @@ -551,7 +619,16 @@ export abstract class ClusterAdapter extends Adapter {
* @protected
* @return an offset, if applicable
*/
protected abstract publishMessage(message: ClusterMessage): Promise<string>;
protected abstract doPublish(message: ClusterMessage): Promise<string>;

protected publishResponse(
requesterUid: string,
response: Omit<ClusterResponse, "nsp" | "uid">
) {
(response as ClusterResponse).uid = this.uid;
(response as ClusterResponse).nsp = this.nsp.name;
return this.doPublishResponse(requesterUid, response as ClusterResponse);
}

/**
* Send a response to the given member of the cluster.
Expand All @@ -560,7 +637,7 @@ export abstract class ClusterAdapter extends Adapter {
* @param response
* @protected
*/
protected abstract publishResponse(
protected abstract doPublishResponse(
requesterUid: string,
response: ClusterResponse
);
Expand Down Expand Up @@ -640,7 +717,7 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
return Promise.resolve(1 + this.nodesMap.size);
}

override publish(message: Omit<ClusterMessage, "uid">) {
override publish(message: DistributiveOmit<ClusterMessage, "nsp" | "uid">) {
this.scheduleHeartbeat();

return super.publish(message);
Expand Down

0 comments on commit b2d3695

Please sign in to comment.