watson-studio - Watson Studio“Spark 环境” - 如何增加`spark.driver.maxResultSize`?
问题描述
我正在运行一个 spark 作业,我正在读取、操作并将大量 txt 文件合并到一个文件中,但我遇到了这个问题:
Py4JJavaError:调用 o8483.collectToPython 时出错。:org.apache.spark.SparkException:作业因阶段失败而中止:838个任务的序列化结果的总大小(1025.6 MB)大于spark.driver.maxResultSize(1024.0 MB)
是否可以增加尺寸spark.driver.maxResultSize
?
注意:这个问题是关于 WS Spark“环境”而不是关于分析引擎。
解决方案
如果您使用“Analytics Engine”火花集群实例,您可以通过 Ambari 控制台增加默认值。您可以从 console.bluemix.net 中的 IAE 实例获取到 Ambari 控制台的链接和凭据。在 Ambari 控制台中,添加一个新属性
Spark2 -> “自定义 spark2-defaults” -> 添加属性 -> spark.driver.maxResultSize = 2GB
确保 spark.driver.maxResultSize 值小于设置的驱动程序内存
Spark2 -> “高级 spark2-env” -> 内容 -> SPARK_DRIVER_MEMORY
如果您只是尝试创建单个 CSV 文件并且不想更改 spark conf 值,因为您不知道最终文件有多大,那么另一个建议是使用如下函数,该函数使用 hdfs getmerge 函数像 pandas 一样创建一个 csv 文件。
def writeSparkDFAsCSV_HDFS(spark_df, file_location,file_name, csv_sep=',', csv_quote='"'):
"""
It can be used to write large spark dataframe as a csv file without running
into memory issues while converting to pandas dataframe.
It first writes the spark df to a temp hdfs location and uses getmerge to create
a single file. After adding a header, the merged file is moved to hdfs.
Args:
spark_df (spark dataframe) : Data object to be written to file.
file_location (String) : Directory location of the file.
file_name (String) : Name of file to write to.
csv_sep (character) : Field separator to use in csv file
csv_quote (character) : Quote character to use in csv file
"""
# define temp and final paths
file_path= os.path.join(file_location,file_name)
temp_file_location = tempfile.NamedTemporaryFile().name
temp_file_path = os.path.join(temp_file_location,file_name)
print("Create directories")
#create directories if not exist in both local and hdfs
!mkdir $temp_file_location
!hdfs dfs -mkdir $file_location
!hdfs dfs -mkdir $temp_file_location
# write to temp hdfs location
print("Write to temp hdfs location : {}".format("hdfs://" + temp_file_path))
spark_df.write.csv("hdfs://" + temp_file_path, sep=csv_sep, quote=csv_quote)
# merge file from hadoop to local
print("Merge and put file at {}".format(temp_file_path))
!hdfs dfs -getmerge $temp_file_path $temp_file_path
# Add header to the merged file
header = ",".join(spark_df.columns)
!rm $temp_file_location/.*crc
line_prepender(temp_file_path, header)
#move the final file to hdfs
!hdfs dfs -put -f $temp_file_path $file_path
#cleanup temp locations
print("Cleanup..")
!rm -rf $temp_file_location
!hdfs dfs -rm -r $temp_file_location
print("Done!")
推荐阅读
- java - 仅获取 TCP 客户端发送的问题
- sockets - websockets 与 TCP 和 UDP 有何不同?
- powershell - 禁用在 PowerShell 中调用命令后打印的横幅消息
- ios - 如何修复 [!] 在 `.symlinks/plugins/flutter_test/ios` 中找不到 `flutter_test` 的 podspec
- spring - Spring env,我是否仍应在新项目中使用 Hystrix
- node.js - Gatsby:导航链接到子页面上的顶级路由(使用 createPage 创建)不起作用
- google-app-engine - 替代谷歌云功能
- python - 如何避免在 Keras ImageDataGenerator 的验证拆分中增加数据?
- microsoft-graph-api - Teams Graph API 知道某人何时加入了在线会议
- mongodb - 修改mongoose查询数据依赖输入数组