flink+hologres实时数仓 宽表merge问题[阿里云]

双流数据有些没有merge上,不确定是否是回撤原因引起,在建结果表的时候不能设置ignoredelete属性,出现”The provided value for option ignoredelete is true, which is not as same as the value false inferred by catalog. You can use the inferred value by removing the specified option from the DDL”
后面就换成创建临时表,设置该属性,但是还是出现没有merge的流,双流都建了同样的主键

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
=====这是一个广告位,招租中,联系qq 78315851====
2 条回复 A 作者 M 管理员
  1. Flink 和 Hologres 结合构建实时数仓时,如果在宽表合并(merge)过程中遇到双流数据没有完全 merge 的问题,可能是由于多种原因导致的。以下是一些可能的原因和相应的解决方案:

    1. 事件时间不一致

      • 确保两个流中的事件时间戳是准确且一致的。使用 Flink 的 Watermark 机制来处理乱序事件。
    2. 主键冲突

      • 检查是否有多个流具有相同的主键值。如果有,考虑使用其他方法来区分不同的记录,如添加额外的唯一标识符或修改主键策略。
    3. 回撤数据处理

      • 如果你的系统支持回撤数据,确保你已经正确地处理了这些数据。可以尝试在 Flink 中启用 sideOutputLateData 或者在 Hologres 中设置合适的 TTL 来处理过期数据。
    4. 并发控制

      • 在写入 Hologres 时,确保并发控制设置正确。你可以尝试降低并行度或者增加 Hologres 表的分区数量来提高写入性能。
    5. 检查点一致性

      • 确保 Flink 的检查点间隔和保留时间足够大,以避免丢失数据。
    6. 临时表与结果表的问题

      • 如果你在创建临时表时设置了 ignoredelete 属性,但在将数据写入结果表时未设置,可能会导致数据丢失。尝试将临时表的数据合并到结果表中,并确保在合并期间保持忽略删除属性。
    7. 网络延迟和连接问题

      • 检查 Flink 与 Hologres 之间的网络连接是否稳定,以及是否存在任何网络延迟或超时问题。
    8. Hologres 版本兼容性

      • 确保使用的 Hologres 版本与 Flink 集成插件兼容。如果不兼容,可能会导致数据无法正确写入。
  2. Link+Hologres实时数仓在宽表merge时遇到的问题,根据你的描述,可能是由于数据流的回撤或主键设置不正确导致的。

    1. 数据流的回撤:如果你的数据流中存在回撤的情况,即某些数据先被插入然后又被删除,这可能会导致merge的结果不准确。你可以检查你的数据源是否有这种情况,并考虑如何处理数据流中的回撤。
    2. 主键设置:你提到双流都建了同样的主键,但在merge时还是出现了问题。这可能是因为你的主键设置存在问题,或者你的数据流中有重复的数据。你可以检查你的主键设置是否正确,以及你的数据流中是否存在重复的数据。

    对于”The provided value for option ignoredelete is true, which is not as same as the value false inferred by catalog. You can use the inferred value by removing the specified option from the DDL“这个错误,这是因为你在创建结果表的时候设置了ignoredelete属性为true,而catalog推断出的值为false。你可以尝试移除DDL中的ignoredelete选项,使用catalog推断出的值。

    另外,你也可以考虑使用Hologres的CDC(Change Data Capture)功能来处理数据流中的增删改操作,这样可以更准确地处理数据流的变化。

  3. 这个问题可能是由于在创建结果表时,没有正确设置ignoredelete属性导致的。你可以尝试以下方法解决这个问题:

    1. 首先,确保你的双流数据中都包含了主键字段。
    2. 在创建结果表时,使用IGNOREDELETE关键字来设置该属性。例如:
    CREATE TABLE result_table (    id INT PRIMARY KEY,    data1 VARCHAR(255),    data2 VARCHAR(255)) IGNOREDELETE;
    1. 如果问题仍然存在,你可以尝试创建一个临时表,并设置ignoredelete属性。例如:
    CREATE TEMPORARY TABLE temp_result_table (    id INT PRIMARY KEY,    data1 VARCHAR(255),    data2 VARCHAR(255)) IGNOREDELETE;
    1. 将双流数据合并到临时表中,然后从临时表中查询结果。例如:
    INSERT INTO temp_result_table (id, data1, data2)SELECT id, data1, data2 FROM stream1UNION ALLSELECT id, data1, data2 FROM stream2;
    1. 最后,从临时表中查询你需要的结果。