各位老师Flink中我在创建yarn客户端yarn.init和start方法都成功了,然后运行fli[阿里云实时计算 Flink版]

各位老师Flink中我在创建yarn客户端yarn.init和start方法都成功了,然后运行flink任务也成功了,集群中没有生成新的appid是怎么回事,使用的是yarn perjob模式?private void internalSubmitJob(int step, String jobSteps, Long jobId) {

    Configuration flinkConfiguration = FlinkCommonConfiguration.getCommonFlinkConf();
    String configurationDirectory = FlinkCommonConfiguration.getConfigurationDirectory();

    //创建yarn客户点并初始化、启动
    org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
    //指定yarn-site.xml路径
    configuration.addResource(new Path("/xxxxxx/yarn-site.xml"));
    configuration.addResource(new Path("/xxxxxx/core-site.xml"));
    configuration.addResource(new Path("/xxxxxx/resource-types.xml"));
    configuration.addResource(new Path("/xxxxxx/mapred-site.xml"));

// String keytabPath = getClass().getClassLoader().getResource(“appuser.keytab”).getPath();
// String principal = “appuser”
//
// UserGroupInformation.loginUserFromKeytab(“”);

    YarnClient yarnClient = YarnClient.createYarnClient();
    YarnConfiguration yarnConfiguration = new YarnConfiguration(configuration);

    yarnClient.init(yarnConfiguration);
    yarnClient.start();

    // 创建YarnClusterDescriptor
    YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever.create(yarnClient);

    // 集群模式设置为per-job模式
    flinkConfiguration.set(
            DeploymentOptions.TARGET,
            YarnDeploymentTarget.PER_JOB.getName());


    // yarn application name 设置为flink-application
    flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");

    YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);

    ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
            .createClusterSpecification();

    YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(flinkConfiguration, yarnConfiguration, yarnClient, clusterInformationRetriever, true);

    StreamGraph streamGraph;
    JobGraph jobGraph;

    // 替换为你的Flink作业jar文件路径
    try {
        streamGraph = FlinkJobRunner.execute(jobId);
        jobGraph = streamGraph.getJobGraph();
        //per job
        ClusterClientProvider clusterClientProvider = yarnClusterDescriptor.deployJobCluster(clusterSpecification,jobGraph, true);
        ClusterClient clusterClient = clusterClientProvider.getClusterClient();
        ApplicationId applicationId = clusterClient.getClusterId();
        String webInterfaceURL = clusterClient.getWebInterfaceURL();
        jobLifeCycleManager.launchNewJobLifeCycle(jobHandler.getJobDescriptor(), jobSteps);
        jobLifeCycleManager.setApplicationId(applicationId.toString());
        jobLifeCycleManager.setClusterClient(clusterClient);
    } catch (Exception e){

// jobLifeCycleManager.raiseException();
log.warn(“sumbit异常”);
throw new RuntimeException(e);
}

}这个是我具体实现方法

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
=====这是一个广告位,招租中,联系qq 78315851====
1 条回复 A 作者 M 管理员
  1. Flink 版本和 YARN 版本是否兼容:在使用 Flink 运行在 YARN 集群上时,需要确保 Flink 版本和 YARN 版本兼容。您可以查看 Flink 官方文档中的版本兼容性矩阵,以确定 Flink 版本和 YARN 版本之间的兼容性。

    YARN 配置是否正确:在使用 Flink 运行在 YARN 集群上时,需要正确配置 YARN 的相关参数,例如 YARN 的资源管理器地址、队列名称、内存和 CPU 资源等。您可以检查 YARN 的配置文件,以确保相关参数正确配置。

    Flink 配置是否正确:在使用 Flink 运行在 YARN 集群上时,还需要正确配置 Flink 的相关参数,例如 Flink 的运行模式、作业部署方式、作业参数等。您可以检查 Flink 的配置文件,以确保相关参数正确配置。

    日志是否有异常信息:在 Flink 运行在 YARN 集群上时,还需要检查 Flink 和 YARN 的日志,以了解运行时的异常信息。您可以查看 Flink 和 YARN 的日志文件,以检查是否存在异常信息和错误提示

  2. 在使用Flink的yarn-perjob模式时,如果在创建yarn客户端、初始化和启动方法都成功了,但集群中没有生成新的Application ID,可能有以下几个可能的原因:

    1. 提交任务的过程出现错误:尽管您在代码中执行了yarn.init和start方法成功,但可能在提交任务的过程中遇到了错误,导致未能生成新的Application ID。您可以检查提交任务的代码逻辑,确保没有遗漏或错误。

    2. 配置文件问题:请确保您在提交任务之前正确配置了相关的Flink和YARN参数。检查Flink的配置文件(如flink-conf.yaml)和YARN的配置文件(如yarn-site.xml),确保有正确设置相关的属性,如YARN管理器地址、资源分配等。

    3. YARN集群资源不足:如果YARN集群没有足够的资源来支持新的应用程序启动,可能会导致无法生成新的Application ID。您可以检查YARN集群的资源使用情况,确保有足够的容量来运行您的Flink任务。

    4. 网络连接问题:可能存在网络连接不稳定或延迟高的情况,导致与YARN集群通信失败。请确保您的网络连接正常,并且能够正常访问YARN集群。

    如果仍然无法解决问题,请提供更多详细的错误日志或代码片段,以便我们能够更好地理解和帮助您解决该问题。同时,您也可以在Flink社区的论坛或邮件列表中提问,以获取更多有关此问题的专业支持。