Flink读取mysql cdc有办法过滤掉 update_before类型的数据嘛?
以下为热心网友提供的参考意见
在Apache Flink中,当读取MySQL CDC时,可以通过配置选项或编程方式来过滤掉update_before
类型的数据。
-
使用Flink SQL:
如果您正在使用Flink SQL,可以在创建表源时指定只处理特定的变更事件类型。例如:CREATE TABLE my_table ( ... ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '', 'port' = '', 'username' = '', 'password' = '', 'database-name' = '', 'table-name' = '
', 'server-id' = '', -- 只处理INSERT和UPDATE_AFTER类型的事件 'include-events' = 'insert,update_after' );
编程方式:
如果您正在使用Java/Scala API进行编程,可以自定义一个ChangelogMode
,并将其传递给TableSource
的构造函数。例如:ChangelogMode changelogMode = ChangelogMode.newBuilder() .addContainedKind(Kind.INSERT) .addContainedKind(Kind.UPDATE_AFTER) .build(); TableSource<?> source = ...; source.configure( new Configuration(), new HashMap<>(), changelogMode);
以下为热心网友提供的参考意见
没有办法。此回答整理自钉群“实时计算Flink产品交流群”
「点点赞赏,手留余香」还没有人赞赏,快来当第一个赞赏的人吧!=====这是一个广告位,招租中,联系qq 78315851====