From 37913771b9e4f68161edabed4db3ee17d2fa1766 Mon Sep 17 00:00:00 2001 From: nick Date: Fri, 2 Aug 2024 02:57:56 +0900 Subject: [PATCH 1/2] feat: optimize --- node/pkg/aggregator/aggregator.go | 244 ++++++------------------------ node/pkg/aggregator/types.go | 116 +++++++++++--- 2 files changed, 142 insertions(+), 218 deletions(-) diff --git a/node/pkg/aggregator/aggregator.go b/node/pkg/aggregator/aggregator.go index bf79c7d3b..c410041ec 100644 --- a/node/pkg/aggregator/aggregator.go +++ b/node/pkg/aggregator/aggregator.go @@ -1,7 +1,6 @@ package aggregator import ( - "bytes" "context" "encoding/json" "sync" @@ -31,18 +30,15 @@ func NewAggregator(h host.Host, ps *pubsub.PubSub, topicString string, config Co aggregateInterval := time.Duration(config.AggregateInterval) * time.Millisecond aggregator := Aggregator{ - Config: config, - Raft: raft.NewRaftNode(h, ps, topic, 100, aggregateInterval), - CollectedPrices: map[int32][]int64{}, - CollectedProofs: map[int32][][]byte{}, - CollectedAgreements: map[int32][]bool{}, - PreparedLocalAggregates: map[int32]int64{}, - PreparedGlobalAggregates: map[int32]GlobalAggregate{}, - SyncedTimes: map[int32]time.Time{}, - AggregatorMutex: sync.Mutex{}, - RoundID: 1, - Signer: signHelper, - LatestLocalAggregates: latestLocalAggregates, + Config: config, + Raft: raft.NewRaftNode(h, ps, topic, 100, aggregateInterval), + + roundPrices: &RoundPrices{prices: map[int32][]int64{}}, + roundProofs: &RoundProofs{proofs: map[int32][][]byte{}}, + + RoundID: 1, + Signer: signHelper, + LatestLocalAggregates: latestLocalAggregates, } aggregator.Raft.LeaderJob = aggregator.LeaderJob aggregator.Raft.HandleCustomMessage = aggregator.HandleCustomMessage @@ -64,15 +60,11 @@ func (n *Aggregator) Run(ctx context.Context) { func (n *Aggregator) LeaderJob() error { n.RoundID++ n.Raft.IncreaseTerm() - return n.PublishSyncMessage(n.RoundID, time.Now()) + return n.PublishTriggerMessage(n.RoundID, time.Now()) } func (n *Aggregator) HandleCustomMessage(ctx context.Context, message raft.Message) error { switch message.Type { - case RoundSync: - return n.HandleRoundSyncMessage(ctx, message) - case SyncReply: - return n.HandleSyncReplyMessage(ctx, message) case Trigger: return n.HandleTriggerMessage(ctx, message) case PriceData: @@ -84,30 +76,28 @@ func (n *Aggregator) HandleCustomMessage(ctx context.Context, message raft.Messa } } -func (n *Aggregator) HandleRoundSyncMessage(ctx context.Context, msg raft.Message) error { - var roundSyncMessage RoundSyncMessage - err := json.Unmarshal(msg.Data, &roundSyncMessage) +func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message) error { + var triggerMessage TriggerMessage + err := json.Unmarshal(msg.Data, &triggerMessage) if err != nil { - log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to unmarshal round sync message") + log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to unmarshal trigger message") return err } - if msg.SentFrom != n.Raft.GetLeader() { - log.Warn().Str("Player", "Aggregator").Msg("round sync message sent from non-leader") - return errorSentinel.ErrAggregatorNonLeaderRaftMessage - } - - if roundSyncMessage.LeaderID == "" || roundSyncMessage.RoundID == 0 { - log.Error().Str("Player", "Aggregator").Msg("invalid round sync message") + if triggerMessage.RoundID == 0 { + log.Error().Str("Player", "Aggregator").Msg("invalid trigger message") return errorSentinel.ErrAggregatorInvalidRaftMessage } - if n.Raft.GetRole() != raft.Leader { - n.RoundID = roundSyncMessage.RoundID + if msg.SentFrom != n.Raft.GetLeader() { + log.Warn().Str("Player", "Aggregator").Msg("trigger message sent from non-leader") + return errorSentinel.ErrAggregatorNonLeaderRaftMessage } + defer n.cleanUpRoundData(triggerMessage.RoundID - 10) + var value int64 - localAggregate, ok := n.LatestLocalAggregates.Load(n.ID) + localAggregateRaw, ok := n.LatestLocalAggregates.Load(n.ID) if !ok { log.Error().Str("Player", "Aggregator").Msg("failed to get latest local aggregate") // set value to -1 rather than returning error @@ -115,85 +105,11 @@ func (n *Aggregator) HandleRoundSyncMessage(ctx context.Context, msg raft.Messag // if not enough messages collected from HandleSyncReplyMessage, it will hang in certain round value = -1 } else { + localAggregate := localAggregateRaw.(types.LocalAggregate) value = localAggregate.Value } - n.AggregatorMutex.Lock() - defer n.AggregatorMutex.Unlock() - // run cleanup to prevent memory leak - // removes data 10 rounds ago, approximately 4 seconds old data - n.cleanUpRoundData(roundSyncMessage.RoundID - 10) - - n.PreparedLocalAggregates[roundSyncMessage.RoundID] = value - n.SyncedTimes[roundSyncMessage.RoundID] = roundSyncMessage.Timestamp - return n.PublishSyncReplyMessage(roundSyncMessage.RoundID, true) -} - -func (n *Aggregator) HandleSyncReplyMessage(ctx context.Context, msg raft.Message) error { - if n.Raft.GetRole() != raft.Leader { - log.Debug().Str("Player", "Aggregator").Msg("received sync reply message while not leader") - return nil - } - - var syncReplyMessage SyncReplyMessage - err := json.Unmarshal(msg.Data, &syncReplyMessage) - if err != nil { - log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to unmarshal sync reply message") - return err - } - - if syncReplyMessage.RoundID == 0 { - log.Error().Str("Player", "Aggregator").Msg("invalid sync reply message") - return errorSentinel.ErrAggregatorInvalidRaftMessage - } - - n.AggregatorMutex.Lock() - defer n.AggregatorMutex.Unlock() - - if _, ok := n.CollectedAgreements[syncReplyMessage.RoundID]; !ok { - n.CollectedAgreements[syncReplyMessage.RoundID] = []bool{} - } - - n.CollectedAgreements[syncReplyMessage.RoundID] = append(n.CollectedAgreements[syncReplyMessage.RoundID], syncReplyMessage.Agreed) - if len(n.CollectedAgreements[syncReplyMessage.RoundID]) >= n.Raft.SubscribersCount()+1 { - defer delete(n.CollectedAgreements, syncReplyMessage.RoundID) - agreeCount := 0 - for _, agreed := range n.CollectedAgreements[syncReplyMessage.RoundID] { - if agreed { - agreeCount++ - } - } - requiredAgreements := int(float64(n.Raft.SubscribersCount()) * AGREEMENT_QUORUM) - if agreeCount >= requiredAgreements { - return n.PublishTriggerMessage(syncReplyMessage.RoundID) - } else { - log.Warn().Str("Player", "Aggregator").Int("agreeCount", agreeCount).Int("requiredAgreements", requiredAgreements).Msg("not enough agreements, resigning as leader") - n.Raft.ResignLeader() - return nil - } - } - return nil -} - -func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message) error { - var triggerMessage TriggerMessage - err := json.Unmarshal(msg.Data, &triggerMessage) - if err != nil { - log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to unmarshal trigger message") - return err - } - - if triggerMessage.RoundID == 0 { - log.Error().Str("Player", "Aggregator").Msg("invalid trigger message") - return errorSentinel.ErrAggregatorInvalidRaftMessage - } - - if msg.SentFrom != n.Raft.GetLeader() { - log.Warn().Str("Player", "Aggregator").Msg("trigger message sent from non-leader") - return errorSentinel.ErrAggregatorNonLeaderRaftMessage - } - defer delete(n.PreparedLocalAggregates, triggerMessage.RoundID) - return n.PublishPriceDataMessage(triggerMessage.RoundID, n.PreparedLocalAggregates[triggerMessage.RoundID]) + return n.PublishPriceDataMessage(triggerMessage.RoundID, value, triggerMessage.Timestamp) } func (n *Aggregator) HandlePriceDataMessage(ctx context.Context, msg raft.Message) error { @@ -209,19 +125,13 @@ func (n *Aggregator) HandlePriceDataMessage(ctx context.Context, msg raft.Messag return errorSentinel.ErrAggregatorInvalidRaftMessage } - n.AggregatorMutex.Lock() - defer n.AggregatorMutex.Unlock() - if _, ok := n.CollectedPrices[priceDataMessage.RoundID]; !ok { - n.CollectedPrices[priceDataMessage.RoundID] = []int64{} - } - - n.CollectedPrices[priceDataMessage.RoundID] = append(n.CollectedPrices[priceDataMessage.RoundID], priceDataMessage.PriceData) - if len(n.CollectedPrices[priceDataMessage.RoundID]) >= n.Raft.SubscribersCount()+1 { - log.Debug().Str("Player", "Aggregator").Str("Name", n.Name).Any("collected prices", n.CollectedPrices[priceDataMessage.RoundID]).Int32("roundId", priceDataMessage.RoundID).Msg("collected prices") - defer delete(n.CollectedPrices, priceDataMessage.RoundID) - defer delete(n.SyncedTimes, priceDataMessage.RoundID) - filteredCollectedPrices := FilterNegative(n.CollectedPrices[priceDataMessage.RoundID]) + n.roundPrices.push(priceDataMessage.RoundID, priceDataMessage.PriceData) + if n.roundPrices.len(priceDataMessage.RoundID) >= n.Raft.SubscribersCount()+1 { + prices := n.roundPrices.snapshot(priceDataMessage.RoundID) + log.Debug().Str("Player", "Aggregator").Str("Name", n.Name).Any("collected prices", prices).Int32("roundId", priceDataMessage.RoundID).Msg("collected prices") + defer n.roundPrices.delete(priceDataMessage.RoundID) + filteredCollectedPrices := FilterNegative(prices) if len(filteredCollectedPrices) == 0 { log.Warn().Str("Player", "Aggregator").Str("Name", n.Name).Int32("roundId", priceDataMessage.RoundID).Msg("no prices collected") return nil @@ -233,19 +143,13 @@ func (n *Aggregator) HandlePriceDataMessage(ctx context.Context, msg raft.Messag return err } log.Debug().Str("Player", "Aggregator").Str("Name", n.Name).Any("filtered collected prices", filteredCollectedPrices).Int32("roundId", priceDataMessage.RoundID).Int64("global_aggregate", median).Msg("global aggregated") - n.PreparedGlobalAggregates[priceDataMessage.RoundID] = GlobalAggregate{ - ConfigID: n.ID, - Value: median, - Round: priceDataMessage.RoundID, - Timestamp: n.SyncedTimes[priceDataMessage.RoundID], - } - proof, err := n.Signer.MakeGlobalAggregateProof(median, n.SyncedTimes[priceDataMessage.RoundID], n.Name) + proof, err := n.Signer.MakeGlobalAggregateProof(median, priceDataMessage.Timestamp, n.Name) if err != nil { log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to make global aggregate proof") return err } - return n.PublishProofMessage(priceDataMessage.RoundID, proof) + return n.PublishProofMessage(priceDataMessage.RoundID, median, proof, priceDataMessage.Timestamp) } return nil } @@ -263,22 +167,15 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e return errorSentinel.ErrAggregatorInvalidRaftMessage } - n.AggregatorMutex.Lock() - defer n.AggregatorMutex.Unlock() - if _, ok := n.CollectedProofs[proofMessage.RoundID]; !ok { - n.CollectedProofs[proofMessage.RoundID] = [][]byte{} - } - - n.CollectedProofs[proofMessage.RoundID] = append(n.CollectedProofs[proofMessage.RoundID], proofMessage.Proof) - if len(n.CollectedProofs[proofMessage.RoundID]) >= n.Raft.SubscribersCount()+1 { - defer delete(n.CollectedProofs, proofMessage.RoundID) - defer delete(n.PreparedGlobalAggregates, proofMessage.RoundID) + n.roundProofs.push(proofMessage.RoundID, proofMessage.Proof) + if n.roundProofs.len(proofMessage.RoundID) >= n.Raft.SubscribersCount()+1 { + defer n.roundProofs.delete(proofMessage.RoundID) globalAggregate := GlobalAggregate{ ConfigID: n.ID, - Value: n.PreparedGlobalAggregates[proofMessage.RoundID].Value, + Value: proofMessage.Value, Round: proofMessage.RoundID, - Timestamp: n.PreparedGlobalAggregates[proofMessage.RoundID].Timestamp} - concatProof := bytes.Join(n.CollectedProofs[proofMessage.RoundID], nil) + Timestamp: proofMessage.Timestamp} + concatProof := n.roundProofs.concat(proofMessage.RoundID) proof := Proof{ConfigID: n.ID, Round: proofMessage.RoundID, Proof: concatProof} err := PublishGlobalAggregateAndProof(ctx, globalAggregate, proof) @@ -290,55 +187,13 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e return nil } -func (n *Aggregator) PublishSyncMessage(roundId int32, timestamp time.Time) error { - roundMessage := RoundSyncMessage{ +func (n *Aggregator) PublishTriggerMessage(roundId int32, timestamp time.Time) error { + triggerMessage := TriggerMessage{ LeaderID: n.Raft.GetHostId(), RoundID: roundId, Timestamp: timestamp, } - marshalledRoundMessage, err := json.Marshal(roundMessage) - if err != nil { - log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to marshal round message") - return err - } - - message := raft.Message{ - Type: RoundSync, - SentFrom: n.Raft.GetHostId(), - Data: json.RawMessage(marshalledRoundMessage), - } - - return n.Raft.PublishMessage(message) -} - -func (n *Aggregator) PublishSyncReplyMessage(roundId int32, agreed bool) error { - syncReplyMessage := SyncReplyMessage{ - RoundID: roundId, - Agreed: agreed, - } - - marshalledSyncReplyMessage, err := json.Marshal(syncReplyMessage) - if err != nil { - log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to marshal sync reply message") - return err - } - - message := raft.Message{ - Type: SyncReply, - SentFrom: n.Raft.GetHostId(), - Data: json.RawMessage(marshalledSyncReplyMessage), - } - - return n.Raft.PublishMessage(message) -} - -func (n *Aggregator) PublishTriggerMessage(roundId int32) error { - triggerMessage := TriggerMessage{ - LeaderID: n.Raft.GetHostId(), - RoundID: roundId, - } - marshalledTriggerMessage, err := json.Marshal(triggerMessage) if err != nil { log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to marshal trigger message") @@ -354,10 +209,11 @@ func (n *Aggregator) PublishTriggerMessage(roundId int32) error { return n.Raft.PublishMessage(message) } -func (n *Aggregator) PublishPriceDataMessage(roundId int32, value int64) error { +func (n *Aggregator) PublishPriceDataMessage(roundId int32, value int64, timestamp time.Time) error { priceDataMessage := PriceDataMessage{ RoundID: roundId, PriceData: value, + Timestamp: timestamp, } marshalledPriceDataMessage, err := json.Marshal(priceDataMessage) @@ -375,10 +231,12 @@ func (n *Aggregator) PublishPriceDataMessage(roundId int32, value int64) error { return n.Raft.PublishMessage(message) } -func (n *Aggregator) PublishProofMessage(roundId int32, proof []byte) error { +func (n *Aggregator) PublishProofMessage(roundId int32, value int64, proof []byte, timestamp time.Time) error { proofMessage := ProofMessage{ - RoundID: roundId, - Proof: proof, + RoundID: roundId, + Value: value, + Proof: proof, + Timestamp: timestamp, } marshalledProofMessage, err := json.Marshal(proofMessage) @@ -397,10 +255,6 @@ func (n *Aggregator) PublishProofMessage(roundId int32, proof []byte) error { } func (n *Aggregator) cleanUpRoundData(roundId int32) { - delete(n.CollectedPrices, roundId) - delete(n.CollectedProofs, roundId) - delete(n.CollectedAgreements, roundId) - delete(n.PreparedLocalAggregates, roundId) - delete(n.PreparedGlobalAggregates, roundId) - delete(n.SyncedTimes, roundId) + n.roundPrices.delete(roundId) + n.roundProofs.delete(roundId) } diff --git a/node/pkg/aggregator/types.go b/node/pkg/aggregator/types.go index d91fded8a..649406b69 100644 --- a/node/pkg/aggregator/types.go +++ b/node/pkg/aggregator/types.go @@ -1,6 +1,7 @@ package aggregator import ( + "bytes" "context" "sync" "time" @@ -54,52 +55,121 @@ type Config struct { AggregateInterval int32 `db:"aggregate_interval"` } +type RoundPrices struct { + prices map[int32][]int64 + mu sync.RWMutex +} + +type RoundProofs struct { + proofs map[int32][][]byte + mu sync.RWMutex +} + type Aggregator struct { Config Raft *raft.Raft LatestLocalAggregates *LatestLocalAggregates - CollectedPrices map[int32][]int64 - CollectedProofs map[int32][][]byte - CollectedAgreements map[int32][]bool - PreparedLocalAggregates map[int32]int64 - PreparedGlobalAggregates map[int32]GlobalAggregate - SyncedTimes map[int32]time.Time - AggregatorMutex sync.Mutex - - RoundID int32 + roundPrices *RoundPrices + roundProofs *RoundProofs - Signer *helper.Signer + RoundID int32 + Signer *helper.Signer nodeCtx context.Context nodeCancel context.CancelFunc isRunning bool } -type RoundSyncMessage struct { +type PriceDataMessage struct { + RoundID int32 `json:"roundID"` + PriceData int64 `json:"priceData"` + Timestamp time.Time `json:"timestamp"` +} + +type ProofMessage struct { + RoundID int32 `json:"roundID"` + Value int64 `json:"value"` + Proof []byte `json:"proof"` + Timestamp time.Time `json:"timestamp"` +} + +type TriggerMessage struct { LeaderID string `json:"leaderID"` RoundID int32 `json:"roundID"` Timestamp time.Time `json:"timestamp"` } -type PriceDataMessage struct { - RoundID int32 `json:"roundID"` - PriceData int64 `json:"priceData"` +func (r *RoundPrices) push(round int32, value int64) { + r.mu.Lock() + defer r.mu.Unlock() + if prices, ok := r.prices[round]; ok { + r.prices[round] = append(prices, value) + } else { + r.prices[round] = []int64{value} + } } -type ProofMessage struct { - RoundID int32 `json:"roundID"` - Proof []byte `json:"proof"` +func (r *RoundPrices) len(round int32) int { + r.mu.RLock() + defer r.mu.RUnlock() + if prices, ok := r.prices[round]; ok { + return len(prices) + } + return 0 } -type SyncReplyMessage struct { - RoundID int32 `json:"roundID"` - Agreed bool `json:"agreed"` +func (r *RoundPrices) snapshot(round int32) []int64 { + r.mu.RLock() + defer r.mu.RUnlock() + prices, ok := r.prices[round] + if !ok { + return nil + } + result := make([]int64, len(prices)) + copy(result, prices) + return result } -type TriggerMessage struct { - LeaderID string `json:"leaderID"` - RoundID int32 `json:"roundID"` +func (r *RoundPrices) delete(round int32) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.prices, round) +} + +func (r *RoundProofs) push(round int32, proof []byte) { + r.mu.Lock() + defer r.mu.Unlock() + if proofs, ok := r.proofs[round]; ok { + r.proofs[round] = append(proofs, proof) + } else { + r.proofs[round] = [][]byte{proof} + } +} + +func (r *RoundProofs) len(round int32) int { + r.mu.RLock() + defer r.mu.RUnlock() + if proofs, ok := r.proofs[round]; ok { + return len(proofs) + } + return 0 +} + +func (r *RoundProofs) concat(round int32) []byte { + r.mu.RLock() + defer r.mu.RUnlock() + proofs, ok := r.proofs[round] + if !ok { + return nil + } + return bytes.Join(proofs, nil) +} + +func (r *RoundProofs) delete(round int32) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.proofs, round) } type LatestLocalAggregates struct { From 5532745a6cc3d6fd3f0d375a774557e400be8ca7 Mon Sep 17 00:00:00 2001 From: nick Date: Fri, 2 Aug 2024 10:48:47 +0900 Subject: [PATCH 2/2] fix: fix merge conflicts --- node/pkg/aggregator/aggregator.go | 4 +--- node/pkg/aggregator/types.go | 34 +++++++++++++++---------------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/node/pkg/aggregator/aggregator.go b/node/pkg/aggregator/aggregator.go index c410041ec..32a430140 100644 --- a/node/pkg/aggregator/aggregator.go +++ b/node/pkg/aggregator/aggregator.go @@ -3,7 +3,6 @@ package aggregator import ( "context" "encoding/json" - "sync" "time" @@ -97,7 +96,7 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message) defer n.cleanUpRoundData(triggerMessage.RoundID - 10) var value int64 - localAggregateRaw, ok := n.LatestLocalAggregates.Load(n.ID) + localAggregate, ok := n.LatestLocalAggregates.Load(n.ID) if !ok { log.Error().Str("Player", "Aggregator").Msg("failed to get latest local aggregate") // set value to -1 rather than returning error @@ -105,7 +104,6 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message) // if not enough messages collected from HandleSyncReplyMessage, it will hang in certain round value = -1 } else { - localAggregate := localAggregateRaw.(types.LocalAggregate) value = localAggregate.Value } diff --git a/node/pkg/aggregator/types.go b/node/pkg/aggregator/types.go index 649406b69..1914bde07 100644 --- a/node/pkg/aggregator/types.go +++ b/node/pkg/aggregator/types.go @@ -17,8 +17,6 @@ import ( const ( AGREEMENT_QUORUM = 0.5 - RoundSync raft.MessageType = "roundSync" - SyncReply raft.MessageType = "syncReply" Trigger raft.MessageType = "trigger" PriceData raft.MessageType = "priceData" ProofMsg raft.MessageType = "proof" @@ -69,12 +67,12 @@ type Aggregator struct { Config Raft *raft.Raft - LatestLocalAggregates *LatestLocalAggregates - roundPrices *RoundPrices - roundProofs *RoundProofs + LatestLocalAggregates *LatestLocalAggregates + roundPrices *RoundPrices + roundProofs *RoundProofs - RoundID int32 - Signer *helper.Signer + RoundID int32 + Signer *helper.Signer nodeCtx context.Context nodeCancel context.CancelFunc @@ -100,6 +98,17 @@ type TriggerMessage struct { Timestamp time.Time `json:"timestamp"` } +type LatestLocalAggregates struct { + LocalAggregateMap map[int32]types.LocalAggregate + mu sync.RWMutex +} + +func NewLatestLocalAggregates() *LatestLocalAggregates { + return &LatestLocalAggregates{ + LocalAggregateMap: map[int32]types.LocalAggregate{}, + } +} + func (r *RoundPrices) push(round int32, value int64) { r.mu.Lock() defer r.mu.Unlock() @@ -172,17 +181,6 @@ func (r *RoundProofs) delete(round int32) { delete(r.proofs, round) } -type LatestLocalAggregates struct { - LocalAggregateMap map[int32]types.LocalAggregate - mu sync.RWMutex -} - -func NewLatestLocalAggregates() *LatestLocalAggregates { - return &LatestLocalAggregates{ - LocalAggregateMap: map[int32]types.LocalAggregate{}, - } -} - func (a *LatestLocalAggregates) Load(id int32) (types.LocalAggregate, bool) { a.mu.RLock() defer a.mu.RUnlock()