diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9f426e2..b43d0a9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,21 +6,7 @@ on: jobs: test: - runs-on: ${{ matrix.os }} - strategy: - fail-fast: false - matrix: - os: - - ubuntu-latest - # Container operations, such as postgres, are only supported on Linux runners - # - macOS-latest - # - windows-latest - go: - - "1.21" - - "1.20" - - "1.19" - - "1.18" - - "1.17" + runs-on: ubuntu-latest services: postgres: image: postgres:12 @@ -31,13 +17,14 @@ jobs: options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 steps: + - name: Checkout code + uses: actions/checkout@v4 + - name: Install Go uses: actions/setup-go@v5 with: - go-version: ${{ matrix.go }} - - - name: Checkout code - uses: actions/checkout@v4 + go-version-file: go.mod + cache: false - name: Lint uses: golangci/golangci-lint-action@v4 diff --git a/README.md b/README.md index 4bf00be..0e023cf 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ This library is still under heavy development, and might significantly change AP ## Test ``` -docker-compose up -d +docker compose up -d make db make table make test diff --git a/connector.go b/connector.go new file mode 100644 index 0000000..51fc26f --- /dev/null +++ b/connector.go @@ -0,0 +1,37 @@ +package qg + +import ( + "database/sql/driver" + "fmt" + "net/url" + + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/stdlib" +) + +func GetConnector(host string, port int, username string, password string, database string) (driver.Connector, error) { + u := &url.URL{ + Scheme: "postgres", + Host: fmt.Sprintf("%s:%d", host, port), + User: url.UserPassword(username, password), + Path: database, + } + + return GetConnectorFromURL(u) +} + +func GetConnectorFromURL(u *url.URL) (driver.Connector, error) { + return GetConnectorFromConnStr(u.String()) +} + +func GetConnectorFromConnStr(connStr string) (driver.Connector, error) { + cfg, err := pgx.ParseConfig(connStr) + + if err != nil { + return nil, err + } + + connector := stdlib.GetConnector(*cfg, stdlib.OptionAfterConnect(PrepareStatements)) + + return connector, nil +} diff --git a/go.mod b/go.mod index 481ff0f..63d1076 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,22 @@ module github.com/kanmu/qg -go 1.16 +go 1.21 require ( - github.com/jackc/pgconn v1.14.3 github.com/jackc/pgx/v4 v4.18.3 + gopkg.in/guregu/null.v3 v3.0.2-0.20160228005316-41961cea0328 +) + +require ( + github.com/jackc/chunkreader/v2 v2.0.1 // indirect + github.com/jackc/pgconn v1.14.3 // indirect + github.com/jackc/pgio v1.0.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgproto3/v2 v2.3.3 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgtype v1.14.0 // indirect github.com/lib/pq v1.10.3 // indirect github.com/shopspring/decimal v1.3.1 // indirect - gopkg.in/guregu/null.v3 v3.0.2-0.20160228005316-41961cea0328 + golang.org/x/crypto v0.20.0 // indirect + golang.org/x/text v0.14.0 // indirect ) diff --git a/go.sum b/go.sum index b2e879d..664fe72 100644 --- a/go.sum +++ b/go.sum @@ -15,7 +15,6 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= -github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= @@ -36,7 +35,6 @@ github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5W github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= @@ -64,7 +62,6 @@ github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -101,18 +98,13 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -134,27 +126,17 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg= golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -167,25 +149,13 @@ golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= -golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -195,10 +165,7 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/que.go b/que.go index e506aab..bd9b7e9 100644 --- a/que.go +++ b/que.go @@ -10,7 +10,6 @@ import ( null "gopkg.in/guregu/null.v3" - "github.com/jackc/pgconn" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/stdlib" ) @@ -51,7 +50,7 @@ type Job struct { mu sync.Mutex deleted bool c *Client - conn *pgx.Conn + conn *sql.Conn tx Txer } @@ -92,7 +91,9 @@ type JobStats struct { } // Conn returns transaction -func (j *Job) Conn() *pgx.Conn { +// +// Deprecated: This is an internal method. DON'T USE IT. +func (j *Job) Conn() *sql.Conn { j.mu.Lock() defer j.mu.Unlock() @@ -124,7 +125,12 @@ func (j *Job) DeleteContext(ctx context.Context) error { return nil } - _, err := j.conn.Exec(ctx, "que_destroy_job", j.Queue, j.Priority, j.RunAt, j.ID) + err := j.conn.Raw(func(driverConn any) error { + pgxConn := driverConn.(*stdlib.Conn).Conn() + _, err := pgxConn.Exec(ctx, "que_destroy_job", j.Queue, j.Priority, j.RunAt, j.ID) + return err + }) + if err != nil { return err } @@ -152,12 +158,16 @@ func (j *Job) DoneContext(ctx context.Context) { var ok bool // Swallow this error because we don't want an unlock failure to cause work to // stop. - if err := j.conn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok); err != nil { + err := j.conn.Raw(func(driverConn any) error { + pgxConn := driverConn.(*stdlib.Conn).Conn() + return pgxConn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok) + }) + + if err != nil { log.Printf("failed to unlock job job_id=%d job_type=%s", j.ID, j.Type) } - stdlib.ReleaseConn(j.c.pool, j.conn) //nolint:errcheck - // j.pool.Release(j.conn) + j.conn.Close() j.c.dischargeJob(j) j.c = nil j.conn = nil @@ -178,7 +188,12 @@ func (j *Job) ErrorContext(ctx context.Context, msg string) error { errorCount := j.ErrorCount + 1 delay := intPow(int(errorCount), 4) + 3 // TODO: configurable delay - _, err := j.conn.Exec(ctx, "que_set_error", errorCount, delay, msg, j.Queue, j.Priority, j.RunAt, j.ID) + err := j.conn.Raw(func(driverConn any) error { + pgxConn := driverConn.(*stdlib.Conn).Conn() + _, err := pgxConn.Exec(ctx, "que_set_error", errorCount, delay, msg, j.Queue, j.Priority, j.RunAt, j.ID) + return err + }) + if err != nil { return err } @@ -341,7 +356,7 @@ func (c *Client) LockJob(queue string) (*Job, error) { // After the Job has been worked, you must call either Done() or Error() on it // in order to return the database connection to the pool and remove the lock. func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error) { - conn, err := stdlib.AcquireConn(c.pool) + conn, err := c.pool.Conn(ctx) if err != nil { return nil, err } @@ -349,19 +364,21 @@ func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error) j := Job{c: c, conn: conn} for i := 0; i < maxLockJobAttempts; i++ { - err = conn.QueryRow(ctx, "que_lock_job", queue).Scan( - &j.Queue, - &j.Priority, - &j.RunAt, - &j.ID, - &j.Type, - &j.Args, - &j.ErrorCount, - ) + err = conn.Raw(func(driverConn any) error { + pgxConn := driverConn.(*stdlib.Conn).Conn() + return pgxConn.QueryRow(ctx, "que_lock_job", queue).Scan( + &j.Queue, + &j.Priority, + &j.RunAt, + &j.ID, + &j.Type, + &j.Args, + &j.ErrorCount, + ) + }) + if err != nil { - // stdConn.Close() - stdlib.ReleaseConn(c.pool, conn) //nolint:errcheck - // c.pool.Release(conn) + conn.Close() if err == pgx.ErrNoRows { return nil, nil } @@ -381,7 +398,10 @@ func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error) // I'm not sure how to reliably commit a transaction that deletes // the job in a separate thread between lock_job and check_job. var ok bool - err = conn.QueryRow(ctx, "que_check_job", j.Queue, j.Priority, j.RunAt, j.ID).Scan(&ok) + err = conn.Raw(func(driverConn any) error { + pgxConn := driverConn.(*stdlib.Conn).Conn() + return pgxConn.QueryRow(ctx, "que_check_job", j.Queue, j.Priority, j.RunAt, j.ID).Scan(&ok) + }) if err == nil { c.manageJob(&j) return &j, nil @@ -392,16 +412,18 @@ func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error) // eventually causing the server to run out of locks. // // Also swallow the possible error, exactly like in Done. - _ = conn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok) + conn.Raw(func(driverConn any) error { + pgxConn := driverConn.(*stdlib.Conn).Conn() + pgxConn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok) + return nil + }) continue } else { - // stdConn.Close() - // c.pool.Release(conn) - stdlib.ReleaseConn(c.pool, conn) //nolint:errcheck + conn.Close() return nil, err } } - stdlib.ReleaseConn(c.pool, conn) //nolint:errcheck + conn.Close() return nil, ErrAgain } @@ -414,16 +436,10 @@ var preparedStatements = map[string]string{ "que_unlock_job": sqlUnlockJob, } -// PrepareStatementsContext func without context.Context. -// Deprecated: `PrepareStatements` cannot be passed to `pgconn.Config.AfterConnect`. Use `PrepareStatementsContext` instead -func PrepareStatements(conn *pgconn.PgConn) error { - return PrepareStatementsContext(context.Background(), conn) -} - // PrepareStatements prepar statements -func PrepareStatementsContext(ctx context.Context, conn *pgconn.PgConn) error { +func PrepareStatements(ctx context.Context, conn *pgx.Conn) error { for name, sql := range preparedStatements { - if _, err := conn.Prepare(ctx, name, sql, nil); err != nil { + if _, err := conn.Prepare(ctx, name, sql); err != nil { return err } } diff --git a/que_test.go b/que_test.go index 807d683..9ca9337 100644 --- a/que_test.go +++ b/que_test.go @@ -2,23 +2,14 @@ package qg import ( "database/sql" - "fmt" - "net/url" "testing" "time" "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/stdlib" ) var testConnConfig = func() *pgx.ConnConfig { - u := &url.URL{ - Scheme: "postgres", - Host: fmt.Sprintf("%s:%d", "localhost", 5432), - User: url.User("qgtest"), - Path: "qgtest", - } - conn, _ := pgx.ParseConfig(u.String()) + conn, _ := pgx.ParseConfig("postgres://qgtest@localhost:5432/qgtest") // conn.LogLevel = pgx.LogLevelDebug // conn.Logger = log15.New("testlogger", "test/qg") return conn @@ -27,20 +18,11 @@ var testConnConfig = func() *pgx.ConnConfig { const maxConn = 5 func openTestClientMaxConns(t testing.TB, maxConnections int) *Client { - u := &url.URL{ - Scheme: "postgres", - Host: fmt.Sprintf("%s:%d", "localhost", 5432), - User: url.User("qgtest"), - Path: "qgtest", - } - - driverConfig, _ := pgx.ParseConfig(u.String()) - driverConfig.AfterConnect = PrepareStatementsContext - connStr := stdlib.RegisterConnConfig(driverConfig) - db, err := sql.Open("pgx", connStr) + connector, err := GetConnector("localhost", 5432, "qgtest", "", "qgtest") if err != nil { t.Fatal(err) } + db := sql.OpenDB(connector) // using stdlib, it's difficult to open max conn from the begining // if we want to open connections till its limit, need to use go routine to // concurrently open connections diff --git a/testing.go b/testing.go index 2c6ed7f..49aeb0a 100644 --- a/testing.go +++ b/testing.go @@ -1,9 +1,9 @@ package qg -import "github.com/jackc/pgx/v4" +import "database/sql" // // TestInjectJobConn injects *pgx.Conn to Job -func TestInjectJobConn(j *Job, conn *pgx.Conn) *Job { +func TestInjectJobConn(j *Job, conn *sql.Conn) *Job { j.conn = conn return j } diff --git a/work_test.go b/work_test.go index 5173a17..386fe64 100644 --- a/work_test.go +++ b/work_test.go @@ -214,15 +214,6 @@ func TestLockJobAdvisoryRace(t *testing.T) { defer truncateAndClose(c) ctx := context.Background() - // *pgx.ConnPool doesn't support pools of only one connection. Make sure - // the other one is busy so we know which backend will be used by LockJob - // below. - unusedConn, err := stdlib.AcquireConn(c.pool) - if err != nil { - t.Fatal(err) - } - defer stdlib.ReleaseConn(c.pool, unusedConn) //nolint:errcheck - // We use two jobs: the first one is concurrently deleted, and the second // one is returned by LockJob after recovering from the race condition. for i := 0; i < 2; i++ { @@ -340,7 +331,7 @@ func TestLockJobAdvisoryRace(t *testing.T) { (SELECT min(job_id) FROM que_jobs) RETURNING job_id - `).Scan(ctx, &jid) + `).Scan(&jid) if err != nil { panic(err) } @@ -500,7 +491,7 @@ func TestJobDeleteFromTx(t *testing.T) { } // start a transaction - tx, err := conn.Begin(ctx) + tx, err := conn.BeginTx(ctx, nil) if err != nil { t.Fatal(err) } @@ -510,7 +501,7 @@ func TestJobDeleteFromTx(t *testing.T) { t.Fatal(err) } - if err = tx.Commit(ctx); err != nil { + if err = tx.Commit(); err != nil { t.Fatal(err) } @@ -552,7 +543,7 @@ func TestJobDeleteFromTxRollback(t *testing.T) { } // start a transaction - tx, err := conn.Begin(ctx) + tx, err := conn.BeginTx(ctx, nil) if err != nil { t.Fatal(err) } @@ -562,7 +553,7 @@ func TestJobDeleteFromTxRollback(t *testing.T) { t.Fatal(err) } - if err = tx.Rollback(ctx); err != nil { + if err = tx.Rollback(); err != nil { t.Fatal(err) }