Spark on Yarn部署流程

Spark on Yarn

1 原理

Spark yarn 模式有两种, yarn-client, yarn-cluster, 其中yarn-client适合测试环境, yarn-cluster适合生产环境。
在详细说明Yarn模式之前, 需要先了解几个名词
ResourceManager: 整个集群只有一个, 负责集群资源的统一管理和调度, 因为整个集群只有一个,所以也有单点问题,
NodeManager: 它可以理解为集群中的每一台slave
AM: application master, 对于每一个应用程序都有一个AM, AM主要是向RM申请资源(资源其实就是Container, 目前这个Container就是cpu cores, memory), 然后在每个NodeManager上启动Executors(进一步分布资源给内部任务), 监控跟踪应用程序的进程等。
这里就引入了YARN的调度框架问题: 双层调度框架
(1)RM统一管理集群资源,分配资源给AM
(2)AM将资源进一步分配给Tasks

1.1 Yarn-cluster模式

image-20200320161116768

Yarn-cluster

下面来具体说说Spark Yarn Cluster的流程:
(1) Client端启动应用程序,提交APP到YARN RM
(2)RM收到请求之后, 就会在集群中随机选择一个NM, 为该应用程序分配第一个Container, 然后在这个Contaiiner上启动AM,AM则实现了SC等的初始化
(3)AM启动时会向RM注册,并向RM申请资源
(4)AM一旦申请到资源也就是Container之后, 会在对应的Container(Container信息里面会包含NM节点信息)启动Executor
(5)AM的SC会分配任务及给Executor进行执行(之前Executor会去向AM中的SC注册), 同时EXecutor会向AM汇报运行的状态和进度,也就是上面绿色的通信
(6)AM向RM注册之后, AM会定时向RM汇报程序的运行状态等信息,也就是上面红色部分的通信。

1.2 Yarn-client

image-20200320161149650

Yarn-Client

其具体流程和上面的yarn-cluster很类似
(1) Client端启动应用程序,提交APP到YARN RM, 这个过程则涉及到SC的初始化,SC启动时会去初始化DAGScheduler调度器, 使用反射方法去初始化YarnScheduler 和 YarnClientSchedulerBackend,最终Client会去向RM申请启动AM
(2)RM收到请求之后, 就会在集群中随机选择一个NM, 为该应用程序分配第一个Container, 然后在这个Contaiiner上启动AM,AM则实现了SC等的初始化, 此处与yarn-cluster不同的是,SC的初始化驱动程序的启动并不在AM中, 也就是AM并不是Driver端,但是AM会和SC通信来获取其需要的资源情况(多少cpu, 多少memory)
(3)当客户端的SC与AM启动完毕,会通信, AM可知道SC需要的资源情况, 然后AM会向RM注册, 并向RM申请资源Container
(4)AM申请到资源Container之后, 会与Container对应的NodeManager通信, 要求他在其Container里面启动Executor, 然后去向客户端的SparkContext注册, 并申请任务集Tasks
(5)客户端的SC分配任务集给Executor,
(6)应用程序运行结束之后,客户端的SC会向RM申请资源释放并去关闭自己,kill进程等

上面的流程介绍完之后, 来对比一下YARN-CLUSTER 与 yarn-client的区别:
其主要区别的是AM的作用不大一样,
yarn-client模式下:AM仅仅向RM请求资源, 然后AM会在对应的Container中要求其所属NodeManager去启动Executor, Client会去与此Container Executor通信, 也就是整个程序运行过程中, Client不能离开
yarn-cluster模式: Driver运行在AM中,也就是SC与Executor的所有通信操作都与Client无关了, 在提交完应用程序之后,Client就可以离开了。

2 spark-submit提交到yarn

首先安装好Hadoop 并配置好Yarn, 之后启动sbin/start-yarn.sh

其次针对Spark的spark-env.sh里面增加:

1
2
export HADOOP_CONF_DIR=/home/kason/bigdata/hadoop-2.7.4/etc/hadoop
export YARN_CONF_DIR=/home/kason/bigdata/hadoop-2.7.4/etc/hadoop

下面说明一个最基本的应用程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkDemo {

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf().setAppName("SparkDEMO")
val sc: SparkContext = new SparkContext(conf)

val dataRDD: RDD[Int] = sc.parallelize(Array(1,2,3,4))

dataRDD.foreach(print)
print(dataRDD.count())
}
}

通过maven编译成jar包, 然后通过spark-shell去提交此jar包到yarn上

1
2
3
4
5
6
7
./bin/spark-submit 
--class SparkDemo
--master yarn
--deploy-mode cluster
--driver-memory 1G
--executor-memory 1G
--executor-cores 1 /home/kason/workspace/BigdataComponents/SparkLearn/target/SparkLearn-1.0-SNAPSHOT.jar

提交之后显示:

image-20200320161435874

image-20200320161303011

根据Yarn上的application id找到对应日志:

image-20200320161340856

3 IDEA代码提交到yarn

通过IDEA实现代码提交其实很简单, 主要是设置Master, Yarn模式不像mesos以及standalone模式通过传输url来实现资源管理, yarn模式实际上是Hadoop Yarn接管资源管理,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkDemo {

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf().setAppName("SparkDEMO")
.setMaster("yarn")
//.setMaster("spark://kason-pc:7077")
.set("spark.yarn.jars","hdfs://kason-pc:9000/system/spark/yarn/jars/*")
.setJars(List("/home/kason/workspace/BigdataComponents/out/artifacts/SparkLearn_jar/SparkLearn.jar"))
//.setJars(GETJars.getJars("/home/kason/workspace/BigdataComponents/spark-main/target/spark-main/WEB-INF/lib"))
val sc: SparkContext = new SparkContext(conf)


val dataRDD: RDD[Int] = sc.parallelize(Array(1,2,3,4))

val result = dataRDD.map(res => res * 2)
result.collect().foreach(println(_))
print(result.count())
}
}

运行spark之后, 去8088yarn页面去查看一下:

image-20200320161459965

注意IDEA代码提交到yarn只能使用yarn-client模式

作者:kason_zhang
链接:https://www.jianshu.com/p/d312cb2f36ed