Spark:sortBy和sortByKey的函数详解

在很多应用场景都需要对结果数据进行排序,Spark中有时也不例外。在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark 0.9.0之后才引入的(可以参见SPARK-1063)。而sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。下面将分别对这两个函数的实现以及使用进行说明。

一、sortBy函数实现以及使用

sortBy函数是在org.apache.spark.rdd.RDD类中实现的,它的实现如下:

01/**

02* Return this RDD sorted by the given key function.

03*/

04defsortBy[K](

05f:(T)=> K,

06ascending:Boolean=true,

07numPartitions:Int=this.partitions.size)

08(implicitord:Ordering[K], ctag:ClassTag[K]):RDD[T]=

09this.keyBy[K](f)

10.sortByKey(ascending, numPartitions)

11.values

该函数最多可以传三个参数:

第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;

第二个参数是ascending,从字面的意思大家应该可以猜到,是的,这参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;

第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为this.partitions.size。

从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入。而且sortBy函数函数的实现依赖于sortByKey函数,关于sortByKey函数后面会进行说明。keyBy函数也是RDD类中进行实现的,它的主要作用就是将将传进来的每个元素作用于f(x)中,并返回tuples类型的

元素,也就变成了Key-Value类型的RDD了,它的实现如下:

1/**

2* Creates tuples of the elements in this RDD by applying `f`.

3*/

4defkeyBy[K](f:T=> K):RDD[(K, T)]={

5map(x=> (f(x), x))

6}

那么,如何使用sortBy函数呢?

01/**

02* User: 过往记忆

03* Date: 14-12-26

04* Time: 上午10:16

05* bolg:http://www.iteblog.com

06* 本文地址:http://www.iteblog.com/archives/1240

07* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货

08* 过往记忆博客微信公共帐号:iteblog_hadoop

09*/

10scala>valdata=List(3,1,90,3,5,12)

11data:List[Int]=List(3,1,90,3,5,12)

12

13scala>valrdd=sc.parallelize(data)

14rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[0] at parallelize at :14

15

16scala> rdd.collect

17res0:Array[Int]=Array(3,1,90,3,5,12)

18

19scala> rdd.sortBy(x=> x).collect

20res1:Array[Int]=Array(1,3,3,5,12,90)

21

22scala> rdd.sortBy(x=> x,false).collect

23res3:Array[Int]=Array(90,12,5,3,3,1)

24

25scala>valresult=rdd.sortBy(x=> x,false)

26result:org.apache.spark.rdd.RDD[Int]=MappedRDD[23] at sortBy at :16

27

28scala> result.partitions.size

29res9:Int=2

30

31scala>valresult=rdd.sortBy(x=> x,false,1)

32result:org.apache.spark.rdd.RDD[Int]=MappedRDD[26] at sortBy at :16

33

34scala> result.partitions.size

35res10:Int=1

上面的实例对rdd中的元素进行升序排序。并对排序后的RDD的分区个数进行了修改,上面的result就是排序后的RDD,默认的分区个数是2,而我们对它进行了修改,所以最后变成了1。

二、sortByKey函数实现以及使用

sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的,实现如下

1defsortByKey(ascending:Boolean=true, numPartitions:Int=self.partitions.size)

2:RDD[(K, V)]=

3{

4valpart=newRangePartitioner(numPartitions, self, ascending)

5newShuffledRDD[K, V, V](self, part)

6.setKeyOrdering(if(ascending) orderingelseordering.reverse)

7}

从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。下面对sortByKey的使用进行说明:

01/**

02* User: 过往记忆

03* Date: 14-12-26

04* Time: 上午10:16

05* bolg:http://www.iteblog.com

06*

07* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货

08* 过往记忆博客微信公共帐号:iteblog_hadoop

09*/

10scala>vala=sc.parallelize(List("wyp","iteblog","com","397090770","test"),2)

11a:org.apache.spark.rdd.RDD[String]=

12ParallelCollectionRDD[30] at parallelize at :12

13

14scala>valb=sc. parallelize (1to a.count.toInt ,2)

15b:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[31] at parallelize at :14

16

17scala>valc=a.zip(b)

18c:org.apache.spark.rdd.RDD[(String, Int)]=ZippedPartitionsRDD2[32] at zip at :16

19

20scala> c.sortByKey().collect

21res11:Array[(String, Int)]=Array((397090770,4), (com,3), (iteblog,2), (test,5), (wyp,1))

上面对Key进行了排序。细心的读者可能会问,soryKy函数中的第一个参数可以对排序方式进行重写。为什么sortByKey没有呢?难道只能用默认的排序规则。不是,是有的。其实在OrderedRDDFunctions类中有个变量ordering它是隐形的:private val ordering = implicitly[Ordering[K]]。他就是默认的排序规则,我们可以对它进行重写,如下:

01scala>valb=sc.parallelize(List(3,1,9,12,4))

02b:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[38] at parallelize at :12

03

04scala>valc=b.zip(a)

05c:org.apache.spark.rdd.RDD[(Int, String)]=ZippedPartitionsRDD2[39] at zip at :16

06

07scala> c.sortByKey().collect

08res15:Array[(Int, String)]=Array((1,iteblog), (3,wyp), (4,test), (9,com), (12,397090770))

09

10scala>implicitvalsortIntegersByString=newOrdering[Int]{

11|overridedefcompare(a:Int, b:Int)=

12| a.toString.compare(b.toString)}

13sortIntegersByString:Ordering[Int]=$iwC$$iwC$$iwC$$iwC$$iwC$$anon$1@5d533f7a

14

15scala> c.sortByKey().collect

16res17:Array[(Int, String)]=Array((1,iteblog), (12,397090770), (3,wyp), (4,test), (9,com))

例子中的sortIntegersByString就是修改了默认的排序规则。这样将默认按照Int大小排序改成了对字符串的排序,所以12会排序在3之前。

Leave A Comment