请问flink-connector-jdbc在sink时,如何设置批量写入?[阿里云实时计算 Flink版]

尝试了sink.buffer-flush.max-rows,感觉没生效。

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
=====这是一个广告位,招租中,联系qq 78315851====
16 条回复 A 作者 M 管理员
  1. 阿里云实时计算 Flink 支持使用 flink-connector-jdbc 连接器将数据从 Flink 程序写入到关系型数据库中,通过设置批量写入可以提高写入性能,降低写入延迟。在 flink-connector-jdbc 中,可以通过以下方式设置批量写入:

    1. 在 JDBC sink 的构造函数中,使用 BatchPreparedStatementSetter 对象实现批量写入。BatchPreparedStatementSetter 可以实现在 PreparedStatement 中设置批量写入的参数。
    JdbcSink.sink(    "INSERT INTO table (column1, column2) VALUES (?, ?)",    new BatchPreparedStatementSetter>() {        @Override        public void setValues(PreparedStatement ps, Tuple2 tuple) throws SQLException {            ps.setString(1, tuple.f0);            ps.setInt(2, tuple.f1);            ps.addBatch();        }        @Override        public void setValues(PreparedStatement ps, int i) throws SQLException {            // do nothing        }    },    // other config)
    1. 在 JDBC sink 的构造函数中,使用 BatchSize 参数设置批量写入的大小。BatchSize 指定每次写入的数据条数,默认值为 5000。
    JdbcSink.sink(    "INSERT INTO table (column1, column2) VALUES (?, ?)",    new JdbcStatementBuilderImpl<>(),    new JdbcExecutionOptions.Builder()        .withBatchSize(1000)        .build(),    // other config)

    设置批量写入可以提高写入性能,但也可能会导致数据不一致的问题。因此,在设置批量写入时,需要根据具体业务场景进行权衡和测试。

  2. 在使用 Flink 提供的 JDBC Connector 时,可以通过设置 batchSize 属性来实现批量写入。具体操作如下:

    1. 引入依赖

    在项目的 pom.xml 文件中添加 JDBC Connector 的依赖:

        org.apache.flink    flink-connector-jdbc_2.11    ${flink.version}

    1. 创建 JDBC Sink

    在创建 JDBC Sink 时,可以通过 JDBCOutputFormatBuildersetBatchIntervalMillis 方法设置批量写入的时间间隔,单位是毫秒。可以根据实际场景调整时间间隔和批量大小:

    JDBCOutputFormatBuilder builder = new JDBCOutputFormatBuilder()        .setDrivername(driverName)        .setDBUrl(dbURL)        .setUsername(userName)        .setPassword(password)        .setQuery(insertQuery)        .setBatchIntervalMillis(batchIntervalMillis)    // 设置批量写入的时间间隔        .setBatchSize(batchSize);                       // 设置批量大小JDBCOutputFormat outputFormat = builder.finish();JDBCTableSink jdbcSink = JDBCTableSink.builder()        .setOutputFormat(outputFormat)        .build();

    其中,batchSize 属性可以设置批量写入的大小,即将多个数据行打包成一个批次进行写入。注意,这个值需要根据你的数据量和实际场景进行调整,过小可能会影响写入性能,过大可能会导致内存溢出。

    1. 将 Sink 应用到 Flink 流计算中

    将 JDBC Sink 应用到 Flink 流计算中,例如:

    DataStream inputStream = ...;inputStream.addSink(jdbcSink);

    在应用到 Flink 流计算中时,可以通过 setFlushOnCheckpoint(true) 方法设置在Checkpoint时也要进行数据写入,保证数据的完整性和可靠性。

  3. 在使用flink-connector-jdbc进行数据sink时,可以通过设置JDBCOutputFormat的batchSize参数来控制批量写入的大小。batchSize参数表示每次写入的记录数,可以根据需要进行调整。例如:

    JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()    .setDrivername(driver)    .setDBUrl(dbUrl)    .setUsername(username)    .setPassword(password)    .setQuery(insertQuery)    .setBatchInterval(batchInterval)    .setBatchSize(batchSize)    .finish();

    其中,batchSize参数可以设置为一个大于1的整数,表示每次写入的记录数。需要注意的是,设置的batchSize过大可能会导致内存溢出,需要根据实际情况进行调整。

  4. 在使用 Flink 的 JDBC sink 时,可以通过配置 batchSize 来设置批量写入的大小。batchSize 表示一次性向 JDBC 批量提交多少条记录,可以设置为整数值。

    例如,以下代码片段使用 JDBC sink 向 MySQL 数据库中批量写入数据,每次提交 100 条记录:

    DataStream<MyData> stream = ...;FlinkJdbcSink<MyData> sink = JdbcSink.<MyData>batch(        "INSERT INTO myTable (col1, col2, col3) VALUES (?, ?, ?)",        (ps, data) -> {            ps.setString(1, data.getField1());            ps.setInt(2, data.getField2());            ps.setLong(3, data.getField3());        },        JdbcExecutionOptions.builder()                .withBatchSize(100)                .build(),        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()                .withUrl("jdbc:mysql://localhost:3306/myDb")                .withDriverName("com.mysql.jdbc.Driver")                .withUsername("myUser")                .withPassword("myPassword")                .build());stream.addSink(sink);

    在上述示例中,使用了 FlinkJdbcSink 的静态方法 batch,并传入了 batchSize 参数。该参数被设置为 100,因此会每次向数据库批量提交 100 条记录。

    注意,此时需要保证配置的 batchSize 不超过 JDBC 驱动的最大批量提交大小,否则可能会出现异常。通常情况下,可以根据系统性能和数据量大小综合考虑一个适当的 batchSize 值。

  5. 官方文档提供了2个方式。

    1、BatchIntervalChecker

    BatchIntervalChecker是一个用于检查批量写入时间间隔的工具类。您可以使用 BatchIntervalChecker来设置批量写入的时间间隔。BatchIntervalChecker会在每个批次写入之前检查时间间隔是否已经达到,如果达到了,就会触发批量写入操作。

    2、BatchSizeTrigger

    BatchSizeTrigger是一个用于检查批量写入大小的工具类。您可以使用BatchSizeTrigger来设置批量写入的大小。BatchSizeTrigger会在每个批次写入之前检查写入的数据量是否已经达到,如果达到了,就会触发批量写入操作。

    具体使用哪种方式取决于您的需求和场景。如果您的数据量较小,可以使用BatchIntervalChecker来设置时间间隔;如果您的数据量较大,可以使用 BatchSizeTrigger来设置批量写入大小。

  6. Flink JDBC连接器提供了对MySQL、PostgreSQL和Oracle等常见的数据库读写支持,如果需要配置批量写入可以设置sink.buffer-flush.max-rows 大于0 ,flush数据前,缓存记录的最大值。以及设置sink.buffer-flush.interval 大于0,flush数据的时间间隔。数据在Flink中缓存的时间超过该参数指定的时间后,异步线程将flush数据到数据库中。同时需要注意字段类型映射,参考文档:文档

  7. 在Flink SQL连接MySQL等关系型数据库时,通常会使用Flink官方提供的JDBC connector。当想要在sink阶段进行批量写入时,可以考虑以下几个方面。

    配置Sink 除了 sink.buffer-flush.max-rows,还需要配置sink.buffer-flush.interval和sink.buffer-flush.max-size等参数,这些参数共同控制了一个批次的多少和大小。具体参数含义可以参考官方文档(https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#jdbc-sink)。

    数据类型和批量写入 如果sink数据列的类型与表中的数据类型不匹配,可能也会导致写入失败,因此需要确保被写入的数据类型一致。此外,对于大数据量的写入,Flink仅使用JDBC批量编译器API,其中大批量数据的处理是在JDBC驱动程序内部完成的,但该功能仅在JDBC驱动程序支持时才会生效,因此需要确保JDBC驱动程序支持批量处理。

    确认实时更新 最后,确保你正在连接到手动维护的活动数据库,而不是连接到备用副本,否则写入可能会延迟,因为备用副本可能不会实时更新。

    希望这些信息有所帮助,并解决您的问题。

  8. 设置最大缓冲行数 设置最大缓冲时间 默认情况下,JDBC Sink 发送完成后会自动关闭连接,并提交事务。

    还有一些与性能相关的调整方面也可以考虑做出:比如,修改输入并行度(即 max.parallelism),或是调整容器资源限制等都可能受益于增加批量处理参数来保证Flink Job稳定运行。

  9. Flink 中的 JDBC Connector 可以用于读取和写入关系型数据库。在使用 Flink 的 JDBC Sink 进行批量写入时,需要设置 sink.buffer-flush.max-rows 和 sink.buffer-flush.interval 两个参数,分别表示最大缓冲行数和最大缓冲时间。当满足其中一个条件时,就会将数据批量写入 Jdbc。

    1. 设置最大缓冲行数

      在你的 Flink SQL 程序里找到对应的 JDBC sink 并进行如下配置:

    sink.buffer-flush.max-rows = 5000

    这个例子中表示每插入 5000 行触发一次写操作。

    1. 设置最大缓冲时间

    同样地,在你的 Flink SQL 程序配置中添加:

    sink.buffer-flush.interval = 2000ms 

    这个例子中则是代表了每隔 2 秒钟写入一次缓存区内的所有内容。

    默认情况下,JDBC Sink 发送完成后会自动关闭连接,并提交事务。

    还有一些与性能相关的调整方面也可以考虑做出:比如,修改输入并行度(即 max.parallelism),或是调整容器资源限制等都可能受益于增加批量处理参数来保证Flink Job稳定运行。

    注意 JKBC Sink 默认不启用 Batch Api, 如需将 JDBC Sink 扩展成可同时支持 batch insert 和 streaming insert 需要额外开发。

  10. 在使用Flink JDBC Connector的Sink写入数据库时,设置批量flush参数是实现高吞吐量的关键。主要有两个相关参数: 1. sink.buffer-flush.max-rows:触发flush并写入数据库的最大行数阈值。默认为1,即每插入1行就flush。 2. sink.buffer-flush.interval:触发flush的最大时间间隔。默认为0,即不按时间隔断flush。 要实现批量写入数据库,需要同时调大这两个参数。例如:

    properties# 每5000行或1秒钟触发一次flushsink.buffer-flush.max-rows=5000  sink.buffer-flush.interval=1000

    这样,连接器会将插入的数据缓存至内存Buffer,当数据量达到5000行或者超过1秒钟时,会触发flush并批量写入数据库。 adjust这两个参数对吞吐量的影响主要有: 1. sink.buffer-flush.max-rows越大,触发flush的频率越低,插入数据库的批量数据越多,数据库的IO压力越大,从而达到更高的吞吐量。 2. sink.buffer-flush.interval允许在达不到max-rows阈值时,也定期触发flush。这可以避免数据在Buffer中停留时间过长,同时也为数据库提供更均匀的压力,有利于吞吐量的提高。 3. 两个参数都不宜设置过大,否则会给数据库带来过高的压力和较高的延迟。需要根据数据库的处理能力进行评估和调优。 4. interval参数对event time语义的支持很关键。它可以确保无论 max-rows是否达到,数据至少以该频率超越水位线并刷新到数据库。 所以,想实现Flink JDBC Sink的批量写入和高吞吐量,调优这两个flush相关的参数是关键。需要结合具体数据库的性能,选择适当的max-rows和interval值,既能发挥批量写入的优势,又不会过分压榨数据库资源。

  11. 在使用 Flink JDBC Connector 时,可以通过配置 sink.buffer-flush.max-rowssink.buffer-flush.interval 参数来控制批量写入。其中,sink.buffer-flush.max-rows 参数表示可以缓存的最大记录数,sink.buffer-flush.interval 参数表示缓存的最大时间间隔。只要满足其中一个条件,就会触发将缓存中的数据批量写入 JDBC 数据库。

    如果您设置了 sink.buffer-flush.max-rows 参数但是没有生效,可能需要检查以下问题:

    1. 是否设置了正确的参数值。确保 sink.buffer-flush.max-rows 参数的值大于 0。

    2. 是否开启了批量写入。在 Flink JDBC Connector 中,默认是开启了批量写入的,如果您没有手动关闭,那么就应该可以正常批量写入。可以检查一下代码中是否有关闭批量写入的操作。

    3. 是否达到了批量写入的条件。如果您设置了 sink.buffer-flush.max-rows 参数,但是没有达到最大记录数,那么就不会触发批量写入。可以检查一下是否有足够的数据达到了最大记录数。

    如果您仍然无法解决问题,请提供更多详细的信息,例如代码片段、日志信息等,以便更好地定位问题。

  12. 可以使用 Flink 官方提供的 BulkFormatBuilder 工具类,来实现 flink-connector-jdbc 的批量写入。

    // 这里假设已经定义好了 POJO 类型和 JDBC 配置信息

    DataStream records = …; JdbcConnectionOptions connectionOptions = …; JdbcExecutionOptions executionOptions = …;

    // 使用 BulkFormatBuilder 构建 JdbcOutputFormatBuilder

    JdbcOutputFormatBuilder outputFormatBuilder = JdbcOutputFormat.buildJdbcOutputFormat() .setDrivername(“org.apache.derby.jdbc.EmbeddedDriver”) .setDBUrl(“jdbc:derby:memory:test”) .setUsername(“”) .setPassword(“”) .setQuery(“INSERT INTO test (id, name) VALUES (?, ?);”) .setSqlTypes(new int[] { Types.INTEGER, Types.VARCHAR }) .setBatchIntervalMs(1000) .setBatchSize(500) .setJdbcStatementBuilder(new JdbcStatementBuilder() { public void accept(java.sql.PreparedStatement statement, MyRecord record) { statement.setInt(1, record.id); statement.setString(2, record.name); } });

    // 构建 SinkFunction

    SinkFunction sink = JdbcSink.sink( outputFormatBuilder, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl(“jdbc:derby:memory:test”) .withDriverName(“org.apache.derby.jdbc.EmbeddedDriver”) .build(), new JdbcExecutionOptions.JdbcExecutionOptionsBuilder() .withBatchSize(500) .withBatchIntervalMs(1000) .withMaxRetries(3) .build());

    // 将 Source 写入 Sink

    records.addSink(sink);

  13. Flink 官方提供的 flink-connector-jdbc 在 Sink 时支持批量写入,可以通过设置以下参数来实现:

    jdbc.max-retries:设置在失败情况下允许的最大重试次数。默认值为 3。

    jdbc.batch-size:设置 JDBC Sink 在写入数据时使用的批量大小。默认值为 1000。

    jdbc.batch-interval-millis:设置 JDBC Sink 在写入数据时使用的批量时间间隔。默认值为 0,表示禁用批量时间间隔。

    sink.buffer-flush.max-rows:设置在写入到 JDBC Sink 之前缓冲区中允许的最大记录数。默认值为 -1,表示禁用缓冲区。设置为正整数时,表示启用缓冲区,并设置缓冲区中允许的最大记录数。

    需要注意的是,sink.buffer-flush.max-rows 参数主要用于控制缓冲区的大小,并不能直接控制批量写入的大小。实际上,在使用 JDBC Sink 时,批量写入的大小由 jdbc.batch-size 和 jdbc.batch-interval-millis 参数共同决定。

    如果您想要实现更高效的批量写入,建议将 jdbc.batch-size 参数设置为较大的值,并设置合理的 jdbc.batch-interval-millis 参数以控制写入的频率。同时,还可以考虑通过调整并发度、优化 SQL 语句、优化数据库连接等方式来进一步提高写入性能。

    Regenerate response

  14. 在使用 Flink JDBC Sink 时,可以通过以下参数控制批量写入:

    1、sink.buffer-flush.max-rows:设置缓冲区大小,即达到该缓冲区大小后批量写入到数据库,默认为 5000; 2、sink.buffer-flush.interval:设置缓冲区时间间隔,即达到该时间间隔后批量写入到数据库,默认为 1s; 3、sink.max-retries:设置写入失败时的最大重试次数,默认为 3; 4、sink.max-parallelism:设置写入的并行度,默认为 1。

    如果你想要自定义批量写入的大小,可以通过在参数中添加如下配置实现:

    .withParameters(JdbcExecutionOptions.builder().withBatchSize(1000).build()) 其中 withBatchSize() 方法可以设置批量写入的大小,即一次性写入多少条数据。例如上述示例中的 1000 表示一次性写入 1000 条数据。

    另外,如果你在使用 Flink JDBC Sink 时出现了 sink.buffer-flush.max-rows 参数不生效的情况,可以尝试将该参数的值设置为较小的值,如 100,看看是否能够生效。如果仍然不生效,可能是因为数据库本身的限制导致无法批量写入。

  15. 在 Flink 中使用 flink-connector-jdbc 作为 Sink 时,可以通过sink.buffer-flush.max-rows来设置批量写入的最大行数。但是需要注意的是,这个参数仅仅是一个提示,Flink 会尽可能地将数据批量写入 JDBC 数据库,并且根据数据库支持的特性进行优化,以达到更高的吞吐量。

    在设置 sink.buffer-flush.max-rows 参数时,需要同时设置 sink.buffer-flush.interval 参数,用于控制数据写入间隔时间,保证不会因为等待写入数据库而阻塞整个任务。例如:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream stream = ...stream.addSink(JdbcSink.sink(        "insert into myTable values (?, ?, ?)",        (ps, t) -> {            ps.setInt(1, t.getField(0));            ps.setString(2, t.getField(1));            ps.setDouble(3, t.getField(2));        },        JdbcExecutionOptions.builder()                .withBatchSize(5000)                .withBatchIntervalMs(1000L)                .withMaxRetries(5)                .build(),        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()                .withUrl("jdbc:mysql://localhost/test")                .withDriverName("com.mysql.jdbc.Driver")                .withUsername("root")                .withPassword("")                .build()));env.execute();

    在上面的示例中,我们设置了JdbcExecutionOptions对象的withBatchSize方法为5000,表示每次写入5000行数据;同时,withBatchIntervalMs方法也被设置为1000毫秒,表示每隔1秒写入一次数据。这样就可以达到批量写入的效果了。

    需要注意的是,在使用flink-connector-jdbc时,还可以通过其他参数来进一步优化写入性能,例如设置 JDBC 连接池、调整并发度等等。

  16. 在使用Flink的JDBC连接器时,可以通过以下两种方式来设置批量写入:

    使用BatchingSinkFunction BatchingSinkFunction是一种可选的SinkFunction,它允许将多个记录作为一个批次进行写入。在Flink的JDBC连接器中,您可以通过继承或实现这个类来创建自定义的批量写入SinkFunction。

    例如,下面的代码使用BatchingSinkFunction来将数据按照批量大小(100)写入MySQL:

    public class MySQLBatchWriter extends BatchingJdbcSinkFunction {

    public MySQLBatchWriter() {    super(JdbcExecutionOptions.builder()            .withBatchSize(100)            .build());}@Overrideprotected void prepareStatement(PreparedStatement statement, MyData data) throws SQLException {    statement.setInt(1, data.getId());    statement.setString(2, data.getName());    statement.setDouble(3, data.getValue());}

    } 其中,prepareStatement方法用于准备SQL查询语句,而构造函数则使用JdbcExecutionOptions.Builder来设置批量大小。

    使用JdbcExecutionOptions JdbcExecutionOptions提供了多个配置选项,其中包括设置批量大小的选项。在使用Flink的JDBC连接器时,您可以通过以下代码来设置批量大小:

    JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder() .withBatchSize(100) .build();

    JdbcSink sink = JdbcSink.sink( “INSERT INTO my_table (id, name, value) VALUES (?, ?, ?)”, (statement, data) -> { statement.setInt(1, data.getId()); statement.setString(2, data.getName()); statement.setDouble(3, data.getValue()); }, JdbcExecutionOptions.builder() .withBatchSize(100) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl(“jdbc:mysql://localhost:3306/my_database”) .withDriverName(“com.mysql.jdbc.Driver”) .withUsername(“root”) .withPassword(“root”) .build());

    dataStream.addSink(sink); 其中,withBatchSize方法用于设置批量大小,并将其传递给JdbcExecutionOptions.Builder。

  17. 可能有以下几个原因导致该参数没有生效:

    数据源本身的限制:如果数据源本身对批量写入有限制,那么设置 sink.buffer-flush.max-rows 参数也不会生效。你可以尝试查看数据源的文档或者源码,确认数据源是否支持批量写入,以及支持的批量写入的大小。

    配置参数生效范围:sink.buffer-flush.max-rows 参数只在配置文件中的 flink-conf.yaml 或者执行作业时的 -yD 参数中生效,在代码中通过 env.getConfig().setGlobalJobParameters() 方式设置的参数并不会生效。

    代码中的批量写入限制:在源码中,如果对 sink.buffer-flush.max-rows 参数进行了手动覆盖限制,那么该限制也会覆盖配置文件中的同名限制,你可以去查看代码中是否有这方面的限制。

    需要注意的是,由于 JDBC Sink 是一个异步写入的 Sink,批量写入的大小可能会受到网络带宽、数据源状态等因素的影响。