首页 > 解决方案 > Kafka Confluent InvalidConfigurationException:未经授权;错误代码:401

问题描述

我正在使用来自 kafka Confluent 模式注册表的 SpecificAvroSerde 读取模式。但我在下面收到此错误:

org.apache.kafka.common.errors.InvalidConfigurationException: Unauthorized; error code: 401
[TestStreamProcess-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [TestStreamProcess] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. 
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:82)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:957)
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1009)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:907)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unauthorized; error code: 401

我正在从 config.properties 文件加载成功连接到 Schema Registry 所需的配置:

schema.registry.url=<confluent Schema Registry URL>
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<Schema Registry API Key>:<schema Registry Secret Key>

已经设置了 serdesConfig 以及必要的导入:

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.*;

    public static SpecificAvroSerde<TestStream> getTestStreamSerde(Properties props) {
        SpecificAvroSerde<TestStream> testStreamSerde= new SpecificAvroSerde<>();
        testStreamSerde.configure(getSerdeProps(props), false);
        return testStreamSerde;
    }

  protected static Map<String, String> getSerdeProps(Properties props) {
        final HashMap<String, String> map = new HashMap<>();

        final String schemaUrlConfig = props.getProperty(SCHEMA_REGISTRY_URL_CONFIG);
        map.put(SCHEMA_REGISTRY_URL_CONFIG, ofNullable(schemaUrlConfig).orElse(""));
}

        final KStream<String, TestStream> testStream = builder.stream("input-topic",
                Consumed.with(String(), getTestStreamSerde(props.getProperties())));

Schema Registry 工件已经加载到 pom.xml 中:

<plugin>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-maven-plugin</artifactId>
    <version>${confluent.version}</version>
</plugin>

您能帮我确定我的配置中缺少什么吗?

标签: javaapache-kafkaapache-kafka-streamsconfluent-platformconfluent-schema-registry

解决方案


推荐阅读