EnsureRequirements in Spark
private def ensureDistributionAndOrdering(
parent: Option[SparkPlan],
originalChildren: Seq[SparkPlan],
requiredChildDistributions: Seq[Distribution],
requiredChildOrderings: Seq[Seq[SortOrder]],
shuffleOrigin: ShuffleOrigin): Seq[SparkPlan] = {
assert(requiredChildDistributions.length == originalChildren.length)
assert(requiredChildOrderings.length == originalChildren.length)
// Ensure that the operator's children satisfy their output distribution requirements.
var children = originalChildren.zip(requiredChildDistributions).map {
case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
child
case (child, BroadcastDistribution(mode)) =>
BroadcastExchangeExec(mode, child)
case (child, distribution) =>
val numPartitions = distribution.requiredNumPartitions
.getOrElse(conf.numShufflePartitions)
ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child, shuffleOrigin)
}
// Get the indexes of children which have specified distribution requirements and need to be
// co-partitioned.
val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
case (_: ClusteredDistribution, _) => true
case _ => false
}.map(_._2)
// Special case: if all sides of the join are single partition and it's physical size less than
// or equal spark.sql.maxSinglePartitionBytes.
val preferSinglePartition = childrenIndexes.forall { i =>
children(i).outputPartitioning == SinglePartition &&
children(i).logicalLink
.forall(_.stats.sizeInBytes <= conf.getConf(SQLConf.MAX_SINGLE_PARTITION_BYTES))
}
// If there are more than one children, we'll need to check partitioning & distribution of them
// and see if extra shuffles are necessary.
if (childrenIndexes.length > 1 && !preferSinglePartition) {
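val specs = childrenIndexes.map(i => {
val requiredDist = requiredChildDistributions(i)
assert(requiredDist.isInstanceOf[ClusteredDistribution],
s"Expected ClusteredDistribution but found ${requiredDist.getClass.getSimpleName}")
i -> children(i).outputPartitioning.createShuffleSpec(
requiredDist.asInstanceOf[ClusteredDistribution])
}).toMap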
// Find out the shuffle spec that gives better parallelism. Currently this is done by
// picking the spec with the largest number of partitions.
// NOTE: this is not optimal for the case when there are more than 2 children. Consider:
// (10, 10, 11)
// where the number represent the number of partitions for each child, it's better to pick 10
// here since we only need to shuffle one side - we'd need to shuffle two sides if we pick 11.
// However this should be sufficient for now since in Spark nodes with multiple children
// always have exactly 2 children.
// Whether we should consider `spark.sql.shuffle.partitions` and ensure enough parallelism
// during shuffle. To achieve a good trade-off between parallelism and shuffle cost, we only
// consider the minimum parallelism iff ALL children need to be re-shuffled.
// A child needs to be re-shuffled iff either one of below is true:
// 1. It can't create partitioning by itself, i.e., `canCreatePartitioning` returns false
// (as for the case of `RangePartitioning`), therefore it needs to be re-shuffled
// according to other shuffle spec.
// 2. It already has `ShuffleExchangeLike`, so we can re-use existing shuffle without
// introducing extra shuffle.
// On the other hand, in scenarios such as:
// HashPartitioning(5) <-> HashPartitioning(6)
// while `spark.sql.shuffle.partitions` is 10, we'll only re-shuffle the left side and make it
// HashPartitioning(6).
val shouldConsiderMinParallelism = specs.forall(p =>
!p._2.canCreatePartitioning || children(p._1).isInstanceOf[ShuffleExchangeLike]
)
// Choose all the specs that can be used to shuffle other children
val candidateSpecs = specs
.filter(_._2.canCreatePartitioning)
.filter(p => !shouldConsiderMinParallelism ||
children(p._1).outputPartitioning.numPartitions >= conf.defaultNumShufflePartitions)
val bestSpecOpt = if (candidateSpecs.isEmpty) {
None
} else {
// When choosing specs, we should consider those children with no `ShuffleExchangeLike` node
// first. For instance, if we have:
// A: (No_Exchange, 100) <---> B: (Exchange, 120)
// it's better to pick A and change B to (Exchange, 100) instead of picking B and insert a
// new shuffle for A.
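val candidateSpecsWithoutShuffle = candidateSpecs.filter { case (k, _) =>
!children(k).isInstanceOf[ShuffleExchangeLike]
}
val finalCandidateSpecs = if (candidateSpecsWithoutShuffle.nonEmpty) {
candidateSpecsWithoutShuffle
} else {
candidateSpecs
}
// Pick the spec with the best parallelism
Some(finalCandidateSpecs.values.maxBy(_.numPartitions))
}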
// Check if the following conditions are satisfied:
// 1. There are exactly two children (e.g., join). Note that Spark doesn't support
// multi-way join at the moment, so this check should be sufficient.
// 2. All children are of `KeyGroupedPartitioning`, and they are compatible with each other
// If both are true, skip shuffle.
val isKeyGroupCompatible = parent.isDefined &&
children.length == 2 && childrenIndexes.length == 2 && {
val left = children.head
val right = children(1)
val newChildren = checkKeyGroupCompatible(
parent.get, left, right, requiredChildDistributions)
if (newChildren.isDefined) {
children = newChildren.get
}
newChildren.isDefined
}
children = children.zip(requiredChildDistributions).zipWithIndex.map {
case ((child, _), idx) if isKeyGroupCompatible || !childrenIndexes.contains(idx) =>
child
case ((child, dist), idx) =>
if (bestSpecOpt.isDefined && bestSpecOpt.get.isCompatibleWith(specs(idx))) {
child
} else {
val newPartitioning = bestSpecOpt.map { bestSpec =>
// Use the best spec to create a new partitioning to re-shuffle this child
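val clustering = dist.asInstanceOf[ClusteredDistribution].clustering
bestSpec.createPartitioning(clustering)
}.getOrElse {
// No best spec available, so we create default partitioning from the required
// distribution
val numPartitions = dist.requiredNumPartitions
.getOrElse(conf.numShufflePartitions)
dist.createPartitioning(numPartitions)
}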
child match {
case ShuffleExchangeExec(_, c, so, ps) =>
ShuffleExchangeExec(newPartitioning, c, so, ps)
case _ => ShuffleExchangeExec(newPartitioning, child)
}
}
}
}
// Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
// If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
child
} else {
SortExec(requiredOrdering, global = false, child = child)
}
}
children
}
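The ENSURE_REQUIREMENTS shuffle origin marks a shuffle that the planner inserted on its own to satisfy an operator's distribution requirement, as opposed to one the user asked for explicitly, such as a repartition.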
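
The rules for picking the "best" shuffle spec are spread across several comments in the listing, so a small stand-alone model may make them easier to follow. The sketch below is not Spark code: `ChildSpec`, `BestSpecSketch`, and `pickBestSpec` are hypothetical names introduced only for illustration, and a single `numPartitions` field stands in for both the child's output partitioning and its shuffle spec. Under those assumptions it walks through the same three steps: decide whether minimum parallelism matters, filter the candidates, then prefer children that do not already have a shuffle.

```scala
// Hypothetical, simplified stand-in for one child of the operator.
// These are NOT Spark classes; they only model the fields the selection reads.
final case class ChildSpec(
    index: Int,
    numPartitions: Int,
    canCreatePartitioning: Boolean,
    hasShuffle: Boolean)

object BestSpecSketch {
  // Follows the selection order described in the listing's comments.
  def pickBestSpec(
      specs: Seq[ChildSpec],
      defaultNumShufflePartitions: Int): Option[ChildSpec] = {
    // Minimum parallelism is only considered when every child
    // has to be re-shuffled anyway.
    val shouldConsiderMinParallelism =
      specs.forall(s => !s.canCreatePartitioning || s.hasShuffle)

    // Keep the specs that can be used to shuffle the other children.
    val candidates = specs
      .filter(_.canCreatePartitioning)
      .filter(s => !shouldConsiderMinParallelism ||
        s.numPartitions >= defaultNumShufflePartitions)

    if (candidates.isEmpty) {
      None
    } else {
      // Prefer children that do not already sit on top of a shuffle,
      // then take the one with the most partitions.
      val withoutShuffle = candidates.filterNot(_.hasShuffle)
      val finalCandidates = if (withoutShuffle.nonEmpty) withoutShuffle else candidates
      Some(finalCandidates.maxBy(_.numPartitions))
    }
  }
}
```

With the `A: (No_Exchange, 100)` and `B: (Exchange, 120)` case from the comments, this model picks A, so only B's existing exchange is rewritten. When it returns `None`, the corresponding branch in the listing falls back to the default partitioning built from the required distribution.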