From 8b94d716819c861b897fa757effda5e01b9c011d Mon Sep 17 00:00:00 2001 From: Marco Manino Date: Fri, 20 Sep 2024 10:29:10 +0200 Subject: [PATCH] Improve rows parsing logic --- pkg/kine/logstructured/logstructured.go | 20 +++--- pkg/kine/logstructured/sqllog/rows_scanner.go | 71 +++++++++++++++++++ pkg/kine/logstructured/sqllog/sql.go | 59 +++++---------- pkg/kine/server/types.go | 2 +- pkg/kine/server/watch.go | 12 ++-- 5 files changed, 107 insertions(+), 57 deletions(-) create mode 100644 pkg/kine/logstructured/sqllog/rows_scanner.go diff --git a/pkg/kine/logstructured/logstructured.go b/pkg/kine/logstructured/logstructured.go index c32b5756..b92e99ec 100644 --- a/pkg/kine/logstructured/logstructured.go +++ b/pkg/kine/logstructured/logstructured.go @@ -28,11 +28,11 @@ type Log interface { Start(ctx context.Context) error Wait() CurrentRevision(ctx context.Context) (int64, error) - List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error) + List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []server.Event, error) Create(ctx context.Context, key string, value []byte, lease int64) (int64, error) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, updateRet bool, errRet error) - After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) - Watch(ctx context.Context, prefix string) <-chan []*server.Event + After(ctx context.Context, prefix string, revision, limit int64) (int64, []server.Event, error) + Watch(ctx context.Context, prefix string) <-chan []server.Event Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) Append(ctx context.Context, event *server.Event) (int64, error) DbSize(ctx context.Context) (int64, error) @@ -124,7 +124,7 @@ func (l *LogStructured) get(ctx context.Context, key, rangeEnd string, limit, re if len(events) == 0 { return rev, nil, nil } - return rev, events[0], nil + return rev, &events[0], nil } func (l *LogStructured) adjustRevision(ctx context.Context, rev *int64) { @@ -293,8 +293,8 @@ func (l *LogStructured) Update(ctx context.Context, key string, value []byte, re return l.log.Update(ctx, key, value, revision, lease) } -func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event { - result := make(chan *server.Event) +func (l *LogStructured) ttlEvents(ctx context.Context) chan server.Event { + result := make(chan server.Event) var shouldClose atomic.Bool l.wg.Add(2) @@ -345,7 +345,7 @@ func (l *LogStructured) ttl(ctx context.Context) { // very naive TTL support mutex := &sync.Mutex{} for event := range l.ttlEvents(ctx) { - go func(event *server.Event) { + go func(event server.Event) { select { case <-ctx.Done(): return @@ -358,7 +358,7 @@ func (l *LogStructured) ttl(ctx context.Context) { } } -func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event { +func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []server.Event { logrus.Debugf("WATCH %s, revision=%d", prefix, revision) ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Watch", otelName)) defer span.End() @@ -376,7 +376,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64 revision -= 1 } - result := make(chan []*server.Event, 100) + result := make(chan []server.Event, 100) rev, kvs, err := l.log.After(ctx, prefix, revision, 0) if err != nil { @@ -414,7 +414,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64 return result } -func filter(events []*server.Event, rev int64) []*server.Event { +func filter(events []server.Event, rev int64) []server.Event { for len(events) > 0 && events[0].KV.ModRevision <= rev { events = events[1:] } diff --git a/pkg/kine/logstructured/sqllog/rows_scanner.go b/pkg/kine/logstructured/sqllog/rows_scanner.go new file mode 100644 index 00000000..bf317192 --- /dev/null +++ b/pkg/kine/logstructured/sqllog/rows_scanner.go @@ -0,0 +1,71 @@ +package sqllog + +import ( + "database/sql" + + "github.com/canonical/k8s-dqlite/pkg/kine/server" +) + +// rowsScanner is used as an iterim object to avoid +// allocation during rows iteration. It is effective +// mostly when returning a big amount of rows. +type rowsScanner struct { + currentRev int64 + key string + create bool + delete bool + createRev int64 + prevRev int64 + lease int64 + value []byte + prevValue []byte +} + +func (rs *rowsScanner) scan(rows *sql.Rows) (server.Event, error) { + err := rows.Scan( + &rs.currentRev, + &rs.key, + &rs.create, + &rs.delete, + &rs.createRev, + &rs.prevRev, + &rs.lease, + &rs.value, + &rs.prevValue, + ) + if err != nil { + return server.Event{}, err + } + + if rs.create { + return server.Event{ + Create: true, + Delete: false, + KV: &server.KeyValue{ + ModRevision: rs.currentRev, + Key: rs.key, + CreateRevision: rs.createRev, + Lease: rs.lease, + Value: rs.value, + }, + }, nil + } else { + return server.Event{ + Create: false, + Delete: rs.delete, + KV: &server.KeyValue{ + ModRevision: rs.currentRev, + Key: rs.key, + CreateRevision: rs.createRev, + Lease: rs.lease, + Value: rs.value, + }, + PrevKV: &server.KeyValue{ + ModRevision: rs.prevRev, + Key: rs.key, + CreateRevision: rs.createRev, + Value: rs.prevValue, + }, + }, nil + } +} diff --git a/pkg/kine/logstructured/sqllog/sql.go b/pkg/kine/logstructured/sqllog/sql.go index 4ce766b4..0faaabde 100644 --- a/pkg/kine/logstructured/sqllog/sql.go +++ b/pkg/kine/logstructured/sqllog/sql.go @@ -186,7 +186,7 @@ func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) { return s.d.CurrentRevision(ctx) } -func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) { +func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []server.Event, error) { var err error ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.After", otelName)) defer func() { @@ -221,7 +221,7 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64 return rev, result, err } -func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (int64, []*server.Event, error) { +func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (int64, []server.Event, error) { var ( rows *sql.Rows err error @@ -278,23 +278,29 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis return rev, result, err } -func RowsToEvents(rows *sql.Rows) ([]*server.Event, error) { - var result []*server.Event +var scannerPool = sync.Pool{ + New: func() any { return &rowsScanner{} }, +} + +func RowsToEvents(rows *sql.Rows) ([]server.Event, error) { defer rows.Close() + scanner := scannerPool.Get().(*rowsScanner) + defer scannerPool.Put(scanner) + + result := []server.Event{} for rows.Next() { - event := &server.Event{} - if err := scan(rows, event); err != nil { + event, err := scanner.scan(rows) + if err != nil { return nil, err } result = append(result, event) } - return result, nil } -func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []*server.Event { - res := make(chan []*server.Event, 100) +func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []server.Event { + res := make(chan []server.Event, 100) values, err := s.broadcaster.Subscribe(ctx) if err != nil { return nil @@ -318,9 +324,9 @@ func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []*server.Even return res } -func filter(events interface{}, checkPrefix bool, prefix string) ([]*server.Event, bool) { - eventList := events.([]*server.Event) - filteredEventList := make([]*server.Event, 0, len(eventList)) +func filter(events interface{}, checkPrefix bool, prefix string) ([]server.Event, bool) { + eventList := events.([]server.Event) + filteredEventList := make([]server.Event, 0, len(eventList)) for _, event := range eventList { if (checkPrefix && strings.HasPrefix(event.KV.Key, prefix)) || event.KV.Key == prefix { @@ -421,7 +427,7 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { rev := last var ( - sequential []*server.Event + sequential []server.Event saveLast bool ) @@ -568,33 +574,6 @@ func (s *SQLLog) notifyWatcherPoll(revision int64) { } } -func scan(rows *sql.Rows, event *server.Event) error { - event.KV = &server.KeyValue{} - event.PrevKV = &server.KeyValue{} - - err := rows.Scan( - &event.KV.ModRevision, - &event.KV.Key, - &event.Create, - &event.Delete, - &event.KV.CreateRevision, - &event.PrevKV.ModRevision, - &event.KV.Lease, - &event.KV.Value, - &event.PrevKV.Value, - ) - if err != nil { - return err - } - - if event.Create { - event.KV.CreateRevision = event.KV.ModRevision - event.PrevKV = nil - } - - return nil -} - func (s *SQLLog) DbSize(ctx context.Context) (int64, error) { var err error ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.DbSize", otelName)) diff --git a/pkg/kine/server/types.go b/pkg/kine/server/types.go index 67544492..4d6f3611 100644 --- a/pkg/kine/server/types.go +++ b/pkg/kine/server/types.go @@ -20,7 +20,7 @@ type Backend interface { List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, bool, error) - Watch(ctx context.Context, key string, revision int64) <-chan []*Event + Watch(ctx context.Context, key string, revision int64) <-chan []Event DbSize(ctx context.Context) (int64, error) DoCompact(ctx context.Context) error } diff --git a/pkg/kine/server/watch.go b/pkg/kine/server/watch.go index 40c5d054..4991cfd3 100644 --- a/pkg/kine/server/watch.go +++ b/pkg/kine/server/watch.go @@ -77,15 +77,15 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) } if logrus.IsLevelEnabled(logrus.DebugLevel) { - for _, event := range events { - logrus.Debugf("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) + for i := range events { + logrus.Debugf("WATCH READ id=%d, key=%s, revision=%d", id, events[i].KV.Key, events[i].KV.ModRevision) } } if err := w.server.Send(&etcdserverpb.WatchResponse{ Header: txnHeader(events[len(events)-1].KV.ModRevision), WatchId: id, - Events: toEvents(events...), + Events: toEvents(events), }); err != nil { w.Cancel(id, err) continue @@ -96,10 +96,10 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) }() } -func toEvents(events ...*Event) []*mvccpb.Event { +func toEvents(events []Event) []*mvccpb.Event { ret := make([]*mvccpb.Event, 0, len(events)) - for _, e := range events { - ret = append(ret, toEvent(e)) + for i := range events { + ret = append(ret, toEvent(&events[i])) } return ret }