From 77f11ae58c0058f8fb72e985a4d1f0b9bcc73b6f Mon Sep 17 00:00:00 2001 From: "Louise K. Schmidtgen" Date: Fri, 13 Sep 2024 10:10:47 +0200 Subject: [PATCH] Update Query (#170) --- pkg/kine/drivers/generic/generic.go | 41 +++++++++++++++ pkg/kine/logstructured/logstructured.go | 47 ++--------------- pkg/kine/logstructured/sqllog/sql.go | 39 +++++++------- pkg/kine/server/types.go | 2 +- pkg/kine/server/update.go | 32 +++++++----- test/delete_test.go | 3 +- test/update_test.go | 68 ++++++++++++++++++------- 7 files changed, 139 insertions(+), 93 deletions(-) diff --git a/pkg/kine/drivers/generic/generic.go b/pkg/kine/drivers/generic/generic.go index a4f05838..b2ec61ca 100644 --- a/pkg/kine/drivers/generic/generic.go +++ b/pkg/kine/drivers/generic/generic.go @@ -149,6 +149,7 @@ type Generic struct { FillSQL string InsertLastInsertIDSQL string CreateSQL string + UpdateSQL string GetSizeSQL string Retry ErrRetry TranslateErr TranslateErr @@ -312,6 +313,21 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ) maxkv WHERE maxkv.deleted = 1 OR id IS NULL`, paramCharacter, numbered), + UpdateSQL: q(` + INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value) + SELECT + ? AS name, + 0 AS created, + 0 AS deleted, + create_revision, + id AS prev_revision, + ? AS lease, + ? AS value, + value AS old_value + FROM kine WHERE id = (SELECT MAX(id) FROM kine WHERE name = ?) + AND deleted = 0 + AND id = ?`, paramCharacter, numbered), + FillSQL: q(`INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)`, paramCharacter, numbered), AdmissionControlPolicy: &allowAllPolicy{}, @@ -515,6 +531,31 @@ func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int6 } return result.LastInsertId() } +func (d *Generic) Update(ctx context.Context, key string, value []byte, preRev, ttl int64) (rev int64, updated bool, err error) { + ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Update", otelName)) + defer func() { + if err != nil { + if d.TranslateErr != nil { + err = d.TranslateErr(err) + } + span.RecordError(err) + } + span.End() + }() + + result, err := d.execute(ctx, "update_sql", d.UpdateSQL, key, ttl, value, key, preRev) + if err != nil { + logrus.WithError(err).Error("failed to update key") + return 0, false, err + } + if insertCount, err := result.RowsAffected(); err != nil { + return 0, false, err + } else if insertCount == 0 { + return 0, false, nil + } + rev, err = result.LastInsertId() + return rev, true, err +} // Compact compacts the database up to the revision provided in the method's call. // After the call, any request for a version older than the given revision will return diff --git a/pkg/kine/logstructured/logstructured.go b/pkg/kine/logstructured/logstructured.go index cc5dba4b..c32b5756 100644 --- a/pkg/kine/logstructured/logstructured.go +++ b/pkg/kine/logstructured/logstructured.go @@ -30,6 +30,7 @@ type Log interface { CurrentRevision(ctx context.Context) (int64, error) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error) Create(ctx context.Context, key string, value []byte, lease int64) (int64, error) + Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, updateRet bool, errRet error) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) Watch(ctx context.Context, prefix string) <-chan []*server.Event Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) @@ -275,61 +276,21 @@ func (l *LogStructured) Count(ctx context.Context, prefix, startKey string, revi return rev, count, nil } -func (l *LogStructured) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, kvRet *server.KeyValue, updateRet bool, errRet error) { +func (l *LogStructured) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, updateRet bool, errRet error) { ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Update", otelName)) defer func() { - l.adjustRevision(ctx, &revRet) - kvRev := int64(0) - if kvRet != nil { - kvRev = kvRet.ModRevision - } - logrus.Debugf("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kvrev=%d, updated=%v, err=%v", key, len(value), revision, lease, revRet, kvRev, updateRet, errRet) + logrus.Debugf("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, updated=%v, err=%v", key, len(value), revision, lease, revRet, updateRet, errRet) span.SetAttributes( attribute.String("key", key), attribute.Int64("revision", revision), attribute.Int64("lease", lease), attribute.Int64("value-size", int64(len(value))), attribute.Int64("adjusted-revision", revRet), - attribute.Int64("kv-mod-revision", kvRev), attribute.Bool("updated", updateRet), ) span.End() }() - - rev, event, err := l.get(ctx, key, "", 1, 0, false) - if err != nil { - return 0, nil, false, err - } - - if event == nil { - return 0, nil, false, nil - } - - if event.KV.ModRevision != revision { - return rev, event.KV, false, nil - } - - updateEvent := &server.Event{ - KV: &server.KeyValue{ - Key: key, - CreateRevision: event.KV.CreateRevision, - Value: value, - Lease: lease, - }, - PrevKV: event.KV, - } - - rev, err = l.log.Append(ctx, updateEvent) - if err != nil { - rev, event, err := l.get(ctx, key, "", 1, 0, false) - if event == nil { - return rev, nil, false, err - } - return rev, event.KV, false, err - } - - updateEvent.KV.ModRevision = rev - return rev, updateEvent.KV, true, err + return l.log.Update(ctx, key, value, revision, lease) } func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event { diff --git a/pkg/kine/logstructured/sqllog/sql.go b/pkg/kine/logstructured/sqllog/sql.go index ccff227c..4ce766b4 100644 --- a/pkg/kine/logstructured/sqllog/sql.go +++ b/pkg/kine/logstructured/sqllog/sql.go @@ -67,6 +67,7 @@ type Dialect interface { After(ctx context.Context, rev, limit int64) (*sql.Rows, error) Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (int64, error) Create(ctx context.Context, key string, value []byte, lease int64) (int64, error) + Update(ctx context.Context, key string, value []byte, prevRev, lease int64) (int64, bool, error) DeleteRevision(ctx context.Context, revision int64) error GetCompactRevision(ctx context.Context) (int64, int64, error) Compact(ctx context.Context, revision int64) error @@ -272,10 +273,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis return rev, result, server.ErrCompacted } - select { - case s.notify <- rev: - default: - } + s.notifyWatcherPoll(rev) return rev, result, err } @@ -441,18 +439,12 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { // and trigger a quick retry for simple out of order events skip = next skipTime = time.Now() - select { - case s.notify <- next: - default: - } + s.notifyWatcherPoll(next) break } else { if err := s.d.Fill(s.ctx, next); err == nil { logrus.Debugf("FILL, revision=%d, err=%v", next, err) - select { - case s.notify <- next: - default: - } + s.notifyWatcherPoll(next) } else { logrus.Debugf("FILL FAILED, revision=%d, err=%v", next, err) } @@ -537,10 +529,7 @@ func (s *SQLLog) Append(ctx context.Context, event *server.Event) (int64, error) if err != nil { return 0, err } - select { - case s.notify <- rev: - default: - } + s.notifyWatcherPoll(rev) return rev, nil } @@ -558,11 +547,25 @@ func (s *SQLLog) Create(ctx context.Context, key string, value []byte, lease int return 0, err } + s.notifyWatcherPoll(rev) + return rev, nil +} + +func (s *SQLLog) Update(ctx context.Context, key string, value []byte, prevRev, lease int64) (rev int64, updated bool, err error) { + rev, updated, err = s.d.Update(ctx, key, value, prevRev, lease) + if err != nil { + return 0, false, err + } + + s.notifyWatcherPoll(rev) + return rev, updated, nil +} + +func (s *SQLLog) notifyWatcherPoll(revision int64) { select { - case s.notify <- rev: + case s.notify <- revision: default: } - return rev, nil } func scan(rows *sql.Rows, event *server.Event) error { diff --git a/pkg/kine/server/types.go b/pkg/kine/server/types.go index 642587a3..67544492 100644 --- a/pkg/kine/server/types.go +++ b/pkg/kine/server/types.go @@ -19,7 +19,7 @@ type Backend interface { Delete(ctx context.Context, key string, revision int64) (int64, *KeyValue, bool, error) List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) - Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error) + Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, bool, error) Watch(ctx context.Context, key string, revision int64) <-chan []*Event DbSize(ctx context.Context) (int64, error) DoCompact(ctx context.Context) error diff --git a/pkg/kine/server/update.go b/pkg/kine/server/update.go index 94acfd49..5fc5dabb 100644 --- a/pkg/kine/server/update.go +++ b/pkg/kine/server/update.go @@ -27,9 +27,9 @@ func isUpdate(txn *etcdserverpb.TxnRequest) (int64, string, []byte, int64, bool) func (l *LimitedServer) update(ctx context.Context, rev int64, key string, value []byte, lease int64) (*etcdserverpb.TxnResponse, error) { var ( - kv *KeyValue - ok bool - err error + kv *KeyValue + updated bool + err error ) updateCnt.Add(ctx, 1) @@ -46,26 +46,28 @@ func (l *LimitedServer) update(ctx context.Context, rev int64, key string, value if rev == 0 { rev, err = l.backend.Create(ctx, key, value, lease) - ok = true - - span.SetAttributes( - attribute.Int64("revision", rev), - attribute.Bool("ok", ok), - ) + if err == ErrKeyExists { + return &etcdserverpb.TxnResponse{ + Header: txnHeader(rev), + Succeeded: false, + }, nil + } else { + updated = true + } } else { - rev, kv, ok, err = l.backend.Update(ctx, key, value, rev, lease) - span.SetAttributes(attribute.Bool("ok", ok)) + rev, updated, err = l.backend.Update(ctx, key, value, rev, lease) } if err != nil { return nil, err } + span.SetAttributes(attribute.Bool("updated", updated), attribute.Int64("revision", rev)) resp := &etcdserverpb.TxnResponse{ Header: txnHeader(rev), - Succeeded: ok, + Succeeded: updated, } - if ok { + if updated { resp.Responses = []*etcdserverpb.ResponseOp{ { Response: &etcdserverpb.ResponseOp_ResponsePut{ @@ -76,6 +78,10 @@ func (l *LimitedServer) update(ctx context.Context, rev int64, key string, value }, } } else { + rev, kv, err = l.backend.Get(ctx, key, "", 1, rev) + if err != nil { + return nil, err + } resp.Responses = []*etcdserverpb.ResponseOp{ { Response: &etcdserverpb.ResponseOp_ResponseRange{ diff --git a/test/delete_test.go b/test/delete_test.go index 948eb60f..f110b38d 100644 --- a/test/delete_test.go +++ b/test/delete_test.go @@ -104,7 +104,7 @@ func assertMissingKey(ctx context.Context, g Gomega, client *clientv3.Client, ke g.Expect(resp.Kvs).To(HaveLen(0)) } -func deleteKey(ctx context.Context, g Gomega, client *clientv3.Client, key string) { +func deleteKey(ctx context.Context, g Gomega, client *clientv3.Client, key string) int64 { // The Get before the Delete is to trick kine to accept the transaction resp, err := client.Txn(ctx). Then(clientv3.OpGet(key), clientv3.OpDelete(key)). @@ -112,6 +112,7 @@ func deleteKey(ctx context.Context, g Gomega, client *clientv3.Client, key strin g.Expect(err).To(BeNil()) g.Expect(resp.Succeeded).To(BeTrue()) + return resp.Header.Revision } func assertKey(ctx context.Context, g Gomega, client *clientv3.Client, key string, value string) { diff --git a/test/update_test.go b/test/update_test.go index 3345c2e2..fa9698c1 100644 --- a/test/update_test.go +++ b/test/update_test.go @@ -20,20 +20,6 @@ func TestUpdate(t *testing.T) { kine := newKineServer(ctx, t, &kineOptions{backendType: backendType}) - // Testing that update can create a new key if ModRevision is 0 - t.Run("UpdateNewKey", func(t *testing.T) { - g := NewWithT(t) - - createKey(ctx, g, kine.client, "updateNewKey", "testValue") - - resp, err := kine.client.Get(ctx, "updateNewKey", clientv3.WithRange("")) - g.Expect(err).To(BeNil()) - g.Expect(resp.Kvs).To(HaveLen(1)) - g.Expect(resp.Kvs[0].Key).To(Equal([]byte("updateNewKey"))) - g.Expect(resp.Kvs[0].Value).To(Equal([]byte("testValue"))) - g.Expect(resp.Kvs[0].ModRevision).To(Equal(int64(resp.Kvs[0].CreateRevision))) - }) - t.Run("UpdateExisting", func(t *testing.T) { g := NewWithT(t) @@ -41,13 +27,28 @@ func TestUpdate(t *testing.T) { updateRev(ctx, g, kine.client, "updateExistingKey", lastModRev, "testValue2") resp, err := kine.client.Get(ctx, "updateExistingKey", clientv3.WithRange("")) - g.Expect(err).To(BeNil()) + g.Expect(err).ToNot(HaveOccurred()) g.Expect(resp.Kvs).To(HaveLen(1)) g.Expect(resp.Kvs[0].Key).To(Equal([]byte("updateExistingKey"))) g.Expect(resp.Kvs[0].Value).To(Equal([]byte("testValue2"))) g.Expect(resp.Kvs[0].ModRevision).To(BeNumerically(">", resp.Kvs[0].CreateRevision)) }) + t.Run("CreateExistingFails", func(t *testing.T) { + g := NewWithT(t) + + createKey(ctx, g, kine.client, "createExistingKey", "testValue1") + + resp, err := kine.client.Txn(ctx). + If(clientv3.Compare(clientv3.ModRevision("createExistingKey"), "=", 0)). + Then(clientv3.OpPut("createExistingKey", "testValue1")). + Else(clientv3.OpGet("createExistingKey", clientv3.WithRange(""))). + Commit() + + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(resp.Succeeded).To(BeFalse()) + }) + // Trying to update an old revision(in compare) should fail t.Run("UpdateOldRevisionFails", func(t *testing.T) { g := NewWithT(t) @@ -61,7 +62,40 @@ func TestUpdate(t *testing.T) { Else(clientv3.OpGet("updateOldRevKey", clientv3.WithRange(""))). Commit() - g.Expect(err).To(BeNil()) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(resp.Succeeded).To(BeFalse()) + g.Expect(resp.Responses).To(HaveLen(1)) + g.Expect(resp.Responses[0].GetResponseRange()).ToNot(BeNil()) + }) + + t.Run("UpdateNotExistingFails", func(t *testing.T) { + g := NewWithT(t) + + resp, err := kine.client.Txn(ctx). + If(clientv3.Compare(clientv3.ModRevision("updateNotExistingKey"), "=", 1)). + Then(clientv3.OpPut("updateNotExistingKey", "testValue3")). + Else(clientv3.OpGet("updateNotExistingKey", clientv3.WithRange(""))). + Commit() + + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(resp.Succeeded).To(BeFalse()) + g.Expect(resp.Responses).To(HaveLen(1)) + g.Expect(resp.Responses[0].GetResponseRange()).ToNot(BeNil()) + }) + + t.Run("UpdatedDeletedKeyFails", func(t *testing.T) { + g := NewWithT(t) + + createKey(ctx, g, kine.client, "updateDeletedKey", "testValue4") + lastModRev := deleteKey(ctx, g, kine.client, "updateDeletedKey") + + resp, err := kine.client.Txn(ctx). + If(clientv3.Compare(clientv3.ModRevision("updateDeletedKey"), "=", lastModRev)). + Then(clientv3.OpPut("updateDeletedKey", "testValue4")). + Else(clientv3.OpGet("updateDeletedKey", clientv3.WithRange(""))). + Commit() + + g.Expect(err).ToNot(HaveOccurred()) g.Expect(resp.Succeeded).To(BeFalse()) g.Expect(resp.Responses).To(HaveLen(1)) g.Expect(resp.Responses[0].GetResponseRange()).ToNot(BeNil()) @@ -114,7 +148,7 @@ func updateRev(ctx context.Context, g Gomega, client *clientv3.Client, key strin } resp, err := txn.Commit() - g.Expect(err).To(BeNil()) + g.Expect(err).ToNot(HaveOccurred()) g.Expect(resp.Succeeded).To(BeTrue()) return resp.Responses[0].GetResponsePut().Header.Revision