Unable to create KafkaProducer due to missing java.security.auth.login.config

Problem description

I am trying to create a KafkaProducer using the akka-stream-kafka library.

Infrastructure

Using docker-compose; only the kafka and zookeeper instances are shown.

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.1.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

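I can report that I have been able to connect to the cluster without any problems using kafka-console-consumer and kafka-console-producer on the CLI, as well as through the REST API.

Configuration

This is my Typesafe config. I am trying to connect to the kafka broker over a plaintext connection, without any authentication.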

bootstrap.servers="localhost:29092"
acks = "all"
retries = 2
batch.size = 16384
linger.ms = 1
buffer.memory = 33554432
max.block.ms = 5000

Code

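import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringSerializer

val config = ConfigFactory.load().getConfig("akka.kafka.producer")
val stringSerializer = new StringSerializer()
val producerSettings = ProducerSettings[String, String](config, stringSerializer, stringSerializer)
// some code has been omitted here.
Producer.plainSink(producerSettings)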

Exception

This is the stack trace I receive. It tells me that there is no JAAS config:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:456)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
    at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
    at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
    at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:41)
    at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:33)
    at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:93)
    at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:630)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:415)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
    at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
    at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)
    at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
    at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
    at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)

How can I connect to the Kafka cluster without authentication when running locally?


I have tried adding the KAFKA_OPTS environment variable to the kafka service in docker-compose, and I have also tried adding the following to application.conf:

sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username='confluent' password='confluent-secret';"

In the former case, some of the associated services, such as the kafka-rest API, failed. In the latter case, I get the following exception:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:456)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
    at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
    at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
    at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:41)
    at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:33)
    at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:93)
    at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:630)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
    at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:415)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:125)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
    at akka.kafka.ProducerSettings.createKafkaProducer(ProducerSettings.scala:226)
    at akka.kafka.scaladsl.Producer$.$anonfun$flexiFlow$1(Producer.scala:155)
    at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:41)
    at akka.kafka.internal.ProducerStage$DefaultProducerStage.createLogic(ProducerStage.scala:33)
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:87)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:52)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:89)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:114)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:413)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
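
A note on this second trace: it passes through KerberosLogin, which the Kafka client uses for the GSSAPI mechanism, and GSSAPI is the documented default for sasl.mechanism. Setting sasl.jaas.config to a PlainLoginModule entry does not by itself switch the mechanism to PLAIN, so the client still expects a Kerberos service name. The sketch below illustrates that dependency in plain Scala; SaslCheck and its methods are hypothetical names, not Kafka APIs, and the check is deliberately simplified.

```scala
// Hypothetical helper for illustration only (not a Kafka API).
object SaslCheck {
  // Kafka's documented default for sasl.mechanism.
  val DefaultMechanism = "GSSAPI"

  // The mechanism the client would use, given the resolved client properties.
  def mechanism(props: Map[String, String]): String =
    props.getOrElse("sasl.mechanism", DefaultMechanism)

  // Settings that still look missing for the selected mechanism.
  // Simplification: a serviceName given inside the JAAS entry is not inspected.
  def missingSettings(props: Map[String, String]): List[String] =
    mechanism(props) match {
      case "GSSAPI" if !props.contains("sasl.kerberos.service.name") =>
        List("sasl.kerberos.service.name")
      case _ => Nil
    }
}
```

For a properties map that contains only sasl.jaas.config, `SaslCheck.missingSettings` returns `List("sasl.kerberos.service.name")`; adding `"sasl.mechanism" -> "PLAIN"` returns an empty list. Note that the broker in the docker-compose file above only defines PLAINTEXT listeners, so SASL settings on the client would probably not match the broker in any case.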

Tags: scala, apache-kafka, akka, akka-stream
