diff --git a/packages/transport-http/src/subscribe/handlers/events.ts b/packages/transport-http/src/subscribe/handlers/events.ts new file mode 100644 index 000000000..ca91be04f --- /dev/null +++ b/packages/transport-http/src/subscribe/handlers/events.ts @@ -0,0 +1,104 @@ +import {SdkTransport} from "@onflow/typedefs" +import {createSubscriptionHandler} from "./types" + +type EventsArgs = + SdkTransport.SubscriptionArguments + +type EventsData = + SdkTransport.SubscriptionData + +export type EventsArgsDto = ( + | { + start_block_id: string + } + | { + start_block_height: number + } + | {} +) & { + event_types?: string[] + addresses?: string[] + contracts?: string[] +} + +type EventsDataDto = { + block_id: string + block_height: number + block_timestamp: string + type: string + transaction_id: string + transaction_index: number + event_index: number + payload: string +} + +export const eventsHandler = createSubscriptionHandler<{ + Topic: SdkTransport.SubscriptionTopic.EVENTS + Args: EventsArgs + Data: EventsData + ArgsDto: EventsArgsDto + DataDto: EventsDataDto +}>({ + topic: SdkTransport.SubscriptionTopic.EVENTS, + createSubscriber: (initialArgs, onData, onError) => { + let resumeArgs: EventsArgs = { + ...initialArgs, + } + + return { + onData(rawData: EventsDataDto) { + // Parse the raw data + const result: EventsData = { + event: { + blockId: rawData.block_id, + blockHeight: rawData.block_height, + blockTimestamp: rawData.block_timestamp, + type: rawData.type, + transactionId: rawData.transaction_id, + transactionIndex: rawData.transaction_index, + eventIndex: rawData.event_index, + payload: rawData.payload, + }, + } + + // Update the resume args + resumeArgs = { + ...resumeArgs, + startBlockHeight: result.event.blockHeight + 1, + startBlockId: undefined, + } + + onData(result) + }, + onError(error: Error) { + onError(error) + }, + argsToDto(args: EventsArgs) { + let encodedArgs: EventsArgsDto = { + event_types: args.filter?.eventTypes, + addresses: args.filter?.addresses, + contracts: args.filter?.contracts, + } + + if ("startBlockHeight" in args) { + return { + ...encodedArgs, + start_block_height: args.startBlockHeight, + } + } + + if ("startBlockId" in args) { + return { + ...encodedArgs, + start_block_id: args.startBlockId, + } + } + + return encodedArgs + }, + get connectionArgs() { + return resumeArgs + }, + } + }, +}) diff --git a/packages/transport-http/src/subscribe/subscribe.ts b/packages/transport-http/src/subscribe/subscribe.ts index cebf511bd..18265e894 100644 --- a/packages/transport-http/src/subscribe/subscribe.ts +++ b/packages/transport-http/src/subscribe/subscribe.ts @@ -2,8 +2,13 @@ import {SdkTransport} from "@onflow/typedefs" import {SubscriptionManager} from "./subscription-manager" import {blocksHandler} from "./handlers/blocks" import {blockDigestsHandler} from "./handlers/block-digests" +import {eventsHandler} from "./handlers/events" -const SUBSCRIPTION_HANDLERS = [blocksHandler, blockDigestsHandler] +const SUBSCRIPTION_HANDLERS = [ + blocksHandler, + blockDigestsHandler, + eventsHandler, +] // Map of SubscriptionManager instances by access node URL let subscriptionManagerMap: Map< diff --git a/packages/typedefs/src/sdk-transport/subscriptions.ts b/packages/typedefs/src/sdk-transport/subscriptions.ts index b0ecf2f15..2530c30ba 100644 --- a/packages/typedefs/src/sdk-transport/subscriptions.ts +++ b/packages/typedefs/src/sdk-transport/subscriptions.ts @@ -1,4 +1,4 @@ -import {Block, BlockDigest} from ".." +import {Block, BlockDigest, Event, EventFilter} from ".." export type SubscriptionSchema = { [SubscriptionTopic.BLOCKS]: SchemaItem< @@ -13,11 +13,31 @@ export type SubscriptionSchema = { blockDigest: BlockDigest } > + [SubscriptionTopic.EVENTS]: SchemaItem< + // TODO: We do not know the data model types yet + ( + | { + startBlockId: string + } + | { + startBlockHeight: number + } + | {} + ) & { + filter: EventFilter + }, + { + event: Omit & { + payload: string + } + } + > } export enum SubscriptionTopic { BLOCKS = "blocks", BLOCK_DIGESTS = "block_digests", + EVENTS = "events", } type BlockArgs =