首页 > 解决方案 > 如何使用 Lagom Kafka Message Broker API 安全地跳过消息?

问题描述

我们定义了一个基本订阅者,它通过抛出异常并依靠 Akka Streams 的流监督来恢复失败的消息(即出于某些业务逻辑原因,我们不会处理)Flow

someLagomService
  .someTopic()
  .subscribe
  .withGroupId("lagom-service")
  .atLeastOnce(
    Flow[Int]
      .mapAsync(1)(el => {
        // Exception may occur here or can map to Done
      })
      .withAttributes(ActorAttributes.supervisionStrategy({
        case t =>
          Supervision.Resume
      })
  )

这似乎适用于负载非常小的基本用例,但我们注意到对于大量消息非常奇怪的事情(例如:非常频繁地重新处理消息等)。

深入研究代码,我们看到 Lagom 的broker.Subscriber.atLeastOnce文档指出:

可能会从上游提取更多元素,但它必须为接收到的每条消息flow发出一条消息。Done它还必须以与接收消息相同的顺序发出它们。这意味着flow不能过滤或收集消息的子集,而是必须将消息拆分为单独的流并将那些将被删除的流映射到Done.

此外,在 Lagom 的 impl 中KafkaSubscriberActor,我们看到 private 的 implatLeastOnce本质上解压缩了消息有效负载和偏移量,然后在我们的用户流将消息映射到Done.

上面这两个花絮似乎暗示,通过使用流管理器和跳过元素,我们最终可能会出现可提交偏移量不再与Done每个 Kafka 消息生成的 s 均匀压缩的情况。

示例:如果我们流式传输 1、2、3、4 并将 1、2 和 4 映射到Done但在 3 上抛出异常,我们有 3 Dones 和 4 个可提交偏移量?

使用 Lagom 1.4.10

标签: scalaapache-kafkaakkalagom

解决方案


这是正确的/预期的吗?这是否意味着我们应该避免在这里使用流监督器?

官方API 文档

如果正在使用 Kafka Lagom 消息代理模块,则默认情况下,当发生故障时流会自动重新启动。

因此,无需添加您自己的supervisionStrategy来管理错误处理。并且默认情况下将重新启动流,您不应考虑“跳过”完成消息。


不均匀的拉链会导致什么样的行为?

正是因为如此,文档说:

这意味着流不得过滤或收集消息的子集

它可能会低估错误的偏移量。并且在重新启动时,您可能会从提交的较低偏移量中以重播的形式获得已处理的消息。


在通过 Lagom 消息代理 API 使用来自 Kafka 的消息时,推荐的错误处理方法是什么?将故障映射/恢复到 Done 是否正确?

Lagom 通过删除导致错误的消息并重新启动流来处理异常。并且映射/恢复失败到完成不会对此有任何改变。

您可以考虑,如果您以后需要访问这些消息,也可以使用Try {}例如,即不抛出异常,并通过将错误消息发送到不同的主题来收集它们,这将使您有机会监控当条件正确时导致错误的错误和重播消息的数量,即错误已修复。


推荐阅读