diff --git a/cmd/root.go b/cmd/root.go
index 180a81c7..5ea77772 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -40,7 +40,8 @@ var (
 		acpLimitMaxConcurrentTxn int64
 		acpOnlyWriteQueries      bool
 
-		etcdMode bool
+		etcdMode          bool
+		watchQueryTimeout time.Duration
 	}
 
 	rootCmd = &cobra.Command{
@@ -107,6 +108,7 @@ var (
 				rootCmdOpts.admissionControlPolicy,
 				rootCmdOpts.acpLimitMaxConcurrentTxn,
 				rootCmdOpts.acpOnlyWriteQueries,
+				rootCmdOpts.watchQueryTimeout,
 			)
 			if err != nil {
 				logrus.WithError(err).Fatal("Failed to create server")
@@ -181,6 +183,7 @@ func init() {
 	// TODO(MK-1408): This value is highly dependent on underlying hardware, thus making the default value a bit useless. The linked card will implement a dynamic way to set this value.
 	rootCmd.Flags().Int64Var(&rootCmdOpts.acpLimitMaxConcurrentTxn, "admission-control-policy-limit-max-concurrent-transactions", 300, "Maximum number of transactions that are allowed to run concurrently. Transactions will not be admitted after the limit is reached.")
 	rootCmd.Flags().BoolVar(&rootCmdOpts.acpOnlyWriteQueries, "admission-control-only-for-write-queries", false, "If set, admission control will only be applied to write queries.")
+	rootCmd.Flags().DurationVar(&rootCmdOpts.watchQueryTimeout, "watch-query-timeout", 20*time.Second, "Timeout for querying events in the watch poll loop. If timeout is reached, the poll loop will be re-triggered. Values below 5 seconds are ignored and the default of 20 seconds is used.")
 
 	rootCmd.AddCommand(&cobra.Command{
 		Use: "version",
diff --git a/pkg/kine/drivers/generic/generic.go b/pkg/kine/drivers/generic/generic.go
index 6549b809..d6c47efa 100644
--- a/pkg/kine/drivers/generic/generic.go
+++ b/pkg/kine/drivers/generic/generic.go
@@ -158,6 +158,8 @@ type Generic struct {
 	CompactInterval time.Duration
 	// PollInterval is the event poll interval used by kine.
 	PollInterval time.Duration
+	// WatchQueryTimeout is the timeout on the after query in the poll loop.
+	WatchQueryTimeout time.Duration
 }
 
 func configureConnectionPooling(db *sql.DB) {
@@ -827,6 +829,16 @@ func (d *Generic) GetCompactInterval() time.Duration {
 	return 5 * time.Minute
 }
 
+// GetWatchQueryTimeout returns the timeout for the after query in the poll
+// loop. A configured value below 5 seconds (including the unset zero value)
+// is not used; the 20 second default is returned instead.
+func (d *Generic) GetWatchQueryTimeout() time.Duration {
+	if v := d.WatchQueryTimeout; v >= 5*time.Second {
+		return v
+	}
+	return 20 * time.Second
+}
+
 func (d *Generic) GetPollInterval() time.Duration {
 	if v := d.PollInterval; v > 0 {
 		return v
diff --git a/pkg/kine/drivers/sqlite/sqlite.go b/pkg/kine/drivers/sqlite/sqlite.go
index e55476f4..ddb10a91 100644
--- a/pkg/kine/drivers/sqlite/sqlite.go
+++ b/pkg/kine/drivers/sqlite/sqlite.go
@@ -23,8 +23,9 @@ type opts struct {
 	dsn        string
 	driverName string // If not empty, use a pre-registered dqlite driver
 
-	compactInterval time.Duration
-	pollInterval    time.Duration
+	compactInterval   time.Duration
+	pollInterval      time.Duration
+	watchQueryTimeout time.Duration
 
 	admissionControlPolicy                      string
 	admissionControlPolicyLimitMaxConcurrentTxn int64
@@ -93,6 +94,7 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string) (server.
 
 	dialect.CompactInterval = opts.compactInterval
 	dialect.PollInterval = opts.pollInterval
+	dialect.WatchQueryTimeout = opts.watchQueryTimeout
 	dialect.AdmissionControlPolicy = generic.NewAdmissionControlPolicy(
 		opts.admissionControlPolicy,
 		opts.admissionControlOnlyWriteQueries,
@@ -213,6 +215,12 @@ func parseOpts(dsn string) (opts, error) {
 				return opts{}, fmt.Errorf("failed to parse poll-interval duration value %q: %w", vs[0], err)
 			}
 			result.pollInterval = d
+		case "watch-query-timeout":
+			d, err := time.ParseDuration(vs[0])
+			if err != nil {
+				return opts{}, fmt.Errorf("failed to parse watch-query-timeout duration value %q: %w", vs[0], err)
+			}
+			result.watchQueryTimeout = d
 		case "admission-control-policy":
 			result.admissionControlPolicy = vs[0]
 		case "admission-control-policy-limit-max-concurrent-txn":
diff --git a/pkg/kine/logstructured/sqllog/sql.go b/pkg/kine/logstructured/sqllog/sql.go
index 3ce9151b..8b11e6ae 100644
--- a/pkg/kine/logstructured/sqllog/sql.go
+++ b/pkg/kine/logstructured/sqllog/sql.go
@@ -3,6 +3,7 @@ package sqllog
 import (
 	"context"
 	"database/sql"
+	"errors"
 	"fmt"
 	"strings"
 	"sync"
@@ -74,6 +75,7 @@ type Dialect interface {
 	IsFill(key string) bool
 	GetSize(ctx context.Context) (int64, error)
 	GetCompactInterval() time.Duration
+	GetWatchQueryTimeout() time.Duration
 	GetPollInterval() time.Duration
 	Close() error
 }
@@ -397,10 +399,17 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
 			}
 		}
 		waitForMore = true
+		watchCtx, cancel := context.WithTimeout(s.ctx, s.d.GetWatchQueryTimeout())
+		// FIXME: this defer sits inside the poll loop (note the continue below), so
+		// it only runs when poll returns and one cancel func piles up per
+		// iteration; call cancel explicitly at the end of each iteration instead.
+		defer cancel()
 
-		rows, err := s.d.After(s.ctx, last, 500)
+		rows, err := s.d.After(watchCtx, last, 500)
 		if err != nil {
-			logrus.Errorf("fail to list latest changes: %v", err)
+			if !errors.Is(err, context.DeadlineExceeded) {
+				logrus.Errorf("fail to list latest changes: %v", err)
+			}
 			continue
 		}
 
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 610df044..3bd0b0da 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -69,6 +69,7 @@ func New(
 	admissionControlPolicy string,
 	admissionControlPolicyLimitMaxConcurrentTxn int64,
 	admissionControlOnlyWriteQueries bool,
+	watchQueryTimeout time.Duration,
 ) (*Server, error) {
 	var (
 		options []app.Option
@@ -273,6 +274,7 @@ func New(
 		params["admission-control-policy"] = []string{admissionControlPolicy}
 		params["admission-control-policy-limit-max-concurrent-txn"] = []string{fmt.Sprintf("%v", admissionControlPolicyLimitMaxConcurrentTxn)}
 		params["admission-control-only-write-queries"] = []string{fmt.Sprintf("%v", admissionControlOnlyWriteQueries)}
+		params["watch-query-timeout"] = []string{fmt.Sprintf("%v", watchQueryTimeout)}
 
 		kineConfig.Listener = listen
 		kineConfig.Endpoint = fmt.Sprintf("dqlite://k8s?%s", params.Encode())