hi大神们,参阅资料知道flinkcdc可以代替采集端的kafka以减少组件维护成本。于是我利用flink sql client尝试进行mysql-flinkcdc-hive离线数仓链路的直接集成。我使用的jar包放在lib目录下,分别为:flink-sql-connector-mysql-cdc-2.2.1.jar,flink-connector-hive_2.12-1.14.3.jar,flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar,hive-exec-3.1.2.jar。 在sql client中使用的命令如下: ./bin/sql-client.sh embedded -j lib/flink-sql-connector-mysql-cdc-2.2.1.jar lib/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar
SET sql-client.execution.result-mode = tableau; SET ‘execution.target’ = ‘yarn-per-job’; SET execution.checkpointing.interval = 5s; SET execution.checkpointing.tolerable-failed-checkpoints=3;
CREATE TABLE mysql_test_a ( id
int, data
string, create_time
timestamp, PRIMARY KEY (id
) NOT ENFORCED ) with ( ‘connector’=’mysql-cdc’, ‘hostname’=’my_mysql_ip’, ‘port’=’3306’, ‘username’=’flink’, ‘password’ = ‘my_password’, ‘database-name’=’flink_cdc’, ‘table-name’=’test_a’ );
此时连接mysql可以通。接着创建hive catalog。
SET table.sql-dialect=hive; CREATE CATALOG hive_catalog WITH ( ‘type’ = ‘hive’, ‘hive-conf-dir’ = ‘/usr/local/hive/conf’ );
show catalogs; use catalog hive_catalog; show databases; use test; show tables;
create table hive_test_a( id bigint, data String, create_time Timestamp ) STORED AS parquet TBLPROPERTIES ( ‘sink.partition-commit.trigger’=’partition-time’, ‘sink.partition-commit.delay’=’0S’, ‘sink.partition-commit.policy.kind’=’metastore,success-file’, ‘auto-compaction’=’true’, ‘compaction.file-size’=’128MB’ );
SET table.sql-dialect=default; SET execution.type = batch;
insert into hive_catalog.test.hive_test_a select * from default_catalog.default_database.mysql_test_a;
此时失败:报错提示:
[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: Table sink ‘hive_catalog.test.hive_test_a’ doesn’t support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, mysql_test_a]], fields=[id, data, create_time])
我在网上查询不到mysql-flinkcdc-hive的数据同步案例。有人说mysql-flinkcdc-hive必须经过两步:mysql-flinkcdc-kafka,kafka-flinkcdc-hive。(因为数据格式转换的问题)这样的两步法我确实能走通。但是不懂上述的一步法为什么不行,是天然不支持这么做,还是我哪里做得不对?
此外,我也注意到了之前类似的问题。
https://developer.aliyun.com/ask/442766?spm=a2c6h.14164896.0.0.46ca7c2cAa4OjY
https://developer.aliyun.com/ask/367408?spm=a2c6h.14164896.0.0.4d917c2cOsEKvd
我认为这个问题需要被解决,希望大神们知无不言言无不尽。