diff --git a/go/consensus/tendermint/apps/roothash/transactions.go b/go/consensus/tendermint/apps/roothash/transactions.go index c649492940d..e482025245e 100644 --- a/go/consensus/tendermint/apps/roothash/transactions.go +++ b/go/consensus/tendermint/apps/roothash/transactions.go @@ -379,12 +379,13 @@ func (app *rootHashApplication) submitMsg( // Queue message. inMsg := &message.IncomingMessage{ - ID: meta.NextSequenceNumber, - Caller: ctx.CallerAddress(), - Tag: msg.Tag, - Fee: msg.Fee, - Tokens: msg.Tokens, - Data: msg.Data, + ID: meta.NextSequenceNumber, + Caller: ctx.CallerAddress(), + Tag: msg.Tag, + Fee: msg.Fee, + Tokens: msg.Tokens, + Transaction: msg.Transaction, + Data: msg.Data, } if err = state.SetIncomingMessageInQueue(ctx, rtState.Runtime.ID, inMsg); err != nil { return err diff --git a/go/roothash/api/api.go b/go/roothash/api/api.go index 80642b75ae6..4354a1eb8d0 100644 --- a/go/roothash/api/api.go +++ b/go/roothash/api/api.go @@ -217,6 +217,9 @@ type SubmitMsg struct { // Tokens are any tokens sent into the runtime as part of the message being sent. The tokens are // transferred before the message is processed by the runtime. Tokens quantity.Quantity `json:"tokens,omitempty"` + // Transaction is an optional transaction. `nil` means no transaction, while `tx = nil; &tx` is + // a transaction that is the empty byte string. Go, I swear. + Transaction *[]byte `json:"transaction,omitempty"` // Data is arbitrary runtime-dependent data. Data []byte `json:"data,omitempty"` } diff --git a/go/roothash/api/message/incoming_message.go b/go/roothash/api/message/incoming_message.go index e9ac7d38992..16380fd57b0 100644 --- a/go/roothash/api/message/incoming_message.go +++ b/go/roothash/api/message/incoming_message.go @@ -26,6 +26,10 @@ type IncomingMessage struct { // transferred before the message is processed by the runtime. Tokens quantity.Quantity `json:"tokens,omitempty"` + // Transaction is an optional transaction. `nil` means no transaction, while `tx = nil; &tx` is + // a transaction that is the empty byte string. Go, I swear. + Transaction *[]byte `json:"transaction,omitempty"` + // Data is arbitrary runtime-dependent data. Data []byte `json:"data,omitempty"` } diff --git a/go/worker/compute/executor/committee/transactions.go b/go/worker/compute/executor/committee/transactions.go index 8d351ff0c6e..98e8deb14f9 100644 --- a/go/worker/compute/executor/committee/transactions.go +++ b/go/worker/compute/executor/committee/transactions.go @@ -9,6 +9,8 @@ import ( cmnBackoff "github.com/oasisprotocol/oasis-core/go/common/backoff" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" "github.com/oasisprotocol/oasis-core/go/runtime/transaction" "github.com/oasisprotocol/oasis-core/go/runtime/txpool" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/txsync" @@ -39,7 +41,7 @@ func (n *Node) resolveBatchLocked(batch *unresolvedBatch, missingState NodeState } var ctx context.Context ctx, n.missingTxsCancel = context.WithCancel(n.roundCtx) - go n.requestMissingTransactions(ctx) + go n.requestMissingTransactions(ctx, n.commonNode.CurrentConsensusBlock) } return resolvedBatch, nil } @@ -81,7 +83,31 @@ func (n *Node) handleNewCheckedTransactions(txs []*txpool.PendingCheckTransactio } } -func (n *Node) requestMissingTransactions(ctx context.Context) { +func (n *Node) requestMissingTransactions(ctx context.Context, consensusBlk *consensus.LightBlock) { + // Load transactions from roothash incoming messages. + func() { + inMsgs, err := n.commonNode.Consensus.RootHash().GetIncomingMessageQueue(ctx, &roothash.InMessageQueueRequest{ + RuntimeID: n.commonNode.Runtime.ID(), + Height: consensusBlk.Height, + }) + if err != nil { + n.logger.Error("failed to fetch incoming runtime message queue transactions", + "err", err, + ) + // todo: propagate sanely + panic(err) + } + var inMsgTxs [][]byte + for _, msg := range inMsgs { + if msg.Transaction != nil { + inMsgTxs = append(inMsgTxs, *msg.Transaction) + } + } + if len(inMsgTxs) != 0 { + n.commonNode.TxPool.SubmitProposedBatch(inMsgTxs) + } + }() + requestOp := func() error { // Determine what transactions are missing. txHashes := func() []hash.Hash {