diff --git a/pkg/kine/drivers/generic/generic.go b/pkg/kine/drivers/generic/generic.go index f11c02e2..429d20c4 100644 --- a/pkg/kine/drivers/generic/generic.go +++ b/pkg/kine/drivers/generic/generic.go @@ -72,6 +72,8 @@ var ( ) AS high` ) +const retryCount = 500 + type Stripped string func (s Stripped) String() string { @@ -317,7 +319,7 @@ func getPrefixRange(prefix string) (start, end string) { } func (d *Generic) query(ctx context.Context, txName, sql string, args ...interface{}) (rows *sql.Rows, err error) { - i := uint(0) + i := 0 start := time.Now() done, err := d.AdmissionControlPolicy.Admit(ctx, txName) @@ -333,7 +335,7 @@ func (d *Generic) query(ctx context.Context, txName, sql string, args ...interfa }() strippedSQL := Stripped(sql) - for ; i < 500; i++ { + for ; i < retryCount; i++ { if i > 2 { logrus.Debugf("QUERY (try: %d) %v : %s", i, args, strippedSQL) } else { @@ -418,7 +420,7 @@ func (d *Generic) queryRowPrepared(ctx context.Context, txName, sql string, prep } func (d *Generic) executePrepared(ctx context.Context, txName, sql string, prepared *sql.Stmt, args ...interface{}) (result sql.Result, err error) { - i := uint(0) + i := 0 start := time.Now() defer func() { if err != nil { @@ -438,7 +440,7 @@ func (d *Generic) executePrepared(ctx context.Context, txName, sql string, prepa } strippedSQL := Stripped(sql) - for ; i < 500; i++ { + for ; i < retryCount; i++ { if i > 2 { logrus.Debugf("EXEC (try: %d) %v : %s", i, args, strippedSQL) } else { @@ -474,9 +476,21 @@ func (d *Generic) GetCompactRevision(ctx context.Context) (int64, int64, error) return 0, 0, fmt.Errorf("denied: %w", err) } - row := d.DB.QueryRow(revisionIntervalSQL) + for i := 0; i < retryCount; i++ { + if i > 2 { + logrus.Debugf("EXEC (try: %d): %s", i, revisionIntervalSQL) + } else { + logrus.Tracef("EXEC (try: %d): %s", i, revisionIntervalSQL) + } + row := d.DB.QueryRow(revisionIntervalSQL) + err = row.Scan(&compact, &target) + if err != nil && d.Retry != nil && d.Retry(err) { + time.Sleep(jitter.Deviation(nil, 0.3)(2 * time.Millisecond)) + continue + } + break + } done() - err = row.Scan(&compact, &target) if err == sql.ErrNoRows { return 0, 0, nil } diff --git a/pkg/kine/drivers/sqlite/sqlite.go b/pkg/kine/drivers/sqlite/sqlite.go index fffd4d6a..f7a94a18 100644 --- a/pkg/kine/drivers/sqlite/sqlite.go +++ b/pkg/kine/drivers/sqlite/sqlite.go @@ -103,6 +103,15 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string) (server. opts.admissionControlPolicyLimitMaxConcurrentTxn, ) + if driverName == "sqlite3" { + dialect.Retry = func(err error) bool { + if err, ok := err.(sqlite3.Error); ok { + return err.Code == sqlite3.ErrBusy + } + return false + } + } + return logstructured.New(sqllog.New(dialect)), dialect, nil }