首页 > 解决方案 > 使用事务 PublishKafka_2_0

问题描述

我正在使用 Publish Kafka 处理器的下一个配置:

在此处输入图像描述 编辑了 kafka 和 zookeeper 配置:

zookeeper.properties

authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zookeeper_jaas.conf

Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
   user_super="zookeeper"
   user_admin="admin-secret";
};
server.properties

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
auto.create.topics.enable=false
listeners=SASL_PLAINTEXT://172.23.199.20:9092
advertised.listeners=SASL_PLAINTEXT://172.23.199.20:9092
zookeeper.set.acl=true
super.users=User:admin
kafka_server_jaas.conf

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin-secret";
};

身份验证工作正常。

启用授权

添加管理员:

./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret]' --entity-type users --entity-name admin

添加用户:

./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456]' --entity-type users --entity-name pkalita

添加权限:

./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:admin --producer --topic test
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:pkalita --producer --topic test

在这些操作之后,处理器 PublishKafka 可以与主要管理员一起正常工作,但如果选择用户 pkalita,则会引发异常:

org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed

在此处输入图像描述 如果仅设置 Use transactions - false,则处理器工作

我做错了什么?

upd:我尝试使用 spring Kafka 生产者与用户 pkalita 发送消息 - 消息已成功发布在主题上

标签: apache-kafkaapache-nifi

解决方案


设置 ACL 时需要指定事务 ID。

文档

事务生产者使用的主体必须被授权在已配置的transactional.id.

bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
 --add --allow-principal User:Alice \
 --producer --topic test-topic --transactional-id test-txn

您还可以使用--transactional-id *来允许任何事务 ID。


推荐阅读