Skip to content

Commit

Permalink
[BACKPORT 2.20.1][#20428] CDCSDK: Fix addition of new tables to strea…
Browse files Browse the repository at this point in the history
…m metadata after drop table

Summary:
Original commit: 201fda8 / D31577

**Problem**:
After a table is dropped, state of cdcsdk stream changes from ACTIVE to DELETING_METADATA and remains in this state forever. For dynamic table addition, stream is required to be in the ACTIVE state. Due to this requirement, new tables created after drop table do not get added to stream metadata & cdc_state table

**Fix**:
In the dynamic table addition codepath, in addition to ACTIVE streams, also consider streams in DELETING_METADATA state.

Primary diff for Dynamic table addition : [[ https://phorge.dev.yugabyte.com/D19909 |https://phorge.dev.yugabyte.com/D19909 ]]

New test:
Added UTs to verify addition of newly created tables after drop table in normal functioning as well as master restart.
Added a test flag  `cdcsdk_skip_processing_dynamic_table_addition` for testing master restart case. This flag will skip the finding & processing of newly added tables by background thread. Refer the primary diff for dynamic table addition for more details on master restart.

Jira: DB-9428

Test Plan: Jenkins: test regex: .*CDCSDK*

Reviewers: asrinivasan, skumar, stiwary, xCluster, hsunder

Reviewed By: skumar

Subscribers: ycdcxcluster, bogdan

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D31637
  • Loading branch information
siddharth2411 committed Jan 11, 2024
1 parent 7eacf1c commit 476d9c8
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 20 deletions.
159 changes: 159 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,165 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDropTableBeforeCDCStreamDelet
ASSERT_EQ(DeleteCDCStream(stream_id), false);
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAddTableAfterDropTable)) {
// Setup cluster.
ASSERT_OK(SetUpWithParams(3, 3, false));

const vector<string> table_list_suffix = {"_1", "_2", "_3", "_4"};
const int kNumTables = 4;
vector<YBTableName> table(kNumTables);
int idx = 0;
vector<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> tablets(kNumTables);

while (idx < 3) {
table[idx] = ASSERT_RESULT(CreateTable(
&test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true,
table_list_suffix[idx]));
ASSERT_OK(test_client()->GetTablets(
table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr));
ASSERT_OK(WriteEnumsRows(
0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName,
kTableName));
idx += 1;
}
auto stream_id = ASSERT_RESULT(CreateDBStream(EXPLICIT));
SleepFor(MonoDelta::FromSeconds(2));
DropTable(&test_cluster_, "test_table_1");

// Drop table will trigger the background thread to start the stream metadata cleanup, here
// wait for the metadata cleanup to finish by the background thread.
std::unordered_set<std::string> expected_table_ids_after_drop = {
table[1].table_id(), table[2].table_id()};
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids_after_drop, "Waiting for stream metadata cleanup.");

// create a new table and verify that it gets added to stream metadata.
table[idx] = ASSERT_RESULT(CreateTable(
&test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx]));
ASSERT_OK(test_client()->GetTablets(
table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr));

std::unordered_set<std::string> expected_table_ids_after_create_table =
expected_table_ids_after_drop;
expected_table_ids_after_create_table.insert(table[idx].table_id());
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids_after_create_table,
"Waiting for GetDBStreamInfo after table creation.");

// verify tablets of the new table are added to cdc_state table.
std::unordered_set<std::string> expected_tablet_ids;
for (idx = 1; idx < 4; idx++) {
expected_tablet_ids.insert(tablets[idx].Get(0).tablet_id());
}

CDCStateTable cdc_state_table(test_client());
Status s;
std::unordered_set<TabletId> tablets_found;
for (auto row_result : ASSERT_RESULT(
cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s))) {
ASSERT_OK(row_result);
auto& row = *row_result;
LOG(INFO) << "Read cdc_state table row with tablet_id: " << row.key.tablet_id
<< " stream_id: " << row.key.stream_id
<< " checkpoint: " << row.checkpoint->ToString();
if (row.key.stream_id == stream_id) {
tablets_found.insert(row.key.tablet_id);
}
}
ASSERT_OK(s);
LOG(INFO) << "tablets found: " << AsString(tablets_found)
<< ", expected tablets: " << AsString(expected_tablet_ids);
ASSERT_EQ(expected_tablet_ids, tablets_found);
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAddTableAfterDropTableAndMasterRestart)) {
// Setup cluster.
ASSERT_OK(SetUpWithParams(1, 1, false));
const vector<string> table_list_suffix = {"_1", "_2", "_3", "_4"};
const int kNumTables = 4;
vector<YBTableName> table(kNumTables);
int idx = 0;
vector<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> tablets(kNumTables);

while (idx < 3) {
table[idx] = ASSERT_RESULT(CreateTable(
&test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true,
table_list_suffix[idx]));
ASSERT_OK(test_client()->GetTablets(
table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr));
ASSERT_OK(WriteEnumsRows(
0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName,
kTableName));
idx += 1;
}
xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream(EXPLICIT));
SleepFor(MonoDelta::FromSeconds(2));
DropTable(&test_cluster_, "test_table_1");

// Drop table will trigger the background thread to start the stream metadata cleanup, here
// wait for the metadata cleanup to finish by the background thread.
std::unordered_set<std::string> expected_table_ids_after_drop = {
table[1].table_id(), table[2].table_id()};
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids_after_drop, "Waiting for stream metadata cleanup.");

// After metadata cleanup, skip processing any newly created table by bg thread.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_skip_processing_dynamic_table_addition) = true;

// create a new table and verify that it does not get added to stream metadata.
table[idx] = ASSERT_RESULT(CreateTable(
&test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx]));
ASSERT_OK(test_client()->GetTablets(
table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr));

SleepFor(MonoDelta::FromSeconds(2 * kTimeMultiplier));
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids_after_drop,
"Waiting for GetDBStreamInfo after table creation.");

// Restart leader master to repopulate namespace_to_cdcsdk_unprocessed_table_map_ in-memory map.
auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster());
ASSERT_OK(leader_master->Restart());
LOG(INFO) << "Master Restarted";
SleepFor(MonoDelta::FromSeconds(5));

// Enable processing of tables that are not part of cdc stream.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_skip_processing_dynamic_table_addition) = false;

// verify newly created table has been added to stream metadata.
std::unordered_set<std::string> expected_table_ids_after_create_table =
expected_table_ids_after_drop;
expected_table_ids_after_create_table.insert(table[idx].table_id());
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids_after_create_table,
"Waiting for GetDBStreamInfo after master restart.");

// verify tablets of the new table are added to cdc_state table.
std::unordered_set<std::string> expected_tablet_ids;
for (idx = 1; idx < 4; idx++) {
expected_tablet_ids.insert(tablets[idx].Get(0).tablet_id());
}

CDCStateTable cdc_state_table(test_client());
Status s;
std::unordered_set<TabletId> tablets_found;
for (auto row_result : ASSERT_RESULT(
cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s))) {
ASSERT_OK(row_result);
auto& row = *row_result;
LOG(INFO) << "Read cdc_state table row with tablet_id: " << row.key.tablet_id
<< " stream_id: " << row.key.stream_id
<< " checkpoint: " << row.checkpoint->ToString();
if (row.key.stream_id == stream_id) {
tablets_found.insert(row.key.tablet_id);
}
}
ASSERT_OK(s);
LOG(INFO) << "tablets found: " << AsString(tablets_found)
<< ", expected tablets: " << AsString(expected_tablet_ids);
ASSERT_EQ(expected_tablet_ids, tablets_found);
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDropTableBeforeXClusterStreamDelete)) {
// Setup cluster.
ASSERT_OK(SetUpWithParams(1, 1, false));
Expand Down
21 changes: 21 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1802,6 +1802,27 @@ namespace cdc {
return get_resp;
}

void CDCSDKYsqlTest::VerifyTablesInStreamMetadata(
const xrepl::StreamId& stream_id, const std::unordered_set<std::string>& expected_table_ids,
const std::string& timeout_msg) {
ASSERT_OK(WaitFor(
[&]() -> Result<bool> {
auto get_resp = GetDBStreamInfo(stream_id);
if (get_resp.ok() && !get_resp->has_error()) {
const uint64_t table_info_size = get_resp->table_info_size();
if (table_info_size == expected_table_ids.size()) {
std::unordered_set<std::string> table_ids;
for (auto entry : get_resp->table_info()) {
table_ids.insert(entry.table_id());
}
if (expected_table_ids == table_ids) return true;
}
}
return false;
},
MonoDelta::FromSeconds(60), timeout_msg));
}

Status CDCSDKYsqlTest::ChangeLeaderOfTablet(size_t new_leader_index, const TabletId tablet_id) {
CHECK(!FLAGS_enable_load_balancing);

Expand Down
5 changes: 5 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ DECLARE_bool(cdc_populate_end_markers_transactions);
DECLARE_uint64(cdc_stream_records_threshold_size_bytes);
DECLARE_int64(cdc_resolve_intent_lag_threshold_ms);
DECLARE_bool(enable_tablet_split_of_cdcsdk_streamed_tables);
DECLARE_bool(TEST_cdcsdk_skip_processing_dynamic_table_addition);

namespace yb {

Expand Down Expand Up @@ -460,6 +461,10 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {

Result<GetCDCDBStreamInfoResponsePB> GetDBStreamInfo(const xrepl::StreamId db_stream_id);

void VerifyTablesInStreamMetadata(
const xrepl::StreamId& stream_id, const std::unordered_set<std::string>& expected_table_ids,
const std::string& timeout_msg);

Status ChangeLeaderOfTablet(size_t new_leader_index, const TabletId tablet_id);

Status CreateSnapshot(const NamespaceName& ns);
Expand Down
42 changes: 25 additions & 17 deletions src/yb/master/catalog_manager_bg_tasks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ DEFINE_test_flag(bool, pause_catalog_manager_bg_loop_start, false,
DEFINE_test_flag(bool, pause_catalog_manager_bg_loop_end, false,
"Pause the bg tasks thread at the end of the loop.");

DEFINE_test_flag(bool, cdcsdk_skip_processing_dynamic_table_addition, false,
"Skip finding unprocessed tables for cdcsdk streams");

DECLARE_bool(enable_ysql);
DECLARE_bool(TEST_echo_service_enabled);

Expand Down Expand Up @@ -261,23 +264,28 @@ void CatalogManagerBgTasks::Run() {
}

{
// Find if there have been any new tables added to any namespace with an active cdcsdk
// stream.
TableStreamIdsMap table_unprocessed_streams_map;
// In case of master leader restart of leadership changes, we will scan all streams for
// unprocessed tables, but from the second iteration onwards we will only consider the
// 'cdcsdk_unprocessed_tables' field of CDCStreamInfo object stored in the cdc_state_map.
Status s =
catalog_manager_->FindCDCSDKStreamsForAddedTables(&table_unprocessed_streams_map);

if (s.ok() && !table_unprocessed_streams_map.empty()) {
s = catalog_manager_->ProcessNewTablesForCDCSDKStreams(
table_unprocessed_streams_map, l.epoch());
}
if (!s.ok()) {
YB_LOG_EVERY_N(WARNING, 10)
<< "Encountered failure while trying to add unprocessed tables to cdc_state table: "
<< s.ToString();
if (!FLAGS_TEST_cdcsdk_skip_processing_dynamic_table_addition) {
// Find if there have been any new tables added to any namespace with an active cdcsdk
// stream.
TableStreamIdsMap table_unprocessed_streams_map;
// In case of master leader restart of leadership changes, we will scan all streams for
// unprocessed tables, but from the second iteration onwards we will only consider the
// 'cdcsdk_unprocessed_tables' field of CDCStreamInfo object stored in the cdc_state_map.
Status s =
catalog_manager_->FindCDCSDKStreamsForAddedTables(&table_unprocessed_streams_map);

if (s.ok() && !table_unprocessed_streams_map.empty()) {
s = catalog_manager_->ProcessNewTablesForCDCSDKStreams(
table_unprocessed_streams_map, l.epoch());
}
if (!s.ok()) {
YB_LOG_EVERY_N(WARNING, 10)
<< "Encountered failure while trying to add unprocessed tables to cdc_state table: "
<< s.ToString();
}
} else {
LOG(INFO) << "Skipping processing of dynamic table addition due to "
"cdcsdk_skip_processing_dynamic_table_addition being true";
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/yb/master/xrepl_catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,9 @@ class CDCStreamLoader : public Visitor<PersistentCDCStreamInfo> {

// For CDCSDK Streams, we scan all the tables in the namespace, and compare it with all the
// tables associated with the stream.
if (metadata.state() == SysCDCStreamEntryPB::ACTIVE && ns &&
ns->state() == SysNamespaceEntryPB::RUNNING) {
if ((metadata.state() == SysCDCStreamEntryPB::ACTIVE ||
metadata.state() == SysCDCStreamEntryPB::DELETING_METADATA) &&
ns && ns->state() == SysNamespaceEntryPB::RUNNING) {
catalog_manager_->FindAllTablesMissingInCDCSDKStream(
stream_id, metadata.table_id(), metadata.namespace_id());
}
Expand Down Expand Up @@ -1029,7 +1030,8 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables(
}

auto ltm = stream_info->LockForRead();
if (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE) {
if (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE ||
ltm->pb.state() == SysCDCStreamEntryPB::DELETING_METADATA) {
for (const auto& unprocessed_table_id : *unprocessed_tables) {
auto table = tables_->FindTableOrNull(unprocessed_table_id);
Schema schema;
Expand Down

0 comments on commit 476d9c8

Please sign in to comment.