首页 > 解决方案 > 如何在 kafka-console-producer 中启用幂等性?

问题描述

我正在尝试在 kafka-console-producer 上启用“幂等”选项。参考以下链接:

使用的命令:

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list node1.com:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --producer-property acks=all --producer-property retries=Integer.MAX_VALUE --producer-property enable.idempotence=true

观察到以下异常:

org.apache.kafka.common.KafkaException: 无法在 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer) 的 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:433) 构建 kafka 生产者.java:291) at kafka.producer.NewShinyProducer.(BaseProducer.scala:40) at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:50) at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala) : org.apache.kafka.common.config.ConfigException: 必须将 acks 设置为 all 才能使用幂等生产者。否则我们不能保证幂等性。在 org.apache.kafka.clients.producer.KafkaProducer.configureAcks(KafkaProducer.java:510) 在 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:375)

虽然 acks 已经设置为“全部”,但我们观察到了这个异常。我错过了什么?

以下是使用的版本:


更新

--request-required-acks -1我可以使用回复中建议的选项在控制台生产者上启用幂等性。

但是,我得到 ClusterAuthorizationException。

bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list borker1:6667 --topic my_topic --producer-property enable.idempotence=true  --request-required-acks -1  --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>[2018-12-26 04:00:56,074] ERROR [Producer clientId=console-producer] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
[2018-12-26 04:00:56,080] ERROR Error when sending message to topic orm_c1_prv_non_sepa_ci with key: 4 bytes, value: 6 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.

此异常仅在启用幂等选项时发生。没有此选项也可以生成消息。

bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker1:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>key2:value2

我错过了什么?

标签: apache-kafkakafka-producer-api

解决方案


您不能为 ConsoleProducer 设置acks通过producer-property。改为使用request-required-acks,如下所示:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer-property enable.idempotence=true --request-required-acks -1


推荐阅读