From 65c24f114c696edc0d700f9b6fdc55a466363591 Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Tue, 30 Jul 2024 15:57:07 +0800
Subject: [PATCH] [bugfix](paimon)add support for 'in' and 'not in' (#38390)

## Proposed changes

add support for `in` and `not in`:
```
select * from tb where partition_column in ('a','b','c');
select * from tb where partition_column not in ('a','b','c');
```
---
 .../source/PaimonPredicateConverter.java      |  51 +++++++
 .../paimon/source/PaimonScanNode.java         |  23 +++-
 .../paimon/test_paimon_predict.groovy         | 127 ++++++++++++++++++
 3 files changed, 195 insertions(+), 6 deletions(-)
 create mode 100644 regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonPredicateConverter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonPredicateConverter.java
index 605bc1b321a428..9e46474898db4d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonPredicateConverter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonPredicateConverter.java
@@ -21,9 +21,11 @@
 import org.apache.doris.analysis.CompoundPredicate;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.InPredicate;
 import org.apache.doris.analysis.IsNullPredicate;
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.Subquery;
 import org.apache.doris.thrift.TExprOpcode;
 
 import org.apache.paimon.data.BinaryString;
@@ -85,11 +87,60 @@ private Predicate convertToPaimonExpr(Expr dorisExpr) {
                 default:
                     return null;
             }
+        } else if (dorisExpr instanceof InPredicate) {
+            return doInPredicate((InPredicate) dorisExpr);
         } else {
             return binaryExprDesc(dorisExpr);
         }
     }
 
+    /**
+     * Converts a Doris IN / NOT IN predicate whose values are all literals
+     * into the matching Paimon predicate.
+     *
+     * @param predicate in-predicate; child 0 is the column, the other children are the values
+     * @return the Paimon predicate, or null when the expression cannot be converted
+     */
+    private Predicate doInPredicate(InPredicate predicate) {
+        // InPredicate, only support a in (1,2,3)
+        if (predicate.contains(Subquery.class)) {
+            return null;
+        }
+
+        SlotRef slotRef = convertDorisExprToSlotRef(predicate.getChild(0));
+        if (slotRef == null) {
+            return null;
+        }
+        String colName = slotRef.getColumnName();
+        int idx = fieldNames.indexOf(colName);
+        if (idx < 0) {
+            // Column is not part of the Paimon schema: give up on this predicate rather
+            // than letting paimonFieldTypes.get(-1) throw IndexOutOfBoundsException.
+            return null;
+        }
+        DataType dataType = paimonFieldTypes.get(idx);
+        List<Object> valueList = new ArrayList<>();
+        for (int i = 1; i < predicate.getChildren().size(); i++) {
+            if (!(predicate.getChild(i) instanceof LiteralExpr)) {
+                return null;
+            }
+            LiteralExpr literalExpr = convertDorisExprToLiteralExpr(predicate.getChild(i));
+            Object value = dataType.accept(new PaimonValueConverter(literalExpr));
+            if (value == null) {
+                return null;
+            }
+            valueList.add(value);
+        }
+
+        if (predicate.isNotIn()) {
+            // not in
+            return builder.notIn(idx, valueList);
+        } else {
+            // in
+            return builder.in(idx, valueList);
+        }
+    }
+
     private Predicate binaryExprDesc(Expr dorisExpr) {
         TExprOpcode opcode = dorisExpr.getOpcode();
         // Make sure the col slot is always first
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index aeecbd7eba2ede..45516fd2841a43 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -317,15 +317,26 @@ public Map getLocationProperties() throws MetaNotFoundException,
 
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
-        String result = super.getNodeExplainString(prefix, detailLevel)
-                + String.format("%spaimonNativeReadSplits=%d/%d\n",
-                prefix, rawFileSplitNum, (paimonSplitNum + rawFileSplitNum));
+        StringBuilder sb = new StringBuilder(super.getNodeExplainString(prefix, detailLevel));
+        sb.append(String.format("%spaimonNativeReadSplits=%d/%d\n",
+                prefix, rawFileSplitNum, (paimonSplitNum + rawFileSplitNum)));
+
+        sb.append(prefix).append("predicatesFromPaimon:");
+        if (predicates.isEmpty()) {
+            sb.append(" NONE\n");
+        } else {
+            sb.append("\n");
+            for (Predicate predicate : predicates) {
+                sb.append(prefix).append(prefix).append(predicate).append("\n");
+            }
+        }
+
         if (detailLevel == TExplainLevel.VERBOSE) {
-            result += prefix + "PaimonSplitStats: \n";
+            sb.append(prefix).append("PaimonSplitStats: \n");
             for (SplitStat splitStat : splitStats) {
-                result += String.format("%s %s\n", prefix, splitStat);
+                sb.append(String.format("%s %s\n", prefix, splitStat));
             }
         }
-        return result;
+        return sb.toString();
     }
 }
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy
new file mode 100644
index 00000000000000..6f07ae1db8e155
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_predict", "p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enablePaimonTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable paimon test")
+        return
+    }
+
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String catalog_name = "test_paimon_predict"
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+        CREATE CATALOG ${catalog_name} PROPERTIES (
+                'type' = 'paimon',
+                'warehouse' = 's3://warehouse/wh',
+                's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
+                's3.access_key' = 'admin',
+                's3.secret_key' = 'password',
+                's3.path.style.access' = 'true'
+        );
+    """
+    sql """use `${catalog_name}`.`spark_paimon`"""
+
+    explain {
+        sql("select * from predict_for_in")
+        contains("inputSplitNum=9")
+    }
+
+    def explain_one_column = { col_name ->
+
+        explain {
+            sql("select * from predict_for_in where ${col_name} in ('a')")
+            contains("inputSplitNum=3")
+        }
+
+        explain {
+            sql("select * from predict_for_in where ${col_name} in ('b')")
+            contains("inputSplitNum=3")
+        }
+
+        explain {
+            sql("select * from predict_for_in where ${col_name} in ('a','b')")
+            contains("inputSplitNum=6")
+        }
+
+        explain {
+            sql("select * from predict_for_in where ${col_name} in ('a','x')")
+            contains("inputSplitNum=3")
+        }
+
+        explain {
+            sql("select * from predict_for_in where ${col_name} in ('x','y')")
+            contains("inputSplitNum=0")
+        }
+
+        explain {
+            sql("select * from predict_for_in where ${col_name} in ('a','b','c')")
+            contains("inputSplitNum=9")
+        }
+
+        explain {
+            sql("select * from predict_for_in where ${col_name} in ('y','x','a','c')")
+            contains("inputSplitNum=6")
+        }
+
+        explain {
+            sql("select * from predict_for_in where ${col_name} not in ('y','x','a','c')")
+            contains("inputSplitNum=3")
+        }
+
+        explain {
+            sql("select * from predict_for_in where ${col_name} not in ('a')")
+            contains("inputSplitNum=6")
+        }
+
+        explain {
+            sql("select * from predict_for_in where ${col_name} not in ('x')")
+            contains("inputSplitNum=9")
+        }
+    }
+
+    explain_one_column('dt')
+    explain_one_column('hh')
+
+
+    sql """ drop catalog if exists ${catalog_name} """
+}
+
+
+/*
+
+for spark:
+
+create table predict_for_in(id int, dt string, hh string) partitioned by(dt,hh);
+
+insert into predict_for_in values (1, 'a', 'a');
+insert into predict_for_in values (2, 'a', 'b');
+insert into predict_for_in values (3, 'a', 'c');
+
+insert into predict_for_in values (4, 'b', 'a');
+insert into predict_for_in values (5, 'b', 'b');
+insert into predict_for_in values (6, 'b', 'c');
+
+insert into predict_for_in values (7, 'c', 'a');
+insert into predict_for_in values (8, 'c', 'b');
+insert into predict_for_in values (9, 'c', 'c');
+
+*/
+