Skip to content

Commit

Permalink
Configurable watch query timeout (#162)
Browse files Browse the repository at this point in the history
  • Loading branch information
louiseschmidtgen authored and berkayoz committed Sep 2, 2024
1 parent 092e35e commit 25173c5
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 5 deletions.
5 changes: 4 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ var (
acpLimitMaxConcurrentTxn int64
acpOnlyWriteQueries bool

etcdMode bool
etcdMode bool
watchQueryTimeout time.Duration
}

rootCmd = &cobra.Command{
Expand Down Expand Up @@ -80,6 +81,7 @@ var (
rootCmdOpts.admissionControlPolicy,
rootCmdOpts.acpLimitMaxConcurrentTxn,
rootCmdOpts.acpOnlyWriteQueries,
rootCmdOpts.watchQueryTimeout,
)
if err != nil {
logrus.WithError(err).Fatal("Failed to create server")
Expand Down Expand Up @@ -141,6 +143,7 @@ func init() {
// TODO(MK-1408): This value is highly dependend on underlying hardware, thus making the default value a bit useless. The linked card will implement a dynamic way to set this value.
rootCmd.Flags().Int64Var(&rootCmdOpts.acpLimitMaxConcurrentTxn, "admission-control-policy-limit-max-concurrent-transactions", 300, "Maximum number of transactions that are allowed to run concurrently. Transactions will not be admitted after the limit is reached.")
rootCmd.Flags().BoolVar(&rootCmdOpts.acpOnlyWriteQueries, "admission-control-only-for-write-queries", false, "If set, admission control will only be applied to write queries.")
rootCmd.Flags().DurationVar(&rootCmdOpts.watchQueryTimeout, "watch-query-timeout", 20*time.Second, "Timeout for querying events in the watch poll loop. If timeout is reached, the poll loop will be re-triggered. The minimum value is 5 seconds.")

rootCmd.AddCommand(&cobra.Command{
Use: "version",
Expand Down
9 changes: 9 additions & 0 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ type Generic struct {
CompactInterval time.Duration
// PollInterval is the event poll interval used by kine.
PollInterval time.Duration
// WatchQueryTimeout is the timeout on the after query in the poll loop.
WatchQueryTimeout time.Duration
}

func configureConnectionPooling(db *sql.DB) {
Expand Down Expand Up @@ -656,6 +658,13 @@ func (d *Generic) GetCompactInterval() time.Duration {
return 5 * time.Minute
}

func (d *Generic) GetWatchQueryTimeout() time.Duration {
if v := d.WatchQueryTimeout; v >= 5*time.Second {
return v
}
return 20 * time.Second
}

func (d *Generic) GetPollInterval() time.Duration {
if v := d.PollInterval; v > 0 {
return v
Expand Down
12 changes: 10 additions & 2 deletions pkg/kine/drivers/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ type opts struct {
dsn string
driverName string // If not empty, use a pre-registered dqlite driver

compactInterval time.Duration
pollInterval time.Duration
compactInterval time.Duration
pollInterval time.Duration
watchQueryTimeout time.Duration

admissionControlPolicy string
admissionControlPolicyLimitMaxConcurrentTxn int64
Expand Down Expand Up @@ -97,6 +98,7 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string) (server.

dialect.CompactInterval = opts.compactInterval
dialect.PollInterval = opts.pollInterval
dialect.WatchQueryTimeout = opts.watchQueryTimeout
dialect.AdmissionControlPolicy = generic.NewAdmissionControlPolicy(
opts.admissionControlPolicy,
opts.admissionControlOnlyWriteQueries,
Expand Down Expand Up @@ -211,6 +213,12 @@ func parseOpts(dsn string) (opts, error) {
return opts{}, fmt.Errorf("failed to parse poll-interval duration value %q: %w", vs[0], err)
}
result.pollInterval = d
case "watch-query-timeout":
d, err := time.ParseDuration(vs[0])
if err != nil {
return opts{}, fmt.Errorf("failed to parse watch-query-timeout duration value %q: %w", vs[0], err)
}
result.watchQueryTimeout = d
case "admission-control-policy":
result.admissionControlPolicy = vs[0]
case "admission-control-policy-limit-max-concurrent-txn":
Expand Down
10 changes: 8 additions & 2 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sqllog
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -48,6 +49,7 @@ type Dialect interface {
IsFill(key string) bool
GetSize(ctx context.Context) (int64, error)
GetCompactInterval() time.Duration
GetWatchQueryTimeout() time.Duration
GetPollInterval() time.Duration
Close()
}
Expand Down Expand Up @@ -414,10 +416,14 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
}
}
waitForMore = true
watchCtx, cancel := context.WithTimeout(s.ctx, s.d.GetWatchQueryTimeout())
defer cancel()

rows, err := s.d.After(s.ctx, last, 500)
rows, err := s.d.After(watchCtx, last, 500)
if err != nil {
logrus.Errorf("fail to list latest changes: %v", err)
if !errors.Is(err, context.DeadlineExceeded) {
logrus.Errorf("fail to list latest changes: %v", err)
}
continue
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func New(
admissionControlPolicy string,
admissionControlPolicyLimitMaxConcurrentTxn int64,
admissionControlOnlyWriteQueries bool,
watchQueryTimeout time.Duration,
) (*Server, error) {
var (
options []app.Option
Expand Down Expand Up @@ -273,6 +274,7 @@ func New(
params["admission-control-policy"] = []string{admissionControlPolicy}
params["admission-control-policy-limit-max-concurrent-txn"] = []string{fmt.Sprintf("%v", admissionControlPolicyLimitMaxConcurrentTxn)}
params["admission-control-only-write-queries"] = []string{fmt.Sprintf("%v", admissionControlOnlyWriteQueries)}
params["watch-query-timeout"] = []string{fmt.Sprintf("%v", watchQueryTimeout)}

kineConfig.Listener = listen
kineConfig.Endpoint = fmt.Sprintf("dqlite://k8s?%s", params.Encode())
Expand Down

0 comments on commit 25173c5

Please sign in to comment.