apache-spark - Spark结构化流式maxOffsetsPerTrigger似乎不起作用
问题描述
我遇到了一个 Spark 结构化流 (SSS) 应用程序的问题,该应用程序由于程序错误而崩溃,并且在周末没有处理。当我重新启动它时,有很多关于主题的消息要重新处理(大约 250'000 条消息,每个主题需要加入 3 个主题)。
重新启动时,应用程序再次崩溃并出现 OutOfMemory 异常。我从文档中了解到,maxOffsetsPerTrigger
在这些情况下,读取流上的配置应该会有所帮助。我将 PySpark 代码(在 SSS 2.4.3 btw 上运行)更改为所有 3 个主题的内容如下
rawstream = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topicName)
.option("maxOffsetsPerTrigger", 10000L)
.option("startingOffsets", "earliest")
.load()
我的期望是,现在 SSS 查询将从每个主题加载约 33'000 个偏移量,并在第一批中加入它们。然后在第二批中,它会清理第一批中的状态记录,由于水印(这将清理第一批中的大部分记录),然后从每个主题中再读取约 33k 条记录。因此,在大约 8 个批次之后,它应该已经处理了滞后,并具有“合理”的内存量。
但是应用程序仍然因 OOM 而崩溃,当我在应用程序主 UI 中检查 DAG 时,它报告它再次尝试读取所有 250'000 条消息。
还有什么我需要配置的吗?我如何检查这个选项是否真的被使用了?(当我检查计划时,不幸的是它被截断并只显示(Options: [includeTimestamp=true,subscribe=IN2,inferSchema=true,failOnDataLoss=false,kafka.b...)
,我不知道如何显示点后面的部分)
解决方案
推荐阅读
- dask - Dask 字符串连接聚合
- python - 您可以扫描目录以查找文件名以发现它是什么类型的文件吗?
- ocaml - 将负号绑定到 ocaml 中的值
- php - PHP json_encode - 是否有任何输入会引发错误?
- python - 如何将列表/数组中的列填充到只有列名的空 Pandas 数据框中
- javascript - 反应钩子 - 不能在普通异步函数中使用钩子?
- c++ - 在 C++ 中通过 ODBC 连接到 Azure SQL 数据库时出现问题
- awk - 生成包含 LINK1 元素的 MDF 有限元网格
- angular - 我找不到 Angular 版本的剑道网格刷新按钮功能
- java - 如何使用 Docx4j 获取 XSLX 表的最新行号和最新列号