python - Confluent Kafka Python 生产者不使用 ACKS= 所有配置进行生产
问题描述
我有一些 python 代码将生成一个 kafka 主题,这适用于默认设置acks=1
,但是当我更改为acks=all
或acks=2
消息没有最终出现在主题中时。该min.insync.replicas
主题的配置设置为2
. 运行代码后没有返回错误信息令人困惑?集群中有 3 个代理。
这是代码
from confluent_kafka import Producer
from kafka.errors import KafkaError
def get_producer_config():
return Producer(get_config())
def get_config():
conf = {
'bootstrap.servers': 'localhost:9092',
'acks': '2',
}
return conf
try:
producer = get_producer_config()
producer.produce('test', 'test message from local app')
producer.flush()
except KafkaError as error:
get_logger().error(str(error))
这源于调试 kafka 生产者 lambda,我们在其中收到错误消息KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
,我正在尝试在本地进行顶级复制,但没有收到错误,但在尝试时注意到了这一点。
谢谢
解决方案
acks 只能有三个值:
acks = 1
:这是默认值,只有领导者将消息写入其日志,但会在不等待所有追随者的完全确认的情况下做出响应。acks = 0
:生产者根本不会等待服务器的任何确认。该记录将立即添加到套接字缓冲区并被视为已发送。acks =all
:这意味着领导者将等待完整的同步副本集来确认记录。这保证了只要至少一个同步副本保持活动状态,记录就不会丢失。这是最有力的保证。
此外,在您的其中指定一个有效值,min.insyn.replica
其中应包含 no。包括领导者在内的副本。
例如,如果您的复制因子为 4,并且您的 min.insync.replica = 3,则它是 acks=all 的有效配置。
推荐阅读
- vue.js - 从 vuejs 中的类触发点击事件
- javascript - How do I run multiple setIntervals efficiently?
- jinja2 - dbt_utils.surrogate_key() 包含除两个以外的所有字段
- php - 如何从数据库中获取电话号码和用户名(PHP,Yii1.1)
- python - DynamoDB 分页根据 MaxItems 分页配置返回元素或不返回任何内容
- javascript - 单击时,将一个项目从一个数组推送到另一个,并根据数组中的项目更改文本
- javascript - 我的个人资料图片在 firebase 上更改了,但在浏览器上没有更改?
- python - 打印具有特定小数位数的浮点值和带有前导空格的 n 数
- python - 我有错误:没有这样的元素:无法找到元素
- css - 如何仅使用 CSS 制作许多径向圆?