Flink cdc oracle 2.11
Flink 1.13.6
“` SourceFunction sourceFunction = OracleSource.builder()
.hostname(“xx.x.xx.x”)
.port(1531)
.database(“TEST”) // monitor XE database
.schemaList(“APPS”) // monitor inventory schema
.tableList(“APPS.student”) // monitor products table
.username(“apps”)
.password(“apps”)
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.startupOptions(StartupOptions.latest())
.debeziumProperties(properties)
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
// set the source parallelism to 4
env
.addSource(sourceFunction)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print Oracle Snapshot + RedoLog");
“`
本地测试idea启动后,下图
不知道是什么原因?
以下为热心网友提供的参考意见
如果你的Flink CDC Oracle程序启动后无反应,以下是一些可能的排查步骤:
-
检查Oracle数据库连接:确保提供的hostname、port、database、username和password是正确的,并且可以从运行Flink作业的环境访问到Oracle数据库。
-
检查Debezium连接器配置:确认你的DebeziumProperties配置是否正确,包括数据库的历史数据捕获选项(如snapshot.mode)和其他必要的连接器属性。
-
检查网络连接和防火墙设置:确保运行Flink作业的环境和Oracle数据库之间的网络连接是畅通的,没有被防火墙阻止。
-
查看Flink日志:检查Flink作业的日志文件,看看是否有任何错误或警告信息。这可能会提供一些关于问题的线索。
-
测试Debezium连接器:尝试使用Debezium提供的示例代码或者独立的Debezium工具(如Kafka Connect)来测试Oracle连接器,以确认连接器本身是否能够正常工作。
-
调整Flink作业配置:尝试调整Flink作业的并行度、checkpoint间隔等参数,看看是否会影响到CDC数据的获取。
-
确认Oracle数据库的日志模式和CDC设置:确保你要监控的Oracle数据库启用了适当的日志模式(如ARCHIVELOG模式),并且已经为CDC配置了必要的触发器和补丁。