各位老师Flink中我在创建yarn客户端yarn.init和start方法都成功了,然后运行flink任务也成功了,集群中没有生成新的appid是怎么回事,使用的是yarn perjob模式?private void internalSubmitJob(int step, String jobSteps, Long jobId) {
Configuration flinkConfiguration = FlinkCommonConfiguration.getCommonFlinkConf();
String configurationDirectory = FlinkCommonConfiguration.getConfigurationDirectory();
//创建yarn客户点并初始化、启动
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
//指定yarn-site.xml路径
configuration.addResource(new Path("/xxxxxx/yarn-site.xml"));
configuration.addResource(new Path("/xxxxxx/core-site.xml"));
configuration.addResource(new Path("/xxxxxx/resource-types.xml"));
configuration.addResource(new Path("/xxxxxx/mapred-site.xml"));
// String keytabPath = getClass().getClassLoader().getResource(“appuser.keytab”).getPath();
// String principal = “appuser”
//
// UserGroupInformation.loginUserFromKeytab(“”);
YarnClient yarnClient = YarnClient.createYarnClient();
YarnConfiguration yarnConfiguration = new YarnConfiguration(configuration);
yarnClient.init(yarnConfiguration);
yarnClient.start();
// 创建YarnClusterDescriptor
YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever.create(yarnClient);
// 集群模式设置为per-job模式
flinkConfiguration.set(
DeploymentOptions.TARGET,
YarnDeploymentTarget.PER_JOB.getName());
// yarn application name 设置为flink-application
flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");
YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.createClusterSpecification();
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(flinkConfiguration, yarnConfiguration, yarnClient, clusterInformationRetriever, true);
StreamGraph streamGraph;
JobGraph jobGraph;
// 替换为你的Flink作业jar文件路径
try {
streamGraph = FlinkJobRunner.execute(jobId);
jobGraph = streamGraph.getJobGraph();
//per job
ClusterClientProvider clusterClientProvider = yarnClusterDescriptor.deployJobCluster(clusterSpecification,jobGraph, true);
ClusterClient clusterClient = clusterClientProvider.getClusterClient();
ApplicationId applicationId = clusterClient.getClusterId();
String webInterfaceURL = clusterClient.getWebInterfaceURL();
jobLifeCycleManager.launchNewJobLifeCycle(jobHandler.getJobDescriptor(), jobSteps);
jobLifeCycleManager.setApplicationId(applicationId.toString());
jobLifeCycleManager.setClusterClient(clusterClient);
} catch (Exception e){
// jobLifeCycleManager.raiseException();
log.warn(“sumbit异常”);
throw new RuntimeException(e);
}
}这个是我具体实现方法
Flink 版本和 YARN 版本是否兼容:在使用 Flink 运行在 YARN 集群上时,需要确保 Flink 版本和 YARN 版本兼容。您可以查看 Flink 官方文档中的版本兼容性矩阵,以确定 Flink 版本和 YARN 版本之间的兼容性。
YARN 配置是否正确:在使用 Flink 运行在 YARN 集群上时,需要正确配置 YARN 的相关参数,例如 YARN 的资源管理器地址、队列名称、内存和 CPU 资源等。您可以检查 YARN 的配置文件,以确保相关参数正确配置。
Flink 配置是否正确:在使用 Flink 运行在 YARN 集群上时,还需要正确配置 Flink 的相关参数,例如 Flink 的运行模式、作业部署方式、作业参数等。您可以检查 Flink 的配置文件,以确保相关参数正确配置。
日志是否有异常信息:在 Flink 运行在 YARN 集群上时,还需要检查 Flink 和 YARN 的日志,以了解运行时的异常信息。您可以查看 Flink 和 YARN 的日志文件,以检查是否存在异常信息和错误提示
在使用Flink的yarn-perjob模式时,如果在创建yarn客户端、初始化和启动方法都成功了,但集群中没有生成新的Application ID,可能有以下几个可能的原因:
1. 提交任务的过程出现错误:尽管您在代码中执行了yarn.init和start方法成功,但可能在提交任务的过程中遇到了错误,导致未能生成新的Application ID。您可以检查提交任务的代码逻辑,确保没有遗漏或错误。
2. 配置文件问题:请确保您在提交任务之前正确配置了相关的Flink和YARN参数。检查Flink的配置文件(如flink-conf.yaml)和YARN的配置文件(如yarn-site.xml),确保有正确设置相关的属性,如YARN管理器地址、资源分配等。
3. YARN集群资源不足:如果YARN集群没有足够的资源来支持新的应用程序启动,可能会导致无法生成新的Application ID。您可以检查YARN集群的资源使用情况,确保有足够的容量来运行您的Flink任务。
4. 网络连接问题:可能存在网络连接不稳定或延迟高的情况,导致与YARN集群通信失败。请确保您的网络连接正常,并且能够正常访问YARN集群。
如果仍然无法解决问题,请提供更多详细的错误日志或代码片段,以便我们能够更好地理解和帮助您解决该问题。同时,您也可以在Flink社区的论坛或邮件列表中提问,以获取更多有关此问题的专业支持。