Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
winebarrel committed Oct 2, 2024
1 parent 5ad4170 commit 464dadb
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 128 deletions.
25 changes: 6 additions & 19 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,7 @@ on:

jobs:
test:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest
# Container operations, such as postgres, are only supported on Linux runners
# - macOS-latest
# - windows-latest
go:
- "1.21"
- "1.20"
- "1.19"
- "1.18"
- "1.17"
runs-on: ubuntu-latest
services:
postgres:
image: postgres:12
Expand All @@ -31,13 +17,14 @@ jobs:
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Install Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}

- name: Checkout code
uses: actions/checkout@v4
go-version-file: go.mod
cache: false

- name: Lint
uses: golangci/golangci-lint-action@v4
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ This library is still under heavy development, and might significantly change AP
## Test

```
docker-compose up -d
docker compose up -d
make db
make table
make test
Expand Down
37 changes: 37 additions & 0 deletions connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package qg

import (
"database/sql/driver"
"fmt"
"net/url"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
)

func GetConnector(host string, port int, username string, password string, database string) (driver.Connector, error) {
u := &url.URL{
Scheme: "postgres",
Host: fmt.Sprintf("%s:%d", host, port),
User: url.UserPassword(username, password),
Path: database,
}

return GetConnectorFromURL(u)
}

func GetConnectorFromURL(u *url.URL) (driver.Connector, error) {
return GetConnectorFromConnStr(u.String())
}

func GetConnectorFromConnStr(connStr string) (driver.Connector, error) {
cfg, err := pgx.ParseConfig(connStr)

if err != nil {
return nil, err
}

connector := stdlib.GetConnector(*cfg, stdlib.OptionAfterConnect(PrepareStatements))

return connector, nil
}
17 changes: 14 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
module github.com/kanmu/qg

go 1.16
go 1.21

require (
github.com/jackc/pgconn v1.14.3
github.com/jackc/pgx/v4 v4.18.3
gopkg.in/guregu/null.v3 v3.0.2-0.20160228005316-41961cea0328
)

require (
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.3 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/lib/pq v1.10.3 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
gopkg.in/guregu/null.v3 v3.0.2-0.20160228005316-41961cea0328
golang.org/x/crypto v0.20.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
33 changes: 0 additions & 33 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0=
github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo=
github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
Expand All @@ -36,7 +35,6 @@ github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5W
github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A=
github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78=
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA=
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg=
Expand Down Expand Up @@ -64,7 +62,6 @@ github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -101,18 +98,13 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand All @@ -134,27 +126,17 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg=
golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -167,25 +149,13 @@ golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand All @@ -195,10 +165,7 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
86 changes: 51 additions & 35 deletions que.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

null "gopkg.in/guregu/null.v3"

"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/stdlib"
)
Expand Down Expand Up @@ -51,7 +50,7 @@ type Job struct {
mu sync.Mutex
deleted bool
c *Client
conn *pgx.Conn
conn *sql.Conn
tx Txer
}

Expand Down Expand Up @@ -92,7 +91,9 @@ type JobStats struct {
}

// Conn returns transaction
func (j *Job) Conn() *pgx.Conn {
//
// Deprecated: This is an internal method. DON'T USE IT.
func (j *Job) Conn() *sql.Conn {
j.mu.Lock()
defer j.mu.Unlock()

Expand Down Expand Up @@ -124,7 +125,12 @@ func (j *Job) DeleteContext(ctx context.Context) error {
return nil
}

_, err := j.conn.Exec(ctx, "que_destroy_job", j.Queue, j.Priority, j.RunAt, j.ID)
err := j.conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
_, err := pgxConn.Exec(ctx, "que_destroy_job", j.Queue, j.Priority, j.RunAt, j.ID)
return err
})

if err != nil {
return err
}
Expand Down Expand Up @@ -152,12 +158,16 @@ func (j *Job) DoneContext(ctx context.Context) {
var ok bool
// Swallow this error because we don't want an unlock failure to cause work to
// stop.
if err := j.conn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok); err != nil {
err := j.conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
return pgxConn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok)
})

if err != nil {
log.Printf("failed to unlock job job_id=%d job_type=%s", j.ID, j.Type)
}

stdlib.ReleaseConn(j.c.pool, j.conn) //nolint:errcheck
// j.pool.Release(j.conn)
j.conn.Close()
j.c.dischargeJob(j)
j.c = nil
j.conn = nil
Expand All @@ -178,7 +188,12 @@ func (j *Job) ErrorContext(ctx context.Context, msg string) error {
errorCount := j.ErrorCount + 1
delay := intPow(int(errorCount), 4) + 3 // TODO: configurable delay

_, err := j.conn.Exec(ctx, "que_set_error", errorCount, delay, msg, j.Queue, j.Priority, j.RunAt, j.ID)
err := j.conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
_, err := pgxConn.Exec(ctx, "que_set_error", errorCount, delay, msg, j.Queue, j.Priority, j.RunAt, j.ID)
return err
})

if err != nil {
return err
}
Expand Down Expand Up @@ -341,27 +356,29 @@ func (c *Client) LockJob(queue string) (*Job, error) {
// After the Job has been worked, you must call either Done() or Error() on it
// in order to return the database connection to the pool and remove the lock.
func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error) {
conn, err := stdlib.AcquireConn(c.pool)
conn, err := c.pool.Conn(ctx)
if err != nil {
return nil, err
}

j := Job{c: c, conn: conn}

for i := 0; i < maxLockJobAttempts; i++ {
err = conn.QueryRow(ctx, "que_lock_job", queue).Scan(
&j.Queue,
&j.Priority,
&j.RunAt,
&j.ID,
&j.Type,
&j.Args,
&j.ErrorCount,
)
err = conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
return pgxConn.QueryRow(ctx, "que_lock_job", queue).Scan(
&j.Queue,
&j.Priority,
&j.RunAt,
&j.ID,
&j.Type,
&j.Args,
&j.ErrorCount,
)
})

if err != nil {
// stdConn.Close()
stdlib.ReleaseConn(c.pool, conn) //nolint:errcheck
// c.pool.Release(conn)
conn.Close()
if err == pgx.ErrNoRows {
return nil, nil
}
Expand All @@ -381,7 +398,10 @@ func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error)
// I'm not sure how to reliably commit a transaction that deletes
// the job in a separate thread between lock_job and check_job.
var ok bool
err = conn.QueryRow(ctx, "que_check_job", j.Queue, j.Priority, j.RunAt, j.ID).Scan(&ok)
err = conn.Raw(func(driverConn any) error {
pgxConn := driverConn.(*stdlib.Conn).Conn()
return pgxConn.QueryRow(ctx, "que_check_job", j.Queue, j.Priority, j.RunAt, j.ID).Scan(&ok)
})
if err == nil {
c.manageJob(&j)
return &j, nil
Expand All @@ -392,16 +412,18 @@ func (c *Client) LockJobContext(ctx context.Context, queue string) (*Job, error)
// eventually causing the server to run out of locks.
//
// Also swallow the possible error, exactly like in Done.
_ = conn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok)
conn.Raw(func(driverConn any) error {

Check failure on line 415 in que.go

View workflow job for this annotation

GitHub Actions / test

Error return value of `conn.Raw` is not checked (errcheck)

Check failure on line 415 in que.go

View workflow job for this annotation

GitHub Actions / test

Error return value of `conn.Raw` is not checked (errcheck)
pgxConn := driverConn.(*stdlib.Conn).Conn()
pgxConn.QueryRow(ctx, "que_unlock_job", j.ID).Scan(&ok)

Check failure on line 417 in que.go

View workflow job for this annotation

GitHub Actions / test

Error return value of `(github.com/jackc/pgx/v4.Row).Scan` is not checked (errcheck)

Check failure on line 417 in que.go

View workflow job for this annotation

GitHub Actions / test

Error return value of `(github.com/jackc/pgx/v4.Row).Scan` is not checked (errcheck)
return nil
})
continue
} else {
// stdConn.Close()
// c.pool.Release(conn)
stdlib.ReleaseConn(c.pool, conn) //nolint:errcheck
conn.Close()
return nil, err
}
}
stdlib.ReleaseConn(c.pool, conn) //nolint:errcheck
conn.Close()
return nil, ErrAgain
}

Expand All @@ -414,16 +436,10 @@ var preparedStatements = map[string]string{
"que_unlock_job": sqlUnlockJob,
}

// PrepareStatementsContext func without context.Context.
// Deprecated: `PrepareStatements` cannot be passed to `pgconn.Config.AfterConnect`. Use `PrepareStatementsContext` instead
func PrepareStatements(conn *pgconn.PgConn) error {
return PrepareStatementsContext(context.Background(), conn)
}

// PrepareStatements prepar statements
func PrepareStatementsContext(ctx context.Context, conn *pgconn.PgConn) error {
func PrepareStatements(ctx context.Context, conn *pgx.Conn) error {
for name, sql := range preparedStatements {
if _, err := conn.Prepare(ctx, name, sql, nil); err != nil {
if _, err := conn.Prepare(ctx, name, sql); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit 464dadb

Please sign in to comment.