前言
最近对SparkSql的执行流程有更全面的了解,故写下本文以备后用。
首先一图流:
资料转载自:
https://www.iteblog.com/archives/2561.html
https://www.iteblog.com/archives/2562.html
https://www.iteblog.com/archives/2563.html
一、SparkSqlParser
这是sql的 解析阶段,从spark2.0后引入 ANTLR,anothor tool for language recognition
通过ANTLR 构建出 Logical Plan 逻辑计划。
这时候的 Logical Plan 之所以要叫 “Unresolved Logical Plan ”,是因为它只是一个数据结构,并不具有数据库、数据表、分区和函数等信息。
举例SQL
SELECT
sum(v)
FROM (
SELECT
t1.id,
1 + 2 + t1.value AS v
FROM t1
JOIN t2
on t1.id = t2.id
AND t1.cid = 1
AND t1.did = t1.cid + 1
AND t2.id > 5
) iteblog
where 1=1
逻辑计划长这样
== Parsed Logical Plan ==
'Project [unresolvedalias('sum('v), None)]
+- 'SubqueryAlias `iteblog`
+- 'Project ['t1.id, ((1 + 2) + 't1.value) AS v#16]
+- 'Filter ((('t1.id = 't2.id) && ('t1.cid = 1)) && (('t1.did = ('t1.cid + 1)) && ('t2.id > 5)))
+- 'Join Inner
:- 'UnresolvedRelation `t1`
+- 'UnresolvedRelation `t2`
二、Analyzer
这个阶段绑定逻辑计划,分析器会根据事先定义的rule,对逻辑计划进行transform,在这个阶段输出的是函数资源信息和元数据信息,即 Analyzed Logical Plan。
三、Optimizer
原本 Analyzed Logical Plan 是已经足够提供到转换成物理计划,但当前这个阶段是没有任何优化的。
因此 Optimizer实现的是基于RBO( Rule-base Optimizer ) 规则进行优化,生产更优的逻辑算子树。
在这个阶段主要实现的优化包括:
谓词下推
尽可能把过滤条件放到最底层,最好是能直接从数据源开始进行过滤,从而减少join算子的数据量,提升计算速度。
列裁剪
选择需要的列,减少数据传输,提高数据扫描速度。
常量替换
把可以转换成常量的变量,优先进行转换。
常量累加
把常量表达式提前算好,提前找出条件中的固定值。
四、Planner
在这个节点,逻辑计划会被根据一定的策略,转换为物理计划 Physical Plan;
其次再根据CBO( Cost-based Optimization),代价优化,再进一步优化,输出 Seleted Physical Plan。
物理计划长这样:
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[sum(cast(v#16 as bigint))], output=[sum(v)#22L])
+- Exchange SinglePartition
+- *(2) HashAggregate(keys=[], functions=[partial_sum(cast(v#16 as bigint))], output=[sum#24L])
+- *(2) Project [(3 + value#1) AS v#16]
+- *(2) BroadcastHashJoin [id#0], [id#8], Inner, BuildRight
:- *(2) Project [id#0, value#1]
: +- *(2) Filter (((((isnotnull(cid#2) && isnotnull(did#3)) && (cid#2 = 1)) && (did#3 = 2)) && (id#0 > 5)) && isnotnull(id#0))
: +- *(2) FileScan csv [id#0,value#1,cid#2,did#3] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/iteblog/t1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(cid), IsNotNull(did), EqualTo(cid,1), EqualTo(did,2), GreaterThan(id,5), IsNotNull(id)], ReadSchema: struct<id:int,value:int,cid:int,did:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
+- *(1) Project [id#8]
+- *(1) Filter (isnotnull(id#8) && (id#8 > 5))
+- *(1) FileScan csv [id#8] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/iteblog/t2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,5)], ReadSchema: struct<id:int>
五、RDDs
虽然已经生成了物理计划了,但spark最后还会根据一定的rule进行一个预处理的过程,具体过程有
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
PlanSubqueries(sparkSession), //特殊子查询物理计划处理
EnsureRequirements(sparkSession.sessionState.conf), //确保执行计划分区与排序正确性
CollapseCodegenStages(sparkSession.sessionState.conf), //代码生成
ReuseExchange(sparkSession.sessionState.conf), //节点重用
ReuseSubquery(sparkSession.sessionState.conf)) //子查询重用
而在这个过程中,就有了 CollapseCodegenStages 代码生成。
1、全阶段代码生成阶段 WholeStageCodegen
为什么要代码生成?
首先绝大多数数据库引擎是把sql翻译成一系列的关系代数算子或者表达式,然后依赖这些算子逐条处理输入数据并产生结果。
这个缺点的存在大量虚函数调用,会引起CPU中断。
而手写代码执行效率比这个方式要高得多。
因此,从spark2.0引入了WholeStageCodegen,目的是模拟手写代码,而生成代码,从而提高sparksql执行效率。
Tungsten分为三个部分:
表达式代码生成(expression codegen)
全阶段代码生成(Whole-stage Code Generation)
加速序列化和反序列化(speed up serialization/deserialization)
2、代码编译
生成了代码后,需要把代码编译然后加载到JVM中,spark引进了Janino缩短了sql表达式的编译时间。
在这个阶段需要注意的是,代码生成是在Driver端进行的,代码编译是在Executor进行的。
3、SQL执行
生成了代码后,Spark就会执行得到结果。