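Spark SQL broadcast problem
Overview
Resolving the "Cannot broadcast the table over …" error
Problem
We hit the following error in production. We had never seen it before; it appeared because the data volume in this environment had grown more than tenfold.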
2024-03-04 10:36:07,194 INFO operation.ExecuteStatement: Processing root's query[c5423a21-aeca-4fb1-9047-e34eb9bcd103]: RUNNING_STATE -> ERROR_STATE, time taken: 30.172 seconds
2024-03-04 10:36:07.218 INFO org.apache.kyuubi.operation.ExecuteStatement: Query[d6e0ef6d-bdf8-4270-96f9-d9b6ab48a266] in ERROR_STATE
2024-03-04 10:36:07.218 INFO org.apache.kyuubi.operation.ExecuteStatement: Processing root's query[d6e0ef6d-bdf8-4270-96f9-d9b6ab48a266]: RUNNING_STATE -> ERROR_STATE, time taken: 30.196 seconds
Error: org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: org.apache.spark.SparkException: Cannot broadcast the table over 357913941 rows: 478354592 rows
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotBroadcastTableOverMaxTableRowsError(QueryExecutionErrors.scala:1638)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.$anonfun$applyOrElse$1(SparkOperation.scala:189)
at org.apache.kyuubi.Utils$.withLockRequired(Utils.scala:395)
at org.apache.kyuubi.operation.AbstractOperation.withLockRequired(AbstractOperation.scala:51)
at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:177)
at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:172)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:88)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$withLocalProperties$1(SparkOperation.scala:155)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.kyuubi.engine.spark.operation.SparkOperation.withLocalProperties(SparkOperation.scala:139)
at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.executeStatement(ExecuteStatement.scala:78)
at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:100)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Cannot broadcast the table over 357913941 rows: 478354592 rows
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotBroadcastTableOverMaxTableRowsError(QueryExecutionErrors.scala:1638)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185)
... 4 more
at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
at org.apache.kyuubi.operation.ExecuteStatement.waitStatementComplete(ExecuteStatement.scala:129)
at org.apache.kyuubi.operation.ExecuteStatement.$anonfun$runInternal$1(ExecuteStatement.scala:161)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) (state=,code=0)
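The two numbers in the error message are the row cap for a broadcast table and the actual row count of the table Spark tried to broadcast. The sketch below pulls both out of the message and compares them. The derivation of the cap (a hash-map capacity of 2^29 divided by 1.5) is an assumption about the Spark source, not something the log states, and the helper name `parse_broadcast_error` is hypothetical.

```python
import re
from typing import Tuple

# Error text copied from the log above.
ERROR = "Cannot broadcast the table over 357913941 rows: 478354592 rows"


def parse_broadcast_error(message: str) -> Tuple[int, int]:
    """Return (row_limit, actual_rows) from the Spark error message."""
    match = re.search(r"over (\d+) rows: (\d+) rows", message)
    if match is None:
        raise ValueError("not a broadcast row-limit error")
    return int(match.group(1)), int(match.group(2))


row_limit, actual_rows = parse_broadcast_error(ERROR)

# Assumption: the cap is a 2**29 hash-map capacity divided by 1.5, rounded down.
# Dividing by 1.5 is the same as multiplying by 2/3, so integers suffice.
ASSUMED_MAX_CAPACITY = 1 << 29
derived_limit = ASSUMED_MAX_CAPACITY * 2 // 3

excess_rows = actual_rows - row_limit
ratio = actual_rows / row_limit

print(f"limit={row_limit}, actual={actual_rows}, excess={excess_rows}")
print(f"derived limit matches the message: {derived_limit == row_limit}")
print(f"table is {ratio:.2f}x the cap")
```

Because this is a hard cap on the number of rows, raising memory settings would not be expected to help; the broadcast itself has to be avoided.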
Take a look at the execution plan:
== Physical Plan ==
AdaptiveSparkPlan (63)
+- == Current Plan ==
   HashAggregate (36)
   +- Exchange (35)
      +- HashAggregate (34)
         +- HashAggregate (33)
            +- Exchange (32)
               +- HashAggregate (31)
                  +- Expand (30)
                     +- Project (29)
                        +- SortMergeJoin LeftOuter (28)
                           :- Sort (22)
                           :  +- Exchange (21)
                           :     +- Project (20)
                           :        +- BroadcastHashJoin Inner BuildRight (19)
                           :           :- BroadcastHashJoin LeftOuter BuildRight (14)
                           :           :  :- Project (8)
                           :           :  :  +- BroadcastHashJoin Inner BuildRight (7)
                           :           :  :     :- Filter (2)
                           :           :  :     :  +- Scan hive ods.rpt_unit_trackout_detail (1)
                           :           :  :     +- BroadcastQueryStage (6)
                           :           :  :        +- BroadcastExchange (5)
                           :           :  :           +- * Filter (4)
                           :           :  :              +- Scan hive ods.rpt_unit_trackin_detail (3)
                           :           :  +- BroadcastQueryStage (13)
                           :           :     +- BroadcastExchange (12)
                           :           :        +- * Project (11)
                           :           :           +- * Filter (10)
                           :           :              +- Scan hive ods.rpt_process_temp_sn_info_sip (9)
                           :           +- BroadcastQueryStage (18)
                           :              +- BroadcastExchange (17)
                           :                 +- * Filter (16)
                           :                    +- Scan hive ods.wf_eqptestabb (15)
                           +- Sort (27)
                              +- ShuffleQueryStage (26)
                                 +- Exchange (25)
                                    +- * Filter (24)
                                       +- Scan hive ods.wf_eqptestabbdetial (23)
+- == Initial Plan ==
   HashAggregate (62)
   +- Exchange (61)
      +- HashAggregate (60)
         +- HashAggregate (59)
            +- Exchange (58)
               +- HashAggregate (57)
                  +- Expand (56)
                     +- Project (55)
                        +- SortMergeJoin LeftOuter (54)
                           :- Sort (50)
                           :  +- Exchange (49)
                           :     +- Project (48)
                           :        +- BroadcastHashJoin Inner BuildRight (47)
                           :           :- BroadcastHashJoin LeftOuter BuildRight (44)
                           :           :  :- Project (40)
                           :           :  :  +- BroadcastHashJoin Inner BuildRight (39)
                           :           :  :     :- Filter (2)
                           :           :  :     :  +- Scan hive ods.rpt_unit_trackout_detail (1)
                           :           :  :     +- BroadcastExchange (38)
                           :           :  :        +- Filter (37)
                           :           :  :           +- Scan hive ods.rpt_unit_trackin_detail (3)
                           :           :  +- BroadcastExchange (43)
                           :           :     +- Project (42)
                           :           :        +- Filter (41)
                           :           :           +- Scan hive ods.rpt_process_temp_sn_info_sip (9)
                           :           +- BroadcastExchange (46)
                           :              +- Filter (45)
                           :                 +- Scan hive ods.wf_eqptestabb (15)
                           +- Sort (53)
                              +- Exchange (52)
                                 +- Filter (51)
                                    +- Scan hive ods.wf_eqptestabbdetial (23)
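The plan broadcasts three tables (BroadcastExchange nodes 5, 12 and 17 in the current plan), and the data volume is large; the original post illustrated this with a screenshot.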
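To see at a glance which tables are being broadcast, the plan text can be scanned for BroadcastExchange nodes and the scan beneath each one. The following is a minimal sketch, assuming each broadcast subtree is a linear chain ending in a single Hive scan, as it is in this plan. The function name `broadcast_sources` is hypothetical, and the excerpt is copied from the current plan with the tree markers as they appeared in the post.

```python
import re
from typing import List, Optional, Tuple

# Excerpt of the current plan shown above (nodes 19 down to 15).
PLAN_EXCERPT = """\
+- BroadcastHashJoin Inner BuildRight (19)
:- BroadcastHashJoin LeftOuter BuildRight (14)
: :- Project (8)
: : +- BroadcastHashJoin Inner BuildRight (7)
: : :- Filter (2)
: : : +- Scan hive ods.rpt_unit_trackout_detail (1)
: : +- BroadcastQueryStage (6)
: : +- BroadcastExchange (5)
: : +- * Filter (4)
: : +- Scan hive ods.rpt_unit_trackin_detail (3)
: +- BroadcastQueryStage (13)
: +- BroadcastExchange (12)
: +- * Project (11)
: +- * Filter (10)
: +- Scan hive ods.rpt_process_temp_sn_info_sip (9)
+- BroadcastQueryStage (18)
+- BroadcastExchange (17)
+- * Filter (16)
+- Scan hive ods.wf_eqptestabb (15)
"""


def broadcast_sources(plan: str) -> List[Tuple[int, str]]:
    """Pair each BroadcastExchange node id with the next Hive table scanned."""
    results: List[Tuple[int, str]] = []
    pending: Optional[int] = None  # id of a BroadcastExchange awaiting its scan
    for line in plan.splitlines():
        exchange = re.search(r"BroadcastExchange \((\d+)\)", line)
        if exchange:
            pending = int(exchange.group(1))
            continue
        scan = re.search(r"Scan hive (\S+) \(\d+\)", line)
        if scan and pending is not None:
            results.append((pending, scan.group(1)))
            pending = None
    return results


for node_id, table in broadcast_sources(PLAN_EXCERPT):
    print(f"BroadcastExchange ({node_id}) <- {table}")
```

The scan of ods.rpt_unit_trackout_detail is skipped because no BroadcastExchange precedes it; it is the streamed side of the join rather than a broadcast side.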
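Solution
Our company builds a general-purpose product, so the SQL cannot be tuned by hand (it is generated automatically), which rules out query-level fixes.
# Disable automatic broadcast joins
spark.sql.autoBroadcastJoinThreshold=-1
Then run the SQL again.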
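Since the SQL is generated automatically, one way to apply the setting is to have the generator emit a session-level SET statement ahead of each query. The sketch below shows that idea only; the helper `with_auto_broadcast_disabled` and the sample query are hypothetical, and how the statements are submitted (for example through a Kyuubi JDBC session) is left to the caller.

```python
from typing import List

# Session-level statement that turns off automatic broadcast joins.
DISABLE_AUTO_BROADCAST = "SET spark.sql.autoBroadcastJoinThreshold=-1"


def with_auto_broadcast_disabled(generated_sql: str) -> List[str]:
    """Return the statements to run, in order, for one generated query."""
    # Drop surrounding whitespace and a trailing semicolon so the caller
    # can decide how statements are terminated.
    query = generated_sql.strip().rstrip(";").strip()
    if not query:
        raise ValueError("generated SQL is empty")
    return [DISABLE_AUTO_BROADCAST, query]


# Hypothetical generated query, used only for illustration.
statements = with_auto_broadcast_disabled("SELECT 1;")
for statement in statements:
    print(statement + ";")
```

In a PySpark session the same setting can be applied with `spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")`. Note that the threshold only governs automatic broadcasts; a query containing an explicit broadcast hint would still request one.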
Conclusion
With automatic broadcast joins disabled, the Spark SQL broadcast error no longer occurred.
Original article: https://blog.csdn.net/2301_79691134/article/details/136450600