diff --git a/src/query/management/src/udf/udf_mgr.rs b/src/query/management/src/udf/udf_mgr.rs index afd2eedf2aa46..b70cd10a94bc2 100644 --- a/src/query/management/src/udf/udf_mgr.rs +++ b/src/query/management/src/udf/udf_mgr.rs @@ -27,6 +27,7 @@ use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaError; +use databend_common_meta_types::UpsertKV; use databend_common_meta_types::With; use futures::TryStreamExt; @@ -156,15 +157,26 @@ impl UdfMgr { ) -> Result>, MetaError> { let key = UdfIdent::new(&self.tenant, udf_name); let req = UpsertPB::delete(key).with(seq); - let res = self.kv_api.upsert_pb(&req).await?; - - if res.is_changed() { - Ok(res.prev) + if let Ok(res) = self.kv_api.upsert_pb(&req).await { + if res.is_changed() { + Ok(res.prev) + } else { + Ok(None) + } } else { + self.try_drop_old_udf(udf_name).await?; Ok(None) } } + #[async_backtrace::framed] + #[fastrace::trace] + pub async fn try_drop_old_udf(&self, udf_name: &str) -> Result<(), MetaError> { + let key = UdfIdent::new(&self.tenant, udf_name); + let _res = self.kv_api.upsert_kv(UpsertKV::delete(&key)).await?; + Ok(()) + } + fn ensure_non_builtin(&self, name: &str) -> Result<(), UdfError> { if is_builtin_function(name) { return Err(UdfError::Exists { diff --git a/src/query/management/tests/it/udf.rs b/src/query/management/tests/it/udf.rs index adb09f25f2298..0031c5c0b325e 100644 --- a/src/query/management/tests/it/udf.rs +++ b/src/query/management/tests/it/udf.rs @@ -28,6 +28,8 @@ use databend_common_meta_embedded::MemMeta; use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; +use databend_common_meta_types::Operation; +use databend_common_meta_types::UpsertKV; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_add_udf() -> Result<()> { @@ -109,6 +111,33 @@ async fn test_add_udf() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_drop_old_udf() -> Result<()> { + let (kv_api, udf_api) = new_udf_api().await?; + + // lambda udf + let udf = create_test_lambda_udf(); + let udf_key = format!("__fd_udfs/admin/{}", udf.name); + + let v = serde_json::to_vec("test")?; + let kv_api = kv_api.clone(); + let _upsert_kv = kv_api + .upsert_kv(UpsertKV::new( + &udf_key, + MatchSeq::Exact(0), + Operation::Update(v), + None, + )) + .await?; + let err = udf_api.list_udf().await.is_err(); + assert!(err); + + udf_api.drop_udf(&udf.name, MatchSeq::GE(1)).await?; + let udfs = udf_api.list_udf().await?; + assert_eq!(udfs, vec![]); + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_already_exists_add_udf() -> Result<()> { let (_, udf_api) = new_udf_api().await?;