
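Spark SQL broadcast problem

Overview

Resolving the "Cannot broadcast the table over …" error

Problem

We hit the following error in production. It had never appeared before; it showed up this time because the data volume in this environment had grown by more than 10x.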

2024-03-04 10:36:07,194 INFO operation.ExecuteStatement: Processing root's query[c5423a21-aeca-4fb1-9047-e34eb9bcd103]: RUNNING_STATE -> ERROR_STATE, time taken: 30.172 seconds
2024-03-04 10:36:07.218 INFO org.apache.kyuubi.operation.ExecuteStatement: Query[d6e0ef6d-bdf8-4270-96f9-d9b6ab48a266] in ERROR_STATE
2024-03-04 10:36:07.218 INFO org.apache.kyuubi.operation.ExecuteStatement: Processing root's query[d6e0ef6d-bdf8-4270-96f9-d9b6ab48a266]: RUNNING_STATE -> ERROR_STATE, time taken: 30.196 seconds
Error: org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: org.apache.spark.SparkException: Cannot broadcast the table over 357913941 rows: 478354592 rows
        at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotBroadcastTableOverMaxTableRowsError(QueryExecutionErrors.scala:1638)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:141)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

        at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
        at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.$anonfun$applyOrElse$1(SparkOperation.scala:189)
        at org.apache.kyuubi.Utils$.withLockRequired(Utils.scala:395)
        at org.apache.kyuubi.operation.AbstractOperation.withLockRequired(AbstractOperation.scala:51)
        at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:177)
        at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:172)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:88)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$withLocalProperties$1(SparkOperation.scala:155)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.kyuubi.engine.spark.operation.SparkOperation.withLocalProperties(SparkOperation.scala:139)
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.executeStatement(ExecuteStatement.scala:78)
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:100)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Cannot broadcast the table over 357913941 rows: 478354592 rows
        at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotBroadcastTableOverMaxTableRowsError(QueryExecutionErrors.scala:1638)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:141)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185)
        ... 4 more

        at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
        at org.apache.kyuubi.operation.ExecuteStatement.waitStatementComplete(ExecuteStatement.scala:129)
        at org.apache.kyuubi.operation.ExecuteStatement.$anonfun$runInternal$1(ExecuteStatement.scala:161)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748) (state=,code=0)
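
The two numbers in the message can be sanity-checked with a few lines of Python. This is only a sketch: it assumes that Spark 3.x computes the row cap in BroadcastExchangeExec as BytesToBytesMap.MAX_CAPACITY / 1.5 with MAX_CAPACITY = 2^29, which matches the 357913941 in the log but should be verified against the Spark version in use. The helper name parse_broadcast_error is made up for illustration.

```python
import re

# Assumption: Spark 3.x derives the broadcast row cap from
# BytesToBytesMap.MAX_CAPACITY (2**29) divided by 1.5, truncated to an integer.
MAX_CAPACITY = 1 << 29
ASSUMED_ROW_CAP = int(MAX_CAPACITY / 1.5)

ERROR_RE = re.compile(r"Cannot broadcast the table over (\d+) rows: (\d+) rows")


def parse_broadcast_error(message):
    """Return (row_cap, actual_rows) from the error text, or None if absent."""
    match = ERROR_RE.search(message)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))


if __name__ == "__main__":
    log_line = ("org.apache.spark.SparkException: Cannot broadcast the table "
                "over 357913941 rows: 478354592 rows")
    cap, actual = parse_broadcast_error(log_line)
    # Compare the cap reported in the log with the assumed formula.
    print("cap matches assumed formula:", cap == ASSUMED_ROW_CAP)
    # Show how far the table being broadcast exceeds the cap.
    print("rows over the cap:", actual - cap)
```

If that assumption holds, the cap is a constant in the code rather than a configuration value, so the way out is to stop broadcasting the table, not to raise the limit.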

Let's look at the execution plan:

== Physical Plan ==
AdaptiveSparkPlan (63)
+- == Current Plan ==
   HashAggregate (36)
   +- Exchange (35)
      +- HashAggregate (34)
         +- HashAggregate (33)
            +- Exchange (32)
               +- HashAggregate (31)
                  +- Expand (30)
                     +- Project (29)
                        +- SortMergeJoin LeftOuter (28)
                           :- Sort (22)
                           :  +- Exchange (21)
                           :     +- Project (20)
                           :        +- BroadcastHashJoin Inner BuildRight (19)
                           :           :- BroadcastHashJoin LeftOuter BuildRight (14)
                           :           :  :- Project (8)
                           :           :  :  +- BroadcastHashJoin Inner BuildRight (7)
                           :           :  :     :- Filter (2)
                           :           :  :     :  +- Scan hive ods.rpt_unit_trackout_detail (1)
                           :           :  :     +- BroadcastQueryStage (6)
                           :           :  :        +- BroadcastExchange (5)
                           :           :  :           +- * Filter (4)
                           :           :  :              +- Scan hive ods.rpt_unit_trackin_detail (3)
                           :           :  +- BroadcastQueryStage (13)
                           :           :     +- BroadcastExchange (12)
                           :           :        +- * Project (11)
                           :           :           +- * Filter (10)
                           :           :              +- Scan hive ods.rpt_process_temp_sn_info_sip (9)
                           :           +- BroadcastQueryStage (18)
                           :              +- BroadcastExchange (17)
                           :                 +- * Filter (16)
                           :                    +- Scan hive ods.wf_eqptestabb (15)
                           +- Sort (27)
                              +- ShuffleQueryStage (26)
                                 +- Exchange (25)
                                    +- * Filter (24)
                                       +- Scan hive ods.wf_eqptestabbdetial (23)
+- == Initial Plan ==
   HashAggregate (62)
   +- Exchange (61)
      +- HashAggregate (60)
         +- HashAggregate (59)
            +- Exchange (58)
               +- HashAggregate (57)
                  +- Expand (56)
                     +- Project (55)
                        +- SortMergeJoin LeftOuter (54)
                           :- Sort (50)
                           :  +- Exchange (49)
                           :     +- Project (48)
                           :        +- BroadcastHashJoin Inner BuildRight (47)
                           :           :- BroadcastHashJoin LeftOuter BuildRight (44)
                           :           :  :- Project (40)
                           :           :  :  +- BroadcastHashJoin Inner BuildRight (39)
                           :           :  :     :- Filter (2)
                           :           :  :     :  +- Scan hive ods.rpt_unit_trackout_detail (1)
                           :           :  :     +- BroadcastExchange (38)
                           :           :  :        +- Filter (37)
                           :           :  :           +- Scan hive ods.rpt_unit_trackin_detail (3)
                           :           :  +- BroadcastExchange (43)
                           :           :     +- Project (42)
                           :           :        +- Filter (41)
                           :           :           +- Scan hive ods.rpt_process_temp_sn_info_sip (9)
                           :           +- BroadcastExchange (46)
                           :              +- Filter (45)
                           :                 +- Scan hive ods.wf_eqptestabb (15)
                           +- Sort (53)
                              +- Exchange (52)
                                 +- Filter (51)
                                    +- Scan hive ods.wf_eqptestabbdetial (23)

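The plan broadcasts several times, and the data being broadcast is large, as shown in the figure below:

[Figure: screenshot in the original post]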
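
To see at a glance which tables are being broadcast, the plan text can be scanned mechanically. The sketch below is a rough heuristic, not a real plan parser: it relies on the depth-first layout of the EXPLAIN output, pairs each BroadcastExchange node with the first "Scan hive" line that follows it, and would miss additional scans in a broadcast subtree that itself contains a join. The function name broadcast_sources is hypothetical.

```python
import re

BROADCAST_RE = re.compile(r"\bBroadcastExchange \((\d+)\)")
SCAN_RE = re.compile(r"\bScan hive (\S+) \((\d+)\)")


def broadcast_sources(plan_text):
    """Map each BroadcastExchange node id to the first table scanned below it."""
    result = {}
    pending = None  # id of a BroadcastExchange still waiting for its Scan line
    for line in plan_text.splitlines():
        broadcast = BROADCAST_RE.search(line)
        if broadcast:
            pending = int(broadcast.group(1))
            continue
        scan = SCAN_RE.search(line)
        if scan and pending is not None:
            result[pending] = scan.group(1)
            pending = None
    return result


if __name__ == "__main__":
    # Excerpt of the "Current Plan" section above.
    excerpt = """\
:  :     :- Filter (2)
:  :     :  +- Scan hive ods.rpt_unit_trackout_detail (1)
:  :     +- BroadcastQueryStage (6)
:  :        +- BroadcastExchange (5)
:  :           +- * Filter (4)
:  :              +- Scan hive ods.rpt_unit_trackin_detail (3)
:  +- BroadcastQueryStage (13)
:     +- BroadcastExchange (12)
:        +- * Project (11)
:           +- * Filter (10)
:              +- Scan hive ods.rpt_process_temp_sn_info_sip (9)
"""
    for node_id, table in broadcast_sources(excerpt).items():
        print(node_id, table)
```

Run against the full "Current Plan" section, this kind of scan should list the three broadcast tables (nodes 5, 12 and 17); which of them exceeded the row cap is not stated in the error message.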

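Solution

Our product is a general-purpose solution and its SQL is generated automatically, so the SQL cannot be tuned by hand and the other possible fixes were ruled out.

# Disable automatic broadcast joins
spark.sql.autoBroadcastJoinThreshold=-1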

[Figure: screenshot in the original post]
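
Since the SQL here is generated rather than hand-written, one way to apply the setting is per session, by running a SET statement before the generated query. The sketch below only builds the statement list; how it is submitted (Kyuubi JDBC, spark-sql, and so on) is left out, and the function name with_broadcast_disabled is hypothetical. The post does not say where the property was actually configured, so treat this as one possible approach.

```python
def with_broadcast_disabled(generated_sql):
    """Return the statements to run, in order, for one generated query."""
    return [
        # -1 turns off size-based automatic broadcast joins for this session.
        "SET spark.sql.autoBroadcastJoinThreshold=-1",
        # Drop surrounding whitespace and a trailing semicolon, if any.
        generated_sql.strip().rstrip(";"),
    ]


if __name__ == "__main__":
    for statement in with_broadcast_disabled("SELECT 1;\n"):
        print(statement)
```

With the threshold disabled, joins that were previously broadcast are planned as shuffle-based joins instead, which costs extra shuffle but avoids the row cap. Explicit broadcast hints in a query, if present, would still request a broadcast.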

Run the SQL again:

[Figure: screenshot in the original post]

Conclusion

The Spark SQL broadcast problem is resolved.


Original article: https://blog.csdn.net/2301_79691134/article/details/136450600
