diff --git a/node/pkg/fetcher/accumulator.go b/node/pkg/fetcher/accumulator.go index 0f03c57be..bf4485ffa 100644 --- a/node/pkg/fetcher/accumulator.go +++ b/node/pkg/fetcher/accumulator.go @@ -10,6 +10,8 @@ import ( "github.com/rs/zerolog/log" ) +const POOL_WORKER_COUNT = 3 + func NewAccumulator(interval time.Duration) *Accumulator { return &Accumulator{ Interval: interval, @@ -22,7 +24,7 @@ func (a *Accumulator) Run(ctx context.Context) { a.cancel = cancel a.isRunning = true - p := pool.NewPool() + p := pool.NewPool(POOL_WORKER_COUNT) p.Run(accumulatorCtx) ticker := time.NewTicker(a.Interval) diff --git a/node/pkg/raft/raft.go b/node/pkg/raft/raft.go index 728291778..2419cf48b 100644 --- a/node/pkg/raft/raft.go +++ b/node/pkg/raft/raft.go @@ -16,6 +16,7 @@ import ( ) const HEARTBEAT_TIMEOUT = 100 * time.Millisecond +const POOL_WORKER_COUNT = 3 func NewRaftNode( h host.Host, @@ -318,7 +319,7 @@ func (r *Raft) becomeLeader(ctx context.Context) { r.HeartbeatTicker = time.NewTicker(r.HeartbeatTimeout) r.LeaderJobTicker = time.NewTicker(r.LeaderJobTimeout) - p := pool.NewPool() + p := pool.NewPool(POOL_WORKER_COUNT) p.Run(ctx) go func() { diff --git a/node/pkg/utils/pool/pool.go b/node/pkg/utils/pool/pool.go index 936d71286..c2f30a3ab 100644 --- a/node/pkg/utils/pool/pool.go +++ b/node/pkg/utils/pool/pool.go @@ -5,19 +5,19 @@ import ( ) type Pool struct { - jobChannel chan func() + jobChannel chan func() + workerCount int } -const POOL_WORKER_COUNT = 3 - -func NewPool() *Pool { +func NewPool(workerCount int) *Pool { return &Pool{ - jobChannel: make(chan func()), + jobChannel: make(chan func()), + workerCount: workerCount, } } func (p *Pool) Run(ctx context.Context) { - for i := 0; i < POOL_WORKER_COUNT; i++ { + for i := 0; i < p.workerCount; i++ { go p.worker(ctx) } } diff --git a/node/pkg/utils/tests/pool_test.go b/node/pkg/utils/tests/pool_test.go index 3ed37a81f..a995b6bd6 100644 --- a/node/pkg/utils/tests/pool_test.go +++ b/node/pkg/utils/tests/pool_test.go @@ -8,15 +8,17 @@ import ( pool "bisonai.com/orakl/node/pkg/utils/pool" ) +const POOL_WORKER_COUNT = 3 + func TestNewPool(t *testing.T) { - p := pool.NewPool() + p := pool.NewPool(POOL_WORKER_COUNT) if p == nil { t.Errorf("NewPool() returned nil") } } func TestJobExecution(t *testing.T) { - p := pool.NewPool() + p := pool.NewPool(POOL_WORKER_COUNT) ctx, cancel := context.WithCancel(context.Background()) p.Run(ctx) defer cancel()