Skip to content

Commit

Permalink
Make Create path uniform with Update/Delete (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
marco6 authored Oct 24, 2024
1 parent 33a1a5e commit 92bb379
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 44 deletions.
14 changes: 7 additions & 7 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/canonical/k8s-dqlite/pkg/kine/prepared"
"github.com/canonical/k8s-dqlite/pkg/kine/server"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -519,7 +518,7 @@ func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision i
return rev.Int64, id, err
}

func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int64) (rev int64, err error) {
func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int64) (rev int64, succeeded bool, err error) {
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Create", otelName))

defer func() {
Expand All @@ -541,16 +540,17 @@ func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int6
result, err := d.execute(ctx, "create_sql", d.CreateSQL, key, ttl, value, key)
if err != nil {
logrus.WithError(err).Error("failed to create key")
return 0, err
return 0, false, err
}

if insertCount, err := result.RowsAffected(); err != nil {
return 0, err
return 0, false, err
} else if insertCount == 0 {
return 0, server.ErrKeyExists
return 0, false, nil
}
return result.LastInsertId()
rev, err = result.LastInsertId()
return rev, true, err
}

func (d *Generic) Update(ctx context.Context, key string, value []byte, preRev, ttl int64) (rev int64, updated bool, err error) {
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Update", otelName))
defer func() {
Expand Down
12 changes: 6 additions & 6 deletions pkg/kine/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ type Log interface {
Wait()
CurrentRevision(ctx context.Context) (int64, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, updateRet bool, errRet error)
Delete(ctx context.Context, key string, revision int64) (revRet int64, deleted bool, errRet error)
Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, created bool, err error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (rev int64, updated bool, err error)
Delete(ctx context.Context, key string, revision int64) (rev int64, deleted bool, err error)
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
Watch(ctx context.Context, prefix string) <-chan []*server.Event
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Expand Down Expand Up @@ -137,10 +137,10 @@ func (l *LogStructured) adjustRevision(ctx context.Context, rev *int64) {
}
}

func (l *LogStructured) Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, err error) {
rev, err = l.log.Create(ctx, key, value, lease)
func (l *LogStructured) Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, created bool, err error) {
rev, created, err = l.log.Create(ctx, key, value, lease)
logrus.Debugf("CREATE %s, size=%d, lease=%d => rev=%d, err=%v", key, len(value), lease, rev, err)
return rev, err
return rev, created, err
}

func (l *LogStructured) Delete(ctx context.Context, key string, revision int64) (revRet int64, deleted bool, errRet error) {
Expand Down
17 changes: 9 additions & 8 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Dialect interface {
CurrentRevision(ctx context.Context) (int64, error)
AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error)
After(ctx context.Context, rev, limit int64) (*sql.Rows, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, bool, error)
Update(ctx context.Context, key string, value []byte, prevRev, lease int64) (int64, bool, error)
Delete(ctx context.Context, key string, revision int64) (int64, bool, error)
DeleteRevision(ctx context.Context, revision int64) error
Expand Down Expand Up @@ -106,7 +106,7 @@ func (s *SQLLog) compactStart(ctx context.Context) error {
}

if len(events) == 0 {
_, err := s.Create(ctx, "compact_rev_key", []byte(""), 0)
_, _, err := s.Create(ctx, "compact_rev_key", []byte(""), 0)
return err
} else if len(events) == 1 {
return nil
Expand Down Expand Up @@ -493,14 +493,15 @@ func (s *SQLLog) Count(ctx context.Context, prefix, startKey string, revision in
return s.d.Count(ctx, prefix, startKey, revision)
}

func (s *SQLLog) Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, err error) {
rev, err = s.d.Create(ctx, key, value, lease)
func (s *SQLLog) Create(ctx context.Context, key string, value []byte, lease int64) (int64, bool, error) {
rev, created, err := s.d.Create(ctx, key, value, lease)
if err != nil {
return 0, err
return 0, false, err
}

s.notifyWatcherPoll(rev)
return rev, nil
if created {
s.notifyWatcherPoll(rev)
}
return rev, created, nil
}

func (s *SQLLog) Delete(ctx context.Context, key string, revision int64) (rev int64, deleted bool, err error) {
Expand Down
12 changes: 7 additions & 5 deletions pkg/kine/server/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func isCreate(txn *etcdserverpb.TxnRequest) *etcdserverpb.PutRequest {
return nil
}

func (l *LimitedServer) create(ctx context.Context, put *etcdserverpb.PutRequest, txn *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) {
func (l *LimitedServer) create(ctx context.Context, put *etcdserverpb.PutRequest) (*etcdserverpb.TxnResponse, error) {
var err error
createCnt.Add(ctx, 1)
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.create", otelName))
Expand All @@ -42,16 +42,18 @@ func (l *LimitedServer) create(ctx context.Context, put *etcdserverpb.PutRequest
return nil, unsupported("prevKv")
}

rev, err := l.backend.Create(ctx, string(put.Key), put.Value, put.Lease)
rev, created, err := l.backend.Create(ctx, string(put.Key), put.Value, put.Lease)
if err != nil {
return nil, err
}

span.SetAttributes(attribute.Int64("revision", rev))
if err == ErrKeyExists {
if !created {
span.AddEvent("key exists")
return &etcdserverpb.TxnResponse{
Header: txnHeader(rev),
Succeeded: false,
}, nil
} else if err != nil {
return nil, err
}

return &etcdserverpb.TxnResponse{
Expand Down
2 changes: 1 addition & 1 deletion pkg/kine/server/limited.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func txnHeader(rev int64) *etcdserverpb.ResponseHeader {

func (l *LimitedServer) Txn(ctx context.Context, txn *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) {
if put := isCreate(txn); put != nil {
return l.create(ctx, put, txn)
return l.create(ctx, put)
}
if rev, key, ok := isDelete(txn); ok {
return l.delete(ctx, key, rev)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kine/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Backend interface {
Start(ctx context.Context) error
Wait()
Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (int64, *KeyValue, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, bool, error)
Delete(ctx context.Context, key string, revision int64) (int64, bool, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error)
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Expand Down
24 changes: 8 additions & 16 deletions pkg/kine/server/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func isUpdate(txn *etcdserverpb.TxnRequest) (int64, string, []byte, int64, bool)

func (l *LimitedServer) update(ctx context.Context, rev int64, key string, value []byte, lease int64) (*etcdserverpb.TxnResponse, error) {
var (
kv *KeyValue
updated bool
err error
kv *KeyValue
succeeded bool
err error
)
updateCnt.Add(ctx, 1)

Expand All @@ -45,29 +45,21 @@ func (l *LimitedServer) update(ctx context.Context, rev int64, key string, value
)

if rev == 0 {
rev, err = l.backend.Create(ctx, key, value, lease)
if err == ErrKeyExists {
return &etcdserverpb.TxnResponse{
Header: txnHeader(rev),
Succeeded: false,
}, nil
} else {
updated = true
}
rev, succeeded, err = l.backend.Create(ctx, key, value, lease)
} else {
rev, updated, err = l.backend.Update(ctx, key, value, rev, lease)
rev, succeeded, err = l.backend.Update(ctx, key, value, rev, lease)
}
if err != nil {
return nil, err
}
span.SetAttributes(attribute.Bool("updated", updated), attribute.Int64("revision", rev))
span.SetAttributes(attribute.Bool("updated", succeeded), attribute.Int64("revision", rev))

resp := &etcdserverpb.TxnResponse{
Header: txnHeader(rev),
Succeeded: updated,
Succeeded: succeeded,
}

if updated {
if succeeded {
resp.Responses = []*etcdserverpb.ResponseOp{
{
Response: &etcdserverpb.ResponseOp_ResponsePut{
Expand Down

0 comments on commit 92bb379

Please sign in to comment.