Skip to content

Commit

Permalink
refactor: try drop old udf (#17264)
Browse files Browse the repository at this point in the history
In eariler version, udf is serialize as json. In current version we can not drop/list these udfs.

show user functions;
2001=>InvalidReply: source:(PbDecodeError: failed to decode Protobuf message: buffer underflow; when:(decode value of __fd_udfs/tn3ftqihs/plusp))
while list UDFs

drop function IF EXISTS plusp;
2001=>InvalidReply: source:(PbDecodeError: failed to decode Protobuf message: buffer underflow; when:(decode value of __fd_udfs/tn3ftqihs/plusp))

So if drop udf return err, we directly drop the kv.
  • Loading branch information
TCeason authored Jan 13, 2025
1 parent 61e962e commit 3f0cc8a
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 4 deletions.
20 changes: 16 additions & 4 deletions src/query/management/src/udf/udf_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::UpsertKV;
use databend_common_meta_types::With;
use futures::TryStreamExt;

Expand Down Expand Up @@ -156,15 +157,26 @@ impl UdfMgr {
) -> Result<Option<SeqV<UserDefinedFunction>>, MetaError> {
let key = UdfIdent::new(&self.tenant, udf_name);
let req = UpsertPB::delete(key).with(seq);
let res = self.kv_api.upsert_pb(&req).await?;

if res.is_changed() {
Ok(res.prev)
if let Ok(res) = self.kv_api.upsert_pb(&req).await {
if res.is_changed() {
Ok(res.prev)
} else {
Ok(None)
}
} else {
self.try_drop_old_udf(udf_name).await?;
Ok(None)
}
}

#[async_backtrace::framed]
#[fastrace::trace]
pub async fn try_drop_old_udf(&self, udf_name: &str) -> Result<(), MetaError> {
let key = UdfIdent::new(&self.tenant, udf_name);
let _res = self.kv_api.upsert_kv(UpsertKV::delete(&key)).await?;
Ok(())
}

fn ensure_non_builtin(&self, name: &str) -> Result<(), UdfError> {
if is_builtin_function(name) {
return Err(UdfError::Exists {
Expand Down
29 changes: 29 additions & 0 deletions src/query/management/tests/it/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use databend_common_meta_embedded::MemMeta;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::Operation;
use databend_common_meta_types::UpsertKV;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_add_udf() -> Result<()> {
Expand Down Expand Up @@ -109,6 +111,33 @@ async fn test_add_udf() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_drop_old_udf() -> Result<()> {
let (kv_api, udf_api) = new_udf_api().await?;

// lambda udf
let udf = create_test_lambda_udf();
let udf_key = format!("__fd_udfs/admin/{}", udf.name);

let v = serde_json::to_vec("test")?;
let kv_api = kv_api.clone();
let _upsert_kv = kv_api
.upsert_kv(UpsertKV::new(
&udf_key,
MatchSeq::Exact(0),
Operation::Update(v),
None,
))
.await?;
let err = udf_api.list_udf().await.is_err();
assert!(err);

udf_api.drop_udf(&udf.name, MatchSeq::GE(1)).await?;
let udfs = udf_api.list_udf().await?;
assert_eq!(udfs, vec![]);
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_already_exists_add_udf() -> Result<()> {
let (_, udf_api) = new_udf_api().await?;
Expand Down

0 comments on commit 3f0cc8a

Please sign in to comment.