apache-kafka - 如何在 kafka-console-producer 中启用幂等性?
问题描述
我正在尝试在 kafka-console-producer 上启用“幂等”选项。参考以下链接:
- https://gerardnico.com/dit/kafka/producer#idempotent
- https://gerardnico.com/dit/kafka/kafka-console-producer
使用的命令:
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list node1.com:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --producer-property acks=all --producer-property retries=Integer.MAX_VALUE --producer-property enable.idempotence=true
观察到以下异常:
org.apache.kafka.common.KafkaException: 无法在 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer) 的 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:433) 构建 kafka 生产者.java:291) at kafka.producer.NewShinyProducer.(BaseProducer.scala:40) at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:50) at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala) : org.apache.kafka.common.config.ConfigException: 必须将 acks 设置为 all 才能使用幂等生产者。否则我们不能保证幂等性。在 org.apache.kafka.clients.producer.KafkaProducer.configureAcks(KafkaProducer.java:510) 在 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:375)
虽然 acks 已经设置为“全部”,但我们观察到了这个异常。我错过了什么?
以下是使用的版本:
- 经纪人 - 1.0.0
- client - 与 broker 1.0.0 捆绑的控制台生产者
更新
--request-required-acks -1
我可以使用回复中建议的选项在控制台生产者上启用幂等性。
但是,我得到 ClusterAuthorizationException。
bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list borker1:6667 --topic my_topic --producer-property enable.idempotence=true --request-required-acks -1 --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>[2018-12-26 04:00:56,074] ERROR [Producer clientId=console-producer] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
[2018-12-26 04:00:56,080] ERROR Error when sending message to topic orm_c1_prv_non_sepa_ci with key: 4 bytes, value: 6 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
此异常仅在启用幂等选项时发生。没有此选项也可以生成消息。
bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker1:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>key2:value2
我错过了什么?
解决方案
您不能为 ConsoleProducer 设置acks
通过producer-property
。改为使用request-required-acks
,如下所示:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer-property enable.idempotence=true --request-required-acks -1
推荐阅读
- reactjs - 我应该在反应选择器中检查数组是否为空
- docker - 循环与 Falco 崩溃
- java - 如何使用 selenium-java 点击 href 链接
- java - 我无法在 json 中获取对象的数据
- c# - 在不打开新窗口的情况下导航到另一个 wpf
- css - Angular:如何在 ng-template 中显示模态对话框(图表)
- r - 你如何在R中将每隔一行移到它之前的行?
- python - Python:KeyError:<__main__.State object at 0x7fd63d33be90> 但密钥存在
- excel - 循环浏览文件夹并将excel文件数据拉入主工作表并在每次输出完成后保存副本
- c - 您可以在 C 中使用实际日期和时间设置条件吗?