Spark 之 Aggregate
Aggregate
参考链接:
- https://github.com/PZXWHU/SparkSQL-Kernel-Profiling
完整的聚合查询的关键字包括 group by、 cube、 grouping sets 和 rollup 4 种 。 分组语句 group by 后面可以是一个或多个分组表达式( groupingExpressions )。
聚合查询还支持 OLAP 场景下的多维分析,包括 rollup、 cube 和 grouping sets 3 种操作 。
逻辑节点 Aggregate
逻辑算子树节点通过分组表达式列表( groupingExpressions )、聚合表达式列表( aggregateExpressions )和子节点( child )构造而成,
其中分组表达式类型都是 Expression ,而聚合表达式类型都是 NamedExpression ,意味着聚合表达式一般都需要设置名字。
aggregateExpressions 对应聚合函数,而 resultExpressions 则包含了 Select 语句中选择的所有列信息 。
示例之 partial Aggregate 对应 logical plan
里面的mode 直接也是 Complete
示例之 final Aggregate 对应 logical plan
NamedExpression (这里对应的是Alias) 里 的child 是 AggregateFunction,里面的mode 直接就是 Complete
case class Alias(child: Expression, name: String)
case class Alias(child: Expression, name: String)(
val exprId: ExprId = NamedExpression.newExprId,
val qualifier: Seq[String] = Seq.empty,
val explicitMetadata: Option[Metadata] = None,
val nonInheritableMetadataKeys: Seq[String] = Seq.empty)
extends UnaryExpression with NamedExpression {
物理 Aggregate
对于聚合查询,逻辑算子树转换为物理算子树,必不可少的是 Aggregation 转换策略 。 实际上, Aggregation 策略是基于 PhysicalAggregation 的 。 与 PhysicalOperation 类似,PhysicalAggregation 也是一种逻辑算子树的模式,用来匹配逻辑算子树中的 Aggregate 节点并提取该节点中的相关信息 。 PhysicalAggregation 在提取信息时会进行以下转换 。
select id, count(name) from student group by id
聚合模式
在 SparkSQL 中,聚合过程有 4 种模式,分别是 Partial 模式、 ParitialMerge 模式、 Final 模式 和 Complete 模式 。
上述聚合过程
中在 map 阶段的 sum 函数处于 Partial 模式,在 reduce 阶段的 sum 函数处于 Final 模式。
Complete 模式和Partial/Final 组合方式不一样,不进行局部聚合计算 。
ParitialMerge 主要应用在 distinct 语句中,如图 、所示 。聚合语句针对同一张表进行 sum 和 count (distinct)查询,最终的执行过程包含了 4 步聚合操作 。
- 第 1 步按照( A,C)分组,对 sum 函数进行 Partial 模式聚合计算;
- 第 2 步是 PartialMerge 模式,对上一步计算之后的聚合缓冲区进行合井,但此时仍然不是最终的结果;
- 第 3 步分组的列发生变化,再一次进行 Partial 模式的 count 计算;
- 第 4 步完成 Final 模式的最终计算 。
HashAggregate
常见的聚合查询语句通常采用 HashAggregate 方式,当存在以下几种情况时,会用 SortAggregate 方式来执行 。
- 查询中存在不支持 Partial 方式的聚合函数:此时会调用 AggUtils 中的 planAggregateWithoutPartial 方法,直接生成 SortAggregateExec 聚合算子节点 。
- 聚合函数结果不支持 Buffer 方式:如果结果类型不属于(NullType, BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DateType, TimestampType,DecimalType]集合中的任意一种,则需要执行 SortAggregateExec 方式,例如 collect_set和 collect_list 函数。
- 内存不足:如果在 HashAggregate 执行过程中,内存空间己捕,那么聚合执行会切换到 SortAggregateExec 方式。
注意:
spark 2.2 之后去掉了planAggregateWithoutPartial
参见:
https://issues.apache.org/jira/browse/SPARK-19060
https://github.com/apache/spark/pull/16461
Expand
逻辑计划阶段:
GroupingSets 节点转换为 Aggregate+Expand+Pr付出t3 个节点的组合 。 顾名思义, Expand 表示“扩展”,多维分析在本质上相当于执行多种组合的 group by 操作,因此 Expand 所起的作用就是将一条数据扩展为特定形式的多条数据。
需要注意的是, Expand 方式执行多维分析虽然能够达到只读一次数据表的效果,但是在某些场景下容易造成中间数据的膨胀。 例如,数据的维度太高, Expand 会产生指数级别的数据量 。 针对这种情况,可以进行相应的优化。
AggregateMode
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
sealed trait AggregateMode
/**
* An [[AggregateFunction]] with [[Partial]] mode is used for partial aggregation.
* This function updates the given aggregation buffer with the original input of this
* function. When it has processed all input rows, the aggregation buffer is returned.
*/
case object Partial extends AggregateMode
/**
* An [[AggregateFunction]] with [[PartialMerge]] mode is used to merge aggregation buffers
* containing intermediate results for this function.
* This function updates the given aggregation buffer by merging multiple aggregation buffers.
* When it has processed all input rows, the aggregation buffer is returned.
*/
case object PartialMerge extends AggregateMode
/**
* An [[AggregateFunction]] with [[Final]] mode is used to merge aggregation buffers
* containing intermediate results for this function and then generate final result.
* This function updates the given aggregation buffer by merging multiple aggregation buffers.
* When it has processed all input rows, the final result of this function is returned.
*/
case object Final extends AggregateMode
/**
* An [[AggregateFunction]] with [[Complete]] mode is used to evaluate this function directly
* from original input rows without any partial aggregation.
* This function updates the given aggregation buffer with the original input of this
* function. When it has processed all input rows, the final result of this function is returned.
*/
case object Complete extends AggregateMode
Aggregate 之 inputBufferOffset
org.apache.spark.sql.execution.aggregate.HashAggregateExec
case class HashAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
isStreaming: Boolean,
numShufflePartitions: Option[Int],
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
extends AggregateCodegenSupport {
val aggregationIterator =
new TungstenAggregationIterator(
partIndex,
groupingExpressions,
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
resultExpressions,
(expressions, inputSchema) =>
MutableProjection.create(expressions, inputSchema),
inputAttributes,
iter,
testFallbackStartsAt,
numOutputRows,
peakMemory,
spillSize,
avgHashProbe,
numTasksFallBacked)
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
extends AggregationIterator(
partIndex,
groupingExpressions,
originalInputAttributes,
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
resultExpressions,
newMutableProjection) with Logging {
org.apache.spark.sql.execution.aggregate.AggregationIterator
protected val aggregateFunctions: Array[AggregateFunction] =
initializeAggregateFunctions(aggregateExpressions, initialInputBufferOffset)
for (expression <- expressions) {
val func = expression.aggregateFunction
val funcWithBoundReferences: AggregateFunction = expression.mode match {
case Partial | Complete if func.isInstanceOf[ImperativeAggregate] =>
// We need to create BoundReferences if the function is not an
// expression-based aggregate function (it does not support code-gen) and the mode of
// this function is Partial or Complete because we will call eval of this
// function's children in the update method of this aggregate function.
// Those eval calls require BoundReferences to work.
BindReferences.bindReference(func, inputAttributeSeq)
case _ =>
// We only need to set inputBufferOffset for aggregate functions with mode
// PartialMerge and Final.
val updatedFunc = func match {
case function: ImperativeAggregate =>
function.withNewInputAggBufferOffset(inputBufferOffset)
case function => function
}
inputBufferOffset += func.aggBufferSchema.length
updatedFunc
}
可见 inputBufferOffset 对 Partial | Complete 无效
ObjectHashAggregateExec
参考链接:
- https://dataninjago.com/2022/01/09/spark-sql-query-engine-deep-dive-10-hashaggregateexec-objecthashaggregateexec/
- https://blog.csdn.net/monkeyboy_tech/article/details/123759074
While the HashAggregateExec, backed by the Tungsten execution engine(基于Tungsten执行引擎), performs well for aggregation operations, it can only support the mutable primitive data type with a fixed size. For the user-defined aggregation functions (UDAFs) and some collect functions (e.g. collect_list and collect_set), they are not supported by the HashAggregateExec. Prior Spark 2.2.0, they have to fall back to the less performant SortAggregateExec. Since Spark 2.2.0, the ObjectHashAggregateExec is released to fill this gap, which enables the performant hash-based aggregations on the data types that are not supported by HashAggregateExec.
原文地址:https://blog.csdn.net/zhixingheyi_tian/article/details/143864452
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!