Spark 键值对RDD的操作
创始人
2024-06-01 13:41:11
0

键值对RDD(Pair RDD)是指每个RDD元素都是(key,value)键值对类型,是一种常见的RDD类型,可以应用于很多的应用场景。

一、 键值对RDD的创建

键值对RDD的创建主要有两种方式:
(1)从文件中加载生成RDD;
(2)通过并行集合(数组)创建RDD。

1,从文件中加载生成RDD

首先使用textFile()方法从文件中加载数据,然后,使用map()函数转换得到相应的键值对RDD。

scala> val  lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/pairrdd/ word.txtMapPartitionsRDD[1] at textFile at :27 
scala> val  pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word,1)) pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at :29 
scala> pairRDD.foreach(println) 
(i,1) 
(love,1) 
(hadoop,1) 
…… 

map(word => (word,1))函数的作用是,取出RDD中的每个元素,也就是每个单词,赋值给word,然后把word转换成(word,1)的键值对形式。

2,通过并行集合(数组)创建RDD

scala> val  list = List("Hadoop","Spark","Hive","Spark")
list: List[String] = List(Hadoop, Spark, Hive, Spark)  scala> val  rdd = sc.parallelize(list) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at :29  
scala> val pairRDD = rdd.map(word => (word,1)) pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at map at :31 
scala> pairRDD.foreach(println) 
(Hadoop,1) 
(Spark,1) 
(Hive,1) 
(Spark,1)

二、常用的键值对转换操作

常用的键值对转换操作包括reduceByKey(func)、groupByKey()、keys、values、sortByKey()、mapValues(func)、join和combineByKey等。

1,reduceByKey(func)

reduceByKey(func)的功能是,使用func函数合并具有相同键的值。
有一个键值对RDD包含4个元素,分别是(“Hadoop”,1)、(“Spark”,1)、(“Hive”,1)和(“Spark”,1)。可以使用reduceByKey()操作,得到每个单词的出现次数,代码及其执行结果如下:

scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
(Spark,2)
(Hive,1)
(Hadoop,1)

2,·groupByKey()

groupByKey()的功能是,对具有相同键的值进行分组。
有四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),采用groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5)),代码及其执行结果如下:

scala> pairRDD.groupByKey()
res15: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[15] at groupByKeyat :34

reduceByKey和groupByKey的区别是:reduceByKey用于对每个key对应的多个value进行聚合操作,并且聚合操作可以通过函数func进行自定义;groupByKey也是对每个key进行操作,但是,对每个key只会生成一个value-list,groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作。

3,keys()

键值对RDD每个元素都是(key,value)的形式,keys操作只会把键值对RDD中的key返回,形成一个新的RDD。

有一个键值对RDD,名称为pairRDD,包含4个元素,分别是(“Hadoop”,1)、(“Spark”,1)、(“Hive”,1)和(“Spark”,1),可以使用keys方法取出所有的key并打印出来,代码及其执行结果如下:

scala> pairRDD.keys
res17: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at keys at :34 
scala> pairRDD.keys.foreach(println) 
Hadoop 
Spark 
Hive 
Spark

4,values()

values操作只会把键值对RDD中的value返回,形成一个新的RDD。

有一个键值对RDD,名称为pairRDD,包含4个元素,分别是(“Hadoop”,1)、(“Spark”,1)、(“Hive”,1)和(“Spark”,1),可以使用values方法取出所有的value并打印出来,代码及其执行结果如下:

scala> pairRDD.values
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at values at :34  
scala> pairRDD.values.foreach(println) 
1 
1 
1 
1

5,sortByKey()

sortByKey()的功能是返回一个根据key排序的RDD。

有一个键值对RDD,名称为pairRDD,包含4个元素,分别是(“Hadoop”,1)、(“Spark”,1)、(“Hive”,1)和(“Spark”,1),使用sortByKey()的效果如下:

scala> pairRDD.sortByKey()
res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at sortByKey at :34 
scala> pairRDD.sortByKey().foreach(println) 
(Hadoop,1) 
(Hive,1) 
(Spark,1) 
(Spark,1)

6,sortBy()

sortByKey()的功能是返回一个根据key排序的RDD,而sortBy()则可以根据其他字段进行排序。

scala> val  d1 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9))) 
scala> d1.reduceByKey(_+_).sortByKey(false).collect res2: Array[(String, Int)] = Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))

sortByKey(false)括号中的参数false表示按照降序排序,如果没有提供参数false,则默认采用升序排序。从上面排序后的效果可以看出,所有键值对都按照key的降序进行了排序,因此输出Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))。

7,mapValues(func)

mapValues(func)对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。
有一个键值对RDD,名称为pairRDD,包含4个元素,分别是(“Hadoop”,1)、(“Spark”,1)、(“Hive”,1)和(“Spark”,1),下面使用mapValues()操作把所有RDD元素的value都增加1:

scala> pairRDD.mapValues(x => x+1)res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at mapValues at :34 scala> pairRDD.mapValues(x => x+1).foreach(println) (Hadoop,2) (Spark,2) (Hive,2) (Spark,2)

8,join()

join表示内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

scala> val  pairRDD1 = sc.| parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5))) 
scala> val  pairRDD2 = sc.parallelize(Array(("spark","fast"))) 
scala> pairRDD1.join(pairRDD2) 
scala> pairRDD1.join(pairRDD2).foreach(println) 
(spark,(1,fast)) 
(spark,(2,fast))

pairRDD1中的键值对(“spark”,1)和pairRDD2中的键值对(“spark”,“fast”),因为二者具有相同的key(即"spark"),所以会产生连接结果(“spark”,(1,“fast”))。

9,combineByKey()

combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner,mapSideCombine)中的各个参数的含义如下:
(1)createCombiner:在第一次遇到key时创建组合器函数,将RDD数据集中的V类型值转换C类型值(V => C);
(2)mergeValue:合并值函数,再次遇到相同的Key时,将createCombiner的C类型值与这次传入的V类型值合并成一个C类型值(C,V)=>C;
(3)mergeCombiners:合并组合器函数,将C类型值两两合并成一个C类型值;
(4)partitioner:使用已有的或自定义的分区函数,默认是HashPartitioner;
(5)mapSideCombine:是否在map端进行Combine操作,默认为true。

文章来源:《Spark编程基础》 作者:林子雨

文章内容仅供学习交流,如有侵犯,联系删除哦!

相关内容

热门资讯

安卓系统用的华为应用,探索智能... 你知道吗?在安卓系统里,华为的应用可是个宝库呢!它们不仅功能强大,而且使用起来超级方便。今天,就让我...
安卓变ios系统魅蓝 你知道吗?最近有个朋友突然告诉我,他要把自己的安卓手机换成iOS系统,而且还是魅蓝品牌的!这可真是让...
幻书启世录安卓系统,安卓世界中... 亲爱的读者们,你是否曾在某个夜晚,被一本神奇的书所吸引,仿佛它拥有着穿越时空的力量?今天,我要带你走...
电脑安装安卓系统进不去,安卓系... 电脑安装安卓系统后竟然进不去,这可真是让人头疼的问题啊!你是不是也遇到了这种情况,心里直呼“怎么办怎...
用键盘切换控制安卓系统,畅享安... 你有没有想过,用键盘来控制你的安卓手机?是的,你没听错,就是那个我们每天敲敲打打的小玩意儿——键盘。...
小米安卓镜像系统在哪,小米安卓... 你有没有想过,你的小米手机里有一个隐藏的宝藏——安卓镜像系统?没错,就是那个可以让你的手机瞬间变身成...
安卓手机下载排班系统,高效排班... 你有没有想过,每天忙碌的工作中,有没有什么好帮手能帮你轻松管理时间呢?今天,就让我来给你介绍一个超级...
桌面组件如何弄安卓系统,桌面组... 亲爱的桌面爱好者们,你是否曾梦想过将安卓系统搬到你的电脑桌面上?想象那些流畅的动画、丰富的应用,还有...
安卓13系统介绍视频,新功能与... 亲爱的读者们,你是否对安卓13系统充满好奇?想要一探究竟,却又苦于没有足够的时间去研究?别担心,今天...
车机安卓7.1系统,功能升级与... 你有没有发现,现在的车机系统越来越智能了?尤其是那些搭载了安卓7.1系统的车机,简直就像是个贴心的智...
安卓系统下如何读pdf,And... 你有没有遇到过这种情况:手机里存了一大堆PDF文件,可是怎么也找不到一个能顺畅阅读的工具?别急,今天...
安卓系统全国通用的吗,畅享智能... 你有没有想过,为什么你的手机里装的是安卓系统呢?安卓系统,这个名字听起来是不是有点神秘?今天,就让我...
假苹果手机8安卓系统,颠覆传统... 你有没有想过,如果苹果手机突然变成了安卓系统,会是怎样的景象呢?想象那熟悉的苹果外观,却运行着安卓的...
安卓12.0系统vivo有吗,... 你有没有听说最近安卓系统又升级啦?没错,就是那个让手机焕然一新的安卓12.0系统!那么,咱们国内的手...
核心芯片和安卓系统,探索核心芯... 你知道吗?在科技的世界里,有一对“黄金搭档”正悄悄改变着我们的生活。他们就是——核心芯片和安卓系统。...
如何调安卓系统屏幕颜色,安卓系... 亲爱的手机控们,你是否曾觉得安卓系统的屏幕颜色不够个性,或者是因为长时间盯着屏幕而感到眼睛疲劳?别担...
旧台式电脑安装安卓系统,轻松安... 你那台旧台式电脑是不是已经服役多年,性能逐渐力不从心,却又不忍心让它退役呢?别急,今天就来教你怎么给...
美国要求关闭安卓系统,科技霸权... 美国要求关闭安卓系统:一场技术革新还是政治博弈?在数字化时代,智能手机已经成为我们生活中不可或缺的一...
安卓系统日记本 你有没有发现,手机里的安卓系统日记本,简直就是记录生活点滴的宝藏库呢?想象每天忙碌的生活中,有没有那...
安卓手机广告最少的系统,探索安... 你有没有发现,用安卓手机的时候,广告总是无处不在,让人烦得要命?不过别急,今天我要给你揭秘一个秘密—...