Why are we seeing parquet write errors after switching to EMRFS consistent view?

Problem description

We run a large ETL process on an EMR cluster that reads and writes a large number of parquet files to an S3 bucket.

Here is the code:

# Read the three parquet inputs and register them as temp tables for SQL
a = spark.read.parquet(path1)
a.registerTempTable('a')
b = spark.read.parquet(path2)
b.registerTempTable('b')
c = spark.read.parquet(path3)
c.registerTempTable('c')

sql = '''
select
    a.col1, a.col2,
    b.col1, b.col2,
    c.col1, c.col2,
    a.dt
from a
join b on a.dt = b.dt
join c on a.dt = c.dt
'''

df_out = spark.sql(sql)

df_out.repartition('dt').write.parquet(path_out, partitionBy='dt', mode='overwrite')

We recently had to switch to transient clusters, and therefore had to start using consistent view. Our EMRFS site settings are below:

{
    "fs.s3.enableServerSideEncryption": "true",
    "fs.s3.consistent": "false",
    "fs.s3.consistent.retryPeriodSeconds": "10",
    "fs.s3.serverSideEncryption.kms.keyId": "xxxxxx",
    "fs.s3.consistent.retryCount": "5",
    "fs.s3.consistent.metadata.tableName": "xxxxx",
    "fs.s3.consistent.throwExceptionOnInconsistency": "true"
}
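
For reference, on the transient clusters these properties are supplied through the emrfs-site configuration classification when the cluster is created; a minimal sketch of that wrapper (reusing two of the properties above, placeholder values kept) looks roughly like:

[
    {
        "Classification": "emrfs-site",
        "Properties": {
            "fs.s3.consistent": "false",
            "fs.s3.consistent.metadata.tableName": "xxxxx"
        }
    }
]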

Running the same code with the same Spark configuration (which works fine on the permanent cluster) on a transient cluster with consistent view enabled produces this error:

...
19/02/25 23:01:23 DEBUG S3NativeFileSystem: getFileStatus could not find key 'xxxxxREDACTEDxxxx'
19/02/25 23:01:23 DEBUG S3NativeFileSystem: Delete called for 'xxxxxREDACTEDxxxx' but file does not exist, so returning false
19/02/25 23:01:23 DEBUG DFSClient: DFSClient writeChunk allocating new packet seqno=465, src=/var/log/spark/apps/application_1551126537652_0003.inprogress, packetSize=65016, chunksPerPacket=126, bytesCurBlock=25074688
19/02/25 23:01:23 DEBUG DFSClient: DFSClient flush(): bytesCurBlock=25081892 lastFlushOffset=25075161 createNewBlock=false
19/02/25 23:01:23 DEBUG DFSClient: Queued packet 465
19/02/25 23:01:23 DEBUG DFSClient: Waiting for ack for: 465
19/02/25 23:01:23 DEBUG DFSClient: DataStreamer block BP-75703405-10.13.32.237-1551126523840:blk_1073741876_1052 sending packet packet seqno: 465 offsetInBlock: 25074688 lastPacketInBlock: false lastByteOffsetInBlock: 25081892
19/02/25 23:01:23 DEBUG DFSClient: DFSClient seqno: 465 reply: SUCCESS downstreamAckTimeNanos: 0 flag: 0
Traceback (most recent call last):
File "xxxxxREDACTEDxxxx", line 112, in <module>
main()
File "xxxxxREDACTEDxxxx", line xxxxxREDACTEDxxxx, in main
xxxxxREDACTEDxxxx
File "xxxxxREDACTEDxxxx", line 70, in main
partitionBy='dt', mode='overwrite')
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 691, in parquet
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o232.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)

I suspect the error is caused by the EMRFS settings, but I haven't been able to find any combination of EMRFS settings that works. With the code above, the only thing that stops this error from being thrown is doubling the number of nodes. The error is also not thrown if I reduce the amount of data.

Changing the output committer and Spark speculation settings (along the lines of the sketch below) didn't help either.
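
A minimal sketch of the kind of knobs we toggled, assuming a standard SparkSession setup (the app name is a placeholder, and none of these combinations helped):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName('etl-job')  # placeholder name, not the real job
    # turn speculative task execution on or off
    .config('spark.speculation', 'false')
    # switch between file output committer algorithm versions 1 and 2
    .config('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version', '1')
    .getOrCreate())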

Thanks very much in advance. I'd be grateful for any ideas or suggestions.

Tags: apache-spark, amazon-s3, pyspark, amazon-emr

Solution


"fs.s3.consistent": "false" 应该为 true 才能使 emrfs 一致视图正常工作

