Limit parallel execution of a stage's layer
Previously, the engine executed all modules in a stage's layer in parallel. So if you had 20 independent mapper modules, they were all run in parallel.

This was hindering performance under high load, where many CPU cycles can be consumed while the machine has only a limited number of physical cores available.

We now change that behavior: development mode never executes modules in parallel, and production mode now limits execution to 2 modules in parallel. A future update will make that value dynamic based on the request's subscription.
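
As a rough sketch of the new policy (simplified, with hypothetical names such as runLayer and plain module functions rather than the pipeline's actual executor types), a layer's modules run one after the other in development mode and with bounded concurrency otherwise, using errgroup's SetLimit in the same spirit as the process_block.go change below:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// runLayer sketches the behavior described above: in development mode the
// layer's modules run sequentially; otherwise at most maxParallel of them
// execute concurrently. The real pipeline also collects per-module results
// and applies them after the wait, which is omitted here.
func runLayer(ctx context.Context, modules []func(context.Context) error, devMode bool, maxParallel int) error {
	if devMode || len(modules) <= 1 {
		for _, run := range modules {
			if err := run(ctx); err != nil {
				return err
			}
		}
		return nil
	}

	wg := errgroup.Group{}
	wg.SetLimit(maxParallel) // 2 in production mode, as of this commit

	for _, run := range modules {
		run := run
		wg.Go(func() error { return run(ctx) })
	}
	return wg.Wait()
}

func main() {
	modules := []func(context.Context) error{
		func(context.Context) error { fmt.Println("mapper A"); return nil },
		func(context.Context) error { fmt.Println("mapper B"); return nil },
		func(context.Context) error { fmt.Println("mapper C"); return nil },
	}
	if err := runLayer(context.Background(), modules, false, 2); err != nil {
		fmt.Println("layer failed:", err)
	}
}
```

In the actual change, results are still gathered per executor and applied in index order after the wait, so module outputs within a layer keep a deterministic ordering even when execution is concurrent.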

# Conflicts:
#	docs/release-notes/change-log.md
maoueh committed Jan 28, 2025
1 parent b7e1e86 commit ff3d1de
Showing 5 changed files with 47 additions and 31 deletions.
6 changes: 5 additions & 1 deletion docs/release-notes/change-log.md
@@ -13,7 +13,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Server

* Improve noop-mode: will now only send one signal per bundle, without any data
* Improve noop-mode: will now only send one signal per bundle, without any data.

* Limit parallel execution of a stage's layer

Previously, the engine executed all modules in a stage's layer in parallel. We now change that behavior: development mode will not execute any modules in parallel and production mode will limit parallelism to 2 for now.

### Client

21 changes: 12 additions & 9 deletions pipeline/pipeline.go
@@ -47,11 +47,14 @@ type Pipeline struct {
postBlockHooks []substreams.BlockHook
postJobHooks []substreams.PostJobHook

wasmRuntime *wasm.Registry
execGraph *exec.Graph
loadedModules map[uint32]wasm.Module
ModuleExecutors [][]exec.ModuleExecutor // Staged module executors
executionStages exec.ExecutionStages
wasmRuntime *wasm.Registry
execGraph *exec.Graph
loadedModules map[uint32]wasm.Module
// StagedModuleExecutors represents all the modules within a stage that should be executed. The
// first level of the 2D list represents the layers within a stage, which are executed sequentially.
// The second level contains the modules to execute within a layer; those can be executed concurrently.
StagedModuleExecutors [][]exec.ModuleExecutor
executionStages exec.ExecutionStages

mapModuleOutput *pbsubstreamsrpc.MapModuleOutput
extraMapModuleOutputs []*pbsubstreamsrpc.MapModuleOutput
@@ -455,7 +458,7 @@ func (p *Pipeline) returnInternalModuleProgressOutputs(clock *pbsubstreams.Clock

// BuildModuleExecutors builds the ModuleExecutors, and the loadedModules.
func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error {
if p.ModuleExecutors != nil {
if p.StagedModuleExecutors != nil {
// Eventually, we can invalidate our cache to accommodate the PATCH
// and rebuild all the modules, and tear down the previously loaded ones.
return nil
@@ -582,13 +585,13 @@ func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error {
}
}

p.ModuleExecutors = stagedModuleExecutors
p.StagedModuleExecutors = stagedModuleExecutors
return nil
}

func (p *Pipeline) cleanUpModuleExecutors(ctx context.Context) error {
for _, stage := range p.ModuleExecutors {
for _, executor := range stage {
for _, layer := range p.StagedModuleExecutors {
for _, executor := range layer {
if err := executor.Close(ctx); err != nil {
return fmt.Errorf("closing module executor %q: %w", executor.Name(), err)
}
40 changes: 20 additions & 20 deletions pipeline/process_block.go
@@ -6,7 +6,6 @@ import (
"fmt"
"io"
"runtime/debug"
"sync"

"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
@@ -20,6 +19,7 @@ import (
"github.com/streamingfast/substreams/reqctx"
"github.com/streamingfast/substreams/storage/execout"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)
@@ -307,11 +307,9 @@ func (p *Pipeline) executeModules(ctx context.Context, execOutput execout.Execut
// the ctx is cached in the built moduleExecutors so we only activate timeout here
ctx, cancel := context.WithTimeout(ctx, p.executionTimeout)
defer cancel()
for _, stage := range p.ModuleExecutors {
//t0 := time.Now()
if len(stage) < 2 {
//fmt.Println("Linear stage", len(stage))
for _, executor := range stage {
for _, layer := range p.StagedModuleExecutors {
if len(layer) <= 1 || reqctx.IsInDevelopmentModeRequest(ctx) {
for _, executor := range layer {
if !executor.RunsOnBlock(blockNum) {
continue
}
@@ -321,40 +319,42 @@ func (p *Pipeline) executeModules(ctx context.Context, execOutput execout.Execut
}
}
} else {
results := make([]resultObj, len(stage))
wg := sync.WaitGroup{}
//fmt.Println("Parallelized in stage", stageIdx, len(stage))
for i, executor := range stage {
results := make([]resultObj, len(layer))
wg := errgroup.Group{}
wg.SetLimit(2)

for i, executor := range layer {
if !executor.RunsOnBlock(execOutput.Clock().Number) {
results[i] = resultObj{not_runnable: true}
continue
}
wg.Add(1)
i := i
executor := executor
go func() {
defer wg.Done()

wg.Go(func() error {
res := p.execute(ctx, executor, execOutput)
results[i] = res
}()

return nil
})
}

if err := wg.Wait(); err != nil {
return fmt.Errorf("running executors: %w", err)
}
wg.Wait()

for i, result := range results {
if result.not_runnable {
continue
}
executor := stage[i]
executor := layer[i]
if result.err != nil {
//p.returnFailureProgress(ctx, err, executor)
return fmt.Errorf("running executor %q: %w", executor.Name(), result.err)
}

if err := p.applyExecutionResult(ctx, executor, result, execOutput); err != nil {
return fmt.Errorf("applying executor results %q on block %d (%s): %w", executor.Name(), blockNum, execOutput.Clock(), result.err)
}
}
}
//blockDuration += time.Since(t0)
}

return nil
9 changes: 9 additions & 0 deletions reqctx/context.go
@@ -91,6 +91,15 @@ func WithEmitter(ctx context.Context, emitter dmetering.EventEmitter) context.Co
return context.WithValue(ctx, emitterKey, emitter)
}

func IsInDevelopmentModeRequest(ctx context.Context) bool {
details := Details(ctx)
if details == nil {
return true
}

return !details.ProductionMode
}

type ISpan interface {
// End completes the Span. The Span is considered complete and ready to be
// delivered through the rest of the telemetry pipeline after this method
2 changes: 1 addition & 1 deletion service/tier2.go
@@ -364,7 +364,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P

allExecutorsExcludedByBlockIndex := true
excludable:
for _, stage := range pipe.ModuleExecutors {
for _, stage := range pipe.StagedModuleExecutors {
for _, executor := range stage {
switch executor := executor.(type) {
case *exec.MapperModuleExecutor:
