diff --git a/CHANGES.md b/CHANGES.md index 261a81d..eff9f02 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -62,6 +62,12 @@ To be released. - `Context.sendActivity()` and `InboxContext.forwardActivity()` methods now reject when they fail to enqueue the task. [[#192]] + - Fedify now allows you to manually route an `Activity` to the corresponding + inbox listener. [[#193]] + + - Added `Context.routeActivity()` method. + - Added `RouteActivityOptions` interface. + - `Object.toJsonLd()` without any `format` option now returns its original JSON-LD object even if it not created from `Object.fromJsonLd()` but it is returned from another `Object`'s `get*()` method. @@ -111,6 +117,7 @@ To be released. [#183]: https://github.com/dahlia/fedify/pull/183 [#186]: https://github.com/dahlia/fedify/pull/186 [#192]: https://github.com/dahlia/fedify/issues/192 +[#193]: https://github.com/dahlia/fedify/issues/193 Version 1.2.8 diff --git a/docs/manual/inbox.md b/docs/manual/inbox.md index bdfd436..c8de8c0 100644 --- a/docs/manual/inbox.md +++ b/docs/manual/inbox.md @@ -483,3 +483,69 @@ const ctx = null as unknown as Context; // ---cut-before--- ctx.getInboxUri() ~~~~ + + +Manual routing +-------------- + +*This API is available since Fedify 1.3.0.* + +If you want to manually route an activity to the appropriate inbox listener +with no actual HTTP request, you can use the `Context.routeActivity()` method. +The method takes an identifier of the recipient (or `null` for the shared inbox) +and an `Activity` object to route. The point of this method is that it verifies +if the `Activity` object is made by the its actor, and unless it is, the method +silently ignores the activity. + +The following code shows how to route an `Activity` object enclosed in +top-level `Announce` object to the corresponding inbox listener: + +~~~~ typescript twoslash +import { Activity, Announce, type Federation } from "@fedify/fedify"; + +const federation = null as unknown as Federation; + +federation + .setInboxListeners("/users/{identifier}/inbox", "/inbox") +// ---cut-before--- + .on(Announce, async (ctx, announce) => { + // Get an object enclosed in the `Announce` object: + const object = await announce.getObject(); + if (object instanceof Activity) { + // Route the activity to the appropriate inbox listener (shared inbox): + await ctx.routeActivity(ctx.recipient, object); + } + }) +~~~~ + +As another example, the following code shows how to invoke the corresponding +inbox listeners for a remote actor's activities: + +~~~~ typescript twoslash +import { Activity, type Context, isActor } from "@fedify/fedify"; + +async function main(context: Context) { +// ---cut-before--- +const actor = await context.lookupObject("@hongminhee@fosstodon.org"); +if (!isActor(actor)) return; +const collection = await actor.getOutbox(); +if (collection == null) return; +for await (const item of context.traverseCollection(collection)) { + if (item instanceof Activity) { + await context.routeActivity(null, item); + } +} +// ---cut-after--- +} +~~~~ + +> [!TIP] +> The `Context.routeActivity()` method trusts the `Activity` object only if +> one of the following conditions is met: +> +> - The `Activity` has its Object Integrity Proofs and the proofs are signed +> by its actor. +> +> - The `Activity` is dereferenceable by its `~Object.id` and +> the dereferenced object has an actor that belongs to the same origin +> as the `Activity` object. diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index e68afed..5e5ec6f 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -129,6 +129,7 @@ spans: | `activitypub.fetch_key` | Client | Fetches the public keys for the actor. | | `activitypub.get_actor_handle` | Client | Resolves the actor handle. | | `activitypub.inbox` | Consumer | Dequeues the ActivityPub activity to receive. | +| `activitypub.inbox` | Internal | Manually routes the ActivityPub activity. | | `activitypub.inbox` | Producer | Enqueues the ActivityPub activity to receive. | | `activitypub.inbox` | Server | Receives the ActivityPub activity. | | `activitypub.lookup_object` | Client | Looks up the Activity Streams object. | diff --git a/src/federation/context.ts b/src/federation/context.ts index e290edc..cdf1aa2 100644 --- a/src/federation/context.ts +++ b/src/federation/context.ts @@ -315,6 +315,32 @@ export interface Context { activity: Activity, options?: SendActivityOptions, ): Promise; + + /** + * Manually routes an activity to the appropriate inbox listener. + * + * It is useful for routing an activity that is not received from the network, + * or for routing an activity that is enclosed in another activity. + * + * Note that the activity will be verified if it has Object Integrity Proofs + * or is equivalent to the actual remote object. If the activity is not + * verified, it will be rejected. + * @param recipient The recipient of the activity. If it is `null`, + * the activity will be routed to the shared inbox. + * Otherwise, the activity will be routed to the personal + * inbox of the recipient with the given identifier. + * @param activity The activity to route. It must have a proof or + * a dereferenceable `id` to verify the activity. + * @param options Options for routing the activity. + * @returns `true` if the activity is successfully verified and routed. + * Otherwise, `false`. + * @since 1.3.0 + */ + routeActivity( + recipient: string | null, + activity: Activity, + options?: RouteActivityOptions, + ): Promise; } /** @@ -579,6 +605,36 @@ export interface ForwardActivityOptions extends SendActivityOptions { skipIfUnsigned: boolean; } +/** + * Options for {@link Context.routeActivity} method. + * @since 1.3.0 + */ +export interface RouteActivityOptions { + /** + * Whether to skip enqueuing the activity and invoke the listener immediately. + * If no inbox queue is available, this option is ignored and the activity + * will be always invoked immediately. + * @default false + */ + immediate?: boolean; + + /** + * The document loader for loading remote JSON-LD documents. + */ + documentLoader?: DocumentLoader; + + /** + * The context loader for loading remote JSON-LD contexts. + */ + contextLoader?: DocumentLoader; + + /** + * The OpenTelemetry tracer provider. If omitted, the global tracer provider + * is used. + */ + tracerProvider?: TracerProvider; +} + /** * A pair of a public key and a private key in various formats. * @since 0.10.0 diff --git a/src/federation/handler.test.ts b/src/federation/handler.test.ts index e29b0c7..2e67152 100644 --- a/src/federation/handler.test.ts +++ b/src/federation/handler.test.ts @@ -1166,7 +1166,7 @@ test("handleInbox()", async () => { ...inboxOptions, }); assertEquals(onNotFoundCalled, null); - assertEquals(response.status, 202); + assertEquals([response.status, await response.text()], [202, ""]); response = await handleInbox(signedRequest, { recipient: "someone", diff --git a/src/federation/handler.ts b/src/federation/handler.ts index 1721854..d055657 100644 --- a/src/federation/handler.ts +++ b/src/federation/handler.ts @@ -1,12 +1,6 @@ import { getLogger } from "@logtape/logtape"; import type { Span, TracerProvider } from "@opentelemetry/api"; -import { - context, - propagation, - SpanKind, - SpanStatusCode, - trace, -} from "@opentelemetry/api"; +import { SpanKind, SpanStatusCode, trace } from "@opentelemetry/api"; import { accepts } from "@std/http/negotiation"; import metadata from "../deno.json" with { type: "json" }; import type { DocumentLoader } from "../runtime/docloader.ts"; @@ -35,11 +29,10 @@ import type { ObjectDispatcher, } from "./callback.ts"; import type { Context, InboxContext, RequestContext } from "./context.ts"; -import type { InboxListenerSet } from "./inbox.ts"; +import { type InboxListenerSet, routeActivity } from "./inbox.ts"; import { KvKeyCache } from "./keycache.ts"; import type { KvKey, KvStore } from "./kv.ts"; import type { MessageQueue } from "./mq.ts"; -import type { InboxMessage } from "./queue.ts"; export function acceptsJsonLd(request: Request): boolean { const types = accepts(request); @@ -641,39 +634,20 @@ async function handleInboxInternal( span.setAttribute("activitypub.activity.id", activity.id.href); } span.setAttribute("activitypub.activity.type", getTypeId(activity).href); - const cacheKey = activity.id == null - ? null - : [...kvPrefixes.activityIdempotence, activity.id.href] satisfies KvKey; - if (cacheKey != null) { - const cached = await kv.get(cacheKey); - if (cached === true) { - logger.debug("Activity {activityId} has already been processed.", { - activityId: activity.id?.href, - activity: json, - recipient, - }); - span.setStatus({ - code: SpanStatusCode.UNSET, - message: `Activity ${activity.id?.href} has already been processed.`, - }); - return new Response( - `Activity <${activity.id}> has already been processed.`, - { - status: 202, - headers: { "Content-Type": "text/plain; charset=utf-8" }, - }, - ); - } - } - if (activity.actorId == null) { - logger.error("Missing actor.", { activity: json }); - span.setStatus({ code: SpanStatusCode.ERROR, message: "Missing actor." }); - return new Response("Missing actor.", { - status: 400, - headers: { "Content-Type": "text/plain; charset=utf-8" }, - }); - } - span.setAttribute("activitypub.actor.id", activity.actorId.href); + const routeResult = await routeActivity({ + context: ctx, + json, + activity, + recipient, + inboxListeners, + inboxContextFactory, + inboxErrorHandler, + kv, + kvPrefixes, + queue, + span, + tracerProvider, + }); if ( httpSigKey != null && !await doesActorOwnKey(activity, httpSigKey, ctx) ) { @@ -683,138 +657,53 @@ async function handleInboxInternal( activity: json, recipient, keyId: httpSigKey.id?.href, - actorId: activity.actorId.href, + actorId: activity.actorId?.href, }, ); span.setStatus({ code: SpanStatusCode.ERROR, message: `The signer (${httpSigKey.id?.href}) and ` + - `the actor (${activity.actorId.href}) do not match.`, + `the actor (${activity.actorId?.href}) do not match.`, }); return new Response("The signer and the actor do not match.", { status: 401, headers: { "Content-Type": "text/plain; charset=utf-8" }, }); } - if (queue != null) { - const carrier: Record = {}; - propagation.inject(context.active(), carrier); - try { - await queue.enqueue( - { - type: "inbox", - id: crypto.randomUUID(), - baseUrl: request.url, - activity: json, - identifier: recipient, - attempt: 0, - started: new Date().toISOString(), - traceContext: carrier, - } satisfies InboxMessage, - ); - } catch (error) { - logger.error( - "Failed to enqueue the incoming activity {activityId}:\n{error}", - { error, activityId: activity.id?.href, activity: json, recipient }, - ); - span.setStatus({ - code: SpanStatusCode.ERROR, - message: - `Failed to enqueue the incoming activity ${activity.id?.href}.`, - }); - throw error; - } - logger.info( - "Activity {activityId} is enqueued.", - { activityId: activity.id?.href, activity: json, recipient }, + if (routeResult === "alreadyProcessed") { + return new Response( + `Activity <${activity.id}> has already been processed.`, + { + status: 202, + headers: { "Content-Type": "text/plain; charset=utf-8" }, + }, ); + } else if (routeResult === "missingActor") { + return new Response("Missing actor.", { + status: 400, + headers: { "Content-Type": "text/plain; charset=utf-8" }, + }); + } else if (routeResult === "enqueued") { return new Response("Activity is enqueued.", { status: 202, headers: { "Content-Type": "text/plain; charset=utf-8" }, }); + } else if (routeResult === "unsupportedActivity") { + return new Response("", { + status: 202, + headers: { "Content-Type": "text/plain; charset=utf-8" }, + }); + } else if (routeResult === "error") { + return new Response("Internal server error.", { + status: 500, + headers: { "Content-Type": "text/plain; charset=utf-8" }, + }); + } else { + return new Response("", { + status: 202, + headers: { "Content-Type": "text/plain; charset=utf-8" }, + }); } - tracerProvider = tracerProvider ?? trace.getTracerProvider(); - const tracer = tracerProvider.getTracer(metadata.name, metadata.version); - const response = await tracer.startActiveSpan( - "activitypub.dispatch_inbox_listener", - { kind: SpanKind.INTERNAL }, - async (span) => { - const dispatched = inboxListeners?.dispatchWithClass(activity!); - if (dispatched == null) { - logger.error( - "Unsupported activity type:\n{activity}", - { activity: json, recipient }, - ); - span.setStatus({ - code: SpanStatusCode.UNSET, - message: `Unsupported activity type: ${getTypeId(activity!).href}`, - }); - span.end(); - return new Response("", { - status: 202, - headers: { "Content-Type": "text/plain; charset=utf-8" }, - }); - } - const { class: cls, listener } = dispatched; - span.updateName(`activitypub.dispatch_inbox_listener ${cls.name}`); - try { - await listener( - inboxContextFactory( - recipient, - json, - activity?.id?.href, - getTypeId(activity!).href, - ), - activity!, - ); - } catch (error) { - try { - await inboxErrorHandler?.(ctx, error as Error); - } catch (error) { - logger.error( - "An unexpected error occurred in inbox error handler:\n{error}", - { - error, - activityId: activity!.id?.href, - activity: json, - recipient, - }, - ); - } - logger.error( - "Failed to process the incoming activity {activityId}:\n{error}", - { - error, - activityId: activity!.id?.href, - activity: json, - recipient, - }, - ); - span.setStatus({ code: SpanStatusCode.ERROR, message: String(error) }); - span.end(); - return new Response("Internal server error.", { - status: 500, - headers: { "Content-Type": "text/plain; charset=utf-8" }, - }); - } - if (cacheKey != null) { - await kv.set(cacheKey, true, { - ttl: Temporal.Duration.from({ days: 1 }), - }); - } - logger.info( - "Activity {activityId} has been processed.", - { activityId: activity!.id?.href, activity: json, recipient }, - ); - span.end(); - return new Response("", { - status: 202, - headers: { "Content-Type": "text/plain; charset=utf-8" }, - }); - }, - ); - if (response.status >= 500) span.setStatus({ code: SpanStatusCode.ERROR }); - return response; } /** diff --git a/src/federation/inbox.ts b/src/federation/inbox.ts index c2dd2bf..4856d6e 100644 --- a/src/federation/inbox.ts +++ b/src/federation/inbox.ts @@ -1,5 +1,21 @@ +import { getLogger } from "@logtape/logtape"; +import { + context, + propagation, + type Span, + SpanKind, + SpanStatusCode, + trace, + type TracerProvider, +} from "@opentelemetry/api"; +import metadata from "../deno.json" with { type: "json" }; +import { getTypeId } from "../vocab/type.ts"; import { Activity } from "../vocab/vocab.ts"; -import type { InboxListener } from "./callback.ts"; +import type { InboxErrorHandler, InboxListener } from "./callback.ts"; +import type { Context, InboxContext } from "./context.ts"; +import type { KvKey, KvStore } from "./kv.ts"; +import type { MessageQueue } from "./mq.ts"; +import type { InboxMessage } from "./queue.ts"; export class InboxListenerSet { #listeners: Map< @@ -55,3 +71,180 @@ export class InboxListenerSet { return this.dispatchWithClass(activity)?.listener ?? null; } } + +export interface RouteActivityParameters { + context: Context; + json: unknown; + activity: Activity; + recipient: string | null; + inboxListeners?: InboxListenerSet; + inboxContextFactory( + recipient: string | null, + activity: unknown, + activityId: string | undefined, + activityType: string, + ): InboxContext; + inboxErrorHandler?: InboxErrorHandler; + kv: KvStore; + kvPrefixes: { activityIdempotence: KvKey }; + queue?: MessageQueue; + span: Span; + tracerProvider?: TracerProvider; +} + +export type RouteActivityResult = + | "alreadyProcessed" + | "missingActor" + | "enqueued" + | "unsupportedActivity" + | "error" + | "success"; + +export async function routeActivity( + { + context: ctx, + json, + activity, + recipient, + inboxListeners, + inboxContextFactory, + inboxErrorHandler, + kv, + kvPrefixes, + queue, + span, + tracerProvider, + }: RouteActivityParameters, +): Promise { + const logger = getLogger(["fedify", "federation", "inbox"]); + const cacheKey = activity.id == null ? null : [ + ...kvPrefixes.activityIdempotence, + activity.id.href, + ] satisfies KvKey; + if (cacheKey != null) { + const cached = await kv.get(cacheKey); + if (cached === true) { + logger.debug("Activity {activityId} has already been processed.", { + activityId: activity.id?.href, + activity: json, + recipient, + }); + span.setStatus({ + code: SpanStatusCode.UNSET, + message: `Activity ${activity.id?.href} has already been processed.`, + }); + return "alreadyProcessed"; + } + } + if (activity.actorId == null) { + logger.error("Missing actor.", { activity: json }); + span.setStatus({ code: SpanStatusCode.ERROR, message: "Missing actor." }); + return "missingActor"; + } + span.setAttribute("activitypub.actor.id", activity.actorId.href); + if (queue != null) { + const carrier: Record = {}; + propagation.inject(context.active(), carrier); + try { + await queue.enqueue( + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: ctx.origin, + activity: json, + identifier: recipient, + attempt: 0, + started: new Date().toISOString(), + traceContext: carrier, + } satisfies InboxMessage, + ); + } catch (error) { + logger.error( + "Failed to enqueue the incoming activity {activityId}:\n{error}", + { error, activityId: activity.id?.href, activity: json, recipient }, + ); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: + `Failed to enqueue the incoming activity ${activity.id?.href}.`, + }); + throw error; + } + logger.info( + "Activity {activityId} is enqueued.", + { activityId: activity.id?.href, activity: json, recipient }, + ); + return "enqueued"; + } + tracerProvider = tracerProvider ?? trace.getTracerProvider(); + const tracer = tracerProvider.getTracer(metadata.name, metadata.version); + return await tracer.startActiveSpan( + "activitypub.dispatch_inbox_listener", + { kind: SpanKind.INTERNAL }, + async (span) => { + const dispatched = inboxListeners?.dispatchWithClass(activity!); + if (dispatched == null) { + logger.error( + "Unsupported activity type:\n{activity}", + { activity: json, recipient }, + ); + span.setStatus({ + code: SpanStatusCode.UNSET, + message: `Unsupported activity type: ${getTypeId(activity!).href}`, + }); + span.end(); + return "unsupportedActivity"; + } + const { class: cls, listener } = dispatched; + span.updateName(`activitypub.dispatch_inbox_listener ${cls.name}`); + try { + await listener( + inboxContextFactory( + recipient, + json, + activity?.id?.href, + getTypeId(activity!).href, + ), + activity!, + ); + } catch (error) { + try { + await inboxErrorHandler?.(ctx, error as Error); + } catch (error) { + logger.error( + "An unexpected error occurred in inbox error handler:\n{error}", + { + error, + activityId: activity!.id?.href, + activity: json, + recipient, + }, + ); + } + logger.error( + "Failed to process the incoming activity {activityId}:\n{error}", + { + error, + activityId: activity!.id?.href, + activity: json, + recipient, + }, + ); + span.setStatus({ code: SpanStatusCode.ERROR, message: String(error) }); + span.end(); + return "error"; + } + if (cacheKey != null) { + await kv.set(cacheKey, true, { + ttl: Temporal.Duration.from({ days: 1 }), + }); + } + logger.info( + "Activity {activityId} has been processed.", + { activityId: activity!.id?.href, activity: json, recipient }, + ); + span.end(); + return "success"; + }, + ); +} diff --git a/src/federation/middleware.test.ts b/src/federation/middleware.test.ts index d4c8212..39c2d14 100644 --- a/src/federation/middleware.test.ts +++ b/src/federation/middleware.test.ts @@ -1,5 +1,8 @@ +import { Invite } from "@fedify/fedify"; import { + assert, assertEquals, + assertFalse, assertInstanceOf, assertRejects, assertStrictEquals, @@ -32,11 +35,13 @@ import { lookupObject } from "../vocab/lookup.ts"; import { getTypeId } from "../vocab/type.ts"; import { Activity, + Announce, Create, type CryptographicKey, Multikey, Note, Object, + Offer, Person, } from "../vocab/vocab.ts"; import type { Context } from "./context.ts"; @@ -1176,6 +1181,159 @@ test("ContextImpl.sendActivity()", async (t) => { }); }); +test("ContextImpl.routeActivity()", async () => { + const federation = new FederationImpl({ + kv: new MemoryKvStore(), + }); + + const activities: [string | null, Activity][] = []; + federation + .setInboxListeners("/u/{identifier}/i", "/i") + .on(Offer, (ctx, offer) => { + activities.push([ctx.recipient, offer]); + }); + + const ctx = new ContextImpl({ + url: new URL("https://example.com/"), + federation, + data: undefined, + documentLoader: mockDocumentLoader, + }); + + // Unsigned & non-dereferenceable activity + assertFalse( + await ctx.routeActivity( + null, + new Offer({ + actor: new URL("https://example.com/person"), + }), + ), + ); + assertEquals(activities, []); + + // Signed activity without recipient (shared inbox) + const signedOffer = await signObject( + new Offer({ + actor: new URL("https://example.com/person2"), + }), + ed25519PrivateKey, + ed25519Multikey.id!, + ); + assert(await ctx.routeActivity(null, signedOffer)); + assertEquals(activities, [[null, signedOffer]]); + + // Signed activity with recipient (personal inbox) + const signedInvite = await signObject( + new Invite({ + actor: new URL("https://example.com/person2"), + }), + ed25519PrivateKey, + ed25519Multikey.id!, + ); + assert(await ctx.routeActivity("id", signedInvite)); + assertEquals(activities, [[null, signedOffer], ["id", signedInvite]]); + + // Unsigned activity dereferenced to 404 + assertFalse( + await ctx.routeActivity( + null, + new Create({ + id: new URL("https://example.com/not-found"), + actor: new URL("https://example.com/person"), + }), + ), + ); + assertEquals(activities, [[null, signedOffer], ["id", signedInvite]]); + + // Unsigned activity dereferenced to 200, but not an Activity + assertFalse( + await ctx.routeActivity( + null, + new Create({ + id: new URL("https://example.com/person"), + actor: new URL("https://example.com/person"), + }), + ), + ); + assertEquals(activities, [[null, signedOffer], ["id", signedInvite]]); + + // Unsigned activity dereferenced to 200, but has a different id + assertFalse( + await ctx.routeActivity( + null, + new Announce({ + id: new URL("https://example.com/announce#diffrent-id"), + actor: new URL("https://example.com/person"), + }), + ), + ); + assertEquals(activities, [[null, signedOffer], ["id", signedInvite]]); + + // Unsigned activity dereferenced to 200, but has no actor + assertFalse( + await ctx.routeActivity( + null, + new Announce({ + id: new URL("https://example.com/announce"), + // Although the actor is set here, the fetched document has no actor. + // See also src/testing/fixtures/example.com/announce + actor: new URL("https://example.com/person"), + }), + ), + ); + assertEquals(activities, [[null, signedOffer], ["id", signedInvite]]); + + // Unsigned activity dereferenced to 200, but actor is cross-origin + assertFalse( + await ctx.routeActivity( + null, + new Create({ + id: new URL("https://example.com/cross-origin-actor"), + actor: new URL("https://cross-origin.com/actor"), + }), + ), + ); + assertEquals(activities, [[null, signedOffer], ["id", signedInvite]]); + + // Unsigned activity dereferenced to 200, but no inbox listener corresponds + assert( + await ctx.routeActivity( + null, + new Create({ + id: new URL("https://example.com/create"), + actor: new URL("https://example.com/person"), + }), + ), + ); + assertEquals(activities, [[null, signedOffer], ["id", signedInvite]]); + + // Unsigned activity dereferenced to 200 + assert( + await ctx.routeActivity( + null, + new Invite({ + id: new URL("https://example.com/invite"), + actor: new URL("https://example.com/person"), + }), + ), + ); + assertEquals( + activities, + [ + [null, signedOffer], + ["id", signedInvite], + [ + null, + new Invite({ + id: new URL("https://example.com/invite"), + actor: new URL("https://example.com/person"), + object: new URL("https://example.com/object"), + }), + ], + ], + ); +}); + test("InboxContextImpl.forwardActivity()", async (t) => { mf.install(); diff --git a/src/federation/middleware.ts b/src/federation/middleware.ts index bf9d0dd..6fa90c5 100644 --- a/src/federation/middleware.ts +++ b/src/federation/middleware.ts @@ -1,3 +1,4 @@ +import { verifyObject } from "@fedify/fedify"; import { getLogger, withContext } from "@logtape/logtape"; import { context, @@ -74,6 +75,7 @@ import type { InboxContext, ParseUriResult, RequestContext, + RouteActivityOptions, SendActivityOptions, } from "./context.ts"; import type { @@ -92,7 +94,8 @@ import { handleInbox, handleObject, } from "./handler.ts"; -import { InboxListenerSet } from "./inbox.ts"; +import { InboxListenerSet, routeActivity } from "./inbox.ts"; +import { KvKeyCache } from "./keycache.ts"; import type { KvKey, KvStore } from "./kv.ts"; import type { MessageQueue } from "./mq.ts"; import type { @@ -3076,6 +3079,179 @@ export class ContextImpl implements Context { cursor = result.nextCursor ?? null; } } + + routeActivity( + recipient: string | null, + activity: Activity, + options: RouteActivityOptions = {}, + ): Promise { + const tracerProvider = this.tracerProvider ?? this.tracerProvider; + const tracer = tracerProvider.getTracer(metadata.name, metadata.version); + return tracer.startActiveSpan( + "activitypub.inbox", + { + kind: this.federation.inboxQueue == null || options.immediate + ? SpanKind.INTERNAL + : SpanKind.PRODUCER, + attributes: { + "activitypub.activity.type": getTypeId(activity).href, + }, + }, + async (span) => { + if (activity.id != null) { + span.setAttribute("activitypub.activity.id", activity.id.href); + } + if (activity.toIds.length > 0) { + span.setAttribute( + "activitypub.activity.to", + activity.toIds.map((to) => to.href), + ); + } + if (activity.ccIds.length > 0) { + span.setAttribute( + "activitypub.activity.cc", + activity.ccIds.map((cc) => cc.href), + ); + } + if (activity.btoIds.length > 0) { + span.setAttribute( + "activitypub.activity.bto", + activity.btoIds.map((bto) => bto.href), + ); + } + if (activity.bccIds.length > 0) { + span.setAttribute( + "activitypub.activity.bcc", + activity.bccIds.map((bcc) => bcc.href), + ); + } + try { + const ok = await this.routeActivityInternal( + recipient, + activity, + options, + span, + ); + if (ok) { + span.setAttribute("activitypub.shared_inbox", recipient == null); + if (recipient != null) { + span.setAttribute("fedify.inbox.recipient", recipient); + } + } else { + span.setStatus({ code: SpanStatusCode.ERROR }); + } + return ok; + } catch (e) { + span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) }); + throw e; + } finally { + span.end(); + } + }, + ); + } + + protected async routeActivityInternal( + recipient: string | null, + activity: Activity, + options: RouteActivityOptions = {}, + span: Span, + ): Promise { + const logger = getLogger(["fedify", "federation", "inbox"]); + const contextLoader = options.contextLoader ?? this.contextLoader; + const json = await activity.toJsonLd({ contextLoader }); + const keyCache = new KvKeyCache( + this.federation.kv, + this.federation.kvPrefixes.publicKey, + this, + ); + const verified = await verifyObject( + Activity, + json, + { + contextLoader, + documentLoader: options.documentLoader ?? this.documentLoader, + tracerProvider: options.tracerProvider ?? this.tracerProvider, + keyCache, + }, + ); + if (verified == null) { + logger.debug( + "Object Integrity Proofs are not verified.", + { recipient, activity: json }, + ); + if (activity.id == null) { + logger.debug( + "Activity is missing an ID; unable to fetch.", + { recipient, activity: json }, + ); + return false; + } + const fetched = await this.lookupObject(activity.id, options); + if (fetched == null) { + logger.debug( + "Failed to fetch the remote activity object {activityId}.", + { recipient, activity: json, activityId: activity.id.href }, + ); + return false; + } else if (!(fetched instanceof Activity)) { + logger.debug( + "Fetched object is not an Activity.", + { recipient, activity: await fetched.toJsonLd({ contextLoader }) }, + ); + return false; + } else if (fetched.id?.href !== activity.id.href) { + logger.debug( + "Fetched activity object has a different ID; failed to verify.", + { recipient, activity: await fetched.toJsonLd({ contextLoader }) }, + ); + return false; + } else if (fetched.actorIds.length < 1) { + logger.debug( + "Fetched activity object is missing an actor; unable to verify.", + { recipient, activity: await fetched.toJsonLd({ contextLoader }) }, + ); + return false; + } + const activityId = fetched.id; + if ( + !fetched.actorIds.every((actor) => actor.origin === activityId.origin) + ) { + logger.debug( + "Fetched activity object has actors from different origins; " + + "unable to verify.", + { recipient, activity: await fetched.toJsonLd({ contextLoader }) }, + ); + return false; + } + logger.debug( + "Successfully fetched the remote activity object {activityId}; " + + "ignore the original activity and use the fetched one, which is trustworthy.", + ); + activity = fetched; + } else { + logger.debug( + "Object Integrity Proofs are verified.", + { recipient, activity: json }, + ); + } + const routeResult = await routeActivity({ + context: this, + json, + activity, + recipient, + inboxListeners: this.federation.inboxListeners, + inboxContextFactory: this.toInboxContext.bind(this), + inboxErrorHandler: this.federation.inboxErrorHandler, + kv: this.federation.kv, + kvPrefixes: this.federation.kvPrefixes, + queue: this.federation.inboxQueue, + span, + tracerProvider: options.tracerProvider ?? this.tracerProvider, + }); + return routeResult === "alreadyProcessed" || routeResult === "enqueued" || + routeResult === "unsupportedActivity" || routeResult === "success"; + } } interface RequestContextOptions diff --git a/src/testing/context.ts b/src/testing/context.ts index e25e8dd..3d1bb07 100644 --- a/src/testing/context.ts +++ b/src/testing/context.ts @@ -34,6 +34,7 @@ export function createContext( lookupObject, traverseCollection, sendActivity, + routeActivity, }: Partial> & { url?: URL; data: TContextData }, ): Context { function throwRouteError(): URL { @@ -84,6 +85,9 @@ export function createContext( sendActivity: sendActivity ?? ((_params) => { throw new Error("Not implemented"); }), + routeActivity: routeActivity ?? ((_params) => { + throw new Error("Not implemented"); + }), }; } diff --git a/src/testing/fixtures/example.com/create b/src/testing/fixtures/example.com/create new file mode 100644 index 0000000..c14023f --- /dev/null +++ b/src/testing/fixtures/example.com/create @@ -0,0 +1,6 @@ +{ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "Create", + "id": "https://example.com/create", + "actor": "https://example.com/person" +} diff --git a/src/testing/fixtures/example.com/cross-origin-actor b/src/testing/fixtures/example.com/cross-origin-actor new file mode 100644 index 0000000..add2c30 --- /dev/null +++ b/src/testing/fixtures/example.com/cross-origin-actor @@ -0,0 +1,6 @@ +{ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "Create", + "id": "https://example.com/cross-origin-actor", + "actor": "https://cross-origin.com/actor" +} diff --git a/src/testing/fixtures/example.com/invite b/src/testing/fixtures/example.com/invite new file mode 100644 index 0000000..6873e2b --- /dev/null +++ b/src/testing/fixtures/example.com/invite @@ -0,0 +1,7 @@ +{ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "Invite", + "id": "https://example.com/invite", + "actor": "https://example.com/person", + "object": "https://example.com/object" +}