Error when trying to access Kafka with Quarkus in native mode

Problem description

I tried a simple code sample to test access to a "kerberized" Kafka from Quarkus 2.2.2 using smallrye-reactive-messaging-kafka:

package org.acme;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MyTopicConsumer {

    @Incoming("in") 
    public void consume(ConsumerRecord<String, String> record) {
        System.out.println("read from Kafka : " + record.value() ) ;
    }

}

Kafka is secured behind Kerberos, so I used an application.properties like this:

quarkus.ssl.native=true
quarkus.native.enable-all-security-services=true
mp.messaging.incoming.in.group.id=my-group
mp.messaging.incoming.in.auto.commit.interval.ms=1000
mp.messaging.incoming.in.security.protocol=SASL_SSL
mp.messaging.incoming.in.sasl.kerberos.service.name=kafka
mp.messaging.incoming.in.sasl.mechanism=GSSAPI
mp.messaging.incoming.in.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule "required" doNotPrompt=true useKeyTab=true storeKey=true serviceName="kafka" keyTab="<keytab>" principal="<principal>" useTicketCache=false;
mp.messaging.incoming.in.ssl.truststore.location=<location>
mp.messaging.incoming.in.ssl.truststore.password=<password>
mp.messaging.incoming.in.connector=smallrye-kafka
mp.messaging.incoming.in.topic=<topic>
mp.messaging.incoming.in.auto.offset.reset=earliest
mp.messaging.incoming.in.enable.auto.commit=false
mp.messaging.incoming.in.bootstrap.servers=<list of servers>

It works fine in JVM mode, but in native mode (graalvm-ce-java11-21.2.0) it fails with the following error:

ERROR [io.sma.rea.mes.provider] (main) SRMSG00230: Unable to create the publisher or subscriber during initialization: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.<init>(ReactiveKafkaConsumer.java:80)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.<init>(KafkaSource.java:85)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getPublisherBuilder(KafkaConnector.java:182)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getPublisherBuilder(KafkaConnector_ClientProxy.zig:159)
        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.createPublisherBuilder(ConfiguredChannelFactory.java:190)
        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:153)
        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:125)
        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory_ClientProxy.initialize(ConfiguredChannelFactory_ClientProxy.zig:189)
        at java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
        at io.smallrye.reactive.messaging.extension.MediatorManager.start(MediatorManager.java:189)
        at io.smallrye.reactive.messaging.extension.MediatorManager_ClientProxy.start(MediatorManager_ClientProxy.zig:220)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.notify(SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.zig:111)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:300)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:282)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:70)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:128)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:97)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(LifecycleEventsBuildStep$startupEvent1144526294.zig:87)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(LifecycleEventsBuildStep$startupEvent1144526294.zig:40)
        at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:623)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:101)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:66)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:42)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:119)
        at io.quarkus.runner.GeneratedMain.main(GeneratedMain.zig:29)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Could not find a public no-argument constructor for org.apache.kafka.common.security.kerberos.KerberosLogin
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:737)
        ... 30 more

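I tried some changes suggested in a few posts, but none of them had any effect. Can anyone suggest how to fix or work around this issue? Thanks.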

Tags: apache-kafka, kerberos, quarkus

Solution


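It seems that the native image does not contain the public no-argument constructor of org.apache.kafka.common.security.kerberos.KerberosLogin. Kafka instantiates this class reflectively, and a native image only keeps reflective access to classes that were registered at build time. Have you tried registering the class for reflection as described in https://quarkus.io/guides/writing-native-applications-tips#registering-for-reflection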

You may need to add this line to your application.properties:

quarkus.native.additional-build-args=-H:ReflectionConfigurationFiles=reflection-config.json

Then add the class org.apache.kafka.common.security.kerberos.KerberosLogin to reflection-config.json as described here: https://quarkus.io/guides/writing-native-applications-tips#using-a-configuration-file
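
As a rough sketch, a reflection-config.json that registers this class could look like the following. It follows the entry format shown in the Quarkus guide linked above. The file location (src/main/resources, as used in that guide) and the set of flags are assumptions, and other classes that are loaded reflectively during the Kerberos login may have to be added in the same way if similar "Could not find a public no-argument constructor" errors appear for them.

```json
[
  {
    "name": "org.apache.kafka.common.security.kerberos.KerberosLogin",
    "allDeclaredConstructors": true,
    "allPublicConstructors": true,
    "allDeclaredMethods": true,
    "allPublicMethods": true,
    "allDeclaredFields": true,
    "allPublicFields": true
  }
]
```

The native executable has to be rebuilt for the change to take effect, since reflection metadata is fixed at image build time.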

