锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

大数据高级开发工程师——Spark学习笔记(8)

时间:2023-12-27 14:37:02 sc连接器挂掉的原因

文章目录

  • Spark内存计算框架
    • Spark SQL
      • Spark动态资源划分
        • 1. Executor动态调整范围?
        • 2. 超时被杀的Executor如何处理中持久化数据?
        • 3. 如何开启Spark动态资源划分
        • 4. 动态资源分配策略
        • 5. 动态资源回收策略
      • Spark调优
        • 1. 分配更多资源
          • 分配哪些资源
          • 参数调整到多大是最大的
          • 为什么调大资源后能提高性能?
        • 2. 提高并行度
          • 设置task的数量
          • 如何设置task数量提高并行度
          • 给RDD重新设置partition的数量
          • 提高sparksql运行的task数量
        • 3. RDD重用和持久化
          • 问题说明
          • 如何对rdd进行持久化
          • rdd序列化可用于持久化
        • 4. 使用广播变量
          • 场景描述
          • 引入广播变量
          • 使用广播变量后的性能分析
          • 广播变量使用注意事项
          • 如何使用广播变量
        • 5. 尽量避免使用shuffle类算子
          • shuffle描述
          • 会产生哪些算子操作?shuffle
          • 如何避免发生shuffle
          • 使用map-side预聚合的shuffle操作
        • 6. 使用高性能算子
          • 使用reduceByKey/aggregateByKey替代groupByKey
          • 使用mapPartitions替代普通map
          • 使用foreachPartition替代foreach
          • 使用filter之后进行coalesce操作
          • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作
          • repartition解决sparkSql低并行性问题
        • 7. 使用 kryo 优化序列化性能
          • spark序列化介绍
          • kryo序列化启用后生效的地方
          • 如何开启kryo序列化机制
        • 8. 使用 fastutil 优化数据格式
          • fastutil介绍
          • fastutil好处
          • spark中应用fastutil场景及使用
        • 9. 调整数据本地化等待时间长
          • 本地化级别
          • 数据本地化等待时间
          • 如何调整参数,测试
        • 10. 基于Spark调整内存模型
          • spark中executor内存划分
          • spark的内存模型
          • 静态内存模型
          • 统一内存模型
          • 提交脚本参数的任务

Spark内存计算框架

Spark SQL

Spark动态资源划分

  • 动态资源划分主要是 spark 用于计算时资源不够或剩余为了最大限度地利用资源,动态资源被划分。
  • 在 spark 所谓资源单位一般是指 executors,和 yarn 中的 containers 一样。
  • 在 Spark On Yarn 模式下,通常使用 -num-executors 来指定 Application 使用的 executors 数量,而 -executor-memory-executor-cores 分别用来指定每一个 executor 使用的内存和虚拟 CPU 核数。

假设有这样的场景,如果使用的话 Hive,同时使用多个用户 hive-cli 只有当用户提交并执行数据开发和分析时 Hive SQL 时间,才会向 YARN 申请资源,执行任务。如果不提交执行,无非就是停留。 hive-cli 命令行,也就是一个 JVM 而且,不会浪费 YARN 的资源。
现在想用 Spark-SQL 代替 Hive 多用户同时使用数据开发和分析。 yarn-client 模式运行 spark-sql 启动时指定命令行–num-executors 10,然后每个用户在启动时都会使用它 10 个YARN的资源(Container),只有当用户退出时,这10个资源才会被占用 spark-sql 命令行时才会释放。

# 直接通过-e执行任务后,回收资源 spark-sql --master yarn-client \ --executor-memory 512m --num-executors 10 \ --conf  spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse \ --jars /bigdata/install/hadoop-3.1.4/share/hadoop/common/hadoop-lzo-0.4.20.jar  \ -e  "select count(*) from game_center.ods_task_log;"  # 进入spark-sql客户端,但不执行任务,总是持有资源(在这种模式下,即使你不提交资源,申请资源也会永远持有,这显然是不合理的) spark-sql --master yarn-client \ --executor-memory 512m --num-executors 1 \
--conf  spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse \
--jars /bigdata/install/hadoop-3.1.4/share/hadoop/common/hadoop-lzo-0.4.20.jar

Spark SQL On Yarn spark-sql On Yarn,能不能像 Hive 一样,执行 SQL 时才去申请资源,不执行的时候就释放掉资源呢?

  • 从 Spark1.2 之后,对于 On Yarn 模式,已经支持动态资源分配(Dynamic Resource Allocation)。这样,就可以根据 Application 的负载(Task情况),动态的增加和减少 executors。这种策略非常适合在 YARN 上使用 spark-sql 做数据开发和分析,以及将 spark-sql 作为长服务来使用的场景。
  • spark 当中支持通过动态资源划分的方式来实现动态资源的配置,尽量减少内存的持久占用,但是动态资源划分又会产生进一步的问题,例如:
    • executor 动态调整的范围是多少?无限减少,还是无限增加?
    • executor 动态调整速率?线性增减,还是指数增减?
    • 何时移除 executor?
    • 何时新增 executor?只要有新提交的 Task 就新增 Executor 吗?
    • Spark 中的 Executor 不仅仅提供计算能力,还可能存储持久化数据,这些数据在宿主 executor 被 kill 后,该如何访问?
  • 通过 spark-shell 当中最简单的 WordCount 为例来查看 spark 当中的资源划分,打开命令行终端,依次执行以下命令:
# ① 以yarn模式执行,并指定executor个数为1
$ spark-shell --master=yarn --num-executors=1

# ② 提交Job1 WordCount
scala> sc.textFile("/bigdata/job1.txt").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).count();

# ③ 提交Job2 WordCount
scala> sc.textFile("/bigdata/job2.txt").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).count();

# ④ Ctrl+C Kill JVM
  • 上述的 Spark 应用中,以 yarn 模式启动 spark-shell,并顺序执行两次 WordCount,最后 Ctrl+C 退出 spark-shell。此例中Executor的生命周期如下图:

在这里插入图片描述

  • 从上图可以看出,Executor 在整个应用执行过程中,其状态一直处于 Busy(执行Task)或 Idle(空等)。处于 Idle 状态的 Executor 造成资源浪费这个问题已经在上面提到,下面重点看下开启Spark动态资源分配功能后 Executor 如何运作。

  • ① spark-shell Start:启动 spark shell 应用,并通过 --num-executor 指定一个执行器;
  • ② Executor1 Start:启动执行器 Executor1。注意:Executor 启动前存在一个 AM 向 RM 申请资源的过程,所以启动时机略微滞后于Driver;
  • ③ Job1 Start:提交第一个 WordCount 作业,此时,Executor1 处于 Busy 状态;
  • ④ Job1 End:作业1结束,Executor1 又处于 Idle 状态;
  • ⑤ Executor timeout:Executor1 空闲一段时间后,超时被 Kill;
  • ⑥ Job Submit:提交第二个 WordCount,此时,没有 Active 的 Executor 可用,Job2 处于 Pending 状态;
  • ⑦ Executor2 Start:检测到 Pending 状态的任务,此时 Spark 会启动 Executor2;
  • ⑧ Job2 Start:此时已经有Active 的执行器,Job2 会被分配到 Executor2 上执行;
  • ⑨ Job2 End:Job2 执行结束;
  • ⑩ Executor2 End:Ctrl + C 杀死 Driver后,Executor2 也会被 RM 杀死。

上述流程中需要重点关注的几个问题:

  • Executor 超时:当 Executor 不执行任何任务时,会被标记为 Idle 状态,空闲一段时间后即被认为超时,会被 kill 掉。该空闲时间由 spark.dynamicAllocation.executorIdleTimeout 决定,默认 60s,对应图中Job1 End 到 Executor timeout之间的时间。
  • 资源不足时,何时新增 Executor:当有 Task 处于 Pending 状态,意味着资源不足,此时需要增加 Executor。这段时间由 spark.dynamicAllocation.schedulerBacklogTimeout 控制,默认 1s,对应图中Job2 Submit 到 Executor2 Start之间的时间。
  • 该新增多少 Executor:新增 Executor 的个数主要依据是当前负载情况,即 running 和 Pending 任务数、以及当前 Executor 个数决定。
    • 用 maxNumExecutorsNeeded 代表当前实际需要的最大 Executor 个数,maxNumExecutorsNeeded 和当前 Executor 个数的差值即为潜在的新增 Executor 个数。
    • 注意:之所以说潜在个数,是因为最终新增的 Executor 个数还有别的因素需要考虑,后面再分析。下面是 maxNumExecutorsNeeded 计算方法:
private def maxNumExecutorsNeeded(): Int = { 
        
  val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
  math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
            tasksPerExecutorForFullParallelism)
    .toInt
}
  • 其中,numRunningOrPendingTasks 为当前 running 和 pending 任务数之和。
  • executorAllocationRatio:最理想的情况下,有多少待执行的任务,那么我们就新增多少个 Executor,从而达到最大的任务并发度。但是这也有副作用,如果当前任务都是小任务,那么这一策略就会造成资源浪费。可能最后申请的 Executor 还没启动,这些小任务就已经被执行完了。该值是一个系数值,范围在 [0, 1],默认值 1。
  • tasksPerExecutorForFullParallelism:每个 Executor 的最大并发数,简单理解为 cpu核心数(spark.executor.cores)/ 每个任务占用的核心数(spark.task.cpus)。

1. Executor动态调整范围?

executor动态调整的范围?无限减少?无限制增加?调整速率?

  • 要实现资源的动态调整,那么限定调整范围是最先考虑的事情,Spark通过下面几个参数实现:
    • spark.dynamicAllocation.minExecutors:Executor 调整下限(默认值:0)
    • spark.dynamicAllocation.maxExecutors:Executor 调整上限(默认值:Integer.MAX_VALUE)
    • spark.dynamicAllocation.initialExecutors:Executor 初始化数量(默认值:minExecutors)
  • 三者的关系必须满足:minExecutors <= initialExecutors <= maxExecutors
  • 注意:如果显示指定了num-executors参数,那么 initialExecutors 就是 num-executor 指定的值。

2. 超时被杀的Executor中持久化数据如何处理?

Spark中的Executor既提供计算能力,也提供存储能力。这些因超时被杀死的Executor中持久化的数据如何处理?

  • 如果 Executor 中缓存了数据,那么该 Executor 的 Idle-timeout 时间就不是由 executorIdleTimeout 决定,而是用 spark.dynamicAllocation.cachedExecutorIdleTimeout 控制,默认值:Integer.MAX_VALUE。
  • 如果手动设置了该值,当这些缓存数据的 Executor 被 kill 后,我们可以通过 NodeManannger 的 External Shuffle Server 来访问这些数据。
  • 这就要求 NodeManager 中 spark.shuffle.service.enabled 必须开启。

3. 如何开启Spark的动态资源划分

  • 第一步、修改 yarn-site.xml 配置文件
<property>
    <name>yarn.nodemanager.aux-servicesname>
    <value>mapreduce_shuffle,spark_shufflevalue>
property>
<property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.classname>
    <value>org.apache.spark.network.yarn.YarnShuffleServicevalue>
property>
<property>
    <name>spark.shuffle.service.portname>
    <value>7337value>
property>
  • 第二步、配置 spark 的配置文件:修改 spark-conf 的配置选项,开启动态资源划分,或者直接修改spark-defaults.conf,增加以下参数
# 启用External shuffle Service服务
spark.shuffle.service.enabled true
# Shuffle Service服务端口,必须和yarn-site中的一致
spark.shuffle.service.port 7337 
# 开启动态资源分配
spark.dynamicAllocation.enabled true
# 每个Application最小分配的executor数
spark.dynamicAllocation.minExecutors 1  
# 每个Application最大并发分配的executor数
spark.dynamicAllocation.maxExecutors 30  
spark.dynamicAllocation.schedulerBacklogTimeout 1s 
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s

4. 动态资源分配策略

  • 开启动态分配策略后,application 会在 task 因没有足够资源被挂起的时候去动态申请资源,这种情况意味着该 application 现有的 executor 无法满足所有 task 并行运行。
  • spark 一轮一轮的申请资源,当有 task 挂起或等待 spark.dynamicAllocation.schedulerBacklogTimeout(默认1s)时间的时候,会开始动态资源分配;
  • 之后会每隔 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(默认1s)时间申请一次,直到申请到足够的资源。每次申请的资源量是指数增长的,即 1、2、4、8 等。
  • 之所以采用指数增长,出于两方面考虑:
    • 其一、开始申请的少是考虑到可能 application 会马上得到满足;
    • 其次、要成倍增加,是为了防止 application 需要很多资源,而该方式可以在很少次数的申请之后得到满足。

5. 动态资源回收策略

  • 当 application 的 executor 空闲时间超过 spark.dynamicAllocation.executorIdleTimeout(默认60s)后,就会被回收。

Spark调优

1. 分配更多的资源

  • 性能优化调优的王道,就是增加和分配更多的资源,这对于性能和速度上的提升是显而易见的,基本上,在一定范围之内,增加资源与性能的提升,是成正比的;
  • 写完了一个复杂的spark作业之后,进行性能调优的时候,首先第一步,就是要来调节最优的资源配置;
  • 在这个基础之上,如果说你的spark作业,能够分配的资源达到了你的能力范围的顶端之后,无法再分配更多的资源了,公司资源有限,那么才是考虑去做后面的这些性能调优的点。

相关问题:

  • 分配哪些资源?
  • 在哪里可以设置这些资源?
  • 剖析为什么分配这些资源之后,性能可以得到提升?
分配哪些资源
  • 在实际的生产环境中,提交 spark 任务时,使用 spark-submit shell 脚本,在里面调整对应的参数【executor-memory、executor-cores、driver-memory】。
  • 提交脚本:
 spark-submit \
 --master spark://node1:7077 \
 --class com.yw.spark.example.WordCount \
 --num-executors 3 \    # 配置executor的数量
 --driver-memory 1g \   # 配置driver的内存(影响不大)
 --executor-memory 1g \ # 配置每一个executor的内存大小
 --executor-cores 3 \   # 配置每一个executor的cpu个数
 /bigdata/data/WordCount.jar
参数调节到多大,算是最大
  • Standalone 模式:
    • 先计算出公司 spark 集群上的所有资源,每台节点的内存大小和 cpu 核数,比如:一共有 20 台 worker 节点,每台节点 8g 内存,10个cpu。
    • 实际任务在给定资源的时候,可以给 20 个 executor、每个 executor 的内存 8g、每个executor的使用的 cpu 个数10。
  • Yarn 模式:
    • 先计算出 yarn 集群的所有大小,比如一共 500g 内存,100个cpu;
    • 这个时候可以分配的最大资源,比如给定 50 个 executor、每个 executor 的内存大小10g,每个 executor 使用的 cpu 个数为 2。
  • 使用原则:在资源比较充足的情况下,尽可能的使用更多的计算资源,尽量去调节到最大的大小,一般达到90%为优
为什么调大资源后性能可以提升

2. 提高并行度

设置task的数量
  • 至少设置成与 spark Application 的总 cpu core 数量相同,理想情况下,150 个核分配150个 task,一起运行差不多同一时间运行完毕。
  • 官方建议,task 数量设置成 spark Application 总cpu core数量的2~3倍。比如150个cpu core,基本设置task数量为 300~500。与理想情况不同的,有些task会运行快一点,比如50s就完了,有些task 可能会慢一点,要一分半才运行完,所以如果你的task数量,刚好设置的跟cpu core 数量相同,可能会导致资源的浪费。
  • 因为比如150个task中10个先运行完了,剩余140个还在运行,但是这个时候,就有10个cpu core空闲出来了,导致浪费。如果设置2~3倍,那么一个task运行完以后,另外一个task马上补上来,尽量让cpu core不要空闲。同时尽量提升spark运行效率和速度,提升性能。
如何设置task数量来提高并行度
  • 设置参数:spark.default.parallelism
    • 默认是没有值的,如果设置值为10,它会在 shuffle 过程才会起作用。
    • 比如 val rdd2 = rdd1.reduceByKey(_+_) ,此时rdd2的分区数就是10。
  • 也可以通过在构建 SparkConf 对象的时候设置,例如:new SparkConf().set(“spark.defalut.parallelism”,“500”)。
给RDD重新设置partition的数量
  • 使用 rdd.repartition 来重新分区,该方法会生成一个新的rdd,使其分区数变大。
  • 此时由于一个 partition 对应一个task,那么对应的 task 个数越多,通过这种方式也可以提高并行度。
提高sparksql运行的task数量
  • http://spark.apache.org/docs/2.3.3/sql-programming-guide.html
  • 专门针对sparkSQL来设置的
  • 通过设置参数 spark.sql.shuffle.partitions=500 (只是shuffle时候生效) 默认为200;
  • 可以适当增大,来提高并行度,比如设置为 spark.sql.shuffle.partitions=500

3. RDD的重用和持久化

问题说明

  • 当第一次使用 rdd2 做相应的算子操作得到 rdd3 的时候,就会从 rdd1 开始计算,先读取 HDFS上 的文件,然后对 rdd1 做对应的算子操作得到rdd2,再由 rdd2 计算之后得到 rdd3。同样为了计算得到 rdd4,前面的逻辑会被重新计算。
  • 默认情况下多次对一个 rdd 执行算子操作,去获取不同的 rdd,都会对这个 rdd 及之前的父 rdd 全部重新计算一次。这种情况在实际开发代码的时候会经常遇到,但是我们一定要避免一个 rdd 重复计算多次,否则会导致性能急剧降低。
  • 解决办法:可以把多次使用到的 rdd,也就是公共 rdd 进行持久化,避免后续需要,再次重新计算,提升效率。

如何对rdd进行持久化
  • 可以调用 rdd 的 cache 或者 persist 方法。
    • cache 方法默认是把数据持久化到内存中 ,例如:rdd.cache ,其本质还是调用了 persist 方法。
    • persist 方法中有丰富的缓存级别,这些缓存级别都定义在 StorageLevel 这个对象中,可以结合实际的应用场景合理的设置缓存级别。例如: rdd.persist(StorageLevel.MEMORY_ONLY),这是cache方法的实现。
rdd持久化时可以采用序列化
  • 如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许会导致 OOM 内存溢出。
  • 当纯内存无法支撑公共 RDD 数据完全存放的时候,就优先考虑使用序列化的方式在纯内存中存储。将 RDD 的每个 partition 的数据,序列化成一个字节数组;序列化后,大大减少内存的空间占用。
  • 序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。但是可以减少占用的空间和便于网络传输。
  • 如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。
  • 为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化。
    • 持久化的单副本机制,持久化后的一个副本,因为机器宕机了,副本丢了,就还是得重新计算一次;
    • 持久化的每个数据单元,存储一份副本,放在其他节点上面,从而进行容错;
    • 一个副本丢了,不用重新计算,还可以使用另外一份副本。这种方式,仅仅针对你的内存资源极度充足。比如:StorageLevel.MEMORY_ONLY_2。

4. 广播变量的使用

场景描述
  • 在实际工作中可能会遇到这样的情况,由于要处理的数据量非常大,这个时候可能会在一个 stage 中出现大量的 task,比如有 1000 个task,这些 task 都需要一份相同的数据来处理业务,这份数据的大小为 100M,该数据会拷贝 1000份副本,通过网络传输到各个 task 中去,给 task 使用。这里会涉及大量的网络传输开销,同时至少需要的内存为 1000*100M=100G,这个内存开销是非常大的。不必要的内存的消耗和占用,就导致了你在进行 RDD 持久化到内存,也许就没法完全在内存中放下;就只能写入磁盘,最后导致后续的操作在磁盘 IO 上消耗性能;这对于 spark 任务处理来说就是一场灾难。
  • 由于内存开销比较大,task 在创建对象的时候,可能会出现堆内存放不下所有对象,就会导致频繁的垃圾回收器的回收 GC。GC 的时候一定是会导致工作线程停止,也就是导致 Spark 暂停工作那么一点时间。频繁 GC 的话,对 Spark 作业的运行的速度会有相当可观的影响。

广播变量引入
  • Spark中分布式执行的代码需要传递到各个 executor 的 task 上运行。对于一些只读、固定的数据,每次都需要 Driver 广播到各个 Task 上,这样效率低下。
  • 广播变量允许将变量只广播给各个 executor。该 executor 上的各个 task 再从所在节点的 BlockManager(负责管理某个executor对应的内存和磁盘上的数据)获取变量,而不是从 Driver 获取变量,从而提升了效率。

  • 广播变量初始的时候,就在 Drvier 上有一份副本。通过在 Driver 把共享数据转换成广播变量。
  • task 在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor 对应的 BlockManager 中,尝试获取变量副本;
  • 如果本地没有,那么就从 Driver 远程拉取广播变量副本,并保存在本地的 BlockManager 中;
  • 此后这个 executor 上的task,都会直接使用本地的 BlockManager 中的副本。那么这个时候所有该 executor 中的 task 都会使用这个广播变量的副本。也就是说一个 executor 只需要在第一个 task 启动时,获得一份广播变量数据,之后的 task 都从本节点的 BlockManager 中获取相关数据。
  • executor 的 BlockManager 除了从 driver 上拉取,也可能从其他节点的 BlockManager 上拉取变量副本,网络距离越近越好。
使用广播变量后的性能分析
  • 比如一个任务需要 50 个executor,1000 个task,共享数据为 100M。
    • 在不使用广播变量的情况下,1000 个task,就需要该共享数据的 1000 个副本,也就是说有 1000 份数需要大量的网络传输和内存开销存储,耗费的内存大小1000 * 100=100G。
    • 使用了广播变量后,50 个 executor 就只需要 50 个副本数据,而且不一定都是从 Driver 传输到每个节点,还可能是就近从最近的节点的 executor 的 blockmanager 上拉取广播变量副本,网络传输速度大大增加;内存开销 50*100M=5G。
  • 总结:
    • 不使用广播变量的内存开销为 100G,使用后的内存开销 5G,这里就相差了 20 倍左右的网络传输性能损耗和内存开销,使用广播变量后对于性能的提升和影响,还是很可观的。
    • 广播变量的使用不一定会对性能产生决定性的作用。比如运行 30 分钟的 spark 作业,可能做了广播变量以后,速度快了2分钟,或者5分钟。但是一点一滴的调优,积少成多,最后还是会有效果的。
广播变量使用注意事项
  • 能不能将一个 RDD 使用广播变量广播出去?不能,因为 RDD 是不存储数据的。可以将 RDD 的结果广播出去。
  • 广播变量只能在 Driver 端定义,不能在 Executor 端定义。
  • 在 Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值。
  • 如果 executor 端用到了 Driver 的变量,如果不使用广播变量在 Executor 有多少 task 就有多少 Driver 端的变量副本。
  • 如果 Executor 端用到了 Driver 的变量,如果使用广播变量在每个 Executor 中只有一份Driver端的变量副本。
如何使用广播变量
  • 通过 sparkContext 的 broadcast 方法把数据转换成广播变量,类型为Broadcast
val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6))
  • 然后 executor 上的 BlockManager 就可以拉取该广播变量的副本获取具体的数据。获取广播变量中的值可以通过调用其 value 方法:
val array: Array[Int] = broadcastArray.value

5. 尽量避免使用shuffle类算子

shuffle描述
  • spark 中的 shuffle 涉及到数据要进行大量的网络传输,下游阶段的 task 任务需要通过网络拉取上阶段 task 的输出数据,shuffle 过程,简单来说,就是将分布在集群中多个节点上的同一个 key,拉取到同一个节点上,进行聚合或join 等操作。比如 reduceByKey、join 等算子,都会触发 shuffle 操作。
  • 如果有可能的话,要尽量避免使用 shuffle 类算子,因为 Spark 作业运行过程中,最消耗性能的地方就是 shuffle 过程。
哪些算子操作会产生shuffle
  • spark 程序在开发的过程中使用 reduceByKey、join、distinct、repartition 等算子操作,这里都会产生 shuffle,由于 shuffle 这一块是非常耗费性能的,实际开发中尽量使用 map 类的非 shuffle 算子。
  • 这样的话,没有 shuffle 操作或者仅有较少 shuffle 操作的 Spark 作业,可以大大减少性能开销。
如何避免产生shuffle
  • 错误的做法:传统的 join 操作会导致 shuffle 操作。因为两个 RDD 中,相同的 key 都需要通过网络拉取到一个节点上,由一个 task 进行 join 操作。
  • 正确的做法:Broadcast + map的join操作,不会导致shuffle操作。使用Broadcast将一个数据量较小的RDD作为广播变量。
  • 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
  • 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
使用map-side预聚合的shuffle操作

6. 使用高性能算子

使用reduceByKey/aggregateByKey替代groupByKey
  • reduceByKey/aggregateByKey 可以进行预聚合操作,减少数据的传输量,提升性能。
  • groupByKey 不会进行预聚合操作,进行数据的全量拉取,性能比较低。
使用mapPartitions替代普通map
  • mapPartitions 类的算子,一次函数调用会处理一个 partition 所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。
  • 但是有的时候,使用 mapPartitions 会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个 partition 所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现 OOM 异常。所以使用这类操作时要慎重!
使用foreachPartition替代foreach
  • 原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个 partition 的所有数据,而不是一次函数调用处理一条数据。
  • 在实践中发现,foreachPartitions 类的算子,对性能的提升还是很有帮助的。比如在 foreach 函数中,将 RDD 中所有数据写 MySQL,那么如果是普通的foreach 算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;
  • 但是如果用 foreachPartitions 算子一次性处理一个 partition 的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写 MySQL,性能可以提升 30% 以上。
使用filter之后进行coalesce操作
  • 通常对一个 RDD 执行 filter 算子过滤掉 RDD 中较多数据后(比如30%以上的数据),建议使用 coalesce 算子,手动减少 RDD 的 partition 数量,将 RDD 中的数据压缩到更少的 partition 中去。
  • 因为 filter 之后,RDD 的每个 partition 中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个 task 处理的 partition 中的数据量并不是很多,有一点资源浪费,而且此时处理的 task 越多,可能速度反而越慢。
  • 因此用 coalesce 减少 partition 数量,将 RDD 中的数据压缩到更少的 partition 之后,只要使用更少的 task 即可处理完所有的 partition。在某些场景下,对于性能的提升会有一定的帮助。
  • 在Spark任务中我们经常会使用 filter 算子完成 RDD 中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,但是一旦进过 filter 过滤后,每个分区的数据量有可能会存在较大差异

  • 如上图我们可以发现两个问题:
    • 每个 partition 的数据量变小了,如果还按照之前与 partition 相等的 task 个数去处理当前数据,有点浪费 task 的计算资源;
    • 每个 partition 的数据量不一样,会导致后面的每个 task 处理每个 partition 数据的时候,每个 task 要处理的数据量不同,这很有可能导致数据倾斜问题。
  • 如图,第二个分区的数据过滤后只剩100条,而第三个分区的数据过滤后剩下800条,在相同的处理逻辑下,第二个分区对应的task处理的数据量与第三个分区对应的task处理的数据量差距达到了8倍,这也会导致运行速度可能存在数倍的差距,这也就是数据倾斜问题。
  • 针对上述的两个问题,我们分别进行分析:
    • 针对第一个问题,既然分区的数据量变小了,我们希望可以对分区数据进行重新分配,比如将原来4个分区的数据转化到2个分区中,这样只需要用后面的两个task进行处理即可,避免了资源的浪费。
    • 针对第二个问题,解决方法和第一个问题的解决方法非常相似,对分区数据重新分配,让每个 partition 中的数据量差不多,这就避免了数据倾斜问题。
  • 那么具体应该如何实现上面的解决思路?我们需要coalesce算子。
  • repartition 与 coalesce 都可以用来进行重分区,其中 repartition 只是 coalesce 接口中 shuffle 为 true 的简易实现,coalesce 默认情况下不进行 shuffle,但是可以通过参数进行设置。

假设我们希望将原本的分区个数A通过重新分区变为B,那么有以下几种情况:

  • A > B(多数分区合并为少数分区)
    • ① A与B相差值不大:此时使用coalesce即可,无需shuffle过程。
    • ② A与B相差值很大:此时可以使用 coalesce 并且不启用 shuffle 过程,但是会导致合并过程性能低下,所以推荐设置 coalesce 的第二个参数为true,即启动 shuffle 过程。
  • A < B(少数分区分解为多数分区)
    • 此时使用 repartition 即可,如果使用 coalesce 需要将 shuffle 设置为true,否则 coalesce 无效。
    • 我们可以在 filter 操作之后,使用 coalesce 算子针对每个 partition 的数据量各不相同的情况,压缩 partition 的数量,而且让每个 partition 的数据量尽量均匀紧凑,以便于后面的 task 进行计算操作,在某种程度上能够在一定程度上提升性能。
    • 注意:local 模式是进程内模拟集群运行,已经对并行度和分区数量有了一定的内部优化,因此不用去设置并行度和分区数量。
使用repartitionAndSortWithinPartitions替代repartition与sort类操作
  • repartitionAndSortWithinPartitions 是 Spark 官网推荐的一个算子,官方建议,如果需要在 repartition 重分区之后,还要进行排序,建议直接使用 repartitionAndSortWithinPartitions 算子。
  • 因为该算子可以一边进行重分区的 shuffle 操作,一边进行排序。shuffle 与 sort 两个操作同时进行,比先 shuffle 再 sort 来说,性能可能是要高的。
repartition解决sparkSql低并行度问题
  • 在常规性能调优中我们讲解了并行度的调节策略,但是,并行度的设置对于 Spark SQL 是不生效的,用户设置的并行度只对于 Spark SQL 以外的所有 Spark 的 stage 生效。
  • Spark SQL 的并行度不允许用户自己指定,Spark SQL 自己会默认根据 hive 表对应的 HDFS 文件的 split 个数自动设置 Spark SQL 所在的那个 stage 的并行度,用户自己通 spark.default.parallelism 参数指定的并行度,只会在没 Spark SQL 的 stage 中生效。
  • 由于 Spark SQL 所在 stage 的并行度无法手动设置,如果数据量较大,并且此 stage 中后续的 transformation 操作有着复杂的业务逻辑,而 Spark SQL 自动设置的 task 数量很少,这就意味着每个 task 要处理为数不少的数据量,然后还要执行非常复杂的处理逻辑,这就可能表现为第一个有 Spark SQL 的 stage 速度很慢,而后续的没有 Spark SQL 的 stage 运行速度非常快。
  • 为了解决 Spark SQL 无法设置并行度和 task 数量的问题,我们可以使用 repartition 算子
  • Spark SQL 这一步的并行度和 task 数量肯定是没有办法去改变了,但是,对于Spark SQL 查询出来的 RDD,立即使用 repartition 算子,去重新进行分区,这样可以重新分区为多个 partition,从 repartition 之后的 RDD 操作,由于不再涉及 Spark SQL,因此 stage 的并行度就会等于你手动设置的值,这样就避免了 Spark SQL 所在的 stage 只能用少量的 task 去处理大量数据并执行复杂的算法逻辑。使用 repartition 算子的前后对比如下图

7. 使用 kryo 优化序列化性能

spark序列化介绍
  • Spark 在进行任务计算的时候,会涉及到数据跨进程的网络传输、数据的持久化,这个时候就需要对数据进行序列化。
  • Spark默认采用 Java 的序列化器,其优点:处理起来方便,不需要我们手动做其他操作,只是在使用一个对象和变量的时候,需要实现Serializble接口。但是其缺点:序列化机制的效率不高,序列化的速度比较慢,序列化以后的数据,占用的内存空间相对还是比较大。
  • Spark 支持使用 Kryo 序列化机制。Kryo 序列化机制,比默认的Java 序列化机制,速度要快,序列化后的数据要更小,大概是 Java序列化机制的 1/10。所以 Kryo 序列化优化以后,可以让网络传输的数据变少,在集群中耗费的内存资源大大减少。
kryo序列化启用后生效的地方

Kryo序列化机制,一旦启用以后,会生效的几个地方:

  • 算子函数中使用到的外部变量:
    • 算子中的外部变量可能来至于driver,需要涉及到网络传输,就需要用到序列化。
    • 最终可以优化网络传输的性能,优化集群中内存的占用和消耗。
  • 持久化 RDD 时进行序列化,StorageLevel.MEMORY_ONLY_SER:
    • 将 rdd 持久化时,对应的存储级别里,需要用到序列化。
    • 最终可以优化内存的占用和消耗;持久化RDD占用的内存越少,task 执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生 GC。
  • 产生 shuffle 的地方,也就是宽依赖:
    • 下游的 stage 中的 task,拉取上游 stage 中的 task 产生的结果数据,跨网络传输,需要用到序列化。最终可以优化网络传输的性能。
如何开启kryo序列化机制
// 创建SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

8. 使用 fastutil 优化数据格式

  • https://github.com/vigna/fastutil
fastutil介绍
  • fastutil 是扩展了 Java 标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list 和 queue。
  • fastutil 能够提供更小的内存占用,更快的存取速度,我们使用fastutil 提供的集合类,来替代自己平时使用的 JDK 的原生的 Map、List、Set
fastutil好处
  • fastutil 集合类,可以减小内存的占用,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度
spark中应用fastutil的场景和使用
  • 算子函数使用了外部变量:

    • 你可以使用 Broadcast 广播变量优化;
    • 可以使用 Kryo 序列化类库,提升序列化性能和效率;
    • 如果外部变量是某种比较大的集合,那么可以考虑使用 fastutil 改写外部变量。

    首先从源头上就减少内存的占用(fastutil),通过广播变量进一步减少内存占用,再通过Kryo序列化类库进一步减少内存占用。

  • 算子函数里使用了比较大的集合Map/List:

    • 在你的算子函数里,也就是 task 要执行的计算逻辑里面,如果有逻辑中,出现要创建比较大的Map、List等集合,可能会占用较大的内存空间,而且可能涉及到消耗性能的遍历、存取等集合操作;那么此时,可以考虑将这些集合类型使用 fastutil 类库重写。
    • 使用了 fastutil 集合类以后,就可以在一定程度上,减少 task 创建出来的集合类型的内存占用。
    • 避免 executor 内存频繁占满,频繁唤起 GC,导致性能下降。
  • fastutil的使用:

    • 第一步:在pom.xml中引用fastutil的包:
fastutil fastutil 5.0.9 ``` - 第二步:平时使用List (Integer)的替换成IntList即可。 ```scala List的list对应的到fastutil就是IntList类型 ``` > 使用说明:基本都是类似于 IntList 的格式,前缀就是集合的元素类型; 特殊的就是 Map,Int2IntMap,代表了key-value映射的元素类型。

9. 调节数据本地化等待时长

  • Spark 在 Driver 上对 Application 的每一个 stage 的 task 进行分配之前,都会计算出每个 task 要计算的是哪个分片数据、RDD 的某个partition;Spark 的 task 分配算法,优先会希望每个 task 正好分配到它要计算的数据所在的节点,这样的话就不用在网络间传输数据。
  • 但是通常来说,有时事与愿违,可能 task 没有机会分配到它的数据所在的节点,为什么呢?
  • 可能那个节点的计算资源和计算能力都满了;所以这种时候,通常来说,Spark会等待一段时间,默认情况下是3秒(不是绝对的,还有很多种情况,对不同的本地化级别,都会去等待),到最后实在是等待不了了,就会选择一个比较差的本地化级别,比如说将task分配到距离要计算的数据所在节点比较近的一个节点,然后进行计算。
本地化级别
名称 解析
PROCESS_LOCAL 进程本地化,task和数据在同一个Executor中,性能最好。
NODE_LOCAL 节点本地化,task和数据在同一个节点中,但是task和数据不在同一个Executor中,数据需要在进程间进行传输。
RACK_LOCAL 机架本地化,task和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。
NO_PREF 对于task来说,从哪里获取都一样,没有好坏之分。
ANY task和数据可以在集群的任何地方,而且不在一个机架中,性能最差。
  • PROCESS_LOCAL:进程本地化

    • 代码和数据在同一个进程中,也就是在同一个 executor 中;
    • 计算数据的 task 由 executor 执行,数据在 executor 的 BlockManager 中;
    • 性能最好。
  • NODE_LOCAL:节点本地化

    • 代码和数据在同一个节点中;
    • 比如说数据作为一个 HDFS block 块,就在节点上,而task在节点上某个executor中运行;
    • 或者是数据和 task 在一个节点上的不同 executor 中;
    • 数据需要在进程间进行传输;
    • 性能其次。
  • RACK_LOCAL:机架本地化

    • 数据和 task 在一个机架的两个节点上;
    • 数据需要通过网络在节点之间进行传输;
    • 性能比较差。
  • ANY:无限制

    • 数据和 task 可能在集群中的任何地方,而且不在一个机架中;
    • 性能最差。
数据本地化等待时长
  • spark.locality.wait,默认是3s:首先采用最佳的方式,等待3s后降级,还是不行,继续降级…,最后还是不行,只能够采用最差的。
  • 在 Spark 项目开发阶段,可以使用client模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的task数据本地化的级别,如果大部分都是 PROCESS_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是 NODE_LOCAL、ANY,那么需要对本地化的等待时长进行调节,通过延长本地化等待时长,看看 task 的本地化级别有没有提升,并观察 Spark 作业的运行时

相关文章