33-98-spark-核心编程-RDD算子和任务阶段等
创始人
2024-04-22 20:50:56
0

33-spark-核心编程-RDD:

1、RDD的创建,4中方式。分别是从内存中创建,从文件中创建,从其他RDD创建和new RDD,后两者不常用。

创建:big-data-study\Spark-demo\src\main\java\spark\core\com\zh\rdd\builder

2、RDD并行度和分区big-data-study\Spark-demo\src\main\java\spark\core\com\zh\rdd\builder\Spark01_RDD_Memor_Par

3、内存读取数据时,数据的分区Spark01_RDD_Memor_Par_v1

4、文件读取数据时,数据如何分配到对应分区Spark02_RDD_File_Par

5、RDD方法(RDD算子,分为转换算子和行动算子)

RDD方法:

​ 1.转换:功能的补充和封装,将旧的RDD包装成新的RDD,flatMap,map一层一层的。

​ 2.行动:触发任务的调度和作业的执行,collect

RDD方法=》RDD算子

​ 认知心理学认为解决问题其实将问题的状态进行改变:
​ 问题(初始) => 操作(算子) =>问题(审核中) => 操作(算子) => 问题(完成)

RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型

Value类型

1.map:Spark01_RDD_Operator_transform

函数说明:将处理的数据逐条进行映射转换,转换可以是类型的转换,也可以是值的转换。

2.mapPartitions:Spark02_RDD_Operator_transform

函数说明:将待处理的数据以分区为单位发送到计算节点进行处理

3.mapPartitionsWithIndex:Spark03_RDD_Operator_transform

函数说明:相对于mapPartitions而言,可以获取到分区的索引

4.flatMap:Spark04_RDD_Operator_transform_flatmap

函数说明:将处理的数据进行扁平化后再进行映射处理

5.glom:Spark04_RDD_Operator_transform_glom

函数说明:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

6.groupBy:Spark05_RDD_Operator_transform_groupby

函数说明:将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,将该操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中。一个组的数据在一个分区中,但是并不是说一个分区中只有一个组7

7.filter:Spark06_RDD_Operator_transform_filter

算法说明:将数据根据指定的规则进行筛选过滤。数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

8.sample(了解,针对数据倾斜可查找原因等):Spark07_RDD_Operator_transform_sample

算法说明:根据指定的规则从数据集中抽取数据

9.distinct Spark08_RDD_Operator_transform_distinct

算法说明:将数据集中重复的数据去重

10.coalesce Spark09_RDD_Operator_transform_coalesce

算法说明:根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率,当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

11.repartition Spark10_RDD_Operator_transform_repartition

算法说明:该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。

  1. sortBy Spark11_RDD_Operator_transform_sortby

算法说明:该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程

双Value类型

1.intersection(交集),union(并集),subtract(差集),zip(拉链) Spark12_RDD_Operator_transform_doubleValueType

Key - Value 类型

1.partitionBy Spark13_RDD_Operator_transform_keyValue_partitionBy

算法说明:将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

2.reduceByKey Spark14_RDD_Operator_transform_keyValue_reduceByKey

算法说明:可以将数据按照相同的 Key 对 Value 进行聚合

3.groupByKey Spark15_RDD_Operator_transform_keyValue_groupByKey

算法说明:将数据源的数据根据 key 对 value 进行分组

reduceByKey 和 groupByKey 的区别?

shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。

从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey

groupbykey:先分组,再聚合

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wqI1u0kl-1670771755999)(png/image-20210929233653636.png)]

reduceByKey:在落盘之前先对相同的key进行聚合,落盘数据减少,对读取数据也减少,提升性能。结果一样。combine预聚合

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-quobbNLW-1670771756000)(png/image-20210929234454175.png)]

reduceByKey分区内和分区间计算规则是相同的。

4.aggregateByKey图解,Spark16_RDD_Operator_transform_keyValue_aggregateByKey

算法说明:将数据根据不同的规则进行分区内计算和分区间计算

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4iDNHKw2-1670771756001)(png/image-20211007120428128.png)]

5.foldByKey Spark16_RDD_Operator_transform_keyValue_aggregateByKey

算法说明:当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

6.combineByKey Spark17_RDD_Operator_transform_keyValue_combineByKey

算法说明:最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

7.key-value reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别 Spark18_RDD_Operator_transform_keyValue_ByKeyDifferent

8.sortByKey Spark19_RDD_Operator_transform_keyValue_sortByKey

算法说明:在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的

9.join,leftOuterJoin,rightOuterJoin Spark20_RDD_Operator_transform_keyValue_join

算法说明:连接,类似数据库的join,leftjoin,rightjoin

10.cogroup Spark21_RDD_Operator_transform_keyValue_cogroup

算法说明:在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD,分组连接

测试:1) 数据准备,agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。

  1. 需求描述,统计出每一个省份每个广告被点击数量排行的 Top3

  2. 需求分析

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-L0CVtWQ1-1670771756001)(png/image-20211008224951463.png)]

  3. 功能实现 Spark22_RDD_Req

行动算子 :触发作业的执行

1、collect Spark01_RDD_Operaor_Action_collect

算法说明:在驱动程序中,以数组 Array 的形式返回数据集的所有元素

2、reduce,count,first,take,takeOrdered Spark02_RDD_Operaor_Action_Reduce

算法说明:reduce聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

count:返回 RDD 中元素的个数 first:返回 RDD 中的第一个元素 take:返回一个由 RDD 的前 n 个元素组成的数组

takeOrdered:返回该 RDD 排序后的前 n 个元素组成的数组

3、aggregate,fold Spark03_RDD_Operaor_Action_aggregate

算法说明:分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合,fold是aggregate 的简化版操作

4、countByKey Spark04_RDD_Operaor_Action_countByKey

算法说明:统计每种 key 的个数

5、save Spark05_RDD_Operaor_Action_save

算法说明:将数据保存到不同格式的文件中

6、foreach Spark06_RDD_Operaor_Action_forearch

算法说明:分布式遍历 RDD 中的每一个元素,调用指定函数,rdd.collect.foreach(print) 和 rdd.foreach(print)的区别

先采集后打印rdd.collect.foreach(print)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3arZlvTD-1670771756002)(png/image-20211015211503377.png)]

rdd.foreach(print)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VJvHAUVj-1670771756003)(png/image-20211015211621864.png)]

RDD 序列化

  1. 闭包检查

从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变

  1. 序列化方法和属性

从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行,看如下代码:

operator\serializable\Spark01_RDD_Serial.scala

  1. Kryo 序列化框架

Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型

已经在 Spark 内部使用 Kryo 来序列化。注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。 Spark02_RDD_Kyro

RDD 血缘-依赖关系 Spark02_RDD_Dep

1) RDD 血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mC2y4mFQ-1670771756003)(png/image-20211016112019373.png)]

相邻的两个RDD的关系称之为依赖关系
val rdd1 = rdd.map(_ * 2)
新的RDD依赖于旧的RDD
多个连续的RDD的依赖关系,称之为血缘关系.
每个RDD都会保存血缘关系

RDD 依赖关系

这里所谓的依赖关系,其实就是两个相邻 RDD 之间的关系

RDD不会保存数据的
RDD为了提供容错性,需要将RDD间的关系保存下来
一旦出现错误,可以根据血缘关系将数据源重新读取进行计算

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3bvdAsGK-1670771756004)(png/image-20211016112308503.png)]

RDD宽依赖-窄依赖

窄依赖:窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。分区的数据对应,没有进行shuffle打乱。即org.apache.spark.OneToOneDependency

宽依赖:宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。例如入门的数据单双数字的分区,一对多

RDD阶段划分

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5uRHAAvZ-1670771756005)(png/image-20211016122438689.png)]

窄依赖:新旧之间,分区一对一,不需要等待另一个分区的数据完成以后才能进行新的操作。每个分区对应一个task,每个task先执行旧的在执行新的。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wHjhc3Yf-1670771756006)(png/image-20211016122531942.png)]

宽依赖:存在阶段性,需要通过shuffle打乱重排,则需要另一个分区的数据任务执行完毕后,即阶段一统一完成以后才能执行阶段二。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mVyJHiJo-1670771756007)(png/image-20211016122733191.png)]

RDD 阶段划分源码

org\apache\spark\scheduler\DAGScheduler.scalatry {// New stage creation may throw an exception if, for example, jobs are run on 
a// HadoopRDD whose underlying HDFS files have been deleted.finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {case e: Exception =>logWarning("Creating new stage failed due to exception - job: " + jobId, e)listener.jobFailed(e)return
}
……
private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],jobId: Int,callSite: CallSite): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] 
= {
getShuffleDependencies(rdd).map { shuffleDep =>getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
……
private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {val toVisit = waitingForVisit.pop()if (!visited(toVisit)) {visited += toVisittoVisit.dependencies.foreach {case shuffleDep: ShuffleDependency[_, _, _] =>parents += shuffleDepcase dependency =>waitingForVisit.push(dependency.rdd)}} }
parents
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VlmaHyqa-1670771756007)(png/image-20211016133352906.png)]

当RDD中存在shuffle依赖时,阶段会自动增加一一个
阶段的数量= shuffle依赖的数量+ 1
ResultStage只有一个,最后需要执行的阶段

RDD任务划分源码

RDD 任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task

⚫ Application:初始化一个 SparkContext 即生成一个 Application;

⚫ Job:一个 Action 算子就会生成一个 Job;

⚫ Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;

⚫ Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。

ShuffleMapStage => ShuffleMap Task
ResultStage => ResultTask

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。 结合代码流程理解

任务数量=当前阶段中最后一个RDD的分区数量

val tasks: Seq[Task[_]] = try {stage match {case stage: ShuffleMapStage =>partitionsToCompute.map { id =>val locs = taskIdToLocations(id)val part = stage.rdd.partitions(id)new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, 
Option(jobId),Option(sc.applicationId), sc.applicationAttemptId)}case stage: ResultStage =>partitionsToCompute.map { id =>val p: Int = stage.partitions(id)val part = stage.rdd.partitions(p)val locs = taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)}}
……
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
……
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions) }

学习路径:https://space.bilibili.com/302417610/,如有侵权,请联系q进行删除:3623472230

相关内容

热门资讯

电视安卓系统哪个品牌好,哪家品... 你有没有想过,家里的电视是不是该升级换代了呢?现在市面上电视品牌琳琅满目,各种操作系统也是让人眼花缭...
安卓会员管理系统怎么用,提升服... 你有没有想过,手机里那些你爱不释手的APP,背后其实有个强大的会员管理系统在默默支持呢?没错,就是那...
安卓系统软件使用技巧,解锁软件... 你有没有发现,用安卓手机的时候,总有一些小技巧能让你玩得更溜?别小看了这些小细节,它们可是能让你的手...
安卓系统提示音替换 你知道吗?手机里那个时不时响起的提示音,有时候真的能让人心情大好,有时候又让人抓狂不已。今天,就让我...
安卓开机不了系统更新 手机突然开不了机,系统更新还卡在那里,这可真是让人头疼的问题啊!你是不是也遇到了这种情况?别急,今天...
安卓系统中微信视频,安卓系统下... 你有没有发现,现在用手机聊天,视频通话简直成了标配!尤其是咱们安卓系统的小伙伴们,微信视频功能更是用...
安卓系统是服务器,服务器端的智... 你知道吗?在科技的世界里,安卓系统可是个超级明星呢!它不仅仅是个手机操作系统,竟然还能成为服务器的得...
pc电脑安卓系统下载软件,轻松... 你有没有想过,你的PC电脑上安装了安卓系统,是不是瞬间觉得世界都大不一样了呢?没错,就是那种“一机在...
电影院购票系统安卓,便捷观影新... 你有没有想过,在繁忙的生活中,一部好电影就像是一剂强心针,能瞬间让你放松心情?而我今天要和你分享的,...
安卓系统可以写程序? 你有没有想过,安卓系统竟然也能写程序呢?没错,你没听错!这个我们日常使用的智能手机操作系统,竟然有着...
安卓系统架构书籍推荐,权威书籍... 你有没有想过,想要深入了解安卓系统架构,却不知道从何下手?别急,今天我就要给你推荐几本超级实用的书籍...
安卓系统看到的炸弹,技术解析与... 安卓系统看到的炸弹——揭秘手机中的隐形威胁在数字化时代,智能手机已经成为我们生活中不可或缺的一部分。...
鸿蒙系统有安卓文件,畅享多平台... 你知道吗?最近在科技圈里,有个大新闻可是闹得沸沸扬扬的,那就是鸿蒙系统竟然有了安卓文件!是不是觉得有...
宝马安卓车机系统切换,驾驭未来... 你有没有发现,现在的汽车越来越智能了?尤其是那些豪华品牌,比如宝马,它们的内饰里那个大屏幕,简直就像...
p30退回安卓系统 你有没有听说最近P30的用户们都在忙活一件大事?没错,就是他们的手机要退回安卓系统啦!这可不是一个简...
oppoa57安卓原生系统,原... 你有没有发现,最近OPPO A57这款手机在安卓原生系统上的表现真是让人眼前一亮呢?今天,就让我带你...
安卓系统输入法联想,安卓系统输... 你有没有发现,手机上的输入法真的是个神奇的小助手呢?尤其是安卓系统的输入法,简直就是智能生活的点睛之...
怎么进入安卓刷机系统,安卓刷机... 亲爱的手机控们,你是否曾对安卓手机的刷机系统充满好奇?想要解锁手机潜能,体验全新的系统魅力?别急,今...
安卓系统程序有病毒 你知道吗?在这个数字化时代,手机已经成了我们生活中不可或缺的好伙伴。但是,你知道吗?即使是安卓系统,...
奥迪中控安卓系统下载,畅享智能... 你有没有发现,现在汽车的中控系统越来越智能了?尤其是奥迪这种豪华品牌,他们的中控系统简直就是科技与艺...