镁伽科技黄瑜清:智能自动化给生命科学带来巨大变革
06-18
【往期链接】:Spark的基本流程(一)前言本文参考徐立杰老师的《大数据处理框架Apache Spark设计与实现》,在此记录相关笔记,并添加一些个人理解,如有请指正我错了。参考链接:Spark逻辑处理流程概述 上一章提到了逻辑处理流程(Logical plan),本章将详细讲解。
一般来说,逻辑处理流程可以分为四个部分: 数据源(Data Block):即数据从哪里来,比如 HDFS、k-y 形式的 HBase 等。 数据模型:简单来说可以是理解为抽象数据类型。
例如,在MapReduce中,它是
本质上是一个抽象类。与ArrayList等数据类型相比,RDD有两个主要区别:它只是一个逻辑概念。
简单来说,如果用户不进行cache(),RDD就不会存储在内存中,计算完成后就会消失。 ArrayList等数据结构都是驻留在内存中的。
可以包含多个分区。毕竟Spark处理的是分布式计算,必须支持多个分区。
数据操作:也就是我们可以用RDD做什么。简单来说,分为两个操作,一个是transformation(),另一个是action():transformation():顾名思义,就是对RDD到处改变。
这里注意,RDD本身是不能修改的,也就是说每次变换都完成了。创建一个新的,之前的不会被移动。
(结合上一点,就是说如果不使用cache(),那么在transformation()完成之前RDD就没有了)action():是计算最终的结果。 action() 会触发 Spark 提交作业,从而真正开始执行。
例如,count() 一次和 show() 一次。 (所以使用时需要注意,如果前面的RDD或者Dataframe中没有cache(),后面又重复count()和show(),就会重复计算。
)计算处理结果:通常分布式计算结果分为两种类型。 :无需在Driver端计算:例如直接将结果保存在分布式文件系统(如HDFS)中。
rdd.save("").需要在Driver端计算:比如count()需要使用一个task来统计每个RDD中分区元素的数量,然后聚合到Driver中,最后相加进行计算。 2.2 Spark逻辑处理过程生成方法 对于Spark来说,需要一种通用的方法,能够自动将应用转化为确定性的逻辑处理过程,即RDD之间的数据依赖关系。
因此,需要解决三个问题:如何生成R5DD,以及生成什么样的RDD?如何建立RDD之间的数据依赖关系?如何计算RDD中的数据? 2.2.1 如何生成RDD,生成什么样的RDD?一般来说,执行Transformer操作时会产生RDD。这里有两种类似的情况:一个 Transformer 只创建一个 RDD。
比如一些简单的操作比如map(func)。一个 Transformer 创建多个 RDD。
例如,更复杂的操作如join()和distinct(),中间会生成多个RDD,但最终只会返回一个用户。下面展示了一些常用的 Transformation() 生成的 RDD: TransformationGenerated RDDsCompute()map(func)MappedRDDiterator(split).map(f)filter(func)FilteredRDDiterator(split).filter(f)flatMap(func)FlatMappedRDiterator( split).flatMap(f)mapPartitions(func)MapPartitionsRDDf(iterator(split))mapPartitionsWithIndex(func)MapPartitionsRDDf(split.index, iterator(split))sample(withReplacement,fraction,seed)PartitionwiseSampledRDDPoissonSampler.sample(iterator(split)) BernoulliSampler.sample(iterator(split))pipe(command, [envVars])PipedRDDunion(otherDataset) 生成多个 RDDintersection(otherDataset) 生成多个 RDDdistinct([numTasks])) 生成多个 RDDgroupByKey([numTasks]) 生成多个 RDDreduceByKey(func, [numTasks]) 生成多个 RDDsortByKey([ascending], [numTasks]) 生成多个 RDDjoin(otherDataset, [numTasks]) 生成多个 RDDcogroup(otherDataset, [numTasks]) 生成多个 RDDcartesian(otherDataset) 生成多个 RDDcoalesce(numPartitions) 生成多个RDDrepartition(numPartitions) 生成多个RDD2.2.2 RDD之间如何建立数据依赖关系?如上所述,transformation()之后会生成一堆RDD。
下一个问题是这些 RDD 之间的数据依赖关系是什么。一般来说可以分为两类:窄依赖(NarrowDependency)和宽依赖(ShuffleDependency)。
区分这两类依赖关系的依据是:生成的子RDD的每个分区是否完全依赖于父RDD的每个分区的全部或部分。如果依赖于每个分区的整个分区,则为窄依赖;如果仅依赖于每个分区的一部分,则为宽依赖。
划分NarrowDependency和ShuffleDependency的原因是为了生成物理执行图。 1)窄依赖(NarrowDependency)一对一依赖(OneToOneDependency)(1:1)一对一映射关系,如map()、filter()。
RangeDependency(1:1)可以理解为区域一对一,比如union(),如上图所示。多对一依赖(非官方定义,属于 NarrowDependency)(N:1)表示子 RDD 中的某个分区依赖于多个父 RDD 中的分区,例如 join()、cogroup()。
下图是 join() 的一个例子,它将两个 RDD 聚合在一起。首先执行 cogroup() 获得类型为
多对多依赖(非官方定义,属于NarrowDependency)(N:N)是指子RDD中的一个分区依赖于多个父RDD中的分区,一个父RDD又依赖于多个子RDD。常见的是 cartesian()2) 宽依赖(ShuffleDependency)。
宽依赖也可以理解为“部分依赖”。宽依赖和MapReduce中shuffle的数据依赖是一样的(mapper对其输出进行分区,然后每个reducer对所有mapper进行分区,输出中自己的分区通过HTTP fetch获取)。
从上图可以看出,广泛依赖的子RDD的分区只会依赖于父RDD的一部分。为什么要区分窄依赖和宽依赖?主要原因是在执行过程中,窄依赖关系可以在同一阶段进行流水线化,而无需进行shuffle。
顾名思义,宽依赖需要 shuffle。另外,这样区分的话,也很容易实现。
2.2.3 如何计算RDD中的数据。上面我们讲了RDD之间的依赖关系。
现在我们来谈谈如何计算每个RDD的每个分区中的数据(记录)。 Spark的大部分transformation()函数与映射函数类似,都有固定的计算方法,称为“控制流”。
下图给出了两个例子。虽然两者在数据依赖上都具有一对一的依赖关系(OneToOneDependency),但是两者的func的“控制流程”是不同的:map(fuc):读取一条记录,处理一条记录。
记录,然后吐出处理后的mapPartitions(fuc):它处理分区中的所有记录,然后集中输出。
版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,欢迎发送邮件 举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。
标签:
相关文章
06-18
06-18
06-18
06-17
最新文章
【玩转GPU】ControlNet初学者生存指南
【实战】获取小程序中用户的城市信息(附源码)
包雪雪简单介绍Vue.js:开学
Go进阶:使用Gin框架简单实现服务端渲染
线程池介绍及实际案例分享
JMeter 注释 18 - JMeter 常用配置组件介绍
基于Sentry的大数据权限解决方案
【云+社区年度征文集】GPE监控介绍及使用