flink 批处理 使用table去读取MySQL的数据,报这个[阿里云实时计算 Flink版]

java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
=====这是一个广告位,招租中,联系qq 78315851====
13 条回复 A 作者 M 管理员
  1. 在阿里云 Flink 批处理中,使用 Table API 或 SQL 去读取 MySQL 数据库的数据,需要先将 MySQL 数据库作为外部系统注册到 Flink 中,然后才能使用 Table API 或 SQL 进行查询。在注册外部系统时,需要指定外部系统的连接信息和表结构信息。

    在使用 Table API 或 SQL 进行查询时,需要先启动 Flink MiniCluster,否则会报错 java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down. 因为 Table API 和 SQL 需要在 Flink MiniCluster 中运行,才能访问外部系统的数据。

    以下是一个示例代码,演示如何通过 Table API 查询 MySQL 数据库中的数据:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 注册 MySQL 数据库为外部系统tableEnv.connect(        new ExternalSystemConf("jdbc")                .with("url", "jdbc:mysql://localhost:3306/test")                .with("driver", "com.mysql.jdbc.Driver")                .with("username", "root")                .with("password", "123456"))        .withFormat(new Csv())        .withSchema(                new Schema()                        .field("id", DataTypes.BIGINT())                        .field("name", DataTypes.STRING())                        .field("age", DataTypes.INT()))        .createTemporaryTable("myTable");// 使用 Table API 查询数据Table result = tableEnv.sqlQuery("SELECT id, name, age FROM myTable WHERE age > 18");DataStream stream = tableEnv.toAppendStream(result, Row.class);stream.print();env.execute();

    在上述代码中,先通过 connect() 方法注册 MySQL 数据库为外部系统,指定了连接信息和表结构信息。然后使用 createTemporaryTable() 方法创建临时表,表名为 myTable。最后使用 sqlQuery() 方法查询数据,并使用 toAppendStream() 方法将查询结果转换成 DataStream,最终将结果打印出来。

    需要注意的是,在执行查询之前,需要先启动 Flink MiniCluster,否则会报错 java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down. 可以在代码中添加以下代码启动 MiniCluster:

    // 启动 MiniClusterLocalStreamEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());localEnv.startNewSession();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(localEnv);

    在启动 MiniCluster 之后,就可以执行查询了。

  2. 在 Flink 批处理中使用 Table API 或 SQL 读取 MySQL 数据库时,需要通过 Flink SQL Client 或者在 Java/Scala 代码中指定运行环境的执行程序、任务部署方式等参数。如果没有正确指定这些参数,就容易报出 “MiniCluster is not yet running or has already been shut down” 的错误。这是因为 Flink 默认使用 MiniCluster 运行环境,但是 MiniCluster 只用于本地单机测试,不适合生产环境。

    解决方法如下:

    1. 在 Flink SQL Client 中指定执行程序和任务部署方式

    如果是通过 Flink SQL Client 执行 SQL,需要在启动客户端时通过 -e 参数指定执行程序和任务部署方式。例如:

    ./bin/sql-client.sh embedded -e "SET execution.runtime-mode=BATCH; SET table.exec.buffer-size=1000; SELECT * FROM user_info"

    在上述代码中,通过 SET execution.runtime-mode=BATCH 将执行程序设置为批处理模式,避免使用 MiniCluster;通过 SET table.exec.buffer-size=1000 设置 Table API 的缓冲区大小,在缓解OOM方面比不设要好很多。

    1. 在 Java/Scala 代码中指定执行程序和任务部署方式

    如果是在 Java/Scala 代码中使用 Table API 或 SQL,需要在代码中指定执行程序和任务部署方式。例如:

    EnvironmentSettings settings = EnvironmentSettings.newInstance()    .useBlinkPlanner()    .inBatchMode()    .build();TableEnvironment tEnv = TableEnvironment.create(settings);tEnv.getConfig()    .getConfiguration()    .setString("execution.runtime-mode", "BATCH");tEnv.getConfig()    .getConfiguration()    .setString("table.exec.buffer-size", "1000");Table sourceTable = tEnv.from("user_info");

    在上述代码中,通过设置 execution.runtime-modetable.exec.buffer-size 参数,避免使用 MiniCluster。其中 useBlinkPlanner() 表示使用 Blink planner, inBatchMode() 表示设置执行程序为批处理模式。

  3. 该错误通常是由于使用 Flink MiniCluster 时出现异常或错误引起的,Flink MiniCluster 是 Flink 提供的测试工具,可以模拟单机或分布式场景下的 Flink 集群,用于进行单元测试或集成测试。它们最常见的用途是编写自己的 Flink 作业和算子时进行本地测试。

    可能导致该错误的原因有很多,包括但不限于以下原因:

    • 应用程序代码中的问题:在 Test Case 中启动 Flink MiniCluster 之前,如果代码存在错误或者异常,可能会导致启动失败。因此,请确保您的应用程序代码没有语法错误或逻辑错误,并且所有的依赖项都已正确配置。

    • Flink 版本不兼容:请确保您的应用程序和 MiniCluster 对应的 Flink 版本是兼容的。如果您的应用程序或 MiniCluster 对应的 Flink 版本不兼容,可能会导致启动失败。

    • MiniCluster 状态异常:如果您遇到 MiniCluster 在启动后立即崩溃的情况,这可能是由于 MiniCluster 内部的某些状态异常所致。您可以尝试使用适当的配置和参数重新启动 MiniCluster 来解决此问题。

    解决此问题的方法通常包括以下步骤:

    1. 检查应用程序代码:确保您的应用程序代码没有语法错误或逻辑错误,并且所有的依赖项都已正确配置。

    2. 检查 Flink 版本:请确保您的应用程序和 MiniCluster 对应的 Flink 版本是兼容的,可以尝试更换版本看看是否能够启动成功。

    3. 检查 MiniCluster 配置:请检查您的 MiniCluster 配置是否正确,并尝试使用适当的配置和参数重新启动 MiniCluster。

  4. 这个错误信息“java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.”我估计是你在使用Apache Flink MiniCluster时,MiniCluster尚未启动或已经关闭。MiniCluster通常用于本地开发和测试Flink作业。

    我觉得这个问题可能有以下几个原因:

    MiniCluster未正确启动:请确保在尝试执行Flink作业之前已经正确启动了MiniCluster。您可以检查代码中是否正确调用了start()方法来启动MiniCluster。

    MiniCluster意外关闭:可能在执行Flink作业之前,MiniCluster已经因为某种原因关闭。请检查代码中是否有意外关闭MiniCluster的地方,或者查看日志以获取更多关于关闭原因的信息。

    还有可能就是并发问题,你检查下程序是不是有多个线程同时操作MiniCluster。

    您可以根据以上方法尝试去排查下。祝你好运哈。

  5. 这个异常可能是因为在Flink的执行环境(ExecutionEnvironment)中使用了Flink MiniCluster。 MiniCluster对应用程序进行本地测试非常有用,但它只能在本地模式下运行,而不是在集群模式下。

    在这种情况下,可能会尝试使用一个远程Flink集群来执行应用程序,但是,MiniCluster对象仍然被实例化并且没有正确关闭。这可能是在应用程序的一些测试中发生的常见情况。

    要解决这个问题,可以通过以下方式进行操作:

    确保在使用MiniCluster之前,已正确初始化Flink的执行环境,并正确地将Table Environment注册到该执行环境中。

    在应用程序结束时,确保将MiniCluster对象关闭,并避免在那之后再次使用它。

    如果应用程序中的某些测试需要使用MiniCluster,并且您不确定如何正确关闭对象,请考虑使用JUnit Rule或TestWatcher来确保在测试完成时正确关闭MiniCluster。

  6. 因为命令行运行flink和程序main函数执行flink程序都是走MiniCluster模式,所以可能产生了冲突。

  7. MiniCluster未正确启动:在执行Flink任务时,需要首先启动MiniCluster。如果MiniCluster未能正确启动,可能会导致后续任务无法正常执行。可以参考Flink官方文档了解如何启动MiniCluster,并确保MiniCluster已正确启动。

    Flink环境配置不正确:在使用Flink进行批处理时,需要正确配置Flink的环境变量、classpath等。如果这些配置不正确,可能会导致Flink无法正常执行。可以参考Flink官方文档进行环境配置,并确保配置正确。

    数据源配置不正确:在使用Table API读取MySQL数据时,需要正确配置MySQL数据源,包括数据库连接地址、用户名、密码等。如果这些配置不正确,可能会导致Flink无法读取数据。可以参考Flink官方文档进行数据源配置,并确保配置正确。

  8. 在使用 Flink 进行批处理时,在连接 MySQL 数据库进行数据读取操作可以使用 Table API 和 SQL 两种方式。如果您的代码中出现 MiniCluster is not yet running or has already been shut down 异常,可能是以下原因导致:

    1. MiniCluster 没有正确启动

    MiniCluster 是 Flink 提供给开发者本地测试的一个本地执行引擎,该引擎会将程序打包成一个可运行的 JAR 文件,并在内存中模拟整个 Flink 集群环境。而 MiniCluster 的运行对于整个 Flink 程序来说十分关键,如果 MiniCluster 没有正常启动,程序就无法运行完成。

    解决办法:请检查您配置文件中是否正确指定了 MiniCluster相关参数,比如以下的示例配置:

    Configuration conf = new Configuration();conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);final MiniCluster miniCluster = new MiniCluster(conf);miniCluster.start();
    1. 连接 MySQL 数据库失败

    当我们使用 Flink 进行数据操作时,需要连接外部数据源(例如MySQL),如果连接失败也会导致程序异常终止。

    解决办法:请检查数据库连接的相关配置是否正确、网络是否畅通等问题。以使用Table API为例,假设您要连接到 mydb 库的 data 表上,请确认代码中定义的表信息和连接串等内容没有错误。类似以下示例:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);final JdbcConnectionOptions options = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()                    .withUrl("jdbc:mysql://localhost:3306/mydb")                    .withDriverName("com.mysql.jdbc.Driver")                    .withUsername("root")                    .withPassword("")                .build();final JDBCInputFormat jdbcDataSource = JDBCInputFormat.buildJDBCInputFormat()                    .setDBUrl(options.getUrl())                    .setDrivername(options.getDriverName())                    .setUsername(options.getUsername())                    .setPassword(options.getPassword())                    .setQuery("SELECT * FROM data")                    .setRowTypeInfo(typeInfo)                    .finish();DataStreamSource input = env.createInput(jdbcDataSource);input.print().setParallelism(1);
    1. 表结构信息缺失

    在表操作中,我们需要指定一个 Data Type 以便 Flink 进行数据读取和转换,在这个过程中可能会发生字段信息不匹配的情况。

    解决办法:请检查您定义的 Table Schema 是否与数据库中的数据类型一致,并尝试调整进行匹配。举个例子,如果你的表字段 a 的类型为 VARCHAR,则可以通过以下方式使用将其包装成 Traits.STRING 样式:

    val tableSchema =      new Schema()        .field("a", DataTypes.VARCHAR(10))        .field("b", DataTypes.INT())// ...tableEnv.connect(new FileSystem().path(resultPath))         .withFormat(new Csv())         .withSchema(tableSchema)          .createTemporaryTable("resultTable");// 使用 CAST 将 String 转换为 ValueTable table = tableEnv.sqlQuery("SELECT CAST(a AS STRING) AS a1, b FROM inputTable");

    总之,上述方法不一定适用于所有问题。如果你的问题没有解决,请尝试查看完整的 log 信息以便您更好地了解运行状态和错误点位,或者请提供详细的数据、配置文件等内容以方便大家共同探讨问题所在。

  9. 这个错误通常是因为在测试的时候,没有正确设置 MiniCluster 的相关参数,导致 Flink 程序无法启动 MiniCluster。建议您检查一下 MiniCluster 的相关配置是否正确,并且确保 MiniCluster 已经正确启动。

    下面是一个简单的 Flink 批处理程序使用 Table API 从 MySQL 中读取数据的示例代码,您可以参考一下,看看是否符合您的需求:

    public class BatchJob {    public static void main(String[] args) throws Exception {        // 设置 MiniCluster 的配置信息        Configuration config = new Configuration();        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);            // 创建 MiniCluster        MiniCluster miniCluster = new MiniCluster(config);        miniCluster.start();            // 创建 TableEnvironment        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);        env.setParallelism(1);        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();        TableEnvironment tEnv = TableEnvironment.create(settings);            // 注册 MySQL 数据源        String url = "jdbc:mysql://localhost:3306/test";        String username = "root";        String password = "123456";        String driverName = "com.mysql.jdbc.Driver";        tEnv.getConfig().getConfiguration().setString("table.exec.source.driver", driverName);        tEnv.getConfig().getConfiguration().setString("table.exec.source.url", url);        tEnv.getConfig().getConfiguration().setString("table.exec.source.username", username);        tEnv.getConfig().getConfiguration().setString("table.exec.source.password", password);            // 读取 MySQL 数据        Table inputTable = tEnv.sqlQuery("SELECT * FROM my_table");        DataSet inputDataSet = tEnv.toDataSet(inputTable, Row.class);        inputDataSet.print();            // 关闭 MiniCluster        miniCluster.close();    }}

    在这个示例代码中,我们先创建了一个 MiniCluster,并使用 Table API 从 MySQL 中读取数据,最后关闭 MiniCluster。如果您的代码和这个示例代码类似,但仍然出现了错误,建议您提供更多详细的信息,例如完整的错误信息和代码片段,以便更好地定位问题。

  10. 根据您提供的信息,可能是因为您在未启动 MiniCluster 的情况下尝试访问 MySQL 数据库。在 Flink 中,MiniCluster 是用于测试和开发的本地模拟集群,如果您的代码中使用了 MiniCluster,那么在运行程序之前,需要先启动 MiniCluster。

    如果您确定不需要使用 MiniCluster,可以考虑去掉与之相关的代码,或者调整代码中的相关逻辑。如果您需要使用 MiniCluster 进行测试和开发,可以参考以下步骤启动 MiniCluster:

    在代码中,创建一个 MiniCluster: java Copy code MiniCluster miniCluster = new MiniCluster.Builder() .setNumTaskManagers(1) .setNumSlotsPerTaskManager(1) .build(); 在代码中,启动 MiniCluster: java Copy code miniCluster.start(); 在 MiniCluster 启动之后,使用 Flink SQL 语句查询 MySQL 数据库中的数据: java Copy code String query = “SELECT * FROM your_table”; Table table = tableEnv.sqlQuery(query); 需要注意的是,如果您使用 MiniCluster 进行测试和开发,可能需要在测试结束之后手动关闭 MiniCluster。您可以在程序的最后调用以下方法来关闭 MiniCluster:

    java Copy code miniCluster.close(); 同时,建议您查看相关文档和资料,了解更多关于 MiniCluster 和 Flink SQL 的使用和调试技巧,以便更好地进行测试和开发。

  11. 可能是因为在使用TableEnvironment读取MySQL数据时,没有正确地配置ExecutionEnvironment和StreamExecutionEnvironment,导致MiniCluster没有正确启动。

  12. 在 Flink 批处理任务中使用 Table API 去读取 MySQL 数据库的数据时,需要使用 ExecutionEnvironment 来创建批处理环境,并在之后调用 env.execute() 方法来启动任务。如果你在 MiniCluster 还没有启动或已经关闭时调用了 env.execute() 方法,就会抛出 java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down. 异常。

    要解决这个问题,可以将 env.execute() 方法放到 MiniCluster 启动之后执行。具体做法是将代码封装成一个函数,然后在测试类中调用该函数。示例如下:

    public static void main(String[] args) throws Exception {    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    env.setParallelism(1);    // 设置 Checkpoint 配置    env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);    env.getCheckpointConfig().setCheckpointTimeout(10000L);    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);    // 创建 MiniCluster    MiniCluster miniCluster = new MiniCluster.Builder()            .setNumTaskManagers(1)            .setNumSlotsPerTaskManager(1)            .build();    try {        // 启动 MiniCluster        miniCluster.start();        // 在 MiniCluster 中执行 Flink 任务        runFlinkJob(env);    } finally {        // 停止 MiniCluster        miniCluster.close();    }}private static void runFlinkJob(StreamExecutionEnvironment env) throws Exception {    BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);    // 使用 Table API 读取 MySQL 数据库的数据    Table inputTable = tEnv.sqlQuery("SELECT * FROM my_table");    // 打印结果    DataSet result = tEnv.toDataSet(inputTable, Row.class);    result.print();    // 启动任务    env.execute("My Flink Job");}

    在上述示例中,我们先创建了一个 MiniCluster,并在其中启动 Flink 任务。然后,在 runFlinkJob 函数中使用 Table API 去读取 MySQL 数据库的数据,并最终通过调用 env.execute() 方法来启动任务。需要注意的是,在 MiniCluster 关闭之前一定要停止 Flink 任务,否则会出现类似资源泄漏的问题。

  13. 在使用 Flink 批处理进行 table API 的开发时,如果出现 java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down. 这个异常,可能是因为您的测试代码中启动了 Flink 的 MiniCluster,但是没有正确地关闭该 MiniCluster 导致的。

    在执行 batch job 时,应该先创建 ExecutionEnvironment 对象,并在其中定义要执行的任务。然后,通过调用 execute() 方法来启动任务并等待任务完成。例如:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // 使用 MySQL connector 创建 TableEnvironment TableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

    // … 定义 Table API 查询 …

    // 执行查询 Table result = … DataSet rows = tEnv.toDataSet(result, Row.class); rows.print(); 请确保在测试代码中正确地关闭 Flink 的 MiniCluster。可以在 @After 注解的方法中调用 MiniClusterResource.stop() 方法来关闭 MiniCluster。例如:

    import org.apache.flink.test.util.MiniClusterResource; import org.junit.ClassRule; import org.junit.Test; import org.junit.After;

    public class ExampleTest {

    @ClassRulepublic static final MiniClusterResource miniClusterResource =        new MiniClusterResource(            new MiniClusterResourceConfiguration.Builder()                .setNumberTaskManagers(1)                .setNumberSlotsPerTaskManager(1)                .build());@Testpublic void test() throws Exception {    // ... 测试代码 ...}@Afterpublic void tearDown() throws Exception {    miniClusterResource.after();}

    } 在上面的例子中,我们使用 MiniClusterResource 来启动 MiniCluster,并在测试方法执行完成后关闭该 MiniCluster。这样可以确保测试代码的正确性,并避免因为没有正确关闭 MiniCluster 导致的问题。

  14. 从错误信息来看,这个错误可能是由于 Flink MiniCluster 还未启动,或者已经被关闭引起的异常。通常而言,Flink MiniCluster 是通过一些测试用例或者本地调试来使用的,因此不需要显式调用其启动和关闭方法。具体地说,如果你在本地开发环境中开发 Flink 应用,可以通过编写一个如下所示的 main 函数来启动 Flink:

    public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // ...env.execute("My Flink Job");

    } 如果你使用的是 Flink Table API,请将 StreamExecutionEnvironment 换为 ExecutionEnvironment 即可。