Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Nov 8, 2023
1 parent 959dd89 commit 99357f7
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public class IcebergSink extends StreamSink {
@ApiModelProperty("Primary key")
private String primaryKey;

@ApiModelProperty("Upsert mode")
private Boolean upsert;
@ApiModelProperty("append mode, UPSERT or APPEND")
private String appendMode;

public IcebergSink() {
this.setSinkType(SinkType.ICEBERG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class IcebergSinkRequest extends SinkRequest {
@ApiModelProperty("Primary key")
private String primaryKey;

@ApiModelProperty("Upsert mode")
private Boolean upsert;
@ApiModelProperty("append mode, UPSERT or APPEND")
private String appendMode;

}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> con
catalogType,
icebergSink.getCatalogUri(),
icebergSink.getWarehouse(),
icebergSink.getUpsert());
icebergSink.getAppendMode());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class IcebergConstant {
public static final String STREAMING = "streaming";
public static final String STARTING_STRATEGY_KEY = "starting-strategy";

public static final String UPSERT_KEY = "upsert";
public static final String APPEND_MODE_KEY = "appendMode";

/**
* Iceberg supported catalog type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
@JsonProperty("warehouse")
private String warehouse;

@JsonProperty("upsert")
private Boolean upsert;
@JsonProperty("append mode, UPSERT or APPEND")
private String appendMode;

@JsonCreator
public IcebergLoadNode(@JsonProperty("id") String id,
Expand All @@ -92,15 +92,15 @@ public IcebergLoadNode(@JsonProperty("id") String id,
@JsonProperty("catalogType") IcebergConstant.CatalogType catalogType,
@JsonProperty("uri") String uri,
@JsonProperty("warehouse") String warehouse,
@JsonProperty("upsert") Boolean upsert) {
@JsonProperty("appendMode") String appendMode) {
super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
this.primaryKey = primaryKey;
this.catalogType = catalogType == null ? CatalogType.HIVE : catalogType;
this.uri = uri;
this.warehouse = warehouse;
this.upsert = Optional.ofNullable(upsert).orElse(false);
this.appendMode = appendMode;
}

@Override
Expand All @@ -114,7 +114,7 @@ public Map<String, String> tableOptions() {
options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
options.put(IcebergConstant.CATALOG_NAME_KEY, catalogType.name());
options.put(IcebergConstant.UPSERT_KEY, upsert.toString());
options.put(IcebergConstant.APPEND_MODE_KEY, appendMode);
if (null != uri) {
options.put(IcebergConstant.URI_KEY, uri);
}
Expand Down

0 comments on commit 99357f7

Please sign in to comment.