Skip to content

Commit

Permalink
Filter all binlogs of the dropped table during partial snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Dec 18, 2024
1 parent 4d1b3ab commit c58d75e
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 12 deletions.
21 changes: 17 additions & 4 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,15 +359,22 @@ func (j *Job) handlePartialSyncTableNotFound() error {

if dropped, err := j.isTableDropped(tableId); err != nil {
return err
} else if dropped && j.SyncType == TableSync {
return xerror.Errorf(xerror.Normal, "table sync but table %s has been dropped, table id %d",
table, tableId)
} else if dropped {
// skip this partial sync because table has been dropped
log.Warnf("skip this partial sync because table %s has been dropped, table id: %d", table, tableId)
nextCommitSeq := j.progress.CommitSeq
if j.SyncType == DBSync {
j.progress.NextWithPersist(nextCommitSeq, DBTablesIncrementalSync, Done, "")
} else {
j.progress.NextWithPersist(nextCommitSeq, TableIncrementalSync, Done, "")
// Since we don't know the commit seq of the drop table binlog, we set it to the max value to
// skip all binlogs.
//
// FIXME: it will skip drop table binlog too.
if len(j.progress.TableCommitSeqMap) == 0 {
j.progress.TableCommitSeqMap = make(map[int64]int64)
}
j.progress.TableCommitSeqMap[tableId] = math.MaxInt64
j.progress.NextWithPersist(nextCommitSeq, DBTablesIncrementalSync, Done, "")
return nil
} else if newTableName, err := j.srcMeta.GetTableNameById(tableId); err != nil {
return err
Expand Down Expand Up @@ -1931,11 +1938,17 @@ func (j *Job) handleDropTable(binlog *festruct.TBinlog) error {
if _, ok := j.progress.TableMapping[dropTable.TableId]; !ok {
log.Warnf("the dest table is not found, skip drop table binlog, src table id: %d, commit seq: %d",
dropTable.TableId, binlog.GetCommitSeq())
// So that the sync state would convert to DBIncrementalSync,
// see handlePartialSyncTableNotFound for details.
delete(j.progress.TableCommitSeqMap, dropTable.TableId)
return nil
}
}

if j.isBinlogCommitted(dropTable.TableId, binlog.GetCommitSeq()) {
// So that the sync state would convert to DBIncrementalSync,
// see handlePartialSyncTableNotFound for details.
delete(j.progress.TableCommitSeqMap, dropTable.TableId)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.

suite("test_cds_tbl_alter_drop") {
suite('test_cds_tbl_alter_drop') {
def helper = new GroovyShell(new Binding(['suite': delegate]))
.evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))
.evaluate(new File("${context.config.suitePath}/../common", 'helper.groovy'))

if (!helper.is_version_supported([30003, 20108, 20016])) {
// at least doris 3.0.3, 2.1.8 and doris 2.0.16
Expand All @@ -26,17 +26,20 @@ suite("test_cds_tbl_alter_drop") {
return
}

def oldTableName = "tbl_old_" + helper.randomSuffix()
def newTableName = "tbl_new_" + helper.randomSuffix()
def oldTableName = 'tbl_old_' + helper.randomSuffix()
def newTableName = 'tbl_new_' + helper.randomSuffix()

def exist = { res -> Boolean
return res.size() != 0
}
def notExist = { res -> Boolean
return res.size() == 0
}
def has_count = { count -> { res -> Boolean
return res.size() == count
} }

logger.info("=== Create a fake table ===")
logger.info('=== Create a fake table ===')
sql """
CREATE TABLE if NOT EXISTS ${oldTableName}_fake
(
Expand Down Expand Up @@ -66,7 +69,7 @@ suite("test_cds_tbl_alter_drop") {

assertTrue(helper.checkRestoreFinishTimesOf("${oldTableName}_fake", 60))

logger.info(" ==== create table and drop ==== ")
logger.info(' ==== create table and drop ==== ')

def first_job_progress = helper.get_job_progress()

Expand Down Expand Up @@ -105,14 +108,24 @@ suite("test_cds_tbl_alter_drop") {
""",
exist, 30))

sql "INSERT INTO ${oldTableName} VALUES (5, 500, 1)"
// All binlogs of the dropped table should be ignored.
sql "ALTER TABLE ${oldTableName} ADD COLUMN `value_col` INT DEFAULT \"0\""

assertTrue(helper.checkShowTimesOf("""
SHOW ALTER TABLE COLUMN
FROM ${context.dbName}
WHERE TableName = "${oldTableName}" AND State = "FINISHED"
""",
has_count(2), 30))

sql "INSERT INTO ${oldTableName} VALUES (5, 500, 1, 2)"
sql "DROP TABLE ${oldTableName} FORCE"
sql "INSERT INTO ${oldTableName}_fake VALUES (5, 500)"

helper.ccrJobResume()

assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${oldTableName}_fake", 1, 60))
assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${oldTableName}\"", notExist, 60, "target"))
assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${oldTableName}\"", notExist, 60, 'target'))

// no fullsync are triggered
def last_job_progress = helper.get_job_progress()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test: after a table is dropped upstream, ALL of its pending
// binlogs (inserts, inverted-index add/drop schema changes) must be skipped
// during the partial snapshot, and the skip must NOT trigger a full sync.
// See handlePartialSyncTableNotFound / handleDropTable in pkg/ccr/job.go.
suite('test_cds_tbl_idx_inverted_drop') {
    // Load the shared suite helper (CCR job control + polling assertions).
    def helper = new GroovyShell(new Binding(['suite': delegate]))
        .evaluate(new File("${context.config.suitePath}/../common", 'helper.groovy'))

    if (!helper.is_version_supported([30003, 20108, 20016])) {
        // at least doris 3.0.3, 2.1.8 and doris 2.0.16
        def version = helper.upstream_version()
        logger.info("skip this suite because version is not supported, upstream version ${version}")
        return
    }

    def tableName = 'tbl_' + helper.randomSuffix()

    // Result-set predicates for helper.checkShowTimesOf:
    // exist/notExist test emptiness; has_count(n) curries an exact-size check.
    def exist = { res -> Boolean
        return res.size() != 0
    }
    def notExist = { res -> Boolean
        return res.size() == 0
    }
    def has_count = { count -> { res -> Boolean
        return res.size() == count
    } }

    // The "_fake" table exists only so the CCR job has something to sync
    // after ${tableName} is dropped; selecting from it later proves the job
    // made progress past the dropped table's binlogs.
    logger.info('=== Create a fake table ===')
    sql """
        CREATE TABLE if NOT EXISTS ${tableName}_fake
        (
            `test` INT,
            `id` INT
        )
        ENGINE=OLAP
        UNIQUE KEY(`test`, `id`)
        PARTITION BY RANGE(`id`)
        (
            PARTITION `p1` VALUES LESS THAN ("0"),
            PARTITION `p2` VALUES LESS THAN ("100"),
            PARTITION `p3` VALUES LESS THAN ("200"),
            PARTITION `p4` VALUES LESS THAN ("300"),
            PARTITION `p5` VALUES LESS THAN ("1000")
        )
        DISTRIBUTED BY HASH(id) BUCKETS AUTO
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1",
            "binlog.enable" = "true"
        )
    """

    helper.enableDbBinlog()
    helper.ccrJobDelete()
    helper.ccrJobCreate()

    // Wait until the initial snapshot of the fake table is restored downstream.
    assertTrue(helper.checkRestoreFinishTimesOf("${tableName}_fake", 60))

    logger.info(' ==== create table and drop ==== ')

    // Snapshot the job progress now; compared at the end to prove no full
    // sync was triggered by the dropped table's binlogs.
    def first_job_progress = helper.get_job_progress()

    // Pause the job so everything below accumulates as unsynced binlogs.
    helper.ccrJobPause()

    sql """
        CREATE TABLE if NOT EXISTS ${tableName}
        (
            `test` INT,
            `id` INT,
            `value` STRING NULL
        )
        ENGINE=OLAP
        UNIQUE KEY(`test`, `id`)
        PARTITION BY RANGE(`id`)
        (
            PARTITION `p1` VALUES LESS THAN ("0"),
            PARTITION `p2` VALUES LESS THAN ("100"),
            PARTITION `p3` VALUES LESS THAN ("200"),
            PARTITION `p4` VALUES LESS THAN ("300"),
            PARTITION `p5` VALUES LESS THAN ("1000")
        )
        DISTRIBUTED BY HASH(id) BUCKETS AUTO
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1",
            "binlog.enable" = "true"
        )
    """

    sql "INSERT INTO ${tableName} VALUES (1, 100, ''), (100, 1, ''), (2, 200, ''), (200, 2, '')"

    logger.info('=== add inverted index ===')
    sql """
        CREATE INDEX idx_inverted ON ${tableName} (value) USING INVERTED
    """
    sql 'sync'

    // Wait for the index-build schema change to finish upstream
    // (first FINISHED entry for this table).
    assertTrue(helper.checkShowTimesOf("""
        SHOW ALTER TABLE COLUMN
        FROM ${context.dbName}
        WHERE TableName = "${tableName}" AND State = "FINISHED"
        """,
        has_count(1), 30))

    sql """ INSERT INTO ${tableName} VALUES (1, 1, "1") """

    def show_indexes_result = sql "show indexes from ${tableName}"
    logger.info("show indexes: ${show_indexes_result}")

    // This DROP INDEX binlog must be ignored by the syncer, because the
    // whole table is dropped below before the job resumes.
    sql """
        DROP INDEX idx_inverted ON ${tableName}
    """
    sql 'sync'

    // Wait for the drop-index schema change to finish as well
    // (second FINISHED entry).
    assertTrue(helper.checkShowTimesOf("""
        SHOW ALTER TABLE COLUMN
        FROM ${context.dbName}
        WHERE TableName = "${tableName}" AND State = "FINISHED"
        """,
        has_count(2), 30))

    show_indexes_result = sql "show indexes from ${tableName}"
    logger.info("show indexes: ${show_indexes_result}")

    // One more insert binlog, then drop the table for good; the insert into
    // the fake table gives the resumed job a binlog it CAN apply.
    sql "INSERT INTO ${tableName} VALUES (5, 500, 'test')"
    sql "DROP TABLE ${tableName} FORCE"
    sql "INSERT INTO ${tableName}_fake VALUES (5, 500)"

    helper.ccrJobResume()

    // The job must sync the fake-table row (i.e. it skipped past the dropped
    // table's binlogs) and the dropped table must not exist downstream.
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableName}_fake", 1, 60))
    assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${tableName}\"", notExist, 60, 'target'))

    // No full sync should have been triggered: the full-sync start timestamp
    // must be unchanged from before the pause.
    def last_job_progress = helper.get_job_progress()
    assertTrue(last_job_progress.full_sync_start_at == first_job_progress.full_sync_start_at)
}

0 comments on commit c58d75e

Please sign in to comment.