diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 37544d1aa854..c46f023dc4b1 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -1001,6 +1001,165 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDropTableBeforeCDCStreamDelet ASSERT_EQ(DeleteCDCStream(stream_id), false); } +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAddTableAfterDropTable)) { + // Set up the cluster. This test checks that a table created after a drop joins the stream. + ASSERT_OK(SetUpWithParams(3, 3, false)); + + const vector table_list_suffix = {"_1", "_2", "_3", "_4"}; + const int kNumTables = 4; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + while (idx < 3) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + idx += 1; + } + auto stream_id = ASSERT_RESULT(CreateDBStream(EXPLICIT)); + SleepFor(MonoDelta::FromSeconds(2)); + DropTable(&test_cluster_, "test_table_1"); + + // Dropping the table triggers the background thread to start the stream metadata cleanup; + // wait here until the background thread has finished that cleanup. + std::unordered_set expected_table_ids_after_drop = { + table[1].table_id(), table[2].table_id()}; + VerifyTablesInStreamMetadata( + stream_id, expected_table_ids_after_drop, "Waiting for stream metadata cleanup."); + + // Create a new table and verify that it gets added to the stream metadata.
+ table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + + std::unordered_set expected_table_ids_after_create_table = + expected_table_ids_after_drop; + expected_table_ids_after_create_table.insert(table[idx].table_id()); + VerifyTablesInStreamMetadata( + stream_id, expected_table_ids_after_create_table, + "Waiting for GetDBStreamInfo after table creation."); + + // Verify that this stream's cdc_state rows cover exactly the first tablet of table[1..3]. + std::unordered_set expected_tablet_ids; + for (idx = 1; idx < 4; idx++) { + expected_tablet_ids.insert(tablets[idx].Get(0).tablet_id()); + } + + CDCStateTable cdc_state_table(test_client()); + Status s; + std::unordered_set tablets_found; + for (auto row_result : ASSERT_RESULT( + cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s))) { + ASSERT_OK(row_result); + auto& row = *row_result; + LOG(INFO) << "Read cdc_state table row with tablet_id: " << row.key.tablet_id + << " stream_id: " << row.key.stream_id + << " checkpoint: " << row.checkpoint->ToString(); + if (row.key.stream_id == stream_id) { + tablets_found.insert(row.key.tablet_id); + } + } + ASSERT_OK(s); + LOG(INFO) << "tablets found: " << AsString(tablets_found) + << ", expected tablets: " << AsString(expected_tablet_ids); + ASSERT_EQ(expected_tablet_ids, tablets_found); +} + +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAddTableAfterDropTableAndMasterRestart)) { + // Set up the cluster. Checks that a skipped new table is added after a master restart.
+ ASSERT_OK(SetUpWithParams(1, 1, false)); + const vector table_list_suffix = {"_1", "_2", "_3", "_4"}; + const int kNumTables = 4; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + while (idx < 3) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + idx += 1; + } + xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream(EXPLICIT)); + SleepFor(MonoDelta::FromSeconds(2)); + DropTable(&test_cluster_, "test_table_1"); + + // Dropping the table triggers the background thread to start the stream metadata cleanup; + // wait here until the background thread has finished that cleanup. + std::unordered_set expected_table_ids_after_drop = { + table[1].table_id(), table[2].table_id()}; + VerifyTablesInStreamMetadata( + stream_id, expected_table_ids_after_drop, "Waiting for stream metadata cleanup."); + + // After the metadata cleanup, make the bg thread skip processing any newly created table. + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_skip_processing_dynamic_table_addition) = true; + + // Create a new table and verify that it does not get added to the stream metadata. + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + + SleepFor(MonoDelta::FromSeconds(2 * kTimeMultiplier)); + VerifyTablesInStreamMetadata( + stream_id, expected_table_ids_after_drop, + "Waiting for GetDBStreamInfo after table creation."); + + // Restart leader master to repopulate namespace_to_cdcsdk_unprocessed_table_map_ in-memory map.
+ auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster()); + ASSERT_OK(leader_master->Restart()); + LOG(INFO) << "Master Restarted"; + SleepFor(MonoDelta::FromSeconds(5)); + + // Re-enable processing of tables that are not yet part of the cdc stream. + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_skip_processing_dynamic_table_addition) = false; + + // Verify that the newly created table has now been added to the stream metadata. + std::unordered_set expected_table_ids_after_create_table = + expected_table_ids_after_drop; + expected_table_ids_after_create_table.insert(table[idx].table_id()); + VerifyTablesInStreamMetadata( + stream_id, expected_table_ids_after_create_table, + "Waiting for GetDBStreamInfo after master restart."); + + // Verify that this stream's cdc_state rows cover exactly the first tablet of table[1..3]. + std::unordered_set expected_tablet_ids; + for (idx = 1; idx < 4; idx++) { + expected_tablet_ids.insert(tablets[idx].Get(0).tablet_id()); + } + + CDCStateTable cdc_state_table(test_client()); + Status s; + std::unordered_set tablets_found; + for (auto row_result : ASSERT_RESULT( + cdc_state_table.GetTableRange(CDCStateTableEntrySelector().IncludeCheckpoint(), &s))) { + ASSERT_OK(row_result); + auto& row = *row_result; + LOG(INFO) << "Read cdc_state table row with tablet_id: " << row.key.tablet_id + << " stream_id: " << row.key.stream_id + << " checkpoint: " << row.checkpoint->ToString(); + if (row.key.stream_id == stream_id) { + tablets_found.insert(row.key.tablet_id); + } + } + ASSERT_OK(s); + LOG(INFO) << "tablets found: " << AsString(tablets_found) + << ", expected tablets: " << AsString(expected_tablet_ids); + ASSERT_EQ(expected_tablet_ids, tablets_found); +} + TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDropTableBeforeXClusterStreamDelete)) { // Setup cluster.
ASSERT_OK(SetUpWithParams(1, 1, false)); diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc index b487235a4654..1d1be4e4a04e 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc @@ -1802,6 +1802,29 @@ namespace cdc { return get_resp; } + // Waits up to 60 seconds for GetDBStreamInfo(stream_id) to report exactly the table ids in + // expected_table_ids; timeout_msg is handed to WaitFor to describe the wait. + void CDCSDKYsqlTest::VerifyTablesInStreamMetadata( + const xrepl::StreamId& stream_id, const std::unordered_set& expected_table_ids, + const std::string& timeout_msg) { + ASSERT_OK(WaitFor( + [&]() -> Result { + auto get_resp = GetDBStreamInfo(stream_id); + if (get_resp.ok() && !get_resp->has_error()) { + const uint64_t table_info_size = get_resp->table_info_size(); + if (table_info_size == expected_table_ids.size()) { + std::unordered_set table_ids; + for (const auto& entry : get_resp->table_info()) { + table_ids.insert(entry.table_id()); + } + if (expected_table_ids == table_ids) return true; + } + } + return false; + }, + MonoDelta::FromSeconds(60), timeout_msg)); + } + Status CDCSDKYsqlTest::ChangeLeaderOfTablet(size_t new_leader_index, const TabletId tablet_id) { CHECK(!FLAGS_enable_load_balancing); diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.h b/src/yb/integration-tests/cdcsdk_ysql_test_base.h index 5d578660f79e..4da544b4d5a9 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.h +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.h @@ -128,6 +128,7 @@ DECLARE_bool(cdc_populate_end_markers_transactions); DECLARE_uint64(cdc_stream_records_threshold_size_bytes); DECLARE_int64(cdc_resolve_intent_lag_threshold_ms); DECLARE_bool(enable_tablet_split_of_cdcsdk_streamed_tables); +DECLARE_bool(TEST_cdcsdk_skip_processing_dynamic_table_addition); namespace yb { @@ -460,6 +461,10 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { Result GetDBStreamInfo(const xrepl::StreamId db_stream_id); + void VerifyTablesInStreamMetadata( + const xrepl::StreamId& stream_id, const std::unordered_set&
expected_table_ids, + const std::string& timeout_msg); + Status ChangeLeaderOfTablet(size_t new_leader_index, const TabletId tablet_id); Status CreateSnapshot(const NamespaceName& ns); diff --git a/src/yb/master/catalog_manager_bg_tasks.cc b/src/yb/master/catalog_manager_bg_tasks.cc index 8f8dbbfb568e..5a50f065ce3d 100644 --- a/src/yb/master/catalog_manager_bg_tasks.cc +++ b/src/yb/master/catalog_manager_bg_tasks.cc @@ -78,6 +78,9 @@ DEFINE_test_flag(bool, pause_catalog_manager_bg_loop_start, false, DEFINE_test_flag(bool, pause_catalog_manager_bg_loop_end, false, "Pause the bg tasks thread at the end of the loop."); +DEFINE_test_flag(bool, cdcsdk_skip_processing_dynamic_table_addition, false, + "Skip finding unprocessed tables for cdcsdk streams"); + DECLARE_bool(enable_ysql); DECLARE_bool(TEST_echo_service_enabled); @@ -261,23 +264,28 @@ void CatalogManagerBgTasks::Run() { } { - // Find if there have been any new tables added to any namespace with an active cdcsdk - // stream. - TableStreamIdsMap table_unprocessed_streams_map; - // In case of master leader restart of leadership changes, we will scan all streams for - // unprocessed tables, but from the second iteration onwards we will only consider the - // 'cdcsdk_unprocessed_tables' field of CDCStreamInfo object stored in the cdc_state_map. - Status s = - catalog_manager_->FindCDCSDKStreamsForAddedTables(&table_unprocessed_streams_map); - - if (s.ok() && !table_unprocessed_streams_map.empty()) { - s = catalog_manager_->ProcessNewTablesForCDCSDKStreams( - table_unprocessed_streams_map, l.epoch()); - } - if (!s.ok()) { - YB_LOG_EVERY_N(WARNING, 10) - << "Encountered failure while trying to add unprocessed tables to cdc_state table: " - << s.ToString(); + if (!FLAGS_TEST_cdcsdk_skip_processing_dynamic_table_addition) { + // Find if there have been any new tables added to any namespace with an active cdcsdk + // stream.
+ TableStreamIdsMap table_unprocessed_streams_map; + // In case of master leader restart or leadership changes, we will scan all streams for + // unprocessed tables, but from the second iteration onwards we will only consider the + // 'cdcsdk_unprocessed_tables' field of CDCStreamInfo object stored in the cdc_state_map. + Status s = + catalog_manager_->FindCDCSDKStreamsForAddedTables(&table_unprocessed_streams_map); + + if (s.ok() && !table_unprocessed_streams_map.empty()) { + s = catalog_manager_->ProcessNewTablesForCDCSDKStreams( + table_unprocessed_streams_map, l.epoch()); + } + if (!s.ok()) { + YB_LOG_EVERY_N(WARNING, 10) + << "Encountered failure while trying to add unprocessed tables to cdc_state table: " + << s.ToString(); + } + } else { + LOG(INFO) << "Skipping processing of dynamic table addition due to " + "cdcsdk_skip_processing_dynamic_table_addition being true"; } } diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index 8e180195aceb..ec243ebe33ee 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -255,8 +255,9 @@ class CDCStreamLoader : public Visitor { // For CDCSDK Streams, we scan all the tables in the namespace, and compare it with all the // tables associated with the stream.
- if (metadata.state() == SysCDCStreamEntryPB::ACTIVE && ns && - ns->state() == SysNamespaceEntryPB::RUNNING) { + if ((metadata.state() == SysCDCStreamEntryPB::ACTIVE || + metadata.state() == SysCDCStreamEntryPB::DELETING_METADATA) && + ns && ns->state() == SysNamespaceEntryPB::RUNNING) { catalog_manager_->FindAllTablesMissingInCDCSDKStream( stream_id, metadata.table_id(), metadata.namespace_id()); } @@ -1029,7 +1030,8 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables( } auto ltm = stream_info->LockForRead(); - if (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE) { + if (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE || + ltm->pb.state() == SysCDCStreamEntryPB::DELETING_METADATA) { for (const auto& unprocessed_table_id : *unprocessed_tables) { auto table = tables_->FindTableOrNull(unprocessed_table_id); Schema schema;