Skip to content

Commit

Permalink
[ecosystem](flink) update flink connector faq (apache#1706)
Browse files Browse the repository at this point in the history
## Versions 

- [x] dev
- [x] 3.0
- [x] 2.1
- [x] 2.0

## Languages

- [x] Chinese
- [x] English

## Docs Checklist

- [ ] Checked by AI
- [ ] Test Cases Built
  • Loading branch information
JNSimba authored and echo-hhj committed Jan 6, 2025
1 parent ca7b83a commit c79b7d7
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 26 deletions.
3 changes: 2 additions & 1 deletion docs/ecosystem/flink-doris-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,8 @@ from KAFKA_SOURCE;
2. **errCode = 2, detailMessage = transaction [19650] not found**
This occurs during the Commit stage. The transaction ID recorded in the checkpoint has expired on the FE side. When committing again at this time, the above error will occur. At this point, it's impossible to start from the checkpoint. Subsequently, you can extend the expiration time by modifying the `streaming_label_keep_max_second` configuration in `fe.conf`. The default expiration time is 12 hours.
This occurs during the Commit stage. The transaction ID recorded in the checkpoint has expired on the FE side. When committing again at this time, the above error will occur. At this point, it's impossible to start from the checkpoint. Subsequently, you can extend the expiration time by modifying the `streaming_label_keep_max_second` configuration in `fe.conf`. The default expiration time is 12 hours. After doris version 2.0, it will also be limited by the `label_num_threshold` configuration in `fe.conf` (default 2000), which can be increased or changed to -1 (-1 means only limited by time).
3. **errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,8 @@ from KAFKA_SOURCE;
2. **errCode = 2, detailMessage = transaction [19650] not found**
发生在 Commit 阶段,checkpoint 里面记录的事务 ID,在 FE 侧已经过期,此时再次 commit 就会出现上述错误。此时无法从 checkpoint 启动,后续可通过修改 fe.conf 的 streaming_label_keep_max_second 配置来延长过期时间,默认 12 小时。
发生在 Commit 阶段,checkpoint 里面记录的事务 ID,在 FE 侧已经过期,此时再次 commit 就会出现上述错误。此时无法从 checkpoint 启动,后续可通过修改 fe.conf 的 `streaming_label_keep_max_second` 配置来延长过期时间,默认 12 小时。Doris2.0 版本后还会受到 fe.conf 中 `label_num_threshold` 配置的限制 (默认 2000) ,可以调大或者改为 -1(-1 表示只受时间限制)。
3. **errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ source.sinkTo(builder.build());
**CDC 数据流 (JsonDebeziumSchemaSerializer)**

:::info 备注
上游数据必须符合Debezium数据格式
上游数据必须符合 Debezium 数据格式
:::

```java
Expand Down Expand Up @@ -376,7 +376,7 @@ ON a.city = c.city
| Key | Default Value | Required | Comment |
| --------------------------- | ------------- | -------- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| sink.label-prefix | -- | Y | Stream load 导入使用的 label 前缀。2pc 场景下要求全局唯一,用来保证 Flink 的 EOS 语义。 |
| sink.properties.* | -- | N | Stream Load 的导入参数。<br />例如: 'sink.properties.column_separator' = ', ' 定义列分隔符, 'sink.properties.escape_delimiters' = 'true' 特殊字符作为分隔符,`\x01`会被转换为二进制的 0x01 。 <br /><br />JSON 格式导入<br />'sink.properties.format' = 'json' 'sink.properties.read_json_by_line' = 'true'<br />详细参数参考[这里](../data-operate/import/stream-load-manual.md)。<br /><br />Group Commit 模式 <br /> 例如:'sink.properties.group_commit' = 'sync_mode' 设置 group commit 为同步模式。flink connector 从 1.6.2 开始支持导入配置 group commit ,详细使用和限制参考 [group commit](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/group-commit-manual/)
| sink.properties.* | -- | N | Stream Load 的导入参数。<br />例如: 'sink.properties.column_separator' = ', ' 定义列分隔符, 'sink.properties.escape_delimiters' = 'true' 特殊字符作为分隔符,`\x01`会被转换为二进制的 0x01。 <br /><br />JSON 格式导入<br />'sink.properties.format' = 'json' 'sink.properties.read_json_by_line' = 'true'<br />详细参数参考[这里](../data-operate/import/stream-load-manual.md)。<br /><br />Group Commit 模式 <br /> 例如:'sink.properties.group_commit' = 'sync_mode' 设置 group commit 为同步模式。flink connector 从 1.6.2 开始支持导入配置 group commit,详细使用和限制参考 [group commit](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/group-commit-manual/)
| sink.enable-delete | TRUE | N | 是否启用删除。此选项需要 Doris 表开启批量删除功能 (Doris0.15+ 版本默认开启),只支持 Unique 模型。 |
| sink.enable-2pc | TRUE | N | 是否开启两阶段提交 (2pc),默认为 true,保证 Exactly-Once 语义。关于两阶段提交可参考[这里](../data-operate/import/stream-load-manual.md)|
| sink.buffer-size | 1MB | N | 写数据缓存 buffer 大小,单位字节。不建议修改,默认配置即可 |
Expand Down Expand Up @@ -582,7 +582,7 @@ insert into doris_sink select id,name,bank,age from cdc_mysql_source;
| --create-table-only | 是否只仅仅同步表的结构 |
:::info 备注
1. 同步时需要在 `$FLINK_HOME/lib` 目录下添加对应的 Flink CDC 依赖,比如 flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar ,flink-sql-connector-mongodb-cdc-${version}.jar
1. 同步时需要在 `$FLINK_HOME/lib` 目录下添加对应的 Flink CDC 依赖,比如 flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar,flink-sql-connector-mongodb-cdc-${version}.jar
2. Connector 24.0.0 之后依赖的 Flink CDC 版本需要在 3.1 以上,如果需使用 Flink CDC 同步 MySQL 和 Oracle,还需要在 `$FLINK_HOME/lib` 下增加相关的 JDBC 驱动。
:::
Expand Down Expand Up @@ -840,8 +840,7 @@ Exactly-Once 场景下,Flink Job 重启时必须从最新的 Checkpoint/Savepo
5. **errCode = 2, detailMessage = transaction [19650] not found**
发生在 Commit 阶段,checkpoint 里面记录的事务 ID,在 FE 侧已经过期,此时再次 commit 就会出现上述错误。
此时无法从 checkpoint 启动,后续可通过修改 fe.conf 的 streaming_label_keep_max_second 配置来延长过期时间,默认 12 小时。
发生在 Commit 阶段,checkpoint 里面记录的事务 ID,在 FE 侧已经过期,此时再次 commit 就会出现上述错误。此时无法从 checkpoint 启动,后续可通过修改 fe.conf 的 `streaming_label_keep_max_second` 配置来延长过期时间,默认 12 小时。Doris2.0 版本后还会受到 fe.conf 中 `label_num_threshold` 配置的限制 (默认 2000) ,可以调大或者改为 -1(-1 表示只受时间限制)。
6. **errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100**
Expand All @@ -859,7 +858,7 @@ Connector1.1.0 版本以前,是攒批写入的,写入均是由数据驱动
9. **tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235**
通常发生在 Connector1.1.0 之前,是由于写入频率过快,导致版本过多。可以通过设置 sink.batch.size 和 sink.batch.interval 参数来降低 Streamload 的频率。在Connector1.1.0之后,默认写入时机是由Checkpoint控制,可以通过增加Checkpoint间隔来降低写入频率
通常发生在 Connector1.1.0 之前,是由于写入频率过快,导致版本过多。可以通过设置 sink.batch.size 和 sink.batch.interval 参数来降低 Streamload 的频率。在 Connector1.1.0 之后,默认写入时机是由 Checkpoint 控制,可以通过增加 Checkpoint 间隔来降低写入频率
10. **Flink 导入有脏数据,如何跳过?**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ source.sinkTo(builder.build());
**CDC 数据流 (JsonDebeziumSchemaSerializer)**

:::info 备注
上游数据必须符合Debezium数据格式
上游数据必须符合 Debezium 数据格式
:::

```java
Expand Down Expand Up @@ -376,7 +376,7 @@ ON a.city = c.city
| Key | Default Value | Required | Comment |
| --------------------------- | ------------- | -------- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| sink.label-prefix | -- | Y | Stream load 导入使用的 label 前缀。2pc 场景下要求全局唯一,用来保证 Flink 的 EOS 语义。 |
| sink.properties.* | -- | N | Stream Load 的导入参数。<br />例如: 'sink.properties.column_separator' = ', ' 定义列分隔符, 'sink.properties.escape_delimiters' = 'true' 特殊字符作为分隔符,`\x01`会被转换为二进制的 0x01 。 <br /><br />JSON 格式导入<br />'sink.properties.format' = 'json' 'sink.properties.read_json_by_line' = 'true'<br />详细参数参考[这里](../data-operate/import/stream-load-manual.md)。<br /><br />Group Commit 模式 <br /> 例如:'sink.properties.group_commit' = 'sync_mode' 设置 group commit 为同步模式。flink connector 从 1.6.2 开始支持导入配置 group commit ,详细使用和限制参考 [group commit](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/group-commit-manual/)
| sink.properties.* | -- | N | Stream Load 的导入参数。<br />例如: 'sink.properties.column_separator' = ', ' 定义列分隔符, 'sink.properties.escape_delimiters' = 'true' 特殊字符作为分隔符,`\x01`会被转换为二进制的 0x01。 <br /><br />JSON 格式导入<br />'sink.properties.format' = 'json' 'sink.properties.read_json_by_line' = 'true'<br />详细参数参考[这里](../data-operate/import/stream-load-manual.md)。<br /><br />Group Commit 模式 <br /> 例如:'sink.properties.group_commit' = 'sync_mode' 设置 group commit 为同步模式。flink connector 从 1.6.2 开始支持导入配置 group commit,详细使用和限制参考 [group commit](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/group-commit-manual/)
| sink.enable-delete | TRUE | N | 是否启用删除。此选项需要 Doris 表开启批量删除功能 (Doris0.15+ 版本默认开启),只支持 Unique 模型。 |
| sink.enable-2pc | TRUE | N | 是否开启两阶段提交 (2pc),默认为 true,保证 Exactly-Once 语义。关于两阶段提交可参考[这里](../data-operate/import/stream-load-manual.md)|
| sink.buffer-size | 1MB | N | 写数据缓存 buffer 大小,单位字节。不建议修改,默认配置即可 |
Expand Down Expand Up @@ -582,7 +582,7 @@ insert into doris_sink select id,name,bank,age from cdc_mysql_source;
| --create-table-only | 是否只仅仅同步表的结构 |
:::info 备注
1. 同步时需要在 `$FLINK_HOME/lib` 目录下添加对应的 Flink CDC 依赖,比如 flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar ,flink-sql-connector-mongodb-cdc-${version}.jar
1. 同步时需要在 `$FLINK_HOME/lib` 目录下添加对应的 Flink CDC 依赖,比如 flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar,flink-sql-connector-mongodb-cdc-${version}.jar
2. Connector 24.0.0 之后依赖的 Flink CDC 版本需要在 3.1 以上,如果需使用 Flink CDC 同步 MySQL 和 Oracle,还需要在 `$FLINK_HOME/lib` 下增加相关的 JDBC 驱动。
:::
Expand Down Expand Up @@ -840,8 +840,7 @@ Exactly-Once 场景下,Flink Job 重启时必须从最新的 Checkpoint/Savepo
5. **errCode = 2, detailMessage = transaction [19650] not found**
发生在 Commit 阶段,checkpoint 里面记录的事务 ID,在 FE 侧已经过期,此时再次 commit 就会出现上述错误。
此时无法从 checkpoint 启动,后续可通过修改 fe.conf 的 streaming_label_keep_max_second 配置来延长过期时间,默认 12 小时。
发生在 Commit 阶段,checkpoint 里面记录的事务 ID,在 FE 侧已经过期,此时再次 commit 就会出现上述错误。此时无法从 checkpoint 启动,后续可通过修改 fe.conf 的 `streaming_label_keep_max_second` 配置来延长过期时间,默认 12 小时。Doris2.0 版本后还会受到 fe.conf 中 `label_num_threshold` 配置的限制 (默认 2000) ,可以调大或者改为 -1(-1 表示只受时间限制)。
6. **errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100**
Expand All @@ -859,7 +858,7 @@ Connector1.1.0 版本以前,是攒批写入的,写入均是由数据驱动
9. **tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235**
通常发生在 Connector1.1.0 之前,是由于写入频率过快,导致版本过多。可以通过设置 sink.batch.size 和 sink.batch.interval 参数来降低 Streamload 的频率。在Connector1.1.0之后,默认写入时机是由Checkpoint控制,可以通过增加Checkpoint间隔来降低写入频率
通常发生在 Connector1.1.0 之前,是由于写入频率过快,导致版本过多。可以通过设置 sink.batch.size 和 sink.batch.interval 参数来降低 Streamload 的频率。在 Connector1.1.0 之后,默认写入时机是由 Checkpoint 控制,可以通过增加 Checkpoint 间隔来降低写入频率
10. **Flink 导入有脏数据,如何跳过?**
Expand Down
Loading

0 comments on commit c79b7d7

Please sign in to comment.