Skip to content

Commit

Permalink
implement internal ctx in Pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Intizar-T committed Jul 10, 2024
1 parent 5f4fabb commit 48237ed
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 13 deletions.
6 changes: 5 additions & 1 deletion node/pkg/fetcher/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ func (a *Accumulator) Run(ctx context.Context) {
for {
select {
case <-ticker.C:
p.AddJob(ctx, func() {
p.AddJob(func() {
a.accumulatorJob(accumulatorCtx)
})
case <-ctx.Done():
if p.IsRunning {
p.Cancel()
p.IsRunning = false
}
log.Debug().Str("Player", "Fetcher").Msg("fetcher local aggregates channel goroutine stopped")
return
}
Expand Down
6 changes: 5 additions & 1 deletion node/pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ func (r *Raft) becomeLeader(ctx context.Context) {
log.Debug().Msg("resigning as leader")
r.HeartbeatTicker.Stop()
r.LeaderJobTicker.Stop()
if p.IsRunning {
p.Cancel()
p.IsRunning = false
}

return

Expand All @@ -341,7 +345,7 @@ func (r *Raft) becomeLeader(ctx context.Context) {
}

case <-r.LeaderJobTicker.C:
p.AddJob(ctx, func() {
p.AddJob(func() {
defer func() {
if r := recover(); r != nil {
log.Error().Msgf("recovered from panic in leader job: %v", r)
Expand Down
18 changes: 13 additions & 5 deletions node/pkg/utils/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
type Pool struct {
jobChannel chan func()
workerCount int
ctx context.Context
Cancel context.CancelFunc
IsRunning bool
}

func NewPool(workerCount int) *Pool {
Expand All @@ -17,27 +20,32 @@ func NewPool(workerCount int) *Pool {
}

func (p *Pool) Run(ctx context.Context) {
poolCtx, cancel := context.WithCancel(ctx)
p.ctx = poolCtx
p.Cancel = cancel
p.IsRunning = true

for i := 0; i < p.workerCount; i++ {
go p.worker(ctx)
go p.worker()
}
}

func (p *Pool) worker(ctx context.Context) {
func (p *Pool) worker() {
for {
select {
case job := <-p.jobChannel:
job()
case <-ctx.Done():
case <-p.ctx.Done():
return
}
}
}

func (p *Pool) AddJob(ctx context.Context, job func()) {
func (p *Pool) AddJob(job func()) {
select {
case p.jobChannel <- job:
return
case <-ctx.Done():
case <-p.ctx.Done():
return
}
}
12 changes: 6 additions & 6 deletions node/pkg/utils/tests/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestImmediateJobExecution(t *testing.T) {
p.Run(ctx)

done := make(chan bool)
p.AddJob(ctx, func() {
p.AddJob(func() {
done <- true
})

Expand All @@ -48,7 +48,7 @@ func TestLargeNumberOfJobs(t *testing.T) {
wg.Add(jobCount)

for i := 0; i < jobCount; i++ {
p.AddJob(ctx, func() {
p.AddJob(func() {
wg.Done()
})
}
Expand All @@ -74,7 +74,7 @@ func TestContextCancelDuringJobExecution(t *testing.T) {
p.Run(ctx)

done := make(chan bool)
p.AddJob(ctx, func() {
p.AddJob(func() {
time.Sleep(200 * time.Millisecond)
done <- true
})
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestAddJobToClosedPool(t *testing.T) {

done := make(chan bool)
go func() {
p.AddJob(ctx, func() {
p.AddJob(func() {
done <- true
})
}()
Expand All @@ -129,7 +129,7 @@ func TestConcurrentJobExecution(t *testing.T) {
wg.Add(jobCount)

for i := 0; i < jobCount; i++ {
p.AddJob(ctx, func() {
p.AddJob(func() {
channel <- i
wg.Done()
})
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestWorkerCount(t *testing.T) {

done := make(chan bool)
for i := 0; i < POOL_WORKER_COUNT; i++ {
p.AddJob(ctx, func() {
p.AddJob(func() {
mu.Lock()
workerCount++
mu.Unlock()
Expand Down

0 comments on commit 48237ed

Please sign in to comment.