From cabe60609346890e4d2acdda4605d61d6e424634 Mon Sep 17 00:00:00 2001 From: "aleksej.paschenko" Date: Wed, 13 Dec 2023 12:38:34 +0300 Subject: [PATCH] Add methods to stream blocks to Streaming API --- streaming.go | 43 +++++++++++++++++++++++++++++++++++++++++++ websocket.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) diff --git a/streaming.go b/streaming.go index 9ce9018..0a3c8e3 100644 --- a/streaming.go +++ b/streaming.go @@ -32,6 +32,15 @@ type TraceEventData struct { Hash string `json:"hash"` } +// BlockEventData represents a notification about a new block. +type BlockEventData struct { + Workchain int32 `json:"workchain"` + Shard string `json:"shard"` + Seqno uint32 `json:"seqno"` + RootHash string `json:"root_hash"` + FileHash string `json:"file_hash"` +} + // TraceHandler is a callback that handles a new trace event. type TraceHandler func(TraceEventData) @@ -41,6 +50,9 @@ type TransactionHandler func(data TransactionEventData) // MempoolHandler is a callback that handles a new mempool event. type MempoolHandler func(data MempoolEventData) +// BlockHandler is a callback that handles a new block event. +type BlockHandler func(data BlockEventData) + // StreamingAPI provides a convenient way to receive events happening on the TON blockchain. type StreamingAPI struct { logger Logger @@ -123,12 +135,19 @@ type Websocket interface { // UnsubscribeFromMempool unsubscribes from notifications about new messages in the TON network. UnsubscribeFromMempool() error + // SubscribeToBlocks subscribes to notifications about new blocks in the specified workchain. + // Workchain is optional. If it is nil, all blocks from all workchain will be received. + SubscribeToBlocks(workchain *int) error + // UnsubscribeFromBlocks unsubscribes from notifications about new blocks in the specified workchain. + UnsubscribeFromBlocks() error + // SetMempoolHandler defines a callback that will be called when a new mempool event is received. SetMempoolHandler(handler MempoolHandler) // SetTransactionHandler defines a callback that will be called when a new transaction event is received. SetTransactionHandler(handler TransactionHandler) // SetTraceHandler defines a callback that will be called when a new trace event is received. SetTraceHandler(handler TraceHandler) + SetBlockHandler(handler BlockHandler) } // WebsocketConfigurator configures an open websocket connection. @@ -172,6 +191,7 @@ func (s *StreamingAPI) SubscribeToTraces(ctx context.Context, accounts []string, if err := json.Unmarshal(data, &eventData); err != nil { // this should never happen but anyway s.logger.Errorf("sse connection received invalid trace event data: %v", err) + return } handler(eventData) }) @@ -191,6 +211,7 @@ func (s *StreamingAPI) SubscribeToMempool(ctx context.Context, accounts []string if err := json.Unmarshal(data, &eventData); err != nil { // this should never happen but anyway s.logger.Errorf("sse connection received invalid mempool event data: %v", err) + return } handler(eventData) }) @@ -219,6 +240,28 @@ func (s *StreamingAPI) SubscribeToTransactions(ctx context.Context, accounts []s if err := json.Unmarshal(data, &eventData); err != nil { // this should never happen but anyway s.logger.Errorf("sse connection received invalid transaction event data: %v", err) + return + } + handler(eventData) + }) +} + +// SubscribeToBlocks opens a new sse connection to tonapi.io and subscribes to new blocks in the specified workchain. +// When a new block is received, the handler will be called. +// If workchain is nil, all blocks from all workchain will be received. +// This function returns an error when the underlying connection fails or context is canceled. +// No automatic reconnection is performed. +func (s *StreamingAPI) SubscribeToBlocks(ctx context.Context, workchain *int, handler BlockHandler) error { + url := fmt.Sprintf("%s/v2/sse/blocks", s.endpoint) + if workchain != nil { + url = fmt.Sprintf("%s/v2/sse/blocks?workchain=%d", s.endpoint, *workchain) + } + return s.subscribe(ctx, url, s.apiKey, func(data []byte) { + eventData := BlockEventData{} + if err := json.Unmarshal(data, &eventData); err != nil { + // this should never happen but anyway + s.logger.Errorf("sse connection received invalid block event data: %v", err) + return } handler(eventData) }) diff --git a/websocket.go b/websocket.go index 3b07e6d..e717778 100644 --- a/websocket.go +++ b/websocket.go @@ -38,6 +38,7 @@ type websocketConnection struct { mempoolHandler MempoolHandler transactionHandler TransactionHandler traceHandler TraceHandler + blockHandler BlockHandler } func (w *websocketConnection) SubscribeToTransactions(accounts []string, operations []string) error { @@ -95,6 +96,25 @@ func (w *websocketConnection) UnsubscribeFromMempool() error { return w.conn.WriteJSON(request) } +func (w *websocketConnection) SubscribeToBlocks(workchain *int) error { + request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "subscribe_block"} + if workchain != nil { + request.Params = []string{ + fmt.Sprintf("workchain=%d", *workchain), + } + } + w.mu.Lock() + defer w.mu.Unlock() + return w.conn.WriteJSON(request) +} + +func (w *websocketConnection) UnsubscribeFromBlocks() error { + request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "unsubscribe_block"} + w.mu.Lock() + defer w.mu.Unlock() + return w.conn.WriteJSON(request) +} + func (w *websocketConnection) SetMempoolHandler(handler MempoolHandler) { w.mu.Lock() defer w.mu.Unlock() @@ -113,6 +133,12 @@ func (w *websocketConnection) SetTraceHandler(handler TraceHandler) { w.traceHandler = handler } +func (w *websocketConnection) SetBlockHandler(handler BlockHandler) { + w.mu.Lock() + defer w.mu.Unlock() + w.blockHandler = handler +} + func websocketConnect(ctx context.Context, endpoint string, apiKey string) (*websocketConnection, error) { header := http.Header{} if len(apiKey) > 0 { @@ -137,6 +163,7 @@ func websocketConnect(ctx context.Context, endpoint string, apiKey string) (*web mempoolHandler: func(data MempoolEventData) {}, transactionHandler: func(data TransactionEventData) {}, traceHandler: func(data TraceEventData) {}, + blockHandler: func(data BlockEventData) {}, }, nil } @@ -185,6 +212,14 @@ func (w *websocketConnection) runJsonRPC(ctx context.Context, fn WebsocketConfig w.processHandler(func() { w.mempoolHandler(mempoolEvent) }) + case "block": + var block BlockEventData + if err := json.Unmarshal(response.Params, &block); err != nil { + return err + } + w.processHandler(func() { + w.blockHandler(block) + }) } } })