diff --git a/go/consensus/tendermint/apps/roothash/transactions.go b/go/consensus/tendermint/apps/roothash/transactions.go index c649492940d..6b17f2a45b8 100644 --- a/go/consensus/tendermint/apps/roothash/transactions.go +++ b/go/consensus/tendermint/apps/roothash/transactions.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" "github.com/oasisprotocol/oasis-core/go/common/logging" abciAPI "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/api" @@ -15,6 +16,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/roothash/api/commitment" "github.com/oasisprotocol/oasis-core/go/roothash/api/message" staking "github.com/oasisprotocol/oasis-core/go/staking/api" + "github.com/oasisprotocol/oasis-core/go/worker/common/committee" ) // getRuntimeState fetches the current runtime state and performs common @@ -379,17 +381,24 @@ func (app *rootHashApplication) submitMsg( // Queue message. inMsg := &message.IncomingMessage{ - ID: meta.NextSequenceNumber, - Caller: ctx.CallerAddress(), - Tag: msg.Tag, - Fee: msg.Fee, - Tokens: msg.Tokens, - Data: msg.Data, + ID: meta.NextSequenceNumber, + Caller: ctx.CallerAddress(), + Tag: msg.Tag, + Fee: msg.Fee, + Tokens: msg.Tokens, + Transaction: msg.Transaction, + Data: msg.Data, } if err = state.SetIncomingMessageInQueue(ctx, rtState.Runtime.ID, inMsg); err != nil { return err } + if msg.Transaction != nil { + // todo: access txpool somehow + var node *committee.Node + node.TxPool.AddIncomingTx(hash.NewFromBytes(*msg.Transaction)) + } + // Update next sequence number. meta.Size++ meta.NextSequenceNumber++ diff --git a/go/roothash/api/api.go b/go/roothash/api/api.go index 80642b75ae6..4354a1eb8d0 100644 --- a/go/roothash/api/api.go +++ b/go/roothash/api/api.go @@ -217,6 +217,9 @@ type SubmitMsg struct { // Tokens are any tokens sent into the runtime as part of the message being sent. The tokens are // transferred before the message is processed by the runtime. Tokens quantity.Quantity `json:"tokens,omitempty"` + // Transaction is an optional transaction. `nil` means no transaction, while `tx = nil; &tx` is + // a transaction that is the empty byte string. Go, I swear. + Transaction *[]byte `json:"transaction,omitempty"` // Data is arbitrary runtime-dependent data. Data []byte `json:"data,omitempty"` } diff --git a/go/roothash/api/message/incoming_message.go b/go/roothash/api/message/incoming_message.go index e9ac7d38992..16380fd57b0 100644 --- a/go/roothash/api/message/incoming_message.go +++ b/go/roothash/api/message/incoming_message.go @@ -26,6 +26,10 @@ type IncomingMessage struct { // transferred before the message is processed by the runtime. Tokens quantity.Quantity `json:"tokens,omitempty"` + // Transaction is an optional transaction. `nil` means no transaction, while `tx = nil; &tx` is + // a transaction that is the empty byte string. Go, I swear. + Transaction *[]byte `json:"transaction,omitempty"` + // Data is arbitrary runtime-dependent data. Data []byte `json:"data,omitempty"` } diff --git a/go/runtime/host/protocol/types.go b/go/runtime/host/protocol/types.go index 20bdbfc1aa4..45fe47ee7e4 100644 --- a/go/runtime/host/protocol/types.go +++ b/go/runtime/host/protocol/types.go @@ -352,6 +352,8 @@ type RuntimeExecuteTxBatchResponse struct { // TxRejectHashes are the transaction hashes of transactions that should be immediately removed // from the scheduling queue as they are invalid. TxRejectHashes []hash.Hash `json:"tx_reject_hashes,omitempty"` + // TxIncomingHashes are the transactions that the runtime is scheduling from outside the queue. + TxIncomingHashes []hash.Hash `json:"extra_txs,omitempty"` // TxInputRoot is the root hash of all transaction inputs. TxInputRoot hash.Hash `json:"tx_input_root,omitempty"` // TxInputWriteLog is the write log for generating transaction inputs. diff --git a/go/runtime/txpool/txpool.go b/go/runtime/txpool/txpool.go index 223f4d2a851..278c6a613fe 100644 --- a/go/runtime/txpool/txpool.go +++ b/go/runtime/txpool/txpool.go @@ -73,6 +73,9 @@ type TransactionPool interface { // SubmitTxNoWait adds the transaction into the transaction pool and returns immediately. SubmitTxNoWait(ctx context.Context, tx []byte, meta *TransactionMeta) error + // AddIncomingTx uh + AddIncomingTx(tx hash.Hash) + // SubmitProposedBatch adds the given (possibly new) transaction batch into the current // proposal queue. SubmitProposedBatch(batch [][]byte) @@ -87,6 +90,9 @@ type TransactionPool interface { // RemoveTxBatch removes a transaction batch from the transaction pool. RemoveTxBatch(txs []hash.Hash) + // AdvanceIncomingTxs uh + AdvanceIncomingTxs(count int) + // GetPrioritizedBatch returns a batch of transactions ordered by priority. // // Offset specifies the transaction hash that should serve as an offset when returning @@ -181,8 +187,9 @@ type txPool struct { schedulerTicker *time.Ticker schedulerNotifier *pubsub.Broker - proposedTxsLock sync.Mutex - proposedTxs map[hash.Hash]*Transaction + proposedTxsLock sync.Mutex + proposedTxs map[hash.Hash]*Transaction + incomingMessageTxs []hash.Hash blockInfoLock sync.Mutex blockInfo *BlockInfo @@ -276,6 +283,11 @@ func (t *txPool) addToCheckQueue(pct *PendingCheckTransaction) error { return nil } +func (t *txPool) AddIncomingTx(tx hash.Hash) { + // todo: locking? + t.incomingMessageTxs = append(t.incomingMessageTxs, tx) +} + func (t *txPool) SubmitProposedBatch(batch [][]byte) { // Also ingest into the regular pool (may fail). for _, rawTx := range batch { @@ -323,6 +335,12 @@ func (t *txPool) RemoveTxBatch(txs []hash.Hash) { pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.schedulerQueue.size())) } +func (t *txPool) AdvanceIncomingTxs(count int) { + // todo: locking? + // todo: will later append really free anything? would deque be better? + t.incomingMessageTxs = t.incomingMessageTxs[count:] +} + func (t *txPool) GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*Transaction { return t.schedulerQueue.getPrioritizedBatch(offset, limit) }