scala - Unable to create KafkaProducer because java.security.auth.login.config is missing
Problem description
I am trying to create a KafkaProducer using the akka-stream-kafka library.
Infrastructure
I use docker-compose; only the kafka and zookeeper instances are shown here.
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka:5.1.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
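I can confirm that I am able to connect to the cluster without any problems using kafka-console-consumer and kafka-console-producer on the CLI, as well as through the REST API.
Configuration
This is my Typesafe config. I am trying to connect to the kafka broker over a plaintext connection, without any authentication.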
bootstrap.servers="localhost:29092"
acks = "all"
retries = 2
batch.size = 16384
linger.ms = 1
buffer.memory = 33554432
max.block.ms = 5000
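Code
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringSerializer

// Read the producer section of application.conf and build the settings from it.
val config = ConfigFactory.load().getConfig("akka.kafka.producer")
val stringSerializer = new StringSerializer()
val producerSettings = ProducerSettings[String, String](config, stringSerializer, stringSerializer)
// some code has been omitted here.
Producer.plainSink(producerSettings)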
Exception
This is the stack trace I receive; it tells me that there is no JAAS config:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:456)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:41)
at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:33)
at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:93)
at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:630)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:415)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)
at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
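How can I connect to the Kafka cluster without authentication when running locally?
I tried adding a KAFKA_OPTS environment variable to the kafka service in docker-compose, and I also tried adding the following to application.conf:
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username='confluent' password='confluent-secret';"
In the former case, some associated services, such as the kafka-rest API, fail. In the latter case, I get the following exception: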
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:456)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:41)
at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:33)
at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:93)
at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:630)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:415)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:125)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:41)
at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:33)
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:87)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:52)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:89)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:114)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
Solution
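One thing worth checking, offered as a sketch rather than a confirmed fix: the first stack trace passes through JaasContext.loadClientContext, which the Kafka client only reaches when the effective security.protocol is SASL_PLAINTEXT or SASL_SSL. The broker in the docker-compose file above exposes only PLAINTEXT listeners, so some layer of the resolved configuration is probably selecting a SASL protocol. The snippet below assumes akka-stream-kafka and its reference.conf are on the classpath; the object name PlaintextProducerSettings is made up for illustration. It pins security.protocol to PLAINTEXT in code and prints the properties that would be handed to KafkaProducer, so that a stray override becomes visible.

```scala
import akka.kafka.ProducerSettings
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper object, not part of the original question.
object PlaintextProducerSettings {
  // Same lookup as in the question: the "akka.kafka.producer" section.
  private val config = ConfigFactory.load().getConfig("akka.kafka.producer")

  val settings: ProducerSettings[String, String] =
    ProducerSettings(config, new StringSerializer, new StringSerializer)
      // Host listener advertised by the broker in the docker-compose file.
      .withBootstrapServers("localhost:29092")
      // Force the unauthenticated protocol, overriding any value resolved from config.
      .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT")

  def main(args: Array[String]): Unit =
    // Print the effective Kafka client properties, sorted by key for readability.
    settings.properties.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k = $v") }
}
```

If the printed list still shows sasl.* entries, they are likely coming from application.conf, a system property, or another file merged by ConfigFactory.load(). Removing them, including the sasl.jaas.config line tried above, keeps the local setup unauthenticated.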