From 1efe2d078af03f68dbd8130eb399370165902680 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 10 Sep 2024 16:11:17 -0400 Subject: [PATCH 1/7] p2p: fan in incoming txns into backlog worker --- data/txHandler.go | 87 +++++++++++++++++++++--------------------- data/txHandler_test.go | 4 ++ 2 files changed, 47 insertions(+), 44 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index ec3a84cc1f..33c2fc0423 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -102,12 +102,13 @@ const ( // The txBacklogMsg structure used to track a single incoming transaction from the gossip network, type txBacklogMsg struct { - rawmsg *network.IncomingMessage // the raw message from the network - unverifiedTxGroup []transactions.SignedTxn // the unverified ( and signed ) transaction group - rawmsgDataHash *crypto.Digest // hash (if any) of raw message data from the network - unverifiedTxGroupHash *crypto.Digest // hash (if any) of the unverifiedTxGroup - verificationErr error // The verification error generated by the verification function, if any. - capguard *util.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued + rawmsg *network.IncomingMessage // the raw message from the network + unverifiedTxGroup []transactions.SignedTxn // the unverified ( and signed ) transaction group + rawmsgDataHash *crypto.Digest // hash (if any) of raw message data from the network + unverifiedTxGroupHash *crypto.Digest // hash (if any) of the unverifiedTxGroup + verificationErr error // The verification error generated by the verification function, if any. 
+ capguard *util.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued + syncCh chan network.ForwardingPolicy // channel to signal the synchronous mode and its ops completion } // TxHandler handles transaction messages @@ -346,6 +347,10 @@ func (handler *TxHandler) backlogWorker() { if wi.capguard != nil { wi.capguard.Served() } + // if in synchronous mode, signal the completion of the operation + if wi.syncCh != nil { + wi.syncCh <- network.Ignore + } continue } // handler.streamVerifierChan does not receive if ctx is cancelled @@ -353,6 +358,10 @@ func (handler *TxHandler) backlogWorker() { case handler.streamVerifierChan <- &verify.UnverifiedTxnSigJob{TxnGroup: wi.unverifiedTxGroup, BacklogMessage: wi}: case <-handler.ctx.Done(): transactionMessagesDroppedFromBacklog.Inc(nil) + // if in synchronous mode, signal the completion of the operation + if wi.syncCh != nil { + wi.syncCh <- network.Ignore + } return } if wi.capguard != nil { @@ -366,7 +375,6 @@ func (handler *TxHandler) backlogWorker() { m := wi.BacklogMessage.(*txBacklogMsg) m.verificationErr = wi.Err handler.postProcessCheckedTxn(m) - case <-handler.ctx.Done(): return } @@ -507,6 +515,11 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { if wi.verificationErr != nil { // disconnect from peer. 
handler.postProcessReportErrors(wi.verificationErr) + // if in synchronous mode, signal the completion of the operation + if wi.syncCh != nil { + wi.syncCh <- network.Disconnect + return + } logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, wi.verificationErr) handler.net.Disconnect(wi.rawmsg.Sender) return @@ -523,6 +536,10 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { if err != nil { handler.rememberReportErrors(err) logging.Base().Debugf("could not remember tx: %v", err) + // if in synchronous mode, signal the completion of the operation + if wi.syncCh != nil { + wi.syncCh <- network.Ignore + } return } @@ -533,6 +550,11 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { if err != nil { logging.Base().Infof("unable to pin transaction: %v", err) } + // if in synchronous mode, signal the completion of the operation + if wi.syncCh != nil { + wi.syncCh <- network.Accept + return + } // We reencode here instead of using rawmsg.Data to avoid broadcasting non-canonical encodings handler.net.Relay(handler.ctx, protocol.TxnTag, reencode(verifiedTxGroup), false, wi.rawmsg.Sender) @@ -785,11 +807,6 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net // validateIncomingTxMessage is the validator for the MessageProcessor implementation used by P2PNetwork. 
func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.OutgoingMessage { - msgKey, isDup := handler.incomingMsgDupCheck(rawmsg.Data) - if isDup { - return network.OutgoingMessage{Action: network.Ignore} - } - unverifiedTxGroup, consumed, invalid := decodeMsg(rawmsg.Data) if invalid { // invalid encoding or exceeding txgroup, disconnect from this peer @@ -819,47 +836,29 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa wi := &txBacklogMsg{ rawmsg: &rawmsg, unverifiedTxGroup: unverifiedTxGroup, - rawmsgDataHash: msgKey, unverifiedTxGroupHash: canonicalKey, capguard: nil, + syncCh: make(chan network.ForwardingPolicy, 1), } - if handler.checkAlreadyCommitted(wi) { - transactionMessagesAlreadyCommitted.Inc(nil) - return network.OutgoingMessage{ - Action: network.Ignore, - } - } + var action network.ForwardingPolicy + select { + case handler.backlogQueue <- wi: + action = <-wi.syncCh + default: + // if we failed here we want to increase the corresponding metric. It might suggest that we + // want to increase the queue size. 
+ transactionMessagesDroppedFromBacklog.Inc(nil) - err := handler.batchVerifier.Verify(wi.unverifiedTxGroup) - if err != nil { - handler.postProcessReportErrors(err) - logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, err) - return network.OutgoingMessage{ - Action: network.Disconnect, - } - } - verifiedTxGroup := wi.unverifiedTxGroup + // additionally, remove the txn from duplicate caches to ensure it can be re-submitted + handler.deleteFromCaches(nil, canonicalKey) - // save the transaction, if it has high enough fee and not already in the cache - err = handler.txPool.Remember(verifiedTxGroup) - if err != nil { - handler.rememberReportErrors(err) - logging.Base().Debugf("could not remember tx: %v", err) - return network.OutgoingMessage{ - Action: network.Ignore, - } + // queue is full, we do not know if the message is valid or not, so ignore + action = network.Ignore } - transactionMessagesRemember.Inc(nil) - - // if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions. 
- err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup) - if err != nil { - logging.Base().Infof("unable to pin transaction: %v", err) - } return network.OutgoingMessage{ - Action: network.Accept, + Action: action, } } diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 24e09b2963..c6c83eeb83 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -2774,6 +2774,8 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) { handler, err := makeTestTxHandler(ledger, cfg) require.NoError(t, err) + handler.Start() + defer handler.Stop() // valid message _, blob := makeTxns(addresses, secrets, 1, 2, genesisHash) @@ -2807,6 +2809,8 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) { require.True(t, cfg.TxFilterCanonicalEnabled()) handler, err := makeTestTxHandler(ledger, cfg) require.NoError(t, err) + handler.Start() + defer handler.Stop() // valid message _, blob := makeTxns(addresses, secrets, 1, 2, genesisHash) From 26b13d323290c090569a17525f8903d26822a369 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 16 Sep 2024 19:02:03 -0400 Subject: [PATCH 2/7] increase number of workers in txTopicHandleLoop --- network/p2pNetwork.go | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 32b9a49ef3..de86e4ca06 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -923,24 +923,33 @@ func (n *P2PNetwork) txTopicHandleLoop() { } n.log.Debugf("Subscribed to topic %s", p2p.TXTopicName) - for { - // msg from sub.Next not used since all work done by txTopicValidator - _, err := sub.Next(n.ctx) - if err != nil { - if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled { - n.log.Errorf("Error reading from subscription %v, peerId %s", err, n.service.ID()) + const threads = incomingThreads / 2 // perf tests showed that 10 (half of incomingThreads) was optimal in terms of TPS (attempted 1, 5, 10, 
20) + var wg sync.WaitGroup + wg.Add(threads) + for i := 0; i < threads; i++ { + go func(ctx context.Context, sub p2p.SubNextCancellable, wantTXGossip *atomic.Bool, peerID peer.ID, log logging.Logger) { + defer wg.Done() + for { + // msg from sub.Next not used since all work done by txTopicValidator + _, err := sub.Next(ctx) + if err != nil { + if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled { + log.Errorf("Error reading from subscription %v, peerId %s", err, peerID) + } + log.Debugf("Cancelling subscription to topic %s due Subscription.Next error: %v", p2p.TXTopicName, err) + sub.Cancel() + return + } + // participation or configuration change, cancel subscription and quit + if !wantTXGossip.Load() { + log.Debugf("Cancelling subscription to topic %s due participation change", p2p.TXTopicName) + sub.Cancel() + return + } } - n.log.Debugf("Cancelling subscription to topic %s due Subscription.Next error: %v", p2p.TXTopicName, err) - sub.Cancel() - return - } - // participation or configuration change, cancel subscription and quit - if !n.wantTXGossip.Load() { - n.log.Debugf("Cancelling subscription to topic %s due participation change", p2p.TXTopicName) - sub.Cancel() - return - } + }(n.ctx, sub, &n.wantTXGossip, n.service.ID(), n.log) } + wg.Wait() } // txTopicValidator calls txHandler to validate and process incoming transactions. From 467dd68fd326f5d438f01b20f0ad864a2dbcf419 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 17 Sep 2024 15:55:39 -0400 Subject: [PATCH 3/7] fix wsPeers --- network/p2pNetwork.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index de86e4ca06..357e241293 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -954,20 +954,11 @@ func (n *P2PNetwork) txTopicHandleLoop() { // txTopicValidator calls txHandler to validate and process incoming transactions. 
func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg *pubsub.Message) pubsub.ValidationResult { - var routingAddr [8]byte n.wsPeersLock.Lock() - var wsp *wsPeer - var ok bool - if wsp, ok = n.wsPeers[peerID]; ok { - copy(routingAddr[:], wsp.RoutingAddr()) - } else { - // well, otherwise use last 8 bytes of peerID - copy(routingAddr[:], peerID[len(peerID)-8:]) - } + var wsp *wsPeer = n.wsPeers[peerID] n.wsPeersLock.Unlock() inmsg := IncomingMessage{ - // Sender: gossipSubPeer{peerID: msg.ReceivedFrom, net: n, routingAddr: routingAddr}, Sender: wsp, Tag: protocol.TxnTag, Data: msg.Data, From ff86795f6b40cdeb7ff1a8e381f69224ad057f81 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 18 Sep 2024 16:17:13 -0400 Subject: [PATCH 4/7] cache stx id --- data/transactions/signedtxn.go | 10 ++++++++++ data/txHandler.go | 5 +++++ 2 files changed, 15 insertions(+) diff --git a/data/transactions/signedtxn.go b/data/transactions/signedtxn.go index 5a10829f09..cc9c3de86b 100644 --- a/data/transactions/signedtxn.go +++ b/data/transactions/signedtxn.go @@ -37,6 +37,8 @@ type SignedTxn struct { Lsig LogicSig `codec:"lsig"` Txn Transaction `codec:"txn"` AuthAddr basics.Address `codec:"sgnr"` + + cachedID *Txid } // SignedTxnInBlock is how a signed transaction is encoded in a block. @@ -59,9 +61,17 @@ type SignedTxnWithAD struct { // ID returns the Txid (i.e., hash) of the underlying transaction. func (s SignedTxn) ID() Txid { + if s.cachedID != nil { + return *s.cachedID + } return s.Txn.ID() } +func (s *SignedTxn) CacheID() { + txid := s.Txn.ID() + s.cachedID = &txid +} + // ID on SignedTxnInBlock should never be called, because the ID depends // on the block from which this transaction will be decoded. 
By having // a different return value from SignedTxn.ID(), we will catch errors at diff --git a/data/txHandler.go b/data/txHandler.go index 33c2fc0423..f7746db4cb 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -531,6 +531,11 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { // at this point, we've verified the transaction, so we can safely treat the transaction as a verified transaction. verifiedTxGroup := wi.unverifiedTxGroup + // precompute transaction IDs + for i := range verifiedTxGroup { + verifiedTxGroup[i].CacheID() + } + // save the transaction, if it has high enough fee and not already in the cache err := handler.txPool.Remember(verifiedTxGroup) if err != nil { From b6901c86eba46884adf4c8385179c82127e2956b Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 20 Sep 2024 14:34:31 -0400 Subject: [PATCH 5/7] fix linter --- data/transactions/signedtxn.go | 1 + 1 file changed, 1 insertion(+) diff --git a/data/transactions/signedtxn.go b/data/transactions/signedtxn.go index cc9c3de86b..08007d2f9c 100644 --- a/data/transactions/signedtxn.go +++ b/data/transactions/signedtxn.go @@ -67,6 +67,7 @@ func (s SignedTxn) ID() Txid { return s.Txn.ID() } +// CacheID caches the ID of the underlying transaction. func (s *SignedTxn) CacheID() { txid := s.Txn.ID() s.cachedID = &txid From 2da6164dfd59c52f57f58674aa20038c36ec8c64 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 20 Sep 2024 14:40:41 -0400 Subject: [PATCH 6/7] msgp --- data/transactions/signedtxn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/transactions/signedtxn.go b/data/transactions/signedtxn.go index 08007d2f9c..15b5379e91 100644 --- a/data/transactions/signedtxn.go +++ b/data/transactions/signedtxn.go @@ -38,7 +38,7 @@ type SignedTxn struct { Txn Transaction `codec:"txn"` AuthAddr basics.Address `codec:"sgnr"` - cachedID *Txid + cachedID *Txid `codec:"-"` } // SignedTxnInBlock is how a signed transaction is encoded in a block. 
From 563579898140f6b3c8cf5e050d70ffbd22b2fe15 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 20 Sep 2024 17:10:15 -0400 Subject: [PATCH 7/7] fix cached id data race --- data/txHandler.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index f7746db4cb..8f0ebebd68 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -342,6 +342,10 @@ func (handler *TxHandler) backlogWorker() { logging.Base().Warnf("Failed to release capacity to ElasticRateLimiter: %v", err) } } + // precompute transaction IDs + for i := range wi.unverifiedTxGroup { + wi.unverifiedTxGroup[i].CacheID() + } if handler.checkAlreadyCommitted(wi) { transactionMessagesAlreadyCommitted.Inc(nil) if wi.capguard != nil { @@ -531,11 +535,6 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { // at this point, we've verified the transaction, so we can safely treat the transaction as a verified transaction. verifiedTxGroup := wi.unverifiedTxGroup - // precompute transaction IDs - for i := range verifiedTxGroup { - verifiedTxGroup[i].CacheID() - } - // save the transaction, if it has high enough fee and not already in the cache err := handler.txPool.Remember(verifiedTxGroup) if err != nil {