multithreading - 我们可以在高流量的多线程环境中使用多个 Kafka 生产者吗?
问题描述
我们有一个前端层,它只接收消息并写入 Kafka 主题以进行后端处理。我们以非常高的速度发送消息;我们每天处理 10 亿条消息。我们有一个线程池,它接受消息并写入 Kafka 生产者实例。在这里,我只创建了一个在多个线程之间共享的生产者(单个实例)。
最近,我观察到90%的线程都处于阻塞状态。我发现 Kafka 正在按顺序发送数据。producer.send()
Kafka Java驱动的方法中有一个同步块:
def send(messages: KeyedMessage[K,V]*) {
**lock synchronized {**
if (hasShutdown.get)
throw new ProducerClosedException
recordStats(messages)
sync match {
case true => eventHandler.handle(messages)
case false => asyncSend(messages)
}
}
}
文档说我们不需要创建多个生产者实例;一个实例可以在多线程环境中共享。但是我们怎么能做到呢?还是我们应该更好地创建一个生产者实例池?
解决方案
建议跨线程共享发布者客户端的原因是它可以带来更好的批处理,因为消息是在分区级别进行批处理的。更好的批处理会带来更好的压缩(如果启用)以及更好的吞吐量。您可以考虑调整缓冲内存和 linger.ms 等参数以及批量大小以优化吞吐量。一个这样就完成了,然后你可以考虑添加多个生产者。此外,如果主题的传入率非常高,请考虑增加主题的分区数。
推荐阅读
- aws-cloudwatch-log-insights - 根据日志中两个或字符串的可用性过滤云监视日志见解并使其不区分大小写
- sql - 两个间隔在 SQL-Server 中的某个点重合
- python - 嵌套循环如何平方大 O?
- docker - Docker 容器使用大型共享驱动器的最佳方式
- php - 在 Magento2 中创建多个自定义运营商
- c - C Socket 未连接并返回退出值 255
- php - Mautic - 在 REST API 中使用表达式
- github - 是否可以在运行 GitHub 操作时更新过期的密钥,而不会使当前运行的操作失败?
- python - Python Networkx:用气泡/圆圈可视化边缘权重
- amazon-ec2 - 我们可以在 AWS EC2 实例上安装 Redhat Code Ready 容器吗?