SparkSQL(6)OptimizedPlan [Database Tutorial]


Spark SQL(6) OptimizedPlan

In this step, Spark SQL applies a series of rules to optimize the Resolved Plan produced by the analyzer. The component involved here is the Optimizer.

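As introduced earlier, instantiating a SparkSession also instantiates its sessionState, which in turn determines the QueryExecution and the Analyzer. The Optimizer is determined at this point as well: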

  protected def optimizer: Optimizer = {
    new SparkOptimizer(catalog, experimentalMethods) {
      override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
        super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
    }
  }
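To make the anonymous-subclass pattern above concrete, here is a minimal Spark-free sketch. `BaseOptimizer`, `Rule`, `operatorRules` and `run` are hypothetical stand-ins, and an `Int` plays the role of a logical plan. What it shows is only the shape of the override: it keeps whatever the parent returns (`super.extendedOperatorOptimizationRules`) and appends the session's custom rules after it.

```scala
object ExtensionHookSketch {
  // Hypothetical stand-in for Rule[LogicalPlan]: a named Int => Int rewrite.
  final case class Rule(name: String, f: Int => Int)

  // Hypothetical base class whose extension hook is empty by default.
  class BaseOptimizer {
    def extendedOperatorOptimizationRules: Seq[Rule] = Nil
    // Built-in rules first, then the hook, mirroring "Seq(...) ++ extendedOperatorOptimizationRules".
    def operatorRules: Seq[Rule] =
      Seq(Rule("builtinDouble", x => x * 2)) ++ extendedOperatorOptimizationRules
  }

  // Hypothetical session-specific rules (the role customOperatorOptimizationRules plays above).
  val customOperatorOptimizationRules: Seq[Rule] = Seq(Rule("customPlusOne", x => x + 1))

  // Same shape as the optimizer definition above: an anonymous subclass that
  // keeps the parent's hook result and appends the custom rules.
  val optimizer: BaseOptimizer = new BaseOptimizer {
    override def extendedOperatorOptimizationRules: Seq[Rule] =
      super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
  }

  // Apply each rule once, in order.
  def run(input: Int): Int =
    optimizer.operatorRules.foldLeft(input)((acc, rule) => rule.f(acc))
}
```

Because the override calls `super` first, nothing the parent contributes is dropped, and the custom rules end up after the built-in ones in the rule sequence.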

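Optimizer is also a subclass of RuleExecutor, and SparkOptimizer is in turn a subclass of Optimizer. As we saw in the analysis step, what really matters are the rules grouped into the batches that the RuleExecutor subclass defines.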
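The batch mechanism can be modeled in a few lines. The following is a simplified sketch, not Spark's actual `RuleExecutor`: a `Batch` has a name, a strategy and a list of rules; `Once` runs the batch a single time, while `FixedPoint(n)` reruns it until the plan stops changing or `n` iterations are used up. An `Int` stands in for the plan, and `halveEven` is a hypothetical rule chosen only because it keeps finding more work.

```scala
object MiniRuleExecutor {
  // Simplified model of RuleExecutor's strategies (not Spark's actual classes).
  sealed trait Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations: Int = 1 }
  final case class FixedPoint(maxIterations: Int) extends Strategy

  // An Int stands in for a LogicalPlan; a rule is any plan-to-plan function.
  type Rule = Int => Int
  final case class Batch(name: String, strategy: Strategy, rules: Rule*)

  // Run batches in order; inside a batch, apply every rule in sequence and
  // repeat until nothing changes or the strategy's iteration budget is spent.
  def execute(plan: Int, batches: Seq[Batch]): Int =
    batches.foldLeft(plan) { (current, batch) =>
      var curPlan = current
      var iteration = 0
      var running = true
      while (running) {
        val lastPlan = curPlan
        curPlan = batch.rules.foldLeft(curPlan)((p, rule) => rule(p))
        iteration += 1
        if (curPlan == lastPlan || iteration >= batch.strategy.maxIterations) running = false
      }
      curPlan
    }

  // Hypothetical rule: halves positive even numbers, so it keeps finding work.
  val halveEven: Rule = p => if (p > 0 && p % 2 == 0) p / 2 else p

  val onceResult: Int = execute(40, Seq(Batch("Halve", Once, halveEven)))
  val fixedPointResult: Int = execute(40, Seq(Batch("Halve", FixedPoint(100), halveEven)))
  val cappedResult: Int = execute(40, Seq(Batch("Halve", FixedPoint(2), halveEven)))
}
```

Running the same rule under different strategies gives 20 (`Once`), 5 (converged `FixedPoint`) and 10 (`FixedPoint(2)` stopped by its cap), which is why most rewriting batches use `fixedPoint`: one rewrite often exposes another.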

SparkOptimizer:

  override def batches: Seq[Batch] = (preOptimizationBatches ++ super.batches :+
    Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
    Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
    Batch("Push down operators to data source scan", Once, PushDownOperatorsToDataSource)) ++
    postHocOptimizationBatches :+
    Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
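The `++` / `:+` chain fixes the order in which the batches run. One way to check that order is to replay the same expression on plain strings. In the sketch below, `preOptimizationBatches`, `postHocOptimizationBatches` and the two-element `superBatches` list are placeholders (assumptions, not Spark's real contents); only the four named batches and "User Provided Optimizers" come from the code above.

```scala
object BatchOrderSketch {
  // Placeholders only: the real contents of these lists are not shown here.
  val preOptimizationBatches: Seq[String] = Seq("pre-optimization (placeholder)")
  val superBatches: Seq[String] = Seq("Eliminate Distinct", "Finish Analysis") // truncated stand-in for Optimizer.batches
  val postHocOptimizationBatches: Seq[String] = Seq("post-hoc (placeholder)")

  // Same operator chain as SparkOptimizer.batches, applied to batch names.
  val batches: Seq[String] = (preOptimizationBatches ++ superBatches :+
    "Optimize Metadata Only Query" :+
    "Extract Python UDF from Aggregate" :+
    "Prune File Source Table Partitions" :+
    "Push down operators to data source scan") ++
    postHocOptimizationBatches :+
    "User Provided Optimizers"
}
```

In Scala, operator precedence is decided by the first character and `+` ranks above `:`, so `++` binds more tightly than `:+` and the chain reads left to right: pre-optimization batches, the inherited Optimizer batches, the four Spark-specific batches, the post-hoc batches, and finally `experimentalMethods.extraOptimizations`.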

Optimizer:

  def batches: Seq[Batch] = {
    val operatorOptimizationRuleSet =
      Seq(
        // Operator push down
        PushProjectionThroughUnion,
        ReorderJoin,
        EliminateOuterJoin,
        PushPredicateThroughJoin,
        PushDownPredicate,
        LimitPushDown,
        ColumnPruning,
        InferFiltersFromConstraints,
        // Operator combine
        CollapseRepartition,
        CollapseProject,
        CollapseWindow,
        CombineFilters,
        CombineLimits,
        CombineUnions,
        // Constant folding and strength reduction
        NullPropagation,
        ConstantPropagation,
        FoldablePropagation,
        OptimizeIn,
        ConstantFolding,
        ReorderAssociativeOperator,
        LikeSimplification,
        BooleanSimplification,
        SimplifyConditionals,
        RemoveDispensableExpressions,
        SimplifyBinaryComparison,
        PruneFilters,
        EliminateSorts,
        SimplifyCasts,
        SimplifyCaseConversionExpressions,
        RewriteCorrelatedScalarSubquery,
        EliminateSerialization,
        RemoveRedundantAliases,
        RemoveRedundantProject,
        SimplifyCreateStructOps,
        SimplifyCreateArrayOps,
        SimplifyCreateMapOps,
        CombineConcats) ++
        extendedOperatorOptimizationRules

    val operatorOptimizationBatch: Seq[Batch] = {
      val rulesWithoutInferFiltersFromConstraints =
        operatorOptimizationRuleSet.filterNot(_ == InferFiltersFromConstraints)
      Batch("Operator Optimization before Inferring Filters", fixedPoint,
        rulesWithoutInferFiltersFromConstraints: _*) ::
      Batch("Infer Filters", Once,
        InferFiltersFromConstraints) ::
      Batch("Operator Optimization after Inferring Filters", fixedPoint,
        rulesWithoutInferFiltersFromConstraints: _*) :: Nil
    }

    (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
    // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
    // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
    // However, because we also use the analyzer to canonicalized queries (for view definition),
    // we do not eliminate subqueries or compute current time in the analyzer.
    Batch("Finish Analysis", Once,
      EliminateSubqueryAliases,
      EliminateView,
      ReplaceExpressions,
      ComputeCurrentTime,
      GetCurrentDatabase(sessionCatalog),
      RewriteDistinctAggregates,
      ReplaceDeduplicateWithAggregate) ::
    //////////////////////////////////////////////////////////////////////////////////////////
    // Optimizer rules start here
    //////////////////////////////////////////////////////////////////////////////////////////
    // - Do the first call of CombineUnions before starting the major Optimizer rules,
    //   since it can reduce the number of iteration and the other rules could add/move
    //   extra operators between two adjacent Union operators.
    // - Call CombineUnions again in Batch("Operator Optimizations"),
    //   since the other rules might make two separate Unions operators adjacent.
    Batch("Union", Once,
      CombineUnions) ::
    Batch("Pullup Correlated Expressions", Once,
      PullupCorrelatedPredicates) ::
    Batch("Subquery", Once,
      OptimizeSubqueries) ::
    Batch("Replace Operators", fixedPoint,
      ReplaceIntersectWithSemiJoin,
      ReplaceExceptWithFilter,
      ReplaceExceptWithAntiJoin,
      ReplaceDistinctWithAggregate) ::
    Batch("Aggregate", fixedPoint,
      RemoveLiteralFromGroupExpressions,
      RemoveRepetitionFromGroupExpressions) :: Nil ++
    operatorOptimizationBatch) :+
    Batch("Join Reorder", Once,
      CostBasedJoinReorder) :+
    Batch("Decimal Optimizations", fixedPoint,
      DecimalAggregates) :+
    Batch("Object Expressions Optimization", fixedPoint,
      EliminateMapObjects,
      CombineTypedFilters) :+
    Batch("LocalRelation", fixedPoint,
      ConvertToLocalRelation,
      PropagateEmptyRelation) :+
    // The following batch should be executed after batch "Join Reorder" and "LocalRelation".
    Batch("Check Cartesian Products", Once,
      CheckCartesianProducts) :+
    Batch("RewriteSubquery", Once,
      RewritePredicateSubquery,
      ColumnPruning,
      CollapseProject,
      RemoveRedundantProject)
  }
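As a rough illustration of why the operator batches use `fixedPoint` rather than `Once`, the sketch below uses a toy plan (`Relation`, `Project`, `Filter`) and two simplified rules named after `PushDownPredicate` and `CombineFilters`, applied in the same relative order as in the rule list above. These are hypothetical re-implementations for illustration only: conditions are plain strings, projections carry no aliases, and Spark's real rules check much more (determinism, aliases, attribute references). `maxIterations = 1` models `Once`.

```scala
object OperatorBatchSketch {
  // Toy logical plan: conditions are plain strings, projections are column lists.
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan

  // Simplified stand-in for PushDownPredicate: rewrite Filter(Project(...)) as
  // Project(Filter(...)). Safe here only because toy projections have no aliases.
  def pushDownPredicate(plan: Plan): Plan = plan match {
    case Filter(c, Project(cols, child)) => Project(cols, Filter(c, pushDownPredicate(child)))
    case Filter(c, child)                => Filter(c, pushDownPredicate(child))
    case Project(cols, child)            => Project(cols, pushDownPredicate(child))
    case r: Relation                     => r
  }

  // Simplified stand-in for CombineFilters: merge two adjacent filters into one AND.
  def combineFilters(plan: Plan): Plan = plan match {
    case Filter(outer, Filter(inner, grandChild)) =>
      Filter(s"($inner) AND ($outer)", combineFilters(grandChild))
    case Filter(c, child)     => Filter(c, combineFilters(child))
    case Project(cols, child) => Project(cols, combineFilters(child))
    case r: Relation          => r
  }

  // Apply both rules per iteration, repeating until the plan stops changing or
  // the iteration cap is reached (maxIterations = 1 behaves like Once).
  def runBatch(plan: Plan, maxIterations: Int): Plan = {
    var current = plan
    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      val next = combineFilters(pushDownPredicate(current))
      changed = next != current
      current = next
      iteration += 1
    }
    current
  }

  val input: Plan =
    Filter("a > 0", Project(Seq("a", "b"), Filter("b > 1", Project(Seq("a", "b", "c"), Relation("t")))))

  val onceResult: Plan = runBatch(input, maxIterations = 1)
  val fixedPointResult: Plan = runBatch(input, maxIterations = 100)
}
```

After a single pass the outer filter has only moved below the first `Project`, so the two filters are still separated. The second iteration pushes it further down, and the now-adjacent filters are merged into one. In Spark, `CollapseProject` would additionally merge the two projections, which this toy does not model.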

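These are all the rules and strategies applied in the optimization step, for example eliminating subquery aliases, replacing expressions, pushing down operators and folding constants. Once this step is done, planning moves on to the physical plan phase.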
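Many of the expression-level rules share one shape: walk the expression tree bottom-up and rewrite the nodes that match a pattern. The sketch below is a simplified, hypothetical version of constant folding and boolean simplification over a toy expression type; its `transformUp` helper is loosely modeled on the Catalyst method of the same name but is not that implementation.

```scala
object ExpressionRulesSketch {
  // Toy expression tree (not Catalyst's Expression hierarchy).
  sealed trait Expr
  final case class IntLit(value: Int) extends Expr
  final case class BoolLit(value: Boolean) extends Expr
  final case class Attr(name: String) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // Rewrite children first, then try the rule on the rebuilt node (bottom-up).
  def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
    val rebuilt: Expr = e match {
      case Add(l, r)         => Add(transformUp(l)(rule), transformUp(r)(rule))
      case GreaterThan(l, r) => GreaterThan(transformUp(l)(rule), transformUp(r)(rule))
      case And(l, r)         => And(transformUp(l)(rule), transformUp(r)(rule))
      case leaf              => leaf
    }
    rule.applyOrElse(rebuilt, (unchanged: Expr) => unchanged)
  }

  // Simplified constant folding: evaluate operators whose operands are all literals.
  def constantFolding(e: Expr): Expr = transformUp(e) {
    case Add(IntLit(a), IntLit(b))         => IntLit(a + b)
    case GreaterThan(IntLit(a), IntLit(b)) => BoolLit(a > b)
  }

  // Simplified boolean simplification: "x AND true" is x, "x AND false" is false
  // (both also hold under SQL's three-valued logic).
  def booleanSimplification(e: Expr): Expr = transformUp(e) {
    case And(BoolLit(true), other) => other
    case And(other, BoolLit(true)) => other
    case And(BoolLit(false), _)    => BoolLit(false)
    case And(_, BoolLit(false))    => BoolLit(false)
  }

  // a > 1 + 2 AND 3 > 2
  val input: Expr =
    And(GreaterThan(Attr("a"), Add(IntLit(1), IntLit(2))), GreaterThan(IntLit(3), IntLit(2)))
  val optimized: Expr = booleanSimplification(constantFolding(input))
}
```

Here `a > 1 + 2 AND 3 > 2` is first folded to `a > 3 AND true`, and boolean simplification then drops the redundant `true`, leaving `a > 3`.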


The above is the complete content of SparkSQL(6)OptimizedPlan [Database Tutorial]. Source: utcz.com/z/534875.html
