Spark中常用的聚合算子说明及使用
admin
2024-03-11 23:10:24
0

一、groupByKey

1、基本释义

groupByKey 顾名思义是“按照 Key 做分组”,但实际上groupByKey算子包含分组和收集两步。具体来说,对于元素类型为(Key,Value)键值对的 Paired RDD,groupByKey 的功能就是对 Key 值相同的元素做分组,然后把相应的 Value 值,以集合的形式收集到一起。换句话说,groupByKey 会把 RDD 的类型,由 RDD[(Key, Value)]转换为 RDD[(Key, Value 集合)]。

分组但不聚合,带上分组后所有的数据直接进行shuffle操作

2、场景举例

有如下文件student_score.txt,是每个学生考试的分数

100 tom
90 lily
100 小明
80 小亮
100 cat

 统计有哪些学生考了相同的分数

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("agg").setMaster("local")val sc = new SparkContext(conf)val file: String = "./student_score.txt"val lineRDD: RDD[String] = sc.textFile(file)val kvRDD: RDD[(String, String)] = lineRDD.map(line => line.split(" ")).map(arr => (arr(0),arr(1)))val keySetsRDD: RDD[(String, Iterable[String])] = kvRDD.groupByKey()keySetsRDD.foreach(f => {println(f._1,f._2)})}

输出如下

(80,CompactBuffer(小亮))
(100,CompactBuffer(tom, 小明, cat))
(90,CompactBuffer(lily))

二、reduceByKey

1、基本释义

reduceByKey的字面含义是“按照 Key值做聚合”,它的计算逻辑,就是根据聚合函数 f 给出的算法,把 Key 值相同的多个元素,聚合成一个元素。

其实是先分组再聚合的逻辑,与groupByKey相比,会先map端一次聚合运算,减少数据的shuffle操作,然后把聚合后的结果发给reduce端。因为只有一个函数入参,map与reduce阶段只能执行相同的操作

2、场景举例

统计每个分数相同的有多少人

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("agg").setMaster("local")val sc = new SparkContext(conf)val file: String = "./student_score.txt"val lineRDD: RDD[String] = sc.textFile(file)val kvRDD: RDD[(String, String)] = lineRDD.map(line => line.split(" ")).map(arr => (arr(0),arr(1)))val kvRDD2: RDD[(String, Int)] = kvRDD.map(word => (word._1,1))val wordCounts: RDD[(String, Int)] = kvRDD2.reduceByKey((x, y) => x + y)wordCounts.foreach(f => {println(f._1,f._2)})}

 输出如下

(80,1)
(90,1)
(100,3)

三、aggregateByKey

1、基本释义

aggregateByKey逻辑类似 aggregate,但aggregateByKey针对的是PairRDD,即键值对 RDD,所以返回结果也是 PairRDD,结果形式为:(各个Key, 同样Key对应的Value聚合后的值)

aggregateByKey先将每个partition内元素进行分组计算(map端聚合),然后将每个partition的计算结果进行combine(reduce端聚合),得到最终聚合结果。且最终结果允许跟原始RDD类型不同

分组加聚合,在mapper端会做本地的聚合,然后把聚合后的结果发给reducer,与reduceByKey相比还可以可以分区间的聚合操作,即定义reduce阶段的函数,在reduce结算执行更灵活的操作

方法及入参数如下:

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] = self.withScope {aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)}

zeroValue: 每个partition的聚合初始值
seqOp: sequence operation,对partition内数据进行映射,最终1个partition只有1个结果。输入类型为U跟V,输出为U,即每次操作结果要跟zeroValue类型一致
第一次操作时,U为zeroValue(初始值),第一次操作之后输出结果U,作为下一次操作的U
第二次操作及之后操作时,U为前一次操作输出结果,而不再是zeroValue
combOp: combine operation,对每个partition的结果进行combine操作。输入类型为U跟U,输出为U,即输入类型与输出类型一致,最终结果为:(K, U)类型的PairRDD 

2、场景使用

有如下文件,记录了每个学生的各科成绩,求每个学生成绩的总和

99 tom 语文
100 小明 语文
80 小亮 语文
92 tom 数学
80 小明 数学
89 小亮 数学
99 tom 英语
88 小明 英语
90 小亮 英语

 利用aggregateByKey两次求和,最终得到每个学生的总成绩。

由于上述的文件可能会存在多个分区之上,seqFunc函数参与分区内的计算,会有学生部分科目的成绩。combFunc参与分区之间的计算,shuffle时触发,所有分区加起来才是所有科目的成绩

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("agg").setMaster("local")val sc = new SparkContext(conf)val file: String = "./wikiOfSpark.txt"// 读取文件内容val lineRDD: RDD[String] = sc.textFile(file)val kvRDD: RDD[(String, Int)] = lineRDD.map(line => line.split(" ")).map(arr => (arr(1), arr(0).toInt))  val kvRDD2:  RDD[(String, Int)] = kvRDD.aggregateByKey(zeroValue = 0)((x, y) => seqFunc(x, y),(x, y) => combFunc(x, x))kvRDD2.collect().foreach(f => {println(f)})}def seqFunc(a: Int, b: Int): Int = {a+b}def combFunc(a: Int, b: Int): Int = {a+b}

输出

(tom,290)
(小亮,259)
(小明,268)

四、foldByKey

是aggregateByKey函数的简写形式。

方法及参数def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

  • zeroValue:初始值,依次与分区内的数据做迭代。
  • func:(V,V) => V:分区内和分区间的计算逻辑。

aggregateByKey例子中的

kvRDD.aggregateByKey(zeroValue = 0)((x, y) => seqFunc(x, y),(x, y) => combFunc(x, x))

改写成 kvRDD.foldByKey(0)(_+_)依然正确运行

五、combineByKey

1、基本释义

与groupByKey、reduceByKey、aggregateByKey相比,combineByKey在日常开发工作中我们用的更少一些。combineByKey使用也相对复杂一些,可以处理更为复杂和个性化的需求。许多bykey类的内部也是调用combineByKeyWithClassTag函数完成部分计算操作的

下边是combineByKey的参数定义

def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C
): RDD[(K, C)]

可以看到起有三个参数,分别是

createCombiner:对于每个元素(键值对)的key只有两种情况,初次遇到,再次遇到。如果初次遇到执行createCombiner方法,创建Combiner,如果再次遇到同一key,表明已经存在则执行mergeValue,进行value的合并。该函数在格式上是由 V类型 -> C类型的转化

mergeValue:对于已经出现过的key,调用mergeValue来进行聚合操作,对该键的累加器对应的当前值(C类型)与这个新的值(V类型)进行合并,并输出C类型的结果。

mergeCombiners:是针对不同分区而言的,如果同一key出现在了多个分区,就需要使用mergeCombiners方法出马将各个分区的结果(全是C格式)进行一个最终的合并。

createCombiner 和 mergeValue 处理单个分区中数据,属于map端的操作, mergeCombiners是不同分区之间的数据合并,属于reduce端的操作,会触发shuffle计算。因此combineByKey首先通过createCombiner 、mergeValue合并相同的key,减少了对reduce的数据,自然shuffle时对资源的消耗减少,性能提升不言而喻

2、场景使用

1、有如下Tom和Cat两个学生的考试成绩,计算每个人的总分数和科目数量

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("agg").setMaster("local")val sc = new SparkContext(conf)val scores: Array[(String, Int)] = Array(("Tom", 88), ("Tom", 95), ("Tom", 91), ("Cat", 93), ("Cat", 95), ("Cat", 98), ("Tom", 92))val input: RDD[(String, Int)] = sc.parallelize(scores, 3);val combineRDD: RDD[(String,(Int,Int))] = input.combineByKey((v: Int) => (v, 1), //key初次遇到(分数值,1)(acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1), //相同key同一个分区(分数值累加,科目数累加)(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) //相同key不同分区(分数值累加,科目数累加))combineRDD.foreach(f => {println(f)})//打印每个人的平均分combineRDD.map(f => {(f._1,f._2._1/f._2._2)}).foreach(f => println(f))}

输出如下

//总分及科目数
(Tom,(366,4))
(Cat,(286,3))//平均分
(Tom,91)
(Cat,95)
def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C
): RDD[(K, C)]

结合示例代码和函数定义,参数有如下映射关系

(1)createCombiner:V>=C, (v: Int) => (v, 1)  

        key初次来没有,key对应的分数做新key,value就记数量1,(key,value)作为C输出

(2)mergeValue:(C, V) => C,(acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1)  

        相同key同一个分区,分数值累加,科目数累加,同样新(key,value)作为C输出

(3)mergeCombiners:(C, C) => C,(acc1: (Int, Int),acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)  

        相同key不同分区分数值累加,科目数累加,同样新(key,value)作为C输出

相关内容

热门资讯

手机系统安卓和ios系统下载地... 你有没有发现,现在手机的世界里,安卓和iOS两大系统就像是一对双胞胎,各有各的特色,让人爱不释手。今...
安卓系统最早开发公司,从安卓起... 你有没有想过,我们每天离不开的安卓系统,它究竟是由哪家公司最早开发的呢?没错,就是谷歌(Google...
安卓系统平板推荐学生用,学生适... 作为一名热爱学习的学生,你是不是也在寻找一款既实用又好用的平板电脑呢?平板电脑在学习和生活中可是个得...
安卓5.0系统多大容量,存储容... 你有没有想过,你的安卓手机里那个神秘的安卓5.0系统到底有多大容量呢?别急,今天就来给你揭秘这个谜团...
芒果嗨是安卓系统吗,揭秘这款热... 你有没有听说过“芒果嗨”这个名字?最近,这个名词在手机圈里可是火得一塌糊涂。不过,别急,今天咱们就来...
安卓系统锁屏如何破,破解攻略全... 你是不是也遇到了安卓手机锁屏的烦恼?每次解锁都要输入复杂的密码或者滑动图案,有时候真是急得团团转。别...
安卓系统app开机自启,深度解... 你有没有发现,每次手机开机,那些APP就像一群调皮的小精灵,迫不及待地跳出来和你打招呼?没错,说的就...
安卓系统拨号连接在哪,拨号连接... 你是不是也和我一样,有时候在使用安卓手机时,突然想连接一下网络,却发现不知道怎么操作?别急,今天就来...
安卓系统为什么会赢,技术革新与... 你有没有想过,为什么安卓系统在智能手机市场上如此火爆,几乎成了“手机必备”的存在呢?今天,就让我带你...
电脑可以做安卓系统么,电脑上运... 你有没有想过,电脑能不能装上安卓系统呢?这听起来是不是有点像科幻电影里的情节?别急,让我带你一探究竟...
国产安卓系统碎片化软件,软件生... 你有没有发现,现在手机上的安卓系统越来越丰富多样了?没错,这就是我们今天要聊的话题——国产安卓系统的...
安卓系统的蚂蚁花呗,蚂蚁花呗在... 你知道吗?在安卓系统的世界里,有一个超级方便的支付工具,那就是蚂蚁花呗。它就像你的贴心小助手,让你在...
安卓2系统彩蛋在哪找,揭秘隐藏... 你有没有发现,安卓2系统里竟然隐藏着一些神秘的彩蛋?没错,就是那些让你忍不住想要一探究竟的小惊喜。今...
全球最大的安卓系统,全球最大移... 你知道吗?在智能手机的世界里,有一个系统可是当之无愧的“王者”——那就是安卓系统!它就像一位全能的魔...
安卓系统就没有碎片了,迈向无缝... 你知道吗?最近在科技圈里,安卓系统可是掀起了一阵不小的波澜呢!有人说,安卓系统再也没有碎片化了?这可...
安卓系统平板电脑评测,安卓平板... 你有没有想过,在这个信息爆炸的时代,拥有一台性能卓越的安卓系统平板电脑,简直就是移动办公和娱乐的完美...
双系统安卓自动关机,双系统安卓... 你有没有遇到过这样的情况:手机里装了双系统安卓,一边是工作用的,一边是娱乐用的,结果有时候不小心,手...
圣地安列斯安卓9系统,圣地安列... 亲爱的读者,你是否也像我一样,对科技新动态充满好奇?今天,我要和你分享一个超级有趣的话题——圣地安列...
平果有安卓系统的吗,畅享智能生... 你有没有想过,手机的世界里,竟然还有这样一个有趣的现象?那就是——平果手机,竟然也有安卓系统!是不是...
vivoy27安卓系统下载,畅... 你有没有听说最近Vivo Y27这款手机的新鲜事儿?没错,就是它的安卓系统下载!今天,我就要给你来个...