flink mongodb cdc 2.4是不是有bug呀?这里也没有这个类呀?[阿里云实时计算 Flink版]

问题1:flink mongodb cdc 2.4是不是有bug呀?这里也没有这个类呀?
源码上面也是这样呀

是我打开方式不对吗
问题2:这个问题是因为我导的包有问题吗?flink的版本是1.6

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
=====这是一个广告位,招租中,联系qq 78315851====
2 条回复 A 作者 M 管理员
  1. 在Flink CDC 2.4.0版本中,并没有提供针对MongoDB的CDC模块。因此,如果您需要实现MongoDB的增量数据抓取和数据同步等功能,需要自己编写相关代码,并集成到Flink任务中。

    通常情况下,实现MongoDB的CDC功能需要使用MongoDB的oplog,即操作日志。oplog记录了MongoDB中所有的数据变更操作,包括插入、更新和删除等。通过解析oplog,可以实现增量数据抓取和数据同步等功能。

    下面是一个示例代码,演示如何使用MongoDB的oplog实现增量数据抓取和数据同步:

    java
    Copy
    MongoClient mongoClient = new MongoClient(“localhost”, 27017);
    MongoDatabase db = mongoClient.getDatabase(“mydb”);
    MongoCollection collection = db.getCollection(“mycollection”);

    // 获取oplog集合
    MongoCollection oplog = db.getCollection(“oplog.rs”);

    // 构造查询条件,过滤出指定集合的操作日志
    Bson filter = Filters.and(
    Filters.eq(“ns”, “mydb.mycollection”),
    Filters.exists(“o”)
    );

    // 获取操作日志的游标
    FindIterable cursor = oplog.find(filter)
    .sort(new Document(“$natural”, 1))
    .cursorType(CursorType.TailableAwait);

    while (true) {
    // 获取下一个操作日志
    Document oplogEntry = cursor.tryNext();

    if (oplogEntry != null) {    // 解析操作日志,获取变更的数据    Document document = (Document) oplogEntry.get("o");    // TODO: 将变更的数据转换为Flink的数据流,并进行处理} else {    // 如果没有新的操作日志,则休眠一段时间    Thread.sleep(100);}

    }
    在上述示例中,我们首先获取MongoDB的oplog集合,并通过查询条件过滤出指定集合的操作日志。然后,我们使用tailable cursor的方式获取操作日志的游标,并不断读取下一个操作日志。在每次读取操作日志时,我们解析操作日志,获取变更的数据,并将其转换为Flink的数据流,以供下游任务使用。

  2. 在 Flink CDC 中,针对 MongoDB 的 CDC 连接器确实存在一些版本和包名的变化。根据您提供的信息,可以回答如下:

    1. 问题1:Flink MongoDB CDC 2.4 版本是否有 bug?    根据您提供的截图,显示无法找到 MongoDbSource 类,这可能是由于包名或类路径的变化导致的。但从截图上看不出具体错误信息。建议您检查以下几点:    – 确认您使用的是正确的 Flink MongoDB CDC 版本,并根据该版本的文档和示例进行操作。    – 检查引入的包和依赖项是否正确,并确保版本兼容性。    – 查看类路径和包名是否与您当前的项目结构匹配。

    2. 问题2:导入的包是否存在问题?    从您提供的截图来看,显示导入的包为 flink-connector-mongodb,而您使用的 Flink 版本为 1.6。请注意,Flink 1.6 版本的 MongoDB Connector 包名为 flink-connector-mongodb_2.11(对应 Scala 2.11)。因此,请确保您导入的包名与您正在使用的 Flink 版本相匹配。

    如果您仍然遇到问题,建议您提供更多详细信息,例如完整的错误日志、代码示例以及相关的依赖项和版本等。这将有助于更准确地定位和解决问题。

    另外,对于 MongoDB CDC 的使用,建议查阅相关文档和示例,并参考 Flink 社区或相关技术论坛上的讨论,以获取更多关于特定版本和问题的信息和解决方案。

  3. 回答1:这个类不是不是cdc包下的啊。包名都不一样的,是flink-connector-base包下的,
    回答2:,此回答整理自钉群“Flink CDC 社区”