diff --git a/go/runtime/txpool/transaction.go b/go/runtime/txpool/transaction.go index 0f521e6cb0f..b68988d9b17 100644 --- a/go/runtime/txpool/transaction.go +++ b/go/runtime/txpool/transaction.go @@ -8,20 +8,12 @@ import ( "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" ) -type txStatus uint8 - -const ( - txStatusPendingCheck = iota - txStatusChecked -) - +// todo: move to main_queue.go // Transaction is a transaction in the transaction pool. type Transaction struct { // tx represents the raw binary transaction data. tx []byte - // status is the transaction status. - status txStatus // time is the timestamp when the transaction was first seen. time time.Time // hash is the cached transaction hash. @@ -36,12 +28,11 @@ type Transaction struct { senderSeq uint64 } -func newTransaction(tx []byte, status txStatus) *Transaction { +func newTransaction(tx []byte) *Transaction { return &Transaction{ - tx: tx, - status: status, - time: time.Now(), - hash: hash.NewFromBytes(tx), + tx: tx, + time: time.Now(), + hash: hash.NewFromBytes(tx), } } @@ -85,10 +76,9 @@ func (tx *Transaction) SenderSeq() uint64 { return tx.senderSeq } +// todo: move this be part of OfferChecked // setChecked populates transaction data retrieved from checks. func (tx *Transaction) setChecked(meta *protocol.CheckTxMetadata) { - tx.status = txStatusChecked - if meta != nil { tx.priority = meta.Priority tx.sender = string(meta.Sender) @@ -102,37 +92,13 @@ func (tx *Transaction) setChecked(meta *protocol.CheckTxMetadata) { } } -// txCheckFlags are the flags describing how transaction should be checked. -type txCheckFlags uint8 - -const ( - // txCheckLocal is a flag indicating that the transaction was obtained from a local client. - txCheckLocal = (1 << 0) - // txCheckDiscard is a flag indicating that the transaction should be discarded after checks. - txCheckDiscard = (1 << 1) -) - -func (f txCheckFlags) isLocal() bool { - return (f & txCheckLocal) != 0 -} - -func (f txCheckFlags) isDiscard() bool { - return (f & txCheckDiscard) != 0 -} - +// todo: move to check_queue.go // PendingCheckTransaction is a transaction pending checks. type PendingCheckTransaction struct { tx []byte - // flags are the transaction check flags. - // todo: replace with queue reference - flags txCheckFlags + // dstQueue is where to offer the tx after checking, or nil to discard + dstQueue RecheckableTransactionStore // notifyCh is a channel for sending back the transaction check result. notifyCh chan *protocol.CheckTxResult } - -func (pct *PendingCheckTransaction) isRecheck() bool { - // If transaction has already been checked then the fact that it is wrapped in a pending check - // transaction again means that this is a re-check. - return pct.status == txStatusChecked -} diff --git a/go/runtime/txpool/txpool.go b/go/runtime/txpool/txpool.go index 96cdba8d2c7..28f112e2aa1 100644 --- a/go/runtime/txpool/txpool.go +++ b/go/runtime/txpool/txpool.go @@ -234,11 +234,16 @@ func (t *txPool) SubmitTxNoWait(ctx context.Context, tx []byte, meta *Transactio } func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMeta, notifyCh chan *protocol.CheckTxResult) error { - tx := newTransaction(rawTx, txStatusPendingCheck) + // todo: in the new design, we'll be submitting to the check queue first, which won't have "first seen time" + // metadata. seems like it could still work out, as checking is single threaded, so we can still preserve the + // original order when we finish checking and offer to the main queue + // todo: can we go without this? // Skip recently seen transactions. - if _, seen := t.seenCache.Peek(tx.hash); seen { - t.logger.Debug("ignoring already seen transaction", "tx_hash", tx.hash) + // todo: hash used to be cached + h := hash.NewFromBytes(rawTx) + if _, seen := t.seenCache.Peek(h); seen { + t.logger.Debug("ignoring already seen transaction", "tx_hash", h) return fmt.Errorf("duplicate transaction") } @@ -247,12 +252,12 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe tx: rawTx, notifyCh: notifyCh, } - // todo: change flags to destination queue - if meta.Local { - pct.flags |= txCheckLocal - } if meta.Discard { - pct.flags |= txCheckDiscard + pct.dstQueue = nil + } else if meta.Local { + pct.dstQueue = t.localQueue + } else { + pct.dstQueue = t.mainQueue } return t.addToCheckQueue(pct) @@ -264,7 +269,7 @@ func (t *txPool) addToCheckQueue(pct *PendingCheckTransaction) error { t.logger.Debug("queuing transaction for check", "tx", pct.tx, "tx_hash", h, - "recheck", pct.isRecheck(), + "recheck", "dunno", // todo: can we live without this? ) if err := t.checkTxQueue.add(pct); err != nil { t.logger.Warn("unable to queue transaction", @@ -510,10 +515,10 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) { newTxs := make([]*PendingCheckTransaction, 0, len(results)) batchIndices := make([]int, 0, len(results)) - // todo: trying to remove this. we will unschedule all when starting to recheck, and if it doesn't pass, we won't - // put it back. may need some bookkeeping to notify the sender though - var unschedule []hash.Hash for i, res := range results { + // todo: we used to do this after failure metrics, but now it's here first + notifySubmitter(i) + if !res.IsSuccess() { rejectedTransactions.With(t.getMetricLabels()).Inc() // todo: hash used to be cached @@ -522,33 +527,24 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) { "tx", batch[i].tx, "tx_hash", h, "result", res, - "recheck", batch[i].isRecheck(), + "recheck", "dunno", // todo: can we live without this? ) - // If this was a recheck, make sure to remove the transaction from the scheduling queue. - if batch[i].isRecheck() { - unschedule = append(unschedule, h) - } - notifySubmitter(i) - continue - } - - if batch[i].flags.isDiscard() || batch[i].isRecheck() { - notifySubmitter(i) + // We won't be sending this tx on to its destination queue. continue } // For any transactions that are to be queued, we defer notification until queued. acceptedTransactions.With(t.getMetricLabels()).Inc() - // todo: need to design a way to get res.Meta to the main queue + // todo: need to design a way to get res.Meta to the main queue. probably extra parameter to OfferChecked // batch[i].setChecked(res.Meta) newTxs = append(newTxs, batch[i]) batchIndices = append(batchIndices, i) } // todo: we used to unschedule rejected txs here, but going forward we will (i) not schedule them in the first - // until they are checked and (ii) unschedule them as soon as we start rechecking them. + // place until they are checked and (ii) unschedule them as soon as we start rechecking them. // If there are more transactions to check, make sure we check them next. if t.checkTxQueue.size() > 0 {