diff --git a/router-tests/telemetry_test.go b/router-tests/telemetry_test.go index 0a6119656b..909d6f97d2 100644 --- a/router-tests/telemetry_test.go +++ b/router-tests/telemetry_test.go @@ -6,6 +6,7 @@ import ( "regexp" "runtime" "testing" + "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" @@ -2458,6 +2459,409 @@ func TestTelemetry(t *testing.T) { }) }) + t.Run("Telemetry works with subgraph timeouts", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + exporter := tracetest.NewInMemoryExporter(t) + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + MetricReader: metricReader, + RouterOptions: []core.Option{ + core.WithSubgraphTransportOptions( + core.NewSubgraphTransportOptions(config.TrafficShapingRules{ + All: config.GlobalSubgraphRequestRule{ + RequestTimeout: 10 * time.Millisecond, + }, + Subgraphs: map[string]*config.GlobalSubgraphRequestRule{ + "hobbies": { + RequestTimeout: 3 * time.Millisecond, + }, + }, + })), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `{ employee(id:1) { id details { forename surname } } }`, + }) + require.JSONEq(t, `{"data":{"employee":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}}}`, res.Body) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 9, "expected 9 spans, got %d", len(sn)) + + /** + * Spans + */ + + // Pre-Handler Operation Read + + require.Equal(t, "HTTP - Read Body", sn[0].Name()) + require.Equal(t, trace.SpanKindInternal, sn[0].SpanKind()) + require.Equal(t, sdktrace.Status{Code: codes.Unset}, sn[0].Status()) + require.Len(t, sn[0].Attributes(), 7) + require.Contains(t, sn[0].Attributes(), otel.WgRouterVersion.String("dev")) + require.Contains(t, sn[0].Attributes(), otel.WgRouterClusterName.String("")) + require.Contains(t, sn[0].Attributes(), otel.WgFederatedGraphID.String("graph")) + require.Contains(t, sn[0].Attributes(), otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain())) + require.Contains(t, sn[0].Attributes(), otel.WgClientName.String("unknown")) + require.Contains(t, sn[0].Attributes(), otel.WgClientVersion.String("missing")) + require.Contains(t, sn[0].Attributes(), otel.WgOperationProtocol.String("http")) + + // Pre-Handler Operation Parse + + require.Equal(t, "Operation - Parse", sn[1].Name()) + require.Equal(t, trace.SpanKindInternal, sn[1].SpanKind()) + require.Equal(t, sdktrace.Status{Code: codes.Unset}, sn[1].Status()) + + // Span Resource attributes + + rs := attribute.NewSet(sn[1].Resource().Attributes()...) + + require.True(t, rs.HasValue("host.name")) + require.True(t, rs.HasValue("os.type")) + require.True(t, rs.HasValue("process.pid")) + + require.NotEmpty(t, sn[1].Resource().Attributes(), attribute.String("telemetry.sdk.version", "1.24.0")) + require.Contains(t, sn[1].Resource().Attributes(), attribute.String("service.instance.id", "test-instance")) + require.Contains(t, sn[1].Resource().Attributes(), attribute.String("telemetry.sdk.name", "opentelemetry")) + require.Contains(t, sn[1].Resource().Attributes(), attribute.String("telemetry.sdk.language", "go")) + require.Contains(t, sn[1].Resource().Attributes(), attribute.String("service.version", "dev")) + require.Contains(t, sn[1].Resource().Attributes(), attribute.String("service.name", "cosmo-router")) + + // Span attributes + + require.Len(t, sn[1].Attributes(), 7) + require.Contains(t, sn[1].Attributes(), otel.WgRouterVersion.String("dev")) + require.Contains(t, sn[1].Attributes(), otel.WgRouterClusterName.String("")) + require.Contains(t, sn[1].Attributes(), otel.WgFederatedGraphID.String("graph")) + require.Contains(t, sn[1].Attributes(), otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain())) + require.Contains(t, sn[1].Attributes(), otel.WgClientName.String("unknown")) + require.Contains(t, sn[1].Attributes(), otel.WgClientVersion.String("missing")) + require.Contains(t, sn[1].Attributes(), otel.WgOperationProtocol.String("http")) + + require.Equal(t, "Operation - Normalize", sn[2].Name()) + require.Equal(t, trace.SpanKindInternal, sn[2].SpanKind()) + require.Equal(t, sdktrace.Status{Code: codes.Unset}, sn[2].Status()) + + // Span Resource attributes + + rs = attribute.NewSet(sn[2].Resource().Attributes()...) + + require.Len(t, sn[2].Resource().Attributes(), 9) + + require.True(t, rs.HasValue("host.name")) + require.True(t, rs.HasValue("os.type")) + require.True(t, rs.HasValue("process.pid")) + + require.NotEmpty(t, sn[2].Resource().Attributes(), attribute.String("telemetry.sdk.version", "1.24.0")) + require.Contains(t, sn[2].Resource().Attributes(), attribute.String("service.instance.id", "test-instance")) + require.Contains(t, sn[2].Resource().Attributes(), attribute.String("telemetry.sdk.name", "opentelemetry")) + require.Contains(t, sn[2].Resource().Attributes(), attribute.String("telemetry.sdk.language", "go")) + require.Contains(t, sn[2].Resource().Attributes(), attribute.String("service.version", "dev")) + require.Contains(t, sn[2].Resource().Attributes(), attribute.String("service.name", "cosmo-router")) + + // Span attributes + + require.Len(t, sn[2].Attributes(), 10) + + require.Contains(t, sn[2].Attributes(), otel.WgRouterVersion.String("dev")) + require.Contains(t, sn[2].Attributes(), otel.WgRouterClusterName.String("")) + require.Contains(t, sn[2].Attributes(), otel.WgFederatedGraphID.String("graph")) + require.Contains(t, sn[2].Attributes(), otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain())) + require.Contains(t, sn[2].Attributes(), otel.WgOperationName.String("")) + require.Contains(t, sn[2].Attributes(), otel.WgOperationType.String("query")) + require.Contains(t, sn[2].Attributes(), otel.WgNormalizationCacheHit.Bool(false)) + require.Contains(t, sn[2].Attributes(), otel.WgClientName.String("unknown")) + require.Contains(t, sn[2].Attributes(), otel.WgClientVersion.String("missing")) + require.Contains(t, sn[2].Attributes(), otel.WgOperationProtocol.String("http")) + + require.Equal(t, "Operation - Validate", sn[3].Name()) + require.Equal(t, trace.SpanKindInternal, sn[3].SpanKind()) + require.Equal(t, sdktrace.Status{Code: codes.Unset}, sn[3].Status()) + + // Span Resource attributes + + rs = attribute.NewSet(sn[3].Resource().Attributes()...) + + require.Len(t, sn[3].Resource().Attributes(), 9) + + require.True(t, rs.HasValue("host.name")) + require.True(t, rs.HasValue("os.type")) + require.True(t, rs.HasValue("process.pid")) + + require.NotEmpty(t, sn[3].Resource().Attributes(), attribute.String("telemetry.sdk.version", "1.24.0")) + require.Contains(t, sn[3].Resource().Attributes(), attribute.String("service.instance.id", "test-instance")) + require.Contains(t, sn[3].Resource().Attributes(), attribute.String("telemetry.sdk.name", "opentelemetry")) + require.Contains(t, sn[3].Resource().Attributes(), attribute.String("telemetry.sdk.language", "go")) + require.Contains(t, sn[3].Resource().Attributes(), attribute.String("service.version", "dev")) + require.Contains(t, sn[3].Resource().Attributes(), attribute.String("service.name", "cosmo-router")) + + // Span attributes + + require.Len(t, sn[3].Attributes(), 11) + + require.Equal(t, "Operation - Validate", sn[3].Name()) + require.Equal(t, trace.SpanKindInternal, sn[3].SpanKind()) + require.Equal(t, sdktrace.Status{Code: codes.Unset}, sn[3].Status()) + + require.Contains(t, sn[3].Attributes(), otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain())) + require.Contains(t, sn[3].Attributes(), otel.WgRouterVersion.String("dev")) + require.Contains(t, sn[3].Attributes(), otel.WgRouterClusterName.String("")) + require.Contains(t, sn[3].Attributes(), otel.WgFederatedGraphID.String("graph")) + + require.Contains(t, sn[3].Attributes(), otel.WgClientName.String("unknown")) + require.Contains(t, sn[3].Attributes(), otel.WgValidationCacheHit.Bool(false)) + + require.Contains(t, sn[3].Attributes(), otel.WgClientVersion.String("missing")) + require.Contains(t, sn[3].Attributes(), otel.WgOperationProtocol.String("http")) + require.Contains(t, sn[3].Attributes(), otel.WgOperationName.String("")) + require.Contains(t, sn[3].Attributes(), otel.WgOperationType.String("query")) + + require.Contains(t, sn[3].Attributes(), otel.WgOperationHash.String("14671468813149144966")) + require.Contains(t, sn[3].Attributes(), otel.WgValidationCacheHit.Bool(false)) + + // Span Resource attributes + + rs = attribute.NewSet(sn[4].Resource().Attributes()...) + + require.Len(t, sn[4].Resource().Attributes(), 9) + + require.True(t, rs.HasValue("host.name")) + require.True(t, rs.HasValue("os.type")) + require.True(t, rs.HasValue("process.pid")) + + require.NotEmpty(t, sn[4].Resource().Attributes(), attribute.String("telemetry.sdk.version", "1.24.0")) + require.Contains(t, sn[4].Resource().Attributes(), attribute.String("service.instance.id", "test-instance")) + require.Contains(t, sn[4].Resource().Attributes(), attribute.String("telemetry.sdk.name", "opentelemetry")) + require.Contains(t, sn[4].Resource().Attributes(), attribute.String("telemetry.sdk.language", "go")) + require.Contains(t, sn[4].Resource().Attributes(), attribute.String("service.version", "dev")) + require.Contains(t, sn[4].Resource().Attributes(), attribute.String("service.name", "cosmo-router")) + + // Span attributes + + require.Len(t, sn[4].Attributes(), 12) + require.Contains(t, sn[4].Attributes(), otel.WgRouterVersion.String("dev")) + require.Contains(t, sn[4].Attributes(), otel.WgRouterClusterName.String("")) + require.Contains(t, sn[4].Attributes(), otel.WgFederatedGraphID.String("graph")) + require.Contains(t, sn[4].Attributes(), otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain())) + require.Contains(t, sn[4].Attributes(), otel.WgEngineRequestTracingEnabled.Bool(false)) + require.Contains(t, sn[4].Attributes(), otel.WgEnginePlanCacheHit.Bool(false)) + require.Contains(t, sn[4].Attributes(), otel.WgClientName.String("unknown")) + require.Contains(t, sn[4].Attributes(), otel.WgClientVersion.String("missing")) + require.Contains(t, sn[4].Attributes(), otel.WgOperationName.String("")) + require.Contains(t, sn[4].Attributes(), otel.WgOperationType.String("query")) + require.Contains(t, sn[4].Attributes(), otel.WgOperationProtocol.String("http")) + require.Contains(t, sn[4].Attributes(), otel.WgOperationHash.String("14671468813149144966")) + + // Engine Transport + require.Equal(t, "query unnamed", sn[5].Name()) + require.Equal(t, trace.SpanKindClient, sn[5].SpanKind()) + require.Equal(t, sdktrace.Status{Code: codes.Unset}, sn[5].Status()) + + // Span Resource attributes + + rs = attribute.NewSet(sn[5].Resource().Attributes()...) + + require.Len(t, sn[5].Resource().Attributes(), 9) + + require.True(t, rs.HasValue("host.name")) + require.True(t, rs.HasValue("os.type")) + require.True(t, rs.HasValue("process.pid")) + + require.NotEmpty(t, sn[5].Resource().Attributes(), attribute.String("telemetry.sdk.version", "1.24.0")) + require.Contains(t, sn[5].Resource().Attributes(), attribute.String("service.instance.id", "test-instance")) + require.Contains(t, sn[5].Resource().Attributes(), attribute.String("telemetry.sdk.name", "opentelemetry")) + require.Contains(t, sn[5].Resource().Attributes(), attribute.String("telemetry.sdk.language", "go")) + require.Contains(t, sn[5].Resource().Attributes(), attribute.String("service.version", "dev")) + require.Contains(t, sn[5].Resource().Attributes(), attribute.String("service.name", "cosmo-router")) + + // Span attributes + + sa := attribute.NewSet(sn[5].Attributes()...) + + require.Len(t, sn[5].Attributes(), 21) + require.True(t, sa.HasValue(semconv.HTTPURLKey)) + require.True(t, sa.HasValue(semconv.NetPeerPortKey)) + + require.Contains(t, sn[5].Attributes(), otel.WgRouterVersion.String("dev")) + require.Contains(t, sn[5].Attributes(), otel.WgRouterClusterName.String("")) + require.Contains(t, sn[5].Attributes(), otel.WgFederatedGraphID.String("graph")) + require.Contains(t, sn[5].Attributes(), otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain())) + require.Contains(t, sn[5].Attributes(), otel.WgComponentName.String("engine-transport")) + require.Contains(t, sn[5].Attributes(), semconv.HTTPMethod("POST")) + require.Contains(t, sn[5].Attributes(), semconv.HTTPFlavorKey.String("1.1")) + require.Contains(t, sn[5].Attributes(), semconv.NetPeerName("127.0.0.1")) + require.Contains(t, sn[5].Attributes(), semconv.HTTPRequestContentLength(96)) + require.Contains(t, sn[5].Attributes(), otel.WgClientName.String("unknown")) + require.Contains(t, sn[5].Attributes(), otel.WgClientVersion.String("missing")) + require.Contains(t, sn[5].Attributes(), otel.WgOperationName.String("")) + require.Contains(t, sn[5].Attributes(), otel.WgOperationType.String("query")) + require.Contains(t, sn[5].Attributes(), otel.WgOperationProtocol.String("http")) + require.Contains(t, sn[5].Attributes(), otel.WgOperationHash.String("14671468813149144966")) + require.Contains(t, sn[5].Attributes(), otel.WgSubgraphID.String("0")) + require.Contains(t, sn[5].Attributes(), otel.WgSubgraphName.String("employees")) + require.Contains(t, sn[5].Attributes(), semconv.HTTPStatusCode(200)) + require.Contains(t, sn[5].Attributes(), semconv.HTTPResponseContentLength(78)) + + // Engine Loader Hooks + require.Equal(t, "Engine - Fetch", sn[6].Name()) + require.Equal(t, trace.SpanKindInternal, sn[6].SpanKind()) + require.Equal(t, sdktrace.Status{Code: codes.Unset}, sn[6].Status()) + + // Span Resource attributes + + rs = attribute.NewSet(sn[6].Resource().Attributes()...) + + require.Len(t, sn[6].Resource().Attributes(), 9) + + require.True(t, rs.HasValue("host.name")) + require.True(t, rs.HasValue("os.type")) + require.True(t, rs.HasValue("process.pid")) + + require.NotEmpty(t, sn[6].Resource().Attributes(), attribute.String("telemetry.sdk.version", "1.24.0")) + require.Contains(t, sn[6].Resource().Attributes(), attribute.String("service.instance.id", "test-instance")) + require.Contains(t, sn[6].Resource().Attributes(), attribute.String("telemetry.sdk.name", "opentelemetry")) + require.Contains(t, sn[6].Resource().Attributes(), attribute.String("telemetry.sdk.language", "go")) + require.Contains(t, sn[6].Resource().Attributes(), attribute.String("service.version", "dev")) + require.Contains(t, sn[6].Resource().Attributes(), attribute.String("service.name", "cosmo-router")) + + // Span attributes + + require.Len(t, sn[6].Attributes(), 14) + + require.Contains(t, sn[6].Attributes(), otel.WgSubgraphID.String("0")) + require.Contains(t, sn[6].Attributes(), otel.WgSubgraphName.String("employees")) + require.Contains(t, sn[6].Attributes(), semconv.HTTPStatusCode(200)) + require.Contains(t, sn[6].Attributes(), otel.WgComponentName.String("engine-loader")) + require.Contains(t, sn[6].Attributes(), otel.WgClientName.String("unknown")) + require.Contains(t, sn[6].Attributes(), otel.WgClientVersion.String("missing")) + require.Contains(t, sn[6].Attributes(), otel.WgOperationName.String("")) + require.Contains(t, sn[6].Attributes(), otel.WgOperationType.String("query")) + require.Contains(t, sn[6].Attributes(), otel.WgOperationProtocol.String("http")) + require.Contains(t, sn[6].Attributes(), otel.WgOperationHash.String("14671468813149144966")) + require.Contains(t, sn[6].Attributes(), otel.WgRouterVersion.String("dev")) + require.Contains(t, sn[6].Attributes(), otel.WgRouterClusterName.String("")) + require.Contains(t, sn[6].Attributes(), otel.WgFederatedGraphID.String("graph")) + require.Contains(t, sn[6].Attributes(), otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain())) + + // GraphQL handler + require.Equal(t, "Operation - Execute", sn[7].Name()) + require.Equal(t, trace.SpanKindInternal, sn[7].SpanKind()) + require.Equal(t, sdktrace.Status{Code: codes.Unset}, sn[7].Status()) + + // Span Resource attributes + + rs = attribute.NewSet(sn[7].Resource().Attributes()...) + + require.Len(t, sn[7].Resource().Attributes(), 9) + + require.True(t, rs.HasValue("host.name")) + require.True(t, rs.HasValue("os.type")) + require.True(t, rs.HasValue("process.pid")) + + require.NotEmpty(t, sn[7].Resource().Attributes(), attribute.String("telemetry.sdk.version", "1.24.0")) + require.Contains(t, sn[7].Resource().Attributes(), attribute.String("service.instance.id", "test-instance")) + require.Contains(t, sn[7].Resource().Attributes(), attribute.String("telemetry.sdk.name", "opentelemetry")) + require.Contains(t, sn[7].Resource().Attributes(), attribute.String("telemetry.sdk.language", "go")) + require.Contains(t, sn[7].Resource().Attributes(), attribute.String("service.version", "dev")) + require.Contains(t, sn[7].Resource().Attributes(), attribute.String("service.name", "cosmo-router")) + + // Span attributes + + require.Len(t, sn[7].Attributes(), 11) + require.Contains(t, sn[7].Attributes(), otel.WgClientName.String("unknown")) + require.Contains(t, sn[7].Attributes(), otel.WgClientVersion.String("missing")) + require.Contains(t, sn[7].Attributes(), otel.WgOperationName.String("")) + require.Contains(t, sn[7].Attributes(), otel.WgOperationType.String("query")) + require.Contains(t, sn[7].Attributes(), otel.WgOperationProtocol.String("http")) + require.Contains(t, sn[7].Attributes(), otel.WgOperationHash.String("14671468813149144966")) + + require.Contains(t, sn[7].Attributes(), otel.WgRouterVersion.String("dev")) + require.Contains(t, sn[7].Attributes(), otel.WgRouterClusterName.String("")) + require.Contains(t, sn[7].Attributes(), otel.WgFederatedGraphID.String("graph")) + require.Contains(t, sn[7].Attributes(), otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain())) + require.Contains(t, sn[7].Attributes(), otel.WgAcquireResolverWaitTimeMs.Int64(0)) + + // Root Server middleware + require.Equal(t, "query unnamed", sn[8].Name()) + require.Equal(t, trace.SpanKindServer, sn[8].SpanKind()) + require.Equal(t, sdktrace.Status{Code: codes.Unset}, sn[8].Status()) + + // Span Resource attributes + + rs = attribute.NewSet(sn[8].Resource().Attributes()...) + + require.Len(t, sn[8].Resource().Attributes(), 9) + + require.True(t, rs.HasValue("host.name")) + require.True(t, rs.HasValue("os.type")) + require.True(t, rs.HasValue("process.pid")) + + require.NotEmpty(t, sn[8].Resource().Attributes(), attribute.String("telemetry.sdk.version", "1.24.0")) + require.Contains(t, sn[8].Resource().Attributes(), attribute.String("service.instance.id", "test-instance")) + require.Contains(t, sn[8].Resource().Attributes(), attribute.String("telemetry.sdk.name", "opentelemetry")) + require.Contains(t, sn[8].Resource().Attributes(), attribute.String("telemetry.sdk.language", "go")) + require.Contains(t, sn[8].Resource().Attributes(), attribute.String("service.version", "dev")) + require.Contains(t, sn[8].Resource().Attributes(), attribute.String("service.name", "cosmo-router")) + + sa = attribute.NewSet(sn[8].Attributes()...) + + require.Len(t, sn[8].Attributes(), 26) + require.True(t, sa.HasValue(semconv.NetHostPortKey)) + require.True(t, sa.HasValue(semconv.NetSockPeerAddrKey)) + require.True(t, sa.HasValue(semconv.NetSockPeerPortKey)) + require.True(t, sa.HasValue(otel.WgRouterConfigVersion)) + require.True(t, sa.HasValue(otel.WgFederatedGraphID)) + require.True(t, sa.HasValue("http.user_agent")) + require.True(t, sa.HasValue("http.host")) + require.True(t, sa.HasValue("http.read_bytes")) + require.True(t, sa.HasValue("http.wrote_bytes")) + + require.Contains(t, sn[8].Attributes(), semconv.HTTPMethod("POST")) + require.Contains(t, sn[8].Attributes(), semconv.HTTPScheme("http")) + require.Contains(t, sn[8].Attributes(), semconv.HTTPFlavorKey.String("1.1")) + require.Contains(t, sn[8].Attributes(), semconv.NetHostName("localhost")) + require.Contains(t, sn[8].Attributes(), otel.WgRouterVersion.String("dev")) + require.Contains(t, sn[8].Attributes(), otel.WgRouterClusterName.String("")) + require.Contains(t, sn[8].Attributes(), otel.WgComponentName.String("router-server")) + require.Contains(t, sn[8].Attributes(), otel.WgRouterRootSpan.Bool(true)) + require.Contains(t, sn[8].Attributes(), semconv.HTTPTarget("/graphql")) + require.Contains(t, sn[8].Attributes(), otel.WgClientName.String("unknown")) + require.Contains(t, sn[8].Attributes(), otel.WgClientVersion.String("missing")) + require.Contains(t, sn[8].Attributes(), otel.WgOperationProtocol.String("http")) + require.Contains(t, sn[8].Attributes(), otel.WgOperationName.String("")) + require.Contains(t, sn[8].Attributes(), otel.WgOperationType.String("query")) + require.Contains(t, sn[8].Attributes(), otel.WgOperationContent.String("query($a: Int!){employee(id: $a){id details {forename surname}}}")) + require.Contains(t, sn[8].Attributes(), otel.WgFederatedGraphID.String("graph")) + require.Contains(t, sn[8].Attributes(), otel.WgOperationHash.String("14671468813149144966")) + require.Contains(t, sn[8].Attributes(), semconv.HTTPStatusCode(200)) + + /** + * Metrics + */ + rm := metricdata.ResourceMetrics{} + err := metricReader.Collect(context.Background(), &rm) + require.NoError(t, err) + + rs = attribute.NewSet(rm.Resource.Attributes()...) + + require.True(t, rs.HasValue("host.name")) + require.True(t, rs.HasValue("os.type")) + require.True(t, rs.HasValue("process.pid")) + + require.NotEmpty(t, rm.Resource.Attributes(), attribute.String("telemetry.sdk.version", "1.24.0")) + require.Contains(t, rm.Resource.Attributes(), attribute.String("service.instance.id", "test-instance")) + require.Contains(t, rm.Resource.Attributes(), attribute.String("telemetry.sdk.name", "opentelemetry")) + require.Contains(t, rm.Resource.Attributes(), attribute.String("telemetry.sdk.language", "go")) + require.Contains(t, rm.Resource.Attributes(), attribute.String("service.version", "dev")) + require.Contains(t, rm.Resource.Attributes(), attribute.String("service.name", "cosmo-router")) + + require.Equal(t, 1, len(rm.ScopeMetrics), "expected 1 ScopeMetrics, got %d", len(rm.ScopeMetrics)) + require.Equal(t, 6, len(rm.ScopeMetrics[0].Metrics), "expected 6 Metrics, got %d", len(rm.ScopeMetrics[0].Metrics)) + }) + }) + t.Run("Trace persisted operation", func(t *testing.T) { t.Parallel() diff --git a/router-tests/timeout_test.go b/router-tests/timeout_test.go new file mode 100644 index 0000000000..338eed777b --- /dev/null +++ b/router-tests/timeout_test.go @@ -0,0 +1,132 @@ +package integration_test + +import ( + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router-tests/testenv" + "github.com/wundergraph/cosmo/router/core" + "github.com/wundergraph/cosmo/router/pkg/config" + "net/http" + "testing" + "time" +) + +func TestTimeouts(t *testing.T) { + t.Parallel() + + const queryEmployeeWithHobby = `{ + employee(id: 1) { + id + hobbies { + ... on Gaming { + name + } + } + } + }` + + const queryEmployeeWithNoHobby = `{ + employee(id: 1) { + id + } + }` + + t.Run("applies RequestTimeout", func(t *testing.T) { + t.Parallel() + + hobbySubgraphSleep := testenv.SubgraphsConfig{ + Hobbies: testenv.SubgraphConfig{ + Middleware: func(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + time.Sleep(5 * time.Millisecond) // Slow response + w.Write([]byte("Hello, world!")) + }) + }, + }, + } + + trafficConfig := config.TrafficShapingRules{ + All: config.GlobalSubgraphRequestRule{ + RequestTimeout: 10 * time.Millisecond, + }, + Subgraphs: map[string]*config.GlobalSubgraphRequestRule{ + "hobbies": { + RequestTimeout: 3 * time.Millisecond, + }, + }, + } + t.Run("applied subgraph timeout to request", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + Subgraphs: hobbySubgraphSleep, + RouterOptions: []core.Option{ + core.WithSubgraphTransportOptions( + core.NewSubgraphTransportOptions(trafficConfig)), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: queryEmployeeWithHobby, + }) + require.Equal(t, `{"errors":[{"message":"Failed to fetch from Subgraph 'hobbies' at Path 'employee'."}],"data":{"employee":{"id":1,"hobbies":null}}}`, res.Body) + }) + }) + + t.Run("Subgraph timeout options don't affect unrelated subgraph", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + Subgraphs: hobbySubgraphSleep, + RouterOptions: []core.Option{ + core.WithSubgraphTransportOptions( + core.NewSubgraphTransportOptions(trafficConfig)), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: queryEmployeeWithNoHobby, + }) + require.Equal(t, `{"data":{"employee":{"id":1}}}`, res.Body) + }) + }) + }) + + t.Run("ResponseHeaderTimeout exceeded", func(t *testing.T) { + t.Parallel() + + hobbySubgraphSleep := testenv.SubgraphsConfig{ + Hobbies: testenv.SubgraphConfig{ + Middleware: func(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + time.Sleep(5 * time.Millisecond) // Slow response + w.Write([]byte("Hello, world!")) + }) + }, + }, + } + + trafficConfig := config.TrafficShapingRules{ + All: config.GlobalSubgraphRequestRule{ + RequestTimeout: 10 * time.Millisecond, + }, + Subgraphs: map[string]*config.GlobalSubgraphRequestRule{ + "hobbies": { + ResponseHeaderTimeout: 3 * time.Millisecond, + }, + }, + } + + testenv.Run(t, &testenv.Config{ + Subgraphs: hobbySubgraphSleep, + RouterOptions: []core.Option{ + core.WithSubgraphTransportOptions( + core.NewSubgraphTransportOptions(trafficConfig)), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: queryEmployeeWithNoHobby, + }) + require.Equal(t, `{"data":{"employee":{"id":1}}}`, res.Body) + }) + }) +} diff --git a/router/cmd/instance.go b/router/cmd/instance.go index d219bb42ca..fb68001dc4 100644 --- a/router/cmd/instance.go +++ b/router/cmd/instance.go @@ -124,15 +124,7 @@ func NewRouter(params Params, additionalOptions ...core.Option) (*core.Router, e core.WithHeaderRules(cfg.Headers), core.WithRouterTrafficConfig(&cfg.TrafficShaping.Router), core.WithFileUploadConfig(&cfg.FileUpload), - core.WithSubgraphTransportOptions(&core.SubgraphTransportOptions{ - RequestTimeout: cfg.TrafficShaping.All.RequestTimeout, - ResponseHeaderTimeout: cfg.TrafficShaping.All.ResponseHeaderTimeout, - ExpectContinueTimeout: cfg.TrafficShaping.All.ExpectContinueTimeout, - KeepAliveIdleTimeout: cfg.TrafficShaping.All.KeepAliveIdleTimeout, - DialTimeout: cfg.TrafficShaping.All.DialTimeout, - TLSHandshakeTimeout: cfg.TrafficShaping.All.TLSHandshakeTimeout, - KeepAliveProbeInterval: cfg.TrafficShaping.All.KeepAliveProbeInterval, - }), + core.WithSubgraphTransportOptions(core.NewSubgraphTransportOptions(cfg.TrafficShaping)), core.WithSubgraphRetryOptions( cfg.TrafficShaping.All.BackoffJitterRetry.Enabled, cfg.TrafficShaping.All.BackoffJitterRetry.MaxAttempts, diff --git a/router/core/graph_server.go b/router/core/graph_server.go index f1ebd56362..4a20669285 100644 --- a/router/core/graph_server.go +++ b/router/core/graph_server.go @@ -80,6 +80,7 @@ type ( playgroundHandler func(http.Handler) http.Handler publicKey *ecdsa.PublicKey executionTransport *http.Transport + executionTransportProxy ProxyFunc baseOtelAttributes []attribute.KeyValue baseRouterConfigVersion string mux *chi.Mux @@ -103,7 +104,8 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC cancelFunc: cancel, Config: &r.Config, websocketStats: r.WebsocketStats, - executionTransport: newHTTPTransport(r.subgraphTransportOptions, proxy), + executionTransport: newHTTPTransport(r.subgraphTransportOptions.TransportTimeoutOptions, proxy), + executionTransportProxy: proxy, playgroundHandler: r.playgroundHandler, baseRouterConfigVersion: routerConfig.GetVersion(), inFlightRequests: &atomic.Uint64{}, @@ -788,10 +790,11 @@ func (s *graphServer) buildGraphMux(ctx context.Context, logger: s.logger, trackUsageInfo: s.graphqlMetricsConfig.Enabled, transportOptions: &TransportOptions{ - RequestTimeout: s.subgraphTransportOptions.RequestTimeout, - PreHandlers: s.preOriginHandlers, - PostHandlers: s.postOriginHandlers, - MetricStore: gm.metricStore, + Proxy: s.executionTransportProxy, + SubgraphTransportOptions: s.subgraphTransportOptions, + PreHandlers: s.preOriginHandlers, + PostHandlers: s.postOriginHandlers, + MetricStore: gm.metricStore, RetryOptions: retrytransport.RetryOptions{ Enabled: s.retryOptions.Enabled, MaxRetryCount: s.retryOptions.MaxRetryCount, diff --git a/router/core/router.go b/router/core/router.go index 02934106a8..9e1102b963 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -84,7 +84,7 @@ type ( proxy ProxyFunc } - SubgraphTransportOptions struct { + TransportTimeoutOptions struct { RequestTimeout time.Duration ResponseHeaderTimeout time.Duration ExpectContinueTimeout time.Duration @@ -94,6 +94,11 @@ type ( KeepAliveProbeInterval time.Duration } + SubgraphTransportOptions struct { + TransportTimeoutOptions + SubgraphMap map[string]*TransportTimeoutOptions + } + GraphQLMetricsConfig struct { Enabled bool CollectorEndpoint string @@ -1585,15 +1590,44 @@ func DefaultFileUploadConfig() *config.FileUpload { } } +func NewTransportTimeoutOptions(cfg config.GlobalSubgraphRequestRule) TransportTimeoutOptions { + return TransportTimeoutOptions{ + RequestTimeout: cfg.RequestTimeout, + ResponseHeaderTimeout: cfg.ResponseHeaderTimeout, + ExpectContinueTimeout: cfg.ExpectContinueTimeout, + KeepAliveIdleTimeout: cfg.KeepAliveIdleTimeout, + DialTimeout: cfg.DialTimeout, + TLSHandshakeTimeout: cfg.TLSHandshakeTimeout, + KeepAliveProbeInterval: cfg.KeepAliveProbeInterval, + } +} + +func NewSubgraphTransportOptions(cfg config.TrafficShapingRules) *SubgraphTransportOptions { + base := &SubgraphTransportOptions{ + TransportTimeoutOptions: NewTransportTimeoutOptions(cfg.All), + SubgraphMap: map[string]*TransportTimeoutOptions{}, + } + + for k, v := range cfg.Subgraphs { + opts := NewTransportTimeoutOptions(*v) + base.SubgraphMap[k] = &opts + } + + return base +} + func DefaultSubgraphTransportOptions() *SubgraphTransportOptions { return &SubgraphTransportOptions{ - RequestTimeout: 60 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ResponseHeaderTimeout: 0 * time.Second, - ExpectContinueTimeout: 0 * time.Second, - KeepAliveProbeInterval: 30 * time.Second, - KeepAliveIdleTimeout: 0 * time.Second, - DialTimeout: 30 * time.Second, + TransportTimeoutOptions: TransportTimeoutOptions{ + RequestTimeout: 60 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ResponseHeaderTimeout: 0 * time.Second, + ExpectContinueTimeout: 0 * time.Second, + KeepAliveProbeInterval: 30 * time.Second, + KeepAliveIdleTimeout: 0 * time.Second, + DialTimeout: 30 * time.Second, + }, + SubgraphMap: map[string]*TransportTimeoutOptions{}, } } @@ -1723,7 +1757,7 @@ func WithHostName(hostName string) Option { type ProxyFunc func(req *http.Request) (*url.URL, error) -func newHTTPTransport(opts *SubgraphTransportOptions, proxy ProxyFunc) *http.Transport { +func newHTTPTransport(opts TransportTimeoutOptions, proxy ProxyFunc) *http.Transport { dialer := &net.Dialer{ Timeout: opts.DialTimeout, KeepAlive: opts.KeepAliveProbeInterval, diff --git a/router/core/timout_transport.go b/router/core/timout_transport.go new file mode 100644 index 0000000000..7289f67075 --- /dev/null +++ b/router/core/timout_transport.go @@ -0,0 +1,53 @@ +package core + +import ( + "context" + "go.uber.org/zap" + "net/http" +) + +type TimeoutTransport struct { + defaultTransport http.RoundTripper + logger *zap.Logger + subgraphTrippers map[string]*http.Transport + opts *SubgraphTransportOptions +} + +func NewTimeoutTransport(transportOpts *SubgraphTransportOptions, roundTripper http.RoundTripper, logger *zap.Logger, proxy ProxyFunc) *TimeoutTransport { + tt := &TimeoutTransport{ + defaultTransport: roundTripper, + logger: logger, + subgraphTrippers: map[string]*http.Transport{}, + opts: transportOpts, + } + + for subgraph, subgraphOpts := range transportOpts.SubgraphMap { + if subgraphOpts != nil { + tt.subgraphTrippers[subgraph] = newHTTPTransport(*subgraphOpts, proxy) + } + } + + return tt +} + +func (tt *TimeoutTransport) RoundTrip(req *http.Request) (*http.Response, error) { + if req == nil { + return nil, nil + } + + rq := getRequestContext(req.Context()) + if rq == nil { + return nil, nil + } + subgraph := rq.ActiveSubgraph(req) + if subgraph != nil && subgraph.Name != "" && tt.subgraphTrippers[subgraph.Name] != nil { + timeout := tt.opts.SubgraphMap[subgraph.Name].RequestTimeout + if timeout > 0 { + ctx, cancel := context.WithTimeout(req.Context(), timeout) + defer cancel() + return tt.subgraphTrippers[subgraph.Name].RoundTrip(req.WithContext(ctx)) + } + return tt.subgraphTrippers[subgraph.Name].RoundTrip(req) + } + return tt.defaultTransport.RoundTrip(req) +} diff --git a/router/core/timout_transport_test.go b/router/core/timout_transport_test.go new file mode 100644 index 0000000000..5e25ce1631 --- /dev/null +++ b/router/core/timout_transport_test.go @@ -0,0 +1,207 @@ +package core + +import ( + "context" + "crypto/tls" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" +) + +func TestTimeoutTransport(t *testing.T) { + t.Parallel() + + var ( + testSubgraphKey = "test" + ) + + t.Run("applies request timeout", func(t *testing.T) { + t.Parallel() + + t.Run("Fast response within timeout", func(t *testing.T) { + t.Parallel() + + fastServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer fastServer.Close() + + rqCtx := &requestContext{ + subgraphResolver: NewSubgraphResolver([]Subgraph{{Name: testSubgraphKey, UrlString: fastServer.URL}}), + } + + transportOpts := &SubgraphTransportOptions{ + SubgraphMap: map[string]*TransportTimeoutOptions{ + testSubgraphKey: { + RequestTimeout: 10 * time.Millisecond, + }, + }, + } + + req := httptest.NewRequest("GET", fastServer.URL, nil) + req = req.WithContext(withRequestContext(req.Context(), rqCtx)) + + timeoutTransport := NewTimeoutTransport( + transportOpts, + http.DefaultTransport, + zap.NewNop(), + http.ProxyFromEnvironment, + ) + resp, err := timeoutTransport.RoundTrip(req) + require.Nil(t, err) + require.NotNil(t, resp) + require.Equal(t, http.StatusOK, resp.StatusCode) + }) + + t.Run("Slow response exceeding timeout", func(t *testing.T) { + t.Parallel() + + transportOpts := &SubgraphTransportOptions{ + SubgraphMap: map[string]*TransportTimeoutOptions{ + testSubgraphKey: { + RequestTimeout: 10 * time.Millisecond, + }, + }, + } + + slowServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + time.Sleep(20 * time.Millisecond) // Slow response + w.Write([]byte("Hello, world!")) + })) + defer slowServer.Close() + + rqCtx := &requestContext{ + subgraphResolver: NewSubgraphResolver([]Subgraph{{Name: testSubgraphKey, UrlString: slowServer.URL}}), + } + + req := httptest.NewRequest("GET", slowServer.URL, nil) + req = req.WithContext(withRequestContext(req.Context(), rqCtx)) + + timeoutTransport := NewTimeoutTransport( + transportOpts, + http.DefaultTransport, + zap.NewNop(), + http.ProxyFromEnvironment, + ) + + resp, err := timeoutTransport.RoundTrip(req) + require.NotNil(t, err) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.Nil(t, resp) // No response due to timeout + }) + }) + + t.Run("ResponseHeaderTimeout exceeded", func(t *testing.T) { + t.Parallel() + + transportOpts := &SubgraphTransportOptions{ + SubgraphMap: map[string]*TransportTimeoutOptions{ + testSubgraphKey: { + ResponseHeaderTimeout: 3 * time.Millisecond, + }, + }, + } + + headerTimeoutServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(5 * time.Millisecond) // Delayed header response + w.WriteHeader(http.StatusOK) + })) + defer headerTimeoutServer.Close() + + rqCtx := &requestContext{ + subgraphResolver: NewSubgraphResolver([]Subgraph{{Name: "test", UrlString: headerTimeoutServer.URL}}), + } + + req := httptest.NewRequest("GET", headerTimeoutServer.URL, nil) + req = req.WithContext(withRequestContext(req.Context(), rqCtx)) + + timeoutTransport := NewTimeoutTransport( + transportOpts, + http.DefaultTransport, + zap.NewNop(), + http.ProxyFromEnvironment, + ) + + resp, err := timeoutTransport.RoundTrip(req) + require.NotNil(t, err) + require.ErrorContains(t, err, "timeout awaiting response headers") + require.Nil(t, resp) + }) + + t.Run("TLSHandshakeTimeout exceeded", func(t *testing.T) { + t.Parallel() + + transportOpts := &SubgraphTransportOptions{ + SubgraphMap: map[string]*TransportTimeoutOptions{ + testSubgraphKey: { + TLSHandshakeTimeout: 2 * time.Millisecond, + }, + }, + } + + tlsServer := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + tlsServer.TLS = &tls.Config{} + tlsServer.StartTLS() + defer tlsServer.Close() + + rqCtx := &requestContext{ + subgraphResolver: NewSubgraphResolver([]Subgraph{{Name: "test", UrlString: tlsServer.URL}}), + } + + req := httptest.NewRequest("GET", tlsServer.URL, nil) + req = req.WithContext(withRequestContext(req.Context(), rqCtx)) + + timeoutTransport := NewTimeoutTransport( + transportOpts, + http.DefaultTransport, + zap.NewNop(), + http.ProxyFromEnvironment, + ) + + resp, err := timeoutTransport.RoundTrip(req) + require.NotNil(t, err) + require.Contains(t, err.Error(), "TLS handshake timeout") + require.Nil(t, resp) + }) + + t.Run("DialTimeout exceeded", func(t *testing.T) { + t.Parallel() + + transportOpts := &SubgraphTransportOptions{ + SubgraphMap: map[string]*TransportTimeoutOptions{ + testSubgraphKey: { + DialTimeout: 1 * time.Millisecond, + }, + }, + } + + unreachableServerURL := "http://192.0.2.1" // Reserved IP address unlikely to respond + + rqCtx := &requestContext{ + subgraphResolver: NewSubgraphResolver([]Subgraph{{Name: testSubgraphKey, UrlString: unreachableServerURL}}), + } + + req := httptest.NewRequest("GET", unreachableServerURL, nil) + req = req.WithContext(withRequestContext(req.Context(), rqCtx)) + + timeoutTransport := NewTimeoutTransport( + transportOpts, + http.DefaultTransport, + zap.NewNop(), + http.ProxyFromEnvironment, + ) + + resp, err := timeoutTransport.RoundTrip(req) + require.NotNil(t, err) + require.ErrorContains(t, err, "dial tcp") + require.ErrorAs(t, err, &os.ErrDeadlineExceeded) + require.Nil(t, resp) + }) +} diff --git a/router/core/transport.go b/router/core/transport.go index 38faf9684f..7ad5b79389 100644 --- a/router/core/transport.go +++ b/router/core/transport.go @@ -31,6 +31,10 @@ import ( "go.uber.org/zap" ) +var ( + defaultTimeout = 60 * time.Second +) + type TransportPreHandler func(req *http.Request, ctx RequestContext) (*http.Request, *http.Response) type TransportPostHandler func(resp *http.Response, ctx RequestContext) *http.Response @@ -63,6 +67,7 @@ func NewCustomTransport( ct := &CustomTransport{ metricStore: metricStore, } + if retryOptions.Enabled { ct.roundTripper = retrytransport.NewRetryHTTPTransport(roundTripper, retryOptions, logger) } else { @@ -296,12 +301,13 @@ func (ct *CustomTransport) singleFlightKey(req *http.Request) uint64 { type TransportFactory struct { preHandlers []TransportPreHandler postHandlers []TransportPostHandler + subgraphTransportOptions *SubgraphTransportOptions retryOptions retrytransport.RetryOptions - requestTimeout time.Duration localhostFallbackInsideDocker bool metricStore metric.Store logger *zap.Logger tracerProvider *sdktrace.TracerProvider + proxy ProxyFunc } var _ ApiTransportFactory = TransportFactory{} @@ -309,8 +315,9 @@ var _ ApiTransportFactory = TransportFactory{} type TransportOptions struct { PreHandlers []TransportPreHandler PostHandlers []TransportPostHandler + SubgraphTransportOptions *SubgraphTransportOptions + Proxy ProxyFunc RetryOptions retrytransport.RetryOptions - RequestTimeout time.Duration LocalhostFallbackInsideDocker bool MetricStore metric.Store Logger *zap.Logger @@ -322,20 +329,26 @@ func NewTransport(opts *TransportOptions) *TransportFactory { preHandlers: opts.PreHandlers, postHandlers: opts.PostHandlers, retryOptions: opts.RetryOptions, - requestTimeout: opts.RequestTimeout, + subgraphTransportOptions: opts.SubgraphTransportOptions, localhostFallbackInsideDocker: opts.LocalhostFallbackInsideDocker, metricStore: opts.MetricStore, logger: opts.Logger, tracerProvider: opts.TracerProvider, + proxy: opts.Proxy, } } -func (t TransportFactory) RoundTripper(enableSingleFlight bool, transport http.RoundTripper) http.RoundTripper { +func (t TransportFactory) RoundTripper(enableSingleFlight bool, baseTransport http.RoundTripper) http.RoundTripper { + if t.subgraphTransportOptions != nil && t.subgraphTransportOptions.SubgraphMap != nil && len(t.subgraphTransportOptions.SubgraphMap) > 0 { + baseTransport = NewTimeoutTransport(t.subgraphTransportOptions, baseTransport, t.logger, t.proxy) + } + if t.localhostFallbackInsideDocker && docker.Inside() { - transport = docker.NewLocalhostFallbackRoundTripper(transport) + baseTransport = docker.NewLocalhostFallbackRoundTripper(baseTransport) } + traceTransport := trace.NewTransport( - transport, + baseTransport, []otelhttp.Option{ otelhttp.WithSpanNameFormatter(SpanNameFormatter), otelhttp.WithSpanOptions(otrace.WithAttributes(otel.EngineTransportAttribute)), @@ -375,7 +388,10 @@ func (t TransportFactory) RoundTripper(enableSingleFlight bool, transport http.R } func (t TransportFactory) DefaultTransportTimeout() time.Duration { - return t.requestTimeout + if t.subgraphTransportOptions != nil { + return t.subgraphTransportOptions.RequestTimeout + } + return defaultTimeout } func (t TransportFactory) DefaultHTTPProxyURL() *url.URL { diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index 572e46448a..f4146d6f39 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -139,6 +139,8 @@ type TrafficShapingRules struct { All GlobalSubgraphRequestRule `yaml:"all"` // Apply to requests from clients to the router Router RouterTrafficConfiguration `yaml:"router"` + // Subgraphs is a set of rules that apply to requests from the router to subgraphs. The key is the subgraph name. + Subgraphs map[string]*GlobalSubgraphRequestRule `yaml:"subgraphs,omitempty"` } type FileUpload struct { @@ -164,6 +166,10 @@ type GlobalSubgraphRequestRule struct { KeepAliveProbeInterval time.Duration `yaml:"keep_alive_probe_interval,omitempty" envDefault:"30s"` } +type SubgraphTrafficRequestRule struct { + RequestTimeout time.Duration `yaml:"request_timeout,omitempty" envDefault:"60s"` +} + type GraphqlMetrics struct { Enabled bool `yaml:"enabled" envDefault:"true" env:"GRAPHQL_METRICS_ENABLED"` CollectorEndpoint string `yaml:"collector_endpoint" envDefault:"https://cosmo-metrics.wundergraph.com" env:"GRAPHQL_METRICS_COLLECTOR_ENDPOINT"` diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index 62408dba68..4392030020 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -1243,6 +1243,19 @@ } } } + }, + "subgraphs": { + "type": "object", + "description": "The configuration to control traffic shaping for specific subgraphs.", + "additionalProperties": { + "request_timeout": { + "type": "string", + "duration": { + "minimum": "1s" + }, + "description": "The request timeout. The period is specified as a string with a number and a unit, e.g. 10ms, 1s, 1m, 1h. The supported units are 'ms', 's', 'm', 'h'." + } + } } } }, diff --git a/router/pkg/config/fixtures/full.yaml b/router/pkg/config/fixtures/full.yaml index 308427a359..52bcd446cb 100644 --- a/router/pkg/config/fixtures/full.yaml +++ b/router/pkg/config/fixtures/full.yaml @@ -162,6 +162,9 @@ traffic_shaping: max_attempts: 5 interval: 3s max_duration: 10s + subgraphs: + products: # Will only affect this subgraph + request_timeout: 120s # Header manipulation # See "https://cosmo-docs.wundergraph.com/router/proxy-capabilities" for more information diff --git a/router/pkg/config/testdata/config_defaults.json b/router/pkg/config/testdata/config_defaults.json index b85196ad18..a902983883 100644 --- a/router/pkg/config/testdata/config_defaults.json +++ b/router/pkg/config/testdata/config_defaults.json @@ -119,7 +119,8 @@ }, "Router": { "MaxRequestBodyBytes": 5000000 - } + }, + "Subgraphs": null }, "FileUpload": { "Enabled": true, diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index d7d3625682..77a2eec8b6 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -243,6 +243,24 @@ }, "Router": { "MaxRequestBodyBytes": 5000000 + }, + "Subgraphs": { + "products": { + "BackoffJitterRetry": { + "Enabled": false, + "Algorithm": "", + "MaxAttempts": 0, + "MaxDuration": 0, + "Interval": 0 + }, + "RequestTimeout": 120000000000, + "DialTimeout": 0, + "ResponseHeaderTimeout": 0, + "ExpectContinueTimeout": 0, + "TLSHandshakeTimeout": 0, + "KeepAliveIdleTimeout": 0, + "KeepAliveProbeInterval": 0 + } } }, "FileUpload": {