Flink CDC: full snapshot succeeds, but the incremental phase fails with "file is not a valid field name" [Alibaba Cloud Realtime Compute for Apache Flink]

Neither the source Oracle database nor the target MySQL database has a field named "file":

Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name

Flink 1.16.0
CDC 2.3.0, Oracle 11g, MySQL 5.7

Full log:

2023-03-14 18:02:29,935 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - 1 tasks will be restarted to recover the failed task 87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_176.
2023-03-14 18:02:29,935 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job insert-into_default_catalog.default_database.M_MY_TABLE (9c57f249480d040657046b7ad13d34b5) switched from state RUNNING to RESTARTING.
2023-03-14 18:02:29,935 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 9c57f249480d040657046b7ad13d34b5
2023-03-14 18:02:29,935 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 0 (#176) of source Source: O_TB_TEST[1].
2023-03-14 18:02:30,936 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job insert-into_default_catalog.default_database.M_MY_TABLE (9c57f249480d040657046b7ad13d34b5) switched from state RESTARTING to RUNNING.
2023-03-14 18:02:30,936 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 9c57f249480d040657046b7ad13d34b5 from Checkpoint 97 @ 1678788146099 for 9c57f249480d040657046b7ad13d34b5 located at .
2023-03-14 18:02:30,936 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from CREATED to SCHEDULED.
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from SCHEDULED to DEPLOYING.
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (attempt #177) with attempt id 87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177 and vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to localhost:37022-bfd87b @ localhost (dataPort=44033) with allocation id be462b7f02a3e0e4100f5cfe0d229b65
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 9c57f249480d040657046b7ad13d34b5: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-03-14 18:02:30,937 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 0 to checkpoint 97 for source Source: O_TB_TEST[1] to checkpoint.
2023-03-14 18:02:30,943 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from DEPLOYING to INITIALIZING.
2023-03-14 18:02:31,043 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: O_TB_TEST[1] registering reader for parallel task 0 (#177) @ localhost
2023-03-14 18:02:31,062 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from INITIALIZING to RUNNING.
2023-03-14 18:02:33,705 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: O_TB_TEST[1] -> DropUpdateBefore[2] -> SinkConversion[3] -> Sink: JdbcUpsertTableSink(ID, NAME) (1/1) (87bf3a73786c58f4d6d3a66b907ef6b5_cbc357ccb763df2852fee8c4fc7d55f2_0_177) switched from RUNNING to FAILED on localhost:37022-bfd87b @ localhost (dataPort=44033).
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_262]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-connector-files-1.16.0.jar:1.16.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_262]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
    ... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:325) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:123) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask.execute(OracleStreamFetchTask.java:71) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:86) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_262]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
    ... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getString(Struct.java:158) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeRecordValue(JdbcSourceEventDispatcher.java:184) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(JdbcSourceEventDispatcher.java:214) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:113) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:143) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:60) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.dispatchSchemaChangeEventAndGetTableForNewCapturedTable(LogMinerQueryResultProcessor.java:336) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.getTableForDmlEvent(LogMinerQueryResultProcessor.java:323) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor.processResult(LogMinerQueryResultProcessor.java:257) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:280) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:123) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask.execute(OracleStreamFetchTask.java:71) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:86) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_262]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
    ... 1 more

Flink SQL> SET execution.checkpointing.interval = 10s;

Flink SQL> CREATE TABLE O_TB_TEST (
    ID STRING,
    NAME STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = 'ip',
    'port' = '1521',
    'username' = 'username',
    'password' = 'pswd',
    'database-name' = 'dbname',
    'schema-name' = 'schemaname',
    'table-name' = 'TB_TEST',
    'scan.startup.mode' = 'initial',
    'debezium.snapshot.mode' = 'schema_only',
    'debezium.log.mining.strategy' = 'online_catalog',
    'debezium.log.mining.continuous.mine' = 'true',
    'debezium.database.connection.adapter' = 'logminer'
);

Flink SQL> CREATE TABLE M_MY_TABLE (
    ID STRING,
    NAME STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector.type' = 'jdbc',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.url' = 'jdbc:mysql://ip/RSTEST',
    'connector.table' = 'MY_TABLE',
    'connector.username' = 'username',
    'connector.password' = 'pswd'
);

Flink SQL> INSERT INTO M_MY_TABLE (ID, NAME) SELECT ID, NAME FROM O_TB_TEST;

5 replies:
  1. Some field name is invalid; check your schemas. You can also try adjusting the Flink CDC connector configuration so that the "file" column is mapped to a valid MySQL column name.

  2. This error usually means that a field name in the incremental source is invalid. Please check the following:

    1. Confirm whether the incremental source actually contains a field named "file", and whether that field is a string type.
    2. Confirm that your Flink CDC configuration correctly specifies the incremental source's location and format. For example, if Kafka feeds the incremental stream, the topic, partitions, and related settings must be configured.
    3. Confirm that your Flink CDC configuration correctly specifies the full-snapshot source's location and format. If the snapshot uses a different format or path, specify them explicitly for the incremental phase.
    4. If none of the above resolves the issue, enable debug output in Flink CDC and inspect the full error message and stack trace to pin down the cause; a sketch of this follows the list.
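
    As a sketch of point 4, assuming the job is submitted from the SQL client as in the original post: the client's verbose switch prints the full exception stack trace instead of the one-line summary (this is a standard Flink SQL client option, not a CDC-specific flag):

    Flink SQL> SET 'sql-client.verbose' = 'true';

    On the TaskManager side, raising the com.ververica.cdc and io.debezium loggers to DEBUG in conf/log4j.properties should give comparable detail.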
  3. OP, did you ever solve this? I'm hitting the same problem: the Oracle full snapshot works, but the same error appears once incremental writes start. Flink 1.14, CDC 2.3.0, Oracle 11g.

  4. This error suggests the Flink CDC connector failed while syncing data from Oracle to MySQL. A possible cause is that the Oracle database contains a column named "file", and that name is not a valid column name on the MySQL side.

    You can try adjusting the Flink CDC connector configuration to map the "file" column to a valid MySQL column name, for example "file_name" or "file_path" instead of "file"; that should avoid the problem (see the sketch below).
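
    For illustration only, a minimal Flink SQL sketch of that renaming, assuming a hypothetical FILE column on the Oracle side (which the original poster says does not exist) and a FILE_NAME column declared on both tables:

    Flink SQL> INSERT INTO M_MY_TABLE (ID, NAME, FILE_NAME)
        SELECT ID, NAME, `FILE` AS FILE_NAME FROM O_TB_TEST;

    Backticks keep Flink SQL from parsing the quoted name as a keyword; both CREATE TABLE statements would need the extra column added.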

  5. Same scenario here, syncing from Oracle to MySQL: when the job starts, the full snapshot succeeds, but subsequent inserts, updates, and deletes trigger the error above.