apache-spark - 使用 PySpark 生成 6000 万个 JSON 文件时出现 OutOfMemoryError
问题描述
通过 jdbc 连接,我能够使用下面的 PySpark 代码从 Oracle db 成功生成 6000 万条记录 CSV 文件。
然后现在我想要以 JSON 格式输出,所以我添加了这行代码:df1.toPandas().to_json("/home/user1/empdata.json", orient='records')
,但是在生成 json 时我得到了 OutOfMemoryError。
如果需要任何代码更改,请任何人推荐我,请。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Emp data Extract") \
.config("spark.some.config.option", " ") \
.getOrCreate()
def generateData():
try:
jdbcUrl = "jdbc:oracle:thin:USER/pwd@//hostname:1521/dbname"
jdbcDriver = "oracle.jdbc.driver.OracleDriver"
df1 = spark.read.format('jdbc').options(url=jdbcUrl, dbtable="(SELECT * FROM EMP) alias1", driver=jdbcDriver, fetchSize="2000").load()
#df1.coalesce(1).write.format("csv").option("header", "true").save("/home/user1/empdata" , index=False)
df1.toPandas().to_json("/home/user1/empdata.json", orient='records')
except Exception as err:
print(err)
raise
# finally:
# conn.close()
if __name__ == '__main__':
generateData()
错误日志:
2019-04-15 05:17:06 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
[Stage 0:> (0 + 1) / 1]2019-04-15 05:20:22 ERROR Executor:91 - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:552)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:256)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-04-15 05:20:22 ERROR SparkUncaughtExceptionHandler:91 - Uncaught exception in thread Thread[Executor task launch worker for task 0,5,main]
java.lang.OutOfMemoryError: Java heap space
根据管理员的要求,我正在更新我的评论:这是一些不同的问题,也存在其他内存输出问题,但在不同的情况下会出现。错误可能相同,但问题不同。就我而言,我得到了大量数据。
解决方案
如果你想以 JSON 格式保存,你应该使用 Spark 的 write 命令——你目前所做的是将所有数据带到驱动程序并尝试将其加载到 pandas 数据帧中
df1.write.format('json').save('/path/file_name.json')
如果你需要一个文件,你可以试试
df1.coalesce(1).write.format('json').save('/path/file_name.json')
推荐阅读
- web-services - 寻找示例或指南或链接以了解 JAX-WS 1.2 项目
- listview - 小部件进入列表视图或颤动中的数据
- reactjs - 包括 id 的 ReactJs 路由路径返回不正确的组件
- c++ - 如何防止 CMAKE 在构建时为共享库创建符号链接(不在安装时)
- github - 仅在阻止拉取请求时手动运行 GitHub Actions
- sql - 在 MS Access 中查询分组数据
- gitlab - 更改 gitlab markdown 美人鱼图大小
- javascript - React Native“TypeError:Object(...)不是函数”反应导航堆栈错误
- javascript - 如何在组标题中添加可点击按钮?
- python - 代码正在执行应该在初始语句中被拒绝的过程,我不知道为什么