首页 > 解决方案 > Spark结构化流式maxOffsetsPerTrigger似乎不起作用

问题描述

我遇到了一个 Spark 结构化流 (SSS) 应用程序的问题,该应用程序由于程序错误而崩溃,并且在周末没有处理。当我重新启动它时,有很多关于主题的消息要重新处理(大约 250'000 条消息,每个主题需要加入 3 个主题)。

重新启动时,应用程序再次崩溃并出现 OutOfMemory 异常。我从文档中了解到,maxOffsetsPerTrigger在这些情况下,读取流上的配置应该会有所帮助。我将 PySpark 代码(在 SSS 2.4.3 btw 上运行)更改为所有 3 个主题的内容如下

 rawstream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topicName)
    .option("maxOffsetsPerTrigger", 10000L)
    .option("startingOffsets", "earliest")
    .load()

我的期望是,现在 SSS 查询将从每个主题加载约 33'000 个偏移量,并在第一批中加入它们。然后在第二批中,它会清理第一批中的状态记录,由于水印(这将清理第一批中的大部分记录),然后从每个主题中再读取约 33k 条记录。因此,在大约 8 个批次之后,它应该已经处理了滞后,并具有“合理”的内存量。

但是应用程序仍然因 OOM 而崩溃,当我在应用程序主 UI 中检查 DAG 时,它报告它再次尝试读取所有 250'000 条消息。

还有什么我需要配置的吗?我如何检查这个选项是否真的被使用了?(当我检查计划时,不幸的是它被截断并只显示(Options: [includeTimestamp=true,subscribe=IN2,inferSchema=true,failOnDataLoss=false,kafka.b...),我不知道如何显示点后面的部分)

标签: apache-sparkspark-structured-streaming

解决方案


推荐阅读