Is there an open-source ETL tool for Flink that can read from Kafka and write to a ClickHouse table? [Alibaba Cloud Realtime Compute for Apache Flink]

Is there an open-source ETL tool for Flink that can read from Kafka, write to a ClickHouse table, and perform operations such as filtering and renaming fields?

1 Reply
  1. Flink itself can serve as an open-source ETL tool: it can read data from Kafka and write it into a ClickHouse table.

  2. Read the data from Kafka and, after applying the ETL transformations, write it into ClickHouse via JDBC. – Define a POJO class:

    public class Student {
        private int id;
        private String name;
        private String password;
        private int age;
        private String date;
        // constructor, setters and getters omitted
    }

    • Complete code:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // ############## Define the Kafka consumer source ##############
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "metric-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "latest");

    tableEnv.connect(new Kafka().version("0.10")
            .topic("student").properties(props).startFromLatest())
            .withFormat(new Json().deriveSchema())
            .withSchema(new Schema().field("id", Types.INT())
                                    .field("name", Types.STRING())
                                    .field("password", Types.STRING())
                                    .field("age", Types.INT())
                                    .field("date", Types.STRING()))
            .inAppendMode()
            .registerTableSource("kafkaTable");

    Table result = tableEnv.sqlQuery("SELECT * FROM kafkaTable");

    // ############## Define the ClickHouse JDBC sink ##############
    String targetTable = "clickhouse";
    TypeInformation[] fieldTypes = {
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO};
    TableSink jdbcSink = JDBCAppendTableSink.builder()
            .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
            .setDBUrl("jdbc:clickhouse://localhost:8123")
            .setQuery("insert into student_local(id, name, password, age, date) values(?, ?, ?, ?, ?)")
            .setParameterTypes(fieldTypes)
            .setBatchSize(15)
            .build();
    tableEnv.registerTableSink(targetTable,
            new String[]{"id", "name", "password", "age", "date"},
            new TypeInformation[]{Types.INT(), Types.STRING(), Types.STRING(), Types.INT(), Types.STRING()},
            jdbcSink);
    result.insertInto(targetTable);
    env.execute("Flink add sink");
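
    • Filtering and renaming fields: the `SELECT * FROM kafkaTable` query above forwards every column unchanged; the filtering and renaming asked about in the question can be done in that same `sqlQuery` call, e.g. `SELECT id AS student_id, name FROM kafkaTable WHERE age >= 18`. As a minimal plain-Java sketch of what that projection does (no Flink dependency; the class and field names here are illustrative, not part of the original code):

```java
import java.util.List;
import java.util.stream.Collectors;

public class FilterRenameSketch {
    // Input record, mirroring the Kafka-side POJO (subset of fields)
    record Student(int id, String name, int age) {}

    // Output record: "id" renamed to "studentId", "name" kept, other fields dropped
    record StudentOut(int studentId, String name) {}

    // Equivalent of: SELECT id AS student_id, name FROM kafkaTable WHERE age >= 18
    static List<StudentOut> etl(List<Student> in) {
        return in.stream()
                 .filter(s -> s.age() >= 18)                      // row filter
                 .map(s -> new StudentOut(s.id(), s.name()))      // rename + project
                 .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Student> input = List.of(
                new Student(1, "alice", 20),
                new Student(2, "bob", 16));
        List<StudentOut> out = etl(input);
        System.out.println(out.size());             // prints 1 (bob is filtered out)
        System.out.println(out.get(0).studentId()); // prints 1
    }
}
```

    In the Flink job itself, only the SQL string changes; the sink field names and types would then need to match the projected columns.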

    • POM dependencies:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.4.4</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>19.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
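
    • Target table: the JDBC insert statement assumes a ClickHouse table named `student_local` with matching columns. One possible DDL for it (the engine and ordering key here are illustrative assumptions, not from the original answer):

```sql
-- Hypothetical ClickHouse DDL matching insert into student_local(id, name, password, age, date)
CREATE TABLE student_local
(
    id       Int32,
    name     String,
    password String,
    age      Int32,
    date     String
) ENGINE = MergeTree()
ORDER BY id;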