Flink读取mysql cdc有办法过滤掉 update_before类型的数据嘛?[阿里云polardb]

Flink读取mysql cdc有办法过滤掉 update_before类型的数据嘛?

以下为热心网友提供的参考意见

在Apache Flink中,当读取MySQL CDC时,可以通过配置选项或编程方式来过滤掉update_before类型的数据。

  1. 使用Flink SQL
    如果您正在使用Flink SQL,可以在创建表源时指定只处理特定的变更事件类型。例如:

    CREATE TABLE my_table (
      ...
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '',
      'port' = '',
      'username' = '',
      'password' = '',
      'database-name' = '',
      'table-name' = '','server-id'='',-- 只处理INSERT和UPDATE_AFTER类型的事件'include-events'='insert,update_after');
  2. 编程方式
    如果您正在使用Java/Scala API进行编程,可以自定义一个ChangelogMode,并将其传递给TableSource的构造函数。例如:

    ChangelogMode changelogMode = ChangelogMode.newBuilder()
        .addContainedKind(Kind.INSERT)
        .addContainedKind(Kind.UPDATE_AFTER)
        .build();
    
    TableSource<?> source = ...;
    source.configure(
        new Configuration(),
        new HashMap<>(),
        changelogMode);
    
  3. 以下为热心网友提供的参考意见

    没有办法。此回答整理自钉群“实时计算Flink产品交流群”

    「点点赞赏,手留余香」

      还没有人赞赏,快来当第一个赞赏的人吧!
    =====这是一个广告位,招租中,联系qq 78315851====
    2025 年 1 月
     12345
    6789101112
    13141516171819
    20212223242526
    2728293031  

合作伙伴