Spark's distinct function deduplicates in a different way from Scala's distinct.
First, let's look at Scala's distinct:
List(1,2,3,1,2).distinct
Then look at its source code:
def distinct: Repr = {
  val isImmutable = this.isInstanceOf[immutable.Seq[_]]
  if (isImmutable && lengthCompare(1) <= 0) repr
  else {
    val b = newBuilder
    val seen = new mutable.HashSet[A]()
    var it = this.iterator
    var different = false
    while (it.hasNext) {
      val next = it.next
      if (seen.add(next)) b += next else different = true
    }
    if (different || !isImmutable) b.result() else repr
  }
}
From this source we can see that Scala's distinct deduplicates with a mutable HashSet: an element is appended to the result only when seen.add(next) returns true, i.e. when it has not been seen before.
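Stripped of the Repr/builder machinery, the same idea can be sketched in a few lines of plain Scala (distinctSketch is just an illustrative name, not part of the standard library):

import scala.collection.mutable

// Minimal sketch of HashSet-based deduplication: keep an element only
// the first time seen.add accepts it (add returns false for duplicates).
def distinctSketch[A](xs: List[A]): List[A] = {
  val seen = mutable.HashSet.empty[A]
  val b = List.newBuilder[A]
  for (x <- xs) if (seen.add(x)) b += x
  b.result()
}

distinctSketch(List(1, 2, 3, 1, 2))  // List(1, 2, 3)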
Next, let's look at Spark's distinct function:
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 1, 2))
val distinctRDD: RDD[Int] = rdd.distinct()
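For reference, a complete runnable version of this example might look like the sketch below; the app name and the local master setting are only placeholders for a quick test:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object DistinctDemo {
  def main(args: Array[String]): Unit = {
    // Local mode, just for experimenting with distinct()
    val conf = new SparkConf().setAppName("DistinctDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 1, 2))
    val distinctRDD: RDD[Int] = rdd.distinct()

    // Ordering is not guaranteed; e.g. prints 1, 2, 3
    println(distinctRDD.collect().mkString(", "))

    sc.stop()
  }
}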
Then look at the source code:
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
    // Create an instance of external append only map which ignores values.
    val map = new ExternalAppendOnlyMap[T, Null, Null](
      createCombiner = _ => null,
      mergeValue = (a, b) => a,
      mergeCombiners = (a, b) => a)
    map.insertAll(partition.map(_ -> null))
    map.iterator.map(_._1)
  }
  partitioner match {
    case Some(_) if numPartitions == partitions.length =>
      mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
    case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
  }
}
The core deduplication logic is in the case _ branch, taken when the RDD does not already have a suitable partitioner:
map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
The original data is List(1, 2, 3, 1, 2).
The deduplication proceeds step by step:
map(x => (x, null))
converts each element into a key-value pair, giving (1,null), (2,null), (3,null), (1,null), (2,null).
reduceByKey
aggregates the values that share the same key. The aggregation function is (x, _) => x: for each key the values to merge are (null, null), only the first value is taken, and null is returned. The two pairs (1,null), (1,null) for key 1 therefore collapse into a single (1,null), and likewise for key 2, leaving (1,null), (2,null), (3,null).
map(_._1)
extracts the first component of each pair, e.g. (1,null) becomes 1, which yields the deduplicated result 1, 2, 3. The map--reduceByKey--map chain of operators therefore performs the deduplication in a distributed way.
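As a sanity check, the same map--reduceByKey--map chain can be applied by hand and compared with distinct(); the sketch below reuses the sc and rdd from the earlier example:

val manualDistinct: RDD[Int] =
  rdd.map(x => (x, null))        // (1,null),(2,null),(3,null),(1,null),(2,null)
     .reduceByKey((x, _) => x)   // shuffle; keeps one (key, null) pair per distinct key
     .map(_._1)                  // drop the dummy null, keep the key

manualDistinct.collect().sorted  // Array(1, 2, 3)
rdd.distinct().collect().sorted  // Array(1, 2, 3), same result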