pyspark - 在 Databricks 上使用带有火花流 (pyspark) 的检查点的 OOM 和数据丢失问题
问题描述
我在databricks上使用带有火花流的检查点遇到了许多问题。下面的代码导致我们的集群出现 OOM 错误。调查集群的内存使用情况,我们可以看到随着时间的推移内存在缓慢增加,这表明内存泄漏(OOM 前约 10 天,而批处理只持续几分钟)。删除检查点以便创建新的检查点后,内存泄漏消失了,表明错误源于检查点。在类似的流式传输作业中,我们还遇到了一些数据从未被处理过的问题(再次,在重新创建检查点后修复)。
免责声明:我不完全理解检查点的深入行为,因为在线文档是回避的。因此,我不确定我的配置是否良好。
以下是该问题的一个最小示例:
pyspark 3.0.1,python 3.7
集群的 json conf 具有以下元素:
"spark_conf": {
"spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",
"spark.databricks.delta.properties.defaults.autoOptimize.autoCompact": "true"
}
代码:
import pandas as pd
from pyspark.sql import functions as F
def for_each_batch(data, epoch_id):
pass
spark.readStream.format("delta").load("path/to/delta").filter(
F.col("TIME") > pd.Timestamp.utcnow() - pd.Timedelta(hours=1)
).writeStream.option(
"ignoreChanges", "true"
).option(
"checkpointLocation", "path/to/checkpoint"
).trigger(
processingTime="3 minutes"
).foreachBatch(
for_each_batch
).start()
PS:如果函数'for_each_batch'的内容或者过滤条件改变了,我应该重新创建检查点吗?
解决方案
推荐阅读
- graph - SWI Prolog中无向图的最大空子图
- c# - 无法在 Visual Studio 中使用 Mono.Data.Sqlite.dll
- sql - 如何在 Oracle 的 where 子句中使用用户定义的函数?
- react-native - Expo App每次扫描时都会崩溃
- javafx - 引用位于另一个控制器中的对象时出错“原因:java.lang.NullPointerException”
- c# - Datagridview 列不接受窗口窗体中的空间值
- java - 如何在 JPA 中为 ElementCollection 的列创建外键?
- python - 计算 GeoPandas 中两个 GeoDataFrame(点)之间的所有距离
- reactjs - 找不到模块“react-split”的声明文件
- python - Sagemaker 脚本模式培训:如何在培训脚本中导入自定义模块?