p2p: fan in incoming txns into backlog worker #6126

Open: wants to merge 8 commits into master
11 changes: 11 additions & 0 deletions data/transactions/signedtxn.go
@@ -37,6 +37,8 @@
Lsig LogicSig `codec:"lsig"`
Txn Transaction `codec:"txn"`
AuthAddr basics.Address `codec:"sgnr"`

cachedID *Txid `codec:"-"`
}

// SignedTxnInBlock is how a signed transaction is encoded in a block.
@@ -59,9 +61,18 @@

// ID returns the Txid (i.e., hash) of the underlying transaction.
func (s SignedTxn) ID() Txid {
if s.cachedID != nil {
Review comment (Contributor): Curious: did we test performance both with and without the cachedID change?

Reply (Author): I do not recall, but I can run perf tests.

return *s.cachedID
}
return s.Txn.ID()
}

// CacheID caches the ID of the underlying transaction.
func (s *SignedTxn) CacheID() {
txid := s.Txn.ID()
s.cachedID = &txid
}

// ID on SignedTxnInBlock should never be called, because the ID depends
// on the block from which this transaction will be decoded. By having
// a different return value from SignedTxn.ID(), we will catch errors at
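The new cachedID field memoizes the transaction hash, so repeated ID() calls return a stored value instead of re-hashing. To the perf question raised above, a micro-benchmark along the following lines would measure the difference. This is a sketch only; makeTestSignedTxn is an assumed helper, not part of this PR.

package transactions

import "testing"

// Hypothetical benchmark comparing ID() with and without a precomputed Txid.
func BenchmarkSignedTxnID(b *testing.B) {
    stxn := makeTestSignedTxn() // assumed helper returning a populated SignedTxn
    b.Run("uncached", func(b *testing.B) {
        for i := 0; i < b.N; i++ {
            _ = stxn.ID() // hashes Txn on every call
        }
    })
    b.Run("cached", func(b *testing.B) {
        stxn.CacheID() // hash once, store the Txid
        for i := 0; i < b.N; i++ {
            _ = stxn.ID() // returns the memoized value
        }
    })
}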
92 changes: 47 additions & 45 deletions data/txHandler.go
@@ -102,12 +102,13 @@

// The txBacklogMsg structure used to track a single incoming transaction from the gossip network,
type txBacklogMsg struct {
rawmsg *network.IncomingMessage // the raw message from the network
unverifiedTxGroup []transactions.SignedTxn // the unverified ( and signed ) transaction group
rawmsgDataHash *crypto.Digest // hash (if any) of raw message data from the network
unverifiedTxGroupHash *crypto.Digest // hash (if any) of the unverifiedTxGroup
verificationErr error // The verification error generated by the verification function, if any.
capguard *util.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued
+ syncCh chan network.ForwardingPolicy // channel to signal the synchronous mode and its ops completion
}

// TxHandler handles transaction messages
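The added syncCh field is what lets the message validator reuse the backlog worker synchronously: the producer enqueues a message, then blocks on a capacity-1 reply channel until the worker reports its forwarding decision. A self-contained sketch of that request/reply shape, with hypothetical types rather than the PR's own code:

package main

import "fmt"

type job struct {
    payload string
    replyCh chan string // nil: fire-and-forget; non-nil: caller waits for a verdict
}

func worker(queue <-chan job) {
    for j := range queue {
        verdict := "accepted " + j.payload
        if j.replyCh != nil {
            j.replyCh <- verdict // capacity-1 buffer: the worker never blocks here
        }
    }
}

func main() {
    queue := make(chan job, 8)
    go worker(queue)
    j := job{payload: "txn group", replyCh: make(chan string, 1)}
    queue <- j
    fmt.Println(<-j.replyCh) // synchronous path: wait for the worker's reply
}

The buffer of one matters: the worker can post its verdict and move on even if the producer has not yet reached the receive.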
@@ -348,18 +349,30 @@
logging.Base().Warnf("Failed to release capacity to ElasticRateLimiter: %v", err)
}
}
// precompute transaction IDs
for i := range wi.unverifiedTxGroup {
wi.unverifiedTxGroup[i].CacheID()
}
if handler.checkAlreadyCommitted(wi) {
transactionMessagesAlreadyCommitted.Inc(nil)
if wi.capguard != nil {
wi.capguard.Served()
}
// if in synchronous mode, signal the completion of the operation
if wi.syncCh != nil {
wi.syncCh <- network.Ignore
}
continue
}
// handler.streamVerifierChan does not receive if ctx is cancelled
select {
case handler.streamVerifierChan <- &verify.UnverifiedTxnSigJob{TxnGroup: wi.unverifiedTxGroup, BacklogMessage: wi}:
case <-handler.ctx.Done():
transactionMessagesDroppedFromBacklog.Inc(nil)
// if in synchronous mode, signal the completion of the operation
if wi.syncCh != nil {
wi.syncCh <- network.Ignore
}
return
}
if wi.capguard != nil {
@@ -373,7 +386,6 @@
m := wi.BacklogMessage.(*txBacklogMsg)
m.verificationErr = wi.Err
handler.postProcessCheckedTxn(m)

case <-handler.ctx.Done():
return
}
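The comment "handler.streamVerifierChan does not receive if ctx is cancelled" above refers to the standard two-case select that lets a channel send abort cleanly on shutdown instead of blocking forever. A self-contained sketch of the pattern, with hypothetical names:

package main

import (
    "context"
    "fmt"
)

// sendOrAbort delivers item unless ctx is cancelled first, so a stalled
// consumer can never wedge the sender during shutdown.
func sendOrAbort(ctx context.Context, out chan<- string, item string) error {
    select {
    case out <- item:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel() // simulate shutdown before any receiver exists
    fmt.Println(sendOrAbort(ctx, make(chan string), "txn group")) // context canceled
}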
@@ -514,6 +526,11 @@
if wi.verificationErr != nil {
// disconnect from peer.
handler.postProcessReportErrors(wi.verificationErr)
// if in synchronous mode, signal the completion of the operation
if wi.syncCh != nil {
wi.syncCh <- network.Disconnect
return
}
logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, wi.verificationErr)
handler.net.Disconnect(wi.rawmsg.Sender)
return
@@ -530,6 +547,10 @@
if err != nil {
handler.rememberReportErrors(err)
logging.Base().Debugf("could not remember tx: %v", err)
// if in synchronous mode, signal the completion of the operation
if wi.syncCh != nil {
wi.syncCh <- network.Ignore
}
return
}

@@ -540,6 +561,11 @@
if err != nil {
logging.Base().Infof("unable to pin transaction: %v", err)
}
// if in synchronous mode, signal the completion of the operation
if wi.syncCh != nil {
wi.syncCh <- network.Accept
return
}

// We reencode here instead of using rawmsg.Data to avoid broadcasting non-canonical encodings
handler.net.Relay(handler.ctx, protocol.TxnTag, reencode(verifiedTxGroup), false, wi.rawmsg.Sender)
Expand Down Expand Up @@ -800,11 +826,6 @@

// validateIncomingTxMessage is the validator for the MessageProcessor implementation used by P2PNetwork.
func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.OutgoingMessage {
- msgKey, isDup := handler.incomingMsgDupCheck(rawmsg.Data)
- if isDup {
- return network.OutgoingMessage{Action: network.Ignore}
- }
unverifiedTxGroup, consumed, invalid := decodeMsg(rawmsg.Data)
if invalid {
// invalid encoding or exceeding txgroup, disconnect from this peer
@@ -834,52 +855,33 @@
wi := &txBacklogMsg{
rawmsg: &rawmsg,
unverifiedTxGroup: unverifiedTxGroup,
- rawmsgDataHash: msgKey,
unverifiedTxGroupHash: canonicalKey,
capguard: nil,
+ syncCh: make(chan network.ForwardingPolicy, 1),
}

- if handler.checkAlreadyCommitted(wi) {

Review comment (Contributor): So we are cutting down on duplicated logic between what the backlogWorker is doing vs incoming message validation.

- transactionMessagesAlreadyCommitted.Inc(nil)
- return network.OutgoingMessage{
- Action: network.Ignore,
- }
- }

- err := handler.batchVerifier.Verify(wi.unverifiedTxGroup)
- if err != nil {
- handler.postProcessReportErrors(err)
- logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, err)
- return network.OutgoingMessage{
- Action: network.Disconnect,
- }
- }
- verifiedTxGroup := wi.unverifiedTxGroup
-
- // save the transaction, if it has high enough fee and not already in the cache
- err = handler.txPool.Remember(verifiedTxGroup)
- if err != nil {
- handler.rememberReportErrors(err)
- logging.Base().Debugf("could not remember tx: %v", err)
- return network.OutgoingMessage{
- Action: network.Ignore,
- }
- }
+ var action network.ForwardingPolicy
+ select {
+ case handler.backlogQueue <- wi:
+ action = <-wi.syncCh
+ default:
+ // if we failed here we want to increase the corresponding metric. It might suggest that we
+ // want to increase the queue size.
+ transactionMessagesDroppedFromBacklog.Inc(nil)

- transactionMessagesRemember.Inc(nil)

Review comment (Contributor): relying on backlog worker for this?

+ // additionally, remove the txn from duplicate caches to ensure it can be re-submitted
+ handler.deleteFromCaches(nil, canonicalKey)

- // if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions.
- err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup)
- if err != nil {
- logging.Base().Infof("unable to pin transaction: %v", err)
+ // queue is full; we do not know whether the message is valid, so ignore it
+ action = network.Ignore
}

if hybridNet, ok := handler.net.(HybridRelayer); ok {
_ = hybridNet.BridgeP2PToWS(handler.ctx, protocol.TxnTag, reencoded, false, wi.rawmsg.Sender)
}

return network.OutgoingMessage{
- Action: network.Accept,
+ Action: action,
}
}
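After this change the validator reduces to a non-blocking enqueue with backpressure: hand the message to backlogQueue if there is room, otherwise count the drop and answer Ignore, since nothing has yet established whether the message is valid. The shape in isolation, as a hypothetical standalone version:

package main

import "fmt"

type policy string

func enqueue(queue chan string, msg string) policy {
    select {
    case queue <- msg:
        return "accept" // the backlog worker will produce the real verdict
    default:
        // queue full: validity is unknown, so shed load and ignore
        return "ignore"
    }
}

func main() {
    q := make(chan string, 1)
    fmt.Println(enqueue(q, "a")) // accept
    fmt.Println(enqueue(q, "b")) // queue full: ignore
}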

4 changes: 4 additions & 0 deletions data/txHandler_test.go
@@ -2774,6 +2774,8 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) {

handler, err := makeTestTxHandler(ledger, cfg)
require.NoError(t, err)
handler.Start()
defer handler.Stop()
Review comment (Contributor): The handler variable gets set to a new test handler on line 2810 - which handler will this refer to when Stop() is called? (I would think the latest referenced object.)

// valid message
_, blob := makeTxns(addresses, secrets, 1, 2, genesisHash)
@@ -2807,6 +2809,8 @@
require.True(t, cfg.TxFilterCanonicalEnabled())
handler, err := makeTestTxHandler(ledger, cfg)
require.NoError(t, err)
handler.Start()
defer handler.Stop()

// valid message
_, blob := makeTxns(addresses, secrets, 1, 2, genesisHash)
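Regarding the review comment above about which handler the deferred Stop() targets: in Go, the receiver of a deferred method call is evaluated when the defer statement executes, not when the function returns, so defer handler.Stop() stays bound to the handler created just above it; a later reassignment of the variable does not retarget the call. A minimal demonstration (hypothetical handlerT type):

package main

import "fmt"

type handlerT struct{ name string }

func (h *handlerT) Stop() { fmt.Println("stopping", h.name) }

func main() {
    h := &handlerT{name: "first"}
    defer h.Stop() // receiver captured here: the deferred call targets "first"
    h = &handlerT{name: "second"}
    _ = h // reassignment does not change the deferred call
}
// Output: stopping first

In the test diff the second handler is also declared with := in its own block, which would pair each defer with its own handler regardless.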
41 changes: 25 additions & 16 deletions network/p2pNetwork.go
@@ -923,24 +923,33 @@
}
n.log.Debugf("Subscribed to topic %s", p2p.TXTopicName)

- for {
- // msg from sub.Next not used since all work done by txTopicValidator
- _, err := sub.Next(n.ctx)
- if err != nil {
- if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled {
- n.log.Errorf("Error reading from subscription %v, peerId %s", err, n.service.ID())
+ const threads = incomingThreads / 2 // perf tests showed that 10 (half of incomingThreads) was optimal in terms of TPS (attempted 1, 5, 10, 20)

Review comment (Contributor): This was on our default specced hardware?

+ var wg sync.WaitGroup
+ wg.Add(threads)
+ for i := 0; i < threads; i++ {
+ go func(ctx context.Context, sub p2p.SubNextCancellable, wantTXGossip *atomic.Bool, peerID peer.ID, log logging.Logger) {
+ defer wg.Done()
+ for {
+ // msg from sub.Next not used since all work done by txTopicValidator
+ _, err := sub.Next(ctx)
+ if err != nil {
+ if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled {
+ log.Errorf("Error reading from subscription %v, peerId %s", err, peerID)
+ }
+ log.Debugf("Cancelling subscription to topic %s due Subscription.Next error: %v", p2p.TXTopicName, err)
+ sub.Cancel()
+ return
+ }
+ // participation or configuration change, cancel subscription and quit
+ if !wantTXGossip.Load() {
+ log.Debugf("Cancelling subscription to topic %s due participation change", p2p.TXTopicName)
+ sub.Cancel()
+ return
+ }
+ }
- n.log.Debugf("Cancelling subscription to topic %s due Subscription.Next error: %v", p2p.TXTopicName, err)
- sub.Cancel()
- return
- }
- // participation or configuration change, cancel subscription and quit
- if !n.wantTXGossip.Load() {
- n.log.Debugf("Cancelling subscription to topic %s due participation change", p2p.TXTopicName)
- sub.Cancel()
- return
- }
+ }(n.ctx, sub, &n.wantTXGossip, n.service.ID(), n.log)
+ }
+ wg.Wait()
}
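The rewrite replaces the single subscription-reading loop with a pool of identical readers draining one shared subscription, then blocks in wg.Wait() until every reader exits. Stripped of the libp2p specifics, the concurrency shape looks like this (standalone sketch; a closed channel stands in for the subscription):

package main

import (
    "fmt"
    "sync"
)

func main() {
    const threads = 4
    src := make(chan int, 16) // stands in for the shared pubsub subscription
    for i := 0; i < 16; i++ {
        src <- i
    }
    close(src)

    var wg sync.WaitGroup
    wg.Add(threads)
    for i := 0; i < threads; i++ {
        go func(id int) {
            defer wg.Done()
            for item := range src { // readers compete for the next item
                fmt.Printf("reader %d got %d\n", id, item)
            }
        }(i)
    }
    wg.Wait() // as in the PR: do not return until all readers are done
}

Passing ctx, sub, the wantTXGossip flag, and the logger as goroutine parameters, as the PR does, keeps each reader's dependencies explicit rather than captured implicitly from the network object.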

type gsPeer struct {