From 476d9c83095a42fa8f1604ca1b6c61f7cea26f97 Mon Sep 17 00:00:00 2001 From: Siddharth Shah Date: Thu, 11 Jan 2024 17:45:24 +0530 Subject: [PATCH] [BACKPORT 2.20.1][#20428] CDCSDK: Fix addition of new tables to stream metadata after drop table Summary: Original commit: 201fda83f0dd4d312bdea2237f3a95e7c7448ae4 / D31577 **Problem**: After a table is dropped, state of cdcsdk stream changes from ACTIVE to DELETING_METADATA and remains in this state forever. For dynamic table addition, stream is required to be in the ACTIVE state. Due to this requirement, new tables created after drop table do not get added to stream metadata & cdc_state table **Fix**: In the dynamic table addition codepath, in addition to ACTIVE streams, also consider streams in DELETING_METADATA state. Primary diff for Dynamic table addition : [[ https://phorge.dev.yugabyte.com/D19909 |https://phorge.dev.yugabyte.com/D19909 ]] New test: Added UTs to verify addition of newly created tables after drop table in normal functioning as well as master restart. Added a test flag `cdcsdk_skip_processing_dynamic_table_addition` for testing master restart case. This flag will skip the finding & processing of newly added tables by background thread. Refer the primary diff for dynamic table addition for more details on master restart. Jira: DB-9428 Test Plan: Jenkins: test regex: .*CDCSDK* Reviewers: asrinivasan, skumar, stiwary, xCluster, hsunder Reviewed By: skumar Subscribers: ycdcxcluster, bogdan Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D31637 --- src/yb/integration-tests/cdcsdk_ysql-test.cc | 159 ++++++++++++++++++ .../cdcsdk_ysql_test_base.cc | 21 +++ .../integration-tests/cdcsdk_ysql_test_base.h | 5 + src/yb/master/catalog_manager_bg_tasks.cc | 42 +++-- src/yb/master/xrepl_catalog_manager.cc | 8 +- 5 files changed, 215 insertions(+), 20 deletions(-) diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 37544d1aa854..c46f023dc4b1 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -1001,6 +1001,165 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDropTableBeforeCDCStreamDelet ASSERT_EQ(DeleteCDCStream(stream_id), false); } +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAddTableAfterDropTable)) { + // Setup cluster. + ASSERT_OK(SetUpWithParams(3, 3, false)); + + const vector table_list_suffix = {"_1", "_2", "_3", "_4"}; + const int kNumTables = 4; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + while (idx < 3) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + idx += 1; + } + auto stream_id = ASSERT_RESULT(CreateDBStream(EXPLICIT)); + SleepFor(MonoDelta::FromSeconds(2)); + DropTable(&test_cluster_, "test_table_1"); + + // Drop table will trigger the background thread to start the stream metadata cleanup, here + // wait for the metadata cleanup to finish by the background thread. + std::unordered_set expected_table_ids_after_drop = { + table[1].table_id(), table[2].table_id()}; + VerifyTablesInStreamMetadata( + stream_id, expected_table_ids_after_drop, "Waiting for stream metadata cleanup."); + + // create a new table and verify that it gets added to stream metadata. + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + + std::unordered_set expected_table_ids_after_create_table = + expected_table_ids_after_drop; + expected_table_ids_after_create_table.insert(table[idx].table_id()); + VerifyTablesInStreamMetadata( + stream_id, expected_table_ids_after_create_table, + "Waiting for GetDBStreamInfo after table creation."); + + // verify tablets of the new table are added to cdc_state table. + std::unordered_set expected_tablet_ids; + for (idx = 1; idx < 4; idx++) { + expected_tablet_ids.insert(tablets[idx].Get(0).tablet_id()); + } + + CDCStateTable cdc_state_table(test_client()); + Status s; + std::unordered_set tablets_found; + for (auto row_result : ASSERT_RESULT( + cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s))) { + ASSERT_OK(row_result); + auto& row = *row_result; + LOG(INFO) << "Read cdc_state table row with tablet_id: " << row.key.tablet_id + << " stream_id: " << row.key.stream_id + << " checkpoint: " << row.checkpoint->ToString(); + if (row.key.stream_id == stream_id) { + tablets_found.insert(row.key.tablet_id); + } + } + ASSERT_OK(s); + LOG(INFO) << "tablets found: " << AsString(tablets_found) + << ", expected tablets: " << AsString(expected_tablet_ids); + ASSERT_EQ(expected_tablet_ids, tablets_found); +} + +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAddTableAfterDropTableAndMasterRestart)) { + // Setup cluster. + ASSERT_OK(SetUpWithParams(1, 1, false)); + const vector table_list_suffix = {"_1", "_2", "_3", "_4"}; + const int kNumTables = 4; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + while (idx < 3) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + idx += 1; + } + xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream(EXPLICIT)); + SleepFor(MonoDelta::FromSeconds(2)); + DropTable(&test_cluster_, "test_table_1"); + + // Drop table will trigger the background thread to start the stream metadata cleanup, here + // wait for the metadata cleanup to finish by the background thread. + std::unordered_set expected_table_ids_after_drop = { + table[1].table_id(), table[2].table_id()}; + VerifyTablesInStreamMetadata( + stream_id, expected_table_ids_after_drop, "Waiting for stream metadata cleanup."); + + // After metadata cleanup, skip processing any newly created table by bg thread. + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_skip_processing_dynamic_table_addition) = true; + + // create a new table and verify that it does not get added to stream metadata. + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + + SleepFor(MonoDelta::FromSeconds(2 * kTimeMultiplier)); + VerifyTablesInStreamMetadata( + stream_id, expected_table_ids_after_drop, + "Waiting for GetDBStreamInfo after table creation."); + + // Restart leader master to repopulate namespace_to_cdcsdk_unprocessed_table_map_ in-memory map. + auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster()); + ASSERT_OK(leader_master->Restart()); + LOG(INFO) << "Master Restarted"; + SleepFor(MonoDelta::FromSeconds(5)); + + // Enable processing of tables that are not part of cdc stream. + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_skip_processing_dynamic_table_addition) = false; + + // verify newly created table has been added to stream metadata. + std::unordered_set expected_table_ids_after_create_table = + expected_table_ids_after_drop; + expected_table_ids_after_create_table.insert(table[idx].table_id()); + VerifyTablesInStreamMetadata( + stream_id, expected_table_ids_after_create_table, + "Waiting for GetDBStreamInfo after master restart."); + + // verify tablets of the new table are added to cdc_state table. + std::unordered_set expected_tablet_ids; + for (idx = 1; idx < 4; idx++) { + expected_tablet_ids.insert(tablets[idx].Get(0).tablet_id()); + } + + CDCStateTable cdc_state_table(test_client()); + Status s; + std::unordered_set tablets_found; + for (auto row_result : ASSERT_RESULT( + cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s))) { + ASSERT_OK(row_result); + auto& row = *row_result; + LOG(INFO) << "Read cdc_state table row with tablet_id: " << row.key.tablet_id + << " stream_id: " << row.key.stream_id + << " checkpoint: " << row.checkpoint->ToString(); + if (row.key.stream_id == stream_id) { + tablets_found.insert(row.key.tablet_id); + } + } + ASSERT_OK(s); + LOG(INFO) << "tablets found: " << AsString(tablets_found) + << ", expected tablets: " << AsString(expected_tablet_ids); + ASSERT_EQ(expected_tablet_ids, tablets_found); +} + TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDropTableBeforeXClusterStreamDelete)) { // Setup cluster. ASSERT_OK(SetUpWithParams(1, 1, false)); diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc index b487235a4654..1d1be4e4a04e 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc @@ -1802,6 +1802,27 @@ namespace cdc { return get_resp; } + void CDCSDKYsqlTest::VerifyTablesInStreamMetadata( + const xrepl::StreamId& stream_id, const std::unordered_set& expected_table_ids, + const std::string& timeout_msg) { + ASSERT_OK(WaitFor( + [&]() -> Result { + auto get_resp = GetDBStreamInfo(stream_id); + if (get_resp.ok() && !get_resp->has_error()) { + const uint64_t table_info_size = get_resp->table_info_size(); + if (table_info_size == expected_table_ids.size()) { + std::unordered_set table_ids; + for (auto entry : get_resp->table_info()) { + table_ids.insert(entry.table_id()); + } + if (expected_table_ids == table_ids) return true; + } + } + return false; + }, + MonoDelta::FromSeconds(60), timeout_msg)); + } + Status CDCSDKYsqlTest::ChangeLeaderOfTablet(size_t new_leader_index, const TabletId tablet_id) { CHECK(!FLAGS_enable_load_balancing); diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.h b/src/yb/integration-tests/cdcsdk_ysql_test_base.h index 5d578660f79e..4da544b4d5a9 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.h +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.h @@ -128,6 +128,7 @@ DECLARE_bool(cdc_populate_end_markers_transactions); DECLARE_uint64(cdc_stream_records_threshold_size_bytes); DECLARE_int64(cdc_resolve_intent_lag_threshold_ms); DECLARE_bool(enable_tablet_split_of_cdcsdk_streamed_tables); +DECLARE_bool(TEST_cdcsdk_skip_processing_dynamic_table_addition); namespace yb { @@ -460,6 +461,10 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { Result GetDBStreamInfo(const xrepl::StreamId db_stream_id); + void VerifyTablesInStreamMetadata( + const xrepl::StreamId& stream_id, const std::unordered_set& expected_table_ids, + const std::string& timeout_msg); + Status ChangeLeaderOfTablet(size_t new_leader_index, const TabletId tablet_id); Status CreateSnapshot(const NamespaceName& ns); diff --git a/src/yb/master/catalog_manager_bg_tasks.cc b/src/yb/master/catalog_manager_bg_tasks.cc index 8f8dbbfb568e..5a50f065ce3d 100644 --- a/src/yb/master/catalog_manager_bg_tasks.cc +++ b/src/yb/master/catalog_manager_bg_tasks.cc @@ -78,6 +78,9 @@ DEFINE_test_flag(bool, pause_catalog_manager_bg_loop_start, false, DEFINE_test_flag(bool, pause_catalog_manager_bg_loop_end, false, "Pause the bg tasks thread at the end of the loop."); +DEFINE_test_flag(bool, cdcsdk_skip_processing_dynamic_table_addition, false, + "Skip finding unprocessed tables for cdcsdk streams"); + DECLARE_bool(enable_ysql); DECLARE_bool(TEST_echo_service_enabled); @@ -261,23 +264,28 @@ void CatalogManagerBgTasks::Run() { } { - // Find if there have been any new tables added to any namespace with an active cdcsdk - // stream. - TableStreamIdsMap table_unprocessed_streams_map; - // In case of master leader restart of leadership changes, we will scan all streams for - // unprocessed tables, but from the second iteration onwards we will only consider the - // 'cdcsdk_unprocessed_tables' field of CDCStreamInfo object stored in the cdc_state_map. - Status s = - catalog_manager_->FindCDCSDKStreamsForAddedTables(&table_unprocessed_streams_map); - - if (s.ok() && !table_unprocessed_streams_map.empty()) { - s = catalog_manager_->ProcessNewTablesForCDCSDKStreams( - table_unprocessed_streams_map, l.epoch()); - } - if (!s.ok()) { - YB_LOG_EVERY_N(WARNING, 10) - << "Encountered failure while trying to add unprocessed tables to cdc_state table: " - << s.ToString(); + if (!FLAGS_TEST_cdcsdk_skip_processing_dynamic_table_addition) { + // Find if there have been any new tables added to any namespace with an active cdcsdk + // stream. + TableStreamIdsMap table_unprocessed_streams_map; + // In case of master leader restart of leadership changes, we will scan all streams for + // unprocessed tables, but from the second iteration onwards we will only consider the + // 'cdcsdk_unprocessed_tables' field of CDCStreamInfo object stored in the cdc_state_map. + Status s = + catalog_manager_->FindCDCSDKStreamsForAddedTables(&table_unprocessed_streams_map); + + if (s.ok() && !table_unprocessed_streams_map.empty()) { + s = catalog_manager_->ProcessNewTablesForCDCSDKStreams( + table_unprocessed_streams_map, l.epoch()); + } + if (!s.ok()) { + YB_LOG_EVERY_N(WARNING, 10) + << "Encountered failure while trying to add unprocessed tables to cdc_state table: " + << s.ToString(); + } + } else { + LOG(INFO) << "Skipping processing of dynamic table addition due to " + "cdcsdk_skip_processing_dynamic_table_addition being true"; } } diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index 8e180195aceb..ec243ebe33ee 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -255,8 +255,9 @@ class CDCStreamLoader : public Visitor { // For CDCSDK Streams, we scan all the tables in the namespace, and compare it with all the // tables associated with the stream. - if (metadata.state() == SysCDCStreamEntryPB::ACTIVE && ns && - ns->state() == SysNamespaceEntryPB::RUNNING) { + if ((metadata.state() == SysCDCStreamEntryPB::ACTIVE || + metadata.state() == SysCDCStreamEntryPB::DELETING_METADATA) && + ns && ns->state() == SysNamespaceEntryPB::RUNNING) { catalog_manager_->FindAllTablesMissingInCDCSDKStream( stream_id, metadata.table_id(), metadata.namespace_id()); } @@ -1029,7 +1030,8 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables( } auto ltm = stream_info->LockForRead(); - if (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE) { + if (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE || + ltm->pb.state() == SysCDCStreamEntryPB::DELETING_METADATA) { for (const auto& unprocessed_table_id : *unprocessed_tables) { auto table = tables_->FindTableOrNull(unprocessed_table_id); Schema schema;