内向基金完成首轮募资
06-17
原创/朱吉谦一,案例说明我刚开始学习Spark的时候,在练习排序算子sortBy时,发现了一个有趣的现象,当使用排序算子sortBy时,如果之后直接打印,你会发现打印的结果是乱序的,没有完整的排序。例如,有一个List数据,其中包含多个(名称,金额)结构。
当这些数据按金额降序排序时,代码及打印效果如下: 代码语言: go copy val Money = ss.sparkContext.parallelize( List((" Alice", ), ("Bob", ), (“查理”, ), (“大卫”, ), (“艾玛”, ), (“弗兰克”, ), (“格蕾丝”, ), (“汉娜”, ), (“艾薇”, ), ( "Jack", )))money.sortBy(x =>x._2, false).foreach(println) 打印结果——(Ivy,)(Grace,)( Jack,)(Frank,)(Emma,) (Alice,)(Charlie,)(Bob,)(Hannah,)(David,)可以看到执行结果并不是按照金额降序排列的。但是,如果使用collect或者将分区重置为1并直接保存结果,你会发现结果可以按照金额降序排序。
(注意,根据保存结果,虽然可能会生成很多part-0~part-5文件,但从part-0到part-5,内部数据实际上是按照数量降序排列的)。代码语言:scala copy Money.sortBy(x =>x._2, false).collect().foreach(println) 或 Money.repartition(1).sortBy(x =>x._2, false).foreach(println) ) 或者 Money.sortBy(x =>x._2, false).saveAsTextFile("result")最终结果——(Alice,)(David,)(Emma,)(Bob,)(Hannah,)(Grace, ) (Ivy,)(Charlie,)(Jack,)(Frank,) 2.SortBy源码分析 为什么单独sortBy后打印的数据是乱序的,但是sortBy后通过collect、save或重新分区repartition(1),数据是否有序?带着这个问题,看一下sortBy的底层源码——代码语言:scala copy def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length ) (隐式 ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope { this.keyBy[K](f) .sortByKey(ascending, numPartitions) .values} 如您所见,核心源码是这个.keyBy[K](f).sortByKey(ascending, numPartitions).values,我将源码分为三部分:this.keyByK、sortByKey(ascending, numPartitions)和values - 2.1,逐节分析sortBy源码一: this.keyBy[K](f) this.keyBy[K](f) 这行代码是根据 _.sortBy(x = 传入的 x =>x._2 >x._2, false) 重新生成一个新的 RDD 数据,可以进入其底层源码看一下 - 代码语言:scala ComplexSystem def keyBy[K](f: T => K): RDD[(K, T)] = withScope { val cleanF = sc.clean(f) map(x => (cleanedF(x), x))}如果执行 _.sortBy(x =>x._2, false),则 f: T => K 匿名函数为 x =>x._2。
因此,keyBy函数的真实代码是这样的——代码语言:txt copy map(x => (sc.clean(x =>x._2), x))sc.clean(x =>x._2)这个clean相当序列化了传入的函数,因为它将这个函数的结果作为排序键,分发到不同的分区节点进行排序,这涉及到网络传输。因此,序列化后,可以方便地在分布式计算中不同节点之间传输和执行功能。
clean最终的底层实现是 SparkEnv.get.closureSerializer.newInstance().serialize(func) 这行代码,有兴趣的可以深入研究一下。keyBy最终会生成一个新的RDD。
至于这个结构是什么,通过调用 keyBy 并打印原始测试数据就一目了然了——代码语言:scala copy val Money = ss.sparkContext.parallelize( List(("Alice", ), ("Bob ", ), (“查理”, ), (“大卫”, ), (“艾玛”, ), (“弗兰克”, ), (“格蕾丝”, ), (“汉娜”, ) , (“艾薇” , ), ("Jack", )))money.keyBy(x =>x._2).foreach(println) 打印结果——(,(Grace,))(,(Hannah,)) (,(David ,))(,(艾玛,))(,(爱丽丝,))(,(查理,))(,(艾维,))(,(杰克,))(,(弗兰克,))(,(鲍勃, )) 由此我们可以看到,原来的RDD具有("Alice", )结构,通过keyBy源码中的map(x => (sc.clean(x =>x._2), x))代码,最终会生成(x._2, x)这样结构的数据,即(,(Alice,)),这意味着需要排序的字段数量将作为key新RDD的。 2.2.一段段分析sortBy源码:sortByKey通过this.keyBy[K](f)获取结构体为(x._2,x)的RDD后,可以看到虽然我们调用了money.sortBy(x => x ._2, false)来排序,但是底层本质还是调用了另一个排序运算符sortByKey,它有两个参数,一个是布尔值的升序,true表示按升序排序,false表示按降序排序,我们这里传递进来的东西是假的。
另一个参数 numPartitions 表示分区的数量。通过定义的rdd.partitions.size可以知道环境中的分区数量。
进入sortByKey源码,通过下面的函数注释,可以知道sortByKey函数做了什么——代码语言:scala copy/** * 对RDD按key进行排序,使得每个分区都包含一个排序范围的元素。对生成的 RDD 调用“collect”或“save”将返回或输出一个有序的记录列表(在“save”情况下,它们将按顺序写入文件系统中的多个“part-X”文件)键)。
* * 按键对 RDD 进行排序,以便每个分区都包含已排序的元素范围。对生成的 RDD 调用collect或save将返回或输出有序的记录列表(在save的情况下,它们将按键顺序写入文件系统中的多个part-X文件)。
*/// TODO:目前这不适用于 Tuple2 以外的 P!def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope { val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse)} 此时,就可以知道基于 sortByKey关于注释已经做了什么——第一步是在每个分区中按key对RDD进行shuffle,然后将相同的Key划分到相同的分区中进行排序。第二步,调用collect或save后,会将排序后的分区进行合并,最终得到完整的排序结果。
这意味着,如果不调用collect或save来汇总每个分区的结果并返回给主驱动进程,虽然分区中的数据是排序的,但分区不一定是有序的。这时候如果直接用foreach打印,因为打印是并行执行的,即使分区是有序的,并行打印也会很乱。
可以写一段代码来验证每个分区是否有序 - 代码语言:scala copy Money.sortBy(x => x._2, false).foreachPartition(x => { valpartitionId = TaskContext.get.partitionId //val index = UUID.randomUUID() x.foreach(x => { println("分区号" + partitionId + ": " + x) })}) 打印结果 - 分区号 2: (ivy,) 分区号 2: (查理,)分区2号:(杰克,)分区2号:(弗兰克,)分区1号:(鲍勃,)分区1号:(汉娜,)分区1号:(格蕾丝,)分区No. 0 : (Alice,) 分区编号 0: (David,) 分区编号 0: (Emma,) 将环境设置为 3 个分区。可以看到每个分区中的数据已经按照降序排序了。
如果只有一个分区,则该分区中的数据也会按照降序排序。这就是为什么money.repartition(1).sortBy(x =>x._2, false).foreach(println)得到的数据也是排序后的结果。
。 sortBy的主要流程如下。
假设运行环境有3个分区。当读取的数据创建RDD时,数据会根据默认的Hash分区器分为三个分区。
调用sortBy后,RDD会通过this.keyByK重新生成一个新的RDD,比如(,(David,))这样的结构,然后进行shuffle操作,将RDD数据打乱,将对应范围内的key重新分布到在同一个分区中,表示相同键值的数据会分布到同一个分区,如(,(Jack,)),(,(Alice,)),(,(Frank,)),(中下图,(Hannah,))包含相同Key的都在一起。在shuffleRDD中,mapPartitions用于根据key对每个分区的数据进行升序或降序排序,以获得分区内有序的结果集。
2.3.一段段分析sortBy源码:.values在sortBy底层源码中,this.keyBy[K](f).sortByKey(ascending, numPartitions).values,在sortByKey之后,最终调用了.values 。源码.values中包含def value:RDDV = self.map(_._2),表示排序完成后,只会返回x._2的数据,用于对生成的RDD进行排序。
与排序过程类似,RDD 具有类似 (,(Grace,)) 的结构。排序后,如果只返回x._2,则返回一个结构如(Grace,)的RDD即可。
可以看到,shuffleRDD将对应范围内的key重新分配到同一个分区中。比如 0~ 划分为分区 0,~ 划分为分区 1,~ 划分为分区 2。
这样还有一个好处——当 0, 1 时,分区 2 内的数据已经是有序的,那么从整体来看分区0、1、2,其实是全局有序的。当然,如果要实现全局排序,就需要将它们合并并返回。
给司机。 3、合并各个分区的排序,返回全局排序。
调用collect或save是对每个分区的结果进行汇总,相当于一次归并排序操作。以上是Spark sortBy核心源码的讲解。
我正在参加腾讯科技创造特训营第二期有奖征文比赛,分享奖金池1万元和键盘手表。
版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,欢迎发送邮件 举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。
标签:
相关文章
06-17
06-17
06-17
06-17
最新文章
【玩转GPU】ControlNet初学者生存指南
【实战】获取小程序中用户的城市信息(附源码)
包雪雪简单介绍Vue.js:开学
Go进阶:使用Gin框架简单实现服务端渲染
线程池介绍及实际案例分享
JMeter 注释 18 - JMeter 常用配置组件介绍
基于Sentry的大数据权限解决方案
【云+社区年度征文集】GPE监控介绍及使用