使用java远程提交spark任务到yarn集群
背景
公司需求中需要用java远程提交spark任务,提交方式仍然使用yarn提供的方法。如果你也想远程提交flink任务,请参考另一篇文章《使用java远程提交flink任务到yarn集群》。
环境准备
大数据集群,只要有hadoop就行
正式开始
1. 打包spark jars目录下所有jar包,上传到hdfs
cd spark-3.4.3-bin-hadoop3/jars
zip -q -r __spark_libs__.zip *
hdfs dfs -put __spark_libs__.zip /
2. 上传你编译好的spark代码
我这里为了方便测试,直接使用spark提供的案例代码
hdfs dfs -put spark-examples_2.12-3.4.3.jar /
3. 编写java代码,远程提交任务
package com.test;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;
/**
 * Submits a Spark application to a remote YARN cluster in cluster deploy mode by
 * driving Spark's YARN {@code Client} directly instead of shelling out to
 * {@code spark-submit}.
 *
 * <p>All cluster coordinates (HDFS paths, ResourceManager addresses, queue and
 * Hadoop user) are hard-coded for the demo environment and must be adapted to
 * the target cluster.
 */
public class SparkSubmit {

    private static final String SPARK_YARN_QUEUE_KEY = "spark.yarn.queue";
    private static final String DEFAULT_SPARK_YARN_QUEUE_VALUE = "default";

    /**
     * Command-line entry point; submits the demo job once.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        dslSubmit();
    }

    /**
     * Submits the example job to YARN and prints the resulting application id
     * to standard output.
     *
     * <p>This method only submits the application; it does not wait for the
     * job to finish.
     *
     * @return the YARN application id as a string, or {@code null} if the
     *         submission failed (the stack trace is printed to standard error)
     */
    public static String dslSubmit() {
        // Identity used when talking to HDFS/YARN from this JVM.
        System.setProperty("HADOOP_USER_NAME", "root");

        String[] runArgs = buildRunArgs();
        SparkConf sparkConf = buildSparkConf();

        Client client = null;
        try {
            ClientArguments cArgs = new ClientArguments(runArgs);
            client = new Client(cArgs, sparkConf, null);
            client.submitApplication();
            String applicationId = client.getApplicationId().toString();
            System.out.println(applicationId);
            return applicationId;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            // Release the client's resources once the submission attempt is over,
            // so repeated calls from a long-running JVM do not accumulate clients.
            stopQuietly(client);
        }
    }

    /** Builds the argument list handed to {@code ClientArguments}. */
    private static String[] buildRunArgs() {
        return new String[]{
            "--class", "org.apache.spark.examples.JavaSparkPi",
            "--jar", "hdfs://node1.itcast.cn/spark-examples_2.12-3.4.3.jar",
            // Each "--arg" value is forwarded to the application's main method.
            "--arg", "-m",
            "--arg", "yarn",
            "--arg", "-d"
        };
    }

    /** Builds the Spark configuration used for the submission. */
    private static SparkConf buildSparkConf() {
        SparkConf sparkConf = new SparkConf();

        // Spark-on-YARN settings.
        sparkConf.set("spark.yarn.scheduler.heartbeat.interval-ms", "1000");
        sparkConf.set("spark.submit.deployMode", "cluster");
        sparkConf.set("spark.yarn.preserve.staging.files", "false");
        sparkConf.set("spark.yarn.isHadoopProvided", "true");
        sparkConf.set("spark.yarn.archive", "hdfs://node1.itcast.cn/__spark_libs__.zip");
        sparkConf.set(SPARK_YARN_QUEUE_KEY, DEFAULT_SPARK_YARN_QUEUE_VALUE);

        // Hadoop settings. Spark only forwards keys carrying the "spark.hadoop."
        // prefix into the Hadoop Configuration, so every Hadoop-side key must use it;
        // un-prefixed keys set on SparkConf never reach Hadoop.
        sparkConf.set("spark.hadoop.yarn.resourcemanager.hostname", "node1");
        sparkConf.set("spark.hadoop.yarn.resourcemanager.address", "node1:8032");
        sparkConf.set("spark.hadoop.yarn.resourcemanager.scheduler.address", "node1:8030");
        sparkConf.set("spark.hadoop.mapreduce.framework.name", "yarn");
        sparkConf.set("spark.hadoop.mapreduce.app-submission.cross-platform", "true");
        sparkConf.set("spark.hadoop.fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

        return sparkConf;
    }

    /** Stops the given client if it was created, reporting (not propagating) any failure. */
    private static void stopQuietly(Client client) {
        if (client == null) {
            return;
        }
        try {
            client.stop();
        } catch (Exception e) {
            // A failure while closing must not mask the submission result.
            e.printStackTrace();
        }
    }
}
最后就可以去yarn里面看执行情况了