Skip to content

Commit

Permalink
chore(query): fix udf metrics (#17134)
Browse files (browse the repository at this point in the history)
* chore(query): fix udf metrics

* chore(query): rename

* chore(query): update

* chore(query): add test parallel params

* chore(query): add test parallel params

* chore(query): add test parallel params

* chore(query): add test parallel params
  • Loading branch information
sundy-li authored Dec 30, 2024
1 parent af5413f commit d5aaf94
Show file tree
Hide file tree
Showing 13 changed files with 49 additions and 20 deletions.
6 changes: 6 additions & 0 deletions .github/actions/test_sqllogic_standalone_linux/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ inputs:
description: "logic test handlers, mysql,http,clickhouse"
required: true
default: ""
parallel:
description: "logic test parallel"
required: false
default: ""
storage-format:
description: "storage format for databend query to test"
required: true
Expand All @@ -38,6 +42,7 @@ runs:
shell: bash
env:
TEST_HANDLERS: ${{ inputs.handlers }}
TEST_PARALLEL: ${{ inputs.parallel }}
CACHE_ENABLE_TABLE_META_CACHE: ${{ inputs.enable_table_meta_cache}}
run: bash ./scripts/ci/ci-run-sqllogic-tests.sh ${{ inputs.dirs }}

Expand All @@ -46,5 +51,6 @@ runs:
shell: bash
env:
TEST_HANDLERS: ${{ inputs.handlers }}
TEST_PARALLEL: ${{ inputs.parallel }}
CACHE_ENABLE_TABLE_META_CACHE: ${{ inputs.enable_table_meta_cache}}
run: bash ./scripts/ci/ci-run-sqllogic-tests-native.sh ${{ inputs.dirs }}
1 change: 1 addition & 0 deletions .github/workflows/reuse.sqllogic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ jobs:
with:
dirs: udf_server
handlers: mysql,http
parallel: 1
storage-format: all
- name: Upload failure
if: failure() || cancelled()
Expand Down
3 changes: 2 additions & 1 deletion scripts/ci/ci-run-ee-sqllogic-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ echo "Starting standalone DatabendQuery and DatabendMeta"
./scripts/ci/deploy/databend-query-standalone.sh

TEST_HANDLERS=${TEST_HANDLERS:-"mysql,http"}
TEST_PARALLEL=${TEST_PARALLEL:-8}
BUILD_PROFILE=${BUILD_PROFILE:-debug}

RUN_DIR=""
Expand All @@ -19,4 +20,4 @@ fi
echo "Run suites using argument: $RUN_DIR"

echo "Starting ee databend-sqllogic tests"
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --enable_sandbox --parallel 8
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --enable_sandbox --parallel ${TEST_PARALLEL}
3 changes: 2 additions & 1 deletion scripts/ci/ci-run-sqllogic-tests-cloud.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ echo "Starting standalone DatabendQuery and DatabendMeta"
./scripts/ci/deploy/databend-query-standalone.sh

TEST_HANDLERS=${TEST_HANDLERS:-"mysql,http"}
TEST_PARALLEL=${TEST_PARALLEL:-8}
BUILD_PROFILE=${BUILD_PROFILE:-debug}

RUN_DIR=""
Expand All @@ -19,4 +20,4 @@ fi
echo "Run suites using argument: $RUN_DIR"

echo "Starting databend-sqllogic tests"
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --skip_dir management,explain_native,ee --enable_sandbox --parallel 8
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --skip_dir management,explain_native,ee --enable_sandbox --parallel ${TEST_PARALLEL}
3 changes: 2 additions & 1 deletion scripts/ci/ci-run-sqllogic-tests-cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ echo "Starting Cluster databend-query"
export RUST_BACKTRACE=1

TEST_HANDLERS=${TEST_HANDLERS:-"mysql,http"}
TEST_PARALLEL=${TEST_PARALLEL:-8}
BUILD_PROFILE=${BUILD_PROFILE:-debug}

RUN_DIR=""
Expand All @@ -21,4 +22,4 @@ fi
echo "Run suites using argument: $RUN_DIR"

echo "Starting databend-sqllogic tests"
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --enable_sandbox --parallel 8 --skip_file tpcds_q64.test,tpcds_join_order.test
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --enable_sandbox --parallel ${TEST_PARALLEL} --skip_file tpcds_q64.test,tpcds_join_order.test
3 changes: 2 additions & 1 deletion scripts/ci/ci-run-sqllogic-tests-minio.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ echo "Starting standalone DatabendQuery and DatabendMeta"
./scripts/ci/deploy/databend-query-standalone.sh

TEST_HANDLERS=${TEST_HANDLERS:-"mysql,http"}
TEST_PARALLEL=${TEST_PARALLEL:-8}
BUILD_PROFILE=${BUILD_PROFILE:-debug}

RUN_DIR=""
Expand All @@ -31,4 +32,4 @@ fi
echo "Run suites using argument: $RUN_DIR"

echo "Starting databend-sqllogic tests"
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --skip_dir management,explain_native,ee --enable_sandbox --parallel 8
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --skip_dir management,explain_native,ee --enable_sandbox --parallel ${TEST_PARALLEL}
3 changes: 2 additions & 1 deletion scripts/ci/ci-run-sqllogic-tests-native-minio.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ echo "Starting standalone DatabendQuery and DatabendMeta"
./scripts/ci/deploy/databend-query-standalone-native.sh

TEST_HANDLERS=${TEST_HANDLERS:-"mysql,http"}
TEST_PARALLEL=${TEST_PARALLEL:-8}
BUILD_PROFILE=${BUILD_PROFILE:-debug}

RUN_DIR=""
Expand All @@ -31,4 +32,4 @@ fi
echo "Run suites using argument: $RUN_DIR"

echo "Starting databend-sqllogic tests"
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --skip_dir management,cluster,explain,tpch,ee --enable_sandbox --parallel 8
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --skip_dir management,cluster,explain,tpch,ee --enable_sandbox --parallel ${TEST_PARALLEL}
3 changes: 2 additions & 1 deletion scripts/ci/ci-run-sqllogic-tests-native.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ echo "Starting standalone DatabendQuery and DatabendMeta"
./scripts/ci/deploy/databend-query-standalone-native.sh

TEST_HANDLERS=${TEST_HANDLERS:-"mysql,http"}
TEST_PARALLEL=${TEST_PARALLEL:-8}
BUILD_PROFILE=${BUILD_PROFILE:-debug}

RUN_DIR=""
Expand All @@ -19,4 +20,4 @@ fi
echo "Run suites using argument: $RUN_DIR"

echo "Starting databend-sqllogic tests"
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --skip_dir management,cluster,explain,tpch,ee --enable_sandbox --parallel 8
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --skip_dir management,cluster,explain,tpch,ee --enable_sandbox --parallel ${TEST_PARALLEL}
5 changes: 3 additions & 2 deletions scripts/ci/ci-run-sqllogic-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ echo "Starting standalone DatabendQuery and DatabendMeta"
./scripts/ci/deploy/databend-query-standalone.sh

TEST_HANDLERS=${TEST_HANDLERS:-"mysql,http"}
TEST_PARALLEL=${TEST_PARALLEL:-8}
BUILD_PROFILE=${BUILD_PROFILE:-debug}

RUN_DIR=""
Expand All @@ -20,6 +21,6 @@ echo "Run suites using argument: $RUN_DIR"

echo "Starting databend-sqllogic tests"
if [ -z "$RUN_DIR" ]; then
target/${BUILD_PROFILE}/databend-sqllogictests --run_dir temp_table --enable_sandbox --parallel 8
target/${BUILD_PROFILE}/databend-sqllogictests --run_dir temp_table --enable_sandbox --parallel ${TEST_PARALLEL}
fi
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --skip_dir management,explain_native,ee,temp_table --enable_sandbox --parallel 8
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --skip_dir management,explain_native,ee,temp_table --enable_sandbox --parallel ${TEST_PARALLEL}
10 changes: 5 additions & 5 deletions src/common/metrics/src/metrics/external_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const METRIC_RETRY: &str = "external_retry";
const METRIC_ERROR: &str = "external_error";
const METRIC_RUNNING_REQUESTS: &str = "external_running_requests";
const METRIC_REQUESTS: &str = "external_requests";
const METRIC_EXTERNAL_BLOCK_ROWS: &str = "external_block_rows";
const METRIC_EXTERNAL_BATCH_ROWS: &str = "external_batch_rows";

static REQUEST_EXTERNAL_DURATION: LazyLock<FamilyHistogram<VecLabels>> =
LazyLock::new(|| register_histogram_family_in_milliseconds(METRIC_REQUEST_EXTERNAL_DURATION));
Expand All @@ -49,8 +49,8 @@ static RUNNING_REQUESTS_EXTERNAL: LazyLock<FamilyCounter<VecLabels>> =
static REQUESTS_EXTERNAL_EXTERNAL: LazyLock<FamilyCounter<VecLabels>> =
LazyLock::new(|| register_counter_family(METRIC_REQUESTS));

static EXTERNAL_BLOCK_ROWS: LazyLock<FamilyHistogram<VecLabels>> =
LazyLock::new(|| register_histogram_family_in_rows(METRIC_EXTERNAL_BLOCK_ROWS));
static EXTERNAL_BATCH_ROWS: LazyLock<FamilyHistogram<VecLabels>> =
LazyLock::new(|| register_histogram_family_in_rows(METRIC_EXTERNAL_BATCH_ROWS));

const LABEL_FUNCTION_NAME: &str = "function_name";
const LABEL_ERROR_KIND: &str = "error_kind";
Expand All @@ -69,9 +69,9 @@ pub fn record_request_external_duration(function_name: impl Into<String>, durati
.observe(duration.as_millis_f64());
}

pub fn record_request_external_block_rows(function_name: impl Into<String>, rows: usize) {
pub fn record_request_external_batch_rows(function_name: impl Into<String>, rows: usize) {
let labels = &vec![(LABEL_FUNCTION_NAME, function_name.into())];
EXTERNAL_BLOCK_ROWS
EXTERNAL_BATCH_ROWS
.get_or_create(labels)
.observe(rows as f64);
}
Expand Down
1 change: 1 addition & 0 deletions src/query/functions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,6 @@ fn builtin_functions() -> FunctionRegistry {
scalars::register(&mut registry);
srfs::register(&mut registry);

registry.check_ambiguity();
registry
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use databend_common_expression::DataField;
use databend_common_expression::DataSchema;
use databend_common_metrics::external_server::record_connect_external_duration;
use databend_common_metrics::external_server::record_error_external;
use databend_common_metrics::external_server::record_request_external_block_rows;
use databend_common_metrics::external_server::record_request_external_batch_rows;
use databend_common_metrics::external_server::record_request_external_duration;
use databend_common_metrics::external_server::record_retry_external;
use databend_common_metrics::external_server::record_running_requests_external_finish;
Expand Down Expand Up @@ -164,13 +164,18 @@ impl TransformUdfServer {
record_connect_external_duration(func.func_name.clone(), connect_duration);

Profile::record_usize_profile(ProfileStatisticsName::ExternalServerRequestCount, 1);
record_running_requests_external_start(func.name.clone(), 1);
record_request_external_batch_rows(func.func_name.clone(), num_rows);

let result_batch = client
.do_exchange(&func.func_name, input_batch.clone())
.await?;
.await;

let request_duration = instant.elapsed() - connect_duration;
record_running_requests_external_finish(func.name.clone(), 1);
record_request_external_duration(func.func_name.clone(), request_duration);

let result_batch = result_batch?;
let schema = DataSchema::try_from(&(*result_batch.schema()))?;
let (result_block, result_schema) = DataBlock::from_record_batch(&schema, &result_batch)
.map_err(|err| {
Expand Down Expand Up @@ -241,7 +246,6 @@ impl AsyncTransform for TransformUdfServer {
.map(|start| data_block.slice(start..start + batch_rows.min(rows - start)))
.collect();
for func in self.funcs.iter() {
record_request_external_block_rows(func.func_name.clone(), rows);
let server_addr = func.udf_type.as_server().unwrap();
let endpoint = self.endpoints.get(server_addr).unwrap();
let tasks: Vec<_> = batch_blocks
Expand Down Expand Up @@ -285,10 +289,7 @@ impl AsyncTransform for TransformUdfServer {
})
.collect();

let task_len = tasks.len() as u64;
record_running_requests_external_start(func.name.clone(), task_len);
let task_results = futures::future::join_all(tasks).await;
record_running_requests_external_finish(func.name.clone(), task_len);
batch_blocks = task_results
.into_iter()
.map(|b| b.unwrap())
Expand Down
15 changes: 14 additions & 1 deletion tests/sqllogictests/suites/udf_server/udf_server_test.test
Original file line number Diff line number Diff line change
Expand Up @@ -557,9 +557,22 @@ select * from _tmp_table order by field1;
5



statement ok
remove @udf_stage;

statement ok
drop FUNCTION url_len;

statement ok
select sleep(2);

query I
select metric, max(value::Int > 0) from system.metrics where metric in ('external_batch_rows_count', 'external_requests_total') group by metric order by metric;
----
external_batch_rows_count 1
external_requests_total 1

query I
select sum((value::Int)::Int) from system.metrics where metric = 'external_running_requests_total';
----
0

0 comments on commit d5aaf94

Please sign in to comment.