Spark RDD算子
创始人
2024-05-15 14:53:46
0

文章目录

  • Spark RDD算子
    • 一、RDD 转换算子
      • 1、Value 类型
        • (1) map
        • (2) mapPartitions
          • 1)函数说明
          • 2)小案例获取每个分区的最大值
        • (3) map 和 mapParitions 的区别
        • (4) mapParitionsWithIndex
          • 1)小案例只获取第二个分区的最大值
          • 2)小案例获取每一个数据的分区来源

Spark RDD算子

RDD 方法也叫做RDD算子,主要分为两类,第一类是用来做转换的,例如flatMap()Map()方法,第二类是行动的,例如:collenct()方法,只有触发了作业才会被执行。
在这里插入图片描述

一、RDD 转换算子

RDD 根据数据处理方式的不同将算子整体上分为Value类型,双Value类型和Key-value类型。

1、Value 类型

(1) map

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

package com.atguigu.bigdata.spark.core.wc.operatorimport org.apache.spark.{SparkConf, SparkContext}//RDD 算子转换类型
class Spark01_RDD_Transform {}
object Spark01_RDD_Transform{def main(args: Array[String]): Unit = {//配置信息val conf = new SparkConf().setMaster("local[*]").setAppName("RDD_zhuanhuan")val context = new SparkContext(conf)//TODO 算子 => mapval rdd = context.makeRDD(List(1, 2, 3, 4)) //基于内存创建一个RDD//    def hanshu(num:Int):Int = {
//      num * 2
//    }
//
//    val value1 = rdd.map(hanshu)
//    value1.collect().foreach(println)val value = rdd.map(a => a * 2)println(value.collect().foreach(println))context.stop()}
}

map 算子的小测试:从服务器日志数据 apache.log中获取用户请求URL资源的路径
思路:文件最右边的那个是文件的路径。可以使用map方法,里面split(" ")方法用空格分隔开,然后再使用takeRight()方法,取最右边的第一个元素,那就是文件的地址了
在这里插入图片描述

package com.atguigu.bigdata.spark.core.wc.operatorimport org.apache.spark.{SparkConf, SparkContext}
//map 算子的小测试:从服务器日志数据 apache.log中获取用户请求URL资源的路径
class Spark02_RDD_test {}
object Spark02_RDD_test{def main(args: Array[String]): Unit = {//配置信息val conf = new SparkConf().setMaster("local[*]").setAppName("RDD_zhuanhuan")val context = new SparkContext(conf)//TODO 算子 => mapval rdd = context.textFile("datas/apache.log")//长的字符串//短的字符串val value = rdd.map(a => a.split(" ").takeRight(1)//将文件按照空格隔开,然后拿最右边的那一个数据)value.collect().foreach(println)context.stop()}
}

map 分区数据执行顺序测试
1、rdd的计算一个分区内的那么数据是一个一个执行逻辑
只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据
一个分区内的数据的执行是有序的,
2、不同分区数据计算是无序的

package com.atguigu.bigdata.spark.core.wc.operatorimport org.apache.spark.{SparkConf, SparkContext}//测试分区的执行的顺序
class Spark02_RDD_Transform_Par {}
object Spark02_RDD_Transform_Par{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Par")val context = new SparkContext(conf)//1、rdd的计算一个分区内的那么数据是一个一个执行逻辑//只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据//一个分区内的数据的执行是有序的,//2、不同分区数据计算是无序的val rdd = context.makeRDD(List(1,2,3,4),2)val rddMap = rdd.map(num => {println("<<<"+num)}) //第一个map转换val rddMap1 = rddMap.map(num=>{println("###"+num)}) //第二个map转换//发现并行计算是没有顺序的rddMap.collect().foreach(println) //第一个rddMap执行rddMap1.collect().foreach(println) //第二个rddMap执行,然后查看他们输出的顺序context.stop()}
}

(2) mapPartitions

1)函数说明

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是值可以进行任意的处理,哪怕是数据过滤。例如这里过滤掉等于2的数据。
val dataRDD1 = dataRDD.mapPartitions(
datas => {
datas.filter(_ == 2)
}
)
说明
map 是一个一个执行的,类似于之前的字节流,所以效率肯定不高,所以需要一个像之前优化字节流的缓冲区那样的方法,所以有了mapParitions 方法,mapParitions 方法是将一个分区内的数据全部拿到之后,然后再进行map操作,那效率肯定就高得多。
注意
mapPartitions:可以以分区为单位进行数据转换操作,但是会将整个分区的数据加载到内存中进行引用,如果处理完的数据是不会被释放掉,存在对象的引用,所以在内存比较小的情况下,数据量较大的情况下,容易出现内存溢出。
总结:两个方法的应用场景不同,如果内存足够那么mapPartitions方法肯定是效率更高的,但是mapPartitions方法存在对象引用,操作完之后内存不会被释放。要是内存小,数据量大的情况下那么最好使用map方法,因为是一条一条操作的,执行完之后内存就会被释放,没有对象引用,虽然效率会低一点,但是不会出错。

package com.atguigu.bigdata.spark.core.wc.operatorimport org.apache.spark.{SparkConf, SparkContext}//map 是一个一个执行的,类似于之前的字节流,所以效率肯定不高
//所以需要一个像之前优化字节流的缓冲区那样的方法
//所以有了mapParitions 方法
class Spark02_RDD_Transform {}
object Spark02_RDD_Transform{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Par")val context = new SparkContext(conf)val rdd = context.makeRDD(List(1, 2, 3, 4), 2) //创建一个RDD//TODO 算子 - mapPartitions//mapPartitions:可以以分区为单位进行数据转换操作//但是会将整个分区的数据加载到内存中进行引用//如果处理完的数据是不会被释放掉,存在对象的引用//所以在内存比较小的情况下,数据量较大的情况下,容易出现内存溢出。//这个方法之所以高效,他是把一个分区内的数据全部拿到之后才开始做操作//而不是一个一个的做操作val mpRDD = rdd.mapPartitions(a => { //这这个方法执行底层是迭代器println(">>>>>>>>>>")a.map(_ * 2) //相当于先把一个分区内的数据聚合了,然后再进行map操作,这个效率就要高得多了})mpRDD.collect()foreach(println)context.stop()}
}
2)小案例获取每个分区的最大值

首先创建RDD的时候,就设置好分区数。
思路:因为mapPartitions方法是将待处理的数据以分区为单位发送到计算节点进行处理,所以我们可以直接用它直接按照每一个分区进行操作,然后直接max方法获取最大值。但是这里的难点在于,mapPartitions方法返回的是一个迭代器,而max方法返回的是一个Int类型的值,所以我们需要用List或者其他类型的集合都可以,给它包裹起来,然后用toIterator方法进行转换,例如List(a.max).toIterator。最后就可以得到每一个分区的最大值了,第一个分区1,2 第二个分区的数据3,4 所以最后输出的是2,4。
在这里插入图片描述

package com.atguigu.bigdata.spark.core.wc.operatorimport org.apache.spark.{SparkConf, SparkContext}
//案例:获取每个分区的最大值
class Spark02_RDD_Transform_Par2 {}
object Spark02_RDD_Transform_Par2{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Par")val context = new SparkContext(conf)val rdd = context.makeRDD(List(1, 2, 3, 4), 2) //创建一个RDD//TODO 算子 - mapPartitionsval mpRDD = rdd.mapPartitions(a => { //这这个方法执行底层是迭代器println(">>>>>>>>>>")List(a.max).toIterator //因为mapPartitions方法返回的是一个迭代器,a.max得到的是一个Int的数值})                  //所以我们的用列表,或者其他的集合都可以把他包起来,然后toIterator将它转换为迭代器就可以了mpRDD.collect().foreach(println) //得到的结果应该是2和4,第一个分区1,2 第二个分区2,4context.stop()}
}

(3) map 和 mapParitions 的区别

数据处理角度
Map 算子是分区内一个数据一个数的执行,类似于串行操作。而mapParitions算子是已分区为单位进行批处理操作。
功能的角度
Map 算子主要目的是将数据源中的数据进行转换和改变。但是不会减少或增多数据。mapParitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据。
性能的角度
Map 算子因为类似于串行操作,所以性能比较低,mapParitions 算子类似于批处理,所以性能较高。但是mapParitions 算子会长时间占用内容,那么这样会导致内存可能不够用,出现内存溢出的错误,所以在内存有限的情况下,不推荐使用,推荐使用map操作。

(4) mapParitionsWithIndex

函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

1)小案例只获取第二个分区的最大值

就是跟mapParitions方法一样的,只是多了一个分区编号,可以指定操作哪一个分区。在某些时候非常有用,比如有两个分区,我只要第二个分区的最大值,第一个分区的数据不要。
思路
里面第一个参数是分区的索引,第二个参数是迭代器也就是分区的所有数据。我们可以对分区进行判断,如果等于1说明就是第二个分区,我们直接返回那个迭代器,然后求的是第二个分区的最大值,我们再像刚刚一样用集合包起来,然后使用toIterator方法进行转换。然后如果不为1的话那么返回一个空的迭代器,Nil.iterator Nil 方法是空集合,空集合.迭代器,就是空迭代器。

package com.atguigu.bigdata.spark.core.wc.operatorimport org.apache.spark.{SparkConf, SparkContext}//mapParitionsWithIndex 方法 比mapParitions多了一个分区编号
class Spark03_RDD_mapParitionsWithIndex {}
object Spark03_RDD_mapParitionsWithIndex{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Par")val context = new SparkContext(conf)val rdd = context.makeRDD(List(1, 2, 3, 4), 2) //创建一个RDD//TODO 算子 - mapPartitionsWithIndex//[1,2][3,4]val mpRDD = rdd.mapPartitionsWithIndex((index,iter) => { //第一个参数是索引的编号,第二个参数是全部的数据,就是迭代器if (index == 1){List(iter.max).toIterator //因为我们只要第二个分区,第一个分区索引为0,第二个分区索引为1,如果1就直接返回迭代器}else{Nil.iterator //如果不是1,那么我们返回一个空的迭代器,Nil 空集合}})mpRDD.collect().foreach(println)context.stop()}
}
2)小案例获取每一个数据的分区来源

分为了4个分区
思路
使用mapPartitionsWithIndex方法,第一个是索引第二个是迭代器,分区中的每一个数据,然后对迭代器进行map操作,映射,第一个参数是分区的索引,第二个参数是分区中的每个数据。就取出来了。
在这里插入图片描述

package com.atguigu.bigdata.spark.core.wc.operatorimport org.apache.spark.{SparkConf, SparkContext}获取每一个数据来自于哪一个分区
class Spark03_RDD_mapParitionsWithIndex2 {}
object Spark03_RDD_mapParitionsWithIndex2{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Spark_Par")val context = new SparkContext(conf)val rdd = context.makeRDD(List(1, 2, 3, 4), 4) //创建一个RDD//TODO 算子 - mapPartitionsWithIndex//[1,2][3,4]val mpRDD = rdd.mapPartitionsWithIndex((index,iter) => {iter.map(a => {(index,a) //第一个是分区索引,第二个是每一个数据})})mpRDD.collect().foreach(println)context.stop()}
}

相关内容

热门资讯

系统如何与安卓互通,技术融合与... 你有没有想过,你的手机系统竟然能和安卓系统这么默契地互通有无?这就像是一场跨越科技界的友谊赛,让我们...
安卓系统 扫码枪,安卓系统下扫... 你有没有想过,在繁忙的超市收银台,那些快速流畅的扫码操作,背后其实隐藏着一个小小的英雄——安卓系统扫...
平板插卡推荐安卓系统,安卓系统... 你有没有想过,你的平板电脑是不是也能像智能手机一样,随时随地扩充存储空间呢?没错,这就是今天我要跟你...
安卓系统固件安装失败,原因排查... 最近是不是你也遇到了安卓系统固件安装失败的问题?别急,让我来给你详细说说这个让人头疼的小麻烦,让你一...
ios系统和安卓区别,系统差异... 你有没有发现,现在手机市场上,iOS系统和安卓系统就像是一对双胞胎,长得差不多,但性格却截然不同。今...
安卓系统2.3优酷,优酷的崛起... 你有没有发现,安卓系统2.3时代的那股怀旧风?那时候,优酷可是视频界的巨头,多少人都是看着优酷长大的...
安卓导航系统密封,安卓导航系统... 你有没有发现,现在手机导航系统越来越智能了?尤其是安卓系统的导航,简直就像一个贴心的导航小助手,带你...
a版安卓11系统,a版深度解析... 你知道吗?最近手机界可是炸开了锅,各大品牌纷纷发布了搭载a版安卓11系统的手机。这可不是什么小打小闹...
安卓系统的模拟吉他,随时随地弹... 你有没有想过,在手机上也能弹奏吉他呢?没错,就是那种模拟吉他的安卓系统应用,让你随时随地都能享受音乐...
王者适配的安卓系统,深度解析适... 你有没有发现,最近玩《王者荣耀》的小伙伴们都在议论纷纷,说新出的安卓系统简直是为王者量身定做的!没错...
安卓系统自动定位关闭,隐私保护... 你有没有发现,手机里的安卓系统有时候会自动定位,这可真是让人又爱又恨啊!有时候,我们并不想让别人知道...
安卓系统电量耗尽测试,全面解析... 手机电量耗尽,这可是每个手机用户都头疼的问题。你有没有想过,你的安卓手机在电量耗尽前,到底经历了哪些...
如何升级车载安卓系统,车载安卓... 亲爱的车主朋友们,你是不是也和我一样,对车载安卓系统升级这件事充满了好奇和期待呢?想象当你驾驶着爱车...
安卓办公哪个系统好,深度解析哪... 你有没有想过,在安卓办公的世界里,哪个系统才是你的最佳拍档呢?在这个信息爆炸的时代,选择一个既强大又...
安卓系统差劲怎么解决,重拾流畅... 你有没有发现,安卓系统有时候真的让人头疼得要命?手机卡顿、应用崩溃、电池续航短,这些问题是不是让你抓...
喜欢安卓系统的原因,探索用户偏... 你有没有发现,身边的朋友、同事,甚至家人,越来越多的人开始使用安卓手机了呢?这可不是简单的潮流,而是...
安卓系统金立手机,品质生活新选... 你有没有发现,最近安卓系统下的金立手机突然火了起来?没错,就是那个曾经陪伴我们走过无数时光的金立手机...
无安卓系统的电视,新型无系统电... 亲爱的读者们,你是否厌倦了那些充斥着安卓系统的电视?想要尝试一些新鲜玩意儿?那就跟我一起探索一下无安...
麒麟系统能刷安卓系统吗,轻松刷... 你有没有想过,你的麒麟手机能不能装上安卓系统呢?这可是个让人好奇不已的问题。现在,就让我来带你一探究...
手机公司安卓系统吗,手机公司引... 你有没有想过,为什么你的手机里装的是安卓系统而不是苹果的iOS呢?这背后可是有着不少故事和门道的哦!...