IllegalArgumentException: Unknown message type: 9 when reading "delta" files

Problem description

I am using <spark.version>3.1.2</spark.version> and Delta Lake (io.delta:delta-core_2.12:1.0.0) in my project.

While reading a "delta" table, I run into an IllegalArgumentException: Unknown message type: 9 error:

java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 4 ($anonfun$apply$2 at DatabricksLogging.scala:77) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: java.lang.IllegalArgumentException: Unknown message type: 9
    at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:71)
    at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:80)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more
    at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
    at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
    at com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
    at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:464)
    at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:401)
    at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog$lzycompute(DeltaTableV2.scala:73)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:177)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:305)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 4 ($anonfun$apply$2 at DatabricksLogging.scala:77) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: java.lang.IllegalArgumentException: Unknown message type: 9
    at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:71)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unknown message type: 9
    at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:71)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
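
For context, the failing read is just a standard Delta load along these lines (a minimal sketch, not the exact code from my project; the table path, app name and session settings are placeholders):

import org.apache.spark.sql.SparkSession

object ReadDeltaTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-delta-table") // placeholder app name
      // Delta Lake 1.0.0 on Spark 3.1.x needs the Delta SQL extension and catalog
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // Placeholder path; the exception surfaces while DeltaLog loads the table state
    // (see DeltaLog.forTable / DeltaDataSource.createRelation in the stack trace above)
    val df = spark.read.format("delta").load("s3a://my-bucket/path/to/delta-table")
    df.show(10)
  }
}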

I am submitting the Spark job as follows:

export SPARK_HOME=/spark-3.1.2-bin-hadoop3.2
$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--packages org.apache.hadoop:hadoop-aws:2.9.2,io.delta:delta-core_2.12:1.0.0,org.apache.hudi:hudi-spark-bundle_2.12:0.6.0

What is wrong here? Any clues? Any help is highly appreciated.

Tags: apache-spark, apache-spark-sql, databricks, delta-lake

Solution


I ran into something similar to https://issues.apache.org/jira/browse/SPARK-33093:

Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unknown message type: 9
at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:71)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:81)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:157)

The following setting also helped me:

spark.shuffle.useOldFetchProtocol=true
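
For what it's worth, this error typically means the external shuffle service running inside the YARN NodeManagers is older than Spark 3.0, so it does not recognize the FetchShuffleBlocks message (type 9) introduced by the new shuffle fetch protocol; spark.shuffle.useOldFetchProtocol=true makes a Spark 3.x application fall back to the old protocol. Assuming the same spark-submit invocation as in the question, the setting can be passed with --conf (or placed in spark-defaults.conf):

export SPARK_HOME=/spark-3.1.2-bin-hadoop3.2
$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--conf spark.shuffle.useOldFetchProtocol=true \
--packages org.apache.hadoop:hadoop-aws:2.9.2,io.delta:delta-core_2.12:1.0.0,org.apache.hudi:hudi-spark-bundle_2.12:0.6.0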
