Skip to content

Commit

Permalink
Use channels for batch queue changes
Browse files Browse the repository at this point in the history
  • Loading branch information
bschimke95 committed Oct 19, 2023
1 parent 9a884b8 commit 4f0d572
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ type Generic struct {
TranslateErr TranslateErr
ErrCode ErrCode
flushCh chan struct{}
addToBatchQueueCh chan BatchedInsert
removeFromBatchQueueCh chan int
batchingQueue []BatchedInsert

AdmissionControlPolicy AdmissionControlPolicy
Expand Down Expand Up @@ -394,9 +396,28 @@ func (d *Generic) queryRowPrepared(ctx context.Context, txName, sql string, prep
func (d *Generic) InitializeWriteBatching(ctx context.Context) {
if d.BatchingEnabled && d.BatchingInterval > 0 && d.BatchingMaxQueries > 0 {
d.flushCh = make(chan struct{})
d.addToBatchQueueCh = make(chan BatchedInsert)
d.removeFromBatchQueueCh = make(chan int)
d.batchingQueue = make([]BatchedInsert, 0, d.BatchingMaxQueries)

go d.watchBatchQueue(ctx)
go d.batchQueueWatcher(ctx)
}
}

func (d *Generic) batchQueueWatcher(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case item := <-d.addToBatchQueueCh:
d.batchingQueue = append(d.batchingQueue, item)
if len(d.batchingQueue) == d.BatchingMaxQueries {
d.flushCh <- struct{}{}
}
case count := <-d.removeFromBatchQueueCh:
d.batchingQueue = d.batchingQueue[count:]
}
}
}

Expand Down Expand Up @@ -452,7 +473,6 @@ func (d *Generic) processBatchQueue(ctx context.Context) {
copy(dcq, d.batchingQueue)

ids, err := d.processBatchedInserts(ctx, dcq)

if err != nil {
logrus.WithError(err).Error("Process batch queue error")
return
Expand All @@ -462,7 +482,9 @@ func (d *Generic) processBatchQueue(ctx context.Context) {
insertItem.retCh <- ids[i]
}

d.batchingQueue = make([]BatchedInsert, 0, d.BatchingMaxQueries)
d.removeFromBatchQueueCh <- len(dcq)
} else {
fmt.Printf("Queue is empty: %d\n", len(d.batchingQueue))
}
}

Expand All @@ -482,10 +504,7 @@ func (d *Generic) watchBatchQueue(ctx context.Context) {

func (d *Generic) batchQueryRowPrepared(ctx context.Context, txName, sqli string, prepared *sql.Stmt, args ...interface{}) (id int64) {
ret := make(chan int64)
d.batchingQueue = append(d.batchingQueue, BatchedInsert{retCh: ret, args: args})
if len(d.batchingQueue) == d.BatchingMaxQueries {
d.flushCh <- struct{}{}
}
d.addToBatchQueueCh <- BatchedInsert{retCh: ret, args: args}
return <-ret
}

Expand Down

0 comments on commit 4f0d572

Please sign in to comment.