Skip to content

Commit

Permalink
Cherry-pick aebc4b8 with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] authored and vitess-bot committed Feb 4, 2025
1 parent 81f09ff commit ed2f5f6
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 26 deletions.
13 changes: 9 additions & 4 deletions go/vt/vttablet/tabletmanager/shard_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,15 @@ func (tm *TabletManager) shardSyncLoop(ctx context.Context, notifyChan <-chan st
// We don't use the watch event except to know that we should
// re-read the shard record, and to know if the watch dies.
log.Info("Change in shard record")
if event.Err != nil {
// The watch failed. Stop it so we start a new one if needed.
log.Errorf("Shard watch failed: %v", event.Err)
shardWatch.stop()

if event != nil {
if event.Err != nil {
// The watch failed. Stop it so we start a new one if needed.
log.Errorf("Shard watch failed: %v", event.Err)
shardWatch.stop()
}
} else {
log.Infof("Got a nil event from the shard watcher for %s. This should not happen.", tm.tabletAlias)
}
case <-ctx.Done():
// Our context was cancelled. Terminate the loop.
Expand Down
5 changes: 1 addition & 4 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type controller struct {
TableDiffPhaseTimings *stats.Timings
}

func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFactory func() binlogplayer.DBClient,
func newController(row sqltypes.RowNamedValues, dbClientFactory func() binlogplayer.DBClient,
ts *topo.Server, vde *Engine, options *tabletmanagerdata.VDiffOptions) (*controller, error) {

log.Infof("VDiff controller initializing for %+v", row)
Expand All @@ -104,9 +104,6 @@ func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFac
TableDiffRowCounts: stats.NewCountersWithSingleLabel("", "", "Rows"),
TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"),
}
ctx, ct.cancel = context.WithCancel(ctx)
go ct.run(ctx)

return ct, nil
}

Expand Down
22 changes: 8 additions & 14 deletions go/vt/vttablet/tabletmanager/vdiff/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,19 +146,21 @@ func (vde *Engine) openLocked(ctx context.Context) error {
vde.resetControllers()
}

globalStats.initControllerStats()

// At this point the tablet has no controllers running. So
// we want to start any VDiffs that have not been explicitly
// stopped or otherwise finished.
rows, err := vde.getVDiffsToRun(ctx)
if err != nil {
return err
}

vde.ctx, vde.cancel = context.WithCancel(ctx)
vde.isOpen = true // now we are open and have things to close
if err := vde.initControllers(rows); err != nil {
return err
}
vde.updateStats()

// At this point we've fully and successfully opened so begin
// retrying error'd VDiffs until the engine is closed.
Expand Down Expand Up @@ -212,7 +214,7 @@ func (vde *Engine) retry(ctx context.Context, err error) {
// addController creates a new controller using the given vdiff record and adds it to the engine.
// You must already have the main engine mutex (mu) locked before calling this.
func (vde *Engine) addController(row sqltypes.RowNamedValues, options *tabletmanagerdata.VDiffOptions) error {
ct, err := newController(vde.ctx, row, vde.dbClientFactoryDba, vde.ts, vde, options)
ct, err := newController(row, vde.dbClientFactoryDba, vde.ts, vde, options)
if err != nil {
return fmt.Errorf("controller could not be initialized for stream %+v on tablet %v",
row, vde.thisTablet.Alias)
Expand All @@ -221,6 +223,10 @@ func (vde *Engine) addController(row sqltypes.RowNamedValues, options *tabletman
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
globalStats.controllers[ct.id] = ct

controllerCtx, cancel := context.WithCancel(vde.ctx)
ct.cancel = cancel
go ct.run(controllerCtx)
return nil
}

Expand Down Expand Up @@ -395,16 +401,4 @@ func (vde *Engine) resetControllers() {
ct.Stop()
}
vde.controllers = make(map[int64]*controller)
vde.updateStats()
}

// updateStats must only be called while holding the engine lock.
// It rebuilds globalStats.controllers as a new map containing every entry
// currently in the engine's controllers map, holding globalStats.mu for the
// duration of the rebuild.
func (vre *Engine) updateStats() {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()

// Start from a fresh map so that any entry no longer present in the
// engine's controllers map is dropped rather than carried over.
globalStats.controllers = make(map[int64]*controller, len(vre.controllers))
for id, ct := range vre.controllers {
globalStats.controllers[id] = ct
}
}
25 changes: 25 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,32 @@ func (tvde *testVDiffEnv) createController(t *testing.T, id int) *controller {
fmt.Sprintf("%d|%s|%s|%s|%s|%s|%s|%s|", id, uuid.New(), tvde.workflow, tstenv.KeyspaceName, tstenv.ShardName, vdiffDBName, PendingState, optionsJS),
)
tvde.dbClient.ExpectRequest(fmt.Sprintf("select * from _vt.vdiff where id = %d", id), noResults, nil)
<<<<<<< HEAD

Check failure on line 675 in go/vt/vttablet/tabletmanager/vdiff/framework_test.go

View workflow job for this annotation

GitHub Actions / Code Coverage

expected statement, found '<<'

Check failure on line 675 in go/vt/vttablet/tabletmanager/vdiff/framework_test.go

View workflow job for this annotation

GitHub Actions / Code Coverage

expected statement, found '<<'
ct, err := newController(context.Background(), controllerQR.Named().Row(), tvde.dbClientFactory, tstenv.TopoServ, tvde.vde, tvde.opts)
require.NoError(t, err)
=======
ct := tvde.newController(t, controllerQR)
ct.sources = map[string]*migrationSource{
tstenv.ShardName: {
vrID: 1,
shardStreamer: &shardStreamer{
tablet: tvde.vde.thisTablet,
shard: tstenv.ShardName,
},
},
}
ct.sourceKeyspace = tstenv.KeyspaceName

return ct
}

func (tvde *testVDiffEnv) newController(t *testing.T, controllerQR *sqltypes.Result) *controller {
ctx := context.Background()
ct, err := newController(controllerQR.Named().Row(), tvde.dbClientFactory, tstenv.TopoServ, tvde.vde, tvde.opts)
require.NoError(t, err)
ctx2, cancel := context.WithCancel(ctx)
ct.cancel = cancel
go ct.run(ctx2)
>>>>>>> aebc4b82f9 (VDiff: fix race when a vdiff resumes on vttablet restart (#17638))
return ct
}
7 changes: 7 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,18 @@ type vdiffStats struct {
RowsDiffedCount *stats.Counter
}

// initControllerStats replaces vds.controllers with a new, empty map while
// holding vds.mu, discarding any controllers previously recorded there.
func (vds *vdiffStats) initControllerStats() {
vds.mu.Lock()
defer vds.mu.Unlock()
vds.controllers = make(map[int64]*controller)
}

func (vds *vdiffStats) register() {
globalStats.Count = stats.NewGauge("", "")
globalStats.ErrorCount = stats.NewCounter("", "")
globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table")
globalStats.RowsDiffedCount = stats.NewCounter("", "")
globalStats.initControllerStats()

stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", vds.numControllers)

Expand Down
19 changes: 15 additions & 4 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package vdiff

import (
"context"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -49,8 +48,22 @@ func TestBuildPlanSuccess(t *testing.T) {
)

vdiffenv.dbClient.ExpectRequest("select * from _vt.vdiff where id = 1", noResults, nil)
<<<<<<< HEAD
ct, err := newController(context.Background(), controllerQR.Named().Row(), vdiffenv.dbClientFactory, tstenv.TopoServ, vdiffenv.vde, vdiffenv.opts)
require.NoError(t, err)
=======
ct := vdenv.newController(t, controllerQR)
ct.sources = map[string]*migrationSource{
tstenv.ShardName: {
vrID: 1,
shardStreamer: &shardStreamer{
tablet: vdenv.vde.thisTablet,
shard: tstenv.ShardName,
},
},
}
ct.sourceKeyspace = tstenv.KeyspaceName
>>>>>>> aebc4b82f9 (VDiff: fix race when a vdiff resumes on vttablet restart (#17638))

testcases := []struct {
input *binlogdatapb.Rule
Expand Down Expand Up @@ -667,9 +680,7 @@ func TestBuildPlanFailure(t *testing.T) {
fmt.Sprintf("1|%s|%s|%s|%s|%s|%s|%s|", UUID, vdiffenv.workflow, tstenv.KeyspaceName, tstenv.ShardName, vdiffDBName, PendingState, optionsJS),
)
vdiffenv.dbClient.ExpectRequest("select * from _vt.vdiff where id = 1", noResults, nil)
ct, err := newController(context.Background(), controllerQR.Named().Row(), vdiffenv.dbClientFactory, tstenv.TopoServ, vdiffenv.vde, vdiffenv.opts)
require.NoError(t, err)

ct := vdenv.newController(t, controllerQR)
testcases := []struct {
input *binlogdatapb.Rule
err string
Expand Down

0 comments on commit ed2f5f6

Please sign in to comment.