首页 > 解决方案 > 有什么方法可以暂停kafka流一段时间然后再恢复?

问题描述

我们有一个要求,我们使用 Kafka Streams 从 Kafka 主题中读取数据,然后通过会话池通过网络发送数据。但是,有时,网络调用有点慢,我们需要经常暂停流,确保我们没有超载网络。目前,我们将数据捕获到流中并将其加载到执行器服务,然后通过会话池通过网络发送。

如果 executor 服务中的数据过多,我们需要暂停流一段时间,然后在 executor 服务上的积压清除后恢复它。为了实现这种暂停机制,我们目前正在关闭流并在清除积压后重新开始。

有什么方法可以暂停 kafka 流吗?

标签: javaapache-kafkaapache-kafka-streams

解决方案


如果我对您的理解正确,您无需做任何特别的事情。您正在谈论“背压”,Kafka Streams 可以开箱即用地处理它。

可以做的是将这些数据放入一个最大大小的队列中,并使用这个队列加载到执行器服务中。每当队列达到某个阈值时,有两种方法:

  • 如果您将数据放入队列的调用被阻塞且没有超时,那么您无需再做任何事情。只需等到系统重新联机,您的呼叫返回,处理将恢复。
  • 如果您将数据放入队列的调用因超时而阻塞,只需发出查找以检查队列的大小。重复此操作,直到系统重新联机并且您的呼叫成功。

唯一需要注意的是,只要您的 Streams 应用程序阻塞,内部使用的 Kafka 消费者客户端就不会向 Kafka 发送任何心跳,并且可能会超时。因此,您需要将超时配置参数设置为高于外部系统的预期最大停机时间。

另一种方法是使用 Kafka-streams 中可用的处理器 API,但通常不推荐使用这种模式。

让我知道它是否有帮助!


推荐阅读