Skip to content

Commit

Permalink
fix: don't re-use buffer for reading requests (#1447)
Browse files Browse the repository at this point in the history
  • Loading branch information
jensneuse authored Dec 16, 2024
1 parent 3f70b21 commit 3262622
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 31 deletions.
25 changes: 25 additions & 0 deletions router-tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package integration_test

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -382,6 +383,30 @@ func TestProxy(t *testing.T) {
})
}

func TestConcurrentBodyRead(t *testing.T) {
t.Parallel()

expectedData := `{"data":{"employees":[{"id":1,"details":{"forename":"Jens","surname":"Neuse","hasChildren":true},"role":{"title":["Founder","CEO"],"departments":["ENGINEERING","MARKETING"]},"hobbies":[{"category":"SPORT"},{"name":"Counter Strike","genres":["FPS"],"yearsOfExperience":20},{"name":"WunderGraph"},{"languages":["GO","TYPESCRIPT"]},{"countriesLived":[{"language":"English"},{"language":"German"}]}]},{"id":2,"details":{"forename":"Dustin","surname":"Deus","hasChildren":false},"role":{"title":["Co-founder","Tech Lead"],"departments":["ENGINEERING"]},"hobbies":[{"category":"STRENGTH_TRAINING"},{"name":"Counter Strike","genres":["FPS"],"yearsOfExperience":0.5},{"languages":["GO","RUST"]}]},{"id":3,"details":{"forename":"Stefan","surname":"Avram","hasChildren":false},"role":{"title":["Co-founder","Head of Growth"],"departments":["MARKETING"]},"hobbies":[{"category":"HIKING"},{"category":"SPORT"},{"name":"Reading"},{"countriesLived":[{"language":"English"},{"language":"Serbian"}]}]},{"id":4,"details":{"forename":"Björn","surname":"Schwenzer","hasChildren":true},"role":{"title":["Co-founder","COO"],"departments":["OPERATIONS","MARKETING"]},"hobbies":[{"category":"HIKING"},{"planeModels":["Aquila AT01","Cessna C172","Cessna C206","Cirrus SR20","Cirrus SR22","Diamond DA40","Diamond HK36","Diamond DA20","Piper Cub","Pitts Special","Robin DR400"],"yearsOfExperience":20},{"countriesLived":[{"language":"English"},{"language":"German"}]}]},{"id":5,"details":{"forename":"Sergiy","surname":"Petrunin","hasChildren":false},"role":{"title":["Senior GO Engineer"],"departments":["ENGINEERING"]},"hobbies":[{"name":"Building a house"},{"name":"Forumla 1"},{"name":"Raising cats"}]},{"id":7,"details":{"forename":"Suvij","surname":"Surya","hasChildren":false},"role":{"title":["Software Engineer"],"departments":["ENGINEERING"]},"hobbies":[{"name":"Chess","genres":["BOARD"],"yearsOfExperience":9.5},{"name":"Watching anime"}]},{"id":8,"details":{"forename":"Nithin","surname":"Kumar","hasChildren":false},"role":{"title":["Software Engineer"],"departments":["ENGINEERING"]},"hobbies":[{"category":"STRENGTH_TRAINING"},{"name":"Miscellaneous","genres":["ADVENTURE","RPG","SIMULATION","STRATEGY"],"yearsOfExperience":17},{"name":"Watching anime"}]},{"id":10,"details":{"forename":"Eelco","surname":"Wiersma","hasChildren":false},"role":{"title":["Senior Frontend Engineer"],"departments":["ENGINEERING"]},"hobbies":[{"languages":["TYPESCRIPT"]},{"category":"CALISTHENICS"},{"category":"HIKING"},{"category":"STRENGTH_TRAINING"},{"name":"saas-ui"},{"countriesLived":[{"language":"German"},{"language":"Indonesian"},{"language":"Dutch"},{"language":"Portuguese"},{"language":"Spanish"},{"language":"Thai"}]}]},{"id":11,"details":{"forename":"Alexandra","surname":"Neuse","hasChildren":true},"role":{"title":["Accounting & Finance"],"departments":["OPERATIONS"]},"hobbies":[{"name":"Spending time with the family"}]},{"id":12,"details":{"forename":"David","surname":"Stutt","hasChildren":false},"role":{"title":["Software Engineer"],"departments":["ENGINEERING"]},"hobbies":[{"languages":["CSHARP","GO","RUST","TYPESCRIPT"]},{"category":"STRENGTH_TRAINING"},{"name":"Miscellaneous","genres":["ADVENTURE","BOARD","CARD","ROGUELITE","RPG","SIMULATION","STRATEGY"],"yearsOfExperience":25.5},{"countriesLived":[{"language":"English"},{"language":"Korean"},{"language":"Taiwanese"}]}]}]}}`

testenv.Run(t, &testenv.Config{}, func(t *testing.T, xEnv *testenv.Environment) {
goRoutines := 10
wg := &sync.WaitGroup{}
wg.Add(goRoutines)
for i := 0; i < goRoutines; i++ {
go func() {
wg.Done()
res, err := xEnv.MakeGraphQLRequestWithContext(context.Background(), testenv.GraphQLRequest{
Query: bigEmployeesQuery,
})
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.Response.StatusCode)
require.Equal(t, expectedData, res.Body)
}()
}
wg.Wait()
})
}

func TestTracing(t *testing.T) {
t.Parallel()

Expand Down
33 changes: 6 additions & 27 deletions router/core/graphql_prehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ type PreHandler struct {
maxUploadFiles int
maxUploadFileSize int
complexityLimits *config.ComplexityLimits
bodyReadBuffers *sync.Pool
trackSchemaUsageInfo bool
clientHeader config.ClientHeader
computeOperationSha256 bool
Expand Down Expand Up @@ -126,7 +125,6 @@ func NewPreHandler(opts *PreHandlerOptions) *PreHandler {
maxUploadFiles: opts.MaxUploadFiles,
maxUploadFileSize: opts.MaxUploadFileSize,
complexityLimits: opts.ComplexityLimits,
bodyReadBuffers: &sync.Pool{},
alwaysIncludeQueryPlan: opts.AlwaysIncludeQueryPlan,
alwaysSkipLoader: opts.AlwaysSkipLoader,
queryPlansEnabled: opts.QueryPlansEnabled,
Expand All @@ -140,20 +138,11 @@ func NewPreHandler(opts *PreHandlerOptions) *PreHandler {

func (h *PreHandler) getBodyReadBuffer(preferredSize int64) *bytes.Buffer {
if preferredSize <= 0 {
preferredSize = 1024
preferredSize = 1024 * 4 // 4KB
} else if preferredSize > h.operationProcessor.maxOperationSizeInBytes {
preferredSize = h.operationProcessor.maxOperationSizeInBytes
}
buf := h.bodyReadBuffers.Get()
if buf == nil {
return bytes.NewBuffer(make([]byte, 0, preferredSize))
}
return buf.(*bytes.Buffer)
}

func (h *PreHandler) releaseBodyReadBuffer(buf *bytes.Buffer) {
buf.Reset()
h.bodyReadBuffers.Put(buf)
return bytes.NewBuffer(make([]byte, 0, preferredSize))
}

// Error and Status Code handling
Expand Down Expand Up @@ -243,9 +232,6 @@ func (h *PreHandler) Handler(next http.Handler) http.Handler {

var body []byte
var files []httpclient.File
// XXX: This buffer needs to be returned to the pool only
// AFTER we're done with body (retrieved from parser.ReadBody())
buf := h.getBodyReadBuffer(r.ContentLength)

if strings.Contains(r.Header.Get("Content-Type"), "multipart/form-data") {
if !h.fileUploadEnabled {
Expand All @@ -254,7 +240,6 @@ func (h *PreHandler) Handler(next http.Handler) http.Handler {
statusCode: http.StatusOK,
}
writeOperationError(r, w, requestLogger, requestContext.error)
h.releaseBodyReadBuffer(buf)
return
}

Expand All @@ -266,11 +251,10 @@ func (h *PreHandler) Handler(next http.Handler) http.Handler {
multipartParser := NewMultipartParser(h.operationProcessor, h.maxUploadFiles, h.maxUploadFileSize)

var err error
body, files, err = multipartParser.Parse(r, buf)
body, files, err = multipartParser.Parse(r, h.getBodyReadBuffer(r.ContentLength))
if err != nil {
requestContext.error = err
writeOperationError(r, w, requestLogger, requestContext.error)
h.releaseBodyReadBuffer(buf)
readMultiPartSpan.End()
return
}
Expand All @@ -295,7 +279,7 @@ func (h *PreHandler) Handler(next http.Handler) http.Handler {
)

var err error
body, err = h.operationProcessor.ReadBody(buf, r.Body)
body, err = h.operationProcessor.ReadBody(r.Body, h.getBodyReadBuffer(r.ContentLength))
if err != nil {
requestContext.error = err

Expand All @@ -304,7 +288,6 @@ func (h *PreHandler) Handler(next http.Handler) http.Handler {
// The error is logged as debug log in the writeOperationError function

writeOperationError(r, w, requestLogger, err)
h.releaseBodyReadBuffer(buf)
readOperationBodySpan.End()
return
}
Expand All @@ -315,7 +298,7 @@ func (h *PreHandler) Handler(next http.Handler) http.Handler {
variablesParser := h.variableParsePool.Get()
defer h.variableParsePool.Put(variablesParser)

err = h.handleOperation(r, buf, variablesParser, &httpOperation{
err = h.handleOperation(r, variablesParser, &httpOperation{
requestContext: requestContext,
requestLogger: requestLogger,
routerSpan: routerSpan,
Expand All @@ -330,7 +313,6 @@ func (h *PreHandler) Handler(next http.Handler) http.Handler {
rtrace.AttachErrToSpan(routerSpan, err)

writeOperationError(r, w, requestLogger, err)
h.releaseBodyReadBuffer(buf)
return
}

Expand Down Expand Up @@ -385,7 +367,7 @@ func (h *PreHandler) Handler(next http.Handler) http.Handler {
})
}

func (h *PreHandler) handleOperation(req *http.Request, buf *bytes.Buffer, variablesParser *astjson.Parser, httpOperation *httpOperation) error {
func (h *PreHandler) handleOperation(req *http.Request, variablesParser *astjson.Parser, httpOperation *httpOperation) error {
operationKit, err := h.operationProcessor.NewKit()
if err != nil {
return err
Expand Down Expand Up @@ -524,9 +506,6 @@ func (h *PreHandler) handleOperation(req *http.Request, buf *bytes.Buffer, varia
// Set the router span name after we have the operation name
httpOperation.routerSpan.SetName(GetSpanName(operationKit.parsedOperation.Request.OperationName, operationKit.parsedOperation.Type))

// Give the buffer back to the pool as soon as we're done with it
h.releaseBodyReadBuffer(buf)

if req.Method == http.MethodGet && operationKit.parsedOperation.Type == "mutation" {
return &httpGraphqlError{
message: "Mutations can only be sent over HTTP POST",
Expand Down
4 changes: 2 additions & 2 deletions router/core/operation_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1162,8 +1162,8 @@ func (p *OperationProcessor) freeKit(kit *parseKit) {
p.parseKitSemaphore <- kit.i
}

func (p *OperationProcessor) ReadBody(buf *bytes.Buffer, r io.Reader) ([]byte, error) {
if _, err := io.Copy(buf, r); err != nil {
func (p *OperationProcessor) ReadBody(reader io.Reader, buf *bytes.Buffer) ([]byte, error) {
if _, err := io.Copy(buf, reader); err != nil {
// Set when http.MaxBytesReader is used before
var maxBytesErr *http.MaxBytesError
if errors.As(err, &maxBytesErr) {
Expand Down
5 changes: 3 additions & 2 deletions router/core/parse_multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"bytes"
"errors"
"fmt"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient"
"io"
"mime"
"mime/multipart"
"net/http"
"os"
"strings"

"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient"
)

type MultipartParser struct {
Expand Down Expand Up @@ -114,7 +115,7 @@ func (p *MultipartParser) Parse(r *http.Request, buf *bytes.Buffer) ([]byte, []h
}
}

body, err = p.operationProcessor.ReadBody(buf, strings.NewReader(strings.Join(p.form.Value["operations"], "")))
body, err = p.operationProcessor.ReadBody(strings.NewReader(strings.Join(p.form.Value["operations"], "")), buf)
if err != nil {
return body, p.files, err
}
Expand Down

0 comments on commit 3262622

Please sign in to comment.