tongchenkeji posted 2023-02-07 14:13:04 · 0 views

Is there an open-source ETL tool for Flink that can read from Kafka and write to a ClickHouse table? [Alibaba Cloud Realtime Compute for Apache Flink]

Is there an open-source ETL tool for Flink that can read from Kafka, write to a ClickHouse table, and apply operations such as filtering and renaming fields?

Tags: Realtime Compute for Apache Flink, Kafka, ApsaraDB for ClickHouse, ApsaraMQ for Kafka, stream computing, message middleware
穿过生命散发芬芳AM replied 2023-11-27 18:10:54:

Read the data from Kafka, run it through the ETL step, and write it into ClickHouse via JDBC.

Define a POJO class:

```java
public class Student {
    private int id;
    private String name;
    private String password;
    private int age;
    private String date;
    // constructor, setters and getters omitted
}
```

Complete code (imports added for the legacy Flink 1.x descriptor/JDBC-append API this answer uses):

```java
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.TableSink;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// ---------- define the Kafka source ----------
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");

tableEnv.connect(
        new Kafka().version("0.10")
            .topic("student")
            .properties(props)
            .startFromLatest())
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema()
        .field("id", Types.INT())
        .field("name", Types.STRING())
        .field("password", Types.STRING())
        .field("age", Types.INT())
        .field("date", Types.STRING()))
    .inAppendMode()
    .registerTableSource("kafkaTable");

Table result = tableEnv.sqlQuery("SELECT * FROM kafkaTable");

// ---------- define the ClickHouse JDBC sink ----------
String targetTable = "clickhouse";
TypeInformation[] fieldTypes = {
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
};
TableSink jdbcSink = JDBCAppendTableSink.builder()
    .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
    .setDBUrl("jdbc:clickhouse://localhost:8123")
    .setQuery("insert into student_local(id, name, password, age, date) values(?, ?, ?, ?, ?)")
    .setParameterTypes(fieldTypes)
    .setBatchSize(15)
    .build();

tableEnv.registerTableSink(targetTable,
    new String[]{"id", "name", "password", "age", "date"},
    new TypeInformation[]{Types.INT(), Types.STRING(), Types.STRING(), Types.INT(), Types.STRING()},
    jdbcSink);

result.insertInto(targetTable);
env.execute("Flink add sink");
```

POM dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.4.4</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>19.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```
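The question also asks about filtering and renaming fields. In the job above that belongs in the SQL query (e.g. `SELECT id, name AS user_name, age FROM kafkaTable WHERE age >= 18`). A minimal plain-Java sketch of that same filter-and-rename step, outside Flink, with made-up field names (`userName`, the `age >= 18` predicate) chosen just for illustration:

```java
import java.util.List;
import java.util.stream.Collectors;

// Mirrors the Student POJO from the answer above (as a record for brevity).
record Student(int id, String name, String password, int age, String date) {}

// Target row shape after the ETL step: "password" dropped, "name" renamed to userName.
record CleanStudent(int id, String userName, int age) {}

public class FilterRenameSketch {
    // The filter + projection/rename the question asks about, on plain objects.
    static List<CleanStudent> etl(List<Student> in) {
        return in.stream()
                .filter(s -> s.age() >= 18)                            // example filter predicate
                .map(s -> new CleanStudent(s.id(), s.name(), s.age())) // drop password, rename name
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Student> rows = List.of(
                new Student(1, "alice", "pw1", 20, "2023-01-01"),
                new Student(2, "bob", "pw2", 15, "2023-01-02"));
        System.out.println(etl(rows)); // only the filtered, renamed row survives
    }
}
```

In a DataStream job the same logic would sit in a `MapFunction`/`FilterFunction`; in the Table API it is just the `SELECT` list and `WHERE` clause.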
Flink itself is the open-source ETL tool here: it can read the data from Kafka and write it into a ClickHouse table, so no separate product is needed.
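With current Flink versions this pipeline is usually written in pure Flink SQL: a Kafka source table, a JDBC sink table pointing at ClickHouse, and an `INSERT INTO ... SELECT` that does the filtering and renaming. The sketch below only assembles and prints the statements; in a real job each string would be passed to `tableEnv.executeSql(...)`. The table/column names, URL, and the ClickHouse driver class are assumptions for a local setup:

```java
public class FlinkSqlSketch {
    // Kafka source table (options follow the standard Flink SQL Kafka connector).
    static final String SOURCE_DDL =
        "CREATE TABLE kafka_student (\n" +
        "  id INT, name STRING, password STRING, age INT, `date` STRING\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'student',\n" +
        "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
        "  'format' = 'json',\n" +
        "  'scan.startup.mode' = 'latest-offset'\n" +
        ")";

    // ClickHouse reached through the Flink JDBC connector (url/driver are assumptions).
    static final String SINK_DDL =
        "CREATE TABLE ch_student (\n" +
        "  id INT, user_name STRING, age INT\n" +
        ") WITH (\n" +
        "  'connector' = 'jdbc',\n" +
        "  'url' = 'jdbc:clickhouse://localhost:8123/default',\n" +
        "  'driver' = 'ru.yandex.clickhouse.ClickHouseDriver',\n" +
        "  'table-name' = 'student_local'\n" +
        ")";

    // The ETL itself: drop 'password', rename 'name' -> 'user_name', filter on age.
    static final String INSERT_SQL =
        "INSERT INTO ch_student\n" +
        "SELECT id, name AS user_name, age FROM kafka_student WHERE age >= 18";

    public static void main(String[] args) {
        // In a real job: tableEnv.executeSql(SOURCE_DDL); executeSql(SINK_DDL); executeSql(INSERT_SQL);
        System.out.println(SOURCE_DDL);
        System.out.println(SINK_DDL);
        System.out.println(INSERT_SQL);
    }
}
```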