Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve rows parsing logic #174

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions pkg/kine/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ type Log interface {
Start(ctx context.Context) error
Wait()
CurrentRevision(ctx context.Context) (int64, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []server.Event, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, updateRet bool, errRet error)
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
Watch(ctx context.Context, prefix string) <-chan []*server.Event
After(ctx context.Context, prefix string, revision, limit int64) (int64, []server.Event, error)
Watch(ctx context.Context, prefix string) <-chan []server.Event
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Append(ctx context.Context, event *server.Event) (int64, error)
DbSize(ctx context.Context) (int64, error)
Expand Down Expand Up @@ -124,7 +124,7 @@ func (l *LogStructured) get(ctx context.Context, key, rangeEnd string, limit, re
if len(events) == 0 {
return rev, nil, nil
}
return rev, events[0], nil
return rev, &events[0], nil
}

func (l *LogStructured) adjustRevision(ctx context.Context, rev *int64) {
Expand Down Expand Up @@ -293,8 +293,8 @@ func (l *LogStructured) Update(ctx context.Context, key string, value []byte, re
return l.log.Update(ctx, key, value, revision, lease)
}

func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {
result := make(chan *server.Event)
func (l *LogStructured) ttlEvents(ctx context.Context) chan server.Event {
result := make(chan server.Event)
var shouldClose atomic.Bool

l.wg.Add(2)
Expand Down Expand Up @@ -345,7 +345,7 @@ func (l *LogStructured) ttl(ctx context.Context) {
// very naive TTL support
mutex := &sync.Mutex{}
for event := range l.ttlEvents(ctx) {
go func(event *server.Event) {
go func(event server.Event) {
select {
case <-ctx.Done():
return
Expand All @@ -358,7 +358,7 @@ func (l *LogStructured) ttl(ctx context.Context) {
}
}

func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event {
func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []server.Event {
logrus.Debugf("WATCH %s, revision=%d", prefix, revision)
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Watch", otelName))
defer span.End()
Expand All @@ -376,7 +376,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
revision -= 1
}

result := make(chan []*server.Event, 100)
result := make(chan []server.Event, 100)

rev, kvs, err := l.log.After(ctx, prefix, revision, 0)
if err != nil {
Expand Down Expand Up @@ -414,7 +414,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
return result
}

func filter(events []*server.Event, rev int64) []*server.Event {
func filter(events []server.Event, rev int64) []server.Event {
for len(events) > 0 && events[0].KV.ModRevision <= rev {
events = events[1:]
}
Expand Down
71 changes: 71 additions & 0 deletions pkg/kine/logstructured/sqllog/rows_scanner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package sqllog

import (
"database/sql"

"github.com/canonical/k8s-dqlite/pkg/kine/server"
)

// rowsScanner is used as an iterim object to avoid
// allocation during rows iteration. It is effective
// mostly when returning a big amount of rows.
type rowsScanner struct {
currentRev int64
key string
create bool
delete bool
createRev int64
prevRev int64
lease int64
value []byte
prevValue []byte
}

func (rs *rowsScanner) scan(rows *sql.Rows) (server.Event, error) {
err := rows.Scan(
&rs.currentRev,
&rs.key,
&rs.create,
&rs.delete,
&rs.createRev,
&rs.prevRev,
&rs.lease,
&rs.value,
&rs.prevValue,
)
if err != nil {
return server.Event{}, err
}

if rs.create {
return server.Event{
Create: true,
Delete: false,
KV: &server.KeyValue{
ModRevision: rs.currentRev,
Key: rs.key,
CreateRevision: rs.createRev,
Lease: rs.lease,
Value: rs.value,
},
}, nil
} else {
return server.Event{
Create: false,
Delete: rs.delete,
KV: &server.KeyValue{
ModRevision: rs.currentRev,
Key: rs.key,
CreateRevision: rs.createRev,
Lease: rs.lease,
Value: rs.value,
},
PrevKV: &server.KeyValue{
ModRevision: rs.prevRev,
Key: rs.key,
CreateRevision: rs.createRev,
Value: rs.prevValue,
},
}, nil
}
}
59 changes: 19 additions & 40 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) {
return s.d.CurrentRevision(ctx)
}

func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) {
func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []server.Event, error) {
var err error
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.After", otelName))
defer func() {
Expand Down Expand Up @@ -221,7 +221,7 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64
return rev, result, err
}

func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (int64, []*server.Event, error) {
func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (int64, []server.Event, error) {
var (
rows *sql.Rows
err error
Expand Down Expand Up @@ -278,23 +278,29 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis
return rev, result, err
}

func RowsToEvents(rows *sql.Rows) ([]*server.Event, error) {
var result []*server.Event
var scannerPool = sync.Pool{
New: func() any { return &rowsScanner{} },
}

func RowsToEvents(rows *sql.Rows) ([]server.Event, error) {
defer rows.Close()

scanner := scannerPool.Get().(*rowsScanner)
defer scannerPool.Put(scanner)

result := []server.Event{}
for rows.Next() {
event := &server.Event{}
if err := scan(rows, event); err != nil {
event, err := scanner.scan(rows)
if err != nil {
return nil, err
}
result = append(result, event)
}

return result, nil
}

func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []*server.Event {
res := make(chan []*server.Event, 100)
func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []server.Event {
res := make(chan []server.Event, 100)
values, err := s.broadcaster.Subscribe(ctx)
if err != nil {
return nil
Expand All @@ -318,9 +324,9 @@ func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []*server.Even
return res
}

func filter(events interface{}, checkPrefix bool, prefix string) ([]*server.Event, bool) {
eventList := events.([]*server.Event)
filteredEventList := make([]*server.Event, 0, len(eventList))
func filter(events interface{}, checkPrefix bool, prefix string) ([]server.Event, bool) {
eventList := events.([]server.Event)
filteredEventList := make([]server.Event, 0, len(eventList))

for _, event := range eventList {
if (checkPrefix && strings.HasPrefix(event.KV.Key, prefix)) || event.KV.Key == prefix {
Expand Down Expand Up @@ -421,7 +427,7 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {

rev := last
var (
sequential []*server.Event
sequential []server.Event
saveLast bool
)

Expand Down Expand Up @@ -568,33 +574,6 @@ func (s *SQLLog) notifyWatcherPoll(revision int64) {
}
}

func scan(rows *sql.Rows, event *server.Event) error {
event.KV = &server.KeyValue{}
event.PrevKV = &server.KeyValue{}

err := rows.Scan(
&event.KV.ModRevision,
&event.KV.Key,
&event.Create,
&event.Delete,
&event.KV.CreateRevision,
&event.PrevKV.ModRevision,
&event.KV.Lease,
&event.KV.Value,
&event.PrevKV.Value,
)
if err != nil {
return err
}

if event.Create {
event.KV.CreateRevision = event.KV.ModRevision
event.PrevKV = nil
}

return nil
}

func (s *SQLLog) DbSize(ctx context.Context) (int64, error) {
var err error
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.DbSize", otelName))
Expand Down
2 changes: 1 addition & 1 deletion pkg/kine/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Backend interface {
List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error)
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, bool, error)
Watch(ctx context.Context, key string, revision int64) <-chan []*Event
Watch(ctx context.Context, key string, revision int64) <-chan []Event
DbSize(ctx context.Context) (int64, error)
DoCompact(ctx context.Context) error
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/kine/server/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)
}

if logrus.IsLevelEnabled(logrus.DebugLevel) {
for _, event := range events {
logrus.Debugf("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision)
for i := range events {
logrus.Debugf("WATCH READ id=%d, key=%s, revision=%d", id, events[i].KV.Key, events[i].KV.ModRevision)
}
}

if err := w.server.Send(&etcdserverpb.WatchResponse{
Header: txnHeader(events[len(events)-1].KV.ModRevision),
WatchId: id,
Events: toEvents(events...),
Events: toEvents(events),
}); err != nil {
w.Cancel(id, err)
continue
Expand All @@ -96,10 +96,10 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)
}()
}

func toEvents(events ...*Event) []*mvccpb.Event {
func toEvents(events []Event) []*mvccpb.Event {
ret := make([]*mvccpb.Event, 0, len(events))
for _, e := range events {
ret = append(ret, toEvent(e))
for i := range events {
ret = append(ret, toEvent(&events[i]))
}
return ret
}
Expand Down
Loading