From 92bb3791c1d013f39a56708df695ccfaeb108aa5 Mon Sep 17 00:00:00 2001 From: Marco Manino Date: Thu, 24 Oct 2024 16:51:40 +0200 Subject: [PATCH] Make Create path uniform with Update/Delete (#187) --- pkg/kine/drivers/generic/generic.go | 14 +++++++------- pkg/kine/logstructured/logstructured.go | 12 ++++++------ pkg/kine/logstructured/sqllog/sql.go | 17 +++++++++-------- pkg/kine/server/create.go | 12 +++++++----- pkg/kine/server/limited.go | 2 +- pkg/kine/server/types.go | 2 +- pkg/kine/server/update.go | 24 ++++++++---------------- 7 files changed, 39 insertions(+), 44 deletions(-) diff --git a/pkg/kine/drivers/generic/generic.go b/pkg/kine/drivers/generic/generic.go index 9468b114..1a464c7c 100644 --- a/pkg/kine/drivers/generic/generic.go +++ b/pkg/kine/drivers/generic/generic.go @@ -11,7 +11,6 @@ import ( "time" "github.com/canonical/k8s-dqlite/pkg/kine/prepared" - "github.com/canonical/k8s-dqlite/pkg/kine/server" "github.com/pkg/errors" "github.com/sirupsen/logrus" "go.opentelemetry.io/otel" @@ -519,7 +518,7 @@ func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision i return rev.Int64, id, err } -func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int64) (rev int64, err error) { +func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int64) (rev int64, succeeded bool, err error) { ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Create", otelName)) defer func() { @@ -541,16 +540,17 @@ func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int6 result, err := d.execute(ctx, "create_sql", d.CreateSQL, key, ttl, value, key) if err != nil { logrus.WithError(err).Error("failed to create key") - return 0, err + return 0, false, err } - if insertCount, err := result.RowsAffected(); err != nil { - return 0, err + return 0, false, err } else if insertCount == 0 { - return 0, server.ErrKeyExists + return 0, false, nil } - return result.LastInsertId() + rev, err = result.LastInsertId() + return rev, true, err } + func (d *Generic) Update(ctx context.Context, key string, value []byte, preRev, ttl int64) (rev int64, updated bool, err error) { ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Update", otelName)) defer func() { diff --git a/pkg/kine/logstructured/logstructured.go b/pkg/kine/logstructured/logstructured.go index 5762f120..4ce36383 100644 --- a/pkg/kine/logstructured/logstructured.go +++ b/pkg/kine/logstructured/logstructured.go @@ -29,9 +29,9 @@ type Log interface { Wait() CurrentRevision(ctx context.Context) (int64, error) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error) - Create(ctx context.Context, key string, value []byte, lease int64) (int64, error) - Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, updateRet bool, errRet error) - Delete(ctx context.Context, key string, revision int64) (revRet int64, deleted bool, errRet error) + Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, created bool, err error) + Update(ctx context.Context, key string, value []byte, revision, lease int64) (rev int64, updated bool, err error) + Delete(ctx context.Context, key string, revision int64) (rev int64, deleted bool, err error) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) Watch(ctx context.Context, prefix string) <-chan []*server.Event Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) @@ -137,10 +137,10 @@ func (l *LogStructured) adjustRevision(ctx context.Context, rev *int64) { } } -func (l *LogStructured) Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, err error) { - rev, err = l.log.Create(ctx, key, value, lease) +func (l *LogStructured) Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, created bool, err error) { + rev, created, err = l.log.Create(ctx, key, value, lease) logrus.Debugf("CREATE %s, size=%d, lease=%d => rev=%d, err=%v", key, len(value), lease, rev, err) - return rev, err + return rev, created, err } func (l *LogStructured) Delete(ctx context.Context, key string, revision int64) (revRet int64, deleted bool, errRet error) { diff --git a/pkg/kine/logstructured/sqllog/sql.go b/pkg/kine/logstructured/sqllog/sql.go index 25d7ee2e..dab85524 100644 --- a/pkg/kine/logstructured/sqllog/sql.go +++ b/pkg/kine/logstructured/sqllog/sql.go @@ -65,7 +65,7 @@ type Dialect interface { CurrentRevision(ctx context.Context) (int64, error) AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) After(ctx context.Context, rev, limit int64) (*sql.Rows, error) - Create(ctx context.Context, key string, value []byte, lease int64) (int64, error) + Create(ctx context.Context, key string, value []byte, lease int64) (int64, bool, error) Update(ctx context.Context, key string, value []byte, prevRev, lease int64) (int64, bool, error) Delete(ctx context.Context, key string, revision int64) (int64, bool, error) DeleteRevision(ctx context.Context, revision int64) error @@ -106,7 +106,7 @@ func (s *SQLLog) compactStart(ctx context.Context) error { } if len(events) == 0 { - _, err := s.Create(ctx, "compact_rev_key", []byte(""), 0) + _, _, err := s.Create(ctx, "compact_rev_key", []byte(""), 0) return err } else if len(events) == 1 { return nil @@ -493,14 +493,15 @@ func (s *SQLLog) Count(ctx context.Context, prefix, startKey string, revision in return s.d.Count(ctx, prefix, startKey, revision) } -func (s *SQLLog) Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, err error) { - rev, err = s.d.Create(ctx, key, value, lease) +func (s *SQLLog) Create(ctx context.Context, key string, value []byte, lease int64) (int64, bool, error) { + rev, created, err := s.d.Create(ctx, key, value, lease) if err != nil { - return 0, err + return 0, false, err } - - s.notifyWatcherPoll(rev) - return rev, nil + if created { + s.notifyWatcherPoll(rev) + } + return rev, created, nil } func (s *SQLLog) Delete(ctx context.Context, key string, revision int64) (rev int64, deleted bool, err error) { diff --git a/pkg/kine/server/create.go b/pkg/kine/server/create.go index 2779a0d3..f085ca3d 100644 --- a/pkg/kine/server/create.go +++ b/pkg/kine/server/create.go @@ -21,7 +21,7 @@ func isCreate(txn *etcdserverpb.TxnRequest) *etcdserverpb.PutRequest { return nil } -func (l *LimitedServer) create(ctx context.Context, put *etcdserverpb.PutRequest, txn *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) { +func (l *LimitedServer) create(ctx context.Context, put *etcdserverpb.PutRequest) (*etcdserverpb.TxnResponse, error) { var err error createCnt.Add(ctx, 1) ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.create", otelName)) @@ -42,16 +42,18 @@ func (l *LimitedServer) create(ctx context.Context, put *etcdserverpb.PutRequest return nil, unsupported("prevKv") } - rev, err := l.backend.Create(ctx, string(put.Key), put.Value, put.Lease) + rev, created, err := l.backend.Create(ctx, string(put.Key), put.Value, put.Lease) + if err != nil { + return nil, err + } + span.SetAttributes(attribute.Int64("revision", rev)) - if err == ErrKeyExists { + if !created { span.AddEvent("key exists") return &etcdserverpb.TxnResponse{ Header: txnHeader(rev), Succeeded: false, }, nil - } else if err != nil { - return nil, err } return &etcdserverpb.TxnResponse{ diff --git a/pkg/kine/server/limited.go b/pkg/kine/server/limited.go index 476f28fa..83ea068b 100644 --- a/pkg/kine/server/limited.go +++ b/pkg/kine/server/limited.go @@ -28,7 +28,7 @@ func txnHeader(rev int64) *etcdserverpb.ResponseHeader { func (l *LimitedServer) Txn(ctx context.Context, txn *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) { if put := isCreate(txn); put != nil { - return l.create(ctx, put, txn) + return l.create(ctx, put) } if rev, key, ok := isDelete(txn); ok { return l.delete(ctx, key, rev) diff --git a/pkg/kine/server/types.go b/pkg/kine/server/types.go index 9d78a87c..61580859 100644 --- a/pkg/kine/server/types.go +++ b/pkg/kine/server/types.go @@ -15,7 +15,7 @@ type Backend interface { Start(ctx context.Context) error Wait() Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (int64, *KeyValue, error) - Create(ctx context.Context, key string, value []byte, lease int64) (int64, error) + Create(ctx context.Context, key string, value []byte, lease int64) (int64, bool, error) Delete(ctx context.Context, key string, revision int64) (int64, bool, error) List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) diff --git a/pkg/kine/server/update.go b/pkg/kine/server/update.go index 5fc5dabb..2cdf172f 100644 --- a/pkg/kine/server/update.go +++ b/pkg/kine/server/update.go @@ -27,9 +27,9 @@ func isUpdate(txn *etcdserverpb.TxnRequest) (int64, string, []byte, int64, bool) func (l *LimitedServer) update(ctx context.Context, rev int64, key string, value []byte, lease int64) (*etcdserverpb.TxnResponse, error) { var ( - kv *KeyValue - updated bool - err error + kv *KeyValue + succeeded bool + err error ) updateCnt.Add(ctx, 1) @@ -45,29 +45,21 @@ func (l *LimitedServer) update(ctx context.Context, rev int64, key string, value ) if rev == 0 { - rev, err = l.backend.Create(ctx, key, value, lease) - if err == ErrKeyExists { - return &etcdserverpb.TxnResponse{ - Header: txnHeader(rev), - Succeeded: false, - }, nil - } else { - updated = true - } + rev, succeeded, err = l.backend.Create(ctx, key, value, lease) } else { - rev, updated, err = l.backend.Update(ctx, key, value, rev, lease) + rev, succeeded, err = l.backend.Update(ctx, key, value, rev, lease) } if err != nil { return nil, err } - span.SetAttributes(attribute.Bool("updated", updated), attribute.Int64("revision", rev)) + span.SetAttributes(attribute.Bool("updated", succeeded), attribute.Int64("revision", rev)) resp := &etcdserverpb.TxnResponse{ Header: txnHeader(rev), - Succeeded: updated, + Succeeded: succeeded, } - if updated { + if succeeded { resp.Responses = []*etcdserverpb.ResponseOp{ { Response: &etcdserverpb.ResponseOp_ResponsePut{