Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdks/actor/runtime): include url for debugging actor in internal error metadata #1952

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/actor/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"@rivet-gg/actor-protocol": "workspace:*",
"@rivet-gg/manager-protocol": "workspace:*",
"@types/react": "^19.0.4",
"tsup": "^8.3.5",
"tsup": "^8.3.6",
"typescript": "^5.7.3"
}
}
4 changes: 2 additions & 2 deletions sdks/actor/client/src/handle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ export class ActorHandleRaw {
public connect() {
this.#disconnected = false;

let url = `${this.endpoint}/v1/connect?format=${this.protocolFormat}`;
let url = `${this.endpoint}/connect?format=${this.protocolFormat}`;

if (this.parameters !== undefined) {
const paramsStr = JSON.stringify(this.parameters);
Expand Down Expand Up @@ -196,7 +196,7 @@ export class ActorHandleRaw {
};
ws.onerror = (event) => {
if (this.#disconnected) return;
logger().debug("socket error", { event });
logger().warn("socket error", { event });
};
ws.onmessage = async (ev) => {
const response = (await this.#parse(
Expand Down
2 changes: 1 addition & 1 deletion sdks/actor/common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"zod": "^3.24.1"
},
"devDependencies": {
"tsup": "^8.3.5",
"tsup": "^8.3.6",
"typescript": "^5.7.3"
}
}
2 changes: 1 addition & 1 deletion sdks/actor/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"zod": "^3.24.1"
},
"devDependencies": {
"tsup": "^8.3.5",
"tsup": "^8.3.6",
"typescript": "^5.7.3"
}
}
2 changes: 1 addition & 1 deletion sdks/actor/manager-protocol/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
},
"devDependencies": {
"@rivet-gg/actor-common": "workspace:*",
"tsup": "^8.3.5",
"tsup": "^8.3.6",
"typescript": "^5.7.3"
}
}
2 changes: 1 addition & 1 deletion sdks/actor/protocol/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"zod": "^3.24.1"
},
"devDependencies": {
"tsup": "^8.3.5",
"tsup": "^8.3.6",
"typescript": "^5.7.3"
}
}
3 changes: 2 additions & 1 deletion sdks/actor/runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
"@rivet-gg/actor-core": "workspace:*",
"@rivet-gg/actor-protocol": "workspace:*",
"@types/deno": "^2.0.0",
"tsup": "^8.3.5",
"@types/ws": "^8",
"tsup": "^8.3.6",
"typescript": "^5.7.3"
},
"deno": {
Expand Down
100 changes: 63 additions & 37 deletions sdks/actor/runtime/src/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { handleMessageEvent } from "./event";
import { ActorInspection } from "./inspect";
import type { Kv } from "./kv";
import { instanceLogger, logger } from "./log";
import type { Rpc } from "./rpc";
import { Rpc } from "./rpc";
import { Lock, deadline } from "./utils";

const KEYS = {
Expand Down Expand Up @@ -148,7 +148,7 @@ export abstract class Actor<
#lastSaveTime = 0;
#pendingSaveTimeout?: number | NodeJS.Timeout;

#inspection: ActorInspection<this>;
#inspection!: ActorInspection<this>;

/**
* This constructor should never be used directly.
Expand All @@ -159,19 +159,6 @@ export abstract class Actor<
*/
public constructor(config?: Partial<ActorConfig>) {
this.#config = mergeActorConfig(config);
this.#inspection = new ActorInspection(this.#config, {
state: () => ({
enabled: this.#stateEnabled,
state: this.#stateProxy,
}),
connections: () => this.#connections.values(),
rpcs: () => this.#rpcNames,
setState: (state) => {
this.#validateStateEnabled();
this.#setStateWithoutChange(state);
},
onRpcCall: (ctx, rpc, args) => this.#executeRpc(ctx, rpc, args),
});
}

/**
Expand All @@ -181,17 +168,36 @@ export abstract class Actor<
*
* @param ctx - The actor context.
*/
public static start(ctx: ActorContext): Promise<void> {
public static async start(ctx: ActorContext): Promise<void> {
setupLogging();

// biome-ignore lint/complexity/noThisInStatic lint/suspicious/noExplicitAny: Needs to construct self
const instance = new (this as any)() as Actor;
return instance.#run(ctx);
return await instance.#run(ctx);
}

async #run(ctx: ActorContext) {
this.#ctx = ctx;

// Create inspector after receiving `ActorContext`
this.#inspection = new ActorInspection(
this.#config,
this.#ctx.metadata,
{
state: () => ({
enabled: this.#stateEnabled,
state: this.#stateProxy,
}),
connections: () => this.#connections.values(),
rpcs: () => this.#rpcNames,
setState: (state) => {
this.#validateStateEnabled();
this.#setStateWithoutChange(state);
},
onRpcCall: (ctx, rpc, args) => this.#executeRpc(ctx, rpc, args),
},
);

// Run server immediately since init might take a few ms
this.#runServer();

Expand Down Expand Up @@ -318,7 +324,9 @@ export abstract class Actor<
try {
this._onStateChange(this.#stateRaw);
} catch (error) {
logger().error("error in `_onStateChange`", { error });
logger().error("error in `_onStateChange`", {
error: `${error}`,
});
}
}

Expand Down Expand Up @@ -513,6 +521,9 @@ export abstract class Actor<
status = 500;
code = errors.INTERNAL_ERROR_CODE;
message = errors.INTERNAL_ERROR_DESCRIPTION;
metadata = {
url: `https://hub.rivet.gg/projects/${this.#ctx.metadata.project.slug}/environments/${this.#ctx.metadata.environment.slug}/actors?actorId=${this.#ctx.metadata.actor.id}`,
} satisfies errors.InternalErrorMetadata;
}

return c.json(
Expand Down Expand Up @@ -566,7 +577,9 @@ export abstract class Actor<
? JSON.parse(paramsStr)
: undefined;
} catch (error) {
logger().warn("malformed connection parameters", { error });
logger().warn("malformed connection parameters", {
error: `${error}`,
});
throw new errors.MalformedConnectionParameters(error);
}

Expand Down Expand Up @@ -707,31 +720,42 @@ export abstract class Actor<
return;
}

await handleMessageEvent(evt, conn, this.#config, {
onExecuteRpc: async (ctx, name, args) => {
return await this.#executeRpc(ctx, name, args);
},
onSubscribe: async (eventName, conn) => {
this.#addSubscription(eventName, conn);
},
onUnsubscribe: async (eventName, conn) => {
this.#removeSubscription(eventName, conn);
},
onError: (error) => {
logger().warn("connection internal error", {
rpc: error.rpcRequestId,
error,
});
await handleMessageEvent(
evt,
this.#ctx.metadata,
conn,
this.#config,
{
onExecuteRpc: async (ctx, name, args) => {
return await this.#executeRpc(ctx, name, args);
},
onSubscribe: async (eventName, conn) => {
this.#addSubscription(eventName, conn);
},
onUnsubscribe: async (eventName, conn) => {
this.#removeSubscription(eventName, conn);
},
onError: (error) => {
const message = error.internal
? "internal error"
: "user error";
logger().warn(message, {
connectionId: conn?.id,
rpcRequestId: error.rpcRequestId,
rpcName: error.rpcName,
error,
});
},
},
});
);
},
onClose: () => {
this.#removeConnection(conn);
},
onError: (error) => {
// Actors don't need to know about this, since it's abstracted
// away
logger().warn("websocket error", { error });
logger().warn("websocket error", { error: `${error}` });
},
};
}
Expand Down Expand Up @@ -1005,7 +1029,9 @@ export abstract class Actor<
logger().debug("background promise complete");
})
.catch((error) => {
logger().error("background promise failed", { error });
logger().error("background promise failed", {
error: `${error}`,
});
});
this.#backgroundPromises.push(nonfailablePromise);
}
Expand Down
3 changes: 3 additions & 0 deletions sdks/actor/runtime/src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
export const INTERNAL_ERROR_CODE = "internal_error";
export const INTERNAL_ERROR_DESCRIPTION =
"Internal error. Read the actor logs for more details.";
export interface InternalErrorMetadata {
url: string;
}

export const USER_ERROR_CODE = "user_error";

Expand Down
9 changes: 9 additions & 0 deletions sdks/actor/runtime/src/event.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type * as wsToClient from "@rivet-gg/actor-protocol/ws/to_client";

Check failure on line 1 in sdks/actor/runtime/src/event.ts

View workflow job for this annotation

GitHub Actions / quality

organizeImports

Import statements could be sorted:

Check failure on line 1 in sdks/actor/runtime/src/event.ts

View workflow job for this annotation

GitHub Actions / quality

organizeImports

Import statements could be sorted:

Check failure on line 1 in sdks/actor/runtime/src/event.ts

View workflow job for this annotation

GitHub Actions / quality

organizeImports

Import statements could be sorted:

Check failure on line 1 in sdks/actor/runtime/src/event.ts

View workflow job for this annotation

GitHub Actions / quality

organizeImports

Import statements could be sorted:
import * as wsToServer from "@rivet-gg/actor-protocol/ws/to_server";
import type { WSMessageReceive } from "hono/ws";
import type { AnyActor } from "./actor";
Expand All @@ -6,6 +6,7 @@
import * as errors from "./errors";
import { Rpc } from "./rpc";
import { assertUnreachable } from "./utils";
import type { Metadata } from "@rivet-gg/actor-core";

interface MessageEventConfig {
protocol: { maxIncomingMessageSize: number };
Expand Down Expand Up @@ -52,6 +53,7 @@

export async function handleMessageEvent<A extends AnyActor>(
event: MessageEvent<WSMessageReceive>,
actorMetadata: Metadata,
conn: Connection<A>,
config: MessageEventConfig,
handlers: {
Expand All @@ -70,11 +72,13 @@
message: string;
metadata: unknown;
rpcRequestId?: number;
rpcName?: string;
internal: boolean;
}) => void;
},
) {
let rpcRequestId: number | undefined;
let rpcName: string | undefined;
const message = await validateMessageEvent(event, conn, config);

try {
Expand All @@ -88,6 +92,7 @@
const { i: id, n: name, a: args = [] } = message.body.rr;

rpcRequestId = id;
rpcName = name;

const ctx = new Rpc<A>(conn);
const output = await handlers.onExecuteRpc(ctx, name, args);
Expand Down Expand Up @@ -135,6 +140,9 @@
} else {
code = errors.INTERNAL_ERROR_CODE;
message = errors.INTERNAL_ERROR_DESCRIPTION;
metadata = {
url: `https://hub.rivet.gg/projects/${actorMetadata.project.slug}/environments/${actorMetadata.environment.slug}/actors?actorId=${actorMetadata.actor.id}`,
} satisfies errors.InternalErrorMetadata;
internal = true;
}

Expand Down Expand Up @@ -171,6 +179,7 @@
message,
metadata,
rpcRequestId,
rpcName,
internal,
});
}
Expand Down
48 changes: 31 additions & 17 deletions sdks/actor/runtime/src/inspect.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { safeStringify } from "@rivet-gg/actor-common/utils";
import type { Metadata } from "@rivet-gg/actor-core";
import type * as wsToClient from "@rivet-gg/actor-protocol/ws/to_client";
import type { Context } from "hono";
import type { WSEvents } from "hono/ws";
Expand Down Expand Up @@ -66,10 +67,17 @@ export class ActorInspection<A extends AnyActor> {

readonly #config: ActorConfig;

readonly #metadata: Metadata;

readonly #logger = inspectLogger();

constructor(config: ActorConfig, proxy: InspectionAccessProxy<A>) {
constructor(
config: ActorConfig,
metadata: Metadata,
proxy: InspectionAccessProxy<A>,
) {
this.#config = mergeActorConfig(config);
this.#metadata = metadata;
this.#proxy = proxy;
}

Expand Down Expand Up @@ -109,23 +117,29 @@ export class ActorInspection<A extends AnyActor> {
return;
}

await handleMessageEvent(evt, connection, this.#config, {
onExecuteRpc: async (ctx, name, args) => {
return await this.#executeRpc(ctx, name, args);
},
onSubscribe: async () => {
// we do not support granular subscriptions
},
onUnsubscribe: async () => {
// we do not support granular subscriptions
await handleMessageEvent(
evt,
this.#metadata,
connection,
this.#config,
{
onExecuteRpc: async (ctx, name, args) => {
return await this.#executeRpc(ctx, name, args);
},
onSubscribe: async () => {
// we do not support granular subscriptions
},
onUnsubscribe: async () => {
// we do not support granular subscriptions
},
onError: (error) => {
this.#logger.warn("connection error", {
rpc: error.rpcRequestId,
error,
});
},
},
onError: (error) => {
this.#logger.warn("connection error", {
rpc: error.rpcRequestId,
error,
});
},
});
);
},
onClose: () => {
if (!connection) {
Expand Down
Loading