大佬们帮看下,Flink CDC中这里有啥问题吗?1.source表创建
create database kafka_database;
use kafka_database;
CREATE TABLE kafka_source1 (
test STRING
) WITH (
‘connector’ = ‘kafka’,
‘topic’ = ‘ProcessTest’,
‘scan.startup.mode’ = ‘earliest-offset’,
‘properties.group.id’ = ‘testGroup’,
‘properties.bootstrap.servers’ = ‘192.168.110.70:6667’,
‘format’ = ‘raw’
);
2.sink表创建通过catalog拉取hive元数据
create catalog myhive with(
‘type’ = ‘hive’,
‘default-database’ = ‘default’,
‘hive-conf-dir’ = ‘/usr/hdp/current/hive-client/conf’
);
use catalog myhive;
3.数据写入
INSERT INTO default.test_internal_location1
select * from kafka_database.kafka_source1 ;
其实场景很简单,就是用Flink从Kafka消费,写入HDFS,source表存在,已经建好了没问题。
sink端catalog创建同步也没问题,但是后面的数据写入过程一直找不到sink表
1.source表创建
create database kafka_database;
use kafka_database;
CREATE TABLE kafka_source1 (
test STRING
) WITH (
‘connector’ = ‘kafka’,
‘topic’ = ‘ProcessTest’,
‘scan.startup.mode’ = ‘earliest-offset’,
‘properties.group.id’ = ‘testGroup’,
‘properties.bootstrap.servers’ = ‘192.168.110.70:6667’,
‘format’ = ‘raw’
);
2.sink表创建通过catalog拉取hive元数据
create catalog myhive with(
‘type’ = ‘hive’,
‘default-database’ = ‘default’,
‘hive-conf-dir’ = ‘/usr/hdp/current/hive-client/conf’
);
use catalog myhive;
3.数据写入
INSERT INTO default.test_internal_location1
select * from kafka_database.kafka_source1 ;那我一直报找不到catalog里的表啊
在 Flink CDC 中,创建数据库和表是非常常见的操作,但是在您提供的代码中,如果只是创建了数据库,并且没有创建表或者其他的操作,那么并没有明显的问题。不过,如果您想要在 Flink CDC 中使用该数据库,需要确保已经正确设置了数据库连接参数,并且可以成功连接到数据库。一般来说,您需要在 Flink CDC 的配置文件中配置数据库连接信息,例如:
properties
Copy
设置数据源类型为 MySQL
flink.sources.kafka_database.source.type=mysql-cdc
设置 MySQL 数据库的连接信息
flink.sources.kafka_database.source.url=jdbc:mysql://localhost:3306/kafka_database
flink.sources.kafka_database.source.username=root
flink.sources.kafka_database.source.password=123456
在上述配置文件中,我们使用 mysql-cdc 数据源类型来指定使用 MySQL 数据库,并设置了 MySQL 数据库的连接信息,包括 URL、用户名和密码。同时,我们将数据源的名称设置为 kafka_database,以便在其他地方引用该数据源。
根据您提供的信息,有几个可能导致找不到 Catalog 中表的问题:
1. Catalog 配置:请确保
myhive
Catalog 的配置正确,并且指向了正确的 Hive 元数据存储位置。检查所使用的目录路径是否正确,以及是否具有适当的权限访问该目录。2. 数据库和表:检查在默认的数据库中是否存在名为
default.test_internal_location1
的表。如果没有,请创建该表,确保表的定义与您的数据写入语句中引用的表一致。3. 表引用:在插入数据的 SELECT 语句中,确保使用的是正确的表引用方式。根据您提供的代码,应该使用
kafka_database.kafka_source1
表的完整引用,而不仅仅是kafka_source1
。请确认kafka_database
数据库中的表名是否正确。4. Flink 版本兼容性:确保您使用的 Flink 版本与所使用的 Catalog(Hive)版本兼容。某些 Flink 和 Catalog 版本组合可能会导致兼容性问题。
5. 日志和错误消息:检查 Flink CDC 任务的日志和错误消息,查看是否有关于找不到表或 Catalog 的详细错误信息。这可能提供更多线索来解决问题。
如果上述步骤仍然无法解决问题,请考虑以下操作:
– 确认网络连接:确保 Flink CDC 任务能够连接到 Hive 和 Kafka 的相应服务。检查网络配置,确保能够访问到这些服务。
– 验证功能性:尝试使用相同的配置和代码在其他环境中运行,以确定是否是特定于当前环境的问题。