tongchenkeji 发表于:2023-4-6 12:13:020次点击 已关注取消关注 关注 私信 请问有大佬用过mysqlcdc的这个createReader 吗?[阿里云消息队列MQ] 暂停朗读为您朗读 请问有大佬用过mysqlcdc的这个createReader 吗? 「点点赞赏,手留余香」 赞赏 还没有人赞赏,快来当第一个赞赏的人吧! 海报 消息队列 MQ# 云数据库 RDS MySQL 版1517
wljslmzAM 2023-11-28 0:12:51 1 MySQLCDC 是阿里云消息队列(MQ)提供的一种数据同步工具,它可以实时捕获 MySQL 数据库的变更,并将变更数据同步到消息队列中。在使用 MySQLCDC 进行数据同步时,确实需要使用到 createReader 函数。 createReader 函数是 MySQLCDC 中的一个重要函数,用于创建一个基于 Canal 协议的数据读取器,该读取器可以对 MySQL 数据库进行实时的变更数据捕获和推送。
穿过生命散发芬芳AM 2023-11-28 0:12:51 2 /*构建sourceReader */ @Override public SourceReader createReader(SourceReaderContext readerContext) throws Exception { // 前面提到了,根据subtask索引创建对应的config MySqlSourceConfig sourceConfig = configFactory.createConfig(readerContext.getIndexOfSubtask()); // 一个阻塞队列,多线程交互用的,不必深入 FutureCompletingBlockingQueue> elementsQueue = new FutureCompletingBlockingQueue<>(); // metric相关 final MySqlSourceReaderMetrics sourceReaderMetrics = new MySqlSourceReaderMetrics(readerContext.metricGroup()); sourceReaderMetrics.registerMetrics(); // 通过supplier函数构建一个SplitReader,解耦的作用,主要看里面的MySqlSplitReader实现即可 Supplier splitReaderSupplier = // 拿到每个reader的config和对应的subtask index () -> new MySqlSplitReader(sourceConfig, readerContext.getIndexOfSubtask()); // 构建了一个具体的sourceReader return new MySqlSourceReader<>( elementsQueue, splitReaderSupplier, new MySqlRecordEmitter<>( deserializationSchema, sourceReaderMetrics, sourceConfig.isIncludeSchemaChanges()), readerContext.getConfiguration(), readerContext, sourceConfig); } @Override
MySQLCDC 是阿里云消息队列(MQ)提供的一种数据同步工具,它可以实时捕获 MySQL 数据库的变更,并将变更数据同步到消息队列中。在使用 MySQLCDC 进行数据同步时,确实需要使用到 createReader 函数。
createReader 函数是 MySQLCDC 中的一个重要函数,用于创建一个基于 Canal 协议的数据读取器,该读取器可以对 MySQL 数据库进行实时的变更数据捕获和推送。