Skip to content

Commit

Permalink
add more Pool tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Intizar-T committed Jul 9, 2024
1 parent aa1e51d commit 6e7f64c
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 10 deletions.
4 changes: 2 additions & 2 deletions node/pkg/fetcher/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"bisonai.com/orakl/node/pkg/common/keys"
"bisonai.com/orakl/node/pkg/db"
pool "bisonai.com/orakl/node/pkg/utils/pool"
"bisonai.com/orakl/node/pkg/utils/pool"
"github.com/rs/zerolog/log"
)

Expand All @@ -33,7 +33,7 @@ func (a *Accumulator) Run(ctx context.Context) {
for {
select {
case <-ticker.C:
p.AddJob(func() {
p.AddJob(ctx, func() {
a.accumulatorJob(accumulatorCtx)
})
case <-ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/rs/zerolog/log"

errorSentinel "bisonai.com/orakl/node/pkg/error"
pool "bisonai.com/orakl/node/pkg/utils/pool"
"bisonai.com/orakl/node/pkg/utils/pool"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
)
Expand Down Expand Up @@ -339,7 +339,7 @@ func (r *Raft) becomeLeader(ctx context.Context) {
}

case <-r.LeaderJobTicker.C:
p.AddJob(func() {
p.AddJob(ctx, func() {
defer func() {
if r := recover(); r != nil {
log.Error().Msgf("recovered from panic in leader job: %v", r)
Expand Down
9 changes: 7 additions & 2 deletions node/pkg/utils/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (p *Pool) worker(ctx context.Context) {
}
}

func (p *Pool) AddJob(job func()) {
p.jobChannel <- job
func (p *Pool) AddJob(ctx context.Context, job func()) {
select {
case p.jobChannel <- job:
return
case <-ctx.Done():
return
}
}
138 changes: 134 additions & 4 deletions node/pkg/utils/tests/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package tests

import (
"context"
"sync"
"testing"
"time"

pool "bisonai.com/orakl/node/pkg/utils/pool"
"bisonai.com/orakl/node/pkg/utils/pool"
)

const POOL_WORKER_COUNT = 3
Expand All @@ -17,7 +18,106 @@ func TestNewPool(t *testing.T) {
}
}

func TestJobExecution(t *testing.T) {
func TestImmediateJobExecution(t *testing.T) {
p := pool.NewPool(POOL_WORKER_COUNT)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p.Run(ctx)

done := make(chan bool)
p.AddJob(ctx, func() {
done <- true
})

select {
case <-done:
return
case <-time.After(100 * time.Millisecond):
t.Error("Job was not executed immediately")
}
}

func TestLargeNumberOfJobs(t *testing.T) {
p := pool.NewPool(POOL_WORKER_COUNT)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p.Run(ctx)

jobCount := 1000
var wg sync.WaitGroup
wg.Add(jobCount)

for i := 0; i < jobCount; i++ {
p.AddJob(ctx, func() {
wg.Done()
})
}

// Wait for all jobs to be done
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

select {
case <-done:
return
case <-time.After(5 * time.Second):
t.Error("Not all jobs were processed in time")
}
}

func TestContextCancelDuringJobExecution(t *testing.T) {
p := pool.NewPool(POOL_WORKER_COUNT)
ctx, cancel := context.WithCancel(context.Background())
p.Run(ctx)

done := make(chan bool)
p.AddJob(ctx, func() {
time.Sleep(200 * time.Millisecond)
done <- true
})

cancel()

select {
case <-done:
return
case <-time.After(300 * time.Millisecond):
t.Error("Job did not complete as expected after context cancel")
}
}

func TestAddJobToClosedPool(t *testing.T) {
p := pool.NewPool(POOL_WORKER_COUNT)
ctx, cancel := context.WithCancel(context.Background())
p.Run(ctx)

cancel()

defer func() {
if r := recover(); r != nil {
t.Errorf("Panic occurred while adding job to closed pool: %v", r)
}
}()

done := make(chan bool)
go func() {
p.AddJob(ctx, func() {
done <- true
})
}()

select {
case <-done:
t.Error("Job should not be processed after context is done")
case <-time.After(100 * time.Millisecond):
return
}
}

func TestConcurrentJobExecution(t *testing.T) {
p := pool.NewPool(POOL_WORKER_COUNT)
ctx, cancel := context.WithCancel(context.Background())
p.Run(ctx)
Expand All @@ -27,15 +127,45 @@ func TestJobExecution(t *testing.T) {
var confirm_slice []int
for i := 0; i < 10; i++ {
confirm_slice = append(confirm_slice, i)
p.AddJob(func() {
p.AddJob(ctx, func() {
slice = append(slice, i)
})
time.Sleep(100 * time.Millisecond)
}

time.Sleep(time.Second)

for i := 0; i < 10; i++ {
if slice[i] != confirm_slice[i] {
t.Errorf("Job execution failed")
}
}
}

func TestWorkerCount(t *testing.T) {
p := pool.NewPool(POOL_WORKER_COUNT)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

p.Run(ctx)

workerCount := 0
var mu sync.Mutex

done := make(chan bool)
for i := 0; i < POOL_WORKER_COUNT; i++ {
p.AddJob(ctx, func() {
mu.Lock()
workerCount++
mu.Unlock()
done <- true
})
}

for i := 0; i < POOL_WORKER_COUNT; i++ {
<-done
}

if workerCount != POOL_WORKER_COUNT {
t.Errorf("Expected %d workers to be running, but got %d", POOL_WORKER_COUNT, workerCount)
}
}

0 comments on commit 6e7f64c

Please sign in to comment.