From 6ef1be7ebee65fcef6f9ca5db9984b248b0768be Mon Sep 17 00:00:00 2001 From: Wang Tao Date: Fri, 13 Dec 2024 10:45:34 +0800 Subject: [PATCH] [AMORO-3359] only set parallelism on parallelismSourceFunciton (#3360) issue_3359 only set parallelism on parallelismSourceFunciton --- .../org/apache/amoro/flink/table/FlinkSource.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java index b57b5fb3d3..3e4080e8a8 100644 --- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java +++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java @@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamSource; @@ -289,12 +290,13 @@ private DataStream wrapKrb(DataStream ds) { DataStreamSource sourceStream = env.addSource(functionProxy, tfSource.getName(), tfSource.getOutputType()); context.generateUid(MIXED_FORMAT_FILE_TRANSFORMATION).ifPresent(sourceStream::uid); - return sourceStream - .setParallelism(scanParallelism) - .transform( - tf.getName(), - tf.getOutputType(), - new UnkeyedInputFormatOperatorFactory(inputFormatProxyFactory)); + if (sourceStream instanceof ParallelSourceFunction) { + sourceStream.setParallelism(scanParallelism); + } + return sourceStream.transform( + tf.getName(), + tf.getOutputType(), + new UnkeyedInputFormatOperatorFactory(inputFormatProxyFactory)); } LegacySourceTransformation tfSource = (LegacySourceTransformation) origin;