java - 有什么方法可以暂停kafka流一段时间然后再恢复?
问题描述
我们有一个要求,我们使用 Kafka Streams 从 Kafka 主题中读取数据,然后通过会话池通过网络发送数据。但是,有时,网络调用有点慢,我们需要经常暂停流,确保我们没有超载网络。目前,我们将数据捕获到流中并将其加载到执行器服务,然后通过会话池通过网络发送。
如果 executor 服务中的数据过多,我们需要暂停流一段时间,然后在 executor 服务上的积压清除后恢复它。为了实现这种暂停机制,我们目前正在关闭流并在清除积压后重新开始。
有什么方法可以暂停 kafka 流吗?
解决方案
如果我对您的理解正确,您无需做任何特别的事情。您正在谈论“背压”,Kafka Streams 可以开箱即用地处理它。
可以做的是将这些数据放入一个最大大小的队列中,并使用这个队列加载到执行器服务中。每当队列达到某个阈值时,有两种方法:
- 如果您将数据放入队列的调用被阻塞且没有超时,那么您无需再做任何事情。只需等到系统重新联机,您的呼叫返回,处理将恢复。
- 如果您将数据放入队列的调用因超时而阻塞,只需发出查找以检查队列的大小。重复此操作,直到系统重新联机并且您的呼叫成功。
唯一需要注意的是,只要您的 Streams 应用程序阻塞,内部使用的 Kafka 消费者客户端就不会向 Kafka 发送任何心跳,并且可能会超时。因此,您需要将超时配置参数设置为高于外部系统的预期最大停机时间。
另一种方法是使用 Kafka-streams 中可用的处理器 API,但通常不推荐使用这种模式。
让我知道它是否有帮助!
推荐阅读
- python - 如何使用正则表达式查找字符串是否有 2 个特定字符,如果有,则将其删除?
- python - 是否可以在单个进程中运行烧瓶?(解决 ipdb 和 Docker ttys 的明显问题)
- javascript - 如何使用 Fuse.js 和 Redis 在 MySQL 表中进行全文搜索?
- machine-learning - 有没有办法在多分类问题中做一个“中性”类?
- azure - Azure 数据库中外部表联接的替代方法
- .net - 静默 .NET Wix 安装仍要求用户确认已安装的软件包
- c# - 在 SQL Server 中将经纬度转换为 UTM X 和 UTM Y 坐标的函数
- javascript - 有没有办法在点击按钮播放时自动更正坐标?
- swift - 什么导致 Xcode gRPC-C++ 编译错误?
- apache - Letsencrypt 虚拟主机与 apache 上的 htaccess