apache-spark - 如何避免在火花流中排队
问题描述
我有直接流媒体的火花流,我正在使用下面的配置
批处理间隔 60s
spark.streaming.kafka.maxRatePerPartition 42
auto.offset.reset 最早
当我开始使用最早选项的流式批处理时,为了更快地使用来自 Kafka 的消息并减少延迟,我将 spark.streaming.kafka.maxRatePerPartition 保持为 42。所以它应该消耗 42 x 60s x 60 分区 = 每批 151200 条记录.
我这里有两个问题
- 我看到这两个初始批次正确地消耗了 151200 条记录,尽管有很多记录要从 kafka 消耗,但在后面的批次中逐渐减少。请看下面的截图。可能是什么原因
- 我看到批次正在排队很多。我们怎样才能避免这种情况。
是否有可能实现以下场景我们将批处理间隔设置为 60s,如果每个批处理在 60s 内运行,则下一个批处理可以按时开始。如果一个批次的时间超过 60 秒,我们不希望下一批来排队。现有运行完成后,下一次运行可以通过选取该时间之前的记录来开始。这样我们就不会有滞后,也不会排队。
解决方案
您观察到的是 Spark 的背压机制的预期行为。
您已将配置设置spark.streaming.kafka.maxRatePerPartition
为 42,并且根据您的计算,作业将开始获取
42 * 60 partitions * 60 seconds batch interval = 151200 records per batch
查看所附屏幕截图中的时间(处理时间),该作业从该数量的记录开始。
但是,由于处理所有这 151200 条记录需要超过 60 秒的时间,因此背压机制将减少后续批次中的输入记录。这仅在几批之后才会发生,因为背压机制(又名“PID 控制器”)需要等到第一批完成,以便它可以使用该经验来估计下一个间隔的输入记录数。如前所述,处理第一个 151200 所花费的时间比一个间隔要长,这意味着随后的两个间隔已经使用 maxRatePerPartition 进行了安排,而没有完成批处理间隔的经验。
这就是为什么您看到输入记录仅在第四批中减少。然后,输入记录的数量仍然太多,无法在 60 秒内处理,因此作业建立了越来越多的延迟,PID 控制器(背压)最终意识到它落后于许多记录,并正在大幅减少输入记录的数量到由 设置的最小值spark.streaming.backpressure.pid.minRate
。在您的情况下,此值似乎设置为 2,这导致每个批次间隔 2 * 60 * 60 = 7200 条记录。
总而言之,您观察到的是预期和预期的行为。Streaming 作业需要一些批次来了解和了解它应该从 Kafka 获取多少数据以适应给定的(非灵活的)60 秒的批次间隔。无论一个批次中的处理时间需要多长时间,您的流式传输作业都将每隔 60 秒提前计划下一个批次。
你可以做什么:
- 建议将其设置
maxRatePerPartition
为实际容量的 150-200% 左右。只要让工作运行一段时间,您就会看到估计的 100% 会是什么。 - 当您在 Kafka 中使用 60 个分区时,您需要确保数据在分区中均匀分布。只有这样 maxRatePer分区才会做你打算做的事情
- 拥有 60 个分区,您可以在 Spark 集群中使用 60 个内核来获得最大的消耗速度。
推荐阅读
- python - 如何更新python上的行
- javascript - 从 React 中的相同元素方法获取属性值
- c# - 类型存在于错误共享项目引用中,并在构建时为每个项目提供它自己的库的 DLL 构建
- javascript - React native ref Property 'ref' 在类型'IntrinsicAttributes & 上不存在
- javascript - InvalidStateError:无法在“HTMLInputElement”上设置“value”属性:带有中间模型和服务的 HTTP 请求
- javascript - 将数组映射到下拉菜单
- python - 如何使用 PIL 制作 GIF 的深层副本?
- python - 单击使用 Selenium 启用 ember.js 的元素
- amazon-ec2 - 创建具有可扩展存储的网站/服务器的最佳方式
- c# - MongoDb RunCommand 查询嵌入式数组