Part 3 covered how the Unresolved Plan is generated; the next step is turning it into an Analyzed Plan. This step mainly involves QueryExecution, the Analyzer, and the catalog.
spark.sql() -> Dataset.ofRows:
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
  val qe = sparkSession.sessionState.executePlan(logicalPlan)
  qe.assertAnalyzed()
  new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
In the method above, qe.assertAnalyzed() triggers analysis of the plan tree. One reason for doing this eagerly is that, by binding against the metadata in the catalog, errors such as unresolvable function or column names are surfaced right away.
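This eagerness is easy to observe from the user side. Below is a minimal sketch (assuming a local SparkSession and that no table named people exists): the AnalysisException is thrown by spark.sql() itself, before any action runs.

import org.apache.spark.sql.{AnalysisException, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
try {
  // Fails here, at analysis time: Dataset.ofRows calls qe.assertAnalyzed()
  // eagerly, so no action (show/collect) is needed to hit the error.
  spark.sql("SELECT name FROM people")
} catch {
  case e: AnalysisException => println(s"Caught at analysis time: ${e.getMessage}")
}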
The actual analysis work happens when the lazy analyzed field of QueryExecution is first evaluated:
def assertAnalyzed(): Unit = analyzed

lazy val analyzed: LogicalPlan = {
  SparkSession.setActiveSession(sparkSession)
  sparkSession.sessionState.analyzer.executeAndCheck(logical)
}
So when are QueryExecution, the analyzer, and the catalog instantiated? That happens in SparkSession:
lazy val sessionState: SessionState = {
  parentSessionState
    .map(_.clone(this))
    .getOrElse {
      val state = SparkSession.instantiateSessionState(
        SparkSession.sessionStateClassName(sparkContext.conf),
        self)
      initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
      state
    }
}
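Which builder class sessionStateClassName resolves to depends on spark.sql.catalogImplementation: "in-memory" (the default) selects SessionStateBuilder, while "hive" selects HiveSessionStateBuilder. A sketch of flipping that switch from user code:

// enableHiveSupport() sets spark.sql.catalogImplementation=hive, so
// instantiateSessionState builds a HiveSessionStateBuilder instead of the
// default in-memory SessionStateBuilder.
val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()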
Concretely, instantiateSessionState() calls BaseSessionStateBuilder.build(), which assembles the SessionState:
/**
 * Build the [[SessionState]].
 */
def build(): SessionState = {
  new SessionState(
    session.sharedState,
    conf,
    experimentalMethods,
    functionRegistry,
    udfRegistration,
    () => catalog,
    sqlParser,
    () => analyzer,
    () => optimizer,
    planner,
    streamingQueryManager,
    listenerManager,
    () => resourceLoader,
    createQueryExecution,
    createClone)
}
Note that several components (catalog, analyzer, optimizer, resourceLoader) are passed as thunks (() => ...), so SessionState can create them lazily on first access rather than at construction time. The analyzer itself is built as follows:
protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
  override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
    new FindDataSourceTable(session) +:
    new ResolveSQLOnFile(session) +:
    customResolutionRules

  override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
    PreprocessTableCreation(session) +:
    PreprocessTableInsertion(conf) +:
    DataSourceAnalysis(conf) +:
    customPostHocResolutionRules

  override val extendedCheckRules: Seq[LogicalPlan => Unit] =
    PreWriteCheck +:
    PreReadCheck +:
    HiveOnlyCheck +:
    customCheckRules
}
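The custom*Rules members above are the hook points exposed through SparkSessionExtensions, so user code can contribute its own analysis rules without patching Spark. A minimal sketch (the rule itself is a hypothetical no-op):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule that does nothing; a real rule would transform the plan.
case class MyNoopRule(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder()
  .master("local[*]")
  .withExtensions(_.injectResolutionRule(MyNoopRule))  // ends up in customResolutionRules
  .getOrCreate()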
As the Analyzer definition above shows, it carries references to the catalog and the SQLConf, so this is where resolution actually happens. Analysis is driven by:
sparkSession.sessionState.analyzer.executeAndCheck(logical)
The executeAndCheck called on this line is defined in Analyzer:
def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
  val analyzed = execute(plan)
  try {
    checkAnalysis(analyzed)
    EliminateBarriers(analyzed)
  } catch {
    case e: AnalysisException =>
      val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
      ae.setStackTrace(e.getStackTrace)
      throw ae
  }
}
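The division of labor here: execute(plan) applies the resolution rules, and checkAnalysis then validates that nothing is left unresolved, re-wrapping any AnalysisException with the analyzed plan attached. What checkAnalysis catches is the familiar user-facing error (a sketch; exact message wording varies by version):

spark.range(3).createOrReplaceTempView("t")
spark.sql("SELECT no_such_col FROM t")
// org.apache.spark.sql.AnalysisException: cannot resolve '`no_such_col`'
// given input columns: [id]; line 1 pos 7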
The line val analyzed = execute(plan) actually invokes RuleExecutor.execute:
def execute(plan: TreeType): TreeType = {
  var curPlan = plan
  val queryExecutionMetrics = RuleExecutor.queryExecutionMeter

  batches.foreach { batch =>
    val batchStartPlan = curPlan
    var iteration = 1
    var lastPlan = curPlan
    var continue = true

    // Run until fix point (or the max number of iterations as specified in the strategy).
    while (continue) {
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          val startTime = System.nanoTime()
          val result = rule(plan)
          val runTime = System.nanoTime() - startTime

          if (!result.fastEquals(plan)) {
            queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
            queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
            logTrace(
              s"""
                |=== Applying Rule ${rule.ruleName} ===
                |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
              """.stripMargin)
          }
          queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
          queryExecutionMetrics.incNumExecution(rule.ruleName)

          // Run the structural integrity checker against the plan after each rule.
          if (!isPlanIntegral(result)) {
            val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
              "the structural integrity of the plan is broken."
            throw new TreeNodeException(result, message, null)
          }

          result
      }
      iteration += 1
      if (iteration > batch.strategy.maxIterations) {
        // Only log if this is a rule that is supposed to run more than once.
        if (iteration != 2) {
          val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
          if (Utils.isTesting) {
            throw new TreeNodeException(curPlan, message, null)
          } else {
            logWarning(message)
          }
        }
        continue = false
      }

      if (curPlan.fastEquals(lastPlan)) {
        logTrace(
          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
        continue = false
      }
      lastPlan = curPlan
    }

    if (!batchStartPlan.fastEquals(curPlan)) {
      logDebug(
        s"""
          |=== Result of Batch ${batch.name} ===
          |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
        """.stripMargin)
    } else {
      logTrace(s"Batch ${batch.name} has no effect.")
    }
  }

  curPlan
}
From this code, rule application within a batch stops under one of two conditions: the iteration count exceeds the batch strategy's maxIterations, or a full pass leaves the plan identical to the previous one (a fixed point). RuleExecutor declares a Seq[Batch]; each Batch bundles an execution strategy (Once or FixedPoint) with a list of rules. RuleExecutor.execute itself only walks the batches in order and applies each rule in turn; the concrete batches and rules are supplied by subclasses.
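To make the Batch/strategy machinery concrete, here is a minimal, self-contained sketch of a RuleExecutor subclass (not Spark source; the rule is a hypothetical no-op):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

object SimpleExecutor extends RuleExecutor[LogicalPlan] {
  // A hypothetical rule; a real one would pattern-match and rewrite plan nodes.
  object NoopRule extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan
  }

  // One fixed-point batch (at most 100 passes) and one run-once batch.
  override protected def batches: Seq[Batch] = Seq(
    Batch("Demo fixed point", FixedPoint(100), NoopRule),
    Batch("Demo once", Once, NoopRule))
}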
With that in mind, the interesting part is the batches overridden in Analyzer; let's see which rules and strategies it defines:
lazy val batches: Seq[Batch] = Seq(
  Batch("Hints", fixedPoint,
    new ResolveHints.ResolveBroadcastHints(conf),
    ResolveHints.RemoveAllHints),
  Batch("Simple Sanity Check", Once,
    LookupFunctions),
  Batch("Substitution", fixedPoint,
    CTESubstitution,
    WindowsSubstitution,
    EliminateUnions,
    new SubstituteUnresolvedOrdinals(conf)),
  Batch("Resolution", fixedPoint,
    ResolveTableValuedFunctions ::
    ResolveRelations ::   // resolve tables/views, binding names to nodes in the tree
    ResolveReferences ::
    ResolveCreateNamedStruct ::
    ResolveDeserializer ::
    ResolveNewInstance ::
    ResolveUpCast ::
    ResolveGroupingAnalytics ::
    ResolvePivot ::
    ResolveOrdinalInOrderByAndGroupBy ::
    ResolveAggAliasInGroupBy ::
    ResolveMissingReferences ::
    ExtractGenerator ::
    ResolveGenerate ::
    ResolveFunctions ::   // resolve functions
    ResolveAliases ::
    ResolveSubquery ::
    ResolveSubqueryColumnAliases ::
    ResolveWindowOrder ::
    ResolveWindowFrame ::
    ResolveNaturalAndUsingJoin ::
    ExtractWindowExpressions ::
    GlobalAggregates ::
    ResolveAggregateFunctions ::
    TimeWindowing ::
    ResolveInlineTables(conf) ::
    ResolveTimeZone(conf) ::
    ResolvedUuidExpressions ::
    TypeCoercion.typeCoercionRules(conf) ++
    extendedResolutionRules : _*),
  Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
  Batch("View", Once,
    AliasViewChild(conf)),
  Batch("Nondeterministic", Once,
    PullOutNondeterministic),
  Batch("UDF", Once,
    HandleNullInputsForUDF),
  Batch("FixNullability", Once,
    FixNullability),
  Batch("Subquery", Once,
    UpdateOuterReferences),
  Batch("Cleanup", fixedPoint,
    CleanupAliases)
)
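The combined effect of these batches is easy to inspect on a concrete query through QueryExecution (a sketch):

spark.range(3).createOrReplaceTempView("t")
val qe = spark.sql("SELECT id + 1 FROM t").queryExecution
println(qe.logical)   // unresolved: 'Project over 'UnresolvedRelation `t`
println(qe.analyzed)  // after the batches above: relation resolved, attributes typed and bound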
Here let's look at ResolveRelations and ResolveFunctions to see how tables and functions are bound against the catalog. ResolveRelations.apply:
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
    EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
      case v: View =>
        u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
      case other => i.copy(table = other)
    }
  case u: UnresolvedRelation => resolveRelation(u)
}
// Look up the table with the given name from catalog. The database we used is decided by the
// precedence:
// 1. Use the database part of the table identifier, if it is defined;
// 2. Use defaultDatabase, if it is defined (In this case, no temporary objects can be used,
//    and the default database is only used to look up a view);
// 3. Use the currentDb of the SessionCatalog.
private def lookupTableFromCatalog(
    u: UnresolvedRelation,
    defaultDatabase: Option[String] = None): LogicalPlan = {
  val tableIdentWithDb = u.tableIdentifier.copy(
    database = u.tableIdentifier.database.orElse(defaultDatabase))
  try {
    catalog.lookupRelation(tableIdentWithDb)
  } catch {
    case e: NoSuchTableException =>
      u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}", e)
    // If the database is defined and that database is not found, throw an AnalysisException.
    // Note that if the database is not defined, it is possible we are looking up a temp view.
    case e: NoSuchDatabaseException =>
      u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}, the " +
        s"database ${e.db} doesn't exist.", e)
  }
}
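The database precedence described in the comment is visible from the user side (a sketch):

spark.range(3).createOrReplaceTempView("t")   // temp view, no database part
spark.sql("SELECT * FROM t").show()           // unqualified: the temp view wins
spark.sql("SELECT * FROM default.t").show()   // qualified: looked up in the 'default'
                                              // database; fails if no such table exists there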
That is the table-binding path: lookupTableFromCatalog performs the actual lookup in the catalog. Function binding works similarly; ResolveFunctions.apply:
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
  case q: LogicalPlan =>
    q transformExpressions {
      case u if !u.childrenResolved => u // Skip until children are resolved.
      case u: UnresolvedAttribute if resolver(u.name, VirtualColumn.hiveGroupingIdName) =>
        withPosition(u) {
          Alias(GroupingID(Nil), VirtualColumn.hiveGroupingIdName)()
        }
      case u @ UnresolvedGenerator(name, children) =>
        withPosition(u) {
          catalog.lookupFunction(name, children) match {
            case generator: Generator => generator
            case other =>
              failAnalysis(s"$name is expected to be a generator. However, " +
                s"its class is ${other.getClass.getCanonicalName}, which is not a generator.")
          }
        }
      case u @ UnresolvedFunction(funcId, children, isDistinct) =>
        withPosition(u) {
          catalog.lookupFunction(funcId, children) match {
            // AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
            // the context of a Window clause. They do not need to be wrapped in an
            // AggregateExpression.
            case wf: AggregateWindowFunction =>
              if (isDistinct) {
                failAnalysis(s"${wf.prettyName} does not support the modifier DISTINCT")
              } else {
                wf
              }
            // We get an aggregate function, we need to wrap it in an AggregateExpression.
            case agg: AggregateFunction => AggregateExpression(agg, Complete, isDistinct)
            // This function is not an aggregate function, just return the resolved one.
            case other =>
              if (isDistinct) {
                failAnalysis(s"${other.prettyName} does not support the modifier DISTINCT")
              } else {
                other
              }
          }
        }
    }
}
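The DISTINCT branches above are user-visible (a sketch, assuming the temp view t from the earlier examples; error wording may vary by version):

spark.sql("SELECT count(DISTINCT id) FROM t")  // aggregate: wrapped in an AggregateExpression
spark.sql("SELECT abs(DISTINCT id) FROM t")    // non-aggregate: fails with
                                               // "abs does not support the modifier DISTINCT"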
catalog.lookupFunction itself is implemented in SessionCatalog:

def lookupFunction(
    name: FunctionIdentifier,
    children: Seq[Expression]): Expression = synchronized {
  // Note: the implementation of this function is a little bit convoluted.
  // We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
  // (built-in, temp, and external).
  if (name.database.isEmpty && functionRegistry.functionExists(name)) {
    // This function has been already loaded into the function registry.
    return functionRegistry.lookupFunction(name, children)
  }

  // If the name itself is not qualified, add the current database to it.
  val database = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
  val qualifiedName = name.copy(database = Some(database))
  if (functionRegistry.functionExists(qualifiedName)) {
    // This function has been already loaded into the function registry.
    // Unlike the above block, we find this function by using the qualified name.
    return functionRegistry.lookupFunction(qualifiedName, children)
  }

  // The function has not been loaded to the function registry, which means
  // that the function is a permanent function (if it actually has been registered
  // in the metastore). We need to first put the function in the FunctionRegistry.
  // TODO: why not just check whether the function exists first?
  val catalogFunction = try {
    externalCatalog.getFunction(database, name.funcName)
  } catch {
    case _: AnalysisException => failFunctionLookup(name)
    case _: NoSuchPermanentFunctionException => failFunctionLookup(name)
  }
  loadFunctionResources(catalogFunction.resources)
  // Please note that qualifiedName is provided by the user. However,
  // catalogFunction.identifier.unquotedString is returned by the underlying
  // catalog. So, it is possible that qualifiedName is not exactly the same as
  // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
  // At here, we preserve the input from the user.
  registerFunction(catalogFunction.copy(identifier = qualifiedName), overrideIfExists = false)
  // Now, we need to create the Expression.
  functionRegistry.lookupFunction(qualifiedName, children)
}
So function binding resolves through the functionRegistry first (built-in and temporary functions), falling back to the external catalog for permanent functions, which are loaded into the registry on first lookup.
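Seen from the user side (a sketch): a temporary UDF lands directly in the FunctionRegistry and is found by the first branch above, while a function created with CREATE FUNCTION lives in the external catalog until its first lookup.

spark.udf.register("plus_one", (x: Int) => x + 1)  // goes into functionRegistry
spark.sql("SELECT plus_one(1)").show()             // resolved by the first branch above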
Readers interested in the remaining rules and strategies can study the source. After this step, the plan has become the Analyzed Plan.
Spark SQL (4): From Unresolved Plan to Analyzed Plan
Original post: https://www.cnblogs.com/ldsggv/p/13380523.html