首页 > 解决方案 > 如何避免在火花流中排队

问题描述

我有直接流媒体的火花流,我正在使用下面的配置

批处理间隔 60s

spark.streaming.kafka.maxRatePerPartition 42

auto.offset.reset 最早

当我开始使用最早选项的流式批处理时,为了更快地使用来自 Kafka 的消息并减少延迟,我将 spark.streaming.kafka.maxRatePerPartition 保持为 42。所以它应该消耗 42 x 60s x 60 分区 = 每批 151200 条记录.

我这里有两个问题

  1. 我看到这两个初始批次正确地消耗了 151200 条记录,尽管有很多记录要从 kafka 消耗,但在后面的批次中逐渐减少。请看下面的截图。可能是什么原因
  2. 我看到批次正在排队很多。我们怎样才能避免这种情况。

是否有可能实现以下场景我们将批处理间隔设置为 60s,如果每个批处理在 60s 内运行,则下一个批处理可以按时开始。如果一个批次的时间超过 60 秒,我们不希望下一批来排队。现有运行完成后,下一次运行可以通过选取该时间之前的记录来开始。这样我们就不会有滞后,也不会排队。

Spark UI - 问题 1 的屏幕截图

标签: apache-sparkapache-kafkaspark-streamingspark-kafka-integration

解决方案


您观察到的是 Spark 的背压机制的预期行为。

您已将配置设置spark.streaming.kafka.maxRatePerPartition为 42,并且根据您的计算,作业将开始获取

42 * 60 partitions * 60 seconds batch interval = 151200 records per batch

查看所附屏幕截图中的时间(处理时间),该作业从该数量的记录开始。

但是,由于处理所有这 151200 条记录需要超过 60 秒的时间,因此背压机制将减少后续批次中的输入记录。这仅在几批之后才会发生,因为背压机制(又名“PID 控制器”)需要等到第一批完成,以便它可以使用该经验来估计下一个间隔的输入记录数。如前所述,处理第一个 151200 所花费的时间比一个间隔要长,这意味着随后的两个间隔已经使用 maxRatePerPartition 进行了安排,而没有完成批处理间隔的经验。

这就是为什么您看到输入记录仅在第四批中减少。然后,输入记录的数量仍然太多,无法在 60 秒内处理,因此作业建立了越来越多的延迟,PID 控制器(背压)最终意识到它落后于许多记录,并正在大幅减少输入记录的数量到由 设置的最小值spark.streaming.backpressure.pid.minRate。在您的情况下,此值似乎设置为 2,这导致每个批次间隔 2 * 60 * 60 = 7200 条记录。

总而言之,您观察到的是预期和预期的行为。Streaming 作业需要一些批次来了解和了解它应该从 Kafka 获取多少数据以适应给定的(非灵活的)60 秒的批次间隔。无论一个批次中的处理时间需要多长时间,您的流式传输作业都将每隔 60 秒提前计划下一个批次。

你可以做什么:

  • 建议将其设置maxRatePerPartition为实际容量的 150-200% 左右。只要让工作运行一段时间,您就会看到估计的 100% 会是什么。
  • 当您在 Kafka 中使用 60 个分区时,您需要确保数据在分区中均匀分布。只有这样 maxRatePer分区才会做你打算做的事情
  • 拥有 60 个分区,您可以在 Spark 集群中使用 60 个内核来获得最大的消耗速度。

推荐阅读