Skip to content

Commit

Permalink
Merge pull request #8 from getoutreach/asyncrunner
Browse files Browse the repository at this point in the history
feat: asyncrunner and race
  • Loading branch information
pavelsmejkal authored Jul 3, 2024
2 parents 6f77195 + cec8b85 commit 94597b8
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 32 deletions.
30 changes: 30 additions & 0 deletions adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.

// Description: This file contains a compatibility layer with https://github.com/getoutreach/gobox/blob/main/pkg/async/async.go
package plumber

import (
"context"
"io"
)

// AsyncRunner provides a compatibility adapter with async.Runner interface
func AsyncRunner(runner interface {
Run(ctx context.Context) error
}) RunnerCloser {
type Closer interface {
Close(ctx context.Context) error
}
return GracefulRunner(func(ctx context.Context, ready ReadyFunc) error {
go ready()
return runner.Run(ctx)
}, func(ctx context.Context) error {
switch r := runner.(type) {
case Closer:
return r.Close(ctx)
case io.Closer:
return r.Close()
}
return nil
})
}
40 changes: 40 additions & 0 deletions looper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.

// Description: This file contains looper structs
package plumber

import "context"

// BaseLooper is a looper struct that can be used in composition as following example:
//
// type Looper struct {
// *plumber.BaseLooper
// }
//
// s := &Looper{}
// s.BaseLooper = plumber.NewBaseLooper(s.loop)
//
// func (s *Looper) loop(ctx context.Context, l *plumber.Loop) error {
// ....
// }
type BaseLooper struct {
runner RunnerCloser
}

// Run executes runners workload. Pipelines are starting Run method in separated goroutine.
// Runner must report its readiness using given callback
func (l *BaseLooper) Run(ctx context.Context, ready ReadyFunc) error {
return l.runner.Run(ctx, ready)
}

// Close method triggers graceful shutdown on the task. It should block till task is properly closed.
// When Close timeout is exceeded then given context is canceled.
func (l *BaseLooper) Close(ctx context.Context) error {
return l.runner.Close(ctx)
}

func NewBaseLooper(looper func(ctx context.Context, loop *Loop) error) *BaseLooper {
return &BaseLooper{
runner: Looper(looper),
}
}
66 changes: 34 additions & 32 deletions orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,23 @@ func (l *Loop) Closing() <-chan DoneFunc {
// })
func Looper(run func(ctx context.Context, loop *Loop) error) RunnerCloser {
var (
once sync.Once
l = &Loop{
runOnce sync.Once
closeOnce sync.Once
returnedCh = make(chan struct{}, 1)
l = &Loop{
closeCh: make(chan DoneFunc, 1),
}
)
return &gracefulRunner{
run: func(ctx context.Context, ready ReadyFunc) error {
var err error
once.Do(func() {
runOnce.Do(func() {
l.ready = ready
defer close(l.closeCh)
defer closeOnce.Do(func() {
close(l.closeCh)
})
err = run(ctx, l)
close(returnedCh)
})
return err
},
Expand All @@ -181,13 +186,14 @@ func Looper(run func(ctx context.Context, loop *Loop) error) RunnerCloser {
close(errCh)
}
)
l.closeCh <- canceled
// if hasn't been started, lets close it
once.Do(func() {
close(errCh)
closeOnce.Do(func() {
l.closeCh <- canceled
close(l.closeCh)
})
select {
case <-returnedCh:
return nil
case <-ctx.Done():
return ctx.Err()
case err := <-errCh:
Expand Down Expand Up @@ -286,15 +292,14 @@ func (r *ParallelPipeline) Run(ctx context.Context, ready ReadyFunc) error {
for _, runner := range r.runners {
go func(runner RunnerCloser) {
defer r.wg.Done()
if err := runner.Run(ctx, func() {
err := runner.Run(ctx, func() {
// Signal that runner is ready
readyCh <- struct{}{}
}); err != nil {
if r.options.ErrorSignaler != nil && !r.closing.Load() {
r.options.ErrorSignaler(err)
}
errs <- err
})
if r.options.ErrorSignaler != nil && !r.closing.Load() {
r.options.ErrorSignaler(err)
}
errs <- err
}(runner)
}

Expand Down Expand Up @@ -357,55 +362,53 @@ func (r *SerialPipeline) Run(ctx context.Context, ready ReadyFunc) error {
wg sync.WaitGroup
errs = make(ErrorCh, len(r.runners))
readyCh = make(chan struct{}, 1)
closeCh = make(chan struct{})
)
wg.Add(len(r.runners))

drain := func(index int) {
for i := index; i < len(r.runners); i++ {
wg.Done()
}
}

// started go routine
wg.Add(1)
go func() {
defer wg.Done()
var index = 0
var errored atomic.Bool
for {
select {
case <-closeCh:
drain(index)
return
case <-readyCh:
case _, ok := <-readyCh:
// We are closed
if !ok {
return
}
// when all runners are running we cal report that pipeline is ready
if index == len(r.runners) {
ready()
return
}
// when we are closing we need to mark remaining workers as finished
if r.closing.Load() {
drain(index)
if r.closing.Load() || errored.Load() {
return
}
runner := r.runners[index]
index++
// runner go routine
wg.Add(1)
go func() {
var once sync.Once
defer wg.Done()
err := runner.Run(ctx, func() {
// worker is ready we can start with next one
once.Do(func() {
readyCh <- struct{}{}
if !r.closing.Load() && !errored.Load() {
readyCh <- struct{}{}
}
})
})
if r.options.ErrorSignaler != nil && !r.closing.Load() {
r.options.ErrorSignaler(err)
}
if err != nil {
r.closing.Store(true)
close(closeCh)
errored.Store(true)
close(readyCh)
errs <- err
}
errs <- err
}()
case <-ctx.Done():
return
Expand All @@ -414,7 +417,6 @@ func (r *SerialPipeline) Run(ctx context.Context, ready ReadyFunc) error {
}()
// Lets start first worker
readyCh <- struct{}{}

wg.Wait()
close(errs)

Expand Down

0 comments on commit 94597b8

Please sign in to comment.