首页 > 解决方案 > 使用 Payara Cloud 创建 Kafka 消费者

问题描述

package com.maersk.inlandmoves.ejb;
    import javax.ejb.ActivationConfigProperty;
    import javax.ejb.MessageDriven;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import fish.payara.cloud.connectors.kafka.api.KafkaListener;
    import fish.payara.cloud.connectors.kafka.api.OnRecord;
    @MessageDriven(activationConfig = {
            @ActivationConfigProperty(propertyName = "clientId", propertyValue = "sp-document-processor"),
            @ActivationConfigProperty(propertyName = "groupIdConfig", propertyValue = "MSK.inland.AMPSEM.consumerGroup.v1"),
            @ActivationConfigProperty(propertyName = "topics", propertyValue = "MSK.Operation.OperationalTransportOrder.topic.confidential.any.v2"),
            @ActivationConfigProperty(propertyName = "bootstrapServersConfig", propertyValue = "rp-cde-rpx-stage.crb.apmoller.net:9093"),   
            @ActivationConfigProperty(propertyName = "autoCommitInterval", propertyValue = "100"),   
            @ActivationConfigProperty(propertyName = "retryBackoff", propertyValue = "1000"),   
            @ActivationConfigProperty(propertyName = "keyDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),   
            @ActivationConfigProperty(propertyName = "valueDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),   
            @ActivationConfigProperty(propertyName = "pollInterval", propertyValue = "1000"),   
        })
        public class KafkaMDB implements KafkaListener {
             
            @OnRecord( topics={"MSK.Operation.OperationalTransportOrder.topic.confidential.any.v2"})
            public void getMessageTest(ConsumerRecord record) {
                System.out.println("Got record on topic testing " + record);
            }
        }

我正在尝试使用上述 MDB 方法创建一个 Kafka 消费者。但我似乎无法更改激活属性中设置的 ConsumerConfig 属性。我看到了我没有设置的 ProducerConfig 属性(可能是 rar 的一部分)。另外我不知道如何设置 ssl 和 sasl 相关属性。我已经部署了 kafka.rar 并且需要 pom 依赖项。有人可以帮忙吗。

标签: javakafka-consumer-apipayara

解决方案


推荐阅读