val env = ExecutionEnvironment.getExecutionEnvironmentval props = new Properties()props.setProperty("bootstrap.servers", "localhost:9092")props.setProperty("group.id", "test")val consumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), props)consumer.setStartFromEarliest()val stream: DataSet[String] = env .addSource(consumer)val batchSize = 1000L // 批处理大小stream .map(parseRecord) // 对每个数据进行解析 .groupingKey(data => data.id) // 根据 ID 进行分组 .reduceGroup { (values, out: Collector[Seq[Data]]) => val batch = scala.collection.mutable.ArrayBuffer[Data]() values.foreach { data => batch += data if (batch.size >= batchSize) { out.collect(batch) batch.clear() } } if (batch.nonEmpty) { out.collect(batch) batch.clear() } } .flatMap { data => writeToEs(data) // 将处理后的数据写入 ES 中 }env.execute("Batch processing from Kafka to ES example")
在此示例中,我们首先使用FlinkKafkaConsumer从 Kafka 中消费数据,然后定义一个处理函数,在函数内我们对数据进行解析,并根据 ID 进行分组。然后我们将同一个数据 ID 的数据打包成一个批次,批次大小是向上取整为 batchSize 的最小值。最后我们将批次数据批量写入到 ES 中。
定义批插入sink或者使用flink 提供的sink函数,我记得flink好几个数据库sink函数都提供批量插入。此回答整理自钉群“【③群】Apache Flink China社区”
Flink实时消费kafka数据,数据经过处理,富化、清洗等操作,写入ES