java - Kafka Confluent InvalidConfigurationException:未经授权;错误代码:401
问题描述
我正在使用来自 kafka Confluent 模式注册表的 SpecificAvroSerde 读取模式。但我在下面收到此错误:
org.apache.kafka.common.errors.InvalidConfigurationException: Unauthorized; error code: 401
[TestStreamProcess-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [TestStreamProcess] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:82)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:957)
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1009)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:907)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unauthorized; error code: 401
我正在从 config.properties 文件加载成功连接到 Schema Registry 所需的配置:
schema.registry.url=<confluent Schema Registry URL>
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<Schema Registry API Key>:<schema Registry Secret Key>
已经设置了 serdesConfig 以及必要的导入:
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.*;
public static SpecificAvroSerde<TestStream> getTestStreamSerde(Properties props) {
SpecificAvroSerde<TestStream> testStreamSerde= new SpecificAvroSerde<>();
testStreamSerde.configure(getSerdeProps(props), false);
return testStreamSerde;
}
protected static Map<String, String> getSerdeProps(Properties props) {
final HashMap<String, String> map = new HashMap<>();
final String schemaUrlConfig = props.getProperty(SCHEMA_REGISTRY_URL_CONFIG);
map.put(SCHEMA_REGISTRY_URL_CONFIG, ofNullable(schemaUrlConfig).orElse(""));
}
final KStream<String, TestStream> testStream = builder.stream("input-topic",
Consumed.with(String(), getTestStreamSerde(props.getProperties())));
Schema Registry 工件已经加载到 pom.xml 中:
<plugin>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-maven-plugin</artifactId>
<version>${confluent.version}</version>
</plugin>
您能帮我确定我的配置中缺少什么吗?
解决方案
推荐阅读
- javascript - XGrid 材质 ui 自定义行颜色
- javascript - 获取 Aspx 页面中的用户控件元素
- django-models - Django Rest Framework 中自定义用户的自定义身份验证令牌
- node.js - 如何在节点 js 应用程序的 heroku procfile 中指定更改目录
- c# - EF Core MySQL 在使用 DateTimeOffset 时使用不一致的时区
- xampp - 你能告诉我这些地址在 XAMPP 中的用途吗?
- javascript - 在 Nodejs (Emscripten) 中调用和包装 C 函数
- android - 登录页面显示在设计预览中但不在模拟器中
- asp.net-core - System.InvalidOperationException: '无法修改响应标头,因为响应已经开始。'
- google-cloud-platform - 如何将 SharePoint 文件迁移到 Google Cloud Storage?