Skip to content

Commit

Permalink
Add methods to stream blocks to Streaming API
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksej-paschenko committed Dec 14, 2023
1 parent 76ff729 commit cabe606
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 0 deletions.
43 changes: 43 additions & 0 deletions streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ type TraceEventData struct {
Hash string `json:"hash"`
}

// BlockEventData represents a notification about a new block.
type BlockEventData struct {
Workchain int32 `json:"workchain"`
Shard string `json:"shard"`
Seqno uint32 `json:"seqno"`
RootHash string `json:"root_hash"`
FileHash string `json:"file_hash"`
}

// TraceHandler is a callback that handles a new trace event.
type TraceHandler func(TraceEventData)

Expand All @@ -41,6 +50,9 @@ type TransactionHandler func(data TransactionEventData)
// MempoolHandler is a callback that handles a new mempool event.
type MempoolHandler func(data MempoolEventData)

// BlockHandler is a callback that handles a new block event.
type BlockHandler func(data BlockEventData)

// StreamingAPI provides a convenient way to receive events happening on the TON blockchain.
type StreamingAPI struct {
logger Logger
Expand Down Expand Up @@ -123,12 +135,19 @@ type Websocket interface {
// UnsubscribeFromMempool unsubscribes from notifications about new messages in the TON network.
UnsubscribeFromMempool() error

// SubscribeToBlocks subscribes to notifications about new blocks in the specified workchain.
// Workchain is optional. If it is nil, all blocks from all workchain will be received.
SubscribeToBlocks(workchain *int) error
// UnsubscribeFromBlocks unsubscribes from notifications about new blocks in the specified workchain.
UnsubscribeFromBlocks() error

// SetMempoolHandler defines a callback that will be called when a new mempool event is received.
SetMempoolHandler(handler MempoolHandler)
// SetTransactionHandler defines a callback that will be called when a new transaction event is received.
SetTransactionHandler(handler TransactionHandler)
// SetTraceHandler defines a callback that will be called when a new trace event is received.
SetTraceHandler(handler TraceHandler)
SetBlockHandler(handler BlockHandler)
}

// WebsocketConfigurator configures an open websocket connection.
Expand Down Expand Up @@ -172,6 +191,7 @@ func (s *StreamingAPI) SubscribeToTraces(ctx context.Context, accounts []string,
if err := json.Unmarshal(data, &eventData); err != nil {
// this should never happen but anyway
s.logger.Errorf("sse connection received invalid trace event data: %v", err)
return
}
handler(eventData)
})
Expand All @@ -191,6 +211,7 @@ func (s *StreamingAPI) SubscribeToMempool(ctx context.Context, accounts []string
if err := json.Unmarshal(data, &eventData); err != nil {
// this should never happen but anyway
s.logger.Errorf("sse connection received invalid mempool event data: %v", err)
return
}
handler(eventData)
})
Expand Down Expand Up @@ -219,6 +240,28 @@ func (s *StreamingAPI) SubscribeToTransactions(ctx context.Context, accounts []s
if err := json.Unmarshal(data, &eventData); err != nil {
// this should never happen but anyway
s.logger.Errorf("sse connection received invalid transaction event data: %v", err)
return
}
handler(eventData)
})
}

// SubscribeToBlocks opens a new sse connection to tonapi.io and subscribes to new blocks in the specified workchain.
// When a new block is received, the handler will be called.
// If workchain is nil, all blocks from all workchain will be received.
// This function returns an error when the underlying connection fails or context is canceled.
// No automatic reconnection is performed.
func (s *StreamingAPI) SubscribeToBlocks(ctx context.Context, workchain *int, handler BlockHandler) error {
url := fmt.Sprintf("%s/v2/sse/blocks", s.endpoint)
if workchain != nil {
url = fmt.Sprintf("%s/v2/sse/blocks?workchain=%d", s.endpoint, *workchain)
}
return s.subscribe(ctx, url, s.apiKey, func(data []byte) {
eventData := BlockEventData{}
if err := json.Unmarshal(data, &eventData); err != nil {
// this should never happen but anyway
s.logger.Errorf("sse connection received invalid block event data: %v", err)
return
}
handler(eventData)
})
Expand Down
35 changes: 35 additions & 0 deletions websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type websocketConnection struct {
mempoolHandler MempoolHandler
transactionHandler TransactionHandler
traceHandler TraceHandler
blockHandler BlockHandler
}

func (w *websocketConnection) SubscribeToTransactions(accounts []string, operations []string) error {
Expand Down Expand Up @@ -95,6 +96,25 @@ func (w *websocketConnection) UnsubscribeFromMempool() error {
return w.conn.WriteJSON(request)
}

func (w *websocketConnection) SubscribeToBlocks(workchain *int) error {
request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "subscribe_block"}
if workchain != nil {
request.Params = []string{
fmt.Sprintf("workchain=%d", *workchain),
}
}
w.mu.Lock()
defer w.mu.Unlock()
return w.conn.WriteJSON(request)
}

func (w *websocketConnection) UnsubscribeFromBlocks() error {
request := JsonRPCRequest{ID: w.currentRequestID(), JSONRPC: "2.0", Method: "unsubscribe_block"}
w.mu.Lock()
defer w.mu.Unlock()
return w.conn.WriteJSON(request)
}

func (w *websocketConnection) SetMempoolHandler(handler MempoolHandler) {
w.mu.Lock()
defer w.mu.Unlock()
Expand All @@ -113,6 +133,12 @@ func (w *websocketConnection) SetTraceHandler(handler TraceHandler) {
w.traceHandler = handler
}

func (w *websocketConnection) SetBlockHandler(handler BlockHandler) {
w.mu.Lock()
defer w.mu.Unlock()
w.blockHandler = handler
}

func websocketConnect(ctx context.Context, endpoint string, apiKey string) (*websocketConnection, error) {
header := http.Header{}
if len(apiKey) > 0 {
Expand All @@ -137,6 +163,7 @@ func websocketConnect(ctx context.Context, endpoint string, apiKey string) (*web
mempoolHandler: func(data MempoolEventData) {},
transactionHandler: func(data TransactionEventData) {},
traceHandler: func(data TraceEventData) {},
blockHandler: func(data BlockEventData) {},
}, nil
}

Expand Down Expand Up @@ -185,6 +212,14 @@ func (w *websocketConnection) runJsonRPC(ctx context.Context, fn WebsocketConfig
w.processHandler(func() {
w.mempoolHandler(mempoolEvent)
})
case "block":
var block BlockEventData
if err := json.Unmarshal(response.Params, &block); err != nil {
return err
}
w.processHandler(func() {
w.blockHandler(block)
})
}
}
})
Expand Down

0 comments on commit cabe606

Please sign in to comment.