Spark排序算子sortBy核心源码图解

发布于:2024-10-24 编辑:匿名 来源:网络

原创/朱吉谦一,案例说明我刚开始学习Spark的时候,在练习排序算子sortBy时,发现了一个有趣的现象,当使用排序算子sortBy时,如果之后直接打印,你会发现打印的结果是乱序的,没有完整的排序。例如,有一个List数据,其中包含多个(名称,金额)结构。

当这些数据按金额降序排序时,代码及打印效果如下: 代码语言: go copy val Money = ss.sparkContext.parallelize( List((" Alice", ), ("Bob", ), (“查理”, ), (“大卫”, ), (“艾玛”, ), (“弗兰克”, ), (“格蕾丝”, ), (“汉娜”, ), (“艾薇”, ), ( "Jack", )))money.sortBy(x =>x._2, false).foreach(println) 打印结果——(Ivy,)(Grace,)( Jack,)(Frank,)(Emma,) (Alice,)(Charlie,)(Bob,)(Hannah,)(David,)可以看到执行结果并不是按照金额降序排列的。但是,如果使用collect或者将分区重置为1并直接保存结果,你会发现结果可以按照金额降序排序。

(注意,根据保存结果,虽然可能会生成很多part-0~part-5文件,但从part-0到part-5,内部数据实际上是按照数量降序排列的)。代码语言:scala copy Money.sortBy(x =>x._2, false).collect().foreach(println) 或 Money.repartition(1).sortBy(x =>x._2, false).foreach(println) ) 或者 Money.sortBy(x =>x._2, false).saveAsTextFile("result")最终结果——(Alice,)(David,)(Emma,)(Bob,)(Hannah,)(Grace, ) (Ivy,)(Charlie,)(Jack,)(Frank,) 2.SortBy源码分析 为什么单独sortBy后打印的数据是乱序的,但是sortBy后通过collect、save或重新分区repartition(1),数据是否有序?带着这个问题,看一下sortBy的底层源码——代码语言:scala copy def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length ) (隐式 ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope { this.keyBy[K](f) .sortByKey(ascending, numPartitions) .values} 如您所见,核心源码是这个.keyBy[K](f).sortByKey(ascending, numPartitions).values,我将源码分为三部分:this.keyByK、sortByKey(ascending, numPartitions)和values - 2.1,逐节分析sortBy源码一: this.keyBy[K](f) this.keyBy[K](f) 这行代码是根据 _.sortBy(x = 传入的 x =>x._2 >x._2, false) 重新生成一个新的 RDD 数据,可以进入其底层源码看一下 - 代码语言:scala ComplexSystem def keyBy[K](f: T => K): RDD[(K, T)] = withScope { val cleanF = sc.clean(f) map(x => (cleanedF(x), x))}如果执行 _.sortBy(x =>x._2, false),则 f: T => K 匿名函数为 x =>x._2。

因此,keyBy函数的真实代码是这样的——代码语言:txt copy map(x => (sc.clean(x =>x._2), x))sc.clean(x =>x._2)这个clean相当序列化了传入的函数,因为它将这个函数的结果作为排序键,分发到不同的分区节点进行排序,这涉及到网络传输。因此,序列化后,可以方便地在分布式计算中不同节点之间传输和执行功能。

clean最终的底层实现是 SparkEnv.get.closureSerializer.newInstance().serialize(func) 这行代码,有兴趣的可以深入研究一下。keyBy最终会生成一个新的RDD。

至于这个结构是什么,通过调用 keyBy 并打印原始测试数据就一目了然了——代码语言:scala copy val Money = ss.sparkContext.parallelize( List(("Alice", ), ("Bob ", ), (“查理”, ), (“大卫”, ), (“艾玛”, ), (“弗兰克”, ), (“格蕾丝”, ), (“汉娜”, ) , (“艾薇” , ), ("Jack", )))money.keyBy(x =>x._2).foreach(println) 打印结果——(,(Grace,))(,(Hannah,)) (,(David ,))(,(艾玛,))(,(爱丽丝,))(,(查理,))(,(艾维,))(,(杰克,))(,(弗兰克,))(,(鲍勃, )) 由此我们可以看到,原来的RDD具有("Alice", )结构,通过keyBy源码中的map(x => (sc.clean(x =>x._2), x))代码,最终会生成(x._2, x)这样结构的数据,即(,(Alice,)),这意味着需要排序的字段数量将作为key新RDD的。 2.2.一段段分析sortBy源码:sortByKey通过this.keyBy[K](f)获取结构体为(x._2,x)的RDD后,可以看到虽然我们调用了money.sortBy(x => x ._2, false)来排序,但是底层本质还是调用了另一个排序运算符sortByKey,它有两个参数,一个是布尔值的升序,true表示按升序排序,false表示按降序排序,我们这里传递进来的东西是假的。

另一个参数 numPartitions 表示分区的数量。通过定义的rdd.partitions.size可以知道环境中的分区数量。

进入sortByKey源码,通过下面的函数注释,可以知道sortByKey函数做了什么——代码语言:scala copy/** * 对RDD按key进行排序,使得每个分区都包含一个排序范围的元素。对生成的 RDD 调用“collect”或“save”将返回或输出一个有序的记录列表(在“save”情况下,它们将按顺序写入文件系统中的多个“part-X”文件)键)。

* * 按键对 RDD 进行排序,以便每个分区都包含已排序的元素范围。对生成的 RDD 调用collect或save将返回或输出有序的记录列表(在save的情况下,它们将按键顺序写入文件系统中的多个part-X文件)。

*/// TODO:目前这不适用于 Tuple2 以外的 P!def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope { val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse)} 此时,就可以知道基于 sortByKey关于注释已经做了什么——第一步是在每个分区中按key对RDD进行shuffle,然后将相同的Key划分到相同的分区中进行排序。第二步,调用collect或save后,会将排序后的分区进行合并,最终得到完整的排序结果。

这意味着,如果不调用collect或save来汇总每个分区的结果并返回给主驱动进程,虽然分区中的数据是排序的,但分区不一定是有序的。这时候如果直接用foreach打印,因为打印是并行执行的,即使分区是有序的,并行打印也会很乱。

可以写一段代码来验证每个分区是否有序 - 代码语言:scala copy Money.sortBy(x => x._2, false).foreachPartition(x => { valpartitionId = TaskContext.get.partitionId //val index = UUID.randomUUID() x.foreach(x => { println("分区号" + partitionId + ": " + x) })}) 打印结果 - 分区号 2: (ivy,) 分区号 2: (查理,)分区2号:(杰克,)分区2号:(弗兰克,)分区1号:(鲍勃,)分区1号:(汉娜,)分区1号:(格蕾丝,)分区No. 0 : (Alice,) 分区编号 0: (David,) 分区编号 0: (Emma,) 将环境设置为 3 个分区。可以看到每个分区中的数据已经按照降序排序了。

如果只有一个分区,则该分区中的数据也会按照降序排序。这就是为什么money.repartition(1).sortBy(x =>x._2, false).foreach(println)得到的数据也是排序后的结果。

。 sortBy的主要流程如下。

假设运行环境有3个分区。当读取的数据创建RDD时,数据会根据默认的Hash分区器分为三个分区。

调用sortBy后,RDD会通过this.keyByK重新生成一个新的RDD,比如(,(David,))这样的结构,然后进行shuffle操作,将RDD数据打乱,将对应范围内的key重新分布到在同一个分区中,表示相同键值的数据会分布到同一个分区,如(,(Jack,)),(,(Alice,)),(,(Frank,)),(中下图,(Hannah,))包含相同Key的都在一起。在shuffleRDD中,mapPartitions用于根据key对每个分区的数据进行升序或降序排序,以获得分区内有序的结果集。

2.3.一段段分析sortBy源码:.values在sortBy底层源码中,this.keyBy[K](f).sortByKey(ascending, numPartitions).values,在sortByKey之后,最终调用了.values 。源码.values中包含def value:RDDV = self.map(_._2),表示排序完成后,只会返回x._2的数据,用于对生成的RDD进行排序。

与排序过程类似,RDD 具有类似 (,(Grace,)) 的结构。排序后,如果只返回x._2,则返回一个结构如(Grace,)的RDD即可。

可以看到,shuffleRDD将对应范围内的key重新分配到同一个分区中。比如 0~ 划分为分区 0,~ 划分为分区 1,~ 划分为分区 2。

这样还有一个好处——当 0, 1 时,分区 2 内的数据已经是有序的,那么从整体来看分区0、1、2,其实是全局有序的。当然,如果要实现全局排序,就需要将它们合并并返回。

给司机。 3、合并各个分区的排序,返回全局排序。

调用collect或save是对每个分区的结果进行汇总,相当于一次归并排序操作。以上是Spark sortBy核心源码的讲解。

我正在参加腾讯科技创造特训营第二期有奖征文比赛,分享奖金池1万元和键盘手表。

Spark排序算子sortBy核心源码图解

站长声明

版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,欢迎发送邮件 举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。

标签:

相关文章

  • 内向基金完成首轮募资

    内向基金完成首轮募资

    据投资界8月27日消息,内向基金(洞察基金)正式完成首期基金首轮超亿元募资,目标规模2亿元,将重点关注消费升级领域的股权投资。 据了解,引进基金是与新经济精品投行“穆棉资本”联合设立的私募股权投资基金。 它是由孙婷婷和Stefanie应金峰共同创立的。 两人在风险投资行

    06-17

  • 东方航天完成近6亿元B轮融资,重力2号中大型可回收液体火箭加速! -元景家族

    东方航天完成近6亿元B轮融资,重力2号中大型可回收液体火箭加速! -元景家族

    东方航天完成近6亿元B轮融资,重力2号中大型可回收液体火箭加速! |远景家族 远景资本 远景资本 远景资本 微信 IDvisionpluscapital 关于专题 远景资本公众平台汇聚创新趋势,以分享启发 01-25 17:12发布于浙江 近日,东方空间完成近6亿元B轮融资,本轮投资被梁溪科技创新产业

    06-18

  • 晶合集成电路股份有限公司今日在科创板挂牌,总市值近400亿元

    晶合集成电路股份有限公司今日在科创板挂牌,总市值近400亿元

    合肥市人民政府 据合肥市人民政府消息,5月5日,合肥市晶和集成电路股份有限公司在上海证券交易所上市。 成功登陆科创板,成为安徽省首家成功登陆资本市场的纯晶圆代工企业。 本次发行价格为19.86元。 超额配售选择权全额行使后,募集资金5500万元,在科创板上市公司融资规模

    06-06

  • 专注绿色制氢技术,“动量守恒”完成数千万元天使轮融资

    专注绿色制氢技术,“动量守恒”完成数千万元天使轮融资

    投资界(ID:pedaily)5月29日消息,国内质子交换膜电解槽核心材料及器件提供商合肥动量守恒绿色动力节能股份有限公司(以下简称“动力节能”)近日正式完成领投方数千万元天使轮融资。 本次完成交割的天使轮领投方为当看同创资本,元和资本担任天使轮融资独家投资方。 本轮融

    06-18

  • 工业互联网公司德云科技完成5亿元B轮融资,中金传化基金等联合领投

    工业互联网公司德云科技完成5亿元B轮融资,中金传化基金等联合领投

    投资界4月26日消息,近日,“新基建”全栈工业互联网产品提供商“退风科技完成5亿元B轮融资。 本轮融资由中金传输基金、深创投、交银国际、越秀金控联合领投,招商致远、青控招商、云启资本、亿唐宏图集成电路及互联网投资基金跟投老股东继续投资。 据悉,德丰科技在10个月内

    06-18

  • 2021年以来新设立基金规模已达5332.79亿元

    2021年以来新设立基金规模已达5332.79亿元

    Wind数据显示,截至2月3日,年初以来累计设立基金,发行规模7900万元。 其中,仅1月份就成立了一只基金,发行规模达4000万元,是继今年7月之后历史上第二高的单月发行规模。

    06-18

  • 登特菲完成数千万Pre-A轮融资,持续加大研发投入和生态拓展

    登特菲完成数千万Pre-A轮融资,持续加大研发投入和生态拓展

    投资界(ID:pedaily)4月24日消息,合肥登特菲医疗器械有限公司近日公告完成数千万元Pre-A轮融资。 本轮融资由海恒资本、宏博资本、创谷资本、合肥天使投资基金共同投资。 本轮融资完成后,登特菲将继续加大智能高端口腔医疗设备的创新研发,加大量产线建设,加速产品迭代和

    06-17

  • 安好时代:APP上线100天内用户数达数十万

    安好时代:APP上线100天内用户数达数十万

    2019年6月6日,北京安好时代科技发展有限公司发布了公司首款移动产品——安好APP,目前活跃用户量已达45万。 据悉,安好时代整合了大量医疗资源,打造了一支由数十名专职医生组成的团队。 不少医生入驻安好医生平台,开设个人诊所。 2019年6月28日,安好联合中国老年保健协会

    06-17

  • AI工具库详细介绍——Midjourney

    AI工具库详细介绍——Midjourney

    Midjourney是一款基于AI的图像生成工具,专注于通过文字描述创建高质量的视觉内容。 主要功能: 1.图像生成:Midjourney接受用户通过文字输入的描述,并将这些描述转换为详细的图片。 用户可以指定样式、主题、颜色和其他元素。 2.风格模仿:该工具能够模仿艺术家已知作品的各

    06-17

  • 长期投资是一个可持续的问题,宜信财富宣布《资产配置策略指引》2018年将重点关注这些行业……

    长期投资是一个可持续的问题,宜信财富宣布《资产配置策略指引》2018年将重点关注这些行业……

    2018年,全球宏观经济和各类资产的表现大幅超出市场年初的悲观预期。 它已经悄然到来。 新的一年,投资者可能面临全球经济、政策和政治环境的哪些变化?如何调整投资策略应对风险并实现资产增值?    据投资界1月10日消息,宜信财富正式发布《年资产配置策略指引》(以下简

    06-18

  • 罗永浩评论坚果手机被废弃,王思聪被迫再次执行

    罗永浩评论坚果手机被废弃,王思聪被迫再次执行

    “罗哥,坚果没了”。 “嗯,是好事。 ”近日,字节跳动宣布暂停手机业务,原锤子科技团队成立的新石实验室,并入教育硬件团队。 业务方面,合并后的硬件团队将专注于教育领域,不再开发坚果手机、TNT显示器等其他无关产品。 坚果手机补充称,售后和系统维护将继续进行,手机

    06-17

  • 东车日报|理想失败后车主被要求签署保密协议-特斯拉扩大Model S召回范围-丰田计划在日本停售凯美瑞

    东车日报|理想失败后车主被要求签署保密协议-特斯拉扩大Model S召回范围-丰田计划在日本停售凯美瑞

    阿维塔11单电机版纯电动SUV介绍华为官宣M5高端智能手机驱动版氪倡议:联合抵制网络水军。 比亚迪即将推出“云柴系统”,疑似与底盘有关。 比亚迪新专利:可通过静脉识别解锁车辆理想ONE突然断电,售后服务需要保密协议。 阿维塔11单电机纯电动SUV今日发布。 在晚间的新品发布

    06-21