apache-kafka - 尝试在本机模式下使用 Quarkus 访问 Kafka 时出错
问题描述
我尝试了一个简单的示例代码来测试使用 smallrye-reactive-messaging-kafka 从 Quarkus 2.2.2 访问“kerberized”Kafka:
package org.acme;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MyTopicConsumer {
@Incoming("in")
public void consume(ConsumerRecord<String, String> record) {
System.out.println("read from Kafka : " + record.value() ) ;
}
}
Kafkas 落后于 Kerberos,所以我使用了这样的 application.properties:
quarkus.ssl.native=true
quarkus.native.enable-all-security-services=true
mp.messaging.incoming.in.group.id=my-group
mp.messaging.incoming.in.auto.commit.interval.ms=1000
mp.messaging.incoming.in.security.protocol=SASL_SSL
mp.messaging.incoming.in.sasl.kerberos.service.name=kafka
mp.messaging.incoming.in.sasl.mechanism=GSSAPI
mp.messaging.incoming.in.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule "required" doNotPrompt=true useKeyTab=true storeKey=true serviceName="kafka" keyTab="<keytab>" principal="<principal>" useTicketCache=false;
mp.messaging.incoming.in.ssl.truststore.location=<location>
mp.messaging.incoming.in.ssl.truststore.password=<password>
mp.messaging.incoming.in.connector=smallrye-kafka
mp.messaging.incoming.in.topic=<topic>
mp.messaging.incoming.in.auto.offset.reset=earliest
mp.messaging.incoming.in.enable.auto.commit=false
mp.messaging.incoming.in.bootstrap.servers=<list of servers>
它在 jvm 模式下运行良好,但在本机模式(graalvm-ce-java11-21.2.0)下失败并出现以下错误:
ERROR [io.sma.rea.mes.provider] (main) SRMSG00230: Unable to create the publisher or subscriber during initialization: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.<init>(ReactiveKafkaConsumer.java:80)
at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.<init>(KafkaSource.java:85)
at io.smallrye.reactive.messaging.kafka.KafkaConnector.getPublisherBuilder(KafkaConnector.java:182)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getPublisherBuilder(KafkaConnector_ClientProxy.zig:159)
at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.createPublisherBuilder(ConfiguredChannelFactory.java:190)
at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:153)
at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:125)
at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory_ClientProxy.initialize(ConfiguredChannelFactory_ClientProxy.zig:189)
at java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
at io.smallrye.reactive.messaging.extension.MediatorManager.start(MediatorManager.java:189)
at io.smallrye.reactive.messaging.extension.MediatorManager_ClientProxy.start(MediatorManager_ClientProxy.zig:220)
at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.notify(SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.zig:111)
at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:300)
at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:282)
at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:70)
at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:128)
at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:97)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(LifecycleEventsBuildStep$startupEvent1144526294.zig:87)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(LifecycleEventsBuildStep$startupEvent1144526294.zig:40)
at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:623)
at io.quarkus.runtime.Application.start(Application.java:101)
at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:101)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:66)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:42)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:119)
at io.quarkus.runner.GeneratedMain.main(GeneratedMain.zig:29)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Could not find a public no-argument constructor for org.apache.kafka.common.security.kerberos.KerberosLogin
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:737)
... 30 more
我尝试了一些帖子建议的一些更改,但没有效果。谁能建议如何解决或解决此问题?谢谢
解决方案
It is seems that the code doesn't contain the constructor org.apache.kafka.common.security.kerberos.KerberosLogin. Have you try to add the class as describe in https://quarkus.io/guides/writing-native-applications-tips#registering-for-reflection
Maybe you need to add line in your configuration file
quarkus.native.additional-build-args=-H:ReflectionConfigurationFiles=reflection-config.json
And add the class org.apache.kafka.common.security.kerberos.KerberosLogin in reflection-config.json as describe here https://quarkus.io/guides/writing-native-applications-tips#using-a-configuration-file
推荐阅读
- android - savedInstanceState 在第一次应用程序关闭后为空,但在第二次关闭后不为空
- html - How to apply to @font-face?
- qt - 在使用 LLVM 的 Windows 上的 QtCreator 编译中没有规则来制作目标
- php - 如果记录不存在并从数据库中获取数据,则需要插入 sql
- c++ - string::find_first_of 的实现
- spring-boot - Axon 状态存储聚合测试 IllegalStateException
- jenkins - Jenkins kubernetes-plugin 不理解脚本化管道中的环境变量
- android - Flutter 模拟器错误 | 无法找到具有路径的平台 SDK:平台;android-R
- spring-boot - 我应该在哪里使用 oauth2.0 放置访问令牌?
- bash - 如果第一个字段是带日期的行,如何合并 2 个文件