Skip to content

Commit

Permalink
make pool worker count customizable
Browse files Browse the repository at this point in the history
  • Loading branch information
Intizar-T committed Jul 9, 2024
1 parent 1ffb37e commit aa1e51d
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 10 deletions.
4 changes: 3 additions & 1 deletion node/pkg/fetcher/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/rs/zerolog/log"
)

const POOL_WORKER_COUNT = 3

func NewAccumulator(interval time.Duration) *Accumulator {
return &Accumulator{
Interval: interval,
Expand All @@ -22,7 +24,7 @@ func (a *Accumulator) Run(ctx context.Context) {
a.cancel = cancel
a.isRunning = true

p := pool.NewPool()
p := pool.NewPool(POOL_WORKER_COUNT)
p.Run(accumulatorCtx)

ticker := time.NewTicker(a.Interval)
Expand Down
3 changes: 2 additions & 1 deletion node/pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
)

const HEARTBEAT_TIMEOUT = 100 * time.Millisecond
const POOL_WORKER_COUNT = 3

func NewRaftNode(
h host.Host,
Expand Down Expand Up @@ -318,7 +319,7 @@ func (r *Raft) becomeLeader(ctx context.Context) {
r.HeartbeatTicker = time.NewTicker(r.HeartbeatTimeout)
r.LeaderJobTicker = time.NewTicker(r.LeaderJobTimeout)

p := pool.NewPool()
p := pool.NewPool(POOL_WORKER_COUNT)
p.Run(ctx)

go func() {
Expand Down
12 changes: 6 additions & 6 deletions node/pkg/utils/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ import (
)

type Pool struct {
jobChannel chan func()
jobChannel chan func()
workerCount int
}

const POOL_WORKER_COUNT = 3

func NewPool() *Pool {
func NewPool(workerCount int) *Pool {
return &Pool{
jobChannel: make(chan func()),
jobChannel: make(chan func()),
workerCount: workerCount,
}
}

func (p *Pool) Run(ctx context.Context) {
for i := 0; i < POOL_WORKER_COUNT; i++ {
for i := 0; i < p.workerCount; i++ {
go p.worker(ctx)
}
}
Expand Down
6 changes: 4 additions & 2 deletions node/pkg/utils/tests/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import (
pool "bisonai.com/orakl/node/pkg/utils/pool"
)

const POOL_WORKER_COUNT = 3

func TestNewPool(t *testing.T) {
p := pool.NewPool()
p := pool.NewPool(POOL_WORKER_COUNT)
if p == nil {
t.Errorf("NewPool() returned nil")
}
}

func TestJobExecution(t *testing.T) {
p := pool.NewPool()
p := pool.NewPool(POOL_WORKER_COUNT)
ctx, cancel := context.WithCancel(context.Background())
p.Run(ctx)
defer cancel()
Expand Down

0 comments on commit aa1e51d

Please sign in to comment.