apache-spark - Spark - How to write ~20 TB of data from a DataFrame to a Hive table or HDFS?
Problem description
I am processing more than 20 TB of data with Spark. I am trying to write the data to a Hive table with the following commands:
df.registerTempTable('temporary_table')
sqlContext.sql("INSERT OVERWRITE TABLE my_table SELECT * FROM temporary_table")
where df is the Spark DataFrame. Unfortunately, it does not have any date column I could partition by. When I run the code above, I get this error message:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 95561 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply$mcI$sp(python.scala:126)
at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply(python.scala:124)
at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply(python.scala:124)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
at org.apache.spark.sql.execution.EvaluatePython$.takeAndServe(python.scala:124)
at org.apache.spark.sql.execution.EvaluatePython.takeAndServe(python.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
The error message also seems to depend on the amount of data. With slightly less data, I got the following error message instead:
Map output statuses were 395624469 bytes which exceeds spark.akka.frameSize (134217728 bytes).
What is a more practical way to achieve this (if the task is feasible at all)? I am using Spark 1.6.
Here are the configuration variables used when submitting the Spark job:
spark-submit --deploy-mode cluster --master yarn
--executor-memory 20G
--num-executors 500
--driver-memory 64g
--driver-cores 8
--files 'my_script.py'
By the way, I naively imagined that when the write happens, Spark would write the data from the executors to HDFS. But the error message seems to imply that there is some data transfer between the executors and the driver?
My knowledge of Spark is rather shallow, so please forgive my naive question!
Solution
Check the following configuration and adjust it to your needs; the default value is 1g. You can raise spark.driver.maxResultSize in any of these ways:
- set via SparkConf: conf.set("spark.driver.maxResultSize", "10g")
- set in spark-defaults.conf: spark.driver.maxResultSize 10g
- set when calling spark-submit: --conf spark.driver.maxResultSize=10g
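Applied to the submit command from the question, the spark-submit variant would look like the sketch below. The 10g value is only an example and should be sized to your job (it must stay below the driver memory); everything else is taken from the question's own flags.

```shell
spark-submit --deploy-mode cluster --master yarn \
  --executor-memory 20G \
  --num-executors 500 \
  --driver-memory 64g \
  --driver-cores 8 \
  --conf spark.driver.maxResultSize=10g \
  --files 'my_script.py'
```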