Skip to content

Commit

Permalink
fix grpc return code on tier1
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Dec 5, 2023
1 parent 5536779 commit 5c76077
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 39 deletions.
1 change: 1 addition & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Fixed

* Bug in `substreams init` with numbers in ABI types
* Server implementation: return the correct GRPC code instead of wrapping it under an "Unknown" error.

## v1.1.22

Expand Down
9 changes: 3 additions & 6 deletions orchestrator/work/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (
"sync/atomic"
"time"

"github.com/bufbuild/connect-go"
"github.com/streamingfast/dauth"
"github.com/streamingfast/derr"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
ttrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
grpcCodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/streamingfast/substreams/block"
"github.com/streamingfast/substreams/client"
Expand Down Expand Up @@ -263,10 +262,8 @@ func (w *RemoteWorker) work(ctx context.Context, request *pbssinternal.ProcessRa
if ctx.Err() != nil {
return &Result{Error: ctx.Err()}
}
if s, ok := status.FromError(err); ok {
if s.Code() == grpcCodes.InvalidArgument {
return &Result{Error: err}
}
if connect.CodeOf(err) == connect.CodeInvalidArgument {
return &Result{Error: err}
}
return &Result{
Error: NewRetryableErr(fmt.Errorf("receiving stream resp: %w", err)),
Expand Down
16 changes: 8 additions & 8 deletions pipeline/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"context"
"errors"
"fmt"
"github.com/streamingfast/substreams/manifest"
"sync/atomic"

"github.com/bufbuild/connect-go"
"github.com/streamingfast/substreams/manifest"

"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/hub"
"github.com/streamingfast/dstore"
"go.uber.org/zap"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
Expand Down Expand Up @@ -46,12 +46,12 @@ func BuildRequestDetails(
if req.Modules != nil { // because of tests which do not define modules in the request. too annoying to add this to tests for now. (TODO)
graph, err := manifest.NewModuleGraph(request.Modules.Modules)
if err != nil {
return nil, nil, status.Errorf(grpccodes.InvalidArgument, "invalid modules: %s", err.Error())
return nil, nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid modules: %w", err))
}

moduleHasStatefulDependencies, err = graph.HasStatefulDependencies(request.OutputModule)
if err != nil {
return nil, nil, status.Errorf(grpccodes.InvalidArgument, "invalid output module: %s", err.Error())
return nil, nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid output module: %w", err))
}
}

Expand Down Expand Up @@ -145,11 +145,11 @@ func resolveStartBlockNum(ctx context.Context, req *pbsubstreamsrpc.Request, res

cursor, err := bstream.CursorFromOpaque(req.StartCursor)
if err != nil {
return 0, "", nil, status.Errorf(grpccodes.InvalidArgument, "invalid StartCursor %q: %s", cursor, err.Error())
return 0, "", nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid StartCursor %q: %w", cursor, err))
}

if req.StopBlockNum > 0 && req.StopBlockNum < cursor.Block.Num() {
return 0, "", nil, status.Errorf(grpccodes.InvalidArgument, "StartCursor %q is after StopBlockNum %d", cursor, req.StopBlockNum)
return 0, "", nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("StartCursor %q is after StopBlockNum %d", cursor, req.StopBlockNum))
}

if cursor.IsOnFinalBlock() {
Expand All @@ -159,7 +159,7 @@ func resolveStartBlockNum(ctx context.Context, req *pbsubstreamsrpc.Request, res

reorgJunctionBlock, head, err := resolveCursor(ctx, cursor)
if err != nil {
return 0, "", nil, status.Errorf(grpccodes.InvalidArgument, "cannot resolve StartCursor %q: %s", cursor, err.Error())
return 0, "", nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("cannot resolve StartCursor %q: %s", cursor, err.Error()))
}
var undoSignal *pbsubstreamsrpc.BlockUndoSignal
resolvedCursor := cursor
Expand Down
6 changes: 3 additions & 3 deletions service/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package service

import (
"context"
"fmt"

"github.com/bufbuild/connect-go"
"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/hub"
"github.com/streamingfast/bstream/stream"
"github.com/streamingfast/dmetering"
"github.com/streamingfast/dstore"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type StreamFactory struct {
Expand Down Expand Up @@ -41,7 +41,7 @@ func (sf *StreamFactory) New(
if cursor != "" {
cur, err := bstream.CursorFromOpaque(cursor)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid StartCursor %q: %s", cursor, err)
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid StartCursor %q: %w", cursor, err))
}
if cursorIsTarget {
options = append(options, stream.WithTargetCursor(cur))
Expand Down
30 changes: 14 additions & 16 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ import (
"go.opentelemetry.io/otel/attribute"
ttrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -168,7 +166,7 @@ func (s *Tier1Service) Blocks(

request := req.Msg
if request.Modules == nil {
return status.Error(codes.InvalidArgument, "missing modules in request")
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request"))
}
moduleNames := make([]string, len(request.Modules.Modules))
for i := 0; i < len(moduleNames); i++ {
Expand Down Expand Up @@ -201,7 +199,7 @@ func (s *Tier1Service) Blocks(
defer metrics.ActiveSubstreams.Dec()

if err := outputmodules.ValidateTier1Request(request, s.blockType); err != nil {
return status.Error(codes.InvalidArgument, fmt.Errorf("validate request: %w", err).Error())
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
}

outputGraph, err := outputmodules.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules)
Expand Down Expand Up @@ -240,13 +238,13 @@ func (s *Tier1Service) Blocks(
err = s.blocks(runningContext, request, outputGraph, respFunc)

if grpcError := toGRPCError(runningContext, err); grpcError != nil {
switch status.Code(grpcError) {
case codes.Internal:
switch connect.CodeOf(grpcError) {
case connect.CodeInternal:
logger.Info("unexpected termination of stream of blocks", zap.String("stream_processor", "tier1"), zap.Error(err))
case codes.InvalidArgument:
case connect.CodeInvalidArgument:
logger.Debug("recording failure on request", zap.String("request_id", requestID))
s.recordFailure(requestID, grpcError)
case codes.Canceled:
case connect.CodeCanceled:
logger.Info("Blocks request canceled by user", zap.Error(grpcError))
default:
logger.Info("Blocks request completed with error", zap.Error(grpcError))
Expand Down Expand Up @@ -506,7 +504,7 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg
}
if err := streamSrv.Send(resp); err != nil {
logger.Info("unable to send block probably due to client disconnecting", zap.Error(err))
return status.Error(codes.Unavailable, err.Error())
return connect.NewError(connect.CodeUnavailable, err)
}

sendMetering(meter, userID, apiKeyID, ip, userMeta, "sf.substreams.rpc.v2/Blocks", resp)
Expand Down Expand Up @@ -535,11 +533,11 @@ func setupRequestStats(ctx context.Context, requestDetails *reqctx.RequestDetail
// or `stream.ErrInvalidArg`, error is turned into a proper gRPC error respectively of code
// `Canceled`, `DeadlineExceeded` or `InvalidArgument`.
//
// If the `err` has its in chain any error constructed through `status.Error` (and its variants), then
// If the `err` has its in chain any error constructed through `connect.NewError` (and its variants), then
// we return the first found error of such type directly, because it's already a gRPC error.
//
// Otherwise, the error is assumed to be an internal error and turned backed into a proper
// `status.Error(codes.Internal, err.Error())`.
// `connect.NewError(connect.CodeInternal, err)`.
func toGRPCError(ctx context.Context, err error) error {
if err == nil {
return nil
Expand All @@ -553,23 +551,23 @@ func toGRPCError(ctx context.Context, err error) error {
if context.Cause(ctx) != nil {
err = context.Cause(ctx)
}
return status.Error(codes.Canceled, err.Error())
return connect.NewError(connect.CodeCanceled, err)
}

if errors.Is(err, context.DeadlineExceeded) {
return status.Error(codes.DeadlineExceeded, "source deadline exceeded")
return connect.NewError(connect.CodeDeadlineExceeded, err)
}

if errors.Is(err, exec.ErrWasmDeterministicExec) {
return status.Error(codes.InvalidArgument, err.Error())
return connect.NewError(connect.CodeInvalidArgument, err)
}

var errInvalidArg *stream.ErrInvalidArg
if errors.As(err, &errInvalidArg) {
return status.Error(codes.InvalidArgument, errInvalidArg.Error())
return connect.NewError(connect.CodeInvalidArgument, errInvalidArg)
}

// Do we want to print the full cause as coming from Golang? Would we like to maybe trim off "operational"
// data?
return status.Error(codes.Internal, err.Error())
return connect.NewError(connect.CodeInternal, err)
}
9 changes: 4 additions & 5 deletions service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"

"github.com/bufbuild/connect-go"
"github.com/streamingfast/bstream/stream"
"github.com/streamingfast/dauth"
"github.com/streamingfast/dmetering"
Expand All @@ -26,9 +27,7 @@ import (
"go.opentelemetry.io/otel/attribute"
ttrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

type Tier2Service struct {
Expand Down Expand Up @@ -118,7 +117,7 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s
span.SetAttributes(attribute.String("hostname", hostname))

if request.Modules == nil {
return status.Error(codes.InvalidArgument, "missing modules in request")
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request"))
}
moduleNames := make([]string, len(request.Modules.Modules))
for i := 0; i < len(moduleNames); i++ {
Expand Down Expand Up @@ -152,7 +151,7 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s
err = s.processRange(ctx, request, respFunc, tracing.GetTraceID(ctx).String())
grpcError = toGRPCError(ctx, err)

if grpcError != nil && status.Code(grpcError) == codes.Internal {
if grpcError != nil && connect.CodeOf(grpcError) == connect.CodeInternal {
logger.Info("unexpected termination of stream of blocks", zap.Error(err))
}

Expand Down Expand Up @@ -303,7 +302,7 @@ func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbs
resp := respAny.(*pbssinternal.ProcessRangeResponse)
if err := streamSrv.Send(resp); err != nil {
logger.Info("unable to send block probably due to client disconnecting", zap.Error(err))
return status.Error(codes.Unavailable, err.Error())
return connect.NewError(connect.CodeUnavailable, err)
}

sendMetering(meter, userID, apiKeyID, ip, userMeta, "sf.substreams.internal.v2/ProcessRange", resp)
Expand Down
2 changes: 1 addition & 1 deletion sink-server/docker/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (e *DockerEngine) newClickhouse(deploymentID string, pkg *pbsubstreams.Pack
conf := types.ServiceConfig{
Name: name,
ContainerName: name,
Image: "clickhouse/clickhouse-server:23.3-alpine",
Image: "clickhouse/clickhouse-server:23.9-alpine",
Restart: "on-failure",
Ports: []types.ServicePortConfig{
{
Expand Down

0 comments on commit 5c76077

Please sign in to comment.