[Bug] [cdc tidb] When the source has an UPDATE operation, Flink fails with "Conversion expects insert-only records but DataStream API record contains: UPDATE_AFTER" #8447

Open
3 tasks done
WenDing-Y opened this issue Jan 3, 2025 · 0 comments
Search before asking

  • I have searched the issues and found no similar issues.

What happened

With the job configuration listed under SeaTunnel Config, the TiDB-CDC source reads the products table of the inventory database and the jdbc sink writes to products_sink. When a row in the source table is updated, the job running on Flink fails with:

Conversion expects insert-only records but DataStream API record contains: UPDATE_AFTER

SeaTunnel Version

2.3.8

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # This is an example source plugin, only for testing and demonstrating the source plugin feature
  TiDB-CDC {
    result_table_name = "products_tidb_cdc"
    base-url = "jdbc:mysql://tidb0:4000/inventory"
    driver = "com.mysql.cj.jdbc.Driver"
    tikv.grpc.timeout_in_ms = 20000
    pd-addresses = "pd0:2379"
    username = "root"
    password = ""
    database-name = "inventory"
    table-name = "products"
  }
}

transform {
}

sink {
  jdbc {
    source_table_name = "products_tidb_cdc"
    url = "jdbc:mysql://tidb0:4000/inventory"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = ""
    database = "inventory"
    table = "products_sink"
    generate_sink_sql = true
    primary_keys = ["id"]
  }
}

Running Command

none

Error Exception

at org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer.SeaTunnelRowStreamingRecordDeserializer.deserialize(SeaTunnelRowStreamingRecordDeserializer.java:76)
	at org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader.TiDBSourceReader.captureStreamingEvents(TiDBSourceReader.java:188)
	at org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.reader.TiDBSourceReader.pollNext(TiDBSourceReader.java:135)
	at org.apache.seatunnel.translation.flink.source.FlinkSourceReader.pollNext(FlinkSourceReader.java:81)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:389)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.seatunnel.translation.flink.source.FlinkRowCollector.collect(FlinkRowCollector.java:72)
	at org.apache.seatunnel.translation.flink.source.FlinkRowCollector.collect(FlinkRowCollector.java:40)
	at org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer.SeaTunnelRowStreamingRecordDeserializer.deserialize(SeaTunnelRowStreamingRecordDeserializer.java:71)
	... 15 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
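
For readers unfamiliar with the message in the title, here is a minimal, dependency-free sketch of the difference between an insert-only conversion and a changelog-aware one. It is a simplified model, not the Flink or SeaTunnel code: the class `ChangelogDemo`, the `Change` type, both methods, and the sample rows are hypothetical names introduced only for illustration. Only the four row-kind names match Flink's `RowKind`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogDemo {
    // Same four change kinds as Flink's RowKind; redefined here to stay dependency-free.
    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical change event: one row of "products", keyed by id.
    public static final class Change {
        final RowKind kind;
        final int id;
        final String name;

        public Change(RowKind kind, int id, String name) {
            this.kind = kind;
            this.id = id;
            this.name = name;
        }
    }

    // Insert-only conversion: any record that is not an INSERT is rejected.
    public static void insertOnly(List<Change> changes, Map<Integer, String> table) {
        for (Change c : changes) {
            if (c.kind != RowKind.INSERT) {
                throw new IllegalStateException(
                        "Conversion expects insert-only records but record contains: " + c.kind);
            }
            table.put(c.id, c.name);
        }
    }

    // Changelog-aware conversion: applies each change as an upsert or delete by key.
    public static void changelogAware(List<Change> changes, Map<Integer, String> table) {
        for (Change c : changes) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.put(c.id, c.name);
                    break;
                case DELETE:
                    table.remove(c.id);
                    break;
                case UPDATE_BEFORE:
                    // Old row image; nothing to do for a keyed upsert.
                    break;
            }
        }
    }

    // Made-up sample: an insert followed by an update of the same row.
    public static List<Change> sampleChanges() {
        return Arrays.asList(
                new Change(RowKind.INSERT, 1, "scooter"),
                new Change(RowKind.UPDATE_AFTER, 1, "e-scooter"));
    }

    public static void main(String[] args) {
        Map<Integer, String> sink = new LinkedHashMap<>();
        changelogAware(sampleChanges(), sink);
        System.out.println("changelog-aware sink: " + sink);

        try {
            insertOnly(sampleChanges(), new LinkedHashMap<>());
        } catch (IllegalStateException e) {
            System.out.println("insert-only failed: " + e.getMessage());
        }
    }
}
```

In this model the second record is what breaks the insert-only path, which matches the symptom reported here: the job only fails once an UPDATE reaches the source. Whether the actual fix belongs in the TiDB-CDC source or in the Flink translation layer is not something this sketch can answer.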

Zeta or Flink or Spark Version

1.16

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

WenDing-Y added the bug label Jan 3, 2025