Skip to content

Commit

Permalink
Updated so that tier2 also receives the correct value
Browse files Browse the repository at this point in the history
  • Loading branch information
maoueh committed Jan 30, 2025
1 parent 7f4d451 commit e9bbdb1
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 14 deletions.
13 changes: 9 additions & 4 deletions pipeline/resolve.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package pipeline

import (
"connectrpc.com/connect"
"context"
"errors"
"fmt"
"sync/atomic"

"connectrpc.com/connect"
"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/hub"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
Expand All @@ -15,7 +17,6 @@ import (
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"github.com/streamingfast/substreams/reqctx"
"go.uber.org/zap"
"sync/atomic"
)

type getBlockFunc func() (uint64, error)
Expand Down Expand Up @@ -48,7 +49,8 @@ func BuildRequestDetails(
getRecentFinalBlock getBlockFunc,
resolveCursor CursorResolver,
getHeadBlock getBlockFunc,
segmentSize uint64) (req *reqctx.RequestDetails, undoSignal *pbsubstreamsrpc.BlockUndoSignal, err error) {
segmentSize uint64,
) (req *reqctx.RequestDetails, undoSignal *pbsubstreamsrpc.BlockUndoSignal, err error) {
req = &reqctx.RequestDetails{
Modules: request.Modules,
OutputModule: request.OutputModule,
Expand Down Expand Up @@ -95,7 +97,7 @@ func BuildRequestDetails(
return
}

func BuildRequestDetailsFromSubrequest(request *pbssinternal.ProcessRangeRequest) (req *reqctx.RequestDetails) {
func BuildRequestDetailsFromSubrequest(ctx context.Context, request *pbssinternal.ProcessRangeRequest) (req *reqctx.RequestDetails) {
req = &reqctx.RequestDetails{
Modules: request.Modules,
OutputModule: request.OutputModule,
Expand All @@ -107,6 +109,9 @@ func BuildRequestDetailsFromSubrequest(request *pbssinternal.ProcessRangeRequest
ResolvedStartBlockNum: request.StartBlock(),
UniqueID: nextUniqueID(),
}

req.SetStageLayerParallelExecutorCountFromContext(ctx)

return req
}

Expand Down
17 changes: 17 additions & 0 deletions reqctx/request.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package reqctx

import (
"context"
"strconv"

"github.com/streamingfast/dauth"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
)

Expand Down Expand Up @@ -45,3 +47,18 @@ func (d *RequestDetails) ShouldStreamCachedOutputs() bool {
return d.ProductionMode &&
d.ResolvedStartBlockNum < d.LinearHandoffBlockNum
}

// SetStageLayerParallelExecutorCountFromContext sets the MaxStageLayerParallelExecutor from the context
// by first retrieving the dauth trusted headers and then parsing the value from the header, if present.
func (d *RequestDetails) SetStageLayerParallelExecutorCountFromContext(ctx context.Context) {
trustedHeaders := dauth.FromContext(ctx)
if trustedHeaders == nil {
return
}

if parallelExecutors := trustedHeaders.Get("X-Sf-Substreams-Stage-Layer-Parallel-Executor-Max-Count"); parallelExecutors != "" {
if count, err := strconv.ParseUint(parallelExecutors, 10, 64); err == nil {
d.MaxStageLayerParallelExecutor = count
}
}
}
15 changes: 6 additions & 9 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,18 +437,15 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
requestDetails.MaxParallelJobs = count
}
}
if parallelExecutors := auth.Get("X-Sf-Substreams-Stage-Layer-Parallel-Executor-Max-Count"); parallelExecutors != "" {
if count, err := strconv.ParseUint(parallelExecutors, 10, 64); err == nil {
requestDetails.MaxStageLayerParallelExecutor = count
}
}
if ct := auth.Get("X-Sf-Substreams-Cache-Tag"); ct != "" {
if IsValidCacheTag(ct) {
cacheTag = ct
if tag := auth.Get("X-Sf-Substreams-Cache-Tag"); tag != "" {
if IsValidCacheTag(tag) {
cacheTag = tag
} else {
return fmt.Errorf("invalid value for X-Sf-Substreams-Cache-Tag %s, should only contain letters, numbers, hyphens and underscores", ct)
return fmt.Errorf("invalid value for X-Sf-Substreams-Cache-Tag %s, should only contain letters, numbers, hyphens and underscores", tag)
}
}

requestDetails.SetStageLayerParallelExecutorCountFromContext(ctx)
}

var requestStats *metrics.Stats
Expand Down
2 changes: 1 addition & 1 deletion service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
return stream.NewErrInvalidArg(err.Error())
}

requestDetails := pipeline.BuildRequestDetailsFromSubrequest(request)
requestDetails := pipeline.BuildRequestDetailsFromSubrequest(ctx, request)
ctx = reqctx.WithRequest(ctx, requestDetails)
if s.moduleExecutionTracing {
ctx = reqctx.WithModuleExecutionTracing(ctx)
Expand Down

0 comments on commit e9bbdb1

Please sign in to comment.