From a05f5dd74ae1620f972be8eb82b001693ecbd30e Mon Sep 17 00:00:00 2001 From: Warren He Date: Thu, 18 Aug 2022 17:50:16 -0700 Subject: [PATCH] go/runtime/txpool: new metrics for multiple queue counts --- docs/oasis-node/metrics.md | 4 +++- go/runtime/txpool/local_queue.go | 6 ++++++ go/runtime/txpool/local_queue_test.go | 1 + go/runtime/txpool/metrics.go | 22 +++++++++++++++++++--- go/runtime/txpool/rim_queue.go | 6 ++++++ go/runtime/txpool/rim_queue_test.go | 1 + go/runtime/txpool/txpool.go | 11 +++++++---- 7 files changed, 43 insertions(+), 8 deletions(-) diff --git a/docs/oasis-node/metrics.md b/docs/oasis-node/metrics.md index 74242915ca7..1edc7cd74a5 100644 --- a/docs/oasis-node/metrics.md +++ b/docs/oasis-node/metrics.md @@ -78,9 +78,11 @@ oasis_storage_latency | Summary | Storage call latency (seconds). | call | [stor oasis_storage_successes | Counter | Number of storage successes. | call | [storage/api](https://github.com/oasisprotocol/oasis-core/tree/master/go/storage/api/metrics.go) oasis_storage_value_size | Summary | Storage call value size (bytes). | call | [storage/api](https://github.com/oasisprotocol/oasis-core/tree/master/go/storage/api/metrics.go) oasis_txpool_accepted_transactions | Counter | Number of accepted transactions (passing check tx). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) +oasis_txpool_local_queue_size | Gauge | Size of the local transactions schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) oasis_txpool_pending_check_size | Gauge | Size of the pending to be checked queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) -oasis_txpool_pending_schedule_size | Gauge | Size of the pending to be scheduled queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) +oasis_txpool_pending_schedule_size | Gauge | Size of the main schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) oasis_txpool_rejected_transactions | Counter | Number of rejected transactions (failing check tx). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) +oasis_txpool_rim_queue_size | Gauge | Size of the roothash incoming message transactions schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) oasis_up | Gauge | Is oasis-test-runner active for specific scenario. | | [oasis-node/cmd/common/metrics](https://github.com/oasisprotocol/oasis-core/tree/master/go/oasis-node/cmd/common/metrics/metrics.go) oasis_worker_aborted_batch_count | Counter | Number of aborted batches. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/node.go) oasis_worker_batch_processing_time | Summary | Time it takes for a batch to finalize (seconds). | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/node.go) diff --git a/go/runtime/txpool/local_queue.go b/go/runtime/txpool/local_queue.go index 20e93825e63..8f8ef2add46 100644 --- a/go/runtime/txpool/local_queue.go +++ b/go/runtime/txpool/local_queue.go @@ -91,3 +91,9 @@ func (lq *localQueue) GetTxsToPublish() []*TxQueueMeta { defer lq.l.Unlock() return append([]*TxQueueMeta(nil), lq.txs...) } + +func (lq *localQueue) size() int { + lq.l.Lock() + defer lq.l.Unlock() + return len(lq.txs) +} diff --git a/go/runtime/txpool/local_queue_test.go b/go/runtime/txpool/local_queue_test.go index 1c22ff05a36..9ad13ef0dab 100644 --- a/go/runtime/txpool/local_queue_test.go +++ b/go/runtime/txpool/local_queue_test.go @@ -21,6 +21,7 @@ func TestLocalQueueBasic(t *testing.T) { rawB := []byte("b") txB := &TxQueueMeta{Raw: rawB, Hash: hash.NewFromBytes(rawB)} require.NoError(t, lq.OfferChecked(txB, &protocol.CheckTxMetadata{Priority: 5}), "offer checked a") + require.Equal(t, 2, lq.size()) // We should preserve the order. Publish in original order. require.EqualValues(t, []*TxQueueMeta{txA, txB}, lq.GetTxsToPublish(), "get txs to publish") diff --git a/go/runtime/txpool/metrics.go b/go/runtime/txpool/metrics.go index e9e526f81a7..b267909b457 100644 --- a/go/runtime/txpool/metrics.go +++ b/go/runtime/txpool/metrics.go @@ -14,10 +14,24 @@ var ( }, []string{"runtime"}, ) - pendingScheduleSize = prometheus.NewGaugeVec( + mainQueueSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "oasis_txpool_pending_schedule_size", - Help: "Size of the pending to be scheduled queue (number of entries).", + Help: "Size of the main schedulable queue (number of entries).", + }, + []string{"runtime"}, + ) + localQueueSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "oasis_txpool_local_queue_size", + Help: "Size of the local transactions schedulable queue (number of entries).", + }, + []string{"runtime"}, + ) + rimQueueSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "oasis_txpool_rim_queue_size", + Help: "Size of the roothash incoming message transactions schedulable queue (number of entries).", }, []string{"runtime"}, ) @@ -37,7 +51,9 @@ var ( ) txpoolCollectors = []prometheus.Collector{ pendingCheckSize, - pendingScheduleSize, + mainQueueSize, + localQueueSize, + rimQueueSize, rejectedTransactions, acceptedTransactions, } diff --git a/go/runtime/txpool/rim_queue.go b/go/runtime/txpool/rim_queue.go index d1d9806416d..57b1fee7768 100644 --- a/go/runtime/txpool/rim_queue.go +++ b/go/runtime/txpool/rim_queue.go @@ -50,3 +50,9 @@ func (rq *rimQueue) Load(inMsgs []*message.IncomingMessage) { defer rq.l.Unlock() rq.txs = newTxs } + +func (rq *rimQueue) size() int { + rq.l.Lock() + defer rq.l.Unlock() + return len(rq.txs) +} diff --git a/go/runtime/txpool/rim_queue_test.go b/go/runtime/txpool/rim_queue_test.go index d8cd2f22497..61eef6ebfd4 100644 --- a/go/runtime/txpool/rim_queue_test.go +++ b/go/runtime/txpool/rim_queue_test.go @@ -21,6 +21,7 @@ func TestRimQueue(t *testing.T) { }, }) require.EqualValues(t, map[hash.Hash]*TxQueueMeta{txA.Hash: txA}, rq.txs, "after load") + require.Equal(t, 1, rq.size()) require.Nil(t, rq.GetSchedulingSuggestion(50), "get scheduling suggestion") rq.HandleTxsUsed([]hash.Hash{txA.Hash}) diff --git a/go/runtime/txpool/txpool.go b/go/runtime/txpool/txpool.go index 82b3d0690f6..89fc895debc 100644 --- a/go/runtime/txpool/txpool.go +++ b/go/runtime/txpool/txpool.go @@ -364,8 +364,8 @@ func (t *txPool) HandleTxsUsed(hashes []hash.Hash) { q.HandleTxsUsed(hashes) } - // todo: metrics - // pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.schedulerQueue.size())) + mainQueueSize.With(t.getMetricLabels()).Set(float64(t.mainQueue.inner.size())) + localQueueSize.With(t.getMetricLabels()).Set(float64(t.localQueue.size())) } func (t *txPool) GetKnownBatch(batch []hash.Hash) ([]*TxQueueMeta, map[hash.Hash]int) { @@ -432,6 +432,7 @@ func (t *txPool) ProcessBlock(bi *BlockInfo) error { func (t *txPool) ProcessIncomingMessages(inMsgs []*message.IncomingMessage) error { t.rimQueue.Load(inMsgs) + rimQueueSize.With(t.getMetricLabels()).Set(float64(t.rimQueue.size())) return nil } @@ -644,8 +645,8 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) { t.schedulerNotifier.Broadcast(false) } - // todo: metrics - // pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.PendingScheduleSize())) + mainQueueSize.With(t.getMetricLabels()).Set(float64(t.mainQueue.inner.size())) + localQueueSize.With(t.getMetricLabels()).Set(float64(t.localQueue.size())) } func (t *txPool) ensureInitialized() error { @@ -847,6 +848,8 @@ func (t *txPool) recheck() { results = append(results, notifyCh) } } + mainQueueSize.With(t.getMetricLabels()).Set(float64(t.mainQueue.inner.size())) + localQueueSize.With(t.getMetricLabels()).Set(float64(t.localQueue.size())) if len(pcts) == 0 { return