大佬们帮看下,Flink CDC中这里有啥问题吗?[阿里云实时计算 Flink版]

大佬们帮看下,Flink CDC中这里有啥问题吗?1.source表创建
create database kafka_database;

use kafka_database;

CREATE TABLE kafka_source1 (
test STRING
) WITH (
‘connector’ = ‘kafka’,
‘topic’ = ‘ProcessTest’,
‘scan.startup.mode’ = ‘earliest-offset’,
‘properties.group.id’ = ‘testGroup’,
‘properties.bootstrap.servers’ = ‘192.168.110.70:6667’,
‘format’ = ‘raw’
);

2.sink表创建通过catalog拉取hive元数据
create catalog myhive with(
‘type’ = ‘hive’,
‘default-database’ = ‘default’,
‘hive-conf-dir’ = ‘/usr/hdp/current/hive-client/conf’
);

use catalog myhive;

3.数据写入
INSERT INTO default.test_internal_location1

select * from kafka_database.kafka_source1 ;
其实场景很简单,就是用Flink从Kafka消费,写入HDFS,source表存在,已经建好了没问题。
sink端catalog创建同步也没问题,但是后面的数据写入过程一直找不到sink表

1.source表创建
create database kafka_database;

use kafka_database;

CREATE TABLE kafka_source1 (
test STRING
) WITH (
‘connector’ = ‘kafka’,
‘topic’ = ‘ProcessTest’,
‘scan.startup.mode’ = ‘earliest-offset’,
‘properties.group.id’ = ‘testGroup’,
‘properties.bootstrap.servers’ = ‘192.168.110.70:6667’,
‘format’ = ‘raw’
);

2.sink表创建通过catalog拉取hive元数据
create catalog myhive with(
‘type’ = ‘hive’,
‘default-database’ = ‘default’,
‘hive-conf-dir’ = ‘/usr/hdp/current/hive-client/conf’
);

use catalog myhive;

3.数据写入
INSERT INTO default.test_internal_location1

select * from kafka_database.kafka_source1 ;那我一直报找不到catalog里的表啊

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
=====这是一个广告位,招租中,联系qq 78315851====
2 条回复 A 作者 M 管理员
  1. 在 Flink CDC 中,创建数据库和表是非常常见的操作,但是在您提供的代码中,如果只是创建了数据库,并且没有创建表或者其他的操作,那么并没有明显的问题。不过,如果您想要在 Flink CDC 中使用该数据库,需要确保已经正确设置了数据库连接参数,并且可以成功连接到数据库。一般来说,您需要在 Flink CDC 的配置文件中配置数据库连接信息,例如:

    properties
    Copy

    设置数据源类型为 MySQL

    flink.sources.kafka_database.source.type=mysql-cdc

    设置 MySQL 数据库的连接信息

    flink.sources.kafka_database.source.url=jdbc:mysql://localhost:3306/kafka_database
    flink.sources.kafka_database.source.username=root
    flink.sources.kafka_database.source.password=123456
    在上述配置文件中,我们使用 mysql-cdc 数据源类型来指定使用 MySQL 数据库,并设置了 MySQL 数据库的连接信息,包括 URL、用户名和密码。同时,我们将数据源的名称设置为 kafka_database,以便在其他地方引用该数据源。

  2. 根据您提供的信息,有几个可能导致找不到 Catalog 中表的问题:

    1. Catalog 配置:请确保 myhive Catalog 的配置正确,并且指向了正确的 Hive 元数据存储位置。检查所使用的目录路径是否正确,以及是否具有适当的权限访问该目录。

    2. 数据库和表:检查在默认的数据库中是否存在名为 default.test_internal_location1 的表。如果没有,请创建该表,确保表的定义与您的数据写入语句中引用的表一致。

    3. 表引用:在插入数据的 SELECT 语句中,确保使用的是正确的表引用方式。根据您提供的代码,应该使用 kafka_database.kafka_source1 表的完整引用,而不仅仅是 kafka_source1。请确认 kafka_database 数据库中的表名是否正确。

    4. Flink 版本兼容性:确保您使用的 Flink 版本与所使用的 Catalog(Hive)版本兼容。某些 Flink 和 Catalog 版本组合可能会导致兼容性问题。

    5. 日志和错误消息:检查 Flink CDC 任务的日志和错误消息,查看是否有关于找不到表或 Catalog 的详细错误信息。这可能提供更多线索来解决问题。

    如果上述步骤仍然无法解决问题,请考虑以下操作:

    – 确认网络连接:确保 Flink CDC 任务能够连接到 Hive 和 Kafka 的相应服务。检查网络配置,确保能够访问到这些服务。

    – 验证功能性:尝试使用相同的配置和代码在其他环境中运行,以确定是否是特定于当前环境的问题。