python - Kafka生产者flush和poll的区别
问题描述
我们有一个 Kafka 消费者,它将读取消息并执行此操作,然后使用以下脚本再次发布到 Kafka 主题
生产者配置:
{
"bootstrap.servers": "localhost:9092"
}
我还没有配置任何其他配置,例如queue.buffering.max.messages
queue.buffering.max.ms
batch.num.messages
我假设这些都将是配置中的默认值
queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000
我的理解:当内部队列达到 queue.buffering.max.ms 或 batch.num.messages 消息时,消息将在单独的线程中发布到 Kafka。在我的配置中 queue.buffering.max.ms 是 0,所以当我调用生产()时,每条消息都会被发布。如果我错了,请纠正我。
我的制作人片段:
def send(topic, message):
p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
p.flush()
从这篇文章中我了解到,在每条消息之后使用刷新,生产者将成为同步生产者。如果我使用上面的脚本,发布到 Kafka 大约需要 45 毫秒
如果我将上面的代码段更改为
def send(topic, message):
p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
p.poll(0)
有没有什么性能会有所提升?你能澄清我的理解吗?
谢谢
解决方案
推荐阅读
- php - 如果名称已经在数据库中,则更新 MYSQL 表
- c - 第二个线程上的 PTHREAD_MUTEX_RECURSIVE 块
- forms - 在文本框 powershell 表单中显示日期
- r - 将用户输入向量传递给接受字符串的函数
- javascript - 从全局范围调用函数时,如果不使用严格模式,这将不起作用
- sonarqube - 代码覆盖率结果未使用 sonarqube 中的分支进行更新
- python - Python3 如何从 os.walk 获取当前目录和下一个目录?
- java - jasper 在 excel 中显示错误的日期时间
- drupal - 评论类型的 Drupal 主题命名约定
- github - 部署 Jar 文件的最佳方式是什么?