Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dd-trace-go sql tracer #34

Merged
merged 3 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
module github.com/kanmu/qg

go 1.22
go 1.22.0

toolchain go1.22.8

require (
github.com/jackc/pgx/v4 v4.18.3
gopkg.in/DataDog/dd-trace-go.v1 v1.68.0
gopkg.in/guregu/null.v3 v3.0.2-0.20160228005316-41961cea0328
)

require (
github.com/DataDog/appsec-internal-go v1.7.0 // indirect
github.com/DataDog/datadog-agent/pkg/obfuscate v0.48.0 // indirect
github.com/DataDog/datadog-agent/pkg/remoteconfig/state v0.48.1 // indirect
github.com/DataDog/datadog-go/v5 v5.3.0 // indirect
github.com/DataDog/go-libddwaf/v3 v3.3.0 // indirect
github.com/DataDog/go-tuf v1.0.2-0.5.2 // indirect
github.com/DataDog/sketches-go v1.4.5 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/ebitengine/purego v0.6.0-alpha.5 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.3 // indirect
github.com/jackc/pgio v1.0.0 // indirect
Expand All @@ -16,7 +31,19 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/lib/pq v1.10.3 // indirect
github.com/outcaste-io/ristretto v0.2.3 // indirect
github.com/philhofer/fwd v1.1.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
golang.org/x/crypto v0.20.0 // indirect
github.com/tinylib/msgp v1.1.8 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.16.1 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
179 changes: 172 additions & 7 deletions go.sum

Large diffs are not rendered by default.

19 changes: 6 additions & 13 deletions que.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
null "gopkg.in/guregu/null.v3"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
)

// Job is a single unit of work for Que to perform.
Expand Down Expand Up @@ -125,8 +124,7 @@ func (j *Job) DeleteContext(ctx context.Context) error {
return nil
}

err := j.conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
err := rawConn(j.conn, func(pgxConn *pgx.Conn) error {
_, err := pgxConn.Exec(ctx, "que_destroy_job", j.Queue, j.Priority, j.RunAt, j.ID)
return err
})
Expand Down Expand Up @@ -158,8 +156,7 @@ func (j *Job) DoneContext(ctx context.Context) {
var ok bool
// Swallow this error because we don't want an unlock failure to cause work to
// stop.
err := j.conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
err := rawConn(j.conn, func(pgxConn *pgx.Conn) error {
return pgxConn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok)
})

Expand Down Expand Up @@ -188,8 +185,7 @@ func (j *Job) ErrorContext(ctx context.Context, msg string) error {
errorCount := j.ErrorCount + 1
delay := intPow(int(errorCount), 4) + 3 // TODO: configurable delay

err := j.conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
err := rawConn(j.conn, func(pgxConn *pgx.Conn) error {
_, err := pgxConn.Exec(ctx, "que_set_error", errorCount, delay, msg, j.Queue, j.Priority, j.RunAt, j.ID)
return err
})
Expand Down Expand Up @@ -366,8 +362,7 @@ func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error)
j := Job{c: c, conn: conn}

for i := 0; i < maxLockJobAttempts; i++ {
err = conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
err = rawConn(conn, func(pgxConn *pgx.Conn) error {
return pgxConn.QueryRow(ctx, "que_lock_job", queue).Scan(
&j.Queue,
&j.Priority,
Expand Down Expand Up @@ -400,8 +395,7 @@ func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error)
// I'm not sure how to reliably commit a transaction that deletes
// the job in a separate thread between lock_job and check_job.
var ok bool
err = conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
err = rawConn(conn, func(pgxConn *pgx.Conn) error {
return pgxConn.QueryRow(ctx, "que_check_job", j.Queue, j.Priority, j.RunAt, j.ID).Scan(&ok)
})
if err == nil {
Expand All @@ -414,8 +408,7 @@ func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error)
// eventually causing the server to run out of locks.
//
// Also swallow the possible error, exactly like in Done.
conn.Raw(func(driverConn any) error { //nolint:errcheck
pgxConn := driverConn.(*stdlib.Conn).Conn()
rawConn(conn, func(pgxConn *pgx.Conn) error { //nolint:errcheck
pgxConn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok) //nolint:errcheck
return nil
})
Expand Down
14 changes: 11 additions & 3 deletions que_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package qg

import (
"database/sql"
"database/sql/driver"
"testing"
"time"

"github.com/jackc/pgx/v4"
sqltracer "gopkg.in/DataDog/dd-trace-go.v1/contrib/database/sql"
)

var testConnConfig = func() *pgx.ConnConfig {
Expand All @@ -17,12 +19,12 @@ var testConnConfig = func() *pgx.ConnConfig {

const maxConn = 5

func openTestClientMaxConns(t testing.TB, maxConnections int) *Client {
func openTestClientMaxConns(t testing.TB, maxConnections int, openDB func(driver.Connector) *sql.DB) *Client {
connector, err := GetConnector("localhost", 5432, "qgtest", "", "qgtest")
if err != nil {
t.Fatal(err)
}
db := sql.OpenDB(connector)
db := openDB(connector)
// using stdlib, it's difficult to open max conn from the beginning
// if we want to open connections till its limit, need to use go routine to
// concurrently open connections
Expand All @@ -38,7 +40,13 @@ func openTestClientMaxConns(t testing.TB, maxConnections int) *Client {
}

func openTestClient(t testing.TB) *Client {
return openTestClientMaxConns(t, maxConn)
return openTestClientMaxConns(t, maxConn, sql.OpenDB)
}

func openTestClientWithTracer(t testing.TB) *Client {
return openTestClientMaxConns(t, maxConn, func(c driver.Connector) *sql.DB {
return sqltracer.OpenDB(c)
})
}

func truncateAndClose(c *Client) {
Expand Down
26 changes: 26 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package qg

import (
"database/sql"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
sqltracer "gopkg.in/DataDog/dd-trace-go.v1/contrib/database/sql"
)

// intPow returns x**y, the base-x exponential of y.
func intPow(x, y int) (r int) {
if x == r || y < r {
Expand All @@ -24,3 +32,21 @@ func intPow(x, y int) (r int) {
}
return
}

// Get *pgx.Conn from *sql.Conn and pass it to function.
func rawConn(conn *sql.Conn, f func(*pgx.Conn) error) error {
err := conn.Raw(func(driverConn any) error {
var stdlibConn *stdlib.Conn

if tracedConn, ok := driverConn.(*sqltracer.TracedConn); ok {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wConn, ok := driverConn.(interface { WrappedConn() driver.Conn }); ok みたいにインターフェイスだけ持ってくるのはどうですか?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

修正してみました57b94f5

stdlibConn = tracedConn.WrappedConn().(*stdlib.Conn)
} else {
stdlibConn = driverConn.(*stdlib.Conn)
}

pgxConn := stdlibConn.Conn()
return f(pgxConn)
})

return err
}
7 changes: 3 additions & 4 deletions work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package qg

import (
"context"
"database/sql"
"fmt"
"sync"
"testing"
"time"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
)

func TestLockJob(t *testing.T) {
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestJobConnRace(t *testing.T) {

// Test the race condition in LockJob
func TestLockJobAdvisoryRace(t *testing.T) {
c := openTestClientMaxConns(t, 4)
c := openTestClientMaxConns(t, 4, sql.OpenDB)
defer truncateAndClose(c)
ctx := context.Background()

Expand Down Expand Up @@ -349,8 +349,7 @@ func TestLockJobAdvisoryRace(t *testing.T) {
if err != nil {
panic(err)
}
conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
rawConn(conn, func(pgxConn *pgx.Conn) error {
ourBackendID = getBackendID(pgxConn)
return nil
})
Expand Down
31 changes: 31 additions & 0 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,37 @@ func TestWorkerWorkOne(t *testing.T) {
}
}

func TestWorkerWorkOneWithTracer(t *testing.T) {
c := openTestClientWithTracer(t)
defer truncateAndClose(c)

success := false
wm := WorkMap{
"MyJob": func(j *Job) error {
success = true
return nil
},
}
w := NewWorker(c, wm)

didWork := w.WorkOne()
if didWork {
t.Errorf("want didWork=false when no job was queued")
}

if err := c.Enqueue(&Job{Type: "MyJob"}); err != nil {
t.Fatal(err)
}

didWork = w.WorkOne()
if !didWork {
t.Errorf("want didWork=true")
}
if !success {
t.Errorf("want success=true")
}
}

func TestWorkerShutdown(t *testing.T) {
c := openTestClient(t)
defer truncateAndClose(c)
Expand Down
Loading