Can MySQL → Flink CDC → Hive be integrated directly? [Alibaba Cloud Realtime Compute for Apache Flink]

Hi everyone. From what I've read, Flink CDC can replace Kafka on the ingestion side and thus reduce the number of components to maintain. So I tried to build a direct MySQL → Flink CDC → Hive pipeline for an offline data warehouse using the Flink SQL Client. The jars I placed under the lib directory are: flink-sql-connector-mysql-cdc-2.2.1.jar, flink-connector-hive_2.12-1.14.3.jar, flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar, and hive-exec-3.1.2.jar. I start the SQL Client with:

./bin/sql-client.sh embedded -j lib/flink-sql-connector-mysql-cdc-2.2.1.jar lib/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar
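(As an aside, my understanding is that when extra jars are passed on the command line, each one needs its own -j flag; since these jars already sit under lib/ they should be loaded at startup anyway, so this probably doesn't matter here. Something like:

./bin/sql-client.sh embedded \
  -j lib/flink-sql-connector-mysql-cdc-2.2.1.jar \
  -j lib/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar
)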

SET sql-client.execution.result-mode = tableau;
SET 'execution.target' = 'yarn-per-job';
SET execution.checkpointing.interval = 5s;
SET execution.checkpointing.tolerable-failed-checkpoints = 3;

CREATE TABLE mysql_test_a (
  id INT,
  data STRING,
  create_time TIMESTAMP,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'my_mysql_ip',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'my_password',
  'database-name' = 'flink_cdc',
  'table-name' = 'test_a'
);

At this point the connection to MySQL works. Next I create the Hive catalog.

SET table.sql-dialect = hive;

CREATE CATALOG hive_catalog WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/usr/local/hive/conf'
);

SHOW CATALOGS;
USE CATALOG hive_catalog;
SHOW DATABASES;
USE test;
SHOW TABLES;

CREATE TABLE hive_test_a (
  id BIGINT,
  data STRING,
  create_time TIMESTAMP
) STORED AS parquet TBLPROPERTIES (
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '0S',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB'
);

SET table.sql-dialect = default;
SET execution.type = batch;

INSERT INTO hive_catalog.test.hive_test_a
SELECT * FROM default_catalog.default_database.mysql_test_a;

At this point it fails with the following error:

[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: Table sink 'hive_catalog.test.hive_test_a' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, mysql_test_a]], fields=[id, data, create_time])

I can't find any example of MySQL → Flink CDC → Hive data synchronization online. Some people say it has to be done in two steps: MySQL → Flink CDC → Kafka, then Kafka → Flink → Hive (because of the changelog/format conversion involved). I can indeed get that two-step approach to work (see the sketch below). But I don't understand why the one-step approach above fails: is this simply not supported, or did I do something wrong somewhere?
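For concreteness, here is a minimal sketch of what I mean by the two-step approach. It is not my exact setup: the broker address, topic name, and consumer group are placeholders, and I'm assuming Debezium JSON as the changelog encoding on the Kafka topic (other CDC formats would work similarly).

-- Job 1: write the MySQL CDC changelog to Kafka, encoded as Debezium JSON
-- (mysql_test_a is declared as in the DDL above; broker and topic are placeholders)
CREATE TABLE kafka_test_a (
  id INT,
  data STRING,
  create_time TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_test_a',
  'properties.bootstrap.servers' = 'my_kafka_ip:9092',
  'format' = 'debezium-json'
);

INSERT INTO kafka_test_a SELECT * FROM mysql_test_a;

-- Job 2: read the same topic back as plain append-only JSON (the Debezium
-- envelope simply becomes ordinary columns) and append the rows into Hive;
-- merging the change records into a final table is left to Hive batch jobs.
CREATE TABLE kafka_test_a_raw (
  `before` ROW<id INT, data STRING, create_time TIMESTAMP>,
  `after`  ROW<id INT, data STRING, create_time TIMESTAMP>,
  op STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_test_a',
  'properties.bootstrap.servers' = 'my_kafka_ip:9092',
  'properties.group.id' = 'hive_sync',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

INSERT INTO hive_catalog.test.hive_test_a
SELECT `after`.id, `after`.data, `after`.create_time
FROM kafka_test_a_raw;

With the extra Kafka hop, the Hive sink only ever sees plain inserts and never has to consume update or delete changes.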

I've also noticed similar earlier questions:

https://developer.aliyun.com/ask/442766?spm=a2c6h.14164896.0.0.46ca7c2cAa4OjY

https://developer.aliyun.com/ask/367408?spm=a2c6h.14164896.0.0.4d917c2cOsEKvd

I think this question deserves a proper answer; any insight you can share would be much appreciated.
