Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding unit tests for FoxgloveWebSocketPlayer module #335

Merged
merged 8 commits into from
Feb 4, 2025
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// SPDX-FileCopyrightText: Copyright (C) 2023-2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)<[email protected]>
// SPDX-License-Identifier: MPL-2.0

import { JsonMessageWriter } from "@lichtblick/suite-base/players/FoxgloveWebSocketPlayer/JsonMessageWriter";

describe("JsonMessageWriter", () => {
const writer = new JsonMessageWriter();

it("should return a message converted to a Uint8Array", () => {
const message = { text: "test message" };

const result = writer.writeMessage(message);

expect(result).toHaveLength(result.length);
});

it("should return an empty Uint8array because the message recieved was undefined", () => {
const message = undefined;

const result = writer.writeMessage(message);
const expected = new Uint8Array(Buffer.from(""));

expect(result).toEqual(expected);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// License, v2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/

import { MessageWriter } from "./MessageWriter";
import { MessageWriter } from "./types";

export class JsonMessageWriter implements MessageWriter {
public writeMessage(message: unknown): Uint8Array {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// SPDX-FileCopyrightText: Copyright (C) 2023-2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)<[email protected]>
// SPDX-License-Identifier: MPL-2.0

import BasicBuilder from "@lichtblick/suite-base/testing/builders/BasicBuilder";

import WorkerSocketAdapter from "./WorkerSocketAdapter";

describe("WorkerSocketAdapter", () => {
let workerMock: any;

beforeEach(() => {
workerMock = {
postMessage: jest.fn(),
terminate: jest.fn(),
onmessage: undefined as ((event: MessageEvent) => void) | undefined,
};

global.Worker = jest.fn(() => workerMock as unknown as Worker);

new WorkerSocketAdapter("wss://example.com");
laisspportugal marked this conversation as resolved.
Show resolved Hide resolved
});

it("WorkerSocketAdapter should open a WebSocket connection", () => {
workerMock.onmessage?.({ data: { type: "open", protocol: "json" } } as MessageEvent);

expect(workerMock.postMessage).toHaveBeenCalledWith({
type: "open",
data: { wsUrl: "wss://example.com", protocols: undefined },
});
});

it("WorkerSocketAdapter should close a WebSocket connection", () => {
workerMock.onmessage?.({ data: { type: "close", data: {} } } as MessageEvent);

expect(workerMock.terminate).toHaveBeenCalled();
});

it("WorkerSocketAdapter should send a message", () => {
const socket = new WorkerSocketAdapter("wss://example.com");

socket.send("Hello, World!");

expect(workerMock.postMessage).toHaveBeenCalledWith({
type: "data",
data: "Hello, World!",
laisspportugal marked this conversation as resolved.
Show resolved Hide resolved
});
});

it("WorkerSocketAdapter should handle an error", () => {
workerMock.onmessage?.({
data: { type: "error", error: "Something went wrong" },
} as MessageEvent);

expect(workerMock.postMessage).toHaveBeenCalledWith({
type: "open",
data: { wsUrl: "wss://example.com", protocols: undefined },
});
});

it("WorkerSocketAdapter should handle a message", () => {
workerMock.onmessage?.({
data: { type: "message", data: BasicBuilder.string() },
} as MessageEvent);

expect(workerMock.postMessage).toHaveBeenCalledWith({
type: "open",
data: { wsUrl: "wss://example.com", protocols: undefined },
});
});
});
laisspportugal marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import { IWebSocket } from "@foxglove/ws-protocol";

import { FromWorkerMessage, ToWorkerMessage } from "./worker";
import { ToWorkerMessage, FromWorkerMessage } from "./types";

export default class WorkerSocketAdapter implements IWebSocket {
#worker: Worker;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// SPDX-FileCopyrightText: Copyright (C) 2023-2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)<[email protected]>
// SPDX-License-Identifier: MPL-2.0

/** Suppress warnings about messages on unknown subscriptions if the susbscription was recently canceled. */
export const SUBSCRIPTION_WARNING_SUPPRESSION_MS = 2000;

export const ZERO_TIME = Object.freeze({ sec: 0, nsec: 0 });
export const GET_ALL_PARAMS_REQUEST_ID = "get-all-params";
export const GET_ALL_PARAMS_PERIOD_MS = 15000;
export const ROS_ENCODINGS = ["ros1", "cdr"];
export const SUPPORTED_PUBLICATION_ENCODINGS = ["json", ...ROS_ENCODINGS];
export const FALLBACK_PUBLICATION_ENCODING = "json";
export const SUPPORTED_SERVICE_ENCODINGS = ["json", ...ROS_ENCODINGS];

/**
* When the tab is inactive setTimeout's are throttled to at most once per second.
* Because the MessagePipeline listener uses timeouts to resolve its promises, it throttles our ability to
* emit a frame more than once per second. In the websocket player this was causing
* an accumulation of messages that were waiting to be emitted, this could keep growing
* indefinitely if the rate at which we emit a frame is low enough.
* 400MB
*/
export const CURRENT_FRAME_MAXIMUM_SIZE_BYTES = 400 * 1024 * 1024;
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// SPDX-FileCopyrightText: Copyright (C) 2023-2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)<[email protected]>
// SPDX-License-Identifier: MPL-2.0

import { StatusLevel } from "@foxglove/ws-protocol";

import {
dataTypeToFullName,
statusLevelToProblemSeverity,
} from "@lichtblick/suite-base/players/FoxgloveWebSocketPlayer/helpers";
import BasicBuilder from "@lichtblick/suite-base/testing/builders/BasicBuilder";

describe("dataTypeToFullName", () => {
it("should convert dataType to include /msg/ on it", () => {
const message = "unit/test";

const result = dataTypeToFullName(message);

expect(result).toBe("unit/msg/test");
});

it("should return the message unaltered if it differs from the 'text/text' format", () => {
const message = BasicBuilder.string();

const result = dataTypeToFullName(message);

expect(result).toBe(message);
});
});

describe("statusLevelToProblemSeverity", () => {
type StatusLevelToProblemTest = [level: StatusLevel, result: string];

it.each<StatusLevelToProblemTest>([
[StatusLevel.INFO, "info"],
[StatusLevel.WARNING, "warn"],
[StatusLevel.ERROR, "error"],
])("should map StatusLevel %s to result %s", (level, result) => {
expect(statusLevelToProblemSeverity(level)).toBe(result);
});
});
24 changes: 24 additions & 0 deletions packages/suite-base/src/players/FoxgloveWebSocketPlayer/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// SPDX-FileCopyrightText: Copyright (C) 2023-2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)<[email protected]>
// SPDX-License-Identifier: MPL-2.0

import { StatusLevel } from "@foxglove/ws-protocol";

import { PlayerProblem } from "@lichtblick/suite-base/players/types";

export function dataTypeToFullName(dataType: string): string {
const parts = dataType.split("/");
if (parts.length === 2) {
return `${parts[0]}/msg/${parts[1]}`;
}
return dataType;
}

export function statusLevelToProblemSeverity(level: StatusLevel): PlayerProblem["severity"] {
if (level === StatusLevel.INFO) {
return "info";
} else if (level === StatusLevel.WARNING) {
return "warn";
} else {
return "error";
}
}
78 changes: 21 additions & 57 deletions packages/suite-base/src/players/FoxgloveWebSocketPlayer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@
// License, v2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/
import {
Channel,
ChannelId,
ClientChannel,
FoxgloveClient,
ServerCapability,
SubscriptionId,
Service,
ServiceCallPayload,
ServiceCallRequest,
ServiceCallResponse,
Expand All @@ -27,7 +24,7 @@ import { v4 as uuidv4 } from "uuid";

import { debouncePromise } from "@lichtblick/den/async";
import Log from "@lichtblick/log";
import { parseChannel, ParsedChannel } from "@lichtblick/mcap-support";
import { parseChannel } from "@lichtblick/mcap-support";
import { MessageDefinition, isMsgDefEqual } from "@lichtblick/message-definition";
import CommonRosTypes from "@lichtblick/rosmsg-msgs-common";
import { MessageWriter as Ros1MessageWriter } from "@lichtblick/rosmsg-serialization";
Expand All @@ -54,46 +51,31 @@ import {
import rosDatatypesToMessageDefinition from "@lichtblick/suite-base/util/rosDatatypesToMessageDefinition";

import { JsonMessageWriter } from "./JsonMessageWriter";
import { MessageWriter } from "./MessageWriter";
import WorkerSocketAdapter from "./WorkerSocketAdapter";
import {
CURRENT_FRAME_MAXIMUM_SIZE_BYTES,
FALLBACK_PUBLICATION_ENCODING,
GET_ALL_PARAMS_PERIOD_MS,
GET_ALL_PARAMS_REQUEST_ID,
ROS_ENCODINGS,
SUBSCRIPTION_WARNING_SUPPRESSION_MS,
SUPPORTED_PUBLICATION_ENCODINGS,
SUPPORTED_SERVICE_ENCODINGS,
ZERO_TIME,
} from "./constants";
import { dataTypeToFullName, statusLevelToProblemSeverity } from "./helpers";
import {
MessageWriter,
MessageDefinitionMap,
Publication,
ResolvedChannel,
ResolvedService,
} from "./types";

const log = Log.getLogger(__dirname);
const textEncoder = new TextEncoder();
const textDecoder = new TextDecoder();

/** Suppress warnings about messages on unknown subscriptions if the susbscription was recently canceled. */
const SUBSCRIPTION_WARNING_SUPPRESSION_MS = 2000;

const ZERO_TIME = Object.freeze({ sec: 0, nsec: 0 });
const GET_ALL_PARAMS_REQUEST_ID = "get-all-params";
const GET_ALL_PARAMS_PERIOD_MS = 15000;
const ROS_ENCODINGS = ["ros1", "cdr"];
const SUPPORTED_PUBLICATION_ENCODINGS = ["json", ...ROS_ENCODINGS];
const FALLBACK_PUBLICATION_ENCODING = "json";
const SUPPORTED_SERVICE_ENCODINGS = ["json", ...ROS_ENCODINGS];

type ResolvedChannel = {
channel: Channel;
parsedChannel: ParsedChannel;
};
type Publication = ClientChannel & { messageWriter?: Ros1MessageWriter | Ros2MessageWriter };
type ResolvedService = {
service: Service;
parsedResponse: ParsedChannel;
requestMessageWriter: MessageWriter;
};
type MessageDefinitionMap = Map<string, MessageDefinition>;

/**
* When the tab is inactive setTimeout's are throttled to at most once per second.
* Because the MessagePipeline listener uses timeouts to resolve its promises, it throttles our ability to
* emit a frame more than once per second. In the websocket player this was causing
* an accumulation of messages that were waiting to be emitted, this could keep growing
* indefinitely if the rate at which we emit a frame is low enough.
* 400MB
*/
const CURRENT_FRAME_MAXIMUM_SIZE_BYTES = 400 * 1024 * 1024;

export default class FoxgloveWebSocketPlayer implements Player {
readonly #sourceId: string;

Expand Down Expand Up @@ -1095,7 +1077,7 @@ export default class FoxgloveWebSocketPlayer implements Player {
: value;
};
const message = Buffer.from(JSON.stringify(msg, replacer) ?? "");
this.#client.sendMessage(clientChannel.id, message);
this.#client.sendMessage(clientChannel.id, new Uint8Array(message));
} else if (
ROS_ENCODINGS.includes(clientChannel.encoding) &&
clientChannel.messageWriter != undefined
Expand Down Expand Up @@ -1361,21 +1343,3 @@ export default class FoxgloveWebSocketPlayer implements Player {
}
}
}

function dataTypeToFullName(dataType: string): string {
const parts = dataType.split("/");
if (parts.length === 2) {
return `${parts[0]}/msg/${parts[1]}`;
}
return dataType;
}

function statusLevelToProblemSeverity(level: StatusLevel): PlayerProblem["severity"] {
if (level === StatusLevel.INFO) {
return "info";
} else if (level === StatusLevel.WARNING) {
return "warn";
} else {
return "error";
}
}
39 changes: 39 additions & 0 deletions packages/suite-base/src/players/FoxgloveWebSocketPlayer/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// SPDX-FileCopyrightText: Copyright (C) 2023-2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)<[email protected]>
// SPDX-License-Identifier: MPL-2.0

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/

import { Channel, ClientChannel, Service } from "@foxglove/ws-protocol";

import { ParsedChannel } from "@lichtblick/mcap-support";
import { MessageDefinition } from "@lichtblick/message-definition";
import { MessageWriter as Ros1MessageWriter } from "@lichtblick/rosmsg-serialization";
import { MessageWriter as Ros2MessageWriter } from "@lichtblick/rosmsg2-serialization";

export type ResolvedChannel = {
channel: Channel;
parsedChannel: ParsedChannel;
};
export type Publication = ClientChannel & { messageWriter?: Ros1MessageWriter | Ros2MessageWriter };
export type ResolvedService = {
service: Service;
parsedResponse: ParsedChannel;
requestMessageWriter: MessageWriter;
};
export type MessageDefinitionMap = Map<string, MessageDefinition>;
export type FromWorkerMessage =
| { type: "open"; protocol: string }
| { type: "close"; data: unknown }
| { type: "error"; error: unknown }
| { type: "message"; data: unknown };

export type ToWorkerMessage =
| { type: "open"; data: { wsUrl: string; protocols?: string[] | string } }
| { type: "close"; data: undefined }
| { type: "data"; data: string | ArrayBuffer | ArrayBufferView };

export interface MessageWriter {
writeMessage(message: unknown): Uint8Array;
}
Loading
Loading