diff --git a/src/query/management/src/warehouse/warehouse_mgr.rs b/src/query/management/src/warehouse/warehouse_mgr.rs
index b02bcbcb376ac..13a2ea51065d8 100644
--- a/src/query/management/src/warehouse/warehouse_mgr.rs
+++ b/src/query/management/src/warehouse/warehouse_mgr.rs
@@ -1015,6 +1015,11 @@ impl WarehouseApi for WarehouseMgr {
                     value: Some(seq_v), ..
                 })),
             }) => Ok(seq_v.seq),
+            // compatibility
+            // After a network failure, nodes may become expired due to failed heartbeats.
+            // For system-managed nodes, this situation has already been handled in resolve_conflicts.
+            // For self-managed nodes, we return seq = 0 so that the next heartbeat can proceed normally.
+            _ if matches!(node.node_type, NodeType::SelfManaged) => Ok(0),
             _ => Err(ErrorCode::MetaServiceError("Heartbeat node info failure.")),
         }
     }
diff --git a/src/query/management/tests/it/warehouse.rs b/src/query/management/tests/it/warehouse.rs
index 17632921490e2..aabbe97e920d3 100644
--- a/src/query/management/tests/it/warehouse.rs
+++ b/src/query/management/tests/it/warehouse.rs
@@ -210,6 +210,32 @@ async fn test_drop_self_managed_warehouse() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn test_heartbeat_not_exists_self_managed_node() -> Result<()> {
+    let (_kv, warehouse_manager, _nodes) = nodes(Duration::from_mins(60), 0).await?;
+
+    let node_info = self_managed_node("test_node");
+
+    // heartbeat a node that does not yet exist in the meta service
+    let mut heartbeat_node = node_info.clone();
+    let seq = warehouse_manager
+        .heartbeat_node(&mut heartbeat_node, 34234)
+        .await?;
+
+    assert_eq!(seq, 0);
+    assert_eq!(heartbeat_node, node_info);
+
+    let mut heartbeat_node = node_info.clone();
+    let seq = warehouse_manager
+        .heartbeat_node(&mut heartbeat_node, seq)
+        .await?;
+
+    assert_ne!(seq, 0);
+    assert_eq!(heartbeat_node, node_info);
+
+    Ok(())
+}
+
 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
 async fn test_successfully_heartbeat_self_managed_node() -> Result<()> {
     let (kv, warehouse_manager, _nodes) = nodes(Duration::from_mins(60), 0).await?;
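The new NodeType::SelfManaged arm changes what a self-managed node sees once its key in the meta service has expired: instead of the terminal "Heartbeat node info failure." error, heartbeat_node now returns seq = 0, and the following heartbeat re-registers the node. Below is a minimal sketch of a driver loop built on that behaviour; the loop itself, heartbeat_loop, and HEARTBEAT_INTERVAL are illustrative assumptions, while WarehouseMgr::heartbeat_node and NodeInfo are the items touched by this patch.

    use std::time::Duration;

    // Hypothetical interval; a real deployment would take this from configuration.
    const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);

    // Illustrative driver, not part of this patch.
    async fn heartbeat_loop(mgr: &WarehouseMgr, node: &mut NodeInfo) -> Result<()> {
        let mut seq = 0_u64;
        loop {
            // If the node key has expired (e.g. after a network failure), the patched
            // heartbeat_node returns Ok(0) for self-managed nodes instead of an error,
            // so the next iteration simply re-creates the key.
            seq = mgr.heartbeat_node(node, seq).await?;
            tokio::time::sleep(HEARTBEAT_INTERVAL).await;
        }
    }

The second call in test_heartbeat_not_exists_self_managed_node exercises exactly this recovery path: the first heartbeat with a seq that no longer matches yields 0, and the retry with seq = 0 succeeds and returns a fresh, non-zero seq.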
diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs
index acfbe065fec72..5ba2fa91125c6 100644
--- a/src/query/service/src/clusters/cluster.rs
+++ b/src/query/service/src/clusters/cluster.rs
@@ -323,8 +323,15 @@ impl ClusterDiscovery {
                     cluster_nodes.len() as f64,
                 );
 
-                let res = Cluster::create(res, self.local_id.clone());
-                Ok(res)
+                // compatibility: for self-managed nodes, we allow queries to continue executing even when the heartbeat fails.
+                if cluster_nodes.is_empty() && !config.query.cluster_id.is_empty() {
+                    let mut cluster = Cluster::empty();
+                    let mut_cluster = Arc::get_mut(&mut cluster).unwrap();
+                    mut_cluster.local_id = self.local_id.clone();
+                    return Ok(cluster);
+                }
+
+                Ok(Cluster::create(res, self.local_id.clone()))
             }
         }
     }
diff --git a/src/query/service/tests/it/clusters.rs b/src/query/service/tests/it/clusters.rs
index 96b31b3e5899c..e7c19d4d893c1 100644
--- a/src/query/service/tests/it/clusters.rs
+++ b/src/query/service/tests/it/clusters.rs
@@ -19,6 +19,25 @@ use databend_query::clusters::ClusterHelper;
 use databend_query::test_kits::*;
 use pretty_assertions::assert_eq;
 
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn test_empty_cluster_discovery() -> Result<()> {
+    let _guard = TestFixture::setup().await?;
+
+    let config = ConfigBuilder::create().build();
+
+    let metastore = ClusterDiscovery::create_meta_client(&config).await?;
+    let cluster_discovery = ClusterDiscovery::try_create(&config, metastore.clone()).await?;
+
+    let discover_cluster = cluster_discovery.discover(&config).await?;
+
+    let discover_cluster_nodes = discover_cluster.get_nodes();
+    assert_eq!(discover_cluster_nodes.len(), 0);
+    assert!(discover_cluster.is_empty());
+    assert!(!discover_cluster.unassign);
+
+    Ok(())
+}
+
 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
 async fn test_single_cluster_discovery() -> Result<()> {
     let config = ConfigBuilder::create().build();
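A note on the Arc::get_mut(&mut cluster).unwrap() in the cluster.rs hunk above: the unwrap relies on Cluster::empty() returning a freshly created Arc whose strong count is still 1, so there is no other owner at that point. A standalone sketch of that invariant follows; the Node struct is a hypothetical stand-in, not a databend type.

    use std::sync::Arc;

    // Hypothetical stand-in for the cluster value.
    struct Node {
        local_id: String,
    }

    fn main() {
        // A freshly created Arc has a strong count of 1, so get_mut yields
        // Some(&mut T) and an unwrap at this point cannot panic.
        let mut node = Arc::new(Node { local_id: String::new() });
        Arc::get_mut(&mut node).unwrap().local_id = "local-node-1".to_string();
        assert_eq!(node.local_id, "local-node-1");

        // Once the Arc is shared, get_mut returns None; the pattern is only
        // safe before any clone of the Arc escapes.
        let shared = Arc::clone(&node);
        assert!(Arc::get_mut(&mut node).is_none());
        drop(shared);
    }

An alternative would be a constructor that sets local_id before wrapping the value in an Arc, which avoids the unwrap entirely; the patch keeps the existing Cluster::empty() helper and fills in the field in place instead.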