java - 使用 Payara Cloud 创建 Kafka 消费者
问题描述
package com.maersk.inlandmoves.ejb;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import fish.payara.cloud.connectors.kafka.api.KafkaListener;
import fish.payara.cloud.connectors.kafka.api.OnRecord;
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "clientId", propertyValue = "sp-document-processor"),
@ActivationConfigProperty(propertyName = "groupIdConfig", propertyValue = "MSK.inland.AMPSEM.consumerGroup.v1"),
@ActivationConfigProperty(propertyName = "topics", propertyValue = "MSK.Operation.OperationalTransportOrder.topic.confidential.any.v2"),
@ActivationConfigProperty(propertyName = "bootstrapServersConfig", propertyValue = "rp-cde-rpx-stage.crb.apmoller.net:9093"),
@ActivationConfigProperty(propertyName = "autoCommitInterval", propertyValue = "100"),
@ActivationConfigProperty(propertyName = "retryBackoff", propertyValue = "1000"),
@ActivationConfigProperty(propertyName = "keyDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
@ActivationConfigProperty(propertyName = "valueDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
@ActivationConfigProperty(propertyName = "pollInterval", propertyValue = "1000"),
})
public class KafkaMDB implements KafkaListener {
@OnRecord( topics={"MSK.Operation.OperationalTransportOrder.topic.confidential.any.v2"})
public void getMessageTest(ConsumerRecord record) {
System.out.println("Got record on topic testing " + record);
}
}
我正在尝试使用上述 MDB 方法创建一个 Kafka 消费者。但我似乎无法更改激活属性中设置的 ConsumerConfig 属性。我看到了我没有设置的 ProducerConfig 属性(可能是 rar 的一部分)。另外我不知道如何设置 ssl 和 sasl 相关属性。我已经部署了 kafka.rar 并且需要 pom 依赖项。有人可以帮忙吗。
解决方案
推荐阅读
- python - 如何在django中自定义一个onetomany关系的序列化器数据
- node.js - ES6/TypeScript 动态导入 - 由于静态导入(Nodejs CLI)而导致启动时间缓慢
- python - 如何生成满足某些条件的三个随机整数?
- reactjs - react-hook-form 的 DefaultValues 未将值设置为 React JS 中的输入字段
- firebase - React Native:使用哪个 firebase 存储 npm 包?
- javascript - Suitescript2.0 中的 Netsuite record.save 返回 200OK 但未创建记录(但同样适用于 SuiteScript1.0)
- apache-kafka - 为什么消费者获取消息时 Kafka 主题队列不为空?
- python - scapy.arping() 或 netdiscover 未在 kali linux 2020.1 中检测到任何连接到 wifi 的客户端
- java - 如何从 Main Activity 更改另一个 xml 文件中 TextView 的重力?
- flutter - 颤振显示分割很长的文本并通过分割显示?