Skip to content

Commit

Permalink
[INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow …
Browse files Browse the repository at this point in the history
…error (#9361)
  • Loading branch information
EMsnap authored Nov 29, 2023
1 parent 7178b39 commit ff3cf51
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.util.SchemaChangeUtils;

import com.google.common.base.Preconditions;
import lombok.Data;
Expand All @@ -54,7 +53,6 @@
import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_FORMAT;
import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_TABLE_PATTERN;
import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_SCHEMA_CHANGE_POLICIES;

@JsonTypeName("icebergLoad")
@Data
Expand Down Expand Up @@ -177,8 +175,6 @@ public IcebergLoadNode(@JsonProperty("id") String id,
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
options.put(IcebergConstant.CONNECTOR_KEY, IcebergConstant.CONNECTOR);
// for test sink.ignore.changelog
// options.put("sink.ignore.changelog", "true");
options.put(IcebergConstant.DATABASE_KEY, dbName);
options.put(IcebergConstant.TABLE_KEY, tableName);
options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
Expand All @@ -197,7 +193,6 @@ public Map<String, String> tableOptions() {
options.put(SINK_MULTIPLE_FORMAT, Objects.requireNonNull(sinkMultipleFormat).identifier());
options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern);
options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern);
options.put(SINK_SCHEMA_CHANGE_POLICIES, SchemaChangeUtils.serialize(policyMap));
} else {
options.put(SINK_MULTIPLE_ENABLE, "false");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ public final class Constants {
// Whether a missing table is auto-created when a snapshot record arrives.
// The default was flipped to true in this change; the description string is
// kept in sync with the actual default (it previously still said 'false').
public static final ConfigOption<Boolean> SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT =
        ConfigOptions.key("sink.multiple.auto-create-table-when-snapshot")
                .booleanType()
                .defaultValue(true)
                .withDescription("Whether supporting auto create table when snapshot, default value is 'true'");

public static final ConfigOption<String> INNER_FORMAT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.base.Constants.SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
Expand Down Expand Up @@ -327,6 +328,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
options.add(SINK_MULTIPLE_PK_AUTO_GENERATED);
options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
options.add(SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT);
options.add(WRITE_COMPACT_ENABLE);
options.add(WRITE_COMPACT_INTERVAL);
options.add(WRITE_DISTRIBUTION_MODE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -80,6 +81,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
Expand Down Expand Up @@ -522,10 +524,22 @@ private void handldAlterSchemaEventFromOperator(TableIdentifier tableId, Schema
}

// =============================== Utils method =================================================================
// if newSchema is not the same as oldSchema, return false. This includes differences in name,
// type, position, and quantity
// if newSchema is not same with oldSchema, return false.
/**
 * Returns true only when {@code newSchema} matches {@code oldSchema}
 * field-for-field: the same number of columns, and at each position an equal
 * column name and an equal column type. A null {@code newSchema} is never
 * compatible. Field id, optionality and doc are deliberately ignored here —
 * TODO confirm that is the intended notion of compatibility.
 *
 * <p>NOTE(review): the type comparison must use {@code Type#equals}, not
 * reference equality. Iceberg primitive types are interned singletons (so
 * {@code ==} happens to work for them), but nested/parameterized types such as
 * struct, list, map, decimal and fixed are distinct instances, and {@code ==}
 * would wrongly report equal types as incompatible.
 */
private boolean isCompatible(Schema newSchema, Schema oldSchema) {
    if (newSchema == null) {
        return false;
    }

    List<NestedField> oldSchemaFields = oldSchema.columns();
    List<NestedField> newSchemaFields = newSchema.columns();

    if (oldSchemaFields.size() != newSchemaFields.size()) {
        return false;
    }

    return IntStream.range(0, oldSchemaFields.size())
            .allMatch(i -> oldSchemaFields.get(i).name().equals(newSchemaFields.get(i).name())
                    && oldSchemaFields.get(i).type().equals(newSchemaFields.get(i).type()));
}

private TableIdentifier parseId(JsonNode data) throws IOException {
Expand Down

0 comments on commit ff3cf51

Please sign in to comment.