Skip to content

Commit

Permalink
feat(router): support traffic shaping rules on subgraph level (#1438)
Browse files Browse the repository at this point in the history
  • Loading branch information
df-wg authored Dec 18, 2024
1 parent 04f5510 commit 45a1189
Show file tree
Hide file tree
Showing 13 changed files with 913 additions and 31 deletions.
404 changes: 404 additions & 0 deletions router-tests/telemetry_test.go

Large diffs are not rendered by default.

132 changes: 132 additions & 0 deletions router-tests/timeout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package integration_test

import (
"github.com/stretchr/testify/require"
"github.com/wundergraph/cosmo/router-tests/testenv"
"github.com/wundergraph/cosmo/router/core"
"github.com/wundergraph/cosmo/router/pkg/config"
"net/http"
"testing"
"time"
)

func TestTimeouts(t *testing.T) {
t.Parallel()

const queryEmployeeWithHobby = `{
employee(id: 1) {
id
hobbies {
... on Gaming {
name
}
}
}
}`

const queryEmployeeWithNoHobby = `{
employee(id: 1) {
id
}
}`

t.Run("applies RequestTimeout", func(t *testing.T) {
t.Parallel()

hobbySubgraphSleep := testenv.SubgraphsConfig{
Hobbies: testenv.SubgraphConfig{
Middleware: func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
time.Sleep(5 * time.Millisecond) // Slow response
w.Write([]byte("Hello, world!"))
})
},
},
}

trafficConfig := config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
RequestTimeout: 10 * time.Millisecond,
},
Subgraphs: map[string]*config.GlobalSubgraphRequestRule{
"hobbies": {
RequestTimeout: 3 * time.Millisecond,
},
},
}
t.Run("applied subgraph timeout to request", func(t *testing.T) {
t.Parallel()

testenv.Run(t, &testenv.Config{
Subgraphs: hobbySubgraphSleep,
RouterOptions: []core.Option{
core.WithSubgraphTransportOptions(
core.NewSubgraphTransportOptions(trafficConfig)),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: queryEmployeeWithHobby,
})
require.Equal(t, `{"errors":[{"message":"Failed to fetch from Subgraph 'hobbies' at Path 'employee'."}],"data":{"employee":{"id":1,"hobbies":null}}}`, res.Body)
})
})

t.Run("Subgraph timeout options don't affect unrelated subgraph", func(t *testing.T) {
t.Parallel()

testenv.Run(t, &testenv.Config{
Subgraphs: hobbySubgraphSleep,
RouterOptions: []core.Option{
core.WithSubgraphTransportOptions(
core.NewSubgraphTransportOptions(trafficConfig)),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: queryEmployeeWithNoHobby,
})
require.Equal(t, `{"data":{"employee":{"id":1}}}`, res.Body)
})
})
})

t.Run("ResponseHeaderTimeout exceeded", func(t *testing.T) {
t.Parallel()

hobbySubgraphSleep := testenv.SubgraphsConfig{
Hobbies: testenv.SubgraphConfig{
Middleware: func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
time.Sleep(5 * time.Millisecond) // Slow response
w.Write([]byte("Hello, world!"))
})
},
},
}

trafficConfig := config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
RequestTimeout: 10 * time.Millisecond,
},
Subgraphs: map[string]*config.GlobalSubgraphRequestRule{
"hobbies": {
ResponseHeaderTimeout: 3 * time.Millisecond,
},
},
}

testenv.Run(t, &testenv.Config{
Subgraphs: hobbySubgraphSleep,
RouterOptions: []core.Option{
core.WithSubgraphTransportOptions(
core.NewSubgraphTransportOptions(trafficConfig)),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: queryEmployeeWithNoHobby,
})
require.Equal(t, `{"data":{"employee":{"id":1}}}`, res.Body)
})
})
}
10 changes: 1 addition & 9 deletions router/cmd/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,7 @@ func NewRouter(params Params, additionalOptions ...core.Option) (*core.Router, e
core.WithHeaderRules(cfg.Headers),
core.WithRouterTrafficConfig(&cfg.TrafficShaping.Router),
core.WithFileUploadConfig(&cfg.FileUpload),
core.WithSubgraphTransportOptions(&core.SubgraphTransportOptions{
RequestTimeout: cfg.TrafficShaping.All.RequestTimeout,
ResponseHeaderTimeout: cfg.TrafficShaping.All.ResponseHeaderTimeout,
ExpectContinueTimeout: cfg.TrafficShaping.All.ExpectContinueTimeout,
KeepAliveIdleTimeout: cfg.TrafficShaping.All.KeepAliveIdleTimeout,
DialTimeout: cfg.TrafficShaping.All.DialTimeout,
TLSHandshakeTimeout: cfg.TrafficShaping.All.TLSHandshakeTimeout,
KeepAliveProbeInterval: cfg.TrafficShaping.All.KeepAliveProbeInterval,
}),
core.WithSubgraphTransportOptions(core.NewSubgraphTransportOptions(cfg.TrafficShaping)),
core.WithSubgraphRetryOptions(
cfg.TrafficShaping.All.BackoffJitterRetry.Enabled,
cfg.TrafficShaping.All.BackoffJitterRetry.MaxAttempts,
Expand Down
13 changes: 8 additions & 5 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type (
playgroundHandler func(http.Handler) http.Handler
publicKey *ecdsa.PublicKey
executionTransport *http.Transport
executionTransportProxy ProxyFunc
baseOtelAttributes []attribute.KeyValue
baseRouterConfigVersion string
mux *chi.Mux
Expand All @@ -103,7 +104,8 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC
cancelFunc: cancel,
Config: &r.Config,
websocketStats: r.WebsocketStats,
executionTransport: newHTTPTransport(r.subgraphTransportOptions, proxy),
executionTransport: newHTTPTransport(r.subgraphTransportOptions.TransportTimeoutOptions, proxy),
executionTransportProxy: proxy,
playgroundHandler: r.playgroundHandler,
baseRouterConfigVersion: routerConfig.GetVersion(),
inFlightRequests: &atomic.Uint64{},
Expand Down Expand Up @@ -788,10 +790,11 @@ func (s *graphServer) buildGraphMux(ctx context.Context,
logger: s.logger,
trackUsageInfo: s.graphqlMetricsConfig.Enabled,
transportOptions: &TransportOptions{
RequestTimeout: s.subgraphTransportOptions.RequestTimeout,
PreHandlers: s.preOriginHandlers,
PostHandlers: s.postOriginHandlers,
MetricStore: gm.metricStore,
Proxy: s.executionTransportProxy,
SubgraphTransportOptions: s.subgraphTransportOptions,
PreHandlers: s.preOriginHandlers,
PostHandlers: s.postOriginHandlers,
MetricStore: gm.metricStore,
RetryOptions: retrytransport.RetryOptions{
Enabled: s.retryOptions.Enabled,
MaxRetryCount: s.retryOptions.MaxRetryCount,
Expand Down
52 changes: 43 additions & 9 deletions router/core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type (
proxy ProxyFunc
}

SubgraphTransportOptions struct {
TransportTimeoutOptions struct {
RequestTimeout time.Duration
ResponseHeaderTimeout time.Duration
ExpectContinueTimeout time.Duration
Expand All @@ -94,6 +94,11 @@ type (
KeepAliveProbeInterval time.Duration
}

SubgraphTransportOptions struct {
TransportTimeoutOptions
SubgraphMap map[string]*TransportTimeoutOptions
}

GraphQLMetricsConfig struct {
Enabled bool
CollectorEndpoint string
Expand Down Expand Up @@ -1585,15 +1590,44 @@ func DefaultFileUploadConfig() *config.FileUpload {
}
}

func NewTransportTimeoutOptions(cfg config.GlobalSubgraphRequestRule) TransportTimeoutOptions {
return TransportTimeoutOptions{
RequestTimeout: cfg.RequestTimeout,
ResponseHeaderTimeout: cfg.ResponseHeaderTimeout,
ExpectContinueTimeout: cfg.ExpectContinueTimeout,
KeepAliveIdleTimeout: cfg.KeepAliveIdleTimeout,
DialTimeout: cfg.DialTimeout,
TLSHandshakeTimeout: cfg.TLSHandshakeTimeout,
KeepAliveProbeInterval: cfg.KeepAliveProbeInterval,
}
}

func NewSubgraphTransportOptions(cfg config.TrafficShapingRules) *SubgraphTransportOptions {
base := &SubgraphTransportOptions{
TransportTimeoutOptions: NewTransportTimeoutOptions(cfg.All),
SubgraphMap: map[string]*TransportTimeoutOptions{},
}

for k, v := range cfg.Subgraphs {
opts := NewTransportTimeoutOptions(*v)
base.SubgraphMap[k] = &opts
}

return base
}

func DefaultSubgraphTransportOptions() *SubgraphTransportOptions {
return &SubgraphTransportOptions{
RequestTimeout: 60 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 0 * time.Second,
ExpectContinueTimeout: 0 * time.Second,
KeepAliveProbeInterval: 30 * time.Second,
KeepAliveIdleTimeout: 0 * time.Second,
DialTimeout: 30 * time.Second,
TransportTimeoutOptions: TransportTimeoutOptions{
RequestTimeout: 60 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 0 * time.Second,
ExpectContinueTimeout: 0 * time.Second,
KeepAliveProbeInterval: 30 * time.Second,
KeepAliveIdleTimeout: 0 * time.Second,
DialTimeout: 30 * time.Second,
},
SubgraphMap: map[string]*TransportTimeoutOptions{},
}
}

Expand Down Expand Up @@ -1723,7 +1757,7 @@ func WithHostName(hostName string) Option {

type ProxyFunc func(req *http.Request) (*url.URL, error)

func newHTTPTransport(opts *SubgraphTransportOptions, proxy ProxyFunc) *http.Transport {
func newHTTPTransport(opts TransportTimeoutOptions, proxy ProxyFunc) *http.Transport {
dialer := &net.Dialer{
Timeout: opts.DialTimeout,
KeepAlive: opts.KeepAliveProbeInterval,
Expand Down
53 changes: 53 additions & 0 deletions router/core/timout_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package core

import (
"context"
"go.uber.org/zap"
"net/http"
)

type TimeoutTransport struct {
defaultTransport http.RoundTripper
logger *zap.Logger
subgraphTrippers map[string]*http.Transport
opts *SubgraphTransportOptions
}

func NewTimeoutTransport(transportOpts *SubgraphTransportOptions, roundTripper http.RoundTripper, logger *zap.Logger, proxy ProxyFunc) *TimeoutTransport {
tt := &TimeoutTransport{
defaultTransport: roundTripper,
logger: logger,
subgraphTrippers: map[string]*http.Transport{},
opts: transportOpts,
}

for subgraph, subgraphOpts := range transportOpts.SubgraphMap {
if subgraphOpts != nil {
tt.subgraphTrippers[subgraph] = newHTTPTransport(*subgraphOpts, proxy)
}
}

return tt
}

func (tt *TimeoutTransport) RoundTrip(req *http.Request) (*http.Response, error) {
if req == nil {
return nil, nil
}

rq := getRequestContext(req.Context())
if rq == nil {
return nil, nil
}
subgraph := rq.ActiveSubgraph(req)
if subgraph != nil && subgraph.Name != "" && tt.subgraphTrippers[subgraph.Name] != nil {
timeout := tt.opts.SubgraphMap[subgraph.Name].RequestTimeout
if timeout > 0 {
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()
return tt.subgraphTrippers[subgraph.Name].RoundTrip(req.WithContext(ctx))
}
return tt.subgraphTrippers[subgraph.Name].RoundTrip(req)
}
return tt.defaultTransport.RoundTrip(req)
}
Loading

0 comments on commit 45a1189

Please sign in to comment.